3 from scapy.all import *
8 SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
9 DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"
10 SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
11 DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"
16 sp ipv6 out esp protect 5 pri 1 \\
19 sport 0:65535 dport 0:65535
21 sp ipv6 in esp protect 6 pri 1 \\
24 sport 0:65535 dport 0:65535
26 sa out 5 cipher_algo null auth_algo null mode transport
27 sa in 6 cipher_algo null auth_algo null mode transport
29 rt ipv6 dst {0} port 1
30 rt ipv6 dst {1} port 0
31 """.format(SRC_NET, DST_NET)
34 class TestTransportWithIPv6Ext(unittest.TestCase):
35 # There is a bug in the IPsec Scapy implementation
36 # which causes invalid packet reconstruction after
37 # successful decryption. This method is a workaround.
42 # decrypt dummy packet with no extensions
43 d = sa.decrypt(IPv6()/esp)
45 # fix 'next header' in the preceding header of the original
46 # packet and remove ESP
47 pkt[ESP].underlayer.nh = d[IPv6].nh
48 pkt[ESP].underlayer.remove_payload()
50 # combine L3 header with decrypted payload
51 npkt = pkt/d[IPv6].payload
54 npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
59 self.px = pkttest.PacketXfer()
60 self.outb_sa = SecurityAssociation(ESP, spi=5)
61 self.inb_sa = SecurityAssociation(ESP, spi=6)
63 def test_outb_ipv6_noopt(self):
64 pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
65 pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
67 # send and check response
68 resp = self.px.xfer_unprotected(pkt)
69 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
70 self.assertEqual(resp[ESP].spi, 5)
72 # decrypt response, check packet after decryption
73 d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
74 self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
75 self.assertEqual(d[UDP].sport, 123)
76 self.assertEqual(d[UDP].dport, 456)
77 self.assertEqual(bytes(d[UDP].payload), b'abc')
79 def test_outb_ipv6_opt(self):
81 hoptions.append(RouterAlert(value=2))
82 hoptions.append(Jumbo(jumboplen=5000))
83 hoptions.append(Pad1())
86 doptions.append(HAO(hoa="1234::4321"))
88 pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
89 pkt /= IPv6ExtHdrHopByHop(options=hoptions)
90 pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
91 pkt /= IPv6ExtHdrDestOpt(options=doptions)
92 pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
94 # send and check response
95 resp = self.px.xfer_unprotected(pkt)
96 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
99 self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
100 self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
101 self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)
104 self.assertEqual(resp[ESP].spi, 5)
106 # decrypt response, check packet after decryption
107 d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
108 self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
109 self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
110 self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
111 self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
114 self.assertEqual(d[UDP].sport, 123)
115 self.assertEqual(d[UDP].dport, 456)
116 self.assertEqual(bytes(d[UDP].payload), b'abc')
118 def test_inb_ipv6_noopt(self):
119 # encrypt and send raw UDP packet
120 pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
121 pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
122 e = self.inb_sa.encrypt(pkt)
124 # send and check response
125 resp = self.px.xfer_protected(e)
126 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
129 self.assertEqual(resp[UDP].sport, 123)
130 self.assertEqual(resp[UDP].dport, 456)
131 self.assertEqual(bytes(resp[UDP].payload), b'abc')
133 def test_inb_ipv6_opt(self):
135 hoptions.append(RouterAlert(value=2))
136 hoptions.append(Jumbo(jumboplen=5000))
137 hoptions.append(Pad1())
140 doptions.append(HAO(hoa="1234::4321"))
142 # prepare packet with options
143 pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
144 pkt /= IPv6ExtHdrHopByHop(options=hoptions)
145 pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
146 pkt /= IPv6ExtHdrDestOpt(options=doptions)
147 pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
148 e = self.inb_sa.encrypt(pkt)
150 # self encrypted packet and check response
151 resp = self.px.xfer_protected(e)
152 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
153 self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
154 self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
155 self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
158 self.assertEqual(resp[UDP].sport, 123)
159 self.assertEqual(resp[UDP].dport, 456)
160 self.assertEqual(bytes(resp[UDP].payload), b'abc')
162 def test_inb_ipv6_frag(self):
163 # prepare ESP payload
164 pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc")
165 e = self.inb_sa.encrypt(pkt)
167 # craft and send inbound packet
168 e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
169 resp = self.px.xfer_protected(e)
172 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
173 self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
176 self.assertEqual(resp[UDP].sport, 123)
177 self.assertEqual(resp[UDP].dport, 456)
178 self.assertEqual(bytes(resp[UDP].payload), b'abc')