summaryrefslogtreecommitdiffstats
path: root/test/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/util.py')
-rw-r--r--test/util.py66
1 files changed, 66 insertions, 0 deletions
diff --git a/test/util.py b/test/util.py
index d6aa9a42677..3e0267a3f4d 100644
--- a/test/util.py
+++ b/test/util.py
@@ -6,6 +6,13 @@ from abc import abstractmethod, ABCMeta
from cStringIO import StringIO
from scapy.layers.inet6 import in6_mactoifaceid
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.layers.inet import IP, UDP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
+from scapy.packet import Packet
+from socket import inet_pton, AF_INET, AF_INET6
+
def ppp(headline, packet):
""" Return string containing the output of scapy packet.show() call. """
@@ -163,3 +170,62 @@ class ForeignAddressFactory(object):
raise Exception("Network host address exhaustion")
self.count += 1
return self.net_template.format(self.count)
+
+
+class L4_Conn():
+ """ L4 'connection' tied to two VPP interfaces """
+ def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
+ self.testcase = testcase
+ self.ifs = [None, None]
+ self.ifs[0] = if1
+ self.ifs[1] = if2
+ self.address_family = af
+ self.l4proto = l4proto
+ self.ports = [None, None]
+ self.ports[0] = port1
+ self.ports[1] = port2
+ self
+
+ def pkt(self, side, l4args={}, payload="x"):
+ is_ip6 = 1 if self.address_family == AF_INET6 else 0
+ s0 = side
+ s1 = 1-side
+ src_if = self.ifs[s0]
+ dst_if = self.ifs[s1]
+ layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+ IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
+ merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
+ merged_l4args.update(l4args)
+ p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ layer_3[is_ip6] /
+ self.l4proto(**merged_l4args) /
+ Raw(payload))
+ return p
+
+ def send(self, side, flags=None, payload=""):
+ l4args = {}
+ if flags is not None:
+ l4args['flags'] = flags
+ self.ifs[side].add_stream(self.pkt(side,
+ l4args=l4args, payload=payload))
+ self.ifs[1-side].enable_capture()
+ self.testcase.pg_start()
+
+ def recv(self, side):
+ p = self.ifs[side].wait_for_packet(1)
+ return p
+
+ def send_through(self, side, flags=None, payload=""):
+ self.send(side, flags, payload)
+ p = self.recv(1-side)
+ return p
+
+ def send_pingpong(self, side, flags1=None, flags2=None):
+ p1 = self.send_through(side, flags1)
+ p2 = self.send_through(1-side, flags2)
+ return [p1, p2]
+
+
+class L4_CONN_SIDE:
+ L4_CONN_SIDE_ZERO = 0
+ L4_CONN_SIDE_ONE = 1