aboutsummaryrefslogtreecommitdiffstats
path: root/test/asf/test_session.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/asf/test_session.py')
-rw-r--r--test/asf/test_session.py81
1 files changed, 81 insertions, 0 deletions
diff --git a/test/asf/test_session.py b/test/asf/test_session.py
index 184ec4fba54..957f3234271 100644
--- a/test/asf/test_session.py
+++ b/test/asf/test_session.py
@@ -8,7 +8,9 @@ from asfframework import (
tag_fixme_vpp_workers,
tag_run_solo,
)
+from vpp_papi import VppEnum
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from ipaddress import IPv4Network
from config import config
@@ -149,6 +151,85 @@ class TestSessionUnitTests(VppAsfTestCase):
self.vapi.session_enable_disable(is_enable=0)
+@tag_fixme_vpp_workers
+class TestSessionRuleTableTests(VppAsfTestCase):
+ """Session Rule Table Tests Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSessionRuleTableTests, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSessionRuleTableTests, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestSessionRuleTableTests, self).setUp()
+ self.vapi.session_enable_disable_v2(
+ rt_engine_type=VppEnum.vl_api_rt_backend_engine_t.RT_BACKEND_ENGINE_API_RULE_TABLE
+ )
+
+ def test_session_rule_table(self):
+ """Session Rule Table Tests"""
+
+ LCL_IP = "172.100.1.1/32"
+ RMT_IP = "172.100.1.2/32"
+ LCL_PORT = 5000
+ RMT_PORT = 80
+
+ # Add a rule table entry
+ self.vapi.session_rule_add_del(
+ transport_proto=VppEnum.vl_api_transport_proto_t.TRANSPORT_PROTO_API_TCP,
+ lcl=LCL_IP,
+ rmt=RMT_IP,
+ lcl_port=LCL_PORT,
+ rmt_port=RMT_PORT,
+ action_index=1,
+ is_add=1,
+ appns_index=0,
+ scope=VppEnum.vl_api_session_rule_scope_t.SESSION_RULE_SCOPE_API_GLOBAL,
+ tag="rule-1",
+ )
+
+ # Verify it is correctly injected
+ dump = self.vapi.session_rules_dump()
+ self.assertTrue(len(dump) > 1)
+ self.assertEqual(dump[1].rmt_port, RMT_PORT)
+ self.assertEqual(dump[1].lcl_port, LCL_PORT)
+ self.assertEqual(dump[1].lcl, IPv4Network(LCL_IP))
+ self.assertEqual(dump[1].rmt, IPv4Network(RMT_IP))
+ self.assertEqual(dump[1].action_index, 1)
+ self.assertEqual(dump[1].appns_index, 0)
+ self.assertEqual(
+ dump[1].scope,
+ VppEnum.vl_api_session_rule_scope_t.SESSION_RULE_SCOPE_API_GLOBAL,
+ )
+
+ # Delete the entry
+ self.vapi.session_rule_add_del(
+ transport_proto=VppEnum.vl_api_transport_proto_t.TRANSPORT_PROTO_API_TCP,
+ lcl=LCL_IP,
+ rmt=RMT_IP,
+ lcl_port=LCL_PORT,
+ rmt_port=RMT_PORT,
+ action_index=1,
+ is_add=0,
+ appns_index=0,
+ scope=VppEnum.vl_api_session_rule_scope_t.SESSION_RULE_SCOPE_API_GLOBAL,
+ tag="rule-1",
+ )
+ dump2 = self.vapi.session_rules_dump()
+
+ # Verify it is removed
+ self.assertTrue((len(dump) - 1) == len(dump2))
+
+ def tearDown(self):
+ super(TestSessionRuleTableTests, self).tearDown()
+ self.vapi.session_enable_disable_v2(
+ rt_engine_type=VppEnum.vl_api_rt_backend_engine_t.RT_BACKEND_ENGINE_API_DISABLE
+ )
+
+
@tag_run_solo
class TestSegmentManagerTests(VppAsfTestCase):
"""SVM Fifo Unit Tests Case"""