#!/usr/bin/env python3 import unittest import socket from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6, Raw from scapy.layers.l2 import Ether, ARP from util import reassemble4 from vpp_object import VppObject from framework import VppTestCase from asfframework import VppTestRunner from vpp_ipip_tun_interface import VppIpIpTunInterface from template_ipsec import ( TemplateIpsec, IpsecTun4, ) from template_ipsec import ( TemplateIpsec, IpsecTun4, ) from test_ipsec_tun_if_esp import TemplateIpsecItf4 from config import config class VppLcpPair(VppObject): def __init__(self, test, phy, host): self._test = test self.phy = phy self.host = host def add_vpp_config(self): self._test.vapi.cli("test lcp add phy %s host %s" % (self.phy, self.host)) self._test.registry.register(self, self._test.logger) return self def remove_vpp_config(self): self._test.vapi.cli("test lcp del phy %s host %s" % (self.phy, self.host)) def object_id(self): return "lcp:%d:%d" % (self.phy.sw_if_index, self.host.sw_if_index) def query_vpp_config(self): pairs = list(self._test.vapi.vpp.details_iter(self._test.vapi.lcp_itf_pair_get)) for p in pairs: if ( p.phy_sw_if_index == self.phy.sw_if_index and p.host_sw_if_index == self.host.sw_if_index ): return True return False @unittest.skipIf("linux-cp" in config.excluded_plugins, "Exclude linux-cp plugin tests") class TestLinuxCP(VppTestCase): """Linux Control Plane""" extra_vpp_plugin_config = [ "plugin", "linux_cp_plugin.so", "{", "enable", "}", "plugin", "linux_cp_unittest_plugin.so", "{", "enable", "}", ] @classmethod def setUpClass(cls): super(TestLinuxCP, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestLinuxCP, cls).tearDownClass() def setUp(self): super(TestLinuxCP, self).setUp() # create 4 pg interfaces so we can create two pairs self.create_pg_interfaces(range(4)) # create on ip4 and one ip6 pg tun self.pg_interfaces += self.create_pg_ip4_interfaces(range(4, 5)) self.pg_interfaces += self.create_pg_ip6_interfaces(range(5, 6)) for i in self.pg_interfaces: i.admin_up() def tearDown(self): for i in self.pg_interfaces: i.admin_down() super(TestLinuxCP, self).tearDown() def test_linux_cp_tap(self): """Linux CP TAP""" # # Setup # arp_opts = {"who-has": 1, "is-at": 2} # create two pairs, wihch a bunch of hots on the phys hosts = [self.pg0, self.pg1] phys = [self.pg2, self.pg3] N_HOSTS = 4 for phy in phys: phy.config_ip4() phy.generate_remote_hosts(4) phy.configure_ipv4_neighbors() pair1 = VppLcpPair(self, phys[0], hosts[0]).add_vpp_config() pair2 = VppLcpPair(self, phys[1], hosts[1]).add_vpp_config() self.logger.info(self.vapi.cli("sh lcp adj verbose")) self.logger.info(self.vapi.cli("sh lcp")) # # Traffic Tests # # hosts to phys for phy, host in zip(phys, hosts): for j in range(N_HOSTS): p = ( Ether(src=phy.local_mac, dst=host.local_mac) / IP(src=phy.local_ip4, dst=phy.remote_hosts[j].ip4) / UDP(sport=1234, dport=1234) / Raw() ) rxs = self.send_and_expect(host, [p], phy) # verify packet is unchanged for rx in rxs: self.assertEqual(p.show2(True), rx.show2(True)) # ARPs x-connect to phy p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP( op="who-has", hwdst=phy.remote_hosts[j].mac, hwsrc=phy.local_mac, psrc=phy.local_ip4, pdst=phy.remote_hosts[j].ip4, ) rxs = self.send_and_expect(host, [p], phy) # verify packet is unchanged for rx in rxs: self.assertEqual(p.show2(True), rx.show2(True)) # phy to host for phy, host in zip(phys, hosts): for j in range(N_HOSTS): p = ( Ether(dst=phy.local_mac, src=phy.remote_hosts[j].mac) / IP(dst=phy.local_ip4, src=phy.remote_hosts[j].ip4) / UDP(sport=1234, dport=1234) / Raw() ) rxs = self.send_and_expect(phy, [p], host) # verify packet is unchanged for rx in rxs: self.assertEqual(p.show2(True), rx.show2(True)) # ARPs rx'd on the phy are sent to the host p = Ether(dst="ff:ff:ff:ff:ff:ff", src=phy.remote_hosts[j].mac) / ARP( op="is-at", hwsrc=phy.remote_hosts[j].mac, hwdst=phy.local_mac, pdst=phy.local_ip4, psrc=phy.remote_hosts[j].ip4, ) rxs = self.send_and_expect(phy, [p], host) # verify packet is unchanged for rx in rxs: self.assertEqual(p.show2(True), rx.show2(True)) # cleanup for phy in phys: phy.unconfig_ip4() def test_linux_cp_tun(self): """Linux CP TUN""" # # Setup # N_PKTS = 31 # create two pairs, wihch a bunch of hots on the phys hosts = [self.pg4, self.pg5] phy = self.pg2 phy.config_ip4() phy.config_ip6() phy.resolve_arp() phy.resolve_ndp() tun4 = VppIpIpTunInterface( self, phy, phy.local_ip4, phy.remote_ip4 ).add_vpp_config() tun6 = VppIpIpTunInterface(
/*
* Copyright (c) 2019 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __IPSEC_PUNT_H__
#define __IPSEC_PUNT_H__
#include <vlib/vlib.h>
#define foreach_ipsec_punt_reason \
_ (IP4_SPI_UDP_0, "ipsec4-spi-o-udp-0", IP4_PACKET) \
_ (IP4_NO_SUCH_TUNNEL, "ipsec4-no-such-tunnel", IP4_PACKET) \
_ (IP6_NO_SUCH_TUNNEL, "ipsec6-no-such-tunnel", IP6_PACKET)
typedef enum ipsec_punt_reason_t_
{
#define _(s, v, f) IPSEC_PUNT_##s,
foreach_ipsec_punt_reason
#undef _
IPSEC_PUNT_N_REASONS,
} ipsec_punt_reason_type_t;
extern u8 *format_ipsec_punt_reason (u8<