#!/usr/bin/env python3
import random
import unittest
import re
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from framework import VppTestCase
from asfframework import VppTestRunner
from vpp_sub_interface import VppP2PSubint
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_papi import mac_pton
class P2PEthernetAPI(VppTestCase):
"""P2P Ethernet tests"""
p2p_sub_ifs = []
@classmethod
def setUpClass(cls):
super(P2PEthernetAPI, cls).setUpClass()
# Create pg interfaces
cls.create_pg_interfaces(range(4))
# Set up all interfaces
for i in cls.pg_interfaces:
i.admin_up()
@classmethod
def tearDownClass(cls):
super(P2PEthernetAPI, cls).tearDownClass()
def create_p2p_ethernet(self, parent_if, sub_id, remote_mac):
p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac))
self.p2p_sub_ifs.append(p2p)
def delete_p2p_ethernet(self, parent_if, remote_mac):
self.vapi.p2p_ethernet_del(parent_if.sw_if_index, mac_pton(remote_mac))
def test_api(self):
"""delete/create p2p subif"""
self.logger.info("FFP_TEST_START_0000")
self.create_p2p_ethernet(self.pg0, 1, "de:ad:00:00:00:01")
self.create_p2p_ethernet(self.pg0, 2, "de:ad:00:00:00:02")
intfs = self.vapi.cli("show interface")
self.assertIn("pg0.1", intfs)
self.assertIn("pg0.2", intfs)
self.assertNotIn("pg0.5", intfs)
# create pg2.5 subif
self.create_p2p_ethernet(self.pg0, 5, "de:ad:00:00:00:ff")
intfs = self.vapi.cli("show interface")
self.assertIn("pg0.5", intfs)
# delete pg2.5 subif
self.delete_p2p_ethernet(self.pg0, "de:ad:00:00:00:ff")
intfs = self.vapi.cli("show interface")
self.assertIn("pg0.1", intfs)
self.assertIn("pg0.2", intfs)
self.assertNotIn("pg0.5", intfs)
self.logger.info("FFP_TEST_FINISH_0000")
def test_p2p_subif_creation_1k(self):
"""create 1k of p2p subifs"""
self.logger.info("FFP_TEST_START_0001")
macs = []
clients = 1000
mac = int("dead00000000", 16)
for i in range(1, clients + 1):
try:
macs.append(":".join(re.findall("..", "{:02x}".format(mac + i))))
self.vapi.p2p_ethernet_add(
self.pg2.sw_if_index, mac_pton(macs[i - 1]), i
)
except Exception:
self.logger.info("Failed to create subif %d %s" % (i, macs[i - 1]))
raise
intfs = self.vapi.cli("show interface").split("\n")
count = 0
for intf in intfs:
if intf.startswith("pg2."):
count += 1
self.assertEqual(count, clients)
self.logger.info("FFP_TEST_FINISH_0001")
class P2PEthernetIPV6(VppTestCase):
"""P2P Ethernet IPv6 tests"""
p2p_sub_ifs = []
packets = []
@classmethod
def setUpClass(cls):
super(P2PEthernetIPV6, cls).setUpClass()
# Create pg interfaces
cls.create_pg_interfaces(range(3))
# Packet sizes
cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
# Set up all interfaces
for i in cls.pg_interfaces:
i.admin_up()
cls.pg0.generate_remote_hosts(3)
cls.pg0.configure_ipv6_neighbors()
cls.pg1.config_ip6()
cls.pg1.generate_remote_hosts(3)
cls.pg1.configure_ipv6_neighbors()
cls.pg1.disable_ipv6_ra()
@classmethod
def tearDownClass(cls):
super(P2PEthernetIPV6, cls).tearDownClass()
def setUp(self):
super(P2PEthernetIPV6, self).setUp()
for p in self.packets:
self.packets.remove(p)
self.p2p_sub_ifs.append(
self.create_p2p_ethernet(self.pg0, 1, self.pg0._remote_hosts[0].mac)
)
self.p2p_sub_ifs.append(
self.create_p2p_ethernet(self.pg0, 2, self.pg0._remote_hosts[1].mac)
)
self.vapi.cli("trace add p2p-ethernet-input 50")
def tearDown(self):
while len(self.p2p_sub_ifs):
p2p = self.p2p_sub_ifs.pop()
self.delete_p2p_ethernet(p2p)
super(P2PEthernetIPV6, self).tearDown()
def create_p2p_ethernet(self, parent_if, sub_id, remote_mac):
p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac))
p2p.admin_up()
p2p.config_ip6()
p2p.disable_ipv6_ra()
return p2p
def delete_p2p_ethernet(self, p2p):
p2p.unconfig_ip6()
p2p.admin_down()
self.vapi.p2p_ethernet_del(p2p.parent.sw_if_index, p2p.p2p_remote_mac)
def create_stream(
self, src_mac=None, dst_mac=None, src_ip=None, dst_ip=None, size=None
):
pkt_size = size
if size is None:
pkt_size = random.choice(self.pg_if_packet_sizes)
p = Ether(src=src_mac, dst=dst_mac)
p /= IPv6(src=src_ip, dst=dst_ip)
p /= UDP(sport=1234, dport=4321) / Raw(b"\xa5" * 20)
self.extend_packet(p, pkt_size)
return p
def send_packets(self, src_if=None, dst_if=None, packets=None, count=None):
self.pg_enable_capture([dst_if])
if packets is None:
packets = self.packets
src_if.add_stream(packets)
self.pg_start()
if count is None:
count = len(packets)
return dst_if.get_capture(count)
def test_no_p2p_subif(self):
"""standard routing without p2p subinterfaces"""
self.logger.info("FFP_TEST_START_0001")
self.pg0.config_ip6()
route_8000 = VppIpRoute(
self,
"8000::",
64,
[VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
)
route_8000.add_vpp_config()
self.packets = [
(
Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
/ IPv6(src="3001::1", dst="8000::100")
/ UDP(sport=1234, dport=1234)
/ Raw(b"\xa5" * 100)
)
]
self.send_packets(self.pg1, self.pg0)
self.pg0.unconfig_ip6()
self.logger.info("FFP_TEST_FINISH_0001")
def test_ip6_rx_p2p_subif(self):
"""receive ipv6 packet via p2p subinterface"""
self.logger.info("FFP_TEST_START_0002")
route_9001 = VppIpRoute(
self,
"9001::",
64,
[VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
)
route_9001.add_vpp_config()
self.packets.append(
self.create_stream(
src_mac=self.pg0._remote_hosts[0].mac,
dst_mac=self.pg0.local_mac,
src_ip=self.p2p_sub_ifs[0].remote_ip6,
dst_ip="9001::100",
)
)
self.send_packets(self.pg0, self.pg1, self.packets)
self.assert_packet_counter_equal("p2p-ethernet-input", 1)
route_9001.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0002")
def test_ip6_rx_p2p_subif_route(self):
"""route rx ip6 packet not matching p2p subinterface"""
self.logger.info("FFP_TEST_START_0003")
self.pg0.config_ip6()
route_3 = VppIpRoute(
self,
"9000::",
64,
[VppRoutePath(self.pg1._remote_hosts[0].ip6, self.pg1.sw_if_index)],
)
route_3.add_vpp_config()
self.packets.append(
self.create_stream(
src_mac="02:03:00:00:ff:ff",
dst_mac=self.pg0.local_mac,
src_ip="a000::100",
dst_ip="9000::100",
)
)
self.send_packets(self.pg0, self.pg1)
self.pg0.unconfig_ip6()
route_3.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0003")
def test_ip6_rx_p2p_subif_drop(self):
"""drop rx packet not matching p2p subinterface"""
self.logger.info("FFP_TEST_START_0004")
route_9001 = VppIpRoute(
self,
"9000::",
64,
[VppRoutePath(self.pg1._remote_hosts[0].ip6, self.pg1.sw_if_index)],
)
route_9001.add_vpp_config()
self.packets.append(
self.create_stream(
src_mac="02:03:00:00:ff:ff",
dst_mac=self.pg0.local_mac,
src_ip="a000::100",
dst_ip="9000::100",
)
)
# no packet received
self.send_packets(self.pg0, self.pg1, count=0)
self.logger.info("FFP_TEST_FINISH_0004")
def test_ip6_tx_p2p_subif(self):
"""send packet via p2p subinterface"""
self.logger.info("FFP_TEST_START_0005")
self.pg0.config_ip6()
route_8000 = VppIpRoute(
self,
"8000::",
64,
[VppRoutePath(self.pg0.remote_hosts[0].ip6, self.pg0.sw_if_index)],
)
route_8000.add_vpp_config()
route_8001 = VppIpRoute(
self,
"8001::",
64,
[
VppRoutePath(
self.p2p_sub_ifs[0].remote_ip6, self.p2p_sub_ifs[0].sw_if_index
)
],
)
route_8001.add_vpp_config()
route_8002 = VppIpRoute(
self,
"8002::",
64,
[
VppRoutePath(
self.p2p_sub_ifs[1].remote_ip6, self.p2p_sub_ifs[1].sw_if_index
)
],
)
route_8002.add_vpp_config()
for i in range(0, 3):
self.packets.append(
self.create_stream(
src_mac=self.pg1.remote_mac,
dst_mac=self.pg1.local_mac,
src_ip=self.pg1.remote_ip6,
dst_ip="800%d::100" % i,
)
)
self.send_packets(self.pg1, self.pg0, count=3)
route_8000.remove_vpp_config()
route_8001.remove_vpp_config()
route_8002.remove_vpp_config()
self.pg0.unconfig_ip6()
self.logger.info("FFP_TEST_FINISH_0005")
def test_ip6_tx_p2p_subif_drop(self):
"""drop tx ip6 packet not matching p2p subinterface"""
self.logger.info("FFP_TEST_START_0006")
self.packets.append(
self.create_stream(
src_mac="02:03:00:00:ff:ff",
dst_mac=self.pg0.local_mac,
src_ip="a000::100",
dst_ip="9000::100",
)
)
# no packet received
self.send_packets(self.pg0, self.pg1, count=0)
self.logger.info("FFP_TEST_FINISH_0006")
class P2PEthernetIPV4(VppTestCase):
"""P2P Ethernet IPv4 tests"""
p2p_sub_ifs = []
packets = []
@classmethod
def setUpClass(cls):
super(P2PEthernetIPV4, cls).setUpClass()
# Create pg interfaces
cls.create_pg_interfaces(range(3))
# Packet sizes
cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
# Set up all interfaces
for i in cls.pg_interfaces:
i.admin_up()
cls.pg0.config_ip4()
cls.pg0.generate_remote_hosts(5)
cls.pg0.configure_ipv4_neighbors()
cls.pg1.config_ip4()
cls.pg1.generate_remote_hosts(5)
cls.pg1.configure_ipv4_neighbors()
@classmethod
def tearDownClass(cls):
super(P2PEthernetIPV4, cls).tearDownClass()
def setUp(self):
super(P2PEthernetIPV4, self).setUp()
for p in self.packets:
self.packets.remove(p)
self.p2p_sub_ifs.append(
self.create_p2p_ethernet(self.pg0, 1, self.pg0._remote_hosts[0].mac)
)
self.p2p_sub_ifs.append(
self.create_p2p_ethernet(self.pg0, 2, self.pg0._remote_hosts[1].mac)
)
self.vapi.cli("trace add p2p-ethernet-input 50")
def tearDown(self):
while len(self.p2p_sub_ifs):
p2p = self.p2p_sub_ifs.pop()
self.delete_p2p_ethernet(p2p)
super(P2PEthernetIPV4, self).tearDown()
def create_stream(
self, src_mac=None, dst_mac=None, src_ip=None, dst_ip=None, size=None
):
pkt_size = size
if size is None:
pkt_size = random.choice(self.pg_if_packet_sizes)
p = Ether(src=src_mac, dst=dst_mac)
p /= IP(src=src_ip, dst=dst_ip)
p /= UDP(sport=1234, dport=4321) / Raw(b"\xa5" * 20)
self.extend_packet(p, pkt_size)
return p
def send_packets(self, src_if=None, dst_if=None, packets=None, count=None):
self.pg_enable_capture([dst_if])
if packets is None:
packets = self.packets
src_if.add_stream(packets)
self.pg_start()
if count is None:
count = len(packets)
return dst_if.get_capture(count)
def create_p2p_ethernet(self, parent_if, sub_id, remote_mac):
p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac))
p2p.admin_up()
p2p.config_ip4()
return p2p
def delete_p2p_ethernet(self, p2p):
p2p.unconfig_ip4()
p2p.admin_down()
self.vapi.p2p_ethernet_del(p2p.parent.sw_if_index, p2p.p2p_remote_mac)
def test_ip4_rx_p2p_subif(self):
"""receive ipv4 packet via p2p subinterface"""
self.logger.info("FFP_TEST_START_0002")
route_9000 = VppIpRoute(
self,
"9.0.0.0",
16,
[VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
)
route_9000.add_vpp_config()
self.packets.append(
self.create_stream(
src_mac=self.pg0._remote_hosts[0].mac,
dst_mac=self.pg0.local_mac,
src_ip=self.p2p_sub_ifs[0].remote_ip4,
dst_ip="9.0.0.100",
)
)
self.send_packets(self.pg0, self.pg1, self.packets)
self.assert_packet_counter_equal("p2p-ethernet-input", 1)
route_9000.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0002")
def test_ip4_rx_p2p_subif_route(self):
"""route rx packet not matching p2p subinterface"""
self.logger.info("FFP_TEST_START_0003")
route_9001 = VppIpRoute(
self,
"9.0.0.0",
24,
[VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
)
route_9001.add_vpp_config()
self.packets.append(
self.create_stream(
src_mac="02:01:00:00:ff:ff",
dst_mac=self.pg0.local_mac,
src_ip="8.0.0.100",
dst_ip="9.0.0.100",
)
)
self.send_packets(self.pg0, self.pg1)
route_9001.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0003")
def test_ip4_tx_p2p_subif(self):
"""send ip4 packet via p2p subinterface"""
self.logger.info("FFP_TEST_START_0005")
route_9100 = VppIpRoute(
self,
"9.1.0.100",
24,
[
VppRoutePath(
self.pg0.remote_ip4,
self.pg0.sw_if_index,
)
],
)
route_9100.add_vpp_config()
route_9200 = VppIpRoute(
self,
"9.2.0.100",
24,
[
VppRoutePath(
self.p2p_sub_ifs[0].remote_ip4,
self.p2p_sub_ifs[0].sw_if_index,
)
],
)
route_9200.add_vpp_config()
route_9300 = VppIpRoute(
self,
"9.3.0.100",
24,
[
VppRoutePath(
self.p2p_sub_ifs[1].remote_ip4, self.p2p_sub_ifs[1].sw_if_index
)
],
)
route_9300.add_vpp_config()
for i in range(0, 3):
self.packets.append(
self.create_stream(
src_mac=self.pg1.remote_mac,
dst_mac=self.pg1.local_mac,
src_ip=self.pg1.remote_ip4,
dst_ip="9.%d.0.100" % (i + 1),
)
)
self.send_packets(self.pg1, self.pg0)
# route_7000.remove_vpp_config()
route_9100.remove_vpp_config()
route_9200.remove_vpp_config()
route_9300.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0005")
def test_ip4_tx_p2p_subif_drop(self):
"""drop tx ip4 packet not matching p2p subinterface"""
self.logger.info("FFP_TEST_START_0006")
self.packets.append(
self.create_stream(
src_mac="02:01:00:00:ff:ff",
dst_mac=self.pg0.local_mac,
src_ip="8.0.0.100",
dst_ip="9.0.0.100",
)
)
# no packet received
self.send_packets(self.pg0, self.pg1, count=0)
self.logger.info("FFP_TEST_FINISH_0006")
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)