summaryrefslogtreecommitdiffstats
path: root/test/test_acl_plugin_l2l3.py
AgeCommit message (Expand)AuthorFilesLines
2021-05-13tests: move test source to vpp/testDave Wallace1-0/+864
2019-08-22tests: move plugin tests to src/plugins/*/testDave Wallace1-871/+0
2019-08-16acl: fix stats-segment counters validation on acl updateAndrew Yourtchenko1-18/+36
2019-04-11Tests: Refactor tearDown show command logging, add lifecycle markers.Paul Vinciguerra1-13/+14
2019-04-10Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.Paul Vinciguerra1-0/+4
2019-03-29tests: test/vpp_l2.py fix missing name.Paul Vinciguerra1-1/+1
2019-03-14acl-plugin: fix the l2l3 unittest to actually verify the pcapAndrew Yourtchenko1-4/+3
2019-03-11vpp_papi_provider: Remove more wrapper functions.Ole Troan1-5/+5
2019-03-11VPP-1508: Use scapy.compat to manage packet level library differences.Paul Vinciguerra1-3/+6
2019-03-07VPP-1508: test_acl_plugin vapi changes for Python3.Paul Vinciguerra1-8/+8
2018-09-27Test typo fixesjuraj.linkes1-4/+4
2018-09-26acl-plugin: fix the stateful ICMP handling and add testcasesAndrew Yourtchenko1-27/+107
2018-09-25L2 BD: introduce a BD interface on which to send UU packetsNeale Ranns1-1/+3
2018-06-24Revert "Revert "make test: fix broken interfaces""Klement Sekera1-1/+1
2018-06-22Revert "make test: fix broken interfaces"Ole Troan1-1/+1
2018-06-22make test: fix broken interfacesKlement Sekera1-1/+1
2018-04-09make test: fix failure on centosKlement Sekera1-1/+1
2018-03-23acl-plugin: make test: shuffle applied ACLs as part of the testsAndrew Yourtchenko1-1/+79
2017-09-26acl-plugin: test: move the API calls to vpp_papi_provider.pyAndrew Yourtchenko1-61/+25
2017-06-19acl-plugin: bihash-based ACL lookupAndrew Yourtchenko1-0/+3
2017-04-20Clean up old datapath code in ACL plugin.Andrew Yourtchenko1-48/+0
2017-04-06acl-plugin: make the IPv4/IPv6 non-first fragment handling in line with ACL (...Andrew Yourtchenko1-3/+48
2017-03-21ACL plugin 1.2Andrew Yourtchenko1-0/+722
='#n476'>476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
#!/usr/bin/env python
import random
import unittest
import datetime
import re

from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6

from framework import VppTestCase, VppTestRunner
from vpp_sub_interface import VppP2PSubint
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_papi import mac_pton


class P2PEthernetAPI(VppTestCase):
    """P2P Ethernet tests"""

    p2p_sub_ifs = []

    @classmethod
    def setUpClass(cls):
        super(P2PEthernetAPI, cls).setUpClass()

        # Create pg interfaces
        cls.create_pg_interfaces(range(4))

        # Set up all interfaces
        for i in cls.pg_interfaces:
            i.admin_up()

    def create_p2p_ethernet(self, parent_if, sub_id, remote_mac):
        p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac))
        self.p2p_sub_ifs.append(p2p)

    def delete_p2p_ethernet(self, parent_if, remote_mac):
        self.vapi.p2p_ethernet_del(parent_if.sw_if_index,
                                   mac_pton(remote_mac))

    def test_api(self):
        """delete/create p2p subif"""
        self.logger.info("FFP_TEST_START_0000")

        self.create_p2p_ethernet(self.pg0, 1, "de:ad:00:00:00:01")
        self.create_p2p_ethernet(self.pg0, 2, "de:ad:00:00:00:02")
        intfs = self.vapi.cli("show interface")

        self.assertIn('pg0.1', intfs)
        self.assertIn('pg0.2', intfs)
        self.assertNotIn('pg0.5', intfs)

        # create pg2.5 subif
        self.create_p2p_ethernet(self.pg0, 5, "de:ad:00:00:00:ff")
        intfs = self.vapi.cli("show interface")
        self.assertIn('pg0.5', intfs)
        # delete pg2.5 subif
        self.delete_p2p_ethernet(self.pg0, "de:ad:00:00:00:ff")

        intfs = self.vapi.cli("show interface")

        self.assertIn('pg0.1', intfs)
        self.assertIn('pg0.2', intfs)
        self.assertNotIn('pg0.5', intfs)

        self.logger.info("FFP_TEST_FINISH_0000")

    def test_p2p_subif_creation_1k(self):
        """create 1k of p2p subifs"""
        self.logger.info("FFP_TEST_START_0001")

        macs = []
        clients = 1000
        mac = int("dead00000000", 16)

        for i in range(1, clients+1):
            try:
                macs.append(':'.join(re.findall('..', '{:02x}'.format(
                    mac+i))))
                self.vapi.p2p_ethernet_add(self.pg2.sw_if_index,
                                           mac_pton(macs[i-1]),
                                           i)
            except Exception:
                self.logger.info("Failed to create subif %d %s" % (
                    i, macs[i-1]))
                raise

        intfs = self.vapi.cli("show interface").split("\n")
        count = 0
        for intf in intfs:
            if intf.startswith('pg2.'):
                count += 1
        self.assertEqual(count, clients)

        self.logger.info("FFP_TEST_FINISH_0001")


class P2PEthernetIPV6(VppTestCase):
    """P2P Ethernet IPv6 tests"""

    p2p_sub_ifs = []
    packets = []

    @classmethod
    def setUpClass(cls):
        super(P2PEthernetIPV6, cls).setUpClass()

        # Create pg interfaces
        cls.create_pg_interfaces(range(3))

        # Packet sizes
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Set up all interfaces
        for i in cls.pg_interfaces:
            i.admin_up()

        cls.pg0.generate_remote_hosts(3)
        cls.pg0.configure_ipv6_neighbors()

        cls.pg1.config_ip6()
        cls.pg1.generate_remote_hosts(3)
        cls.pg1.configure_ipv6_neighbors()
        cls.pg1.disable_ipv6_ra()

    def setUp(self):
        super(P2PEthernetIPV6, self).setUp()
        for p in self.packets:
            self.packets.remove(p)
        self.p2p_sub_ifs.append(
            self.create_p2p_ethernet(self.pg0, 1,
                                     self.pg0._remote_hosts[0].mac))
        self.p2p_sub_ifs.append(
            self.create_p2p_ethernet(self.pg0, 2,
                                     self.pg0._remote_hosts[1].mac))
        self.vapi.cli("trace add p2p-ethernet-input 50")

    def tearDown(self):
        while len(self.p2p_sub_ifs):
            p2p = self.p2p_sub_ifs.pop()
            self.delete_p2p_ethernet(p2p)

        super(P2PEthernetIPV6, self).tearDown()

    def create_p2p_ethernet(self, parent_if, sub_id, remote_mac):
        p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac))
        p2p.admin_up()
        p2p.config_ip6()
        p2p.disable_ipv6_ra()
        return p2p

    def delete_p2p_ethernet(self, p2p):
        p2p.unconfig_ip6()
        p2p.admin_down()
        self.vapi.p2p_ethernet_del(p2p.parent.sw_if_index,
                                   p2p.p2p_remote_mac)

    def create_stream(self, src_mac=None, dst_mac=None,
                      src_ip=None, dst_ip=None, size=None):
        pkt_size = size
        if size is None:
            pkt_size = random.choice(self.pg_if_packet_sizes)
        p = Ether(src=src_mac, dst=dst_mac)
        p /= IPv6(src=src_ip, dst=dst_ip)
        p /= (UDP(sport=1234, dport=4321) / Raw('\xa5' * 20))
        self.extend_packet(p, pkt_size)
        return p

    def send_packets(self, src_if=None, dst_if=None, packets=None, count=None):
        self.pg_enable_capture([dst_if])
        if packets is None:
            packets = self.packets
        src_if.add_stream(packets)
        self.pg_start()
        if count is None:
            count = len(packets)
        return dst_if.get_capture(count)

    def test_no_p2p_subif(self):
        """standard routing without p2p subinterfaces"""
        self.logger.info("FFP_TEST_START_0001")

        route_8000 = VppIpRoute(self, "8000::", 64,
                                [VppRoutePath(self.pg0.remote_ip6,
                                              self.pg0.sw_if_index,
                                              proto=DpoProto.DPO_PROTO_IP6)],
                                is_ip6=1)
        route_8000.add_vpp_config()

        self.packets = [(Ether(dst=self.pg1.local_mac,
                               src=self.pg1.remote_mac) /
                         IPv6(src="3001::1", dst="8000::100") /
                         UDP(sport=1234, dport=1234) /
                         Raw('\xa5' * 100))]
        self.send_packets(self.pg1, self.pg0)

        self.logger.info("FFP_TEST_FINISH_0001")

    def test_ip6_rx_p2p_subif(self):
        """receive ipv6 packet via p2p subinterface"""
        self.logger.info("FFP_TEST_START_0002")

        route_9001 = VppIpRoute(self, "9001::", 64,
                                [VppRoutePath(self.pg1.remote_ip6,
                                              self.pg1.sw_if_index,
                                              proto=DpoProto.DPO_PROTO_IP6)],
                                is_ip6=1)
        route_9001.add_vpp_config()

        self.packets.append(
            self.create_stream(src_mac=self.pg0._remote_hosts[0].mac,
                               dst_mac=self.pg0.local_mac,
                               src_ip=self.p2p_sub_ifs[0].remote_ip6,
                               dst_ip="9001::100"))

        self.send_packets(self.pg0, self.pg1, self.packets)
        self.assert_packet_counter_equal('p2p-ethernet-input', 1)

        route_9001.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")

    def test_ip6_rx_p2p_subif_route(self):
        """route rx ip6 packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0003")

        self.pg0.config_ip6()

        route_3 = VppIpRoute(self, "9000::", 64,
                             [VppRoutePath(self.pg1._remote_hosts[0].ip6,
                                           self.pg1.sw_if_index,
                                           proto=DpoProto.DPO_PROTO_IP6)],
                             is_ip6=1)
        route_3.add_vpp_config()

        self.packets.append(
            self.create_stream(src_mac="02:03:00:00:ff:ff",
                               dst_mac=self.pg0.local_mac,
                               src_ip="a000::100",
                               dst_ip="9000::100"))

        self.send_packets(self.pg0, self.pg1)

        self.pg0.unconfig_ip6()

        route_3.remove_vpp_config()

        self.logger.info("FFP_TEST_FINISH_0003")

    def test_ip6_rx_p2p_subif_drop(self):
        """drop rx packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0004")

        route_9001 = VppIpRoute(self, "9000::", 64,
                                [VppRoutePath(self.pg1._remote_hosts[0].ip6,
                                              self.pg1.sw_if_index,
                                              proto=DpoProto.DPO_PROTO_IP6)],
                                is_ip6=1)
        route_9001.add_vpp_config()

        self.packets.append(
            self.create_stream(src_mac="02:03:00:00:ff:ff",
                               dst_mac=self.pg0.local_mac,
                               src_ip="a000::100",
                               dst_ip="9000::100"))

        # no packet received
        self.send_packets(self.pg0, self.pg1, count=0)
        self.logger.info("FFP_TEST_FINISH_0004")

    def test_ip6_tx_p2p_subif(self):
        """send packet via p2p subinterface"""
        self.logger.info("FFP_TEST_START_0005")

        route_8000 = VppIpRoute(self, "8000::", 64,
                                [VppRoutePath(self.pg0.remote_ip6,
                                              self.pg0.sw_if_index,
                                              proto=DpoProto.DPO_PROTO_IP6)],
                                is_ip6=1)
        route_8000.add_vpp_config()
        route_8001 = VppIpRoute(self, "8001::", 64,
                                [VppRoutePath(self.p2p_sub_ifs[0].remote_ip6,
                                              self.p2p_sub_ifs[0].sw_if_index,
                                              proto=DpoProto.DPO_PROTO_IP6)],
                                is_ip6=1)
        route_8001.add_vpp_config()
        route_8002 = VppIpRoute(self, "8002::", 64,
                                [VppRoutePath(self.p2p_sub_ifs[1].remote_ip6,
                                              self.p2p_sub_ifs[1].sw_if_index,
                                              proto=DpoProto.DPO_PROTO_IP6)],
                                is_ip6=1)
        route_8002.add_vpp_config()

        for i in range(0, 3):
            self.packets.append(
                self.create_stream(src_mac=self.pg1.remote_mac,
                                   dst_mac=self.pg1.local_mac,
                                   src_ip=self.pg1.remote_ip6,
                                   dst_ip="800%d::100" % i))

        self.send_packets(self.pg1, self.pg0, count=3)

        route_8000.remove_vpp_config()
        route_8001.remove_vpp_config()
        route_8002.remove_vpp_config()

        self.logger.info("FFP_TEST_FINISH_0005")

    def test_ip6_tx_p2p_subif_drop(self):
        """drop tx ip6 packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0006")

        self.packets.append(
            self.create_stream(src_mac="02:03:00:00:ff:ff",
                               dst_mac=self.pg0.local_mac,
                               src_ip="a000::100",
                               dst_ip="9000::100"))

        # no packet received
        self.send_packets(self.pg0, self.pg1, count=0)
        self.logger.info("FFP_TEST_FINISH_0006")


class P2PEthernetIPV4(VppTestCase):
    """P2P Ethernet IPv4 tests"""

    p2p_sub_ifs = []
    packets = []

    @classmethod
    def setUpClass(cls):
        super(P2PEthernetIPV4, cls).setUpClass()

        # Create pg interfaces
        cls.create_pg_interfaces(range(3))

        # Packet sizes
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Set up all interfaces
        for i in cls.pg_interfaces:
            i.admin_up()

        cls.pg0.config_ip4()
        cls.pg0.generate_remote_hosts(5)
        cls.pg0.configure_ipv4_neighbors()

        cls.pg1.config_ip4()
        cls.pg1.generate_remote_hosts(5)
        cls.pg1.configure_ipv4_neighbors()

    def setUp(self):
        super(P2PEthernetIPV4, self).setUp()
        for p in self.packets:
            self.packets.remove(p)
        self.p2p_sub_ifs.append(
            self.create_p2p_ethernet(self.pg0, 1,
                                     self.pg0._remote_hosts[0].mac))
        self.p2p_sub_ifs.append(
            self.create_p2p_ethernet(self.pg0, 2,
                                     self.pg0._remote_hosts[1].mac))
        self.vapi.cli("trace add p2p-ethernet-input 50")

    def tearDown(self):
        while len(self.p2p_sub_ifs):
            p2p = self.p2p_sub_ifs.pop()
            self.delete_p2p_ethernet(p2p)
        super(P2PEthernetIPV4, self).tearDown()

    def create_stream(self, src_mac=None, dst_mac=None,
                      src_ip=None, dst_ip=None, size=None):
        pkt_size = size
        if size is None:
            pkt_size = random.choice(self.pg_if_packet_sizes)
        p = Ether(src=src_mac, dst=dst_mac)
        p /= IP(src=src_ip, dst=dst_ip)
        p /= (UDP(sport=1234, dport=4321) / Raw('\xa5' * 20))
        self.extend_packet(p, pkt_size)
        return p

    def send_packets(self, src_if=None, dst_if=None, packets=None, count=None):
        self.pg_enable_capture([dst_if])
        if packets is None:
            packets = self.packets
        src_if.add_stream(packets)
        self.pg_start()
        if count is None:
            count = len(packets)
        return dst_if.get_capture(count)

    def create_p2p_ethernet(self, parent_if, sub_id, remote_mac):
        p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac))
        p2p.admin_up()
        p2p.config_ip4()
        return p2p

    def delete_p2p_ethernet(self, p2p):
        p2p.unconfig_ip4()
        p2p.admin_down()
        self.vapi.p2p_ethernet_del(p2p.parent.sw_if_index,
                                   p2p.p2p_remote_mac)

    def test_ip4_rx_p2p_subif(self):
        """receive ipv4 packet via p2p subinterface"""
        self.logger.info("FFP_TEST_START_0002")

        route_9000 = VppIpRoute(self, "9.0.0.0", 16,
                                [VppRoutePath(self.pg1.remote_ip4,
                                              self.pg1.sw_if_index)])
        route_9000.add_vpp_config()

        self.packets.append(
            self.create_stream(src_mac=self.pg0._remote_hosts[0].mac,
                               dst_mac=self.pg0.local_mac,
                               src_ip=self.p2p_sub_ifs[0].remote_ip4,
                               dst_ip="9.0.0.100"))

        self.send_packets(self.pg0, self.pg1, self.packets)

        self.assert_packet_counter_equal('p2p-ethernet-input', 1)

        route_9000.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")

    def test_ip4_rx_p2p_subif_route(self):
        """route rx packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0003")

        route_9001 = VppIpRoute(self, "9.0.0.0", 24,
                                [VppRoutePath(self.pg1.remote_ip4,
                                              self.pg1.sw_if_index)])
        route_9001.add_vpp_config()

        self.packets.append(
            self.create_stream(src_mac="02:01:00:00:ff:ff",
                               dst_mac=self.pg0.local_mac,
                               src_ip="8.0.0.100",
                               dst_ip="9.0.0.100"))

        self.send_packets(self.pg0, self.pg1)

        route_9001.remove_vpp_config()

        self.logger.info("FFP_TEST_FINISH_0003")

    def test_ip4_tx_p2p_subif(self):
        """send ip4 packet via p2p subinterface"""
        self.logger.info("FFP_TEST_START_0005")

        route_9100 = VppIpRoute(self, "9.1.0.100", 24,
                                [VppRoutePath(self.pg0.remote_ip4,
                                              self.pg0.sw_if_index,
                                              )])
        route_9100.add_vpp_config()
        route_9200 = VppIpRoute(self, "9.2.0.100", 24,
                                [VppRoutePath(self.p2p_sub_ifs[0].remote_ip4,
                                              self.p2p_sub_ifs[0].sw_if_index,
                                              )])
        route_9200.add_vpp_config()
        route_9300 = VppIpRoute(self, "9.3.0.100", 24,
                                [VppRoutePath(self.p2p_sub_ifs[1].remote_ip4,
                                              self.p2p_sub_ifs[1].sw_if_index
                                              )])
        route_9300.add_vpp_config()

        for i in range(0, 3):
            self.packets.append(
                self.create_stream(src_mac=self.pg1.remote_mac,
                                   dst_mac=self.pg1.local_mac,
                                   src_ip=self.pg1.remote_ip4,
                                   dst_ip="9.%d.0.100" % (i+1)))

        self.send_packets(self.pg1, self.pg0)

        # route_7000.remove_vpp_config()
        route_9100.remove_vpp_config()
        route_9200.remove_vpp_config()
        route_9300.remove_vpp_config()

        self.logger.info("FFP_TEST_FINISH_0005")

    def test_ip4_tx_p2p_subif_drop(self):
        """drop tx ip4 packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0006")

        self.packets.append(
            self.create_stream(src_mac="02:01:00:00:ff:ff",
                               dst_mac=self.pg0.local_mac,
                               src_ip="8.0.0.100",
                               dst_ip="9.0.0.100"))

        # no packet received
        self.send_packets(self.pg0, self.pg1, count=0)
        self.logger.info("FFP_TEST_FINISH_0006")


if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)