#!/usr/bin/env python import binascii import random import socket import unittest from scapy.contrib.mpls import MPLS from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes from scapy.layers.l2 import Ether, Dot1Q, ARP from scapy.packet import Raw from six import moves from framework import VppTestCase, VppTestRunner from util import ppp from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpMRoute, \ VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \ VppMplsTable, VppIpTable from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint class TestIPv4(VppTestCase): """ IPv4 Test Case """ def setUp(self): """ Perform test setup before test case. **Config:** - create 3 pg interfaces - untagged pg0 interface - Dot1Q subinterface on pg1 - Dot1AD subinterface on pg2 - setup interfaces: - put it into UP state - set IPv4 addresses - resolve neighbor address using ARP - configure 200 fib entries :ivar list interfaces: pg interfaces and subinterfaces. :ivar dict flows: IPv4 packet flows in test. """ super(TestIPv4, self).setUp() # create 3 pg interfaces self.create_pg_interfaces(range(3)) # create 2 subinterfaces for pg1 and pg2 self.sub_interfaces = [ VppDot1QSubint(self, self.pg1, 100), VppDot1ADSubint(self, self.pg2, 200, 300, 400)] # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc. self.flows = dict() self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if] self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if] self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if] # packet sizes self.pg_if_packet_sizes = [64, 1500, 9020] self.interfaces = list(self.pg_interfaces) self.interfaces.extend(self.sub_interfaces) # setup all interfaces for i in self.interfaces: i.admin_up() i.config_ip4() i.resolve_arp() # config 2M FIB entries self.config_fib_entries(200) def tearDown(self): """Run standard test teardown and log ``show ip arp``.""" super(TestIPv4, self).tearDown() if not self.vpp_dead: self.logger.info(self.vapi.cli("show ip arp")) # info(self.vapi.cli("show ip fib")) # many entries def config_fib_entries(self, count): """For each interface add to the FIB table *count* routes to "10.0.0.1/32" destination with interface's local address as next-hop address. :param int count: Number of FIB entries. - *TODO:* check if the next-hop address shouldn't be remote address instead of local address. """ n_int = len(self.interfaces) percent = 0 counter = 0.0 dest_addr = socket.inet_pton(socket.AF_INET, "10.0.0.1") dest_addr_len = 32 for i in self.interfaces: next_hop_address = i.local_ip4n for j in range(count / n_int): self.vapi.ip_add_del_route( dest_addr, dest_addr_len, next_hop_address) counter += 1 if counter / count * 100 > percent: self.logger.info("Configure %d FIB entries .. %d%% done" % (count, percent)) percent += 1 def modify_packet(self, src_if, packet_size, pkt): """Add load, set destination IP and extend packet to required packet size for defined interface. :param VppInterface src_if: Interface to create packet for. :param int packet_size: Required packet size. :param Scapy pkt: Packet to be modified. """ dst_if_idx = packet_size / 10 % 2 dst_if = self.flows[src_if][dst_if_idx] info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) p = pkt/Raw(payload) p[IP].dst = dst_if.remote_ip4 info.data = p.copy() if isinstance(src_if, VppSubInterface): p = src_if.add_dot1_layer(p) self.extend_packet(p, packet_size) return p def create_stream(self, src_if): """Create input packet stream for defined interface. :param VppInterface src_if: Interface to create packet stream for. """ hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0 pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_if.remote_ip4) / UDP(sport=1234, dport=1234)) pkts = [self.modify_packet(src_if, i, pkt_tmpl) for i in moves.range(self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10)] pkts_b = [self.modify_packet(src_if, i, pkt_tmpl) for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext, self.pg_if_packet_sizes[2] + hdr_ext, 50)] pkts.extend(pkts_b) return pkts def verify_capture(self, dst_if, capture): """Verify captured input packet stream for defined interface. :param VppInterface dst_if: Interface to verify captured packet stream for. :param list capture: Captured packet stream. """ self.logger.info("Verifying capture on interface %s" % dst_if.name) last_info = dict() for i in self.interfaces: last_info[i.sw_if_index] = None is_sub_if = False dst_sw_if_index = dst_if.sw_if_index if hasattr(dst_if, 'parent'): is_sub_if = True for packet in capture: if is_sub_if: # Check VLAN tags and Ethernet header packet = dst_if.remove_dot1_layer(packet) self.assertTrue(Dot1Q not in packet) try: ip = packet[IP] udp = packet[UDP] payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug( "Got packet on port %s: src=%u (id=%u)" % (dst_if.name, payload_info.src, packet_index)) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) saved_packet = next_info.data # Check standard fields self.assertEqual(ip.src, saved_packet[IP].src) self.assertEqual(ip.dst, saved_packet[IP].dst) self.assertEqual(udp.sport, saved_packet[UDP].sport) self.assertE
#!/usr/bin/env python3
"""CLI functional tests"""
import datetime
import time
import unittest
from vpp_papi import VPPIOError
from framework import VppTestCase, VppTestRunner
class TestCLI(VppTestCase):
""" CLI Test Case """
maxDiff = None
@classmethod
def setUpClass(cls):
# using the framework default
cls.vapi_response_timeout = 5
super(TestCLI, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(TestCLI, cls).tearDownClass()
def setUp(self):
super(TestCLI, self).setUp()
def tearDown(self):
super(TestCLI, self).tearDown()
def test_cli_retval(self):
""" CLI inband retval """
rv = self.vapi.papi.cli_inband(cmd='this command does not exist')
self.assertNotEqual(rv.retval, 0)
rv = self.vapi.papi.cli_inband(cmd='show version')
self.assertEqual(rv.retval, 0)
def test_long_cli_delay(self):
""" Test that VppApiClient raises VppIOError if timeout.""" # noqa
with self.assertRaises(VPPIOError) as ctx:
rv = self.vapi.papi.cli_inband(cmd='wait 10')
def test_long_cli_delay_override(self):
""" Test per-command _timeout option.""" # noqa
rv = self.vapi.papi.cli_inband(cmd='wait 10', _timeout=15)
self.assertEqual(rv.retval, 0)
class TestCLIExtendedVapiTimeout(VppTestCase):
maxDiff = None
@classmethod
def setUpClass(cls):
cls.vapi_response_timeout = 15
cls.__doc__ = " CLI Test Case w/ Extended (%ssec) Vapi Timeout " \
% cls.vapi_response_timeout
super(TestCLIExtendedVapiTimeout, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(TestCLIExtendedVapiTimeout, cls).tearDownClass()
def setUp(self):
super(TestCLIExtendedVapiTimeout, self).setUp()
def tearDown(self):
super(TestCLIExtendedVapiTimeout, self).tearDown()
def test_long_cli_delay(self):
""" Test that delayed result returns with extended timeout."""
wait_secs = self.vapi_response_timeout - 1
# get vpp time as float
start = self.vapi.papi.show_vpe_system_time(
_no_type_conversion=True).vpe_system_time
rv = self.vapi.papi.cli_inband(cmd='wait %s' % wait_secs)
now = self.vapi.papi.show_vpe_system_time(
_no_type_conversion=True).vpe_system_time
# assume that the overhead of the measurement is not more that .5 sec.
self.assertEqual(round(now - start), wait_secs)
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)