#!/usr/bin/env python3 import unittest from ipaddress import IPv4Network from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP from framework import VppTestCase from asfframework import VppTestRunner from vpp_interface import VppInterface from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath from vpp_acl import AclRule, VppAcl, VppAclInterface from config import config NUM_PKTS = 67 class VppPipe(VppInterface): """ VPP Pipe """ @property def east(self): return self.result.pipe_sw_if_index[1] @property def west(self): return self.result.pipe_sw_if_index[0] def __init__(self, test, instance=0xFFFFFFFF): super(VppPipe, self).__init__(test) self._test = test self.instance = instance def add_vpp_config(self): self.result = self._test.vapi.pipe_create( 0 if self.instance == 0xFFFFFFFF else 1, self.instance ) self.set_sw_if_index(self.result.sw_if_index) def remove_vpp_config(self): self._test.vapi.pipe_delete(self.result.sw_if_index) def object_id(self): return "pipe-%d" % (self._sw_if_index) def query_vpp_config(self): pipes = self._test.vapi.pipe_dump() for p in pipes: if p.sw_if_index == self.result.sw_if_index: return True return False def set_unnumbered(self, ip_sw_if_index, is_east, is_add=True): if is_east: res = self._test.vapi.sw_interface_set_unnumbered( ip_sw_if_index, self.east, is_add ) else: res = self._test.vapi.sw_interface_set_unnumbered( ip_sw_if_index, self.west, is_add ) @unittest.skipIf("acl" in config.excluded_plugins, "Exclude tests requiring ACL plugin") class TestPipe(VppTestCase): """Pipes""" @classmethod def setUpClass(cls): super(TestPipe, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestPipe, cls).tearDownClass() def setUp(self): super(TestPipe, self).setUp() self.create_pg_interfaces(range(4)) for i in self.pg_interfaces: i.admin_up() def tearDown(self): for i in self.pg_interfaces: i.admin_down() super(TestPipe, self).tearDown() def test_pipe(self): """Pipes""" pipes = [VppPipe(self), VppPipe(self, 10)] for p in pipes: p.add_vpp_config() p.admin_up() # # L2 cross-connect pipe0 east with pg0 and west with pg1 # self.vapi.sw_interface_set_l2_xconnect( self.pg0.sw_if_index, pipes[0].east, enable=1 ) self.vapi.sw_interface_set_l2_xconnect( pipes[0].east, self.pg0.sw_if_index, enable=1 ) self.vapi.sw_interface_set_l2_xconnect( self.pg1.sw_if_index, pipes[0].west, enable=1 ) self.vapi.sw_interface_set_l2_xconnect( pipes[0].west, self.pg1.sw_if_index, enable=1 ) # test bi-directional L2 flow pg0<->pg1 p = ( Ether(src=self.pg0.remote_mac, dst=self.pg1.remote_mac) / IP(src="1.1.1.1", dst="1.1.1.2") / UDP(sport=1234, dport=1234) / Raw(b"\xa5" * 100) ) self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1) self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0) # # Attach ACL to ensure features are run on the pipe # rule_1 = AclRule( is_permit=0, proto=17, src_prefix=IPv4Network("1.1.1.1/32"), dst_prefix=IPv4Network("1.1.1.2/32"), ports=1234, ) acl = VppAcl(self, rules=[rule_1]) acl.add_vpp_config() # Apply the ACL on the pipe on output acl_if_e = VppAclInterface( self, sw_if_index=pipes[0].east, n_input=0, acls=[acl] ) acl_if_e.add_vpp_config() self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS) self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0) # remove from output and apply on input acl_if_e.remove_vpp_config() acl_if_w = VppAclInterface( self, sw_if_index=pipes[0].west, n_input=1, acls=[acl] ) acl_if_w.add_vpp_config() self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS) self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0) acl_if_w.remove_vpp_config() self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1) self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0) # # L3 routes in two separate tables so a pipe can be used to L3 # x-connect # tables = [] tables.append(VppIpTable(self, 1)) tables.append(VppIpTable(self, 2)) for t in tables: t.add_vpp_config() self.pg2.set_table_ip4(1) self.pg2.config_ip4() self.pg2.resolve_arp() self.pg3.set_table_ip4(2) self.pg3.config_ip4() self.pg3.resolve_arp() routes = [] routes.append( VppIpRoute( self, "1.1.1.1", 32, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)], table_id=2, ) ) routes.append( VppIpRoute( self, "1.1.1.1", 32, [VppRoutePath("0.0.0.0", pipes[1].east)], table_id=1, ) ) routes.append( VppIpRoute( self, "1.1.1.2", 32, [VppRoutePath("0.0.0.0", pipes[1].west)], table_id=2, ) ) routes.append( VppIpRoute( self, "1.1.1.2", 32, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)], table_id=1, ) ) for r in routes: r.add_vpp_config() p_east = ( Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) / IP(src="1.1.1.2", dst="1.1.1.1") / UDP(sport=1234, dport=1234) / Raw(b"\xa5" * 100) ) # bind the pipe ends to the correct tables self.vapi.sw_interface_set_table(pipes[1].west, 0, 2) self.vapi.sw_interface_set_table(pipes[1].east, 0, 1) # IP is not enabled on the pipes at this point self.send_and_assert_no_replies(self.pg2, p_east * NUM_PKTS) # IP enable the Pipes by making them unnumbered pipes[1].set_unnumbered(self.pg2.sw_if_index, True) pipes[1].set_unnumbered(self.pg3.sw_if_index, False) self.send_and_expect(self.pg2, p_east * NUM_PKTS, self.pg3) # and the return path p_west = ( Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) / IP(src="1.1.1.1", dst="1.1.1.2") / UDP(sport=1234, dport=1234) / Raw(b"\xa5" * 100) ) self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2) # # Use ACLs to test features run on the Pipes # acl_if_e1 = VppAclInterface( self, sw_if_index=pipes[1].east, n_input=0, acls=[acl] ) acl_if_e1.add_vpp_config() self.send_and_assert_no_replies(self.pg2, p_east * NUM_PKTS) self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2) # remove from output and apply on input acl_if_e1.remove_vpp_config() acl_if_w1 = VppAclInterface( self, sw_if_index=pipes[1].west, n_input=1, acls=[acl] ) acl_if_w1.add_vpp_config() self.send_and_assert_no_replies(self.pg2, p_east * NUM_PKTS) self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2) acl_if_w1.remove_vpp_config() self.send_and_expect(self.pg2, p_east * NUM_PKTS, self.pg3) self.send_and_expect(self.pg3, p_west * NUM_PKTS, self.pg2) # cleanup (so the tables delete) self.pg2.unconfig_ip4() self.pg2.set_table_ip4(0) self.pg3.unconfig_ip4() self.pg3.set_table_ip4(0) self.vapi.sw_interface_set_table(pipes[1].west, 0, 0) self.vapi.sw_interface_set_table(pipes[1].east, 0, 0) if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)