net/virtio: fix incorrect cast of void *
[dpdk.git] / test / trs_ipv6opts.py
1 #!/usr/bin/env python3
2
3 from scapy.all import *
4 import unittest
5 import pkttest
6
7
# Host addresses placed in the IPv6 headers of the test packets.
SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"

# /64 prefixes substituted into the policy and route lines built by config().
SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"
12
13
def config():
    """Return the IPsec configuration text for this test module.

    The text holds, in order: an outbound security policy (protect 5) for
    traffic from SRC_NET to DST_NET, the mirrored inbound policy
    (protect 6), one outbound and one inbound transport-mode SA using the
    null cipher and null authentication, and one route per prefix.

    NOTE(review): the consumer of this text is not visible in this file;
    presumably the pkttest harness passes it to the application under
    test -- confirm.
    """
    # {0} is replaced by SRC_NET and {1} by DST_NET; each "\\" yields a
    # single backslash, i.e. a line continuation in the generated text.
    template = """
sp ipv6 out esp protect 5 pri 1 \\
src {0} \\
dst {1} \\
sport 0:65535 dport 0:65535

sp ipv6 in esp protect 6 pri 1 \\
src {1} \\
dst {0} \\
sport 0:65535 dport 0:65535

sa out 5 cipher_algo null auth_algo null mode transport
sa in 6 cipher_algo null auth_algo null mode transport

rt ipv6 dst {0} port 1
rt ipv6 dst {1} port 0
"""
    return template.format(SRC_NET, DST_NET)
