aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_nat.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_nat.py')
-rw-r--r--test/test_nat.py85
1 files changed, 44 insertions, 41 deletions
diff --git a/test/test_nat.py b/test/test_nat.py
index 0ef30267088..160890a078e 100644
--- a/test/test_nat.py
+++ b/test/test_nat.py
@@ -6,6 +6,8 @@ import struct
import random
from framework import VppTestCase, VppTestRunner, running_extended_tests
+
+import scapy.compat
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
@@ -320,7 +322,8 @@ class MethodHolder(VppTestCase):
pref_n[13] = ip4_n[1]
pref_n[14] = ip4_n[2]
pref_n[15] = ip4_n[3]
- return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+ packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
+ return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
def extract_ip4(self, ip6, plen):
"""
@@ -689,8 +692,8 @@ class MethodHolder(VppTestCase):
p = (IP(src=src_if.remote_ip4, dst=dst) /
TCP(sport=sport, dport=dport) /
Raw(data))
- p = p.__class__(str(p))
- chksum = p['TCP'].chksum
+ p = p.__class__(scapy.compat.raw(p))
+ chksum = p[TCP].chksum
proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
elif proto == IP_PROTOS.udp:
proto_header = UDP(sport=sport, dport=dport)
@@ -869,8 +872,8 @@ class MethodHolder(VppTestCase):
self.assertEqual(6, len(data))
for record in data:
# natEvent
- self.assertIn(ord(record[230]), [4, 5])
- if ord(record[230]) == 4:
+ self.assertIn(scapy.compat.orb(record[230]), [4, 5])
+ if scapy.compat.orb(record[230]) == 4:
nat44_ses_create_num += 1
else:
nat44_ses_delete_num += 1
@@ -882,16 +885,16 @@ class MethodHolder(VppTestCase):
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
- if IP_PROTOS.icmp == ord(record[4]):
+ if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
self.assertEqual(struct.pack("!H", self.icmp_id_out),
record[227])
- elif IP_PROTOS.tcp == ord(record[4]):
+ elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.tcp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.tcp_port_out),
record[227])
- elif IP_PROTOS.udp == ord(record[4]):
+ elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.udp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.udp_port_out),
@@ -910,7 +913,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 3)
+ self.assertEqual(scapy.compat.orb(record[230]), 3)
# natPoolID
self.assertEqual(struct.pack("!I", 0), record[283])
@@ -924,7 +927,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 1), record[466])
# maxSessionEntries
@@ -940,7 +943,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 2), record[466])
# maxBIBEntries
@@ -957,7 +960,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
@@ -976,7 +979,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
@@ -996,15 +999,15 @@ class MethodHolder(VppTestCase):
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 10)
+ self.assertEqual(scapy.compat.orb(record[230]), 10)
else:
- self.assertEqual(ord(record[230]), 11)
+ self.assertEqual(scapy.compat.orb(record[230]), 11)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# postNATSourceIPv4Address
self.assertEqual(self.nat_addr_n, record[225])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
@@ -1027,9 +1030,9 @@ class MethodHolder(VppTestCase):
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 6)
+ self.assertEqual(scapy.compat.orb(record[230]), 6)
else:
- self.assertEqual(ord(record[230]), 7)
+ self.assertEqual(scapy.compat.orb(record[230]), 7)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# destinationIPv6Address
@@ -1044,7 +1047,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
record[226])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
@@ -1076,7 +1079,7 @@ class MethodHolder(VppTestCase):
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 3), record[466])
# maxEntriesPerUser
@@ -1167,9 +1170,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
reass = self.vapi.nat_reass_dump()
@@ -1248,9 +1251,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
@@ -1317,9 +1320,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
# send packet from host to server
pkts = self.create_stream_frag(self.pg0,
@@ -1346,9 +1349,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
@@ -1422,9 +1425,9 @@ class MethodHolder(VppTestCase):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
@@ -3539,7 +3542,7 @@ class TestNAT44(MethodHolder):
is_inside=0)
self.vapi.nat44_forwarding_enable_disable(1)
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.pg0.remote_ip4,
4789,
@@ -3667,7 +3670,7 @@ class TestNAT44(MethodHolder):
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
self.tcp_port_in = random.randint(1025, 65535)
pkts = self.create_stream_frag(self.pg0,
self.pg1.remote_ip4,
@@ -8043,7 +8046,7 @@ class TestNAT64(MethodHolder):
reass_n_start = len(reass)
# in2out
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
self.pg0.add_stream(pkts)
@@ -8059,7 +8062,7 @@ class TestNAT64(MethodHolder):
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "b" * 16 + "C" * 3
+ data = b"A" * 4 + b"b" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
@@ -8127,7 +8130,7 @@ class TestNAT64(MethodHolder):
self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# in2out
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
@@ -8144,7 +8147,7 @@ class TestNAT64(MethodHolder):
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
@@ -8283,7 +8286,7 @@ class TestNAT64(MethodHolder):
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
@@ -8358,9 +8361,9 @@ class TestNAT64(MethodHolder):
for p in capture:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 10:
+ if scapy.compat.orb(data[0][230]) == 10:
self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 6:
+ elif scapy.compat.orb(data[0][230]) == 6:
self.verify_ipfix_nat64_ses(data,
1,
self.pg0.remote_ip6n,
@@ -8387,9 +8390,9 @@ class TestNAT64(MethodHolder):
self.ipfix_domain_id)
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 11:
+ if scapy.compat.orb(data[0][230]) == 11:
self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 7:
+ elif scapy.compat.orb(data[0][230]) == 7:
self.verify_ipfix_nat64_ses(data,
0,
self.pg0.remote_ip6n,