summaryrefslogtreecommitdiffstats
path: root/test/vpp_pg_interface.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/vpp_pg_interface.py')
-rw-r--r--test/vpp_pg_interface.py91
1 files changed, 90 insertions, 1 deletions
diff --git a/test/vpp_pg_interface.py b/test/vpp_pg_interface.py
index c9e4076d19a..81c7192e07e 100644
--- a/test/vpp_pg_interface.py
+++ b/test/vpp_pg_interface.py
@@ -1,9 +1,12 @@
import os
import time
-from logging import error
+from logging import error, info
from scapy.utils import wrpcap, rdpcap
from vpp_interface import VppInterface
+from scapy.layers.l2 import Ether, ARP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptDstLLAddr
+
class VppPGInterface(VppInterface):
"""
@@ -130,3 +133,89 @@ class VppPGInterface(VppInterface):
" packets arrived" % self.out_path)
return []
return output
+
+ def create_arp_req(self):
+ """Create ARP request applicable for this interface"""
+ return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
+ ARP(op=ARP.who_has, pdst=self.local_ip4,
+ psrc=self.remote_ip4, hwsrc=self.remote_mac))
+
+ def create_ndp_req(self):
+ """Create NDP - NS applicable for this interface"""
+ return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
+ IPv6(src=self.remote_ip6, dst=self.local_ip6) /
+ ICMPv6ND_NS(tgt=self.local_ip6) /
+ ICMPv6NDOptSrcLLAddr(lladdr=self.remote_mac))
+
+ def resolve_arp(self, pg_interface=None):
+ """Resolve ARP using provided packet-generator interface
+
+ :param pg_interface: interface used to resolve, if None then this
+ interface is used
+
+ """
+ if pg_interface is None:
+ pg_interface = self
+ info("Sending ARP request for %s on port %s" %
+ (self.local_ip4, pg_interface.name))
+ arp_req = self.create_arp_req()
+ pg_interface.add_stream(arp_req)
+ pg_interface.enable_capture()
+ self.test.pg_start()
+ info(self.test.vapi.cli("show trace"))
+ arp_reply = pg_interface.get_capture()
+ if arp_reply is None or len(arp_reply) == 0:
+ info("No ARP received on port %s" % pg_interface.name)
+ return
+ arp_reply = arp_reply[0]
+ # Make Dot1AD packet content recognizable to scapy
+ if arp_reply.type == 0x88a8:
+ arp_reply.type = 0x8100
+ arp_reply = Ether(str(arp_reply))
+ try:
+ if arp_reply[ARP].op == ARP.is_at:
+ info("VPP %s MAC address is %s " %
+ (self.name, arp_reply[ARP].hwsrc))
+ self._local_mac = arp_reply[ARP].hwsrc
+ else:
+ info("No ARP received on port %s" % pg_interface.name)
+ except:
+ error("Unexpected response to ARP request:")
+ error(arp_reply.show())
+ raise
+
+ def resolve_ndp(self, pg_interface=None):
+ """Resolve NDP using provided packet-generator interface
+
+ :param pg_interface: interface used to resolve, if None then this
+ interface is used
+
+ """
+ if pg_interface is None:
+ pg_interface = self
+ info("Sending NDP request for %s on port %s" %
+ (self.local_ip6, pg_interface.name))
+ ndp_req = self.create_ndp_req()
+ pg_interface.add_stream(ndp_req)
+ pg_interface.enable_capture()
+ self.test.pg_start()
+ info(self.test.vapi.cli("show trace"))
+ ndp_reply = pg_interface.get_capture()
+ if ndp_reply is None or len(ndp_reply) == 0:
+ info("No NDP received on port %s" % pg_interface.name)
+ return
+ ndp_reply = ndp_reply[0]
+ # Make Dot1AD packet content recognizable to scapy
+ if ndp_reply.type == 0x88a8:
+ ndp_reply.type = 0x8100
+ ndp_reply = Ether(str(ndp_reply))
+ try:
+ ndp_na = ndp_reply[ICMPv6ND_NA]
+ opt = ndp_na[ICMPv6NDOptDstLLAddr]
+ info("VPP %s MAC address is %s " %
+ (self.name, opt.lladdr))
+ self._local_mac = opt.lladdr
+ except:
+ error("Unexpected response to NDP request:")
+ error(ndp_reply.show())
+ raise
n281'>281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330