examples/ipsec-secgw: add scapy based tests
authorMarcin Smoczynski <marcinx.smoczynski@intel.com>
Mon, 24 Jun 2019 13:40:00 +0000 (15:40 +0200)
committerAkhil Goyal <akhil.goyal@nxp.com>
Fri, 5 Jul 2019 13:28:14 +0000 (15:28 +0200)
Add new unittest-like mechanism which uses scapy to craft custom
packets and a set of assertions to check how ipsec-secgw example
application is processing them. Python3 with scapy module is
required by pkttest.sh to run test scripts.

A new mechanism is used to test IPv6 transport mode traffic with
header extensions (trs_ipv6opts.py).

Fix incomplete test log problem by disabling buffering of ipsec-secgw
standard output with stdbuf application.

Signed-off-by: Marcin Smoczynski <marcinx.smoczynski@intel.com>
Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Acked-by: Akhil Goyal <akhil.goyal@nxp.com>
Tested-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
examples/ipsec-secgw/test/common_defs.sh
examples/ipsec-secgw/test/common_defs_secgw.sh [new file with mode: 0644]
examples/ipsec-secgw/test/pkttest.py [new file with mode: 0755]
examples/ipsec-secgw/test/pkttest.sh [new file with mode: 0755]
examples/ipsec-secgw/test/run_test.sh [changed mode: 0644->0755]
examples/ipsec-secgw/test/trs_ipv6opts.py [new file with mode: 0755]

index 8dc574b..63ad541 100644 (file)
@@ -1,22 +1,10 @@
 #! /bin/bash
 
-#check that env vars are properly defined
-
-#check SGW_PATH
-if [[ -z "${SGW_PATH}" || ! -x ${SGW_PATH} ]]; then
-       echo "SGW_PATH is invalid"
-       exit 127
-fi
-
 #check ETH_DEV
 if [[ -z "${ETH_DEV}" ]]; then
        echo "ETH_DEV is invalid"
        exit 127
 fi
-
-#setup SGW_LCORE
-SGW_LCORE=${SGW_LCORE:-0}
-
 #check that REMOTE_HOST is reachable
 ssh ${REMOTE_HOST} echo
 st=$?
@@ -47,14 +35,6 @@ LOCAL_IPV6=fd12:3456:789a:0031:0000:0000:0000:0092
 DPDK_PATH=${RTE_SDK:-${PWD}}
 DPDK_BUILD=${RTE_TARGET:-x86_64-native-linux-gcc}
 
-SGW_OUT_FILE=./ipsec-secgw.out1
-
-SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4 ${ETH_DEV}"
-SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})"
-SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\""
-
-SGW_CFG_FILE=$(mktemp)
-
 # configure local host/ifaces
 config_local_iface()
 {
@@ -126,37 +106,7 @@ config6_iface()
        config6_remote_iface
 }
 
