From: Marcin Smoczynski Date: Mon, 24 Jun 2019 13:40:00 +0000 (+0200) Subject: examples/ipsec-secgw: add scapy based tests X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=9a18283a54d418da86fa4836321bf2806ee920da;p=dpdk.git examples/ipsec-secgw: add scapy based tests Add new unittest-like mechanism which uses scapy to craft custom packets and a set of assertions to check how ipsec-secgw example application is processing them. Python3 with scapy module is required by pkttest.sh to run test scripts. A new mechanism is used to test IPv6 transport mode traffic with header extensions (trs_ipv6opts.py). Fix incomplete test log problem by disabling buffering of ipsec-secgw standard output with stdbuf application. Signed-off-by: Marcin Smoczynski Acked-by: Konstantin Ananyev Acked-by: Akhil Goyal Tested-by: Konstantin Ananyev --- diff --git a/examples/ipsec-secgw/test/common_defs.sh b/examples/ipsec-secgw/test/common_defs.sh index 8dc574b50e..63ad5415dd 100644 --- a/examples/ipsec-secgw/test/common_defs.sh +++ b/examples/ipsec-secgw/test/common_defs.sh @@ -1,22 +1,10 @@ #! /bin/bash -#check that env vars are properly defined - -#check SGW_PATH -if [[ -z "${SGW_PATH}" || ! -x ${SGW_PATH} ]]; then - echo "SGW_PATH is invalid" - exit 127 -fi - #check ETH_DEV if [[ -z "${ETH_DEV}" ]]; then echo "ETH_DEV is invalid" exit 127 fi - -#setup SGW_LCORE -SGW_LCORE=${SGW_LCORE:-0} - #check that REMOTE_HOST is reachable ssh ${REMOTE_HOST} echo st=$? @@ -47,14 +35,6 @@ LOCAL_IPV6=fd12:3456:789a:0031:0000:0000:0000:0092 DPDK_PATH=${RTE_SDK:-${PWD}} DPDK_BUILD=${RTE_TARGET:-x86_64-native-linux-gcc} -SGW_OUT_FILE=./ipsec-secgw.out1 - -SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4 ${ETH_DEV}" -SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})" -SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\"" - -SGW_CFG_FILE=$(mktemp) - # configure local host/ifaces config_local_iface() { @@ -126,37 +106,7 @@ config6_iface() config6_remote_iface } -#start ipsec-secgw -secgw_start() -{ - SGW_EXEC_FILE=$(mktemp) - cat < ${SGW_EXEC_FILE} -${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \ ---vdev="net_tap0,mac=fixed" \ --- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \ -${SGW_OUT_FILE} 2>&1 & -p=\$! -echo \$p -EOF - - cat ${SGW_EXEC_FILE} - SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}` - - # wait till ipsec-secgw start properly - i=0 - st=1 - while [[ $i -ne 10 && st -ne 0 ]]; do - sleep 1 - ifconfig ${LOCAL_IFACE} - st=$? - let i++ - done -} - -#stop ipsec-secgw and cleanup -secgw_stop() -{ - kill ${SGW_PID} - rm -f ${SGW_EXEC_FILE} - rm -f ${SGW_CFG_FILE} -} +# secgw application parameters setup +SGW_PORT_CFG="--vdev=\"net_tap0,mac=fixed\" ${ETH_DEV}" +SGW_WAIT_DEV="${LOCAL_IFACE}" +. ${DIR}/common_defs_secgw.sh diff --git a/examples/ipsec-secgw/test/common_defs_secgw.sh b/examples/ipsec-secgw/test/common_defs_secgw.sh new file mode 100644 index 0000000000..a50c03cb3c --- /dev/null +++ b/examples/ipsec-secgw/test/common_defs_secgw.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +# check required parameters +SGW_REQ_VARS="SGW_PATH SGW_PORT_CFG SGW_WAIT_DEV" +for reqvar in ${SGW_REQ_VARS} +do + if [[ -z "${!reqvar}" ]]; then + echo "Required parameter ${reqvar} is empty" + exit 127 + fi +done + +# check if SGW_PATH point to an executable +if [[ ! -x ${SGW_PATH} ]]; then + echo "${SGW_PATH} is not executable" + exit 127 +fi + +# setup SGW_LCORE +SGW_LCORE=${SGW_LCORE:-0} + +# setup config and output filenames +SGW_OUT_FILE=./ipsec-secgw.out1 +SGW_CFG_FILE=$(mktemp) + +# setup secgw parameters +SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4" +SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})" +SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\"" + +# start ipsec-secgw +secgw_start() +{ + SGW_EXEC_FILE=$(mktemp) + cat < ${SGW_EXEC_FILE} +stdbuf -o0 ${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \ +${SGW_PORT_CFG} ${SGW_EAL_XPRM} \ +-- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \ +${SGW_OUT_FILE} 2>&1 & +p=\$! +echo \$p +EOF + + cat ${SGW_EXEC_FILE} + cat ${SGW_CFG_FILE} + SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}` + + # wait till ipsec-secgw start properly + i=0 + st=1 + while [[ $i -ne 10 && $st -ne 0 ]]; do + sleep 1 + ifconfig ${SGW_WAIT_DEV} + st=$? + let i++ + done +} + +# stop ipsec-secgw and cleanup +secgw_stop() +{ + kill ${SGW_PID} + rm -f ${SGW_EXEC_FILE} + rm -f ${SGW_CFG_FILE} +} diff --git a/examples/ipsec-secgw/test/pkttest.py b/examples/ipsec-secgw/test/pkttest.py new file mode 100755 index 0000000000..bcad2156b5 --- /dev/null +++ b/examples/ipsec-secgw/test/pkttest.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 + +import fcntl +import pkg_resources +import socket +import struct +import sys +import unittest + + +if sys.version_info < (3, 0): + print("Python3 is required to run this script") + sys.exit(1) + + +try: + from scapy.all import Ether +except ImportError: + print("Scapy module is required") + sys.exit(1) + + +PKTTEST_REQ = [ + "scapy==2.4.3rc1", +] + + +def assert_requirements(req): + """ + assert requirement is met + req can hold a string or a list of strings + """ + try: + pkg_resources.require(req) + except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e: + print("Requirement assertion: " + str(e)) + sys.exit(1) + + +TAP_UNPROTECTED = "dtap1" +TAP_PROTECTED = "dtap0" + + +class Interface(object): + ETH_P_ALL = 3 + MAX_PACKET_SIZE = 1280 + IOCTL_GET_INFO = 0x8927 + SOCKET_TIMEOUT = 0.5 + def __init__(self, ifname): + self.name = ifname + + # create and bind socket to specified interface + self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL)) + self.s.settimeout(Interface.SOCKET_TIMEOUT) + self.s.bind((self.name, 0, socket.PACKET_OTHERHOST)) + + # get interface MAC address + info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, struct.pack('256s', bytes(ifname[:15], encoding='ascii'))) + self.mac = ':'.join(['%02x' % i for i in info[18:24]]) + + def __del__(self): + self.s.close() + + def send_l3packet(self, pkt, mac): + e = Ether(src=self.mac, dst=mac) + self.send_packet(e/pkt) + + def send_packet(self, pkt): + self.send_bytes(bytes(pkt)) + + def send_bytes(self, bytedata): + self.s.send(bytedata) + + def recv_packet(self): + return Ether(self.recv_bytes()) + + def recv_bytes(self): + return self.s.recv(Interface.MAX_PACKET_SIZE) + + def get_mac(self): + return self.mac + + +class PacketXfer(object): + def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED): + self.protected_port = Interface(protected_iface) + self.unprotected_port = Interface(unprotected_iface) + + def send_to_protected_port(self, pkt, remote_mac=None): + if remote_mac is None: + remote_mac = self.unprotected_port.get_mac() + self.protected_port.send_l3packet(pkt, remote_mac) + + def send_to_unprotected_port(self, pkt, remote_mac=None): + if remote_mac is None: + remote_mac = self.protected_port.get_mac() + self.unprotected_port.send_l3packet(pkt, remote_mac) + + def xfer_unprotected(self, pkt): + self.send_to_unprotected_port(pkt) + return self.protected_port.recv_packet() + + def xfer_protected(self, pkt): + self.send_to_protected_port(pkt) + return self.unprotected_port.recv_packet() + + +def pkttest(): + if len(sys.argv) == 1: + sys.exit(unittest.main(verbosity=2)) + elif len(sys.argv) == 2: + if sys.argv[1] == "config": + module = __import__('__main__') + try: + print(module.config()) + except AttributeError: + sys.stderr.write("Cannot find \"config()\" in a test") + sys.exit(1) + else: + sys.exit(1) + + +if __name__ == "__main__": + if len(sys.argv) == 2 and sys.argv[1] == "check_reqs": + assert_requirements(PKTTEST_REQ) + else: + print("Usage: " + sys.argv[0] + " check_reqs") diff --git a/examples/ipsec-secgw/test/pkttest.sh b/examples/ipsec-secgw/test/pkttest.sh new file mode 100755 index 0000000000..04cd96b2e8 --- /dev/null +++ b/examples/ipsec-secgw/test/pkttest.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +DIR=$(dirname $0) + +if [ $(id -u) -ne 0 ]; then + echo "Run as root" + exit 1 +fi + +# check python requirements +python3 ${DIR}/pkttest.py check_reqs +if [ $? -ne 0 ]; then + echo "Requirements for Python not met, exiting" + exit 1 +fi + +# secgw application parameters setup +CRYPTO_DEV="--vdev=crypto_null0" +SGW_PORT_CFG="--vdev=net_tap0,mac=fixed --vdev=net_tap1,mac=fixed" +SGW_EAL_XPRM="--no-pci" +SGW_CMD_XPRM=-l +SGW_WAIT_DEV="dtap0" +. ${DIR}/common_defs_secgw.sh + +echo "Running tests: $*" +for testcase in $* +do + # check test file presence + testfile="${DIR}/${testcase}.py" + if [ ! -f ${testfile} ]; then + echo "Invalid test ${testcase}" + continue + fi + + # prepare test config + python3 ${testfile} config > ${SGW_CFG_FILE} + if [ $? -ne 0 ]; then + rm -f ${SGW_CFG_FILE} + echo "Cannot get secgw configuration for test ${testcase}" + exit 1 + fi + + # start the application + secgw_start + + # setup interfaces + ifconfig dtap0 up + ifconfig dtap1 up + + # run the test + echo "Running test case: ${testcase}" + python3 ${testfile} + st=$? + + # stop the application + secgw_stop + + # report test result and exit on failure + if [ $st -eq 0 ]; then + echo "Test case ${testcase} succeeded" + else + echo "Test case ${testcase} failed!" + exit $st + fi +done diff --git a/examples/ipsec-secgw/test/run_test.sh b/examples/ipsec-secgw/test/run_test.sh old mode 100644 new mode 100755 index 3a1a7d4b4c..4969effdb3 --- a/examples/ipsec-secgw/test/run_test.sh +++ b/examples/ipsec-secgw/test/run_test.sh @@ -17,6 +17,17 @@ # naming convention: # 'old' means that ipsec-secgw will run in legacy (non-librte_ipsec mode) # 'tun/trs' refer to tunnel/transport mode respectively + +usage() +{ + echo "Usage:" + echo -e "\t$0 -[46p]" + echo -e "\t\t-4 Perform Linux IPv4 network tests" + echo -e "\t\t-6 Perform Linux IPv6 network tests" + echo -e "\t\t-p Perform packet validation tests" + echo -e "\t\t-h Display this help" +} + LINUX_TEST="tun_aescbc_sha1 \ tun_aescbc_sha1_esn \ tun_aescbc_sha1_esn_atom \ @@ -50,47 +61,82 @@ trs_3descbc_sha1_old \ trs_3descbc_sha1_esn \ trs_3descbc_sha1_esn_atom" -DIR=`dirname $0` +PKT_TESTS="trs_ipv6opts" + +DIR=$(dirname $0) # get input options -st=0 run4=0 run6=0 -while [[ ${st} -eq 0 ]]; do - getopts ":46" opt - st=$? - if [[ "${opt}" == "4" ]]; then - run4=1 - elif [[ "${opt}" == "6" ]]; then - run6=1 - fi +runpkt=0 +while getopts ":46ph" opt +do + case $opt in + 4) + run4=1 + ;; + 6) + run6=1 + ;; + p) + runpkt=1 + ;; + h) + usage + exit 0 + ;; + ?) + echo "Invalid option" + usage + exit 127 + ;; + esac done -if [[ ${run4} -eq 0 && ${run6} -eq 0 ]]; then +# no test suite has been selected +if [[ ${run4} -eq 0 && ${run6} -eq 0 && ${runpkt} -eq 0 ]]; then + usage exit 127 fi -for i in ${LINUX_TEST}; do - - echo "starting test ${i}" +# perform packet processing validation tests +st=0 +if [ $runpkt -eq 1 ]; then + echo "Performing packet validation tests" + /bin/bash ${DIR}/pkttest.sh ${PKT_TESTS} + st=$? - st4=0 - if [[ ${run4} -ne 0 ]]; then - /bin/bash ${DIR}/linux_test4.sh ${i} - st4=$? - echo "test4 ${i} finished with status ${st4}" + echo "pkttests finished with status ${st}" + if [[ ${st} -ne 0 ]]; then + echo "ERROR pkttests FAILED" + exit ${st} fi +fi - st6=0 - if [[ ${run6} -ne 0 ]]; then - /bin/bash ${DIR}/linux_test6.sh ${i} - st6=$? - echo "test6 ${i} finished with status ${st6}" - fi +# perform network tests +if [[ ${run4} -eq 1 || ${run6} -eq 1 ]]; then + for i in ${LINUX_TEST}; do - let "st = st4 + st6" - if [[ $st -ne 0 ]]; then - echo "ERROR test ${i} FAILED" - exit $st - fi -done + echo "starting test ${i}" + + st4=0 + if [[ ${run4} -ne 0 ]]; then + /bin/bash ${DIR}/linux_test4.sh ${i} + st4=$? + echo "test4 ${i} finished with status ${st4}" + fi + + st6=0 + if [[ ${run6} -ne 0 ]]; then + /bin/bash ${DIR}/linux_test6.sh ${i} + st6=$? + echo "test6 ${i} finished with status ${st6}" + fi + + let "st = st4 + st6" + if [[ $st -ne 0 ]]; then + echo "ERROR test ${i} FAILED" + exit $st + fi + done +fi diff --git a/examples/ipsec-secgw/test/trs_ipv6opts.py b/examples/ipsec-secgw/test/trs_ipv6opts.py new file mode 100755 index 0000000000..167c896178 --- /dev/null +++ b/examples/ipsec-secgw/test/trs_ipv6opts.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 + +from scapy.all import * +import unittest +import pkttest + + +SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001" +DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001" +SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64" +DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64" + + +def config(): + return """ +sp ipv6 out esp protect 5 pri 1 \\ +src {0} \\ +dst {1} \\ +sport 0:65535 dport 0:65535 + +sp ipv6 in esp protect 6 pri 1 \\ +src {1} \\ +dst {0} \\ +sport 0:65535 dport 0:65535 + +sa out 5 cipher_algo null auth_algo null mode transport +sa in 6 cipher_algo null auth_algo null mode transport + +rt ipv6 dst {0} port 1 +rt ipv6 dst {1} port 0 +""".format(SRC_NET, DST_NET) + + +class TestTransportWithIPv6Ext(unittest.TestCase): + # There is a bug in the IPsec Scapy implementation + # which causes invalid packet reconstruction after + # successful decryption. This method is a workaround. + @staticmethod + def decrypt(pkt, sa): + esp = pkt[ESP] + + # decrypt dummy packet with no extensions + d = sa.decrypt(IPv6()/esp) + + # fix 'next header' in the preceding header of the original + # packet and remove ESP + pkt[ESP].underlayer.nh = d[IPv6].nh + pkt[ESP].underlayer.remove_payload() + + # combine L3 header with decrypted payload + npkt = pkt/d[IPv6].payload + + # fix length + npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload) + + return npkt + + def setUp(self): + self.px = pkttest.PacketXfer() + self.outb_sa = SecurityAssociation(ESP, spi=5) + self.inb_sa = SecurityAssociation(ESP, spi=6) + + def test_outb_ipv6_noopt(self): + pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR) + pkt /= UDP(sport=123,dport=456)/Raw(load="abc") + + # send and check response + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) + self.assertEqual(resp[ESP].spi, 5) + + # decrypt response, check packet after decryption + d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa) + self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP) + self.assertEqual(d[UDP].sport, 123) + self.assertEqual(d[UDP].dport, 456) + self.assertEqual(bytes(d[UDP].payload), b'abc') + + def test_outb_ipv6_opt(self): + hoptions = [] + hoptions.append(RouterAlert(value=2)) + hoptions.append(Jumbo(jumboplen=5000)) + hoptions.append(Pad1()) + + doptions = [] + doptions.append(HAO(hoa="1234::4321")) + + pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR) + pkt /= IPv6ExtHdrHopByHop(options=hoptions) + pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"]) + pkt /= IPv6ExtHdrDestOpt(options=doptions) + pkt /= UDP(sport=123,dport=456)/Raw(load="abc") + + # send and check response + resp = self.px.xfer_unprotected(pkt) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS) + + # check extensions + self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING) + self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS) + self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP) + + # check ESP + self.assertEqual(resp[ESP].spi, 5) + + # decrypt response, check packet after decryption + d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa) + self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS) + self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING) + self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS) + self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP) + + # check UDP + self.assertEqual(d[UDP].sport, 123) + self.assertEqual(d[UDP].dport, 456) + self.assertEqual(bytes(d[UDP].payload), b'abc') + + def test_inb_ipv6_noopt(self): + # encrypt and send raw UDP packet + pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR) + pkt /= UDP(sport=123,dport=456)/Raw(load="abc") + e = self.inb_sa.encrypt(pkt) + + # send and check response + resp = self.px.xfer_protected(e) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) + + # check UDP packet + self.assertEqual(resp[UDP].sport, 123) + self.assertEqual(resp[UDP].dport, 456) + self.assertEqual(bytes(resp[UDP].payload), b'abc') + + def test_inb_ipv6_opt(self): + hoptions = [] + hoptions.append(RouterAlert(value=2)) + hoptions.append(Jumbo(jumboplen=5000)) + hoptions.append(Pad1()) + + doptions = [] + doptions.append(HAO(hoa="1234::4321")) + + # prepare packet with options + pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR) + pkt /= IPv6ExtHdrHopByHop(options=hoptions) + pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"]) + pkt /= IPv6ExtHdrDestOpt(options=doptions) + pkt /= UDP(sport=123,dport=456)/Raw(load="abc") + e = self.inb_sa.encrypt(pkt) + + # self encrypted packet and check response + resp = self.px.xfer_protected(e) + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS) + self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING) + self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS) + self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP) + + # check UDP + self.assertEqual(resp[UDP].sport, 123) + self.assertEqual(resp[UDP].dport, 456) + self.assertEqual(bytes(resp[UDP].payload), b'abc') + + def test_inb_ipv6_frag(self): + # prepare ESP payload + pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc") + e = self.inb_sa.encrypt(pkt) + + # craft and send inbound packet + e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload + resp = self.px.xfer_protected(e) + + # check response + self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT) + self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP) + + # check UDP + self.assertEqual(resp[UDP].sport, 123) + self.assertEqual(resp[UDP].dport, 456) + self.assertEqual(bytes(resp[UDP].payload), b'abc') + + +pkttest.pkttest()