From: Olivier Matz Date: Wed, 28 Aug 2019 21:52:42 +0000 (+0200) Subject: add command line X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=93a3052dccc40b20e4a7d0aa90b0c2286aae2ce8;p=red-alert.git add command line --- diff --git a/cmdline/MANIFEST.in b/cmdline/MANIFEST.in new file mode 100644 index 0000000..4210178 --- /dev/null +++ b/cmdline/MANIFEST.in @@ -0,0 +1,6 @@ +include README +include MANIFEST.in +graft redalert +global-exclude *.pyc +global-exclude *.pyo +global-exclude *~ diff --git a/cmdline/README b/cmdline/README new file mode 100644 index 0000000..874a0a5 --- /dev/null +++ b/cmdline/README @@ -0,0 +1,8 @@ +Python3 library and command line for red-alert device +===================================================== + +The library provides helpers to send commands to the red-alert device +through the USB-serial interface, to control the LEDs and the buzzer. + +The command line interface exposes these commands in a shell, for +testing/debug purposes. diff --git a/cmdline/redalert/__init__.py b/cmdline/redalert/__init__.py new file mode 100644 index 0000000..fa5f3eb --- /dev/null +++ b/cmdline/redalert/__init__.py @@ -0,0 +1,503 @@ +#! /usr/bin/env python3 + +# +# SPDX-License-Identifier: BSD-3-Clause +# Copyright 2019, Olivier MATZ +# + +# XXX setup.py with pyserial +# XXX lib +# XXX arg parser +# XXX fix help (12 bits, led width, ...) +# XXX pylint + +import atexit +import cmd +import logging +import os +import readline +from select import select +import struct +import sys +import threading +import time + +import serial + +SPM_PAGE_SIZE = 128 + +logging.basicConfig() +DEV_LOG = logging.getLogger('device') +DEV_LOG.setLevel(logging.DEBUG) +LOG = logging.getLogger('local') +LOG.setLevel(logging.DEBUG) + +class ProgrammingError(Exception): + """ + Raised when an error occurs while programming. + """ + def __init__(self, message, exc=None): + Exception.__init__(self, message) + print("XXXXXXXXXXXXX") + print(message) + self.message = message + self.exc = exc + + def __str__(self): + return '%s (%d)' % (self.message, self.errno) + + def __repr__(self): + return '%s (%d)' % (self.message, self.errno) + +class RedAlertSerial(serial.Serial): # pylint: disable=too-many-ancestor + """ + Communication with red-alert device through serial line. + """ + CMD_LED_DEBUG = 0 + CMD_BUZZER = 1 + CMD_LED_GYRO_ALL = 2 + CMD_LED_GYRO_RAY = 3 + CMD_RESET = 5 + CMD_BUZZER_VIBRATO = 6 + VIBRATO_SAW = 1 + VIBRATO_TRIANGLE = 2 + VIBRATO_SIN = 3 + VIBRATO_COS = 4 + VIBRATO_SQUARE = 5 + read_lock = threading.Lock() + logbuf = '' + + def __init__(self, *args, log=True, **kwargs): + super().__init__(*args, **kwargs) + if log: + thread = threading.Thread(target=self._log_thread) + thread.daemon = True + thread.start() + + def _log_thread(self): + """ + Executed in a thread, it reads messages sent by the red-alert + device and display it as logs. + """ + while True: + _, _, _ = select([self], [], []) + # something is readable, retry with lock held + self.read_lock.acquire() + try: + ins, _, _ = select([self], [], [], 0) + if not ins: + return + self.logbuf += self.read().decode() + # only log after \n + splitted = self.logbuf.split('\n') + for line in splitted[:-1]: + if line.strip(): + DEV_LOG.debug(line) + self.logbuf = splitted[-1] + finally: + self.read_lock.release() + + def _send_cmd(self, command, args): + """ + Send a command to the device. + """ + length = len(args) + 3 + array = bytes([command, length]) + args + csum = sum(array) + csum = (csum >> 8) + (csum & 0xff) + csum = (csum >> 8) + (csum & 0xff) + csum = (~csum) & 0xff + array += bytes([csum]) + self.write(array) + + def led_debug(self, on): + """ + Switch debug led on or off. + + :arg bool on: + True to switch the led on. + """ + self._send_cmd(self.CMD_LED_DEBUG, struct.pack('B', on)) + + def buzzer(self, period_us, duration_ms): + """ + Emit a tone on the buzzer. + + :arg int period_us: + The tone period in microseconds (ex: 500 for a 2Khz tone). + :arg int duration_ms: + The tone duration in milliseconds. + """ + self._send_cmd(self.CMD_BUZZER, + struct.pack('>HH', period_us, duration_ms)) + + def buzzer_vibrato(self, period1_us, period2_us, duration_ms, + vibrato_period_ms, vibrato_type): + """ + Emit a tone on the buzzer, with a vibrato effect (siren). + + :arg int period1_us: + The first tone period in microseconds (ex: 500 for a 2Khz tone). + :arg int period2_us: + The second tone period in microseconds (ex: 500 for a 2Khz tone). + :arg int duration_ms: + The tone duration in milliseconds. + :arg int vibrato_period_ms: + The period of the vibrato in millisecond. + :arg int vibratotype: + The mode of the vibrato (saw, triangle, sin, ...) + """ + self._send_cmd(self.CMD_BUZZER_VIBRATO, + struct.pack('>HHHHB', period1_us, period2_us, + duration_ms, vibrato_period_ms, + vibrato_type)) + + def led_gyro(self, intensity, stop_ms): + """ + Switch on or off all gyro LEDs. + + :arg int intensity: + LEDs intensity (0 = off, 65535 = max). + :arg int stop_ms: + Time in microseconds after which the LEDs are stopped. If 0, + keep the LEDs like this forever. + """ + self._send_cmd(self.CMD_LED_GYRO_ALL, + struct.pack('>HHHH', intensity, 1000, + 0, stop_ms)) + + def led_gyro_blink(self, intensity, blink_on_ms, blink_off_ms, stop_ms): + """ + Blink all gyro LEDs. + + :arg int intensity: + LEDs intensity (0 = off, 65535 = max) when LEDs are on. + :arg int blink_on_ms: + Time in millisecond during which LEDs are switched on. + :arg int blink_off_ms: + Time in millisecond during which LEDs are switched off. + :arg int stop_ms: + Time in microseconds after which the LEDs are stopped. If 0, + keep the LEDs like this forever. + """ + self._send_cmd(self.CMD_LED_GYRO_ALL, + struct.pack('>HHHH', intensity, blink_on_ms, + blink_off_ms, stop_ms)) + + # pylint: + def led_gyro_rotate(self, angle, intensity, width, rot_speed, stop_ms): + """ + Switch on an area of gyro LEDs and perform a rotation. + + :arg int angle: + Start angle (0 to 65535). + :arg int intensity: + LEDs intensity (0 = off, 65535 = max). + :arg int width: + Width of the area to be switched on (5461 = 1 LED, 65535 = all LEDs) + :arg int rot_speed: + Rotation speed is angle per 10ms (so 655 is ~1 round/sec). + :arg int stop_ms: + Time in microseconds after which the LEDs are stopped. If 0, + continue forever. + """ + self._send_cmd(self.CMD_LED_GYRO_RAY, + struct.pack('>HHHhH', angle, intensity, width, + rot_speed, stop_ms)) + def reset(self): + """ + Send a command to reset the red-alert device. + """ + self._send_cmd(self.CMD_RESET, bytes()) + + def _djb_hash(self, buf): + """ + Hash the buffer, using a simple algorithm (the same is implemented + in the device). + """ + hval = 5381 + for char in buf: + hval = (hval * 33 + char) & 0xffffffff + return hval + + def check_hash(self, buf, offset, size): + """ + Process the hash of the buffer, ask for a hash of the flash area + on the avr, and check if value is the same. Return True if hash + is the same. + + :arg bytes() buf: + The buffer to hash. + :arg int offset: + The offset in the flash. + :arg int size: + The size of the buffer in the flash. + """ + # XXX + #if len(buf) != size: + # raise ProgrammingError('Invalid size: len(buf)=%d != size=%d' % ( + # len(buf), size)) + + # go in hash mode + self.flushInput() + self.write(b'h') + line = self.readline() + + # send addr + line = self.readline() + if not line.endswith(b'addr?\r\n'): + raise ProgrammingError('Failed to send address: <%s>' % line) + self.write(b'%x\n' % offset) + + # send size + line = self.readline() + if not line.startswith(b'size?'): + raise ProgrammingError('Failed to send size: <%s>' % line) + self.write(b'%x\n' % size) + + # compare hash + local_hash = self._djb_hash(buf[offset:offset+size]) + avr_hash = int(self.readline()[0:8], 16) + if local_hash != avr_hash: + return False + + return True + + def _prog_page(self, addr, buf): + """ + Program a page from buf at given addr. + """ + + # switch in program mode + self.flushInput() + self.write(b'p') + line = self.readline() + + # send address + line = self.readline() + if not line.endswith(b'addr?\r\n'): + raise ProgrammingError('Failed to send address: <%s>' % line) + self.write(b'%x\n' % addr) + line = self.readline() + if not line.startswith(b'ok'): + raise ProgrammingError('Failed to send address: <%s>' % line) + + # fill page with buf data + page = bytes( + [buf[i] if i < len(buf) else 0xff for i in range(SPM_PAGE_SIZE)]) + self.write(page) + + sys.stdout.write('.') + sys.stdout.flush() + + # compare hash + avr_hash = int(self.readline()[0:8], 16) + local_hash = self._djb_hash(page) + + if local_hash != avr_hash: + self.write(b'n') + raise ProgrammingError('Hash check failed: avr=%x prog=%x' % ( + avr_hash, local_hash)) + + self.write(b'y') + line = self.readline() + if not line.startswith(b'OK'): + raise ProgrammingError('Failed to program page: <%s>' % line) + + def _program(self, filename): + self._send_cmd(self.CMD_RESET, bytes()) + time.sleep(0.5) + + # XXX get default file + + with open(filename, 'rb') as bin_file: + buf = bin_file.read() + if not buf: + raise ProgrammingError('Failed to program, empty file') + + self.flushInput() + self.write(b'?') + line = self.readline() + + line = self.readline() + if not b'p:prog_page' in line: + raise ProgrammingError('Failed to program: <%s>' % line) + + self.flushInput() + + addr = 0 + while addr < len(buf): + if self.check_hash(buf, addr, SPM_PAGE_SIZE): + sys.stdout.write('*') + sys.stdout.flush() + self._prog_page(addr, buf[addr:addr+SPM_PAGE_SIZE]) + addr += SPM_PAGE_SIZE + if not self.check_hash(buf, 0, len(buf)): + raise ProgrammingError('Failed to program, invalid global hash') + + print('Done. Starting application.') + self.write(b'x') + + def program(self, filename): + """ + Program the device with the given file. + + :arg str filename: + The path to the image file, in raw binary format. + """ + self.read_lock.acquire() + try: + self._program(filename) + finally: + self.read_lock.release() + + +class RedAlertShell(cmd.Cmd): + """ + The interactive shell that controls the red-alert device. + """ + + intro = 'Welcome to the red-alert shell. Type help or ? to list commands.\n' + prompt = "red-alert> " + + def __init__(self, raserial): + super().__init__() + self.serial = raserial + + def _bad_args(self): + print('bad arguments: %r' % self.lastcmd) + self.do_help(self.lastcmd.split()[0]) + + def emptyline(self): + pass + + @classmethod + def do_quit(cls, args): # pylint: disable=unused-argument + """Exit red-alert shell.""" + return True + + def do_reset(self, args): # pylint: disable=unused-argument + """Reset the gyro device.""" + self.serial.reset() + + def do_program(self, args): + """ + Program the device. Must be in bootloader. + Syntax: program + """ + self.serial.program(args) + + def do_led_debug(self, args): + """ + Switch debug led on or off. + Syntax: led_debug on|off|blink + """ + if args == 'on': + self.serial.led_debug(1) + elif args == 'off': + self.serial.led_debug(0) + elif args == 'blink': + for _ in range(5): + self.serial.led_debug(1) + time.sleep(0.1) + self.serial.led_debug(0) + time.sleep(0.1) + else: + self._bad_args() + + def do_buzzer(self, args): + """ + Play a frequency on the buzzer + Syntax: buzzer + """ + try: + period_us, duration_ms = [int(x) for x in args.split()] + except (IndexError, ValueError): + self._bad_args() + return + self.serial.buzzer(period_us, duration_ms) + + def do_buzzer_vibrato(self, args): + """ + Play a frequency on the buzzer, changing the frequency + Syntax: buzzer_vibrato + + """ + try: + (period1_us, period2_us, duration_ms, vibrato_period_ms, + vibrato_type) = [int(x) for x in args.split()] + except (IndexError, ValueError): + self._bad_args() + return + self.serial.buzzer_vibrato(period1_us, period2_us, duration_ms, + vibrato_period_ms, vibrato_type) + + def do_led_gyro_all(self, args): + """ + Control all leds at the same time with specified intensity. + Syntax: led_gyro_all + intensity: Led intensity between 0 and 4095 + blink_ms: Time during which leds are on when blinking. If 0, don't + blink. + blink_off_ms: Time during which leds are off when blinking. If 0, + don't blink. Note that on + off times must be < 65536. + stop_ms: Time after which all the leds are switched off. If 0, + never stop. + """ + try: + intensity, blink_on_ms, blink_off_ms, stop_ms = [ + int(x) for x in args.split()] + except (IndexError, ValueError): + self._bad_args() + return + self.serial.led_gyro_blink(intensity, blink_on_ms, blink_off_ms, stop_ms) + + def do_led_gyro_ray(self, args): + """ + Switch on the leds at specified intensity in one direction. + Syntax: led_gyro_ray + angle: Angle of the ray, between 0 and 65535. + intensity: Led intensity between 0 and 4095. + width: Angle of the enabled zone, between 0 and 65535 (one led + is 6553). + rot_speed: Rotation speed is angle per 10ms (so 655 is ~1 round/sec). + stop_ms: Time after which all the leds are switched off. If 0, + never stop. + """ + try: + angle, intensity, width, rot_speed, stop_ms = [ + int(x) for x in args.split()] + except (IndexError, ValueError): + self._bad_args() + return + self.serial.led_gyro_rotate(angle, intensity, width, rot_speed, stop_ms) + +def main(): + """ + Connect to the red-alert device, and start an interactive shell + to control it. + """ + + histfile = os.path.join(os.environ["HOME"], ".redalert_history") + atexit.register(readline.write_history_file, histfile) + try: + readline.read_history_file(histfile) + except FileNotFoundError: + pass + + device = "/dev/ttyUSB0" + if len(sys.argv) > 1: + device = sys.argv[1] + raserial = RedAlertSerial(device, baudrate=57600) + shell = RedAlertShell(raserial) + while 1: + try: + shell.cmdloop() + except KeyboardInterrupt: + print() + except: # pylint: disable=bare-except + LOG.exception('Exception in mainloop') + break + +if __name__ == "__main__": + main() diff --git a/cmdline/setup.py b/cmdline/setup.py new file mode 100644 index 0000000..cdb062f --- /dev/null +++ b/cmdline/setup.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +# +# SPDX-License-Identifier: BSD-3-Clause +# Copyright 2019, Olivier MATZ +# + +from setuptools import setup + +setup( + name='redalert', + version='0.1.0', + author='Olivier Matz', + author_email='zer0@droids-corp.org', + description='API and shell for red-alert device.', + long_description=open('README').read(), + url='http://www.droids-corp.org/', + license='BSD', + packages=['redalert'], + entry_points={ + 'console_scripts': [ + 'redalert-cli = redalert:main', + ] + }, + install_requires=['pyserial'], +)