aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorPiotr Bronowski <piotrx.bronowski@intel.com>2022-09-20 14:44:36 +0000
committerFan Zhang <royzhang1980@hotmail.com>2022-09-21 15:11:54 +0000
commit06abf235269558fe75d72019a437337b24d7199e (patch)
treef1827b0781e2821e5ceed55117f21afa428180b2 /test
parenta2a7a4031be4896529cce591c26e8cebe8ca22ec (diff)
ipsec: introduce fast path ipv6 inbound matching
This patch introduces fast path matching for inbound traffic ipv6. Fast path uses bihash tables in order to find matching policy. Adding and removing policies in fast path is much faster than in current implementation. It is still new feature and further work needs and can be done in order to improve the perfromance. Type: feature Change-Id: Iaef6638033666ad6eb028ffe0c8a4f4374451753 Signed-off-by: Piotr Bronowski <piotrx.bronowski@intel.com>
Diffstat (limited to 'test')
-rw-r--r--test/test_ipsec_spd_fp_input.py117
1 files changed, 77 insertions, 40 deletions
diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py
index 199fbdf7c5d..9037ed25902 100644
--- a/test/test_ipsec_spd_fp_input.py
+++ b/test/test_ipsec_spd_fp_input.py
@@ -7,7 +7,6 @@ from framework import VppTestRunner
from template_ipsec import IPSecIPv4Fwd
from template_ipsec import IPSecIPv6Fwd
from test_ipsec_esp import TemplateIpsecEsp
-import pdb
def debug_signal_handler(signal, frame):
@@ -35,30 +34,6 @@ class SpdFastPathInbound(IPSecIPv4Fwd):
cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"])
cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
- @classmethod
- def create_enc_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
- packets = []
- params = self.params[socket.AF_INET]
- for i in range(pkt_count):
- # create packet info stored in the test case instance
- info = self.create_packet_info(src_if, dst_if)
- # convert the info into packet payload
- payload = self.info_to_payload(info)
- # create the packet itself
- p = Ether(
- src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
- ) / params.scapy_tra_sa.encrypt(
- IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
- / UDP(sport=src_prt, dport=dst_prt)
- / Raw(payload)
- )
- # store a copy of the packet in the packet info
- info.data = p.copy()
- # append the packet to the list
- packets.append(p)
- # return the created packet list
- return packets
-
class SpdFastPathInboundProtect(TemplateIpsecEsp):
@classmethod
@@ -98,6 +73,29 @@ class SpdFastPathIPv6Inbound(IPSecIPv6Fwd):
cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
+class SpdFastPathIPv6InboundProtect(TemplateIpsecEsp):
+ @classmethod
+ def setUpConstants(cls):
+ super(SpdFastPathIPv6InboundProtect, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"])
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
+
+ @classmethod
+ def setUpClass(cls):
+ super(SpdFastPathIPv6InboundProtect, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(SpdFastPathIPv6InboundProtect, cls).tearDownClass()
+
+ def setUp(self):
+ super(SpdFastPathIPv6InboundProtect, self).setUp()
+
+ def tearDown(self):
+ self.unconfig_network()
+ super(SpdFastPathIPv6InboundProtect, self).tearDown()
+
+
class IPSec4SpdTestCaseBypass(SpdFastPathInbound):
""" IPSec/IPv4 inbound: Policy mode test case with fast path \
(add bypass)"""
@@ -206,17 +204,12 @@ class IPSec4SpdTestCaseDiscard(SpdFastPathInbound):
# even though it's lower priority
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
1,
- self.pg1,
self.pg0,
+ self.pg1,
socket.IPPROTO_UDP,
is_out=0,
priority=10,
policy_type="discard",
- ip_range=True,
- local_ip_start=self.pg0.remote_ip4,
- local_ip_stop=self.pg0.remote_ip4,
- remote_ip_start=self.pg1.remote_ip4,
- remote_ip_stop=self.pg1.remote_ip4,
)
# create output rule so we can capture forwarded packets
@@ -264,16 +257,9 @@ class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect):
super(IPSec4SpdTestCaseProtect, self).tearDown()
def test_ipsec_spd_inbound_protect(self):
- # In this test case, packets in IPv4 FWD path are configured
+ # In this test case, encrypted packets in IPv4
+ # PROTECT path are configured
# to go through IPSec inbound SPD policy lookup.
- #
- # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
- # - High priority rule action is set to DISCARD.
- # - Low priority rule action is set to BYPASS.
- #
- # Since BYPASS rules take precedence over DISCARD
- # (the order being PROTECT, BYPASS, DISCARD) we expect the
- # BYPASS rule to match and traffic to be correctly forwarded.
pkt_count = 5
payload_size = 64
@@ -840,5 +826,56 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
self.verify_policy_match(0, policy_22)
+class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
+ """ IPSec/IPv6 inbound: Policy mode test case with fast path \
+ (add protect)"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(IPSec6SpdTestCaseProtect, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(IPSec6SpdTestCaseProtect, cls).tearDownClass()
+
+ def setUp(self):
+ super(IPSec6SpdTestCaseProtect, self).setUp()
+
+ def tearDown(self):
+ super(IPSec6SpdTestCaseProtect, self).tearDown()
+
+ def test_ipsec6_spd_inbound_protect(self):
+ pkt_count = 5
+ payload_size = 64
+ p = self.params[socket.AF_INET6]
+ send_pkts = self.gen_encrypt_pkts6(
+ p,
+ p.scapy_tra_sa,
+ self.tra_if,
+ src=self.tra_if.local_ip6,
+ dst=self.tra_if.remote_ip6,
+ count=pkt_count,
+ payload_size=payload_size,
+ )
+ recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
+
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+ pkts = p.tra_sa_in.get_stats()["packets"]
+ self.assertEqual(
+ pkts,
+ pkt_count,
+ "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
+ )
+ pkts = p.tra_sa_out.get_stats()["packets"]
+ self.assertEqual(
+ pkts,
+ pkt_count,
+ "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
+ )
+ self.assertEqual(p.tra_sa_out.get_lost(), 0)
+ self.assertEqual(p.tra_sa_in.get_lost(), 0)
+
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)