summaryrefslogtreecommitdiffstats
path: root/test/test_flowprobe.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_flowprobe.py')
-rw-r--r--test/test_flowprobe.py43
1 files changed, 24 insertions, 19 deletions
diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py
index df6b4230699..d2fd031777c 100644
--- a/test/test_flowprobe.py
+++ b/test/test_flowprobe.py
@@ -1,4 +1,6 @@
#!/usr/bin/env python
+
+import binascii
import random
import socket
import unittest
@@ -196,8 +198,8 @@ class MethodHolder(VppTestCase):
if cflow.haslayer(Data):
data = decoder.decode_data_set(cflow.getlayer(Set))
for record in data:
- self.assertEqual(int(record[1].encode('hex'), 16), octets)
- self.assertEqual(int(record[2].encode('hex'), 16), packets)
+ self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
def send_packets(self, src_if=None, dst_if=None):
if src_if is None:
@@ -225,9 +227,9 @@ class MethodHolder(VppTestCase):
if data_set is not None:
for record in data:
# skip flow if in/out gress interface is 0
- if int(record[10].encode('hex'), 16) == 0:
+ if int(binascii.hexlify(record[10]), 16) == 0:
continue
- if int(record[14].encode('hex'), 16) == 0:
+ if int(binascii.hexlify(record[14]), 16) == 0:
continue
for field in data_set:
@@ -247,7 +249,7 @@ class MethodHolder(VppTestCase):
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.src)
- value = int(ip.encode('hex'), 16)
+ value = int(binascii.hexlify(ip), 16)
elif value == 'dst_ip':
if ip_ver == 'v4':
ip = socket.inet_pton(socket.AF_INET,
@@ -255,12 +257,13 @@ class MethodHolder(VppTestCase):
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.dst)
- value = int(ip.encode('hex'), 16)
+ value = int(binascii.hexlify(ip), 16)
elif value == 'sport':
value = int(capture[0][UDP].sport)
elif value == 'dport':
value = int(capture[0][UDP].dport)
- self.assertEqual(int(record[field].encode('hex'), 16),
+ self.assertEqual(int(binascii.hexlify(
+ record[field]), 16),
value)
def verify_cflow_data_notimer(self, decoder, capture, cflows):
@@ -274,8 +277,10 @@ class MethodHolder(VppTestCase):
for rec in data:
p = capture[idx]
idx += 1
- self.assertEqual(p[IP].len, int(rec[1].encode('hex'), 16))
- self.assertEqual(1, int(rec[2].encode('hex'), 16))
+ self.assertEqual(p[IP].len, int(
+ binascii.hexlify(rec[1]), 16))
+ self.assertEqual(1, int(
+ binascii.hexlify(rec[2]), 16))
self.assertEqual(len(capture), idx)
def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1,
@@ -411,25 +416,25 @@ class Flowprobe(MethodHolder):
if cflow.haslayer(Data):
record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
# ingress interface
- self.assertEqual(int(record[10].encode('hex'), 16), 8)
+ self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
# egress interface
- self.assertEqual(int(record[14].encode('hex'), 16), 9)
+ self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
# packets
- self.assertEqual(int(record[2].encode('hex'), 16), 1)
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
# src mac
self.assertEqual(':'.join(re.findall('..', record[56].encode(
'hex'))), self.pg8.local_mac)
# dst mac
self.assertEqual(':'.join(re.findall('..', record[80].encode(
'hex'))), self.pg8.remote_mac)
- flowTimestamp = int(record[156].encode('hex'), 16) >> 32
+ flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
# flow start timestamp
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
- flowTimestamp = int(record[157].encode('hex'), 16) >> 32
+ flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
# flow end timestamp
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
# ethernet type
- self.assertEqual(int(record[256].encode('hex'), 16), 8)
+ self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
# src ip
self.assertEqual('.'.join(re.findall('..', record[8].encode(
'hex'))),
@@ -441,13 +446,13 @@ class Flowprobe(MethodHolder):
'.'.join('{:02x}'.format(int(n)) for n in
"9.0.0.100".split('.')))
# protocol (TCP)
- self.assertEqual(int(record[4].encode('hex'), 16), 6)
+ self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
# src port
- self.assertEqual(int(record[7].encode('hex'), 16), 1234)
+ self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
# dst port
- self.assertEqual(int(record[11].encode('hex'), 16), 4321)
+ self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
# tcp flags
- self.assertEqual(int(record[6].encode('hex'), 16), 80)
+ self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0000")