-#start ipsec-secgw
-secgw_start()
-{
-       SGW_EXEC_FILE=$(mktemp)
-       cat <<EOF > ${SGW_EXEC_FILE}
-${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \
---vdev="net_tap0,mac=fixed" \
--- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \
-${SGW_OUT_FILE} 2>&1 &
-p=\$!
-echo \$p
-EOF
-
-       cat ${SGW_EXEC_FILE}
-       SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}`
-
-       # wait till ipsec-secgw start properly
-       i=0
-       st=1
-       while [[ $i -ne 10 && st -ne 0 ]]; do
-               sleep 1
-               ifconfig ${LOCAL_IFACE}
-               st=$?
-               let i++
-       done
-}
-
-#stop ipsec-secgw and cleanup
-secgw_stop()
-{
-       kill ${SGW_PID}
-       rm -f ${SGW_EXEC_FILE}
-       rm -f ${SGW_CFG_FILE}
-}
+# secgw application parameters setup
+SGW_PORT_CFG="--vdev=\"net_tap0,mac=fixed\" ${ETH_DEV}"
+SGW_WAIT_DEV="${LOCAL_IFACE}"
+. ${DIR}/common_defs_secgw.sh
diff --git a/examples/ipsec-secgw/test/common_defs_secgw.sh b/examples/ipsec-secgw/test/common_defs_secgw.sh
new file mode 100644 (file)
index 0000000..a50c03c
--- /dev/null
@@ -0,0 +1,65 @@
+#!/bin/bash
+
+# check required parameters
+SGW_REQ_VARS="SGW_PATH SGW_PORT_CFG SGW_WAIT_DEV"
+for reqvar in ${SGW_REQ_VARS}
+do
+       if [[ -z "${!reqvar}" ]]; then
+               echo "Required parameter ${reqvar} is empty"
+               exit 127
+       fi
+done
+
+# check if SGW_PATH point to an executable
+if [[ ! -x ${SGW_PATH} ]]; then
+       echo "${SGW_PATH} is not executable"
+       exit 127
+fi
+
+# setup SGW_LCORE
+SGW_LCORE=${SGW_LCORE:-0}
+
+# setup config and output filenames
+SGW_OUT_FILE=./ipsec-secgw.out1
+SGW_CFG_FILE=$(mktemp)
+
+# setup secgw parameters
+SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4"
+SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})"
+SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\""
+
+# start ipsec-secgw
+secgw_start()
+{
+       SGW_EXEC_FILE=$(mktemp)
+       cat <<EOF > ${SGW_EXEC_FILE}
+stdbuf -o0 ${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \
+${SGW_PORT_CFG} ${SGW_EAL_XPRM} \
+-- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \
+${SGW_OUT_FILE} 2>&1 &
+p=\$!
+echo \$p
+EOF
+
+       cat ${SGW_EXEC_FILE}
+       cat ${SGW_CFG_FILE}
+       SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}`
+
+       # wait till ipsec-secgw start properly
+       i=0
+       st=1
+       while [[ $i -ne 10 && $st -ne 0 ]]; do
+               sleep 1
+               ifconfig ${SGW_WAIT_DEV}
+               st=$?
+               let i++
+       done
+}
+
+# stop ipsec-secgw and cleanup
+secgw_stop()
+{
+       kill ${SGW_PID}
+       rm -f ${SGW_EXEC_FILE}
+       rm -f ${SGW_CFG_FILE}
+}
diff --git a/examples/ipsec-secgw/test/pkttest.py b/examples/ipsec-secgw/test/pkttest.py
new file mode 100755 (executable)
index 0000000..bcad215
--- /dev/null
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3
+
+import fcntl
+import pkg_resources
+import socket
+import struct
+import sys
+import unittest
+
+
+if sys.version_info < (3, 0):
+    print("Python3 is required to run this script")
+    sys.exit(1)
+
+
+try:
+    from scapy.all import Ether
+except ImportError:
+    print("Scapy module is required")
+    sys.exit(1)
+
+
+PKTTEST_REQ = [
+    "scapy==2.4.3rc1",
+]
+
+
+def assert_requirements(req):
+    """
+    assert requirement is met
+    req can hold a string or a list of strings
+    """
+    try:
+        pkg_resources.require(req)
+    except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e:
+        print("Requirement assertion: " + str(e))
+        sys.exit(1)
+
+
+TAP_UNPROTECTED = "dtap1"
+TAP_PROTECTED = "dtap0"
+
+
+class Interface(object):
+    ETH_P_ALL = 3
+    MAX_PACKET_SIZE = 1280
+    IOCTL_GET_INFO = 0x8927
+    SOCKET_TIMEOUT = 0.5
+    def __init__(self, ifname):
+        self.name = ifname
+
+        # create and bind socket to specified interface
+        self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL))
+        self.s.settimeout(Interface.SOCKET_TIMEOUT)
+        self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))
+
+        # get interface MAC address
+        info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO,  struct.pack('256s', bytes(ifname[:15], encoding='ascii')))
+        self.mac = ':'.join(['%02x' % i for i in info[18:24]])
+
+    def __del__(self):
+        self.s.close()
+
+    def send_l3packet(self, pkt, mac):
+        e = Ether(src=self.mac, dst=mac)
+        self.send_packet(e/pkt)
+
+    def send_packet(self, pkt):
+        self.send_bytes(bytes(pkt))
+
+    def send_bytes(self, bytedata):
+        self.s.send(bytedata)
+
+    def recv_packet(self):
+        return Ether(self.recv_bytes())
+
+    def recv_bytes(self):
+        return self.s.recv(Interface.MAX_PACKET_SIZE)
+
+    def get_mac(self):
+        return self.mac
+
+
+class PacketXfer(object):
+    def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
+        self.protected_port = Interface(protected_iface)
+        self.unprotected_port = Interface(unprotected_iface)
+
+    def send_to_protected_port(self, pkt, remote_mac=None):
+        if remote_mac is None:
+            remote_mac = self.unprotected_port.get_mac()
+        self.protected_port.send_l3packet(pkt, remote_mac)
+
+    def send_to_unprotected_port(self, pkt, remote_mac=None):
+        if remote_mac is None:
+            remote_mac = self.protected_port.get_mac()
+        self.unprotected_port.send_l3packet(pkt, remote_mac)
+
+    def xfer_unprotected(self, pkt):
+        self.send_to_unprotected_port(pkt)
+        return self.protected_port.recv_packet()
+
+    def xfer_protected(self, pkt):
+        self.send_to_protected_port(pkt)
+        return self.unprotected_port.recv_packet()
+
+
+def pkttest():
+    if len(sys.argv) == 1:
+        sys.exit(unittest.main(verbosity=2))
+    elif len(sys.argv) == 2:
+        if sys.argv[1] == "config":
+            module = __import__('__main__')
+            try:
+                print(module.config())
+            except AttributeError:
+                sys.stderr.write("Cannot find \"config()\" in a test")
+                sys.exit(1)
+    else:
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    if len(sys.argv) == 2 and sys.argv[1] == "check_reqs":
+        assert_requirements(PKTTEST_REQ)
+    else:
+        print("Usage: " + sys.argv[0] + " check_reqs")
diff --git a/examples/ipsec-secgw/test/pkttest.sh b/examples/ipsec-secgw/test/pkttest.sh
new file mode 100755 (executable)
index 0000000..04cd96b
--- /dev/null
@@ -0,0 +1,65 @@
+#!/bin/bash
+
+DIR=$(dirname $0)
+
+if [ $(id -u) -ne 0 ]; then
+       echo "Run as root"
+       exit 1
+fi
+
+# check python requirements
+python3 ${DIR}/pkttest.py check_reqs
+if [ $? -ne 0 ]; then
+       echo "Requirements for Python not met, exiting"
+       exit 1
+fi
+
+# secgw application parameters setup
+CRYPTO_DEV="--vdev=crypto_null0"
+SGW_PORT_CFG="--vdev=net_tap0,mac=fixed --vdev=net_tap1,mac=fixed"
+SGW_EAL_XPRM="--no-pci"
+SGW_CMD_XPRM=-l
+SGW_WAIT_DEV="dtap0"
+. ${DIR}/common_defs_secgw.sh
+
+echo "Running tests: $*"
+for testcase in $*
+do
+       # check test file presence
+       testfile="${DIR}/${testcase}.py"
+       if [ ! -f ${testfile} ]; then
+               echo "Invalid test ${testcase}"
+               continue
+       fi
+
+       # prepare test config
+       python3 ${testfile} config > ${SGW_CFG_FILE}
+       if [ $? -ne 0 ]; then
+               rm -f ${SGW_CFG_FILE}
+               echo "Cannot get secgw configuration for test ${testcase}"
+               exit 1
+       fi
+
+       # start the application
+       secgw_start
+
+       # setup interfaces
+       ifconfig dtap0 up
+       ifconfig dtap1 up
+
+       # run the test
+       echo "Running test case: ${testcase}"
+       python3 ${testfile}
+       st=$?
+
+       # stop the application
+       secgw_stop
+
+       # report test result and exit on failure
+       if [ $st -eq 0 ]; then
+               echo "Test case ${testcase} succeeded"
+       else
+               echo "Test case ${testcase} failed!"
+               exit $st
+       fi
+done
old mode 100644 (file)
new mode 100755 (executable)
index 3a1a7d4..4969eff
 # naming convention:
 # 'old' means that ipsec-secgw will run in legacy (non-librte_ipsec mode)
 # 'tun/trs' refer to tunnel/transport mode respectively
