2 # SPDX-License-Identifier: BSD-3-Clause
4 from scapy.all import *
# Host addresses of the two endpoints, in fully expanded IPv6 notation.
SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"

# The /64 prefixes that contain the host addresses above.
SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"
17 sp ipv6 out esp protect 5 pri 1 \\
20 sport 0:65535 dport 0:65535
22 sp ipv6 in esp protect 6 pri 1 \\
25 sport 0:65535 dport 0:65535
27 sa out 5 cipher_algo null auth_algo null mode transport
28 sa in 6 cipher_algo null auth_algo null mode transport
30 rt ipv6 dst {0} port 1
31 rt ipv6 dst {1} port 0
32 """.format(SRC_NET, DST_NET)
35 class TestTransportWithIPv6Ext(unittest.TestCase):
36 # There is a bug in the IPsec Scapy implementation
37 # which causes invalid packet reconstruction after
38 # successful decryption. This method is a workaround.
43 # decrypt dummy packet with no extensions
44 d = sa.decrypt(IPv6()/esp)
46 # fix 'next header' in the preceding header of the original
47 # packet and remove ESP
48 pkt[ESP].underlayer.nh = d[IPv6].nh
49 pkt[ESP].underlayer.remove_payload()
51 # combine L3 header with decrypted payload
52 npkt = pkt/d[IPv6].payload
55 npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
60 self.px = pkttest.PacketXfer()
61 self.outb_sa = SecurityAssociation(ESP, spi=5)
62 self.inb_sa = SecurityAssociation(ESP, spi=6)
def test_outb_ipv6_noopt(self):
    # Outbound direction, no extension headers: a plain IPv6/UDP packet
    # goes in on the unprotected side and must come back wrapped in ESP.
    datagram = UDP(sport=123, dport=456)/Raw(load="abc")
    plain = IPv6(src=SRC_ADDR, dst=DST_ADDR)/datagram

    # ESP has to follow the IPv6 header directly and carry SPI 5
    reply = self.px.xfer_unprotected(plain)
    self.assertEqual(reply[IPv6].nh, socket.IPPROTO_ESP)
    self.assertEqual(reply[ESP].spi, 5)

    # strip ESP again and make sure the UDP datagram is unchanged
    clear = TestTransportWithIPv6Ext.decrypt(reply[IPv6], self.outb_sa)
    self.assertEqual(clear[IPv6].nh, socket.IPPROTO_UDP)
    udp = clear[UDP]
    self.assertEqual(udp.sport, 123)
    self.assertEqual(udp.dport, 456)
    self.assertEqual(bytes(udp.payload), b'abc')
def test_outb_ipv6_opt(self):
    # Outbound direction with hop-by-hop, routing and destination-option
    # extension headers: the header chain must be preserved and ESP must
    # be inserted after the last extension header.

    # Option lists are built as literals so that both names are bound
    # inside the test before they are handed to the extension headers.
    hoptions = [
        RouterAlert(value=2),
        Jumbo(jumboplen=5000),
        Pad1(),
    ]
    doptions = [HAO(hoa="1234::4321")]

    pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
    pkt /= IPv6ExtHdrHopByHop(options=hoptions)
    pkt /= IPv6ExtHdrRouting(addresses=["3333::3", "4444::4"])
    pkt /= IPv6ExtHdrDestOpt(options=doptions)
    pkt /= UDP(sport=123, dport=456)/Raw(load="abc")

    # send and check response
    resp = self.px.xfer_unprotected(pkt)
    self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)

    # extension headers keep their order; the last one now points at ESP
    self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
    self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
    self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)

    # ESP header carries the outbound SPI
    self.assertEqual(resp[ESP].spi, 5)

    # decrypt response, check packet after decryption
    d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
    self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
    self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
    self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
    self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)

    # the UDP datagram must be unchanged
    self.assertEqual(d[UDP].sport, 123)
    self.assertEqual(d[UDP].dport, 456)
    self.assertEqual(bytes(d[UDP].payload), b'abc')
def test_inb_ipv6_noopt(self):
    # Inbound direction, no extension headers: an ESP packet built with
    # the inbound SA goes in on the protected side and must come back
    # as the original clear IPv6/UDP packet.
    datagram = UDP(sport=123, dport=456)/Raw(load="abc")
    plain = IPv6(src=DST_ADDR, dst=SRC_ADDR)/datagram
    protected = self.inb_sa.encrypt(plain)

    # after processing, UDP must follow the IPv6 header directly
    reply = self.px.xfer_protected(protected)
    self.assertEqual(reply[IPv6].nh, socket.IPPROTO_UDP)

    # the UDP datagram must be unchanged
    udp = reply[UDP]
    self.assertEqual(udp.sport, 123)
    self.assertEqual(udp.dport, 456)
    self.assertEqual(bytes(udp.payload), b'abc')
def test_inb_ipv6_opt(self):
    # Inbound direction with hop-by-hop, routing and destination-option
    # extension headers: after ESP is removed the header chain must be
    # intact and the last extension header must point at UDP.

    # Option lists are built as literals so that both names are bound
    # inside the test before they are handed to the extension headers.
    hoptions = [
        RouterAlert(value=2),
        Jumbo(jumboplen=5000),
        Pad1(),
    ]
    doptions = [HAO(hoa="1234::4321")]

    # prepare packet with options
    pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
    pkt /= IPv6ExtHdrHopByHop(options=hoptions)
    pkt /= IPv6ExtHdrRouting(addresses=["3333::3", "4444::4"])
    pkt /= IPv6ExtHdrDestOpt(options=doptions)
    pkt /= UDP(sport=123, dport=456)/Raw(load="abc")
    e = self.inb_sa.encrypt(pkt)

    # send encrypted packet and check response
    resp = self.px.xfer_protected(e)
    self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
    self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
    self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
    self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)

    # the UDP datagram must be unchanged
    self.assertEqual(resp[UDP].sport, 123)
    self.assertEqual(resp[UDP].dport, 456)
    self.assertEqual(bytes(resp[UDP].payload), b'abc')
def test_inb_ipv6_frag(self):
    # Inbound direction with a fragment extension header placed in
    # front of ESP.

    # Encrypt under a default IPv6 header; only the resulting ESP
    # payload is reused below.
    seed = IPv6()/UDP(sport=123, dport=456)/Raw(load="abc")
    esp_payload = self.inb_sa.encrypt(seed)[IPv6].payload

    # put the ESP payload behind IPv6 + fragment header and send it
    outer = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()
    reply = self.px.xfer_protected(outer/esp_payload)

    # the fragment header stays in place and now points at UDP
    self.assertEqual(reply[IPv6].nh, socket.IPPROTO_FRAGMENT)
    self.assertEqual(reply[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)

    # the UDP datagram must be unchanged
    udp = reply[UDP]
    self.assertEqual(udp.sport, 123)
    self.assertEqual(udp.dport, 456)
    self.assertEqual(bytes(udp.payload), b'abc')