summaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/framework.py41
-rw-r--r--test/test_bfd.py2
-rw-r--r--test/test_gre.py62
-rw-r--r--test/test_l2_fib.py14
-rw-r--r--test/test_l2bd_multi_instance.py22
-rw-r--r--test/test_l2xc_multi_instance.py31
-rw-r--r--test/test_lb.py16
-rw-r--r--test/test_mpls.py29
-rw-r--r--test/test_snat.py24
-rw-r--r--test/util.py22
-rw-r--r--test/vpp_pg_interface.py82
11 files changed, 188 insertions, 157 deletions
diff --git a/test/framework.py b/test/framework.py
index 2618b267706..1c3e56cc827 100644
--- a/test/framework.py
+++ b/test/framework.py
@@ -5,7 +5,6 @@ import unittest
import tempfile
import time
import resource
-from time import sleep
from collections import deque
from threading import Thread
from inspect import getdoc
@@ -181,7 +180,8 @@ class VppTestCase(unittest.TestCase):
cls.logger.info("Temporary dir is %s, shm prefix is %s",
cls.tempdir, cls.shm_prefix)
cls.setUpConstants()
- cls.pg_streams = []
+ cls._captures = []
+ cls._zombie_captures = []
cls.packet_infos = {}
cls.verbose = 0
cls.vpp_dead = False
@@ -312,17 +312,36 @@ class VppTestCase(unittest.TestCase):
i.enable_capture()
@classmethod
- def pg_start(cls, sleep_time=1):
- """
- Enable the packet-generator and send all prepared packet streams
- Remove the packet streams afterwards
- """
+ def register_capture(cls, cap_name):
+ """ Register a capture in the testclass """
+ # add to the list of captures with current timestamp
+ cls._captures.append((time.time(), cap_name))
+ # filter out from zombies
+ cls._zombie_captures = [(stamp, name)
+ for (stamp, name) in cls._zombie_captures
+ if name != cap_name]
+
+ @classmethod
+ def pg_start(cls):
+ """ Remove any zombie captures and enable the packet generator """
+ # how long before capture is allowed to be deleted - otherwise vpp
+ # crashes - 100ms seems enough (this shouldn't be needed at all)
+ capture_ttl = 0.1
+ now = time.time()
+ for stamp, cap_name in cls._zombie_captures:
+ wait = stamp + capture_ttl - now
+ if wait > 0:
+ cls.logger.debug("Waiting for %ss before deleting capture %s",
+ wait, cap_name)
+ time.sleep(wait)
+ now = time.time()
+ cls.logger.debug("Removing zombie capture %s" % cap_name)
+ cls.vapi.cli('packet-generator delete %s' % cap_name)
+
cls.vapi.cli("trace add pg-input 50") # 50 is maximum
cls.vapi.cli('packet-generator enable')
- sleep(sleep_time) # give VPP some time to process the packets
- for stream in cls.pg_streams:
- cls.vapi.cli('packet-generator delete %s' % stream)
- cls.pg_streams = []
+ cls._zombie_captures = cls._captures
+ cls._captures = []
@classmethod
def create_pg_interfaces(cls, interfaces):
diff --git a/test/test_bfd.py b/test/test_bfd.py
index bf0e88ddcdd..c1095d226a3 100644
--- a/test/test_bfd.py
+++ b/test/test_bfd.py
@@ -98,7 +98,7 @@ class BFDTestSession(object):
p = self.create_packet()
self.test.logger.debug(ppp("Sending packet:", p))
self.test.pg0.add_stream([p])
- self.test.pg_start(sleep_time=0)
+ self.test.pg_start()
def verify_packet(self, packet):
""" Verify correctness of BFD layer. """
diff --git a/test/test_gre.py b/test/test_gre.py
index 0b5082859b7..59d03e9307e 100644
--- a/test/test_gre.py
+++ b/test/test_gre.py
@@ -1,22 +1,22 @@
#!/usr/bin/env python
import unittest
-import socket
from logging import *
from framework import VppTestCase, VppTestRunner
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
+from vpp_sub_interface import VppDot1QSubint
from vpp_gre_interface import VppGreInterface
from vpp_ip_route import IpRoute, RoutePath
from vpp_papi_provider import L2_VTR_OP
from scapy.packet import Raw
-from scapy.layers.l2 import Ether, Dot1Q, ARP, GRE
+from scapy.layers.l2 import Ether, Dot1Q, GRE
from scapy.layers.inet import IP, UDP
-from scapy.layers.inet6 import ICMPv6ND_NS, ICMPv6ND_RA, IPv6, UDP
-from scapy.contrib.mpls import MPLS
+from scapy.layers.inet6 import ICMPv6ND_RA, IPv6
from scapy.volatile import RandMAC, RandIP
+from util import ppp, ppc
+
class TestGRE(VppTestCase):
""" GRE Test Case """
@@ -131,7 +131,7 @@ class TestGRE(VppTestCase):
def verify_filter(self, capture, sent):
if not len(capture) == len(sent):
- # filter out any IPv6 RAs from the captur
+ # filter out any IPv6 RAs from the capture
for p in capture:
if (p.haslayer(ICMPv6ND_RA)):
capture.remove(p)
@@ -163,8 +163,8 @@ class TestGRE(VppTestCase):
self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def verify_tunneled_l2o4(self, src_if, capture, sent,
@@ -196,8 +196,8 @@ class TestGRE(VppTestCase):
self.assertEqual(rx_ip.ttl, tx_ip.ttl)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def verify_tunneled_vlano4(self, src_if, capture, sent,
@@ -206,7 +206,7 @@ class TestGRE(VppTestCase):
capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
except:
- capture.show()
+ ppc("Unexpected packets captured:", capture)
raise
for i in range(len(capture)):
@@ -237,8 +237,8 @@ class TestGRE(VppTestCase):
self.assertEqual(rx_ip.ttl, tx_ip.ttl)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def verify_decapped_4o4(self, src_if, capture, sent):
@@ -261,8 +261,8 @@ class TestGRE(VppTestCase):
self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def verify_decapped_6o4(self, src_if, capture, sent):
@@ -284,8 +284,8 @@ class TestGRE(VppTestCase):
self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def test_gre(self):
@@ -333,14 +333,8 @@ class TestGRE(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
-
- try:
- self.assertEqual(0, len(rx))
- except:
- error("GRE packets forwarded without DIP resolved")
- error(rx.show())
- raise
+ self.pg0.assert_nothing_captured(
+ remark="GRE packets forwarded without DIP resolved")
#
# Add a route that resolves the tunnel's destination
@@ -397,13 +391,8 @@ class TestGRE(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
- try:
- self.assertEqual(0, len(rx))
- except:
- error("GRE packets forwarded despite no SRC address match")
- error(rx.show())
- raise
+ self.pg0.assert_nothing_captured(
+ remark="GRE packets forwarded despite no SRC address match")
#
# Configure IPv6 on the PG interface so we can route IPv6
@@ -427,13 +416,8 @@ class TestGRE(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
- try:
- self.assertEqual(0, len(rx))
- except:
- error("IPv6 GRE packets forwarded despite IPv6 not enabled on tunnel")
- error(rx.show())
- raise
+ self.pg0.assert_nothing_captured(remark="IPv6 GRE packets forwarded "
+ "despite IPv6 not enabled on tunnel")
#
# Enable IPv6 on the tunnel
diff --git a/test/test_l2_fib.py b/test/test_l2_fib.py
index eb4f4e32d1b..4855a3ea383 100644
--- a/test/test_l2_fib.py
+++ b/test/test_l2_fib.py
@@ -68,7 +68,7 @@ from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
-from util import Host
+from util import Host, ppp
class TestL2fib(VppTestCase):
@@ -282,8 +282,7 @@ class TestL2fib(VppTestCase):
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error("Unexpected or invalid packet:")
- self.logger.error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
@@ -327,14 +326,7 @@ class TestL2fib(VppTestCase):
# Verify
# Verify outgoing packet streams per packet-generator interface
for i in self.pg_interfaces:
- capture = i.get_capture()
- self.logger.info("Verifying capture on interface %s" % i.name)
- try:
- self.assertEqual(len(capture), 0)
- except AssertionError:
- self.logger.error("The capture on interface %s is not empty!"
- % i.name)
- raise AssertionError("%d != 0" % len(capture))
+ i.assert_nothing_captured(remark="outgoing interface")
def test_l2_fib_01(self):
""" L2 FIB test 1 - program 100 MAC addresses
diff --git a/test/test_l2bd_multi_instance.py b/test/test_l2bd_multi_instance.py
index 417df9e18cd..1272d7656a2 100644
--- a/test/test_l2bd_multi_instance.py
+++ b/test/test_l2bd_multi_instance.py
@@ -70,7 +70,8 @@ from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
-from util import Host
+from util import Host, ppp
+
@unittest.skip("Crashes VPP")
class TestL2bdMultiInst(VppTestCase):
@@ -92,12 +93,12 @@ class TestL2bdMultiInst(VppTestCase):
# Packet flows mapping pg0 -> pg1, pg2 etc.
cls.flows = dict()
for i in range(0, len(cls.pg_interfaces), 3):
- cls.flows[cls.pg_interfaces[i]] = [cls.pg_interfaces[i+1],
- cls.pg_interfaces[i+2]]
- cls.flows[cls.pg_interfaces[i+1]] = [cls.pg_interfaces[i],
- cls.pg_interfaces[i+2]]
- cls.flows[cls.pg_interfaces[i+2]] = [cls.pg_interfaces[i],
- cls.pg_interfaces[i+1]]
+ cls.flows[cls.pg_interfaces[i]] = [cls.pg_interfaces[i + 1],
+ cls.pg_interfaces[i + 2]]
+ cls.flows[cls.pg_interfaces[i + 1]] = [cls.pg_interfaces[i],
+ cls.pg_interfaces[i + 2]]
+ cls.flows[cls.pg_interfaces[i + 2]] = [cls.pg_interfaces[i],
+ cls.pg_interfaces[i + 1]]
# Mapping between packet-generator index and lists of test hosts
cls.hosts_by_pg_idx = dict()
@@ -188,7 +189,7 @@ class TestL2bdMultiInst(VppTestCase):
if self.bd_deleted_list.count(bd_id) == 1:
self.bd_deleted_list.remove(bd_id)
for j in range(3):
- pg_if = self.pg_interfaces[(i+start-1)*3+j]
+ pg_if = self.pg_interfaces[(i + start - 1) * 3 + j]
self.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
bd_id=bd_id)
self.logger.info("pg-interface %s added to bridge domain ID %d"
@@ -221,7 +222,7 @@ class TestL2bdMultiInst(VppTestCase):
if self.bd_deleted_list.count(bd_id) == 0:
self.bd_deleted_list.append(bd_id)
for j in range(3):
- pg_if = self.pg_interfaces[(i+start-1)*3+j]
+ pg_if = self.pg_interfaces[(i + start - 1) * 3 + j]
self.pg_in_bd.remove(pg_if)
self.pg_not_in_bd.append(pg_if)
self.logger.info("Bridge domain ID %d deleted" % bd_id)
@@ -290,8 +291,7 @@ class TestL2bdMultiInst(VppTestCase):
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error("Unexpected or invalid packet:")
- self.logger.error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
diff --git a/test/test_l2xc_multi_instance.py b/test/test_l2xc_multi_instance.py
index 4de76917009..2e55674e70c 100644
--- a/test/test_l2xc_multi_instance.py
+++ b/test/test_l2xc_multi_instance.py
@@ -56,7 +56,7 @@ from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
-from util import Host
+from util import Host, ppp
class TestL2xcMultiInst(VppTestCase):
@@ -79,7 +79,7 @@ class TestL2xcMultiInst(VppTestCase):
cls.flows = dict()
for i in range(len(cls.pg_interfaces)):
delta = 1 if i % 2 == 0 else -1
- cls.flows[cls.pg_interfaces[i]] = [cls.pg_interfaces[i+delta]]
+ cls.flows[cls.pg_interfaces[i]] = [cls.pg_interfaces[i + delta]]
# Mapping between packet-generator index and lists of test hosts
cls.hosts_by_pg_idx = dict()
@@ -155,9 +155,9 @@ class TestL2xcMultiInst(VppTestCase):
(Default value = 0)
"""
for i in range(count):
- rx_if = self.pg_interfaces[i+start]
+ rx_if = self.pg_interfaces[i + start]
delta = 1 if i % 2 == 0 else -1
- tx_if = self.pg_interfaces[i+start+delta]
+ tx_if = self.pg_interfaces[i + start + delta]
self.vapi.sw_interface_set_l2_xconnect(rx_if.sw_if_index,
tx_if.sw_if_index, 1)
self.logger.info("Cross-connect from %s to %s created"
@@ -177,9 +177,9 @@ class TestL2xcMultiInst(VppTestCase):
(Default value = 0)
"""
for i in range(count):
- rx_if = self.pg_interfaces[i+start]
+ rx_if = self.pg_interfaces[i + start]
delta = 1 if i % 2 == 0 else -1
- tx_if = self.pg_interfaces[i+start+delta]
+ tx_if = self.pg_interfaces[i + start + delta]
self.vapi.sw_interface_set_l2_xconnect(rx_if.sw_if_index,
tx_if.sw_if_index, 0)
self.logger.info("Cross-connect from %s to %s deleted"
@@ -253,8 +253,7 @@ class TestL2xcMultiInst(VppTestCase):
self.assertEqual(udp.sport, saved_packet[UDP].sport)
self.assertEqual(udp.dport, saved_packet[UDP].dport)
except:
- self.logger.error("Unexpected or invalid packet:")
- self.logger.error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
@@ -291,21 +290,15 @@ class TestL2xcMultiInst(VppTestCase):
# Verify
# Verify outgoing packet streams per packet-generator interface
for pg_if in self.pg_interfaces:
- capture = pg_if.get_capture()
if pg_if in self.pg_in_xc:
- if len(capture) == 0:
- raise RuntimeError("Interface %s is cross-connect sink but "
- "the capture is empty!" % pg_if.name)
+ capture = pg_if.get_capture(
+ remark="interface is a cross-connect sink")
self.verify_capture(pg_if, capture)
elif pg_if in self.pg_not_in_xc:
- try:
- self.assertEqual(len(capture), 0)
- except AssertionError:
- raise RuntimeError("Interface %s is not cross-connect sink "
- "but the capture is not empty!"
- % pg_if.name)
+ pg_if.assert_nothing_captured(
+ remark="interface is not a cross-connect sink")
else:
- self.logger.error("Unknown interface: %s" % pg_if.name)
+ raise Exception("Unexpected interface: %s" % pg_if.name)
def test_l2xc_inst_01(self):
""" L2XC Multi-instance test 1 - create 10 cross-connects
diff --git a/test/test_lb.py b/test/test_lb.py
index 3e7f5e13faf..7037d80c571 100644
--- a/test/test_lb.py
+++ b/test/test_lb.py
@@ -1,7 +1,7 @@
import socket
from scapy.layers.inet import IP, UDP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import ICMPv6ND_RA, IPv6
from scapy.layers.l2 import Ether, GRE
from scapy.packet import Raw
@@ -95,10 +95,16 @@ class TestLB(VppTestCase):
self.assertEqual(str(inner), str(self.info.data[IPver]))
def checkCapture(self, gre4, isv4):
- out = self.pg0.get_capture()
- # This check is edited because RA appears in output, maybe disable RA?
- # self.assertEqual(len(out), 0)
- self.assertLess(len(out), 20)
+ # RA might appear in capture
+ try:
+ out = self.pg0.get_capture()
+ # filter out any IPv6 RAs from the capture
+ for p in out:
+ if (p.haslayer(ICMPv6ND_RA)):
+ out.remove(p)
+ self.assertEqual(len(out), 0)
+ except:
+ pass
out = self.pg1.get_capture()
self.assertEqual(len(out), len(self.packets))
diff --git a/test/test_mpls.py b/test/test_mpls.py
index 24fc4129ecd..6d5eeb2bf88 100644
--- a/test/test_mpls.py
+++ b/test/test_mpls.py
@@ -11,8 +11,6 @@ from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
-from util import ppp
-
class TestMPLS(VppTestCase):
""" MPLS Test Case """
@@ -60,7 +58,7 @@ class TestMPLS(VppTestCase):
else:
p = p / MPLS(label=mpls_labels[ii], ttl=mpls_ttl, s=0)
if not ping:
- p = (p / IP(src=src_if.local_ip4, dst=src_if.remote_ip4) /
+ p = (p / IP(src=src_if.local_ip4, dst=src_if.remote_ip4) /
UDP(sport=1234, dport=1234) /
Raw(payload))
else:
@@ -331,14 +329,8 @@ class TestMPLS(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
-
- rx = self.pg0.get_capture()
- try:
- self.assertEqual(0, len(rx))
- except:
- self.logger.error("MPLS non-EOS packets popped and forwarded")
- self.logger.error(ppp("", rx))
- raise
+ self.pg0.assert_nothing_captured(
+ remark="MPLS non-EOS packets popped and forwarded")
#
# A recursive EOS x-connect, which resolves through another x-connect
@@ -586,8 +578,7 @@ class TestMPLS(VppTestCase):
0, # next-hop-table-id
1, # next-hop-weight
2, # num-out-labels,
- [44, 46]
- )
+ [44, 46])
self.vapi.sw_interface_set_flags(reply.sw_if_index, admin_up_down=1)
#
@@ -606,8 +597,7 @@ class TestMPLS(VppTestCase):
0, # next-hop-table-id
1, # next-hop-weight
0, # num-out-labels,
- [] # out-label
- )
+ []) # out-label
self.vapi.cli("clear trace")
tx = self.create_stream_ip4(self.pg0, "10.0.0.3")
@@ -632,14 +622,7 @@ class TestMPLS(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
-
- try:
- self.assertEqual(0, len(rx))
- except:
- self.logger.error("MPLS TTL=0 packets forwarded")
- self.logger.error(ppp("", rx))
- raise
+ self.pg0.assert_nothing_captured(remark="MPLS TTL=0 packets forwarded")
#
# a stream with a non-zero MPLS TTL
diff --git a/test/test_snat.py b/test/test_snat.py
index 5cc76f6c7da..fdd81f02e75 100644
--- a/test/test_snat.py
+++ b/test/test_snat.py
@@ -2,12 +2,12 @@
import socket
import unittest
-from logging import *
from framework import VppTestCase, VppTestRunner
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.l2 import Ether
+from util import ppp
class TestSNAT(VppTestCase):
@@ -88,7 +88,7 @@ class TestSNAT(VppTestCase):
:param dst_ip: Destination IP address (Default use global SNAT address)
"""
if dst_ip is None:
- dst_ip=self.snat_addr
+ dst_ip = self.snat_addr
pkts = []
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
@@ -145,8 +145,8 @@ class TestSNAT(VppTestCase):
self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
self.icmp_id_out = packet[ICMP].id
except:
- error("Unexpected or invalid packet (outside network):")
- error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(outside network):", packet))
raise
def verify_capture_in(self, capture, in_if, packet_num=3):
@@ -168,8 +168,8 @@ class TestSNAT(VppTestCase):
else:
self.assertEqual(packet[ICMP].id, self.icmp_id_in)
except:
- error("Unexpected or invalid packet (inside network):")
- error(packet.show())
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(inside network):", packet))
raise
def clear_snat(self):
@@ -410,11 +410,10 @@ class TestSNAT(VppTestCase):
self.pg0.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- capture = self.pg3.get_capture()
- self.verify_capture_out(capture, packet_num=0)
+ self.pg3.assert_nothing_captured()
def test_multiple_inside_interfaces(self):
- """ SNAT multiple inside interfaces with non-overlapping address space """
+ """SNAT multiple inside interfaces with non-overlapping address space"""
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
@@ -566,8 +565,7 @@ class TestSNAT(VppTestCase):
self.assertEqual(tcp.dport, server_in_port)
host_out_port = tcp.sport
except:
- error("Unexpected or invalid packet:")
- error(p.show())
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
# send reply from server to host
@@ -588,11 +586,9 @@ class TestSNAT(VppTestCase):
self.assertEqual(tcp.sport, server_out_port)
self.assertEqual(tcp.dport, host_in_port)
except:
- error("Unexpected or invalid packet:")
- error(p.show())
+ self.logger.error(ppp("Unexpected or invalid packet:"), p)
raise
-
def tearDown(self):
super(TestSNAT, self).tearDown()
if not self.vpp_dead:
diff --git a/test/util.py b/test/util.py
index f6c6acd41e9..0ac23760465 100644
--- a/test/util.py
+++ b/test/util.py
@@ -15,6 +15,28 @@ def ppp(headline, packet):
return o.getvalue()
+def ppc(headline, capture, limit=10):
+ """ Return string containing ppp() printout for a capture.
+
+ :param headline: printed as first line of output
+ :param capture: packets to print
+ :param limit: limit the print to # of packets
+ """
+ if not capture:
+ return headline
+ result = headline + "\n"
+ count = 1
+ for p in capture:
+ result.append(ppp("Packet #%s:" % count, p))
+ count += 1
+ if count >= limit:
+ break
+ if limit < len(capture):
+ result.append(
+ "Capture contains %s packets in total, of which %s were printed" %
+ (len(capture), limit))
+
+
class NumericConstant(object):
__metaclass__ = ABCMeta
diff --git a/test/vpp_pg_interface.py b/test/vpp_pg_interface.py
index 2ebcbb57024..44bd1a2d520 100644
--- a/test/vpp_pg_interface.py
+++ b/test/vpp_pg_interface.py
@@ -6,7 +6,7 @@ from vpp_interface import VppInterface
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_NA,\
ICMPv6NDOptSrcLLAddr, ICMPv6NDOptDstLLAddr
-from util import ppp
+from util import ppp, ppc
class VppPGInterface(VppInterface):
@@ -114,25 +114,71 @@ class VppPGInterface(VppInterface):
except:
pass
wrpcap(self.in_path, pkts)
+ self.test.register_capture(self.cap_name)
# FIXME this should be an API, but no such exists atm
self.test.vapi.cli(self.input_cli)
- self.test.pg_streams.append(self.cap_name)
- self.test.vapi.cli("trace add pg-input %d" % len(pkts))
- def get_capture(self):
+ def get_capture(self, remark=None):
"""
Get captured packets
:returns: iterable packets
"""
try:
+ self.wait_for_capture_file()
output = rdpcap(self.out_path)
except IOError: # TODO
- self.test.logger.error("File %s does not exist, probably because no"
+ self.test.logger.debug("File %s does not exist, probably because no"
" packets arrived" % self.out_path)
- return []
+ if remark:
+ raise Exception("No packets captured on %s(%s)" %
+ (self.name, remark))
+ else:
+ raise Exception("No packets captured on %s" % self.name)
return output
+ def assert_nothing_captured(self, remark=None):
+ if os.path.isfile(self.out_path):
+ try:
+ capture = self.get_capture()
+ self.test.logger.error(
+ ppc("Unexpected packets captured:", capture))
+ except:
+ pass
+ if remark:
+ raise AssertionError(
+ "Capture file present for interface %s(%s)" %
+ (self.name, remark))
+ else:
+ raise AssertionError("Capture file present for interface %s" %
+ self.name)
+
+ def wait_for_capture_file(self, timeout=1):
+ """
+ Wait until pcap capture file appears
+
+ :param timeout: How long to wait for the packet (default 1s)
+
+ :raises Exception: if the capture file does not appear within timeout
+ """
+ limit = time.time() + timeout
+ if not os.path.isfile(self.out_path):
+ self.test.logger.debug(
+ "Waiting for capture file to appear, timeout is %ss", timeout)
+ else:
+ self.test.logger.debug("Capture file already exists")
+ return
+ while time.time() < limit:
+ if os.path.isfile(self.out_path):
+ break
+ time.sleep(0) # yield
+ if os.path.isfile(self.out_path):
+ self.test.logger.debug("Capture file appeared after %fs" %
+ (time.time() - (limit - timeout)))
+ else:
+ self.test.logger.debug("Timeout - capture file still nowhere")
+ raise Exception("Capture file did not appear within timeout")
+
def wait_for_packet(self, timeout):
"""
Wait for next packet captured with a timeout
@@ -144,18 +190,8 @@ class VppPGInterface(VppInterface):
"""
limit = time.time() + timeout
if self._pcap_reader is None:
- self.test.logger.debug("Waiting for the capture file to appear")
- while time.time() < limit:
- if os.path.isfile(self.out_path):
- break
- time.sleep(0) # yield
- if os.path.isfile(self.out_path):
- self.test.logger.debug("Capture file appeared after %fs" %
- (time.time() - (limit - timeout)))
- self._pcap_reader = PcapReader(self.out_path)
- else:
- self.test.logger.debug("Timeout - capture file still nowhere")
- raise Exception("Packet didn't arrive within timeout")
+ self.wait_for_capture_file(timeout)
+ self._pcap_reader = PcapReader(self.out_path)
self.test.logger.debug("Waiting for packet")
while time.time() < limit:
@@ -197,11 +233,11 @@ class VppPGInterface(VppInterface):
pg_interface.enable_capture()
self.test.pg_start()
self.test.logger.info(self.test.vapi.cli("show trace"))
- arp_reply = pg_interface.get_capture()
- if arp_reply is None or len(arp_reply) == 0:
- self.test.logger.info(
- "No ARP received on port %s" %
- pg_interface.name)
+ try:
+ arp_reply = pg_interface.get_capture()
+ except:
+ self.test.logger.info("No ARP received on port %s" %
+ pg_interface.name)
return
arp_reply = arp_reply[0]
# Make Dot1AD packet content recognizable to scapy