aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_det44.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_det44.py')
-rw-r--r--test/test_det44.py419
1 files changed, 238 insertions, 181 deletions
diff --git a/test/test_det44.py b/test/test_det44.py
index ced77468959..ede80981349 100644
--- a/test/test_det44.py
+++ b/test/test_det44.py
@@ -5,7 +5,8 @@ import struct
import unittest
import scapy.compat
from time import sleep
-from framework import VppTestCase, running_extended_tests
+from config import config
+from framework import VppTestCase
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, UDPerror
@@ -14,7 +15,7 @@ from util import ppp
class TestDET44(VppTestCase):
- """ Deterministic NAT Test Cases """
+ """Deterministic NAT Test Cases"""
@classmethod
def setUpClass(cls):
@@ -26,7 +27,7 @@ class TestDET44(VppTestCase):
cls.udp_port_in = 6304
cls.udp_external_port = 6304
cls.icmp_id_in = 6305
- cls.nat_addr = '10.0.0.3'
+ cls.nat_addr = "10.0.0.3"
cls.create_pg_interfaces(range(3))
cls.interfaces = list(cls.pg_interfaces)
@@ -78,8 +79,9 @@ class TestDET44(VppTestCase):
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
except:
fired = True
- self.logger.error(ppp("Unexpected or invalid packet "
- "(inside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (inside network):", packet)
+ )
if fired:
raise
@@ -96,9 +98,9 @@ class TestDET44(VppTestCase):
# natEvent
self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
- self.assertEqual(struct.pack("I", 3), record[466])
+ self.assertEqual(struct.pack("!I", 3), record[466])
# maxEntriesPerUser
- self.assertEqual(struct.pack("I", limit), record[473])
+ self.assertEqual(struct.pack("!I", limit), record[473])
# sourceIPv4Address
self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8])
@@ -111,10 +113,11 @@ class TestDET44(VppTestCase):
"""
# SYN packet in->out
- p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="S"))
+ p = (
+ Ether(src=in_if.remote_mac, dst=in_if.local_mac)
+ / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
+ / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")
+ )
in_if.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -123,20 +126,22 @@ class TestDET44(VppTestCase):
self.tcp_port_out = p[TCP].sport
# SYN + ACK packet out->in
- p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
- IP(src=out_if.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="SA"))
+ p = (
+ Ether(src=out_if.remote_mac, dst=out_if.local_mac)
+ / IP(src=out_if.remote_ip4, dst=self.nat_addr)
+ / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")
+ )
out_if.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
in_if.get_capture(1)
# ACK packet in->out
- p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="A"))
+ p = (
+ Ether(src=in_if.remote_mac, dst=in_if.local_mac)
+ / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4)
+ / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
+ )
in_if.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -152,21 +157,27 @@ class TestDET44(VppTestCase):
"""
pkts = []
# TCP
- p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl)
+ / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)
+ )
pkts.append(p)
# UDP
- p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
- UDP(sport=self.udp_port_in, dport=self.udp_external_port))
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl)
+ / UDP(sport=self.udp_port_in, dport=self.udp_external_port)
+ )
pkts.append(p)
# ICMP
- p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
- ICMP(id=self.icmp_id_in, type='echo-request'))
+ p = (
+ Ether(dst=in_if.local_mac, src=in_if.remote_mac)
+ / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl)
+ / ICMP(id=self.icmp_id_in, type="echo-request")
+ )
pkts.append(p)
return pkts
@@ -183,21 +194,27 @@ class TestDET44(VppTestCase):
dst_ip = self.nat_addr
pkts = []
# TCP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)
+ )
pkts.append(p)
# UDP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(dport=self.udp_port_out, sport=self.udp_external_port))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / UDP(dport=self.udp_port_out, sport=self.udp_external_port)
+ )
pkts.append(p)
# ICMP
- p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- ICMP(id=self.icmp_external_id, type='echo-reply'))
+ p = (
+ Ether(dst=out_if.local_mac, src=out_if.remote_mac)
+ / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
+ / ICMP(id=self.icmp_external_id, type="echo-reply")
+ )
pkts.append(p)
return pkts
@@ -222,21 +239,26 @@ class TestDET44(VppTestCase):
else:
self.icmp_external_id = packet[ICMP].id
except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
+ self.logger.error(
+ ppp("Unexpected or invalid packet (outside network):", packet)
+ )
raise
def test_deterministic_mode(self):
- """ NAT plugin run deterministic mode """
- in_addr = '172.16.255.0'
- out_addr = '172.17.255.50'
- in_addr_t = '172.16.255.20'
+ """NAT plugin run deterministic mode"""
+ in_addr = "172.16.255.0"
+ out_addr = "172.17.255.50"
+ in_addr_t = "172.16.255.20"
in_plen = 24
out_plen = 32
- self.vapi.det44_add_del_map(is_add=1, in_addr=in_addr,
- in_plen=in_plen, out_addr=out_addr,
- out_plen=out_plen)
+ self.vapi.det44_add_del_map(
+ is_add=1,
+ in_addr=in_addr,
+ in_plen=in_plen,
+ out_addr=out_addr,
+ out_plen=out_plen,
+ )
rep1 = self.vapi.det44_forward(in_addr_t)
self.assertEqual(str(rep1.out_addr), out_addr)
@@ -253,40 +275,46 @@ class TestDET44(VppTestCase):
self.assertEqual(out_plen, dsm.out_plen)
def test_set_timeouts(self):
- """ Set deterministic NAT timeouts """
+ """Set deterministic NAT timeouts"""
timeouts_before = self.vapi.det44_get_timeouts()
self.vapi.det44_set_timeouts(
udp=timeouts_before.udp + 10,
tcp_established=timeouts_before.tcp_established + 10,
tcp_transitory=timeouts_before.tcp_transitory + 10,
- icmp=timeouts_before.icmp + 10)
+ icmp=timeouts_before.icmp + 10,
+ )
timeouts_after = self.vapi.det44_get_timeouts()
self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
- self.assertNotEqual(timeouts_before.tcp_established,
- timeouts_after.tcp_established)
- self.assertNotEqual(timeouts_before.tcp_transitory,
- timeouts_after.tcp_transitory)
+ self.assertNotEqual(
+ timeouts_before.tcp_established, timeouts_after.tcp_established
+ )
+ self.assertNotEqual(
+ timeouts_before.tcp_transitory, timeouts_after.tcp_transitory
+ )
def test_in(self):
- """ DET44 translation test (TCP, UDP, ICMP) """
+ """DET44 translation test (TCP, UDP, ICMP)"""
nat_ip = "10.0.0.10"
- self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
- in_plen=32,
- out_addr=socket.inet_aton(nat_ip),
- out_plen=32)
+ self.vapi.det44_add_del_map(
+ is_add=1,
+ in_addr=self.pg0.remote_ip4,
+ in_plen=32,
+ out_addr=socket.inet_aton(nat_ip),
+ out_plen=32,
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- is_add=1, is_inside=1)
+ sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1, is_inside=0)
+ sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
+ )
# in2out
pkts = self.create_stream_in(self.pg0, self.pg1)
@@ -329,7 +357,7 @@ class TestDET44(VppTestCase):
self.assertEqual(s.out_port, self.icmp_external_id)
def test_multiple_users(self):
- """ Deterministic NAT multiple users """
+ """Deterministic NAT multiple users"""
nat_ip = "10.0.0.10"
port_in = 80
@@ -338,20 +366,26 @@ class TestDET44(VppTestCase):
host0 = self.pg0.remote_hosts[0]
host1 = self.pg0.remote_hosts[1]
- self.vapi.det44_add_del_map(is_add=1, in_addr=host0.ip4, in_plen=24,
- out_addr=socket.inet_aton(nat_ip),
- out_plen=32)
+ self.vapi.det44_add_del_map(
+ is_add=1,
+ in_addr=host0.ip4,
+ in_plen=24,
+ out_addr=socket.inet_aton(nat_ip),
+ out_plen=32,
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- is_add=1, is_inside=1)
+ sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1, is_inside=0)
+ sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
+ )
# host0 to out
- p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
- IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=port_in, dport=external_port))
+ p = (
+ Ether(src=host0.mac, dst=self.pg0.local_mac)
+ / IP(src=host0.ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=port_in, dport=external_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -369,9 +403,11 @@ class TestDET44(VppTestCase):
raise
# host1 to out
- p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
- IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=port_in, dport=external_port))
+ p = (
+ Ether(src=host1.mac, dst=self.pg0.local_mac)
+ / IP(src=host1.ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=port_in, dport=external_port)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -393,9 +429,11 @@ class TestDET44(VppTestCase):
self.assertEqual(2, dms[0].ses_num)
# out to host0
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=nat_ip) /
- TCP(sport=external_port, dport=port_out0))
+ p = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src=self.pg1.remote_ip4, dst=nat_ip)
+ / TCP(sport=external_port, dport=port_out0)
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -413,9 +451,11 @@ class TestDET44(VppTestCase):
raise
# out to host1
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=nat_ip) /
- TCP(sport=external_port, dport=port_out1))
+ p = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src=self.pg1.remote_ip4, dst=nat_ip)
+ / TCP(sport=external_port, dport=port_out1)
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -433,42 +473,44 @@ class TestDET44(VppTestCase):
raise
# session close api test
- self.vapi.det44_close_session_out(socket.inet_aton(nat_ip),
- port_out1,
- self.pg1.remote_ip4,
- external_port)
+ self.vapi.det44_close_session_out(
+ socket.inet_aton(nat_ip), port_out1, self.pg1.remote_ip4, external_port
+ )
dms = self.vapi.det44_map_dump()
self.assertEqual(dms[0].ses_num, 1)
- self.vapi.det44_close_session_in(host0.ip4,
- port_in,
- self.pg1.remote_ip4,
- external_port)
+ self.vapi.det44_close_session_in(
+ host0.ip4, port_in, self.pg1.remote_ip4, external_port
+ )
dms = self.vapi.det44_map_dump()
self.assertEqual(dms[0].ses_num, 0)
def test_tcp_session_close_detection_in(self):
- """ DET44 TCP session close from inside network """
- self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
- in_plen=32,
- out_addr=socket.inet_aton(self.nat_addr),
- out_plen=32)
+ """DET44 TCP session close from inside network"""
+ self.vapi.det44_add_del_map(
+ is_add=1,
+ in_addr=self.pg0.remote_ip4,
+ in_plen=32,
+ out_addr=socket.inet_aton(self.nat_addr),
+ out_plen=32,
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- is_add=1, is_inside=1)
+ sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1, is_inside=0)
+ sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
+ )
self.initiate_tcp_session(self.pg0, self.pg1)
# close the session from inside
try:
# FIN packet in -> out
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="F"))
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -477,17 +519,19 @@ class TestDET44(VppTestCase):
pkts = []
# ACK packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="A"))
+ p = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
+ / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")
+ )
pkts.append(p)
# FIN packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="F"))
+ p = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
+ / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
+ )
pkts.append(p)
self.pg1.add_stream(pkts)
@@ -496,10 +540,11 @@ class TestDET44(VppTestCase):
self.pg0.get_capture(2)
# ACK packet in -> out
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="A"))
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -513,27 +558,31 @@ class TestDET44(VppTestCase):
raise
def test_tcp_session_close_detection_out(self):
- """ Deterministic NAT TCP session close from outside network """
- self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
- in_plen=32,
- out_addr=socket.inet_aton(self.nat_addr),
- out_plen=32)
+ """Deterministic NAT TCP session close from outside network"""
+ self.vapi.det44_add_del_map(
+ is_add=1,
+ in_addr=self.pg0.remote_ip4,
+ in_plen=32,
+ out_addr=socket.inet_aton(self.nat_addr),
+ out_plen=32,
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- is_add=1, is_inside=1)
+ sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1, is_inside=0)
+ sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
+ )
self.initiate_tcp_session(self.pg0, self.pg1)
# close the session from outside
try:
# FIN packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="F"))
+ p = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
+ / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -542,17 +591,19 @@ class TestDET44(VppTestCase):
pkts = []
# ACK packet in -> out
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="A"))
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")
+ )
pkts.append(p)
# ACK packet in -> out
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
- flags="F"))
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
@@ -561,10 +612,11 @@ class TestDET44(VppTestCase):
self.pg1.get_capture(2)
# ACK packet out -> in
- p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
- IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
- TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
- flags="A"))
+ p = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
+ / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")
+ )
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -577,59 +629,66 @@ class TestDET44(VppTestCase):
self.logger.error("TCP session termination failed")
raise
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_session_timeout(self):
- """ Deterministic NAT session timeouts """
- self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
- in_plen=32,
- out_addr=socket.inet_aton(self.nat_addr),
- out_plen=32)
+ """Deterministic NAT session timeouts"""
+ self.vapi.det44_add_del_map(
+ is_add=1,
+ in_addr=self.pg0.remote_ip4,
+ in_plen=32,
+ out_addr=socket.inet_aton(self.nat_addr),
+ out_plen=32,
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- is_add=1, is_inside=1)
+ sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1, is_inside=0)
+ sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
+ )
self.initiate_tcp_session(self.pg0, self.pg1)
- self.vapi.det44_set_timeouts(udp=5, tcp_established=5,
- tcp_transitory=5, icmp=5)
+ self.vapi.det44_set_timeouts(udp=5, tcp_established=5, tcp_transitory=5, icmp=5)
pkts = self.create_stream_in(self.pg0, self.pg1)
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.pg1.get_capture(len(pkts))
- sleep(15)
+ self.virtual_sleep(15)
dms = self.vapi.det44_map_dump()
self.assertEqual(0, dms[0].ses_num)
# TODO: ipfix needs to be separated from NAT base plugin
- @unittest.skipUnless(running_extended_tests, "part of extended tests")
+ @unittest.skipUnless(config.extended, "part of extended tests")
def test_session_limit_per_user(self):
- """ Deterministic NAT maximum sessions per user limit """
- self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
- in_plen=32,
- out_addr=socket.inet_aton(self.nat_addr),
- out_plen=32)
+ """Deterministic NAT maximum sessions per user limit"""
+ self.vapi.det44_add_del_map(
+ is_add=1,
+ in_addr=self.pg0.remote_ip4,
+ in_plen=32,
+ out_addr=socket.inet_aton(self.nat_addr),
+ out_plen=32,
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg0.sw_if_index,
- is_add=1, is_inside=1)
+ sw_if_index=self.pg0.sw_if_index, is_add=1, is_inside=1
+ )
self.vapi.det44_interface_add_del_feature(
- sw_if_index=self.pg1.sw_if_index,
- is_add=1, is_inside=0)
- self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4,
- src_address=self.pg2.local_ip4,
- path_mtu=512,
- template_interval=10)
- self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739,
- enable=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=1, is_inside=0
+ )
+ self.vapi.set_ipfix_exporter(
+ collector_address=self.pg2.remote_ip4,
+ src_address=self.pg2.local_ip4,
+ path_mtu=512,
+ template_interval=10,
+ )
+ self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, enable=1)
pkts = []
for port in range(1025, 2025):
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- UDP(sport=port, dport=port))
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=port, dport=port)
+ )
pkts.append(p)
self.pg0.add_stream(pkts)
@@ -637,9 +696,11 @@ class TestDET44(VppTestCase):
self.pg_start()
self.pg1.get_capture(len(pkts))
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- UDP(sport=3001, dport=3002))
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=3001, dport=3002)
+ )
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
@@ -663,8 +724,7 @@ class TestDET44(VppTestCase):
# verify IPFIX logging
self.vapi.ipfix_flush()
- sleep(1)
- capture = self.pg2.get_capture(2)
+ capture = self.pg2.get_capture(7)
ipfix = IPFIXDecoder()
# first load template
for p in capture:
@@ -675,8 +735,5 @@ class TestDET44(VppTestCase):
for p in capture:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- self.verify_ipfix_max_entries_per_user(data,
- 1000,
- self.pg0.remote_ip4)
- self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739,
- enable=0)
+ self.verify_ipfix_max_entries_per_user(data, 1000, self.pg0.remote_ip4)
+ self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739, enable=0)