+
+usage()
+{
+       echo "Usage:"
+       echo -e "\t$0 -[46p]"
+       echo -e "\t\t-4 Perform Linux IPv4 network tests"
+       echo -e "\t\t-6 Perform Linux IPv6 network tests"
+       echo -e "\t\t-p Perform packet validation tests"
+       echo -e "\t\t-h Display this help"
+}
+
 LINUX_TEST="tun_aescbc_sha1 \
 tun_aescbc_sha1_esn \
 tun_aescbc_sha1_esn_atom \
@@ -50,47 +61,82 @@ trs_3descbc_sha1_old \
 trs_3descbc_sha1_esn \
 trs_3descbc_sha1_esn_atom"
 
-DIR=`dirname $0`
+PKT_TESTS="trs_ipv6opts"
+
+DIR=$(dirname $0)
 
 # get input options
-st=0
 run4=0
 run6=0
-while [[ ${st} -eq 0 ]]; do
-       getopts ":46" opt
-       st=$?
-       if [[ "${opt}" == "4" ]]; then
-               run4=1
-       elif [[ "${opt}" == "6" ]]; then
-               run6=1
-       fi
+runpkt=0
+while getopts ":46ph" opt
+do
+       case $opt in
+               4)
+                       run4=1
+                       ;;
+               6)
+                       run6=1
+                       ;;
+               p)
+                       runpkt=1
+                       ;;
+               h)
+                       usage
+                       exit 0
+                       ;;
+               ?)
+                       echo "Invalid option"
+                       usage
+                       exit 127
+                       ;;
+       esac
 done
 
