aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_spd_fp_input.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ipsec_spd_fp_input.py')
-rw-r--r--test/test_ipsec_spd_fp_input.py117
1 files changed, 77 insertions, 40 deletions
diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py
index 199fbdf7c5d..9037ed25902 100644
--- a/test/test_ipsec_spd_fp_input.py
+++ b/test/test_ipsec_spd_fp_input.py
@@ -7,7 +7,6 @@ from framework import VppTestRunner
from template_ipsec import IPSecIPv4Fwd
from template_ipsec import IPSecIPv6Fwd
from test_ipsec_esp import TemplateIpsecEsp
-import pdb
def debug_signal_handler(signal, frame):
@@ -35,30 +34,6 @@ class SpdFastPathInbound(IPSecIPv4Fwd):
cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"])
cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
- @classmethod
- def create_enc_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
- packets = []
- params = self.params[socket.AF_INET]
- for i in range(pkt_count):
- # create packet info stored in the test case instance
- info = self.create_packet_info(src_if, dst_if)
- # convert the info into packet payload
- payload = self.info_to_payload(info)
- # create the packet itself
- p = Ether(
- src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
- ) / params.scapy_tra_sa.encrypt(
- IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
- / UDP(sport=src_prt, dport=dst_prt)
- / Raw(payload)
- )
- # store a copy of the packet in the packet info
- info.data = p.copy()
- # append the packet to the list
- packets.append(p)
- # return the created packet list
- return packets
-
class SpdFastPathInboundProtect(TemplateIpsecEsp):
@classmethod
@@ -98,6 +73,29 @@ class SpdFastPathIPv6Inbound(IPSecIPv6Fwd):
cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
+class SpdFastPathIPv6InboundProtect(TemplateIpsecEsp):
+ @classmethod
+ def setUpConstants(cls):
+ super(SpdFastPathIPv6InboundProtect, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"])
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
+
+ @classmethod
+ def setUpClass(cls):
+ super(SpdFastPathIPv6InboundProtect, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(SpdFastPathIPv6InboundProtect, cls).tearDownClass()
+
+ def setUp(self):
+ super(SpdFastPathIPv6InboundProtect, self).setUp()
+
+ def tearDown(self):
+ self.unconfig_network()
+ super(SpdFastPathIPv6InboundProtect, self).tearDown()
+
+
class IPSec4SpdTestCaseBypass(SpdFastPathInbound):
""" IPSec/IPv4 inbound: Policy mode test case with fast path \
(add bypass)"""
@@ -206,17 +204,12 @@ class IPSec4SpdTestCaseDiscard(SpdFastPathInbound):
# even though it's lower priority
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
1,
- self.pg1,
self.pg0,
+ self.pg1,
socket.IPPROTO_UDP,
is_out=0,
priority=10,
policy_type="discard",
- ip_range=True,
- local_ip_start=self.pg0.remote_ip4,
- local_ip_stop=self.pg0.remote_ip4,
- remote_ip_start=self.pg1.remote_ip4,
- remote_ip_stop=self.pg1.remote_ip4,
)
# create output rule so we can capture forwarded packets
@@ -264,16 +257,9 @@ class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect):
super(IPSec4SpdTestCaseProtect, self).tearDown()
def test_ipsec_spd_inbound_protect(self):
- # In this test case, packets in IPv4 FWD path are configured
+ # In this test case, encrypted packets in IPv4
+ # PROTECT path are configured
# to go through IPSec inbound SPD policy lookup.
- #
- # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
- # - High priority rule action is set to DISCARD.
- # - Low priority rule action is set to BYPASS.
- #
- # Since BYPASS rules take precedence over DISCARD
- # (the order being PROTECT, BYPASS, DISCARD) we expect the
- # BYPASS rule to match and traffic to be correctly forwarded.
pkt_count = 5
payload_size = 64
@@ -840,5 +826,56 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
self.verify_policy_match(0, policy_22)
+class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
+ """ IPSec/IPv6 inbound: Policy mode test case with fast path \
+ (add protect)"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(IPSec6SpdTestCaseProtect, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(IPSec6SpdTestCaseProtect, cls).tearDownClass()
+
+ def setUp(self):
+ super(IPSec6SpdTestCaseProtect, self).setUp()
+
+ def tearDown(self):
+ super(IPSec6SpdTestCaseProtect, self).tearDown()
+
+ def test_ipsec6_spd_inbound_protect(self):
+ pkt_count = 5
+ payload_size = 64
+ p = self.params[socket.AF_INET6]
+ send_pkts = self.gen_encrypt_pkts6(
+ p,
+ p.scapy_tra_sa,
+ self.tra_if,
+ src=self.tra_if.local_ip6,
+ dst=self.tra_if.remote_ip6,
+ count=pkt_count,
+ payload_size=payload_size,
+ )
+ recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
+
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+ pkts = p.tra_sa_in.get_stats()["packets"]
+ self.assertEqual(
+ pkts,
+ pkt_count,
+ "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
+ )
+ pkts = p.tra_sa_out.get_stats()["packets"]
+ self.assertEqual(
+ pkts,
+ pkt_count,
+ "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
+ )
+ self.assertEqual(p.tra_sa_out.get_lost(), 0)
+ self.assertEqual(p.tra_sa_in.get_lost(), 0)
+
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)