diff options
author | Paul Vinciguerra <pvinci@vinciconsulting.com> | 2019-03-10 10:04:23 -0700 |
---|---|---|
committer | Ole Trøan <otroan@employees.org> | 2019-03-11 08:11:16 +0000 |
commit | a7427ec6f86cbeba7594f98e41fecab291d66b73 (patch) | |
tree | a1fe195ee4ccdecd420cd908c0752a33ebf3aa19 | |
parent | 0f6602cb246894ea98253e16aae198094bf78694 (diff) |
VPP-1508: Use scapy.compat to manage packet level library differences.
Change-Id: Icdf6abc9e53d33b26fd1d531c7dda6be0bb9cb55
Signed-off-by: Paul Vinciguerra <pvinci@vinciconsulting.com>
-rw-r--r-- | test/framework.py | 20 | ||||
-rw-r--r-- | test/hook.py | 5 | ||||
-rw-r--r-- | test/test_acl_plugin_l2l3.py | 9 | ||||
-rw-r--r-- | test/test_acl_plugin_macip.py | 48 | ||||
-rw-r--r-- | test/test_bfd.py | 29 | ||||
-rw-r--r-- | test/test_bier.py | 46 | ||||
-rw-r--r-- | test/test_cdp.py | 4 | ||||
-rw-r--r-- | test/test_dhcp.py | 5 | ||||
-rw-r--r-- | test/test_gbp.py | 2 | ||||
-rw-r--r-- | test/test_gre.py | 13 | ||||
-rw-r--r-- | test/test_ip4.py | 5 | ||||
-rw-r--r-- | test/test_ip4_vrf_multi_instance.py | 4 | ||||
-rw-r--r-- | test/test_ip6.py | 1 | ||||
-rw-r--r-- | test/test_ipsec_nat.py | 6 | ||||
-rw-r--r-- | test/test_lb.py | 10 | ||||
-rw-r--r-- | test/test_map.py | 8 | ||||
-rw-r--r-- | test/test_mpls.py | 3 | ||||
-rw-r--r-- | test/test_nat.py | 85 | ||||
-rw-r--r-- | test/test_neighbor.py | 6 | ||||
-rw-r--r-- | test/test_punt.py | 5 | ||||
-rw-r--r-- | test/test_qos.py | 61 | ||||
-rw-r--r-- | test/test_reassembly.py | 12 | ||||
-rw-r--r-- | test/test_srv6.py | 15 | ||||
-rw-r--r-- | test/test_srv6_ad.py | 15 | ||||
-rwxr-xr-x | test/test_srv6_as.py | 15 | ||||
-rw-r--r-- | test/util.py | 13 | ||||
-rw-r--r-- | test/vpp_pg_interface.py | 9 |
27 files changed, 261 insertions, 193 deletions
diff --git a/test/framework.py b/test/framework.py index 2c1c8291ed1..7656253294e 100644 --- a/test/framework.py +++ b/test/framework.py @@ -18,6 +18,8 @@ from threading import Thread, Event from inspect import getdoc, isclass from traceback import format_exception from logging import FileHandler, DEBUG, Formatter + +import scapy.compat from scapy.packet import Raw from hook import StepHook, PollHook, VppDiedError from vpp_pg_interface import VppPGInterface @@ -511,7 +513,11 @@ class VppTestCase(unittest.TestCase): if hasattr(cls, 'vpp'): if hasattr(cls, 'vapi'): + cls.logger.debug("Disconnecting class vapi client on %s", + cls.__name__) cls.vapi.disconnect() + cls.logger.debug("Deleting class vapi attribute on %s", + cls.__name__) del cls.vapi cls.vpp.poll() if cls.vpp.returncode is None: @@ -519,6 +525,8 @@ class VppTestCase(unittest.TestCase): cls.vpp.kill() cls.logger.debug("Waiting for vpp to die") cls.vpp.communicate() + cls.logger.debug("Deleting class vpp attribute on %s", + cls.__name__) del cls.vpp if cls.vpp_startup_failed: @@ -560,7 +568,6 @@ class VppTestCase(unittest.TestCase): def tearDown(self): """ Show various debug prints after each test """ - super(VppTestCase, self).tearDown() self.logger.debug("--- tearDown() for %s.%s(%s) called ---" % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)) @@ -848,14 +855,14 @@ class VppTestCase(unittest.TestCase): def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True): - received = packet.__class__(str(packet)) + received = packet.__class__(scapy.compat.raw(packet)) self.logger.debug( ppp("Verifying packet checksums for packet:", received)) udp_layers = ['UDP', 'UDPerror'] checksum_fields = ['cksum', 'chksum'] checksums = [] counter = 0 - temp = received.__class__(str(received)) + temp = received.__class__(scapy.compat.raw(received)) while True: layer = temp.getlayer(counter) if layer: @@ -872,7 +879,7 @@ class VppTestCase(unittest.TestCase): counter = counter + 1 if 0 == len(checksums): return - temp = temp.__class__(str(temp)) + temp = temp.__class__(scapy.compat.raw(temp)) for layer, cf in checksums: calc_sum = getattr(temp[layer], cf) self.assert_equal( @@ -889,9 +896,10 @@ class VppTestCase(unittest.TestCase): received_packet_checksum = getattr(received_packet[layer], field_name) if ignore_zero_checksum and 0 == received_packet_checksum: return - recalculated = received_packet.__class__(str(received_packet)) + recalculated = received_packet.__class__( + scapy.compat.raw(received_packet)) delattr(recalculated[layer], field_name) - recalculated = recalculated.__class__(str(recalculated)) + recalculated = recalculated.__class__(scapy.compat.raw(recalculated)) self.assert_equal(received_packet_checksum, getattr(recalculated[layer], field_name), "packet checksum on layer: %s" % layer) diff --git a/test/hook.py b/test/hook.py index 6c971c9937f..555fc2ea32a 100644 --- a/test/hook.py +++ b/test/hook.py @@ -5,6 +5,9 @@ import traceback from log import RED, single_line_delim, double_line_delim import ipaddress from subprocess import check_output, CalledProcessError + +import scapy.compat + from util import check_core_path, get_core_path @@ -31,7 +34,7 @@ class Hook(object): return val if len(val) == 6: return '{!s} ({!s})'.format(val, ':'.join(['{:02x}'.format( - ord(x)) for x in val])) + scapy.compat.orb(x)) for x in val])) try: # we don't call test_type(val) because it is a packed value. return '{!s} ({!s})'.format(val, str( diff --git a/test/test_acl_plugin_l2l3.py b/test/test_acl_plugin_l2l3.py index 73dd473c67f..2a99e86119b 100644 --- a/test/test_acl_plugin_l2l3.py +++ b/test/test_acl_plugin_l2l3.py @@ -28,6 +28,7 @@ from socket import inet_pton, AF_INET, AF_INET6 from random import choice, shuffle from pprint import pprint +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, ICMP, TCP @@ -308,12 +309,14 @@ class TestACLpluginL2L3(VppTestCase): # Scapy IPv6 stuff is too smart for its own good. # So we do this and coerce the ICMP into unknown type if packet.haslayer(UDP): - data = str(packet[UDP][Raw]) + data = scapy.compat.raw(packet[UDP][Raw]) else: if l3 == IP: - data = str(ICMP(str(packet[l3].payload))[Raw]) + data = scapy.compat.raw(ICMP( + scapy.compat.raw(packet[l3].payload))[Raw]) else: - data = str(ICMPv6Unknown(str(packet[l3].payload)).msgbody) + data = scapy.compat.raw(ICMPv6Unknown( + scapy.compat.raw(packet[l3].payload)).msgbody) udp_or_icmp = packet[l3].payload payload_info = self.payload_to_info(data) packet_index = payload_info.index diff --git a/test/test_acl_plugin_macip.py b/test/test_acl_plugin_macip.py index d2d684abc11..6f99646e6e2 100644 --- a/test/test_acl_plugin_macip.py +++ b/test/test_acl_plugin_macip.py @@ -3,12 +3,14 @@ from __future__ import print_function """ACL plugin - MACIP tests """ import binascii +import ipaddress import random from socket import inet_ntop, inet_pton, AF_INET, AF_INET6 from struct import pack, unpack import re import unittest +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP @@ -194,8 +196,10 @@ class MethodHolder(VppTestCase): return acls def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP, - acl_count=1, rules_count=[1]): + acl_count=1, rules_count=None): acls = [] + if rules_count is None: + rules_count = [1] src_mac = int("220000dead00", 16) for acl in range(2, (acl_count+1) * 2): rules = [] @@ -248,7 +252,7 @@ class MethodHolder(VppTestCase): ip4[3] = 0 ip6[8] = random.randint(100, 200) ip6[15] = 0 - ip_pack = '' + ip_pack = b'' for j in range(0, len(ip)): ip_pack += pack('<B', int(ip[j])) @@ -256,8 +260,9 @@ class MethodHolder(VppTestCase): 'is_ipv6': is_ip6, 'src_ip_addr': ip_pack, 'src_ip_prefix_len': ip_len, - 'src_mac': mac.replace(':', '').decode('hex'), - 'src_mac_mask': mask.replace(':', '').decode('hex')}) + 'src_mac': binascii.unhexlify(mac.replace(':', '')), + 'src_mac_mask': binascii.unhexlify( + mask.replace(':', ''))}) rules.append(rule) if ip_type == self.WILD_IP: break @@ -275,7 +280,7 @@ class MethodHolder(VppTestCase): def verify_macip_acls(self, acl_count, rules_count, expected_count=2): reply = self.macip_acl_dump_debug() for acl in range(2, (acl_count+1) * 2): - self.assertEqual(reply[acl - 2].count, rules_count[acl/2-1]) + self.assertEqual(reply[acl - 2].count, rules_count[acl//2-1]) self.vapi.macip_acl_interface_get() @@ -410,30 +415,32 @@ class MethodHolder(VppTestCase): sub_ip[15] = random.randint(200, 255) elif ip_type == self.SUBNET_IP: if denyIP: - sub_ip[2] = str(int(sub_ip[2]) + 1) + sub_ip[2] = int(sub_ip[2]) + 1 sub_ip[14] = random.randint(100, 199) sub_ip[15] = random.randint(200, 255) - src_ip6 = inet_ntop(AF_INET6, str(bytearray(sub_ip))) + packed_src_ip6 = b''.join( + [scapy.compat.chb(x) for x in sub_ip]) + src_ip6 = inet_ntop(AF_INET6, packed_src_ip6) packet /= IPv6(src=src_ip6, dst=dst_ip6) else: if ip_type != self.EXACT_IP: sub_ip = ip_rule.split('.') if ip_type == self.WILD_IP: - sub_ip[0] = str(random.randint(1, 49)) - sub_ip[1] = str(random.randint(50, 99)) - sub_ip[2] = str(random.randint(100, 199)) - sub_ip[3] = str(random.randint(200, 255)) + sub_ip[0] = random.randint(1, 49) + sub_ip[1] = random.randint(50, 99) + sub_ip[2] = random.randint(100, 199) + sub_ip[3] = random.randint(200, 255) elif ip_type == self.SUBNET_IP: if denyIP: - sub_ip[1] = str(int(sub_ip[1])+1) - sub_ip[2] = str(random.randint(100, 199)) - sub_ip[3] = str(random.randint(200, 255)) - src_ip4 = ".".join(sub_ip) + sub_ip[1] = int(sub_ip[1])+1 + sub_ip[2] = random.randint(100, 199) + sub_ip[3] = random.randint(200, 255) + src_ip4 = '.'.join(['{!s}'.format(x) for x in sub_ip]) packet /= IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0) packet /= UDP(sport=src_port, dport=dst_port)/Raw(payload) - packet[Raw].load += " mac:"+src_mac + packet[Raw].load += b" mac:%s" % scapy.compat.raw(src_mac) size = self.pg_if_packet_sizes[p % len(self.pg_if_packet_sizes)] if isinstance(src_if, VppSubInterface): @@ -485,7 +492,9 @@ class MethodHolder(VppTestCase): sub_ip = list(unpack('<16B', inet_pton(AF_INET6, ip))) for i in range(8, 16): sub_ip[i] = 0 - ip = inet_ntop(AF_INET6, str(bytearray(sub_ip))) + packed_ip = b''.join( + [scapy.compat.chb(x) for x in sub_ip]) + ip = inet_ntop(AF_INET6, packed_ip) else: if ip_type == self.WILD_IP: ip = "0.0.0.0" @@ -540,8 +549,9 @@ class MethodHolder(VppTestCase): 'is_ipv6': is_ip6, 'src_ip_addr': ip_rule, 'src_ip_prefix_len': prefix_len, - 'src_mac': mac_rule.replace(':', '').decode('hex'), - 'src_mac_mask': mac_mask.replace(':', '').decode('hex')}) + 'src_mac': binascii.unhexlify(mac_rule.replace(':', '')), + 'src_mac_mask': binascii.unhexlify( + mac_mask.replace(':', ''))}) macip_rules.append(macip_rule) # deny all other packets diff --git a/test/test_bfd.py b/test/test_bfd.py index 7b474228eb6..4c3f5354c1a 100644 --- a/test/test_bfd.py +++ b/test/test_bfd.py @@ -12,6 +12,7 @@ from socket import AF_INET, AF_INET6, inet_ntop from struct import pack, unpack from six import moves +import scapy.compat from scapy.layers.inet import UDP, IP from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether @@ -42,7 +43,8 @@ class AuthKeyFactory(object): while conf_key_id in self._conf_key_ids: conf_key_id = randint(0, 0xFFFFFFFF) self._conf_key_ids[conf_key_id] = 1 - key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))])) + key = scapy.compat.raw( + bytearray([randint(0, 255) for _ in range(randint(1, 20))])) return VppBFDAuthKey(test=test, auth_type=auth_type, conf_key_id=conf_key_id, key=key) @@ -435,7 +437,8 @@ class BFDTestSession(object): self.test.logger.debug("BFD: Creating packet") self.fill_packet_fields(packet) if self.sha1_key: - hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \ + hash_material = scapy.compat.raw( + packet[BFD])[:32] + self.sha1_key.key + \ "\0" * (20 - len(self.sha1_key.key)) self.test.logger.debug("BFD: Calculated SHA1 hash: %s" % hashlib.sha1(hash_material).hexdigest()) @@ -492,7 +495,7 @@ class BFDTestSession(object): # last 20 bytes represent the hash - so replace them with the key, # pad the result with zeros and hash the result hash_material = bfd.original[:-20] + self.sha1_key.key + \ - "\0" * (20 - len(self.sha1_key.key)) + b"\0" * (20 - len(self.sha1_key.key)) expected_hash = hashlib.sha1(hash_material).hexdigest() self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash), expected_hash, "Auth key hash") @@ -923,7 +926,7 @@ class BFD4TestCase(VppTestCase): # halve required min rx old_required_min_rx = self.vpp_session.required_min_rx self.vpp_session.modify_parameters( - required_min_rx=0.5 * self.vpp_session.required_min_rx) + required_min_rx=self.vpp_session.required_min_rx // 2) # now we wait 0.8*3*old-req-min-rx and the session should still be up self.sleep(0.8 * self.vpp_session.detect_mult * old_required_min_rx / USEC_IN_SEC, @@ -1118,8 +1121,8 @@ class BFD4TestCase(VppTestCase): udp_sport_rx += 1 # need to compare the hex payload here, otherwise BFD_vpp_echo # gets in way - self.assertEqual(str(p[UDP].payload), - str(echo_packet[UDP].payload), + self.assertEqual(scapy.compat.raw(p[UDP].payload), + scapy.compat.raw(echo_packet[UDP].payload), "Received packet is not the echo packet sent") self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== " "ECHO packet identifier for test purposes)") @@ -1612,8 +1615,8 @@ class BFD6TestCase(VppTestCase): udp_sport_rx += 1 # need to compare the hex payload here, otherwise BFD_vpp_echo # gets in way - self.assertEqual(str(p[UDP].payload), - str(echo_packet[UDP].payload), + self.assertEqual(scapy.compat.raw(p[UDP].payload), + scapy.compat.raw(echo_packet[UDP].payload), "Received packet is not the echo packet sent") self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== " "ECHO packet identifier for test purposes)") @@ -2293,7 +2296,7 @@ class BFDCLITestCase(VppTestCase): def cli_verify_no_response(self, cli): """ execute a CLI, asserting that the response is empty """ self.assert_equal(self.vapi.cli(cli), - "", + b"", "CLI command response") def cli_verify_response(self, cli, expected): @@ -2325,7 +2328,7 @@ class BFDCLITestCase(VppTestCase): self.cli_verify_no_response( "bfd key set conf-key-id %s type keyed-sha1 secret %s" % (k.conf_key_id, - "".join("{:02x}".format(ord(c)) for c in k.key))) + "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key))) self.assertTrue(k.query_vpp_config()) self.vpp_session = VppBFDUDPSession( self, self.pg0, self.pg0.remote_ip4, sha1_key=k) @@ -2342,7 +2345,7 @@ class BFDCLITestCase(VppTestCase): self.cli_verify_response( "bfd key set conf-key-id %s type keyed-sha1 secret %s" % (k.conf_key_id, - "".join("{:02x}".format(ord(c)) for c in k2.key)), + "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)), "bfd key set: `bfd_auth_set_key' API call failed, " "rv=-103:BFD object in use") # manipulating the session using old secret should still work @@ -2361,7 +2364,7 @@ class BFDCLITestCase(VppTestCase): self.cli_verify_no_response( "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" % (k.conf_key_id, - "".join("{:02x}".format(ord(c)) for c in k.key))) + "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key))) self.assertTrue(k.query_vpp_config()) self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, @@ -2380,7 +2383,7 @@ class BFDCLITestCase(VppTestCase): self.cli_verify_response( "bfd key set conf-key-id %s type keyed-sha1 secret %s" % (k.conf_key_id, - "".join("{:02x}".format(ord(c)) for c in k2.key)), + "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)), "bfd key set: `bfd_auth_set_key' API call failed, " "rv=-103:BFD object in use") # manipulating the session using old secret should still work diff --git a/test/test_bier.py b/test/test_bier.py index 9a9db3b0b6e..58ca3b08ebc 100644 --- a/test/test_bier.py +++ b/test/test_bier.py @@ -12,6 +12,7 @@ from vpp_bier import BIER_HDR_PAYLOAD, VppBierImp, VppBierDispEntry, \ VppBierDispTable, VppBierTable, VppBierTableID, VppBierRoute from vpp_udp_encap import VppUdpEncap +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP @@ -122,10 +123,11 @@ class TestBier(VppTestCase): for pkt_size in pkt_sizes: p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / MPLS(label=77, ttl=255) / - BIER(length=hdr_len_id, BitString=chr(255)*n_bytes) / + BIER(length=hdr_len_id, + BitString=scapy.compat.chb(255)*n_bytes) / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6) / UDP(sport=1234, dport=1234) / - Raw(chr(5) * pkt_size)) + Raw(scapy.compat.chb(5) * pkt_size)) pkts = p self.pg0.add_stream(pkts) @@ -159,18 +161,18 @@ class TestBier(VppTestCase): self.assertEqual(bier_hdr.Proto, 5) # The bit-string should consist only of the BP given by i. - byte_array = ['\0'] * (n_bytes) - byte_val = chr(1 << (bp - 1) % 8) - byte_pos = n_bytes - (((bp - 1) / 8) + 1) + byte_array = [b'\0'] * (n_bytes) + byte_val = scapy.compat.chb(1 << (bp - 1) % 8) + byte_pos = n_bytes - (((bp - 1) // 8) + 1) byte_array[byte_pos] = byte_val - bitstring = ''.join(byte_array) + bitstring = ''.join([scapy.compat.chb(x) for x in byte_array]) self.assertEqual(len(bitstring), len(bier_hdr.BitString)) self.assertEqual(bitstring, bier_hdr.BitString) # # cleanup. not strictly necessary, but it's much quicker this way - # becuase the bier_fib_dump and ip_fib_dump will be empty when the + # because the bier_fib_dump and ip_fib_dump will be empty when the # auto-cleanup kicks in # for br in bier_routes: @@ -222,7 +224,7 @@ class TestBier(VppTestCase): MPLS(label=77, ttl=255) / BIER(length=BIERLength.BIER_LEN_64, entropy=ii, - BitString=chr(255)*16) / + BitString=scapy.compat.chb(255)*16) / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6) / UDP(sport=1234, dport=1234) / @@ -334,7 +336,7 @@ class TestBier(VppTestCase): # # An imposition object with both bit-positions set # - bi = VppBierImp(self, bti, 333, chr(0x3) * 32) + bi = VppBierImp(self, bti, 333, scapy.compat.chb(0x3) * 32) bi.add_vpp_config() # @@ -446,7 +448,7 @@ class TestBier(VppTestCase): p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / MPLS(label=77, ttl=255) / BIER(length=BIERLength.BIER_LEN_256, - BitString=chr(255)*32, + BitString=scapy.compat.chb(255)*32, BFRID=99) / IP(src="1.1.1.1", dst="232.1.1.1") / UDP(sport=1234, dport=1234) / @@ -460,7 +462,7 @@ class TestBier(VppTestCase): p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / MPLS(label=77, ttl=255) / BIER(length=BIERLength.BIER_LEN_256, - BitString=chr(255)*32, + BitString=scapy.compat.chb(255)*32, BFRID=77) / IP(src="1.1.1.1", dst="232.1.1.1") / UDP(sport=1234, dport=1234) / @@ -486,7 +488,7 @@ class TestBier(VppTestCase): # A multicast route to forward post BIER disposition that needs # a check against sending back into the BIER core # - bi = VppBierImp(self, bti, 333, chr(0x3) * 32) + bi = VppBierImp(self, bti, 333, scapy.compat.chb(0x3) * 32) bi.add_vpp_config() route_eg_232_1_1_2 = VppIpMRoute( @@ -506,7 +508,7 @@ class TestBier(VppTestCase): p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / MPLS(label=77, ttl=255) / BIER(length=BIERLength.BIER_LEN_256, - BitString=chr(255)*32, + BitString=scapy.compat.chb(255)*32, BFRID=77) / IP(src="1.1.1.1", dst="232.1.1.2") / UDP(sport=1234, dport=1234) / @@ -523,10 +525,10 @@ class TestBier(VppTestCase): bt = VppBierTable(self, bti, 77) bt.add_vpp_config() - lowest = ['\0'] * (n_bytes) - lowest[-1] = chr(1) - highest = ['\0'] * (n_bytes) - highest[0] = chr(128) + lowest = [b'\0'] * (n_bytes) + lowest[-1] = scapy.compat.chb(1) + highest = [b'\0'] * (n_bytes) + highest[0] = scapy.compat.chb(128) # # Impostion Sets bit strings @@ -636,7 +638,7 @@ class TestBier(VppTestCase): src=self.pg0.remote_mac) / IP(src="1.1.1.1", dst="232.1.1.1") / UDP(sport=1234, dport=1234) / - Raw(chr(5) * 32)) + Raw(scapy.compat.chb(5) * 32)) rx = self.send_and_expect(self.pg0, p*65, self.pg1) @@ -647,7 +649,7 @@ class TestBier(VppTestCase): src=self.pg0.remote_mac) / IP(src="1.1.1.1", dst="232.1.1.2") / UDP(sport=1234, dport=1234) / - Raw(chr(5) * 512)) + Raw(scapy.compat.chb(5) * 512)) rx = self.send_and_expect(self.pg0, p*65, self.pg1) self.assertEqual(rx[0][IP].src, "1.1.1.1") @@ -716,9 +718,9 @@ class TestBier(VppTestCase): # only use the second, but creating 2 tests with a non-zero # value index in the route add # - bi = VppBierImp(self, bti, 333, chr(0xff) * 32) + bi = VppBierImp(self, bti, 333, scapy.compat.chb(0xff) * 32) bi.add_vpp_config() - bi2 = VppBierImp(self, bti, 334, chr(0xff) * 32) + bi2 = VppBierImp(self, bti, 334, scapy.compat.chb(0xff) * 32) bi2.add_vpp_config() # @@ -821,7 +823,7 @@ class TestBier(VppTestCase): UDP(sport=333, dport=8138) / BIFT(sd=1, set=0, bsl=2, ttl=255) / BIER(length=BIERLength.BIER_LEN_256, - BitString=chr(255)*32, + BitString=scapy.compat.chb(255)*32, BFRID=99) / IP(src="1.1.1.1", dst="232.1.1.1") / UDP(sport=1234, dport=1234) / diff --git a/test/test_cdp.py b/test/test_cdp.py index e8ced7cb796..1996c5141f0 100644 --- a/test/test_cdp.py +++ b/test/test_cdp.py @@ -71,6 +71,10 @@ class TestCDP(VppTestCase): super(TestCDP, cls).tearDownClass() raise + @classmethod + def tearDownClass(cls): + super(TestCDP, cls).tearDownClass() + def test_enable_cdp(self): self.logger.info(self.vapi.cli("cdp enable")) ret = self.vapi.cli("show cdp") diff --git a/test/test_dhcp.py b/test/test_dhcp.py index 2efa9a7e244..62db66282b6 100644 --- a/test/test_dhcp.py +++ b/test/test_dhcp.py @@ -8,6 +8,7 @@ from framework import VppTestCase, VppTestRunner, running_extended_tests from vpp_neighbor import VppNeighbor from vpp_ip_route import find_route, VppIpTable from util import mk_ll_addr +import scapy.compat from scapy.layers.l2 import Ether, getmacbyip, ARP from scapy.layers.inet import IP, UDP, ICMP from scapy.layers.inet6 import IPv6, in6_getnsmac @@ -490,7 +491,7 @@ class TestDHCP(VppTestCase): # # 1. not our IP address = not checked by VPP? so offer is replayed # to client - bad_ip = option_82[0:8] + chr(33) + option_82[9:] + bad_ip = option_82[0:8] + scapy.compat.chb(33) + option_82[9:] p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / @@ -504,7 +505,7 @@ class TestDHCP(VppTestCase): "DHCP offer option 82 bad address") # 2. Not a sw_if_index VPP knows - bad_if_index = option_82[0:2] + chr(33) + option_82[3:] + bad_if_index = option_82[0:2] + scapy.compat.chb(33) + option_82[3:] p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / diff --git a/test/test_gbp.py b/test/test_gbp.py index 43043033a45..a5f053dcacc 100644 --- a/test/test_gbp.py +++ b/test/test_gbp.py @@ -3207,7 +3207,7 @@ class TestGBP(VppTestCase): vx_tun_l3.add_vpp_config() # - # packets destined to unkown addresses in the BVI's subnet + # packets destined to unknown addresses in the BVI's subnet # are ARP'd for # p4 = (Ether(src=self.pg0.remote_mac, dst=str(self.router_mac)) / diff --git a/test/test_gre.py b/test/test_gre.py index ccd66c428e8..694f819308e 100644 --- a/test/test_gre.py +++ b/test/test_gre.py @@ -2,6 +2,7 @@ import unittest +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q, GRE from scapy.layers.inet import IP, UDP @@ -146,7 +147,8 @@ class TestGRE(VppTestCase): GRE() / Ether(dst=RandMAC('*:*:*:*:*:*'), src=RandMAC('*:*:*:*:*:*')) / - IP(src=str(RandIP()), dst=str(RandIP())) / + IP(src=scapy.compat.raw(RandIP()), + dst=scapy.compat.raw(RandIP())) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() @@ -165,7 +167,8 @@ class TestGRE(VppTestCase): Ether(dst=RandMAC('*:*:*:*:*:*'), src=RandMAC('*:*:*:*:*:*')) / Dot1Q(vlan=vlan) / - IP(src=str(RandIP()), dst=str(RandIP())) / + IP(src=scapy.compat.raw(RandIP()), + dst=scapy.compat.raw(RandIP())) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() @@ -217,7 +220,7 @@ class TestGRE(VppTestCase): self.assertEqual(rx_ip.src, tunnel_src) self.assertEqual(rx_ip.dst, tunnel_dst) - rx_gre = GRE(str(rx_ip[IPv6].payload)) + rx_gre = GRE(scapy.compat.raw(rx_ip[IPv6].payload)) rx_ip = rx_gre[IPv6] self.assertEqual(rx_ip.src, tx_ip.src) @@ -243,7 +246,7 @@ class TestGRE(VppTestCase): self.assertEqual(rx_ip.src, tunnel_src) self.assertEqual(rx_ip.dst, tunnel_dst) - rx_gre = GRE(str(rx_ip[IPv6].payload)) + rx_gre = GRE(scapy.compat.raw(rx_ip[IPv6].payload)) tx_ip = tx[IP] rx_ip = rx_gre[IP] @@ -270,7 +273,7 @@ class TestGRE(VppTestCase): self.assertEqual(rx_ip.src, tunnel_src) self.assertEqual(rx_ip.dst, tunnel_dst) - rx_gre = GRE(str(rx_ip[IP].payload)) + rx_gre = GRE(scapy.compat.raw(rx_ip[IP].payload)) rx_ip = rx_gre[IPv6] tx_ip = tx[IPv6] diff --git a/test/test_ip4.py b/test/test_ip4.py index 492e96a99e3..57005f79355 100644 --- a/test/test_ip4.py +++ b/test/test_ip4.py @@ -4,6 +4,7 @@ import random import socket import unittest +import scapy.compat from scapy.contrib.mpls import MPLS from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes from scapy.layers.l2 import Ether, Dot1Q, ARP @@ -310,7 +311,7 @@ class TestIPv4FibCrud(VppTestCase): dest_addr_len = 32 n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr) for _ in range(count): - n_dest_addr = '{:08x}'.format(dest_addr).decode('hex') + n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr)) self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len, n_next_hop_addr) added_ips.append(socket.inet_ntoa(n_dest_addr)) @@ -325,7 +326,7 @@ class TestIPv4FibCrud(VppTestCase): dest_addr_len = 32 n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr) for _ in range(count): - n_dest_addr = '{:08x}'.format(dest_addr).decode('hex') + n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr)) self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len, n_next_hop_addr, is_add=0) removed_ips.append(socket.inet_ntoa(n_dest_addr)) diff --git a/test/test_ip4_vrf_multi_instance.py b/test/test_ip4_vrf_multi_instance.py index 71532adf364..6d0f21d8bc9 100644 --- a/test/test_ip4_vrf_multi_instance.py +++ b/test/test_ip4_vrf_multi_instance.py @@ -66,6 +66,7 @@ import unittest import random import socket +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, ARP @@ -344,7 +345,8 @@ class TestIp4VrfMultiInst(VppTestCase): if found: break for host in pg_if.remote_hosts: - if str(addr) == str(host.ip4): + if scapy.compat.raw(addr) == \ + scapy.compat.raw(host.ip4): vrf_count += 1 found = True break diff --git a/test/test_ip6.py b/test/test_ip6.py index 1d52edb7721..07e3c7834c5 100644 --- a/test/test_ip6.py +++ b/test/test_ip6.py @@ -4,6 +4,7 @@ import socket import unittest from parameterized import parameterized +import scapy.compat import scapy.layers.inet6 as inet6 from scapy.contrib.mpls import MPLS from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \ diff --git a/test/test_ipsec_nat.py b/test/test_ipsec_nat.py index cdb9cb438f2..e364f5ffe96 100644 --- a/test/test_ipsec_nat.py +++ b/test/test_ipsec_nat.py @@ -2,9 +2,11 @@ import socket +import scapy.compat from scapy.layers.l2 import Ether from scapy.layers.inet import ICMP, IP, TCP, UDP from scapy.layers.ipsec import SecurityAssociation, ESP + from util import ppp, ppc from template_ipsec import TemplateIpsec from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\ @@ -126,9 +128,9 @@ class IPSecNATTestCase(TemplateIpsec): def verify_capture_encrypted(self, capture, sa): for packet in capture: try: - copy = packet.__class__(str(packet)) + copy = packet.__class__(scapy.compat.raw(packet)) del copy[UDP].len - copy = packet.__class__(str(copy)) + copy = packet.__class__(scapy.compat.raw(copy)) self.assert_equal(packet[UDP].len, copy[UDP].len, "UDP header length") self.assert_packet_checksums_valid(packet) diff --git a/test/test_lb.py b/test/test_lb.py index 86e7cb46ad3..fa179504855 100644 --- a/test/test_lb.py +++ b/test/test_lb.py @@ -1,5 +1,6 @@ import socket +import scapy.compat from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 from scapy.layers.l2 import Ether, GRE @@ -93,11 +94,12 @@ class TestLB(VppTestCase): self.assertEqual(gre.proto, 0x0800 if isv4 else 0x86DD) self.assertEqual(gre.flags, 0) self.assertEqual(gre.version, 0) - inner = IPver(str(gre.payload)) + inner = IPver(scapy.compat.raw(gre.payload)) payload_info = self.payload_to_info(inner[Raw]) self.info = self.packet_infos[payload_info.index] self.assertEqual(payload_info.src, self.pg0.sw_if_index) - self.assertEqual(str(inner), str(self.info.data[IPver])) + self.assertEqual(scapy.compat.raw(inner), + scapy.compat.raw(self.info.data[IPver])) def checkCapture(self, encap, isv4): self.pg0.assert_nothing_captured() @@ -135,7 +137,7 @@ class TestLB(VppTestCase): ) self.assertEqual(ip.nh, 47) # self.assertEqual(len(ip.options), 0) - gre = GRE(str(p[IPv6].payload)) + gre = GRE(scapy.compat.raw(p[IPv6].payload)) self.checkInner(gre, isv4) elif (encap == 'l3dsr'): ip = p[IP] @@ -174,7 +176,7 @@ class TestLB(VppTestCase): ) self.assertEqual(ip.nh, 17) self.assertGreaterEqual(ip.hlim, 63) - udp = UDP(str(p[IPv6].payload)) + udp = UDP(scapy.compat.raw(p[IPv6].payload)) self.assertEqual(udp.dport, 3307) load[asid] += 1 except: diff --git a/test/test_map.py b/test/test_map.py index 952a737c1ac..1655ceda361 100644 --- a/test/test_map.py +++ b/test/test_map.py @@ -5,6 +5,8 @@ import unittest from framework import VppTestCase, VppTestRunner from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath + +import scapy.compat from scapy.layers.l2 import Ether, Raw from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded @@ -187,7 +189,7 @@ class TestMAP(VppTestCase): self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1") def validate(self, rx, expected): - self.assertEqual(rx, expected.__class__(str(expected))) + self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected))) def payload(self, len): return 'x' * len @@ -369,7 +371,7 @@ class TestMAP(VppTestCase): p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f") / payload) p6_translated.hlim -= 1 - p6_translated['TCP'].options = [('MSS', 1300)] + p6_translated[TCP].options = [('MSS', 1300)] rx = self.send_and_expect(self.pg0, p4*1, self.pg1) for p in rx: self.validate(p[1], p6_translated) @@ -383,7 +385,7 @@ class TestMAP(VppTestCase): dst=self.pg0.remote_ip4) / payload) p4_translated.id = 0 p4_translated.ttl -= 1 - p4_translated['TCP'].options = [('MSS', 1300)] + p4_translated[TCP].options = [('MSS', 1300)] rx = self.send_and_expect(self.pg1, p6*1, self.pg0) for p in rx: self.validate(p[1], p4_translated) diff --git a/test/test_mpls.py b/test/test_mpls.py index e7fb288f856..3198d905327 100644 --- a/test/test_mpls.py +++ b/test/test_mpls.py @@ -11,6 +11,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \ VppMplsLabel, MplsLspMode, find_mpls_route from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, ICMP @@ -1873,7 +1874,7 @@ class TestMPLSL2(VppTestCase): verify_mpls_stack(self, rx, mpls_labels) tx_eth = tx[Ether] - rx_eth = Ether(str(rx[MPLS].payload)) + rx_eth = Ether(scapy.compat.raw(rx[MPLS].payload)) self.assertEqual(rx_eth.src, tx_eth.src) self.assertEqual(rx_eth.dst, tx_eth.dst) diff --git a/test/test_nat.py b/test/test_nat.py index 0ef30267088..160890a078e 100644 --- a/test/test_nat.py +++ b/test/test_nat.py @@ -6,6 +6,8 @@ import struct import random from framework import VppTestCase, VppTestRunner, running_extended_tests + +import scapy.compat from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \ @@ -320,7 +322,8 @@ class MethodHolder(VppTestCase): pref_n[13] = ip4_n[1] pref_n[14] = ip4_n[2] pref_n[15] = ip4_n[3] - return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n)) + packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n]) + return socket.inet_ntop(socket.AF_INET6, packed_pref_n) def extract_ip4(self, ip6, plen): """ @@ -689,8 +692,8 @@ class MethodHolder(VppTestCase): p = (IP(src=src_if.remote_ip4, dst=dst) / TCP(sport=sport, dport=dport) / Raw(data)) - p = p.__class__(str(p)) - chksum = p['TCP'].chksum + p = p.__class__(scapy.compat.raw(p)) + chksum = p[TCP].chksum proto_header = TCP(sport=sport, dport=dport, chksum=chksum) elif proto == IP_PROTOS.udp: proto_header = UDP(sport=sport, dport=dport) @@ -869,8 +872,8 @@ class MethodHolder(VppTestCase): self.assertEqual(6, len(data)) for record in data: # natEvent - self.assertIn(ord(record[230]), [4, 5]) - if ord(record[230]) == 4: + self.assertIn(scapy.compat.orb(record[230]), [4, 5]) + if scapy.compat.orb(record[230]) == 4: nat44_ses_create_num += 1 else: nat44_ses_delete_num += 1 @@ -882,16 +885,16 @@ class MethodHolder(VppTestCase): # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort - if IP_PROTOS.icmp == ord(record[4]): + if IP_PROTOS.icmp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7]) self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227]) - elif IP_PROTOS.tcp == ord(record[4]): + elif IP_PROTOS.tcp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7]) self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227]) - elif IP_PROTOS.udp == ord(record[4]): + elif IP_PROTOS.udp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.udp_port_in), record[7]) self.assertEqual(struct.pack("!H", self.udp_port_out), @@ -910,7 +913,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 3) + self.assertEqual(scapy.compat.orb(record[230]), 3) # natPoolID self.assertEqual(struct.pack("!I", 0), record[283]) @@ -924,7 +927,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 1), record[466]) # maxSessionEntries @@ -940,7 +943,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 2), record[466]) # maxBIBEntries @@ -957,7 +960,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 5), record[466]) # maxFragmentsPendingReassembly @@ -976,7 +979,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 5), record[466]) # maxFragmentsPendingReassembly @@ -996,15 +999,15 @@ class MethodHolder(VppTestCase): record = data[0] # natEvent if is_create: - self.assertEqual(ord(record[230]), 10) + self.assertEqual(scapy.compat.orb(record[230]), 10) else: - self.assertEqual(ord(record[230]), 11) + self.assertEqual(scapy.compat.orb(record[230]), 11) # sourceIPv6Address self.assertEqual(src_addr, record[27]) # postNATSourceIPv4Address self.assertEqual(self.nat_addr_n, record[225]) # protocolIdentifier - self.assertEqual(IP_PROTOS.tcp, ord(record[4])) + self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4])) # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # sourceTransportPort @@ -1027,9 +1030,9 @@ class MethodHolder(VppTestCase): record = data[0] # natEvent if is_create: - self.assertEqual(ord(record[230]), 6) + self.assertEqual(scapy.compat.orb(record[230]), 6) else: - self.assertEqual(ord(record[230]), 7) + self.assertEqual(scapy.compat.orb(record[230]), 7) # sourceIPv6Address self.assertEqual(src_addr, record[27]) # destinationIPv6Address @@ -1044,7 +1047,7 @@ class MethodHolder(VppTestCase): self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr), record[226]) # protocolIdentifier - self.assertEqual(IP_PROTOS.tcp, ord(record[4])) + self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4])) # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # sourceTransportPort @@ -1076,7 +1079,7 @@ class MethodHolder(VppTestCase): self.assertEqual(1, len(data)) record = data[0] # natEvent - self.assertEqual(ord(record[230]), 13) + self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("I", 3), record[466]) # maxEntriesPerUser @@ -1167,9 +1170,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = random.randint(1025, 65535) reass = self.vapi.nat_reass_dump() @@ -1248,9 +1251,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = random.randint(1025, 65535) for i in range(2): @@ -1317,9 +1320,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 # send packet from host to server pkts = self.create_stream_frag(self.pg0, @@ -1346,9 +1349,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = random.randint(1025, 65535) for i in range(2): @@ -1422,9 +1425,9 @@ class MethodHolder(VppTestCase): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = random.randint(1025, 65535) for i in range(2): @@ -3539,7 +3542,7 @@ class TestNAT44(MethodHolder): is_inside=0) self.vapi.nat44_forwarding_enable_disable(1) - data = "A" * 16 + "B" * 16 + "C" * 3 + data = b"A" * 16 + b"B" * 16 + b"C" * 3 pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, 4789, @@ -3667,7 +3670,7 @@ class TestNAT44(MethodHolder): self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port) - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 self.tcp_port_in = random.randint(1025, 65535) pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, @@ -8043,7 +8046,7 @@ class TestNAT64(MethodHolder): reass_n_start = len(reass) # in2out - data = 'a' * 200 + data = b'a' * 200 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data) self.pg0.add_stream(pkts) @@ -8059,7 +8062,7 @@ class TestNAT64(MethodHolder): self.assertEqual(data, p[Raw].load) # out2in - data = "A" * 4 + "b" * 16 + "C" * 3 + data = b"A" * 4 + b"b" * 16 + b"C" * 3 pkts = self.create_stream_frag(self.pg1, self.nat_addr, 20, @@ -8127,7 +8130,7 @@ class TestNAT64(MethodHolder): self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0) # in2out - data = 'a' * 200 + data = b'a' * 200 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data) pkts.reverse() @@ -8144,7 +8147,7 @@ class TestNAT64(MethodHolder): self.assertEqual(data, p[Raw].load) # out2in - data = "A" * 4 + "B" * 16 + "C" * 3 + data = b"A" * 4 + b"B" * 16 + b"C" * 3 pkts = self.create_stream_frag(self.pg1, self.nat_addr, 20, @@ -8283,7 +8286,7 @@ class TestNAT64(MethodHolder): self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port) - data = 'a' * 200 + data = b'a' * 200 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data) pkts.reverse() @@ -8358,9 +8361,9 @@ class TestNAT64(MethodHolder): for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) - if ord(data[0][230]) == 10: + if scapy.compat.orb(data[0][230]) == 10: self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n) - elif ord(data[0][230]) == 6: + elif scapy.compat.orb(data[0][230]) == 6: self.verify_ipfix_nat64_ses(data, 1, self.pg0.remote_ip6n, @@ -8387,9 +8390,9 @@ class TestNAT64(MethodHolder): self.ipfix_domain_id) if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) - if ord(data[0][230]) == 11: + if scapy.compat.orb(data[0][230]) == 11: self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n) - elif ord(data[0][230]) == 7: + elif scapy.compat.orb(data[0][230]) == 7: self.verify_ipfix_nat64_ses(data, 0, self.pg0.remote_ip6n, diff --git a/test/test_neighbor.py b/test/test_neighbor.py index c378cff4e13..aec48712921 100644 --- a/test/test_neighbor.py +++ b/test/test_neighbor.py @@ -9,6 +9,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, \ VppIpTable, DpoProto from vpp_papi import VppEnum +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP, Dot1Q from scapy.layers.inet import IP, UDP @@ -1178,8 +1179,9 @@ class ARPTestCase(VppTestCase): # # change the interface's MAC # - mac = [chr(0x00), chr(0x00), chr(0x00), - chr(0x33), chr(0x33), chr(0x33)] + mac = [scapy.compat.chb(0x00), scapy.compat.chb(0x00), + scapy.compat.chb(0x00), scapy.compat.chb(0x33), + scapy.compat.chb(0x33), scapy.compat.chb(0x33)] mac_string = ''.join(mac) self.vapi.sw_interface_set_mac_address(self.pg1.sw_if_index, diff --git a/test/test_punt.py b/test/test_punt.py index 7959b981837..e265dc01f9b 100644 --- a/test/test_punt.py +++ b/test/test_punt.py @@ -11,6 +11,7 @@ import struct from struct import unpack, unpack_from from util import ppp, ppc from re import compile +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, ICMP @@ -20,12 +21,12 @@ from framework import VppTestCase, VppTestRunner # Format MAC Address def get_mac_addr(bytes_addr): - return ':'.join('%02x' % ord(b) for b in bytes_addr) + return ':'.join('%02x' % scapy.compat.orb(b) for b in bytes_addr) # Format IP Address def ipv4(bytes_addr): - return '.'.join('%d' % ord(b) for b in bytes_addr) + return '.'.join('%d' % scapy.compat.orb(b) for b in bytes_addr) # Unpack Ethernet Frame diff --git a/test/test_qos.py b/test/test_qos.py index 90ce4063348..38f4aafcb11 100644 --- a/test/test_qos.py +++ b/test/test_qos.py @@ -9,6 +9,7 @@ from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \ VppMplsLabel, VppMplsTable +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet import IP, UDP @@ -50,10 +51,10 @@ class TestQOS(VppTestCase): # for table 1 map the n=0xff possible values of input QoS mark, # n to 1-n # - output = [chr(0)] * 256 + output = [scapy.compat.chb(0)] * 256 for i in range(0, 255): - output[i] = chr(255 - i) - os = ''.join(output) + output[i] = scapy.compat.chb(255 - i) + os = b''.join(output) rows = [{'outputs': os}, {'outputs': os}, {'outputs': os}, @@ -64,8 +65,8 @@ class TestQOS(VppTestCase): # # For table 2 (and up) use the value n for everything # - output = [chr(2)] * 256 - os = ''.join(output) + output = [scapy.compat.chb(2)] * 256 + os = b''.join(output) rows = [{'outputs': os}, {'outputs': os}, {'outputs': os}, @@ -73,8 +74,8 @@ class TestQOS(VppTestCase): self.vapi.qos_egress_map_update(2, rows) - output = [chr(3)] * 256 - os = ''.join(output) + output = [scapy.compat.chb(3)] * 256 + os = b''.join(output) rows = [{'outputs': os}, {'outputs': os}, {'outputs': os}, @@ -82,8 +83,8 @@ class TestQOS(VppTestCase): self.vapi.qos_egress_map_update(3, rows) - output = [chr(4)] * 256 - os = ''.join(output) + output = [scapy.compat.chb(4)] * 256 + os = b''.join(output) rows = [{'outputs': os}, {'outputs': os}, {'outputs': os}, @@ -121,12 +122,12 @@ class TestQOS(VppTestCase): p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6, tc=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) # # Since we have not yet enabled the recording of the input QoS @@ -282,14 +283,14 @@ class TestQOS(VppTestCase): from_ip = 6 from_mpls = 5 from_vlan = 4 - output = [chr(from_ext)] * 256 - os1 = ''.join(output) - output = [chr(from_vlan)] * 256 - os2 = ''.join(output) - output = [chr(from_mpls)] * 256 - os3 = ''.join(output) - output = [chr(from_ip)] * 256 - os4 = ''.join(output) + output = [scapy.compat.chb(from_ext)] * 256 + os1 = b''.join(output) + output = [scapy.compat.chb(from_vlan)] * 256 + os2 = b''.join(output) + output = [scapy.compat.chb(from_mpls)] * 256 + os3 = b''.join(output) + output = [scapy.compat.chb(from_ip)] * 256 + os4 = b''.join(output) rows = [{'outputs': os1}, {'outputs': os2}, {'outputs': os3}, @@ -333,11 +334,11 @@ class TestQOS(VppTestCase): p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1) @@ -388,7 +389,7 @@ class TestQOS(VppTestCase): MPLS(label=32, cos=3, ttl=2) / IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1) for p in rx: @@ -414,7 +415,7 @@ class TestQOS(VppTestCase): MPLS(label=33, ttl=2, cos=3) / IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1) @@ -446,10 +447,10 @@ class TestQOS(VppTestCase): # # QoS for all input values # - output = [chr(0)] * 256 + output = [scapy.compat.chb(0)] * 256 for i in range(0, 255): - output[i] = chr(255 - i) - os = ''.join(output) + output[i] = scapy.compat.chb(255 - i) + os = b''.join(output) rows = [{'outputs': os}, {'outputs': os}, {'outputs': os}, @@ -515,12 +516,12 @@ class TestQOS(VppTestCase): Dot1Q(vlan=11, prio=1) / IP(src="1.1.1.1", dst="10.0.0.2", tos=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src="1.1.1.1", dst="10.0.0.1", tos=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0) @@ -536,12 +537,12 @@ class TestQOS(VppTestCase): Dot1Q(vlan=11, prio=2) / IPv6(src="2001::1", dst="2001::2", tc=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IPv6(src="3001::1", dst="2001::1", tc=1) / UDP(sport=1234, dport=1234) / - Raw(chr(100) * 65)) + Raw(scapy.compat.chb(100) * 65)) rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0) diff --git a/test/test_reassembly.py b/test/test_reassembly.py index 8004f362f3d..f3d983dc303 100644 --- a/test/test_reassembly.py +++ b/test/test_reassembly.py @@ -5,6 +5,7 @@ import six import unittest from parameterized import parameterized +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, GRE from scapy.layers.inet import IP, UDP, ICMP @@ -258,7 +259,8 @@ class TestIPv4Reassembly(TestIPReassemblyMixin, VppTestCase): cls.pkt_infos = [] for index, info in six.iteritems(infos): p = info.data - # cls.logger.debug(ppp("Packet:", p.__class__(str(p)))) + # cls.logger.debug(ppp("Packet:", + # p.__class__(scapy.compat.raw(p)))) fragments_400 = fragment_rfc791(p, 400) fragments_300 = fragment_rfc791(p, 300) fragments_200 = [ @@ -589,7 +591,8 @@ class TestIPv6Reassembly(TestIPReassemblyMixin, VppTestCase): cls.pkt_infos = [] for index, info in six.iteritems(infos): p = info.data - # cls.logger.debug(ppp("Packet:", p.__class__(str(p)))) + # cls.logger.debug(ppp("Packet:", + # p.__class__(scapy.compat.raw(p)))) fragments_400 = fragment_rfc8200(p, info.index, 400) fragments_300 = fragment_rfc8200(p, info.index, 300) cls.pkt_infos.append((index, fragments_400, fragments_300)) @@ -772,7 +775,7 @@ class TestIPv6Reassembly(TestIPReassemblyMixin, VppTestCase): Raw()) self.extend_packet(p, 1000, self.padding) fragments = fragment_rfc8200(p, 1, 500) - bad_fragment = p.__class__(str(fragments[1])) + bad_fragment = p.__class__(scapy.compat.raw(fragments[1])) bad_fragment[IPv6ExtHdrFragment].nh = 59 bad_fragment[IPv6ExtHdrFragment].offset = 0 self.pg_enable_capture() @@ -880,7 +883,8 @@ class TestIPv4ReassemblyLocalNode(VppTestCase): cls.pkt_infos = [] for index, info in six.iteritems(infos): p = info.data - # cls.logger.debug(ppp("Packet:", p.__class__(str(p)))) + # cls.logger.debug(ppp("Packet:", + # p.__class__(scapy.compat.raw(p)))) fragments_300 = fragment_rfc791(p, 300) cls.pkt_infos.append((index, fragments_300)) cls.fragments_300 = [x for (_, frags) in cls.pkt_infos for x in frags] diff --git a/test/test_srv6.py b/test/test_srv6.py index 2ea9da76088..cc53c6b1e79 100644 --- a/test/test_srv6.py +++ b/test/test_srv6.py @@ -9,6 +9,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \ SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting @@ -1437,7 +1438,7 @@ class TestSRv6(VppTestCase): tx_ip.chksum = None # read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - tx_ip = IP(str(tx_ip)) + tx_ip = IP(scapy.compat.raw(tx_ip)) self.assertEqual(rx_srh.payload, tx_ip) @@ -1489,7 +1490,7 @@ class TestSRv6(VppTestCase): self.assertEqual(rx_srh.nh, 59) # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt - self.assertEqual(Ether(str(rx_srh.payload)), tx_ether) + self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether) self.logger.debug("packet verification: SUCCESS") @@ -1765,7 +1766,7 @@ class TestSRv6(VppTestCase): tx_ip2.chksum = None # read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - tx_ip2 = IP(str(tx_ip2)) + tx_ip2 = IP(scapy.compat.raw(tx_ip2)) self.assertEqual(rx_ip, tx_ip2) @@ -1791,7 +1792,7 @@ class TestSRv6(VppTestCase): tx_ip = tx_pkt.getlayer(IPv6) # we can't just get the 2nd Ether layer # get the Raw content and dissect it as Ether - tx_eth1 = Ether(str(tx_pkt[Raw])) + tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw])) # verify if rx'ed packet has no SRH self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) @@ -1837,7 +1838,7 @@ class TestSRv6(VppTestCase): # read back the dumped packet (with str()) # to force computing these fields # probably other ways are possible - p = Ether(str(p)) + p = Ether(scapy.compat.raw(p)) payload_info.data = p.copy() self.logger.debug(ppp("Created packet:", p)) pkts.append(p) @@ -2087,7 +2088,7 @@ class TestSRv6(VppTestCase): # take packet[Raw], convert it to an Ether layer # and then extract Raw from it payload_info = self.payload_to_info( - str(Ether(str(packet[Raw]))[Raw])) + Ether(scapy.compat.r(packet[Raw]))[Raw]) return payload_info @@ -2101,7 +2102,7 @@ class TestSRv6(VppTestCase): :param compare_func: function to compare in and out packet """ self.logger.info("Verifying capture on interface %s using function %s" - % (dst_if.name, compare_func.func_name)) + % (dst_if.name, compare_func.__name__)) last_info = dict() for i in self.pg_interfaces: diff --git a/test/test_srv6_ad.py b/test/test_srv6_ad.py index 728ecd8a796..40cc1906612 100644 --- a/test/test_srv6_ad.py +++ b/test/test_srv6_ad.py @@ -10,6 +10,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \ SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting @@ -363,7 +364,7 @@ class TestSRv6(VppTestCase): tx_ip2.chksum = None # read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - tx_ip2 = IP(str(tx_ip2)) + tx_ip2 = IP(scapy.compat.raw(tx_ip2)) self.assertEqual(rx_ip, tx_ip2) @@ -405,7 +406,7 @@ class TestSRv6(VppTestCase): tx_ip.chksum = None # -> read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - self.assertEqual(rx_srh.payload, IP(str(tx_ip))) + self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip))) self.logger.debug("packet verification: SUCCESS") @@ -495,7 +496,7 @@ class TestSRv6(VppTestCase): tx_ip = tx_pkt.getlayer(IPv6) # we can't just get the 2nd Ether layer # get the Raw content and dissect it as Ether - tx_eth1 = Ether(str(tx_pkt[Raw])) + tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw])) # verify if rx'ed packet has no SRH self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) @@ -535,7 +536,7 @@ class TestSRv6(VppTestCase): # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt tx_ether = tx_pkt.getlayer(Ether) - self.assertEqual(Ether(str(rx_srh.payload)), tx_ether) + self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether) self.logger.debug("packet verification: SUCCESS") @@ -573,7 +574,7 @@ class TestSRv6(VppTestCase): # read back the dumped packet (with str()) # to force computing these fields # probably other ways are possible - p = Ether(str(p)) + p = Ether(scapy.compat.raw(p)) payload_info.data = p.copy() self.logger.debug(ppp("Created packet:", p)) pkts.append(p) @@ -736,7 +737,7 @@ class TestSRv6(VppTestCase): # take packet[Raw], convert it to an Ether layer # and then extract Raw from it payload_info = self.payload_to_info( - Ether(str(packet[Raw]))[Raw]) + Ether(scapy.compat.raw(packet[Raw]))[Raw]) return payload_info @@ -750,7 +751,7 @@ class TestSRv6(VppTestCase): :param compare_func: function to compare in and out packet """ self.logger.info("Verifying capture on interface %s using function %s" - % (dst_if.name, compare_func.func_name)) + % (dst_if.name, compare_func.__name__)) last_info = dict() for i in self.pg_interfaces: diff --git a/test/test_srv6_as.py b/test/test_srv6_as.py index 1ff7906e513..6b8e23c8520 100755 --- a/test/test_srv6_as.py +++ b/test/test_srv6_as.py @@ -9,6 +9,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \ SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting @@ -488,7 +489,7 @@ class TestSRv6(VppTestCase): tx_ip.chksum = None # read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - tx_ip = IP(str(tx_ip)) + tx_ip = IP(scapy.compat.raw(tx_ip)) self.assertEqual(payload, tx_ip) @@ -537,7 +538,7 @@ class TestSRv6(VppTestCase): payload = rx_ip.payload # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt - self.assertEqual(Ether(str(payload)), tx_ether) + self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether) self.logger.debug("packet verification: SUCCESS") @@ -590,7 +591,7 @@ class TestSRv6(VppTestCase): tx_ip2.chksum = None # read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - tx_ip2 = IP(str(tx_ip2)) + tx_ip2 = IP(scapy.compat.raw(tx_ip2)) self.assertEqual(rx_ip, tx_ip2) @@ -609,7 +610,7 @@ class TestSRv6(VppTestCase): tx_ip = tx_pkt.getlayer(IPv6) # we can't just get the 2nd Ether layer # get the Raw content and dissect it as Ether - tx_eth1 = Ether(str(tx_pkt[Raw])) + tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw])) # verify if rx'ed packet has no SRH self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) @@ -655,7 +656,7 @@ class TestSRv6(VppTestCase): # read back the dumped packet (with str()) # to force computing these fields # probably other ways are possible - p = Ether(str(p)) + p = Ether(scapy.compat.raw(p)) payload_info.data = p.copy() self.logger.debug(ppp("Created packet:", p)) pkts.append(p) @@ -817,7 +818,7 @@ class TestSRv6(VppTestCase): # take packet[Raw], convert it to an Ether layer # and then extract Raw from it payload_info = self.payload_to_info( - Ether(str(packet[Raw]))[Raw]) + Ether(scapy.compat.raw(packet[Raw]))[Raw]) return payload_info @@ -831,7 +832,7 @@ class TestSRv6(VppTestCase): :param compare_func: function to compare in and out packet """ self.logger.info("Verifying capture on interface %s using function %s" - % (dst_if.name, compare_func.func_name)) + % (dst_if.name, compare_func.__name__)) last_info = dict() for i in self.pg_interfaces: diff --git a/test/util.py b/test/util.py index fc0ebd73500..d27b3fb25e6 100644 --- a/test/util.py +++ b/test/util.py @@ -7,6 +7,7 @@ import six import sys import os.path +import scapy.compat from scapy.layers.l2 import Ether from scapy.layers.inet import IP from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\ @@ -282,7 +283,7 @@ def fragment_rfc791(packet, fragsize, _logger=None): """ logger = LoggerWrapper(_logger) logger.debug(ppp("Fragmenting packet:", packet)) - packet = packet.__class__(str(packet)) # recalculate all values + packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values if len(packet[IP].options) > 0: raise Exception("Not implemented") if len(packet) <= fragsize: @@ -290,7 +291,7 @@ def fragment_rfc791(packet, fragsize, _logger=None): pre_ip_len = len(packet) - len(packet[IP]) ip_header_len = packet[IP].ihl * 4 - hex_packet = str(packet) + hex_packet = scapy.compat.raw(packet) hex_headers = hex_packet[:(pre_ip_len + ip_header_len)] hex_payload = hex_packet[(pre_ip_len + ip_header_len):] @@ -327,7 +328,7 @@ def fragment_rfc8200(packet, identification, fragsize, _logger=None): :returns: list of fragments """ logger = LoggerWrapper(_logger) - packet = packet.__class__(str(packet)) # recalculate all values + packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values if len(packet) <= fragsize: return [packet] logger.debug(ppp("Fragmenting packet:", packet)) @@ -378,7 +379,7 @@ def fragment_rfc8200(packet, identification, fragsize, _logger=None): logger.debug(ppp("Per-fragment headers:", per_fragment_headers)) ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1] - hex_payload = str(ext_and_upper_layer) + hex_payload = scapy.compat.raw(ext_and_upper_layer) logger.debug("Payload length is %s" % len(hex_payload)) logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer)) @@ -407,7 +408,7 @@ def fragment_rfc8200(packet, identification, fragsize, _logger=None): p[IPv6ExtHdrFragment].id = identification p[IPv6ExtHdrFragment].offset = 0 p[IPv6ExtHdrFragment].m = 1 - p = p.__class__(str(p)) + p = p.__class__(scapy.compat.raw(p)) logger.debug(ppp("Fragment %s:" % len(pkts), p)) pkts.append(p) offset = first_payload_len_nfb * 8 @@ -424,7 +425,7 @@ def fragment_rfc8200(packet, identification, fragsize, _logger=None): p[IPv6ExtHdrFragment].id = identification p[IPv6ExtHdrFragment].offset = offset / 8 p[IPv6ExtHdrFragment].m = 1 - p = p.__class__(str(p)) + p = p.__class__(scapy.compat.raw(p)) logger.debug(ppp("Fragment %s:" % len(pkts), p)) pkts.append(p) offset = offset + l_nfb * 8 diff --git a/test/vpp_pg_interface.py b/test/vpp_pg_interface.py index b22a93388a6..8792e6c3767 100644 --- a/test/vpp_pg_interface.py +++ b/test/vpp_pg_interface.py @@ -3,6 +3,8 @@ import time import socket import struct from traceback import format_exc, format_stack + +import scapy.compat from scapy.utils import wrpcap, rdpcap, PcapReader from scapy.plist import PacketList from vpp_interface import VppInterface @@ -414,7 +416,7 @@ class VppPGInterface(VppInterface): # Make Dot1AD packet content recognizable to scapy if arp_reply.type == 0x88a8: arp_reply.type = 0x8100 - arp_reply = Ether(str(arp_reply)) + arp_reply = Ether(scapy.compat.raw(arp_reply)) try: if arp_reply[ARP].op == ARP.is_at: self.test.logger.info("VPP %s MAC address is %s " % @@ -460,8 +462,11 @@ class VppPGInterface(VppInterface): ndp_reply = captured_packet.copy() # keep original for exception # Make Dot1AD packet content recognizable to scapy if ndp_reply.type == 0x88a8: + self._test.logger.info( + "Replacing EtherType: 0x88a8 with " + "0x8100 and regenerating Ethernet header. ") ndp_reply.type = 0x8100 - ndp_reply = Ether(str(ndp_reply)) + ndp_reply = Ether(scapy.compat.raw(ndp_reply)) try: ndp_na = ndp_reply[ICMPv6ND_NA] opt = ndp_na[ICMPv6NDOptDstLLAddr] |