aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/cnat/test/test_cnat.py
diff options
context:
space:
mode:
authorNathan Skrzypczak <nathan.skrzypczak@gmail.com>2020-09-08 15:16:08 +0200
committerDave Barach <openvpp@barachs.net>2020-09-25 19:55:39 +0000
commitece39214bcb05c535ba5de9af97b5f84f6911cba (patch)
treef7c606d8fb1b880d59f150fdfcd9ab1f17da9b1d /src/plugins/cnat/test/test_cnat.py
parent277d402bee6f4ad0ffe185515de44e6457a57a9e (diff)
cnat: Ip ICMP error support
Type: feature Add CNAT translation for ICMP 4 & 6 errors inner packet will be translated according to existing sessions. Change-Id: If118751988f44ef96b800878596296d1ab8ab6f8 Signed-off-by: Nathan Skrzypczak <nathan.skrzypczak@gmail.com>
Diffstat (limited to 'src/plugins/cnat/test/test_cnat.py')
-rw-r--r--src/plugins/cnat/test/test_cnat.py182
1 files changed, 140 insertions, 42 deletions
diff --git a/src/plugins/cnat/test/test_cnat.py b/src/plugins/cnat/test/test_cnat.py
index 518d7335edc..34cd8b58240 100644
--- a/src/plugins/cnat/test/test_cnat.py
+++ b/src/plugins/cnat/test/test_cnat.py
@@ -7,8 +7,11 @@ from vpp_ip import DpoProto
from scapy.packet import Raw
from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet import IP, UDP, TCP, ICMP
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
+
+import struct
from ipaddress import ip_address, ip_network, \
IPv4Address, IPv6Address, IPv4Network, IPv6Network
@@ -39,6 +42,10 @@ class Ep(object):
return {'addr': self.ip,
'port': self.port}
+ @property
+ def isV6(self):
+ return ":" in self.ip
+
def __str__(self):
return ("%s:%d" % (self.ip, self.port))
@@ -180,10 +187,10 @@ class TestCNatTranslation(VppTestCase):
i.admin_down()
super(TestCNatTranslation, self).tearDown()
- def cnat_create_translation(self, vip, nbr, isV6=False):
- ip_v = "ip6" if isV6 else "ip4"
+ def cnat_create_translation(self, vip, nbr):
+ ip_v = "ip6" if vip.isV6 else "ip4"
dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
- sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
+ sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
t1 = VppCNatTranslation(
self, vip.l4p, vip,
[EpTuple(sep, dep), EpTuple(sep, dep)])
@@ -341,7 +348,7 @@ class TestCNatTranslation(VppTestCase):
trs = []
for nbr, vip in enumerate(vips):
- trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6))
+ trs.append(self.cnat_create_translation(vip, nbr))
self.logger.info(self.vapi.cli("sh cnat client"))
self.logger.info(self.vapi.cli("sh cnat translation"))
@@ -372,8 +379,10 @@ class TestCNatTranslation(VppTestCase):
n_tries += 1
sessions = self.vapi.cnat_session_dump()
self.sleep(2)
+ print(self.vapi.cli("show cnat session verbose"))
self.assertTrue(n_tries < 100)
+ self.vapi.cli("test cnat scanner off")
#
# load some flows again and purge
@@ -398,6 +407,109 @@ class TestCNatTranslation(VppTestCase):
self.vapi.cnat_session_purge()
self.assertFalse(self.vapi.cnat_session_dump())
+ def test_icmp(self):
+ vips = [
+ Ep("30.0.0.1", 5555),
+ Ep("30.0.0.2", 5554),
+ Ep("30.0.0.2", 5553, UDP),
+ Ep("30::1", 6666),
+ Ep("30::2", 5553, UDP),
+ ]
+ sport = 1234
+
+ self.pg0.generate_remote_hosts(len(vips))
+ self.pg0.configure_ipv6_neighbors()
+ self.pg0.configure_ipv4_neighbors()
+
+ self.pg1.generate_remote_hosts(len(vips))
+ self.pg1.configure_ipv6_neighbors()
+ self.pg1.configure_ipv4_neighbors()
+
+ self.vapi.cli("test cnat scanner off")
+ trs = []
+ for nbr, vip in enumerate(vips):
+ trs.append(self.cnat_create_translation(vip, nbr))
+
+ self.logger.info(self.vapi.cli("sh cnat client"))
+ self.logger.info(self.vapi.cli("sh cnat translation"))
+
+ for nbr, vip in enumerate(vips):
+ if vip.isV6:
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ remote2_addr = self.pg2.remote_hosts[0].ip6
+ else:
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ remote2_addr = self.pg2.remote_hosts[0].ip4
+ IP46 = IPv6 if vip.isV6 else IP
+ # from client to vip
+ p1 = (Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=vip.ip) /
+ vip.l4p(sport=sport, dport=vip.port) /
+ Raw())
+
+ rxs = self.send_and_expect(self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].dst, remote_addr)
+ self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
+ self.assertEqual(rx[IP46].src, client_addr)
+ self.assertEqual(rx[vip.l4p].sport, sport)
+
+ InnerIP = rxs[0][IP46]
+
+ ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
+ ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
+ # from vip to client, ICMP error
+ p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP46(src=remote_addr, dst=client_addr) /
+ ICMPelem / InnerIP)
+
+ rxs = self.send_and_expect(self.pg1,
+ p1 * N_PKTS,
+ self.pg0)
+
+ TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
+ IP46error = IPerror6 if vip.isV6 else IPerror
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, vip.ip)
+ self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+ self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].sport, sport)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].dport, vip.port)
+
+ # from other remote to client, ICMP error
+ # outside shouldn't be NAT-ed
+ p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+ IP46(src=remote2_addr, dst=client_addr) /
+ ICMPelem / InnerIP)
+
+ rxs = self.send_and_expect(self.pg1,
+ p1 * N_PKTS,
+ self.pg0)
+
+ TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
+ IP46error = IPerror6 if vip.isV6 else IPerror
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, remote2_addr)
+ self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+ self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].sport, sport)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].dport, vip.port)
+
+ self.vapi.cnat_session_purge()
+
def test_cnat6(self):
# """ CNat Translation ipv6 """
vips = [
@@ -478,7 +590,7 @@ class TestCNatSourceNAT(VppTestCase):
def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
ip_v = "ip6" if isV6 else "ip4"
- ip_class = IPv6 if isV6 else IP
+ IP46 = IPv6 if isV6 else IP
sports = [1234, 1235, 1236]
dports = [6661, 6662, 6663]
@@ -493,14 +605,17 @@ class TestCNatSourceNAT(VppTestCase):
t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)
for nbr, remote_host in enumerate(self.pg1.remote_hosts):
+ if isV6:
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ else:
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
# from pods to outside network
p1 = (
- Ether(
- dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- ip_class(
- src=getattr(self.pg0.remote_hosts[0], ip_v),
- dst=getattr(remote_host, ip_v)) /
+ Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=remote_addr) /
l4p(sport=sports[nbr], dport=dports[nbr]) /
Raw())
@@ -510,21 +625,16 @@ class TestCNatSourceNAT(VppTestCase):
self.pg1)
for rx in rxs:
self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(remote_host, ip_v))
+ self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(
- rx[ip_class].src,
- srcNatAddr)
+ self.assertEqual(rx[IP46].src, srcNatAddr)
sport = rx[l4p].sport
# from outside to pods
p2 = (
- Ether(
- dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) /
+ Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_hosts[nbr].mac) /
+ IP46(src=remote_addr, dst=srcNatAddr) /
l4p(sport=dports[nbr], dport=sport) /
Raw())
@@ -535,18 +645,14 @@ class TestCNatSourceNAT(VppTestCase):
for rx in rxs:
self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(self.pg0.remote_hosts[0], ip_v))
+ self.assertEqual(rx[IP46].dst, client_addr)
self.assertEqual(rx[l4p].dport, sports[nbr])
self.assertEqual(rx[l4p].sport, dports[nbr])
- self.assertEqual(
- rx[ip_class].src,
- getattr(remote_host, ip_v))
+ self.assertEqual(rx[IP46].src, remote_addr)
# add remote host to exclude list
subnet_mask = 100 if isV6 else 16
- subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask)
+ subnet = "%s/%d" % (remote_addr, subnet_mask)
exclude_subnet = ip_network(subnet, strict=False)
t1.cnat_exclude_subnet(exclude_subnet)
@@ -558,13 +664,9 @@ class TestCNatSourceNAT(VppTestCase):
self.pg1)
for rx in rxs:
self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(remote_host, ip_v))
+ self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(
- rx[ip_class].src,
- getattr(self.pg0.remote_hosts[0], ip_v))
+ self.assertEqual(rx[IP46].src, client_addr)
# remove remote host from exclude list
t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
@@ -577,13 +679,9 @@ class TestCNatSourceNAT(VppTestCase):
for rx in rxs:
self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(remote_host, ip_v))
+ self.assertEqual(rx[IP46].dst, remote_addr)
self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(
- rx[ip_class].src,
- srcNatAddr)
+ self.assertEqual(rx[IP46].src, srcNatAddr)
def test_cnat6_sourcenat(self):
# """ CNat Source Nat ipv6 """