examples/ipsec-secgw: add event helper config init/uninit
[dpdk.git] / examples / ipsec-secgw / test / tun_null_header_reconstruct.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: BSD-3-Clause
3 # Copyright(c) 2019 Intel Corporation
4
5 from scapy.all import *
6 import unittest
7 import pkttest
8
9 #{ipv4{ipv4}} test
10 SRC_ADDR_IPV4_1 = "192.168.1.1"
11 DST_ADDR_IPV4_1 = "192.168.2.1"
12
13 #{ipv6{ipv6}} test
14 SRC_ADDR_IPV6_1 = "1111:0000:0000:0000:0000:0000:0000:0001"
15 DST_ADDR_IPV6_1 = "2222:0000:0000:0000:0000:0000:0000:0001"
16
17 #{ipv4{ipv6}} test
18 SRC_ADDR_IPV4_2 = "192.168.11.1"
19 DST_ADDR_IPV4_2 = "192.168.12.1"
20 SRC_ADDR_IPV6_2 = "1111:0000:0000:0000:0000:0000:0001:0001"
21 DST_ADDR_IPV6_2 = "2222:0000:0000:0000:0000:0000:0001:0001"
22
23 #{ipv6{ipv4}} test
24 SRC_ADDR_IPV4_3 = "192.168.21.1"
25 DST_ADDR_IPV4_3 = "192.168.22.1"
26 SRC_ADDR_IPV6_3 = "1111:0000:0000:0000:0000:0001:0001:0001"
27 DST_ADDR_IPV6_3 = "2222:0000:0000:0000:0000:0001:0001:0001"
28
29 def config():
30     return """
31 #outter-ipv4 inner-ipv4 tunnel mode test
32 sp ipv4 out esp protect 5 pri 1 \\
33 src {0}/32 \\
34 dst {1}/32 \\
35 sport 0:65535 dport 0:65535
36
37 sp ipv4 in esp protect 6 pri 1 \\
38 src {1}/32 \\
39 dst {0}/32 \\
40 sport 0:65535 dport 0:65535
41
42 sa out 5 cipher_algo null auth_algo null mode ipv4-tunnel \\
43 src {0} dst {1}
44 sa in 6 cipher_algo null auth_algo null mode ipv4-tunnel \\
45 src {1} dst {0}
46
47 rt ipv4 dst {0}/32 port 1
48 rt ipv4 dst {1}/32 port 0
49
50 #outter-ipv6 inner-ipv6 tunnel mode test
51 sp ipv6 out esp protect 7 pri 1 \\
52 src {2}/128 \\
53 dst {3}/128 \\
54 sport 0:65535 dport 0:65535
55
56 sp ipv6 in esp protect 8 pri 1 \\
57 src {3}/128 \\
58 dst {2}/128 \\
59 sport 0:65535 dport 0:65535
60
61 sa out 7 cipher_algo null auth_algo null mode ipv6-tunnel \\
62 src {2} dst {3}
63 sa in 8 cipher_algo null auth_algo null mode ipv6-tunnel \\
64 src {3} dst {2}
65
66 rt ipv6 dst {2}/128 port 1
67 rt ipv6 dst {3}/128 port 0
68
69 #outter-ipv4 inner-ipv6 tunnel mode test
70 sp ipv6 out esp protect 9 pri 1 \\
71 src {4}/128 \\
72 dst {5}/128 \\
73 sport 0:65535 dport 0:65535
74
75 sp ipv6 in esp protect 10 pri 1 \\
76 src {5}/128 \\
77 dst {4}/128 \\
78 sport 0:65535 dport 0:65535
79
80 sa out 9 cipher_algo null auth_algo null mode ipv4-tunnel \\
81 src {6} dst {7}
82 sa in 10 cipher_algo null auth_algo null mode ipv4-tunnel \\
83 src {7} dst {6}
84
85 rt ipv6 dst {4}/128 port 1
86 rt ipv4 dst {7}/32 port 0
87
88 #outter-ipv6 inner-ipv4 tunnel mode test
89 sp ipv4 out esp protect 11 pri 1 \\
90 src {8}/32 \\
91 dst {9}/32 \\
92 sport 0:65535 dport 0:65535
93
94 sp ipv4 in esp protect 12 pri 1 \\
95 src {9}/32 \\
96 dst {8}/32 \\
97 sport 0:65535 dport 0:65535
98
99 sa out 11 cipher_algo null auth_algo null mode ipv6-tunnel \\
100 src {10} dst {11}
101 sa in 12 cipher_algo null auth_algo null mode ipv6-tunnel \\
102 src {11} dst {10}
103
104 rt ipv4 dst {8}/32 port 1
105 rt ipv6 dst {11}/128 port 0
106 """.format(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
107            SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
108            SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, SRC_ADDR_IPV4_2, DST_ADDR_IPV4_2,
109            SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, SRC_ADDR_IPV6_3, DST_ADDR_IPV6_3)
110
111 ECN_ECT0    = 0x02
112 ECN_ECT1    = 0x01
113 ECN_CE      = 0x03
114 DSCP_1      = 0x04
115 DSCP_3F     = 0xFC
116
117 class TestTunnelHeaderReconstruct(unittest.TestCase):
118     def setUp(self):
119         self.px = pkttest.PacketXfer()
120         th = IP(src=DST_ADDR_IPV4_1, dst=SRC_ADDR_IPV4_1)
121         self.sa_ipv4v4 = SecurityAssociation(ESP, spi=6, tunnel_header = th)
122
123         th = IPv6(src=DST_ADDR_IPV6_1, dst=SRC_ADDR_IPV6_1)
124         self.sa_ipv6v6 = SecurityAssociation(ESP, spi=8, tunnel_header = th)
125
126         th = IP(src=DST_ADDR_IPV4_2, dst=SRC_ADDR_IPV4_2)
127         self.sa_ipv4v6 = SecurityAssociation(ESP, spi=10, tunnel_header = th)
128
129         th = IPv6(src=DST_ADDR_IPV6_3, dst=SRC_ADDR_IPV6_3)
130         self.sa_ipv6v4 = SecurityAssociation(ESP, spi=12, tunnel_header = th)
131
132     def gen_pkt_plain_ipv4(self, src, dst, tos):
133         pkt = IP(src=src, dst=dst, tos=tos)
134         pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
135         return pkt
136
137     def gen_pkt_plain_ipv6(self, src, dst, tc):
138         pkt = IPv6(src=src, dst=dst, tc=tc)
139         pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
140         return pkt
141
142     def gen_pkt_tun_ipv4v4(self, tos_outter, tos_inner):
143         pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_1, SRC_ADDR_IPV4_1,
144                                       tos_inner)
145         pkt = self.sa_ipv4v4.encrypt(pkt)
146         self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP)
147         self.assertEqual(pkt[ESP].spi, 6)
148         pkt[IP].tos = tos_outter
149         return pkt
150
151     def gen_pkt_tun_ipv6v6(self, tc_outter, tc_inner):
152         pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_1, SRC_ADDR_IPV6_1,
153                                       tc_inner)
154         pkt = self.sa_ipv6v6.encrypt(pkt)
155         self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP)
156         self.assertEqual(pkt[ESP].spi, 8)
157         pkt[IPv6].tc = tc_outter
158         return pkt
159
160     def gen_pkt_tun_ipv4v6(self, tos_outter, tc_inner):
161         pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_2, SRC_ADDR_IPV6_2,
162                                       tc_inner)
163         pkt = self.sa_ipv4v6.encrypt(pkt)
164         self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP)
165         self.assertEqual(pkt[ESP].spi, 10)
166         pkt[IP].tos = tos_outter
167         return pkt
168
169     def gen_pkt_tun_ipv6v4(self, tc_outter, tos_inner):
170         pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_3, SRC_ADDR_IPV4_3,
171                                       tos_inner)
172         pkt = self.sa_ipv6v4.encrypt(pkt)
173         self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP)
174         self.assertEqual(pkt[ESP].spi, 12)
175         pkt[IPv6].tc = tc_outter
176         return pkt
177
178 #RFC4301 5.1.2.1 & 5.1.2.2, outbound packets shall be copied ECN field
179     def test_outb_ipv4v4_ecn(self):
180         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
181                                       ECN_ECT1)
182         resp = self.px.xfer_unprotected(pkt)
183         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
184         self.assertEqual(resp[ESP].spi, 5)
185         self.assertEqual(resp[IP].tos, ECN_ECT1)
186
187         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
188                                       ECN_ECT0)
189         resp = self.px.xfer_unprotected(pkt)
190         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
191         self.assertEqual(resp[ESP].spi, 5)
192         self.assertEqual(resp[IP].tos, ECN_ECT0)
193
194         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
195                                       ECN_CE)
196         resp = self.px.xfer_unprotected(pkt)
197         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
198         self.assertEqual(resp[ESP].spi, 5)
199         self.assertEqual(resp[IP].tos, ECN_CE)
200
201     def test_outb_ipv6v6_ecn(self):
202         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
203                                       ECN_ECT1)
204         resp = self.px.xfer_unprotected(pkt)
205         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
206         self.assertEqual(resp[IPv6].tc, ECN_ECT1)
207
208         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
209                                       ECN_ECT0)
210         resp = self.px.xfer_unprotected(pkt)
211         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
212         self.assertEqual(resp[ESP].spi, 7)
213         self.assertEqual(resp[IPv6].tc, ECN_ECT0)
214
215         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
216                                       ECN_CE)
217         resp = self.px.xfer_unprotected(pkt)
218         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
219         self.assertEqual(resp[ESP].spi, 7)
220         self.assertEqual(resp[IPv6].tc, ECN_CE)
221
222     def test_outb_ipv4v6_ecn(self):
223         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
224                                       ECN_ECT1)
225         resp = self.px.xfer_unprotected(pkt)
226         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
227         self.assertEqual(resp[IP].tos, ECN_ECT1)
228
229         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
230                                       ECN_ECT0)
231         resp = self.px.xfer_unprotected(pkt)
232         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
233         self.assertEqual(resp[IP].tos, ECN_ECT0)
234
235         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
236                                       ECN_CE)
237         resp = self.px.xfer_unprotected(pkt)
238         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
239         self.assertEqual(resp[IP].tos, ECN_CE)
240
241     def test_outb_ipv6v4_ecn(self):
242         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
243                                       ECN_ECT1)
244         resp = self.px.xfer_unprotected(pkt)
245         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
246         self.assertEqual(resp[IPv6].tc, ECN_ECT1)
247
248         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
249                                       ECN_ECT0)
250         resp = self.px.xfer_unprotected(pkt)
251         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
252         self.assertEqual(resp[IPv6].tc, ECN_ECT0)
253
254         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
255                                       ECN_CE)
256         resp = self.px.xfer_unprotected(pkt)
257         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
258         self.assertEqual(resp[IPv6].tc, ECN_CE)
259
260 #RFC4301 5.1.2.1 & 5.1.2.2, if outbound packets ECN is CE (0x3), inbound packets
261 #ECN is overwritten to CE, otherwise no change
262
263 #Outter header not CE, Inner header should be no change
264     def test_inb_ipv4v4_ecn_inner_no_change(self):
265         pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT1, ECN_ECT0)
266         resp = self.px.xfer_protected(pkt)
267         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
268         self.assertEqual(resp[IP].tos, ECN_ECT0)
269
270         pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT0, ECN_ECT1)
271         resp = self.px.xfer_protected(pkt)
272         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
273         self.assertEqual(resp[IP].tos, ECN_ECT1)
274
275         pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT1, ECN_CE)
276         resp = self.px.xfer_protected(pkt)
277         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
278         self.assertEqual(resp[IP].tos, ECN_CE)
279
280     def test_inb_ipv6v6_ecn_inner_no_change(self):
281         pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT1, ECN_ECT0)
282         resp = self.px.xfer_protected(pkt)
283         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
284         self.assertEqual(resp[IPv6].tc, ECN_ECT0)
285
286         pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT0, ECN_ECT1)
287         resp = self.px.xfer_protected(pkt)
288         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
289         self.assertEqual(resp[IPv6].tc, ECN_ECT1)
290
291         pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT1, ECN_CE)
292         resp = self.px.xfer_protected(pkt)
293         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
294         self.assertEqual(resp[IPv6].tc, ECN_CE)
295
296     def test_inb_ipv4v6_ecn_inner_no_change(self):
297         pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT1, ECN_ECT0)
298         resp = self.px.xfer_protected(pkt)
299         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
300         self.assertEqual(resp[IPv6].tc, ECN_ECT0)
301
302         pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT0, ECN_ECT1)
303         resp = self.px.xfer_protected(pkt)
304         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
305         self.assertEqual(resp[IPv6].tc, ECN_ECT1)
306
307         pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT1, ECN_CE)
308         resp = self.px.xfer_protected(pkt)
309         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
310         self.assertEqual(resp[IPv6].tc, ECN_CE)
311
312     def test_inb_ipv6v4_ecn_inner_no_change(self):
313         pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT1, ECN_ECT0)
314         resp = self.px.xfer_protected(pkt)
315         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
316         self.assertEqual(resp[IP].tos, ECN_ECT0)
317
318         pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT0, ECN_ECT1)
319         resp = self.px.xfer_protected(pkt)
320         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
321         self.assertEqual(resp[IP].tos, ECN_ECT1)
322
323         pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT1, ECN_CE)
324         resp = self.px.xfer_protected(pkt)
325         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
326         self.assertEqual(resp[IP].tos, ECN_CE)
327
328 #Outter header CE, Inner header should be changed to CE
329     def test_inb_ipv4v4_ecn_inner_change(self):
330         pkt = self.gen_pkt_tun_ipv4v4(ECN_CE, ECN_ECT0)
331         resp = self.px.xfer_protected(pkt)
332         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
333         self.assertEqual(resp[IP].tos, ECN_CE)
334
335         pkt = self.gen_pkt_tun_ipv4v4(ECN_CE, ECN_ECT1)
336         resp = self.px.xfer_protected(pkt)
337         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
338         self.assertEqual(resp[IP].tos, ECN_CE)
339
340     def test_inb_ipv6v6_ecn_inner_change(self):
341         pkt = self.gen_pkt_tun_ipv6v6(ECN_CE, ECN_ECT0)
342         resp = self.px.xfer_protected(pkt)
343         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
344         self.assertEqual(resp[IPv6].tc, ECN_CE)
345
346         pkt = self.gen_pkt_tun_ipv6v6(ECN_CE, ECN_ECT1)
347         resp = self.px.xfer_protected(pkt)
348         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
349         self.assertEqual(resp[IPv6].tc, ECN_CE)
350
351     def test_inb_ipv4v6_ecn_inner_change(self):
352         pkt = self.gen_pkt_tun_ipv4v6(ECN_CE, ECN_ECT0)
353         resp = self.px.xfer_protected(pkt)
354         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
355         self.assertEqual(resp[IPv6].tc, ECN_CE)
356
357         pkt = self.gen_pkt_tun_ipv4v6(ECN_CE, ECN_ECT1)
358         resp = self.px.xfer_protected(pkt)
359         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
360         self.assertEqual(resp[IPv6].tc, ECN_CE)
361
362     def test_inb_ipv6v4_ecn_inner_change(self):
363         pkt = self.gen_pkt_tun_ipv6v4(ECN_CE, ECN_ECT0)
364         resp = self.px.xfer_protected(pkt)
365         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
366         self.assertEqual(resp[IP].tos, ECN_CE)
367
368         pkt = self.gen_pkt_tun_ipv6v4(ECN_CE, ECN_ECT1)
369         resp = self.px.xfer_protected(pkt)
370         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
371         self.assertEqual(resp[IP].tos, ECN_CE)
372
373 #RFC4301 5.1.2.1.5 Outer DS field should be copied from Inner DS field
374     def test_outb_ipv4v4_dscp(self):
375         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
376                                       DSCP_1)
377         resp = self.px.xfer_unprotected(pkt)
378         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
379         self.assertEqual(resp[ESP].spi, 5)
380         self.assertEqual(resp[IP].tos, DSCP_1)
381
382         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
383                                       DSCP_3F)
384         resp = self.px.xfer_unprotected(pkt)
385         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
386         self.assertEqual(resp[ESP].spi, 5)
387         self.assertEqual(resp[IP].tos, DSCP_3F)
388
389     def test_outb_ipv6v6_dscp(self):
390         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
391                                       DSCP_1)
392         resp = self.px.xfer_unprotected(pkt)
393         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
394         self.assertEqual(resp[ESP].spi, 7)
395         self.assertEqual(resp[IPv6].tc, DSCP_1)
396
397         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
398                                       DSCP_3F)
399         resp = self.px.xfer_unprotected(pkt)
400         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
401         self.assertEqual(resp[ESP].spi, 7)
402         self.assertEqual(resp[IPv6].tc, DSCP_3F)
403
404     def test_outb_ipv4v6_dscp(self):
405         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
406                                       DSCP_1)
407         resp = self.px.xfer_unprotected(pkt)
408         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
409         self.assertEqual(resp[ESP].spi, 9)
410         self.assertEqual(resp[IP].tos, DSCP_1)
411
412         pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
413                                       DSCP_3F)
414         resp = self.px.xfer_unprotected(pkt)
415         self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
416         self.assertEqual(resp[ESP].spi, 9)
417         self.assertEqual(resp[IP].tos, DSCP_3F)
418
419     def test_outb_ipv6v4_dscp(self):
420         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
421                                       DSCP_1)
422         resp = self.px.xfer_unprotected(pkt)
423         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
424         self.assertEqual(resp[ESP].spi, 11)
425         self.assertEqual(resp[IPv6].tc, DSCP_1)
426
427         pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
428                                       DSCP_3F)
429         resp = self.px.xfer_unprotected(pkt)
430         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
431         self.assertEqual(resp[ESP].spi, 11)
432         self.assertEqual(resp[IPv6].tc, DSCP_3F)
433
434 #RFC4301 5.1.2.1.5 Inner DS field should not be affected by Outer DS field
435     def test_inb_ipv4v4_dscp(self):
436         pkt = self.gen_pkt_tun_ipv4v4(DSCP_3F, DSCP_1)
437         resp = self.px.xfer_protected(pkt)
438         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
439         self.assertEqual(resp[IP].tos, DSCP_1)
440
441         pkt = self.gen_pkt_tun_ipv4v4(DSCP_1, DSCP_3F)
442         resp = self.px.xfer_protected(pkt)
443         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
444         self.assertEqual(resp[IP].tos, DSCP_3F)
445
446     def test_inb_ipv6v6_dscp(self):
447         pkt = self.gen_pkt_tun_ipv6v6(DSCP_3F, DSCP_1)
448         resp = self.px.xfer_protected(pkt)
449         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
450         self.assertEqual(resp[IPv6].tc, DSCP_1)
451
452         pkt = self.gen_pkt_tun_ipv6v6(DSCP_1, DSCP_3F)
453         resp = self.px.xfer_protected(pkt)
454         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
455         self.assertEqual(resp[IPv6].tc, DSCP_3F)
456
457     def test_inb_ipv4v6_dscp(self):
458         pkt = self.gen_pkt_tun_ipv4v6(DSCP_3F, DSCP_1)
459         resp = self.px.xfer_protected(pkt)
460         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
461         self.assertEqual(resp[IPv6].tc, DSCP_1)
462
463         pkt = self.gen_pkt_tun_ipv4v6(DSCP_1, DSCP_3F)
464         resp = self.px.xfer_protected(pkt)
465         self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
466         self.assertEqual(resp[IPv6].tc, DSCP_3F)
467
468     def test_inb_ipv6v4_dscp(self):
469         pkt = self.gen_pkt_tun_ipv6v4(DSCP_3F, DSCP_1)
470         resp = self.px.xfer_protected(pkt)
471         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
472         self.assertEqual(resp[IP].tos, DSCP_1)
473
474         pkt = self.gen_pkt_tun_ipv6v4(DSCP_1, DSCP_3F)
475         resp = self.px.xfer_protected(pkt)
476         self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
477         self.assertEqual(resp[IP].tos, DSCP_3F)
478
479 pkttest.pkttest()