#!/usr/bin/env python3 import random import unittest import datetime import re from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 from framework import VppTestCase, VppTestRunner from vpp_sub_interface import VppP2PSubint from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_papi import mac_pton class P2PEthernetAPI(VppTestCase): """P2P Ethernet tests""" p2p_sub_ifs = [] @classmethod def setUpClass(cls): super(P2PEthernetAPI, cls).setUpClass() # Create pg interfaces cls.create_pg_interfaces(range(4)) # Set up all interfaces for i in cls.pg_interfaces: i.admin_up() @classmethod def tearDownClass(cls): super(P2PEthernetAPI, cls).tearDownClass() def create_p2p_ethernet(self, parent_if, sub_id, remote_mac): p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac)) self.p2p_sub_ifs.append(p2p) def delete_p2p_ethernet(self, parent_if, remote_mac): self.vapi.p2p_ethernet_del(parent_if.sw_if_index, mac_pton(remote_mac)) def test_api(self): """delete/create p2p subif""" self.logger.info("FFP_TEST_START_0000") self.create_p2p_ethernet(self.pg0, 1, "de:ad:00:00:00:01") self.create_p2p_ethernet(self.pg0, 2, "de:ad:00:00:00:02") intfs = self.vapi.cli("show interface") self.assertIn("pg0.1", intfs) self.assertIn("pg0.2", intfs) self.assertNotIn("pg0.5", intfs) # create pg2.5 subif self.create_p2p_ethernet(self.pg0, 5, "de:ad:00:00:00:ff") intfs = self.vapi.cli("show interface") self.assertIn("pg0.5", intfs) # delete pg2.5 subif self.delete_p2p_ethernet(self.pg0, "de:ad:00:00:00:ff") intfs = self.vapi.cli("show interface") self.assertIn("pg0.1", intfs) self.assertIn("pg0.2", intfs) self.assertNotIn("pg0.5", intfs) self.logger.info("FFP_TEST_FINISH_0000") def test_p2p_subif_creation_1k(self): """create 1k of p2p subifs""" self.logger.info("FFP_TEST_START_0001") macs = [] clients = 1000 mac = int("dead00000000", 16) for i in range(1, clients + 1): try: 
macs.append(":".join(re.findall("..", "{:02x}".format(mac + i)))) self.vapi.p2p_ethernet_add( self.pg2.sw_if_index, mac_pton(macs[i - 1]), i ) except Exception: self.logger.info("Failed to create subif %d %s" % (i, macs[i - 1])) raise intfs = self.vapi.cli("show interface").split("\n") count = 0 for intf in intfs: if intf.startswith("pg2."): count += 1 self.assertEqual(count, clients) self.logger.info("FFP_TEST_FINISH_0001") class P2PEthernetIPV6(VppTestCase): """P2P Ethernet IPv6 tests""" p2p_sub_ifs = [] packets = [] @classmethod def setUpClass(cls): super(P2PEthernetIPV6, cls).setUpClass() # Create pg interfaces cls.create_pg_interfaces(range(3)) # Packet sizes cls.pg_if_packet_sizes = [64, 512, 1518, 9018] # Set up all interfaces for i in cls.pg_interfaces: i.admin_up() cls.pg0.generate_remote_hosts(3) cls.pg0.configure_ipv6_neighbors() cls.pg1.config_ip6() cls.pg1.generate_remote_hosts(3) cls.pg1.configure_ipv6_neighbors() cls.pg1.disable_ipv6_ra() @classmethod def tearDownClass(cls): super(P2PEthernetIPV6, cls).tearDownClass() def setUp(self): super(P2PEthernetIPV6, self).setUp() for p in self.packets: self.packets.remove(p) self.p2p_sub_ifs.append( self.create_p2p_ethernet(self.pg0, 1, self.pg0._remote_hosts[0].mac) ) self.p2p_sub_ifs.append( self.create_p2p_ethernet(self.pg0, 2, self.pg0._remote_hosts[1].mac) ) self.vapi.cli("trace add p2p-ethernet-input 50") def tearDown(self): while len(self.p2p_sub_ifs): p2p = self.p2p_sub_ifs.pop() self.delete_p2p_ethernet(p2p) super(P2PEthernetIPV6, self).tearDown() def create_p2p_ethernet(self, parent_if, sub_id, remote_mac): p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac)) p2p.admin_up() p2p.config_ip6() p2p.disable_ipv6_ra() return p2p def delete_p2p_ethernet(self, p2p): p2p.unconfig_ip6() p2p.admin_down() self.vapi.p2p_ethernet_del(p2p.parent.sw_if_index, p2p.p2p_remote_mac) def create_stream( self, src_mac=None, dst_mac=None, src_ip=None, dst_ip=None, size=None ): pkt_size = size if size is None: 
pkt_size = random.choice(self.pg_if_packet_sizes) p = Ether(src=src_mac, dst=dst_mac) p /= IPv6(src=src_ip, dst=dst_ip) p /= UDP(sport=1234, dport=4321) / Raw(b"\xa5" * 20) self.extend_packet(p, pkt_size) return p def send_packets(self, src_if=None, dst_if=None, packets=None, count=None): self.pg_enable_capture([dst_if]) if packets is None: packets = self.packets src_if.add_stream(packets) self.pg_start() if count is None:
.. _developer-friendly:
==================
Developer Friendly
==================
This section describes the different ways VPP is friendly to developers:
* Extensive runtime counters: throughput, `instructions per cycle <https://en.wikipedia.org/wiki/Instructions_per_cycle>`_, errors, events etc.
* Integrated pipeline tracing facilities
* Multi-language API bindings
* Integrated command line for debugging
* Fault-tolerant and upgradable
* Runs as a standard user-space process for fault tolerance; software crashes seldom require more than a process restart.
* Improved fault-tolerance and upgradability when compared to running similar packet processing in the kernel; software updates never require system reboots.
* Development experience is easier compared to similar kernel code
* Hardware isolation and protection (`iommu <https://en.wikipedia.org/wiki/Input%E2%80%93output_memory_management_unit>`_)
* Built for security
* Extensive white-box testing
* Image segment base address randomization
* Shared-memory segment base address randomization
* Stack bounds checking
* Static analysis with `Coverity <https://en.wikipedia.org/wiki/Coverity>`_