11 if sys.version_info < (3, 0):
12 print("Python3 is required to run this script")
17 from scapy.all import Ether
19 print("Scapy module is required")
28 def assert_requirements(req):
30 assert requirement is met
31 req can hold a string or a list of strings
34 pkg_resources.require(req)
35 except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e:
36 print("Requirement assertion: " + str(e))
40 TAP_UNPROTECTED = "dtap1"
41 TAP_PROTECTED = "dtap0"
44 class Interface(object):
46 MAX_PACKET_SIZE = 1280
47 IOCTL_GET_INFO = 0x8927
49 def __init__(self, ifname):
52 # create and bind socket to specified interface
53 self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL))
54 self.s.settimeout(Interface.SOCKET_TIMEOUT)
55 self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))
57 # get interface MAC address
58 info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, struct.pack('256s', bytes(ifname[:15], encoding='ascii')))
59 self.mac = ':'.join(['%02x' % i for i in info[18:24]])
64 def send_l3packet(self, pkt, mac):
65 e = Ether(src=self.mac, dst=mac)
66 self.send_packet(e/pkt)
68 def send_packet(self, pkt):
69 self.send_bytes(bytes(pkt))
71 def send_bytes(self, bytedata):
74 def recv_packet(self):
75 return Ether(self.recv_bytes())
78 return self.s.recv(Interface.MAX_PACKET_SIZE)
84 class PacketXfer(object):
85 def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
86 self.protected_port = Interface(protected_iface)
87 self.unprotected_port = Interface(unprotected_iface)
89 def send_to_protected_port(self, pkt, remote_mac=None):
90 if remote_mac is None:
91 remote_mac = self.unprotected_port.get_mac()
92 self.protected_port.send_l3packet(pkt, remote_mac)
94 def send_to_unprotected_port(self, pkt, remote_mac=None):
95 if remote_mac is None:
96 remote_mac = self.protected_port.get_mac()
97 self.unprotected_port.send_l3packet(pkt, remote_mac)
99 def xfer_unprotected(self, pkt):
100 self.send_to_unprotected_port(pkt)
101 return self.protected_port.recv_packet()
103 def xfer_protected(self, pkt):
104 self.send_to_protected_port(pkt)
105 return self.unprotected_port.recv_packet()
109 if len(sys.argv) == 1:
110 sys.exit(unittest.main(verbosity=2))
111 elif len(sys.argv) == 2:
112 if sys.argv[1] == "config":
113 module = __import__('__main__')
115 print(module.config())
116 except AttributeError:
117 sys.stderr.write("Cannot find \"config()\" in a test")
123 if __name__ == "__main__":
124 if len(sys.argv) == 2 and sys.argv[1] == "check_reqs":
125 assert_requirements(PKTTEST_REQ)
127 print("Usage: " + sys.argv[0] + " check_reqs")