diff options
Diffstat (limited to 'test/test_flowprobe.py')
-rw-r--r-- | test/test_flowprobe.py | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py index df6b4230699..d2fd031777c 100644 --- a/test/test_flowprobe.py +++ b/test/test_flowprobe.py @@ -1,4 +1,6 @@ #!/usr/bin/env python + +import binascii import random import socket import unittest @@ -196,8 +198,8 @@ class MethodHolder(VppTestCase): if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) for record in data: - self.assertEqual(int(record[1].encode('hex'), 16), octets) - self.assertEqual(int(record[2].encode('hex'), 16), packets) + self.assertEqual(int(binascii.hexlify(record[1]), 16), octets) + self.assertEqual(int(binascii.hexlify(record[2]), 16), packets) def send_packets(self, src_if=None, dst_if=None): if src_if is None: @@ -225,9 +227,9 @@ class MethodHolder(VppTestCase): if data_set is not None: for record in data: # skip flow if in/out gress interface is 0 - if int(record[10].encode('hex'), 16) == 0: + if int(binascii.hexlify(record[10]), 16) == 0: continue - if int(record[14].encode('hex'), 16) == 0: + if int(binascii.hexlify(record[14]), 16) == 0: continue for field in data_set: @@ -247,7 +249,7 @@ class MethodHolder(VppTestCase): else: ip = socket.inet_pton(socket.AF_INET6, ip_layer.src) - value = int(ip.encode('hex'), 16) + value = int(binascii.hexlify(ip), 16) elif value == 'dst_ip': if ip_ver == 'v4': ip = socket.inet_pton(socket.AF_INET, @@ -255,12 +257,13 @@ class MethodHolder(VppTestCase): else: ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst) - value = int(ip.encode('hex'), 16) + value = int(binascii.hexlify(ip), 16) elif value == 'sport': value = int(capture[0][UDP].sport) elif value == 'dport': value = int(capture[0][UDP].dport) - self.assertEqual(int(record[field].encode('hex'), 16), + self.assertEqual(int(binascii.hexlify( + record[field]), 16), value) def verify_cflow_data_notimer(self, decoder, capture, cflows): @@ -274,8 +277,10 @@ class MethodHolder(VppTestCase): for rec in data: p = capture[idx] idx += 1 - self.assertEqual(p[IP].len, int(rec[1].encode('hex'), 16)) - self.assertEqual(1, int(rec[2].encode('hex'), 16)) + self.assertEqual(p[IP].len, int( + binascii.hexlify(rec[1]), 16)) + self.assertEqual(1, int( + binascii.hexlify(rec[2]), 16)) self.assertEqual(len(capture), idx) def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1, @@ -411,25 +416,25 @@ class Flowprobe(MethodHolder): if cflow.haslayer(Data): record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0] # ingress interface - self.assertEqual(int(record[10].encode('hex'), 16), 8) + self.assertEqual(int(binascii.hexlify(record[10]), 16), 8) # egress interface - self.assertEqual(int(record[14].encode('hex'), 16), 9) + self.assertEqual(int(binascii.hexlify(record[14]), 16), 9) # packets - self.assertEqual(int(record[2].encode('hex'), 16), 1) + self.assertEqual(int(binascii.hexlify(record[2]), 16), 1) # src mac self.assertEqual(':'.join(re.findall('..', record[56].encode( 'hex'))), self.pg8.local_mac) # dst mac self.assertEqual(':'.join(re.findall('..', record[80].encode( 'hex'))), self.pg8.remote_mac) - flowTimestamp = int(record[156].encode('hex'), 16) >> 32 + flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32 # flow start timestamp self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1) - flowTimestamp = int(record[157].encode('hex'), 16) >> 32 + flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32 # flow end timestamp self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1) # ethernet type - self.assertEqual(int(record[256].encode('hex'), 16), 8) + self.assertEqual(int(binascii.hexlify(record[256]), 16), 8) # src ip self.assertEqual('.'.join(re.findall('..', record[8].encode( 'hex'))), @@ -441,13 +446,13 @@ class Flowprobe(MethodHolder): '.'.join('{:02x}'.format(int(n)) for n in "9.0.0.100".split('.'))) # protocol (TCP) - self.assertEqual(int(record[4].encode('hex'), 16), 6) + self.assertEqual(int(binascii.hexlify(record[4]), 16), 6) # src port - self.assertEqual(int(record[7].encode('hex'), 16), 1234) + self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234) # dst port - self.assertEqual(int(record[11].encode('hex'), 16), 4321) + self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321) # tcp flags - self.assertEqual(int(record[6].encode('hex'), 16), 80) + self.assertEqual(int(binascii.hexlify(record[6]), 16), 80) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") |