aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--test/framework.py20
-rw-r--r--test/hook.py5
-rw-r--r--test/test_acl_plugin_l2l3.py9
-rw-r--r--test/test_acl_plugin_macip.py48
-rw-r--r--test/test_bfd.py29
-rw-r--r--test/test_bier.py46
-rw-r--r--test/test_cdp.py4
-rw-r--r--test/test_dhcp.py5
-rw-r--r--test/test_gbp.py2
-rw-r--r--test/test_gre.py13
-rw-r--r--test/test_ip4.py5
-rw-r--r--test/test_ip4_vrf_multi_instance.py4
-rw-r--r--test/test_ip6.py1
-rw-r--r--test/test_ipsec_nat.py6
-rw-r--r--test/test_lb.py10
-rw-r--r--test/test_map.py8
-rw-r--r--test/test_mpls.py3
-rw-r--r--test/test_nat.py85
-rw-r--r--test/test_neighbor.py6
-rw-r--r--test/test_punt.py5
-rw-r--r--test/test_qos.py61
-rw-r--r--test/test_reassembly.py12
-rw-r--r--test/test_srv6.py15
-rw-r--r--test/test_srv6_ad.py15
-rwxr-xr-xtest/test_srv6_as.py15
-rw-r--r--test/util.py13
-rw-r--r--test/vpp_pg_interface.py9
27 files changed, 261 insertions, 193 deletions
diff --git a/test/framework.py b/test/framework.py
index 2c1c8291ed1..7656253294e 100644
--- a/test/framework.py
+++ b/test/framework.py
@@ -18,6 +18,8 @@ from threading import Thread, Event
from inspect import getdoc, isclass
from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
+
+import scapy.compat
from scapy.packet import Raw
from hook import StepHook, PollHook, VppDiedError
from vpp_pg_interface import VppPGInterface
@@ -511,7 +513,11 @@ class VppTestCase(unittest.TestCase):
if hasattr(cls, 'vpp'):
if hasattr(cls, 'vapi'):
+ cls.logger.debug("Disconnecting class vapi client on %s",
+ cls.__name__)
cls.vapi.disconnect()
+ cls.logger.debug("Deleting class vapi attribute on %s",
+ cls.__name__)
del cls.vapi
cls.vpp.poll()
if cls.vpp.returncode is None:
@@ -519,6 +525,8 @@ class VppTestCase(unittest.TestCase):
cls.vpp.kill()
cls.logger.debug("Waiting for vpp to die")
cls.vpp.communicate()
+ cls.logger.debug("Deleting class vpp attribute on %s",
+ cls.__name__)
del cls.vpp
if cls.vpp_startup_failed:
@@ -560,7 +568,6 @@ class VppTestCase(unittest.TestCase):
def tearDown(self):
""" Show various debug prints after each test """
- super(VppTestCase, self).tearDown()
self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
self._testMethodDoc))
@@ -848,14 +855,14 @@ class VppTestCase(unittest.TestCase):
def assert_packet_checksums_valid(self, packet,
ignore_zero_udp_checksums=True):
- received = packet.__class__(str(packet))
+ received = packet.__class__(scapy.compat.raw(packet))
self.logger.debug(
ppp("Verifying packet checksums for packet:", received))
udp_layers = ['UDP', 'UDPerror']
checksum_fields = ['cksum', 'chksum']
checksums = []
counter = 0
- temp = received.__class__(str(received))
+ temp = received.__class__(scapy.compat.raw(received))
while True:
layer = temp.getlayer(counter)
if layer:
@@ -872,7 +879,7 @@ class VppTestCase(unittest.TestCase):
counter = counter + 1
if 0 == len(checksums):
return
- temp = temp.__class__(str(temp))
+ temp = temp.__class__(scapy.compat.raw(temp))
for layer, cf in checksums:
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
@@ -889,9 +896,10 @@ class VppTestCase(unittest.TestCase):
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
- recalculated = received_packet.__class__(str(received_packet))
+ recalculated = received_packet.__class__(
+ scapy.compat.raw(received_packet))
delattr(recalculated[layer], field_name)
- recalculated = recalculated.__class__(str(recalculated))
+ recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
self.assert_equal(received_packet_checksum,
getattr(recalculated[layer], field_name),
"packet checksum on layer: %s" % layer)
diff --git a/test/hook.py b/test/hook.py
index 6c971c9937f..555fc2ea32a 100644
--- a/test/hook.py
+++ b/test/hook.py
@@ -5,6 +5,9 @@ import traceback
from log import RED, single_line_delim, double_line_delim
import ipaddress
from subprocess import check_output, CalledProcessError
+
+import scapy.compat
+
from util import check_core_path, get_core_path
@@ -31,7 +34,7 @@ class Hook(object):
return val
if len(val) == 6:
return '{!s} ({!s})'.format(val, ':'.join(['{:02x}'.format(
- ord(x)) for x in val]))
+ scapy.compat.orb(x)) for x in val]))
try:
# we don't call test_type(val) because it is a packed value.
return '{!s} ({!s})'.format(val, str(
diff --git a/test/test_acl_plugin_l2l3.py b/test/test_acl_plugin_l2l3.py
index 73dd473c67f..2a99e86119b 100644
--- a/test/test_acl_plugin_l2l3.py
+++ b/test/test_acl_plugin_l2l3.py
@@ -28,6 +28,7 @@ from socket import inet_pton, AF_INET, AF_INET6
from random import choice, shuffle
from pprint import pprint
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP, TCP
@@ -308,12 +309,14 @@ class TestACLpluginL2L3(VppTestCase):
# Scapy IPv6 stuff is too smart for its own good.
# So we do this and coerce the ICMP into unknown type
if packet.haslayer(UDP):
- data = str(packet[UDP][Raw])
+ data = scapy.compat.raw(packet[UDP][Raw])
else:
if l3 == IP:
- data = str(ICMP(str(packet[l3].payload))[Raw])
+ data = scapy.compat.raw(ICMP(
+ scapy.compat.raw(packet[l3].payload))[Raw])
else:
- data = str(ICMPv6Unknown(str(packet[l3].payload)).msgbody)
+ data = scapy.compat.raw(ICMPv6Unknown(
+ scapy.compat.raw(packet[l3].payload)).msgbody)
udp_or_icmp = packet[l3].payload
payload_info = self.payload_to_info(data)
packet_index = payload_info.index
diff --git a/test/test_acl_plugin_macip.py b/test/test_acl_plugin_macip.py
index d2d684abc11..6f99646e6e2 100644
--- a/test/test_acl_plugin_macip.py
+++ b/test/test_acl_plugin_macip.py
@@ -3,12 +3,14 @@ from __future__ import print_function
"""ACL plugin - MACIP tests
"""
import binascii
+import ipaddress
import random
from socket import inet_ntop, inet_pton, AF_INET, AF_INET6
from struct import pack, unpack
import re
import unittest
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
@@ -194,8 +196,10 @@ class MethodHolder(VppTestCase):
return acls
def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP,
- acl_count=1, rules_count=[1]):
+ acl_count=1, rules_count=None):
acls = []
+ if rules_count is None:
+ rules_count = [1]
src_mac = int("220000dead00", 16)
for acl in range(2, (acl_count+1) * 2):
rules = []
@@ -248,7 +252,7 @@ class MethodHolder(VppTestCase):
ip4[3] = 0
ip6[8] = random.randint(100, 200)
ip6[15] = 0
- ip_pack = ''
+ ip_pack = b''
for j in range(0, len(ip)):
ip_pack += pack('<B', int(ip[j]))
@@ -256,8 +260,9 @@ class MethodHolder(VppTestCase):
'is_ipv6': is_ip6,
'src_ip_addr': ip_pack,
'src_ip_prefix_len': ip_len,
- 'src_mac': mac.replace(':', '').decode('hex'),
- 'src_mac_mask': mask.replace(':', '').decode('hex')})
+ 'src_mac': binascii.unhexlify(mac.replace(':', '')),
+ 'src_mac_mask': binascii.unhexlify(
+ mask.replace(':', ''))})
rules.append(rule)
if ip_type == self.WILD_IP:
break
@@ -275,7 +280,7 @@ class MethodHolder(VppTestCase):
def verify_macip_acls(self, acl_count, rules_count, expected_count=2):
reply = self.macip_acl_dump_debug()
for acl in range(2, (acl_count+1) * 2):
- self.assertEqual(reply[acl - 2].count, rules_count[acl/2-1])
+ self.assertEqual(reply[acl - 2].count, rules_count[acl//2-1])
self.vapi.macip_acl_interface_get()
@@ -410,30 +415,32 @@ class MethodHolder(VppTestCase):
sub_ip[15] = random.randint(200, 255)
elif ip_type == self.SUBNET_IP:
if denyIP:
- sub_ip[2] = str(int(sub_ip[2]) + 1)
+ sub_ip[2] = int(sub_ip[2]) + 1
sub_ip[14] = random.randint(100, 199)
sub_ip[15] = random.randint(200, 255)
- src_ip6 = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
+ packed_src_ip6 = b''.join(
+ [scapy.compat.chb(x) for x in sub_ip])
+ src_ip6 = inet_ntop(AF_INET6, packed_src_ip6)
packet /= IPv6(src=src_ip6, dst=dst_ip6)
else:
if ip_type != self.EXACT_IP:
sub_ip = ip_rule.split('.')
if ip_type == self.WILD_IP:
- sub_ip[0] = str(random.randint(1, 49))
- sub_ip[1] = str(random.randint(50, 99))
- sub_ip[2] = str(random.randint(100, 199))
- sub_ip[3] = str(random.randint(200, 255))
+ sub_ip[0] = random.randint(1, 49)
+ sub_ip[1] = random.randint(50, 99)
+ sub_ip[2] = random.randint(100, 199)
+ sub_ip[3] = random.randint(200, 255)
elif ip_type == self.SUBNET_IP:
if denyIP:
- sub_ip[1] = str(int(sub_ip[1])+1)
- sub_ip[2] = str(random.randint(100, 199))
- sub_ip[3] = str(random.randint(200, 255))
- src_ip4 = ".".join(sub_ip)
+ sub_ip[1] = int(sub_ip[1])+1
+ sub_ip[2] = random.randint(100, 199)
+ sub_ip[3] = random.randint(200, 255)
+ src_ip4 = '.'.join(['{!s}'.format(x) for x in sub_ip])
packet /= IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0)
packet /= UDP(sport=src_port, dport=dst_port)/Raw(payload)
- packet[Raw].load += " mac:"+src_mac
+ packet[Raw].load += b" mac:%s" % scapy.compat.raw(src_mac)
size = self.pg_if_packet_sizes[p % len(self.pg_if_packet_sizes)]
if isinstance(src_if, VppSubInterface):
@@ -485,7 +492,9 @@ class MethodHolder(VppTestCase):
sub_ip = list(unpack('<16B', inet_pton(AF_INET6, ip)))
for i in range(8, 16):
sub_ip[i] = 0
- ip = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
+ packed_ip = b''.join(
+ [scapy.compat.chb(x) for x in sub_ip])
+ ip = inet_ntop(AF_INET6, packed_ip)
else:
if ip_type == self.WILD_IP:
ip = "0.0.0.0"
@@ -540,8 +549,9 @@ class MethodHolder(VppTestCase):
'is_ipv6': is_ip6,
'src_ip_addr': ip_rule,
'src_ip_prefix_len': prefix_len,
- 'src_mac': mac_rule.replace(':', '').decode('hex'),
- 'src_mac_mask': mac_mask.replace(':', '').decode('hex')})
+ 'src_mac': binascii.unhexlify(mac_rule.replace(':', '')),
+ 'src_mac_mask': binascii.unhexlify(
+ mac_mask.replace(':', ''))})
macip_rules.append(macip_rule)
# deny all other packets
diff --git a/test/test_bfd.py b/test/test_bfd.py
index 7b474228eb6..4c3f5354c1a 100644
--- a/test/test_bfd.py
+++ b/test/test_bfd.py
@@ -12,6 +12,7 @@ from socket import AF_INET, AF_INET6, inet_ntop
from struct import pack, unpack
from six import moves
+import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
@@ -42,7 +43,8 @@ class AuthKeyFactory(object):
while conf_key_id in self._conf_key_ids:
conf_key_id = randint(0, 0xFFFFFFFF)
self._conf_key_ids[conf_key_id] = 1
- key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
+ key = scapy.compat.raw(
+ bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
return VppBFDAuthKey(test=test, auth_type=auth_type,
conf_key_id=conf_key_id, key=key)
@@ -435,7 +437,8 @@ class BFDTestSession(object):
self.test.logger.debug("BFD: Creating packet")
self.fill_packet_fields(packet)
if self.sha1_key:
- hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
+ hash_material = scapy.compat.raw(
+ packet[BFD])[:32] + self.sha1_key.key + \
"\0" * (20 - len(self.sha1_key.key))
self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
hashlib.sha1(hash_material).hexdigest())
@@ -492,7 +495,7 @@ class BFDTestSession(object):
# last 20 bytes represent the hash - so replace them with the key,
# pad the result with zeros and hash the result
hash_material = bfd.original[:-20] + self.sha1_key.key + \
- "\0" * (20 - len(self.sha1_key.key))
+ b"\0" * (20 - len(self.sha1_key.key))
expected_hash = hashlib.sha1(hash_material).hexdigest()
self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
expected_hash, "Auth key hash")
@@ -923,7 +926,7 @@ class BFD4TestCase(VppTestCase):
# halve required min rx
old_required_min_rx = self.vpp_session.required_min_rx
self.vpp_session.modify_parameters(
- required_min_rx=0.5 * self.vpp_session.required_min_rx)
+ required_min_rx=self.vpp_session.required_min_rx // 2)
# now we wait 0.8*3*old-req-min-rx and the session should still be up
self.sleep(0.8 * self.vpp_session.detect_mult *
old_required_min_rx / USEC_IN_SEC,
@@ -1118,8 +1121,8 @@ class BFD4TestCase(VppTestCase):
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(str(p[UDP].payload),
- str(echo_packet[UDP].payload),
+ self.assertEqual(scapy.compat.raw(p[UDP].payload),
+ scapy.compat.raw(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
@@ -1612,8 +1615,8 @@ class BFD6TestCase(VppTestCase):
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(str(p[UDP].payload),
- str(echo_packet[UDP].payload),
+ self.assertEqual(scapy.compat.raw(p[UDP].payload),
+ scapy.compat.raw(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
@@ -2293,7 +2296,7 @@ class BFDCLITestCase(VppTestCase):
def cli_verify_no_response(self, cli):
""" execute a CLI, asserting that the response is empty """
self.assert_equal(self.vapi.cli(cli),
- "",
+ b"",
"CLI command response")
def cli_verify_response(self, cli, expected):
@@ -2325,7 +2328,7 @@ class BFDCLITestCase(VppTestCase):
self.cli_verify_no_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k.key)))
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(
self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
@@ -2342,7 +2345,7 @@ class BFDCLITestCase(VppTestCase):
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k2.key)),
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
@@ -2361,7 +2364,7 @@ class BFDCLITestCase(VppTestCase):
self.cli_verify_no_response(
"bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k.key)))
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(self, self.pg0,
self.pg0.remote_ip6, af=AF_INET6,
@@ -2380,7 +2383,7 @@ class BFDCLITestCase(VppTestCase):
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k2.key)),
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
diff --git a/test/test_bier.py b/test/test_bier.py
index 9a9db3b0b6e..58ca3b08ebc 100644
--- a/test/test_bier.py
+++ b/test/test_bier.py
@@ -12,6 +12,7 @@ from vpp_bier import BIER_HDR_PAYLOAD, VppBierImp, VppBierDispEntry, \
VppBierDispTable, VppBierTable, VppBierTableID, VppBierRoute
from vpp_udp_encap import VppUdpEncap
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
@@ -122,10 +123,11 @@ class TestBier(VppTestCase):
for pkt_size in pkt_sizes:
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
- BIER(length=hdr_len_id, BitString=chr(255)*n_bytes) /
+ BIER(length=hdr_len_id,
+ BitString=scapy.compat.chb(255)*n_bytes) /
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6) /
UDP(sport=1234, dport=1234) /
- Raw(chr(5) * pkt_size))
+ Raw(scapy.compat.chb(5) * pkt_size))
pkts = p
self.pg0.add_stream(pkts)
@@ -159,18 +161,18 @@ class TestBier(VppTestCase):
self.assertEqual(bier_hdr.Proto, 5)
# The bit-string should consist only of the BP given by i.
- byte_array = ['\0'] * (n_bytes)
- byte_val = chr(1 << (bp - 1) % 8)
- byte_pos = n_bytes - (((bp - 1) / 8) + 1)
+ byte_array = [b'\0'] * (n_bytes)
+ byte_val = scapy.compat.chb(1 << (bp - 1) % 8)
+ byte_pos = n_bytes - (((bp - 1) // 8) + 1)
byte_array[byte_pos] = byte_val
- bitstring = ''.join(byte_array)
+ bitstring = ''.join([scapy.compat.chb(x) for x in byte_array])
self.assertEqual(len(bitstring), len(bier_hdr.BitString))
self.assertEqual(bitstring, bier_hdr.BitString)
#
# cleanup. not strictly necessary, but it's much quicker this way
- # becuase the bier_fib_dump and ip_fib_dump will be empty when the
+ # because the bier_fib_dump and ip_fib_dump will be empty when the
# auto-cleanup kicks in
#
for br in bier_routes:
@@ -222,7 +224,7 @@ class TestBier(VppTestCase):
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_64,
entropy=ii,
- BitString=chr(255)*16) /
+ BitString=scapy.compat.chb(255)*16) /
IPv6(src=self.pg0.remote_ip6,
dst=self.pg0.remote_ip6) /
UDP(sport=1234, dport=1234) /
@@ -334,7 +336,7 @@ class TestBier(VppTestCase):
#
# An imposition object with both bit-positions set
#
- bi = VppBierImp(self, bti, 333, chr(0x3) * 32)
+ bi = VppBierImp(self, bti, 333, scapy.compat.chb(0x3) * 32)
bi.add_vpp_config()
#
@@ -446,7 +448,7 @@ class TestBier(VppTestCase):
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
- BitString=chr(255)*32,
+ BitString=scapy.compat.chb(255)*32,
BFRID=99) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
@@ -460,7 +462,7 @@ class TestBier(VppTestCase):
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
- BitString=chr(255)*32,
+ BitString=scapy.compat.chb(255)*32,
BFRID=77) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
@@ -486,7 +488,7 @@ class TestBier(VppTestCase):
# A multicast route to forward post BIER disposition that needs
# a check against sending back into the BIER core
#
- bi = VppBierImp(self, bti, 333, chr(0x3) * 32)
+ bi = VppBierImp(self, bti, 333, scapy.compat.chb(0x3) * 32)
bi.add_vpp_config()
route_eg_232_1_1_2 = VppIpMRoute(
@@ -506,7 +508,7 @@ class TestBier(VppTestCase):
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
- BitString=chr(255)*32,
+ BitString=scapy.compat.chb(255)*32,
BFRID=77) /
IP(src="1.1.1.1", dst="232.1.1.2") /
UDP(sport=1234, dport=1234) /
@@ -523,10 +525,10 @@ class TestBier(VppTestCase):
bt = VppBierTable(self, bti, 77)
bt.add_vpp_config()
- lowest = ['\0'] * (n_bytes)
- lowest[-1] = chr(1)
- highest = ['\0'] * (n_bytes)
- highest[0] = chr(128)
+ lowest = [b'\0'] * (n_bytes)
+ lowest[-1] = scapy.compat.chb(1)
+ highest = [b'\0'] * (n_bytes)
+ highest[0] = scapy.compat.chb(128)
#
# Impostion Sets bit strings
@@ -636,7 +638,7 @@ class TestBier(VppTestCase):
src=self.pg0.remote_mac) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
- Raw(chr(5) * 32))
+ Raw(scapy.compat.chb(5) * 32))
rx = self.send_and_expect(self.pg0, p*65, self.pg1)
@@ -647,7 +649,7 @@ class TestBier(VppTestCase):
src=self.pg0.remote_mac) /
IP(src="1.1.1.1", dst="232.1.1.2") /
UDP(sport=1234, dport=1234) /
- Raw(chr(5) * 512))
+ Raw(scapy.compat.chb(5) * 512))
rx = self.send_and_expect(self.pg0, p*65, self.pg1)
self.assertEqual(rx[0][IP].src, "1.1.1.1")
@@ -716,9 +718,9 @@ class TestBier(VppTestCase):
# only use the second, but creating 2 tests with a non-zero
# value index in the route add
#
- bi = VppBierImp(self, bti, 333, chr(0xff) * 32)
+ bi = VppBierImp(self, bti, 333, scapy.compat.chb(0xff) * 32)
bi.add_vpp_config()
- bi2 = VppBierImp(self, bti, 334, chr(0xff) * 32)
+ bi2 = VppBierImp(self, bti, 334, scapy.compat.chb(0xff) * 32)
bi2.add_vpp_config()
#
@@ -821,7 +823,7 @@ class TestBier(VppTestCase):
UDP(sport=333, dport=8138) /
BIFT(sd=1, set=0, bsl=2, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
- BitString=chr(255)*32,
+ BitString=scapy.compat.chb(255)*32,
BFRID=99) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
diff --git a/test/test_cdp.py b/test/test_cdp.py
index e8ced7cb796..1996c5141f0 100644
--- a/test/test_cdp.py
+++ b/test/test_cdp.py
@@ -71,6 +71,10 @@ class TestCDP(VppTestCase):
super(TestCDP, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCDP, cls).tearDownClass()
+
def test_enable_cdp(self):
self.logger.info(self.vapi.cli("cdp enable"))
ret = self.vapi.cli("show cdp")
diff --git a/test/test_dhcp.py b/test/test_dhcp.py
index 2efa9a7e244..62db66282b6 100644
--- a/test/test_dhcp.py
+++ b/test/test_dhcp.py
@@ -8,6 +8,7 @@ from framework import VppTestCase, VppTestRunner, running_extended_tests
from vpp_neighbor import VppNeighbor
from vpp_ip_route import find_route, VppIpTable
from util import mk_ll_addr
+import scapy.compat
from scapy.layers.l2 import Ether, getmacbyip, ARP
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, in6_getnsmac
@@ -490,7 +491,7 @@ class TestDHCP(VppTestCase):
#
# 1. not our IP address = not checked by VPP? so offer is replayed
# to client
- bad_ip = option_82[0:8] + chr(33) + option_82[9:]
+ bad_ip = option_82[0:8] + scapy.compat.chb(33) + option_82[9:]
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
@@ -504,7 +505,7 @@ class TestDHCP(VppTestCase):
"DHCP offer option 82 bad address")
# 2. Not a sw_if_index VPP knows
- bad_if_index = option_82[0:2] + chr(33) + option_82[3:]
+ bad_if_index = option_82[0:2] + scapy.compat.chb(33) + option_82[3:]
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
diff --git a/test/test_gbp.py b/test/test_gbp.py
index 43043033a45..a5f053dcacc 100644
--- a/test/test_gbp.py
+++ b/test/test_gbp.py
@@ -3207,7 +3207,7 @@ class TestGBP(VppTestCase):
vx_tun_l3.add_vpp_config()
#
- # packets destined to unkown addresses in the BVI's subnet
+ # packets destined to unknown addresses in the BVI's subnet
# are ARP'd for
#
p4 = (Ether(src=self.pg0.remote_mac, dst=str(self.router_mac)) /
diff --git a/test/test_gre.py b/test/test_gre.py
index ccd66c428e8..694f819308e 100644
--- a/test/test_gre.py
+++ b/test/test_gre.py
@@ -2,6 +2,7 @@
import unittest
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q, GRE
from scapy.layers.inet import IP, UDP
@@ -146,7 +147,8 @@ class TestGRE(VppTestCase):
GRE() /
Ether(dst=RandMAC('*:*:*:*:*:*'),
src=RandMAC('*:*:*:*:*:*')) /
- IP(src=str(RandIP()), dst=str(RandIP())) /
+ IP(src=scapy.compat.raw(RandIP()),
+ dst=scapy.compat.raw(RandIP())) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
@@ -165,7 +167,8 @@ class TestGRE(VppTestCase):
Ether(dst=RandMAC('*:*:*:*:*:*'),
src=RandMAC('*:*:*:*:*:*')) /
Dot1Q(vlan=vlan) /
- IP(src=str(RandIP()), dst=str(RandIP())) /
+ IP(src=scapy.compat.raw(RandIP()),
+ dst=scapy.compat.raw(RandIP())) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
@@ -217,7 +220,7 @@ class TestGRE(VppTestCase):
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
- rx_gre = GRE(str(rx_ip[IPv6].payload))
+ rx_gre = GRE(scapy.compat.raw(rx_ip[IPv6].payload))
rx_ip = rx_gre[IPv6]
self.assertEqual(rx_ip.src, tx_ip.src)
@@ -243,7 +246,7 @@ class TestGRE(VppTestCase):
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
- rx_gre = GRE(str(rx_ip[IPv6].payload))
+ rx_gre = GRE(scapy.compat.raw(rx_ip[IPv6].payload))
tx_ip = tx[IP]
rx_ip = rx_gre[IP]
@@ -270,7 +273,7 @@ class TestGRE(VppTestCase):
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
- rx_gre = GRE(str(rx_ip[IP].payload))
+ rx_gre = GRE(scapy.compat.raw(rx_ip[IP].payload))
rx_ip = rx_gre[IPv6]
tx_ip = tx[IPv6]
diff --git a/test/test_ip4.py b/test/test_ip4.py
index 492e96a99e3..57005f79355 100644
--- a/test/test_ip4.py
+++ b/test/test_ip4.py
@@ -4,6 +4,7 @@ import random
import socket
import unittest
+import scapy.compat
from scapy.contrib.mpls import MPLS
from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
from scapy.layers.l2 import Ether, Dot1Q, ARP
@@ -310,7 +311,7 @@ class TestIPv4FibCrud(VppTestCase):
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr)
added_ips.append(socket.inet_ntoa(n_dest_addr))
@@ -325,7 +326,7 @@ class TestIPv4FibCrud(VppTestCase):
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr, is_add=0)
removed_ips.append(socket.inet_ntoa(n_dest_addr))
diff --git a/test/test_ip4_vrf_multi_instance.py b/test/test_ip4_vrf_multi_instance.py
index 71532adf364..6d0f21d8bc9 100644
--- a/test/test_ip4_vrf_multi_instance.py
+++ b/test/test_ip4_vrf_multi_instance.py
@@ -66,6 +66,7 @@ import unittest
import random
import socket
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ARP
@@ -344,7 +345,8 @@ class TestIp4VrfMultiInst(VppTestCase):
if found:
break
for host in pg_if.remote_hosts:
- if str(addr) == str(host.ip4):
+ if scapy.compat.raw(addr) == \
+ scapy.compat.raw(host.ip4):
vrf_count += 1
found = True
break
diff --git a/test/test_ip6.py b/test/test_ip6.py
index 1d52edb7721..07e3c7834c5 100644
--- a/test/test_ip6.py
+++ b/test/test_ip6.py
@@ -4,6 +4,7 @@ import socket
import unittest
from parameterized import parameterized
+import scapy.compat
import scapy.layers.inet6 as inet6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
diff --git a/test/test_ipsec_nat.py b/test/test_ipsec_nat.py
index cdb9cb438f2..e364f5ffe96 100644
--- a/test/test_ipsec_nat.py
+++ b/test/test_ipsec_nat.py
@@ -2,9 +2,11 @@
import socket
+import scapy.compat
from scapy.layers.l2 import Ether
from scapy.layers.inet import ICMP, IP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
+
from util import ppp, ppc
from template_ipsec import TemplateIpsec
from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
@@ -126,9 +128,9 @@ class IPSecNATTestCase(TemplateIpsec):
def verify_capture_encrypted(self, capture, sa):
for packet in capture:
try:
- copy = packet.__class__(str(packet))
+ copy = packet.__class__(scapy.compat.raw(packet))
del copy[UDP].len
- copy = packet.__class__(str(copy))
+ copy = packet.__class__(scapy.compat.raw(copy))
self.assert_equal(packet[UDP].len, copy[UDP].len,
"UDP header length")
self.assert_packet_checksums_valid(packet)
diff --git a/test/test_lb.py b/test/test_lb.py
index 86e7cb46ad3..fa179504855 100644
--- a/test/test_lb.py
+++ b/test/test_lb.py
@@ -1,5 +1,6 @@
import socket
+import scapy.compat
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether, GRE
@@ -93,11 +94,12 @@ class TestLB(VppTestCase):
self.assertEqual(gre.proto, 0x0800 if isv4 else 0x86DD)
self.assertEqual(gre.flags, 0)
self.assertEqual(gre.version, 0)
- inner = IPver(str(gre.payload))
+ inner = IPver(scapy.compat.raw(gre.payload))
payload_info = self.payload_to_info(inner[Raw])
self.info = self.packet_infos[payload_info.index]
self.assertEqual(payload_info.src, self.pg0.sw_if_index)
- self.assertEqual(str(inner), str(self.info.data[IPver]))
+ self.assertEqual(scapy.compat.raw(inner),
+ scapy.compat.raw(self.info.data[IPver]))
def checkCapture(self, encap, isv4):
self.pg0.assert_nothing_captured()
@@ -135,7 +137,7 @@ class TestLB(VppTestCase):
)
self.assertEqual(ip.nh, 47)
# self.assertEqual(len(ip.options), 0)
- gre = GRE(str(p[IPv6].payload))
+ gre = GRE(scapy.compat.raw(p[IPv6].payload))
self.checkInner(gre, isv4)
elif (encap == 'l3dsr'):
ip = p[IP]
@@ -174,7 +176,7 @@ class TestLB(VppTestCase):
)
self.assertEqual(ip.nh, 17)
self.assertGreaterEqual(ip.hlim, 63)
- udp = UDP(str(p[IPv6].payload))
+ udp = UDP(scapy.compat.raw(p[IPv6].payload))
self.assertEqual(udp.dport, 3307)
load[asid] += 1
except:
diff --git a/test/test_map.py b/test/test_map.py
index 952a737c1ac..1655ceda361 100644
--- a/test/test_map.py
+++ b/test/test_map.py
@@ -5,6 +5,8 @@ import unittest
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
+
+import scapy.compat
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
@@ -187,7 +189,7 @@ class TestMAP(VppTestCase):
self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
def validate(self, rx, expected):
- self.assertEqual(rx, expected.__class__(str(expected)))
+ self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
def payload(self, len):
return 'x' * len
@@ -369,7 +371,7 @@ class TestMAP(VppTestCase):
p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
dst="2001:db8:1f0::c0a8:1:f") / payload)
p6_translated.hlim -= 1
- p6_translated['TCP'].options = [('MSS', 1300)]
+ p6_translated[TCP].options = [('MSS', 1300)]
rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
for p in rx:
self.validate(p[1], p6_translated)
@@ -383,7 +385,7 @@ class TestMAP(VppTestCase):
dst=self.pg0.remote_ip4) / payload)
p4_translated.id = 0
p4_translated.ttl -= 1
- p4_translated['TCP'].options = [('MSS', 1300)]
+ p4_translated[TCP].options = [('MSS', 1300)]
rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
for p in rx:
self.validate(p[1], p4_translated)
diff --git a/test/test_mpls.py b/test/test_mpls.py
index e7fb288f856..3198d905327 100644
--- a/test/test_mpls.py
+++ b/test/test_mpls.py
@@ -11,6 +11,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
VppMplsLabel, MplsLspMode, find_mpls_route
from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
@@ -1873,7 +1874,7 @@ class TestMPLSL2(VppTestCase):
verify_mpls_stack(self, rx, mpls_labels)
tx_eth = tx[Ether]
- rx_eth = Ether(str(rx[MPLS].payload))
+ rx_eth = Ether(scapy.compat.raw(rx[MPLS].payload))
self.assertEqual(rx_eth.src, tx_eth.src)
self.assertEqual(rx_eth.dst, tx_eth.dst)
diff --git a/test/test_nat.py b/test/test_nat.py
index 0ef30267088..160890a078e 100644
--- a/test/test_nat.py
+++ b/test/test_nat.py
@@ -6,6 +6,8 @@ import struct
import random
from framework import VppTestCase, VppTestRunner, running_extended_tests
+
+import scapy.compat
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
@@ -320,7 +322,8 @@ class MethodHolder(VppTestCase):
pref_n[13] = ip4_n[1]
pref_n[14] = ip4_n[2]
pref_n[15] = ip4_n[3]
- return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+ packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
+ return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
def extract_ip4(self, ip6, plen):
"""
@@ -689,8 +692,8 @@ class MethodHolder(VppTestCase):
p = (IP(src=src_if.remote_ip4, dst=dst) /
TCP(sport=sport, dport=dport) /
Raw(data))
- p = p.__class__(str(p))
- chksum = p['TCP'].chksum
+ p = p.__class__(scapy.compat.raw(p))
+ chksum = p[TCP].chksum
proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
elif proto == IP_PROTOS.udp:
proto_header = UDP(sport=sport, dport=dport)
@@ -869,8 +872,8 @@ class MethodHolder(VppTestCase):
self.assertEqual(6, len(data))
for record in data:
# natEvent
- self.assertIn(ord(record[230]), [4, 5])
- if ord(record[230]) == 4:
+ self.assertIn(scapy.compat.orb(record[230]), [4, 5])
+ if scapy.compat.orb(record[230]) == 4:
nat44_ses_create_num += 1
else:
nat44_ses_delete_num += 1
@@ -882,16 +885,16 @@ class MethodHolder(VppTestCase):
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
- if IP_PROTOS.icmp == ord(record[4]):
+ if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
self.assertEqual(struct.pack("!H", self.icmp_id_out),
record[227])
- elif IP_PROTOS.tcp == ord(record[4]):
+ elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.tcp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.tcp_port_out),
record[227])
- elif IP_PROTOS.udp == ord(record[4]):
+ elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.udp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.udp_port_out),
@@ -910,7 +913,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 3)
+ self.assertEqual(scapy.compat.orb(record[230]), 3)
# natPoolID
self.assertEqual(struct.pack("!I", 0), record[283])
@@ -924,7 +927,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 1), record[466])
# maxSessionEntries
@@ -940,7 +943,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 2), record[466])
# maxBIBEntries
@@ -957,7 +960,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
@@ -976,7 +979,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
@@ -996,15 +999,15 @@ class MethodHolder(VppTestCase):
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 10)
+ self.assertEqual(scapy.compat.orb(record[230]), 10)
else:
- self.assertEqual(ord(record[230]), 11)
+ self.assertEqual(scapy.compat.orb(record[230]), 11)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# postNATSourceIPv4Address
self.assertEqual(self.nat_addr_n, record[225])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
@@ -1027,9 +1030,9 @@ class MethodHolder(VppTestCase):
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 6)
+ self.assertEqual(scapy.compat.orb(record[230]), 6)
else:
- self.assertEqual(ord(record[230]), 7)
+ self.assertEqual(scapy.compat.orb(record[230]), 7)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# destinationIPv6Address
@@ -1044,7 +1047,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
record[226])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
@@ -1076,7 +1079,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 3), record[466])
# maxEntriesPerUser
@@ -1167,9 +1170,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
reass = self.vapi.nat_reass_dump()
@@ -1248,9 +1251,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
@@ -1317,9 +1320,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
# send packet from host to server
pkts = self.create_stream_frag(self.pg0,
@@ -1346,9 +1349,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
@@ -1422,9 +1425,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
@@ -3539,7 +3542,7 @@ class TestNAT44(MethodHolder):
is_inside=0)
self.vapi.nat44_forwarding_enable_disable(1)
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.pg0.remote_ip4,
4789,
@@ -3667,7 +3670,7 @@ class TestNAT44(MethodHolder):
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
self.tcp_port_in = random.randint(1025, 65535)
pkts = self.create_stream_frag(self.pg0,
self.pg1.remote_ip4,
@@ -8043,7 +8046,7 @@ class TestNAT64(MethodHolder):
reass_n_start = len(reass)
# in2out
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
self.pg0.add_stream(pkts)
@@ -8059,7 +8062,7 @@ class TestNAT64(MethodHolder):
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "b" * 16 + "C" * 3
+ data = b"A" * 4 + b"b" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
@@ -8127,7 +8130,7 @@ class TestNAT64(MethodHolder):
self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# in2out
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
@@ -8144,7 +8147,7 @@ class TestNAT64(MethodHolder):
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
@@ -8283,7 +8286,7 @@ class TestNAT64(MethodHolder):
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
@@ -8358,9 +8361,9 @@ class TestNAT64(MethodHolder):
for p in capture:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 10:
+ if scapy.compat.orb(data[0][230]) == 10:
self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 6:
+ elif scapy.compat.orb(data[0][230]) == 6:
self.verify_ipfix_nat64_ses(data,
1,
self.pg0.remote_ip6n,
@@ -8387,9 +8390,9 @@ class TestNAT64(MethodHolder):
self.ipfix_domain_id)
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 11:
+ if scapy.compat.orb(data[0][230]) == 11:
self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 7:
+ elif scapy.compat.orb(data[0][230]) == 7:
self.verify_ipfix_nat64_ses(data,
0,
self.pg0.remote_ip6n,
diff --git a/test/test_neighbor.py b/test/test_neighbor.py
index c378cff4e13..aec48712921 100644
--- a/test/test_neighbor.py
+++ b/test/test_neighbor.py
@@ -9,6 +9,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, \
VppIpTable, DpoProto
from vpp_papi import VppEnum
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
from scapy.layers.inet import IP, UDP
@@ -1178,8 +1179,9 @@ class ARPTestCase(VppTestCase):
#
# change the interface's MAC
#
- mac = [chr(0x00), chr(0x00), chr(0x00),
- chr(0x33), chr(0x33), chr(0x33)]
+ mac = [scapy.compat.chb(0x00), scapy.compat.chb(0x00),
+ scapy.compat.chb(0x00), scapy.compat.chb(0x33),
+ scapy.compat.chb(0x33), scapy.compat.chb(0x33)]
mac_string = ''.join(mac)
self.vapi.sw_interface_set_mac_address(self.pg1.sw_if_index,
diff --git a/test/test_punt.py b/test/test_punt.py
index 7959b981837..e265dc01f9b 100644
--- a/test/test_punt.py
+++ b/test/test_punt.py
@@ -11,6 +11,7 @@ import struct
from struct import unpack, unpack_from
from util import ppp, ppc
from re import compile
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
@@ -20,12 +21,12 @@ from framework import VppTestCase, VppTestRunner
# Format MAC Address
def get_mac_addr(bytes_addr):
- return ':'.join('%02x' % ord(b) for b in bytes_addr)
+ return ':'.join('%02x' % scapy.compat.orb(b) for b in bytes_addr)
# Format IP Address
def ipv4(bytes_addr):
- return '.'.join('%d' % ord(b) for b in bytes_addr)
+ return '.'.join('%d' % scapy.compat.orb(b) for b in bytes_addr)
# Unpack Ethernet Frame
diff --git a/test/test_qos.py b/test/test_qos.py
index 90ce4063348..38f4aafcb11 100644
--- a/test/test_qos.py
+++ b/test/test_qos.py
@@ -9,6 +9,7 @@ from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
VppMplsLabel, VppMplsTable
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import IP, UDP
@@ -50,10 +51,10 @@ class TestQOS(VppTestCase):
# for table 1 map the n=0xff possible values of input QoS mark,
# n to 1-n
#
- output = [chr(0)] * 256
+ output = [scapy.compat.chb(0)] * 256
for i in range(0, 255):
- output[i] = chr(255 - i)
- os = ''.join(output)
+ output[i] = scapy.compat.chb(255 - i)
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
@@ -64,8 +65,8 @@ class TestQOS(VppTestCase):
#
# For table 2 (and up) use the value n for everything
#
- output = [chr(2)] * 256
- os = ''.join(output)
+ output = [scapy.compat.chb(2)] * 256
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
@@ -73,8 +74,8 @@ class TestQOS(VppTestCase):
self.vapi.qos_egress_map_update(2, rows)
- output = [chr(3)] * 256
- os = ''.join(output)
+ output = [scapy.compat.chb(3)] * 256
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
@@ -82,8 +83,8 @@ class TestQOS(VppTestCase):
self.vapi.qos_egress_map_update(3, rows)
- output = [chr(4)] * 256
- os = ''.join(output)
+ output = [scapy.compat.chb(4)] * 256
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
@@ -121,12 +122,12 @@ class TestQOS(VppTestCase):
p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
tc=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
#
# Since we have not yet enabled the recording of the input QoS
@@ -282,14 +283,14 @@ class TestQOS(VppTestCase):
from_ip = 6
from_mpls = 5
from_vlan = 4
- output = [chr(from_ext)] * 256
- os1 = ''.join(output)
- output = [chr(from_vlan)] * 256
- os2 = ''.join(output)
- output = [chr(from_mpls)] * 256
- os3 = ''.join(output)
- output = [chr(from_ip)] * 256
- os4 = ''.join(output)
+ output = [scapy.compat.chb(from_ext)] * 256
+ os1 = b''.join(output)
+ output = [scapy.compat.chb(from_vlan)] * 256
+ os2 = b''.join(output)
+ output = [scapy.compat.chb(from_mpls)] * 256
+ os3 = b''.join(output)
+ output = [scapy.compat.chb(from_ip)] * 256
+ os4 = b''.join(output)
rows = [{'outputs': os1},
{'outputs': os2},
{'outputs': os3},
@@ -333,11 +334,11 @@ class TestQOS(VppTestCase):
p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
@@ -388,7 +389,7 @@ class TestQOS(VppTestCase):
MPLS(label=32, cos=3, ttl=2) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
for p in rx:
@@ -414,7 +415,7 @@ class TestQOS(VppTestCase):
MPLS(label=33, ttl=2, cos=3) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
@@ -446,10 +447,10 @@ class TestQOS(VppTestCase):
#
# QoS for all input values
#
- output = [chr(0)] * 256
+ output = [scapy.compat.chb(0)] * 256
for i in range(0, 255):
- output[i] = chr(255 - i)
- os = ''.join(output)
+ output[i] = scapy.compat.chb(255 - i)
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
@@ -515,12 +516,12 @@ class TestQOS(VppTestCase):
Dot1Q(vlan=11, prio=1) /
IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
@@ -536,12 +537,12 @@ class TestQOS(VppTestCase):
Dot1Q(vlan=11, prio=2) /
IPv6(src="2001::1", dst="2001::2", tc=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IPv6(src="3001::1", dst="2001::1", tc=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
diff --git a/test/test_reassembly.py b/test/test_reassembly.py
index 8004f362f3d..f3d983dc303 100644
--- a/test/test_reassembly.py
+++ b/test/test_reassembly.py
@@ -5,6 +5,7 @@ import six
import unittest
from parameterized import parameterized
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, GRE
from scapy.layers.inet import IP, UDP, ICMP
@@ -258,7 +259,8 @@ class TestIPv4Reassembly(TestIPReassemblyMixin, VppTestCase):
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc791(p, 400)
fragments_300 = fragment_rfc791(p, 300)
fragments_200 = [
@@ -589,7 +591,8 @@ class TestIPv6Reassembly(TestIPReassemblyMixin, VppTestCase):
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc8200(p, info.index, 400)
fragments_300 = fragment_rfc8200(p, info.index, 300)
cls.pkt_infos.append((index, fragments_400, fragments_300))
@@ -772,7 +775,7 @@ class TestIPv6Reassembly(TestIPReassemblyMixin, VppTestCase):
Raw())
self.extend_packet(p, 1000, self.padding)
fragments = fragment_rfc8200(p, 1, 500)
- bad_fragment = p.__class__(str(fragments[1]))
+ bad_fragment = p.__class__(scapy.compat.raw(fragments[1]))
bad_fragment[IPv6ExtHdrFragment].nh = 59
bad_fragment[IPv6ExtHdrFragment].offset = 0
self.pg_enable_capture()
@@ -880,7 +883,8 @@ class TestIPv4ReassemblyLocalNode(VppTestCase):
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_300 = fragment_rfc791(p, 300)
cls.pkt_infos.append((index, fragments_300))
cls.fragments_300 = [x for (_, frags) in cls.pkt_infos for x in frags]
diff --git a/test/test_srv6.py b/test/test_srv6.py
index 2ea9da76088..cc53c6b1e79 100644
--- a/test/test_srv6.py
+++ b/test/test_srv6.py
@@ -9,6 +9,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
@@ -1437,7 +1438,7 @@ class TestSRv6(VppTestCase):
tx_ip.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip = IP(str(tx_ip))
+ tx_ip = IP(scapy.compat.raw(tx_ip))
self.assertEqual(rx_srh.payload, tx_ip)
@@ -1489,7 +1490,7 @@ class TestSRv6(VppTestCase):
self.assertEqual(rx_srh.nh, 59)
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
- self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
@@ -1765,7 +1766,7 @@ class TestSRv6(VppTestCase):
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
@@ -1791,7 +1792,7 @@ class TestSRv6(VppTestCase):
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
@@ -1837,7 +1838,7 @@ class TestSRv6(VppTestCase):
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
- p = Ether(str(p))
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
@@ -2087,7 +2088,7 @@ class TestSRv6(VppTestCase):
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- str(Ether(str(packet[Raw]))[Raw]))
+ Ether(scapy.compat.r(packet[Raw]))[Raw])
return payload_info
@@ -2101,7 +2102,7 @@ class TestSRv6(VppTestCase):
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
diff --git a/test/test_srv6_ad.py b/test/test_srv6_ad.py
index 728ecd8a796..40cc1906612 100644
--- a/test/test_srv6_ad.py
+++ b/test/test_srv6_ad.py
@@ -10,6 +10,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
@@ -363,7 +364,7 @@ class TestSRv6(VppTestCase):
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
@@ -405,7 +406,7 @@ class TestSRv6(VppTestCase):
tx_ip.chksum = None
# -> read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- self.assertEqual(rx_srh.payload, IP(str(tx_ip)))
+ self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
self.logger.debug("packet verification: SUCCESS")
@@ -495,7 +496,7 @@ class TestSRv6(VppTestCase):
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
@@ -535,7 +536,7 @@ class TestSRv6(VppTestCase):
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
tx_ether = tx_pkt.getlayer(Ether)
- self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
@@ -573,7 +574,7 @@ class TestSRv6(VppTestCase):
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
- p = Ether(str(p))
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
@@ -736,7 +737,7 @@ class TestSRv6(VppTestCase):
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- Ether(str(packet[Raw]))[Raw])
+ Ether(scapy.compat.raw(packet[Raw]))[Raw])
return payload_info
@@ -750,7 +751,7 @@ class TestSRv6(VppTestCase):
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
diff --git a/test/test_srv6_as.py b/test/test_srv6_as.py
index 1ff7906e513..6b8e23c8520 100755
--- a/test/test_srv6_as.py
+++ b/test/test_srv6_as.py
@@ -9,6 +9,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
@@ -488,7 +489,7 @@ class TestSRv6(VppTestCase):
tx_ip.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip = IP(str(tx_ip))
+ tx_ip = IP(scapy.compat.raw(tx_ip))
self.assertEqual(payload, tx_ip)
@@ -537,7 +538,7 @@ class TestSRv6(VppTestCase):
payload = rx_ip.payload
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
- self.assertEqual(Ether(str(payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
@@ -590,7 +591,7 @@ class TestSRv6(VppTestCase):
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
@@ -609,7 +610,7 @@ class TestSRv6(VppTestCase):
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
@@ -655,7 +656,7 @@ class TestSRv6(VppTestCase):
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
- p = Ether(str(p))
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
@@ -817,7 +818,7 @@ class TestSRv6(VppTestCase):
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- Ether(str(packet[Raw]))[Raw])
+ Ether(scapy.compat.raw(packet[Raw]))[Raw])
return payload_info
@@ -831,7 +832,7 @@ class TestSRv6(VppTestCase):
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
diff --git a/test/util.py b/test/util.py
index fc0ebd73500..d27b3fb25e6 100644
--- a/test/util.py
+++ b/test/util.py
@@ -7,6 +7,7 @@ import six
import sys
import os.path
+import scapy.compat
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
@@ -282,7 +283,7 @@ def fragment_rfc791(packet, fragsize, _logger=None):
"""
logger = LoggerWrapper(_logger)
logger.debug(ppp("Fragmenting packet:", packet))
- packet = packet.__class__(str(packet)) # recalculate all values
+ packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
if len(packet[IP].options) > 0:
raise Exception("Not implemented")
if len(packet) <= fragsize:
@@ -290,7 +291,7 @@ def fragment_rfc791(packet, fragsize, _logger=None):
pre_ip_len = len(packet) - len(packet[IP])
ip_header_len = packet[IP].ihl * 4
- hex_packet = str(packet)
+ hex_packet = scapy.compat.raw(packet)
hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
@@ -327,7 +328,7 @@ def fragment_rfc8200(packet, identification, fragsize, _logger=None):
:returns: list of fragments
"""
logger = LoggerWrapper(_logger)
- packet = packet.__class__(str(packet)) # recalculate all values
+ packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
if len(packet) <= fragsize:
return [packet]
logger.debug(ppp("Fragmenting packet:", packet))
@@ -378,7 +379,7 @@ def fragment_rfc8200(packet, identification, fragsize, _logger=None):
logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
- hex_payload = str(ext_and_upper_layer)
+ hex_payload = scapy.compat.raw(ext_and_upper_layer)
logger.debug("Payload length is %s" % len(hex_payload))
logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
@@ -407,7 +408,7 @@ def fragment_rfc8200(packet, identification, fragsize, _logger=None):
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = 0
p[IPv6ExtHdrFragment].m = 1
- p = p.__class__(str(p))
+ p = p.__class__(scapy.compat.raw(p))
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = first_payload_len_nfb * 8
@@ -424,7 +425,7 @@ def fragment_rfc8200(packet, identification, fragsize, _logger=None):
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = offset / 8
p[IPv6ExtHdrFragment].m = 1
- p = p.__class__(str(p))
+ p = p.__class__(scapy.compat.raw(p))
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = offset + l_nfb * 8
diff --git a/test/vpp_pg_interface.py b/test/vpp_pg_interface.py
index b22a93388a6..8792e6c3767 100644
--- a/test/vpp_pg_interface.py
+++ b/test/vpp_pg_interface.py
@@ -3,6 +3,8 @@ import time
import socket
import struct
from traceback import format_exc, format_stack
+
+import scapy.compat
from scapy.utils import wrpcap, rdpcap, PcapReader
from scapy.plist import PacketList
from vpp_interface import VppInterface
@@ -414,7 +416,7 @@ class VppPGInterface(VppInterface):
# Make Dot1AD packet content recognizable to scapy
if arp_reply.type == 0x88a8:
arp_reply.type = 0x8100
- arp_reply = Ether(str(arp_reply))
+ arp_reply = Ether(scapy.compat.raw(arp_reply))
try:
if arp_reply[ARP].op == ARP.is_at:
self.test.logger.info("VPP %s MAC address is %s " %
@@ -460,8 +462,11 @@ class VppPGInterface(VppInterface):
ndp_reply = captured_packet.copy() # keep original for exception
# Make Dot1AD packet content recognizable to scapy
if ndp_reply.type == 0x88a8:
+ self._test.logger.info(
+ "Replacing EtherType: 0x88a8 with "
+ "0x8100 and regenerating Ethernet header. ")
ndp_reply.type = 0x8100
- ndp_reply = Ether(str(ndp_reply))
+ ndp_reply = Ether(scapy.compat.raw(ndp_reply))
try:
ndp_na = ndp_reply[ICMPv6ND_NA]
opt = ndp_na[ICMPv6NDOptDstLLAddr]