#!/usr/bin/env python3 import socket import unittest from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.ppp import PPPoE, PPPoED, PPP from scapy.layers.inet import IP from framework import VppTestCase, VppTestRunner from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_pppoe_interface import VppPppoeInterface from util import ppp, ppc class TestPPPoE(VppTestCase): """ PPPoE Test Case """ @classmethod def setUpClass(cls): super(TestPPPoE, cls).setUpClass() cls.session_id = 1 cls.dst_ip = "100.1.1.100" cls.dst_ipn = socket.inet_pton(socket.AF_INET, cls.dst_ip) @classmethod def tearDownClass(cls): super(TestPPPoE, cls).tearDownClass() def setUp(self): super(TestPPPoE, self).setUp() # create 2 pg interfaces self.create_pg_interfaces(range(3)) for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() def tearDown(self): super(TestPPPoE, self).tearDown() for i in self.pg_interfaces: i.unconfig_ip4() i.admin_down() def show_commands_at_teardown(self): self.logger.info(self.vapi.cli("show int")) self.logger.info(self.vapi.cli("show pppoe fib")) self.logger.info(self.vapi.cli("show pppoe session")) self.logger.info(self.vapi.cli("show ip fib")) self.logger.info(self.vapi.cli("show trace")) def create_stream_pppoe_discovery(self, src_if, dst_if, client_mac, count=1): packets = [] for i in range(count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself p = (Ether(dst=src_if.local_mac, src=client_mac) / PPPoED(sessionid=0) / Raw(payload)) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list packets.append(p) # return the created packet list return packets def create_stream_pppoe_lcp(self, src_if, dst_if, client_mac, session_id, count=1): packets = [] for i in range(count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself p = (Ether(dst=src_if.local_mac, src=client_mac) / PPPoE(sessionid=session_id) / PPP(proto=0xc021) / Raw(payload)) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list packets.append(p) # return the created packet list return packets def create_stream_pppoe_ip4(self, src_if, dst_if, client_mac, session_id, client_ip, count=1): packets = [] for i in range(count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself p = (Ether(dst=src_if.local_mac, src=client_mac) / PPPoE(sessionid=session_id) / PPP(proto=0x0021) / IP(src=client_ip, dst=self.dst_ip) / Raw(payload)) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list packets.append(p) # return the created packet list return packets def create_stream_ip4(self, src_if, dst_if, client_ip, dst_ip, count=1): pkts = [] for i in range(count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=dst_ip, dst=client_ip) / Raw(payload)) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list pkts.append(p) # return the created packet list return pkts def verify_decapped_pppoe(self, src_if, capture, sent): self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): try: tx = sent[i] rx = capture[i] tx_ip = tx[IP] rx_ip = rx[IP] self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) except: self.logger.error(ppp("Rx:", rx)) self.logger.error(ppp("Tx:", tx)) raise def verify_encaped_pppoe(self, src_if, capture, sent, session_id): self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): try: tx = sent[i] rx = capture[i] tx_ip = tx[IP] rx_ip = rx[IP] self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) rx_pppoe = rx[PPPoE] self.assertEqual(rx_pppoe.sessionid, session_id) except: self.logger.error(ppp("Rx:", rx)) self.logger.error(ppp("Tx:", tx)) raise def test_PPPoE_Decap(self): """ PPPoE Decap Test """ self.vapi.cli("clear trace") # # Add a route that resolves the server's destination # route_sever_dst = VppIpRoute(self, "100.1.1.100", 32, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]) route_sever_dst.add_vpp_config() # Send PPPoE Discovery tx0 = self.create_stream_pppoe_discovery(self.pg0, self.pg1, self.pg0.remote_mac) self.pg0.add_stream(tx0) self.pg_start() # Send PPPoE PPP LCP tx1 = self.create_stream_pppoe_lcp(self.pg0, self.pg1, self.pg0.remote_mac, self.session_id) self.pg0.add_stream(tx1) self.pg_start() # Create PPPoE session pppoe_if = VppPppoeInterface(self, self.pg0.remote_ip4, s
#!/usr/bin/env python
# Copyright (c) 2016 Comcast Cable Communications Management, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Filter for .c files to make various preprocessor tricks Doxygenish

import os, sys, re

if len(sys.argv) < 2:
    sys.stderr.write("Usage: %s <filename>\n" % (sys.argv[0]))
    sys.exit(1)

replace_patterns = [
    # Search for VLIB_CLI_COMMAND, extract its parameters and add a docblock for it
    ( re.compile("(?P<m>VLIB_CLI_COMMAND)\s*[(](?P<name>[a-zA-Z0-9_]+)[)]"),
        r"/** @brief (@em constructor) \g<m> (\g<name>) */ vlib_cli_command_t \g<name>"),
    ( re.compile("(?P<m>VLIB_CLI_COMMAND)\s*[(](?P<name>[a-zA-Z0-9_]+),\s*(?P<qual>[^)]*)[)]"),
        r"/** @brief (@em constructor) \g<m> (\g<name>) */ \g<qual> vlib_cli_command_t \g<name>"),

    # Search for VLIB_REGISTER_NODE, extract its parameters and add a docblock for it
    ( re.compile("(?P<m>VLIB_REGISTER_NODE)\s*[(](?P<name>[a-zA-Z0-9_]+)[)]"),
        r"/** @brief (@em constructor) \g<m> (\g<name>) */ vlib_node_registration_t \g<name>"),
    ( re.compile("(?P<m>VLIB_REGISTER_NODE)\s*[(](?P<name>[a-zA-Z0-9_]+),\s*(?P<qual>[^)]*)[)]"),
        r"/** @brief (@em constructor) \g<m> (\g<name>) */ \g<qual> vlib_node_registration_t \g<name>"),

    # Search for VLIB_INIT_FUNCTION, extract its parameter and add a docblock for it
    ( re.compile("(?P<m>VLIB_INIT_FUNCTION)\s*[(](?P<name>[a-zA-Z0-9_]+)[)]"),
        r"/** @brief (@em constructor) \g<m> (@ref \g<name>) */ vlib_init_function_t * _vlib_init_function_\g<name>"),
    ( re.compile("(?P<m>VLIB_DECLARE_INIT_FUNCTION)\s*[(](?P<name>[a-zA-Z0-9_]+)[)]"),