examples/ipsec-secgw: update required Scapy version
[dpdk.git] / examples / ipsec-secgw / test / pkttest.py
1 #!/usr/bin/env python3
2
3 import fcntl
4 import pkg_resources
5 import socket
6 import struct
7 import sys
8 import unittest
9
10
# Refuse to run under a Python 2 interpreter.
if sys.version_info[0] < 3:
    print("Python3 is required to run this script")
    sys.exit(1)


# Scapy provides the Ether layer used below to build and parse frames.
try:
    from scapy.all import Ether
except ImportError:
    print("Scapy module is required")
    sys.exit(1)


# Requirement specifiers verified by assert_requirements() when this file is
# executed with the "check_reqs" argument.
PKTTEST_REQ = ["scapy>=2.4.3"]
26
27
def assert_requirements(req):
    """
    Exit with status 1 unless every requirement in req is satisfied.

    req is passed unchanged to pkg_resources.require(), so it may be a
    single requirement string or a list of them. When a distribution is
    missing or has a conflicting version, the reason is printed before
    exiting.
    """
    try:
        pkg_resources.require(req)
    except (
        pkg_resources.DistributionNotFound,
        pkg_resources.VersionConflict,
    ) as err:
        print("Requirement assertion: %s" % (err,))
        sys.exit(1)
38
39
# Default interface names used by PacketXfer for its two ports.
TAP_PROTECTED = "dtap0"
TAP_UNPROTECTED = "dtap1"
42
43
class Interface(object):
    """
    Raw packet socket bound to a single network interface.

    Opens an AF_PACKET/SOCK_RAW socket on the named interface, reads the
    interface's MAC address and offers helpers to send and receive frames
    either as raw bytes or as Scapy Ether objects.
    """

    # protocol number handed to the socket (Linux ETH_P_ALL)
    ETH_P_ALL = 3
    # maximum number of bytes returned by a single recv_bytes() call
    MAX_PACKET_SIZE = 1280
    # ioctl request used to read the hardware address (Linux SIOCGIFHWADDR)
    IOCTL_GET_INFO = 0x8927
    # timeout, in seconds, applied to the socket
    SOCKET_TIMEOUT = 0.5

    def __init__(self, ifname):
        """
        Open a raw socket on ifname and record the interface MAC address.

        Exceptions raised by the socket, bind or ioctl calls are not
        handled here and propagate to the caller.
        """
        self.name = ifname

        # create and bind socket to specified interface
        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                               socket.htons(Interface.ETH_P_ALL))
        self.s.settimeout(Interface.SOCKET_TIMEOUT)
        self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))

        # get interface MAC address: the request buffer starts with the
        # interface name (at most 15 characters are used) and the reply
        # carries the six address bytes at offsets 18..23
        request = struct.pack('256s', bytes(ifname[:15], encoding='ascii'))
        info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, request)
        self.mac = ':'.join('%02x' % i for i in info[18:24])

    def __del__(self):
        """
        Close the socket, if one was created.
        """
        # __init__ may have raised before self.s was assigned; there is
        # nothing to close then and the finalizer must not raise.
        sock = getattr(self, 's', None)
        if sock is not None:
            sock.close()

    def send_l3packet(self, pkt, mac):
        """
        Prepend an Ether header (src: this interface, dst: mac) to pkt
        and send the resulting frame.
        """
        e = Ether(src=self.mac, dst=mac)
        self.send_packet(e/pkt)

    def send_packet(self, pkt):
        """
        Serialize pkt with bytes() and send it.
        """
        self.send_bytes(bytes(pkt))

    def send_bytes(self, bytedata):
        """
        Send bytedata through the socket as is.
        """
        self.s.send(bytedata)

    def recv_packet(self):
        """
        Receive one frame and return it parsed as an Ether object.
        """
        return Ether(self.recv_bytes())

    def recv_bytes(self):
        """
        Receive up to MAX_PACKET_SIZE bytes from the socket.
        """
        return self.s.recv(Interface.MAX_PACKET_SIZE)

    def get_mac(self):
        """
        Return the MAC address read in __init__, formatted as
        colon-separated lowercase hex bytes.
        """
        return self.mac
82
83
class PacketXfer(object):
    """
    Pair of Interface objects, one per port, with helpers to send a
    packet through one port and read a packet from the other.
    """

    def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
        """
        Open the protected port first, then the unprotected one.
        """
        self.protected_port = Interface(protected_iface)
        self.unprotected_port = Interface(unprotected_iface)

    def send_to_protected_port(self, pkt, remote_mac=None):
        """
        Send pkt out of the protected port; the destination MAC defaults
        to the MAC of the unprotected port.
        """
        dst = self.unprotected_port.get_mac() if remote_mac is None else remote_mac
        self.protected_port.send_l3packet(pkt, dst)

    def send_to_unprotected_port(self, pkt, remote_mac=None):
        """
        Send pkt out of the unprotected port; the destination MAC
        defaults to the MAC of the protected port.
        """
        dst = self.protected_port.get_mac() if remote_mac is None else remote_mac
        self.unprotected_port.send_l3packet(pkt, dst)

    def xfer_unprotected(self, pkt):
        """
        Send pkt through the unprotected port and return the next packet
        received on the protected port.
        """
        self.send_to_unprotected_port(pkt)
        received = self.protected_port.recv_packet()
        return received

    def xfer_protected(self, pkt):
        """
        Send pkt through the protected port and return the next packet
        received on the unprotected port.
        """
        self.send_to_protected_port(pkt)
        received = self.unprotected_port.recv_packet()
        return received
106
107
def pkttest():
    """
    Command line entry point for test scripts.

    Dispatches on sys.argv:
      - no arguments: run unittest.main() and exit with its result
      - "config": print the value returned by config() of the __main__
        module; exit with status 1 if __main__ has no config attribute
      - anything else: exit with status 1
    """
    argc = len(sys.argv)

    if argc == 1:
        sys.exit(unittest.main(verbosity=2))

    if argc != 2 or sys.argv[1] != "config":
        # unknown or surplus arguments must not look like a successful run
        sys.exit(1)

    module = __import__('__main__')
    # Look config() up separately from calling it, so that an AttributeError
    # raised inside config() is not misreported as a missing function.
    try:
        config = module.config
    except AttributeError:
        sys.stderr.write("Cannot find \"config()\" in a test")
        sys.exit(1)
    print(config())
121
122
if __name__ == "__main__":
    # The only supported direct invocation is "<script> check_reqs".
    if sys.argv[1:] == ["check_reqs"]:
        assert_requirements(PKTTEST_REQ)
    else:
        print("Usage: %s check_reqs" % (sys.argv[0],))