-if [[ ${run4} -eq 0 && ${run6} -eq 0 ]]; then
+# no test suite has been selected
+if [[ ${run4} -eq 0 && ${run6} -eq 0 && ${runpkt} -eq 0 ]]; then
+       usage
        exit 127
 fi
 
-for i in ${LINUX_TEST}; do
-
-       echo "starting test ${i}"
+# perform packet processing validation tests
+st=0
+if [ $runpkt -eq 1 ]; then
+       echo "Performing packet validation tests"
+       /bin/bash ${DIR}/pkttest.sh ${PKT_TESTS}
+       st=$?
 
-       st4=0
-       if [[ ${run4} -ne 0 ]]; then
-               /bin/bash ${DIR}/linux_test4.sh ${i}
-               st4=$?
-               echo "test4 ${i} finished with status ${st4}"
+       echo "pkttests finished with status ${st}"
+       if [[ ${st} -ne 0 ]]; then
+               echo "ERROR pkttests FAILED"
+               exit ${st}
        fi
+fi
 
-       st6=0
-       if [[ ${run6} -ne 0 ]]; then
-               /bin/bash ${DIR}/linux_test6.sh ${i}
-               st6=$?
-               echo "test6 ${i} finished with status ${st6}"
-       fi
+# perform network tests
+if [[ ${run4} -eq 1 || ${run6} -eq 1 ]]; then
+       for i in ${LINUX_TEST}; do
 
