--- /dev/null
+#! /usr/bin/env python3
+
+#
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright 2019, Olivier MATZ <zer0@droids-corp.org>
+#
+
+# XXX setup.py with pyserial
+# XXX lib
+# XXX arg parser
+# XXX fix help (12 bits, led width, ...)
+# XXX pylint
+
+import atexit
+import cmd
+import logging
+import os
+import readline
+from select import select
+import struct
+import sys
+import threading
+import time
+
+import serial
+
+SPM_PAGE_SIZE = 128
+
+logging.basicConfig()
+DEV_LOG = logging.getLogger('device')
+DEV_LOG.setLevel(logging.DEBUG)
+LOG = logging.getLogger('local')
+LOG.setLevel(logging.DEBUG)
+
+class ProgrammingError(Exception):
+ """
+ Raised when an error occurs while programming.
+ """
+ def __init__(self, message, exc=None):
+ Exception.__init__(self, message)
+ print("XXXXXXXXXXXXX")
+ print(message)
+ self.message = message
+ self.exc = exc
+
+ def __str__(self):
+ return '%s (%d)' % (self.message, self.errno)
+
+ def __repr__(self):
+ return '%s (%d)' % (self.message, self.errno)
+
+class RedAlertSerial(serial.Serial): # pylint: disable=too-many-ancestor
+ """
+ Communication with red-alert device through serial line.
+ """
+ CMD_LED_DEBUG = 0
+ CMD_BUZZER = 1
+ CMD_LED_GYRO_ALL = 2
+ CMD_LED_GYRO_RAY = 3
+ CMD_RESET = 5
+ CMD_BUZZER_VIBRATO = 6
+ VIBRATO_SAW = 1
+ VIBRATO_TRIANGLE = 2
+ VIBRATO_SIN = 3
+ VIBRATO_COS = 4
+ VIBRATO_SQUARE = 5
+ read_lock = threading.Lock()
+ logbuf = ''
+
+ def __init__(self, *args, log=True, **kwargs):
+ super().__init__(*args, **kwargs)
+ if log:
+ thread = threading.Thread(target=self._log_thread)
+ thread.daemon = True
+ thread.start()
+
+ def _log_thread(self):
+ """
+ Executed in a thread, it reads messages sent by the red-alert
+ device and display it as logs.
+ """
+ while True:
+ _, _, _ = select([self], [], [])
+ # something is readable, retry with lock held
+ self.read_lock.acquire()
+ try:
+ ins, _, _ = select([self], [], [], 0)
+ if not ins:
+ return
+ self.logbuf += self.read().decode()
+ # only log after \n
+ splitted = self.logbuf.split('\n')
+ for line in splitted[:-1]:
+ if line.strip():
+ DEV_LOG.debug(line)
+ self.logbuf = splitted[-1]
+ finally:
+ self.read_lock.release()
+
+ def _send_cmd(self, command, args):
+ """
+ Send a command to the device.
+ """
+ length = len(args) + 3
+ array = bytes([command, length]) + args
+ csum = sum(array)
+ csum = (csum >> 8) + (csum & 0xff)
+ csum = (csum >> 8) + (csum & 0xff)
+ csum = (~csum) & 0xff
+ array += bytes([csum])
+ self.write(array)
+
+ def led_debug(self, on):
+ """
+ Switch debug led on or off.
+
+ :arg bool on:
+ True to switch the led on.
+ """
+ self._send_cmd(self.CMD_LED_DEBUG, struct.pack('B', on))
+
+ def buzzer(self, period_us, duration_ms):
+ """
+ Emit a tone on the buzzer.
+
+ :arg int period_us:
+ The tone period in microseconds (ex: 500 for a 2Khz tone).
+ :arg int duration_ms:
+ The tone duration in milliseconds.
+ """
+ self._send_cmd(self.CMD_BUZZER,
+ struct.pack('>HH', period_us, duration_ms))
+
+ def buzzer_vibrato(self, period1_us, period2_us, duration_ms,
+ vibrato_period_ms, vibrato_type):
+ """
+ Emit a tone on the buzzer, with a vibrato effect (siren).
+
+ :arg int period1_us:
+ The first tone period in microseconds (ex: 500 for a 2Khz tone).
+ :arg int period2_us:
+ The second tone period in microseconds (ex: 500 for a 2Khz tone).
+ :arg int duration_ms:
+ The tone duration in milliseconds.
+ :arg int vibrato_period_ms:
+ The period of the vibrato in millisecond.
+ :arg int vibratotype:
+ The mode of the vibrato (saw, triangle, sin, ...)
+ """
+ self._send_cmd(self.CMD_BUZZER_VIBRATO,
+ struct.pack('>HHHHB', period1_us, period2_us,
+ duration_ms, vibrato_period_ms,
+ vibrato_type))
+
+ def led_gyro(self, intensity, stop_ms):
+ """
+ Switch on or off all gyro LEDs.
+
+ :arg int intensity:
+ LEDs intensity (0 = off, 65535 = max).
+ :arg int stop_ms:
+ Time in microseconds after which the LEDs are stopped. If 0,
+ keep the LEDs like this forever.
+ """
+ self._send_cmd(self.CMD_LED_GYRO_ALL,
+ struct.pack('>HHHH', intensity, 1000,
+ 0, stop_ms))
+
+ def led_gyro_blink(self, intensity, blink_on_ms, blink_off_ms, stop_ms):
+ """
+ Blink all gyro LEDs.
+
+ :arg int intensity:
+ LEDs intensity (0 = off, 65535 = max) when LEDs are on.
+ :arg int blink_on_ms:
+ Time in millisecond during which LEDs are switched on.
+ :arg int blink_off_ms:
+ Time in millisecond during which LEDs are switched off.
+ :arg int stop_ms:
+ Time in microseconds after which the LEDs are stopped. If 0,
+ keep the LEDs like this forever.
+ """
+ self._send_cmd(self.CMD_LED_GYRO_ALL,
+ struct.pack('>HHHH', intensity, blink_on_ms,
+ blink_off_ms, stop_ms))
+
+ # pylint:
+ def led_gyro_rotate(self, angle, intensity, width, rot_speed, stop_ms):
+ """
+ Switch on an area of gyro LEDs and perform a rotation.
+
+ :arg int angle:
+ Start angle (0 to 65535).
+ :arg int intensity:
+ LEDs intensity (0 = off, 65535 = max).
+ :arg int width:
+ Width of the area to be switched on (5461 = 1 LED, 65535 = all LEDs)
+ :arg int rot_speed:
+ Rotation speed is angle per 10ms (so 655 is ~1 round/sec).
+ :arg int stop_ms:
+ Time in microseconds after which the LEDs are stopped. If 0,
+ continue forever.
+ """
+ self._send_cmd(self.CMD_LED_GYRO_RAY,
+ struct.pack('>HHHhH', angle, intensity, width,
+ rot_speed, stop_ms))
+ def reset(self):
+ """
+ Send a command to reset the red-alert device.
+ """
+ self._send_cmd(self.CMD_RESET, bytes())
+
+ def _djb_hash(self, buf):
+ """
+ Hash the buffer, using a simple algorithm (the same is implemented
+ in the device).
+ """
+ hval = 5381
+ for char in buf:
+ hval = (hval * 33 + char) & 0xffffffff
+ return hval
+
+ def check_hash(self, buf, offset, size):
+ """
+ Process the hash of the buffer, ask for a hash of the flash area
+ on the avr, and check if value is the same. Return True if hash
+ is the same.
+
+ :arg bytes() buf:
+ The buffer to hash.
+ :arg int offset:
+ The offset in the flash.
+ :arg int size:
+ The size of the buffer in the flash.
+ """
+ # XXX
+ #if len(buf) != size:
+ # raise ProgrammingError('Invalid size: len(buf)=%d != size=%d' % (
+ # len(buf), size))
+
+ # go in hash mode
+ self.flushInput()
+ self.write(b'h')
+ line = self.readline()
+
+ # send addr
+ line = self.readline()
+ if not line.endswith(b'addr?\r\n'):
+ raise ProgrammingError('Failed to send address: <%s>' % line)
+ self.write(b'%x\n' % offset)
+
+ # send size
+ line = self.readline()
+ if not line.startswith(b'size?'):
+ raise ProgrammingError('Failed to send size: <%s>' % line)
+ self.write(b'%x\n' % size)
+
+ # compare hash
+ local_hash = self._djb_hash(buf[offset:offset+size])
+ avr_hash = int(self.readline()[0:8], 16)
+ if local_hash != avr_hash:
+ return False
+
+ return True
+
+ def _prog_page(self, addr, buf):
+ """
+ Program a page from buf at given addr.
+ """
+
+ # switch in program mode
+ self.flushInput()
+ self.write(b'p')
+ line = self.readline()
+
+ # send address
+ line = self.readline()
+ if not line.endswith(b'addr?\r\n'):
+ raise ProgrammingError('Failed to send address: <%s>' % line)
+ self.write(b'%x\n' % addr)
+ line = self.readline()
+ if not line.startswith(b'ok'):
+ raise ProgrammingError('Failed to send address: <%s>' % line)
+
+ # fill page with buf data
+ page = bytes(
+ [buf[i] if i < len(buf) else 0xff for i in range(SPM_PAGE_SIZE)])
+ self.write(page)
+
+ sys.stdout.write('.')
+ sys.stdout.flush()
+
+ # compare hash
+ avr_hash = int(self.readline()[0:8], 16)
+ local_hash = self._djb_hash(page)
+
+ if local_hash != avr_hash:
+ self.write(b'n')
+ raise ProgrammingError('Hash check failed: avr=%x prog=%x' % (
+ avr_hash, local_hash))
+
+ self.write(b'y')
+ line = self.readline()
+ if not line.startswith(b'OK'):
+ raise ProgrammingError('Failed to program page: <%s>' % line)
+
+ def _program(self, filename):
+ self._send_cmd(self.CMD_RESET, bytes())
+ time.sleep(0.5)
+
+ # XXX get default file
+
+ with open(filename, 'rb') as bin_file:
+ buf = bin_file.read()
+ if not buf:
+ raise ProgrammingError('Failed to program, empty file')
+
+ self.flushInput()
+ self.write(b'?')
+ line = self.readline()
+
+ line = self.readline()
+ if not b'p:prog_page' in line:
+ raise ProgrammingError('Failed to program: <%s>' % line)
+
+ self.flushInput()
+
+ addr = 0
+ while addr < len(buf):
+ if self.check_hash(buf, addr, SPM_PAGE_SIZE):
+ sys.stdout.write('*')
+ sys.stdout.flush()
+ self._prog_page(addr, buf[addr:addr+SPM_PAGE_SIZE])
+ addr += SPM_PAGE_SIZE
+ if not self.check_hash(buf, 0, len(buf)):
+ raise ProgrammingError('Failed to program, invalid global hash')
+
+ print('Done. Starting application.')
+ self.write(b'x')
+
+ def program(self, filename):
+ """
+ Program the device with the given file.
+
+ :arg str filename:
+ The path to the image file, in raw binary format.
+ """
+ self.read_lock.acquire()
+ try:
+ self._program(filename)
+ finally:
+ self.read_lock.release()
+
+
+class RedAlertShell(cmd.Cmd):
+ """
+ The interactive shell that controls the red-alert device.
+ """
+
+ intro = 'Welcome to the red-alert shell. Type help or ? to list commands.\n'
+ prompt = "red-alert> "
+
+ def __init__(self, raserial):
+ super().__init__()
+ self.serial = raserial
+
+ def _bad_args(self):
+ print('bad arguments: %r' % self.lastcmd)
+ self.do_help(self.lastcmd.split()[0])
+
+ def emptyline(self):
+ pass
+
+ @classmethod
+ def do_quit(cls, args): # pylint: disable=unused-argument
+ """Exit red-alert shell."""
+ return True
+
+ def do_reset(self, args): # pylint: disable=unused-argument
+ """Reset the gyro device."""
+ self.serial.reset()
+
+ def do_program(self, args):
+ """
+ Program the device. Must be in bootloader.
+ Syntax: program <file>
+ """
+ self.serial.program(args)
+
+ def do_led_debug(self, args):
+ """
+ Switch debug led on or off.
+ Syntax: led_debug on|off|blink
+ """
+ if args == 'on':
+ self.serial.led_debug(1)
+ elif args == 'off':
+ self.serial.led_debug(0)
+ elif args == 'blink':
+ for _ in range(5):
+ self.serial.led_debug(1)
+ time.sleep(0.1)
+ self.serial.led_debug(0)
+ time.sleep(0.1)
+ else:
+ self._bad_args()
+
+ def do_buzzer(self, args):
+ """
+ Play a frequency on the buzzer
+ Syntax: buzzer <period_us> <duration_ms>
+ """
+ try:
+ period_us, duration_ms = [int(x) for x in args.split()]
+ except (IndexError, ValueError):
+ self._bad_args()
+ return
+ self.serial.buzzer(period_us, duration_ms)
+
+ def do_buzzer_vibrato(self, args):
+ """
+ Play a frequency on the buzzer, changing the frequency
+ Syntax: buzzer_vibrato <period1_us> <period2_us> <duration_ms>
+ <vibrato_period_ms> <vibrato_type>
+ """
+ try:
+ (period1_us, period2_us, duration_ms, vibrato_period_ms,
+ vibrato_type) = [int(x) for x in args.split()]
+ except (IndexError, ValueError):
+ self._bad_args()
+ return
+ self.serial.buzzer_vibrato(period1_us, period2_us, duration_ms,
+ vibrato_period_ms, vibrato_type)
+
+ def do_led_gyro_all(self, args):
+ """
+ Control all leds at the same time with specified intensity.
+ Syntax: led_gyro_all <intensity> <blink_on_ms> <blink_off_ms> <stop_ms>
+ intensity: Led intensity between 0 and 4095
+ blink_ms: Time during which leds are on when blinking. If 0, don't
+ blink.
+ blink_off_ms: Time during which leds are off when blinking. If 0,
+ don't blink. Note that on + off times must be < 65536.
+ stop_ms: Time after which all the leds are switched off. If 0,
+ never stop.
+ """
+ try:
+ intensity, blink_on_ms, blink_off_ms, stop_ms = [
+ int(x) for x in args.split()]
+ except (IndexError, ValueError):
+ self._bad_args()
+ return
+ self.serial.led_gyro_blink(intensity, blink_on_ms, blink_off_ms, stop_ms)
+
+ def do_led_gyro_ray(self, args):
+ """
+ Switch on the leds at specified intensity in one direction.
+ Syntax: led_gyro_ray <angle> <intensity> <width> <rot_speed> <stop_ms>
+ angle: Angle of the ray, between 0 and 65535.
+ intensity: Led intensity between 0 and 4095.
+ width: Angle of the enabled zone, between 0 and 65535 (one led
+ is 6553).
+ rot_speed: Rotation speed is angle per 10ms (so 655 is ~1 round/sec).
+ stop_ms: Time after which all the leds are switched off. If 0,
+ never stop.
+ """
+ try:
+ angle, intensity, width, rot_speed, stop_ms = [
+ int(x) for x in args.split()]
+ except (IndexError, ValueError):
+ self._bad_args()
+ return
+ self.serial.led_gyro_rotate(angle, intensity, width, rot_speed, stop_ms)
+
+def main():
+ """
+ Connect to the red-alert device, and start an interactive shell
+ to control it.
+ """
+
+ histfile = os.path.join(os.environ["HOME"], ".redalert_history")
+ atexit.register(readline.write_history_file, histfile)
+ try:
+ readline.read_history_file(histfile)
+ except FileNotFoundError:
+ pass
+
+ device = "/dev/ttyUSB0"
+ if len(sys.argv) > 1:
+ device = sys.argv[1]
+ raserial = RedAlertSerial(device, baudrate=57600)
+ shell = RedAlertShell(raserial)
+ while 1:
+ try:
+ shell.cmdloop()
+ except KeyboardInterrupt:
+ print()
+ except: # pylint: disable=bare-except
+ LOG.exception('Exception in mainloop')
+ break
+
+if __name__ == "__main__":
+ main()