--- /dev/null
+#! /usr/bin/env python
+
+import os,sys,termios,atexit
+import serial
+from select import select
+import cmd
+#import pylab
+from matplotlib import pylab
+from math import *
+
+import struct
+import numpy
+import shlex
+import time
+import math
+import warnings
+warnings.filterwarnings("ignore","tempnam",RuntimeWarning, __name__)
+
+import logging
+log = logging.getLogger("XbeeShell")
+_handler = logging.StreamHandler()
+_handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
+log.addHandler(_handler)
+log.setLevel(1)
+
+class SerialLogger:
+ def __init__(self, ser, filein, fileout=None):
+ self.ser = ser
+ self.filein = filein
+ self.fin = open(filein, "a", 0)
+ if fileout:
+ self.fileout = fileout
+ self.fout = open(fileout, "a", 0)
+ else:
+ self.fileout = filein
+ self.fout = self.fin
+ def fileno(self):
+ return self.ser.fileno()
+ def read(self, *args):
+ res = self.ser.read(*args)
+ self.fin.write(res)
+ return res
+ def write(self, s):
+ self.fout.write(s)
+ self.ser.write(s)
+ def readline(self, *args):
+ res = self.ser.readline(*args)
+ self.fin.write(res)
+ return res
+
+class Interp(cmd.Cmd):
+ prompt = "Xbee> "
+ def __init__(self, tty, baudrate=57600):
+ cmd.Cmd.__init__(self)
+ self.ser = serial.Serial(tty,baudrate=baudrate,timeout=0.1)
+ self.escape = "\x01" # C-a
+ self.quitraw = "\x02" # C-b
+ self.serial_logging = False
+ self.default_in_log_file = "/tmp/xbee.in.log"
+ self.default_out_log_file = "/tmp/xbee.out.log"
+
+ def do_log(self, args):
+ """Activate serial logs.
+ log <filename> logs input and output to <filename>
+ log <filein> <fileout> logs input to <filein> and output to <fileout>
+ log logs to /tmp/microb.log or the last used file"""
+
+ if self.serial_logging:
+ log.error("Already logging to %s and %s" % (self.ser.filein,
+ self.ser.fileout))
+ else:
+ self.serial_logging = True
+ files = [os.path.expanduser(x) for x in args.split()]
+ if len(files) == 0:
+ files = [self.default_in_log_file, self.default_out_log_file]
+ elif len(files) == 1:
+ self.default_in_log_file = files[0]
+ self.default_out_log_file = None
+ elif len(files) == 2:
+ self.default_in_log_file = files[0]
+ self.default_out_log_file = files[1]
+ else:
+ print "Can't parse arguments"
+
+ self.ser = SerialLogger(self.ser, *files)
+ log.info("Starting serial logging to %s and %s" % (self.ser.filein,
+ self.ser.fileout))
+
+
+ def do_unlog(self, args):
+ if self.serial_logging:
+ log.info("Stopping serial logging to %s and %s" % (self.ser.filein,
+ self.ser.fileout))
+ self.ser = self.ser.ser
+ self.serial_logging = False
+ else:
+ log.error("No log to stop")
+
+ def do_raw(self, args):
+ "Switch to RAW mode"
+ stdin = os.open("/dev/stdin",os.O_RDONLY)
+ stdout = os.open("/dev/stdout",os.O_WRONLY)
+
+ stdin_termios = termios.tcgetattr(stdin)
+ raw_termios = stdin_termios[:]
+
+ try:
+ log.info("Switching to RAW mode")
+
+ # iflag
+ raw_termios[0] &= ~(termios.IGNBRK | termios.BRKINT |
+ termios.PARMRK | termios.ISTRIP |
+ termios.INLCR | termios.IGNCR |
+ termios.ICRNL | termios.IXON)
+ # oflag
+ raw_termios[1] &= ~termios.OPOST;
+ # cflag
+ raw_termios[2] &= ~(termios.CSIZE | termios.PARENB);
+ raw_termios[2] |= termios.CS8;
+ # lflag
+ raw_termios[3] &= ~(termios.ECHO | termios.ECHONL |
+ termios.ICANON | termios.ISIG |
+ termios.IEXTEN);
+
+ termios.tcsetattr(stdin, termios.TCSADRAIN, raw_termios)
+
+ mode = "normal"
+ while True:
+ ins,outs,errs=select([stdin,self.ser],[],[])
+ for x in ins:
+ if x == stdin:
+ c = os.read(stdin,1)
+ if mode == "escape":
+ mode =="normal"
+ if c == self.escape:
+ self.ser.write(self.escape)
+ elif c == self.quitraw:
+ return
+ else:
+ self.ser.write(self.escape)
+ self.ser.write(c)
+ else:
+ if c == self.escape:
+ mode = "escape"
+ else:
+ self.ser.write(c)
+ elif x == self.ser:
+ os.write(stdout,self.ser.read())
+ finally:
+ termios.tcsetattr(stdin, termios.TCSADRAIN, stdin_termios)
+ log.info("Back to normal mode")
+
+ def do_dump_stats(self, args):
+ """send hello at power 4 and dump stats regulary"""
+
+ self.ser.write("write power-level 4\n")
+ prev_reset = time.time()
+ try:
+ while True:
+ # send hellos during 1 s
+ self.ser.write("rc_proto_hello wing 30 30 tototiti\n")
+ time.sleep(1.5)
+
+ # send a reset every 60 s
+ t = time.time()
+ if t - prev_reset > 60:
+ self.ser.write("write soft-reset\n")
+ prev_reset = t
+
+ # display time & stats
+ self.ser.fin.write("\nTIME:%s\n"%(time.time()))
+ self.ser.write("rc_proto_stats show\n")
+ while True:
+ l = self.ser.readline()
+ sys.stdout.write(l)
+ if l == "":
+ break
+ except KeyboardInterrupt:
+ pass
+
+
+if __name__ == "__main__":
+ try:
+ import readline,atexit
+ except ImportError:
+ pass
+ else:
+ histfile = os.path.join(os.environ["HOME"], ".xbee")
+ atexit.register(readline.write_history_file, histfile)
+ try:
+ readline.read_history_file(histfile)
+ except IOError:
+ pass
+
+ device = "/dev/ttyS0"
+ if len(sys.argv) > 1:
+ device = sys.argv[1]
+ interp = Interp(device)
+ interp.do_log("")
+ while 1:
+ try:
+ interp.cmdloop()
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ l = str(e).strip()
+ if l:
+ log.exception("%s" % l.splitlines()[-1])
+ continue
+ break