aboutsummaryrefslogtreecommitdiffstats
path: root/test/vpp_interface.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/vpp_interface.py')
-rw-r--r--test/vpp_interface.py155
1 files changed, 54 insertions, 101 deletions
diff --git a/test/vpp_interface.py b/test/vpp_interface.py
index 509ab952236..d74248a39b3 100644
--- a/test/vpp_interface.py
+++ b/test/vpp_interface.py
@@ -1,9 +1,8 @@
from abc import abstractmethod, ABCMeta
import socket
-from logging import info, error
-from scapy.layers.l2 import Ether, ARP
+from logging import info
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptDstLLAddr
+from util import Host
class VppInterface(object):
@@ -20,7 +19,7 @@ class VppInterface(object):
@property
def remote_mac(self):
"""MAC-address of the remote interface "connected" to this interface"""
- return self._remote_mac
+ return self._remote_hosts[0].mac
@property
def local_mac(self):
@@ -35,17 +34,17 @@ class VppInterface(object):
@property
def local_ip4n(self):
"""Local IPv4 address - raw, suitable as API parameter"""
- return self._local_ip4n
+ return socket.inet_pton(socket.AF_INET, self._local_ip4)
@property
def remote_ip4(self):
"""IPv4 address of remote peer "connected" to this interface"""
- return self._remote_ip4
+ return self._remote_hosts[0].ip4
@property
def remote_ip4n(self):
"""IPv4 address of remote peer - raw, suitable as API parameter"""
- return self._remote_ip4n
+ return socket.inet_pton(socket.AF_INET, self.remote_ip4)
@property
def local_ip6(self):
@@ -55,17 +54,17 @@ class VppInterface(object):
@property
def local_ip6n(self):
"""Local IPv6 address - raw, suitable as API parameter"""
- return self._local_ip6n
+ return socket.inet_pton(socket.AF_INET6, self.local_ip6)
@property
def remote_ip6(self):
"""IPv6 address of remote peer "connected" to this interface"""
- return self._remote_ip6
+ return self._remote_hosts[0].ip6
@property
def remote_ip6n(self):
"""IPv6 address of remote peer - raw, suitable as API parameter"""
- return self._remote_ip6n
+ return socket.inet_pton(socket.AF_INET6, self.remote_ip6)
@property
def name(self):
@@ -82,19 +81,51 @@ class VppInterface(object):
"""Test case creating this interface"""
return self._test
+ @property
+ def remote_hosts(self):
+ """Remote hosts list"""
+ return self._remote_hosts
+
+ @remote_hosts.setter
+ def remote_hosts(self, value):
+ self._remote_hosts = value
+ #TODO: set hosts_by dicts
+
+ def host_by_mac(self, mac):
+ return self._hosts_by_mac[mac]
+
+ def host_by_ip4(self, ip):
+ return self._hosts_by_ip4[ip]
+
+ def host_by_ip6(self, ip):
+ return self._hosts_by_ip6[ip]
+
+ def generate_remote_hosts(self, count=1):
+ """Generate and add remote hosts for the interface."""
+ self._remote_hosts = []
+ self._hosts_by_mac = {}
+ self._hosts_by_ip4 = {}
+ self._hosts_by_ip6 = {}
+ for i in range(2, count+2): # 0: network address, 1: local vpp address
+ mac = "02:%02x:00:00:ff:%02x" % (self.sw_if_index, i)
+ ip4 = "172.16.%u.%u" % (self.sw_if_index, i)
+ ip6 = "fd01:%04x::%04x" % (self.sw_if_index, i)
+ host = Host(mac, ip4, ip6)
+ self._remote_hosts.append(host)
+ self._hosts_by_mac[mac] = host
+ self._hosts_by_ip4[ip4] = host
+ self._hosts_by_ip6[ip6] = host
+
def post_init_setup(self):
"""Additional setup run after creating an interface object"""
- self._remote_mac = "02:00:00:00:ff:%02x" % self.sw_if_index
+
+ self.generate_remote_hosts()
self._local_ip4 = "172.16.%u.1" % self.sw_if_index
self._local_ip4n = socket.inet_pton(socket.AF_INET, self.local_ip4)
- self._remote_ip4 = "172.16.%u.2" % self.sw_if_index
- self._remote_ip4n = socket.inet_pton(socket.AF_INET, self.remote_ip4)
- self._local_ip6 = "fd01:%u::1" % self.sw_if_index
+ self._local_ip6 = "fd01:%04x::1" % self.sw_if_index
self._local_ip6n = socket.inet_pton(socket.AF_INET6, self.local_ip6)
- self._remote_ip6 = "fd01:%u::2" % self.sw_if_index
- self._remote_ip6n = socket.inet_pton(socket.AF_INET6, self.remote_ip6)
r = self.test.vapi.sw_interface_dump()
for intf in r:
@@ -124,6 +155,13 @@ class VppInterface(object):
self.test.vapi.sw_interface_add_del_address(
self.sw_if_index, addr, addr_len)
+ def configure_extend_ipv4_mac_binding(self):
+ """Configure neighbor MAC to IPv4 addresses."""
+ for host in self._remote_hosts:
+ macn = host.mac.replace(":", "").decode('hex')
+ ipn = host.ip4n
+ self.test.vapi.ip_neighbor_add_del(self.sw_if_index, macn, ipn)
+
def config_ip6(self):
"""Configure IPv6 address on the VPP interface"""
addr = self._local_ip6n
@@ -147,91 +185,6 @@ class VppInterface(object):
"""Configure IPv6 RA suppress on the VPP interface"""
self.test.vapi.sw_interface_ra_suppress(self.sw_if_index)
- def create_arp_req(self):
- """Create ARP request applicable for this interface"""
- return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
- ARP(op=ARP.who_has, pdst=self.local_ip4,
- psrc=self.remote_ip4, hwsrc=self.remote_mac))
-
- def create_ndp_req(self):
- return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
- IPv6(src=self.remote_ip6, dst=self.local_ip6) /
- ICMPv6ND_NS(tgt=self.local_ip6) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.remote_mac))
-
- def resolve_arp(self, pg_interface=None):
- """Resolve ARP using provided packet-generator interface
-
- :param pg_interface: interface used to resolve, if None then this
- interface is used
-
- """
- if pg_interface is None:
- pg_interface = self
- info("Sending ARP request for %s on port %s" %
- (self.local_ip4, pg_interface.name))
- arp_req = self.create_arp_req()
- pg_interface.add_stream(arp_req)
- pg_interface.enable_capture()
- self.test.pg_start()
- info(self.test.vapi.cli("show trace"))
- arp_reply = pg_interface.get_capture()
- if arp_reply is None or len(arp_reply) == 0:
- info("No ARP received on port %s" % pg_interface.name)
- return
- arp_reply = arp_reply[0]
- # Make Dot1AD packet content recognizable to scapy
- if arp_reply.type == 0x88a8:
- arp_reply.type = 0x8100
- arp_reply = Ether(str(arp_reply))
- try:
- if arp_reply[ARP].op == ARP.is_at:
- info("VPP %s MAC address is %s " %
- (self.name, arp_reply[ARP].hwsrc))
- self._local_mac = arp_reply[ARP].hwsrc
- else:
- info("No ARP received on port %s" % pg_interface.name)
- except:
- error("Unexpected response to ARP request:")
- error(arp_reply.show())
- raise
-
- def resolve_ndp(self, pg_interface=None):
- """Resolve NDP using provided packet-generator interface
-
- :param pg_interface: interface used to resolve, if None then this
- interface is used
-
- """
- if pg_interface is None:
- pg_interface = self
- info("Sending NDP request for %s on port %s" %
- (self.local_ip6, pg_interface.name))
- ndp_req = self.create_ndp_req()
- pg_interface.add_stream(ndp_req)
- pg_interface.enable_capture()
- self.test.pg_start()
- info(self.test.vapi.cli("show trace"))
- ndp_reply = pg_interface.get_capture()
- if ndp_reply is None or len(ndp_reply) == 0:
- info("No NDP received on port %s" % pg_interface.name)
- return
- ndp_reply = ndp_reply[0]
- # Make Dot1AD packet content recognizable to scapy
- if ndp_reply.type == 0x88a8:
- ndp_reply.type = 0x8100
- ndp_reply = Ether(str(ndp_reply))
- try:
- ndp_na = ndp_reply[ICMPv6ND_NA]
- opt = ndp_na[ICMPv6NDOptDstLLAddr]
- info("VPP %s MAC address is %s " %
- (self.name, opt.lladdr))
- self._local_mac = opt.lladdr
- except:
- error("Unexpected response to NDP request:")
- error(ndp_reply.show())
- raise
-
def admin_up(self):
""" Put interface ADMIN-UP """
self.test.vapi.sw_interface_set_flags(self.sw_if_index, admin_up_down=1)