--- /dev/null
+#!/usr/bin/python3
+
+# todo
+# - command name
+# - better parser
+# - readline: http://bip.weizmann.ac.il/course/python/PyMOTW/PyMOTW/docs/readline/index.html
+# - expression parser
+# - more commands (int, ip, ether, date, file, ...)
+# - how to specify help for commands?
+# - gerer les guillemets, les retours chariot, les espaces
+# - to_str() -> repr() ?
+# - ambiguous match as an exception?
+# - use logging module
+# - cmd -> subcmd
+# - should display command help string if no help for token: how to do it
+
+"""
+match object:
+
+show interface <ethif>|all [detailed]
+ ethif: EthIfCmd()
+
+show interface all
+ { "show": "show",
+ "interface": "interface"
+ "all": "all" }
+
+show interface eth0 detailed
+ { "show": "show",
+ "interface": "interface",
+ "ethif": "eth0",
+ "detailed": "detailed" }
+
+
+
+filter [in <in-iface>]/[out <out-iface>]/[proto ipv4]/[proto ipv6] drop|pass
+ in-iface: EthIfCmd()
+ out-iface: EthIfCmd()
+
+
+Les tokens nommés sont remplis dans un dictionnaire
+Impossible d'avoir 2 fois le même nom, a verifier dans le createur de commande
+ -> faux. exemple au dessus.
+
+Sinon comment gerer le ambiguous match?
+
+
+
+
+complete object:
+
+Pour chaque completion possible (meme vide ?), il faut retourner:
+ - la chaine du token complet
+ - l'aide du token (ou l'objet)
+ - la commande ? (ou alors ça c'est géré dans la partie rdline)
+
+
+
+
+
+legacy = blabla VERSION{1,2}
+new = blabla wpa1/wpa2
+
+legacy = blublu IFACE{1,*}
+new = blublu <interfaces-list>
+
+
+pour la définition d'un token, on pourrait avoir une fonction
+display() qui permettrait d'afficher (<iface1> [<iface2> ...])
+a la place de <interfaces-list>
+(ou pas)
+
+
+"""
+
+import shlex, os
+import expparser
+import re
+import socket
+
+_PYCOL_PKG_DIR = os.path.dirname(__file__)
+DEBUG = False
+
+def debug(*args, **kvargs):
+ if DEBUG:
+ print(*args, **kvargs)
+
+class PycolMatch(dict):
+ def __init__(self):
+ pass
+ def __repr__(self):
+ return "<Match %s>"%(dict.__repr__(self))
+ def merge(self, match_res):
+ for k in match_res:
+ self[k] = match_res[k]
+
+# XXX describe that it's a list of tuples
+class PycolCompleteList(list):
+ def __init__(self, l = []):
+ if not isinstance(l, list):
+ l = [l]
+ list.__init__(self, l)
+ def __repr__(self):
+ return "<CompleteList %s>"%(list.__repr__(self))
+ def merge(self, comp_res):
+ for e in comp_res:
+ if e in self:
+ continue
+ self.append(e)
+ # XXX own(): keep the list but set token ref
+
+class PycolComplete(object):
+ def __init__(self, token, cmd, terminal = True):
+ self.token = token
+ self.cmd = cmd
+ self.terminal = terminal
+
+class PycolContext(list):
+ def __init__(self, l = []):
+ list.__init__(self, l)
+ def complete(self, tokens):
+ res = PycolCompleteList()
+ for cmd in self:
+ res2 = cmd.complete(tokens)
+ res.merge(res2)
+ return res
+
+class CliCmd:
+ def __init__(self, key = None, cb = None, help_str = None):
+ self.key = key
+ assert(key == None or type(key) is str), "key is not a string: %s"%(key)
+ self.cb = cb
+ self.help_str = help_str
+ assert(help_str == None or type(help_str) is str), \
+ "help_str is not a string: %s"%(help_str)
+ self.default_help_str = "no help"
+
+ def match(self, tokens):
+ """return a match object
+ len(tokens) always >= 1
+ the last token is the one to complete (may be empty)
+ the first tokens are the ones to match
+ """
+ return PycolMatch()
+
+ def complete(self, tokens):
+ # XXX return None when no completion
+ # does not match
+ m = self.match(tokens)
+ if m != None or (len(tokens) == 1 and tokens[0] == ""):
+ return PycolCompleteList(PycolComplete(tokens[0], self))
+ return PycolCompleteList()
+
+ def to_expr(self):
+ if self.key != None:
+ return self.key
+ else:
+ return str(self)
+
+ def test_match(self, in_buf):
+ tokens = shlex.split(in_buf, comments = True)
+ match_result = self.match(tokens)
+ debug("test match: %s %s"%(tokens, match_result))
+ if match_result == None:
+ return False
+ else:
+ return True
+
+ # quick test for assert
+ def test_complete(self, in_buf):
+ complete_result = PycolCompleteList()
+ tokens = shlex.split(in_buf, comments = False)
+ # whitespace after the first token, it means we want to complete
+ # the second token
+ debug("test_complete: %s", in_buf)
+ if len(tokens) == 0 or not in_buf.endswith(tokens[-1]):
+ tokens.append("")
+ complete_result = self.complete(tokens)
+ debug("test complete: %s %s"%(tokens, complete_result))
+ return list(map(lambda x:x.token, complete_result))
+
+ def __repr__(self):
+ return "<CliCmd>"
+
+ # XXX say that it can return a multiple lines string
+ def get_help(self):
+ if self.help_str == None:
+ return self.default_help_str
+ else:
+ return self.help_str
+
+class CmdBuilder(CliCmd):
+ def __init__(self, expr, token_desc, key = None, cb = None, help_str = None):
+ super().__init__(key = key, cb = cb, help_str = help_str)
+ assert(isinstance(expr,str)), "expr must be a string"
+ assert(type(token_desc) is dict), "token_desc must be a dict"
+ self.token_desc = token_desc
+ parser = expparser.PycolExprParser()
+ tokens = parser.tokenize(expr)
+ expr = parser.parse(tokens)
+ debug(expr.to_str())
+ self.token_desc_list = []
+ self.cmd = self.__expr2clicmd(expr, token_desc)
+
+ def match(self, tokens):
+ m = self.cmd.match(tokens)
+ if m != None and self.key != None:
+ m[self.key] = " ".join(tokens)
+ return m
+
+ def complete(self, tokens):
+ return self.cmd.complete(tokens)
+
+ def to_expr(self):
+ return self.cmd.to_expr()
+
+ def __repr__(self):
+ return "<CmdBuilder(%s)>"%(self.cmd)
+
+ def __expr2clicmd(self, expr, token_desc):
+ l = []
+ for c in expr.children:
+ l.append(self.__expr2clicmd(c, token_desc))
+ if expr.is_var():
+ varname = expr.op
+ assert(varname in token_desc)
+ cmd = token_desc[varname]
+ cmd.key = varname
+ # keep the order of the variables for help displaying
+ self.token_desc_list.append(varname)
+ elif expr.op == " ":
+ cmd = SeqCliCmd(l)
+ elif expr.op == "|":
+ cmd = OrCliCmd(l)
+ elif expr.op == "[":
+ cmd = OptionalCliCmd(l[0])
+ elif expr.op == "(":
+ cmd = BypassCliCmd(l[0])
+ #elif expr.op == " ": #XXX
+ return cmd
+
+ def get_help(self):
+ if self.help_str == None:
+ help_str = self.default_help_str
+ else:
+ help_str = self.help_str
+ for t in self.token_desc_list:
+ help_str += "\n %s: %s"%(t, self.token_desc[t].get_help())
+ return help_str
+
+ def to_expr(self):
+ return self.cmd.to_expr()
+
+class TextCliCmd(CliCmd):
+ def __init__(self, text, key = None, cb = None, help_str = None):
+ super().__init__(key = key, cb = cb, help_str = help_str)
+ assert(not " " in text) # XXX find better?
+ self.text = text
+ debug("TextCliCmd(%s)"%text)
+
+ def match(self, tokens):
+ if tokens == [self.text]:
+ res = PycolMatch()
+ if self.key:
+ res[self.key] = self.text
+ return res
+ return None
+
+ def complete(self, tokens):
+ # more than one token as input, the string does not match
+ if len(tokens) > 1:
+ return PycolCompleteList()
+ # the beginning of the string does not match
+ if len(tokens) == 1 and not self.text.startswith(tokens[0]):
+ return PycolCompleteList()
+ # complete
+ return PycolCompleteList(PycolComplete(self.text, self))
+
+ def to_expr(self):
+ return self.text
+
+ def __repr__(self):
+ return "<TextCliCmd(%s)>"%(self.text)
+
+
+text_cmd = TextCliCmd("toto")
+
+assert(text_cmd.test_match("") == False)
+assert(text_cmd.test_match("tot") == False)
+assert(text_cmd.test_match("toto toto") == False)
+assert(text_cmd.test_match(" toto") == True)
+assert(text_cmd.test_match("toto ") == True)
+assert(text_cmd.test_match(" toto \n\n") == True)
+assert(text_cmd.test_match("toto # coin") == True)
+assert(text_cmd.test_match("toto") == True)
+
+assert(text_cmd.test_complete("") == ["toto"])
+assert(text_cmd.test_complete("to") == ["toto"])
+assert(text_cmd.test_complete("tot") == ["toto"])
+assert(text_cmd.test_complete(" tot") == ["toto"])
+assert(text_cmd.test_complete("toto") == ["toto"])
+assert(text_cmd.test_complete("d") == [])
+assert(text_cmd.test_complete("toto ") == [])
+assert(text_cmd.test_complete("toto#") == [])
+
+class SeqCliCmd(CliCmd):
+ def __init__(self, cmdlist, key = None):
+ self.cmdlist = cmdlist
+ self.key = key
+ debug("SeqCliCmd(%s)"%cmdlist)
+
+ def match(self, tokens):
+ # only one command, try to match all tokens
+ if len(self.cmdlist) == 1:
+ m = self.cmdlist[0].match(tokens)
+ if m != None and self.key != None:
+ m[self.key] = " ".join(tokens)
+ return m
+ # several commands, try to match the first command with 1 to N tokens,
+ # and do a recursive call for the following
+ n_match = 0
+ ret = PycolMatch()
+ for i in range(len(tokens)+1):
+ m = self.cmdlist[0].match(tokens[:i])
+ if m == None:
+ continue
+ m2 = SeqCliCmd(self.cmdlist[1:]).match(tokens[i:])
+ if m2 == None:
+ continue
+ if n_match >= 1:
+ EXC()
+ ret.merge(m)
+ ret.merge(m2)
+ if self.key != None:
+ ret[self.key] = " ".join(tokens[:i])
+ n_match += 1
+ if n_match > 0:
+ return ret
+ return None
+
+ def complete(self, tokens):
+ debug("----------complete seq <%s>"%tokens)
+
+ match_tokens = tokens[:-1]
+ complete_token = tokens[-1]
+
+ # there is no match_token (only whitespaces), try to complete the first
+ # command
+ if len(match_tokens) == 0:
+ return self.cmdlist[0].complete(tokens)
+
+ debug("match_tokens=<%s> complete_token=<%s>"%(match_tokens, complete_token))
+ res = PycolCompleteList()
+
+ # try to match match_tokens with 1 to N commands
+ for i in range(1, len(self.cmdlist)):
+ # if it does not match, continue
+ if SeqCliCmd(self.cmdlist[0:i]).match(match_tokens) == None:
+ continue
+ # XXX res.own()
+ # if it matches, try to complete the last token
+ res2 = self.cmdlist[i].complete([complete_token])
+ res.merge(res2)
+
+ return res
+
+ def to_expr(self):
+ return " ".join([c.to_expr() for c in self.cmdlist])
+
+ def __repr__(self):
+ return "<SeqCliCmd(%s)>"%(str(self.cmdlist))
+
+seq_cmd = SeqCliCmd([TextCliCmd("toto"), TextCliCmd("titi"), TextCliCmd("tutu")])
+assert(seq_cmd.test_match("") == False)
+assert(seq_cmd.test_match("toto") == False)
+assert(seq_cmd.test_match("titi") == False)
+assert(seq_cmd.test_match("toto d") == False)
+assert(seq_cmd.test_match("toto # titi tutu") == False)
+assert(seq_cmd.test_match("toto titi tutu") == True)
+assert(seq_cmd.test_match(" toto titi tutu") == True)
+assert(seq_cmd.test_match("toto titi tutu #") == True)
+assert(seq_cmd.test_match("toto titi tutu") == True)
+
+assert(seq_cmd.test_complete("") == ["toto"])
+assert(seq_cmd.test_complete("toto ") == ["titi"])
+assert(seq_cmd.test_complete("toto t") == ["titi"])
+assert(seq_cmd.test_complete("toto") == ["toto"])
+assert(seq_cmd.test_complete("titi") == [])
+assert(seq_cmd.test_complete("toto d") == [])
+assert(seq_cmd.test_complete("toto # titi tutu") == [])
+assert(seq_cmd.test_complete("toto titi tut") == ["tutu"])
+assert(seq_cmd.test_complete(" toto titi tut") == ["tutu"])
+assert(seq_cmd.test_complete("toto titi tutu #") == [])
+assert(seq_cmd.test_complete("toto titi tutu") == ["tutu"])
+
+
+class OrCliCmd(CliCmd):
+ def __init__(self, cmdlist, key = None):
+ self.cmdlist = cmdlist
+ self.key = key
+ debug("OrCliCmd(%s)"%cmdlist)
+
+ def match(self, tokens):
+ # try to match all commands
+ for cmd in self.cmdlist:
+ ret = cmd.match(tokens)
+ if ret == None:
+ continue
+ if self.key != None:
+ ret[self.key] = " ".join(tokens)
+ return ret
+ return None
+
+ def complete(self, tokens):
+ debug("----------complete or <%s>"%tokens)
+
+ res = PycolCompleteList()
+ for cmd in self.cmdlist:
+ res2 = cmd.complete(tokens)
+ res.merge(res2)
+
+ return res
+
+ def to_expr(self):
+ return "|".join([c.to_expr() for c in self.cmdlist])
+
+ def __repr__(self):
+ return "<OrCliCmd(%s)>"%(str(self.cmdlist))
+
+or_cmd = OrCliCmd([TextCliCmd("toto"), TextCliCmd("titi"), TextCliCmd("tutu")])
+assert(or_cmd.test_match("") == False)
+assert(or_cmd.test_match("toto") == True)
+assert(or_cmd.test_match("titi") == True)
+assert(or_cmd.test_match("tutu") == True)
+assert(or_cmd.test_match(" toto # titi tutu") == True)
+assert(or_cmd.test_match("toto titi tutu") == False)
+assert(or_cmd.test_match(" toto t") == False)
+assert(or_cmd.test_match("toto d#") == False)
+
+assert(or_cmd.test_complete("") == ["toto", "titi", "tutu"])
+assert(or_cmd.test_complete("t") == ["toto", "titi", "tutu"])
+assert(or_cmd.test_complete("to") == ["toto"])
+assert(or_cmd.test_complete(" tot") == ["toto"])
+assert(or_cmd.test_complete(" titi") == ["titi"])
+assert(or_cmd.test_complete(" titi to") == [])
+assert(or_cmd.test_complete("titid") == [])
+assert(or_cmd.test_complete(" ti#") == [])
+
+
+
+class OptionalCliCmd(CliCmd):
+ def __init__(self, cmd, key = None):
+ self.cmd = cmd
+ self.key = key
+
+ def match(self, tokens):
+ # match an empty buffer
+ if len(tokens) == 0:
+ ret = PycolMatch()
+ if self.key != None:
+ ret[self.key] = ""
+ return ret
+ # else, try to match sub command
+ ret = self.cmd.match(tokens)
+ if ret != None and self.key != None:
+ ret[self.key] = " ".join(tokens)
+ return ret
+
+ def complete(self, tokens):
+ debug("----------complete optional <%s>"%tokens)
+ return self.cmd.complete(tokens)
+
+ def to_expr(self):
+ return "[" + self.cmd.to_expr() + "]"
+
+ def __repr__(self):
+ return "<OptionalCliCmd(%s)>"%(self.cmd)
+
+opt_cmd = OptionalCliCmd(TextCliCmd("toto"))
+assert(opt_cmd.test_match("") == True)
+assert(opt_cmd.test_match("toto") == True)
+assert(opt_cmd.test_match(" toto ") == True)
+assert(opt_cmd.test_match(" toto # tutu") == True)
+assert(opt_cmd.test_match("titi") == False)
+assert(opt_cmd.test_match("toto titi") == False)
+assert(opt_cmd.test_match(" toto t") == False)
+assert(opt_cmd.test_match("toto d#") == False)
+
+assert(opt_cmd.test_complete("") == ["toto"])
+assert(opt_cmd.test_complete("t") == ["toto"])
+assert(opt_cmd.test_complete("to") == ["toto"])
+assert(opt_cmd.test_complete(" tot") == ["toto"])
+assert(opt_cmd.test_complete(" ") == ["toto"])
+assert(opt_cmd.test_complete(" titi to") == [])
+assert(opt_cmd.test_complete("titid") == [])
+assert(opt_cmd.test_complete(" ti#") == [])
+
+class BypassCliCmd(CliCmd):
+ def __init__(self, cmd, key = None):
+ self.cmd = cmd
+ self.key = key
+
+ def match(self, tokens):
+ m = self.cmd.match(tokens)
+ if m != None and self.key != None:
+ m[self.key] = " ".join(tokens)
+ return m
+
+ def complete(self, tokens):
+ return self.cmd.complete(tokens)
+
+ def to_expr(self):
+ return "(" + self.cmd.to_expr() + ")"
+
+ def __repr__(self):
+ return "<BypassCliCmd(%s)>"%(self.cmd)
+
+
+class AnyTextCliCmd(CliCmd):
+ def __init__(self, key = None, help_str = None):
+ debug("AnyTextCliCmd()")
+ self.key = key
+ # XXX factorize all help and key in mother class?
+ if help_str != None:
+ self.help_str = help_str
+ else:
+ self.help_str = "Anytext token"
+
+ def match(self, tokens):
+ debug("----------match anytext <%s>"%tokens)
+ if len(tokens) != 1:
+ return None
+ res = PycolMatch()
+ if self.key:
+ res[self.key] = tokens[0]
+ return res
+
+ def complete(self, tokens):
+ debug("----------complete anytext <%s>"%tokens)
+ # does not match
+ if len(tokens) != 1:
+ return PycolCompleteList()
+ # match, but cannot complete
+ return PycolCompleteList(PycolComplete(tokens[0], self))
+
+ def to_expr(self):
+ return "<anytext>" # XXX
+
+ def __repr__(self):
+ return "<AnyTextCliCmd()>"
+
+ def get_help(self):
+ return self.help_str
+
+
+anytext_cmd = AnyTextCliCmd()
+
+assert(anytext_cmd.test_match("") == False)
+assert(anytext_cmd.test_match("toto toto") == False)
+assert(anytext_cmd.test_match(" toto") == True)
+assert(anytext_cmd.test_match("toto ") == True)
+assert(anytext_cmd.test_match(" toto \n\n") == True)
+assert(anytext_cmd.test_match("toto # coin") == True)
+assert(anytext_cmd.test_match("toto") == True)
+
+assert(anytext_cmd.test_complete("") == [""])
+assert(anytext_cmd.test_complete("to") == ["to"])
+assert(anytext_cmd.test_complete("tot") == ["tot"])
+assert(anytext_cmd.test_complete(" tot") == ["tot"])
+assert(anytext_cmd.test_complete("toto") == ["toto"])
+#assert(anytext_cmd.test_complete("toto#") == []) #XXX later
+
+
+
+
+class IntCliCmd(CliCmd):
+ def __init__(self, val_min = None, val_max = None, key = None,
+ help_str = None):
+ debug("IntCliCmd()")
+ self.key = key
+ # XXX factorize all help and key in mother class?
+ if help_str != None:
+ self.help_str = help_str
+ else:
+ self.help_str = "Int token"
+ # XXX val_min val_max
+
+ def match(self, tokens):
+ debug("----------match int <%s>"%tokens)
+ if len(tokens) != 1:
+ return None
+ try:
+ val = int(tokens[0])
+ except:
+ return None
+ res = PycolMatch()
+ if self.key:
+ res[self.key] = val
+ return res
+
+ def to_expr(self):
+ return "<int>" # XXX
+
+ def __repr__(self):
+ return "<IntCliCmd()>"
+
+ def get_help(self):
+ return self.help_str
+
+
+int_cmd = IntCliCmd()
+
+assert(int_cmd.test_match("") == False)
+assert(int_cmd.test_match("toto") == False)
+assert(int_cmd.test_match(" 13") == True)
+assert(int_cmd.test_match("-12342 ") == True)
+assert(int_cmd.test_match(" 33 \n\n") == True)
+assert(int_cmd.test_match("1 # coin") == True)
+assert(int_cmd.test_match("0") == True)
+
+assert(int_cmd.test_complete("") == [""])
+assert(int_cmd.test_complete("1") == ["1"])
+assert(int_cmd.test_complete("12") == ["12"])
+assert(int_cmd.test_complete(" ee") == [])
+
+
+class RegexpCliCmd(CliCmd):
+ def __init__(self, regexp, cmd = None, key = None,
+ store_re_match = False, help_str = None):
+ debug("RegexpCliCmd()")
+ self.regexp = regexp
+ self.key = key
+ self.cmd = cmd
+ self.store_re_match = store_re_match
+ # XXX factorize all help and key in mother class?
+ if help_str != None:
+ self.help_str = help_str
+ else:
+ self.help_str = "Regexp token"
+
+ def match(self, tokens):
+ debug("----------match regexp <%s>"%tokens)
+ if len(tokens) != 1:
+ return None
+ m = re.fullmatch(self.regexp, tokens[0])
+ if m == None:
+ return None
+ if self.cmd != None:
+ res = self.cmd.match(tokens)
+ if res == None:
+ return None
+ else:
+ res = PycolMatch()
+ if self.key:
+ if self.store_re_match:
+ res[self.key] = m
+ else:
+ res[self.key] = tokens[0]
+ return res
+
+ def complete(self, tokens):
+ if self.cmd == None:
+ return CliCmd.complete(self, tokens)
+ res = PycolCompleteList()
+ completions = self.cmd.complete(tokens)
+ for c in completions:
+ if self.match([c[0]]):
+ res.append(PycolComplete(c[0], self))
+ return res
+
+ def __repr__(self):
+ return "<RegexpCliCmd()>"
+
+ def get_help(self):
+ return self.help_str
+
+
+regexp_cmd = RegexpCliCmd("x[123]")
+
+assert(regexp_cmd.test_match("") == False)
+assert(regexp_cmd.test_match("toto") == False)
+assert(regexp_cmd.test_match(" x1") == True)
+assert(regexp_cmd.test_match("x3 ") == True)
+assert(regexp_cmd.test_match(" x2\n\n") == True)
+assert(regexp_cmd.test_match("x1 # coin") == True)
+
+assert(regexp_cmd.test_complete("") == [""]) # XXX handle this case at upper level?
+assert(regexp_cmd.test_complete("x1") == ["x1"])
+assert(regexp_cmd.test_complete(" x2") == ["x2"])
+assert(regexp_cmd.test_complete(" ee") == [])
+
+
+
+class IPv4CliCmd(CliCmd):
+ def __init__(self, cmd = None, key = None, help_str = None):
+ debug("IPv4CliCmd()")
+ self.key = key
+ self.cmd = cmd
+ # XXX factorize all help and key in mother class?
+ if help_str != None:
+ self.help_str = help_str
+ else:
+ self.help_str = "IPv4 token"
+
+ def match(self, tokens):
+ debug("----------match IPv4 <%s>"%tokens)
+ if len(tokens) != 1:
+ return None
+ try:
+ val = socket.inet_aton(tokens[0])
+ except:
+ return None
+ if len(tokens[0].split(".")) != 4:
+ return None
+ res = PycolMatch()
+ if self.key:
+ res[self.key] = tokens[0]
+ return res
+
+ def complete(self, tokens):
+ if self.cmd == None:
+ return CliCmd.complete(self, tokens)
+ res = PycolCompleteList()
+ completions = self.cmd.complete(tokens)
+ for c in completions:
+ if self.match([c[0]]):
+ res.append(PycolComplete(c[0], self))
+ return res
+
+ def to_expr(self):
+ return "<IPv4>" # XXX
+
+ def __repr__(self):
+ return "<IPv4CliCmd()>"
+
+ def get_help(self):
+ return self.help_str
+
+
+ipv4_cmd = IPv4CliCmd()
+
+assert(ipv4_cmd.test_match("") == False)
+assert(ipv4_cmd.test_match("toto") == False)
+assert(ipv4_cmd.test_match(" 13.3.1.3") == True)
+assert(ipv4_cmd.test_match("255.255.255.255 ") == True)
+assert(ipv4_cmd.test_match(" 0.0.0.0 \n\n") == True)
+assert(ipv4_cmd.test_match("1.2.3.4 # coin") == True)
+assert(ipv4_cmd.test_match("300.2.2.2") == False)
+
+assert(ipv4_cmd.test_complete("") == [""])
+assert(ipv4_cmd.test_complete("1.2.3.4") == ["1.2.3.4"])
+assert(ipv4_cmd.test_complete(" ee") == [])
+
+
+
+
+class FileCliCmd(CliCmd):
+ def __init__(self, key = None, help_str = None):
+ debug("FileCliCmd()")
+ self.key = key
+ # XXX factorize all help and key in mother class?
+ if help_str != None:
+ self.help_str = help_str
+ else:
+ self.help_str = "File token"
+
+ def match(self, tokens):
+ debug("----------match File <%s>"%tokens)
+ if len(tokens) != 1:
+ return None
+ if not os.path.exists(tokens[0]):
+ return None
+ res = PycolMatch()
+ if self.key:
+ res[self.key] = tokens[0]
+ return res
+
+ def complete(self, tokens):
+ debug("----------complete File <%s>"%tokens)
+ res = PycolCompleteList()
+ if len(tokens) != 1:
+ return res
+ dirname = os.path.dirname(tokens[0])
+ basename = os.path.basename(tokens[0])
+ if dirname != "" and not os.path.exists(dirname):
+ return res
+ debug("dirname=%s basename=%s"%(dirname, basename))
+ if dirname == "":
+ ls = os.listdir()
+ else:
+ ls = os.listdir(dirname)
+ debug("ls=%s"%(str(ls)))
+ for f in ls:
+ path = os.path.join(dirname, f)
+ debug(path)
+ if path.startswith(tokens[0]):
+ if os.path.isdir(path):
+ path += "/"
+ res.append(PycolComplete(path, self, terminal = False))
+ else:
+ res.append(PycolComplete(path, self, terminal = True))
+
+ debug(res)
+ return res
+
+ def to_expr(self):
+ return "<File>" # XXX
+
+ def __repr__(self):
+ return "<FileCliCmd()>"
+
+ def get_help(self):
+ return self.help_str
+
+
+file_cmd = FileCliCmd()
+
+assert(file_cmd.test_match("") == False)
+assert(file_cmd.test_match("DEDEDzx") == False)
+assert(file_cmd.test_match("/tmp") == True)
+assert(file_cmd.test_match("/etc/passwd ") == True)
+
+assert(file_cmd.test_complete("/tm") == ["/tmp/"])
+assert(file_cmd.test_complete(" eededezezzzc") == [])
+
+
+
+
+class ChoiceCliCmd(CliCmd):
+ def __init__(self, choice, key = None, cb = None, help_str = None):
+ super().__init__(key = key, cb = cb, help_str = help_str)
+ if isinstance(choice, str):
+ self.get_list_func = lambda:[choice]
+ elif isinstance(choice, list):
+ self.get_list_func = lambda:choice
+ else: # XXX isinstance(func)
+ self.get_list_func = choice
+ debug("ChoiceCliCmd(%s)"%self.get_list_func)
+
+ def match(self, tokens):
+ # more than one token as input, does not match
+ if len(tokens) > 1:
+ return None
+ l = self.get_list_func()
+ for e in l:
+ if tokens == [e]:
+ res = PycolMatch()
+ if self.key:
+ res[self.key] = e
+ return res
+ return None
+
+ def complete(self, tokens):
+ # more than one token as input, does not match
+ if len(tokens) > 1:
+ return PycolCompleteList()
+ l = self.get_list_func()
+ complete = PycolCompleteList()
+ for e in l:
+ # the beginning of the string does not match
+ if len(tokens) == 1 and not e.startswith(tokens[0]):
+ continue
+ complete.append(PycolComplete(e, self))
+ return complete
+
+ def __repr__(self):
+ return "<ChoiceCliCmd(%s)>"%(self.get_list_func)
+
+
+choice_cmd = ChoiceCliCmd(["toto", "titi"])
+
+assert(choice_cmd.test_match("") == False)
+assert(choice_cmd.test_match("tot") == False)
+assert(choice_cmd.test_match("toto toto") == False)
+assert(choice_cmd.test_match(" toto") == True)
+assert(choice_cmd.test_match("titi ") == True)
+assert(choice_cmd.test_match(" toto \n\n") == True)
+assert(choice_cmd.test_match("toto # coin") == True)
+assert(choice_cmd.test_match("toto") == True)
+
+assert(choice_cmd.test_complete("") == ["toto", "titi"])
+assert(choice_cmd.test_complete("to") == ["toto"])
+assert(choice_cmd.test_complete("tot") == ["toto"])
+assert(choice_cmd.test_complete(" tot") == ["toto"])
+assert(choice_cmd.test_complete("toto") == ["toto"])
+assert(choice_cmd.test_complete("d") == [])
+assert(choice_cmd.test_complete("toto ") == [])
+assert(choice_cmd.test_complete("toto#") == [])
+
+
+if __name__ == '__main__':
+ # XXX add tests
+ cmd = CmdBuilder(
+ "toto a|b [coin] [bar]",
+ token_desc = {
+ "toto": TextCliCmd("toto", help_str = "help for toto"),
+ "a": TextCliCmd("a", help_str = "help for a"),
+ "b": TextCliCmd("b", help_str = "help for b"),
+ "coin": TextCliCmd("coin", help_str = "help for coin"),
+ "bar": TextCliCmd("bar", help_str = "help for bar"),
+ }
+ )
+ print(cmd.to_expr())
+ assert(cmd.test_match("toto a") == True)
+ assert(cmd.test_match("toto b coin") == True)
+ assert(cmd.test_match("toto") == False)
+
+
+
+# float
+
+# file
+
+# mac
+
+# ipv6
+
+# date
--- /dev/null
+#!/usr/bin/python3
+
+import pycol
+from clicmd import *
+import sys
+import traceback
+import textwrap
+
+def callback(cmdline, kvargs):
+ print("match: %s"%(str(kvargs)))
+
+normal = "\033[0m"
+bold = "\033[1m"
+
+ip_list = []
+
+def help_callback(cmdline, kvargs):
+ wrapper = textwrap.TextWrapper(initial_indent=" ",
+ subsequent_indent=" ")
+ wrapper2 = textwrap.TextWrapper(initial_indent=" ",
+ subsequent_indent=" ")
+ for c in cmdline.context:
+ # XXX if terminal supports it, use bold and italic
+ print("%s%s%s"%(bold, c.to_expr(), normal))
+ lines = c.get_help().split("\n")
+ print("\n".join(wrapper.wrap(lines[0])))
+ for l in lines[1:]:
+ print("\n".join(wrapper2.wrap(l)))
+ # XXX use a pager
+
+def ip_callback_add(cmdline, kvargs):
+ global ip_list
+ if kvargs["<ip>"] in ip_list:
+ print("already in list")
+ return
+ ip_list.append(kvargs["<ip>"])
+
+def ip_callback_del(cmdline, kvargs):
+ global ip_list
+ ip_list.remove(kvargs["<saved-ip>"])
+
+def ip_callback_info(cmdline, kvargs):
+ global ip_list
+ if "<saved-ip>" in kvargs:
+ print("%s is in list"%kvargs["<saved-ip>"])
+ else:
+ print("%s is not in list"%kvargs["<ip>"])
+
+ctx = PycolContext([
+ CmdBuilder(
+ expr = "toto a|b [coin] [bar]",
+ token_desc = {
+ "toto": TextCliCmd("toto", help_str = "help for toto"),
+ "a": TextCliCmd("a", help_str = "help for a"),
+ "b": TextCliCmd("b", help_str = "help for b"),
+ "coin": TextCliCmd("coin", help_str = "help for coin"),
+ "bar": TextCliCmd("bar", help_str = "help for bar"),
+ },
+ cb = callback,
+ help_str = "help for command toto. This is a very very " +
+ "long long help to check that the wrapper is able to " +
+ "wrap the text properly."
+ ),
+
+ CmdBuilder(
+ expr = "titi [pouet] <anytext> <count>",
+ token_desc = {
+ "titi": TextCliCmd("titi", help_str = "help for titi"),
+ "pouet": TextCliCmd("pouet", help_str = "help for pouet"),
+ "<anytext>": AnyTextCliCmd(help_str = "help for anytext"),
+ "<count>": IntCliCmd(help_str = "help for count"),
+ },
+ cb = callback,
+ help_str = "help for command titi"
+ ),
+
+ CmdBuilder(
+ expr = "regexp <re>",
+ token_desc = {
+ "regexp": TextCliCmd("regexp", help_str = "help for regexp"),
+ "<re>": RegexpCliCmd("0x([0-9a-fA-F])", help_str = "an hex number"),
+ },
+ cb = callback,
+ help_str = "help for command regexp"
+ ),
+
+ CmdBuilder(
+ expr = "ip add <ip>",
+ token_desc = {
+ "ip": TextCliCmd("ip", help_str = "help for ip"),
+ "add": TextCliCmd("add", help_str = "help for add"),
+ "<ip>": IPv4CliCmd(help_str = "An IPv4 address. The format of " +
+ "an IPv4 address is A.B.C.D, each letter is a number" +
+ "between 0 and 255."),
+ },
+ cb = ip_callback_add,
+ help_str = "help for command ip"
+ ),
+
+ CmdBuilder(
+ expr = "ip del <saved-ip>",
+ token_desc = {
+ "ip": TextCliCmd("ip", help_str = "help for ip"),
+ "del": TextCliCmd("del", help_str = "help for del"),
+ "<saved-ip>": ChoiceCliCmd(ip_list, help_str = "an IP previously " +
+ "added"),
+ },
+ cb = ip_callback_del,
+ help_str = "help for command ip"
+ ),
+
+ CmdBuilder(
+ expr = "ip info <saved-ip>|<ip>", # XXX talk about prio !
+ token_desc = {
+ "ip": TextCliCmd("ip", help_str = "help for ip"),
+ "info": TextCliCmd("info", help_str = "help for info"),
+ "<saved-ip>": ChoiceCliCmd(ip_list, help_str = "an IP previously " +
+ "added"),
+ "<ip>": IPv4CliCmd(help_str = "An IPv4 address"),
+ },
+ cb = ip_callback_info,
+ help_str = "help for command ip"
+ ),
+
+ CmdBuilder(
+ expr = "file <file>",
+ token_desc = {
+ "file": TextCliCmd("file", help_str = "Help for file. The file " +
+ "can be either a directory or a regular file."),
+ "<file>": FileCliCmd(help_str = "a file"),
+ },
+ cb = callback,
+ help_str = "help for command file"
+ ),
+
+ CmdBuilder(
+ expr = "help",
+ token_desc = {
+ "help": TextCliCmd("help", help_str = "help for help"),
+ },
+ cb = help_callback,
+ help_str = "help for command toto"
+ ),
+])
+
+cmdline = pycol.Cmdline()
+cmdline.add_context("my_context", ctx)
+cmdline.set_context("my_context")
+cmdline.set_prompt("my_context> ")
+cmdline.input_loop()
--- /dev/null
+#!/usr/bin/python3
+
+# todo
+# - better checks before starting to parse (ex: no double spaces ?)
+# - better error messages
+# - document expressions
+# - unit tests
+# - reduce binary tree in tree
+# - handle quotes?
+# - better comments and names (operator is valid for '(' ?)
+# - ignore chars
+# - pylint
+# - tests only if main
+# - "not" operator? useful?
+# - we could completely remove all_ops
+
+DEBUG = False
+
+def debug(*args, **kvargs):
+ if DEBUG:
+ print(*args, **kvargs)
+
+class PycolExprParser(object):
+ def __init__(self, binary_ops = None, unary_ops = None,
+ group_ops = None, var_chars = None):
+ if binary_ops == None:
+ # ordered by priority
+ self.binary_ops = [ "|", "/", " " ]
+ else:
+ self.binary_ops = binary_ops
+ if unary_ops == None:
+ self.unary_ops = [ ]
+ else:
+ self.unary_ops = unary_ops
+ if group_ops == None:
+ self.group_ops = { "(" : ")", "[" : "]" }
+ else:
+ self.group_ops = group_ops
+ if var_chars == None:
+ self.var_chars = "<>-_abcdefghijklmnopqrstuvwxyz" + \
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ else:
+ self.var_chars = var_chars
+
+ self.all_ops = []
+ self.all_ops += self.binary_ops
+ self.all_ops += self.unary_ops
+ self.all_ops += self.group_ops.keys()
+ self.all_ops += self.group_ops.values()
+
+ # return operator priority (high value = high priority)
+ def op_prio(self, op):
+ assert(isinstance(op, str))
+ if not op in self.binary_ops:
+ return 0
+ return self.binary_ops.index(op) + 1
+
+ # The argument tokens is an list of tokens that must start with an
+ # opening group operator, for instance an opening bracket '('. This
+ # function returns the list of tokens representing the inbound
+ # expression without the first opening operator and its corresponding
+ # closing operator
+ def get_group(self, tokens):
+ debug(tokens, self.group_ops.keys())
+ assert(tokens[0] in self.group_ops.keys())
+ stack = []
+
+ for i, t in enumerate(tokens):
+ debug((i,t))
+ if t in self.group_ops.keys():
+ stack.append(t)
+ elif t == self.group_ops[stack[-1]]:
+ stack.pop()
+ if len(stack) == 0:
+ break
+ assert(len(stack) == 0)
+ debug(tokens[1:i])
+ return tokens[1:i]
+
+ # convert a string into a list of tokens
+ def tokenize(self, s):
+ debug("tokenize(%s)"%(s))
+ tokens = []
+ while s != "":
+ # try to parse an operator
+ op_found = False
+ for op in self.all_ops:
+ if s.startswith(op):
+ tokens.append(op)
+ s = s[len(op):]
+ op_found = True
+ break
+ if op_found:
+ continue
+ # try to parse a variable name
+ length = 0
+ for c in s:
+ if c in self.var_chars:
+ length += 1
+ else:
+ break
+ if length == 0:
+ debug(tokens)
+ debug("invalid token")
+ excp()
+ tokens.append(s[:length])
+ s = s[length:]
+ debug("tokenize() return %s"%(tokens))
+ return tokens
+
+ def binary_parse(self, tokens):
+ top = None
+ exp = None
+ debug(tokens)
+ while len(tokens) > 0:
+ t = tokens.pop(0)
+ exp = PycolBinExpr(t, self)
+ debug("token '%s'"%t)
+
+ if t in self.binary_ops:
+ # if it is the first node, error
+ if top == None:
+ debug("expression starts with a binary operator")
+ exce()
+
+ # the rightest leaf must be a variable
+ tmp = top
+ while tmp.right != None:
+ tmp = tmp.right
+ if not tmp.is_var():
+ debug("the rightest leaf must be a variable")
+ exce()
+
+ # if new node priority is higher than root node, it
+ # becomes the new root
+ if self.op_prio(t) > self.op_prio(top.op):
+ exp.left = top
+ top = exp
+ # else, exp is placed at the "rightest" leaf. Check
+ # that blabla XXX
+ else:
+ tmp = top
+ while tmp.right != None and \
+ self.op_prio(exp.op) < self.op_prio(tmp.right.op):
+ tmp = tmp.right
+
+ if tmp.right == None:
+ debug("invalid right node")
+ exce()
+
+ exp.left = tmp.right
+ tmp.right = exp
+
+ elif t in self.group_ops:
+ # if operator is an opening bracket, recursively call
+ # ourself with the sub-expression
+ gr = self.get_group([t] + tokens)
+ exp.right = self.binary_parse(gr[:])
+ if exp.right == None:
+ exce()
+ tokens = tokens[len(gr) + 1:]
+ debug("subexp parsed, tokens=%s"%tokens)
+
+ # if it is the first node
+ if top == None:
+ top = exp;
+ continue
+
+ # exp is placed at the "rightest" leaf. Check
+ # that our parent is not a variable
+ tmp = top
+ while tmp.right != None:
+ tmp = tmp.right
+ if tmp.is_var():
+ debug("the rightest leaf must not be a variable")
+ exce()
+
+ tmp.right = exp
+
+ else: # unary operator or variable
+ debug("unary operator or variable")
+
+ # if it is the first node
+ if top == None:
+ top = exp;
+ continue
+
+ # exp is placed at the "rightest" leaf. Check
+ # that our parent is not a variable
+ tmp = top
+ while tmp.right != None:
+ tmp = tmp.right
+ if tmp.is_var():
+ debug("the rightest leaf must not be a variable")
+ exce()
+
+ tmp.right = exp
+
+ if top == None:
+ debug("no top node")
+ exce()
+
+ # the rightest leaf must be a variable
+ tmp = top
+ while tmp.right != None:
+ tmp = tmp.right
+ if not tmp.is_var():
+ debug("the rightest leaf must be a variable")
+ exce()
+
+ return top
+
+ def parse(self, tokens):
+ bin_expr = self.binary_parse(tokens)
+ expr = PycolExpr(bin_expr)
+ expr.reduce()
+ return expr
+
+class PycolBinExpr(object):
+ def __init__(self, op, parser):
+ debug("PycolBinExpr(%s)"%op)
+ self.op = op
+ self.parser = parser
+ self.left = None
+ self.right = None
+
+ def dump(self, level=0):
+ print("vertice %d %s@0x%x"%(level, self.op, self.__hash__()))
+ if self.left:
+ self.left.dump(level+1)
+ if self.right:
+ self.right.dump(level+1)
+
+ def to_str(self):
+ # it's a variable name
+ if self.is_var():
+ return self.op
+ s = ""
+ # dump left expression if it's a binary operator (in case of
+ # unary operator, we only use the right child)
+ if self.op in self.parser.binary_ops:
+ return "%s%s%s"%(self.left.to_str(), self.op, self.right.to_str())
+
+ # group operator
+ if self.op in self.parser.group_ops.keys():
+ return "%s%s%s"%(self.op, self.right.to_str(),
+ self.parser.group_ops[self.op])
+
+ # unary
+ return "%s%s"%(self.op, self.right)
+
+ def is_var(self):
+ return not self.op in self.parser.all_ops
+
+class PycolExpr(object):
+ def __init__(self, bin_expr):
+ self.op = bin_expr.op
+ self.parser = bin_expr.parser
+ self.children = []
+ if bin_expr.left != None:
+ self.children.append(PycolExpr(bin_expr.left))
+ if bin_expr.right != None:
+ self.children.append(PycolExpr(bin_expr.right))
+
+ def dump(self, level=0):
+ print("vertice %d %s@0x%x"%(level, self.op, self.__hash__()))
+ for c in self.children:
+ c.dump(level+1)
+
+ def to_str(self):
+ # it's a variable name
+ if self.is_var():
+ return self.op
+ s = ""
+
+ # binary operators
+ if self.op in self.parser.binary_ops:
+ for i, c in enumerate(self.children):
+ if i == 0:
+ s += "%s"%(c.to_str())
+ else:
+ s += "%s%s"%(self.op, c.to_str())
+ return s
+
+ # group operator
+ if self.op in self.parser.group_ops.keys():
+ return "%s%s%s"%(self.op, self.children[0].to_str(),
+ self.parser.group_ops[self.op])
+
+ # unary
+ return "%s%s"%(self.op, self.children[0])
+
+ def is_var(self):
+ return not self.op in self.parser.all_ops
+
+ def reduce(self):
+ for c in self.children:
+ c.reduce()
+ children = []
+ for c in self.children:
+ if c.op == self.op:
+ children += c.children
+ else:
+ children += [c]
+ self.children = children
+
+if __name__ == '__main__':
+ s = "[opt1/opt2] (<desdes>|a|(b|c)|[x]) (no bidule)|coin"
+ #s = "x|y"
+ #tokenize(s)
+
+ parser = PycolExprParser()
+ tokens = parser.tokenize(s)
+ e = parser.parse(tokens)
+ print(e.dump())
+ print(e.to_str())
+ e.reduce()
+ print(e.dump())
+ print(e.to_str())
+
+ #print(get_group(tokenize("(fr fr (fr<3>)(xe)) sss")))
+ #print(get_group(tokenize("<(fr fr> (fr<3>)(xe))> sss")))
+
--- /dev/null
+#!/usr/bin/python3
+
+import pycol
+from clicmd import *
+import sys
+import traceback
+import textwrap
+import xml.etree.ElementTree as ET
+import xml.dom.minidom
+import pydoc
+
+default_conf = """
+<configuration name="test_cli">
+ <interfaces>
+ <interface>
+ <name>gre1</name>
+ <description>GRE Tunnel</description>
+ <encapsulation>
+ <gre-tunnel>
+ <gre-port>gre1</gre-port>
+ <local-endpoint>1.1.1.1</local-endpoint>
+ <remote-endpoint>1.1.1.2</remote-endpoint>
+ </gre-tunnel>
+ </encapsulation>
+ </interface>
+ <interface>
+ <name>gre2</name>
+ <description>GRE Tunnel</description>
+ <encapsulation>
+ <gre-tunnel>
+ <gre-port>gre2</gre-port>
+ <local-endpoint>2.2.2.1</local-endpoint>
+ <remote-endpoint>2.2.2.2</remote-endpoint>
+ </gre-tunnel>
+ </encapsulation>
+ </interface>
+ </interfaces>
+</configuration>
+"""
+
+xmlconf = ET.XML(default_conf)
+cur_xmlconf = xmlconf
+ip_list = []
+
+# from http://stackoverflow.com/questions/749796/pretty-printing-xml-in-python
+def xml_indent(elem, level=0):
+ i = "\n" + level*" "
+ if len(elem):
+ if not elem.text or not elem.text.strip():
+ elem.text = i + " "
+ if not elem.tail or not elem.tail.strip():
+ elem.tail = i
+ for elem in elem:
+ xml_indent(elem, level+1)
+ if not elem.tail or not elem.tail.strip():
+ elem.tail = i
+ else:
+ if level and (not elem.tail or not elem.tail.strip()):
+ elem.tail = i
+
+def text_indent(s, indent = 0, end = "\n"):
+ return " " * indent + s + end
+
+def help_callback(cmdline, kvargs):
+ cmd_wr = textwrap.TextWrapper(initial_indent=" ",
+ subsequent_indent=" ")
+ token_wr = textwrap.TextWrapper(initial_indent=" ",
+ subsequent_indent=" ")
+ s = ""
+ for c in cmdline.context:
+ s += "%s\n"%(c.to_expr())
+ lines = c.get_help().split("\n")
+ # first line is the command + its help
+ s += ("\n".join(cmd_wr.wrap(lines[0]))) + "\n"
+ for l in lines[1:]:
+ s += ("\n".join(token_wr.wrap(l))) + "\n"
+ pydoc.pager(s)
+
+# used in main and gre context
+help_cmd = CmdBuilder(
+ expr = "help",
+ token_desc = {
+ "help": TextCliCmd("help", help_str = "Show the help."),
+ },
+ cb = help_callback,
+ help_str = "help for command toto"
+)
+
+def get_interface_text_conf(conf, indent = 0):
+ s = ""
+ local = conf.find("encapsulation/gre-tunnel/local-endpoint")
+ remote = conf.find("encapsulation/gre-tunnel/remote-endpoint")
+ if local != None and remote != None:
+ bind_str = "bind %s %s"%(local.text, remote.text)
+ vrfid = conf.find("encapsulation/gre-tunnel/link-vrf-id")
+ if vrfid != None:
+ bind_str += " link-vrfid %s"%(vrfid.text)
+ s += text_indent(bind_str, indent)
+ key = conf.find("encapsulation/gre-tunnel/key")
+ if key != None:
+ if key.text == None:
+ s += text_indent("key enable", indent)
+ else:
+ s += text_indent("key %s"%(key.text), indent)
+ checksum = conf.find("encapsulation/gre-tunnel/checksum")
+ if checksum != None:
+ s += text_indent("checksum enable", indent)
+ return s
+
+def get_main_text_conf(conf, indent = 0):
+ s = text_indent("# GRE", indent)
+ for node in xmlconf.findall("interfaces/interface"):
+ s += text_indent(node.find("name").text, indent)
+ s += get_interface_text_conf(node, indent=(indent + 4))
+ return s
+
+def display_cb(cmdline, kvargs):
+ if "xml" in kvargs:
+ xml_indent(cur_xmlconf)
+ s = ET.tostring(cur_xmlconf, method="xml", encoding="unicode")
+ else:
+ if cmdline.context == main_ctx:
+ s = get_main_text_conf(cur_xmlconf)
+ else:
+ s = get_interface_text_conf(cur_xmlconf)
+ pydoc.pager(s)
+
+# used in main and gre context
+display_cmd = CmdBuilder(
+ expr = "display [xml]",
+ token_desc = {
+ "display": TextCliCmd("display", help_str = "Display the " +
+ "current configuration."),
+ "xml": TextCliCmd("xml", help_str = "If present, dump in XML format."),
+ },
+ cb = display_cb,
+ help_str = "Display the current configuration."
+)
+
+def bind_cb(cmdline, kvargs):
+ tunnel = cur_xmlconf.find("encapsulation/gre-tunnel")
+ local = tunnel.find("local-endpoint")
+ if local == None:
+ local = ET.XML("<local-endpoint/>")
+ tunnel.append(local)
+ local.text = kvargs["<local-ip>"]
+ remote = tunnel.find("remote-endpoint")
+ if remote == None:
+ remote = ET.XML("<remote-endpoint/>")
+ tunnel.append(remote)
+ remote.text = kvargs["<remote-ip>"]
+ vrfid = tunnel.find("link-vrf-id")
+ if vrfid != None:
+ tunnel.remove(vrfid)
+ if "<vrfid>" in kvargs:
+ vrfid = ET.XML("<link-vrf-id/>")
+ tunnel.append(vrfid)
+ vrfid.text = str(kvargs["<vrfid>"])
+
+def key_cb(cmdline, kvargs):
+ tunnel = cur_xmlconf.find("encapsulation/gre-tunnel")
+ key = tunnel.find("key")
+ if key != None:
+ tunnel.remove(key)
+ if "disable" in kvargs:
+ return
+ key = ET.XML("<key/>")
+ tunnel.append(key)
+ if "<key>" in kvargs:
+ key.text = str(kvargs["<key>"])
+
+def checksum_cb(cmdline, kvargs):
+ # XXX handle input/output
+ tunnel = cur_xmlconf.find("encapsulation/gre-tunnel")
+ checksum = tunnel.find("checksum")
+ if checksum != None:
+ tunnel.remove(checksum)
+ if "disable" in kvargs:
+ return
+ checksum = ET.XML("<checksum/>")
+ tunnel.append(checksum)
+
+def exit_cb(cmdline, kvargs):
+ global cur_xmlconf, xmlconf
+ cmdline.set_context("main")
+ cmdline.set_prompt("router{} ")
+ cur_xmlconf = xmlconf
+
+gre_ctx = PycolContext([
+ CmdBuilder(
+ expr = "bind <local-ip> <remote-ip> [link-vrfid <vrfid>]",
+ token_desc = {
+ "bind": TextCliCmd("bind", help_str = "Bind the tunnel."),
+ "<local-ip>": IPv4CliCmd(help_str = "The local IPv4 address (A.B.C.D)."),
+ "<remote-ip>": IPv4CliCmd(help_str = "The remote IPv4 address (A.B.C.D)."),
+ "link-vrfid": TextCliCmd("link-vrfid", help_str = ""),
+ "<vrfid>": IntCliCmd(help_str = "Link vrfid."),
+ },
+ cb = bind_cb,
+ help_str = "Bind the GRE tunnel on local and remote addresses."
+ ),
+ CmdBuilder(
+ expr = "checksum|checksum-input|checksum-output enable|disable",
+ token_desc = {
+ "checksum": TextCliCmd("checksum", help_str = ""),
+ "checksum-input": TextCliCmd("checksum-input", help_str = ""),
+ "checksum-output": TextCliCmd("checksum-output", help_str = ""),
+ "enable": TextCliCmd("enable", help_str = ""),
+ "disable": TextCliCmd("disable", help_str = ""),
+ },
+ cb = checksum_cb,
+ help_str = "Enable or disable GRE checksum."
+ ),
+ CmdBuilder(
+ expr = "key <key>|enable|disable",
+ token_desc = {
+ "key": TextCliCmd("key", help_str = ""),
+ "<key>": IntCliCmd(val_min = 0, val_max = (1<<32) - 1,
+ help_str = "The GRE key."),
+ "enable": TextCliCmd("enable", help_str = ""),
+ "disable": TextCliCmd("disable", help_str = ""),
+ },
+ cb = key_cb,
+ help_str = "Enable or disable GRE checksum."
+ ),
+ CmdBuilder(
+ expr = "exit",
+ token_desc = {
+ "exit": TextCliCmd("exit", help_str = "Exit from GRE context."),
+ },
+ cb = exit_cb,
+ help_str = "Exit from GRE context."
+ ),
+ display_cmd,
+ help_cmd,
+])
+
+def list_gre_ctx():
+ return [ node.text for node in \
+ xmlconf.findall("interfaces/interface/encapsulation/gre-tunnel/gre-port") ]
+
+def enter_gre(ifname):
+ global xmlconf, cur_xmlconf
+ for node in xmlconf.findall("interfaces/interface"):
+ if node.find("name").text == ifname:
+ cur_xmlconf = node
+ break
+ cmdline.set_prompt("router{%s} "%(ifname))
+ cmdline.set_context("gre")
+
+def enter_gre_cb(cmdline, kvargs):
+ ifname = kvargs["<gre-ctx>"]
+ return enter_gre(ifname)
+
+def create_gre_cb(cmdline, kvargs):
+ global xmlconf, cur_xmlconf
+
+ # create the context first
+ ifname = kvargs["<new-gre-ctx>"]
+ xml_str = """
+ <interface>
+ <name>{name}</name>
+ <description>GRE Tunnel</description>
+ <encapsulation>
+ <gre-tunnel>
+ <gre-port>{name}</gre-port>
+ </gre-tunnel>
+ </encapsulation>
+ </interface>
+ """.format(name = ifname)
+ node = ET.XML(xml_str)
+ xmlconf.find("interfaces").append(node)
+ return enter_gre(ifname)
+
+def delete_gre(cmdline, kvargs):
+ interfaces = xmlconf.find("interfaces")
+ for interface in interfaces.findall("interface"):
+ if interface.find("name").text == kvargs["<gre-ctx>"]:
+ interfaces.remove(interface)
+ break
+
+main_ctx = PycolContext([
+ CmdBuilder(
+ expr = "<gre-ctx>",
+ token_desc = {
+ "<gre-ctx>": ChoiceCliCmd(choice = list_gre_ctx,
+ help_str = "An existing gre context."),
+ },
+ cb = enter_gre_cb,
+ help_str = "Enter an existing GRE context."
+ ),
+ CmdBuilder(
+ expr = "<new-gre-ctx>",
+ token_desc = {
+ "<new-gre-ctx>": RegexpCliCmd("gre[0-9]+",
+ help_str = "A new gre context (format " +
+ "is gre[0-9]+)."),
+ },
+ cb = create_gre_cb,
+ help_str = "Create and enter a new GRE context."
+ ),
+ CmdBuilder(
+ expr = "delete <gre-ctx>",
+ token_desc = {
+ "delete": TextCliCmd("delete", help_str = ""),
+ "<gre-ctx>": ChoiceCliCmd(choice = list_gre_ctx,
+ help_str = "An existing gre context."),
+ },
+ cb = delete_gre,
+ help_str = "Delete a GRE context."
+ ),
+ display_cmd,
+ help_cmd,
+])
+
+
+cmdline = pycol.Cmdline()
+cmdline.add_context("main", main_ctx)
+cmdline.add_context("gre", gre_ctx)
+cmdline.set_context("main")
+cmdline.set_prompt("router{} ")
+cmdline.input_loop()
--- /dev/null
+#!/usr/bin/python3
+
+import readline
+import clicmd
+import shlex
+import sys
+import traceback
+
+class Cmdline(object):
+ def __init__(self):
+ self.contexts = {}
+ self.prompt = "> "
+ self.context = None
+ self.completions = []
+
+ # Call the completer when tab is hit
+ readline.set_completer(self.complete)
+ readline.parse_and_bind('tab: complete')
+
+ readline.set_completion_display_matches_hook(self.display_matches)
+
+ # remove some word breaks
+ delims = ' \t\n'
+ readline.set_completer_delims(delims)
+
+ def display_matches(self, sustitution, matches, longest_match_length):
+ print()
+ for m in matches:
+ print(" %s"%m)
+ print("%s%s"%(self.prompt, readline.get_line_buffer()),
+ end = "", flush = True)
+ readline.forced_update_display()
+
+ def add_context(self, name, ctx):
+ assert(not name in self.contexts), "duplicate context name %s"%(name)
+ self.contexts[name] = ctx
+
+ def set_context(self, name):
+ self.context = self.contexts[name]
+
+ def set_prompt(self, prompt):
+ self.prompt = prompt
+
+ def complete(self, text, state):
+ response = None
+
+ # state is not 0, the list of completions is already stored in
+ # self.completions[]: just return the next one
+ if state != 0:
+ if state >= len(self.completions):
+ return None
+ return self.completions[state]
+
+ # else, try to build the completion list
+ try:
+ line = readline.get_line_buffer() # full line
+ end = readline.get_endidx() # cursor
+
+ #print("<%s>"%origline[:end])
+ in_buf = line[:end]
+
+ tokens = shlex.split(in_buf, comments = False) # XXX check all calls to shlex
+ # whitespace after the first token, it means we want to complete
+ # the second token
+ if len(tokens) == 0 or not in_buf.endswith(tokens[-1]):
+ tokens.append("")
+ completions = self.context.complete(tokens)
+
+ # Build the completion list in the readline format (a list of
+ # string). It may include the help if tab is pressed twice.
+ self.completions = []
+ completion_type = readline.get_completion_type()
+ # completion_key = readline.get_completion_invoking_key() # XXX
+ completion_key = 0
+ if chr(completion_type) == "?":
+ for c in completions:
+ help_str = c.cmd.get_help()
+ if c.token == "":
+ self.completions.append(c.cmd.to_expr() + ": " + help_str)
+ elif help_str == "":
+ self.completions.append(c.token)
+ else:
+ self.completions.append(c.token + ": " + help_str)
+ # check if it matches the command, in this case display [return]
+ # XXX does it work well?
+ result = None
+ for cmd in self.context:
+ result = cmd.match(tokens)
+ if result != None:
+ self.completions.append("[return]")
+ break
+ else:
+ for c in completions:
+ if c.token == "":
+ continue
+ if c.terminal == True:
+ self.completions.append(c.token + " ")
+ else:
+ self.completions.append(c.token)
+
+ if len(self.completions) == 0:
+ return None
+
+ return self.completions[0]
+
+ except:
+ traceback.print_exc()
+
+ return None
+
+ def input_loop(self):
+ line = ''
+ while line != 'stop':
+ try:
+ line = input(self.prompt)
+ except KeyboardInterrupt:
+ print()
+ continue
+ except EOFError:
+ print()
+ return
+ tokens = shlex.split(line, comments = True) # XXX
+ if len(tokens) == 0:
+ continue
+ assert(self.context != None), "context not set, use set_context()"
+ result = None
+ for cmd in self.context:
+ result = cmd.match(tokens)
+ if result != None:
+ break
+ if result == None:
+ print("Invalid command or syntax error")
+ else:
+ assert(cmd.cb != None), \
+ "no callback function for %s"%(cmd)
+ cmd.cb(self, result)
+
+
--- /dev/null
+#!/usr/bin/python3
+
+import shlex
+from io import StringIO
+
+class SHL(shlex.shlex):
+ def __init__(self, *args, **kvargs):
+ shlex.shlex.__init__(self, *args, **kvargs)
+ def read_token(self, *args, **kvargs):
+ x = shlex.shlex.read_token(self, *args, **kvargs)
+ print("read_token <%s>"%x)
+ return x
+ def get_token(self, *args, **kvargs):
+ x = shlex.shlex.get_token(self, *args, **kvargs)
+ print("get_token <%s>"%x)
+ return x
+ def __next__(self, *args, **kvargs):
+ x = shlex.shlex.__next__(self, *args, **kvargs)
+ print("__next__ <%s>"%x)
+ return x
+
+class XIO(StringIO):
+ def __init__(self, *args, **kvargs):
+ StringIO.__init__(self, *args, **kvargs)
+ def readline(self, *args, **kvargs):
+ print("readline")
+ return StringIO.readline(self, *args, **kvargs)
+ def read(self, *args, **kvargs):
+ x = StringIO.readline(self, *args, **kvargs)
+ print("read <%s>"%x)
+ return x
+
+s = SHL("", posix=True)
+s.debug = 1
+s.whitespace_split = True
+
+s.state = ' '
+s.push_source(XIO("xx xx"))
+print("------------- %s"%list(s))
+
+s.state = ' '
+s.push_source(XIO("yy yy"))
+print("------------- %s"%list(s))
+
+s.state = ' '
+s.push_source(XIO('a "c cds cds" d'))
+print("------------- %s"%list(s))