#!/usr/bin/env python import unittest from logging import * from framework import VppTestCase, VppTestRunner from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_pppoe_interface import VppPppoeInterface from vpp_papi_provider import L2_VTR_OP from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.ppp import PPPoE, PPPoED, PPP from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 from scapy.volatile import RandMAC, RandIP from util import ppp, ppc, mactobinary import socket class TestPPPoE(VppTestCase): """ PPPoE Test Case """ @classmethod def setUpClass(cls): super(TestPPPoE, cls).setUpClass() cls.session_id = 1 cls.dst_ip = "100.1.1.100" cls.dst_ipn = socket.inet_pton(socket.AF_INET, cls.dst_ip) def setUp(self): super(TestPPPoE, self).setUp() # create 2 pg interfaces self.create_pg_interfaces(range(3)) for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() def tearDown(self): super(TestPPPoE, self).tearDown() self.logger.info(self.vapi.cli("show int")) self.logger.info(self.vapi.cli("show pppoe fib")) self.logger.info(self.vapi.cli("show pppoe session")) self.logger.info(self.vapi.cli("show ip fib")) self.logger.info(self.vapi.cli("show trace")) for i in self.pg_interfaces: i.unconfig_ip4() i.admin_down() def create_stream_pppoe_discovery(self, src_if, dst_if, client_mac, count=1): packets = [] for i in range(count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself p = (Ether(dst=src_if.local_mac, src=client_mac) / PPPoED(sessionid=0) / Raw(payload)) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list packets.append(p) # return the created packet list return packets def create_stream_pppoe_lcp(self, src_if, dst_if, client_mac, session_id, count=1): packets = [] for i in range(count): # create 
packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself p = (Ether(dst=src_if.local_mac, src=client_mac) / PPPoE(sessionid=session_id) / PPP(proto=0xc021) / Raw(payload)) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list packets.append(p) # return the created packet list return packets def create_stream_pppoe_ip4(self, src_if, dst_if, client_mac, session_id, client_ip, count=1): packets = [] for i in range(count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself p = (Ether(dst=src_if.local_mac, src=client_mac) / PPPoE(sessionid=session_id) / PPP(proto=0x0021) / IP(src=client_ip, dst=self.dst_ip) / Raw(payload)) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list packets.append(p) # return the created packet list return packets def create_stream_ip4(self, src_if, dst_if, client_ip, dst_ip, count=1): pkts = [] for i in range(count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=dst_ip, dst=client_ip) / Raw(payload)) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list pkts.append(p) # return the created packet list return pkts def verify_decapped_pppoe(self, src_if, capture, sent): self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): try: tx = sent[i] rx = capture[i] tx_ip = tx[IP] rx_ip = rx[IP] self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) except: 
self.logger.error(ppp("Rx:", rx)) self.logger.error(ppp("Tx:", tx)) raise def verify_encaped_pppoe(self, src_if, capture, sent, session_id): self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): try: tx = sent[i] rx = capture[i] tx_ip = tx[IP] rx_ip = rx[IP] self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) rx_pppoe = rx[PPPoE] self.assertEqual(rx_pppoe.sessionid, session_id) except: self.logger.error(ppp("Rx:", rx)) self.logger.error(ppp("Tx:", tx)) raise def test_PPPoE_Decap(self): """ PPPoE Decap Test """ self.vapi.cli("clear trace") # # Add a route that resolves the server's destination # route_sever_dst = VppIpRoute(self, "100.1.1.100", 32, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]) route_sever_dst.add_vpp_config() # Send PPPoE Discovery tx0 = self.create_stream_pppoe_discovery(self.pg0, self.pg1, self.pg0.remote_mac) self.pg0.add_stream(tx0) self.pg_start() # S
from vpp_interface import VppInterface
from vpp_ip import VppIpAddress


# All-ones 32-bit sentinel. In this module it is passed as the argument to
# the tunnel dump call, returned by find_vxlan_gbp_tunnel() when no tunnel
# matches, and used as the multicast sw_if_index when no multicast
# interface was supplied.
INDEX_INVALID = 0xffffffff


def find_vxlan_gbp_tunnel(test, src, dst, vni):
    """Return the sw_if_index of the first matching VXLAN-GBP tunnel.

    The tunnels reported by ``test.vapi.vxlan_gbp_tunnel_dump`` are scanned
    in order; a tunnel matches when its source address, destination address
    and VNI all equal the requested ones.

    :param test: test case object exposing the ``vapi`` provider
    :param src: tunnel source address, wrapped in ``VppIpAddress``
    :param dst: tunnel destination address, wrapped in ``VppIpAddress``
    :param vni: VNI to look for
    :returns: ``sw_if_index`` of the first match, or ``INDEX_INVALID`` when
              no dumped tunnel matches
    """
    wanted_src = VppIpAddress(src)
    wanted_dst = VppIpAddress(dst)

    # NOTE(review): INDEX_INVALID presumably asks the API for every tunnel
    # rather than one interface -- confirm against the API definition.
    for entry in test.vapi.vxlan_gbp_tunnel_dump(INDEX_INVALID):
        tunnel = entry.tunnel
        # Compare with "==" only, keeping VppIpAddress on the left-hand
        # side, so the same equality hook is used as before.
        if not (wanted_src == tunnel.src):
            continue
        if not (wanted_dst == tunnel.dst):
            continue
        if tunnel.vni == vni:
            return tunnel.sw_if_index

    return INDEX_INVALID


class VppVxlanGbpTunnel(VppInterface):
    """
    VPP VXLAN GBP interface
    """

    def __init__(self, test, src, dst, vni, mcast_itf=None):
        """Record the parameters of a VXLAN-GBP tunnel interface.

        :param test: test case object, handed to the base-class constructor
        :param src: tunnel source address, stored as a ``VppIpAddress``
        :param dst: tunnel destination address, stored as a ``VppIpAddress``
        :param vni: VNI of the tunnel
        :param mcast_itf: optional interface object whose ``sw_if_index``
                          is used when the tunnel is configured; ``None``
                          when there is no multicast interface
        """
        super(VppVxlanGbpTunnel, self).__init__(test)
        # Both endpoints are wrapped before being stored, source first.
        self.src, self.dst = VppIpAddress(src), VppIpAddress(dst)
        self.vni, self.mcast_itf = vni, mcast_itf

    def add_vpp_config(self):
        mcast_sw_if_index = INDEX_INVALID
        if (self.mcast_itf):
            mcast_sw_if_index = self.mcast_itf.sw_if_index
        reply = self.test.vapi.vxlan_gbp_tunnel_add_del(
            self.src.encode(),
            self.