aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_nat44_ei.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_nat44_ei.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_nat44_ei.py')
-rw-r--r--test/test_nat44_ei.py2521
1 files changed, 1419 insertions, 1102 deletions
diff --git a/test/test_nat44_ei.py b/test/test_nat44_ei.py
index aafd345f43f..9eb127aaf0b 100644
--- a/test/test_nat44_ei.py
+++ b/test/test_nat44_ei.py
@@ -10,9 +10,19 @@ from io import BytesIO
import scapy.compat
from framework import VppTestCase, VppTestRunner
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
-from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
- IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
- PacketListField
+from scapy.all import (
+ bind_layers,
+ Packet,
+ ByteEnumField,
+ ShortField,
+ IPField,
+ IntField,
+ LongField,
+ XByteField,
+ FlagsField,
+ FieldLenField,
+ PacketListField,
+)
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
@@ -30,22 +40,22 @@ from vpp_papi import VppEnum
# NAT HA protocol event data
class Event(Packet):
name = "Event"
- fields_desc = [ByteEnumField("event_type", None,
- {1: "add", 2: "del", 3: "refresh"}),
- ByteEnumField("protocol", None,
- {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
- ShortField("flags", 0),
- IPField("in_addr", None),
- IPField("out_addr", None),
- ShortField("in_port", None),
- ShortField("out_port", None),
- IPField("eh_addr", None),
- IPField("ehn_addr", None),
- ShortField("eh_port", None),
- ShortField("ehn_port", None),
- IntField("fib_index", None),
- IntField("total_pkts", 0),
- LongField("total_bytes", 0)]
+ fields_desc = [
+ ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
+ ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
+ ShortField("flags", 0),
+ IPField("in_addr", None),
+ IPField("out_addr", None),
+ ShortField("in_port", None),
+ ShortField("out_port", None),
+ IPField("eh_addr", None),
+ IPField("ehn_addr", None),
+ ShortField("eh_port", None),
+ ShortField("ehn_port", None),
+ IntField("fib_index", None),
+ IntField("total_pkts", 0),
+ LongField("total_bytes", 0),
+ ]
def extract_padding(self, s):
return "", s
@@ -54,17 +64,18 @@ class Event(Packet):
# NAT HA protocol header
class HANATStateSync(Packet):
name = "HA NAT state sync"
- fields_desc = [XByteField("version", 1),
- FlagsField("flags", 0, 8, ['ACK']),
- FieldLenField("count", None, count_of="events"),
- IntField("sequence_number", 1),
- IntField("thread_index", 0),
- PacketListField("events", [], Event,
- count_from=lambda pkt: pkt.count)]
+ fields_desc = [
+ XByteField("version", 1),
+ FlagsField("flags", 0, 8, ["ACK"]),
+ FieldLenField("count", None, count_of="events"),
+ IntField("sequence_number", 1),
+ IntField("thread_index", 0),
+ PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
+ ]
class MethodHolder(VppTestCase):
- """ NAT create capture and verify method holder """
+ """NAT create capture and verify method holder"""
@property
def config_flags(self):
@@ -74,10 +85,19 @@ class MethodHolder(VppTestCase):
def SYSLOG_SEVERITY(self):
return VppEnum.vl_api_syslog_severity_t
- def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
- local_port=0, external_port=0, vrf_id=0,
- is_add=1, external_sw_if_index=0xFFFFFFFF,
- proto=0, tag="", flags=0):
+ def nat44_add_static_mapping(
+ self,
+ local_ip,
+ external_ip="0.0.0.0",
+ local_port=0,
+ external_port=0,
+ vrf_id=0,
+ is_add=1,
+ external_sw_if_index=0xFFFFFFFF,
+ proto=0,
+ tag="",
+ flags=0,
+ ):
"""
Add/delete NAT44EI static mapping
@@ -103,9 +123,11 @@ class MethodHolder(VppTestCase):
external_sw_if_index=external_sw_if_index,
local_port=local_port,
external_port=external_port,
- vrf_id=vrf_id, protocol=proto,
+ vrf_id=vrf_id,
+ protocol=proto,
flags=flags,
- tag=tag)
+ tag=tag,
+ )
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
"""
@@ -114,31 +136,40 @@ class MethodHolder(VppTestCase):
:param ip: IP address
:param is_add: 1 if add, 0 if delete (Default add)
"""
- self.vapi.nat44_ei_add_del_address_range(first_ip_address=ip,
- last_ip_address=ip,
- vrf_id=vrf_id,
- is_add=is_add)
+ self.vapi.nat44_ei_add_del_address_range(
+ first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add
+ )
def create_routes_and_neigbors(self):
- r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
- [VppRoutePath(self.pg7.remote_ip4,
- self.pg7.sw_if_index)])
- r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
- [VppRoutePath(self.pg8.remote_ip4,
- self.pg8.sw_if_index)])
+ r1 = VppIpRoute(
+ self,
+ self.pg7.remote_ip4,
+ 32,
+ [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
+ )
+ r2 = VppIpRoute(
+ self,
+ self.pg8.remote_ip4,
+ 32,
+ [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
+ )
r1.add_vpp_config()
r2.add_vpp_config()
- n1 = VppNeighbor(self,
- self.pg7.sw_if_index,
- self.pg7.remote_mac,
- self.pg7.remote_ip4,
- is_static=1)
- n2 = VppNeighbor(self,
- self.pg8.sw_if_index,
- self.pg8.remote_mac,
- self.pg8.remote_ip4,
- is_static=1)
+ n1 = VppNeighbor(
+ self,
+ self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4,
+ is_static=1,
+ )
+ n2 = VppNeighbor(
+ self,
+ self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4,
+ is_static=1,
+ )
n1.add_vpp_config()
n2.add_vpp_config()
@@ -156,21 +187,27 @@ class MethodHolder(VppTestCase):
pkts = []
# TCP
- p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(sport=self.tcp_port_in, dport=20))
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / TCP(sport=self.tcp_port_in, dport=20)
+ )
pkts.extend([p, p])
# UDP
- p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(sport=self.udp_port_in, dport=20))
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / UDP(sport=self.udp_port_in, dport=20)
+ )
pkts.append(p)
# ICMP
- p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
return pkts
@@ -216,11 +253,10 @@ class MethodHolder(VppTestCase):
pref_n[13] = ip4_n[1]
pref_n[14] = ip4_n[2]
pref_n[15] = ip4_n[3]
- packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
+ packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
- def create_stream_out(self, out_if, dst_ip=None, ttl=64,
- use_inside_ports=False):
+ def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
"""
Create packet stream for outside network
@@ -242,21 +278,27 @@ class MethodHolder(VppTestCase):
icmp_id = self.icmp_id_in
pkts = []
# TCP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(dport=tcp_port, sport=20))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / TCP(dport=tcp_port, sport=20)
+ )
pkts.extend([p, p])
# UDP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(dport=udp_port, sport=20))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / UDP(dport=udp_port, sport=20)
+ )
pkts.append(p)
# ICMP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- ICMP(id=icmp_id, type='echo-reply'))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / ICMP(id=icmp_id, type="echo-reply")
+ )
pkts.append(p)
return pkts
@@ -271,27 +313,40 @@ class MethodHolder(VppTestCase):
"""
pkts = []
# TCP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
- TCP(dport=self.tcp_port_out, sport=20))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
+ / TCP(dport=self.tcp_port_out, sport=20)
+ )
pkts.append(p)
# UDP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
- UDP(dport=self.udp_port_out, sport=20))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
+ / UDP(dport=self.udp_port_out, sport=20)
+ )
pkts.append(p)
# ICMP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
- ICMPv6EchoReply(id=self.icmp_id_out))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
+ / ICMPv6EchoReply(id=self.icmp_id_out)
+ )
pkts.append(p)
return pkts
- def verify_capture_out(self, capture, nat_ip=None, same_port=False,
- dst_ip=None, is_ip6=False, ignore_port=False):
+ def verify_capture_out(
+ self,
+ capture,
+ nat_ip=None,
+ same_port=False,
+ dst_ip=None,
+ is_ip6=False,
+ ignore_port=False,
+ ):
"""
Verify captured packets on outside network
@@ -319,39 +374,33 @@ class MethodHolder(VppTestCase):
if packet.haslayer(TCP):
if not ignore_port:
if same_port:
- self.assertEqual(
- packet[TCP].sport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].sport, self.tcp_port_in)
else:
- self.assertNotEqual(
- packet[TCP].sport, self.tcp_port_in)
+ self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
self.tcp_port_out = packet[TCP].sport
self.assert_packet_checksums_valid(packet)
elif packet.haslayer(UDP):
if not ignore_port:
if same_port:
- self.assertEqual(
- packet[UDP].sport, self.udp_port_in)
+ self.assertEqual(packet[UDP].sport, self.udp_port_in)
else:
- self.assertNotEqual(
- packet[UDP].sport, self.udp_port_in)
+ self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
self.udp_port_out = packet[UDP].sport
else:
if not ignore_port:
if same_port:
- self.assertEqual(
- packet[ICMP46].id, self.icmp_id_in)
+ self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
else:
- self.assertNotEqual(
- packet[ICMP46].id, self.icmp_id_in)
+ self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
self.icmp_id_out = packet[ICMP46].id
self.assert_packet_checksums_valid(packet)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
- def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
- dst_ip=None):
+ def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
"""
Verify captured packets on outside network
@@ -360,8 +409,7 @@ class MethodHolder(VppTestCase):
:param same_port: Source port number is not translated (Default False)
:param dst_ip: Destination IP address (Default do not verify)
"""
- return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
- True)
+ return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True)
def verify_capture_in(self, capture, in_if):
"""
@@ -381,8 +429,9 @@ class MethodHolder(VppTestCase):
else:
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(inside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (inside network):", packet)
+ )
raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
@@ -404,12 +453,12 @@ class MethodHolder(VppTestCase):
else:
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(inside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (inside network):", packet)
+ )
raise
- def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
- icmp_type=11):
+ def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
"""
Verify captured packets with ICMP errors on outside network
@@ -430,16 +479,15 @@ class MethodHolder(VppTestCase):
self.assertTrue(icmp.haslayer(IPerror))
inner_ip = icmp[IPerror]
if inner_ip.haslayer(TCPerror):
- self.assertEqual(inner_ip[TCPerror].dport,
- self.tcp_port_out)
+ self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out)
elif inner_ip.haslayer(UDPerror):
- self.assertEqual(inner_ip[UDPerror].dport,
- self.udp_port_out)
+ self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out)
else:
self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
@@ -460,20 +508,20 @@ class MethodHolder(VppTestCase):
self.assertTrue(icmp.haslayer(IPerror))
inner_ip = icmp[IPerror]
if inner_ip.haslayer(TCPerror):
- self.assertEqual(inner_ip[TCPerror].sport,
- self.tcp_port_in)
+ self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in)
elif inner_ip.haslayer(UDPerror):
- self.assertEqual(inner_ip[UDPerror].sport,
- self.udp_port_in)
+ self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in)
else:
self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(inside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (inside network):", packet)
+ )
raise
- def create_stream_frag(self, src_if, dst, sport, dport, data,
- proto=IP_PROTOS.tcp, echo_reply=False):
+ def create_stream_frag(
+ self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
+ ):
"""
Create fragmented packet stream
@@ -487,9 +535,11 @@ class MethodHolder(VppTestCase):
:returns: Fragments
"""
if proto == IP_PROTOS.tcp:
- p = (IP(src=src_if.remote_ip4, dst=dst) /
- TCP(sport=sport, dport=dport) /
- Raw(data))
+ p = (
+ IP(src=src_if.remote_ip4, dst=dst)
+ / TCP(sport=sport, dport=dport)
+ / Raw(data)
+ )
p = p.__class__(scapy.compat.raw(p))
chksum = p[TCP].chksum
proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
@@ -497,9 +547,9 @@ class MethodHolder(VppTestCase):
proto_header = UDP(sport=sport, dport=dport)
elif proto == IP_PROTOS.icmp:
if not echo_reply:
- proto_header = ICMP(id=sport, type='echo-request')
+ proto_header = ICMP(id=sport, type="echo-request")
else:
- proto_header = ICMP(id=sport, type='echo-reply')
+ proto_header = ICMP(id=sport, type="echo-reply")
else:
raise Exception("Unsupported protocol")
id = random.randint(0, 65535)
@@ -508,28 +558,32 @@ class MethodHolder(VppTestCase):
raw = Raw(data[0:4])
else:
raw = Raw(data[0:16])
- p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
- IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
- proto_header /
- raw)
+ p = (
+ Ether(src=src_if.remote_mac, dst=src_if.local_mac)
+ / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
+ / proto_header
+ / raw
+ )
pkts.append(p)
if proto == IP_PROTOS.tcp:
raw = Raw(data[4:20])
else:
raw = Raw(data[16:32])
- p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
- IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
- proto=proto) /
- raw)
+ p = (
+ Ether(src=src_if.remote_mac, dst=src_if.local_mac)
+ / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
+ / raw
+ )
pkts.append(p)
if proto == IP_PROTOS.tcp:
raw = Raw(data[20:])
else:
raw = Raw(data[32:])
- p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
- IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
- id=id) /
- raw)
+ p = (
+ Ether(src=src_if.remote_mac, dst=src_if.local_mac)
+ / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
+ / raw
+ )
pkts.append(p)
return pkts
@@ -550,17 +604,15 @@ class MethodHolder(VppTestCase):
self.assert_ip_checksum_valid(p)
buffer.seek(p[IP].frag * 8)
buffer.write(bytes(p[IP].payload))
- ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
- proto=frags[0][IP].proto)
+ ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
if ip.proto == IP_PROTOS.tcp:
- p = (ip / TCP(buffer.getvalue()))
+ p = ip / TCP(buffer.getvalue())
self.logger.debug(ppp("Reassembled:", p))
self.assert_tcp_checksum_valid(p)
elif ip.proto == IP_PROTOS.udp:
- p = (ip / UDP(buffer.getvalue()[:8]) /
- Raw(buffer.getvalue()[8:]))
+ p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
elif ip.proto == IP_PROTOS.icmp:
- p = (ip / ICMP(buffer.getvalue()))
+ p = ip / ICMP(buffer.getvalue())
return p
def verify_ipfix_nat44_ses(self, data):
@@ -580,29 +632,24 @@ class MethodHolder(VppTestCase):
else:
nat44_ses_delete_num += 1
# sourceIPv4Address
- self.assertEqual(self.pg0.remote_ip4,
- str(ipaddress.IPv4Address(record[8])))
+ self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
# postNATSourceIPv4Address
- self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
- record[225])
+ self.assertEqual(
+ socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
+ )
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# protocolIdentifier/sourceTransportPort
# /postNAPTSourceTransportPort
if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
- self.assertEqual(struct.pack("!H", self.icmp_id_out),
- record[227])
+ self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227])
elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
- self.assertEqual(struct.pack("!H", self.tcp_port_in),
- record[7])
- self.assertEqual(struct.pack("!H", self.tcp_port_out),
- record[227])
+ self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
+ self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
- self.assertEqual(struct.pack("!H", self.udp_port_in),
- record[7])
- self.assertEqual(struct.pack("!H", self.udp_port_out),
- record[227])
+ self.assertEqual(struct.pack("!H", self.udp_port_in), record[7])
+ self.assertEqual(struct.pack("!H", self.udp_port_out), record[227])
else:
self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
self.assertEqual(3, nat44_ses_create_num)
@@ -627,16 +674,16 @@ class MethodHolder(VppTestCase):
self.assertEqual(struct.pack("!I", limit), record[471])
def verify_no_nat44_user(self):
- """ Verify that there is no NAT44EI user """
+ """Verify that there is no NAT44EI user"""
users = self.vapi.nat44_ei_user_dump()
self.assertEqual(len(users), 0)
- users = self.statistics['/nat44-ei/total-users']
+ users = self.statistics["/nat44-ei/total-users"]
self.assertEqual(users[0][0], 0)
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertEqual(sessions[0][0], 0)
def verify_syslog_apmap(self, data, is_add=True):
- message = data.decode('utf-8')
+ message = data.decode("utf-8")
try:
message = SyslogMessage.parse(message)
except ParseError as e:
@@ -644,26 +691,26 @@ class MethodHolder(VppTestCase):
raise
else:
self.assertEqual(message.severity, SyslogSeverity.info)
- self.assertEqual(message.appname, 'NAT')
- self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
- sd_params = message.sd.get('napmap')
+ self.assertEqual(message.appname, "NAT")
+ self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL")
+ sd_params = message.sd.get("napmap")
self.assertTrue(sd_params is not None)
- self.assertEqual(sd_params.get('IATYP'), 'IPv4')
- self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
- self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
- self.assertEqual(sd_params.get('XATYP'), 'IPv4')
- self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
- self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
- self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
- self.assertTrue(sd_params.get('SSUBIX') is not None)
- self.assertEqual(sd_params.get('SVLAN'), '0')
+ self.assertEqual(sd_params.get("IATYP"), "IPv4")
+ self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
+ self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
+ self.assertEqual(sd_params.get("XATYP"), "IPv4")
+ self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
+ self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
+ self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
+ self.assertTrue(sd_params.get("SSUBIX") is not None)
+ self.assertEqual(sd_params.get("SVLAN"), "0")
def verify_mss_value(self, pkt, mss):
if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
raise TypeError("Not a TCP/IP packet")
for option in pkt[TCP].options:
- if option[0] == 'MSS':
+ if option[0] == "MSS":
self.assertEqual(option[1], mss)
self.assert_tcp_checksum_valid(pkt)
@@ -678,8 +725,9 @@ class MethodHolder(VppTestCase):
else:
raise Exception("Unsupported protocol")
- def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
- ignore_port=False):
+ def frag_in_order(
+ self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
+ ):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
@@ -689,20 +737,19 @@ class MethodHolder(VppTestCase):
self.port_in = random.randint(1025, 65535)
# in2out
- pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
- self.port_in, 20, data, proto)
+ pkts = self.create_stream_frag(
+ self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
+ )
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg1.get_capture(len(pkts))
if not dont_translate:
- p = self.reass_frags_and_verify(frags,
- self.nat_addr,
- self.pg1.remote_ip4)
+ p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
else:
- p = self.reass_frags_and_verify(frags,
- self.pg0.remote_ip4,
- self.pg1.remote_ip4)
+ p = self.reass_frags_and_verify(
+ frags, self.pg0.remote_ip4, self.pg1.remote_ip4
+ )
if proto != IP_PROTOS.icmp:
if not dont_translate:
self.assertEqual(p[layer].dport, 20)
@@ -729,15 +776,14 @@ class MethodHolder(VppTestCase):
else:
sport = p[layer].id
dport = 0
- pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data,
- proto, echo_reply=True)
+ pkts = self.create_stream_frag(
+ self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
+ )
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
- p = self.reass_frags_and_verify(frags,
- self.pg1.remote_ip4,
- self.pg0.remote_ip4)
+ p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
if proto != IP_PROTOS.icmp:
self.assertEqual(p[layer].sport, 20)
self.assertEqual(p[layer].dport, self.port_in)
@@ -745,9 +791,15 @@ class MethodHolder(VppTestCase):
self.assertEqual(p[layer].id, self.port_in)
self.assertEqual(data, p[Raw].load)
- def reass_hairpinning(self, server_addr, server_in_port, server_out_port,
- host_in_port, proto=IP_PROTOS.tcp,
- ignore_port=False):
+ def reass_hairpinning(
+ self,
+ server_addr,
+ server_in_port,
+ server_out_port,
+ host_in_port,
+ proto=IP_PROTOS.tcp,
+ ignore_port=False,
+ ):
layer = self.proto2layer(proto)
@@ -757,19 +809,14 @@ class MethodHolder(VppTestCase):
data = b"A" * 16 + b"B" * 16 + b"C" * 3
# send packet from host to server
- pkts = self.create_stream_frag(self.pg0,
- self.nat_addr,
- host_in_port,
- server_out_port,
- data,
- proto)
+ pkts = self.create_stream_frag(
+ self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
+ )
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
- p = self.reass_frags_and_verify(frags,
- self.nat_addr,
- server_addr)
+ p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
if proto != IP_PROTOS.icmp:
if not ignore_port:
self.assertNotEqual(p[layer].sport, host_in_port)
@@ -779,8 +826,9 @@ class MethodHolder(VppTestCase):
self.assertNotEqual(p[layer].id, host_in_port)
self.assertEqual(data, p[Raw].load)
- def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
- ignore_port=False):
+ def frag_out_of_order(
+ self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
+ ):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
@@ -791,21 +839,22 @@ class MethodHolder(VppTestCase):
for i in range(2):
# in2out
- pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
- self.port_in, 20, data, proto)
+ pkts = self.create_stream_frag(
+ self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
+ )
pkts.reverse()
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg1.get_capture(len(pkts))
if not dont_translate:
- p = self.reass_frags_and_verify(frags,
- self.nat_addr,
- self.pg1.remote_ip4)
+ p = self.reass_frags_and_verify(
+ frags, self.nat_addr, self.pg1.remote_ip4
+ )
else:
- p = self.reass_frags_and_verify(frags,
- self.pg0.remote_ip4,
- self.pg1.remote_ip4)
+ p = self.reass_frags_and_verify(
+ frags, self.pg0.remote_ip4, self.pg1.remote_ip4
+ )
if proto != IP_PROTOS.icmp:
if not dont_translate:
self.assertEqual(p[layer].dport, 20)
@@ -832,16 +881,17 @@ class MethodHolder(VppTestCase):
else:
sport = p[layer].id
dport = 0
- pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport,
- data, proto, echo_reply=True)
+ pkts = self.create_stream_frag(
+ self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
+ )
pkts.reverse()
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
- p = self.reass_frags_and_verify(frags,
- self.pg1.remote_ip4,
- self.pg0.remote_ip4)
+ p = self.reass_frags_and_verify(
+ frags, self.pg1.remote_ip4, self.pg0.remote_ip4
+ )
if proto != IP_PROTOS.icmp:
self.assertEqual(p[layer].sport, 20)
self.assertEqual(p[layer].dport, self.port_in)
@@ -861,7 +911,7 @@ def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
class TestNAT44EI(MethodHolder):
- """ NAT44EI Test Cases """
+ """NAT44EI Test Cases"""
max_translations = 10240
max_users = 10240
@@ -877,7 +927,7 @@ class TestNAT44EI(MethodHolder):
cls.udp_port_out = 6304
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
- cls.nat_addr = '10.0.0.3'
+ cls.nat_addr = "10.0.0.3"
cls.ipfix_src_port = 4739
cls.ipfix_domain_id = 1
cls.tcp_external_port = 80
@@ -898,8 +948,8 @@ class TestNAT44EI(MethodHolder):
cls.pg1.configure_ipv4_neighbors()
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
- cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
- cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
+ cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
+ cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
cls.pg4._local_ip4 = "172.16.255.1"
cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
@@ -921,8 +971,8 @@ class TestNAT44EI(MethodHolder):
cls.pg9.generate_remote_hosts(2)
cls.pg9.config_ip4()
cls.vapi.sw_interface_add_del_address(
- sw_if_index=cls.pg9.sw_if_index,
- prefix="10.0.0.1/24")
+ sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
+ )
cls.pg9.admin_up()
cls.pg9.resolve_arp()
@@ -932,8 +982,8 @@ class TestNAT44EI(MethodHolder):
def plugin_enable(self):
self.vapi.nat44_ei_plugin_enable_disable(
- sessions=self.max_translations,
- users=self.max_users, enable=1)
+ sessions=self.max_translations, users=self.max_users, enable=1
+ )
def setUp(self):
super(TestNAT44EI, self).setUp()
@@ -943,8 +993,8 @@ class TestNAT44EI(MethodHolder):
super(TestNAT44EI, self).tearDown()
if not self.vpp_dead:
self.vapi.nat44_ei_ipfix_enable_disable(
- domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port,
- enable=0)
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
+ )
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
@@ -952,16 +1002,16 @@ class TestNAT44EI(MethodHolder):
self.vapi.cli("clear logging")
def test_clear_sessions(self):
- """ NAT44EI session clearing test """
+ """NAT44EI session clearing test"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
@@ -970,32 +1020,32 @@ class TestNAT44EI(MethodHolder):
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
self.logger.info("sessions before clearing: %s" % sessions[0][0])
self.vapi.cli("clear nat44 ei sessions")
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
self.logger.info("sessions after clearing: %s" % sessions[0][0])
def test_dynamic(self):
- """ NAT44EI dynamic translation test """
+ """NAT44EI dynamic translation test"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# in2out
- tcpn = self.statistics['/nat44-ei/in2out/slowpath/tcp']
- udpn = self.statistics['/nat44-ei/in2out/slowpath/udp']
- icmpn = self.statistics['/nat44-ei/in2out/slowpath/icmp']
- drops = self.statistics['/nat44-ei/in2out/slowpath/drops']
+ tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
+ udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
+ icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
+ drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
@@ -1005,20 +1055,20 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture)
if_idx = self.pg0.sw_if_index
- cnt = self.statistics['/nat44-ei/in2out/slowpath/tcp']
+ cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
- cnt = self.statistics['/nat44-ei/in2out/slowpath/udp']
+ cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
- cnt = self.statistics['/nat44-ei/in2out/slowpath/icmp']
+ cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
- cnt = self.statistics['/nat44-ei/in2out/slowpath/drops']
+ cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
# out2in
- tcpn = self.statistics['/nat44-ei/out2in/slowpath/tcp']
- udpn = self.statistics['/nat44-ei/out2in/slowpath/udp']
- icmpn = self.statistics['/nat44-ei/out2in/slowpath/icmp']
- drops = self.statistics['/nat44-ei/out2in/slowpath/drops']
+ tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
+ udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
+ icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
+ drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]
pkts = self.create_stream_out(self.pg1)
self.pg1.add_stream(pkts)
@@ -1028,31 +1078,31 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg0)
if_idx = self.pg1.sw_if_index
- cnt = self.statistics['/nat44-ei/out2in/slowpath/tcp']
+ cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
- cnt = self.statistics['/nat44-ei/out2in/slowpath/udp']
+ cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
- cnt = self.statistics['/nat44-ei/out2in/slowpath/icmp']
+ cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
- cnt = self.statistics['/nat44-ei/out2in/slowpath/drops']
+ cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
- users = self.statistics['/nat44-ei/total-users']
+ users = self.statistics["/nat44-ei/total-users"]
self.assertEqual(users[:, 0].sum(), 1)
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertEqual(sessions[:, 0].sum(), 3)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
- """ NAT44EI handling of client packets with TTL=1 """
+ """NAT44EI handling of client packets with TTL=1"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# Client side - generate traffic
pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
@@ -1062,16 +1112,16 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
- """ NAT44EI handling of server packets with TTL=1 """
+ """NAT44EI handling of server packets with TTL=1"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# Client side - create sessions
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1086,21 +1136,19 @@ class TestNAT44EI(MethodHolder):
capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
# Server side - verify ICMP type 11 packets
- self.verify_capture_out_with_icmp_errors(capture,
- src_ip=self.pg1.local_ip4)
+ self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
- """ NAT44EI handling of error responses to client packets with TTL=2
- """
+ """NAT44EI handling of error responses to client packets with TTL=2"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# Client side - generate traffic
pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
@@ -1110,9 +1158,13 @@ class TestNAT44EI(MethodHolder):
# Server side - simulate ICMP type 11 response
capture = self.pg1.get_capture(len(pkts))
- pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- ICMP(type=11) / packet[IP] for packet in capture]
+ pkts = [
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
+ / ICMP(type=11)
+ / packet[IP]
+ for packet in capture
+ ]
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1122,17 +1174,16 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
- """ NAT44EI handling of error responses to server packets with TTL=2
- """
+ """NAT44EI handling of error responses to server packets with TTL=2"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# Client side - create sessions
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1150,9 +1201,13 @@ class TestNAT44EI(MethodHolder):
# Client side - simulate ICMP type 11 response
capture = self.pg0.get_capture(len(pkts))
- pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- ICMP(type=11) / packet[IP] for packet in capture]
+ pkts = [
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / ICMP(type=11)
+ / packet[IP]
+ for packet in capture
+ ]
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1162,20 +1217,22 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
- """ NAT44EI ping out interface from outside network """
+ """NAT44EI ping out interface from outside network"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
- ICMP(id=self.icmp_id_out, type='echo-request'))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
+ / ICMP(id=self.icmp_id_out, type="echo-request")
+ )
pkts = [p]
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -1188,26 +1245,29 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
self.assertEqual(packet[ICMP].type, 0) # echo reply
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
def test_ping_internal_host_from_outside(self):
- """ NAT44EI ping internal host from outside network """
+ """NAT44EI ping internal host from outside network"""
self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# out2in
- pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
- ICMP(id=self.icmp_id_out, type='echo-request'))
+ pkt = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
+ / ICMP(id=self.icmp_id_out, type="echo-request")
+ )
self.pg1.add_stream(pkt)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1216,9 +1276,11 @@ class TestNAT44EI(MethodHolder):
self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# in2out
- pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
- ICMP(id=self.icmp_id_in, type='echo-reply'))
+ pkt = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
+ / ICMP(id=self.icmp_id_in, type="echo-reply")
+ )
self.pg0.add_stream(pkt)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1227,25 +1289,27 @@ class TestNAT44EI(MethodHolder):
self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_forwarding(self):
- """ NAT44EI forwarding test """
+ """NAT44EI forwarding test"""
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
real_ip = self.pg0.remote_ip4
alias_ip = self.nat_addr
flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
self.vapi.nat44_ei_add_del_static_mapping(
- is_add=1, local_ip_address=real_ip,
+ is_add=1,
+ local_ip_address=real_ip,
external_ip_address=alias_ip,
external_sw_if_index=0xFFFFFFFF,
- flags=flags)
+ flags=flags,
+ )
try:
# static mapping match
@@ -1269,9 +1333,9 @@ class TestNAT44EI(MethodHolder):
host0 = self.pg0.remote_hosts[0]
self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
try:
- pkts = self.create_stream_out(self.pg1,
- dst_ip=self.pg0.remote_ip4,
- use_inside_ports=True)
+ pkts = self.create_stream_out(
+ self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
+ )
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1283,8 +1347,9 @@ class TestNAT44EI(MethodHolder):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
- self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
- same_port=True)
+ self.verify_capture_out(
+ capture, nat_ip=self.pg0.remote_ip4, same_port=True
+ )
finally:
self.pg0.remote_hosts[0] = host0
@@ -1296,10 +1361,11 @@ class TestNAT44EI(MethodHolder):
local_ip_address=real_ip,
external_ip_address=alias_ip,
external_sw_if_index=0xFFFFFFFF,
- flags=flags)
+ flags=flags,
+ )
def test_static_in(self):
- """ NAT44EI 1:1 NAT initialized from inside network """
+ """NAT44EI 1:1 NAT initialized from inside network"""
nat_ip = "10.0.0.10"
self.tcp_port_out = 6303
@@ -1309,14 +1375,14 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
sm = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(len(sm), 1)
- self.assertEqual(sm[0].tag, '')
+ self.assertEqual(sm[0].tag, "")
self.assertEqual(sm[0].protocol, 0)
self.assertEqual(sm[0].local_port, 0)
self.assertEqual(sm[0].external_port, 0)
@@ -1338,7 +1404,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
- """ NAT44EI 1:1 NAT initialized from outside network """
+ """NAT44EI 1:1 NAT initialized from outside network"""
nat_ip = "10.0.0.20"
self.tcp_port_out = 6303
@@ -1349,11 +1415,11 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
sm = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(len(sm), 1)
self.assertEqual(sm[0].tag, tag)
@@ -1375,29 +1441,41 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
- """ NAT44EI 1:1 NAPT initialized from inside network """
+ """NAT44EI 1:1 NAPT initialized from inside network"""
self.tcp_port_out = 3606
self.udp_port_out = 3607
self.icmp_id_out = 3608
self.nat44_add_address(self.nat_addr)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.tcp_port_in, self.tcp_port_out,
- proto=IP_PROTOS.tcp)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.udp_port_in, self.udp_port_out,
- proto=IP_PROTOS.udp)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.icmp_id_in, self.icmp_id_out,
- proto=IP_PROTOS.icmp)
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.udp_port_in,
+ self.udp_port_out,
+ proto=IP_PROTOS.udp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.icmp_id_in,
+ self.icmp_id_out,
+ proto=IP_PROTOS.icmp,
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# in2out
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1416,29 +1494,41 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
- """ NAT44EI 1:1 NAPT initialized from outside network """
+ """NAT44EI 1:1 NAPT initialized from outside network"""
self.tcp_port_out = 30606
self.udp_port_out = 30607
self.icmp_id_out = 30608
self.nat44_add_address(self.nat_addr)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.tcp_port_in, self.tcp_port_out,
- proto=IP_PROTOS.tcp)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.udp_port_in, self.udp_port_out,
- proto=IP_PROTOS.udp)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.icmp_id_in, self.icmp_id_out,
- proto=IP_PROTOS.icmp)
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.udp_port_in,
+ self.udp_port_out,
+ proto=IP_PROTOS.udp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.icmp_id_in,
+ self.icmp_id_out,
+ proto=IP_PROTOS.icmp,
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# out2in
pkts = self.create_stream_out(self.pg1)
@@ -1457,7 +1547,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture)
def test_static_vrf_aware(self):
- """ NAT44EI 1:1 NAT VRF awareness """
+ """NAT44EI 1:1 NAT VRF awareness"""
nat_ip1 = "10.0.0.30"
nat_ip2 = "10.0.0.40"
@@ -1465,20 +1555,18 @@ class TestNAT44EI(MethodHolder):
self.udp_port_out = 6304
self.icmp_id_out = 6305
- self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
- vrf_id=10)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
- vrf_id=10)
+ self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg3.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg4.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
+ )
# inside interface VRF match NAT44EI static mapping VRF
pkts = self.create_stream_in(self.pg4, self.pg3)
@@ -1497,7 +1585,7 @@ class TestNAT44EI(MethodHolder):
self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
- """ NAT44EI Switch from dynamic translation to 1:1NAT """
+ """NAT44EI Switch from dynamic translation to 1:1NAT"""
nat_ip = "10.0.0.10"
self.tcp_port_out = 6303
self.udp_port_out = 6304
@@ -1506,11 +1594,11 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# dynamic
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1532,22 +1620,27 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
- """ NAT44EI Identity NAT """
+ """NAT44EI Identity NAT"""
flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
self.vapi.nat44_ei_add_del_identity_mapping(
- ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
- flags=flags, is_add=1)
+ ip_address=self.pg0.remote_ip4,
+ sw_if_index=0xFFFFFFFF,
+ flags=flags,
+ is_add=1,
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
- TCP(sport=12345, dport=56789))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+
+ p = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
+ / TCP(sport=12345, dport=56789)
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1569,26 +1662,29 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(len(sessions), 0)
flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
self.vapi.nat44_ei_add_del_identity_mapping(
- ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
- flags=flags, vrf_id=1, is_add=1)
+ ip_address=self.pg0.remote_ip4,
+ sw_if_index=0xFFFFFFFF,
+ flags=flags,
+ vrf_id=1,
+ is_add=1,
+ )
identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
self.assertEqual(len(identity_mappings), 2)
def test_multiple_inside_interfaces(self):
- """ NAT44EI multiple non-overlapping address space inside interfaces
- """
+ """NAT44EI multiple non-overlapping address space inside interfaces"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg3.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
# between two NAT44EI inside interfaces (no translation)
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1639,26 +1735,24 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
- """ NAT44EI multiple inside interfaces with overlapping address space
- """
+ """NAT44EI multiple inside interfaces with overlapping address space"""
static_nat_ip = "10.0.0.10"
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg3.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg4.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg5.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg6.sw_if_index,
- flags=flags, is_add=1)
- self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
- vrf_id=20)
+ sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
+ )
+ self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
# between NAT44EI inside interfaces with same VRF (no translation)
pkts = self.create_stream_in(self.pg4, self.pg5)
@@ -1669,9 +1763,11 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_no_translation(capture, self.pg4, self.pg5)
# between NAT44EI inside interfaces with different VRF (hairpinning)
- p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
- IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
- TCP(sport=1234, dport=5678))
+ p = (
+ Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
+ / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
+ / TCP(sport=1234, dport=5678)
+ )
self.pg4.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1723,16 +1819,12 @@ class TestNAT44EI(MethodHolder):
# pg5 session dump
addresses = self.vapi.nat44_ei_address_dump()
self.assertEqual(len(addresses), 1)
- sessions = self.vapi.nat44_ei_user_session_dump(
- self.pg5.remote_ip4, 10)
+ sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
self.assertEqual(len(sessions), 3)
for session in sessions:
- self.assertFalse(session.flags &
- self.config_flags.NAT44_EI_STATIC_MAPPING)
- self.assertEqual(str(session.inside_ip_address),
- self.pg5.remote_ip4)
- self.assertEqual(session.outside_ip_address,
- addresses[0].ip_address)
+ self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
+ self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
+ self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
@@ -1765,44 +1857,38 @@ class TestNAT44EI(MethodHolder):
addresses = self.vapi.nat44_ei_address_dump()
self.assertEqual(len(addresses), 1)
for user in users:
- sessions = self.vapi.nat44_ei_user_session_dump(user.ip_address,
- user.vrf_id)
+ sessions = self.vapi.nat44_ei_user_session_dump(
+ user.ip_address, user.vrf_id
+ )
for session in sessions:
self.assertEqual(user.ip_address, session.inside_ip_address)
self.assertTrue(session.total_bytes > session.total_pkts > 0)
- self.assertTrue(session.protocol in
- [IP_PROTOS.tcp, IP_PROTOS.udp,
- IP_PROTOS.icmp])
+ self.assertTrue(
+ session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
+ )
# pg4 session dump
- sessions = self.vapi.nat44_ei_user_session_dump(
- self.pg4.remote_ip4, 10)
+ sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
self.assertGreaterEqual(len(sessions), 4)
for session in sessions:
- self.assertFalse(
- session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
- self.assertEqual(str(session.inside_ip_address),
- self.pg4.remote_ip4)
- self.assertEqual(session.outside_ip_address,
- addresses[0].ip_address)
+ self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
+ self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
+ self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
# pg6 session dump
- sessions = self.vapi.nat44_ei_user_session_dump(
- self.pg6.remote_ip4, 20)
+ sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
self.assertGreaterEqual(len(sessions), 3)
for session in sessions:
+ self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
+ self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
+ self.assertEqual(str(session.outside_ip_address), static_nat_ip)
self.assertTrue(
- session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
- self.assertEqual(str(session.inside_ip_address),
- self.pg6.remote_ip4)
- self.assertEqual(str(session.outside_ip_address),
- static_nat_ip)
- self.assertTrue(session.inside_port in
- [self.tcp_port_in, self.udp_port_in,
- self.icmp_id_in])
+ session.inside_port
+ in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
+ )
def test_hairpinning(self):
- """ NAT44EI hairpinning - 1:1 NAPT """
+ """NAT44EI hairpinning - 1:1 NAPT"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
@@ -1814,22 +1900,28 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server.ip4, self.nat_addr,
- server_in_port, server_out_port,
- proto=IP_PROTOS.tcp)
-
- cnt = self.statistics['/nat44-ei/hairpinning']
+ self.nat44_add_static_mapping(
+ server.ip4,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
+
+ cnt = self.statistics["/nat44-ei/hairpinning"]
# send packet from host to server
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.nat_addr) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.nat_addr)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1848,14 +1940,16 @@ class TestNAT44EI(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
# send reply from server to host
- p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
- IP(src=server.ip4, dst=self.nat_addr) /
- TCP(sport=server_in_port, dport=host_out_port))
+ p = (
+ Ether(src=server.mac, dst=self.pg0.local_mac)
+ / IP(src=server.ip4, dst=self.nat_addr)
+ / TCP(sport=server_in_port, dport=host_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1873,13 +1967,15 @@ class TestNAT44EI(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
- self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(),
- 2+(1 if self.vpp_worker_count > 0 else 0))
+ self.assertEqual(
+ after[:, if_idx].sum() - cnt[:, if_idx].sum(),
+ 2 + (1 if self.vpp_worker_count > 0 else 0),
+ )
def test_hairpinning2(self):
- """ NAT44EI hairpinning - 1:1 NAT"""
+ """NAT44EI hairpinning - 1:1 NAT"""
server1_nat_ip = "10.0.0.10"
server2_nat_ip = "10.0.0.11"
@@ -1892,11 +1988,11 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for servers
self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
@@ -1904,17 +2000,23 @@ class TestNAT44EI(MethodHolder):
# host to server1
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- UDP(sport=self.udp_port_in, dport=server_udp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / UDP(sport=self.udp_port_in, dport=server_udp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -1942,17 +2044,23 @@ class TestNAT44EI(MethodHolder):
# server1 to host
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- UDP(sport=server_udp_port, dport=self.udp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / UDP(sport=server_udp_port, dport=self.udp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / ICMP(id=self.icmp_id_out, type="echo-reply")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -1977,17 +2085,23 @@ class TestNAT44EI(MethodHolder):
# server2 to server1
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- UDP(sport=self.udp_port_in, dport=server_udp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / UDP(sport=self.udp_port_in, dport=server_udp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -2015,17 +2129,23 @@ class TestNAT44EI(MethodHolder):
# server1 to server2
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- UDP(sport=server_udp_port, dport=self.udp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / UDP(sport=server_udp_port, dport=self.udp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / ICMP(id=self.icmp_id_out, type="echo-reply")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -2049,7 +2169,7 @@ class TestNAT44EI(MethodHolder):
raise
def test_hairpinning_avoid_inf_loop(self):
- """ NAT44EI hairpinning - 1:1 NAPT avoid infinite loop """
+ """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
@@ -2061,34 +2181,42 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server.ip4, self.nat_addr,
- server_in_port, server_out_port,
- proto=IP_PROTOS.tcp)
+ self.nat44_add_static_mapping(
+ server.ip4,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
# add another static mapping that maps pg0.local_ip4 address to itself
self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
# send packet from host to VPP (the packet should get dropped)
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.pg0.local_ip4) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.pg0.local_ip4)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
# Here VPP used to crash due to an infinite loop
- cnt = self.statistics['/nat44-ei/hairpinning']
+ cnt = self.statistics["/nat44-ei/hairpinning"]
# send packet from host to server
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.nat_addr) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.nat_addr)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2107,14 +2235,16 @@ class TestNAT44EI(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
# send reply from server to host
- p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
- IP(src=server.ip4, dst=self.nat_addr) /
- TCP(sport=server_in_port, dport=host_out_port))
+ p = (
+ Ether(src=server.mac, dst=self.pg0.local_mac)
+ / IP(src=server.ip4, dst=self.nat_addr)
+ / TCP(sport=server_in_port, dport=host_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2132,16 +2262,18 @@ class TestNAT44EI(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
- self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(),
- 2+(1 if self.vpp_worker_count > 0 else 0))
+ self.assertEqual(
+ after[:, if_idx].sum() - cnt[:, if_idx].sum(),
+ 2 + (1 if self.vpp_worker_count > 0 else 0),
+ )
def test_interface_addr(self):
- """ NAT44EI acquire addresses from interface """
+ """NAT44EI acquire addresses from interface"""
self.vapi.nat44_ei_add_del_interface_addr(
- is_add=1,
- sw_if_index=self.pg7.sw_if_index)
+ is_add=1, sw_if_index=self.pg7.sw_if_index
+ )
# no address in NAT pool
addresses = self.vapi.nat44_ei_address_dump()
@@ -2159,22 +2291,20 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(0, len(addresses))
def test_interface_addr_static_mapping(self):
- """ NAT44EI Static mapping with addresses from interface """
+ """NAT44EI Static mapping with addresses from interface"""
tag = "testTAG"
self.vapi.nat44_ei_add_del_interface_addr(
- is_add=1,
- sw_if_index=self.pg7.sw_if_index)
+ is_add=1, sw_if_index=self.pg7.sw_if_index
+ )
self.nat44_add_static_mapping(
- '1.2.3.4',
- external_sw_if_index=self.pg7.sw_if_index,
- tag=tag)
+ "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
+ )
# static mappings with external interface
static_mappings = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(1, len(static_mappings))
- self.assertEqual(self.pg7.sw_if_index,
- static_mappings[0].external_sw_if_index)
+ self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
self.assertEqual(static_mappings[0].tag, tag)
# configure interface address and check static mappings
@@ -2184,8 +2314,7 @@ class TestNAT44EI(MethodHolder):
resolved = False
for sm in static_mappings:
if sm.external_sw_if_index == 0xFFFFFFFF:
- self.assertEqual(str(sm.external_ip_address),
- self.pg7.local_ip4)
+ self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
self.assertEqual(sm.tag, tag)
resolved = True
self.assertTrue(resolved)
@@ -2194,8 +2323,7 @@ class TestNAT44EI(MethodHolder):
self.pg7.unconfig_ip4()
static_mappings = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(1, len(static_mappings))
- self.assertEqual(self.pg7.sw_if_index,
- static_mappings[0].external_sw_if_index)
+ self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
self.assertEqual(static_mappings[0].tag, tag)
# configure interface address again and check static mappings
@@ -2205,40 +2333,37 @@ class TestNAT44EI(MethodHolder):
resolved = False
for sm in static_mappings:
if sm.external_sw_if_index == 0xFFFFFFFF:
- self.assertEqual(str(sm.external_ip_address),
- self.pg7.local_ip4)
+ self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
self.assertEqual(sm.tag, tag)
resolved = True
self.assertTrue(resolved)
# remove static mapping
self.nat44_add_static_mapping(
- '1.2.3.4',
- external_sw_if_index=self.pg7.sw_if_index,
- tag=tag,
- is_add=0)
+ "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
+ )
static_mappings = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
- """ NAT44EI Identity NAT with addresses from interface """
+ """NAT44EI Identity NAT with addresses from interface"""
port = 53053
self.vapi.nat44_ei_add_del_interface_addr(
- is_add=1,
- sw_if_index=self.pg7.sw_if_index)
+ is_add=1, sw_if_index=self.pg7.sw_if_index
+ )
self.vapi.nat44_ei_add_del_identity_mapping(
- ip_address=b'0',
+ ip_address=b"0",
sw_if_index=self.pg7.sw_if_index,
port=port,
protocol=IP_PROTOS.tcp,
- is_add=1)
+ is_add=1,
+ )
# identity mappings with external interface
identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
self.assertEqual(1, len(identity_mappings))
- self.assertEqual(self.pg7.sw_if_index,
- identity_mappings[0].sw_if_index)
+ self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
# configure interface address and check identity mappings
self.pg7.config_ip4()
@@ -2247,8 +2372,9 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(2, len(identity_mappings))
for sm in identity_mappings:
if sm.sw_if_index == 0xFFFFFFFF:
- self.assertEqual(str(identity_mappings[0].ip_address),
- self.pg7.local_ip4)
+ self.assertEqual(
+ str(identity_mappings[0].ip_address), self.pg7.local_ip4
+ )
self.assertEqual(port, identity_mappings[0].port)
self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
resolved = True
@@ -2258,11 +2384,10 @@ class TestNAT44EI(MethodHolder):
self.pg7.unconfig_ip4()
identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
self.assertEqual(1, len(identity_mappings))
- self.assertEqual(self.pg7.sw_if_index,
- identity_mappings[0].sw_if_index)
+ self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
def test_ipfix_nat44_sess(self):
- """ NAT44EI IPFIX logging NAT44EI session created/deleted """
+ """NAT44EI IPFIX logging NAT44EI session created/deleted"""
self.ipfix_domain_id = 10
self.ipfix_src_port = 20202
collector_port = 30303
@@ -2270,19 +2395,21 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
- src_address=self.pg3.local_ip4,
- path_mtu=512,
- template_interval=10,
- collector_port=collector_port)
- self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.set_ipfix_exporter(
+ collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
+ path_mtu=512,
+ template_interval=10,
+ collector_port=collector_port,
+ )
+ self.vapi.nat44_ei_ipfix_enable_disable(
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
+ )
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
@@ -2301,8 +2428,7 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
self.assertEqual(p[UDP].sport, self.ipfix_src_port)
self.assertEqual(p[UDP].dport, collector_port)
- self.assertEqual(p[IPFIX].observationDomainID,
- self.ipfix_domain_id)
+ self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
@@ -2312,25 +2438,29 @@ class TestNAT44EI(MethodHolder):
self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
- """ NAT44EI IPFIX logging NAT addresses exhausted """
+ """NAT44EI IPFIX logging NAT addresses exhausted"""
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
- src_address=self.pg3.local_ip4,
- path_mtu=512,
- template_interval=10)
- self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=1)
-
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=3025))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.set_ipfix_exporter(
+ collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
+ path_mtu=512,
+ template_interval=10,
+ )
+ self.vapi.nat44_ei_ipfix_enable_disable(
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
+ )
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=3025)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2345,8 +2475,7 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
self.assertEqual(p[UDP].sport, self.ipfix_src_port)
self.assertEqual(p[UDP].dport, 4739)
- self.assertEqual(p[IPFIX].observationDomainID,
- self.ipfix_domain_id)
+ self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
@@ -2356,15 +2485,15 @@ class TestNAT44EI(MethodHolder):
self.verify_ipfix_addr_exhausted(data)
def test_ipfix_max_sessions(self):
- """ NAT44EI IPFIX logging maximum session entries exceeded """
+ """NAT44EI IPFIX logging maximum session entries exceeded"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
max_sessions_per_thread = self.max_translations
max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
@@ -2372,26 +2501,32 @@ class TestNAT44EI(MethodHolder):
pkts = []
for i in range(0, max_sessions):
src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=src, dst=self.pg1.remote_ip4) /
- TCP(sport=1025))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=src, dst=self.pg1.remote_ip4)
+ / TCP(sport=1025)
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.get_capture(max_sessions)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
- src_address=self.pg3.local_ip4,
- path_mtu=512,
- template_interval=10)
- self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=1)
-
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=1025))
+ self.vapi.set_ipfix_exporter(
+ collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
+ path_mtu=512,
+ template_interval=10,
+ )
+ self.vapi.nat44_ei_ipfix_enable_disable(
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
+ )
+
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=1025)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2406,8 +2541,7 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
self.assertEqual(p[UDP].sport, self.ipfix_src_port)
self.assertEqual(p[UDP].dport, 4739)
- self.assertEqual(p[IPFIX].observationDomainID,
- self.ipfix_domain_id)
+ self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
@@ -2417,22 +2551,23 @@ class TestNAT44EI(MethodHolder):
self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
def test_syslog_apmap(self):
- """ NAT44EI syslog address and port mapping creation and deletion """
- self.vapi.syslog_set_filter(
- self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
+ """NAT44EI syslog address and port mapping creation and deletion"""
+ self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=20))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=self.tcp_port_in, dport=20)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2448,22 +2583,25 @@ class TestNAT44EI(MethodHolder):
self.verify_syslog_apmap(capture[0][Raw].load, False)
def test_pool_addr_fib(self):
- """ NAT44EI add pool addresses to FIB """
- static_addr = '10.0.0.10'
+ """NAT44EI add pool addresses to FIB"""
+ static_addr = "10.0.0.10"
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
# NAT44EI address
- p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=self.nat_addr,
- psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+ p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=self.nat_addr,
+ psrc=self.pg1.remote_ip4,
+ hwsrc=self.pg1.remote_mac,
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2472,9 +2610,12 @@ class TestNAT44EI(MethodHolder):
self.assertTrue(capture[0][ARP].op, ARP.is_at)
# 1:1 NAT address
- p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=static_addr,
- psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+ p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=static_addr,
+ psrc=self.pg1.remote_ip4,
+ hwsrc=self.pg1.remote_mac,
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2483,9 +2624,12 @@ class TestNAT44EI(MethodHolder):
self.assertTrue(capture[0][ARP].op, ARP.is_at)
# send ARP to non-NAT44EI interface
- p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=self.nat_addr,
- psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
+ p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=self.nat_addr,
+ psrc=self.pg2.remote_ip4,
+ hwsrc=self.pg2.remote_mac,
+ )
self.pg2.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2493,27 +2637,32 @@ class TestNAT44EI(MethodHolder):
# remove addresses and verify
self.nat44_add_address(self.nat_addr, is_add=0)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
- is_add=0)
-
- p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=self.nat_addr,
- psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
+
+ p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=self.nat_addr,
+ psrc=self.pg1.remote_ip4,
+ hwsrc=self.pg1.remote_mac,
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.assert_nothing_captured()
- p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=static_addr,
- psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+ p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=static_addr,
+ psrc=self.pg1.remote_ip4,
+ hwsrc=self.pg1.remote_mac,
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.assert_nothing_captured()
def test_vrf_mode(self):
- """ NAT44EI tenant VRF aware address pool mode """
+ """NAT44EI tenant VRF aware address pool mode"""
vrf_id1 = 1
vrf_id2 = 2
@@ -2522,8 +2671,8 @@ class TestNAT44EI(MethodHolder):
self.pg0.unconfig_ip4()
self.pg1.unconfig_ip4()
- self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
- self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
+ self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
self.pg0.set_table_ip4(vrf_id1)
self.pg1.set_table_ip4(vrf_id2)
self.pg0.config_ip4()
@@ -2535,14 +2684,14 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg2.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg2.sw_if_index, is_add=1
+ )
try:
# first VRF
@@ -2570,11 +2719,11 @@ class TestNAT44EI(MethodHolder):
self.pg1.config_ip4()
self.pg0.resolve_arp()
self.pg1.resolve_arp()
- self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
- self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
+ self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
+ self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
def test_vrf_feature_independent(self):
- """ NAT44EI tenant VRF independent address pool mode """
+ """NAT44EI tenant VRF independent address pool mode"""
nat_ip1 = "10.0.0.10"
nat_ip2 = "10.0.0.11"
@@ -2583,14 +2732,14 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(nat_ip2, vrf_id=99)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg2.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg2.sw_if_index, is_add=1
+ )
# first VRF
pkts = self.create_stream_in(self.pg0, self.pg2)
@@ -2609,16 +2758,16 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
- """ NAT44EI interfaces without configured IP address """
+ """NAT44EI interfaces without configured IP address"""
self.create_routes_and_neigbors()
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg7.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg8.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg8.sw_if_index, is_add=1
+ )
# in2out
pkts = self.create_stream_in(self.pg7, self.pg8)
@@ -2637,17 +2786,17 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
- """ NAT44EI interfaces without configured IP address - 1:1 NAT """
+ """NAT44EI interfaces without configured IP address - 1:1 NAT"""
self.create_routes_and_neigbors()
self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg7.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg8.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg8.sw_if_index, is_add=1
+ )
# out2in
pkts = self.create_stream_out(self.pg8)
@@ -2666,7 +2815,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture, self.nat_addr, True)
def test_static_with_port_ipless_interfaces(self):
- """ NAT44EI interfaces without configured IP address - 1:1 NAPT """
+ """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
self.tcp_port_out = 30606
self.udp_port_out = 30607
@@ -2674,22 +2823,34 @@ class TestNAT44EI(MethodHolder):
self.create_routes_and_neigbors()
self.nat44_add_address(self.nat_addr)
- self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
- self.tcp_port_in, self.tcp_port_out,
- proto=IP_PROTOS.tcp)
- self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
- self.udp_port_in, self.udp_port_out,
- proto=IP_PROTOS.udp)
- self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
- self.icmp_id_in, self.icmp_id_out,
- proto=IP_PROTOS.icmp)
+ self.nat44_add_static_mapping(
+ self.pg7.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg7.remote_ip4,
+ self.nat_addr,
+ self.udp_port_in,
+ self.udp_port_out,
+ proto=IP_PROTOS.udp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg7.remote_ip4,
+ self.nat_addr,
+ self.icmp_id_in,
+ self.icmp_id_out,
+ proto=IP_PROTOS.icmp,
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg7.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg8.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg8.sw_if_index, is_add=1
+ )
# out2in
pkts = self.create_stream_out(self.pg8)
@@ -2708,23 +2869,25 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture)
def test_static_unknown_proto(self):
- """ NAT44EI 1:1 translate packet with unknown protocol """
+ """NAT44EI 1:1 translate packet with unknown protocol"""
nat_ip = "10.0.0.10"
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# in2out
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- GRE() /
- IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
- TCP(sport=1234, dport=1234))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / GRE()
+ / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
+ / TCP(sport=1234, dport=1234)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2740,11 +2903,13 @@ class TestNAT44EI(MethodHolder):
raise
# out2in
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=nat_ip) /
- GRE() /
- IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
- TCP(sport=1234, dport=1234))
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=nat_ip)
+ / GRE()
+ / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
+ / TCP(sport=1234, dport=1234)
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2760,8 +2925,7 @@ class TestNAT44EI(MethodHolder):
raise
def test_hairpinning_static_unknown_proto(self):
- """ NAT44EI 1:1 translate packet with unknown protocol - hairpinning
- """
+ """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
@@ -2773,18 +2937,20 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_static_mapping(server.ip4, server_nat_ip)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# host to server
- p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
- IP(src=host.ip4, dst=server_nat_ip) /
- GRE() /
- IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
- TCP(sport=1234, dport=1234))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=host.mac)
+ / IP(src=host.ip4, dst=server_nat_ip)
+ / GRE()
+ / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
+ / TCP(sport=1234, dport=1234)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2800,11 +2966,13 @@ class TestNAT44EI(MethodHolder):
raise
# server to host
- p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
- IP(src=server.ip4, dst=host_nat_ip) /
- GRE() /
- IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
- TCP(sport=1234, dport=1234))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=server.mac)
+ / IP(src=server.ip4, dst=host_nat_ip)
+ / GRE()
+ / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
+ / TCP(sport=1234, dport=1234)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2820,10 +2988,11 @@ class TestNAT44EI(MethodHolder):
raise
def test_output_feature(self):
- """ NAT44EI output feature (in2out postrouting) """
+ """NAT44EI output feature (in2out postrouting)"""
self.nat44_add_address(self.nat_addr)
self.vapi.nat44_ei_add_del_output_interface(
- sw_if_index=self.pg3.sw_if_index, is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
# in2out
pkts = self.create_stream_in(self.pg0, self.pg3)
@@ -2850,25 +3019,32 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_no_translation(capture, self.pg2, self.pg0)
def test_output_feature_vrf_aware(self):
- """ NAT44EI output feature VRF aware (in2out postrouting) """
+ """NAT44EI output feature VRF aware (in2out postrouting)"""
nat_ip_vrf10 = "10.0.0.10"
nat_ip_vrf20 = "10.0.0.20"
- r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
- [VppRoutePath(self.pg3.remote_ip4,
- self.pg3.sw_if_index)],
- table_id=10)
- r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
- [VppRoutePath(self.pg3.remote_ip4,
- self.pg3.sw_if_index)],
- table_id=20)
+ r1 = VppIpRoute(
+ self,
+ self.pg3.remote_ip4,
+ 32,
+ [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
+ table_id=10,
+ )
+ r2 = VppIpRoute(
+ self,
+ self.pg3.remote_ip4,
+ 32,
+ [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
+ table_id=20,
+ )
r1.add_vpp_config()
r2.add_vpp_config()
self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
self.vapi.nat44_ei_add_del_output_interface(
- sw_if_index=self.pg3.sw_if_index, is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
# in2out VRF 10
pkts = self.create_stream_in(self.pg4, self.pg3)
@@ -2903,7 +3079,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg6)
def test_output_feature_hairpinning(self):
- """ NAT44EI output feature hairpinning (in2out postrouting) """
+ """NAT44EI output feature hairpinning (in2out postrouting)"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
host_in_port = 1234
@@ -2913,19 +3089,27 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
self.vapi.nat44_ei_add_del_output_interface(
- sw_if_index=self.pg0.sw_if_index, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_add_del_output_interface(
- sw_if_index=self.pg1.sw_if_index, is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server.ip4, self.nat_addr,
- server_in_port, server_out_port,
- proto=IP_PROTOS.tcp)
+ self.nat44_add_static_mapping(
+ server.ip4,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
# send packet from host to server
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.nat_addr) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.nat_addr)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2945,9 +3129,11 @@ class TestNAT44EI(MethodHolder):
raise
# send reply from server to host
- p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
- IP(src=server.ip4, dst=self.nat_addr) /
- TCP(sport=server_in_port, dport=host_out_port))
+ p = (
+ Ether(src=server.mac, dst=self.pg0.local_mac)
+ / IP(src=server.ip4, dst=self.nat_addr)
+ / TCP(sport=server_in_port, dport=host_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2966,7 +3152,7 @@ class TestNAT44EI(MethodHolder):
raise
def test_one_armed_nat44(self):
- """ NAT44EI One armed NAT """
+ """NAT44EI One armed NAT"""
remote_host = self.pg9.remote_hosts[0]
local_host = self.pg9.remote_hosts[1]
external_port = 0
@@ -2974,16 +3160,18 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg9.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg9.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg9.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
+ )
# in2out
- p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
- IP(src=local_host.ip4, dst=remote_host.ip4) /
- TCP(sport=12345, dport=80))
+ p = (
+ Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
+ / IP(src=local_host.ip4, dst=remote_host.ip4)
+ / TCP(sport=12345, dport=80)
+ )
self.pg9.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3003,9 +3191,11 @@ class TestNAT44EI(MethodHolder):
raise
# out2in
- p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
- IP(src=remote_host.ip4, dst=self.nat_addr) /
- TCP(sport=80, dport=external_port))
+ p = (
+ Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
+ / IP(src=remote_host.ip4, dst=self.nat_addr)
+ / TCP(sport=80, dport=external_port)
+ )
self.pg9.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3028,21 +3218,21 @@ class TestNAT44EI(MethodHolder):
else:
node = "nat44-ei-classify"
- err = self.statistics.get_err_counter('/err/%s/next in2out' % node)
+ err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
self.assertEqual(err, 1)
- err = self.statistics.get_err_counter('/err/%s/next out2in' % node)
+ err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
self.assertEqual(err, 1)
def test_del_session(self):
- """ NAT44EI delete session """
+ """NAT44EI delete session"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
@@ -3057,12 +3247,14 @@ class TestNAT44EI(MethodHolder):
address=sessions[0].inside_ip_address,
port=sessions[0].inside_port,
protocol=sessions[0].protocol,
- flags=self.config_flags.NAT44_EI_IF_INSIDE)
+ flags=self.config_flags.NAT44_EI_IF_INSIDE,
+ )
self.vapi.nat44_ei_del_session(
address=sessions[1].outside_ip_address,
port=sessions[1].outside_port,
- protocol=sessions[1].protocol)
+ protocol=sessions[1].protocol,
+ )
sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(nsessions - len(sessions), 2)
@@ -3071,60 +3263,56 @@ class TestNAT44EI(MethodHolder):
address=sessions[0].inside_ip_address,
port=sessions[0].inside_port,
protocol=sessions[0].protocol,
- flags=self.config_flags.NAT44_EI_IF_INSIDE)
+ flags=self.config_flags.NAT44_EI_IF_INSIDE,
+ )
self.verify_no_nat44_user()
def test_frag_in_order(self):
- """ NAT44EI translate fragments arriving in order """
+ """NAT44EI translate fragments arriving in order"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.frag_in_order(proto=IP_PROTOS.tcp)
self.frag_in_order(proto=IP_PROTOS.udp)
self.frag_in_order(proto=IP_PROTOS.icmp)
def test_frag_forwarding(self):
- """ NAT44EI forwarding fragment test """
+ """NAT44EI forwarding fragment test"""
self.vapi.nat44_ei_add_del_interface_addr(
- is_add=1,
- sw_if_index=self.pg1.sw_if_index)
+ is_add=1, sw_if_index=self.pg1.sw_if_index
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
data = b"A" * 16 + b"B" * 16 + b"C" * 3
- pkts = self.create_stream_frag(self.pg1,
- self.pg0.remote_ip4,
- 4789,
- 4789,
- data,
- proto=IP_PROTOS.udp)
+ pkts = self.create_stream_frag(
+ self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
+ )
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
- p = self.reass_frags_and_verify(frags,
- self.pg1.remote_ip4,
- self.pg0.remote_ip4)
+ p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
self.assertEqual(p[UDP].sport, 4789)
self.assertEqual(p[UDP].dport, 4789)
self.assertEqual(data, p[Raw].load)
def test_reass_hairpinning(self):
- """ NAT44EI fragments hairpinning """
+ """NAT44EI fragments hairpinning"""
server_addr = self.pg0.remote_hosts[1].ip4
host_in_port = random.randint(1025, 65535)
@@ -3134,63 +3322,85 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server_addr, self.nat_addr,
- server_in_port,
- server_out_port,
- proto=IP_PROTOS.tcp)
- self.nat44_add_static_mapping(server_addr, self.nat_addr,
- server_in_port,
- server_out_port,
- proto=IP_PROTOS.udp)
+ self.nat44_add_static_mapping(
+ server_addr,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
+ self.nat44_add_static_mapping(
+ server_addr,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.udp,
+ )
self.nat44_add_static_mapping(server_addr, self.nat_addr)
- self.reass_hairpinning(server_addr, server_in_port, server_out_port,
- host_in_port, proto=IP_PROTOS.tcp)
- self.reass_hairpinning(server_addr, server_in_port, server_out_port,
- host_in_port, proto=IP_PROTOS.udp)
- self.reass_hairpinning(server_addr, server_in_port, server_out_port,
- host_in_port, proto=IP_PROTOS.icmp)
+ self.reass_hairpinning(
+ server_addr,
+ server_in_port,
+ server_out_port,
+ host_in_port,
+ proto=IP_PROTOS.tcp,
+ )
+ self.reass_hairpinning(
+ server_addr,
+ server_in_port,
+ server_out_port,
+ host_in_port,
+ proto=IP_PROTOS.udp,
+ )
+ self.reass_hairpinning(
+ server_addr,
+ server_in_port,
+ server_out_port,
+ host_in_port,
+ proto=IP_PROTOS.icmp,
+ )
def test_frag_out_of_order(self):
- """ NAT44EI translate fragments arriving out of order """
+ """NAT44EI translate fragments arriving out of order"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.frag_out_of_order(proto=IP_PROTOS.tcp)
self.frag_out_of_order(proto=IP_PROTOS.udp)
self.frag_out_of_order(proto=IP_PROTOS.icmp)
def test_port_restricted(self):
- """ NAT44EI Port restricted NAT44EI (MAP-E CE) """
+ """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=1,
- psid_offset=6,
- psid_length=6,
- psid=10)
-
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=4567, dport=22))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
+ alg=1, psid_offset=6, psid_length=6, psid=10
+ )
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=4567, dport=22)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3210,24 +3420,26 @@ class TestNAT44EI(MethodHolder):
raise
def test_port_range(self):
- """ NAT44EI External address port range """
+ """NAT44EI External address port range"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=2,
- start_port=1025,
- end_port=1027)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
+ alg=2, start_port=1025, end_port=1027
+ )
pkts = []
for port in range(0, 5):
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=1125 + port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=1125 + port)
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -3239,14 +3451,14 @@ class TestNAT44EI(MethodHolder):
self.assertLessEqual(tcp.sport, 1027)
def test_multiple_outside_vrf(self):
- """ NAT44EI Multiple outside VRF """
+ """NAT44EI Multiple outside VRF"""
vrf_id1 = 1
vrf_id2 = 2
self.pg1.unconfig_ip4()
self.pg2.unconfig_ip4()
- self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
- self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
+ self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
self.pg1.set_table_ip4(vrf_id1)
self.pg2.set_table_ip4(vrf_id2)
self.pg1.config_ip4()
@@ -3257,14 +3469,14 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg2.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg2.sw_if_index, is_add=1
+ )
try:
# first VRF
@@ -3313,20 +3525,26 @@ class TestNAT44EI(MethodHolder):
self.pg2.resolve_arp()
def test_mss_clamping(self):
- """ NAT44EI TCP MSS clamping """
+ """NAT44EI TCP MSS clamping"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="S", options=[('MSS', 1400)]))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(
+ sport=self.tcp_port_in,
+ dport=self.tcp_external_port,
+ flags="S",
+ options=[("MSS", 1400)],
+ )
+ )
self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
self.pg0.add_stream(p)
@@ -3353,21 +3571,22 @@ class TestNAT44EI(MethodHolder):
self.verify_mss_value(capture[0], 1400)
def test_ha_send(self):
- """ NAT44EI Send HA session synchronization events (active) """
+ """NAT44EI Send HA session synchronization events (active)"""
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.nat44_add_address(self.nat_addr)
self.vapi.nat44_ei_ha_set_listener(
- ip_address=self.pg3.local_ip4, port=12345, path_mtu=512)
+ ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
+ )
self.vapi.nat44_ei_ha_set_failover(
- ip_address=self.pg3.remote_ip4, port=12346,
- session_refresh_interval=10)
+ ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
+ )
bind_layers(UDP, HANATStateSync, sport=12345)
# create sessions
@@ -3379,7 +3598,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture)
# active send HA events
self.vapi.nat44_ei_ha_flush()
- stats = self.statistics['/nat44-ei/ha/add-event-send']
+ stats = self.statistics["/nat44-ei/ha/add-event-send"]
self.assertEqual(stats[:, 0].sum(), 3)
capture = self.pg3.get_capture(1)
p = capture[0]
@@ -3407,23 +3626,29 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(event.fib_index, 0)
# ACK received events
- ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=seq, flags='ACK',
- thread_index=hanat.thread_index))
+ ack = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
+ )
+ )
self.pg3.add_stream(ack)
self.pg_start()
- stats = self.statistics['/nat44-ei/ha/ack-recv']
+ stats = self.statistics["/nat44-ei/ha/ack-recv"]
self.assertEqual(stats[:, 0].sum(), 1)
# delete one session
self.pg_enable_capture(self.pg_interfaces)
self.vapi.nat44_ei_del_session(
- address=self.pg0.remote_ip4, port=self.tcp_port_in,
- protocol=IP_PROTOS.tcp, flags=self.config_flags.NAT44_EI_IF_INSIDE)
+ address=self.pg0.remote_ip4,
+ port=self.tcp_port_in,
+ protocol=IP_PROTOS.tcp,
+ flags=self.config_flags.NAT44_EI_IF_INSIDE,
+ )
self.vapi.nat44_ei_ha_flush()
- stats = self.statistics['/nat44-ei/ha/del-event-send']
+ stats = self.statistics["/nat44-ei/ha/del-event-send"]
self.assertEqual(stats[:, 0].sum(), 1)
capture = self.pg3.get_capture(1)
p = capture[0]
@@ -3438,9 +3663,9 @@ class TestNAT44EI(MethodHolder):
# do not send ACK, active retry send HA event again
self.pg_enable_capture(self.pg_interfaces)
self.virtual_sleep(12)
- stats = self.statistics['/nat44-ei/ha/retry-count']
+ stats = self.statistics["/nat44-ei/ha/retry-count"]
self.assertEqual(stats[:, 0].sum(), 3)
- stats = self.statistics['/nat44-ei/ha/missed-count']
+ stats = self.statistics["/nat44-ei/ha/missed-count"]
self.assertEqual(stats[:, 0].sum(), 1)
capture = self.pg3.get_capture(3)
for packet in capture:
@@ -3453,7 +3678,7 @@ class TestNAT44EI(MethodHolder):
self.pg_start()
self.pg0.get_capture(2)
self.vapi.nat44_ei_ha_flush()
- stats = self.statistics['/nat44-ei/ha/refresh-event-send']
+ stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
self.assertEqual(stats[:, 0].sum(), 2)
capture = self.pg3.get_capture(1)
p = capture[0]
@@ -3480,29 +3705,33 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(event.total_pkts, 2)
self.assertGreater(event.total_bytes, 0)
- stats = self.statistics['/nat44-ei/ha/ack-recv']
- ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=seq, flags='ACK',
- thread_index=hanat.thread_index))
+ stats = self.statistics["/nat44-ei/ha/ack-recv"]
+ ack = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
+ )
+ )
self.pg3.add_stream(ack)
self.pg_start()
- stats = self.statistics['/nat44-ei/ha/ack-recv']
+ stats = self.statistics["/nat44-ei/ha/ack-recv"]
self.assertEqual(stats[:, 0].sum(), 2)
def test_ha_recv(self):
- """ NAT44EI Receive HA session synchronization events (passive) """
+ """NAT44EI Receive HA session synchronization events (passive)"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.nat44_ei_ha_set_listener(ip_address=self.pg3.local_ip4,
- port=12345, path_mtu=512)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.nat44_ei_ha_set_listener(
+ ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
+ )
bind_layers(UDP, HANATStateSync, sport=12345)
# this is a bit tricky - HA dictates thread index due to how it's
@@ -3512,11 +3741,12 @@ class TestNAT44EI(MethodHolder):
# IP address) and out2in (based on outside port)
# first choose a thread index which is correct for IP
- thread_index = get_nat44_ei_in2out_worker_index(self.pg0.remote_ip4,
- self.vpp_worker_count)
+ thread_index = get_nat44_ei_in2out_worker_index(
+ self.pg0.remote_ip4, self.vpp_worker_count
+ )
# now pick a port which is correct for given thread
- port_per_thread = int((0xffff-1024) / max(1, self.vpp_worker_count))
+ port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
self.udp_port_out = 1024 + random.randint(1, port_per_thread)
if self.vpp_worker_count > 0:
@@ -3524,25 +3754,43 @@ class TestNAT44EI(MethodHolder):
self.udp_port_out += port_per_thread * (thread_index - 1)
# send HA session add events to failover/passive
- p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=1, events=[
- Event(event_type='add', protocol='tcp',
- in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
- in_port=self.tcp_port_in, out_port=self.tcp_port_out,
- eh_addr=self.pg1.remote_ip4,
- ehn_addr=self.pg1.remote_ip4,
- eh_port=self.tcp_external_port,
- ehn_port=self.tcp_external_port, fib_index=0),
- Event(event_type='add', protocol='udp',
- in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
- in_port=self.udp_port_in, out_port=self.udp_port_out,
- eh_addr=self.pg1.remote_ip4,
- ehn_addr=self.pg1.remote_ip4,
- eh_port=self.udp_external_port,
- ehn_port=self.udp_external_port, fib_index=0)],
- thread_index=thread_index))
+ p = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=1,
+ events=[
+ Event(
+ event_type="add",
+ protocol="tcp",
+ in_addr=self.pg0.remote_ip4,
+ out_addr=self.nat_addr,
+ in_port=self.tcp_port_in,
+ out_port=self.tcp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.tcp_external_port,
+ ehn_port=self.tcp_external_port,
+ fib_index=0,
+ ),
+ Event(
+ event_type="add",
+ protocol="udp",
+ in_addr=self.pg0.remote_ip4,
+ out_addr=self.nat_addr,
+ in_port=self.udp_port_in,
+ out_port=self.udp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.udp_external_port,
+ ehn_port=self.udp_external_port,
+ fib_index=0,
+ ),
+ ],
+ thread_index=thread_index,
+ )
+ )
self.pg3.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
@@ -3557,49 +3805,57 @@ class TestNAT44EI(MethodHolder):
raise
else:
self.assertEqual(hanat.sequence_number, 1)
- self.assertEqual(hanat.flags, 'ACK')
+ self.assertEqual(hanat.flags, "ACK")
self.assertEqual(hanat.version, 1)
self.assertEqual(hanat.thread_index, thread_index)
- stats = self.statistics['/nat44-ei/ha/ack-send']
+ stats = self.statistics["/nat44-ei/ha/ack-send"]
self.assertEqual(stats[:, 0].sum(), 1)
- stats = self.statistics['/nat44-ei/ha/add-event-recv']
+ stats = self.statistics["/nat44-ei/ha/add-event-recv"]
self.assertEqual(stats[:, 0].sum(), 2)
- users = self.statistics['/nat44-ei/total-users']
+ users = self.statistics["/nat44-ei/total-users"]
self.assertEqual(users[:, 0].sum(), 1)
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertEqual(sessions[:, 0].sum(), 2)
users = self.vapi.nat44_ei_user_dump()
self.assertEqual(len(users), 1)
- self.assertEqual(str(users[0].ip_address),
- self.pg0.remote_ip4)
+ self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
# there should be 2 sessions created by HA
sessions = self.vapi.nat44_ei_user_session_dump(
- users[0].ip_address, users[0].vrf_id)
+ users[0].ip_address, users[0].vrf_id
+ )
self.assertEqual(len(sessions), 2)
for session in sessions:
- self.assertEqual(str(session.inside_ip_address),
- self.pg0.remote_ip4)
- self.assertEqual(str(session.outside_ip_address),
- self.nat_addr)
- self.assertIn(session.inside_port,
- [self.tcp_port_in, self.udp_port_in])
- self.assertIn(session.outside_port,
- [self.tcp_port_out, self.udp_port_out])
+ self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
+ self.assertEqual(str(session.outside_ip_address), self.nat_addr)
+ self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
+ self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
# send HA session delete event to failover/passive
- p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=2, events=[
- Event(event_type='del', protocol='udp',
- in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
- in_port=self.udp_port_in, out_port=self.udp_port_out,
- eh_addr=self.pg1.remote_ip4,
- ehn_addr=self.pg1.remote_ip4,
- eh_port=self.udp_external_port,
- ehn_port=self.udp_external_port, fib_index=0)],
- thread_index=thread_index))
+ p = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=2,
+ events=[
+ Event(
+ event_type="del",
+ protocol="udp",
+ in_addr=self.pg0.remote_ip4,
+ out_addr=self.nat_addr,
+ in_port=self.udp_port_in,
+ out_port=self.udp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.udp_external_port,
+ ehn_port=self.udp_external_port,
+ fib_index=0,
+ )
+ ],
+ thread_index=thread_index,
+ )
+ )
self.pg3.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
@@ -3614,37 +3870,49 @@ class TestNAT44EI(MethodHolder):
raise
else:
self.assertEqual(hanat.sequence_number, 2)
- self.assertEqual(hanat.flags, 'ACK')
+ self.assertEqual(hanat.flags, "ACK")
self.assertEqual(hanat.version, 1)
users = self.vapi.nat44_ei_user_dump()
self.assertEqual(len(users), 1)
- self.assertEqual(str(users[0].ip_address),
- self.pg0.remote_ip4)
+ self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
# now we should have only 1 session, 1 deleted by HA
- sessions = self.vapi.nat44_ei_user_session_dump(users[0].ip_address,
- users[0].vrf_id)
+ sessions = self.vapi.nat44_ei_user_session_dump(
+ users[0].ip_address, users[0].vrf_id
+ )
self.assertEqual(len(sessions), 1)
- stats = self.statistics['/nat44-ei/ha/del-event-recv']
+ stats = self.statistics["/nat44-ei/ha/del-event-recv"]
self.assertEqual(stats[:, 0].sum(), 1)
- stats = self.statistics.get_err_counter(
- '/err/nat44-ei-ha/pkts-processed')
+ stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
self.assertEqual(stats, 2)
# send HA session refresh event to failover/passive
- p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=3, events=[
- Event(event_type='refresh', protocol='tcp',
- in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
- in_port=self.tcp_port_in, out_port=self.tcp_port_out,
- eh_addr=self.pg1.remote_ip4,
- ehn_addr=self.pg1.remote_ip4,
- eh_port=self.tcp_external_port,
- ehn_port=self.tcp_external_port, fib_index=0,
- total_bytes=1024, total_pkts=2)],
- thread_index=thread_index))
+ p = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=3,
+ events=[
+ Event(
+ event_type="refresh",
+ protocol="tcp",
+ in_addr=self.pg0.remote_ip4,
+ out_addr=self.nat_addr,
+ in_port=self.tcp_port_in,
+ out_port=self.tcp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.tcp_external_port,
+ ehn_port=self.tcp_external_port,
+ fib_index=0,
+ total_bytes=1024,
+ total_pkts=2,
+ )
+ ],
+ thread_index=thread_index,
+ )
+ )
self.pg3.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3658,29 +3926,30 @@ class TestNAT44EI(MethodHolder):
raise
else:
self.assertEqual(hanat.sequence_number, 3)
- self.assertEqual(hanat.flags, 'ACK')
+ self.assertEqual(hanat.flags, "ACK")
self.assertEqual(hanat.version, 1)
users = self.vapi.nat44_ei_user_dump()
self.assertEqual(len(users), 1)
- self.assertEqual(str(users[0].ip_address),
- self.pg0.remote_ip4)
+ self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
sessions = self.vapi.nat44_ei_user_session_dump(
- users[0].ip_address, users[0].vrf_id)
+ users[0].ip_address, users[0].vrf_id
+ )
self.assertEqual(len(sessions), 1)
session = sessions[0]
self.assertEqual(session.total_bytes, 1024)
self.assertEqual(session.total_pkts, 2)
- stats = self.statistics['/nat44-ei/ha/refresh-event-recv']
+ stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
self.assertEqual(stats[:, 0].sum(), 1)
- stats = self.statistics.get_err_counter(
- '/err/nat44-ei-ha/pkts-processed')
+ stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
self.assertEqual(stats, 3)
# send packet to test session created by HA
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
+ / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3706,7 +3975,7 @@ class TestNAT44EI(MethodHolder):
return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
def test_set_frame_queue_nelts(self):
- """ NAT44EI API test - worker handoff frame queue elements """
+ """NAT44EI API test - worker handoff frame queue elements"""
self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
def show_commands_at_teardown(self):
@@ -3718,11 +3987,10 @@ class TestNAT44EI(MethodHolder):
self.logger.info(self.vapi.cli("show nat44 ei sessions detail"))
self.logger.info(self.vapi.cli("show nat44 ei hash tables detail"))
self.logger.info(self.vapi.cli("show nat44 ei ha"))
- self.logger.info(
- self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
+ self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
def test_outside_address_distribution(self):
- """ Outside address distribution based on source address """
+ """Outside address distribution based on source address"""
x = 100
nat_addresses = []
@@ -3733,16 +4001,18 @@ class TestNAT44EI(MethodHolder):
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_add_del_address_range(
first_ip_address=nat_addresses[0],
last_ip_address=nat_addresses[-1],
- vrf_id=0xFFFFFFFF, is_add=1)
+ vrf_id=0xFFFFFFFF,
+ is_add=1,
+ )
self.pg0.generate_remote_hosts(x)
@@ -3750,11 +4020,12 @@ class TestNAT44EI(MethodHolder):
for i in range(x):
info = self.create_packet_info(self.pg0, self.pg1)
payload = self.info_to_payload(info)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_hosts[i].ip4,
- dst=self.pg1.remote_ip4) /
- UDP(sport=7000+i, dport=8000+i) /
- Raw(payload))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=7000 + i, dport=8000 + i)
+ / Raw(payload)
+ )
info.data = p
pkts.append(p)
@@ -3772,21 +4043,23 @@ class TestNAT44EI(MethodHolder):
packed = socket.inet_aton(p_sent[IP].src)
numeric = struct.unpack("!L", packed)[0]
numeric = socket.htonl(numeric)
- a = nat_addresses[(numeric-1) % len(nat_addresses)]
+ a = nat_addresses[(numeric - 1) % len(nat_addresses)]
self.assertEqual(
- a, p_recvd[IP].src,
+ a,
+ p_recvd[IP].src,
"Invalid packet (src IP %s translated to %s, but expected %s)"
- % (p_sent[IP].src, p_recvd[IP].src, a))
+ % (p_sent[IP].src, p_recvd[IP].src, a),
+ )
def test_default_user_sessions(self):
- """ NAT44EI default per-user session limit is used and reported """
+ """NAT44EI default per-user session limit is used and reported"""
nat44_ei_config = self.vapi.nat44_ei_show_running_config()
# a nonzero default should be reported for user_sessions
self.assertNotEqual(nat44_ei_config.user_sessions, 0)
class TestNAT44Out2InDPO(MethodHolder):
- """ NAT44EI Test Cases using out2in DPO """
+ """NAT44EI Test Cases using out2in DPO"""
@classmethod
def setUpClass(cls):
@@ -3799,8 +4072,8 @@ class TestNAT44Out2InDPO(MethodHolder):
cls.udp_port_out = 6304
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
- cls.nat_addr = '10.0.0.3'
- cls.dst_ip4 = '192.168.70.1'
+ cls.nat_addr = "10.0.0.3"
+ cls.dst_ip4 = "192.168.70.1"
cls.create_pg_interfaces(range(2))
@@ -3812,10 +4085,13 @@ class TestNAT44Out2InDPO(MethodHolder):
cls.pg1.config_ip6()
cls.pg1.resolve_ndp()
- r1 = VppIpRoute(cls, "::", 0,
- [VppRoutePath(cls.pg1.remote_ip6,
- cls.pg1.sw_if_index)],
- register=False)
+ r1 = VppIpRoute(
+ cls,
+ "::",
+ 0,
+ [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
+ register=False,
+ )
r1.add_vpp_config()
def setUp(self):
@@ -3830,37 +4106,44 @@ class TestNAT44Out2InDPO(MethodHolder):
self.vapi.cli("clear logging")
def configure_xlat(self):
- self.dst_ip6_pfx = '1:2:3::'
- self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
- self.dst_ip6_pfx)
+ self.dst_ip6_pfx = "1:2:3::"
+ self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
self.dst_ip6_pfx_len = 96
- self.src_ip6_pfx = '4:5:6::'
- self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
- self.src_ip6_pfx)
+ self.src_ip6_pfx = "4:5:6::"
+ self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
self.src_ip6_pfx_len = 96
- self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
- self.src_ip6_pfx_n, self.src_ip6_pfx_len,
- '\x00\x00\x00\x00', 0)
-
- @unittest.skip('Temporary disabled')
+ self.vapi.map_add_domain(
+ self.dst_ip6_pfx_n,
+ self.dst_ip6_pfx_len,
+ self.src_ip6_pfx_n,
+ self.src_ip6_pfx_len,
+ "\x00\x00\x00\x00",
+ 0,
+ )
+
+ @unittest.skip("Temporary disabled")
def test_464xlat_ce(self):
- """ Test 464XLAT CE with NAT44EI """
+ """Test 464XLAT CE with NAT44EI"""
self.configure_xlat()
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_add_del_address_range(
first_ip_address=self.nat_addr_n,
last_ip_address=self.nat_addr_n,
- vrf_id=0xFFFFFFFF, is_add=1)
+ vrf_id=0xFFFFFFFF,
+ is_add=1,
+ )
- out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
- self.dst_ip6_pfx_len)
- out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
- self.src_ip6_pfx_len)
+ out_src_ip6 = self.compose_ip6(
+ self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
+ )
+ out_dst_ip6 = self.compose_ip6(
+ self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
+ )
try:
pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
@@ -3868,11 +4151,9 @@ class TestNAT44Out2InDPO(MethodHolder):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
- self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
- dst_ip=out_src_ip6)
+ self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)
- pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
- out_dst_ip6)
+ pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3880,31 +4161,35 @@ class TestNAT44Out2InDPO(MethodHolder):
self.verify_capture_in(capture, self.pg0)
finally:
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags)
+ sw_if_index=self.pg0.sw_if_index, flags=flags
+ )
self.vapi.nat44_ei_add_del_address_range(
first_ip_address=self.nat_addr_n,
last_ip_address=self.nat_addr_n,
- vrf_id=0xFFFFFFFF)
+ vrf_id=0xFFFFFFFF,
+ )
- @unittest.skip('Temporary disabled')
+ @unittest.skip("Temporary disabled")
def test_464xlat_ce_no_nat(self):
- """ Test 464XLAT CE without NAT44EI """
+ """Test 464XLAT CE without NAT44EI"""
self.configure_xlat()
- out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
- self.dst_ip6_pfx_len)
- out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
- self.src_ip6_pfx_len)
+ out_src_ip6 = self.compose_ip6(
+ self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
+ )
+ out_dst_ip6 = self.compose_ip6(
+ self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
+ )
pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
- self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
- nat_ip=out_dst_ip6, same_port=True)
+ self.verify_capture_out_ip6(
+ capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
+ )
pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
self.pg1.add_stream(pkts)
@@ -3915,7 +4200,8 @@ class TestNAT44Out2InDPO(MethodHolder):
class TestNAT44EIMW(MethodHolder):
- """ NAT44EI Test Cases (multiple workers) """
+ """NAT44EI Test Cases (multiple workers)"""
+
vpp_worker_count = 2
max_translations = 10240
max_users = 10240
@@ -3931,7 +4217,7 @@ class TestNAT44EIMW(MethodHolder):
cls.udp_port_out = 6304
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
- cls.nat_addr = '10.0.0.3'
+ cls.nat_addr = "10.0.0.3"
cls.ipfix_src_port = 4739
cls.ipfix_domain_id = 1
cls.tcp_external_port = 80
@@ -3952,8 +4238,8 @@ class TestNAT44EIMW(MethodHolder):
cls.pg1.configure_ipv4_neighbors()
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
- cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
- cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
+ cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
+ cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
cls.pg4._local_ip4 = "172.16.255.1"
cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
@@ -3975,8 +4261,8 @@ class TestNAT44EIMW(MethodHolder):
cls.pg9.generate_remote_hosts(2)
cls.pg9.config_ip4()
cls.vapi.sw_interface_add_del_address(
- sw_if_index=cls.pg9.sw_if_index,
- prefix="10.0.0.1/24")
+ sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
+ )
cls.pg9.admin_up()
cls.pg9.resolve_arp()
@@ -3987,16 +4273,15 @@ class TestNAT44EIMW(MethodHolder):
def setUp(self):
super(TestNAT44EIMW, self).setUp()
self.vapi.nat44_ei_plugin_enable_disable(
- sessions=self.max_translations,
- users=self.max_users, enable=1)
+ sessions=self.max_translations, users=self.max_users, enable=1
+ )
def tearDown(self):
super(TestNAT44EIMW, self).tearDown()
if not self.vpp_dead:
self.vapi.nat44_ei_ipfix_enable_disable(
- domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=0)
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
+ )
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
@@ -4004,7 +4289,7 @@ class TestNAT44EIMW(MethodHolder):
self.vapi.cli("clear logging")
def test_hairpinning(self):
- """ NAT44EI hairpinning - 1:1 NAPT """
+ """NAT44EI hairpinning - 1:1 NAPT"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
@@ -4018,22 +4303,28 @@ class TestNAT44EIMW(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server.ip4, self.nat_addr,
- server_in_port, server_out_port,
- proto=IP_PROTOS.tcp)
-
- cnt = self.statistics['/nat44-ei/hairpinning']
+ self.nat44_add_static_mapping(
+ server.ip4,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
+
+ cnt = self.statistics["/nat44-ei/hairpinning"]
# send packet from host to server
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.nat_addr) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.nat_addr)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -4052,15 +4343,17 @@ class TestNAT44EIMW(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
# send reply from server to host
- p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
- IP(src=server.ip4, dst=self.nat_addr) /
- TCP(sport=server_in_port, dport=host_out_port))
+ p = (
+ Ether(src=server.mac, dst=self.pg0.local_mac)
+ / IP(src=server.ip4, dst=self.nat_addr)
+ / TCP(sport=server_in_port, dport=host_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -4078,13 +4371,13 @@ class TestNAT44EIMW(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
def test_hairpinning2(self):
- """ NAT44EI hairpinning - 1:1 NAT"""
+ """NAT44EI hairpinning - 1:1 NAT"""
server1_nat_ip = "10.0.0.10"
server2_nat_ip = "10.0.0.11"
@@ -4097,11 +4390,11 @@ class TestNAT44EIMW(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for servers
self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
@@ -4109,17 +4402,23 @@ class TestNAT44EIMW(MethodHolder):
# host to server1
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- UDP(sport=self.udp_port_in, dport=server_udp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / UDP(sport=self.udp_port_in, dport=server_udp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -4147,17 +4446,23 @@ class TestNAT44EIMW(MethodHolder):
# server1 to host
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- UDP(sport=server_udp_port, dport=self.udp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / UDP(sport=server_udp_port, dport=self.udp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / ICMP(id=self.icmp_id_out, type="echo-reply")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -4182,17 +4487,23 @@ class TestNAT44EIMW(MethodHolder):
# server2 to server1
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- UDP(sport=self.udp_port_in, dport=server_udp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / UDP(sport=self.udp_port_in, dport=server_udp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -4220,17 +4531,23 @@ class TestNAT44EIMW(MethodHolder):
# server1 to server2
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- UDP(sport=server_udp_port, dport=self.udp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / UDP(sport=server_udp_port, dport=self.udp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / ICMP(id=self.icmp_id_out, type="echo-reply")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -4254,5 +4571,5 @@ class TestNAT44EIMW(MethodHolder):
raise
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)