#!/usr/bin/env python3 import unittest from framework import VppTestCase from asfframework import VppTestRunner, tag_fixme_vpp_workers from vpp_ip import INVALID_INDEX from vpp_ip_route import ( VppIpRoute, VppRoutePath, VppMplsRoute, VppMplsIpBind, VppIpMRoute, VppMRoutePath, VppIpTable, VppMplsTable, VppMplsLabel, MplsLspMode, find_mpls_route, FibPathProto, FibPathType, FibPathFlags, VppMplsLabel, MplsLspMode, ) from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface from vpp_papi import VppEnum from config import config import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes from scapy.layers.inet6 import ( IPv6, ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6PacketTooBig, ) from scapy.contrib.mpls import MPLS NUM_PKTS = 67 # scapy removed these attributes. # we asked that they be restored: https://github.com/secdev/scapy/pull/1878 # semantic names have more meaning than numbers. so here they are. ARP.who_has = 1 ARP.is_at = 2 def verify_filter(capture, sent): if not len(capture) == len(sent): # filter out any IPv6 RAs from the capture for p in capture: if p.haslayer(IPv6): capture.remove(p) return capture def verify_mpls_stack(tst, rx, mpls_labels): # the rx'd packet has the MPLS label popped eth = rx[Ether] tst.assertEqual(eth.type, 0x8847) rx_mpls = rx[MPLS] for ii in range(len(mpls_labels)): tst.assertEqual(rx_mpls.label, mpls_labels[ii].value) tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp) tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl) if ii == len(mpls_labels) - 1: tst.assertEqual(rx_mpls.s, 1) else: # not end of stack tst.assertEqual(rx_mpls.s, 0) # pop the label to expose the next rx_mpls = rx_mpls[MPLS].payload @tag_fixme_vpp_workers class TestMPLS(VppTestCase): """MPLS Test Case""" @classmethod def setUpClass(cls): super(TestMPLS, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestMPLS, cls).tearDownClass() def setUp(self): super(TestMPLS, self).setUp() # create 2 pg interfaces self.create_pg_interfaces(range(4)) # setup both interfaces # assign them different tables. table_id = 0 self.tables = [] tbl = VppMplsTable(self, 0) tbl.add_vpp_config() self.tables.append(tbl) for i in self.pg_interfaces: i.admin_up() if table_id != 0: tbl = VppIpTable(self, table_id) tbl.add_vpp_config() self.tables.append(tbl) tbl = VppIpTable(self, table_id, is_ip6=1) tbl.add_vpp_config() self.tables.append(tbl) i.set_table_ip4(table_id) i.set_table_ip6(table_id) i.config_ip4() i.resolve_arp() i.config_ip6() i.resolve_ndp() i.enable_mpls() table_id += 1 def tearDown(self): for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.set_table_ip4(0) i.set_table_ip6(0) i.disable_mpls() i.admin_down() super(TestMPLS, self).tearDown() # the default of 64 matches the IP packet TTL default def create_stream_labelled_ip4( self, src_if, mpls_labels, ping=0, ip_itf=None, dst_ip=None, chksum=None, ip_ttl=64, n=257, ): self.reset_packet_infos() pkts = [] for i in range(0, n): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) for ii in range(len(mpls_labels)): p = p / MPLS( label=mpls_labels[ii].value, ttl=mpls_labels[ii].ttl, cos=mpls_labels[ii].exp, ) if not ping: if not dst_ip: p = ( p / IP(src=src_if.local_ip4, dst=src_if.remote_ip4, ttl=ip_ttl) / UDP(sport=1234, dport=1234) / Raw(payload) ) else: p = ( p / IP(src=src_if.local_ip4, dst=dst_ip, ttl=ip_ttl) / UDP(sport=1234, dport=1234) / Raw(payload) ) else: p = ( p / IP(src=ip_itf.remote_ip4, dst=ip_itf.local_ip4, ttl=ip_ttl) / ICMP() ) if chksum: p[IP].chksum = chksum info.data = p.copy() pkts.append(p) return pkts def create_stream_ip4( self, src_if, dst_ip, ip_ttl=64, ip_dscp=0, payload_size=None, n=257 ): self.reset_packet_infos() pkts = [] for i in range(0, n): dst = dst_ip[i % len(dst_ip)] if isinstance(dst_ip, list) else dst_ip info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = ( Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_if.remote_ip4, dst=dst, ttl=ip_ttl, tos=ip_dscp) / UDP(sport=1234, dport=1234) / Raw(payload) ) info.data = p.copy() if payload_size: self.extend_packet(p, payload_size) pkts.append(p) return pkts def create_stream_ip6(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0, n=257): self.reset_packet_infos() pkts = [] for i in range(0, n): dst = dst_ip[i % len(dst_ip)] if isinstance(dst_ip, list) else dst_ip info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = ( Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IPv6(src=src_if.remote_ip6, dst=dst, hlim=ip_ttl, tc=ip_dscp) / UDP(sport=1234, dport=1234) / Raw(payload) ) info.data = p.copy() pkts.append(p) return pkts def create_stream_labelled_ip6( self, src_if, mpls_labels, hlim=64, dst_ip=None, ping=0, ip_itf=None ): if dst_ip is None: dst_ip = src_if.remote_ip6 self.reset_packet_infos() pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) for l in mpls_labels: p = p / MPLS(label=l.value, ttl=l.ttl, cos=l.exp) if ping: p = p / ( IPv6(src=ip_itf.remote_ip6, dst=ip_i
from trex_stl_lib.api import *
class STLS1:
def create_stream(self, port_id):
# base_pkt = Ether()/IP(dst="2.2.0.1")/UDP(dport=12)
# pad = Padding()
# if len(base_pkt) < 64:
# pad_len = 64 - len(base_pkt)
# pad.load = '\x00' * pad_len
# vm = STLVM()
# vm.tuple_var(name="tuple", ip_min="10.0.0.3", ip_max="10.0.0.202", port_min=1025, port_max=61124, limit_flows = 100000)
# vm.write(fv_name="tuple.ip", pkt_offset="IP.src")
# vm.fix_chksum()
# vm.write(fv_name="tuple.port", pkt_offset="UDP.sport")
# pkt = STLPktBuilder(pkt=base_pkt/pad, vm=vm)
# return STLStream(packet=pkt, mode=STLTXCont())
vm = STLScVmRaw([STLVmTupleGen(ip_min="10.0.0.1", ip_max="10.255.255.254",
port_min=1025, port_max=65535,
name="stuple", limit_flows=10000),
STLVmTupleGen(ip_min="2.0.0.1", ip_max="2.255.255.254",
port_min=1025, port_max=65535,
name="dtuple", limit_flows=100000000),
# write ip to packet IP.src
STLVmWrFlowVar(fv_name="stuple.ip",
pkt_offset="IP.src"),
STLVmWrFlowVar(fv_name="dtuple.ip",
pkt_offset="IP.dst"),
# fix checksum
STLVmFixIpv4(offset="IP"),
# write udp.port
STLVmWrFlowVar(fv_name="stuple.port",
pkt_offset="UDP.sport"),
STLVmWrFlowVar(fv_name="dtuple.port",
pkt_offset="UDP.dport"),
]
)
base_pkt = Ether()/IP(src="16.0.0.1", dst="2.0.0.1")/UDP(dport=12, sport=1025)
pad = Padding()
if len(base_pkt) < 64:
pad_len = 64 - len(base_pkt)
pad.load = '\x00' * pad_len
pad = max(0, 64 - len(base_pkt)) * 'x'
pad_latency = max(0, (64-4) - len(base_pkt)) * 'x'
pkt = STLPktBuilder(pkt=base_pkt/pad, vm=vm)
return [STLStream(packet=pkt, mode=STLTXCont()),
# latency stream
STLStream(packet = STLPktBuilder(pkt = base_pkt/pad_latency),
mode = STLTXCont(pps=1000),
flow_stats = STLFlowLatencyStats(pg_id = 12+port_id))
]
def get_streams(self, direction=0, **kwargs):
# return [self.create_stream()]
return self.create_stream(kwargs['port_id'])
# dynamic load - used for trex console or simulator
def register():
return STLS1()