aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_cnat.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_cnat.py')
-rw-r--r--test/test_cnat.py1515
1 files changed, 742 insertions, 773 deletions
diff --git a/test/test_cnat.py b/test/test_cnat.py
index ff4c44033cb..ff8e1ebbbbb 100644
--- a/test/test_cnat.py
+++ b/test/test_cnat.py
@@ -2,126 +2,148 @@
import unittest
-from framework import VppTestCase, VppTestRunner
-from vpp_ip import DpoProto, INVALID_INDEX
+from framework import VppTestCase
+from asfframework import VppTestRunner
+from vpp_ip import INVALID_INDEX
from itertools import product
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP, ICMP
-from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
+from scapy.layers.inet import IPerror, TCPerror, UDPerror
from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
-import struct
-
-from ipaddress import ip_address, ip_network, \
- IPv4Address, IPv6Address, IPv4Network, IPv6Network
+from ipaddress import ip_network
from vpp_object import VppObject
from vpp_papi import VppEnum
N_PKTS = 15
+N_REMOTE_HOSTS = 3
+
+SRC = 0
+DST = 1
+
+
+class CnatCommonTestCase(VppTestCase):
+ """CNat common test class"""
+
+ #
+ # turn the scanner off whilst testing otherwise sessions
+ # will time out
+ #
+ extra_vpp_config = [
+ "cnat",
+ "{",
+ "session-db-buckets",
+ "64",
+ "session-cleanup-timeout",
+ "0.1",
+ "session-max-age",
+ "1",
+ "tcp-max-age",
+ "1",
+ "scanner",
+ "off",
+ "}",
+ ]
+ @classmethod
+ def setUpClass(cls):
+ super(CnatCommonTestCase, cls).setUpClass()
-class Ep(object):
- """ CNat endpoint """
+ @classmethod
+ def tearDownClass(cls):
+ super(CnatCommonTestCase, cls).tearDownClass()
- def __init__(self, ip=None, port=0, l4p=TCP,
- sw_if_index=INVALID_INDEX, is_v6=False):
- self.ip = ip
- if ip is None:
- self.ip = "::" if is_v6 else "0.0.0.0"
- self.port = port
- self.l4p = l4p
- self.sw_if_index = sw_if_index
- if is_v6:
- self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
- else:
- self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
- def encode(self):
- return {'addr': self.ip,
- 'port': self.port,
- 'sw_if_index': self.sw_if_index,
- 'if_af': self.if_af}
+class Endpoint(object):
+ """CNat endpoint"""
- @classmethod
- def from_pg(cls, pg, is_v6=False):
- if pg is None:
- return cls(is_v6=is_v6)
+ def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
+ self.port = port
+ self.is_v6 = is_v6
+ self.sw_if_index = INVALID_INDEX
+ if pg is not None and pgi is not None:
+ # pg interface specified and remote index
+ self.ip = self.get_ip46(pg.remote_hosts[pgi])
+ elif pg is not None:
+ self.ip = None
+ self.sw_if_index = pg.sw_if_index
+ elif ip is not None:
+ self.ip = ip
else:
- return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
-
- @property
- def isV6(self):
- return ":" in self.ip
-
- def __str__(self):
- return ("%s:%d" % (self.ip, self.port))
+ self.ip = "::" if self.is_v6 else "0.0.0.0"
+ def get_ip46(self, obj):
+ if self.is_v6:
+ return obj.ip6
+ return obj.ip4
-class EpTuple(object):
- """ CNat endpoint """
+ def udpate(self, **kwargs):
+ self.__init__(**kwargs)
- def __init__(self, src, dst):
- self.src = src
- self.dst = dst
+ def _vpp_if_af(self):
+ if self.is_v6:
+ return VppEnum.vl_api_address_family_t.ADDRESS_IP6
+ return VppEnum.vl_api_address_family_t.ADDRESS_IP4
def encode(self):
- return {'src_ep': self.src.encode(),
- 'dst_ep': self.dst.encode()}
+ return {
+ "addr": self.ip,
+ "port": self.port,
+ "sw_if_index": self.sw_if_index,
+ "if_af": self._vpp_if_af(),
+ }
def __str__(self):
- return ("%s->%s" % (self.src, self.dst))
-
+ return "%s:%d" % (self.ip, self.port)
-class VppCNatTranslation(VppObject):
- def __init__(self, test, iproto, vip, paths):
+class Translation(VppObject):
+ def __init__(self, test, iproto, vip, paths, fhc):
self._test = test
self.vip = vip
self.iproto = iproto
self.paths = paths
- self.encoded_paths = []
- for path in self.paths:
- self.encoded_paths.append(path.encode())
+ self.fhc = fhc
+ self.id = None
def __str__(self):
- return ("%s %s %s" % (self.vip, self.iproto, self.paths))
+ return "%s %s %s" % (self.vip, self.iproto, self.paths)
- @property
- def vl4_proto(self):
+ def _vl4_proto(self):
ip_proto = VppEnum.vl_api_ip_proto_t
return {
UDP: ip_proto.IP_API_PROTO_UDP,
TCP: ip_proto.IP_API_PROTO_TCP,
}[self.iproto]
+ def _encoded_paths(self):
+ return [
+ {"src_ep": src.encode(), "dst_ep": dst.encode()}
+ for (src, dst) in self.paths
+ ]
+
def add_vpp_config(self):
r = self._test.vapi.cnat_translation_update(
- {'vip': self.vip.encode(),
- 'ip_proto': self.vl4_proto,
- 'n_paths': len(self.paths),
- 'paths': self.encoded_paths})
+ {
+ "vip": self.vip.encode(),
+ "ip_proto": self._vl4_proto(),
+ "n_paths": len(self.paths),
+ "paths": self._encoded_paths(),
+ "flow_hash_config": self.fhc,
+ }
+ )
self._test.registry.register(self, self._test.logger)
self.id = r.id
-
- def modify_vpp_config(self, paths):
- self.paths = paths
- self.encoded_paths = []
- for path in self.paths:
- self.encoded_paths.append(path.encode())
-
- r = self._test.vapi.cnat_translation_update(
- {'vip': self.vip.encode(),
- 'ip_proto': self.vl4_proto,
- 'n_paths': len(self.paths),
- 'paths': self.encoded_paths})
- self._test.registry.register(self, self._test.logger)
+ return self
def remove_vpp_config(self):
+ assert self.id is not None
self._test.vapi.cnat_translation_del(id=self.id)
+ return self
def query_vpp_config(self):
for t in self._test.vapi.cnat_translation_dump():
@@ -129,22 +151,192 @@ class VppCNatTranslation(VppObject):
return t.translation
return None
- def object_id(self):
- return ("cnat-translation-%s" % (self.vip))
- def get_stats(self):
- c = self._test.statistics.get_counter("/net/cnat-translation")
- return c[0][self.id]
+class CnatTestContext(object):
+ """
+ Usage :
+ ctx = CnatTestContext(self, TCP, is_v6=True)
-class TestCNatTranslation(VppTestCase):
- """ CNat Translation """
- extra_vpp_punt_config = ["cnat", "{",
- "session-db-buckets", "64",
- "session-cleanup-timeout", "0.1",
- "session-max-age", "1",
- "tcp-max-age", "1",
- "scanner", "off", "}"]
+ # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
+
+ # We expect this to be NATed as
+ # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
+
+ # After running cnat_expect, we can send back the received packet
+ # and expect it be 'unnated' so that we get the original packet
+ ctx.cnat_send_return().cnat_expect_return()
+
+ # same thing for ICMP errors
+ ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
+ """
+
+ def __init__(self, test, L4PROTO, is_v6):
+ self.L4PROTO = L4PROTO
+ self.is_v6 = is_v6
+ self._test = test
+
+ def get_ip46(self, obj):
+ if self.is_v6:
+ return obj.ip6
+ return obj.ip4
+
+ @property
+ def IP46(self):
+ return IPv6 if self.is_v6 else IP
+
+ def cnat_send(
+ self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
+ ):
+ if isinstance(src_id, int):
+ self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
+ else:
+ self.dst_addr = src_id
+ if isinstance(dst_id, int):
+ self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
+ else:
+ self.dst_addr = dst_id
+ self.src_port = src_port # also ICMP id
+ self.dst_port = dst_port # also ICMP type
+
+ if self.L4PROTO in [TCP, UDP]:
+ l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
+ elif self.L4PROTO in [ICMP] and not self.is_v6:
+ l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
+ elif self.L4PROTO in [ICMP] and self.is_v6:
+ l4 = ICMPv6EchoRequest(id=self.src_port)
+ p1 = (
+ Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
+ / self.IP46(src=self.src_addr, dst=self.dst_addr)
+ / l4
+ / Raw()
+ )
+
+ if no_replies:
+ self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
+ else:
+ self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
+ self.expected_src_pg = src_pg
+ self.expected_dst_pg = dst_pg
+ return self
+
+ def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
+ if isinstance(src_id, int):
+ self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
+ else:
+ self.expect_src_addr = src_id
+ if isinstance(dst_id, int):
+ self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
+ else:
+ self.expect_dst_addr = dst_id
+ self.expect_src_port = src_port
+ self.expect_dst_port = dst_port
+
+ if self.expect_src_port is None:
+ if self.L4PROTO in [TCP, UDP]:
+ self.expect_src_port = self.rxs[0][self.L4PROTO].sport
+ elif self.L4PROTO in [ICMP] and not self.is_v6:
+ self.expect_src_port = self.rxs[0][self.L4PROTO].id
+ elif self.L4PROTO in [ICMP] and self.is_v6:
+ self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
+
+ for rx in self.rxs:
+ self._test.assert_packet_checksums_valid(rx)
+ self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
+ self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
+ if self.L4PROTO in [TCP, UDP]:
+ self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
+ self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
+ elif self.L4PROTO in [ICMP] and not self.is_v6:
+ self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
+ self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
+ elif self.L4PROTO in [ICMP] and self.is_v6:
+ self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
+ return self
+
+ def cnat_send_return(self):
+ """This sends the return traffic"""
+ if self.L4PROTO in [TCP, UDP]:
+ l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
+ elif self.L4PROTO in [ICMP] and not self.is_v6:
+ # icmp type 0 if echo reply
+ l4 = self.L4PROTO(id=self.expect_src_port, type=0)
+ elif self.L4PROTO in [ICMP] and self.is_v6:
+ l4 = ICMPv6EchoReply(id=self.expect_src_port)
+ src_mac = self.expected_dst_pg.remote_mac
+ p1 = (
+ Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
+ / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
+ / l4
+ / Raw()
+ )
+
+ self.return_rxs = self._test.send_and_expect(
+ self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
+ )
+ return self
+
+ def cnat_expect_return(self):
+ for rx in self.return_rxs:
+ self._test.assert_packet_checksums_valid(rx)
+ self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
+ self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
+ if self.L4PROTO in [TCP, UDP]:
+ self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
+ self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
+ elif self.L4PROTO in [ICMP] and not self.is_v6:
+ # icmp type 0 if echo reply
+ self._test.assertEqual(rx[self.L4PROTO].type, 0)
+ self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
+ elif self.L4PROTO in [ICMP] and self.is_v6:
+ self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
+ return self
+
+ def cnat_send_icmp_return_error(self):
+ """
+ This called after cnat_expect will send an icmp error
+ on the reverse path
+ """
+ ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
+ InnerIP = self.rxs[0][self.IP46]
+ p1 = (
+ Ether(
+ src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
+ )
+ / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
+ / ICMPelem
+ / InnerIP
+ )
+ self.return_rxs = self._test.send_and_expect(
+ self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
+ )
+ return self
+
+ def cnat_expect_icmp_error_return(self):
+ ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
+ IP46err = IPerror6 if self.is_v6 else IPerror
+ L4err = TCPerror if self.L4PROTO is TCP else UDPerror
+ for rx in self.return_rxs:
+ self._test.assert_packet_checksums_valid(rx)
+ self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
+ self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
+ self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
+ self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
+ self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
+ self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
+ return self
+
+
+# -------------------------------------------------------------------
+# -------------------------------------------------------------------
+# -------------------------------------------------------------------
+# -------------------------------------------------------------------
+
+
+class TestCNatTranslation(CnatCommonTestCase):
+ """CNat Translation"""
@classmethod
def setUpClass(cls):
@@ -158,6 +350,8 @@ class TestCNatTranslation(VppTestCase):
super(TestCNatTranslation, self).setUp()
self.create_pg_interfaces(range(3))
+ self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
+ self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
for i in self.pg_interfaces:
i.admin_up()
@@ -165,188 +359,121 @@ class TestCNatTranslation(VppTestCase):
i.resolve_arp()
i.config_ip6()
i.resolve_ndp()
+ i.configure_ipv4_neighbors()
+ i.configure_ipv6_neighbors()
def tearDown(self):
+ for translation in self.translations:
+ translation.remove_vpp_config()
+
+ self.vapi.cnat_session_purge()
+ self.assertFalse(self.vapi.cnat_session_dump())
+
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.admin_down()
super(TestCNatTranslation, self).tearDown()
- def cnat_create_translation(self, vip, nbr):
- ip_v = "ip6" if vip.isV6 else "ip4"
- dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
- sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
- t1 = VppCNatTranslation(
- self, vip.l4p, vip,
- [EpTuple(sep, dep), EpTuple(sep, dep)])
- t1.add_vpp_config()
- return t1
-
- def cnat_test_translation(self, t1, nbr, sports, isV6=False):
- ip_v = "ip6" if isV6 else "ip4"
- ip_class = IPv6 if isV6 else IP
- vip = t1.vip
+ def cnat_fhc_translation(self):
+ """CNat Translation"""
+ self.logger.info(self.vapi.cli("sh cnat client"))
+ self.logger.info(self.vapi.cli("sh cnat translation"))
+
+ for nbr, translation in enumerate(self.mbtranslations):
+ vip = translation.vip
- #
- # Flows
- #
- for src in self.pg0.remote_hosts:
- for sport in sports:
+ #
+ # Flows to the VIP with same ips and different source ports are loadbalanced identically
+ # in both cases of flow hash 0x03 (src ip and dst ip) and 0x08 (dst port)
+ #
+ ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
+ for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
# from client to vip
- p1 = (Ether(dst=self.pg0.local_mac,
- src=src.mac) /
- ip_class(src=getattr(src, ip_v), dst=vip.ip) /
- vip.l4p(sport=sport, dport=vip.port) /
- Raw())
-
- self.vapi.cli("trace add pg-input 1")
- rxs = self.send_and_expect(self.pg0,
- p1 * N_PKTS,
- self.pg1)
- self.logger.info(self.vapi.cli("show trace max 1"))
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(self.pg1.remote_hosts[nbr], ip_v))
- self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
- self.assertEqual(
- rx[ip_class].src,
- getattr(src, ip_v))
- self.assertEqual(rx[vip.l4p].sport, sport)
+ ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
+ dport1 = ctx.rxs[0][ctx.L4PROTO].dport
+ ctx._test.assertIn(
+ dport1,
+ [translation.paths[0][DST].port, translation.paths[1][DST].port],
+ )
+ ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dport1)
+
+ ctx.cnat_send(
+ self.pg0, src_pgi, sport + 122, self.pg1, vip.ip, vip.port
+ )
+ dport2 = ctx.rxs[0][ctx.L4PROTO].dport
+ ctx._test.assertIn(
+ dport2,
+ [translation.paths[0][DST].port, translation.paths[1][DST].port],
+ )
+ ctx.cnat_expect(self.pg0, src_pgi, sport + 122, self.pg1, nbr, dport2)
+
+ ctx._test.assertEqual(dport1, dport2)
+
+ def cnat_translation(self):
+ """CNat Translation"""
+ self.logger.info(self.vapi.cli("sh cnat client"))
+ self.logger.info(self.vapi.cli("sh cnat translation"))
+ for nbr, translation in enumerate(self.translations):
+ vip = translation.vip
+
+ #
+ # Test Flows to the VIP
+ #
+ ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
+ for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
+ # from client to vip
+ ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
+ dst_port = translation.paths[0][DST].port
+ ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
# from vip to client
- p1 = (Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_mac) /
- ip_class(src=getattr(
- self.pg1.remote_hosts[nbr],
- ip_v),
- dst=getattr(src, ip_v)) /
- vip.l4p(sport=4000 + nbr, dport=sport) /
- Raw())
-
- rxs = self.send_and_expect(self.pg1,
- p1 * N_PKTS,
- self.pg0)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(
- rx[ip_class].dst,
- getattr(src, ip_v))
- self.assertEqual(rx[vip.l4p].dport, sport)
- self.assertEqual(rx[ip_class].src, vip.ip)
- self.assertEqual(rx[vip.l4p].sport, vip.port)
+ ctx.cnat_send_return().cnat_expect_return()
#
# packets to the VIP that do not match a
# translation are dropped
#
- p1 = (Ether(dst=self.pg0.local_mac,
- src=src.mac) /
- ip_class(src=getattr(src, ip_v), dst=vip.ip) /
- vip.l4p(sport=sport, dport=6666) /
- Raw())
-
- self.send_and_assert_no_replies(self.pg0,
- p1 * N_PKTS,
- self.pg1)
+ ctx.cnat_send(
+ self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
+ )
#
# packets from the VIP that do not match a
# session are forwarded
#
- p1 = (Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_mac) /
- ip_class(src=getattr(
- self.pg1.remote_hosts[nbr],
- ip_v),
- dst=getattr(src, ip_v)) /
- vip.l4p(sport=6666, dport=sport) /
- Raw())
-
- rxs = self.send_and_expect(self.pg1,
- p1 * N_PKTS,
- self.pg0)
-
- def cnat_test_translation_update(self, t1, sports, isV6=False):
- ip_v = "ip6" if isV6 else "ip4"
- ip_class = IPv6 if isV6 else IP
- vip = t1.vip
-
- #
- # modify the translation to use a different backend
- #
- dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
- sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
- t1.modify_vpp_config([EpTuple(sep, dep)])
-
- #
- # existing flows follow the old path
- #
- for src in self.pg0.remote_hosts:
- for sport in sports:
- # from client to vip
- p1 = (Ether(dst=self.pg0.local_mac,
- src=src.mac) /
- ip_class(src=getattr(src, ip_v), dst=vip.ip) /
- vip.l4p(sport=sport, dport=vip.port) /
- Raw())
-
- rxs = self.send_and_expect(self.pg0,
- p1 * N_PKTS,
- self.pg1)
-
- #
- # new flows go to the new backend
- #
- for src in self.pg0.remote_hosts:
- p1 = (Ether(dst=self.pg0.local_mac,
- src=src.mac) /
- ip_class(src=getattr(src, ip_v), dst=vip.ip) /
- vip.l4p(sport=9999, dport=vip.port) /
- Raw())
-
- rxs = self.send_and_expect(self.pg0,
- p1 * N_PKTS,
- self.pg2)
-
- def cnat_translation(self, vips, isV6=False):
- """ CNat Translation """
-
- ip_class = IPv6 if isV6 else IP
- ip_v = "ip6" if isV6 else "ip4"
- sports = [1234, 1233]
-
- #
- # turn the scanner off whilst testing otherwise sessions
- # will time out
- #
- self.vapi.cli("test cnat scanner off")
-
- sessions = self.vapi.cnat_session_dump()
-
- trs = []
- for nbr, vip in enumerate(vips):
- trs.append(self.cnat_create_translation(vip, nbr))
-
- self.logger.info(self.vapi.cli("sh cnat client"))
- self.logger.info(self.vapi.cli("sh cnat translation"))
+ ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
+ ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
+
+ #
+ # modify the translation to use a different backend
+ #
+ old_dst_port = translation.paths[0][DST].port
+ translation.paths[0][DST].udpate(
+ pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
+ )
+ translation.add_vpp_config()
+
+ #
+ # existing flows follow the old path
+ #
+ for src_pgi in range(N_REMOTE_HOSTS):
+ for sport in [1234, 1233]:
+ # from client to vip
+ ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
+ ctx.cnat_expect(
+ self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
+ )
+ # from vip to client
+ ctx.cnat_send_return().cnat_expect_return()
+
+ #
+ # new flows go to the new backend
+ #
+ for src_pgi in range(N_REMOTE_HOSTS):
+ ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
+ ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
- #
- # translations
- #
- for nbr, vip in enumerate(vips):
- self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
- self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
- if isV6:
- self.logger.info(self.vapi.cli(
- "sh ip6 fib %s" % self.pg0.remote_ip6))
- else:
- self.logger.info(self.vapi.cli(
- "sh ip fib %s" % self.pg0.remote_ip4))
self.logger.info(self.vapi.cli("sh cnat session verbose"))
#
@@ -354,183 +481,206 @@ class TestCNatTranslation(VppTestCase):
# all disapper
#
self.vapi.cli("test cnat scanner on")
-
- n_tries = 0
+ self.virtual_sleep(2)
sessions = self.vapi.cnat_session_dump()
- while (len(sessions) and n_tries < 100):
- n_tries += 1
- sessions = self.vapi.cnat_session_dump()
- self.sleep(2)
- self.logger.info(self.vapi.cli("show cnat session verbose"))
-
- self.assertTrue(n_tries < 100)
+ self.assertEqual(len(sessions), 0)
self.vapi.cli("test cnat scanner off")
#
# load some flows again and purge
#
- for vip in vips:
- for src in self.pg0.remote_hosts:
- for sport in sports:
+ for translation in self.translations:
+ vip = translation.vip
+ ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
+ for src_pgi in range(N_REMOTE_HOSTS):
+ for sport in [1234, 1233]:
# from client to vip
- p1 = (Ether(dst=self.pg0.local_mac,
- src=src.mac) /
- ip_class(src=getattr(src, ip_v), dst=vip.ip) /
- vip.l4p(sport=sport, dport=vip.port) /
- Raw())
- self.send_and_expect(self.pg0,
- p1 * N_PKTS,
- self.pg2)
-
- for tr in trs:
- tr.remove_vpp_config()
-
- self.assertTrue(self.vapi.cnat_session_dump())
- self.vapi.cnat_session_purge()
- self.assertFalse(self.vapi.cnat_session_dump())
-
- def test_icmp(self):
- vips = [
- Ep("30.0.0.1", 5555),
- Ep("30.0.0.2", 5554),
- Ep("30.0.0.2", 5553, UDP),
- Ep("30::1", 6666),
- Ep("30::2", 5553, UDP),
- ]
- sport = 1234
-
- self.pg0.generate_remote_hosts(len(vips))
- self.pg0.configure_ipv6_neighbors()
- self.pg0.configure_ipv4_neighbors()
-
- self.pg1.generate_remote_hosts(len(vips))
- self.pg1.configure_ipv6_neighbors()
- self.pg1.configure_ipv4_neighbors()
+ ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
+ ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
- self.vapi.cli("test cnat scanner off")
- trs = []
- for nbr, vip in enumerate(vips):
- trs.append(self.cnat_create_translation(vip, nbr))
-
- self.logger.info(self.vapi.cli("sh cnat client"))
- self.logger.info(self.vapi.cli("sh cnat translation"))
-
- for nbr, vip in enumerate(vips):
- if vip.isV6:
- client_addr = self.pg0.remote_hosts[0].ip6
- remote_addr = self.pg1.remote_hosts[nbr].ip6
- remote2_addr = self.pg2.remote_hosts[0].ip6
- else:
- client_addr = self.pg0.remote_hosts[0].ip4
- remote_addr = self.pg1.remote_hosts[nbr].ip4
- remote2_addr = self.pg2.remote_hosts[0].ip4
- IP46 = IPv6 if vip.isV6 else IP
- # from client to vip
- p1 = (Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- IP46(src=client_addr, dst=vip.ip) /
- vip.l4p(sport=sport, dport=vip.port) /
- Raw())
-
- rxs = self.send_and_expect(self.pg0,
- p1 * N_PKTS,
- self.pg1)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, remote_addr)
- self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
- self.assertEqual(rx[IP46].src, client_addr)
- self.assertEqual(rx[vip.l4p].sport, sport)
-
- InnerIP = rxs[0][IP46]
-
- ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
- ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
- # from vip to client, ICMP error
- p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP46(src=remote_addr, dst=client_addr) /
- ICMPelem / InnerIP)
-
- rxs = self.send_and_expect(self.pg1,
- p1 * N_PKTS,
- self.pg0)
-
- TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
- IP46error = IPerror6 if vip.isV6 else IPerror
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].src, vip.ip)
- self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
- self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
- self.assertEqual(rx[ICMP46][IP46error]
- [TCPUDPError].sport, sport)
- self.assertEqual(rx[ICMP46][IP46error]
- [TCPUDPError].dport, vip.port)
-
- # from other remote to client, ICMP error
- # outside shouldn't be NAT-ed
- p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
- IP46(src=remote2_addr, dst=client_addr) /
- ICMPelem / InnerIP)
-
- rxs = self.send_and_expect(self.pg1,
- p1 * N_PKTS,
- self.pg0)
-
- TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
- IP46error = IPerror6 if vip.isV6 else IPerror
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].src, remote2_addr)
- self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
- self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
- self.assertEqual(rx[ICMP46][IP46error]
- [TCPUDPError].sport, sport)
- self.assertEqual(rx[ICMP46][IP46error]
- [TCPUDPError].dport, vip.port)
-
- self.vapi.cnat_session_purge()
+ def _test_icmp(self):
+ #
+ # Testing ICMP
+ #
+ for nbr, translation in enumerate(self.translations):
+ vip = translation.vip
+ ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
+
+ #
+ # NATing ICMP errors
+ #
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
+ dst_port = translation.paths[0][DST].port
+ ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
+ ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
+
+ #
+ # ICMP errors with no VIP associated should not be
+ # modified
+ #
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
+ dst_port = translation.paths[0][DST].port
+ ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
+ ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
+
+ def _make_multi_backend_translations(self):
+ self.translations = []
+ self.mbtranslations = []
+ self.mbtranslations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30.0.0.5", port=5555, is_v6=False),
+ [
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
+ ),
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=0, port=4005, is_v6=False),
+ ),
+ ],
+ 0x03, # hash only on dst ip and src ip
+ ).add_vpp_config()
+ )
+ self.mbtranslations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30.0.0.6", port=5555, is_v6=False),
+ [
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=1, port=4006, is_v6=False),
+ ),
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=1, port=4007, is_v6=False),
+ ),
+ ],
+ 0x08, # hash only on dst port
+ ).add_vpp_config()
+ )
+
+ def _make_translations_v4(self):
+ self.translations = []
+ self.translations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
+ [
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
+ )
+ ],
+ 0x9F,
+ ).add_vpp_config()
+ )
+ self.translations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
+ [
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
+ )
+ ],
+ 0x9F,
+ ).add_vpp_config()
+ )
+ self.translations.append(
+ Translation(
+ self,
+ UDP,
+ Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
+ [
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
+ )
+ ],
+ 0x9F,
+ ).add_vpp_config()
+ )
+
+ def _make_translations_v6(self):
+ self.translations = []
+ self.translations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30::1", port=5555, is_v6=True),
+ [
+ (
+ Endpoint(is_v6=True),
+ Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
+ )
+ ],
+ 0x9F,
+ ).add_vpp_config()
+ )
+ self.translations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30::2", port=5554, is_v6=True),
+ [
+ (
+ Endpoint(is_v6=True),
+ Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
+ )
+ ],
+ 0x9F,
+ ).add_vpp_config()
+ )
+ self.translations.append(
+ Translation(
+ self,
+ UDP,
+ Endpoint(ip="30::2", port=5553, is_v6=True),
+ [
+ (
+ Endpoint(is_v6=True),
+ Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
+ )
+ ],
+ 0x9F,
+ ).add_vpp_config()
+ )
+
+ def test_icmp4(self):
+ # """ CNat Translation icmp v4 """
+ self._make_translations_v4()
+ self._test_icmp()
+
+ def test_icmp6(self):
+ # """ CNat Translation icmp v6 """
+ self._make_translations_v6()
+ self._test_icmp()
def test_cnat6(self):
# """ CNat Translation ipv6 """
- vips = [
- Ep("30::1", 5555),
- Ep("30::2", 5554),
- Ep("30::2", 5553, UDP),
- ]
-
- self.pg0.generate_remote_hosts(len(vips))
- self.pg0.configure_ipv6_neighbors()
- self.pg1.generate_remote_hosts(len(vips))
- self.pg1.configure_ipv6_neighbors()
-
- self.cnat_translation(vips, isV6=True)
+ self._make_translations_v6()
+ self.cnat_translation()
def test_cnat4(self):
# """ CNat Translation ipv4 """
+ self._make_translations_v4()
+ self.cnat_translation()
- vips = [
- Ep("30.0.0.1", 5555),
- Ep("30.0.0.2", 5554),
- Ep("30.0.0.2", 5553, UDP),
- ]
-
- self.pg0.generate_remote_hosts(len(vips))
- self.pg0.configure_ipv4_neighbors()
- self.pg1.generate_remote_hosts(len(vips))
- self.pg1.configure_ipv4_neighbors()
+ def test_cnat_fhc(self):
+ # """ CNat Translation flow hash config """
+ self._make_multi_backend_translations()
+ self.cnat_fhc_translation()
- self.cnat_translation(vips)
-
-class TestCNatSourceNAT(VppTestCase):
- """ CNat Source NAT """
- extra_vpp_punt_config = ["cnat", "{",
- "session-cleanup-timeout", "0.1",
- "session-max-age", "1",
- "tcp-max-age", "1",
- "scanner", "off", "}"]
+class TestCNatSourceNAT(CnatCommonTestCase):
+ """CNat Source NAT"""
@classmethod
def setUpClass(cls):
@@ -540,51 +690,61 @@ class TestCNatSourceNAT(VppTestCase):
def tearDownClass(cls):
super(TestCNatSourceNAT, cls).tearDownClass()
- def setUp(self):
- super(TestCNatSourceNAT, self).setUp()
-
- self.create_pg_interfaces(range(3))
-
- for i in self.pg_interfaces:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
- i.config_ip6()
- i.resolve_ndp()
-
- self.pg0.configure_ipv6_neighbors()
- self.pg0.configure_ipv4_neighbors()
- self.pg1.generate_remote_hosts(2)
- self.pg1.configure_ipv4_neighbors()
- self.pg1.configure_ipv6_neighbors()
-
+ def _enable_disable_snat(self, is_enable=True):
self.vapi.cnat_set_snat_addresses(
snat_ip4=self.pg2.remote_hosts[0].ip4,
snat_ip6=self.pg2.remote_hosts[0].ip6,
- sw_if_index=INVALID_INDEX)
+ sw_if_index=INVALID_INDEX,
+ )
self.vapi.feature_enable_disable(
- enable=1,
+ enable=1 if is_enable else 0,
arc_name="ip6-unicast",
feature_name="cnat-snat-ip6",
- sw_if_index=self.pg0.sw_if_index)
+ sw_if_index=self.pg0.sw_if_index,
+ )
self.vapi.feature_enable_disable(
- enable=1,
+ enable=1 if is_enable else 0,
arc_name="ip4-unicast",
feature_name="cnat-snat-ip4",
- sw_if_index=self.pg0.sw_if_index)
+ sw_if_index=self.pg0.sw_if_index,
+ )
policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
self.vapi.cnat_set_snat_policy(
- policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
+ policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
+ )
for i in self.pg_interfaces:
self.vapi.cnat_snat_policy_add_del_if(
- sw_if_index=i.sw_if_index, is_add=1,
- table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
+ sw_if_index=i.sw_if_index,
+ is_add=1 if is_enable else 0,
+ table=policie_tbls.CNAT_POLICY_INCLUDE_V6,
+ )
self.vapi.cnat_snat_policy_add_del_if(
- sw_if_index=i.sw_if_index, is_add=1,
- table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
+ sw_if_index=i.sw_if_index,
+ is_add=1 if is_enable else 0,
+ table=policie_tbls.CNAT_POLICY_INCLUDE_V4,
+ )
+
+ def setUp(self):
+ super(TestCNatSourceNAT, self).setUp()
+
+ self.create_pg_interfaces(range(3))
+ self.pg1.generate_remote_hosts(2)
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+ i.config_ip6()
+ i.resolve_ndp()
+ i.configure_ipv6_neighbors()
+ i.configure_ipv4_neighbors()
+
+ self._enable_disable_snat(is_enable=True)
def tearDown(self):
+ self._enable_disable_snat(is_enable=True)
+
self.vapi.cnat_session_purge()
for i in self.pg_interfaces:
i.unconfig_ip4()
@@ -594,272 +754,72 @@ class TestCNatSourceNAT(VppTestCase):
def test_snat_v6(self):
# """ CNat Source Nat v6 """
- self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
- self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
- self.sourcenat_test_icmp_err_conf(isV6=True)
- self.sourcenat_test_icmp_echo6_conf()
+ self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
+ self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
+ self.sourcenat_test_icmp_echo_conf(is_v6=True)
def test_snat_v4(self):
# """ CNat Source Nat v4 """
self.sourcenat_test_tcp_udp_conf(TCP)
self.sourcenat_test_tcp_udp_conf(UDP)
- self.sourcenat_test_icmp_err_conf()
- self.sourcenat_test_icmp_echo4_conf()
-
- def sourcenat_test_icmp_echo6_conf(self):
- sports = [1234, 1235]
- dports = [6661, 6662]
-
- for nbr, remote_host in enumerate(self.pg1.remote_hosts):
- client_addr = self.pg0.remote_hosts[0].ip6
- remote_addr = self.pg1.remote_hosts[nbr].ip6
- src_nat_addr = self.pg2.remote_hosts[0].ip6
-
- # ping from pods to outside network
- p1 = (
- Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- IPv6(src=client_addr, dst=remote_addr) /
- ICMPv6EchoRequest(id=0xfeed) /
- Raw())
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
-
- for rx in rxs:
- self.assertEqual(rx[IPv6].src, src_nat_addr)
- self.assert_packet_checksums_valid(rx)
-
- received_id = rx[0][ICMPv6EchoRequest].id
- # ping reply from outside to pods
- p2 = (
- Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- IPv6(src=remote_addr, dst=src_nat_addr) /
- ICMPv6EchoReply(id=received_id))
- rxs = self.send_and_expect(
- self.pg1,
- p2 * N_PKTS,
- self.pg0)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IPv6].src, remote_addr)
- self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
-
- def sourcenat_test_icmp_echo4_conf(self):
- sports = [1234, 1235]
- dports = [6661, 6662]
-
- for nbr, remote_host in enumerate(self.pg1.remote_hosts):
- IP46 = IP
- client_addr = self.pg0.remote_hosts[0].ip4
- remote_addr = self.pg1.remote_hosts[nbr].ip4
- src_nat_addr = self.pg2.remote_hosts[0].ip4
-
- # ping from pods to outside network
- p1 = (
- Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- IP46(src=client_addr, dst=remote_addr) /
- ICMP(type=8, id=0xfeed) /
- Raw())
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
-
- for rx in rxs:
- self.assertEqual(rx[IP46].src, src_nat_addr)
- self.assert_packet_checksums_valid(rx)
-
- received_id = rx[0][ICMP].id
- # ping reply from outside to pods
- p2 = (
- Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- IP46(src=remote_addr, dst=src_nat_addr) /
- ICMP(type=0, id=received_id))
- rxs = self.send_and_expect(
- self.pg1,
- p2 * N_PKTS,
- self.pg0)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].src, remote_addr)
- self.assertEqual(rx[ICMP].id, 0xfeed)
-
- def sourcenat_test_icmp_err_conf(self, isV6=False):
- sports = [1234, 1235]
- dports = [6661, 6662]
-
- for nbr, remote_host in enumerate(self.pg1.remote_hosts):
- if isV6:
- IP46 = IPv6
- client_addr = self.pg0.remote_hosts[0].ip6
- remote_addr = self.pg1.remote_hosts[nbr].ip6
- src_nat_addr = self.pg2.remote_hosts[0].ip6
- ICMP46 = ICMPv6DestUnreach
- ICMPelem = ICMPv6DestUnreach(code=1)
- IP46error = IPerror6
- else:
- IP46 = IP
- client_addr = self.pg0.remote_hosts[0].ip4
- remote_addr = self.pg1.remote_hosts[nbr].ip4
- src_nat_addr = self.pg2.remote_hosts[0].ip4
- IP46error = IPerror
- ICMP46 = ICMP
- ICMPelem = ICMP(type=11)
-
- # from pods to outside network
- p1 = (
- Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- IP46(src=client_addr, dst=remote_addr) /
- TCP(sport=sports[nbr], dport=dports[nbr]) /
- Raw())
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, remote_addr)
- self.assertEqual(rx[TCP].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, src_nat_addr)
- sport = rx[TCP].sport
-
- InnerIP = rxs[0][IP46]
- # from outside to pods, ICMP error
- p2 = (
- Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- IP46(src=remote_addr, dst=src_nat_addr) /
- ICMPelem / InnerIP)
-
- rxs = self.send_and_expect(
- self.pg1,
- p2 * N_PKTS,
- self.pg0)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].src, remote_addr)
- self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
- self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
- self.assertEqual(rx[ICMP46][IP46error]
- [TCPerror].sport, sports[nbr])
- self.assertEqual(rx[ICMP46][IP46error]
- [TCPerror].dport, dports[nbr])
-
- def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
- sports = [1234, 1235]
- dports = [6661, 6662]
-
- for nbr, remote_host in enumerate(self.pg1.remote_hosts):
- if isV6:
- IP46 = IPv6
- client_addr = self.pg0.remote_hosts[0].ip6
- remote_addr = self.pg1.remote_hosts[nbr].ip6
- src_nat_addr = self.pg2.remote_hosts[0].ip6
- exclude_prefix = ip_network(
- "%s/100" % remote_addr, strict=False)
- else:
- IP46 = IP
- client_addr = self.pg0.remote_hosts[0].ip4
- remote_addr = self.pg1.remote_hosts[nbr].ip4
- src_nat_addr = self.pg2.remote_hosts[0].ip4
- exclude_prefix = ip_network(
- "%s/16" % remote_addr, strict=False)
- # from pods to outside network
- p1 = (
- Ether(dst=self.pg0.local_mac,
- src=self.pg0.remote_hosts[0].mac) /
- IP46(src=client_addr, dst=remote_addr) /
- l4p(sport=sports[nbr], dport=dports[nbr]) /
- Raw())
-
- self.vapi.cli("trace add pg-input 1")
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
- self.logger.info(self.vapi.cli("show trace max 1"))
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, remote_addr)
- self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, src_nat_addr)
- sport = rx[l4p].sport
-
- # from outside to pods
- p2 = (
- Ether(dst=self.pg1.local_mac,
- src=self.pg1.remote_hosts[nbr].mac) /
- IP46(src=remote_addr, dst=src_nat_addr) /
- l4p(sport=dports[nbr], dport=sport) /
- Raw())
-
- rxs = self.send_and_expect(
- self.pg1,
- p2 * N_PKTS,
- self.pg0)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, client_addr)
- self.assertEqual(rx[l4p].dport, sports[nbr])
- self.assertEqual(rx[l4p].sport, dports[nbr])
- self.assertEqual(rx[IP46].src, remote_addr)
-
- # add remote host to exclude list
- self.vapi.cnat_snat_policy_add_del_exclude_pfx(
- prefix=exclude_prefix, is_add=1)
- self.vapi.cnat_session_purge()
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, remote_addr)
- self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, client_addr)
-
- # remove remote host from exclude list
- self.vapi.cnat_snat_policy_add_del_exclude_pfx(
- prefix=exclude_prefix, is_add=0)
- self.vapi.cnat_session_purge()
-
- rxs = self.send_and_expect(
- self.pg0,
- p1 * N_PKTS,
- self.pg1)
-
- for rx in rxs:
- self.assert_packet_checksums_valid(rx)
- self.assertEqual(rx[IP46].dst, remote_addr)
- self.assertEqual(rx[l4p].dport, dports[nbr])
- self.assertEqual(rx[IP46].src, src_nat_addr)
-
- self.vapi.cnat_session_purge()
-
-
-class TestCNatDHCP(VppTestCase):
- """ CNat Translation """
- extra_vpp_punt_config = ["cnat", "{",
- "session-db-buckets", "64",
- "session-cleanup-timeout", "0.1",
- "session-max-age", "1",
- "tcp-max-age", "1",
- "scanner", "off", "}"]
+ self.sourcenat_test_icmp_echo_conf()
+
+ def sourcenat_test_icmp_echo_conf(self, is_v6=False):
+ ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
+ # 8 is ICMP type echo (v4 only)
+ ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
+ ctx = CnatTestContext(self, L4PROTO, is_v6)
+ # we should source NAT
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ # exclude dst address of pg1.1 from snat
+ if is_v6:
+ exclude_prefix = ip_network(
+ "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
+ )
+ else:
+ exclude_prefix = ip_network(
+ "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
+ )
+
+ # add remote host to exclude list
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)
+
+ # We should not source NAT the id=1
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
+ ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ # But we should source NAT the id=0
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ # remove remote host from exclude list
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
+ self.vapi.cnat_session_purge()
+
+ # We should source NAT again
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
+ ctx.cnat_send_return().cnat_expect_return()
+
+ # test return ICMP error nating
+ ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
+ ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
+ ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
+
+ self.vapi.cnat_session_purge()
+
+
+class TestCNatDHCP(CnatCommonTestCase):
+ """CNat Translation"""
@classmethod
def setUpClass(cls):
@@ -874,102 +834,111 @@ class TestCNatDHCP(VppTestCase):
i.admin_down()
super(TestCNatDHCP, self).tearDown()
- def create_translation(self, vip_pg, *args, is_v6=False):
- vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
- paths = []
- for (src_pg, dst_pg) in args:
- paths.append(EpTuple(
- Ep.from_pg(src_pg, is_v6=is_v6),
- Ep.from_pg(dst_pg, is_v6=is_v6)
- ))
- t1 = VppCNatTranslation(self, TCP, vip, paths)
- t1.add_vpp_config()
- return t1
-
- def make_addr(self, sw_if_index, i, is_v6):
+ def make_addr(self, sw_if_index, addr_id, is_v6):
if is_v6:
- return "fd01:%x::%u" % (sw_if_index, i + 1)
- else:
- return "172.16.%u.%u" % (sw_if_index, i)
+ return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
+ return "172.16.%u.%u" % (sw_if_index, addr_id)
- def make_prefix(self, sw_if_index, i, is_v6):
+ def make_prefix(self, sw_if_index, addr_id, is_v6):
if is_v6:
- return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
- else:
- return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
-
- def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
- qt1 = tr.query_vpp_config()
- self.assertEqual(str(qt1.vip.addr), self.make_addr(
- vip_pg.sw_if_index, i, is_v6))
- for (src_pg, dst_pg), path in zip(args, qt1.paths):
- if src_pg:
- self.assertEqual(str(path.src_ep.addr), self.make_addr(
- src_pg.sw_if_index, i, is_v6))
- if dst_pg:
- self.assertEqual(str(path.dst_ep.addr), self.make_addr(
- dst_pg.sw_if_index, i, is_v6))
-
- def config_ips(self, rng, is_add=1, is_v6=False):
- for pg, i in product(self.pg_interfaces, rng):
- self.vapi.sw_interface_add_del_address(
- sw_if_index=pg.sw_if_index,
- prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
- is_add=is_add)
-
- def test_dhcp_v4(self):
- self.create_pg_interfaces(range(5))
+ return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
+ return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
+
+ def check_resolved(self, tr, addr_id, is_v6=False):
+ qt = tr.query_vpp_config()
+ self.assertEqual(
+ str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
+ )
+ self.assertEqual(len(qt.paths), len(tr.paths))
+ for path_tr, path_qt in zip(tr.paths, qt.paths):
+ src_qt = path_qt.src_ep
+ dst_qt = path_qt.dst_ep
+ src_tr, dst_tr = path_tr
+ self.assertEqual(
+ str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
+ )
+ self.assertEqual(
+ str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
+ )
+
+ def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=pg.sw_if_index,
+ prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
+ is_add=1 if is_add else 0,
+ )
+
+ def _test_dhcp_v46(self, is_v6):
+ self.create_pg_interfaces(range(4))
for i in self.pg_interfaces:
i.admin_up()
- pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
- t1 = self.create_translation(*pglist)
- self.config_ips([0])
- self.check_resolved(t1, *pglist)
- self.config_ips([1])
- self.config_ips([0], is_add=0)
- self.check_resolved(t1, *pglist, i=1)
- self.config_ips([1], is_add=0)
- t1.remove_vpp_config()
+ paths = [
+ (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
+ (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
+ ]
+ ep = Endpoint(pg=self.pg0, is_v6=is_v6)
+ t = Translation(self, TCP, ep, paths, 0x9F).add_vpp_config()
+ # Add an address on every interface
+ # and check it is reflected in the cnat config
+ for pg in self.pg_interfaces:
+ self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
+ self.check_resolved(t, addr_id=0, is_v6=is_v6)
+ # Add a new address on every interface, remove the old one
+ # and check it is reflected in the cnat config
+ for pg in self.pg_interfaces:
+ self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
+ self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
+ self.check_resolved(t, addr_id=1, is_v6=is_v6)
+ # remove the configuration
+ for pg in self.pg_interfaces:
+ self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
+ t.remove_vpp_config()
+
+ def test_dhcp_v4(self):
+ self._test_dhcp_v46(False)
def test_dhcp_v6(self):
- self.create_pg_interfaces(range(5))
- for i in self.pg_interfaces:
- i.admin_up()
- pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
- t1 = self.create_translation(*pglist, is_v6=True)
- self.config_ips([0], is_v6=True)
- self.check_resolved(t1, *pglist, is_v6=True)
- self.config_ips([1], is_v6=True)
- self.config_ips([0], is_add=0, is_v6=True)
- self.check_resolved(t1, *pglist, i=1, is_v6=True)
- self.config_ips([1], is_add=0, is_v6=True)
- t1.remove_vpp_config()
+ self._test_dhcp_v46(True)
def test_dhcp_snat(self):
self.create_pg_interfaces(range(1))
for i in self.pg_interfaces:
i.admin_up()
self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
- self.config_ips([0], is_v6=False)
- self.config_ips([0], is_v6=True)
+ # Add an address on every interface
+ # and check it is reflected in the cnat config
+ for pg in self.pg_interfaces:
+ self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
+ self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
r = self.vapi.cnat_get_snat_addresses()
- self.assertEqual(str(r.snat_ip4), self.make_addr(
- self.pg0.sw_if_index, 0, False))
- self.assertEqual(str(r.snat_ip6), self.make_addr(
- self.pg0.sw_if_index, 0, True))
- self.config_ips([1], is_v6=False)
- self.config_ips([1], is_v6=True)
- self.config_ips([0], is_add=0, is_v6=False)
- self.config_ips([0], is_add=0, is_v6=True)
+ self.assertEqual(
+ str(r.snat_ip4),
+ self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
+ )
+ self.assertEqual(
+ str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
+ )
+ # Add a new address on every interface, remove the old one
+ # and check it is reflected in the cnat config
+ for pg in self.pg_interfaces:
+ self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
+ self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
+ self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
+ self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
r = self.vapi.cnat_get_snat_addresses()
- self.assertEqual(str(r.snat_ip4), self.make_addr(
- self.pg0.sw_if_index, 1, False))
- self.assertEqual(str(r.snat_ip6), self.make_addr(
- self.pg0.sw_if_index, 1, True))
- self.config_ips([1], is_add=0, is_v6=False)
- self.config_ips([1], is_add=0, is_v6=True)
+ self.assertEqual(
+ str(r.snat_ip4),
+ self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
+ )
+ self.assertEqual(
+ str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
+ )
+ # remove the configuration
+ for pg in self.pg_interfaces:
+ self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
+ self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)