2 # SPDX-License-Identifier: BSD-3-Clause
3 # Copyright(c) 2019 Intel Corporation
5 from scapy.all import *
10 SRC_ADDR_IPV4_1 = "192.168.1.1"
11 DST_ADDR_IPV4_1 = "192.168.2.1"
14 SRC_ADDR_IPV6_1 = "1111:0000:0000:0000:0000:0000:0000:0001"
15 DST_ADDR_IPV6_1 = "2222:0000:0000:0000:0000:0000:0000:0001"
18 SRC_ADDR_IPV4_2 = "192.168.11.1"
19 DST_ADDR_IPV4_2 = "192.168.12.1"
20 SRC_ADDR_IPV6_2 = "1111:0000:0000:0000:0000:0000:0001:0001"
21 DST_ADDR_IPV6_2 = "2222:0000:0000:0000:0000:0000:0001:0001"
24 SRC_ADDR_IPV4_3 = "192.168.21.1"
25 DST_ADDR_IPV4_3 = "192.168.22.1"
26 SRC_ADDR_IPV6_3 = "1111:0000:0000:0000:0000:0001:0001:0001"
27 DST_ADDR_IPV6_3 = "2222:0000:0000:0000:0000:0001:0001:0001"
31 #outter-ipv4 inner-ipv4 tunnel mode test
32 sp ipv4 out esp protect 5 pri 1 \\
35 sport 0:65535 dport 0:65535
37 sp ipv4 in esp protect 6 pri 1 \\
40 sport 0:65535 dport 0:65535
42 sa out 5 cipher_algo null auth_algo null mode ipv4-tunnel \\
44 sa in 6 cipher_algo null auth_algo null mode ipv4-tunnel \\
47 rt ipv4 dst {0}/32 port 1
48 rt ipv4 dst {1}/32 port 0
50 #outter-ipv6 inner-ipv6 tunnel mode test
51 sp ipv6 out esp protect 7 pri 1 \\
54 sport 0:65535 dport 0:65535
56 sp ipv6 in esp protect 8 pri 1 \\
59 sport 0:65535 dport 0:65535
61 sa out 7 cipher_algo null auth_algo null mode ipv6-tunnel \\
63 sa in 8 cipher_algo null auth_algo null mode ipv6-tunnel \\
66 rt ipv6 dst {2}/128 port 1
67 rt ipv6 dst {3}/128 port 0
69 #outter-ipv4 inner-ipv6 tunnel mode test
70 sp ipv6 out esp protect 9 pri 1 \\
73 sport 0:65535 dport 0:65535
75 sp ipv6 in esp protect 10 pri 1 \\
78 sport 0:65535 dport 0:65535
80 sa out 9 cipher_algo null auth_algo null mode ipv4-tunnel \\
82 sa in 10 cipher_algo null auth_algo null mode ipv4-tunnel \\
85 rt ipv6 dst {4}/128 port 1
86 rt ipv4 dst {7}/32 port 0
88 #outter-ipv6 inner-ipv4 tunnel mode test
89 sp ipv4 out esp protect 11 pri 1 \\
92 sport 0:65535 dport 0:65535
94 sp ipv4 in esp protect 12 pri 1 \\
97 sport 0:65535 dport 0:65535
99 sa out 11 cipher_algo null auth_algo null mode ipv6-tunnel \\
101 sa in 12 cipher_algo null auth_algo null mode ipv6-tunnel \\
104 rt ipv4 dst {8}/32 port 1
105 rt ipv6 dst {11}/128 port 0
106 """.format(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
107 SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
108 SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, SRC_ADDR_IPV4_2, DST_ADDR_IPV4_2,
109 SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, SRC_ADDR_IPV6_3, DST_ADDR_IPV6_3)
117 class TestTunnelHeaderReconstruct(unittest.TestCase):
119 self.px = pkttest.PacketXfer()
120 th = IP(src=DST_ADDR_IPV4_1, dst=SRC_ADDR_IPV4_1)
121 self.sa_ipv4v4 = SecurityAssociation(ESP, spi=6, tunnel_header = th)
123 th = IPv6(src=DST_ADDR_IPV6_1, dst=SRC_ADDR_IPV6_1)
124 self.sa_ipv6v6 = SecurityAssociation(ESP, spi=8, tunnel_header = th)
126 th = IP(src=DST_ADDR_IPV4_2, dst=SRC_ADDR_IPV4_2)
127 self.sa_ipv4v6 = SecurityAssociation(ESP, spi=10, tunnel_header = th)
129 th = IPv6(src=DST_ADDR_IPV6_3, dst=SRC_ADDR_IPV6_3)
130 self.sa_ipv6v4 = SecurityAssociation(ESP, spi=12, tunnel_header = th)
132 def gen_pkt_plain_ipv4(self, src, dst, tos):
133 pkt = IP(src=src, dst=dst, tos=tos)
134 pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
137 def gen_pkt_plain_ipv6(self, src, dst, tc):
138 pkt = IPv6(src=src, dst=dst, tc=tc)
139 pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
142 def gen_pkt_tun_ipv4v4(self, tos_outter, tos_inner):
143 pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_1, SRC_ADDR_IPV4_1,
145 pkt = self.sa_ipv4v4.encrypt(pkt)
146 self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP)
147 self.assertEqual(pkt[ESP].spi, 6)
148 pkt[IP].tos = tos_outter
151 def gen_pkt_tun_ipv6v6(self, tc_outter, tc_inner):
152 pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_1, SRC_ADDR_IPV6_1,
154 pkt = self.sa_ipv6v6.encrypt(pkt)
155 self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP)
156 self.assertEqual(pkt[ESP].spi, 8)
157 pkt[IPv6].tc = tc_outter
160 def gen_pkt_tun_ipv4v6(self, tos_outter, tc_inner):
161 pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_2, SRC_ADDR_IPV6_2,
163 pkt = self.sa_ipv4v6.encrypt(pkt)
164 self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP)
165 self.assertEqual(pkt[ESP].spi, 10)
166 pkt[IP].tos = tos_outter
169 def gen_pkt_tun_ipv6v4(self, tc_outter, tos_inner):
170 pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_3, SRC_ADDR_IPV4_3,
172 pkt = self.sa_ipv6v4.encrypt(pkt)
173 self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP)
174 self.assertEqual(pkt[ESP].spi, 12)
175 pkt[IPv6].tc = tc_outter
178 #RFC4301 5.1.2.1 & 5.1.2.2, on outbound packets the ECN field shall be copied from the inner header to the outer header
179 def test_outb_ipv4v4_ecn(self):
180 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
182 resp = self.px.xfer_unprotected(pkt)
183 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
184 self.assertEqual(resp[ESP].spi, 5)
185 self.assertEqual(resp[IP].tos, ECN_ECT1)
187 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
189 resp = self.px.xfer_unprotected(pkt)
190 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
191 self.assertEqual(resp[ESP].spi, 5)
192 self.assertEqual(resp[IP].tos, ECN_ECT0)
194 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
196 resp = self.px.xfer_unprotected(pkt)
197 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
198 self.assertEqual(resp[ESP].spi, 5)
199 self.assertEqual(resp[IP].tos, ECN_CE)
201 def test_outb_ipv6v6_ecn(self):
202 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
204 resp = self.px.xfer_unprotected(pkt)
205 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
206 self.assertEqual(resp[IPv6].tc, ECN_ECT1)
208 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
210 resp = self.px.xfer_unprotected(pkt)
211 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
212 self.assertEqual(resp[ESP].spi, 7)
213 self.assertEqual(resp[IPv6].tc, ECN_ECT0)
215 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
217 resp = self.px.xfer_unprotected(pkt)
218 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
219 self.assertEqual(resp[ESP].spi, 7)
220 self.assertEqual(resp[IPv6].tc, ECN_CE)
222 def test_outb_ipv4v6_ecn(self):
223 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
225 resp = self.px.xfer_unprotected(pkt)
226 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
227 self.assertEqual(resp[IP].tos, ECN_ECT1)
229 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
231 resp = self.px.xfer_unprotected(pkt)
232 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
233 self.assertEqual(resp[IP].tos, ECN_ECT0)
235 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
237 resp = self.px.xfer_unprotected(pkt)
238 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
239 self.assertEqual(resp[IP].tos, ECN_CE)
241 def test_outb_ipv6v4_ecn(self):
242 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
244 resp = self.px.xfer_unprotected(pkt)
245 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
246 self.assertEqual(resp[IPv6].tc, ECN_ECT1)
248 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
250 resp = self.px.xfer_unprotected(pkt)
251 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
252 self.assertEqual(resp[IPv6].tc, ECN_ECT0)
254 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
256 resp = self.px.xfer_unprotected(pkt)
257 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
258 self.assertEqual(resp[IPv6].tc, ECN_CE)
260 #RFC4301 5.1.2.1 & 5.1.2.2, on inbound packets, if the outer header ECN is CE (0x3),
261 #the inner header ECN is overwritten to CE, otherwise it is left unchanged
263 #Outer header is not CE, inner header should not change
def test_inb_ipv4v4_ecn_inner_no_change(self):
    # Inbound, IPv4 outer / IPv4 inner: when the outer TOS is not ECN_CE,
    # the packet returned by xfer_protected() must carry the inner TOS
    # it was built with.
    # Each pair is (outer TOS, inner TOS) as taken by gen_pkt_tun_ipv4v4().
    ecn_cases = (
        (ECN_ECT1, ECN_ECT0),
        (ECN_ECT0, ECN_ECT1),
        (ECN_ECT1, ECN_CE),
    )
    for outer_tos, inner_tos in ecn_cases:
        tunnelled = self.gen_pkt_tun_ipv4v4(outer_tos, inner_tos)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv4 packet.
        self.assertEqual(reply[IP].proto, socket.IPPROTO_UDP)
        self.assertEqual(reply[IP].tos, inner_tos)
def test_inb_ipv6v6_ecn_inner_no_change(self):
    # Inbound, IPv6 outer / IPv6 inner: when the outer traffic class is not
    # ECN_CE, the packet returned by xfer_protected() must carry the inner
    # traffic class it was built with.
    # Each pair is (outer TC, inner TC) as taken by gen_pkt_tun_ipv6v6().
    ecn_cases = (
        (ECN_ECT1, ECN_ECT0),
        (ECN_ECT0, ECN_ECT1),
        (ECN_ECT1, ECN_CE),
    )
    for outer_tc, inner_tc in ecn_cases:
        tunnelled = self.gen_pkt_tun_ipv6v6(outer_tc, inner_tc)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv6 packet.
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_UDP)
        self.assertEqual(reply[IPv6].tc, inner_tc)
def test_inb_ipv4v6_ecn_inner_no_change(self):
    # Inbound, IPv4 outer / IPv6 inner: when the outer TOS is not ECN_CE,
    # the packet returned by xfer_protected() must carry the inner traffic
    # class it was built with.
    # Each pair is (outer TOS, inner TC) as taken by gen_pkt_tun_ipv4v6().
    ecn_cases = (
        (ECN_ECT1, ECN_ECT0),
        (ECN_ECT0, ECN_ECT1),
        (ECN_ECT1, ECN_CE),
    )
    for outer_tos, inner_tc in ecn_cases:
        tunnelled = self.gen_pkt_tun_ipv4v6(outer_tos, inner_tc)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv6 packet.
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_UDP)
        self.assertEqual(reply[IPv6].tc, inner_tc)
def test_inb_ipv6v4_ecn_inner_no_change(self):
    # Inbound, IPv6 outer / IPv4 inner: when the outer traffic class is not
    # ECN_CE, the packet returned by xfer_protected() must carry the inner
    # TOS it was built with.
    # Each pair is (outer TC, inner TOS) as taken by gen_pkt_tun_ipv6v4().
    ecn_cases = (
        (ECN_ECT1, ECN_ECT0),
        (ECN_ECT0, ECN_ECT1),
        (ECN_ECT1, ECN_CE),
    )
    for outer_tc, inner_tos in ecn_cases:
        tunnelled = self.gen_pkt_tun_ipv6v4(outer_tc, inner_tos)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv4 packet.
        self.assertEqual(reply[IP].proto, socket.IPPROTO_UDP)
        self.assertEqual(reply[IP].tos, inner_tos)
328 #Outer header is CE, inner header should be changed to CE
def test_inb_ipv4v4_ecn_inner_change(self):
    # Inbound, IPv4 outer / IPv4 inner: with the outer TOS set to ECN_CE,
    # the packet returned by xfer_protected() must have TOS ECN_CE
    # whichever inner value it was built with.
    for inner_tos in (ECN_ECT0, ECN_ECT1):
        tunnelled = self.gen_pkt_tun_ipv4v4(ECN_CE, inner_tos)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv4 packet.
        self.assertEqual(reply[IP].proto, socket.IPPROTO_UDP)
        self.assertEqual(reply[IP].tos, ECN_CE)
def test_inb_ipv6v6_ecn_inner_change(self):
    # Inbound, IPv6 outer / IPv6 inner: with the outer traffic class set to
    # ECN_CE, the packet returned by xfer_protected() must have traffic
    # class ECN_CE whichever inner value it was built with.
    for inner_tc in (ECN_ECT0, ECN_ECT1):
        tunnelled = self.gen_pkt_tun_ipv6v6(ECN_CE, inner_tc)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv6 packet.
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_UDP)
        self.assertEqual(reply[IPv6].tc, ECN_CE)
def test_inb_ipv4v6_ecn_inner_change(self):
    # Inbound, IPv4 outer / IPv6 inner: with the outer TOS set to ECN_CE,
    # the packet returned by xfer_protected() must have traffic class
    # ECN_CE whichever inner value it was built with.
    for inner_tc in (ECN_ECT0, ECN_ECT1):
        tunnelled = self.gen_pkt_tun_ipv4v6(ECN_CE, inner_tc)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv6 packet.
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_UDP)
        self.assertEqual(reply[IPv6].tc, ECN_CE)
def test_inb_ipv6v4_ecn_inner_change(self):
    # Inbound, IPv6 outer / IPv4 inner: with the outer traffic class set to
    # ECN_CE, the packet returned by xfer_protected() must have TOS ECN_CE
    # whichever inner value it was built with.
    for inner_tos in (ECN_ECT0, ECN_ECT1):
        tunnelled = self.gen_pkt_tun_ipv6v4(ECN_CE, inner_tos)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv4 packet.
        self.assertEqual(reply[IP].proto, socket.IPPROTO_UDP)
        self.assertEqual(reply[IP].tos, ECN_CE)
373 #RFC4301 5.1.2.1.5 Outer DS field should be copied from Inner DS field
374 def test_outb_ipv4v4_dscp(self):
375 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
377 resp = self.px.xfer_unprotected(pkt)
378 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
379 self.assertEqual(resp[ESP].spi, 5)
380 self.assertEqual(resp[IP].tos, DSCP_1)
382 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
384 resp = self.px.xfer_unprotected(pkt)
385 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
386 self.assertEqual(resp[ESP].spi, 5)
387 self.assertEqual(resp[IP].tos, DSCP_3F)
389 def test_outb_ipv6v6_dscp(self):
390 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
392 resp = self.px.xfer_unprotected(pkt)
393 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
394 self.assertEqual(resp[ESP].spi, 7)
395 self.assertEqual(resp[IPv6].tc, DSCP_1)
397 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
399 resp = self.px.xfer_unprotected(pkt)
400 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
401 self.assertEqual(resp[ESP].spi, 7)
402 self.assertEqual(resp[IPv6].tc, DSCP_3F)
404 def test_outb_ipv4v6_dscp(self):
405 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
407 resp = self.px.xfer_unprotected(pkt)
408 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
409 self.assertEqual(resp[ESP].spi, 9)
410 self.assertEqual(resp[IP].tos, DSCP_1)
412 pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
414 resp = self.px.xfer_unprotected(pkt)
415 self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
416 self.assertEqual(resp[ESP].spi, 9)
417 self.assertEqual(resp[IP].tos, DSCP_3F)
419 def test_outb_ipv6v4_dscp(self):
420 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
422 resp = self.px.xfer_unprotected(pkt)
423 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
424 self.assertEqual(resp[ESP].spi, 11)
425 self.assertEqual(resp[IPv6].tc, DSCP_1)
427 pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
429 resp = self.px.xfer_unprotected(pkt)
430 self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
431 self.assertEqual(resp[ESP].spi, 11)
432 self.assertEqual(resp[IPv6].tc, DSCP_3F)
434 #RFC4301 5.1.2.1.5 Inner DS field should not be affected by Outer DS field
def test_inb_ipv4v4_dscp(self):
    # Inbound, IPv4 outer / IPv4 inner: the packet returned by
    # xfer_protected() must carry the inner TOS it was built with,
    # not the outer one.
    # Each pair is (outer TOS, inner TOS) as taken by gen_pkt_tun_ipv4v4().
    dscp_cases = (
        (DSCP_3F, DSCP_1),
        (DSCP_1, DSCP_3F),
    )
    for outer_tos, inner_tos in dscp_cases:
        tunnelled = self.gen_pkt_tun_ipv4v4(outer_tos, inner_tos)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv4 packet.
        self.assertEqual(reply[IP].proto, socket.IPPROTO_UDP)
        self.assertEqual(reply[IP].tos, inner_tos)
def test_inb_ipv6v6_dscp(self):
    # Inbound, IPv6 outer / IPv6 inner: the packet returned by
    # xfer_protected() must carry the inner traffic class it was built
    # with, not the outer one.
    # Each pair is (outer TC, inner TC) as taken by gen_pkt_tun_ipv6v6().
    dscp_cases = (
        (DSCP_3F, DSCP_1),
        (DSCP_1, DSCP_3F),
    )
    for outer_tc, inner_tc in dscp_cases:
        tunnelled = self.gen_pkt_tun_ipv6v6(outer_tc, inner_tc)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv6 packet.
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_UDP)
        self.assertEqual(reply[IPv6].tc, inner_tc)
def test_inb_ipv4v6_dscp(self):
    # Inbound, IPv4 outer / IPv6 inner: the packet returned by
    # xfer_protected() must carry the inner traffic class it was built
    # with, not the outer TOS.
    # Each pair is (outer TOS, inner TC) as taken by gen_pkt_tun_ipv4v6().
    dscp_cases = (
        (DSCP_3F, DSCP_1),
        (DSCP_1, DSCP_3F),
    )
    for outer_tos, inner_tc in dscp_cases:
        tunnelled = self.gen_pkt_tun_ipv4v6(outer_tos, inner_tc)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv6 packet.
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_UDP)
        self.assertEqual(reply[IPv6].tc, inner_tc)
def test_inb_ipv6v4_dscp(self):
    # Inbound, IPv6 outer / IPv4 inner: the packet returned by
    # xfer_protected() must carry the inner TOS it was built with,
    # not the outer traffic class.
    # Each pair is (outer TC, inner TOS) as taken by gen_pkt_tun_ipv6v4().
    dscp_cases = (
        (DSCP_3F, DSCP_1),
        (DSCP_1, DSCP_3F),
    )
    for outer_tc, inner_tos in dscp_cases:
        tunnelled = self.gen_pkt_tun_ipv6v4(outer_tc, inner_tos)
        reply = self.px.xfer_protected(tunnelled)
        # The response must be the plain UDP-carrying IPv4 packet.
        self.assertEqual(reply[IP].proto, socket.IPPROTO_UDP)
        self.assertEqual(reply[IP].tos, inner_tos)