aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_cnat.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_cnat.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_cnat.py')
-rw-r--r--test/test_cnat.py382
1 files changed, 227 insertions, 155 deletions
diff --git a/test/test_cnat.py b/test/test_cnat.py
index 25c2a6454cb..e2e7c6bab8c 100644
--- a/test/test_cnat.py
+++ b/test/test_cnat.py
@@ -15,8 +15,14 @@ from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
import struct
-from ipaddress import ip_address, ip_network, \
- IPv4Address, IPv6Address, IPv4Network, IPv6Network
+from ipaddress import (
+ ip_address,
+ ip_network,
+ IPv4Address,
+ IPv6Address,
+ IPv4Network,
+ IPv6Network,
+)
from vpp_object import VppObject
from vpp_papi import VppEnum
@@ -29,18 +35,27 @@ DST = 1
class CnatCommonTestCase(VppTestCase):
- """ CNat common test class """
+ """CNat common test class"""
#
# turn the scanner off whilst testing otherwise sessions
# will time out
#
- extra_vpp_punt_config = ["cnat", "{",
- "session-db-buckets", "64",
- "session-cleanup-timeout", "0.1",
- "session-max-age", "1",
- "tcp-max-age", "1",
- "scanner", "off", "}"]
+ extra_vpp_punt_config = [
+ "cnat",
+ "{",
+ "session-db-buckets",
+ "64",
+ "session-cleanup-timeout",
+ "0.1",
+ "session-max-age",
+ "1",
+ "tcp-max-age",
+ "1",
+ "scanner",
+ "off",
+ "}",
+ ]
@classmethod
def setUpClass(cls):
@@ -52,7 +67,7 @@ class CnatCommonTestCase(VppTestCase):
class Endpoint(object):
- """ CNat endpoint """
+ """CNat endpoint"""
def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
self.port = port
@@ -83,17 +98,18 @@ class Endpoint(object):
return VppEnum.vl_api_address_family_t.ADDRESS_IP4
def encode(self):
- return {'addr': self.ip,
- 'port': self.port,
- 'sw_if_index': self.sw_if_index,
- 'if_af': self._vpp_if_af()}
+ return {
+ "addr": self.ip,
+ "port": self.port,
+ "sw_if_index": self.sw_if_index,
+ "if_af": self._vpp_if_af(),
+ }
def __str__(self):
- return ("%s:%d" % (self.ip, self.port))
+ return "%s:%d" % (self.ip, self.port)
class Translation(VppObject):
-
def __init__(self, test, iproto, vip, paths):
self._test = test
self.vip = vip
@@ -102,7 +118,7 @@ class Translation(VppObject):
self.id = None
def __str__(self):
- return ("%s %s %s" % (self.vip, self.iproto, self.paths))
+ return "%s %s %s" % (self.vip, self.iproto, self.paths)
def _vl4_proto(self):
ip_proto = VppEnum.vl_api_ip_proto_t
@@ -112,21 +128,26 @@ class Translation(VppObject):
}[self.iproto]
def _encoded_paths(self):
- return [{'src_ep': src.encode(),
- 'dst_ep': dst.encode()} for (src, dst) in self.paths]
+ return [
+ {"src_ep": src.encode(), "dst_ep": dst.encode()}
+ for (src, dst) in self.paths
+ ]
def add_vpp_config(self):
r = self._test.vapi.cnat_translation_update(
- {'vip': self.vip.encode(),
- 'ip_proto': self._vl4_proto(),
- 'n_paths': len(self.paths),
- 'paths': self._encoded_paths()})
+ {
+ "vip": self.vip.encode(),
+ "ip_proto": self._vl4_proto(),
+ "n_paths": len(self.paths),
+ "paths": self._encoded_paths(),
+ }
+ )
self._test.registry.register(self, self._test.logger)
self.id = r.id
return self
def remove_vpp_config(self):
- assert(self.id is not None)
+ assert self.id is not None
self._test.vapi.cnat_translation_del(id=self.id)
return self
@@ -172,8 +193,9 @@ class CnatTestContext(object):
def IP46(self):
return IPv6 if self.is_v6 else IP
- def cnat_send(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port,
- no_replies=False):
+ def cnat_send(
+ self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
+ ):
if isinstance(src_id, int):
self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
else:
@@ -191,11 +213,12 @@ class CnatTestContext(object):
l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
elif self.L4PROTO in [ICMP] and self.is_v6:
l4 = ICMPv6EchoRequest(id=self.src_port)
- p1 = (Ether(src=src_pg.remote_mac,
- dst=src_pg.local_mac) /
- self.IP46(src=self.src_addr, dst=self.dst_addr) /
- l4 /
- Raw())
+ p1 = (
+ Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
+ / self.IP46(src=self.src_addr, dst=self.dst_addr)
+ / l4
+ / Raw()
+ )
if no_replies:
self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
@@ -230,38 +253,35 @@ class CnatTestContext(object):
self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
if self.L4PROTO in [TCP, UDP]:
- self._test.assertEqual(
- rx[self.L4PROTO].dport, self.expect_dst_port)
- self._test.assertEqual(
- rx[self.L4PROTO].sport, self.expect_src_port)
+ self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
+ self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
elif self.L4PROTO in [ICMP] and not self.is_v6:
- self._test.assertEqual(
- rx[self.L4PROTO].type, self.expect_dst_port)
- self._test.assertEqual(
- rx[self.L4PROTO].id, self.expect_src_port)
+ self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
+ self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
elif self.L4PROTO in [ICMP] and self.is_v6:
- self._test.assertEqual(
- rx[ICMPv6EchoRequest].id, self.expect_src_port)
+ self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
return self
def cnat_send_return(self):
"""This sends the return traffic"""
if self.L4PROTO in [TCP, UDP]:
- l4 = self.L4PROTO(sport=self.expect_dst_port,
- dport=self.expect_src_port)
+ l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
elif self.L4PROTO in [ICMP] and not self.is_v6:
# icmp type 0 if echo reply
l4 = self.L4PROTO(id=self.expect_src_port, type=0)
elif self.L4PROTO in [ICMP] and self.is_v6:
l4 = ICMPv6EchoReply(id=self.expect_src_port)
src_mac = self.expected_dst_pg.remote_mac
- p1 = (Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) /
- self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
- l4 /
- Raw())
+ p1 = (
+ Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
+ / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
+ / l4
+ / Raw()
+ )
self.return_rxs = self._test.send_and_expect(
- self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
+ self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
+ )
return self
def cnat_expect_return(self):
@@ -288,12 +308,16 @@ class CnatTestContext(object):
ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
InnerIP = self.rxs[0][self.IP46]
p1 = (
- Ether(src=self.expected_dst_pg.remote_mac,
- dst=self.expected_dst_pg.local_mac) /
- self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
- ICMPelem / InnerIP)
+ Ether(
+ src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
+ )
+ / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
+ / ICMPelem
+ / InnerIP
+ )
self.return_rxs = self._test.send_and_expect(
- self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
+ self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
+ )
return self
def cnat_expect_icmp_error_return(self):
@@ -306,12 +330,11 @@ class CnatTestContext(object):
self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
- self._test.assertEqual(
- rx[ICMP46][IP46err][L4err].sport, self.src_port)
- self._test.assertEqual(
- rx[ICMP46][IP46err][L4err].dport, self.dst_port)
+ self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
+ self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
return self
+
# -------------------------------------------------------------------
# -------------------------------------------------------------------
# -------------------------------------------------------------------
@@ -319,7 +342,7 @@ class CnatTestContext(object):
class TestCNatTranslation(CnatCommonTestCase):
- """ CNat Translation """
+ """CNat Translation"""
@classmethod
def setUpClass(cls):
@@ -359,7 +382,7 @@ class TestCNatTranslation(CnatCommonTestCase):
super(TestCNatTranslation, self).tearDown()
def cnat_translation(self):
- """ CNat Translation """
+ """CNat Translation"""
self.logger.info(self.vapi.cli("sh cnat client"))
self.logger.info(self.vapi.cli("sh cnat translation"))
@@ -372,11 +395,9 @@ class TestCNatTranslation(CnatCommonTestCase):
ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
# from client to vip
- ctx.cnat_send(self.pg0, src_pgi, sport,
- self.pg1, vip.ip, vip.port)
+ ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
dst_port = translation.paths[0][DST].port
- ctx.cnat_expect(self.pg0, src_pgi, sport,
- self.pg1, nbr, dst_port)
+ ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
# from vip to client
ctx.cnat_send_return().cnat_expect_return()
@@ -384,8 +405,9 @@ class TestCNatTranslation(CnatCommonTestCase):
# packets to the VIP that do not match a
# translation are dropped
#
- ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1,
- vip.ip, 6666, no_replies=True)
+ ctx.cnat_send(
+ self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
+ )
#
# packets from the VIP that do not match a
@@ -399,7 +421,8 @@ class TestCNatTranslation(CnatCommonTestCase):
#
old_dst_port = translation.paths[0][DST].port
translation.paths[0][DST].udpate(
- pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6)
+ pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
+ )
translation.add_vpp_config()
#
@@ -408,10 +431,10 @@ class TestCNatTranslation(CnatCommonTestCase):
for src_pgi in range(N_REMOTE_HOSTS):
for sport in [1234, 1233]:
# from client to vip
- ctx.cnat_send(self.pg0, src_pgi, sport,
- self.pg1, vip.ip, vip.port)
- ctx.cnat_expect(self.pg0, src_pgi, sport,
- self.pg1, nbr, old_dst_port)
+ ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
+ ctx.cnat_expect(
+ self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
+ )
# from vip to client
ctx.cnat_send_return().cnat_expect_return()
@@ -419,8 +442,7 @@ class TestCNatTranslation(CnatCommonTestCase):
# new flows go to the new backend
#
for src_pgi in range(N_REMOTE_HOSTS):
- ctx.cnat_send(self.pg0, src_pgi, 9999,
- self.pg2, vip.ip, vip.port)
+ ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
self.logger.info(self.vapi.cli("sh cnat session verbose"))
@@ -444,10 +466,8 @@ class TestCNatTranslation(CnatCommonTestCase):
for src_pgi in range(N_REMOTE_HOSTS):
for sport in [1234, 1233]:
# from client to vip
- ctx.cnat_send(self.pg0, src_pgi, sport,
- self.pg2, vip.ip, vip.port)
- ctx.cnat_expect(self.pg0, src_pgi,
- sport, self.pg2, 0, 5000)
+ ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
+ ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
def _test_icmp(self):
@@ -477,51 +497,87 @@ class TestCNatTranslation(CnatCommonTestCase):
def _make_translations_v4(self):
self.translations = []
- self.translations.append(Translation(
- self, TCP, Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
- [(
- Endpoint(is_v6=False),
- Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
- )]
- ).add_vpp_config())
- self.translations.append(Translation(
- self, TCP, Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
- [(
- Endpoint(is_v6=False),
- Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
- )]
- ).add_vpp_config())
- self.translations.append(Translation(
- self, UDP, Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
- [(
- Endpoint(is_v6=False),
- Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
- )]
- ).add_vpp_config())
+ self.translations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
+ [
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
+ )
+ ],
+ ).add_vpp_config()
+ )
+ self.translations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
+ [
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
+ )
+ ],
+ ).add_vpp_config()
+ )
+ self.translations.append(
+ Translation(
+ self,
+ UDP,
+ Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
+ [
+ (
+ Endpoint(is_v6=False),
+ Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
+ )
+ ],
+ ).add_vpp_config()
+ )
def _make_translations_v6(self):
self.translations = []
- self.translations.append(Translation(
- self, TCP, Endpoint(ip="30::1", port=5555, is_v6=True),
- [(
- Endpoint(is_v6=True),
- Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
- )]
- ).add_vpp_config())
- self.translations.append(Translation(
- self, TCP, Endpoint(ip="30::2", port=5554, is_v6=True),
- [(
- Endpoint(is_v6=True),
- Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
- )]
- ).add_vpp_config())
- self.translations.append(Translation(
- self, UDP, Endpoint(ip="30::2", port=5553, is_v6=True),
- [(
- Endpoint(is_v6=True),
- Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
- )]
- ).add_vpp_config())
+ self.translations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30::1", port=5555, is_v6=True),
+ [
+ (
+ Endpoint(is_v6=True),
+ Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
+ )
+ ],
+ ).add_vpp_config()
+ )
+ self.translations.append(
+ Translation(
+ self,
+ TCP,
+ Endpoint(ip="30::2", port=5554, is_v6=True),
+ [
+ (
+ Endpoint(is_v6=True),
+ Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
+ )
+ ],
+ ).add_vpp_config()
+ )
+ self.translations.append(
+ Translation(
+ self,
+ UDP,
+ Endpoint(ip="30::2", port=5553, is_v6=True),
+ [
+ (
+ Endpoint(is_v6=True),
+ Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
+ )
+ ],
+ ).add_vpp_config()
+ )
def test_icmp4(self):
# """ CNat Translation icmp v4 """
@@ -545,7 +601,7 @@ class TestCNatTranslation(CnatCommonTestCase):
class TestCNatSourceNAT(CnatCommonTestCase):
- """ CNat Source NAT """
+ """CNat Source NAT"""
@classmethod
def setUpClass(cls):
@@ -559,28 +615,36 @@ class TestCNatSourceNAT(CnatCommonTestCase):
self.vapi.cnat_set_snat_addresses(
snat_ip4=self.pg2.remote_hosts[0].ip4,
snat_ip6=self.pg2.remote_hosts[0].ip6,
- sw_if_index=INVALID_INDEX)
+ sw_if_index=INVALID_INDEX,
+ )
self.vapi.feature_enable_disable(
enable=1 if is_enable else 0,
arc_name="ip6-unicast",
feature_name="cnat-snat-ip6",
- sw_if_index=self.pg0.sw_if_index)
+ sw_if_index=self.pg0.sw_if_index,
+ )
self.vapi.feature_enable_disable(
enable=1 if is_enable else 0,
arc_name="ip4-unicast",
feature_name="cnat-snat-ip4",
- sw_if_index=self.pg0.sw_if_index)
+ sw_if_index=self.pg0.sw_if_index,
+ )
policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
self.vapi.cnat_set_snat_policy(
- policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
+ policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
+ )
for i in self.pg_interfaces:
self.vapi.cnat_snat_policy_add_del_if(
- sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
- table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
+ sw_if_index=i.sw_if_index,
+ is_add=1 if is_enable else 0,
+ table=policie_tbls.CNAT_POLICY_INCLUDE_V6,
+ )
self.vapi.cnat_snat_policy_add_del_if(
- sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
- table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
+ sw_if_index=i.sw_if_index,
+ is_add=1 if is_enable else 0,
+ table=policie_tbls.CNAT_POLICY_INCLUDE_V4,
+ )
def setUp(self):
super(TestCNatSourceNAT, self).setUp()
@@ -624,7 +688,7 @@ class TestCNatSourceNAT(CnatCommonTestCase):
def sourcenat_test_icmp_echo_conf(self, is_v6=False):
ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
# 8 is ICMP type echo (v4 only)
- ctx.cnat_send(self.pg0, 0, 0xfeed, self.pg1, 0, 8)
+ ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
ctx.cnat_send_return().cnat_expect_return()
@@ -638,14 +702,15 @@ class TestCNatSourceNAT(CnatCommonTestCase):
# exclude dst address of pg1.1 from snat
if is_v6:
exclude_prefix = ip_network(
- "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False)
+ "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
+ )
else:
exclude_prefix = ip_network(
- "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False)
+ "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
+ )
# add remote host to exclude list
- self.vapi.cnat_snat_policy_add_del_exclude_pfx(
- prefix=exclude_prefix, is_add=1)
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)
# We should not source NAT the id=1
ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
@@ -658,8 +723,7 @@ class TestCNatSourceNAT(CnatCommonTestCase):
ctx.cnat_send_return().cnat_expect_return()
# remove remote host from exclude list
- self.vapi.cnat_snat_policy_add_del_exclude_pfx(
- prefix=exclude_prefix, is_add=0)
+ self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
self.vapi.cnat_session_purge()
# We should source NAT again
@@ -676,7 +740,7 @@ class TestCNatSourceNAT(CnatCommonTestCase):
class TestCNatDHCP(CnatCommonTestCase):
- """ CNat Translation """
+ """CNat Translation"""
@classmethod
def setUpClass(cls):
@@ -703,33 +767,35 @@ class TestCNatDHCP(CnatCommonTestCase):
def check_resolved(self, tr, addr_id, is_v6=False):
qt = tr.query_vpp_config()
- self.assertEqual(str(qt.vip.addr), self.make_addr(
- tr.vip.sw_if_index, addr_id, is_v6))
+ self.assertEqual(
+ str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
+ )
self.assertEqual(len(qt.paths), len(tr.paths))
for path_tr, path_qt in zip(tr.paths, qt.paths):
src_qt = path_qt.src_ep
dst_qt = path_qt.dst_ep
src_tr, dst_tr = path_tr
- self.assertEqual(str(src_qt.addr), self.make_addr(
- src_tr.sw_if_index, addr_id, is_v6))
- self.assertEqual(str(dst_qt.addr), self.make_addr(
- dst_tr.sw_if_index, addr_id, is_v6))
+ self.assertEqual(
+ str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
+ )
+ self.assertEqual(
+ str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
+ )
def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
self.vapi.sw_interface_add_del_address(
sw_if_index=pg.sw_if_index,
prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
- is_add=1 if is_add else 0)
+ is_add=1 if is_add else 0,
+ )
def _test_dhcp_v46(self, is_v6):
self.create_pg_interfaces(range(4))
for i in self.pg_interfaces:
i.admin_up()
paths = [
- (Endpoint(pg=self.pg1, is_v6=is_v6),
- Endpoint(pg=self.pg2, is_v6=is_v6)),
- (Endpoint(pg=self.pg1, is_v6=is_v6),
- Endpoint(pg=self.pg3, is_v6=is_v6))
+ (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
+ (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
]
ep = Endpoint(pg=self.pg0, is_v6=is_v6)
t = Translation(self, TCP, ep, paths).add_vpp_config()
@@ -766,10 +832,13 @@ class TestCNatDHCP(CnatCommonTestCase):
self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
r = self.vapi.cnat_get_snat_addresses()
- self.assertEqual(str(r.snat_ip4), self.make_addr(
- self.pg0.sw_if_index, addr_id=0, is_v6=False))
- self.assertEqual(str(r.snat_ip6), self.make_addr(
- self.pg0.sw_if_index, addr_id=0, is_v6=True))
+ self.assertEqual(
+ str(r.snat_ip4),
+ self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
+ )
+ self.assertEqual(
+ str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
+ )
# Add a new address on every interface, remove the old one
# and check it is reflected in the cnat config
for pg in self.pg_interfaces:
@@ -778,10 +847,13 @@ class TestCNatDHCP(CnatCommonTestCase):
self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
r = self.vapi.cnat_get_snat_addresses()
- self.assertEqual(str(r.snat_ip4), self.make_addr(
- self.pg0.sw_if_index, addr_id=1, is_v6=False))
- self.assertEqual(str(r.snat_ip6), self.make_addr(
- self.pg0.sw_if_index, addr_id=1, is_v6=True))
+ self.assertEqual(
+ str(r.snat_ip4),
+ self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
+ )
+ self.assertEqual(
+ str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
+ )
# remove the configuration
for pg in self.pg_interfaces:
self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
@@ -789,5 +861,5 @@ class TestCNatDHCP(CnatCommonTestCase):
self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)