3 import os,sys,termios,atexit
5 from select import select
8 from matplotlib import pylab
17 warnings.filterwarnings("ignore","tempnam",RuntimeWarning, __name__)
20 log = logging.getLogger("XbeeShell")
21 _handler = logging.StreamHandler()
22 _handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
23 log.addHandler(_handler)
27 def __init__(self, ser, filein, fileout=None):
30 self.fin = open(filein, "a", 0)
32 self.fileout = fileout
33 self.fout = open(fileout, "a", 0)
38 return self.ser.fileno()
39 def read(self, *args):
40 res = self.ser.read(*args)
46 def readline(self, *args):
47 res = self.ser.readline(*args)
51 class Interp(cmd.Cmd):
53 def __init__(self, tty, baudrate=57600):
54 cmd.Cmd.__init__(self)
55 self.ser = serial.Serial(tty,baudrate=baudrate,timeout=0.1)
56 self.escape = "\x01" # C-a
57 self.quitraw = "\x02" # C-b
58 self.serial_logging = False
59 self.default_in_log_file = "/tmp/xbee.in.log"
60 self.default_out_log_file = "/tmp/xbee.out.log"
62 def do_log(self, args):
63 """Activate serial logs.
64 log <filename> logs input and output to <filename>
65 log <filein> <fileout> logs input to <filein> and output to <fileout>
66 log logs to /tmp/microb.log or the last used file"""
68 if self.serial_logging:
69 log.error("Already logging to %s and %s" % (self.ser.filein,
72 self.serial_logging = True
73 files = [os.path.expanduser(x) for x in args.split()]
75 files = [self.default_in_log_file, self.default_out_log_file]
77 self.default_in_log_file = files[0]
78 self.default_out_log_file = None
80 self.default_in_log_file = files[0]
81 self.default_out_log_file = files[1]
83 print "Can't parse arguments"
85 self.ser = SerialLogger(self.ser, *files)
86 log.info("Starting serial logging to %s and %s" % (self.ser.filein,
90 def do_unlog(self, args):
91 if self.serial_logging:
92 log.info("Stopping serial logging to %s and %s" % (self.ser.filein,
94 self.ser = self.ser.ser
95 self.serial_logging = False
97 log.error("No log to stop")
99 def do_raw(self, args):
101 stdin = os.open("/dev/stdin",os.O_RDONLY)
102 stdout = os.open("/dev/stdout",os.O_WRONLY)
104 stdin_termios = termios.tcgetattr(stdin)
105 raw_termios = stdin_termios[:]
108 log.info("Switching to RAW mode")
111 raw_termios[0] &= ~(termios.IGNBRK | termios.BRKINT |
112 termios.PARMRK | termios.ISTRIP |
113 termios.INLCR | termios.IGNCR |
114 termios.ICRNL | termios.IXON)
116 raw_termios[1] &= ~termios.OPOST;
118 raw_termios[2] &= ~(termios.CSIZE | termios.PARENB);
119 raw_termios[2] |= termios.CS8;
121 raw_termios[3] &= ~(termios.ECHO | termios.ECHONL |
122 termios.ICANON | termios.ISIG |
125 termios.tcsetattr(stdin, termios.TCSADRAIN, raw_termios)
129 ins,outs,errs=select([stdin,self.ser],[],[])
136 self.ser.write(self.escape)
137 elif c == self.quitraw:
140 self.ser.write(self.escape)
148 os.write(stdout,self.ser.read())
150 termios.tcsetattr(stdin, termios.TCSADRAIN, stdin_termios)
151 log.info("Back to normal mode")
153 def do_dump_stats(self, args):
154 """send hello at power 4 and dump stats regulary"""
156 self.ser.write("write power-level 4\n")
157 prev_reset = time.time()
160 # send hellos during 1 s
161 self.ser.write("rc_proto_hello wing 30 30 tototiti\n")
164 # send a reset every 60 s
166 if t - prev_reset > 60:
167 self.ser.write("write soft-reset\n")
170 # display time & stats
171 self.ser.fin.write("\nTIME:%s\n"%(time.time()))
172 self.ser.write("rc_proto_stats show\n")
174 l = self.ser.readline()
178 except KeyboardInterrupt:
182 if __name__ == "__main__":
184 import readline,atexit
188 histfile = os.path.join(os.environ["HOME"], ".xbee")
189 atexit.register(readline.write_history_file, histfile)
191 readline.read_history_file(histfile)
195 device = "/dev/ttyS0"
196 if len(sys.argv) > 1:
198 interp = Interp(device)
203 except KeyboardInterrupt:
208 log.exception("%s" % l.splitlines()[-1])