32
33
class TestTransportWithIPv6Ext(unittest.TestCase):
    """ESP transport-mode tests for IPv6 with and without extension headers.

    The "outb" tests send a plain packet through ``xfer_unprotected`` and
    expect an ESP packet with SPI 5 in return. The "inb" tests send an ESP
    packet built with the SPI 6 association through ``xfer_protected`` and
    expect the plain packet in return.
    """

    # There is a bug in the IPsec Scapy implementation
    # which causes invalid packet reconstruction after
    # successful decryption. This method is a workaround.
    @staticmethod
    def decrypt(pkt, sa):
        """Decrypt the ESP layer of ``pkt`` while keeping its own headers.

        Args:
            pkt: packet containing an ESP layer. It is modified in place:
                the header preceding ESP gets its ``nh`` rewritten and
                its payload (the ESP layer) removed.
            sa: object whose ``decrypt`` method is applied to a dummy
                ``IPv6()/ESP`` packet (callers pass a SecurityAssociation).

        Returns:
            A new packet made of the remaining headers of ``pkt`` followed
            by the decrypted payload, with the IPv6 ``plen`` recomputed.
        """
        esp = pkt[ESP]

        # decrypt dummy packet with no extensions
        d = sa.decrypt(IPv6()/esp)

        # fix 'next header' in the preceding header of the original
        # packet and remove ESP
        pkt[ESP].underlayer.nh = d[IPv6].nh
        pkt[ESP].underlayer.remove_payload()

        # combine L3 header with decrypted payload
        npkt = pkt/d[IPv6].payload

        # fix length: decrypted payload length plus whatever still follows
        # the IPv6 header in the original packet (its extension headers)
        npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)

        return npkt

    @staticmethod
    def _udp_payload():
        """Return the UDP datagram carried by every test packet."""
        return UDP(sport=123, dport=456)/Raw(load="abc")

    @staticmethod
    def _build_pkt(src, dst, ext_hdrs=False):
        """Return IPv6(src, dst)[/ext headers]/UDP/Raw used by the tests.

        With ``ext_hdrs`` set, a hop-by-hop, a routing and a
        destination-options header are inserted before the UDP datagram.
        """
        pkt = IPv6(src=src, dst=dst)
        if ext_hdrs:
            hoptions = [RouterAlert(value=2), Jumbo(jumboplen=5000), Pad1()]
            doptions = [HAO(hoa="1234::4321")]
            pkt /= IPv6ExtHdrHopByHop(options=hoptions)
            pkt /= IPv6ExtHdrRouting(addresses=["3333::3", "4444::4"])
            pkt /= IPv6ExtHdrDestOpt(options=doptions)
        pkt /= TestTransportWithIPv6Ext._udp_payload()
        return pkt

    def _check_ext_chain(self, pkt, last_nh):
        """Assert the hop-by-hop -> routing -> dest-options header chain.

        ``last_nh`` is the protocol number expected in the 'next header'
        field of the destination-options header.
        """
        self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_HOPOPTS)
        self.assertEqual(pkt[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
        self.assertEqual(pkt[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
        self.assertEqual(pkt[IPv6ExtHdrDestOpt].nh, last_nh)

    def _check_udp(self, pkt):
        """Assert that ``pkt`` carries the datagram from _udp_payload()."""
        self.assertEqual(pkt[UDP].sport, 123)
        self.assertEqual(pkt[UDP].dport, 456)
        self.assertEqual(bytes(pkt[UDP].payload), b'abc')

    def setUp(self):
        """Create the packet transfer object and one SA per direction."""
        self.px = pkttest.PacketXfer()
        # SPI values match the 'sa out 5' / 'sa in 6' lines of config()
        self.outb_sa = SecurityAssociation(ESP, spi=5)
        self.inb_sa = SecurityAssociation(ESP, spi=6)

    def test_outb_ipv6_noopt(self):
        """Outbound: plain IPv6/UDP must come back ESP-protected."""
        pkt = self._build_pkt(SRC_ADDR, DST_ADDR)

        # send and check response
        resp = self.px.xfer_unprotected(pkt)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
        self.assertEqual(resp[ESP].spi, 5)

        # decrypt response, check packet after decryption
        d = self.decrypt(resp[IPv6], self.outb_sa)
        self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
        self._check_udp(d)

    def test_outb_ipv6_opt(self):
        """Outbound: ESP must be placed after the extension headers."""
        pkt = self._build_pkt(SRC_ADDR, DST_ADDR, ext_hdrs=True)

        # send and check response: extensions first, then ESP
        resp = self.px.xfer_unprotected(pkt)
        self._check_ext_chain(resp, socket.IPPROTO_ESP)

        # check ESP
        self.assertEqual(resp[ESP].spi, 5)

        # decrypt response, check packet after decryption
        d = self.decrypt(resp[IPv6], self.outb_sa)
        self._check_ext_chain(d, socket.IPPROTO_UDP)

        # check UDP
        self._check_udp(d)

    def test_inb_ipv6_noopt(self):
        """Inbound: ESP-protected IPv6/UDP must come back as plain UDP."""
        # encrypt and send raw UDP packet
        pkt = self._build_pkt(DST_ADDR, SRC_ADDR)
        e = self.inb_sa.encrypt(pkt)

        # send and check response
        resp = self.px.xfer_protected(e)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)

        # check UDP packet
        self._check_udp(resp)

    def test_inb_ipv6_opt(self):
        """Inbound: extension headers must survive ESP removal."""
        # prepare packet with options
        pkt = self._build_pkt(DST_ADDR, SRC_ADDR, ext_hdrs=True)
        e = self.inb_sa.encrypt(pkt)

        # send encrypted packet and check response
        resp = self.px.xfer_protected(e)
        self._check_ext_chain(resp, socket.IPPROTO_UDP)

        # check UDP
        self._check_udp(resp)

    def test_inb_ipv6_frag(self):
        """Inbound: ESP carried behind a fragment extension header."""
        # prepare ESP payload
        pkt = IPv6()/self._udp_payload()
        e = self.inb_sa.encrypt(pkt)

        # craft and send inbound packet
        e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
        resp = self.px.xfer_protected(e)

        # check response
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
        self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)

        # check UDP
        self._check_udp(resp)
179
180
# Module-level call into the project-local pkttest module. There is no
# `if __name__ == "__main__"` guard, so this also runs when the file is
# imported. NOTE(review): what pkttest() does with this module is defined
# outside this file -- confirm there.
pkttest.pkttest()