#! /bin/bash
-#check that env vars are properly defined
-
-#check SGW_PATH
-if [[ -z "${SGW_PATH}" || ! -x ${SGW_PATH} ]]; then
- echo "SGW_PATH is invalid"
- exit 127
-fi
-
#check ETH_DEV
if [[ -z "${ETH_DEV}" ]]; then
echo "ETH_DEV is invalid"
exit 127
fi
-
-#setup SGW_LCORE
-SGW_LCORE=${SGW_LCORE:-0}
-
#check that REMOTE_HOST is reachable
ssh ${REMOTE_HOST} echo
st=$?
DPDK_PATH=${RTE_SDK:-${PWD}}
DPDK_BUILD=${RTE_TARGET:-x86_64-native-linux-gcc}
-SGW_OUT_FILE=./ipsec-secgw.out1
-
-SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4 ${ETH_DEV}"
-SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})"
-SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\""
-
-SGW_CFG_FILE=$(mktemp)
-
# configure local host/ifaces
config_local_iface()
{
config6_remote_iface
}
-#start ipsec-secgw
-secgw_start()
-{
- SGW_EXEC_FILE=$(mktemp)
- cat <<EOF > ${SGW_EXEC_FILE}
-${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \
---vdev="net_tap0,mac=fixed" \
--- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \
-${SGW_OUT_FILE} 2>&1 &
-p=\$!
-echo \$p
-EOF
-
- cat ${SGW_EXEC_FILE}
- SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}`
-
- # wait till ipsec-secgw start properly
- i=0
- st=1
- while [[ $i -ne 10 && st -ne 0 ]]; do
- sleep 1
- ifconfig ${LOCAL_IFACE}
- st=$?
- let i++
- done
-}
-
-#stop ipsec-secgw and cleanup
-secgw_stop()
-{
- kill ${SGW_PID}
- rm -f ${SGW_EXEC_FILE}
- rm -f ${SGW_CFG_FILE}
-}
+# secgw application parameters setup
+SGW_PORT_CFG="--vdev=\"net_tap0,mac=fixed\" ${ETH_DEV}"
+SGW_WAIT_DEV="${LOCAL_IFACE}"
+. ${DIR}/common_defs_secgw.sh
--- /dev/null
+#!/bin/bash
+
+# check required parameters
+SGW_REQ_VARS="SGW_PATH SGW_PORT_CFG SGW_WAIT_DEV"
+for reqvar in ${SGW_REQ_VARS}
+do
+ if [[ -z "${!reqvar}" ]]; then
+ echo "Required parameter ${reqvar} is empty"
+ exit 127
+ fi
+done
+
+# check if SGW_PATH point to an executable
+if [[ ! -x ${SGW_PATH} ]]; then
+ echo "${SGW_PATH} is not executable"
+ exit 127
+fi
+
+# setup SGW_LCORE
+SGW_LCORE=${SGW_LCORE:-0}
+
+# setup config and output filenames
+SGW_OUT_FILE=./ipsec-secgw.out1
+SGW_CFG_FILE=$(mktemp)
+
+# setup secgw parameters
+SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4"
+SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})"
+SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\""
+
+# start ipsec-secgw
+secgw_start()
+{
+ SGW_EXEC_FILE=$(mktemp)
+ cat <<EOF > ${SGW_EXEC_FILE}
+stdbuf -o0 ${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \
+${SGW_PORT_CFG} ${SGW_EAL_XPRM} \
+-- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \
+${SGW_OUT_FILE} 2>&1 &
+p=\$!
+echo \$p
+EOF
+
+ cat ${SGW_EXEC_FILE}
+ cat ${SGW_CFG_FILE}
+ SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}`
+
+ # wait till ipsec-secgw start properly
+ i=0
+ st=1
+ while [[ $i -ne 10 && $st -ne 0 ]]; do
+ sleep 1
+ ifconfig ${SGW_WAIT_DEV}
+ st=$?
+ let i++
+ done
+}
+
+# stop ipsec-secgw and cleanup
+secgw_stop()
+{
+ kill ${SGW_PID}
+ rm -f ${SGW_EXEC_FILE}
+ rm -f ${SGW_CFG_FILE}
+}
--- /dev/null
+#!/usr/bin/env python3
+
+import fcntl
+import pkg_resources
+import socket
+import struct
+import sys
+import unittest
+
+
+if sys.version_info < (3, 0):
+ print("Python3 is required to run this script")
+ sys.exit(1)
+
+
+try:
+ from scapy.all import Ether
+except ImportError:
+ print("Scapy module is required")
+ sys.exit(1)
+
+
+PKTTEST_REQ = [
+ "scapy==2.4.3rc1",
+]
+
+
+def assert_requirements(req):
+ """
+ assert requirement is met
+ req can hold a string or a list of strings
+ """
+ try:
+ pkg_resources.require(req)
+ except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e:
+ print("Requirement assertion: " + str(e))
+ sys.exit(1)
+
+
+TAP_UNPROTECTED = "dtap1"
+TAP_PROTECTED = "dtap0"
+
+
+class Interface(object):
+ ETH_P_ALL = 3
+ MAX_PACKET_SIZE = 1280
+ IOCTL_GET_INFO = 0x8927
+ SOCKET_TIMEOUT = 0.5
+ def __init__(self, ifname):
+ self.name = ifname
+
+ # create and bind socket to specified interface
+ self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL))
+ self.s.settimeout(Interface.SOCKET_TIMEOUT)
+ self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))
+
+ # get interface MAC address
+ info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, struct.pack('256s', bytes(ifname[:15], encoding='ascii')))
+ self.mac = ':'.join(['%02x' % i for i in info[18:24]])
+
+ def __del__(self):
+ self.s.close()
+
+ def send_l3packet(self, pkt, mac):
+ e = Ether(src=self.mac, dst=mac)
+ self.send_packet(e/pkt)
+
+ def send_packet(self, pkt):
+ self.send_bytes(bytes(pkt))
+
+ def send_bytes(self, bytedata):
+ self.s.send(bytedata)
+
+ def recv_packet(self):
+ return Ether(self.recv_bytes())
+
+ def recv_bytes(self):
+ return self.s.recv(Interface.MAX_PACKET_SIZE)
+
+ def get_mac(self):
+ return self.mac
+
+
+class PacketXfer(object):
+ def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
+ self.protected_port = Interface(protected_iface)
+ self.unprotected_port = Interface(unprotected_iface)
+
+ def send_to_protected_port(self, pkt, remote_mac=None):
+ if remote_mac is None:
+ remote_mac = self.unprotected_port.get_mac()
+ self.protected_port.send_l3packet(pkt, remote_mac)
+
+ def send_to_unprotected_port(self, pkt, remote_mac=None):
+ if remote_mac is None:
+ remote_mac = self.protected_port.get_mac()
+ self.unprotected_port.send_l3packet(pkt, remote_mac)
+
+ def xfer_unprotected(self, pkt):
+ self.send_to_unprotected_port(pkt)
+ return self.protected_port.recv_packet()
+
+ def xfer_protected(self, pkt):
+ self.send_to_protected_port(pkt)
+ return self.unprotected_port.recv_packet()
+
+
+def pkttest():
+ if len(sys.argv) == 1:
+ sys.exit(unittest.main(verbosity=2))
+ elif len(sys.argv) == 2:
+ if sys.argv[1] == "config":
+ module = __import__('__main__')
+ try:
+ print(module.config())
+ except AttributeError:
+ sys.stderr.write("Cannot find \"config()\" in a test")
+ sys.exit(1)
+ else:
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ if len(sys.argv) == 2 and sys.argv[1] == "check_reqs":
+ assert_requirements(PKTTEST_REQ)
+ else:
+ print("Usage: " + sys.argv[0] + " check_reqs")
--- /dev/null
+#!/bin/bash
+
+DIR=$(dirname $0)
+
+if [ $(id -u) -ne 0 ]; then
+ echo "Run as root"
+ exit 1
+fi
+
+# check python requirements
+python3 ${DIR}/pkttest.py check_reqs
+if [ $? -ne 0 ]; then
+ echo "Requirements for Python not met, exiting"
+ exit 1
+fi
+
+# secgw application parameters setup
+CRYPTO_DEV="--vdev=crypto_null0"
+SGW_PORT_CFG="--vdev=net_tap0,mac=fixed --vdev=net_tap1,mac=fixed"
+SGW_EAL_XPRM="--no-pci"
+SGW_CMD_XPRM=-l
+SGW_WAIT_DEV="dtap0"
+. ${DIR}/common_defs_secgw.sh
+
+echo "Running tests: $*"
+for testcase in $*
+do
+ # check test file presence
+ testfile="${DIR}/${testcase}.py"
+ if [ ! -f ${testfile} ]; then
+ echo "Invalid test ${testcase}"
+ continue
+ fi
+
+ # prepare test config
+ python3 ${testfile} config > ${SGW_CFG_FILE}
+ if [ $? -ne 0 ]; then
+ rm -f ${SGW_CFG_FILE}
+ echo "Cannot get secgw configuration for test ${testcase}"
+ exit 1
+ fi
+
+ # start the application
+ secgw_start
+
+ # setup interfaces
+ ifconfig dtap0 up
+ ifconfig dtap1 up
+
+ # run the test
+ echo "Running test case: ${testcase}"
+ python3 ${testfile}
+ st=$?
+
+ # stop the application
+ secgw_stop
+
+ # report test result and exit on failure
+ if [ $st -eq 0 ]; then
+ echo "Test case ${testcase} succeeded"
+ else
+ echo "Test case ${testcase} failed!"
+ exit $st
+ fi
+done
# naming convention:
# 'old' means that ipsec-secgw will run in legacy (non-librte_ipsec mode)
# 'tun/trs' refer to tunnel/transport mode respectively
+
+usage()
+{
+ echo "Usage:"
+ echo -e "\t$0 -[46p]"
+ echo -e "\t\t-4 Perform Linux IPv4 network tests"
+ echo -e "\t\t-6 Perform Linux IPv6 network tests"
+ echo -e "\t\t-p Perform packet validation tests"
+ echo -e "\t\t-h Display this help"
+}
+
LINUX_TEST="tun_aescbc_sha1 \
tun_aescbc_sha1_esn \
tun_aescbc_sha1_esn_atom \
trs_3descbc_sha1_esn \
trs_3descbc_sha1_esn_atom"
-DIR=`dirname $0`
+PKT_TESTS="trs_ipv6opts"
+
+DIR=$(dirname $0)
# get input options
-st=0
run4=0
run6=0
-while [[ ${st} -eq 0 ]]; do
- getopts ":46" opt
- st=$?
- if [[ "${opt}" == "4" ]]; then
- run4=1
- elif [[ "${opt}" == "6" ]]; then
- run6=1
- fi
+runpkt=0
+while getopts ":46ph" opt
+do
+ case $opt in
+ 4)
+ run4=1
+ ;;
+ 6)
+ run6=1
+ ;;
+ p)
+ runpkt=1
+ ;;
+ h)
+ usage
+ exit 0
+ ;;
+ ?)
+ echo "Invalid option"
+ usage
+ exit 127
+ ;;
+ esac
done
-if [[ ${run4} -eq 0 && ${run6} -eq 0 ]]; then
+# no test suite has been selected
+if [[ ${run4} -eq 0 && ${run6} -eq 0 && ${runpkt} -eq 0 ]]; then
+ usage
exit 127
fi
-for i in ${LINUX_TEST}; do
-
- echo "starting test ${i}"
+# perform packet processing validation tests
+st=0
+if [ $runpkt -eq 1 ]; then
+ echo "Performing packet validation tests"
+ /bin/bash ${DIR}/pkttest.sh ${PKT_TESTS}
+ st=$?
- st4=0
- if [[ ${run4} -ne 0 ]]; then
- /bin/bash ${DIR}/linux_test4.sh ${i}
- st4=$?
- echo "test4 ${i} finished with status ${st4}"
+ echo "pkttests finished with status ${st}"
+ if [[ ${st} -ne 0 ]]; then
+ echo "ERROR pkttests FAILED"
+ exit ${st}
fi
+fi
- st6=0
- if [[ ${run6} -ne 0 ]]; then
- /bin/bash ${DIR}/linux_test6.sh ${i}
- st6=$?
- echo "test6 ${i} finished with status ${st6}"
- fi
+# perform network tests
+if [[ ${run4} -eq 1 || ${run6} -eq 1 ]]; then
+ for i in ${LINUX_TEST}; do
- let "st = st4 + st6"
- if [[ $st -ne 0 ]]; then
- echo "ERROR test ${i} FAILED"
- exit $st
- fi
-done
+ echo "starting test ${i}"
+
+ st4=0
+ if [[ ${run4} -ne 0 ]]; then
+ /bin/bash ${DIR}/linux_test4.sh ${i}
+ st4=$?
+ echo "test4 ${i} finished with status ${st4}"
+ fi
+
+ st6=0
+ if [[ ${run6} -ne 0 ]]; then
+ /bin/bash ${DIR}/linux_test6.sh ${i}
+ st6=$?
+ echo "test6 ${i} finished with status ${st6}"
+ fi
+
+ let "st = st4 + st6"
+ if [[ $st -ne 0 ]]; then
+ echo "ERROR test ${i} FAILED"
+ exit $st
+ fi
+ done
+fi
--- /dev/null
+#!/usr/bin/env python3
+
+from scapy.all import *
+import unittest
+import pkttest
+
+
+SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
+DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"
+SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
+DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"
+
+
+def config():
+ return """
+sp ipv6 out esp protect 5 pri 1 \\
+src {0} \\
+dst {1} \\
+sport 0:65535 dport 0:65535
+
+sp ipv6 in esp protect 6 pri 1 \\
+src {1} \\
+dst {0} \\
+sport 0:65535 dport 0:65535
+
+sa out 5 cipher_algo null auth_algo null mode transport
+sa in 6 cipher_algo null auth_algo null mode transport
+
+rt ipv6 dst {0} port 1
+rt ipv6 dst {1} port 0
+""".format(SRC_NET, DST_NET)
+
+
+class TestTransportWithIPv6Ext(unittest.TestCase):
+ # There is a bug in the IPsec Scapy implementation
+ # which causes invalid packet reconstruction after
+ # successful decryption. This method is a workaround.
+ @staticmethod
+ def decrypt(pkt, sa):
+ esp = pkt[ESP]
+
+ # decrypt dummy packet with no extensions
+ d = sa.decrypt(IPv6()/esp)
+
+ # fix 'next header' in the preceding header of the original
+ # packet and remove ESP
+ pkt[ESP].underlayer.nh = d[IPv6].nh
+ pkt[ESP].underlayer.remove_payload()
+
+ # combine L3 header with decrypted payload
+ npkt = pkt/d[IPv6].payload
+
+ # fix length
+ npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
+
+ return npkt
+
+ def setUp(self):
+ self.px = pkttest.PacketXfer()
+ self.outb_sa = SecurityAssociation(ESP, spi=5)
+ self.inb_sa = SecurityAssociation(ESP, spi=6)
+
+ def test_outb_ipv6_noopt(self):
+ pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
+ pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+
+ # send and check response
+ resp = self.px.xfer_unprotected(pkt)
+ self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
+ self.assertEqual(resp[ESP].spi, 5)
+
+ # decrypt response, check packet after decryption
+ d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
+ self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
+ self.assertEqual(d[UDP].sport, 123)
+ self.assertEqual(d[UDP].dport, 456)
+ self.assertEqual(bytes(d[UDP].payload), b'abc')
+
+ def test_outb_ipv6_opt(self):
+ hoptions = []
+ hoptions.append(RouterAlert(value=2))
+ hoptions.append(Jumbo(jumboplen=5000))
+ hoptions.append(Pad1())
+
+ doptions = []
+ doptions.append(HAO(hoa="1234::4321"))
+
+ pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
+ pkt /= IPv6ExtHdrHopByHop(options=hoptions)
+ pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
+ pkt /= IPv6ExtHdrDestOpt(options=doptions)
+ pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+
+ # send and check response
+ resp = self.px.xfer_unprotected(pkt)
+ self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
+
+ # check extensions
+ self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+ self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+ self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)
+
+ # check ESP
+ self.assertEqual(resp[ESP].spi, 5)
+
+ # decrypt response, check packet after decryption
+ d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
+ self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
+ self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+ self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+ self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
+
+ # check UDP
+ self.assertEqual(d[UDP].sport, 123)
+ self.assertEqual(d[UDP].dport, 456)
+ self.assertEqual(bytes(d[UDP].payload), b'abc')
+
+ def test_inb_ipv6_noopt(self):
+ # encrypt and send raw UDP packet
+ pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
+ pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+ e = self.inb_sa.encrypt(pkt)
+
+ # send and check response
+ resp = self.px.xfer_protected(e)
+ self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
+
+ # check UDP packet
+ self.assertEqual(resp[UDP].sport, 123)
+ self.assertEqual(resp[UDP].dport, 456)
+ self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+ def test_inb_ipv6_opt(self):
+ hoptions = []
+ hoptions.append(RouterAlert(value=2))
+ hoptions.append(Jumbo(jumboplen=5000))
+ hoptions.append(Pad1())
+
+ doptions = []
+ doptions.append(HAO(hoa="1234::4321"))
+
+ # prepare packet with options
+ pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
+ pkt /= IPv6ExtHdrHopByHop(options=hoptions)
+ pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
+ pkt /= IPv6ExtHdrDestOpt(options=doptions)
+ pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
+ e = self.inb_sa.encrypt(pkt)
+
+ # self encrypted packet and check response
+ resp = self.px.xfer_protected(e)
+ self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
+ self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
+ self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
+ self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
+
+ # check UDP
+ self.assertEqual(resp[UDP].sport, 123)
+ self.assertEqual(resp[UDP].dport, 456)
+ self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+ def test_inb_ipv6_frag(self):
+ # prepare ESP payload
+ pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc")
+ e = self.inb_sa.encrypt(pkt)
+
+ # craft and send inbound packet
+ e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
+ resp = self.px.xfer_protected(e)
+
+ # check response
+ self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
+ self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
+
+ # check UDP
+ self.assertEqual(resp[UDP].sport, 123)
+ self.assertEqual(resp[UDP].dport, 456)
+ self.assertEqual(bytes(resp[UDP].payload), b'abc')
+
+
+pkttest.pkttest()