]> git.droids-corp.org - red-alert.git/commitdiff
add command line
authorOlivier Matz <zer0@droids-corp.org>
Wed, 28 Aug 2019 21:52:42 +0000 (23:52 +0200)
committerOlivier Matz <zer0@droids-corp.org>
Wed, 28 Aug 2019 21:52:42 +0000 (23:52 +0200)
cmdline/MANIFEST.in [new file with mode: 0644]
cmdline/README [new file with mode: 0644]
cmdline/redalert/__init__.py [new file with mode: 0644]
cmdline/setup.py [new file with mode: 0644]

diff --git a/cmdline/MANIFEST.in b/cmdline/MANIFEST.in
new file mode 100644 (file)
index 0000000..4210178
--- /dev/null
@@ -0,0 +1,6 @@
+include README
+include MANIFEST.in
+graft redalert
+global-exclude *.pyc
+global-exclude *.pyo
+global-exclude *~
diff --git a/cmdline/README b/cmdline/README
new file mode 100644 (file)
index 0000000..874a0a5
--- /dev/null
@@ -0,0 +1,8 @@
+Python3 library and command line for red-alert device
+=====================================================
+
+The library provides helpers to send commands to the red-alert device
+through the USB-serial interface, to control the LEDs and the buzzer.
+
+The command line interface exposes these commands in a shell, for
+testing/debug purposes.
diff --git a/cmdline/redalert/__init__.py b/cmdline/redalert/__init__.py
new file mode 100644 (file)
index 0000000..fa5f3eb
--- /dev/null
@@ -0,0 +1,503 @@
+#! /usr/bin/env python3
+
+#
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright 2019, Olivier MATZ <zer0@droids-corp.org>
+#
+
+# XXX setup.py with pyserial
+# XXX lib
+# XXX arg parser
+# XXX fix help (12 bits, led width, ...)
+# XXX pylint
+
+import atexit
+import cmd
+import logging
+import os
+import readline
+from select import select
+import struct
+import sys
+import threading
+import time
+
+import serial
+
+SPM_PAGE_SIZE = 128
+
+logging.basicConfig()
+DEV_LOG = logging.getLogger('device')
+DEV_LOG.setLevel(logging.DEBUG)
+LOG = logging.getLogger('local')
+LOG.setLevel(logging.DEBUG)
+
+class ProgrammingError(Exception):
+    """
+    Raised when an error occurs while programming.
+    """
+    def __init__(self, message, exc=None):
+        Exception.__init__(self, message)
+        print("XXXXXXXXXXXXX")
+        print(message)
+        self.message = message
+        self.exc = exc
+
+    def __str__(self):
+        return '%s (%d)' % (self.message, self.errno)
+
+    def __repr__(self):
+        return '%s (%d)' % (self.message, self.errno)
+
+class RedAlertSerial(serial.Serial): # pylint: disable=too-many-ancestor
+    """
+    Communication with red-alert device through serial line.
+    """
+    CMD_LED_DEBUG = 0
+    CMD_BUZZER = 1
+    CMD_LED_GYRO_ALL = 2
+    CMD_LED_GYRO_RAY = 3
+    CMD_RESET = 5
+    CMD_BUZZER_VIBRATO = 6
+    VIBRATO_SAW = 1
+    VIBRATO_TRIANGLE = 2
+    VIBRATO_SIN = 3
+    VIBRATO_COS = 4
+    VIBRATO_SQUARE = 5
+    read_lock = threading.Lock()
+    logbuf = ''
+
+    def __init__(self, *args, log=True, **kwargs):
+        super().__init__(*args, **kwargs)
+        if log:
+            thread = threading.Thread(target=self._log_thread)
+            thread.daemon = True
+            thread.start()
+
+    def _log_thread(self):
+        """
+        Executed in a thread, it reads messages sent by the red-alert
+        device and display it as logs.
+        """
+        while True:
+            _, _, _ = select([self], [], [])
+            # something is readable, retry with lock held
+            self.read_lock.acquire()
+            try:
+                ins, _, _ = select([self], [], [], 0)
+                if not ins:
+                    return
+                self.logbuf += self.read().decode()
+                # only log after \n
+                splitted = self.logbuf.split('\n')
+                for line in splitted[:-1]:
+                    if line.strip():
+                        DEV_LOG.debug(line)
+                self.logbuf = splitted[-1]
+            finally:
+                self.read_lock.release()
+
+    def _send_cmd(self, command, args):
+        """
+        Send a command to the device.
+        """
+        length = len(args) + 3
+        array = bytes([command, length]) + args
+        csum = sum(array)
+        csum = (csum >> 8) + (csum & 0xff)
+        csum = (csum >> 8) + (csum & 0xff)
+        csum = (~csum) & 0xff
+        array += bytes([csum])
+        self.write(array)
+
+    def led_debug(self, on):
+        """
+        Switch debug led on or off.
+
+        :arg bool on:
+          True to switch the led on.
+        """
+        self._send_cmd(self.CMD_LED_DEBUG, struct.pack('B', on))
+
+    def buzzer(self, period_us, duration_ms):
+        """
+        Emit a tone on the buzzer.
+
+        :arg int period_us:
+          The tone period in microseconds (ex: 500 for a 2Khz tone).
+        :arg int duration_ms:
+          The tone duration in milliseconds.
+        """
+        self._send_cmd(self.CMD_BUZZER,
+                       struct.pack('>HH', period_us, duration_ms))
+
+    def buzzer_vibrato(self, period1_us, period2_us, duration_ms,
+                       vibrato_period_ms, vibrato_type):
+        """
+        Emit a tone on the buzzer, with a vibrato effect (siren).
+
+        :arg int period1_us:
+          The first tone period in microseconds (ex: 500 for a 2Khz tone).
+        :arg int period2_us:
+          The second tone period in microseconds (ex: 500 for a 2Khz tone).
+        :arg int duration_ms:
+          The tone duration in milliseconds.
+        :arg int vibrato_period_ms:
+          The period of the vibrato in millisecond.
+        :arg int vibratotype:
+          The mode of the vibrato (saw, triangle, sin, ...)
+        """
+        self._send_cmd(self.CMD_BUZZER_VIBRATO,
+                       struct.pack('>HHHHB', period1_us, period2_us,
+                                   duration_ms, vibrato_period_ms,
+                                   vibrato_type))
+
+    def led_gyro(self, intensity, stop_ms):
+        """
+        Switch on or off all gyro LEDs.
+
+        :arg int intensity:
+          LEDs intensity (0 = off, 65535 = max).
+        :arg int stop_ms:
+          Time in microseconds after which the LEDs are stopped. If 0,
+          keep the LEDs like this forever.
+        """
+        self._send_cmd(self.CMD_LED_GYRO_ALL,
+                       struct.pack('>HHHH', intensity, 1000,
+                                   0, stop_ms))
+
+    def led_gyro_blink(self, intensity, blink_on_ms, blink_off_ms, stop_ms):
+        """
+        Blink all gyro LEDs.
+
+        :arg int intensity:
+          LEDs intensity (0 = off, 65535 = max) when LEDs are on.
+        :arg int blink_on_ms:
+          Time in millisecond during which LEDs are switched on.
+        :arg int blink_off_ms:
+          Time in millisecond during which LEDs are switched off.
+        :arg int stop_ms:
+          Time in microseconds after which the LEDs are stopped. If 0,
+          keep the LEDs like this forever.
+        """
+        self._send_cmd(self.CMD_LED_GYRO_ALL,
+                       struct.pack('>HHHH', intensity, blink_on_ms,
+                                   blink_off_ms, stop_ms))
+
+    # pylint: 
+    def led_gyro_rotate(self, angle, intensity, width, rot_speed, stop_ms):
+        """
+        Switch on an area of gyro LEDs and perform a rotation.
+
+        :arg int angle:
+          Start angle (0 to 65535).
+        :arg int intensity:
+          LEDs intensity (0 = off, 65535 = max).
+        :arg int width:
+          Width of the area to be switched on (5461 = 1 LED, 65535 = all LEDs)
+        :arg int rot_speed:
+          Rotation speed is angle per 10ms (so 655 is ~1 round/sec).
+        :arg int stop_ms:
+          Time in microseconds after which the LEDs are stopped. If 0,
+          continue forever.
+        """
+        self._send_cmd(self.CMD_LED_GYRO_RAY,
+                       struct.pack('>HHHhH', angle, intensity, width,
+                                   rot_speed, stop_ms))
+    def reset(self):
+        """
+        Send a command to reset the red-alert device.
+        """
+        self._send_cmd(self.CMD_RESET, bytes())
+
+    def _djb_hash(self, buf):
+        """
+        Hash the buffer, using a simple algorithm (the same is implemented
+        in the device).
+        """
+        hval = 5381
+        for char in buf:
+            hval = (hval * 33 + char) & 0xffffffff
+        return hval
+
+    def check_hash(self, buf, offset, size):
+        """
+        Process the hash of the buffer, ask for a hash of the flash area
+        on the avr, and check if value is the same. Return True if hash
+        is the same.
+
+        :arg bytes() buf:
+          The buffer to hash.
+        :arg int offset:
+          The offset in the flash.
+        :arg int size:
+          The size of the buffer in the flash.
+        """
+        # XXX
+        #if len(buf) != size:
+        #    raise ProgrammingError('Invalid size: len(buf)=%d != size=%d' % (
+        #        len(buf), size))
+
+        # go in hash mode
+        self.flushInput()
+        self.write(b'h')
+        line = self.readline()
+
+        # send addr
+        line = self.readline()
+        if not line.endswith(b'addr?\r\n'):
+            raise ProgrammingError('Failed to send address: <%s>' % line)
+        self.write(b'%x\n' % offset)
+
+        # send size
+        line = self.readline()
+        if not line.startswith(b'size?'):
+            raise ProgrammingError('Failed to send size: <%s>' % line)
+        self.write(b'%x\n' % size)
+
+        # compare hash
+        local_hash = self._djb_hash(buf[offset:offset+size])
+        avr_hash = int(self.readline()[0:8], 16)
+        if local_hash != avr_hash:
+            return False
+
+        return True
+
+    def _prog_page(self, addr, buf):
+        """
+        Program a page from buf at given addr.
+        """
+
+        # switch in program mode
+        self.flushInput()
+        self.write(b'p')
+        line = self.readline()
+
+        # send address
+        line = self.readline()
+        if not line.endswith(b'addr?\r\n'):
+            raise ProgrammingError('Failed to send address: <%s>' % line)
+        self.write(b'%x\n' % addr)
+        line = self.readline()
+        if not line.startswith(b'ok'):
+            raise ProgrammingError('Failed to send address: <%s>' % line)
+
+        # fill page with buf data
+        page = bytes(
+            [buf[i] if i < len(buf) else 0xff for i in range(SPM_PAGE_SIZE)])
+        self.write(page)
+
+        sys.stdout.write('.')
+        sys.stdout.flush()
+
+        # compare hash
+        avr_hash = int(self.readline()[0:8], 16)
+        local_hash = self._djb_hash(page)
+
+        if local_hash != avr_hash:
+            self.write(b'n')
+            raise ProgrammingError('Hash check failed: avr=%x prog=%x' % (
+                avr_hash, local_hash))
+
+        self.write(b'y')
+        line = self.readline()
+        if not line.startswith(b'OK'):
+            raise ProgrammingError('Failed to program page: <%s>' % line)
+
+    def _program(self, filename):
+        self._send_cmd(self.CMD_RESET, bytes())
+        time.sleep(0.5)
+
+        # XXX get default file
+
+        with open(filename, 'rb') as bin_file:
+            buf = bin_file.read()
+        if not buf:
+            raise ProgrammingError('Failed to program, empty file')
+
+        self.flushInput()
+        self.write(b'?')
+        line = self.readline()
+
+        line = self.readline()
+        if not b'p:prog_page' in line:
+            raise ProgrammingError('Failed to program: <%s>' % line)
+
+        self.flushInput()
+
+        addr = 0
+        while addr < len(buf):
+            if self.check_hash(buf, addr, SPM_PAGE_SIZE):
+                sys.stdout.write('*')
+                sys.stdout.flush()
+            self._prog_page(addr, buf[addr:addr+SPM_PAGE_SIZE])
+            addr += SPM_PAGE_SIZE
+        if not self.check_hash(buf, 0, len(buf)):
+            raise ProgrammingError('Failed to program, invalid global hash')
+
+        print('Done. Starting application.')
+        self.write(b'x')
+
+    def program(self, filename):
+        """
+        Program the device with the given file.
+
+        :arg str filename:
+          The path to the image file, in raw binary format.
+        """
+        self.read_lock.acquire()
+        try:
+            self._program(filename)
+        finally:
+            self.read_lock.release()
+
+
+class RedAlertShell(cmd.Cmd):
+    """
+    The interactive shell that controls the red-alert device.
+    """
+
+    intro = 'Welcome to the red-alert shell. Type help or ? to list commands.\n'
+    prompt = "red-alert> "
+
+    def __init__(self, raserial):
+        super().__init__()
+        self.serial = raserial
+
+    def _bad_args(self):
+        print('bad arguments: %r' % self.lastcmd)
+        self.do_help(self.lastcmd.split()[0])
+
+    def emptyline(self):
+        pass
+
+    @classmethod
+    def do_quit(cls, args): # pylint: disable=unused-argument
+        """Exit red-alert shell."""
+        return True
+
+    def do_reset(self, args): # pylint: disable=unused-argument
+        """Reset the gyro device."""
+        self.serial.reset()
+
+    def do_program(self, args):
+        """
+        Program the device. Must be in bootloader.
+        Syntax: program <file>
+        """
+        self.serial.program(args)
+
+    def do_led_debug(self, args):
+        """
+        Switch debug led on or off.
+        Syntax: led_debug on|off|blink
+        """
+        if args == 'on':
+            self.serial.led_debug(1)
+        elif args == 'off':
+            self.serial.led_debug(0)
+        elif args == 'blink':
+            for _ in range(5):
+                self.serial.led_debug(1)
+                time.sleep(0.1)
+                self.serial.led_debug(0)
+                time.sleep(0.1)
+        else:
+            self._bad_args()
+
+    def do_buzzer(self, args):
+        """
+        Play a frequency on the buzzer
+        Syntax: buzzer <period_us> <duration_ms>
+        """
+        try:
+            period_us, duration_ms = [int(x) for x in args.split()]
+        except (IndexError, ValueError):
+            self._bad_args()
+            return
+        self.serial.buzzer(period_us, duration_ms)
+
+    def do_buzzer_vibrato(self, args):
+        """
+        Play a frequency on the buzzer, changing the frequency
+        Syntax: buzzer_vibrato <period1_us> <period2_us> <duration_ms>
+                               <vibrato_period_ms> <vibrato_type>
+        """
+        try:
+            (period1_us, period2_us, duration_ms, vibrato_period_ms,
+             vibrato_type) = [int(x) for x in args.split()]
+        except (IndexError, ValueError):
+            self._bad_args()
+            return
+        self.serial.buzzer_vibrato(period1_us, period2_us, duration_ms,
+                                   vibrato_period_ms, vibrato_type)
+
+    def do_led_gyro_all(self, args):
+        """
+        Control all leds at the same time with specified intensity.
+        Syntax: led_gyro_all <intensity> <blink_on_ms> <blink_off_ms> <stop_ms>
+        intensity:    Led intensity between 0 and 4095
+        blink_ms:     Time during which leds are on when blinking. If 0, don't
+                      blink.
+        blink_off_ms: Time during which leds are off when blinking. If 0,
+                      don't blink. Note that on + off times must be < 65536.
+        stop_ms:      Time after which all the leds are switched off. If 0,
+                      never stop.
+        """
+        try:
+            intensity, blink_on_ms, blink_off_ms, stop_ms = [
+                int(x) for x in args.split()]
+        except (IndexError, ValueError):
+            self._bad_args()
+            return
+        self.serial.led_gyro_blink(intensity, blink_on_ms, blink_off_ms, stop_ms)
+
+    def do_led_gyro_ray(self, args):
+        """
+        Switch on the leds at specified intensity in one direction.
+        Syntax: led_gyro_ray <angle> <intensity> <width> <rot_speed> <stop_ms>
+        angle:        Angle of the ray, between 0 and 65535.
+        intensity:    Led intensity between 0 and 4095.
+        width:        Angle of the enabled zone, between 0 and 65535 (one led
+                      is 6553).
+        rot_speed:    Rotation speed is angle per 10ms (so 655 is ~1 round/sec).
+        stop_ms:      Time after which all the leds are switched off. If 0,
+                      never stop.
+        """
+        try:
+            angle, intensity, width, rot_speed, stop_ms = [
+                int(x) for x in args.split()]
+        except (IndexError, ValueError):
+            self._bad_args()
+            return
+        self.serial.led_gyro_rotate(angle, intensity, width, rot_speed, stop_ms)
+
+def main():
+    """
+    Connect to the red-alert device, and start an interactive shell
+    to control it.
+    """
+
+    histfile = os.path.join(os.environ["HOME"], ".redalert_history")
+    atexit.register(readline.write_history_file, histfile)
+    try:
+        readline.read_history_file(histfile)
+    except FileNotFoundError:
+        pass
+
+    device = "/dev/ttyUSB0"
+    if len(sys.argv) > 1:
+        device = sys.argv[1]
+    raserial = RedAlertSerial(device, baudrate=57600)
+    shell = RedAlertShell(raserial)
+    while 1:
+        try:
+            shell.cmdloop()
+        except KeyboardInterrupt:
+            print()
+        except: # pylint: disable=bare-except
+            LOG.exception('Exception in mainloop')
+        break
+
+if __name__ == "__main__":
+    main()
diff --git a/cmdline/setup.py b/cmdline/setup.py
new file mode 100644 (file)
index 0000000..cdb062f
--- /dev/null
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+
+#
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright 2019, Olivier MATZ <zer0@droids-corp.org>
+#
+
+from setuptools import setup
+
+setup(
+    name='redalert',
+    version='0.1.0',
+    author='Olivier Matz',
+    author_email='zer0@droids-corp.org',
+    description='API and shell for red-alert device.',
+    long_description=open('README').read(),
+    url='http://www.droids-corp.org/',
+    license='BSD',
+    packages=['redalert'],
+    entry_points={
+        'console_scripts': [
+            'redalert-cli = redalert:main',
+        ]
+    },
+    install_requires=['pyserial'],
+)