test/ipsec: fix a typo in function name
[dpdk.git] / examples / ipsec-secgw / test / trs_ipv6opts.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: BSD-3-Clause
3
4 from scapy.all import *
5 import unittest
6 import pkttest
7
8
# Host addresses carried by the test packets: the outbound tests send
# SRC_ADDR -> DST_ADDR, the inbound tests send DST_ADDR -> SRC_ADDR.
SRC_ADDR  = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR  = "2222:0000:0000:0000:0000:0000:0000:0001"
# /64 prefixes that config() substitutes into the security policy (sp)
# and route (rt) lines of the generated configuration.
SRC_NET   = "1111:0000:0000:0000:0000:0000:0000:0000/64"
DST_NET   = "2222:0000:0000:0000:0000:0000:0000:0000/64"
13
14
def config():
    """Return the configuration text used by this test.

    The text holds one outbound and one inbound ESP security policy,
    the two matching transport-mode SAs (null cipher, null auth) and
    two IPv6 routes. SRC_NET and DST_NET are filled in for the {0} and
    {1} placeholders respectively.
    """
    template_lines = [
        "",
        # outbound policy -> SA 5
        "sp ipv6 out esp protect 5 pri 1 \\",
        "src {0} \\",
        "dst {1} \\",
        "sport 0:65535 dport 0:65535",
        "",
        # inbound policy -> SA 6 (source and destination swapped)
        "sp ipv6 in esp protect 6 pri 1 \\",
        "src {1} \\",
        "dst {0} \\",
        "sport 0:65535 dport 0:65535",
        "",
        "sa out 5 cipher_algo null auth_algo null mode transport",
        "sa in 6 cipher_algo null auth_algo null mode transport",
        "",
        "rt ipv6 dst {0} port 1",
        "rt ipv6 dst {1} port 0",
        "",
    ]
    return "\n".join(template_lines).format(SRC_NET, DST_NET)
