summaryrefslogtreecommitdiffstats
path: root/test/vpp_pg_interface.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/vpp_pg_interface.py')
-rw-r--r--test/vpp_pg_interface.py91
1 files changed, 90 insertions, 1 deletions
diff --git a/test/vpp_pg_interface.py b/test/vpp_pg_interface.py
index c9e4076d19a..81c7192e07e 100644
--- a/test/vpp_pg_interface.py
+++ b/test/vpp_pg_interface.py
@@ -1,9 +1,12 @@
import os
import time
-from logging import error
+from logging import error, info
from scapy.utils import wrpcap, rdpcap
from vpp_interface import VppInterface
+from scapy.layers.l2 import Ether, ARP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptDstLLAddr
+
class VppPGInterface(VppInterface):
"""
@@ -130,3 +133,89 @@ class VppPGInterface(VppInterface):
" packets arrived" % self.out_path)
return []
return output
+
+ def create_arp_req(self):
+ """Create ARP request applicable for this interface"""
+ return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
+ ARP(op=ARP.who_has, pdst=self.local_ip4,
+ psrc=self.remote_ip4, hwsrc=self.remote_mac))
+
+ def create_ndp_req(self):
+ """Create NDP - NS applicable for this interface"""
+ return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
+ IPv6(src=self.remote_ip6, dst=self.local_ip6) /
+ ICMPv6ND_NS(tgt=self.local_ip6) /
+ ICMPv6NDOptSrcLLAddr(lladdr=self.remote_mac))
+
+ def resolve_arp(self, pg_interface=None):
+ """Resolve ARP using provided packet-generator interface
+
+ :param pg_interface: interface used to resolve, if None then this
+ interface is used
+
+ """
+ if pg_interface is None:
+ pg_interface = self
+ info("Sending ARP request for %s on port %s" %
+ (self.local_ip4, pg_interface.name))
+ arp_req = self.create_arp_req()
+ pg_interface.add_stream(arp_req)
+ pg_interface.enable_capture()
+ self.test.pg_start()
+ info(self.test.vapi.cli("show trace"))
+ arp_reply = pg_interface.get_capture()
+ if arp_reply is None or len(arp_reply) == 0:
+ info("No ARP received on port %s" % pg_interface.name)
+ return
+ arp_reply = arp_reply[0]
+ # Make Dot1AD packet content recognizable to scapy
+ if arp_reply.type == 0x88a8:
+ arp_reply.type = 0x8100
+ arp_reply = Ether(str(arp_reply))
+ try:
+ if arp_reply[ARP].op == ARP.is_at:
+ info("VPP %s MAC address is %s " %
+ (self.name, arp_reply[ARP].hwsrc))
+ self._local_mac = arp_reply[ARP].hwsrc
+ else:
+ info("No ARP received on port %s" % pg_interface.name)
+ except:
+ error("Unexpected response to ARP request:")
+ error(arp_reply.show())
+ raise
+
+ def resolve_ndp(self, pg_interface=None):
+ """Resolve NDP using provided packet-generator interface
+
+ :param pg_interface: interface used to resolve, if None then this
+ interface is used
+
+ """
+ if pg_interface is None:
+ pg_interface = self
+ info("Sending NDP request for %s on port %s" %
+ (self.local_ip6, pg_interface.name))
+ ndp_req = self.create_ndp_req()
+ pg_interface.add_stream(ndp_req)
+ pg_interface.enable_capture()
+ self.test.pg_start()
+ info(self.test.vapi.cli("show trace"))
+ ndp_reply = pg_interface.get_capture()
+ if ndp_reply is None or len(ndp_reply) == 0:
+ info("No NDP received on port %s" % pg_interface.name)
+ return
+ ndp_reply = ndp_reply[0]
+ # Make Dot1AD packet content recognizable to scapy
+ if ndp_reply.type == 0x88a8:
+ ndp_reply.type = 0x8100
+ ndp_reply = Ether(str(ndp_reply))
+ try:
+ ndp_na = ndp_reply[ICMPv6ND_NA]
+ opt = ndp_na[ICMPv6NDOptDstLLAddr]
+ info("VPP %s MAC address is %s " %
+ (self.name, opt.lladdr))
+ self._local_mac = opt.lladdr
+ except:
+ error("Unexpected response to NDP request:")
+ error(ndp_reply.show())
+ raise