summaryrefslogtreecommitdiffstats
path: root/src/plugins/cnat/test
diff options
context:
space:
mode:
authorNathan Skrzypczak <nathan.skrzypczak@gmail.com>2020-09-10 17:44:41 +0200
committerDave Barach <openvpp@barachs.net>2020-09-25 19:55:39 +0000
commit613b2c3c78fbec12cc87a0095ee5488252449698 (patch)
treeb85ba0853e4ad8127a1213242d2a42d85e256b85 /src/plugins/cnat/test
parentece39214bcb05c535ba5de9af97b5f84f6911cba (diff)
cnat: Add support for SNat ICMP
Type: feature snat supports : * echo request/reply by allocating an identifier when translating echo requests * icmp errors in the same manner as dnat Change-Id: I684e983b0181f95c5eace5a984d40084e5625fa4 Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
Diffstat (limited to 'src/plugins/cnat/test')
-rw-r--r--src/plugins/cnat/test/test_cnat.py271
1 files changed, 201 insertions, 70 deletions
diff --git a/src/plugins/cnat/test/test_cnat.py b/src/plugins/cnat/test/test_cnat.py
index 34cd8b58240..3f8d33cec4e 100644
--- a/src/plugins/cnat/test/test_cnat.py
+++ b/src/plugins/cnat/test/test_cnat.py
@@ -10,6 +10,7 @@ from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
import struct
@@ -123,34 +124,6 @@ class VppCNatTranslation(VppObject):
return c[0][self.id]
-class VppCNATSourceNat(VppObject):
-
- def __init__(self, test, address, exclude_subnets=[]):
- self._test = test
- self.address = address
- self.exclude_subnets = exclude_subnets
-
- def add_vpp_config(self):
- a = ip_address(self.address)
- if 4 == a.version:
- self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
- else:
- self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
- for subnet in self.exclude_subnets:
- self.cnat_exclude_subnet(subnet, True)
-
- def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
- add = 1 if isAdd else 0
- self._test.vapi.cnat_add_del_snat_prefix(
- prefix=exclude_subnet, is_add=add)
-
- def query_vpp_config(self):
- return False
-
- def remove_vpp_config(self):
- return False
-
-
class TestCNatTranslation(VppTestCase):
""" CNat Translation """
extra_vpp_punt_config = ["cnat", "{",
@@ -568,49 +541,219 @@ class TestCNatSourceNAT(VppTestCase):
i.config_ip6()
i.resolve_ndp()
+ self.pg0.configure_ipv6_neighbors()
+ self.pg0.configure_ipv4_neighbors()
+ self.pg1.generate_remote_hosts(2)
+ self.pg1.configure_ipv4_neighbors()
+ self.pg1.configure_ipv6_neighbors()
+
+ self.vapi.cli("test cnat scanner off")
+ self.vapi.cnat_set_snat_addresses(
+ snat_ip4=self.pg2.remote_hosts[0].ip4,
+ snat_ip6=self.pg2.remote_hosts[0].ip6)
+ self.vapi.feature_enable_disable(
+ enable=1,
+ arc_name="ip6-unicast",
+ feature_name="ip6-cnat-snat",
+ sw_if_index=self.pg0.sw_if_index)
+ self.vapi.feature_enable_disable(
+ enable=1,
+ arc_name="ip4-unicast",
+ feature_name="ip4-cnat-snat",
+ sw_if_index=self.pg0.sw_if_index)
+
def tearDown(self):
+ self.vapi.cnat_session_purge()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.admin_down()
super(TestCNatSourceNAT, self).tearDown()
- def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False):
- t1 = VppCNATSourceNat(self, srcNatAddr)
- t1.add_vpp_config()
- cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast"
- cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat"
- self.vapi.feature_enable_disable(
- enable=1,
- arc_name=cnat_arc_name,
- feature_name=cnat_feature_name,
- sw_if_index=interface.sw_if_index)
+ def test_snat_v6(self):
+ # """ CNat Source Nat v6 """
+ self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
+ self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
+ self.sourcenat_test_icmp_err_conf(isV6=True)
+ self.sourcenat_test_icmp_echo6_conf()
- return t1
+ def test_snat_v4(self):
+ # """ CNat Source Nat v4 """
+ self.sourcenat_test_tcp_udp_conf(TCP)
+ self.sourcenat_test_tcp_udp_conf(UDP)
+ self.sourcenat_test_icmp_err_conf()
+ self.sourcenat_test_icmp_echo4_conf()
- def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
- ip_v = "ip6" if isV6 else "ip4"
- IP46 = IPv6 if isV6 else IP
- sports = [1234, 1235, 1236]
- dports = [6661, 6662, 6663]
+ def sourcenat_test_icmp_echo6_conf(self):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
- self.pg0.generate_remote_hosts(1)
- self.pg0.configure_ipv4_neighbors()
- self.pg0.configure_ipv6_neighbors()
- self.pg1.generate_remote_hosts(len(sports))
- self.pg1.configure_ipv4_neighbors()
- self.pg1.configure_ipv6_neighbors()
+ for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ src_nat_addr = self.pg2.remote_hosts[0].ip6
- self.vapi.cli("test cnat scanner on")
- t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)
+ # ping from pods to outside network
+ p1 = (
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IPv6(src=client_addr, dst=remote_addr) /
+ ICMPv6EchoRequest(id=0xfeed) /
+ Raw())
+
+ rxs = self.send_and_expect(
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].src, src_nat_addr)
+ self.assert_packet_checksums_valid(rx)
+
+ received_id = rx[0][ICMPv6EchoRequest].id
+ # ping reply from outside to pods
+ p2 = (
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IPv6(src=remote_addr, dst=src_nat_addr) /
+ ICMPv6EchoReply(id=received_id))
+ rxs = self.send_and_expect(
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IPv6].src, remote_addr)
+ self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
+
+ def sourcenat_test_icmp_echo4_conf(self):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
+
+ for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ IP46 = IP
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ src_nat_addr = self.pg2.remote_hosts[0].ip4
+
+ # ping from pods to outside network
+ p1 = (
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=remote_addr) /
+ ICMP(type=8, id=0xfeed) /
+ Raw())
+
+ rxs = self.send_and_expect(
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP46].src, src_nat_addr)
+ self.assert_packet_checksums_valid(rx)
+
+ received_id = rx[0][ICMP].id
+ # ping reply from outside to pods
+ p2 = (
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IP46(src=remote_addr, dst=src_nat_addr) /
+ ICMP(type=0, id=received_id))
+ rxs = self.send_and_expect(
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, remote_addr)
+ self.assertEqual(rx[ICMP].id, 0xfeed)
+
+ def sourcenat_test_icmp_err_conf(self, isV6=False):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
for nbr, remote_host in enumerate(self.pg1.remote_hosts):
if isV6:
+ IP46 = IPv6
client_addr = self.pg0.remote_hosts[0].ip6
remote_addr = self.pg1.remote_hosts[nbr].ip6
+ src_nat_addr = self.pg2.remote_hosts[0].ip6
+ ICMP46 = ICMPv6DestUnreach
+ ICMPelem = ICMPv6DestUnreach(code=1)
+ IP46error = IPerror6
else:
+ IP46 = IP
client_addr = self.pg0.remote_hosts[0].ip4
remote_addr = self.pg1.remote_hosts[nbr].ip4
+ src_nat_addr = self.pg2.remote_hosts[0].ip4
+ IP46error = IPerror
+ ICMP46 = ICMP
+ ICMPelem = ICMP(type=11)
+
+ # from pods to outside network
+ p1 = (
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=remote_addr) /
+ TCP(sport=sports[nbr], dport=dports[nbr]) /
+ Raw())
+
+ rxs = self.send_and_expect(
+ self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].dst, remote_addr)
+ self.assertEqual(rx[TCP].dport, dports[nbr])
+ self.assertEqual(rx[IP46].src, src_nat_addr)
+ sport = rx[TCP].sport
+
+ InnerIP = rxs[0][IP46]
+ # from outside to pods, ICMP error
+ p2 = (
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IP46(src=remote_addr, dst=src_nat_addr) /
+ ICMPelem / InnerIP)
+
+ rxs = self.send_and_expect(
+ self.pg1,
+ p2 * N_PKTS,
+ self.pg0)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, remote_addr)
+ self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+ self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPerror].sport, sports[nbr])
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPerror].dport, dports[nbr])
+
+ def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
+ sports = [1234, 1235]
+ dports = [6661, 6662]
+
+ for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ if isV6:
+ IP46 = IPv6
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ src_nat_addr = self.pg2.remote_hosts[0].ip6
+ exclude_prefix = ip_network(
+ "%s/100" % remote_addr, strict=False)
+ else:
+ IP46 = IP
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ src_nat_addr = self.pg2.remote_hosts[0].ip4
+ exclude_prefix = ip_network(
+ "%s/16" % remote_addr, strict=False)
# from pods to outside network
p1 = (
Ether(dst=self.pg0.local_mac,
@@ -627,14 +770,14 @@ class TestCNatSourceNAT(VppTestCase):
self.assert_packet_checksums_valid(rx)
self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, srcNatAddr)
+ self.assertEqual(rx[IP46].src, src_nat_addr)
sport = rx[l4p].sport
# from outside to pods
p2 = (
Ether(dst=self.pg1.local_mac,
src=self.pg1.remote_hosts[nbr].mac) /
- IP46(src=remote_addr, dst=srcNatAddr) /
+ IP46(src=remote_addr, dst=src_nat_addr) /
l4p(sport=dports[nbr], dport=sport) /
Raw())
@@ -651,11 +794,7 @@ class TestCNatSourceNAT(VppTestCase):
self.assertEqual(rx[IP46].src, remote_addr)
# add remote host to exclude list
- subnet_mask = 100 if isV6 else 16
- subnet = "%s/%d" % (remote_addr, subnet_mask)
- exclude_subnet = ip_network(subnet, strict=False)
-
- t1.cnat_exclude_subnet(exclude_subnet)
+ self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1)
self.vapi.cnat_session_purge()
rxs = self.send_and_expect(
@@ -669,7 +808,7 @@ class TestCNatSourceNAT(VppTestCase):
self.assertEqual(rx[IP46].src, client_addr)
# remove remote host from exclude list
- t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
+ self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0)
self.vapi.cnat_session_purge()
rxs = self.send_and_expect(
@@ -681,17 +820,9 @@ class TestCNatSourceNAT(VppTestCase):
self.assert_packet_checksums_valid(rx)
self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, srcNatAddr)
+ self.assertEqual(rx[IP46].src, src_nat_addr)
- def test_cnat6_sourcenat(self):
- # """ CNat Source Nat ipv6 """
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True)
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True)
-
- def test_cnat4_sourcenat(self):
- # """ CNat Source Nat ipv4 """
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP)
- self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP)
+ self.vapi.cnat_session_purge()
if __name__ == '__main__':