From: Olivier Matz Date: Wed, 3 Jun 2015 08:26:59 +0000 (+0200) Subject: initial revision of pycol X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=HEAD;p=pycol.git initial revision of pycol Signed-off-by: Olivier Matz --- 3f0a304fd615d6f69aa7c524a64e79e548eb4a75 diff --git a/clicmd.py b/clicmd.py new file mode 100644 index 0000000..56206b9 --- /dev/null +++ b/clicmd.py @@ -0,0 +1,918 @@ +#!/usr/bin/python3 + +# todo +# - command name +# - better parser +# - readline: http://bip.weizmann.ac.il/course/python/PyMOTW/PyMOTW/docs/readline/index.html +# - expression parser +# - more commands (int, ip, ether, date, file, ...) +# - how to specify help for commands? +# - gerer les guillemets, les retours chariot, les espaces +# - to_str() -> repr() ? +# - ambiguous match as an exception? +# - use logging module +# - cmd -> subcmd +# - should display command help string if no help for token: how to do it + +""" +match object: + +show interface |all [detailed] + ethif: EthIfCmd() + +show interface all + { "show": "show", + "interface": "interface" + "all": "all" } + +show interface eth0 detailed + { "show": "show", + "interface": "interface", + "ethif": "eth0", + "detailed": "detailed" } + + + +filter [in ]/[out ]/[proto ipv4]/[proto ipv6] drop|pass + in-iface: EthIfCmd() + out-iface: EthIfCmd() + + +Les tokens nommés sont remplis dans un dictionnaire +Impossible d'avoir 2 fois le même nom, a verifier dans le createur de commande + -> faux. exemple au dessus. + +Sinon comment gerer le ambiguous match? + + + + +complete object: + +Pour chaque completion possible (meme vide ?), il faut retourner: + - la chaine du token complet + - l'aide du token (ou l'objet) + - la commande ? (ou alors ça c'est géré dans la partie rdline) + + + + + +legacy = blabla VERSION{1,2} +new = blabla wpa1/wpa2 + +legacy = blublu IFACE{1,*} +new = blublu + + +pour la définition d'un token, on pourrait avoir une fonction +display() qui permettrait d'afficher ( [ ...]) +a la place de +(ou pas) + + +""" + +import shlex, os +import expparser +import re +import socket + +_PYCOL_PKG_DIR = os.path.dirname(__file__) +DEBUG = False + +def debug(*args, **kvargs): + if DEBUG: + print(*args, **kvargs) + +class PycolMatch(dict): + def __init__(self): + pass + def __repr__(self): + return ""%(dict.__repr__(self)) + def merge(self, match_res): + for k in match_res: + self[k] = match_res[k] + +# XXX describe that it's a list of tuples +class PycolCompleteList(list): + def __init__(self, l = []): + if not isinstance(l, list): + l = [l] + list.__init__(self, l) + def __repr__(self): + return ""%(list.__repr__(self)) + def merge(self, comp_res): + for e in comp_res: + if e in self: + continue + self.append(e) + # XXX own(): keep the list but set token ref + +class PycolComplete(object): + def __init__(self, token, cmd, terminal = True): + self.token = token + self.cmd = cmd + self.terminal = terminal + +class PycolContext(list): + def __init__(self, l = []): + list.__init__(self, l) + def complete(self, tokens): + res = PycolCompleteList() + for cmd in self: + res2 = cmd.complete(tokens) + res.merge(res2) + return res + +class CliCmd: + def __init__(self, key = None, cb = None, help_str = None): + self.key = key + assert(key == None or type(key) is str), "key is not a string: %s"%(key) + self.cb = cb + self.help_str = help_str + assert(help_str == None or type(help_str) is str), \ + "help_str is not a string: %s"%(help_str) + self.default_help_str = "no help" + + def match(self, tokens): + """return a match object + len(tokens) always >= 1 + the last token is the one to complete (may be empty) + the first tokens are the ones to match + """ + return PycolMatch() + + def complete(self, tokens): + # XXX return None when no completion + # does not match + m = self.match(tokens) + if m != None or (len(tokens) == 1 and tokens[0] == ""): + return PycolCompleteList(PycolComplete(tokens[0], self)) + return PycolCompleteList() + + def to_expr(self): + if self.key != None: + return self.key + else: + return str(self) + + def test_match(self, in_buf): + tokens = shlex.split(in_buf, comments = True) + match_result = self.match(tokens) + debug("test match: %s %s"%(tokens, match_result)) + if match_result == None: + return False + else: + return True + + # quick test for assert + def test_complete(self, in_buf): + complete_result = PycolCompleteList() + tokens = shlex.split(in_buf, comments = False) + # whitespace after the first token, it means we want to complete + # the second token + debug("test_complete: %s", in_buf) + if len(tokens) == 0 or not in_buf.endswith(tokens[-1]): + tokens.append("") + complete_result = self.complete(tokens) + debug("test complete: %s %s"%(tokens, complete_result)) + return list(map(lambda x:x.token, complete_result)) + + def __repr__(self): + return "" + + # XXX say that it can return a multiple lines string + def get_help(self): + if self.help_str == None: + return self.default_help_str + else: + return self.help_str + +class CmdBuilder(CliCmd): + def __init__(self, expr, token_desc, key = None, cb = None, help_str = None): + super().__init__(key = key, cb = cb, help_str = help_str) + assert(isinstance(expr,str)), "expr must be a string" + assert(type(token_desc) is dict), "token_desc must be a dict" + self.token_desc = token_desc + parser = expparser.PycolExprParser() + tokens = parser.tokenize(expr) + expr = parser.parse(tokens) + debug(expr.to_str()) + self.token_desc_list = [] + self.cmd = self.__expr2clicmd(expr, token_desc) + + def match(self, tokens): + m = self.cmd.match(tokens) + if m != None and self.key != None: + m[self.key] = " ".join(tokens) + return m + + def complete(self, tokens): + return self.cmd.complete(tokens) + + def to_expr(self): + return self.cmd.to_expr() + + def __repr__(self): + return ""%(self.cmd) + + def __expr2clicmd(self, expr, token_desc): + l = [] + for c in expr.children: + l.append(self.__expr2clicmd(c, token_desc)) + if expr.is_var(): + varname = expr.op + assert(varname in token_desc) + cmd = token_desc[varname] + cmd.key = varname + # keep the order of the variables for help displaying + self.token_desc_list.append(varname) + elif expr.op == " ": + cmd = SeqCliCmd(l) + elif expr.op == "|": + cmd = OrCliCmd(l) + elif expr.op == "[": + cmd = OptionalCliCmd(l[0]) + elif expr.op == "(": + cmd = BypassCliCmd(l[0]) + #elif expr.op == " ": #XXX + return cmd + + def get_help(self): + if self.help_str == None: + help_str = self.default_help_str + else: + help_str = self.help_str + for t in self.token_desc_list: + help_str += "\n %s: %s"%(t, self.token_desc[t].get_help()) + return help_str + + def to_expr(self): + return self.cmd.to_expr() + +class TextCliCmd(CliCmd): + def __init__(self, text, key = None, cb = None, help_str = None): + super().__init__(key = key, cb = cb, help_str = help_str) + assert(not " " in text) # XXX find better? + self.text = text + debug("TextCliCmd(%s)"%text) + + def match(self, tokens): + if tokens == [self.text]: + res = PycolMatch() + if self.key: + res[self.key] = self.text + return res + return None + + def complete(self, tokens): + # more than one token as input, the string does not match + if len(tokens) > 1: + return PycolCompleteList() + # the beginning of the string does not match + if len(tokens) == 1 and not self.text.startswith(tokens[0]): + return PycolCompleteList() + # complete + return PycolCompleteList(PycolComplete(self.text, self)) + + def to_expr(self): + return self.text + + def __repr__(self): + return ""%(self.text) + + +text_cmd = TextCliCmd("toto") + +assert(text_cmd.test_match("") == False) +assert(text_cmd.test_match("tot") == False) +assert(text_cmd.test_match("toto toto") == False) +assert(text_cmd.test_match(" toto") == True) +assert(text_cmd.test_match("toto ") == True) +assert(text_cmd.test_match(" toto \n\n") == True) +assert(text_cmd.test_match("toto # coin") == True) +assert(text_cmd.test_match("toto") == True) + +assert(text_cmd.test_complete("") == ["toto"]) +assert(text_cmd.test_complete("to") == ["toto"]) +assert(text_cmd.test_complete("tot") == ["toto"]) +assert(text_cmd.test_complete(" tot") == ["toto"]) +assert(text_cmd.test_complete("toto") == ["toto"]) +assert(text_cmd.test_complete("d") == []) +assert(text_cmd.test_complete("toto ") == []) +assert(text_cmd.test_complete("toto#") == []) + +class SeqCliCmd(CliCmd): + def __init__(self, cmdlist, key = None): + self.cmdlist = cmdlist + self.key = key + debug("SeqCliCmd(%s)"%cmdlist) + + def match(self, tokens): + # only one command, try to match all tokens + if len(self.cmdlist) == 1: + m = self.cmdlist[0].match(tokens) + if m != None and self.key != None: + m[self.key] = " ".join(tokens) + return m + # several commands, try to match the first command with 1 to N tokens, + # and do a recursive call for the following + n_match = 0 + ret = PycolMatch() + for i in range(len(tokens)+1): + m = self.cmdlist[0].match(tokens[:i]) + if m == None: + continue + m2 = SeqCliCmd(self.cmdlist[1:]).match(tokens[i:]) + if m2 == None: + continue + if n_match >= 1: + EXC() + ret.merge(m) + ret.merge(m2) + if self.key != None: + ret[self.key] = " ".join(tokens[:i]) + n_match += 1 + if n_match > 0: + return ret + return None + + def complete(self, tokens): + debug("----------complete seq <%s>"%tokens) + + match_tokens = tokens[:-1] + complete_token = tokens[-1] + + # there is no match_token (only whitespaces), try to complete the first + # command + if len(match_tokens) == 0: + return self.cmdlist[0].complete(tokens) + + debug("match_tokens=<%s> complete_token=<%s>"%(match_tokens, complete_token)) + res = PycolCompleteList() + + # try to match match_tokens with 1 to N commands + for i in range(1, len(self.cmdlist)): + # if it does not match, continue + if SeqCliCmd(self.cmdlist[0:i]).match(match_tokens) == None: + continue + # XXX res.own() + # if it matches, try to complete the last token + res2 = self.cmdlist[i].complete([complete_token]) + res.merge(res2) + + return res + + def to_expr(self): + return " ".join([c.to_expr() for c in self.cmdlist]) + + def __repr__(self): + return ""%(str(self.cmdlist)) + +seq_cmd = SeqCliCmd([TextCliCmd("toto"), TextCliCmd("titi"), TextCliCmd("tutu")]) +assert(seq_cmd.test_match("") == False) +assert(seq_cmd.test_match("toto") == False) +assert(seq_cmd.test_match("titi") == False) +assert(seq_cmd.test_match("toto d") == False) +assert(seq_cmd.test_match("toto # titi tutu") == False) +assert(seq_cmd.test_match("toto titi tutu") == True) +assert(seq_cmd.test_match(" toto titi tutu") == True) +assert(seq_cmd.test_match("toto titi tutu #") == True) +assert(seq_cmd.test_match("toto titi tutu") == True) + +assert(seq_cmd.test_complete("") == ["toto"]) +assert(seq_cmd.test_complete("toto ") == ["titi"]) +assert(seq_cmd.test_complete("toto t") == ["titi"]) +assert(seq_cmd.test_complete("toto") == ["toto"]) +assert(seq_cmd.test_complete("titi") == []) +assert(seq_cmd.test_complete("toto d") == []) +assert(seq_cmd.test_complete("toto # titi tutu") == []) +assert(seq_cmd.test_complete("toto titi tut") == ["tutu"]) +assert(seq_cmd.test_complete(" toto titi tut") == ["tutu"]) +assert(seq_cmd.test_complete("toto titi tutu #") == []) +assert(seq_cmd.test_complete("toto titi tutu") == ["tutu"]) + + +class OrCliCmd(CliCmd): + def __init__(self, cmdlist, key = None): + self.cmdlist = cmdlist + self.key = key + debug("OrCliCmd(%s)"%cmdlist) + + def match(self, tokens): + # try to match all commands + for cmd in self.cmdlist: + ret = cmd.match(tokens) + if ret == None: + continue + if self.key != None: + ret[self.key] = " ".join(tokens) + return ret + return None + + def complete(self, tokens): + debug("----------complete or <%s>"%tokens) + + res = PycolCompleteList() + for cmd in self.cmdlist: + res2 = cmd.complete(tokens) + res.merge(res2) + + return res + + def to_expr(self): + return "|".join([c.to_expr() for c in self.cmdlist]) + + def __repr__(self): + return ""%(str(self.cmdlist)) + +or_cmd = OrCliCmd([TextCliCmd("toto"), TextCliCmd("titi"), TextCliCmd("tutu")]) +assert(or_cmd.test_match("") == False) +assert(or_cmd.test_match("toto") == True) +assert(or_cmd.test_match("titi") == True) +assert(or_cmd.test_match("tutu") == True) +assert(or_cmd.test_match(" toto # titi tutu") == True) +assert(or_cmd.test_match("toto titi tutu") == False) +assert(or_cmd.test_match(" toto t") == False) +assert(or_cmd.test_match("toto d#") == False) + +assert(or_cmd.test_complete("") == ["toto", "titi", "tutu"]) +assert(or_cmd.test_complete("t") == ["toto", "titi", "tutu"]) +assert(or_cmd.test_complete("to") == ["toto"]) +assert(or_cmd.test_complete(" tot") == ["toto"]) +assert(or_cmd.test_complete(" titi") == ["titi"]) +assert(or_cmd.test_complete(" titi to") == []) +assert(or_cmd.test_complete("titid") == []) +assert(or_cmd.test_complete(" ti#") == []) + + + +class OptionalCliCmd(CliCmd): + def __init__(self, cmd, key = None): + self.cmd = cmd + self.key = key + + def match(self, tokens): + # match an empty buffer + if len(tokens) == 0: + ret = PycolMatch() + if self.key != None: + ret[self.key] = "" + return ret + # else, try to match sub command + ret = self.cmd.match(tokens) + if ret != None and self.key != None: + ret[self.key] = " ".join(tokens) + return ret + + def complete(self, tokens): + debug("----------complete optional <%s>"%tokens) + return self.cmd.complete(tokens) + + def to_expr(self): + return "[" + self.cmd.to_expr() + "]" + + def __repr__(self): + return ""%(self.cmd) + +opt_cmd = OptionalCliCmd(TextCliCmd("toto")) +assert(opt_cmd.test_match("") == True) +assert(opt_cmd.test_match("toto") == True) +assert(opt_cmd.test_match(" toto ") == True) +assert(opt_cmd.test_match(" toto # tutu") == True) +assert(opt_cmd.test_match("titi") == False) +assert(opt_cmd.test_match("toto titi") == False) +assert(opt_cmd.test_match(" toto t") == False) +assert(opt_cmd.test_match("toto d#") == False) + +assert(opt_cmd.test_complete("") == ["toto"]) +assert(opt_cmd.test_complete("t") == ["toto"]) +assert(opt_cmd.test_complete("to") == ["toto"]) +assert(opt_cmd.test_complete(" tot") == ["toto"]) +assert(opt_cmd.test_complete(" ") == ["toto"]) +assert(opt_cmd.test_complete(" titi to") == []) +assert(opt_cmd.test_complete("titid") == []) +assert(opt_cmd.test_complete(" ti#") == []) + +class BypassCliCmd(CliCmd): + def __init__(self, cmd, key = None): + self.cmd = cmd + self.key = key + + def match(self, tokens): + m = self.cmd.match(tokens) + if m != None and self.key != None: + m[self.key] = " ".join(tokens) + return m + + def complete(self, tokens): + return self.cmd.complete(tokens) + + def to_expr(self): + return "(" + self.cmd.to_expr() + ")" + + def __repr__(self): + return ""%(self.cmd) + + +class AnyTextCliCmd(CliCmd): + def __init__(self, key = None, help_str = None): + debug("AnyTextCliCmd()") + self.key = key + # XXX factorize all help and key in mother class? + if help_str != None: + self.help_str = help_str + else: + self.help_str = "Anytext token" + + def match(self, tokens): + debug("----------match anytext <%s>"%tokens) + if len(tokens) != 1: + return None + res = PycolMatch() + if self.key: + res[self.key] = tokens[0] + return res + + def complete(self, tokens): + debug("----------complete anytext <%s>"%tokens) + # does not match + if len(tokens) != 1: + return PycolCompleteList() + # match, but cannot complete + return PycolCompleteList(PycolComplete(tokens[0], self)) + + def to_expr(self): + return "" # XXX + + def __repr__(self): + return "" + + def get_help(self): + return self.help_str + + +anytext_cmd = AnyTextCliCmd() + +assert(anytext_cmd.test_match("") == False) +assert(anytext_cmd.test_match("toto toto") == False) +assert(anytext_cmd.test_match(" toto") == True) +assert(anytext_cmd.test_match("toto ") == True) +assert(anytext_cmd.test_match(" toto \n\n") == True) +assert(anytext_cmd.test_match("toto # coin") == True) +assert(anytext_cmd.test_match("toto") == True) + +assert(anytext_cmd.test_complete("") == [""]) +assert(anytext_cmd.test_complete("to") == ["to"]) +assert(anytext_cmd.test_complete("tot") == ["tot"]) +assert(anytext_cmd.test_complete(" tot") == ["tot"]) +assert(anytext_cmd.test_complete("toto") == ["toto"]) +#assert(anytext_cmd.test_complete("toto#") == []) #XXX later + + + + +class IntCliCmd(CliCmd): + def __init__(self, val_min = None, val_max = None, key = None, + help_str = None): + debug("IntCliCmd()") + self.key = key + # XXX factorize all help and key in mother class? + if help_str != None: + self.help_str = help_str + else: + self.help_str = "Int token" + # XXX val_min val_max + + def match(self, tokens): + debug("----------match int <%s>"%tokens) + if len(tokens) != 1: + return None + try: + val = int(tokens[0]) + except: + return None + res = PycolMatch() + if self.key: + res[self.key] = val + return res + + def to_expr(self): + return "" # XXX + + def __repr__(self): + return "" + + def get_help(self): + return self.help_str + + +int_cmd = IntCliCmd() + +assert(int_cmd.test_match("") == False) +assert(int_cmd.test_match("toto") == False) +assert(int_cmd.test_match(" 13") == True) +assert(int_cmd.test_match("-12342 ") == True) +assert(int_cmd.test_match(" 33 \n\n") == True) +assert(int_cmd.test_match("1 # coin") == True) +assert(int_cmd.test_match("0") == True) + +assert(int_cmd.test_complete("") == [""]) +assert(int_cmd.test_complete("1") == ["1"]) +assert(int_cmd.test_complete("12") == ["12"]) +assert(int_cmd.test_complete(" ee") == []) + + +class RegexpCliCmd(CliCmd): + def __init__(self, regexp, cmd = None, key = None, + store_re_match = False, help_str = None): + debug("RegexpCliCmd()") + self.regexp = regexp + self.key = key + self.cmd = cmd + self.store_re_match = store_re_match + # XXX factorize all help and key in mother class? + if help_str != None: + self.help_str = help_str + else: + self.help_str = "Regexp token" + + def match(self, tokens): + debug("----------match regexp <%s>"%tokens) + if len(tokens) != 1: + return None + m = re.fullmatch(self.regexp, tokens[0]) + if m == None: + return None + if self.cmd != None: + res = self.cmd.match(tokens) + if res == None: + return None + else: + res = PycolMatch() + if self.key: + if self.store_re_match: + res[self.key] = m + else: + res[self.key] = tokens[0] + return res + + def complete(self, tokens): + if self.cmd == None: + return CliCmd.complete(self, tokens) + res = PycolCompleteList() + completions = self.cmd.complete(tokens) + for c in completions: + if self.match([c[0]]): + res.append(PycolComplete(c[0], self)) + return res + + def __repr__(self): + return "" + + def get_help(self): + return self.help_str + + +regexp_cmd = RegexpCliCmd("x[123]") + +assert(regexp_cmd.test_match("") == False) +assert(regexp_cmd.test_match("toto") == False) +assert(regexp_cmd.test_match(" x1") == True) +assert(regexp_cmd.test_match("x3 ") == True) +assert(regexp_cmd.test_match(" x2\n\n") == True) +assert(regexp_cmd.test_match("x1 # coin") == True) + +assert(regexp_cmd.test_complete("") == [""]) # XXX handle this case at upper level? +assert(regexp_cmd.test_complete("x1") == ["x1"]) +assert(regexp_cmd.test_complete(" x2") == ["x2"]) +assert(regexp_cmd.test_complete(" ee") == []) + + + +class IPv4CliCmd(CliCmd): + def __init__(self, cmd = None, key = None, help_str = None): + debug("IPv4CliCmd()") + self.key = key + self.cmd = cmd + # XXX factorize all help and key in mother class? + if help_str != None: + self.help_str = help_str + else: + self.help_str = "IPv4 token" + + def match(self, tokens): + debug("----------match IPv4 <%s>"%tokens) + if len(tokens) != 1: + return None + try: + val = socket.inet_aton(tokens[0]) + except: + return None + if len(tokens[0].split(".")) != 4: + return None + res = PycolMatch() + if self.key: + res[self.key] = tokens[0] + return res + + def complete(self, tokens): + if self.cmd == None: + return CliCmd.complete(self, tokens) + res = PycolCompleteList() + completions = self.cmd.complete(tokens) + for c in completions: + if self.match([c[0]]): + res.append(PycolComplete(c[0], self)) + return res + + def to_expr(self): + return "" # XXX + + def __repr__(self): + return "" + + def get_help(self): + return self.help_str + + +ipv4_cmd = IPv4CliCmd() + +assert(ipv4_cmd.test_match("") == False) +assert(ipv4_cmd.test_match("toto") == False) +assert(ipv4_cmd.test_match(" 13.3.1.3") == True) +assert(ipv4_cmd.test_match("255.255.255.255 ") == True) +assert(ipv4_cmd.test_match(" 0.0.0.0 \n\n") == True) +assert(ipv4_cmd.test_match("1.2.3.4 # coin") == True) +assert(ipv4_cmd.test_match("300.2.2.2") == False) + +assert(ipv4_cmd.test_complete("") == [""]) +assert(ipv4_cmd.test_complete("1.2.3.4") == ["1.2.3.4"]) +assert(ipv4_cmd.test_complete(" ee") == []) + + + + +class FileCliCmd(CliCmd): + def __init__(self, key = None, help_str = None): + debug("FileCliCmd()") + self.key = key + # XXX factorize all help and key in mother class? + if help_str != None: + self.help_str = help_str + else: + self.help_str = "File token" + + def match(self, tokens): + debug("----------match File <%s>"%tokens) + if len(tokens) != 1: + return None + if not os.path.exists(tokens[0]): + return None + res = PycolMatch() + if self.key: + res[self.key] = tokens[0] + return res + + def complete(self, tokens): + debug("----------complete File <%s>"%tokens) + res = PycolCompleteList() + if len(tokens) != 1: + return res + dirname = os.path.dirname(tokens[0]) + basename = os.path.basename(tokens[0]) + if dirname != "" and not os.path.exists(dirname): + return res + debug("dirname=%s basename=%s"%(dirname, basename)) + if dirname == "": + ls = os.listdir() + else: + ls = os.listdir(dirname) + debug("ls=%s"%(str(ls))) + for f in ls: + path = os.path.join(dirname, f) + debug(path) + if path.startswith(tokens[0]): + if os.path.isdir(path): + path += "/" + res.append(PycolComplete(path, self, terminal = False)) + else: + res.append(PycolComplete(path, self, terminal = True)) + + debug(res) + return res + + def to_expr(self): + return "" # XXX + + def __repr__(self): + return "" + + def get_help(self): + return self.help_str + + +file_cmd = FileCliCmd() + +assert(file_cmd.test_match("") == False) +assert(file_cmd.test_match("DEDEDzx") == False) +assert(file_cmd.test_match("/tmp") == True) +assert(file_cmd.test_match("/etc/passwd ") == True) + +assert(file_cmd.test_complete("/tm") == ["/tmp/"]) +assert(file_cmd.test_complete(" eededezezzzc") == []) + + + + +class ChoiceCliCmd(CliCmd): + def __init__(self, choice, key = None, cb = None, help_str = None): + super().__init__(key = key, cb = cb, help_str = help_str) + if isinstance(choice, str): + self.get_list_func = lambda:[choice] + elif isinstance(choice, list): + self.get_list_func = lambda:choice + else: # XXX isinstance(func) + self.get_list_func = choice + debug("ChoiceCliCmd(%s)"%self.get_list_func) + + def match(self, tokens): + # more than one token as input, does not match + if len(tokens) > 1: + return None + l = self.get_list_func() + for e in l: + if tokens == [e]: + res = PycolMatch() + if self.key: + res[self.key] = e + return res + return None + + def complete(self, tokens): + # more than one token as input, does not match + if len(tokens) > 1: + return PycolCompleteList() + l = self.get_list_func() + complete = PycolCompleteList() + for e in l: + # the beginning of the string does not match + if len(tokens) == 1 and not e.startswith(tokens[0]): + continue + complete.append(PycolComplete(e, self)) + return complete + + def __repr__(self): + return ""%(self.get_list_func) + + +choice_cmd = ChoiceCliCmd(["toto", "titi"]) + +assert(choice_cmd.test_match("") == False) +assert(choice_cmd.test_match("tot") == False) +assert(choice_cmd.test_match("toto toto") == False) +assert(choice_cmd.test_match(" toto") == True) +assert(choice_cmd.test_match("titi ") == True) +assert(choice_cmd.test_match(" toto \n\n") == True) +assert(choice_cmd.test_match("toto # coin") == True) +assert(choice_cmd.test_match("toto") == True) + +assert(choice_cmd.test_complete("") == ["toto", "titi"]) +assert(choice_cmd.test_complete("to") == ["toto"]) +assert(choice_cmd.test_complete("tot") == ["toto"]) +assert(choice_cmd.test_complete(" tot") == ["toto"]) +assert(choice_cmd.test_complete("toto") == ["toto"]) +assert(choice_cmd.test_complete("d") == []) +assert(choice_cmd.test_complete("toto ") == []) +assert(choice_cmd.test_complete("toto#") == []) + + +if __name__ == '__main__': + # XXX add tests + cmd = CmdBuilder( + "toto a|b [coin] [bar]", + token_desc = { + "toto": TextCliCmd("toto", help_str = "help for toto"), + "a": TextCliCmd("a", help_str = "help for a"), + "b": TextCliCmd("b", help_str = "help for b"), + "coin": TextCliCmd("coin", help_str = "help for coin"), + "bar": TextCliCmd("bar", help_str = "help for bar"), + } + ) + print(cmd.to_expr()) + assert(cmd.test_match("toto a") == True) + assert(cmd.test_match("toto b coin") == True) + assert(cmd.test_match("toto") == False) + + + +# float + +# file + +# mac + +# ipv6 + +# date diff --git a/example.py b/example.py new file mode 100644 index 0000000..aebbbac --- /dev/null +++ b/example.py @@ -0,0 +1,150 @@ +#!/usr/bin/python3 + +import pycol +from clicmd import * +import sys +import traceback +import textwrap + +def callback(cmdline, kvargs): + print("match: %s"%(str(kvargs))) + +normal = "\033[0m" +bold = "\033[1m" + +ip_list = [] + +def help_callback(cmdline, kvargs): + wrapper = textwrap.TextWrapper(initial_indent=" ", + subsequent_indent=" ") + wrapper2 = textwrap.TextWrapper(initial_indent=" ", + subsequent_indent=" ") + for c in cmdline.context: + # XXX if terminal supports it, use bold and italic + print("%s%s%s"%(bold, c.to_expr(), normal)) + lines = c.get_help().split("\n") + print("\n".join(wrapper.wrap(lines[0]))) + for l in lines[1:]: + print("\n".join(wrapper2.wrap(l))) + # XXX use a pager + +def ip_callback_add(cmdline, kvargs): + global ip_list + if kvargs[""] in ip_list: + print("already in list") + return + ip_list.append(kvargs[""]) + +def ip_callback_del(cmdline, kvargs): + global ip_list + ip_list.remove(kvargs[""]) + +def ip_callback_info(cmdline, kvargs): + global ip_list + if "" in kvargs: + print("%s is in list"%kvargs[""]) + else: + print("%s is not in list"%kvargs[""]) + +ctx = PycolContext([ + CmdBuilder( + expr = "toto a|b [coin] [bar]", + token_desc = { + "toto": TextCliCmd("toto", help_str = "help for toto"), + "a": TextCliCmd("a", help_str = "help for a"), + "b": TextCliCmd("b", help_str = "help for b"), + "coin": TextCliCmd("coin", help_str = "help for coin"), + "bar": TextCliCmd("bar", help_str = "help for bar"), + }, + cb = callback, + help_str = "help for command toto. This is a very very " + + "long long help to check that the wrapper is able to " + + "wrap the text properly." + ), + + CmdBuilder( + expr = "titi [pouet] ", + token_desc = { + "titi": TextCliCmd("titi", help_str = "help for titi"), + "pouet": TextCliCmd("pouet", help_str = "help for pouet"), + "": AnyTextCliCmd(help_str = "help for anytext"), + "": IntCliCmd(help_str = "help for count"), + }, + cb = callback, + help_str = "help for command titi" + ), + + CmdBuilder( + expr = "regexp ", + token_desc = { + "regexp": TextCliCmd("regexp", help_str = "help for regexp"), + "": RegexpCliCmd("0x([0-9a-fA-F])", help_str = "an hex number"), + }, + cb = callback, + help_str = "help for command regexp" + ), + + CmdBuilder( + expr = "ip add ", + token_desc = { + "ip": TextCliCmd("ip", help_str = "help for ip"), + "add": TextCliCmd("add", help_str = "help for add"), + "": IPv4CliCmd(help_str = "An IPv4 address. The format of " + + "an IPv4 address is A.B.C.D, each letter is a number" + + "between 0 and 255."), + }, + cb = ip_callback_add, + help_str = "help for command ip" + ), + + CmdBuilder( + expr = "ip del ", + token_desc = { + "ip": TextCliCmd("ip", help_str = "help for ip"), + "del": TextCliCmd("del", help_str = "help for del"), + "": ChoiceCliCmd(ip_list, help_str = "an IP previously " + + "added"), + }, + cb = ip_callback_del, + help_str = "help for command ip" + ), + + CmdBuilder( + expr = "ip info |", # XXX talk about prio ! + token_desc = { + "ip": TextCliCmd("ip", help_str = "help for ip"), + "info": TextCliCmd("info", help_str = "help for info"), + "": ChoiceCliCmd(ip_list, help_str = "an IP previously " + + "added"), + "": IPv4CliCmd(help_str = "An IPv4 address"), + }, + cb = ip_callback_info, + help_str = "help for command ip" + ), + + CmdBuilder( + expr = "file ", + token_desc = { + "file": TextCliCmd("file", help_str = "Help for file. The file " + + "can be either a directory or a regular file."), + "": FileCliCmd(help_str = "a file"), + }, + cb = callback, + help_str = "help for command file" + ), + + CmdBuilder( + expr = "help", + token_desc = { + "help": TextCliCmd("help", help_str = "help for help"), + }, + cb = help_callback, + help_str = "help for command toto" + ), +]) + +cmdline = pycol.Cmdline() +cmdline.add_context("my_context", ctx) +cmdline.set_context("my_context") +cmdline.set_prompt("my_context> ") +cmdline.input_loop() diff --git a/expparser.py b/expparser.py new file mode 100644 index 0000000..2f276bf --- /dev/null +++ b/expparser.py @@ -0,0 +1,323 @@ +#!/usr/bin/python3 + +# todo +# - better checks before starting to parse (ex: no double spaces ?) +# - better error messages +# - document expressions +# - unit tests +# - reduce binary tree in tree +# - handle quotes? +# - better comments and names (operator is valid for '(' ?) +# - ignore chars +# - pylint +# - tests only if main +# - "not" operator? useful? +# - we could completely remove all_ops + +DEBUG = False + +def debug(*args, **kvargs): + if DEBUG: + print(*args, **kvargs) + +class PycolExprParser(object): + def __init__(self, binary_ops = None, unary_ops = None, + group_ops = None, var_chars = None): + if binary_ops == None: + # ordered by priority + self.binary_ops = [ "|", "/", " " ] + else: + self.binary_ops = binary_ops + if unary_ops == None: + self.unary_ops = [ ] + else: + self.unary_ops = unary_ops + if group_ops == None: + self.group_ops = { "(" : ")", "[" : "]" } + else: + self.group_ops = group_ops + if var_chars == None: + self.var_chars = "<>-_abcdefghijklmnopqrstuvwxyz" + \ + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + else: + self.var_chars = var_chars + + self.all_ops = [] + self.all_ops += self.binary_ops + self.all_ops += self.unary_ops + self.all_ops += self.group_ops.keys() + self.all_ops += self.group_ops.values() + + # return operator priority (high value = high priority) + def op_prio(self, op): + assert(isinstance(op, str)) + if not op in self.binary_ops: + return 0 + return self.binary_ops.index(op) + 1 + + # The argument tokens is an list of tokens that must start with an + # opening group operator, for instance an opening bracket '('. This + # function returns the list of tokens representing the inbound + # expression without the first opening operator and its corresponding + # closing operator + def get_group(self, tokens): + debug(tokens, self.group_ops.keys()) + assert(tokens[0] in self.group_ops.keys()) + stack = [] + + for i, t in enumerate(tokens): + debug((i,t)) + if t in self.group_ops.keys(): + stack.append(t) + elif t == self.group_ops[stack[-1]]: + stack.pop() + if len(stack) == 0: + break + assert(len(stack) == 0) + debug(tokens[1:i]) + return tokens[1:i] + + # convert a string into a list of tokens + def tokenize(self, s): + debug("tokenize(%s)"%(s)) + tokens = [] + while s != "": + # try to parse an operator + op_found = False + for op in self.all_ops: + if s.startswith(op): + tokens.append(op) + s = s[len(op):] + op_found = True + break + if op_found: + continue + # try to parse a variable name + length = 0 + for c in s: + if c in self.var_chars: + length += 1 + else: + break + if length == 0: + debug(tokens) + debug("invalid token") + excp() + tokens.append(s[:length]) + s = s[length:] + debug("tokenize() return %s"%(tokens)) + return tokens + + def binary_parse(self, tokens): + top = None + exp = None + debug(tokens) + while len(tokens) > 0: + t = tokens.pop(0) + exp = PycolBinExpr(t, self) + debug("token '%s'"%t) + + if t in self.binary_ops: + # if it is the first node, error + if top == None: + debug("expression starts with a binary operator") + exce() + + # the rightest leaf must be a variable + tmp = top + while tmp.right != None: + tmp = tmp.right + if not tmp.is_var(): + debug("the rightest leaf must be a variable") + exce() + + # if new node priority is higher than root node, it + # becomes the new root + if self.op_prio(t) > self.op_prio(top.op): + exp.left = top + top = exp + # else, exp is placed at the "rightest" leaf. Check + # that blabla XXX + else: + tmp = top + while tmp.right != None and \ + self.op_prio(exp.op) < self.op_prio(tmp.right.op): + tmp = tmp.right + + if tmp.right == None: + debug("invalid right node") + exce() + + exp.left = tmp.right + tmp.right = exp + + elif t in self.group_ops: + # if operator is an opening bracket, recursively call + # ourself with the sub-expression + gr = self.get_group([t] + tokens) + exp.right = self.binary_parse(gr[:]) + if exp.right == None: + exce() + tokens = tokens[len(gr) + 1:] + debug("subexp parsed, tokens=%s"%tokens) + + # if it is the first node + if top == None: + top = exp; + continue + + # exp is placed at the "rightest" leaf. Check + # that our parent is not a variable + tmp = top + while tmp.right != None: + tmp = tmp.right + if tmp.is_var(): + debug("the rightest leaf must not be a variable") + exce() + + tmp.right = exp + + else: # unary operator or variable + debug("unary operator or variable") + + # if it is the first node + if top == None: + top = exp; + continue + + # exp is placed at the "rightest" leaf. Check + # that our parent is not a variable + tmp = top + while tmp.right != None: + tmp = tmp.right + if tmp.is_var(): + debug("the rightest leaf must not be a variable") + exce() + + tmp.right = exp + + if top == None: + debug("no top node") + exce() + + # the rightest leaf must be a variable + tmp = top + while tmp.right != None: + tmp = tmp.right + if not tmp.is_var(): + debug("the rightest leaf must be a variable") + exce() + + return top + + def parse(self, tokens): + bin_expr = self.binary_parse(tokens) + expr = PycolExpr(bin_expr) + expr.reduce() + return expr + +class PycolBinExpr(object): + def __init__(self, op, parser): + debug("PycolBinExpr(%s)"%op) + self.op = op + self.parser = parser + self.left = None + self.right = None + + def dump(self, level=0): + print("vertice %d %s@0x%x"%(level, self.op, self.__hash__())) + if self.left: + self.left.dump(level+1) + if self.right: + self.right.dump(level+1) + + def to_str(self): + # it's a variable name + if self.is_var(): + return self.op + s = "" + # dump left expression if it's a binary operator (in case of + # unary operator, we only use the right child) + if self.op in self.parser.binary_ops: + return "%s%s%s"%(self.left.to_str(), self.op, self.right.to_str()) + + # group operator + if self.op in self.parser.group_ops.keys(): + return "%s%s%s"%(self.op, self.right.to_str(), + self.parser.group_ops[self.op]) + + # unary + return "%s%s"%(self.op, self.right) + + def is_var(self): + return not self.op in self.parser.all_ops + +class PycolExpr(object): + def __init__(self, bin_expr): + self.op = bin_expr.op + self.parser = bin_expr.parser + self.children = [] + if bin_expr.left != None: + self.children.append(PycolExpr(bin_expr.left)) + if bin_expr.right != None: + self.children.append(PycolExpr(bin_expr.right)) + + def dump(self, level=0): + print("vertice %d %s@0x%x"%(level, self.op, self.__hash__())) + for c in self.children: + c.dump(level+1) + + def to_str(self): + # it's a variable name + if self.is_var(): + return self.op + s = "" + + # binary operators + if self.op in self.parser.binary_ops: + for i, c in enumerate(self.children): + if i == 0: + s += "%s"%(c.to_str()) + else: + s += "%s%s"%(self.op, c.to_str()) + return s + + # group operator + if self.op in self.parser.group_ops.keys(): + return "%s%s%s"%(self.op, self.children[0].to_str(), + self.parser.group_ops[self.op]) + + # unary + return "%s%s"%(self.op, self.children[0]) + + def is_var(self): + return not self.op in self.parser.all_ops + + def reduce(self): + for c in self.children: + c.reduce() + children = [] + for c in self.children: + if c.op == self.op: + children += c.children + else: + children += [c] + self.children = children + +if __name__ == '__main__': + s = "[opt1/opt2] (|a|(b|c)|[x]) (no bidule)|coin" + #s = "x|y" + #tokenize(s) + + parser = PycolExprParser() + tokens = parser.tokenize(s) + e = parser.parse(tokens) + print(e.dump()) + print(e.to_str()) + e.reduce() + print(e.dump()) + print(e.to_str()) + + #print(get_group(tokenize("(fr fr (fr<3>)(xe)) sss"))) + #print(get_group(tokenize("<(fr fr> (fr<3>)(xe))> sss"))) + diff --git a/gre.py b/gre.py new file mode 100644 index 0000000..8e2284f --- /dev/null +++ b/gre.py @@ -0,0 +1,322 @@ +#!/usr/bin/python3 + +import pycol +from clicmd import * +import sys +import traceback +import textwrap +import xml.etree.ElementTree as ET +import xml.dom.minidom +import pydoc + +default_conf = """ + + + + gre1 + GRE Tunnel + + + gre1 + 1.1.1.1 + 1.1.1.2 + + + + + gre2 + GRE Tunnel + + + gre2 + 2.2.2.1 + 2.2.2.2 + + + + + +""" + +xmlconf = ET.XML(default_conf) +cur_xmlconf = xmlconf +ip_list = [] + +# from http://stackoverflow.com/questions/749796/pretty-printing-xml-in-python +def xml_indent(elem, level=0): + i = "\n" + level*" " + if len(elem): + if not elem.text or not elem.text.strip(): + elem.text = i + " " + if not elem.tail or not elem.tail.strip(): + elem.tail = i + for elem in elem: + xml_indent(elem, level+1) + if not elem.tail or not elem.tail.strip(): + elem.tail = i + else: + if level and (not elem.tail or not elem.tail.strip()): + elem.tail = i + +def text_indent(s, indent = 0, end = "\n"): + return " " * indent + s + end + +def help_callback(cmdline, kvargs): + cmd_wr = textwrap.TextWrapper(initial_indent=" ", + subsequent_indent=" ") + token_wr = textwrap.TextWrapper(initial_indent=" ", + subsequent_indent=" ") + s = "" + for c in cmdline.context: + s += "%s\n"%(c.to_expr()) + lines = c.get_help().split("\n") + # first line is the command + its help + s += ("\n".join(cmd_wr.wrap(lines[0]))) + "\n" + for l in lines[1:]: + s += ("\n".join(token_wr.wrap(l))) + "\n" + pydoc.pager(s) + +# used in main and gre context +help_cmd = CmdBuilder( + expr = "help", + token_desc = { + "help": TextCliCmd("help", help_str = "Show the help."), + }, + cb = help_callback, + help_str = "help for command toto" +) + +def get_interface_text_conf(conf, indent = 0): + s = "" + local = conf.find("encapsulation/gre-tunnel/local-endpoint") + remote = conf.find("encapsulation/gre-tunnel/remote-endpoint") + if local != None and remote != None: + bind_str = "bind %s %s"%(local.text, remote.text) + vrfid = conf.find("encapsulation/gre-tunnel/link-vrf-id") + if vrfid != None: + bind_str += " link-vrfid %s"%(vrfid.text) + s += text_indent(bind_str, indent) + key = conf.find("encapsulation/gre-tunnel/key") + if key != None: + if key.text == None: + s += text_indent("key enable", indent) + else: + s += text_indent("key %s"%(key.text), indent) + checksum = conf.find("encapsulation/gre-tunnel/checksum") + if checksum != None: + s += text_indent("checksum enable", indent) + return s + +def get_main_text_conf(conf, indent = 0): + s = text_indent("# GRE", indent) + for node in xmlconf.findall("interfaces/interface"): + s += text_indent(node.find("name").text, indent) + s += get_interface_text_conf(node, indent=(indent + 4)) + return s + +def display_cb(cmdline, kvargs): + if "xml" in kvargs: + xml_indent(cur_xmlconf) + s = ET.tostring(cur_xmlconf, method="xml", encoding="unicode") + else: + if cmdline.context == main_ctx: + s = get_main_text_conf(cur_xmlconf) + else: + s = get_interface_text_conf(cur_xmlconf) + pydoc.pager(s) + +# used in main and gre context +display_cmd = CmdBuilder( + expr = "display [xml]", + token_desc = { + "display": TextCliCmd("display", help_str = "Display the " + + "current configuration."), + "xml": TextCliCmd("xml", help_str = "If present, dump in XML format."), + }, + cb = display_cb, + help_str = "Display the current configuration." +) + +def bind_cb(cmdline, kvargs): + tunnel = cur_xmlconf.find("encapsulation/gre-tunnel") + local = tunnel.find("local-endpoint") + if local == None: + local = ET.XML("") + tunnel.append(local) + local.text = kvargs[""] + remote = tunnel.find("remote-endpoint") + if remote == None: + remote = ET.XML("") + tunnel.append(remote) + remote.text = kvargs[""] + vrfid = tunnel.find("link-vrf-id") + if vrfid != None: + tunnel.remove(vrfid) + if "" in kvargs: + vrfid = ET.XML("") + tunnel.append(vrfid) + vrfid.text = str(kvargs[""]) + +def key_cb(cmdline, kvargs): + tunnel = cur_xmlconf.find("encapsulation/gre-tunnel") + key = tunnel.find("key") + if key != None: + tunnel.remove(key) + if "disable" in kvargs: + return + key = ET.XML("") + tunnel.append(key) + if "" in kvargs: + key.text = str(kvargs[""]) + +def checksum_cb(cmdline, kvargs): + # XXX handle input/output + tunnel = cur_xmlconf.find("encapsulation/gre-tunnel") + checksum = tunnel.find("checksum") + if checksum != None: + tunnel.remove(checksum) + if "disable" in kvargs: + return + checksum = ET.XML("") + tunnel.append(checksum) + +def exit_cb(cmdline, kvargs): + global cur_xmlconf, xmlconf + cmdline.set_context("main") + cmdline.set_prompt("router{} ") + cur_xmlconf = xmlconf + +gre_ctx = PycolContext([ + CmdBuilder( + expr = "bind [link-vrfid ]", + token_desc = { + "bind": TextCliCmd("bind", help_str = "Bind the tunnel."), + "": IPv4CliCmd(help_str = "The local IPv4 address (A.B.C.D)."), + "": IPv4CliCmd(help_str = "The remote IPv4 address (A.B.C.D)."), + "link-vrfid": TextCliCmd("link-vrfid", help_str = ""), + "": IntCliCmd(help_str = "Link vrfid."), + }, + cb = bind_cb, + help_str = "Bind the GRE tunnel on local and remote addresses." + ), + CmdBuilder( + expr = "checksum|checksum-input|checksum-output enable|disable", + token_desc = { + "checksum": TextCliCmd("checksum", help_str = ""), + "checksum-input": TextCliCmd("checksum-input", help_str = ""), + "checksum-output": TextCliCmd("checksum-output", help_str = ""), + "enable": TextCliCmd("enable", help_str = ""), + "disable": TextCliCmd("disable", help_str = ""), + }, + cb = checksum_cb, + help_str = "Enable or disable GRE checksum." + ), + CmdBuilder( + expr = "key |enable|disable", + token_desc = { + "key": TextCliCmd("key", help_str = ""), + "": IntCliCmd(val_min = 0, val_max = (1<<32) - 1, + help_str = "The GRE key."), + "enable": TextCliCmd("enable", help_str = ""), + "disable": TextCliCmd("disable", help_str = ""), + }, + cb = key_cb, + help_str = "Enable or disable GRE checksum." + ), + CmdBuilder( + expr = "exit", + token_desc = { + "exit": TextCliCmd("exit", help_str = "Exit from GRE context."), + }, + cb = exit_cb, + help_str = "Exit from GRE context." + ), + display_cmd, + help_cmd, +]) + +def list_gre_ctx(): + return [ node.text for node in \ + xmlconf.findall("interfaces/interface/encapsulation/gre-tunnel/gre-port") ] + +def enter_gre(ifname): + global xmlconf, cur_xmlconf + for node in xmlconf.findall("interfaces/interface"): + if node.find("name").text == ifname: + cur_xmlconf = node + break + cmdline.set_prompt("router{%s} "%(ifname)) + cmdline.set_context("gre") + +def enter_gre_cb(cmdline, kvargs): + ifname = kvargs[""] + return enter_gre(ifname) + +def create_gre_cb(cmdline, kvargs): + global xmlconf, cur_xmlconf + + # create the context first + ifname = kvargs[""] + xml_str = """ + + {name} + GRE Tunnel + + + {name} + + + + """.format(name = ifname) + node = ET.XML(xml_str) + xmlconf.find("interfaces").append(node) + return enter_gre(ifname) + +def delete_gre(cmdline, kvargs): + interfaces = xmlconf.find("interfaces") + for interface in interfaces.findall("interface"): + if interface.find("name").text == kvargs[""]: + interfaces.remove(interface) + break + +main_ctx = PycolContext([ + CmdBuilder( + expr = "", + token_desc = { + "": ChoiceCliCmd(choice = list_gre_ctx, + help_str = "An existing gre context."), + }, + cb = enter_gre_cb, + help_str = "Enter an existing GRE context." + ), + CmdBuilder( + expr = "", + token_desc = { + "": RegexpCliCmd("gre[0-9]+", + help_str = "A new gre context (format " + + "is gre[0-9]+)."), + }, + cb = create_gre_cb, + help_str = "Create and enter a new GRE context." + ), + CmdBuilder( + expr = "delete ", + token_desc = { + "delete": TextCliCmd("delete", help_str = ""), + "": ChoiceCliCmd(choice = list_gre_ctx, + help_str = "An existing gre context."), + }, + cb = delete_gre, + help_str = "Delete a GRE context." + ), + display_cmd, + help_cmd, +]) + + +cmdline = pycol.Cmdline() +cmdline.add_context("main", main_ctx) +cmdline.add_context("gre", gre_ctx) +cmdline.set_context("main") +cmdline.set_prompt("router{} ") +cmdline.input_loop() diff --git a/pycol.py b/pycol.py new file mode 100644 index 0000000..0f483ca --- /dev/null +++ b/pycol.py @@ -0,0 +1,138 @@ +#!/usr/bin/python3 + +import readline +import clicmd +import shlex +import sys +import traceback + +class Cmdline(object): + def __init__(self): + self.contexts = {} + self.prompt = "> " + self.context = None + self.completions = [] + + # Call the completer when tab is hit + readline.set_completer(self.complete) + readline.parse_and_bind('tab: complete') + + readline.set_completion_display_matches_hook(self.display_matches) + + # remove some word breaks + delims = ' \t\n' + readline.set_completer_delims(delims) + + def display_matches(self, sustitution, matches, longest_match_length): + print() + for m in matches: + print(" %s"%m) + print("%s%s"%(self.prompt, readline.get_line_buffer()), + end = "", flush = True) + readline.forced_update_display() + + def add_context(self, name, ctx): + assert(not name in self.contexts), "duplicate context name %s"%(name) + self.contexts[name] = ctx + + def set_context(self, name): + self.context = self.contexts[name] + + def set_prompt(self, prompt): + self.prompt = prompt + + def complete(self, text, state): + response = None + + # state is not 0, the list of completions is already stored in + # self.completions[]: just return the next one + if state != 0: + if state >= len(self.completions): + return None + return self.completions[state] + + # else, try to build the completion list + try: + line = readline.get_line_buffer() # full line + end = readline.get_endidx() # cursor + + #print("<%s>"%origline[:end]) + in_buf = line[:end] + + tokens = shlex.split(in_buf, comments = False) # XXX check all calls to shlex + # whitespace after the first token, it means we want to complete + # the second token + if len(tokens) == 0 or not in_buf.endswith(tokens[-1]): + tokens.append("") + completions = self.context.complete(tokens) + + # Build the completion list in the readline format (a list of + # string). It may include the help if tab is pressed twice. + self.completions = [] + completion_type = readline.get_completion_type() + # completion_key = readline.get_completion_invoking_key() # XXX + completion_key = 0 + if chr(completion_type) == "?": + for c in completions: + help_str = c.cmd.get_help() + if c.token == "": + self.completions.append(c.cmd.to_expr() + ": " + help_str) + elif help_str == "": + self.completions.append(c.token) + else: + self.completions.append(c.token + ": " + help_str) + # check if it matches the command, in this case display [return] + # XXX does it work well? + result = None + for cmd in self.context: + result = cmd.match(tokens) + if result != None: + self.completions.append("[return]") + break + else: + for c in completions: + if c.token == "": + continue + if c.terminal == True: + self.completions.append(c.token + " ") + else: + self.completions.append(c.token) + + if len(self.completions) == 0: + return None + + return self.completions[0] + + except: + traceback.print_exc() + + return None + + def input_loop(self): + line = '' + while line != 'stop': + try: + line = input(self.prompt) + except KeyboardInterrupt: + print() + continue + except EOFError: + print() + return + tokens = shlex.split(line, comments = True) # XXX + if len(tokens) == 0: + continue + assert(self.context != None), "context not set, use set_context()" + result = None + for cmd in self.context: + result = cmd.match(tokens) + if result != None: + break + if result == None: + print("Invalid command or syntax error") + else: + assert(cmd.cb != None), \ + "no callback function for %s"%(cmd) + cmd.cb(self, result) + + diff --git a/token_parser.py b/token_parser.py new file mode 100644 index 0000000..13a0fdb --- /dev/null +++ b/token_parser.py @@ -0,0 +1,47 @@ +#!/usr/bin/python3 + +import shlex +from io import StringIO + +class SHL(shlex.shlex): + def __init__(self, *args, **kvargs): + shlex.shlex.__init__(self, *args, **kvargs) + def read_token(self, *args, **kvargs): + x = shlex.shlex.read_token(self, *args, **kvargs) + print("read_token <%s>"%x) + return x + def get_token(self, *args, **kvargs): + x = shlex.shlex.get_token(self, *args, **kvargs) + print("get_token <%s>"%x) + return x + def __next__(self, *args, **kvargs): + x = shlex.shlex.__next__(self, *args, **kvargs) + print("__next__ <%s>"%x) + return x + +class XIO(StringIO): + def __init__(self, *args, **kvargs): + StringIO.__init__(self, *args, **kvargs) + def readline(self, *args, **kvargs): + print("readline") + return StringIO.readline(self, *args, **kvargs) + def read(self, *args, **kvargs): + x = StringIO.readline(self, *args, **kvargs) + print("read <%s>"%x) + return x + +s = SHL("", posix=True) +s.debug = 1 +s.whitespace_split = True + +s.state = ' ' +s.push_source(XIO("xx xx")) +print("------------- %s"%list(s)) + +s.state = ' ' +s.push_source(XIO("yy yy")) +print("------------- %s"%list(s)) + +s.state = ' ' +s.push_source(XIO('a "c cds cds" d')) +print("------------- %s"%list(s))