summaryrefslogtreecommitdiffstats
path: root/test/test_reassembly.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_reassembly.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_reassembly.py')
-rw-r--r--test/test_reassembly.py1509
1 files changed, 920 insertions, 589 deletions
diff --git a/test/test_reassembly.py b/test/test_reassembly.py
index 4c7a7cd320b..cebe5837763 100644
--- a/test/test_reassembly.py
+++ b/test/test_reassembly.py
@@ -9,10 +9,18 @@ import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, GRE
from scapy.layers.inet import IP, UDP, ICMP, icmptypes
-from scapy.layers.inet6 import HBHOptUnknown, ICMPv6ParamProblem,\
- ICMPv6TimeExceeded, IPv6, IPv6ExtHdrFragment,\
- IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, PadN, ICMPv6EchoRequest,\
- ICMPv6EchoReply
+from scapy.layers.inet6 import (
+ HBHOptUnknown,
+ ICMPv6ParamProblem,
+ ICMPv6TimeExceeded,
+ IPv6,
+ IPv6ExtHdrFragment,
+ IPv6ExtHdrHopByHop,
+ IPv6ExtHdrDestOpt,
+ PadN,
+ ICMPv6EchoRequest,
+ ICMPv6EchoReply,
+)
from framework import VppTestCase, VppTestRunner
from util import ppp, ppc, fragment_rfc791, fragment_rfc8200
from vpp_gre_interface import VppGreInterface
@@ -25,7 +33,7 @@ test_packet_count = 35
class TestIPv4Reassembly(VppTestCase):
- """ IPv4 Reassembly """
+ """IPv4 Reassembly"""
@classmethod
def setUpClass(cls):
@@ -52,21 +60,29 @@ class TestIPv4Reassembly(VppTestCase):
super().tearDownClass()
def setUp(self):
- """ Test setup - force timeout on existing reassemblies """
+ """Test setup - force timeout on existing reassemblies"""
super().setUp()
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.src_if.sw_if_index, enable_ip4=True)
- self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10)
- self.virtual_sleep(.25)
- self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10000)
+ sw_if_index=self.src_if.sw_if_index, enable_ip4=True
+ )
+ self.vapi.ip_reassembly_set(
+ timeout_ms=0,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10,
+ )
+ self.virtual_sleep(0.25)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=1000000,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10000,
+ )
def tearDown(self):
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.src_if.sw_if_index, enable_ip4=False)
+ sw_if_index=self.src_if.sw_if_index, enable_ip4=False
+ )
super().tearDown()
def show_commands_at_teardown(self):
@@ -82,11 +98,14 @@ class TestIPv4Reassembly(VppTestCase):
for i in range(0, packet_count):
info = cls.create_packet_info(cls.src_if, cls.src_if)
payload = cls.info_to_payload(info)
- p = (Ether(dst=cls.src_if.local_mac, src=cls.src_if.remote_mac) /
- IP(id=info.index, src=cls.src_if.remote_ip4,
- dst=cls.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
+ p = (
+ Ether(dst=cls.src_if.local_mac, src=cls.src_if.remote_mac)
+ / IP(
+ id=info.index, src=cls.src_if.remote_ip4, dst=cls.dst_if.remote_ip4
+ )
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
size = packet_sizes[(i // 2) % len(packet_sizes)]
cls.extend_packet(p, size, cls.padding)
info.data = p
@@ -101,20 +120,21 @@ class TestIPv4Reassembly(VppTestCase):
# p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc791(p, 400)
fragments_300 = fragment_rfc791(p, 300)
- fragments_200 = [
- x for f in fragments_400 for x in fragment_rfc791(f, 200)]
- cls.pkt_infos.append(
- (index, fragments_400, fragments_300, fragments_200))
- cls.fragments_400 = [
- x for (_, frags, _, _) in cls.pkt_infos for x in frags]
- cls.fragments_300 = [
- x for (_, _, frags, _) in cls.pkt_infos for x in frags]
- cls.fragments_200 = [
- x for (_, _, _, frags) in cls.pkt_infos for x in frags]
- cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, "
- "%s 300-byte fragments and %s 200-byte fragments" %
- (len(infos), len(cls.fragments_400),
- len(cls.fragments_300), len(cls.fragments_200)))
+ fragments_200 = [x for f in fragments_400 for x in fragment_rfc791(f, 200)]
+ cls.pkt_infos.append((index, fragments_400, fragments_300, fragments_200))
+ cls.fragments_400 = [x for (_, frags, _, _) in cls.pkt_infos for x in frags]
+ cls.fragments_300 = [x for (_, _, frags, _) in cls.pkt_infos for x in frags]
+ cls.fragments_200 = [x for (_, _, _, frags) in cls.pkt_infos for x in frags]
+ cls.logger.debug(
+ "Fragmented %s packets into %s 400-byte fragments, "
+ "%s 300-byte fragments and %s 200-byte fragments"
+ % (
+ len(infos),
+ len(cls.fragments_400),
+ len(cls.fragments_300),
+ len(cls.fragments_200),
+ )
+ )
def verify_capture(self, capture, dropped_packet_indexes=[]):
"""Verify captured packet stream.
@@ -132,7 +152,8 @@ class TestIPv4Reassembly(VppTestCase):
packet_index = payload_info.index
self.assertTrue(
packet_index not in dropped_packet_indexes,
- ppp("Packet received, but should be dropped:", packet))
+ ppp("Packet received, but should be dropped:", packet),
+ )
if packet_index in seen:
raise Exception(ppp("Duplicate packet received", packet))
seen.add(packet_index)
@@ -148,11 +169,13 @@ class TestIPv4Reassembly(VppTestCase):
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for index in self._packet_infos:
- self.assertTrue(index in seen or index in dropped_packet_indexes,
- "Packet with packet_index %d not received" % index)
+ self.assertTrue(
+ index in seen or index in dropped_packet_indexes,
+ "Packet with packet_index %d not received" % index,
+ )
def test_reassembly(self):
- """ basic reassembly """
+ """basic reassembly"""
self.pg_enable_capture()
self.src_if.add_stream(self.fragments_200)
@@ -172,7 +195,7 @@ class TestIPv4Reassembly(VppTestCase):
self.src_if.assert_nothing_captured()
def test_verify_clear_trace_mid_reassembly(self):
- """ verify clear trace works mid-reassembly """
+ """verify clear trace works mid-reassembly"""
self.pg_enable_capture()
self.src_if.add_stream(self.fragments_200[0:-1])
@@ -187,7 +210,7 @@ class TestIPv4Reassembly(VppTestCase):
self.verify_capture(packets)
def test_reversed(self):
- """ reverse order reassembly """
+ """reverse order reassembly"""
fragments = list(self.fragments_200)
fragments.reverse()
@@ -210,27 +233,33 @@ class TestIPv4Reassembly(VppTestCase):
self.src_if.assert_nothing_captured()
def test_long_fragment_chain(self):
- """ long fragment chain """
+ """long fragment chain"""
- error_cnt_str = \
+ error_cnt_str = (
"/err/ip4-full-reassembly-feature/fragment chain too long (drop)"
+ )
error_cnt = self.statistics.get_err_counter(error_cnt_str)
- self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
- max_reassembly_length=3,
- expire_walk_interval_ms=50)
-
- p1 = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IP(id=1000, src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(b"X" * 1000))
- p2 = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IP(id=1001, src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(b"X" * 1000))
+ self.vapi.ip_reassembly_set(
+ timeout_ms=100,
+ max_reassemblies=1000,
+ max_reassembly_length=3,
+ expire_walk_interval_ms=50,
+ )
+
+ p1 = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(id=1000, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(b"X" * 1000)
+ )
+ p2 = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(id=1001, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(b"X" * 1000)
+ )
frags = fragment_rfc791(p1, 200) + fragment_rfc791(p2, 500)
self.pg_enable_capture()
@@ -241,19 +270,20 @@ class TestIPv4Reassembly(VppTestCase):
self.assert_error_counter_equal(error_cnt_str, error_cnt + 1)
def test_5737(self):
- """ fragment length + ip header size > 65535 """
+ """fragment length + ip header size > 65535"""
self.vapi.cli("clear errors")
- raw = b'''E\x00\x00\x88,\xf8\x1f\xfe@\x01\x98\x00\xc0\xa8\n-\xc0\xa8\n\
+ raw = b"""E\x00\x00\x88,\xf8\x1f\xfe@\x01\x98\x00\xc0\xa8\n-\xc0\xa8\n\
\x01\x08\x00\xf0J\xed\xcb\xf1\xf5Test-group: IPv4.IPv4.ipv4-message.\
-Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737'''
- malformed_packet = (Ether(dst=self.src_if.local_mac,
- src=self.src_if.remote_mac) /
- IP(raw))
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IP(id=1000, src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(b"X" * 1000))
+Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737"""
+ malformed_packet = Ether(
+ dst=self.src_if.local_mac, src=self.src_if.remote_mac
+ ) / IP(raw)
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(id=1000, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(b"X" * 1000)
+ )
valid_fragments = fragment_rfc791(p, 400)
counter = "/err/ip4-full-reassembly-feature/malformed packets"
@@ -264,33 +294,50 @@ Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737'''
self.dst_if.get_capture(1)
self.logger.debug(self.vapi.ppcli("show error"))
- self.assertEqual(self.statistics.get_err_counter(counter),
- error_counter + 1)
+ self.assertEqual(self.statistics.get_err_counter(counter), error_counter + 1)
def test_44924(self):
- """ compress tiny fragments """
- packets = [(Ether(dst=self.src_if.local_mac,
- src=self.src_if.remote_mac) /
- IP(id=24339, flags="MF", frag=0, ttl=64,
- src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) /
- Raw(load='Test-group: IPv4')),
- (Ether(dst=self.src_if.local_mac,
- src=self.src_if.remote_mac) /
- IP(id=24339, flags="MF", frag=3, ttl=64,
- src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) /
- Raw(load='.IPv4.Fragmentation.vali')),
- (Ether(dst=self.src_if.local_mac,
- src=self.src_if.remote_mac) /
- IP(id=24339, frag=6, ttl=64,
- src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- ICMP(type="echo-request", code=0, id=0x1fe6, seq=0x2407) /
- Raw(load='d; Test-case: 44924'))
- ]
+ """compress tiny fragments"""
+ packets = [
+ (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(
+ id=24339,
+ flags="MF",
+ frag=0,
+ ttl=64,
+ src=self.src_if.remote_ip4,
+ dst=self.dst_if.remote_ip4,
+ )
+ / ICMP(type="echo-request", code=0, id=0x1FE6, seq=0x2407)
+ / Raw(load="Test-group: IPv4")
+ ),
+ (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(
+ id=24339,
+ flags="MF",
+ frag=3,
+ ttl=64,
+ src=self.src_if.remote_ip4,
+ dst=self.dst_if.remote_ip4,
+ )
+ / ICMP(type="echo-request", code=0, id=0x1FE6, seq=0x2407)
+ / Raw(load=".IPv4.Fragmentation.vali")
+ ),
+ (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(
+ id=24339,
+ frag=6,
+ ttl=64,
+ src=self.src_if.remote_ip4,
+ dst=self.dst_if.remote_ip4,
+ )
+ / ICMP(type="echo-request", code=0, id=0x1FE6, seq=0x2407)
+ / Raw(load="d; Test-case: 44924")
+ ),
+ ]
self.pg_enable_capture()
self.src_if.add_stream(packets)
@@ -299,27 +346,42 @@ Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737'''
self.dst_if.get_capture(1)
def test_frag_1(self):
- """ fragment of size 1 """
+ """fragment of size 1"""
self.vapi.cli("clear errors")
- malformed_packets = [(Ether(dst=self.src_if.local_mac,
- src=self.src_if.remote_mac) /
- IP(id=7, len=21, flags="MF", frag=0, ttl=64,
- src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- ICMP(type="echo-request")),
- (Ether(dst=self.src_if.local_mac,
- src=self.src_if.remote_mac) /
- IP(id=7, len=21, frag=1, ttl=64,
- src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- Raw(load=b'\x08')),
- ]
-
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IP(id=1000, src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(b"X" * 1000))
+ malformed_packets = [
+ (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(
+ id=7,
+ len=21,
+ flags="MF",
+ frag=0,
+ ttl=64,
+ src=self.src_if.remote_ip4,
+ dst=self.dst_if.remote_ip4,
+ )
+ / ICMP(type="echo-request")
+ ),
+ (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(
+ id=7,
+ len=21,
+ frag=1,
+ ttl=64,
+ src=self.src_if.remote_ip4,
+ dst=self.dst_if.remote_ip4,
+ )
+ / Raw(load=b"\x08")
+ ),
+ ]
+
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(id=1000, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(b"X" * 1000)
+ )
valid_fragments = fragment_rfc791(p, 400)
self.pg_enable_capture()
@@ -335,7 +397,7 @@ Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737'''
# "/err/ip4-full-reassembly-feature/malformed packets", 1)
def test_random(self):
- """ random order reassembly """
+ """random order reassembly"""
fragments = list(self.fragments_200)
shuffle(fragments)
@@ -358,10 +420,11 @@ Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737'''
self.src_if.assert_nothing_captured()
def test_duplicates(self):
- """ duplicate fragments """
+ """duplicate fragments"""
fragments = [
- x for (_, frags, _, _) in self.pkt_infos
+ x
+ for (_, frags, _, _) in self.pkt_infos
for x in frags
for _ in range(0, min(2, len(frags)))
]
@@ -375,7 +438,7 @@ Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737'''
self.src_if.assert_nothing_captured()
def test_overlap1(self):
- """ overlapping fragments case #1 """
+ """overlapping fragments case #1"""
fragments = []
for _, _, frags_300, frags_200 in self.pkt_infos:
@@ -404,7 +467,7 @@ Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737'''
self.src_if.assert_nothing_captured()
def test_overlap2(self):
- """ overlapping fragments case #2 """
+ """overlapping fragments case #2"""
fragments = []
for _, _, frags_300, frags_200 in self.pkt_infos:
@@ -439,94 +502,112 @@ Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737'''
self.src_if.assert_nothing_captured()
def test_timeout_inline(self):
- """ timeout (inline) """
+ """timeout (inline)"""
dropped_packet_indexes = set(
index for (index, frags, _, _) in self.pkt_infos if len(frags) > 1
)
- self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
- max_reassembly_length=3,
- expire_walk_interval_ms=10000)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=0,
+ max_reassemblies=1000,
+ max_reassembly_length=3,
+ expire_walk_interval_ms=10000,
+ )
self.pg_enable_capture()
self.src_if.add_stream(self.fragments_400)
self.pg_start()
packets = self.dst_if.get_capture(
- len(self.pkt_infos) - len(dropped_packet_indexes))
+ len(self.pkt_infos) - len(dropped_packet_indexes)
+ )
self.verify_capture(packets, dropped_packet_indexes)
self.src_if.assert_nothing_captured()
def test_timeout_cleanup(self):
- """ timeout (cleanup) """
+ """timeout (cleanup)"""
# whole packets + fragmented packets sans last fragment
fragments = [
- x for (_, frags_400, _, _) in self.pkt_infos
- for x in frags_400[:-1 if len(frags_400) > 1 else None]
+ x
+ for (_, frags_400, _, _) in self.pkt_infos
+ for x in frags_400[: -1 if len(frags_400) > 1 else None]
]
# last fragments for fragmented packets
- fragments2 = [frags_400[-1]
- for (_, frags_400, _, _) in self.pkt_infos
- if len(frags_400) > 1]
+ fragments2 = [
+ frags_400[-1]
+ for (_, frags_400, _, _) in self.pkt_infos
+ if len(frags_400) > 1
+ ]
dropped_packet_indexes = set(
- index for (index, frags_400, _, _) in self.pkt_infos
- if len(frags_400) > 1)
+ index for (index, frags_400, _, _) in self.pkt_infos if len(frags_400) > 1
+ )
- self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=50)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=100,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=50,
+ )
self.pg_enable_capture()
self.src_if.add_stream(fragments)
self.pg_start()
- self.virtual_sleep(.25, "wait before sending rest of fragments")
+ self.virtual_sleep(0.25, "wait before sending rest of fragments")
self.src_if.add_stream(fragments2)
self.pg_start()
packets = self.dst_if.get_capture(
- len(self.pkt_infos) - len(dropped_packet_indexes))
+ len(self.pkt_infos) - len(dropped_packet_indexes)
+ )
self.verify_capture(packets, dropped_packet_indexes)
self.src_if.assert_nothing_captured()
def test_disabled(self):
- """ reassembly disabled """
+ """reassembly disabled"""
dropped_packet_indexes = set(
- index for (index, frags_400, _, _) in self.pkt_infos
- if len(frags_400) > 1)
+ index for (index, frags_400, _, _) in self.pkt_infos if len(frags_400) > 1
+ )
- self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=0,
- max_reassembly_length=3,
- expire_walk_interval_ms=10000)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=1000,
+ max_reassemblies=0,
+ max_reassembly_length=3,
+ expire_walk_interval_ms=10000,
+ )
self.pg_enable_capture()
self.src_if.add_stream(self.fragments_400)
self.pg_start()
packets = self.dst_if.get_capture(
- len(self.pkt_infos) - len(dropped_packet_indexes))
+ len(self.pkt_infos) - len(dropped_packet_indexes)
+ )
self.verify_capture(packets, dropped_packet_indexes)
self.src_if.assert_nothing_captured()
def test_local_enable_disable(self):
- """ local reassembly enabled/disable """
+ """local reassembly enabled/disable"""
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.src_if.sw_if_index, enable_ip4=False)
+ sw_if_index=self.src_if.sw_if_index, enable_ip4=False
+ )
self.vapi.ip_local_reass_enable_disable(enable_ip4=True)
- p = (Ether(src=self.src_if.remote_mac, dst=self.src_if.local_mac) /
- IP(src=self.src_if.remote_ip4, dst=self.src_if.local_ip4) /
- ICMP(id=1234, type='echo-request') /
- Raw('x' * 1000))
+ p = (
+ Ether(src=self.src_if.remote_mac, dst=self.src_if.local_mac)
+ / IP(src=self.src_if.remote_ip4, dst=self.src_if.local_ip4)
+ / ICMP(id=1234, type="echo-request")
+ / Raw("x" * 1000)
+ )
frags = fragment_rfc791(p, 400)
r = self.send_and_expect(self.src_if, frags, self.src_if, n_rx=1)[0]
self.assertEqual(1234, r[ICMP].id)
- self.assertEqual(icmptypes[r[ICMP].type], 'echo-reply')
+ self.assertEqual(icmptypes[r[ICMP].type], "echo-reply")
self.vapi.ip_local_reass_enable_disable()
self.send_and_assert_no_replies(self.src_if, frags)
@@ -534,7 +615,7 @@ Ethernet-Payload.IPv4-Packet.IPv4-Header.Fragment-Offset; Test-case: 5737'''
class TestIPv4SVReassembly(VppTestCase):
- """ IPv4 Shallow Virtual Reassembly """
+ """IPv4 Shallow Virtual Reassembly"""
@classmethod
def setUpClass(cls):
@@ -551,22 +632,28 @@ class TestIPv4SVReassembly(VppTestCase):
i.resolve_arp()
def setUp(self):
- """ Test setup - force timeout on existing reassemblies """
+ """Test setup - force timeout on existing reassemblies"""
super().setUp()
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.src_if.sw_if_index, enable_ip4=True,
- type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL)
+ sw_if_index=self.src_if.sw_if_index,
+ enable_ip4=True,
+ type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
+ )
self.vapi.ip_reassembly_set(
- timeout_ms=0, max_reassemblies=1000,
+ timeout_ms=0,
+ max_reassemblies=1000,
max_reassembly_length=1000,
type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
- expire_walk_interval_ms=10)
- self.virtual_sleep(.25)
+ expire_walk_interval_ms=10,
+ )
+ self.virtual_sleep(0.25)
self.vapi.ip_reassembly_set(
- timeout_ms=1000000, max_reassemblies=1000,
+ timeout_ms=1000000,
+ max_reassemblies=1000,
max_reassembly_length=1000,
type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
- expire_walk_interval_ms=10000)
+ expire_walk_interval_ms=10000,
+ )
def tearDown(self):
super().tearDown()
@@ -574,7 +661,7 @@ class TestIPv4SVReassembly(VppTestCase):
self.logger.debug(self.vapi.ppcli("show buffers"))
def test_basic(self):
- """ basic reassembly """
+ """basic reassembly"""
payload_len = 1000
payload = ""
counter = 0
@@ -582,12 +669,13 @@ class TestIPv4SVReassembly(VppTestCase):
payload += "%u " % counter
counter += 1
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IP(id=1, src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
- fragments = fragment_rfc791(p, payload_len/4)
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(id=1, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
+ fragments = fragment_rfc791(p, payload_len / 4)
# send fragment #2 - should be cached inside reassembly
self.pg_enable_capture()
@@ -623,7 +711,7 @@ class TestIPv4SVReassembly(VppTestCase):
self.assertEqual(sent[Raw].payload, recvd[Raw].payload)
def test_verify_clear_trace_mid_reassembly(self):
- """ verify clear trace works mid-reassembly """
+ """verify clear trace works mid-reassembly"""
payload_len = 1000
payload = ""
counter = 0
@@ -631,12 +719,13 @@ class TestIPv4SVReassembly(VppTestCase):
payload += "%u " % counter
counter += 1
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IP(id=1, src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
- fragments = fragment_rfc791(p, payload_len/4)
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(id=1, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
+ fragments = fragment_rfc791(p, payload_len / 4)
self.pg_enable_capture()
self.src_if.add_stream(fragments[1])
@@ -659,7 +748,7 @@ class TestIPv4SVReassembly(VppTestCase):
self.dst_if.get_capture(len(fragments[2:]))
def test_timeout(self):
- """ reassembly timeout """
+ """reassembly timeout"""
payload_len = 1000
payload = ""
counter = 0
@@ -667,18 +756,21 @@ class TestIPv4SVReassembly(VppTestCase):
payload += "%u " % counter
counter += 1
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IP(id=1, src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
- fragments = fragment_rfc791(p, payload_len/4)
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(id=1, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
+ fragments = fragment_rfc791(p, payload_len / 4)
self.vapi.ip_reassembly_set(
- timeout_ms=100, max_reassemblies=1000,
+ timeout_ms=100,
+ max_reassemblies=1000,
max_reassembly_length=1000,
expire_walk_interval_ms=50,
- type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL)
+ type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
+ )
# send fragments #2 and #1 - should be forwarded
self.pg_enable_capture()
@@ -694,7 +786,7 @@ class TestIPv4SVReassembly(VppTestCase):
self.assertEqual(sent[Raw].payload, recvd[Raw].payload)
# wait for cleanup
- self.virtual_sleep(.25, "wait before sending rest of fragments")
+ self.virtual_sleep(0.25, "wait before sending rest of fragments")
# send rest of fragments - shouldn't be forwarded
self.pg_enable_capture()
@@ -703,13 +795,15 @@ class TestIPv4SVReassembly(VppTestCase):
self.dst_if.assert_nothing_captured()
def test_lru(self):
- """ reassembly reuses LRU element """
+ """reassembly reuses LRU element"""
self.vapi.ip_reassembly_set(
- timeout_ms=1000000, max_reassemblies=1,
+ timeout_ms=1000000,
+ max_reassemblies=1,
max_reassembly_length=1000,
type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
- expire_walk_interval_ms=10000)
+ expire_walk_interval_ms=10000,
+ )
payload_len = 1000
payload = ""
@@ -720,15 +814,17 @@ class TestIPv4SVReassembly(VppTestCase):
packet_count = 10
- fragments = [f
- for i in range(packet_count)
- for p in (Ether(dst=self.src_if.local_mac,
- src=self.src_if.remote_mac) /
- IP(id=i, src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
- for f in fragment_rfc791(p, payload_len/4)]
+ fragments = [
+ f
+ for i in range(packet_count)
+ for p in (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(id=i, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
+ for f in fragment_rfc791(p, payload_len / 4)
+ ]
self.pg_enable_capture()
self.src_if.add_stream(fragments)
@@ -742,16 +838,20 @@ class TestIPv4SVReassembly(VppTestCase):
def send_mixed_and_verify_capture(self, traffic):
stream = []
for t in traffic:
- for c in range(t['count']):
+ for c in range(t["count"]):
stream.append(
- (Ether(dst=self.src_if.local_mac,
- src=self.src_if.remote_mac) /
- IP(id=self.counter,
- flags=t['flags'],
- src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw("abcdef")))
+ (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(
+ id=self.counter,
+ flags=t["flags"],
+ src=self.src_if.remote_ip4,
+ dst=self.dst_if.remote_ip4,
+ )
+ / UDP(sport=1234, dport=5678)
+ / Raw("abcdef")
+ )
+ )
self.counter = self.counter + 1
self.pg_enable_capture()
@@ -763,58 +863,89 @@ class TestIPv4SVReassembly(VppTestCase):
self.dst_if.get_capture(len(stream))
def test_mixed(self):
- """ mixed traffic correctly passes through SVR """
+ """mixed traffic correctly passes through SVR"""
self.counter = 1
- self.send_mixed_and_verify_capture([{'count': 1, 'flags': ''}])
- self.send_mixed_and_verify_capture([{'count': 2, 'flags': ''}])
- self.send_mixed_and_verify_capture([{'count': 3, 'flags': ''}])
- self.send_mixed_and_verify_capture([{'count': 8, 'flags': ''}])
- self.send_mixed_and_verify_capture([{'count': 257, 'flags': ''}])
+ self.send_mixed_and_verify_capture([{"count": 1, "flags": ""}])
+ self.send_mixed_and_verify_capture([{"count": 2, "flags": ""}])
+ self.send_mixed_and_verify_capture([{"count": 3, "flags": ""}])
+ self.send_mixed_and_verify_capture([{"count": 8, "flags": ""}])
+ self.send_mixed_and_verify_capture([{"count": 257, "flags": ""}])
- self.send_mixed_and_verify_capture([{'count': 1, 'flags': 'MF'}])
- self.send_mixed_and_verify_capture([{'count': 2, 'flags': 'MF'}])
- self.send_mixed_and_verify_capture([{'count': 3, 'flags': 'MF'}])
- self.send_mixed_and_verify_capture([{'count': 8, 'flags': 'MF'}])
- self.send_mixed_and_verify_capture([{'count': 257, 'flags': 'MF'}])
+ self.send_mixed_and_verify_capture([{"count": 1, "flags": "MF"}])
+ self.send_mixed_and_verify_capture([{"count": 2, "flags": "MF"}])
+ self.send_mixed_and_verify_capture([{"count": 3, "flags": "MF"}])
+ self.send_mixed_and_verify_capture([{"count": 8, "flags": "MF"}])
+ self.send_mixed_and_verify_capture([{"count": 257, "flags": "MF"}])
self.send_mixed_and_verify_capture(
- [{'count': 1, 'flags': ''}, {'count': 1, 'flags': 'MF'}])
+ [{"count": 1, "flags": ""}, {"count": 1, "flags": "MF"}]
+ )
self.send_mixed_and_verify_capture(
- [{'count': 2, 'flags': ''}, {'count': 2, 'flags': 'MF'}])
+ [{"count": 2, "flags": ""}, {"count": 2, "flags": "MF"}]
+ )
self.send_mixed_and_verify_capture(
- [{'count': 3, 'flags': ''}, {'count': 3, 'flags': 'MF'}])
+ [{"count": 3, "flags": ""}, {"count": 3, "flags": "MF"}]
+ )
self.send_mixed_and_verify_capture(
- [{'count': 8, 'flags': ''}, {'count': 8, 'flags': 'MF'}])
+ [{"count": 8, "flags": ""}, {"count": 8, "flags": "MF"}]
+ )
self.send_mixed_and_verify_capture(
- [{'count': 129, 'flags': ''}, {'count': 129, 'flags': 'MF'}])
+ [{"count": 129, "flags": ""}, {"count": 129, "flags": "MF"}]
+ )
self.send_mixed_and_verify_capture(
- [{'count': 1, 'flags': ''}, {'count': 1, 'flags': 'MF'},
- {'count': 1, 'flags': ''}, {'count': 1, 'flags': 'MF'}])
+ [
+ {"count": 1, "flags": ""},
+ {"count": 1, "flags": "MF"},
+ {"count": 1, "flags": ""},
+ {"count": 1, "flags": "MF"},
+ ]
+ )
self.send_mixed_and_verify_capture(
- [{'count': 2, 'flags': ''}, {'count': 2, 'flags': 'MF'},
- {'count': 2, 'flags': ''}, {'count': 2, 'flags': 'MF'}])
+ [
+ {"count": 2, "flags": ""},
+ {"count": 2, "flags": "MF"},
+ {"count": 2, "flags": ""},
+ {"count": 2, "flags": "MF"},
+ ]
+ )
self.send_mixed_and_verify_capture(
- [{'count': 3, 'flags': ''}, {'count': 3, 'flags': 'MF'},
- {'count': 3, 'flags': ''}, {'count': 3, 'flags': 'MF'}])
+ [
+ {"count": 3, "flags": ""},
+ {"count": 3, "flags": "MF"},
+ {"count": 3, "flags": ""},
+ {"count": 3, "flags": "MF"},
+ ]
+ )
self.send_mixed_and_verify_capture(
- [{'count': 8, 'flags': ''}, {'count': 8, 'flags': 'MF'},
- {'count': 8, 'flags': ''}, {'count': 8, 'flags': 'MF'}])
+ [
+ {"count": 8, "flags": ""},
+ {"count": 8, "flags": "MF"},
+ {"count": 8, "flags": ""},
+ {"count": 8, "flags": "MF"},
+ ]
+ )
self.send_mixed_and_verify_capture(
- [{'count': 65, 'flags': ''}, {'count': 65, 'flags': 'MF'},
- {'count': 65, 'flags': ''}, {'count': 65, 'flags': 'MF'}])
+ [
+ {"count": 65, "flags": ""},
+ {"count": 65, "flags": "MF"},
+ {"count": 65, "flags": ""},
+ {"count": 65, "flags": "MF"},
+ ]
+ )
class TestIPv4MWReassembly(VppTestCase):
- """ IPv4 Reassembly (multiple workers) """
+ """IPv4 Reassembly (multiple workers)"""
+
vpp_worker_count = 3
@classmethod
def setUpClass(cls):
super().setUpClass()
- cls.create_pg_interfaces(range(cls.vpp_worker_count+1))
+ cls.create_pg_interfaces(range(cls.vpp_worker_count + 1))
cls.src_if = cls.pg0
cls.send_ifs = cls.pg_interfaces[:-1]
cls.dst_if = cls.pg_interfaces[-1]
@@ -828,8 +959,12 @@ class TestIPv4MWReassembly(VppTestCase):
# packets sizes reduced here because we are generating packets without
# Ethernet headers, which are added later (diff fragments go via
# different interfaces)
- cls.packet_sizes = [64-len(Ether()), 512-len(Ether()),
- 1518-len(Ether()), 9018-len(Ether())]
+ cls.packet_sizes = [
+ 64 - len(Ether()),
+ 512 - len(Ether()),
+ 1518 - len(Ether()),
+ 9018 - len(Ether()),
+ ]
cls.padding = " abcdefghijklmn"
cls.create_stream(cls.packet_sizes)
cls.create_fragments()
@@ -839,23 +974,31 @@ class TestIPv4MWReassembly(VppTestCase):
super().tearDownClass()
def setUp(self):
- """ Test setup - force timeout on existing reassemblies """
+ """Test setup - force timeout on existing reassemblies"""
super().setUp()
for intf in self.send_ifs:
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=intf.sw_if_index, enable_ip4=True)
- self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10)
- self.virtual_sleep(.25)
- self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10000)
+ sw_if_index=intf.sw_if_index, enable_ip4=True
+ )
+ self.vapi.ip_reassembly_set(
+ timeout_ms=0,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10,
+ )
+ self.virtual_sleep(0.25)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=1000000,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10000,
+ )
def tearDown(self):
for intf in self.send_ifs:
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=intf.sw_if_index, enable_ip4=False)
+ sw_if_index=intf.sw_if_index, enable_ip4=False
+ )
super().tearDown()
def show_commands_at_teardown(self):
@@ -871,10 +1014,11 @@ class TestIPv4MWReassembly(VppTestCase):
for i in range(0, packet_count):
info = cls.create_packet_info(cls.src_if, cls.src_if)
payload = cls.info_to_payload(info)
- p = (IP(id=info.index, src=cls.src_if.remote_ip4,
- dst=cls.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
+ p = (
+ IP(id=info.index, src=cls.src_if.remote_ip4, dst=cls.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
size = packet_sizes[(i // 2) % len(packet_sizes)]
cls.extend_packet(p, size, cls.padding)
info.data = p
@@ -889,10 +1033,11 @@ class TestIPv4MWReassembly(VppTestCase):
# p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc791(p, 400)
cls.pkt_infos.append((index, fragments_400))
- cls.fragments_400 = [
- x for (_, frags) in cls.pkt_infos for x in frags]
- cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, " %
- (len(infos), len(cls.fragments_400)))
+ cls.fragments_400 = [x for (_, frags) in cls.pkt_infos for x in frags]
+ cls.logger.debug(
+ "Fragmented %s packets into %s 400-byte fragments, "
+ % (len(infos), len(cls.fragments_400))
+ )
def verify_capture(self, capture, dropped_packet_indexes=[]):
"""Verify captured packet stream.
@@ -910,7 +1055,8 @@ class TestIPv4MWReassembly(VppTestCase):
packet_index = payload_info.index
self.assertTrue(
packet_index not in dropped_packet_indexes,
- ppp("Packet received, but should be dropped:", packet))
+ ppp("Packet received, but should be dropped:", packet),
+ )
if packet_index in seen:
raise Exception(ppp("Duplicate packet received", packet))
seen.add(packet_index)
@@ -926,8 +1072,10 @@ class TestIPv4MWReassembly(VppTestCase):
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for index in self._packet_infos:
- self.assertTrue(index in seen or index in dropped_packet_indexes,
- "Packet with packet_index %d not received" % index)
+ self.assertTrue(
+ index in seen or index in dropped_packet_indexes,
+ "Packet with packet_index %d not received" % index,
+ )
def send_packets(self, packets):
for counter in range(self.vpp_worker_count):
@@ -935,13 +1083,16 @@ class TestIPv4MWReassembly(VppTestCase):
continue
send_if = self.send_ifs[counter]
send_if.add_stream(
- (Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
- for x in packets[counter]),
- worker=counter)
+ (
+ Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
+ for x in packets[counter]
+ ),
+ worker=counter,
+ )
self.pg_start()
def test_worker_conflict(self):
- """ 1st and FO=0 fragments on different workers """
+ """1st and FO=0 fragments on different workers"""
# in first wave we send fragments which don't start at offset 0
# then we send fragments with offset 0 on a different thread
@@ -988,7 +1139,7 @@ class TestIPv4MWReassembly(VppTestCase):
class TestIPv6Reassembly(VppTestCase):
- """ IPv6 Reassembly """
+ """IPv6 Reassembly"""
@classmethod
def setUpClass(cls):
@@ -1015,23 +1166,33 @@ class TestIPv6Reassembly(VppTestCase):
super().tearDownClass()
def setUp(self):
- """ Test setup - force timeout on existing reassemblies """
+ """Test setup - force timeout on existing reassemblies"""
super().setUp()
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.src_if.sw_if_index, enable_ip6=True)
- self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10, is_ip6=1)
- self.virtual_sleep(.25)
- self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10000, is_ip6=1)
+ sw_if_index=self.src_if.sw_if_index, enable_ip6=True
+ )
+ self.vapi.ip_reassembly_set(
+ timeout_ms=0,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10,
+ is_ip6=1,
+ )
+ self.virtual_sleep(0.25)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=1000000,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10000,
+ is_ip6=1,
+ )
self.logger.debug(self.vapi.ppcli("show ip6-full-reassembly details"))
self.logger.debug(self.vapi.ppcli("show buffers"))
def tearDown(self):
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.src_if.sw_if_index, enable_ip6=False)
+ sw_if_index=self.src_if.sw_if_index, enable_ip6=False
+ )
super().tearDown()
def show_commands_at_teardown(self):
@@ -1047,11 +1208,12 @@ class TestIPv6Reassembly(VppTestCase):
for i in range(0, packet_count):
info = cls.create_packet_info(cls.src_if, cls.src_if)
payload = cls.info_to_payload(info)
- p = (Ether(dst=cls.src_if.local_mac, src=cls.src_if.remote_mac) /
- IPv6(src=cls.src_if.remote_ip6,
- dst=cls.dst_if.remote_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
+ p = (
+ Ether(dst=cls.src_if.local_mac, src=cls.src_if.remote_mac)
+ / IPv6(src=cls.src_if.remote_ip6, dst=cls.dst_if.remote_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
size = packet_sizes[(i // 2) % len(packet_sizes)]
cls.extend_packet(p, size, cls.padding)
info.data = p
@@ -1067,14 +1229,13 @@ class TestIPv6Reassembly(VppTestCase):
fragments_400 = fragment_rfc8200(p, info.index, 400)
fragments_300 = fragment_rfc8200(p, info.index, 300)
cls.pkt_infos.append((index, fragments_400, fragments_300))
- cls.fragments_400 = [
- x for _, frags, _ in cls.pkt_infos for x in frags]
- cls.fragments_300 = [
- x for _, _, frags in cls.pkt_infos for x in frags]
- cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, "
- "and %s 300-byte fragments" %
- (len(infos), len(cls.fragments_400),
- len(cls.fragments_300)))
+ cls.fragments_400 = [x for _, frags, _ in cls.pkt_infos for x in frags]
+ cls.fragments_300 = [x for _, _, frags in cls.pkt_infos for x in frags]
+ cls.logger.debug(
+ "Fragmented %s packets into %s 400-byte fragments, "
+ "and %s 300-byte fragments"
+ % (len(infos), len(cls.fragments_400), len(cls.fragments_300))
+ )
def verify_capture(self, capture, dropped_packet_indexes=[]):
"""Verify captured packet strea .
@@ -1092,7 +1253,8 @@ class TestIPv6Reassembly(VppTestCase):
packet_index = payload_info.index
self.assertTrue(
packet_index not in dropped_packet_indexes,
- ppp("Packet received, but should be dropped:", packet))
+ ppp("Packet received, but should be dropped:", packet),
+ )
if packet_index in seen:
raise Exception(ppp("Duplicate packet received", packet))
seen.add(packet_index)
@@ -1108,11 +1270,13 @@ class TestIPv6Reassembly(VppTestCase):
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for index in self._packet_infos:
- self.assertTrue(index in seen or index in dropped_packet_indexes,
- "Packet with packet_index %d not received" % index)
+ self.assertTrue(
+ index in seen or index in dropped_packet_indexes,
+ "Packet with packet_index %d not received" % index,
+ )
def test_reassembly(self):
- """ basic reassembly """
+ """basic reassembly"""
self.pg_enable_capture()
self.src_if.add_stream(self.fragments_400)
@@ -1132,16 +1296,16 @@ class TestIPv6Reassembly(VppTestCase):
self.src_if.assert_nothing_captured()
def test_buffer_boundary(self):
- """ fragment header crossing buffer boundary """
-
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6,
- dst=self.src_if.local_ip6) /
- IPv6ExtHdrHopByHop(
- options=[HBHOptUnknown(otype=0xff, optlen=0)] * 1000) /
- IPv6ExtHdrFragment(m=1) /
- UDP(sport=1234, dport=5678) /
- Raw())
+ """fragment header crossing buffer boundary"""
+
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.src_if.local_ip6)
+ / IPv6ExtHdrHopByHop(options=[HBHOptUnknown(otype=0xFF, optlen=0)] * 1000)
+ / IPv6ExtHdrFragment(m=1)
+ / UDP(sport=1234, dport=5678)
+ / Raw()
+ )
self.pg_enable_capture()
self.src_if.add_stream([p])
self.pg_start()
@@ -1149,7 +1313,7 @@ class TestIPv6Reassembly(VppTestCase):
self.dst_if.assert_nothing_captured()
def test_verify_clear_trace_mid_reassembly(self):
- """ verify clear trace works mid-reassembly """
+ """verify clear trace works mid-reassembly"""
self.pg_enable_capture()
self.src_if.add_stream(self.fragments_400[0:-1])
@@ -1164,7 +1328,7 @@ class TestIPv6Reassembly(VppTestCase):
self.verify_capture(packets)
def test_reversed(self):
- """ reverse order reassembly """
+ """reverse order reassembly"""
fragments = list(self.fragments_400)
fragments.reverse()
@@ -1187,7 +1351,7 @@ class TestIPv6Reassembly(VppTestCase):
self.src_if.assert_nothing_captured()
def test_random(self):
- """ random order reassembly """
+ """random order reassembly"""
fragments = list(self.fragments_400)
shuffle(fragments)
@@ -1210,10 +1374,11 @@ class TestIPv6Reassembly(VppTestCase):
self.src_if.assert_nothing_captured()
def test_duplicates(self):
- """ duplicate fragments """
+ """duplicate fragments"""
fragments = [
- x for (_, frags, _) in self.pkt_infos
+ x
+ for (_, frags, _) in self.pkt_infos
for x in frags
for _ in range(0, min(2, len(frags)))
]
@@ -1227,22 +1392,28 @@ class TestIPv6Reassembly(VppTestCase):
self.src_if.assert_nothing_captured()
def test_long_fragment_chain(self):
- """ long fragment chain """
+ """long fragment chain"""
- error_cnt_str = \
+ error_cnt_str = (
"/err/ip6-full-reassembly-feature/fragment chain too long (drop)"
+ )
error_cnt = self.statistics.get_err_counter(error_cnt_str)
- self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
- max_reassembly_length=3,
- expire_walk_interval_ms=50, is_ip6=1)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=100,
+ max_reassemblies=1000,
+ max_reassembly_length=3,
+ expire_walk_interval_ms=50,
+ is_ip6=1,
+ )
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6,
- dst=self.dst_if.remote_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw(b"X" * 1000))
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw(b"X" * 1000)
+ )
frags = fragment_rfc8200(p, 1, 300) + fragment_rfc8200(p, 2, 500)
self.pg_enable_capture()
@@ -1253,7 +1424,7 @@ class TestIPv6Reassembly(VppTestCase):
self.assert_error_counter_equal(error_cnt_str, error_cnt + 1)
def test_overlap1(self):
- """ overlapping fragments case #1 """
+ """overlapping fragments case #1"""
fragments = []
for _, frags_400, frags_300 in self.pkt_infos:
@@ -1273,12 +1444,13 @@ class TestIPv6Reassembly(VppTestCase):
self.pg_start()
packets = self.dst_if.get_capture(
- len(self.pkt_infos) - len(dropped_packet_indexes))
+ len(self.pkt_infos) - len(dropped_packet_indexes)
+ )
self.verify_capture(packets, dropped_packet_indexes)
self.src_if.assert_nothing_captured()
def test_overlap2(self):
- """ overlapping fragments case #2 """
+ """overlapping fragments case #2"""
fragments = []
for _, frags_400, frags_300 in self.pkt_infos:
@@ -1304,27 +1476,33 @@ class TestIPv6Reassembly(VppTestCase):
self.pg_start()
packets = self.dst_if.get_capture(
- len(self.pkt_infos) - len(dropped_packet_indexes))
+ len(self.pkt_infos) - len(dropped_packet_indexes)
+ )
self.verify_capture(packets, dropped_packet_indexes)
self.src_if.assert_nothing_captured()
def test_timeout_inline(self):
- """ timeout (inline) """
+ """timeout (inline)"""
dropped_packet_indexes = set(
index for (index, frags, _) in self.pkt_infos if len(frags) > 1
)
- self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
- max_reassembly_length=3,
- expire_walk_interval_ms=10000, is_ip6=1)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=0,
+ max_reassemblies=1000,
+ max_reassembly_length=3,
+ expire_walk_interval_ms=10000,
+ is_ip6=1,
+ )
self.pg_enable_capture()
self.src_if.add_stream(self.fragments_400)
self.pg_start()
packets = self.dst_if.get_capture(
- len(self.pkt_infos) - len(dropped_packet_indexes))
+ len(self.pkt_infos) - len(dropped_packet_indexes)
+ )
self.verify_capture(packets, dropped_packet_indexes)
pkts = self.src_if._get_capture(1)
for icmp in pkts:
@@ -1334,42 +1512,51 @@ class TestIPv6Reassembly(VppTestCase):
dropped_packet_indexes.remove(icmp[IPv6ExtHdrFragment].id)
def test_timeout_cleanup(self):
- """ timeout (cleanup) """
+ """timeout (cleanup)"""
# whole packets + fragmented packets sans last fragment
fragments = [
- x for (_, frags_400, _) in self.pkt_infos
- for x in frags_400[:-1 if len(frags_400) > 1 else None]
+ x
+ for (_, frags_400, _) in self.pkt_infos
+ for x in frags_400[: -1 if len(frags_400) > 1 else None]
]
# last fragments for fragmented packets
- fragments2 = [frags_400[-1]
- for (_, frags_400, _) in self.pkt_infos
- if len(frags_400) > 1]
+ fragments2 = [
+ frags_400[-1] for (_, frags_400, _) in self.pkt_infos if len(frags_400) > 1
+ ]
dropped_packet_indexes = set(
- index for (index, frags_400, _) in self.pkt_infos
- if len(frags_400) > 1)
+ index for (index, frags_400, _) in self.pkt_infos if len(frags_400) > 1
+ )
- self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=50)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=100,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=50,
+ )
- self.vapi.ip_reassembly_set(timeout_ms=100, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=50, is_ip6=1)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=100,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=50,
+ is_ip6=1,
+ )
self.pg_enable_capture()
self.src_if.add_stream(fragments)
self.pg_start()
- self.virtual_sleep(.25, "wait before sending rest of fragments")
+ self.virtual_sleep(0.25, "wait before sending rest of fragments")
self.src_if.add_stream(fragments2)
self.pg_start()
packets = self.dst_if.get_capture(
- len(self.pkt_infos) - len(dropped_packet_indexes))
+ len(self.pkt_infos) - len(dropped_packet_indexes)
+ )
self.verify_capture(packets, dropped_packet_indexes)
pkts = self.src_if._get_capture(1)
for icmp in pkts:
@@ -1379,34 +1566,41 @@ class TestIPv6Reassembly(VppTestCase):
dropped_packet_indexes.remove(icmp[IPv6ExtHdrFragment].id)
def test_disabled(self):
- """ reassembly disabled """
+ """reassembly disabled"""
dropped_packet_indexes = set(
- index for (index, frags_400, _) in self.pkt_infos
- if len(frags_400) > 1)
+ index for (index, frags_400, _) in self.pkt_infos if len(frags_400) > 1
+ )
- self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=0,
- max_reassembly_length=3,
- expire_walk_interval_ms=10000, is_ip6=1)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=1000,
+ max_reassemblies=0,
+ max_reassembly_length=3,
+ expire_walk_interval_ms=10000,
+ is_ip6=1,
+ )
self.pg_enable_capture()
self.src_if.add_stream(self.fragments_400)
self.pg_start()
packets = self.dst_if.get_capture(
- len(self.pkt_infos) - len(dropped_packet_indexes))
+ len(self.pkt_infos) - len(dropped_packet_indexes)
+ )
self.verify_capture(packets, dropped_packet_indexes)
self.src_if.assert_nothing_captured()
def test_missing_upper(self):
- """ missing upper layer """
- optdata = '\x00' * 100
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6,
- dst=self.src_if.local_ip6) /
- IPv6ExtHdrFragment(m=1) /
- IPv6ExtHdrDestOpt(nh=17, options=PadN(optdata='\101' * 255) /
- PadN(optdata='\102'*255)))
+ """missing upper layer"""
+ optdata = "\x00" * 100
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.src_if.local_ip6)
+ / IPv6ExtHdrFragment(m=1)
+ / IPv6ExtHdrDestOpt(
+ nh=17, options=PadN(optdata="\101" * 255) / PadN(optdata="\102" * 255)
+ )
+ )
self.pg_enable_capture()
self.src_if.add_stream([p])
@@ -1417,21 +1611,23 @@ class TestIPv6Reassembly(VppTestCase):
self.assert_equal(icmp[ICMPv6ParamProblem].code, 3, "ICMP code")
def test_truncated_fragment(self):
- """ truncated fragment """
- pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6,
- nh=44, plen=2) /
- IPv6ExtHdrFragment(nh=6))
+ """truncated fragment"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44, plen=2)
+ / IPv6ExtHdrFragment(nh=6)
+ )
self.send_and_assert_no_replies(self.pg0, [pkt], self.pg0)
def test_invalid_frag_size(self):
- """ fragment size not a multiple of 8 """
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6,
- dst=self.src_if.local_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw())
+ """fragment size not a multiple of 8"""
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.src_if.local_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw()
+ )
self.extend_packet(p, 1000, self.padding)
fragments = fragment_rfc8200(p, 1, 500)
bad_fragment = fragments[0]
@@ -1445,12 +1641,13 @@ class TestIPv6Reassembly(VppTestCase):
self.assert_equal(icmp[ICMPv6ParamProblem].code, 0, "ICMP code")
def test_invalid_packet_size(self):
- """ total packet size > 65535 """
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6,
- dst=self.src_if.local_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw())
+ """total packet size > 65535"""
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.src_if.local_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw()
+ )
self.extend_packet(p, 1000, self.padding)
fragments = fragment_rfc8200(p, 1, 500)
bad_fragment = fragments[1]
@@ -1464,44 +1661,56 @@ class TestIPv6Reassembly(VppTestCase):
self.assert_equal(icmp[ICMPv6ParamProblem].code, 0, "ICMP code")
def test_atomic_fragment(self):
- """ IPv6 atomic fragment """
- pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6,
- nh=44, plen=65535) /
- IPv6ExtHdrFragment(offset=8191, m=1, res1=0xFF, res2=0xFF,
- nh=255, id=0xffff)/('X'*1452))
+ """IPv6 atomic fragment"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44, plen=65535)
+ / IPv6ExtHdrFragment(
+ offset=8191, m=1, res1=0xFF, res2=0xFF, nh=255, id=0xFFFF
+ )
+ / ("X" * 1452)
+ )
rx = self.send_and_expect(self.pg0, [pkt], self.pg0)
self.assertIn(ICMPv6ParamProblem, rx[0])
def test_truncated_fragment(self):
- """ IPv6 truncated fragment header """
- pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6,
- nh=44, plen=2) /
- IPv6ExtHdrFragment(nh=6))
+ """IPv6 truncated fragment header"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44, plen=2)
+ / IPv6ExtHdrFragment(nh=6)
+ )
self.send_and_assert_no_replies(self.pg0, [pkt])
- pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6) /
- ICMPv6EchoRequest())
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6)
+ / ICMPv6EchoRequest()
+ )
rx = self.send_and_expect(self.pg0, [pkt], self.pg0)
def test_one_fragment(self):
- """ whole packet in one fragment processed independently """
- pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- ICMPv6EchoRequest()/Raw('X' * 1600))
+ """whole packet in one fragment processed independently"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / ICMPv6EchoRequest()
+ / Raw("X" * 1600)
+ )
frags = fragment_rfc8200(pkt, 1, 400)
# send a fragment with known id
self.send_and_assert_no_replies(self.pg0, [frags[0]])
# send an atomic fragment with same id - should be reassembled
- pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- IPv6ExtHdrFragment(id=1)/ICMPv6EchoRequest())
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / IPv6ExtHdrFragment(id=1)
+ / ICMPv6EchoRequest()
+ )
rx = self.send_and_expect(self.pg0, [pkt], self.pg0)
self.assertNotIn(IPv6ExtHdrFragment, rx)
@@ -1510,33 +1719,46 @@ class TestIPv6Reassembly(VppTestCase):
self.assertNotIn(IPv6ExtHdrFragment, rx)
def test_bunch_of_fragments(self):
- """ valid fragments followed by rogue fragments and atomic fragment"""
- pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- ICMPv6EchoRequest()/Raw('X' * 1600))
+ """valid fragments followed by rogue fragments and atomic fragment"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / ICMPv6EchoRequest()
+ / Raw("X" * 1600)
+ )
frags = fragment_rfc8200(pkt, 1, 400)
self.send_and_expect(self.pg0, frags, self.pg0, n_rx=1)
- inc_frag = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- IPv6ExtHdrFragment(id=1, nh=58, offset=608)/Raw('X'*308))
+ inc_frag = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / IPv6ExtHdrFragment(id=1, nh=58, offset=608)
+ / Raw("X" * 308)
+ )
- self.send_and_assert_no_replies(self.pg0, inc_frag*604)
+ self.send_and_assert_no_replies(self.pg0, inc_frag * 604)
- pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- IPv6ExtHdrFragment(id=1)/ICMPv6EchoRequest())
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / IPv6ExtHdrFragment(id=1)
+ / ICMPv6EchoRequest()
+ )
rx = self.send_and_expect(self.pg0, [pkt], self.pg0)
self.assertNotIn(IPv6ExtHdrFragment, rx)
def test_local_enable_disable(self):
- """ local reassembly enabled/disable """
+ """local reassembly enabled/disable"""
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.src_if.sw_if_index, enable_ip6=False)
+ sw_if_index=self.src_if.sw_if_index, enable_ip6=False
+ )
self.vapi.ip_local_reass_enable_disable(enable_ip6=True)
- pkt = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6, dst=self.src_if.local_ip6) /
- ICMPv6EchoRequest(id=1234)/Raw('X' * 1600))
+ pkt = (
+ Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.src_if.local_ip6)
+ / ICMPv6EchoRequest(id=1234)
+ / Raw("X" * 1600)
+ )
frags = fragment_rfc8200(pkt, 1, 400)
r = self.send_and_expect(self.src_if, frags, self.src_if, n_rx=1)[0]
self.assertEqual(1234, r[ICMPv6EchoReply].id)
@@ -1547,14 +1769,15 @@ class TestIPv6Reassembly(VppTestCase):
class TestIPv6MWReassembly(VppTestCase):
- """ IPv6 Reassembly (multiple workers) """
+ """IPv6 Reassembly (multiple workers)"""
+
vpp_worker_count = 3
@classmethod
def setUpClass(cls):
super().setUpClass()
- cls.create_pg_interfaces(range(cls.vpp_worker_count+1))
+ cls.create_pg_interfaces(range(cls.vpp_worker_count + 1))
cls.src_if = cls.pg0
cls.send_ifs = cls.pg_interfaces[:-1]
cls.dst_if = cls.pg_interfaces[-1]
@@ -1568,8 +1791,12 @@ class TestIPv6MWReassembly(VppTestCase):
# packets sizes reduced here because we are generating packets without
# Ethernet headers, which are added later (diff fragments go via
# different interfaces)
- cls.packet_sizes = [64-len(Ether()), 512-len(Ether()),
- 1518-len(Ether()), 9018-len(Ether())]
+ cls.packet_sizes = [
+ 64 - len(Ether()),
+ 512 - len(Ether()),
+ 1518 - len(Ether()),
+ 9018 - len(Ether()),
+ ]
cls.padding = " abcdefghijklmn"
cls.create_stream(cls.packet_sizes)
cls.create_fragments()
@@ -1579,23 +1806,33 @@ class TestIPv6MWReassembly(VppTestCase):
super().tearDownClass()
def setUp(self):
- """ Test setup - force timeout on existing reassemblies """
+ """Test setup - force timeout on existing reassemblies"""
super().setUp()
for intf in self.send_ifs:
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=intf.sw_if_index, enable_ip6=True)
- self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10, is_ip6=1)
- self.virtual_sleep(.25)
- self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=1000, is_ip6=1)
+ sw_if_index=intf.sw_if_index, enable_ip6=True
+ )
+ self.vapi.ip_reassembly_set(
+ timeout_ms=0,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10,
+ is_ip6=1,
+ )
+ self.virtual_sleep(0.25)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=1000000,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=1000,
+ is_ip6=1,
+ )
def tearDown(self):
for intf in self.send_ifs:
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=intf.sw_if_index, enable_ip6=False)
+ sw_if_index=intf.sw_if_index, enable_ip6=False
+ )
super().tearDown()
def show_commands_at_teardown(self):
@@ -1611,10 +1848,11 @@ class TestIPv6MWReassembly(VppTestCase):
for i in range(0, packet_count):
info = cls.create_packet_info(cls.src_if, cls.src_if)
payload = cls.info_to_payload(info)
- p = (IPv6(src=cls.src_if.remote_ip6,
- dst=cls.dst_if.remote_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
+ p = (
+ IPv6(src=cls.src_if.remote_ip6, dst=cls.dst_if.remote_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
size = packet_sizes[(i // 2) % len(packet_sizes)]
cls.extend_packet(p, size, cls.padding)
info.data = p
@@ -1629,10 +1867,11 @@ class TestIPv6MWReassembly(VppTestCase):
# p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc8200(p, index, 400)
cls.pkt_infos.append((index, fragments_400))
- cls.fragments_400 = [
- x for (_, frags) in cls.pkt_infos for x in frags]
- cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, " %
- (len(infos), len(cls.fragments_400)))
+ cls.fragments_400 = [x for (_, frags) in cls.pkt_infos for x in frags]
+ cls.logger.debug(
+ "Fragmented %s packets into %s 400-byte fragments, "
+ % (len(infos), len(cls.fragments_400))
+ )
def verify_capture(self, capture, dropped_packet_indexes=[]):
"""Verify captured packet strea .
@@ -1650,7 +1889,8 @@ class TestIPv6MWReassembly(VppTestCase):
packet_index = payload_info.index
self.assertTrue(
packet_index not in dropped_packet_indexes,
- ppp("Packet received, but should be dropped:", packet))
+ ppp("Packet received, but should be dropped:", packet),
+ )
if packet_index in seen:
raise Exception(ppp("Duplicate packet received", packet))
seen.add(packet_index)
@@ -1666,8 +1906,10 @@ class TestIPv6MWReassembly(VppTestCase):
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for index in self._packet_infos:
- self.assertTrue(index in seen or index in dropped_packet_indexes,
- "Packet with packet_index %d not received" % index)
+ self.assertTrue(
+ index in seen or index in dropped_packet_indexes,
+ "Packet with packet_index %d not received" % index,
+ )
def send_packets(self, packets):
for counter in range(self.vpp_worker_count):
@@ -1675,13 +1917,16 @@ class TestIPv6MWReassembly(VppTestCase):
continue
send_if = self.send_ifs[counter]
send_if.add_stream(
- (Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
- for x in packets[counter]),
- worker=counter)
+ (
+ Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
+ for x in packets[counter]
+ ),
+ worker=counter,
+ )
self.pg_start()
def test_worker_conflict(self):
- """ 1st and FO=0 fragments on different workers """
+ """1st and FO=0 fragments on different workers"""
# in first wave we send fragments which don't start at offset 0
# then we send fragments with offset 0 on a different thread
@@ -1728,7 +1973,7 @@ class TestIPv6MWReassembly(VppTestCase):
class TestIPv6SVReassembly(VppTestCase):
- """ IPv6 Shallow Virtual Reassembly """
+ """IPv6 Shallow Virtual Reassembly"""
@classmethod
def setUpClass(cls):
@@ -1745,22 +1990,30 @@ class TestIPv6SVReassembly(VppTestCase):
i.resolve_ndp()
def setUp(self):
- """ Test setup - force timeout on existing reassemblies """
+ """Test setup - force timeout on existing reassemblies"""
super().setUp()
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.src_if.sw_if_index, enable_ip6=True,
- type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL)
+ sw_if_index=self.src_if.sw_if_index,
+ enable_ip6=True,
+ type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
+ )
self.vapi.ip_reassembly_set(
- timeout_ms=0, max_reassemblies=1000,
+ timeout_ms=0,
+ max_reassemblies=1000,
max_reassembly_length=1000,
type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
- expire_walk_interval_ms=10, is_ip6=1)
- self.virtual_sleep(.25)
+ expire_walk_interval_ms=10,
+ is_ip6=1,
+ )
+ self.virtual_sleep(0.25)
self.vapi.ip_reassembly_set(
- timeout_ms=1000000, max_reassemblies=1000,
+ timeout_ms=1000000,
+ max_reassemblies=1000,
max_reassembly_length=1000,
type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
- expire_walk_interval_ms=10000, is_ip6=1)
+ expire_walk_interval_ms=10000,
+ is_ip6=1,
+ )
def tearDown(self):
super().tearDown()
@@ -1768,7 +2021,7 @@ class TestIPv6SVReassembly(VppTestCase):
self.logger.debug(self.vapi.ppcli("show buffers"))
def test_basic(self):
- """ basic reassembly """
+ """basic reassembly"""
payload_len = 1000
payload = ""
counter = 0
@@ -1776,11 +2029,13 @@ class TestIPv6SVReassembly(VppTestCase):
payload += "%u " % counter
counter += 1
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
- fragments = fragment_rfc8200(p, 1, payload_len/4)
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
+ fragments = fragment_rfc8200(p, 1, payload_len / 4)
# send fragment #2 - should be cached inside reassembly
self.pg_enable_capture()
@@ -1816,7 +2071,7 @@ class TestIPv6SVReassembly(VppTestCase):
self.assertEqual(sent[Raw].payload, recvd[Raw].payload)
def test_verify_clear_trace_mid_reassembly(self):
- """ verify clear trace works mid-reassembly """
+ """verify clear trace works mid-reassembly"""
payload_len = 1000
payload = ""
counter = 0
@@ -1824,11 +2079,13 @@ class TestIPv6SVReassembly(VppTestCase):
payload += "%u " % counter
counter += 1
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
- fragments = fragment_rfc8200(p, 1, payload_len/4)
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
+ fragments = fragment_rfc8200(p, 1, payload_len / 4)
self.pg_enable_capture()
self.src_if.add_stream(fragments[1])
@@ -1851,7 +2108,7 @@ class TestIPv6SVReassembly(VppTestCase):
self.dst_if.get_capture(len(fragments[2:]))
def test_timeout(self):
- """ reassembly timeout """
+ """reassembly timeout"""
payload_len = 1000
payload = ""
counter = 0
@@ -1859,18 +2116,22 @@ class TestIPv6SVReassembly(VppTestCase):
payload += "%u " % counter
counter += 1
- p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
- fragments = fragment_rfc8200(p, 1, payload_len/4)
+ p = (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
+ fragments = fragment_rfc8200(p, 1, payload_len / 4)
self.vapi.ip_reassembly_set(
- timeout_ms=100, max_reassemblies=1000,
+ timeout_ms=100,
+ max_reassemblies=1000,
max_reassembly_length=1000,
expire_walk_interval_ms=50,
is_ip6=1,
- type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL)
+ type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
+ )
# send fragments #2 and #1 - should be forwarded
self.pg_enable_capture()
@@ -1886,7 +2147,7 @@ class TestIPv6SVReassembly(VppTestCase):
self.assertEqual(sent[Raw].payload, recvd[Raw].payload)
# wait for cleanup
- self.virtual_sleep(.25, "wait before sending rest of fragments")
+ self.virtual_sleep(0.25, "wait before sending rest of fragments")
# send rest of fragments - shouldn't be forwarded
self.pg_enable_capture()
@@ -1895,13 +2156,16 @@ class TestIPv6SVReassembly(VppTestCase):
self.dst_if.assert_nothing_captured()
def test_lru(self):
- """ reassembly reuses LRU element """
+ """reassembly reuses LRU element"""
self.vapi.ip_reassembly_set(
- timeout_ms=1000000, max_reassemblies=1,
+ timeout_ms=1000000,
+ max_reassemblies=1,
max_reassembly_length=1000,
type=VppEnum.vl_api_ip_reass_type_t.IP_REASS_TYPE_SHALLOW_VIRTUAL,
- is_ip6=1, expire_walk_interval_ms=10000)
+ is_ip6=1,
+ expire_walk_interval_ms=10000,
+ )
payload_len = 1000
payload = ""
@@ -1912,15 +2176,17 @@ class TestIPv6SVReassembly(VppTestCase):
packet_count = 10
- fragments = [f
- for i in range(packet_count)
- for p in (Ether(dst=self.src_if.local_mac,
- src=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6,
- dst=self.dst_if.remote_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
- for f in fragment_rfc8200(p, i, payload_len/4)]
+ fragments = [
+ f
+ for i in range(packet_count)
+ for p in (
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
+ for f in fragment_rfc8200(p, i, payload_len / 4)
+ ]
self.pg_enable_capture()
self.src_if.add_stream(fragments)
@@ -1932,55 +2198,71 @@ class TestIPv6SVReassembly(VppTestCase):
self.assertEqual(sent[Raw].payload, recvd[Raw].payload)
def test_one_fragment(self):
- """ whole packet in one fragment processed independently """
- pkt = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
- ICMPv6EchoRequest()/Raw('X' * 1600))
+ """whole packet in one fragment processed independently"""
+ pkt = (
+ Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / ICMPv6EchoRequest()
+ / Raw("X" * 1600)
+ )
frags = fragment_rfc8200(pkt, 1, 400)
# send a fragment with known id
self.send_and_expect(self.src_if, [frags[0]], self.dst_if)
# send an atomic fragment with same id - should be reassembled
- pkt = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
- IPv6ExtHdrFragment(id=1)/ICMPv6EchoRequest())
+ pkt = (
+ Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / IPv6ExtHdrFragment(id=1)
+ / ICMPv6EchoRequest()
+ )
rx = self.send_and_expect(self.src_if, [pkt], self.dst_if)
# now forward packets matching original reassembly, should still work
rx = self.send_and_expect(self.src_if, frags[1:], self.dst_if)
def test_bunch_of_fragments(self):
- """ valid fragments followed by rogue fragments and atomic fragment"""
- pkt = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
- ICMPv6EchoRequest()/Raw('X' * 1600))
+ """valid fragments followed by rogue fragments and atomic fragment"""
+ pkt = (
+ Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / ICMPv6EchoRequest()
+ / Raw("X" * 1600)
+ )
frags = fragment_rfc8200(pkt, 1, 400)
rx = self.send_and_expect(self.src_if, frags, self.dst_if)
- rogue = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
- IPv6ExtHdrFragment(id=1, nh=58, offset=608)/Raw('X'*308))
+ rogue = (
+ Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / IPv6ExtHdrFragment(id=1, nh=58, offset=608)
+ / Raw("X" * 308)
+ )
- self.send_and_expect(self.src_if, rogue*604, self.dst_if)
+ self.send_and_expect(self.src_if, rogue * 604, self.dst_if)
- pkt = (Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac) /
- IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
- IPv6ExtHdrFragment(id=1)/ICMPv6EchoRequest())
+ pkt = (
+ Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / IPv6ExtHdrFragment(id=1)
+ / ICMPv6EchoRequest()
+ )
rx = self.send_and_expect(self.src_if, [pkt], self.dst_if)
def test_truncated_fragment(self):
- """ truncated fragment """
- pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6,
- nh=44, plen=2) /
- IPv6ExtHdrFragment(nh=6))
+ """truncated fragment"""
+ pkt = (
+ Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44, plen=2)
+ / IPv6ExtHdrFragment(nh=6)
+ )
self.send_and_assert_no_replies(self.pg0, [pkt], self.pg0)
class TestIPv4ReassemblyLocalNode(VppTestCase):
- """ IPv4 Reassembly for packets coming to ip4-local node """
+ """IPv4 Reassembly for packets coming to ip4-local node"""
@classmethod
def setUpClass(cls):
@@ -2004,15 +2286,21 @@ class TestIPv4ReassemblyLocalNode(VppTestCase):
super().tearDownClass()
def setUp(self):
- """ Test setup - force timeout on existing reassemblies """
+ """Test setup - force timeout on existing reassemblies"""
super().setUp()
- self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10)
- self.virtual_sleep(.25)
- self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10000)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=0,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10,
+ )
+ self.virtual_sleep(0.25)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=1000000,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10000,
+ )
def tearDown(self):
super().tearDown()
@@ -2030,12 +2318,16 @@ class TestIPv4ReassemblyLocalNode(VppTestCase):
for i in range(0, packet_count):
info = cls.create_packet_info(cls.src_dst_if, cls.src_dst_if)
payload = cls.info_to_payload(info)
- p = (Ether(dst=cls.src_dst_if.local_mac,
- src=cls.src_dst_if.remote_mac) /
- IP(id=info.index, src=cls.src_dst_if.remote_ip4,
- dst=cls.src_dst_if.local_ip4) /
- ICMP(type='echo-request', id=1234) /
- Raw(payload))
+ p = (
+ Ether(dst=cls.src_dst_if.local_mac, src=cls.src_dst_if.remote_mac)
+ / IP(
+ id=info.index,
+ src=cls.src_dst_if.remote_ip4,
+ dst=cls.src_dst_if.local_ip4,
+ )
+ / ICMP(type="echo-request", id=1234)
+ / Raw(payload)
+ )
cls.extend_packet(p, 1518, cls.padding)
info.data = p
@@ -2050,8 +2342,10 @@ class TestIPv4ReassemblyLocalNode(VppTestCase):
fragments_300 = fragment_rfc791(p, 300)
cls.pkt_infos.append((index, fragments_300))
cls.fragments_300 = [x for (_, frags) in cls.pkt_infos for x in frags]
- cls.logger.debug("Fragmented %s packets into %s 300-byte fragments" %
- (len(infos), len(cls.fragments_300)))
+ cls.logger.debug(
+ "Fragmented %s packets into %s 300-byte fragments"
+ % (len(infos), len(cls.fragments_300))
+ )
def verify_capture(self, capture):
"""Verify captured packet stream.
@@ -2084,11 +2378,12 @@ class TestIPv4ReassemblyLocalNode(VppTestCase):
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for index in self._packet_infos:
- self.assertIn(index, seen,
- "Packet with packet_index %d not received" % index)
+ self.assertIn(
+ index, seen, "Packet with packet_index %d not received" % index
+ )
def test_reassembly(self):
- """ basic reassembly """
+ """basic reassembly"""
self.pg_enable_capture()
self.src_dst_if.add_stream(self.fragments_300)
@@ -2107,7 +2402,7 @@ class TestIPv4ReassemblyLocalNode(VppTestCase):
class TestFIFReassembly(VppTestCase):
- """ Fragments in fragments reassembly """
+ """Fragments in fragments reassembly"""
@classmethod
def setUpClass(cls):
@@ -2131,27 +2426,41 @@ class TestFIFReassembly(VppTestCase):
super().tearDownClass()
def setUp(self):
- """ Test setup - force timeout on existing reassemblies """
+ """Test setup - force timeout on existing reassemblies"""
super().setUp()
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.src_if.sw_if_index, enable_ip4=True,
- enable_ip6=True)
+ sw_if_index=self.src_if.sw_if_index, enable_ip4=True, enable_ip6=True
+ )
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.dst_if.sw_if_index, enable_ip4=True,
- enable_ip6=True)
- self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10)
- self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10, is_ip6=1)
- self.virtual_sleep(.25)
- self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10000)
- self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
- max_reassembly_length=1000,
- expire_walk_interval_ms=10000, is_ip6=1)
+ sw_if_index=self.dst_if.sw_if_index, enable_ip4=True, enable_ip6=True
+ )
+ self.vapi.ip_reassembly_set(
+ timeout_ms=0,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10,
+ )
+ self.vapi.ip_reassembly_set(
+ timeout_ms=0,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10,
+ is_ip6=1,
+ )
+ self.virtual_sleep(0.25)
+ self.vapi.ip_reassembly_set(
+ timeout_ms=1000000,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10000,
+ )
+ self.vapi.ip_reassembly_set(
+ timeout_ms=1000000,
+ max_reassemblies=1000,
+ max_reassembly_length=1000,
+ expire_walk_interval_ms=10000,
+ is_ip6=1,
+ )
def tearDown(self):
super().tearDown()
@@ -2177,7 +2486,8 @@ class TestFIFReassembly(VppTestCase):
packet_index = payload_info.index
self.assertTrue(
packet_index not in dropped_packet_indexes,
- ppp("Packet received, but should be dropped:", packet))
+ ppp("Packet received, but should be dropped:", packet),
+ )
if packet_index in seen:
raise Exception(ppp("Duplicate packet received", packet))
seen.add(packet_index)
@@ -2193,11 +2503,13 @@ class TestFIFReassembly(VppTestCase):
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for index in self._packet_infos:
- self.assertTrue(index in seen or index in dropped_packet_indexes,
- "Packet with packet_index %d not received" % index)
+ self.assertTrue(
+ index in seen or index in dropped_packet_indexes,
+ "Packet with packet_index %d not received" % index,
+ )
def test_fif4(self):
- """ Fragments in fragments (4o4) """
+ """Fragments in fragments (4o4)"""
# TODO this should be ideally in setUpClass, but then we hit a bug
# with VppIpRoute incorrectly reporting it's present when it's not
@@ -2211,11 +2523,15 @@ class TestFIFReassembly(VppTestCase):
self.gre4.config_ip4()
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.gre4.sw_if_index, enable_ip4=True)
+ sw_if_index=self.gre4.sw_if_index, enable_ip4=True
+ )
- self.route4 = VppIpRoute(self, self.tun_ip4, 32,
- [VppRoutePath(self.src_if.remote_ip4,
- self.src_if.sw_if_index)])
+ self.route4 = VppIpRoute(
+ self,
+ self.tun_ip4,
+ 32,
+ [VppRoutePath(self.src_if.remote_ip4, self.src_if.sw_if_index)],
+ )
self.route4.add_vpp_config()
self.reset_packet_infos()
@@ -2225,28 +2541,33 @@ class TestFIFReassembly(VppTestCase):
# Ethernet header here is only for size calculation, thus it
# doesn't matter how it's initialized. This is to ensure that
# reassembled packet is not > 9000 bytes, so that it's not dropped
- p = (Ether() /
- IP(id=i, src=self.src_if.remote_ip4,
- dst=self.dst_if.remote_ip4) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
+ p = (
+ Ether()
+ / IP(id=i, src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
size = self.packet_sizes[(i // 2) % len(self.packet_sizes)]
self.extend_packet(p, size, self.padding)
info.data = p[IP] # use only IP part, without ethernet header
- fragments = [x for _, p in self._packet_infos.items()
- for x in fragment_rfc791(p.data, 400)]
+ fragments = [
+ x
+ for _, p in self._packet_infos.items()
+ for x in fragment_rfc791(p.data, 400)
+ ]
- encapped_fragments = \
- [Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IP(src=self.tun_ip4, dst=self.src_if.local_ip4) /
- GRE() /
- p
- for p in fragments]
+ encapped_fragments = [
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IP(src=self.tun_ip4, dst=self.src_if.local_ip4)
+ / GRE()
+ / p
+ for p in fragments
+ ]
- fragmented_encapped_fragments = \
- [x for p in encapped_fragments
- for x in fragment_rfc791(p, 200)]
+ fragmented_encapped_fragments = [
+ x for p in encapped_fragments for x in fragment_rfc791(p, 200)
+ ]
self.src_if.add_stream(fragmented_encapped_fragments)
@@ -2263,7 +2584,7 @@ class TestFIFReassembly(VppTestCase):
self.logger.debug(self.vapi.ppcli("show interface"))
def test_fif6(self):
- """ Fragments in fragments (6o6) """
+ """Fragments in fragments (6o6)"""
# TODO this should be ideally in setUpClass, but then we hit a bug
# with VppIpRoute incorrectly reporting it's present when it's not
# so we need to manually remove the vpp config, thus we cannot have
@@ -2276,12 +2597,15 @@ class TestFIFReassembly(VppTestCase):
self.gre6.config_ip6()
self.vapi.ip_reassembly_enable_disable(
- sw_if_index=self.gre6.sw_if_index, enable_ip6=True)
+ sw_if_index=self.gre6.sw_if_index, enable_ip6=True
+ )
- self.route6 = VppIpRoute(self, self.tun_ip6, 128,
- [VppRoutePath(
- self.src_if.remote_ip6,
- self.src_if.sw_if_index)])
+ self.route6 = VppIpRoute(
+ self,
+ self.tun_ip6,
+ 128,
+ [VppRoutePath(self.src_if.remote_ip6, self.src_if.sw_if_index)],
+ )
self.route6.add_vpp_config()
self.reset_packet_infos()
@@ -2291,34 +2615,41 @@ class TestFIFReassembly(VppTestCase):
# Ethernet header here is only for size calculation, thus it
# doesn't matter how it's initialized. This is to ensure that
# reassembled packet is not > 9000 bytes, so that it's not dropped
- p = (Ether() /
- IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6) /
- UDP(sport=1234, dport=5678) /
- Raw(payload))
+ p = (
+ Ether()
+ / IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
+ / UDP(sport=1234, dport=5678)
+ / Raw(payload)
+ )
size = self.packet_sizes[(i // 2) % len(self.packet_sizes)]
self.extend_packet(p, size, self.padding)
info.data = p[IPv6] # use only IPv6 part, without ethernet header
- fragments = [x for _, i in self._packet_infos.items()
- for x in fragment_rfc8200(
- i.data, i.index, 400)]
+ fragments = [
+ x
+ for _, i in self._packet_infos.items()
+ for x in fragment_rfc8200(i.data, i.index, 400)
+ ]
- encapped_fragments = \
- [Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) /
- IPv6(src=self.tun_ip6, dst=self.src_if.local_ip6) /
- GRE() /
- p
- for p in fragments]
+ encapped_fragments = [
+ Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac)
+ / IPv6(src=self.tun_ip6, dst=self.src_if.local_ip6)
+ / GRE()
+ / p
+ for p in fragments
+ ]
- fragmented_encapped_fragments = \
- [x for p in encapped_fragments for x in (
+ fragmented_encapped_fragments = [
+ x
+ for p in encapped_fragments
+ for x in (
fragment_rfc8200(
- p,
- 2 * len(self._packet_infos) + p[IPv6ExtHdrFragment].id,
- 200)
- if IPv6ExtHdrFragment in p else [p]
+ p, 2 * len(self._packet_infos) + p[IPv6ExtHdrFragment].id, 200
+ )
+ if IPv6ExtHdrFragment in p
+ else [p]
)
- ]
+ ]
self.src_if.add_stream(fragmented_encapped_fragments)
@@ -2334,5 +2665,5 @@ class TestFIFReassembly(VppTestCase):
self.gre6.remove_vpp_config()
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)