aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_nat44_ei.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_nat44_ei.py')
-rw-r--r--test/test_nat44_ei.py2521
1 files changed, 1419 insertions, 1102 deletions
diff --git a/test/test_nat44_ei.py b/test/test_nat44_ei.py
index aafd345f43f..9eb127aaf0b 100644
--- a/test/test_nat44_ei.py
+++ b/test/test_nat44_ei.py
@@ -10,9 +10,19 @@ from io import BytesIO
import scapy.compat
from framework import VppTestCase, VppTestRunner
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
-from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
- IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
- PacketListField
+from scapy.all import (
+ bind_layers,
+ Packet,
+ ByteEnumField,
+ ShortField,
+ IPField,
+ IntField,
+ LongField,
+ XByteField,
+ FlagsField,
+ FieldLenField,
+ PacketListField,
+)
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
@@ -30,22 +40,22 @@ from vpp_papi import VppEnum
# NAT HA protocol event data
class Event(Packet):
name = "Event"
- fields_desc = [ByteEnumField("event_type", None,
- {1: "add", 2: "del", 3: "refresh"}),
- ByteEnumField("protocol", None,
- {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
- ShortField("flags", 0),
- IPField("in_addr", None),
- IPField("out_addr", None),
- ShortField("in_port", None),
- ShortField("out_port", None),
- IPField("eh_addr", None),
- IPField("ehn_addr", None),
- ShortField("eh_port", None),
- ShortField("ehn_port", None),
- IntField("fib_index", None),
- IntField("total_pkts", 0),
- LongField("total_bytes", 0)]
+ fields_desc = [
+ ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
+ ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
+ ShortField("flags", 0),
+ IPField("in_addr", None),
+ IPField("out_addr", None),
+ ShortField("in_port", None),
+ ShortField("out_port", None),
+ IPField("eh_addr", None),
+ IPField("ehn_addr", None),
+ ShortField("eh_port", None),
+ ShortField("ehn_port", None),
+ IntField("fib_index", None),
+ IntField("total_pkts", 0),
+ LongField("total_bytes", 0),
+ ]
def extract_padding(self, s):
return "", s
@@ -54,17 +64,18 @@ class Event(Packet):
# NAT HA protocol header
class HANATStateSync(Packet):
name = "HA NAT state sync"
- fields_desc = [XByteField("version", 1),
- FlagsField("flags", 0, 8, ['ACK']),
- FieldLenField("count", None, count_of="events"),
- IntField("sequence_number", 1),
- IntField("thread_index", 0),
- PacketListField("events", [], Event,
- count_from=lambda pkt: pkt.count)]
+ fields_desc = [
+ XByteField("version", 1),
+ FlagsField("flags", 0, 8, ["ACK"]),
+ FieldLenField("count", None, count_of="events"),
+ IntField("sequence_number", 1),
+ IntField("thread_index", 0),
+ PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
+ ]
class MethodHolder(VppTestCase):
- """ NAT create capture and verify method holder """
+ """NAT create capture and verify method holder"""
@property
def config_flags(self):
@@ -74,10 +85,19 @@ class MethodHolder(VppTestCase):
def SYSLOG_SEVERITY(self):
return VppEnum.vl_api_syslog_severity_t
- def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
- local_port=0, external_port=0, vrf_id=0,
- is_add=1, external_sw_if_index=0xFFFFFFFF,
- proto=0, tag="", flags=0):
+ def nat44_add_static_mapping(
+ self,
+ local_ip,
+ external_ip="0.0.0.0",
+ local_port=0,
+ external_port=0,
+ vrf_id=0,
+ is_add=1,
+ external_sw_if_index=0xFFFFFFFF,
+ proto=0,
+ tag="",
+ flags=0,
+ ):
"""
Add/delete NAT44EI static mapping
@@ -103,9 +123,11 @@ class MethodHolder(VppTestCase):
external_sw_if_index=external_sw_if_index,
local_port=local_port,
external_port=external_port,
- vrf_id=vrf_id, protocol=proto,
+ vrf_id=vrf_id,
+ protocol=proto,
flags=flags,
- tag=tag)
+ tag=tag,
+ )
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
"""
@@ -114,31 +136,40 @@ class MethodHolder(VppTestCase):
:param ip: IP address
:param is_add: 1 if add, 0 if delete (Default add)
"""
- self.vapi.nat44_ei_add_del_address_range(first_ip_address=ip,
- last_ip_address=ip,
- vrf_id=vrf_id,
- is_add=is_add)
+ self.vapi.nat44_ei_add_del_address_range(
+ first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add
+ )
def create_routes_and_neigbors(self):
- r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
- [VppRoutePath(self.pg7.remote_ip4,
- self.pg7.sw_if_index)])
- r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
- [VppRoutePath(self.pg8.remote_ip4,
- self.pg8.sw_if_index)])
+ r1 = VppIpRoute(
+ self,
+ self.pg7.remote_ip4,
+ 32,
+ [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
+ )
+ r2 = VppIpRoute(
+ self,
+ self.pg8.remote_ip4,
+ 32,
+ [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
+ )
r1.add_vpp_config()
r2.add_vpp_config()
- n1 = VppNeighbor(self,
- self.pg7.sw_if_index,
- self.pg7.remote_mac,
- self.pg7.remote_ip4,
- is_static=1)
- n2 = VppNeighbor(self,
- self.pg8.sw_if_index,
- self.pg8.remote_mac,
- self.pg8.remote_ip4,
- is_static=1)
+ n1 = VppNeighbor(
+ self,
+ self.pg7.sw_if_index,
+ self.pg7.remote_mac,
+ self.pg7.remote_ip4,
+ is_static=1,
+ )
+ n2 = VppNeighbor(
+ self,
+ self.pg8.sw_if_index,
+ self.pg8.remote_mac,
+ self.pg8.remote_ip4,
+ is_static=1,
+ )
n1.add_vpp_config()
n2.add_vpp_config()
@@ -156,21 +187,27 @@ class MethodHolder(VppTestCase):
pkts = []
# TCP
- p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(sport=self.tcp_port_in, dport=20))
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / TCP(sport=self.tcp_port_in, dport=20)
+ )
pkts.extend([p, p])
# UDP
- p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(sport=self.udp_port_in, dport=20))
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / UDP(sport=self.udp_port_in, dport=20)
+ )
pkts.append(p)
# ICMP
- p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
return pkts
@@ -216,11 +253,10 @@ class MethodHolder(VppTestCase):
pref_n[13] = ip4_n[1]
pref_n[14] = ip4_n[2]
pref_n[15] = ip4_n[3]
- packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
+ packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
- def create_stream_out(self, out_if, dst_ip=None, ttl=64,
- use_inside_ports=False):
+ def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
"""
Create packet stream for outside network
@@ -242,21 +278,27 @@ class MethodHolder(VppTestCase):
icmp_id = self.icmp_id_in
pkts = []
# TCP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(dport=tcp_port, sport=20))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / TCP(dport=tcp_port, sport=20)
+ )
pkts.extend([p, p])
# UDP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(dport=udp_port, sport=20))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / UDP(dport=udp_port, sport=20)
+ )
pkts.append(p)
# ICMP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- ICMP(id=icmp_id, type='echo-reply'))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / ICMP(id=icmp_id, type="echo-reply")
+ )
pkts.append(p)
return pkts
@@ -271,27 +313,40 @@ class MethodHolder(VppTestCase):
"""
pkts = []
# TCP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
- TCP(dport=self.tcp_port_out, sport=20))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
+ / TCP(dport=self.tcp_port_out, sport=20)
+ )
pkts.append(p)
# UDP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
- UDP(dport=self.udp_port_out, sport=20))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
+ / UDP(dport=self.udp_port_out, sport=20)
+ )
pkts.append(p)
# ICMP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
- ICMPv6EchoReply(id=self.icmp_id_out))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
+ / ICMPv6EchoReply(id=self.icmp_id_out)
+ )
pkts.append(p)
return pkts
- def verify_capture_out(self, capture, nat_ip=None, same_port=False,
- dst_ip=None, is_ip6=False, ignore_port=False):
+ def verify_capture_out(
+ self,
+ capture,
+ nat_ip=None,
+ same_port=False,
+ dst_ip=None,
+ is_ip6=False,
+ ignore_port=False,
+ ):
"""
Verify captured packets on outside network
@@ -319,39 +374,33 @@ class MethodHolder(VppTestCase):
if packet.haslayer(TCP):
if not ignore_port:
if same_port:
- self.assertEqual(
- packet[TCP].sport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].sport, self.tcp_port_in)
else:
- self.assertNotEqual(
- packet[TCP].sport, self.tcp_port_in)
+ self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
self.tcp_port_out = packet[TCP].sport
self.assert_packet_checksums_valid(packet)
elif packet.haslayer(UDP):
if not ignore_port:
if same_port:
- self.assertEqual(
- packet[UDP].sport, self.udp_port_in)
+ self.assertEqual(packet[UDP].sport, self.udp_port_in)
else:
- self.assertNotEqual(
- packet[UDP].sport, self.udp_port_in)
+ self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
self.udp_port_out = packet[UDP].sport
else:
if not ignore_port:
if same_port:
- self.assertEqual(
- packet[ICMP46].id, self.icmp_id_in)
+ self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
else:
- self.assertNotEqual(
- packet[ICMP46].id, self.icmp_id_in)
+ self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
self.icmp_id_out = packet[ICMP46].id
self.assert_packet_checksums_valid(packet)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
- def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
- dst_ip=None):
+ def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
"""
Verify captured packets on outside network
@@ -360,8 +409,7 @@ class MethodHolder(VppTestCase):
:param same_port: Source port number is not translated (Default False)
:param dst_ip: Destination IP address (Default do not verify)
"""
- return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
- True)
+ return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True)
def verify_capture_in(self, capture, in_if):
"""
@@ -381,8 +429,9 @@ class MethodHolder(VppTestCase):
else:
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(inside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (inside network):", packet)
+ )
raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
@@ -404,12 +453,12 @@ class MethodHolder(VppTestCase):
else:
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(inside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (inside network):", packet)
+ )
raise
- def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
- icmp_type=11):
+ def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
"""
Verify captured packets with ICMP errors on outside network
@@ -430,16 +479,15 @@ class MethodHolder(VppTestCase):
self.assertTrue(icmp.haslayer(IPerror))
inner_ip = icmp[IPerror]
if inner_ip.haslayer(TCPerror):
- self.assertEqual(inner_ip[TCPerror].dport,
- self.tcp_port_out)
+ self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out)
elif inner_ip.haslayer(UDPerror):
- self.assertEqual(inner_ip[UDPerror].dport,
- self.udp_port_out)
+ self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out)
else:
self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
@@ -460,20 +508,20 @@ class MethodHolder(VppTestCase):
self.assertTrue(icmp.haslayer(IPerror))
inner_ip = icmp[IPerror]
if inner_ip.haslayer(TCPerror):
- self.assertEqual(inner_ip[TCPerror].sport,
- self.tcp_port_in)
+ self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in)
elif inner_ip.haslayer(UDPerror):
- self.assertEqual(inner_ip[UDPerror].sport,
- self.udp_port_in)
+ self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in)
else:
self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(inside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (inside network):", packet)
+ )
raise
- def create_stream_frag(self, src_if, dst, sport, dport, data,
- proto=IP_PROTOS.tcp, echo_reply=False):
+ def create_stream_frag(
+ self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
+ ):
"""
Create fragmented packet stream
@@ -487,9 +535,11 @@ class MethodHolder(VppTestCase):
:returns: Fragments
"""
if proto == IP_PROTOS.tcp:
- p = (IP(src=src_if.remote_ip4, dst=dst) /
- TCP(sport=sport, dport=dport) /
- Raw(data))
+ p = (
+ IP(src=src_if.remote_ip4, dst=dst)
+ / TCP(sport=sport, dport=dport)
+ / Raw(data)
+ )
p = p.__class__(scapy.compat.raw(p))
chksum = p[TCP].chksum
proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
@@ -497,9 +547,9 @@ class MethodHolder(VppTestCase):
proto_header = UDP(sport=sport, dport=dport)
elif proto == IP_PROTOS.icmp:
if not echo_reply:
- proto_header = ICMP(id=sport, type='echo-request')
+ proto_header = ICMP(id=sport, type="echo-request")
else:
- proto_header = ICMP(id=sport, type='echo-reply')
+ proto_header = ICMP(id=sport, type="echo-reply")
else:
raise Exception("Unsupported protocol")
id = random.randint(0, 65535)
@@ -508,28 +558,32 @@ class MethodHolder(VppTestCase):
raw = Raw(data[0:4])
else:
raw = Raw(data[0:16])
- p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
- IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
- proto_header /
- raw)
+ p = (
+ Ether(src=src_if.remote_mac, dst=src_if.local_mac)
+ / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
+ / proto_header
+ / raw
+ )
pkts.append(p)
if proto == IP_PROTOS.tcp:
raw = Raw(data[4:20])
else:
raw = Raw(data[16:32])
- p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
- IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
- proto=proto) /
- raw)
+ p = (
+ Ether(src=src_if.remote_mac, dst=src_if.local_mac)
+ / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
+ / raw
+ )
pkts.append(p)
if proto == IP_PROTOS.tcp:
raw = Raw(data[20:])
else:
raw = Raw(data[32:])
- p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
- IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
- id=id) /
- raw)
+ p = (
+ Ether(src=src_if.remote_mac, dst=src_if.local_mac)
+ / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
+ / raw
+ )
pkts.append(p)
return pkts
@@ -550,17 +604,15 @@ class MethodHolder(VppTestCase):
self.assert_ip_checksum_valid(p)
buffer.seek(p[IP].frag * 8)
buffer.write(bytes(p[IP].payload))
- ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
- proto=frags[0][IP].proto)
+ ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
if ip.proto == IP_PROTOS.tcp:
- p = (ip / TCP(buffer.getvalue()))
+ p = ip / TCP(buffer.getvalue())
self.logger.debug(ppp("Reassembled:", p))
self.assert_tcp_checksum_valid(p)
elif ip.proto == IP_PROTOS.udp:
- p = (ip / UDP(buffer.getvalue()[:8]) /
- Raw(buffer.getvalue()[8:]))
+ p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
elif ip.proto == IP_PROTOS.icmp:
- p = (ip / ICMP(buffer.getvalue()))
+ p = ip / ICMP(buffer.getvalue())
return p
def verify_ipfix_nat44_ses(self, data):
@@ -580,29 +632,24 @@ class MethodHolder(VppTestCase):
else:
nat44_ses_delete_num += 1
# sourceIPv4Address
- self.assertEqual(self.pg0.remote_ip4,
- str(ipaddress.IPv4Address(record[8])))
+ self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
# postNATSourceIPv4Address
- self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
- record[225])
+ self.assertEqual(
+ socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
+ )
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# protocolIdentifier/sourceTransportPort
# /postNAPTSourceTransportPort
if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
- self.assertEqual(struct.pack("!H", self.icmp_id_out),
- record[227])
+ self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227])
elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
- self.assertEqual(struct.pack("!H", self.tcp_port_in),
- record[7])
- self.assertEqual(struct.pack("!H", self.tcp_port_out),
- record[227])
+ self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
+ self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
- self.assertEqual(struct.pack("!H", self.udp_port_in),
- record[7])
- self.assertEqual(struct.pack("!H", self.udp_port_out),
- record[227])
+ self.assertEqual(struct.pack("!H", self.udp_port_in), record[7])
+ self.assertEqual(struct.pack("!H", self.udp_port_out), record[227])
else:
self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
self.assertEqual(3, nat44_ses_create_num)
@@ -627,16 +674,16 @@ class MethodHolder(VppTestCase):
self.assertEqual(struct.pack("!I", limit), record[471])
def verify_no_nat44_user(self):
- """ Verify that there is no NAT44EI user """
+ """Verify that there is no NAT44EI user"""
users = self.vapi.nat44_ei_user_dump()
self.assertEqual(len(users), 0)
- users = self.statistics['/nat44-ei/total-users']
+ users = self.statistics["/nat44-ei/total-users"]
self.assertEqual(users[0][0], 0)
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertEqual(sessions[0][0], 0)
def verify_syslog_apmap(self, data, is_add=True):
- message = data.decode('utf-8')
+ message = data.decode("utf-8")
try:
message = SyslogMessage.parse(message)
except ParseError as e:
@@ -644,26 +691,26 @@ class MethodHolder(VppTestCase):
raise
else:
self.assertEqual(message.severity, SyslogSeverity.info)
- self.assertEqual(message.appname, 'NAT')
- self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
- sd_params = message.sd.get('napmap')
+ self.assertEqual(message.appname, "NAT")
+ self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL")
+ sd_params = message.sd.get("napmap")
self.assertTrue(sd_params is not None)
- self.assertEqual(sd_params.get('IATYP'), 'IPv4')
- self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
- self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
- self.assertEqual(sd_params.get('XATYP'), 'IPv4')
- self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
- self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
- self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
- self.assertTrue(sd_params.get('SSUBIX') is not None)
- self.assertEqual(sd_params.get('SVLAN'), '0')
+ self.assertEqual(sd_params.get("IATYP"), "IPv4")
+ self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
+ self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
+ self.assertEqual(sd_params.get("XATYP"), "IPv4")
+ self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
+ self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
+ self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
+ self.assertTrue(sd_params.get("SSUBIX") is not None)
+ self.assertEqual(sd_params.get("SVLAN"), "0")
def verify_mss_value(self, pkt, mss):
if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
raise TypeError("Not a TCP/IP packet")
for option in pkt[TCP].options:
- if option[0] == 'MSS':
+ if option[0] == "MSS":
self.assertEqual(option[1], mss)
self.assert_tcp_checksum_valid(pkt)
@@ -678,8 +725,9 @@ class MethodHolder(VppTestCase):
else:
raise Exception("Unsupported protocol")
- def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
- ignore_port=False):
+ def frag_in_order(
+ self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
+ ):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
@@ -689,20 +737,19 @@ class MethodHolder(VppTestCase):
self.port_in = random.randint(1025, 65535)
# in2out
- pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
- self.port_in, 20, data, proto)
+ pkts = self.create_stream_frag(
+ self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
+ )
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg1.get_capture(len(pkts))
if not dont_translate:
- p = self.reass_frags_and_verify(frags,
- self.nat_addr,
- self.pg1.remote_ip4)
+ p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
else:
- p = self.reass_frags_and_verify(frags,
- self.pg0.remote_ip4,
- self.pg1.remote_ip4)
+ p = self.reass_frags_and_verify(
+ frags, self.pg0.remote_ip4, self.pg1.remote_ip4
+ )
if proto != IP_PROTOS.icmp:
if not dont_translate:
self.assertEqual(p[layer].dport, 20)
@@ -729,15 +776,14 @@ class MethodHolder(VppTestCase):
else:
sport = p[layer].id
dport = 0
- pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data,
- proto, echo_reply=True)
+ pkts = self.create_stream_frag(
+ self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
+ )
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
- p = self.reass_frags_and_verify(frags,
- self.pg1.remote_ip4,
- self.pg0.remote_ip4)
+ p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
if proto != IP_PROTOS.icmp:
self.assertEqual(p[layer].sport, 20)
self.assertEqual(p[layer].dport, self.port_in)
@@ -745,9 +791,15 @@ class MethodHolder(VppTestCase):
self.assertEqual(p[layer].id, self.port_in)
self.assertEqual(data, p[Raw].load)
- def reass_hairpinning(self, server_addr, server_in_port, server_out_port,
- host_in_port, proto=IP_PROTOS.tcp,
- ignore_port=False):
+ def reass_hairpinning(
+ self,
+ server_addr,
+ server_in_port,
+ server_out_port,
+ host_in_port,
+ proto=IP_PROTOS.tcp,
+ ignore_port=False,
+ ):
layer = self.proto2layer(proto)
@@ -757,19 +809,14 @@ class MethodHolder(VppTestCase):
data = b"A" * 16 + b"B" * 16 + b"C" * 3
# send packet from host to server
- pkts = self.create_stream_frag(self.pg0,
- self.nat_addr,
- host_in_port,
- server_out_port,
- data,
- proto)
+ pkts = self.create_stream_frag(
+ self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
+ )
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
- p = self.reass_frags_and_verify(frags,
- self.nat_addr,
- server_addr)
+ p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
if proto != IP_PROTOS.icmp:
if not ignore_port:
self.assertNotEqual(p[layer].sport, host_in_port)
@@ -779,8 +826,9 @@ class MethodHolder(VppTestCase):
self.assertNotEqual(p[layer].id, host_in_port)
self.assertEqual(data, p[Raw].load)
- def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
- ignore_port=False):
+ def frag_out_of_order(
+ self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
+ ):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
@@ -791,21 +839,22 @@ class MethodHolder(VppTestCase):
for i in range(2):
# in2out
- pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
- self.port_in, 20, data, proto)
+ pkts = self.create_stream_frag(
+ self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
+ )
pkts.reverse()
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg1.get_capture(len(pkts))
if not dont_translate:
- p = self.reass_frags_and_verify(frags,
- self.nat_addr,
- self.pg1.remote_ip4)
+ p = self.reass_frags_and_verify(
+ frags, self.nat_addr, self.pg1.remote_ip4
+ )
else:
- p = self.reass_frags_and_verify(frags,
- self.pg0.remote_ip4,
- self.pg1.remote_ip4)
+ p = self.reass_frags_and_verify(
+ frags, self.pg0.remote_ip4, self.pg1.remote_ip4
+ )
if proto != IP_PROTOS.icmp:
if not dont_translate:
self.assertEqual(p[layer].dport, 20)
@@ -832,16 +881,17 @@ class MethodHolder(VppTestCase):
else:
sport = p[layer].id
dport = 0
- pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport,
- data, proto, echo_reply=True)
+ pkts = self.create_stream_frag(
+ self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
+ )
pkts.reverse()
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
- p = self.reass_frags_and_verify(frags,
- self.pg1.remote_ip4,
- self.pg0.remote_ip4)
+ p = self.reass_frags_and_verify(
+ frags, self.pg1.remote_ip4, self.pg0.remote_ip4
+ )
if proto != IP_PROTOS.icmp:
self.assertEqual(p[layer].sport, 20)
self.assertEqual(p[layer].dport, self.port_in)
@@ -861,7 +911,7 @@ def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
class TestNAT44EI(MethodHolder):
- """ NAT44EI Test Cases """
+ """NAT44EI Test Cases"""
max_translations = 10240
max_users = 10240
@@ -877,7 +927,7 @@ class TestNAT44EI(MethodHolder):
cls.udp_port_out = 6304
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
- cls.nat_addr = '10.0.0.3'
+ cls.nat_addr = "10.0.0.3"
cls.ipfix_src_port = 4739
cls.ipfix_domain_id = 1
cls.tcp_external_port = 80
@@ -898,8 +948,8 @@ class TestNAT44EI(MethodHolder):
cls.pg1.configure_ipv4_neighbors()
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
- cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
- cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
+ cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
+ cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
cls.pg4._local_ip4 = "172.16.255.1"
cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
@@ -921,8 +971,8 @@ class TestNAT44EI(MethodHolder):
cls.pg9.generate_remote_hosts(2)
cls.pg9.config_ip4()
cls.vapi.sw_interface_add_del_address(
- sw_if_index=cls.pg9.sw_if_index,
- prefix="10.0.0.1/24")
+ sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
+ )
cls.pg9.admin_up()
cls.pg9.resolve_arp()
@@ -932,8 +982,8 @@ class TestNAT44EI(MethodHolder):
def plugin_enable(self):
self.vapi.nat44_ei_plugin_enable_disable(
- sessions=self.max_translations,
- users=self.max_users, enable=1)
+ sessions=self.max_translations, users=self.max_users, enable=1
+ )
def setUp(self):
super(TestNAT44EI, self).setUp()
@@ -943,8 +993,8 @@ class TestNAT44EI(MethodHolder):
super(TestNAT44EI, self).tearDown()
if not self.vpp_dead:
self.vapi.nat44_ei_ipfix_enable_disable(
- domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port,
- enable=0)
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
+ )
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
@@ -952,16 +1002,16 @@ class TestNAT44EI(MethodHolder):
self.vapi.cli("clear logging")
def test_clear_sessions(self):
- """ NAT44EI session clearing test """
+ """NAT44EI session clearing test"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
@@ -970,32 +1020,32 @@ class TestNAT44EI(MethodHolder):
capture = self.pg1.get_capture(len(pkts))
self.verify_capture_out(capture)
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
self.logger.info("sessions before clearing: %s" % sessions[0][0])
self.vapi.cli("clear nat44 ei sessions")
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
self.logger.info("sessions after clearing: %s" % sessions[0][0])
def test_dynamic(self):
- """ NAT44EI dynamic translation test """
+ """NAT44EI dynamic translation test"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# in2out
- tcpn = self.statistics['/nat44-ei/in2out/slowpath/tcp']
- udpn = self.statistics['/nat44-ei/in2out/slowpath/udp']
- icmpn = self.statistics['/nat44-ei/in2out/slowpath/icmp']
- drops = self.statistics['/nat44-ei/in2out/slowpath/drops']
+ tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
+ udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
+ icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
+ drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
@@ -1005,20 +1055,20 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture)
if_idx = self.pg0.sw_if_index
- cnt = self.statistics['/nat44-ei/in2out/slowpath/tcp']
+ cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
- cnt = self.statistics['/nat44-ei/in2out/slowpath/udp']
+ cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
- cnt = self.statistics['/nat44-ei/in2out/slowpath/icmp']
+ cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
- cnt = self.statistics['/nat44-ei/in2out/slowpath/drops']
+ cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
# out2in
- tcpn = self.statistics['/nat44-ei/out2in/slowpath/tcp']
- udpn = self.statistics['/nat44-ei/out2in/slowpath/udp']
- icmpn = self.statistics['/nat44-ei/out2in/slowpath/icmp']
- drops = self.statistics['/nat44-ei/out2in/slowpath/drops']
+ tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
+ udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
+ icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
+ drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]
pkts = self.create_stream_out(self.pg1)
self.pg1.add_stream(pkts)
@@ -1028,31 +1078,31 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg0)
if_idx = self.pg1.sw_if_index
- cnt = self.statistics['/nat44-ei/out2in/slowpath/tcp']
+ cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
- cnt = self.statistics['/nat44-ei/out2in/slowpath/udp']
+ cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
- cnt = self.statistics['/nat44-ei/out2in/slowpath/icmp']
+ cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
- cnt = self.statistics['/nat44-ei/out2in/slowpath/drops']
+ cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
- users = self.statistics['/nat44-ei/total-users']
+ users = self.statistics["/nat44-ei/total-users"]
self.assertEqual(users[:, 0].sum(), 1)
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertEqual(sessions[:, 0].sum(), 3)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
- """ NAT44EI handling of client packets with TTL=1 """
+ """NAT44EI handling of client packets with TTL=1"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# Client side - generate traffic
pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
@@ -1062,16 +1112,16 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
- """ NAT44EI handling of server packets with TTL=1 """
+ """NAT44EI handling of server packets with TTL=1"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# Client side - create sessions
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1086,21 +1136,19 @@ class TestNAT44EI(MethodHolder):
capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
# Server side - verify ICMP type 11 packets
- self.verify_capture_out_with_icmp_errors(capture,
- src_ip=self.pg1.local_ip4)
+ self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
- """ NAT44EI handling of error responses to client packets with TTL=2
- """
+ """NAT44EI handling of error responses to client packets with TTL=2"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# Client side - generate traffic
pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
@@ -1110,9 +1158,13 @@ class TestNAT44EI(MethodHolder):
# Server side - simulate ICMP type 11 response
capture = self.pg1.get_capture(len(pkts))
- pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- ICMP(type=11) / packet[IP] for packet in capture]
+ pkts = [
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
+ / ICMP(type=11)
+ / packet[IP]
+ for packet in capture
+ ]
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1122,17 +1174,16 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
- """ NAT44EI handling of error responses to server packets with TTL=2
- """
+ """NAT44EI handling of error responses to server packets with TTL=2"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# Client side - create sessions
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1150,9 +1201,13 @@ class TestNAT44EI(MethodHolder):
# Client side - simulate ICMP type 11 response
capture = self.pg0.get_capture(len(pkts))
- pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- ICMP(type=11) / packet[IP] for packet in capture]
+ pkts = [
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / ICMP(type=11)
+ / packet[IP]
+ for packet in capture
+ ]
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1162,20 +1217,22 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
- """ NAT44EI ping out interface from outside network """
+ """NAT44EI ping out interface from outside network"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
- ICMP(id=self.icmp_id_out, type='echo-request'))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
+ / ICMP(id=self.icmp_id_out, type="echo-request")
+ )
pkts = [p]
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -1188,26 +1245,29 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
self.assertEqual(packet[ICMP].type, 0) # echo reply
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
def test_ping_internal_host_from_outside(self):
- """ NAT44EI ping internal host from outside network """
+ """NAT44EI ping internal host from outside network"""
self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# out2in
- pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
- ICMP(id=self.icmp_id_out, type='echo-request'))
+ pkt = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
+ / ICMP(id=self.icmp_id_out, type="echo-request")
+ )
self.pg1.add_stream(pkt)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1216,9 +1276,11 @@ class TestNAT44EI(MethodHolder):
self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# in2out
- pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
- ICMP(id=self.icmp_id_in, type='echo-reply'))
+ pkt = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
+ / ICMP(id=self.icmp_id_in, type="echo-reply")
+ )
self.pg0.add_stream(pkt)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1227,25 +1289,27 @@ class TestNAT44EI(MethodHolder):
self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_forwarding(self):
- """ NAT44EI forwarding test """
+ """NAT44EI forwarding test"""
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
real_ip = self.pg0.remote_ip4
alias_ip = self.nat_addr
flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
self.vapi.nat44_ei_add_del_static_mapping(
- is_add=1, local_ip_address=real_ip,
+ is_add=1,
+ local_ip_address=real_ip,
external_ip_address=alias_ip,
external_sw_if_index=0xFFFFFFFF,
- flags=flags)
+ flags=flags,
+ )
try:
# static mapping match
@@ -1269,9 +1333,9 @@ class TestNAT44EI(MethodHolder):
host0 = self.pg0.remote_hosts[0]
self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
try:
- pkts = self.create_stream_out(self.pg1,
- dst_ip=self.pg0.remote_ip4,
- use_inside_ports=True)
+ pkts = self.create_stream_out(
+ self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
+ )
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1283,8 +1347,9 @@ class TestNAT44EI(MethodHolder):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
- self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
- same_port=True)
+ self.verify_capture_out(
+ capture, nat_ip=self.pg0.remote_ip4, same_port=True
+ )
finally:
self.pg0.remote_hosts[0] = host0
@@ -1296,10 +1361,11 @@ class TestNAT44EI(MethodHolder):
local_ip_address=real_ip,
external_ip_address=alias_ip,
external_sw_if_index=0xFFFFFFFF,
- flags=flags)
+ flags=flags,
+ )
def test_static_in(self):
- """ NAT44EI 1:1 NAT initialized from inside network """
+ """NAT44EI 1:1 NAT initialized from inside network"""
nat_ip = "10.0.0.10"
self.tcp_port_out = 6303
@@ -1309,14 +1375,14 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
sm = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(len(sm), 1)
- self.assertEqual(sm[0].tag, '')
+ self.assertEqual(sm[0].tag, "")
self.assertEqual(sm[0].protocol, 0)
self.assertEqual(sm[0].local_port, 0)
self.assertEqual(sm[0].external_port, 0)
@@ -1338,7 +1404,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
- """ NAT44EI 1:1 NAT initialized from outside network """
+ """NAT44EI 1:1 NAT initialized from outside network"""
nat_ip = "10.0.0.20"
self.tcp_port_out = 6303
@@ -1349,11 +1415,11 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
sm = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(len(sm), 1)
self.assertEqual(sm[0].tag, tag)
@@ -1375,29 +1441,41 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
- """ NAT44EI 1:1 NAPT initialized from inside network """
+ """NAT44EI 1:1 NAPT initialized from inside network"""
self.tcp_port_out = 3606
self.udp_port_out = 3607
self.icmp_id_out = 3608
self.nat44_add_address(self.nat_addr)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.tcp_port_in, self.tcp_port_out,
- proto=IP_PROTOS.tcp)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.udp_port_in, self.udp_port_out,
- proto=IP_PROTOS.udp)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.icmp_id_in, self.icmp_id_out,
- proto=IP_PROTOS.icmp)
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.udp_port_in,
+ self.udp_port_out,
+ proto=IP_PROTOS.udp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.icmp_id_in,
+ self.icmp_id_out,
+ proto=IP_PROTOS.icmp,
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# in2out
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1416,29 +1494,41 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
- """ NAT44EI 1:1 NAPT initialized from outside network """
+ """NAT44EI 1:1 NAPT initialized from outside network"""
self.tcp_port_out = 30606
self.udp_port_out = 30607
self.icmp_id_out = 30608
self.nat44_add_address(self.nat_addr)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.tcp_port_in, self.tcp_port_out,
- proto=IP_PROTOS.tcp)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.udp_port_in, self.udp_port_out,
- proto=IP_PROTOS.udp)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
- self.icmp_id_in, self.icmp_id_out,
- proto=IP_PROTOS.icmp)
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.udp_port_in,
+ self.udp_port_out,
+ proto=IP_PROTOS.udp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg0.remote_ip4,
+ self.nat_addr,
+ self.icmp_id_in,
+ self.icmp_id_out,
+ proto=IP_PROTOS.icmp,
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# out2in
pkts = self.create_stream_out(self.pg1)
@@ -1457,7 +1547,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture)
def test_static_vrf_aware(self):
- """ NAT44EI 1:1 NAT VRF awareness """
+ """NAT44EI 1:1 NAT VRF awareness"""
nat_ip1 = "10.0.0.30"
nat_ip2 = "10.0.0.40"
@@ -1465,20 +1555,18 @@ class TestNAT44EI(MethodHolder):
self.udp_port_out = 6304
self.icmp_id_out = 6305
- self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
- vrf_id=10)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
- vrf_id=10)
+ self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg3.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg4.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
+ )
# inside interface VRF match NAT44EI static mapping VRF
pkts = self.create_stream_in(self.pg4, self.pg3)
@@ -1497,7 +1585,7 @@ class TestNAT44EI(MethodHolder):
self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
- """ NAT44EI Switch from dynamic translation to 1:1NAT """
+ """NAT44EI Switch from dynamic translation to 1:1NAT"""
nat_ip = "10.0.0.10"
self.tcp_port_out = 6303
self.udp_port_out = 6304
@@ -1506,11 +1594,11 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# dynamic
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1532,22 +1620,27 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
- """ NAT44EI Identity NAT """
+ """NAT44EI Identity NAT"""
flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
self.vapi.nat44_ei_add_del_identity_mapping(
- ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
- flags=flags, is_add=1)
+ ip_address=self.pg0.remote_ip4,
+ sw_if_index=0xFFFFFFFF,
+ flags=flags,
+ is_add=1,
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
- TCP(sport=12345, dport=56789))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+
+ p = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
+ / TCP(sport=12345, dport=56789)
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1569,26 +1662,29 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(len(sessions), 0)
flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
self.vapi.nat44_ei_add_del_identity_mapping(
- ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
- flags=flags, vrf_id=1, is_add=1)
+ ip_address=self.pg0.remote_ip4,
+ sw_if_index=0xFFFFFFFF,
+ flags=flags,
+ vrf_id=1,
+ is_add=1,
+ )
identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
self.assertEqual(len(identity_mappings), 2)
def test_multiple_inside_interfaces(self):
- """ NAT44EI multiple non-overlapping address space inside interfaces
- """
+ """NAT44EI multiple non-overlapping address space inside interfaces"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg3.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
# between two NAT44EI inside interfaces (no translation)
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -1639,26 +1735,24 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
- """ NAT44EI multiple inside interfaces with overlapping address space
- """
+ """NAT44EI multiple inside interfaces with overlapping address space"""
static_nat_ip = "10.0.0.10"
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg3.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg4.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg5.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg6.sw_if_index,
- flags=flags, is_add=1)
- self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
- vrf_id=20)
+ sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
+ )
+ self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
# between NAT44EI inside interfaces with same VRF (no translation)
pkts = self.create_stream_in(self.pg4, self.pg5)
@@ -1669,9 +1763,11 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_no_translation(capture, self.pg4, self.pg5)
# between NAT44EI inside interfaces with different VRF (hairpinning)
- p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
- IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
- TCP(sport=1234, dport=5678))
+ p = (
+ Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
+ / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
+ / TCP(sport=1234, dport=5678)
+ )
self.pg4.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1723,16 +1819,12 @@ class TestNAT44EI(MethodHolder):
# pg5 session dump
addresses = self.vapi.nat44_ei_address_dump()
self.assertEqual(len(addresses), 1)
- sessions = self.vapi.nat44_ei_user_session_dump(
- self.pg5.remote_ip4, 10)
+ sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
self.assertEqual(len(sessions), 3)
for session in sessions:
- self.assertFalse(session.flags &
- self.config_flags.NAT44_EI_STATIC_MAPPING)
- self.assertEqual(str(session.inside_ip_address),
- self.pg5.remote_ip4)
- self.assertEqual(session.outside_ip_address,
- addresses[0].ip_address)
+ self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
+ self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
+ self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
@@ -1765,44 +1857,38 @@ class TestNAT44EI(MethodHolder):
addresses = self.vapi.nat44_ei_address_dump()
self.assertEqual(len(addresses), 1)
for user in users:
- sessions = self.vapi.nat44_ei_user_session_dump(user.ip_address,
- user.vrf_id)
+ sessions = self.vapi.nat44_ei_user_session_dump(
+ user.ip_address, user.vrf_id
+ )
for session in sessions:
self.assertEqual(user.ip_address, session.inside_ip_address)
self.assertTrue(session.total_bytes > session.total_pkts > 0)
- self.assertTrue(session.protocol in
- [IP_PROTOS.tcp, IP_PROTOS.udp,
- IP_PROTOS.icmp])
+ self.assertTrue(
+ session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
+ )
# pg4 session dump
- sessions = self.vapi.nat44_ei_user_session_dump(
- self.pg4.remote_ip4, 10)
+ sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
self.assertGreaterEqual(len(sessions), 4)
for session in sessions:
- self.assertFalse(
- session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
- self.assertEqual(str(session.inside_ip_address),
- self.pg4.remote_ip4)
- self.assertEqual(session.outside_ip_address,
- addresses[0].ip_address)
+ self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
+ self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
+ self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
# pg6 session dump
- sessions = self.vapi.nat44_ei_user_session_dump(
- self.pg6.remote_ip4, 20)
+ sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
self.assertGreaterEqual(len(sessions), 3)
for session in sessions:
+ self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
+ self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
+ self.assertEqual(str(session.outside_ip_address), static_nat_ip)
self.assertTrue(
- session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
- self.assertEqual(str(session.inside_ip_address),
- self.pg6.remote_ip4)
- self.assertEqual(str(session.outside_ip_address),
- static_nat_ip)
- self.assertTrue(session.inside_port in
- [self.tcp_port_in, self.udp_port_in,
- self.icmp_id_in])
+ session.inside_port
+ in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
+ )
def test_hairpinning(self):
- """ NAT44EI hairpinning - 1:1 NAPT """
+ """NAT44EI hairpinning - 1:1 NAPT"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
@@ -1814,22 +1900,28 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server.ip4, self.nat_addr,
- server_in_port, server_out_port,
- proto=IP_PROTOS.tcp)
-
- cnt = self.statistics['/nat44-ei/hairpinning']
+ self.nat44_add_static_mapping(
+ server.ip4,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
+
+ cnt = self.statistics["/nat44-ei/hairpinning"]
# send packet from host to server
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.nat_addr) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.nat_addr)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1848,14 +1940,16 @@ class TestNAT44EI(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
# send reply from server to host
- p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
- IP(src=server.ip4, dst=self.nat_addr) /
- TCP(sport=server_in_port, dport=host_out_port))
+ p = (
+ Ether(src=server.mac, dst=self.pg0.local_mac)
+ / IP(src=server.ip4, dst=self.nat_addr)
+ / TCP(sport=server_in_port, dport=host_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -1873,13 +1967,15 @@ class TestNAT44EI(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
- self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(),
- 2+(1 if self.vpp_worker_count > 0 else 0))
+ self.assertEqual(
+ after[:, if_idx].sum() - cnt[:, if_idx].sum(),
+ 2 + (1 if self.vpp_worker_count > 0 else 0),
+ )
def test_hairpinning2(self):
- """ NAT44EI hairpinning - 1:1 NAT"""
+ """NAT44EI hairpinning - 1:1 NAT"""
server1_nat_ip = "10.0.0.10"
server2_nat_ip = "10.0.0.11"
@@ -1892,11 +1988,11 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for servers
self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
@@ -1904,17 +2000,23 @@ class TestNAT44EI(MethodHolder):
# host to server1
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- UDP(sport=self.udp_port_in, dport=server_udp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / UDP(sport=self.udp_port_in, dport=server_udp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -1942,17 +2044,23 @@ class TestNAT44EI(MethodHolder):
# server1 to host
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- UDP(sport=server_udp_port, dport=self.udp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / UDP(sport=server_udp_port, dport=self.udp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / ICMP(id=self.icmp_id_out, type="echo-reply")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -1977,17 +2085,23 @@ class TestNAT44EI(MethodHolder):
# server2 to server1
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- UDP(sport=self.udp_port_in, dport=server_udp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / UDP(sport=self.udp_port_in, dport=server_udp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -2015,17 +2129,23 @@ class TestNAT44EI(MethodHolder):
# server1 to server2
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- UDP(sport=server_udp_port, dport=self.udp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / UDP(sport=server_udp_port, dport=self.udp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / ICMP(id=self.icmp_id_out, type="echo-reply")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -2049,7 +2169,7 @@ class TestNAT44EI(MethodHolder):
raise
def test_hairpinning_avoid_inf_loop(self):
- """ NAT44EI hairpinning - 1:1 NAPT avoid infinite loop """
+ """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
@@ -2061,34 +2181,42 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server.ip4, self.nat_addr,
- server_in_port, server_out_port,
- proto=IP_PROTOS.tcp)
+ self.nat44_add_static_mapping(
+ server.ip4,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
# add another static mapping that maps pg0.local_ip4 address to itself
self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
# send packet from host to VPP (the packet should get dropped)
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.pg0.local_ip4) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.pg0.local_ip4)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
# Here VPP used to crash due to an infinite loop
- cnt = self.statistics['/nat44-ei/hairpinning']
+ cnt = self.statistics["/nat44-ei/hairpinning"]
# send packet from host to server
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.nat_addr) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.nat_addr)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2107,14 +2235,16 @@ class TestNAT44EI(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
# send reply from server to host
- p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
- IP(src=server.ip4, dst=self.nat_addr) /
- TCP(sport=server_in_port, dport=host_out_port))
+ p = (
+ Ether(src=server.mac, dst=self.pg0.local_mac)
+ / IP(src=server.ip4, dst=self.nat_addr)
+ / TCP(sport=server_in_port, dport=host_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2132,16 +2262,18 @@ class TestNAT44EI(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
- self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(),
- 2+(1 if self.vpp_worker_count > 0 else 0))
+ self.assertEqual(
+ after[:, if_idx].sum() - cnt[:, if_idx].sum(),
+ 2 + (1 if self.vpp_worker_count > 0 else 0),
+ )
def test_interface_addr(self):
- """ NAT44EI acquire addresses from interface """
+ """NAT44EI acquire addresses from interface"""
self.vapi.nat44_ei_add_del_interface_addr(
- is_add=1,
- sw_if_index=self.pg7.sw_if_index)
+ is_add=1, sw_if_index=self.pg7.sw_if_index
+ )
# no address in NAT pool
addresses = self.vapi.nat44_ei_address_dump()
@@ -2159,22 +2291,20 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(0, len(addresses))
def test_interface_addr_static_mapping(self):
- """ NAT44EI Static mapping with addresses from interface """
+ """NAT44EI Static mapping with addresses from interface"""
tag = "testTAG"
self.vapi.nat44_ei_add_del_interface_addr(
- is_add=1,
- sw_if_index=self.pg7.sw_if_index)
+ is_add=1, sw_if_index=self.pg7.sw_if_index
+ )
self.nat44_add_static_mapping(
- '1.2.3.4',
- external_sw_if_index=self.pg7.sw_if_index,
- tag=tag)
+ "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
+ )
# static mappings with external interface
static_mappings = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(1, len(static_mappings))
- self.assertEqual(self.pg7.sw_if_index,
- static_mappings[0].external_sw_if_index)
+ self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
self.assertEqual(static_mappings[0].tag, tag)
# configure interface address and check static mappings
@@ -2184,8 +2314,7 @@ class TestNAT44EI(MethodHolder):
resolved = False
for sm in static_mappings:
if sm.external_sw_if_index == 0xFFFFFFFF:
- self.assertEqual(str(sm.external_ip_address),
- self.pg7.local_ip4)
+ self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
self.assertEqual(sm.tag, tag)
resolved = True
self.assertTrue(resolved)
@@ -2194,8 +2323,7 @@ class TestNAT44EI(MethodHolder):
self.pg7.unconfig_ip4()
static_mappings = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(1, len(static_mappings))
- self.assertEqual(self.pg7.sw_if_index,
- static_mappings[0].external_sw_if_index)
+ self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
self.assertEqual(static_mappings[0].tag, tag)
# configure interface address again and check static mappings
@@ -2205,40 +2333,37 @@ class TestNAT44EI(MethodHolder):
resolved = False
for sm in static_mappings:
if sm.external_sw_if_index == 0xFFFFFFFF:
- self.assertEqual(str(sm.external_ip_address),
- self.pg7.local_ip4)
+ self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
self.assertEqual(sm.tag, tag)
resolved = True
self.assertTrue(resolved)
# remove static mapping
self.nat44_add_static_mapping(
- '1.2.3.4',
- external_sw_if_index=self.pg7.sw_if_index,
- tag=tag,
- is_add=0)
+ "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
+ )
static_mappings = self.vapi.nat44_ei_static_mapping_dump()
self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
- """ NAT44EI Identity NAT with addresses from interface """
+ """NAT44EI Identity NAT with addresses from interface"""
port = 53053
self.vapi.nat44_ei_add_del_interface_addr(
- is_add=1,
- sw_if_index=self.pg7.sw_if_index)
+ is_add=1, sw_if_index=self.pg7.sw_if_index
+ )
self.vapi.nat44_ei_add_del_identity_mapping(
- ip_address=b'0',
+ ip_address=b"0",
sw_if_index=self.pg7.sw_if_index,
port=port,
protocol=IP_PROTOS.tcp,
- is_add=1)
+ is_add=1,
+ )
# identity mappings with external interface
identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
self.assertEqual(1, len(identity_mappings))
- self.assertEqual(self.pg7.sw_if_index,
- identity_mappings[0].sw_if_index)
+ self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
# configure interface address and check identity mappings
self.pg7.config_ip4()
@@ -2247,8 +2372,9 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(2, len(identity_mappings))
for sm in identity_mappings:
if sm.sw_if_index == 0xFFFFFFFF:
- self.assertEqual(str(identity_mappings[0].ip_address),
- self.pg7.local_ip4)
+ self.assertEqual(
+ str(identity_mappings[0].ip_address), self.pg7.local_ip4
+ )
self.assertEqual(port, identity_mappings[0].port)
self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
resolved = True
@@ -2258,11 +2384,10 @@ class TestNAT44EI(MethodHolder):
self.pg7.unconfig_ip4()
identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
self.assertEqual(1, len(identity_mappings))
- self.assertEqual(self.pg7.sw_if_index,
- identity_mappings[0].sw_if_index)
+ self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
def test_ipfix_nat44_sess(self):
- """ NAT44EI IPFIX logging NAT44EI session created/deleted """
+ """NAT44EI IPFIX logging NAT44EI session created/deleted"""
self.ipfix_domain_id = 10
self.ipfix_src_port = 20202
collector_port = 30303
@@ -2270,19 +2395,21 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
- src_address=self.pg3.local_ip4,
- path_mtu=512,
- template_interval=10,
- collector_port=collector_port)
- self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.set_ipfix_exporter(
+ collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
+ path_mtu=512,
+ template_interval=10,
+ collector_port=collector_port,
+ )
+ self.vapi.nat44_ei_ipfix_enable_disable(
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
+ )
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
@@ -2301,8 +2428,7 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
self.assertEqual(p[UDP].sport, self.ipfix_src_port)
self.assertEqual(p[UDP].dport, collector_port)
- self.assertEqual(p[IPFIX].observationDomainID,
- self.ipfix_domain_id)
+ self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
@@ -2312,25 +2438,29 @@ class TestNAT44EI(MethodHolder):
self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
- """ NAT44EI IPFIX logging NAT addresses exhausted """
+ """NAT44EI IPFIX logging NAT addresses exhausted"""
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
- src_address=self.pg3.local_ip4,
- path_mtu=512,
- template_interval=10)
- self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=1)
-
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=3025))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.set_ipfix_exporter(
+ collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
+ path_mtu=512,
+ template_interval=10,
+ )
+ self.vapi.nat44_ei_ipfix_enable_disable(
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
+ )
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=3025)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2345,8 +2475,7 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
self.assertEqual(p[UDP].sport, self.ipfix_src_port)
self.assertEqual(p[UDP].dport, 4739)
- self.assertEqual(p[IPFIX].observationDomainID,
- self.ipfix_domain_id)
+ self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
@@ -2356,15 +2485,15 @@ class TestNAT44EI(MethodHolder):
self.verify_ipfix_addr_exhausted(data)
def test_ipfix_max_sessions(self):
- """ NAT44EI IPFIX logging maximum session entries exceeded """
+ """NAT44EI IPFIX logging maximum session entries exceeded"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
max_sessions_per_thread = self.max_translations
max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
@@ -2372,26 +2501,32 @@ class TestNAT44EI(MethodHolder):
pkts = []
for i in range(0, max_sessions):
src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=src, dst=self.pg1.remote_ip4) /
- TCP(sport=1025))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=src, dst=self.pg1.remote_ip4)
+ / TCP(sport=1025)
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.get_capture(max_sessions)
- self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
- src_address=self.pg3.local_ip4,
- path_mtu=512,
- template_interval=10)
- self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=1)
-
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=1025))
+ self.vapi.set_ipfix_exporter(
+ collector_address=self.pg3.remote_ip4,
+ src_address=self.pg3.local_ip4,
+ path_mtu=512,
+ template_interval=10,
+ )
+ self.vapi.nat44_ei_ipfix_enable_disable(
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
+ )
+
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=1025)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2406,8 +2541,7 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
self.assertEqual(p[UDP].sport, self.ipfix_src_port)
self.assertEqual(p[UDP].dport, 4739)
- self.assertEqual(p[IPFIX].observationDomainID,
- self.ipfix_domain_id)
+ self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
if p.haslayer(Template):
ipfix.add_template(p.getlayer(Template))
# verify events in data set
@@ -2417,22 +2551,23 @@ class TestNAT44EI(MethodHolder):
self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
def test_syslog_apmap(self):
- """ NAT44EI syslog address and port mapping creation and deletion """
- self.vapi.syslog_set_filter(
- self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
+ """NAT44EI syslog address and port mapping creation and deletion"""
+ self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=20))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=self.tcp_port_in, dport=20)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2448,22 +2583,25 @@ class TestNAT44EI(MethodHolder):
self.verify_syslog_apmap(capture[0][Raw].load, False)
def test_pool_addr_fib(self):
- """ NAT44EI add pool addresses to FIB """
- static_addr = '10.0.0.10'
+ """NAT44EI add pool addresses to FIB"""
+ static_addr = "10.0.0.10"
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
# NAT44EI address
- p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=self.nat_addr,
- psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+ p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=self.nat_addr,
+ psrc=self.pg1.remote_ip4,
+ hwsrc=self.pg1.remote_mac,
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2472,9 +2610,12 @@ class TestNAT44EI(MethodHolder):
self.assertTrue(capture[0][ARP].op, ARP.is_at)
# 1:1 NAT address
- p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=static_addr,
- psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+ p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=static_addr,
+ psrc=self.pg1.remote_ip4,
+ hwsrc=self.pg1.remote_mac,
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2483,9 +2624,12 @@ class TestNAT44EI(MethodHolder):
self.assertTrue(capture[0][ARP].op, ARP.is_at)
# send ARP to non-NAT44EI interface
- p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=self.nat_addr,
- psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
+ p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=self.nat_addr,
+ psrc=self.pg2.remote_ip4,
+ hwsrc=self.pg2.remote_mac,
+ )
self.pg2.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2493,27 +2637,32 @@ class TestNAT44EI(MethodHolder):
# remove addresses and verify
self.nat44_add_address(self.nat_addr, is_add=0)
- self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
- is_add=0)
-
- p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=self.nat_addr,
- psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+ self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
+
+ p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=self.nat_addr,
+ psrc=self.pg1.remote_ip4,
+ hwsrc=self.pg1.remote_mac,
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.assert_nothing_captured()
- p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
- ARP(op=ARP.who_has, pdst=static_addr,
- psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
+ p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
+ op=ARP.who_has,
+ pdst=static_addr,
+ psrc=self.pg1.remote_ip4,
+ hwsrc=self.pg1.remote_mac,
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.assert_nothing_captured()
def test_vrf_mode(self):
- """ NAT44EI tenant VRF aware address pool mode """
+ """NAT44EI tenant VRF aware address pool mode"""
vrf_id1 = 1
vrf_id2 = 2
@@ -2522,8 +2671,8 @@ class TestNAT44EI(MethodHolder):
self.pg0.unconfig_ip4()
self.pg1.unconfig_ip4()
- self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
- self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
+ self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
self.pg0.set_table_ip4(vrf_id1)
self.pg1.set_table_ip4(vrf_id2)
self.pg0.config_ip4()
@@ -2535,14 +2684,14 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg2.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg2.sw_if_index, is_add=1
+ )
try:
# first VRF
@@ -2570,11 +2719,11 @@ class TestNAT44EI(MethodHolder):
self.pg1.config_ip4()
self.pg0.resolve_arp()
self.pg1.resolve_arp()
- self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
- self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
+ self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
+ self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
def test_vrf_feature_independent(self):
- """ NAT44EI tenant VRF independent address pool mode """
+ """NAT44EI tenant VRF independent address pool mode"""
nat_ip1 = "10.0.0.10"
nat_ip2 = "10.0.0.11"
@@ -2583,14 +2732,14 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(nat_ip2, vrf_id=99)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg2.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg2.sw_if_index, is_add=1
+ )
# first VRF
pkts = self.create_stream_in(self.pg0, self.pg2)
@@ -2609,16 +2758,16 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
- """ NAT44EI interfaces without configured IP address """
+ """NAT44EI interfaces without configured IP address"""
self.create_routes_and_neigbors()
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg7.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg8.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg8.sw_if_index, is_add=1
+ )
# in2out
pkts = self.create_stream_in(self.pg7, self.pg8)
@@ -2637,17 +2786,17 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
- """ NAT44EI interfaces without configured IP address - 1:1 NAT """
+ """NAT44EI interfaces without configured IP address - 1:1 NAT"""
self.create_routes_and_neigbors()
self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg7.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg8.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg8.sw_if_index, is_add=1
+ )
# out2in
pkts = self.create_stream_out(self.pg8)
@@ -2666,7 +2815,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture, self.nat_addr, True)
def test_static_with_port_ipless_interfaces(self):
- """ NAT44EI interfaces without configured IP address - 1:1 NAPT """
+ """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
self.tcp_port_out = 30606
self.udp_port_out = 30607
@@ -2674,22 +2823,34 @@ class TestNAT44EI(MethodHolder):
self.create_routes_and_neigbors()
self.nat44_add_address(self.nat_addr)
- self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
- self.tcp_port_in, self.tcp_port_out,
- proto=IP_PROTOS.tcp)
- self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
- self.udp_port_in, self.udp_port_out,
- proto=IP_PROTOS.udp)
- self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
- self.icmp_id_in, self.icmp_id_out,
- proto=IP_PROTOS.icmp)
+ self.nat44_add_static_mapping(
+ self.pg7.remote_ip4,
+ self.nat_addr,
+ self.tcp_port_in,
+ self.tcp_port_out,
+ proto=IP_PROTOS.tcp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg7.remote_ip4,
+ self.nat_addr,
+ self.udp_port_in,
+ self.udp_port_out,
+ proto=IP_PROTOS.udp,
+ )
+ self.nat44_add_static_mapping(
+ self.pg7.remote_ip4,
+ self.nat_addr,
+ self.icmp_id_in,
+ self.icmp_id_out,
+ proto=IP_PROTOS.icmp,
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg7.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg8.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg8.sw_if_index, is_add=1
+ )
# out2in
pkts = self.create_stream_out(self.pg8)
@@ -2708,23 +2869,25 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture)
def test_static_unknown_proto(self):
- """ NAT44EI 1:1 translate packet with unknown protocol """
+ """NAT44EI 1:1 translate packet with unknown protocol"""
nat_ip = "10.0.0.10"
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# in2out
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- GRE() /
- IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
- TCP(sport=1234, dport=1234))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / GRE()
+ / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
+ / TCP(sport=1234, dport=1234)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2740,11 +2903,13 @@ class TestNAT44EI(MethodHolder):
raise
# out2in
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=nat_ip) /
- GRE() /
- IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
- TCP(sport=1234, dport=1234))
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=nat_ip)
+ / GRE()
+ / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
+ / TCP(sport=1234, dport=1234)
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2760,8 +2925,7 @@ class TestNAT44EI(MethodHolder):
raise
def test_hairpinning_static_unknown_proto(self):
- """ NAT44EI 1:1 translate packet with unknown protocol - hairpinning
- """
+ """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
@@ -2773,18 +2937,20 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_static_mapping(server.ip4, server_nat_ip)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# host to server
- p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
- IP(src=host.ip4, dst=server_nat_ip) /
- GRE() /
- IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
- TCP(sport=1234, dport=1234))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=host.mac)
+ / IP(src=host.ip4, dst=server_nat_ip)
+ / GRE()
+ / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
+ / TCP(sport=1234, dport=1234)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2800,11 +2966,13 @@ class TestNAT44EI(MethodHolder):
raise
# server to host
- p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
- IP(src=server.ip4, dst=host_nat_ip) /
- GRE() /
- IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
- TCP(sport=1234, dport=1234))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=server.mac)
+ / IP(src=server.ip4, dst=host_nat_ip)
+ / GRE()
+ / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
+ / TCP(sport=1234, dport=1234)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2820,10 +2988,11 @@ class TestNAT44EI(MethodHolder):
raise
def test_output_feature(self):
- """ NAT44EI output feature (in2out postrouting) """
+ """NAT44EI output feature (in2out postrouting)"""
self.nat44_add_address(self.nat_addr)
self.vapi.nat44_ei_add_del_output_interface(
- sw_if_index=self.pg3.sw_if_index, is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
# in2out
pkts = self.create_stream_in(self.pg0, self.pg3)
@@ -2850,25 +3019,32 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_no_translation(capture, self.pg2, self.pg0)
def test_output_feature_vrf_aware(self):
- """ NAT44EI output feature VRF aware (in2out postrouting) """
+ """NAT44EI output feature VRF aware (in2out postrouting)"""
nat_ip_vrf10 = "10.0.0.10"
nat_ip_vrf20 = "10.0.0.20"
- r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
- [VppRoutePath(self.pg3.remote_ip4,
- self.pg3.sw_if_index)],
- table_id=10)
- r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
- [VppRoutePath(self.pg3.remote_ip4,
- self.pg3.sw_if_index)],
- table_id=20)
+ r1 = VppIpRoute(
+ self,
+ self.pg3.remote_ip4,
+ 32,
+ [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
+ table_id=10,
+ )
+ r2 = VppIpRoute(
+ self,
+ self.pg3.remote_ip4,
+ 32,
+ [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
+ table_id=20,
+ )
r1.add_vpp_config()
r2.add_vpp_config()
self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
self.vapi.nat44_ei_add_del_output_interface(
- sw_if_index=self.pg3.sw_if_index, is_add=1)
+ sw_if_index=self.pg3.sw_if_index, is_add=1
+ )
# in2out VRF 10
pkts = self.create_stream_in(self.pg4, self.pg3)
@@ -2903,7 +3079,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_in(capture, self.pg6)
def test_output_feature_hairpinning(self):
- """ NAT44EI output feature hairpinning (in2out postrouting) """
+ """NAT44EI output feature hairpinning (in2out postrouting)"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
host_in_port = 1234
@@ -2913,19 +3089,27 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
self.vapi.nat44_ei_add_del_output_interface(
- sw_if_index=self.pg0.sw_if_index, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_add_del_output_interface(
- sw_if_index=self.pg1.sw_if_index, is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server.ip4, self.nat_addr,
- server_in_port, server_out_port,
- proto=IP_PROTOS.tcp)
+ self.nat44_add_static_mapping(
+ server.ip4,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
# send packet from host to server
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.nat_addr) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.nat_addr)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2945,9 +3129,11 @@ class TestNAT44EI(MethodHolder):
raise
# send reply from server to host
- p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
- IP(src=server.ip4, dst=self.nat_addr) /
- TCP(sport=server_in_port, dport=host_out_port))
+ p = (
+ Ether(src=server.mac, dst=self.pg0.local_mac)
+ / IP(src=server.ip4, dst=self.nat_addr)
+ / TCP(sport=server_in_port, dport=host_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -2966,7 +3152,7 @@ class TestNAT44EI(MethodHolder):
raise
def test_one_armed_nat44(self):
- """ NAT44EI One armed NAT """
+ """NAT44EI One armed NAT"""
remote_host = self.pg9.remote_hosts[0]
local_host = self.pg9.remote_hosts[1]
external_port = 0
@@ -2974,16 +3160,18 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg9.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg9.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg9.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
+ )
# in2out
- p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
- IP(src=local_host.ip4, dst=remote_host.ip4) /
- TCP(sport=12345, dport=80))
+ p = (
+ Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
+ / IP(src=local_host.ip4, dst=remote_host.ip4)
+ / TCP(sport=12345, dport=80)
+ )
self.pg9.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3003,9 +3191,11 @@ class TestNAT44EI(MethodHolder):
raise
# out2in
- p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
- IP(src=remote_host.ip4, dst=self.nat_addr) /
- TCP(sport=80, dport=external_port))
+ p = (
+ Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
+ / IP(src=remote_host.ip4, dst=self.nat_addr)
+ / TCP(sport=80, dport=external_port)
+ )
self.pg9.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3028,21 +3218,21 @@ class TestNAT44EI(MethodHolder):
else:
node = "nat44-ei-classify"
- err = self.statistics.get_err_counter('/err/%s/next in2out' % node)
+ err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
self.assertEqual(err, 1)
- err = self.statistics.get_err_counter('/err/%s/next out2in' % node)
+ err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
self.assertEqual(err, 1)
def test_del_session(self):
- """ NAT44EI delete session """
+ """NAT44EI delete session"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
@@ -3057,12 +3247,14 @@ class TestNAT44EI(MethodHolder):
address=sessions[0].inside_ip_address,
port=sessions[0].inside_port,
protocol=sessions[0].protocol,
- flags=self.config_flags.NAT44_EI_IF_INSIDE)
+ flags=self.config_flags.NAT44_EI_IF_INSIDE,
+ )
self.vapi.nat44_ei_del_session(
address=sessions[1].outside_ip_address,
port=sessions[1].outside_port,
- protocol=sessions[1].protocol)
+ protocol=sessions[1].protocol,
+ )
sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
self.assertEqual(nsessions - len(sessions), 2)
@@ -3071,60 +3263,56 @@ class TestNAT44EI(MethodHolder):
address=sessions[0].inside_ip_address,
port=sessions[0].inside_port,
protocol=sessions[0].protocol,
- flags=self.config_flags.NAT44_EI_IF_INSIDE)
+ flags=self.config_flags.NAT44_EI_IF_INSIDE,
+ )
self.verify_no_nat44_user()
def test_frag_in_order(self):
- """ NAT44EI translate fragments arriving in order """
+ """NAT44EI translate fragments arriving in order"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.frag_in_order(proto=IP_PROTOS.tcp)
self.frag_in_order(proto=IP_PROTOS.udp)
self.frag_in_order(proto=IP_PROTOS.icmp)
def test_frag_forwarding(self):
- """ NAT44EI forwarding fragment test """
+ """NAT44EI forwarding fragment test"""
self.vapi.nat44_ei_add_del_interface_addr(
- is_add=1,
- sw_if_index=self.pg1.sw_if_index)
+ is_add=1, sw_if_index=self.pg1.sw_if_index
+ )
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
data = b"A" * 16 + b"B" * 16 + b"C" * 3
- pkts = self.create_stream_frag(self.pg1,
- self.pg0.remote_ip4,
- 4789,
- 4789,
- data,
- proto=IP_PROTOS.udp)
+ pkts = self.create_stream_frag(
+ self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
+ )
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
frags = self.pg0.get_capture(len(pkts))
- p = self.reass_frags_and_verify(frags,
- self.pg1.remote_ip4,
- self.pg0.remote_ip4)
+ p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
self.assertEqual(p[UDP].sport, 4789)
self.assertEqual(p[UDP].dport, 4789)
self.assertEqual(data, p[Raw].load)
def test_reass_hairpinning(self):
- """ NAT44EI fragments hairpinning """
+ """NAT44EI fragments hairpinning"""
server_addr = self.pg0.remote_hosts[1].ip4
host_in_port = random.randint(1025, 65535)
@@ -3134,63 +3322,85 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server_addr, self.nat_addr,
- server_in_port,
- server_out_port,
- proto=IP_PROTOS.tcp)
- self.nat44_add_static_mapping(server_addr, self.nat_addr,
- server_in_port,
- server_out_port,
- proto=IP_PROTOS.udp)
+ self.nat44_add_static_mapping(
+ server_addr,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
+ self.nat44_add_static_mapping(
+ server_addr,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.udp,
+ )
self.nat44_add_static_mapping(server_addr, self.nat_addr)
- self.reass_hairpinning(server_addr, server_in_port, server_out_port,
- host_in_port, proto=IP_PROTOS.tcp)
- self.reass_hairpinning(server_addr, server_in_port, server_out_port,
- host_in_port, proto=IP_PROTOS.udp)
- self.reass_hairpinning(server_addr, server_in_port, server_out_port,
- host_in_port, proto=IP_PROTOS.icmp)
+ self.reass_hairpinning(
+ server_addr,
+ server_in_port,
+ server_out_port,
+ host_in_port,
+ proto=IP_PROTOS.tcp,
+ )
+ self.reass_hairpinning(
+ server_addr,
+ server_in_port,
+ server_out_port,
+ host_in_port,
+ proto=IP_PROTOS.udp,
+ )
+ self.reass_hairpinning(
+ server_addr,
+ server_in_port,
+ server_out_port,
+ host_in_port,
+ proto=IP_PROTOS.icmp,
+ )
def test_frag_out_of_order(self):
- """ NAT44EI translate fragments arriving out of order """
+ """NAT44EI translate fragments arriving out of order"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.frag_out_of_order(proto=IP_PROTOS.tcp)
self.frag_out_of_order(proto=IP_PROTOS.udp)
self.frag_out_of_order(proto=IP_PROTOS.icmp)
def test_port_restricted(self):
- """ NAT44EI Port restricted NAT44EI (MAP-E CE) """
+ """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=1,
- psid_offset=6,
- psid_length=6,
- psid=10)
-
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=4567, dport=22))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
+ alg=1, psid_offset=6, psid_length=6, psid=10
+ )
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=4567, dport=22)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3210,24 +3420,26 @@ class TestNAT44EI(MethodHolder):
raise
def test_port_range(self):
- """ NAT44EI External address port range """
+ """NAT44EI External address port range"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=2,
- start_port=1025,
- end_port=1027)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
+ alg=2, start_port=1025, end_port=1027
+ )
pkts = []
for port in range(0, 5):
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=1125 + port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=1125 + port)
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -3239,14 +3451,14 @@ class TestNAT44EI(MethodHolder):
self.assertLessEqual(tcp.sport, 1027)
def test_multiple_outside_vrf(self):
- """ NAT44EI Multiple outside VRF """
+ """NAT44EI Multiple outside VRF"""
vrf_id1 = 1
vrf_id2 = 2
self.pg1.unconfig_ip4()
self.pg2.unconfig_ip4()
- self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
- self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
+ self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
+ self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
self.pg1.set_table_ip4(vrf_id1)
self.pg2.set_table_ip4(vrf_id2)
self.pg1.config_ip4()
@@ -3257,14 +3469,14 @@ class TestNAT44EI(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg2.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg2.sw_if_index, is_add=1
+ )
try:
# first VRF
@@ -3313,20 +3525,26 @@ class TestNAT44EI(MethodHolder):
self.pg2.resolve_arp()
def test_mss_clamping(self):
- """ NAT44EI TCP MSS clamping """
+ """NAT44EI TCP MSS clamping"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
-
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="S", options=[('MSS', 1400)]))
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(
+ sport=self.tcp_port_in,
+ dport=self.tcp_external_port,
+ flags="S",
+ options=[("MSS", 1400)],
+ )
+ )
self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
self.pg0.add_stream(p)
@@ -3353,21 +3571,22 @@ class TestNAT44EI(MethodHolder):
self.verify_mss_value(capture[0], 1400)
def test_ha_send(self):
- """ NAT44EI Send HA session synchronization events (active) """
+ """NAT44EI Send HA session synchronization events (active)"""
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.nat44_add_address(self.nat_addr)
self.vapi.nat44_ei_ha_set_listener(
- ip_address=self.pg3.local_ip4, port=12345, path_mtu=512)
+ ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
+ )
self.vapi.nat44_ei_ha_set_failover(
- ip_address=self.pg3.remote_ip4, port=12346,
- session_refresh_interval=10)
+ ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
+ )
bind_layers(UDP, HANATStateSync, sport=12345)
# create sessions
@@ -3379,7 +3598,7 @@ class TestNAT44EI(MethodHolder):
self.verify_capture_out(capture)
# active send HA events
self.vapi.nat44_ei_ha_flush()
- stats = self.statistics['/nat44-ei/ha/add-event-send']
+ stats = self.statistics["/nat44-ei/ha/add-event-send"]
self.assertEqual(stats[:, 0].sum(), 3)
capture = self.pg3.get_capture(1)
p = capture[0]
@@ -3407,23 +3626,29 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(event.fib_index, 0)
# ACK received events
- ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=seq, flags='ACK',
- thread_index=hanat.thread_index))
+ ack = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
+ )
+ )
self.pg3.add_stream(ack)
self.pg_start()
- stats = self.statistics['/nat44-ei/ha/ack-recv']
+ stats = self.statistics["/nat44-ei/ha/ack-recv"]
self.assertEqual(stats[:, 0].sum(), 1)
# delete one session
self.pg_enable_capture(self.pg_interfaces)
self.vapi.nat44_ei_del_session(
- address=self.pg0.remote_ip4, port=self.tcp_port_in,
- protocol=IP_PROTOS.tcp, flags=self.config_flags.NAT44_EI_IF_INSIDE)
+ address=self.pg0.remote_ip4,
+ port=self.tcp_port_in,
+ protocol=IP_PROTOS.tcp,
+ flags=self.config_flags.NAT44_EI_IF_INSIDE,
+ )
self.vapi.nat44_ei_ha_flush()
- stats = self.statistics['/nat44-ei/ha/del-event-send']
+ stats = self.statistics["/nat44-ei/ha/del-event-send"]
self.assertEqual(stats[:, 0].sum(), 1)
capture = self.pg3.get_capture(1)
p = capture[0]
@@ -3438,9 +3663,9 @@ class TestNAT44EI(MethodHolder):
# do not send ACK, active retry send HA event again
self.pg_enable_capture(self.pg_interfaces)
self.virtual_sleep(12)
- stats = self.statistics['/nat44-ei/ha/retry-count']
+ stats = self.statistics["/nat44-ei/ha/retry-count"]
self.assertEqual(stats[:, 0].sum(), 3)
- stats = self.statistics['/nat44-ei/ha/missed-count']
+ stats = self.statistics["/nat44-ei/ha/missed-count"]
self.assertEqual(stats[:, 0].sum(), 1)
capture = self.pg3.get_capture(3)
for packet in capture:
@@ -3453,7 +3678,7 @@ class TestNAT44EI(MethodHolder):
self.pg_start()
self.pg0.get_capture(2)
self.vapi.nat44_ei_ha_flush()
- stats = self.statistics['/nat44-ei/ha/refresh-event-send']
+ stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
self.assertEqual(stats[:, 0].sum(), 2)
capture = self.pg3.get_capture(1)
p = capture[0]
@@ -3480,29 +3705,33 @@ class TestNAT44EI(MethodHolder):
self.assertEqual(event.total_pkts, 2)
self.assertGreater(event.total_bytes, 0)
- stats = self.statistics['/nat44-ei/ha/ack-recv']
- ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=seq, flags='ACK',
- thread_index=hanat.thread_index))
+ stats = self.statistics["/nat44-ei/ha/ack-recv"]
+ ack = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
+ )
+ )
self.pg3.add_stream(ack)
self.pg_start()
- stats = self.statistics['/nat44-ei/ha/ack-recv']
+ stats = self.statistics["/nat44-ei/ha/ack-recv"]
self.assertEqual(stats[:, 0].sum(), 2)
def test_ha_recv(self):
- """ NAT44EI Receive HA session synchronization events (passive) """
+ """NAT44EI Receive HA session synchronization events (passive)"""
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
- self.vapi.nat44_ei_ha_set_listener(ip_address=self.pg3.local_ip4,
- port=12345, path_mtu=512)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
+ self.vapi.nat44_ei_ha_set_listener(
+ ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
+ )
bind_layers(UDP, HANATStateSync, sport=12345)
# this is a bit tricky - HA dictates thread index due to how it's
@@ -3512,11 +3741,12 @@ class TestNAT44EI(MethodHolder):
# IP address) and out2in (based on outside port)
# first choose a thread index which is correct for IP
- thread_index = get_nat44_ei_in2out_worker_index(self.pg0.remote_ip4,
- self.vpp_worker_count)
+ thread_index = get_nat44_ei_in2out_worker_index(
+ self.pg0.remote_ip4, self.vpp_worker_count
+ )
# now pick a port which is correct for given thread
- port_per_thread = int((0xffff-1024) / max(1, self.vpp_worker_count))
+ port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
self.udp_port_out = 1024 + random.randint(1, port_per_thread)
if self.vpp_worker_count > 0:
@@ -3524,25 +3754,43 @@ class TestNAT44EI(MethodHolder):
self.udp_port_out += port_per_thread * (thread_index - 1)
# send HA session add events to failover/passive
- p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=1, events=[
- Event(event_type='add', protocol='tcp',
- in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
- in_port=self.tcp_port_in, out_port=self.tcp_port_out,
- eh_addr=self.pg1.remote_ip4,
- ehn_addr=self.pg1.remote_ip4,
- eh_port=self.tcp_external_port,
- ehn_port=self.tcp_external_port, fib_index=0),
- Event(event_type='add', protocol='udp',
- in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
- in_port=self.udp_port_in, out_port=self.udp_port_out,
- eh_addr=self.pg1.remote_ip4,
- ehn_addr=self.pg1.remote_ip4,
- eh_port=self.udp_external_port,
- ehn_port=self.udp_external_port, fib_index=0)],
- thread_index=thread_index))
+ p = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=1,
+ events=[
+ Event(
+ event_type="add",
+ protocol="tcp",
+ in_addr=self.pg0.remote_ip4,
+ out_addr=self.nat_addr,
+ in_port=self.tcp_port_in,
+ out_port=self.tcp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.tcp_external_port,
+ ehn_port=self.tcp_external_port,
+ fib_index=0,
+ ),
+ Event(
+ event_type="add",
+ protocol="udp",
+ in_addr=self.pg0.remote_ip4,
+ out_addr=self.nat_addr,
+ in_port=self.udp_port_in,
+ out_port=self.udp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.udp_external_port,
+ ehn_port=self.udp_external_port,
+ fib_index=0,
+ ),
+ ],
+ thread_index=thread_index,
+ )
+ )
self.pg3.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
@@ -3557,49 +3805,57 @@ class TestNAT44EI(MethodHolder):
raise
else:
self.assertEqual(hanat.sequence_number, 1)
- self.assertEqual(hanat.flags, 'ACK')
+ self.assertEqual(hanat.flags, "ACK")
self.assertEqual(hanat.version, 1)
self.assertEqual(hanat.thread_index, thread_index)
- stats = self.statistics['/nat44-ei/ha/ack-send']
+ stats = self.statistics["/nat44-ei/ha/ack-send"]
self.assertEqual(stats[:, 0].sum(), 1)
- stats = self.statistics['/nat44-ei/ha/add-event-recv']
+ stats = self.statistics["/nat44-ei/ha/add-event-recv"]
self.assertEqual(stats[:, 0].sum(), 2)
- users = self.statistics['/nat44-ei/total-users']
+ users = self.statistics["/nat44-ei/total-users"]
self.assertEqual(users[:, 0].sum(), 1)
- sessions = self.statistics['/nat44-ei/total-sessions']
+ sessions = self.statistics["/nat44-ei/total-sessions"]
self.assertEqual(sessions[:, 0].sum(), 2)
users = self.vapi.nat44_ei_user_dump()
self.assertEqual(len(users), 1)
- self.assertEqual(str(users[0].ip_address),
- self.pg0.remote_ip4)
+ self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
# there should be 2 sessions created by HA
sessions = self.vapi.nat44_ei_user_session_dump(
- users[0].ip_address, users[0].vrf_id)
+ users[0].ip_address, users[0].vrf_id
+ )
self.assertEqual(len(sessions), 2)
for session in sessions:
- self.assertEqual(str(session.inside_ip_address),
- self.pg0.remote_ip4)
- self.assertEqual(str(session.outside_ip_address),
- self.nat_addr)
- self.assertIn(session.inside_port,
- [self.tcp_port_in, self.udp_port_in])
- self.assertIn(session.outside_port,
- [self.tcp_port_out, self.udp_port_out])
+ self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
+ self.assertEqual(str(session.outside_ip_address), self.nat_addr)
+ self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
+ self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
# send HA session delete event to failover/passive
- p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=2, events=[
- Event(event_type='del', protocol='udp',
- in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
- in_port=self.udp_port_in, out_port=self.udp_port_out,
- eh_addr=self.pg1.remote_ip4,
- ehn_addr=self.pg1.remote_ip4,
- eh_port=self.udp_external_port,
- ehn_port=self.udp_external_port, fib_index=0)],
- thread_index=thread_index))
+ p = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=2,
+ events=[
+ Event(
+ event_type="del",
+ protocol="udp",
+ in_addr=self.pg0.remote_ip4,
+ out_addr=self.nat_addr,
+ in_port=self.udp_port_in,
+ out_port=self.udp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.udp_external_port,
+ ehn_port=self.udp_external_port,
+ fib_index=0,
+ )
+ ],
+ thread_index=thread_index,
+ )
+ )
self.pg3.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
@@ -3614,37 +3870,49 @@ class TestNAT44EI(MethodHolder):
raise
else:
self.assertEqual(hanat.sequence_number, 2)
- self.assertEqual(hanat.flags, 'ACK')
+ self.assertEqual(hanat.flags, "ACK")
self.assertEqual(hanat.version, 1)
users = self.vapi.nat44_ei_user_dump()
self.assertEqual(len(users), 1)
- self.assertEqual(str(users[0].ip_address),
- self.pg0.remote_ip4)
+ self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
# now we should have only 1 session, 1 deleted by HA
- sessions = self.vapi.nat44_ei_user_session_dump(users[0].ip_address,
- users[0].vrf_id)
+ sessions = self.vapi.nat44_ei_user_session_dump(
+ users[0].ip_address, users[0].vrf_id
+ )
self.assertEqual(len(sessions), 1)
- stats = self.statistics['/nat44-ei/ha/del-event-recv']
+ stats = self.statistics["/nat44-ei/ha/del-event-recv"]
self.assertEqual(stats[:, 0].sum(), 1)
- stats = self.statistics.get_err_counter(
- '/err/nat44-ei-ha/pkts-processed')
+ stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
self.assertEqual(stats, 2)
# send HA session refresh event to failover/passive
- p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
- IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
- UDP(sport=12346, dport=12345) /
- HANATStateSync(sequence_number=3, events=[
- Event(event_type='refresh', protocol='tcp',
- in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
- in_port=self.tcp_port_in, out_port=self.tcp_port_out,
- eh_addr=self.pg1.remote_ip4,
- ehn_addr=self.pg1.remote_ip4,
- eh_port=self.tcp_external_port,
- ehn_port=self.tcp_external_port, fib_index=0,
- total_bytes=1024, total_pkts=2)],
- thread_index=thread_index))
+ p = (
+ Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
+ / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
+ / UDP(sport=12346, dport=12345)
+ / HANATStateSync(
+ sequence_number=3,
+ events=[
+ Event(
+ event_type="refresh",
+ protocol="tcp",
+ in_addr=self.pg0.remote_ip4,
+ out_addr=self.nat_addr,
+ in_port=self.tcp_port_in,
+ out_port=self.tcp_port_out,
+ eh_addr=self.pg1.remote_ip4,
+ ehn_addr=self.pg1.remote_ip4,
+ eh_port=self.tcp_external_port,
+ ehn_port=self.tcp_external_port,
+ fib_index=0,
+ total_bytes=1024,
+ total_pkts=2,
+ )
+ ],
+ thread_index=thread_index,
+ )
+ )
self.pg3.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3658,29 +3926,30 @@ class TestNAT44EI(MethodHolder):
raise
else:
self.assertEqual(hanat.sequence_number, 3)
- self.assertEqual(hanat.flags, 'ACK')
+ self.assertEqual(hanat.flags, "ACK")
self.assertEqual(hanat.version, 1)
users = self.vapi.nat44_ei_user_dump()
self.assertEqual(len(users), 1)
- self.assertEqual(str(users[0].ip_address),
- self.pg0.remote_ip4)
+ self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
sessions = self.vapi.nat44_ei_user_session_dump(
- users[0].ip_address, users[0].vrf_id)
+ users[0].ip_address, users[0].vrf_id
+ )
self.assertEqual(len(sessions), 1)
session = sessions[0]
self.assertEqual(session.total_bytes, 1024)
self.assertEqual(session.total_pkts, 2)
- stats = self.statistics['/nat44-ei/ha/refresh-event-recv']
+ stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
self.assertEqual(stats[:, 0].sum(), 1)
- stats = self.statistics.get_err_counter(
- '/err/nat44-ei-ha/pkts-processed')
+ stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
self.assertEqual(stats, 3)
# send packet to test session created by HA
- p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
+ / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3706,7 +3975,7 @@ class TestNAT44EI(MethodHolder):
return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
def test_set_frame_queue_nelts(self):
- """ NAT44EI API test - worker handoff frame queue elements """
+ """NAT44EI API test - worker handoff frame queue elements"""
self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
def show_commands_at_teardown(self):
@@ -3718,11 +3987,10 @@ class TestNAT44EI(MethodHolder):
self.logger.info(self.vapi.cli("show nat44 ei sessions detail"))
self.logger.info(self.vapi.cli("show nat44 ei hash tables detail"))
self.logger.info(self.vapi.cli("show nat44 ei ha"))
- self.logger.info(
- self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
+ self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
def test_outside_address_distribution(self):
- """ Outside address distribution based on source address """
+ """Outside address distribution based on source address"""
x = 100
nat_addresses = []
@@ -3733,16 +4001,18 @@ class TestNAT44EI(MethodHolder):
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
self.vapi.nat44_ei_add_del_address_range(
first_ip_address=nat_addresses[0],
last_ip_address=nat_addresses[-1],
- vrf_id=0xFFFFFFFF, is_add=1)
+ vrf_id=0xFFFFFFFF,
+ is_add=1,
+ )
self.pg0.generate_remote_hosts(x)
@@ -3750,11 +4020,12 @@ class TestNAT44EI(MethodHolder):
for i in range(x):
info = self.create_packet_info(self.pg0, self.pg1)
payload = self.info_to_payload(info)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_hosts[i].ip4,
- dst=self.pg1.remote_ip4) /
- UDP(sport=7000+i, dport=8000+i) /
- Raw(payload))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=7000 + i, dport=8000 + i)
+ / Raw(payload)
+ )
info.data = p
pkts.append(p)
@@ -3772,21 +4043,23 @@ class TestNAT44EI(MethodHolder):
packed = socket.inet_aton(p_sent[IP].src)
numeric = struct.unpack("!L", packed)[0]
numeric = socket.htonl(numeric)
- a = nat_addresses[(numeric-1) % len(nat_addresses)]
+ a = nat_addresses[(numeric - 1) % len(nat_addresses)]
self.assertEqual(
- a, p_recvd[IP].src,
+ a,
+ p_recvd[IP].src,
"Invalid packet (src IP %s translated to %s, but expected %s)"
- % (p_sent[IP].src, p_recvd[IP].src, a))
+ % (p_sent[IP].src, p_recvd[IP].src, a),
+ )
def test_default_user_sessions(self):
- """ NAT44EI default per-user session limit is used and reported """
+ """NAT44EI default per-user session limit is used and reported"""
nat44_ei_config = self.vapi.nat44_ei_show_running_config()
# a nonzero default should be reported for user_sessions
self.assertNotEqual(nat44_ei_config.user_sessions, 0)
class TestNAT44Out2InDPO(MethodHolder):
- """ NAT44EI Test Cases using out2in DPO """
+ """NAT44EI Test Cases using out2in DPO"""
@classmethod
def setUpClass(cls):
@@ -3799,8 +4072,8 @@ class TestNAT44Out2InDPO(MethodHolder):
cls.udp_port_out = 6304
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
- cls.nat_addr = '10.0.0.3'
- cls.dst_ip4 = '192.168.70.1'
+ cls.nat_addr = "10.0.0.3"
+ cls.dst_ip4 = "192.168.70.1"
cls.create_pg_interfaces(range(2))
@@ -3812,10 +4085,13 @@ class TestNAT44Out2InDPO(MethodHolder):
cls.pg1.config_ip6()
cls.pg1.resolve_ndp()
- r1 = VppIpRoute(cls, "::", 0,
- [VppRoutePath(cls.pg1.remote_ip6,
- cls.pg1.sw_if_index)],
- register=False)
+ r1 = VppIpRoute(
+ cls,
+ "::",
+ 0,
+ [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
+ register=False,
+ )
r1.add_vpp_config()
def setUp(self):
@@ -3830,37 +4106,44 @@ class TestNAT44Out2InDPO(MethodHolder):
self.vapi.cli("clear logging")
def configure_xlat(self):
- self.dst_ip6_pfx = '1:2:3::'
- self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
- self.dst_ip6_pfx)
+ self.dst_ip6_pfx = "1:2:3::"
+ self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
self.dst_ip6_pfx_len = 96
- self.src_ip6_pfx = '4:5:6::'
- self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
- self.src_ip6_pfx)
+ self.src_ip6_pfx = "4:5:6::"
+ self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
self.src_ip6_pfx_len = 96
- self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
- self.src_ip6_pfx_n, self.src_ip6_pfx_len,
- '\x00\x00\x00\x00', 0)
-
- @unittest.skip('Temporary disabled')
+ self.vapi.map_add_domain(
+ self.dst_ip6_pfx_n,
+ self.dst_ip6_pfx_len,
+ self.src_ip6_pfx_n,
+ self.src_ip6_pfx_len,
+ "\x00\x00\x00\x00",
+ 0,
+ )
+
+ @unittest.skip("Temporary disabled")
def test_464xlat_ce(self):
- """ Test 464XLAT CE with NAT44EI """
+ """Test 464XLAT CE with NAT44EI"""
self.configure_xlat()
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_add_del_address_range(
first_ip_address=self.nat_addr_n,
last_ip_address=self.nat_addr_n,
- vrf_id=0xFFFFFFFF, is_add=1)
+ vrf_id=0xFFFFFFFF,
+ is_add=1,
+ )
- out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
- self.dst_ip6_pfx_len)
- out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
- self.src_ip6_pfx_len)
+ out_src_ip6 = self.compose_ip6(
+ self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
+ )
+ out_dst_ip6 = self.compose_ip6(
+ self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
+ )
try:
pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
@@ -3868,11 +4151,9 @@ class TestNAT44Out2InDPO(MethodHolder):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
- self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
- dst_ip=out_src_ip6)
+ self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)
- pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
- out_dst_ip6)
+ pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -3880,31 +4161,35 @@ class TestNAT44Out2InDPO(MethodHolder):
self.verify_capture_in(capture, self.pg0)
finally:
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags)
+ sw_if_index=self.pg0.sw_if_index, flags=flags
+ )
self.vapi.nat44_ei_add_del_address_range(
first_ip_address=self.nat_addr_n,
last_ip_address=self.nat_addr_n,
- vrf_id=0xFFFFFFFF)
+ vrf_id=0xFFFFFFFF,
+ )
- @unittest.skip('Temporary disabled')
+ @unittest.skip("Temporary disabled")
def test_464xlat_ce_no_nat(self):
- """ Test 464XLAT CE without NAT44EI """
+ """Test 464XLAT CE without NAT44EI"""
self.configure_xlat()
- out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
- self.dst_ip6_pfx_len)
- out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
- self.src_ip6_pfx_len)
+ out_src_ip6 = self.compose_ip6(
+ self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
+ )
+ out_dst_ip6 = self.compose_ip6(
+ self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
+ )
pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
- self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
- nat_ip=out_dst_ip6, same_port=True)
+ self.verify_capture_out_ip6(
+ capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
+ )
pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
self.pg1.add_stream(pkts)
@@ -3915,7 +4200,8 @@ class TestNAT44Out2InDPO(MethodHolder):
class TestNAT44EIMW(MethodHolder):
- """ NAT44EI Test Cases (multiple workers) """
+ """NAT44EI Test Cases (multiple workers)"""
+
vpp_worker_count = 2
max_translations = 10240
max_users = 10240
@@ -3931,7 +4217,7 @@ class TestNAT44EIMW(MethodHolder):
cls.udp_port_out = 6304
cls.icmp_id_in = 6305
cls.icmp_id_out = 6305
- cls.nat_addr = '10.0.0.3'
+ cls.nat_addr = "10.0.0.3"
cls.ipfix_src_port = 4739
cls.ipfix_domain_id = 1
cls.tcp_external_port = 80
@@ -3952,8 +4238,8 @@ class TestNAT44EIMW(MethodHolder):
cls.pg1.configure_ipv4_neighbors()
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
- cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
- cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
+ cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
+ cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
cls.pg4._local_ip4 = "172.16.255.1"
cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
@@ -3975,8 +4261,8 @@ class TestNAT44EIMW(MethodHolder):
cls.pg9.generate_remote_hosts(2)
cls.pg9.config_ip4()
cls.vapi.sw_interface_add_del_address(
- sw_if_index=cls.pg9.sw_if_index,
- prefix="10.0.0.1/24")
+ sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
+ )
cls.pg9.admin_up()
cls.pg9.resolve_arp()
@@ -3987,16 +4273,15 @@ class TestNAT44EIMW(MethodHolder):
def setUp(self):
super(TestNAT44EIMW, self).setUp()
self.vapi.nat44_ei_plugin_enable_disable(
- sessions=self.max_translations,
- users=self.max_users, enable=1)
+ sessions=self.max_translations, users=self.max_users, enable=1
+ )
def tearDown(self):
super(TestNAT44EIMW, self).tearDown()
if not self.vpp_dead:
self.vapi.nat44_ei_ipfix_enable_disable(
- domain_id=self.ipfix_domain_id,
- src_port=self.ipfix_src_port,
- enable=0)
+ domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
+ )
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
@@ -4004,7 +4289,7 @@ class TestNAT44EIMW(MethodHolder):
self.vapi.cli("clear logging")
def test_hairpinning(self):
- """ NAT44EI hairpinning - 1:1 NAPT """
+ """NAT44EI hairpinning - 1:1 NAPT"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
@@ -4018,22 +4303,28 @@ class TestNAT44EIMW(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for server
- self.nat44_add_static_mapping(server.ip4, self.nat_addr,
- server_in_port, server_out_port,
- proto=IP_PROTOS.tcp)
-
- cnt = self.statistics['/nat44-ei/hairpinning']
+ self.nat44_add_static_mapping(
+ server.ip4,
+ self.nat_addr,
+ server_in_port,
+ server_out_port,
+ proto=IP_PROTOS.tcp,
+ )
+
+ cnt = self.statistics["/nat44-ei/hairpinning"]
# send packet from host to server
- p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
- IP(src=host.ip4, dst=self.nat_addr) /
- TCP(sport=host_in_port, dport=server_out_port))
+ p = (
+ Ether(src=host.mac, dst=self.pg0.local_mac)
+ / IP(src=host.ip4, dst=self.nat_addr)
+ / TCP(sport=host_in_port, dport=server_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -4052,15 +4343,17 @@ class TestNAT44EIMW(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
# send reply from server to host
- p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
- IP(src=server.ip4, dst=self.nat_addr) /
- TCP(sport=server_in_port, dport=host_out_port))
+ p = (
+ Ether(src=server.mac, dst=self.pg0.local_mac)
+ / IP(src=server.ip4, dst=self.nat_addr)
+ / TCP(sport=server_in_port, dport=host_out_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -4078,13 +4371,13 @@ class TestNAT44EIMW(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
- after = self.statistics['/nat44-ei/hairpinning']
+ after = self.statistics["/nat44-ei/hairpinning"]
if_idx = self.pg0.sw_if_index
self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
def test_hairpinning2(self):
- """ NAT44EI hairpinning - 1:1 NAT"""
+ """NAT44EI hairpinning - 1:1 NAT"""
server1_nat_ip = "10.0.0.10"
server2_nat_ip = "10.0.0.11"
@@ -4097,11 +4390,11 @@ class TestNAT44EIMW(MethodHolder):
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT44_EI_IF_INSIDE
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- flags=flags, is_add=1)
+ sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
+ )
self.vapi.nat44_ei_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1
+ )
# add static mapping for servers
self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
@@ -4109,17 +4402,23 @@ class TestNAT44EIMW(MethodHolder):
# host to server1
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- UDP(sport=self.udp_port_in, dport=server_udp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / UDP(sport=self.udp_port_in, dport=server_udp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=host.ip4, dst=server1_nat_ip) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=host.ip4, dst=server1_nat_ip)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -4147,17 +4446,23 @@ class TestNAT44EIMW(MethodHolder):
# server1 to host
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- UDP(sport=server_udp_port, dport=self.udp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / UDP(sport=server_udp_port, dport=self.udp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=self.nat_addr) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=self.nat_addr)
+ / ICMP(id=self.icmp_id_out, type="echo-reply")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -4182,17 +4487,23 @@ class TestNAT44EIMW(MethodHolder):
# server2 to server1
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- UDP(sport=self.udp_port_in, dport=server_udp_port))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / UDP(sport=self.udp_port_in, dport=server_udp_port)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server2.ip4, dst=server1_nat_ip) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server2.ip4, dst=server1_nat_ip)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -4220,17 +4531,23 @@ class TestNAT44EIMW(MethodHolder):
# server1 to server2
pkts = []
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- UDP(sport=server_udp_port, dport=self.udp_port_out))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / UDP(sport=server_udp_port, dport=self.udp_port_out)
+ )
pkts.append(p)
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=server1.ip4, dst=server2_nat_ip) /
- ICMP(id=self.icmp_id_out, type='echo-reply'))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=server1.ip4, dst=server2_nat_ip)
+ / ICMP(id=self.icmp_id_out, type="echo-reply")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -4254,5 +4571,5 @@ class TestNAT44EIMW(MethodHolder):
raise
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)