aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZachary Leaf <zachary.leaf@arm.com>2021-10-26 10:05:58 -0500
committerFan Zhang <roy.fan.zhang@intel.com>2022-03-08 17:43:43 +0000
commit26fec718f2fa7913a484008fca7b1bc015c6efb5 (patch)
treed1ff50ea37c938f7caa4b88ca25885c13f83d4bb
parent1031098b903e6eb4bca4d268350795e6827abdda (diff)
ipsec: input: drop by default for non-matching pkts
As per IPSec RFC4301 [1], any non-matching packets should be dropped by default. This is handled correctly in ipsec_output.c, however in ipsec_input.c non-matching packets are allowed to pass as per a matched BYPASS rule. For full details, see: https://lists.fd.io/g/vpp-dev/topic/ipsec_input_output_default/84943480 It appears the ipsec6_input_node only matches PROTECT policies. Until this is extended to handle BYPASS + DISCARD, we may wish to not drop by default here, since all IPv6 traffic not matching a PROTECT policy will be dropped. [1]: https://datatracker.ietf.org/doc/html/rfc4301 Type: fix Signed-off-by: Zachary Leaf <zachary.leaf@arm.com> Change-Id: Iddbfd008dbe082486d1928f6a10ffbd83d859a20
-rw-r--r--src/vnet/ipsec/ipsec_input.c14
-rw-r--r--test/template_ipsec.py34
-rw-r--r--test/test_ipsec_default.py156
-rw-r--r--test/test_ipsec_spd_flow_cache.py21
4 files changed, 214 insertions, 11 deletions
diff --git a/src/vnet/ipsec/ipsec_input.c b/src/vnet/ipsec/ipsec_input.c
index 96bad28c2b5..c47ea34f288 100644
--- a/src/vnet/ipsec/ipsec_input.c
+++ b/src/vnet/ipsec/ipsec_input.c
@@ -331,6 +331,11 @@ VLIB_NODE_FN (ipsec4_input_node) (vlib_main_t * vm,
p0 = 0;
pi0 = ~0;
};
+
+ /* Drop by default if no match on PROTECT, BYPASS or DISCARD */
+ ipsec_unprocessed += 1;
+ next[0] = IPSEC_INPUT_NEXT_DROP;
+
trace0:
if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE) &&
PREDICT_FALSE (b[0]->flags & VLIB_BUFFER_IS_TRACED))
@@ -427,6 +432,11 @@ VLIB_NODE_FN (ipsec4_input_node) (vlib_main_t * vm,
p0 = 0;
pi0 = ~0;
};
+
+ /* Drop by default if no match on PROTECT, BYPASS or DISCARD */
+ ipsec_unprocessed += 1;
+ next[0] = IPSEC_INPUT_NEXT_DROP;
+
trace1:
if (PREDICT_FALSE (node->flags & VLIB_NODE_FLAG_TRACE) &&
PREDICT_FALSE (b[0]->flags & VLIB_BUFFER_IS_TRACED))
@@ -581,6 +591,8 @@ VLIB_NODE_FN (ipsec6_input_node) (vlib_main_t * vm,
else
{
pi0 = ~0;
+ ipsec_unprocessed += 1;
+ next0 = IPSEC_INPUT_NEXT_DROP;
}
}
else if (ip0->protocol == IP_PROTOCOL_IPSEC_AH)
@@ -608,6 +620,8 @@ VLIB_NODE_FN (ipsec6_input_node) (vlib_main_t * vm,
else
{
pi0 = ~0;
+ ipsec_unprocessed += 1;
+ next0 = IPSEC_INPUT_NEXT_DROP;
}
}
else
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index 725cec58846..c2a14e36097 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -1605,19 +1605,14 @@ class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
pass
-class SpdFlowCacheTemplate(VppTestCase):
+class IPSecIPv4Fwd(VppTestCase):
+ """ Test IPSec by capturing and verifying IPv4 forwarded pkts """
@classmethod
def setUpConstants(cls):
- super(SpdFlowCacheTemplate, cls).setUpConstants()
- # Override this method with required cmdline parameters e.g.
- # cls.vpp_cmdline.extend(["ipsec", "{",
- # "ipv4-outbound-spd-flow-cache on",
- # "}"])
- # cls.logger.info("VPP modified cmdline is %s" % " "
- # .join(cls.vpp_cmdline))
+ super(IPSecIPv4Fwd, cls).setUpConstants()
def setUp(self):
- super(SpdFlowCacheTemplate, self).setUp()
+ super(IPSecIPv4Fwd, self).setUp()
# store SPD objects so we can remove configs on tear down
self.spd_objs = []
self.spd_policies = []
@@ -1635,7 +1630,7 @@ class SpdFlowCacheTemplate(VppTestCase):
for pg in self.pg_interfaces:
pg.unconfig_ip4()
pg.admin_down()
- super(SpdFlowCacheTemplate, self).tearDown()
+ super(IPSecIPv4Fwd, self).tearDown()
def create_interfaces(self, num_ifs=2):
# create interfaces pg0 ... pg<num_ifs>
@@ -1772,6 +1767,24 @@ class SpdFlowCacheTemplate(VppTestCase):
"Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
self.assert_equal(pkt_count, matched_pkts)
+
+class SpdFlowCacheTemplate(IPSecIPv4Fwd):
+ @classmethod
+ def setUpConstants(cls):
+ super(SpdFlowCacheTemplate, cls).setUpConstants()
+ # Override this method with required cmdline parameters e.g.
+ # cls.vpp_cmdline.extend(["ipsec", "{",
+ # "ipv4-outbound-spd-flow-cache on",
+ # "}"])
+ # cls.logger.info("VPP modified cmdline is %s" % " "
+ # .join(cls.vpp_cmdline))
+
+ def setUp(self):
+ super(SpdFlowCacheTemplate, self).setUp()
+
+ def tearDown(self):
+ super(SpdFlowCacheTemplate, self).tearDown()
+
def get_spd_flow_cache_entries(self):
""" 'show ipsec spd' output:
ip4-outbound-spd-flow-cache-entries: 0
@@ -1809,6 +1822,5 @@ class SpdFlowCacheTemplate(VppTestCase):
self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
return False
-
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_ipsec_default.py b/test/test_ipsec_default.py
new file mode 100644
index 00000000000..71bbd759297
--- /dev/null
+++ b/test/test_ipsec_default.py
@@ -0,0 +1,156 @@
+import socket
+import unittest
+
+from util import ppp
+from framework import VppTestRunner
+from template_ipsec import IPSecIPv4Fwd
+
+"""
+When an IPSec SPD is configured on an interface, any inbound packets
+not matching inbound policies, or outbound packets not matching outbound
+policies, must be dropped by default as per RFC4301.
+
+This test uses simple IPv4 forwarding on interfaces with IPSec enabled
+to check if packets with no matching rules are dropped by default.
+
+The basic setup is a single SPD bound to two interfaces, pg0 and pg1.
+
+ ┌────┐ ┌────┐
+ │SPD1│ │SPD1│
+ ├────┤ ─────> ├────┤
+ │PG0 │ │PG1 │
+ └────┘ └────┘
+
+First, both inbound and outbound BYPASS policies are configured allowing
+traffic to pass from pg0 -> pg1.
+
+Packets are captured and verified at pg1.
+
+Then either the inbound or outbound policies are removed and we verify
+packets are dropped as expected.
+
+"""
+
+
+class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
+ """ IPSec: inbound packets drop by default with no matching rule """
+ def test_ipsec_inbound_default_drop(self):
+ # configure two interfaces and bind the same SPD to both
+ self.create_interfaces(2)
+ self.spd_create_and_intf_add(1, self.pg_interfaces)
+ pkt_count = 5
+
+ # catch-all inbound BYPASS policy, all interfaces
+ inbound_policy = self.spd_add_rem_policy(
+ 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
+ policy_type="bypass", all_ips=True)
+
+ # outbound BYPASS policy allowing traffic from pg0->pg1
+ outbound_policy = self.spd_add_rem_policy(
+ 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
+ is_out=1, priority=10, policy_type="bypass")
+
+ # create a packet stream pg0->pg1 + add to pg0
+ packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
+ self.pg0.add_stream(packets0)
+
+ # with inbound BYPASS rule at pg0, we expect to see forwarded
+ # packets on pg1
+ self.pg_interfaces[1].enable_capture()
+ self.pg_start()
+ cap1 = self.pg1.get_capture()
+ for packet in cap1:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(
+ ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(cap1.res))
+ # verify captures on pg1
+ self.verify_capture(self.pg0, self.pg1, cap1)
+ # verify policies matched correct number of times
+ self.verify_policy_match(pkt_count, inbound_policy)
+ self.verify_policy_match(pkt_count, outbound_policy)
+
+ # remove inbound catch-all BYPASS rule, traffic should now be dropped
+ self.spd_add_rem_policy( # inbound, all interfaces
+ 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
+ policy_type="bypass", all_ips=True, remove=True)
+
+ # create another packet stream pg0->pg1 + add to pg0
+ packets1 = self.create_stream(self.pg0, self.pg1, pkt_count)
+ self.pg0.add_stream(packets1)
+ self.pg_interfaces[1].enable_capture()
+ self.pg_start()
+ # confirm traffic has now been dropped
+ self.pg1.assert_nothing_captured("inbound pkts with no matching \
+ rules NOT dropped by default")
+ # both policies should not have matched any further packets
+ # since we've dropped at input stage
+ self.verify_policy_match(pkt_count, outbound_policy)
+ self.verify_policy_match(pkt_count, inbound_policy)
+
+
+class IPSecOutboundDefaultDrop(IPSecIPv4Fwd):
+ """ IPSec: outbound packets drop by default with no matching rule """
+ def test_ipsec_inbound_default_drop(self):
+ # configure two interfaces and bind the same SPD to both
+ self.create_interfaces(2)
+ self.spd_create_and_intf_add(1, self.pg_interfaces)
+ pkt_count = 5
+
+ # catch-all inbound BYPASS policy, all interfaces
+ inbound_policy = self.spd_add_rem_policy(
+ 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
+ policy_type="bypass", all_ips=True)
+
+ # outbound BYPASS policy allowing traffic from pg0->pg1
+ outbound_policy = self.spd_add_rem_policy(
+ 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
+ is_out=1, priority=10, policy_type="bypass")
+
+ # create a packet stream pg0->pg1 + add to pg0
+ packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
+ self.pg0.add_stream(packets0)
+
+ # with outbound BYPASS rule allowing pg0->pg1, we expect to see
+ # forwarded packets on pg1
+ self.pg_interfaces[1].enable_capture()
+ self.pg_start()
+ cap1 = self.pg1.get_capture()
+ for packet in cap1:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(
+ ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(cap1.res))
+ # verify captures on pg1
+ self.verify_capture(self.pg0, self.pg1, cap1)
+ # verify policies matched correct number of times
+ self.verify_policy_match(pkt_count, inbound_policy)
+ self.verify_policy_match(pkt_count, outbound_policy)
+
+ # remove outbound rule
+ self.spd_add_rem_policy(
+ 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
+ is_out=1, priority=10, policy_type="bypass",
+ remove=True)
+
+ # create another packet stream pg0->pg1 + add to pg0
+ packets1 = self.create_stream(self.pg0, self.pg1, pkt_count)
+ self.pg0.add_stream(packets1)
+ self.pg_interfaces[1].enable_capture()
+ self.pg_start()
+ # confirm traffic was dropped and not forwarded
+ self.pg1.assert_nothing_captured("outbound pkts with no matching \
+ rules NOT dropped by default")
+ # inbound rule should have matched twice the # of pkts now
+ self.verify_policy_match(pkt_count*2, inbound_policy)
+ # as dropped at outbound, outbound policy is the same
+ self.verify_policy_match(pkt_count, outbound_policy)
+
+if __name__ == '__main__':
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_ipsec_spd_flow_cache.py b/test/test_ipsec_spd_flow_cache.py
index 0c26e7b9e6a..54571c6741a 100644
--- a/test/test_ipsec_spd_flow_cache.py
+++ b/test/test_ipsec_spd_flow_cache.py
@@ -306,6 +306,13 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
1, self.pg2, self.pg0, socket.IPPROTO_UDP,
is_out=1, priority=10, policy_type="discard")
+ # interfaces bound to an SPD, will by default drop inbound
+ # traffic with no matching policies. add catch-all inbound
+ # bypass rule to SPD:
+ self.spd_add_rem_policy( # inbound, all interfaces
+ 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
+ policy_type="bypass", all_ips=True)
+
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -390,6 +397,13 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
1, self.pg2, self.pg0, socket.IPPROTO_UDP,
is_out=1, priority=10, policy_type="discard")
+ # interfaces bound to an SPD, will by default drop inbound
+ # traffic with no matching policies. add catch-all inbound
+ # bypass rule to SPD:
+ self.spd_add_rem_policy( # inbound, all interfaces
+ 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
+ policy_type="bypass", all_ips=True)
+
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -527,6 +541,13 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
1, self.pg2, self.pg0, socket.IPPROTO_UDP,
is_out=1, priority=10, policy_type="bypass")
+ # interfaces bound to an SPD, will by default drop inbound
+ # traffic with no matching policies. add catch-all inbound
+ # bypass rule to SPD:
+ self.spd_add_rem_policy( # inbound, all interfaces
+ 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
+ policy_type="bypass", all_ips=True)
+
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_outbound_flow_cache_entries(0)