summaryrefslogtreecommitdiffstats
path: root/test/test_nat44_ed.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_nat44_ed.py')
-rw-r--r--test/test_nat44_ed.py193
1 files changed, 192 insertions, 1 deletions
diff --git a/test/test_nat44_ed.py b/test/test_nat44_ed.py
index 77459874c09..d90afd27025 100644
--- a/test/test_nat44_ed.py
+++ b/test/test_nat44_ed.py
@@ -71,7 +71,7 @@ class TestNAT44ED(VppTestCase):
@staticmethod
def random_port():
- return randint(1025, 65535)
+ return randint(1024, 65535)
@staticmethod
def proto2layer(proto):
@@ -2358,6 +2358,197 @@ class TestNAT44ED(VppTestCase):
% (p_sent[IP].src, p_recvd[IP].src, a),
)
+ def test_dynamic_edge_ports(self):
+ """NAT44ED dynamic translation test: edge ports"""
+
+ worker_count = self.vpp_worker_count or 1
+ port_offset = 1024
+ port_per_thread = (65536 - port_offset) // worker_count
+ port_count = port_per_thread * worker_count
+
+ # worker thread edge ports
+ thread_edge_ports = {0, port_offset - 1, 65535}
+ for i in range(0, worker_count):
+ port_thread_offset = (port_per_thread * i) + port_offset
+ for port_range_offset in [0, port_per_thread - 1]:
+ port = port_thread_offset + port_range_offset
+ thread_edge_ports.add(port)
+ thread_drop_ports = set(
+ filter(
+ lambda x: x not in range(port_offset, port_offset + port_count),
+ thread_edge_ports,
+ )
+ )
+
+ in_if = self.pg7
+ out_if = self.pg8
+
+ self.nat_add_address(self.nat_addr)
+
+ try:
+ self.configure_ip4_interface(in_if, hosts=worker_count)
+ self.configure_ip4_interface(out_if)
+
+ self.nat_add_inside_interface(in_if)
+ self.nat_add_outside_interface(out_if)
+
+ # in2out
+ tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
+ uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
+ ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
+ dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
+
+ pkt_count = worker_count * len(thread_edge_ports)
+
+ i2o_pkts = [[] for x in range(0, worker_count)]
+ for i in range(0, worker_count):
+ remote_host = in_if.remote_hosts[i]
+ for port in thread_edge_ports:
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
+ / TCP(sport=port, dport=port)
+ )
+ i2o_pkts[i].append(p)
+
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
+ / UDP(sport=port, dport=port)
+ )
+ i2o_pkts[i].append(p)
+
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=remote_host.ip4, dst=out_if.remote_ip4)
+ / ICMP(id=port, seq=port, type="echo-request")
+ )
+ i2o_pkts[i].append(p)
+
+ for i in range(0, worker_count):
+ if len(i2o_pkts[i]) > 0:
+ in_if.add_stream(i2o_pkts[i], worker=i)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = out_if.get_capture(pkt_count * 3)
+ for packet in capture:
+ self.assert_packet_checksums_valid(packet)
+ if packet.haslayer(TCP):
+ self.assert_in_range(
+ packet[TCP].sport,
+ port_offset,
+ port_offset + port_count,
+ "src TCP port",
+ )
+ elif packet.haslayer(UDP):
+ self.assert_in_range(
+ packet[UDP].sport,
+ port_offset,
+ port_offset + port_count,
+ "src UDP port",
+ )
+ elif packet.haslayer(ICMP):
+ self.assert_in_range(
+ packet[ICMP].id,
+ port_offset,
+ port_offset + port_count,
+ "ICMP id",
+ )
+ else:
+ self.fail(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
+
+ if_idx = in_if.sw_if_index
+ tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"]
+ uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"]
+ ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"]
+ dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"]
+
+ self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count)
+ self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count)
+ self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count)
+ self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
+
+ # out2in
+ tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
+ uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
+ ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
+ dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
+ dc3 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
+
+ # replies to unchanged thread ports should pass on each worker,
+ # excluding packets outside dynamic port range
+ drop_count = worker_count * len(thread_drop_ports)
+ pass_count = worker_count * len(thread_edge_ports) - drop_count
+
+ o2i_pkts = [[] for x in range(0, worker_count)]
+ for i in range(0, worker_count):
+ for port in thread_edge_ports:
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+ / TCP(sport=port, dport=port)
+ )
+ o2i_pkts[i].append(p)
+
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+ / UDP(sport=port, dport=port)
+ )
+ o2i_pkts[i].append(p)
+
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+ / ICMP(id=port, seq=port, type="echo-reply")
+ )
+ o2i_pkts[i].append(p)
+
+ for i in range(0, worker_count):
+ if len(o2i_pkts[i]) > 0:
+ out_if.add_stream(o2i_pkts[i], worker=i)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = in_if.get_capture(pass_count * 3)
+ for packet in capture:
+ self.assert_packet_checksums_valid(packet)
+ if packet.haslayer(TCP):
+ self.assertIn(packet[TCP].dport, thread_edge_ports, "dst TCP port")
+ self.assertEqual(packet[TCP].dport, packet[TCP].sport, "TCP ports")
+ elif packet.haslayer(UDP):
+ self.assertIn(packet[UDP].dport, thread_edge_ports, "dst UDP port")
+ self.assertEqual(packet[UDP].dport, packet[UDP].sport, "UDP ports")
+ elif packet.haslayer(ICMP):
+ self.assertIn(packet[ICMP].id, thread_edge_ports, "ICMP id")
+ self.assertEqual(packet[ICMP].id, packet[ICMP].seq, "ICMP id & seq")
+ else:
+ self.fail(
+ ppp("Unexpected or invalid packet (inside network):", packet)
+ )
+
+ if_idx = out_if.sw_if_index
+ tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"]
+ uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"]
+ ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"]
+ dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"]
+ dc4 = self.statistics["/nat44-ed/out2in/slowpath/drops"]
+
+ self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pass_count)
+ self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pass_count)
+ self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pass_count)
+ self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0)
+ self.assertEqual(
+ dc4[:, if_idx].sum() - dc3[:, if_idx].sum(), drop_count * 3
+ )
+
+ finally:
+ in_if.unconfig()
+ out_if.unconfig()
+
class TestNAT44EDMW(TestNAT44ED):
"""NAT44ED MW Test Case"""