aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/nat/test/test_nat.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/nat/test/test_nat.py')
-rw-r--r--src/plugins/nat/test/test_nat.py150
1 files changed, 95 insertions, 55 deletions
diff --git a/src/plugins/nat/test/test_nat.py b/src/plugins/nat/test/test_nat.py
index 29fd5ca23c5..0daa61042c1 100644
--- a/src/plugins/nat/test/test_nat.py
+++ b/src/plugins/nat/test/test_nat.py
@@ -2606,61 +2606,6 @@ class TestNAT44(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
- def test_max_translations_per_user(self):
- """ MAX translations per user - recycle the least recently used """
-
- self.nat44_add_address(self.nat_addr)
- flags = self.config_flags.NAT_IS_INSIDE
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
- self.vapi.nat44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- # get maximum number of translations per user
- nat44_config = self.vapi.nat_show_config()
-
- # send more than maximum number of translations per user packets
- pkts_num = nat44_config.max_translations_per_user + 5
- pkts = []
- for port in range(0, pkts_num):
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=1025 + port))
- pkts.append(p)
- self.pg0.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- # verify number of translated packet
- self.pg1.get_capture(pkts_num)
-
- users = self.vapi.nat44_user_dump()
- for user in users:
- if user.ip_address == self.pg0.remote_ip4:
- self.assertEqual(user.nsessions,
- nat44_config.max_translations_per_user)
- self.assertEqual(user.nstaticsessions, 0)
-
- tcp_port = 22
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- tcp_port, tcp_port,
- proto=IP_PROTOS.tcp)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=tcp_port))
- self.pg0.add_stream(p)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.pg1.get_capture(1)
- users = self.vapi.nat44_user_dump()
- for user in users:
- if user.ip_address == self.pg0.remote_ip4:
- self.assertEqual(user.nsessions,
- nat44_config.max_translations_per_user - 1)
- self.assertEqual(user.nstaticsessions, 1)
-
def test_interface_addr(self):
""" Acquire NAT44 addresses from interface """
self.vapi.nat44_add_del_interface_addr(
@@ -4386,6 +4331,101 @@ class TestNAT44(MethodHolder):
self.logger.info(self.vapi.cli("show nat ha"))
+class TestNAT44EndpointDependent2(MethodHolder):
+ """ Endpoint-Dependent session test cases """
+
+ icmp_timeout = 2
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestNAT44EndpointDependent2, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent",
+ "translation", "hash", "buckets", "1",
+ "icmp", "timeout", str(cls.icmp_timeout), "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT44EndpointDependent2, cls).setUpClass()
+ try:
+ translation_buckets = 1
+ cls.max_translations = 10 * translation_buckets
+
+ cls.create_pg_interfaces(range(2))
+ cls.interfaces = list(cls.pg_interfaces[0:2])
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ cls.pg0.generate_remote_hosts(1)
+ cls.pg0.configure_ipv4_neighbors()
+
+ cls.pg1.generate_remote_hosts(1)
+ cls.pg1.configure_ipv4_neighbors()
+
+ except Exception:
+ super(TestNAT44EndpointDependent2, cls).tearDownClass()
+ raise
+
+ def create_icmp_stream(self, in_if, out_if, count):
+ """
+ Create ICMP packet stream for inside network
+
+ :param in_if: Inside interface
+ :param out_if: Outside interface
+ :param count: Number of packets
+ """
+
+ self.assertTrue(count > 0)
+ icmp_id = random.randint(0, 65535 - (count - 1))
+
+ pkts = list()
+ for i in range(count):
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) /
+ ICMP(id=icmp_id + i, type='echo-request'))
+ pkts.append(p)
+ return pkts
+
+ def send_pkts(self, pkts, expected=None):
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ return self.pg1.get_capture(
+ len(pkts) if expected is None else expected)
+
+ def test_session_cleanup(self):
+ """ NAT44 session cleanup test """
+
+ self.nat44_add_address(self.pg1.local_ip4)
+ flags = self.config_flags.NAT_IS_INSIDE
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg0.sw_if_index,
+ flags=flags, is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ sw_if_index=self.pg1.sw_if_index,
+ is_add=1)
+
+ nat_config = self.vapi.nat_show_config()
+ self.assertEqual(1, nat_config.endpoint_dependent)
+
+ pkts = self.create_icmp_stream(self.pg0, self.pg1,
+ self.max_translations + 2)
+ sz = len(pkts)
+
+ # positive test
+ self.send_pkts(pkts[0:self.max_translations])
+
+ # false positive test
+ self.send_pkts(pkts[self.max_translations:sz - 1], 0)
+
+ sleep(self.icmp_timeout)
+
+ # positive test
+ self.send_pkts(pkts[self.max_translations + 1:sz])
+
+
class TestNAT44EndpointDependent(MethodHolder):
""" Endpoint-Dependent mapping and filtering test cases """