examples/ipsec-secgw: add SPDX license tag
[dpdk.git] / examples / ipsec-secgw / test / pkttest.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: BSD-3-Clause
3
4 import fcntl
5 import pkg_resources
6 import socket
7 import struct
8 import sys
9 import unittest
10
11
# This script only supports Python 3; stop early with a clear message when
# it is started by an older interpreter.
if sys.version_info[0] < 3:
    print("Python3 is required to run this script")
    sys.exit(1)
15
16
# scapy is a third-party package: when it cannot be imported, print a short
# message and exit with status 1 instead of showing an ImportError traceback.
try:
    from scapy.all import Ether
except ImportError:
    print("Scapy module is required")
    sys.exit(1)
22
23
# Requirement specifiers checked by assert_requirements() when this file is
# run with the "check_reqs" argument.
PKTTEST_REQ = ["scapy>=2.4.3"]
27
28
def assert_requirements(req):
    """
    Exit with status 1 unless the given requirement(s) are satisfied.

    req is passed unchanged to pkg_resources.require(), so it can be a
    single requirement string or a list of such strings. When a
    distribution is missing or its version conflicts, the reason is
    printed and the process exits; otherwise the function returns None.
    """
    try:
        pkg_resources.require(req)
        return
    except (pkg_resources.DistributionNotFound,
            pkg_resources.VersionConflict) as unmet:
        reason = str(unmet)

    print("Requirement assertion: " + reason)
    sys.exit(1)
39
40
# Default interface names used by PacketXfer for its unprotected and
# protected ports.
TAP_UNPROTECTED, TAP_PROTECTED = "dtap1", "dtap0"
43
44
class Interface(object):
    """
    Raw AF_PACKET socket bound to a single network interface.

    The constructor opens the socket, binds it to the named interface and
    reads the interface hardware (MAC) address. Packets are sent and
    received as raw Ethernet frames; scapy's Ether is used to build and
    parse them.
    """
    # Protocol value given to the packet socket (ETH_P_ALL on Linux, i.e.
    # frames of every protocol are received).
    ETH_P_ALL = 3
    # Maximum number of bytes returned by one recv_bytes() call.
    MAX_PACKET_SIZE = 1280
    # ioctl request used to fetch the hardware address
    # (SIOCGIFHWADDR on Linux).
    IOCTL_GET_INFO = 0x8927
    # Timeout, in seconds, applied to blocking socket operations.
    SOCKET_TIMEOUT = 0.5

    def __init__(self, ifname):
        """
        Open a raw socket on interface ifname and record its MAC address.

        Raises OSError (e.g. PermissionError) when the socket cannot be
        created or bound, or when the ioctl fails.
        """
        self.name = ifname

        # create and bind socket to specified interface
        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                               socket.htons(Interface.ETH_P_ALL))
        self.s.settimeout(Interface.SOCKET_TIMEOUT)
        self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))

        # get interface MAC address; the request buffer starts with the
        # interface name (at most 15 characters are used) and the six
        # address bytes come back at offsets 18..23 of the reply
        request = struct.pack('256s', bytes(ifname[:15], encoding='ascii'))
        info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, request)
        self.mac = ':'.join('%02x' % octet for octet in info[18:24])

    def __del__(self):
        """Close the socket, if one was ever opened."""
        # __init__ can fail before self.s is assigned (for instance when
        # raw sockets are not permitted); looking the attribute up safely
        # avoids a secondary AttributeError during finalisation that would
        # obscure the original error.
        sock = getattr(self, 's', None)
        if sock is not None:
            sock.close()

    def send_l3packet(self, pkt, mac):
        """Wrap pkt in an Ethernet header addressed to mac and send it."""
        e = Ether(src=self.mac, dst=mac)
        self.send_packet(e/pkt)

    def send_packet(self, pkt):
        """Serialise pkt with bytes() and send it."""
        self.send_bytes(bytes(pkt))

    def send_bytes(self, bytedata):
        """Send bytedata on the socket as is."""
        self.s.send(bytedata)

    def recv_packet(self):
        """Receive one frame and return it parsed as an Ether packet."""
        return Ether(self.recv_bytes())

    def recv_bytes(self):
        """Receive up to MAX_PACKET_SIZE bytes from the socket."""
        return self.s.recv(Interface.MAX_PACKET_SIZE)

    def get_mac(self):
        """Return the interface MAC address as a colon-separated string."""
        return self.mac
83
84
class PacketXfer(object):
    """
    Pair of Interface objects: a "protected" port and an "unprotected" port.

    Packets are sent on one port and the next packet received on the other
    port is returned, which lets a test inspect what comes out the far side.
    """

    def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
        """Open both ports; the protected one is opened first."""
        self.protected_port = Interface(protected_iface)
        self.unprotected_port = Interface(unprotected_iface)

    def send_to_protected_port(self, pkt, remote_mac=None):
        """
        Send pkt from the protected port.

        The destination MAC defaults to the unprotected port's own address
        when remote_mac is not given.
        """
        dst_mac = self.unprotected_port.get_mac() if remote_mac is None else remote_mac
        self.protected_port.send_l3packet(pkt, dst_mac)

    def send_to_unprotected_port(self, pkt, remote_mac=None):
        """
        Send pkt from the unprotected port.

        The destination MAC defaults to the protected port's own address
        when remote_mac is not given.
        """
        dst_mac = self.protected_port.get_mac() if remote_mac is None else remote_mac
        self.unprotected_port.send_l3packet(pkt, dst_mac)

    def xfer_unprotected(self, pkt):
        """Send pkt on the unprotected port; return what the protected port receives."""
        self.send_to_unprotected_port(pkt)
        received = self.protected_port.recv_packet()
        return received

    def xfer_protected(self, pkt):
        """Send pkt on the protected port; return what the unprotected port receives."""
        self.send_to_protected_port(pkt)
        received = self.unprotected_port.recv_packet()
        return received
107
108
def pkttest():
    """
    Command-line entry point for a test script that uses this module.

    The action is selected by sys.argv:
      - no argument: run the unittest test cases of the __main__ module;
        unittest.main() terminates the process with the test status;
      - "config": print the value returned by the config() function of the
        __main__ module, or exit with status 1 when __main__ has none;
      - any other invocation: exit with status 1.
    """
    argc = len(sys.argv)
    if argc == 1:
        # unittest.main() calls sys.exit() itself once the tests have run.
        unittest.main(verbosity=2)
    elif argc == 2 and sys.argv[1] == "config":
        module = __import__('__main__')
        # Look the function up first so that an AttributeError raised
        # inside config() is not mistaken for a missing config().
        config = getattr(module, "config", None)
        if config is None:
            sys.stderr.write("Cannot find \"config()\" in a test")
            sys.exit(1)
        print(config())
    else:
        # Unknown argument or too many arguments: report failure rather
        # than returning as if the request had succeeded.
        sys.exit(1)
122
123
# Run directly, this file only knows one command: "check_reqs", which
# verifies PKTTEST_REQ. Any other invocation prints the usage line.
if __name__ == "__main__":
    if sys.argv[1:] == ["check_reqs"]:
        assert_requirements(PKTTEST_REQ)
    else:
        print("Usage: " + sys.argv[0] + " check_reqs")