aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrew Yourtchenko <ayourtch@gmail.com>2017-09-07 13:22:24 +0200
committerDamjan Marion <dmarion.lists@gmail.com>2017-09-07 13:38:56 +0000
commit92dc12a01b1f5c75500bb015bd159d6bba0547b5 (patch)
tree798adb51ce8966661f820483b96193381e551928
parent7d6412e66d7bef15e964935845ed30c03d8b12b7 (diff)
test: factor out "L4_Conn" into a class within util.py (VPP-931)
It seems a useful abstraction for the purposes of writing fine-grained tests, to be able to create a "connection" object which would be bound to two VPP interfaces, and hold some information about the state, allowing to send the packets back and forth with minimal amount of arguments. Change-Id: Idb83b6b82b38bded5b7e1756a41bb2df4cd58e3a Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
-rw-r--r--test/test_acl_plugin_conns.py52
-rw-r--r--test/util.py66
2 files changed, 68 insertions, 50 deletions
diff --git a/test/test_acl_plugin_conns.py b/test/test_acl_plugin_conns.py
index 06f3cf7e178..0d4aa09d5eb 100644
--- a/test/test_acl_plugin_conns.py
+++ b/test/test_acl_plugin_conns.py
@@ -13,6 +13,7 @@ from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
from scapy.layers.inet6 import IPv6ExtHdrFragment
from pprint import pprint
from random import randint
+from util import L4_Conn
def to_acl_rule(self, is_permit, wildcard_sport=False):
@@ -68,37 +69,7 @@ class IterateWithSleep():
self.testcase.sleep(self.sleep_sec)
-class Conn():
- def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
- self.testcase = testcase
- self.ifs = [None, None]
- self.ifs[0] = if1
- self.ifs[1] = if2
- self.address_family = af
- self.l4proto = l4proto
- self.ports = [None, None]
- self.ports[0] = port1
- self.ports[1] = port2
- self
-
- def pkt(self, side, flags=None):
- is_ip6 = 1 if self.address_family == AF_INET6 else 0
- s0 = side
- s1 = 1-side
- src_if = self.ifs[s0]
- dst_if = self.ifs[s1]
- layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
- IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
- payload = "x"
- l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
- if flags is not None:
- l4args['flags'] = flags
- p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- layer_3[is_ip6] /
- self.l4proto(**l4args) /
- Raw(payload))
- return p
-
+class Conn(L4_Conn):
def apply_acls(self, reflect_side, acl_side):
pkts = []
pkts.append(self.pkt(0))
@@ -152,25 +123,6 @@ class Conn():
}
return new_rule
- def send(self, side, flags=None):
- self.ifs[side].add_stream(self.pkt(side, flags))
- self.ifs[1-side].enable_capture()
- self.testcase.pg_start()
-
- def recv(self, side):
- p = self.ifs[side].wait_for_packet(1)
- return p
-
- def send_through(self, side, flags=None):
- self.send(side, flags)
- p = self.recv(1-side)
- return p
-
- def send_pingpong(self, side, flags1=None, flags2=None):
- p1 = self.send_through(side, flags1)
- p2 = self.send_through(1-side, flags2)
- return [p1, p2]
-
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class ACLPluginConnTestCase(VppTestCase):
diff --git a/test/util.py b/test/util.py
index d6aa9a42677..3e0267a3f4d 100644
--- a/test/util.py
+++ b/test/util.py
@@ -6,6 +6,13 @@ from abc import abstractmethod, ABCMeta
from cStringIO import StringIO
from scapy.layers.inet6 import in6_mactoifaceid
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.layers.inet import IP, UDP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
+from scapy.packet import Packet
+from socket import inet_pton, AF_INET, AF_INET6
+
def ppp(headline, packet):
""" Return string containing the output of scapy packet.show() call. """
@@ -163,3 +170,62 @@ class ForeignAddressFactory(object):
raise Exception("Network host address exhaustion")
self.count += 1
return self.net_template.format(self.count)
+
+
+class L4_Conn():
+ """ L4 'connection' tied to two VPP interfaces """
+ def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
+ self.testcase = testcase
+ self.ifs = [None, None]
+ self.ifs[0] = if1
+ self.ifs[1] = if2
+ self.address_family = af
+ self.l4proto = l4proto
+ self.ports = [None, None]
+ self.ports[0] = port1
+ self.ports[1] = port2
+ self
+
+ def pkt(self, side, l4args={}, payload="x"):
+ is_ip6 = 1 if self.address_family == AF_INET6 else 0
+ s0 = side
+ s1 = 1-side
+ src_if = self.ifs[s0]
+ dst_if = self.ifs[s1]
+ layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+ IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
+ merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
+ merged_l4args.update(l4args)
+ p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ layer_3[is_ip6] /
+ self.l4proto(**merged_l4args) /
+ Raw(payload))
+ return p
+
+ def send(self, side, flags=None, payload=""):
+ l4args = {}
+ if flags is not None:
+ l4args['flags'] = flags
+ self.ifs[side].add_stream(self.pkt(side,
+ l4args=l4args, payload=payload))
+ self.ifs[1-side].enable_capture()
+ self.testcase.pg_start()
+
+ def recv(self, side):
+ p = self.ifs[side].wait_for_packet(1)
+ return p
+
+ def send_through(self, side, flags=None, payload=""):
+ self.send(side, flags, payload)
+ p = self.recv(1-side)
+ return p
+
+ def send_pingpong(self, side, flags1=None, flags2=None):
+ p1 = self.send_through(side, flags1)
+ p2 = self.send_through(1-side, flags2)
+ return [p1, p2]
+
+
+class L4_CONN_SIDE:
+ L4_CONN_SIDE_ZERO = 0
+ L4_CONN_SIDE_ONE = 1