-       let "st = st4 + st6"
-       if [[ $st -ne 0 ]]; then
-               echo "ERROR test ${i} FAILED"
-               exit $st
-       fi
-done
+               echo "starting test ${i}"
+
+               st4=0
+               if [[ ${run4} -ne 0 ]]; then
+                       /bin/bash ${DIR}/linux_test4.sh ${i}
+                       st4=$?
+                       echo "test4 ${i} finished with status ${st4}"
+               fi
+
+               st6=0
+               if [[ ${run6} -ne 0 ]]; then
+                       /bin/bash ${DIR}/linux_test6.sh ${i}
+                       st6=$?
+                       echo "test6 ${i} finished with status ${st6}"
+               fi
+
+               let "st = st4 + st6"
+               if [[ $st -ne 0 ]]; then
+                       echo "ERROR test ${i} FAILED"
+                       exit $st
+               fi
+       done
+fi
diff --git a/examples/ipsec-secgw/test/trs_ipv6opts.py b/examples/ipsec-secgw/test/trs_ipv6opts.py
new file mode 100755 (executable)
index 0000000..167c896
--- /dev/null
@@ -0,0 +1,181 @@
+#!/usr/bin/env python3
+
+from scapy.all import *
+import unittest
+import pkttest
+
+
+SRC_ADDR  = "1111:0000:0000:0000:0000:0000:0000:0001"
+DST_ADDR  = "2222:0000:0000:0000:0000:0000:0000:0001"
+SRC_NET   = "1111:0000:0000:0000:0000:0000:0000:0000/64"
+DST_NET   = "2222:0000:0000:0000:0000:0000:0000:0000/64"
+
+
+def config():
+    return """
+sp ipv6 out esp protect 5 pri 1 \\
+src {0} \\
+dst {1} \\
+sport 0:65535 dport 0:65535
+
+sp ipv6 in esp protect 6 pri 1 \\
+src {1} \\
+dst {0} \\
+sport 0:65535 dport 0:65535
+
+sa out 5 cipher_algo null auth_algo null mode transport
+sa in 6 cipher_algo null auth_algo null mode transport
+
+rt ipv6 dst {0} port 1
+rt ipv6 dst {1} port 0
+""".format(SRC_NET, DST_NET)
+
+
+class TestTransportWithIPv6Ext(unittest.TestCase):
+    # There is a bug in the IPsec Scapy implementation
+    # which causes invalid packet reconstruction after
+    # successful decryption. This method is a workaround.
+    @staticmethod
+    def decrypt(pkt, sa):
+        esp = pkt[ESP]
+
+        # decrypt dummy packet with no extensions
+        d = sa.decrypt(IPv6()/esp)
+
+        # fix 'next header' in the preceding header of the original
+        # packet and remove ESP
+        pkt[ESP].underlayer.nh = d[IPv6].nh
+        pkt[ESP].underlayer.remove_payload()
+
+        # combine L3 header with decrypted payload
+        npkt = pkt/d[IPv6].payload
+
+        # fix length
+        npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
+
+        return npkt
+
+    def setUp(self):
+        self.px = pkttest.PacketXfer()
+        self.outb_sa = SecurityAssociation(ESP, spi=5)
+        self.inb_sa = SecurityAssociation(ESP, spi=6)
+
+    def test_outb_ipv6_noopt(self):
+        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+
+        # send and check response
+        resp = self.px.xfer_unprotected(pkt)
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
+        self.assertEqual(resp[ESP].spi, 5)
+
+        # decrypt response, check packet after decryption
+        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
+        self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
+        self.assertEqual(d[UDP].sport, 123)
+        self.assertEqual(d[UDP].dport, 456)
+        self.assertEqual(bytes(d[UDP].payload), b'abc')
+
+    def test_outb_ipv6_opt(self):
+        hoptions = []
+        hoptions.append(RouterAlert(value=2))
+        hoptions.append(Jumbo(jumboplen=5000))
+        hoptions.append(Pad1())
+
+        doptions = []
+        doptions.append(HAO(hoa="1234::4321"))
+
+        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
+        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
+        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
+        pkt /= IPv6ExtHdrDestOpt(options=doptions)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+
+        # send and check response
+        resp = self.px.xfer_unprotected(pkt)
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
+
+        # check extensions
+        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)
+
+        # check ESP
+        self.assertEqual(resp[ESP].spi, 5)
+
+        # decrypt response, check packet after decryption
+        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
+        self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
+        self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+        self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+        self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
+
+        # check UDP
+        self.assertEqual(d[UDP].sport, 123)
+        self.assertEqual(d[UDP].dport, 456)
+        self.assertEqual(bytes(d[UDP].payload), b'abc')
+
+    def test_inb_ipv6_noopt(self):
+        # encrypt and send raw UDP packet
+        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+        e = self.inb_sa.encrypt(pkt)
+
+        # send and check response
+        resp = self.px.xfer_protected(e)
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
+
+        # check UDP packet
+        self.assertEqual(resp[UDP].sport, 123)
+        self.assertEqual(resp[UDP].dport, 456)
+        self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+    def test_inb_ipv6_opt(self):
+        hoptions = []
+        hoptions.append(RouterAlert(value=2))
+        hoptions.append(Jumbo(jumboplen=5000))
+        hoptions.append(Pad1())
+
+        doptions = []
+        doptions.append(HAO(hoa="1234::4321"))
+
+        # prepare packet with options
+        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
+        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
+        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
+        pkt /= IPv6ExtHdrDestOpt(options=doptions)
+        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+        e = self.inb_sa.encrypt(pkt)
+
+        # self encrypted packet and check response
+        resp = self.px.xfer_protected(e)
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
+        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
+
+        # check UDP
+        self.assertEqual(resp[UDP].sport, 123)
+        self.assertEqual(resp[UDP].dport, 456)
+        self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+    def test_inb_ipv6_frag(self):
+        # prepare ESP payload
+        pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc")
+        e = self.inb_sa.encrypt(pkt)
+
+        # craft and send inbound packet
+        e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
+        resp = self.px.xfer_protected(e)
+
+        # check response
+        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
+        self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
+
+        # check UDP
+        self.assertEqual(resp[UDP].sport, 123)
+        self.assertEqual(resp[UDP].dport, 456)
+        self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+
+pkttest.pkttest()