33
34
class TestTransportWithIPv6Ext(unittest.TestCase):
    """ESP transport-mode test cases for IPv6, with and without extension headers.

    The outbound tests pass a plain packet to ``xfer_unprotected`` and
    expect an ESP packet with SPI 5 in return. The inbound tests encrypt
    a packet with the SPI 6 association, pass it to ``xfer_protected``
    and expect the plain UDP packet in return.
    """

    @staticmethod
    def decrypt(pkt, sa):
        """Decrypt the ESP layer of *pkt* with *sa*, keeping the headers in front of it.

        This is a workaround for a bug in the Scapy IPsec implementation,
        which reconstructs the packet incorrectly after a successful
        decryption. Note that *pkt* itself is modified: its ESP layer is
        removed and the ``nh`` field of the preceding header is rewritten.
        """
        # Decrypt the ESP layer behind a bare IPv6 header, so that no
        # extension headers are involved in the decryption.
        plain = sa.decrypt(IPv6()/pkt[ESP])
        inner = plain[IPv6]

        # The header right in front of ESP must now announce the
        # decrypted payload; ESP itself is cut off.
        before_esp = pkt[ESP].underlayer
        before_esp.nh = inner.nh
        before_esp.remove_payload()

        # Put the decrypted payload behind the original headers.
        rebuilt = pkt/inner.payload

        # Payload length: the dummy packet's payload length plus
        # whatever still follows the IPv6 header of the original packet.
        rebuilt[IPv6].plen = inner.plen + len(pkt[IPv6].payload)
        return rebuilt

    @staticmethod
    def _udp_datagram():
        """Return the UDP datagram (123 -> 456, payload "abc") used by the tests."""
        return UDP(sport=123, dport=456)/Raw(load="abc")

    @staticmethod
    def _add_ext_headers(ip6):
        """Return *ip6* followed by hop-by-hop, routing and destination
        option headers and the test UDP datagram."""
        hbh_opts = [RouterAlert(value=2), Jumbo(jumboplen=5000), Pad1()]
        dst_opts = [HAO(hoa="1234::4321")]
        return (ip6
                / IPv6ExtHdrHopByHop(options=hbh_opts)
                / IPv6ExtHdrRouting(addresses=["3333::3", "4444::4"])
                / IPv6ExtHdrDestOpt(options=dst_opts)
                / TestTransportWithIPv6Ext._udp_datagram())

    def _check_ext_chain(self, pkt, last_nh):
        """Assert the 'next header' chain IPv6 -> hop-by-hop -> routing ->
        destination options, with *last_nh* following the last one."""
        self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_HOPOPTS)
        self.assertEqual(pkt[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
        self.assertEqual(pkt[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
        self.assertEqual(pkt[IPv6ExtHdrDestOpt].nh, last_nh)

    def _check_udp(self, pkt):
        """Assert that *pkt* carries the test UDP datagram unchanged."""
        udp = pkt[UDP]
        self.assertEqual(udp.sport, 123)
        self.assertEqual(udp.dport, 456)
        self.assertEqual(bytes(udp.payload), b'abc')

    def setUp(self):
        """Create the packet transfer helper and the outbound/inbound SAs."""
        self.px = pkttest.PacketXfer()
        self.outb_sa = SecurityAssociation(ESP, spi=5)
        self.inb_sa = SecurityAssociation(ESP, spi=6)

    def test_outb_ipv6_noopt(self):
        # plain UDP packet without extension headers
        plain = IPv6(src=SRC_ADDR, dst=DST_ADDR)/self._udp_datagram()

        # the response must be ESP right behind the IPv6 header
        reply = self.px.xfer_unprotected(plain)
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_ESP)
        self.assertEqual(reply[ESP].spi, 5)

        # after decryption the original UDP datagram must show up again
        clear = self.decrypt(reply[IPv6], self.outb_sa)
        self.assertEqual(clear[IPv6].nh, socket.IPPROTO_UDP)
        self._check_udp(clear)

    def test_outb_ipv6_opt(self):
        # UDP packet behind three extension headers
        plain = self._add_ext_headers(IPv6(src=SRC_ADDR, dst=DST_ADDR))

        # the extension headers must be kept, with ESP following them
        reply = self.px.xfer_unprotected(plain)
        self._check_ext_chain(reply, socket.IPPROTO_ESP)
        self.assertEqual(reply[ESP].spi, 5)

        # after decryption the chain must end in the UDP datagram
        clear = self.decrypt(reply[IPv6], self.outb_sa)
        self._check_ext_chain(clear, socket.IPPROTO_UDP)
        self._check_udp(clear)

    def test_inb_ipv6_noopt(self):
        # encrypt a plain UDP packet and send it
        plain = IPv6(src=DST_ADDR, dst=SRC_ADDR)/self._udp_datagram()
        reply = self.px.xfer_protected(self.inb_sa.encrypt(plain))

        # the response must be the bare UDP packet
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_UDP)
        self._check_udp(reply)

    def test_inb_ipv6_opt(self):
        # encrypt a UDP packet that carries three extension headers
        plain = self._add_ext_headers(IPv6(src=DST_ADDR, dst=SRC_ADDR))
        protected = self.inb_sa.encrypt(plain)

        # send encrypted packet and check that the response keeps the
        # extension headers and ends in the UDP datagram
        reply = self.px.xfer_protected(protected)
        self._check_ext_chain(reply, socket.IPPROTO_UDP)
        self._check_udp(reply)

    def test_inb_ipv6_frag(self):
        # encrypt the UDP datagram behind a default IPv6 header; only
        # the part following that header is reused below
        encrypted = self.inb_sa.encrypt(
            IPv6()/UDP(sport=123, dport=456)/Raw(load="abc"))

        # craft the inbound packet: a fragment header in front of ESP
        crafted = (IPv6(src=DST_ADDR, dst=SRC_ADDR)
                   / IPv6ExtHdrFragment()
                   / encrypted[IPv6].payload)
        reply = self.px.xfer_protected(crafted)

        # the fragment header must be kept, now followed by UDP
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_FRAGMENT)
        self.assertEqual(reply[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
        self._check_udp(reply)
180
181
# Hand control over to the shared pkttest driver.
# NOTE(review): presumably it either runs the test cases above or prints
# config(), depending on how the script is invoked - confirm in pkttest.py.
pkttest.pkttest()