#!/usr/bin/env python3 import unittest import socket from framework import tag_fixme_vpp_workers from framework import VppTestCase, VppTestRunner from vpp_ip import DpoProto, INVALID_INDEX from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \ VppMplsIpBind, VppIpMRoute, VppMRoutePath, \ VppIpTable, VppMplsTable, \ VppMplsLabel, MplsLspMode, find_mpls_route, \ FibPathProto, FibPathType, FibPathFlags, VppMplsLabel, MplsLspMode from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface from vpp_papi import VppEnum import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6EchoRequest, \ ICMPv6PacketTooBig from scapy.contrib.mpls import MPLS NUM_PKTS = 67 # scapy removed these attributes. # we asked that they be restored: https://github.com/secdev/scapy/pull/1878 # semantic names have more meaning than numbers. so here they are. 
ARP.who_has = 1 ARP.is_at = 2 def verify_filter(capture, sent): if not len(capture) == len(sent): # filter out any IPv6 RAs from the capture for p in capture: if p.haslayer(IPv6): capture.remove(p) return capture def verify_mpls_stack(tst, rx, mpls_labels): # the rx'd packet has the MPLS label popped eth = rx[Ether] tst.assertEqual(eth.type, 0x8847) rx_mpls = rx[MPLS] for ii in range(len(mpls_labels)): tst.assertEqual(rx_mpls.label, mpls_labels[ii].value) tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp) tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl) if ii == len(mpls_labels) - 1: tst.assertEqual(rx_mpls.s, 1) else: # not end of stack tst.assertEqual(rx_mpls.s, 0) # pop the label to expose the next rx_mpls = rx_mpls[MPLS].payload @tag_fixme_vpp_workers class TestMPLS(VppTestCase): """ MPLS Test Case """ @classmethod def setUpClass(cls): super(TestMPLS, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestMPLS, cls).tearDownClass() def setUp(self): super(TestMPLS, self).setUp() # create 2 pg interfaces self.create_pg_interfaces(range(4)) # setup both interfaces # assign them different tables. 
table_id = 0 self.tables = [] tbl = VppMplsTable(self, 0) tbl.add_vpp_config() self.tables.append(tbl) for i in self.pg_interfaces: i.admin_up() if table_id != 0: tbl = VppIpTable(self, table_id) tbl.add_vpp_config() self.tables.append(tbl) tbl = VppIpTable(self, table_id, is_ip6=1) tbl.add_vpp_config() self.tables.append(tbl) i.set_table_ip4(table_id) i.set_table_ip6(table_id) i.config_ip4() i.resolve_arp() i.config_ip6() i.resolve_ndp() i.enable_mpls() table_id += 1 def tearDown(self): for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.set_table_ip4(0) i.set_table_ip6(0) i.disable_mpls() i.admin_down() super(TestMPLS, self).tearDown() # the default of 64 matches the IP packet TTL default def create_stream_labelled_ip4( self, src_if, mpls_labels, ping=0, ip_itf=None, dst_ip=None, chksum=None, ip_ttl=64, n=257): self.reset_packet_infos() pkts = [] for i in range(0, n): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) for ii in range(len(mpls_labels)): p = p / MPLS(label=mpls_labels[ii].value, ttl=mpls_labels[ii].ttl, cos=mpls_labels[ii].exp) if not ping: if not dst_ip: p = (p / IP(src=src_if.local_ip4, dst=src_if.remote_ip4, ttl=ip_ttl) / UDP(sport=1234, dport=1234) / Raw(payload)) else: p = (p / IP(src=src_if.local_ip4, dst=dst_ip, ttl=ip_ttl) / UDP(sport=1234, dport=1234) / Raw(payload)) else: p = (p / IP(src=ip_itf.remote_ip4, dst=ip_itf.local_ip4, ttl=ip_ttl) / ICMP()) if chksum: p[IP].chksum = chksum info.data = p.copy() pkts.append(p) return pkts def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0, payload_size=None): self.reset_packet_infos() pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_if.remote_ip4, dst=dst_ip, ttl=ip_ttl, tos=ip_dscp) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() 
if payload_size: self.extend_packet(p, payload_size) pkts.append(p) return pkts def create_stream_ip6(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0): self.reset_packet_infos() pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=ip_ttl, tc=ip_dscp) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() pkts.append(p) return pkts def create_stream_labelled_ip6(self, src_if, mpls_labels, hlim=64, dst_ip=None, ping=0, ip_itf=None): if dst_ip is None: dst_ip = src_if.remote_ip6 self.reset_packet_infos() pkts = [] for i in range(0, 257): info = self.create_packet_info(src_if, src_if) payload = self.info_to_payload(info) p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) for l in mpls_labels: p = p / MPLS(label=l.value, ttl=l.ttl, cos=l.exp) if ping: p = p / (IPv6(src=ip_itf.remote_ip6, dst=ip_itf.local_ip6) / ICMPv6EchoRequest()) else: p = p / (IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() pkts.append(p) return pkts def verify_capture_ip4(self, src_if, capture, sent, ping_resp=0, ip_ttl=None, ip_dscp=0): try: capture = verify_filter(capture, sent) self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): tx = sent[i] rx = capture[i] # the rx'd packet has the MPLS label popped eth = rx[Ether] self.assertEqual(eth.type, 0x800) tx_ip = tx[IP] rx_ip = rx[IP] if not ping_resp: self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) self.assertEqual(rx_ip.tos, ip_dscp) if not ip_ttl: # IP processing post pop has decremented the TTL self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl) else: self.assertEqual(rx_ip.ttl, ip_ttl) else: self.assertEqual(rx_ip.src, tx_ip.dst) self.assertEqual(rx_ip.dst, tx_ip.src) except: raise def verify_capture_labelled_ip4(self, src_if, capture, sent, mpls_labels, 
ip_ttl=None): try: capture = verify_filter(capture, sent) self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): tx = sent[i] rx = capture[i] tx_ip = tx[IP] rx_ip = rx[IP] verify_mpls_stack(self, rx, mpls_labels) self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) if not ip_ttl: # IP processing post pop has decremented the TTL self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl) else: self.assertEqual(rx_ip.ttl, ip_ttl) except: raise def verify_capture_labelled_ip6(self, src_if, capture, sent, mpls_labels, ip_ttl=None): try: capture = verify_filter(capture, sent) self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): tx = sent[i] rx = capture[i] tx_ip = tx[IPv6] rx_ip = rx[IPv6] verify_mpls_stack(self, rx, mpls_labels) self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) if not ip_ttl: # IP processing post pop has decremented the TTL self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim) else: self.assertEqual(rx_ip.hlim, ip_ttl) except: raise def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels): try: capture = verify_filter(capture, sent) self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): tx = sent[i] rx = capture[i] tx_ip = tx[IP] rx_ip = rx[IP] verify_mpls_stack(self, rx, mpls_labels) self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) # IP processing post pop has decremented the TTL self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl) except: raise def verify_capture_labelled(self, src_if, capture, sent, mpls_labels): try: capture = verify_filter(capture, sent) self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): rx = capture[i] verify_mpls_stack(self, rx, mpls_labels) except: raise def verify_capture_ip6(self, src_if, capture, sent, ip_hlim=None, ip_dscp=0, ping_resp=0): try: self.assertEqual(len(capture), len(sent)) for i in range(len(capture)): tx = sent[i] rx = capture[i] # the rx'd packet has the MPLS label popped 
eth = rx[Ether] self.assertEqual(eth.type, 0x86DD) tx_ip = tx[IPv6] rx_ip = rx[IPv6] if not ping_resp: self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) self.assertEqual(rx_ip.tc, ip_dscp) # IP processing post pop has decremented the TTL if not ip_hlim: self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim) else: self.assertEqual(rx_ip.hlim, ip_hlim) else: self.assertEqual(rx_ip.src, tx_ip.dst) self.assertEqual(rx_ip.dst, tx_ip.src) except: raise def verify_capture_ip6_icmp(self, src_if, capture, sent): try: # rate limited ICMP self.assertTrue(len(capture) <= len(sent)) for i in range(len(capture)): tx = sent[i] rx = capture[i] # the rx'd packet has the MPLS label popped eth = rx[Ether] self.assertEqual(eth.type, 0x86DD) tx_ip = tx[IPv6] rx_ip = rx[IPv6] self.assertEqual(rx_ip.dst, tx_ip.src) # ICMP sourced from the interface's address self.assertEqual(rx_ip.src, src_if.local_ip6) # hop-limit reset to 255 for IMCP packet self.assertEqual(rx_ip.hlim, 255) icmp = rx[ICMPv6TimeExceeded] except: raise def verify_capture_fragmented_labelled_ip4(self, src_if, capture, sent, mpls_labels, ip_ttl=None): try: capture = verify_filter(capture, sent) for i in range(len(capture)): tx = sent[0] rx = capture[i] tx_ip = tx[IP] rx_ip = rx[IP] verify_mpls_stack(self, rx, mpls_labels) self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) if not ip_ttl: # IP processing post pop has decremented the TTL self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl) else: self.assertEqual(rx_ip.ttl, ip_ttl) except: raise def verify_capture_fragmented_labelled_ip6(self, src_if, capture, sent, mpls_labels, ip_ttl=None): try: capture = verify_filter(capture, sent) for i in range(len(capture)): tx = sent[0] rx = capture[i] tx_ip = tx[IPv6] rx.show() rx_ip = IPv6(rx[MPLS].payload) rx_ip.show() verify_mpls_stack(self, rx, mpls_labels) self.assertEqual(rx_ip.src, tx_ip.src) self.assertEqual(rx_ip.dst, tx_ip.dst) if not ip_ttl: # IP processing post pop has decremented the 
hop-limit self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim) else: self.assertEqual(rx_ip.hlim, ip_ttl) except: raise def test_swap(self): """ MPLS label swap tests """ # # A simple MPLS xconnect - eos label in label out # route_32_eos = VppMplsRoute(self, 32, 1, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(33)])]) route_32_eos.add_vpp_config() self.assertTrue( find_mpls_route(self, 0, 32, 1, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(33)])])) # # a stream that matches the route for 10.0.0.1 # PG0 is in the default table # tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(32, ttl=32, exp=1)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled(self.pg0, rx, tx, [VppMplsLabel(33, ttl=31, exp=1)]) self.assertEqual(route_32_eos.get_stats_to()['packets'], 257) # # A simple MPLS xconnect - non-eos label in label out # route_32_neos = VppMplsRoute(self, 32, 0, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(33)])]) route_32_neos.add_vpp_config() # # a stream that matches the route for 10.0.0.1 # PG0 is in the default table # tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(32, ttl=21, exp=7), VppMplsLabel(99)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled(self.pg0, rx, tx, [VppMplsLabel(33, ttl=20, exp=7), VppMplsLabel(99)]) self.assertEqual(route_32_neos.get_stats_to()['packets'], 257) # # A simple MPLS xconnect - non-eos label in label out, uniform mode # route_42_neos = VppMplsRoute( self, 42, 0, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(43, MplsLspMode.UNIFORM)])]) route_42_neos.add_vpp_config() tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(42, ttl=21, exp=7), VppMplsLabel(99)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled(self.pg0, rx, tx, [VppMplsLabel(43, ttl=20, exp=7), VppMplsLabel(99)]) # # An MPLS 
xconnect - EOS label in IP out # route_33_eos = VppMplsRoute(self, 33, 1, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[])]) route_33_eos.add_vpp_config() tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(33)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_ip4(self.pg0, rx, tx) # # disposed packets have an invalid IPv4 checksum # tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(33)], dst_ip=self.pg0.remote_ip4, n=65, chksum=1) self.send_and_assert_no_replies(self.pg0, tx, "Invalid Checksum") # # An MPLS xconnect - EOS label in IP out, uniform mode # route_3333_eos = VppMplsRoute( self, 3333, 1, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(3, MplsLspMode.UNIFORM)])]) route_3333_eos.add_vpp_config() tx = self.create_stream_labelled_ip4( self.pg0, [VppMplsLabel(3333, ttl=55, exp=3)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_ip4(self.pg0, rx, tx, ip_ttl=54, ip_dscp=0x60) tx = self.create_stream_labelled_ip4( self.pg0, [VppMplsLabel(3333, ttl=66, exp=4)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_ip4(self.pg0, rx, tx, ip_ttl=65, ip_dscp=0x80) # # An MPLS xconnect - EOS label in IPv6 out # route_333_eos = VppMplsRoute( self, 333, 1, [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index, labels=[])], eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6) route_333_eos.add_vpp_config() tx = self.create_stream_labelled_ip6(self.pg0, [VppMplsLabel(333)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_ip6(self.pg0, rx, tx) # # disposed packets have an TTL expired # tx = self.create_stream_labelled_ip6(self.pg0, [VppMplsLabel(333, ttl=64)], dst_ip=self.pg1.remote_ip6, hlim=1) rx = self.send_and_expect_some(self.pg0, tx, self.pg0) self.verify_capture_ip6_icmp(self.pg0, rx, tx) # # An MPLS xconnect - EOS label in IPv6 out w imp-null # route_334_eos = VppMplsRoute( self, 334, 1, 
[VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index, labels=[VppMplsLabel(3)])], eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6) route_334_eos.add_vpp_config() tx = self.create_stream_labelled_ip6(self.pg0, [VppMplsLabel(334, ttl=64)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_ip6(self.pg0, rx, tx) # # An MPLS xconnect - EOS label in IPv6 out w imp-null in uniform mode # route_335_eos = VppMplsRoute( self, 335, 1, [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index, labels=[VppMplsLabel(3, MplsLspMode.UNIFORM)])], eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6) route_335_eos.add_vpp_config() tx = self.create_stream_labelled_ip6( self.pg0, [VppMplsLabel(335, ttl=27, exp=4)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_ip6(self.pg0, rx, tx, ip_hlim=26, ip_dscp=0x80) # # disposed packets have an TTL expired # tx = self.create_stream_labelled_ip6(self.pg0, [VppMplsLabel(334)], dst_ip=self.pg1.remote_ip6, hlim=0) rx = self.send_and_expect_some(self.pg0, tx, self.pg0) self.verify_capture_ip6_icmp(self.pg0, rx, tx) # # An MPLS xconnect - non-EOS label in IP out - an invalid configuration # so this traffic should be dropped. 
# route_33_neos = VppMplsRoute(self, 33, 0, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[])]) route_33_neos.add_vpp_config() tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(33), VppMplsLabel(99)]) self.send_and_assert_no_replies( self.pg0, tx, "MPLS non-EOS packets popped and forwarded") # # A recursive EOS x-connect, which resolves through another x-connect # in pipe mode # route_34_eos = VppMplsRoute(self, 34, 1, [VppRoutePath("0.0.0.0", 0xffffffff, nh_via_label=32, labels=[VppMplsLabel(44), VppMplsLabel(45)])]) route_34_eos.add_vpp_config() self.logger.info(self.vapi.cli("sh mpls fib 34")) tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(34, ttl=3)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled(self.pg0, rx, tx, [VppMplsLabel(33), VppMplsLabel(44), VppMplsLabel(45, ttl=2)]) self.assertEqual(route_34_eos.get_stats_to()['packets'], 257) self.assertEqual(route_32_neos.get_stats_via()['packets'], 257) # # A recursive EOS x-connect, which resolves through another x-connect # in uniform mode # route_35_eos = VppMplsRoute( self, 35, 1, [VppRoutePath("0.0.0.0", 0xffffffff, nh_via_label=42, labels=[VppMplsLabel(44)])]) route_35_eos.add_vpp_config() tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(35, ttl=3)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled(self.pg0, rx, tx, [VppMplsLabel(43, ttl=2), VppMplsLabel(44, ttl=2)]) # # A recursive non-EOS x-connect, which resolves through another # x-connect # route_34_neos = VppMplsRoute(self, 34, 0, [VppRoutePath("0.0.0.0", 0xffffffff, nh_via_label=32, labels=[VppMplsLabel(44), VppMplsLabel(46)])]) route_34_neos.add_vpp_config() tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(34, ttl=45), VppMplsLabel(99)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) # it's the 2nd (counting from 0) label in the stack that is swapped self.verify_capture_labelled(self.pg0, rx, tx, [VppMplsLabel(33), 
VppMplsLabel(44), VppMplsLabel(46, ttl=44), VppMplsLabel(99)]) # # an recursive IP route that resolves through the recursive non-eos # x-connect # ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32, [VppRoutePath("0.0.0.0", 0xffffffff, nh_via_label=34, labels=[VppMplsLabel(55)])]) ip_10_0_0_1.add_vpp_config() tx = self.create_stream_ip4(self.pg0, "10.0.0.1") rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(33), VppMplsLabel(44), VppMplsLabel(46), VppMplsLabel(55)]) self.assertEqual(ip_10_0_0_1.get_stats_to()['packets'], 257) ip_10_0_0_1.remove_vpp_config() route_34_neos.remove_vpp_config() route_34_eos.remove_vpp_config() route_33_neos.remove_vpp_config() route_33_eos.remove_vpp_config() route_32_neos.remove_vpp_config() route_32_eos.remove_vpp_config() def test_bind(self): """ MPLS Local Label Binding test """ # # Add a non-recursive route with a single out label # route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(45)])]) route_10_0_0_1.add_vpp_config() # bind a local label to the route binding = VppMplsIpBind(self, 44, "10.0.0.1", 32) binding.add_vpp_config() # non-EOS stream tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(44), VppMplsLabel(99)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled(self.pg0, rx, tx, [VppMplsLabel(45, ttl=63), VppMplsLabel(99)]) # EOS stream tx = self.create_stream_labelled_ip4(self.pg0, [VppMplsLabel(44)]) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled(self.pg0, rx, tx, [VppMplsLabel(45, ttl=63)]) # IP stream tx = self.create_stream_ip4(self.pg0, "10.0.0.1") rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(45)]) # # cleanup # binding.remove_vpp_config() route_10_0_0_1.remove_vpp_config() def test_imposition(self): """ MPLS label imposition test """ # # Add a 
non-recursive route with a single out label # route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(32)])]) route_10_0_0_1.add_vpp_config() # # a stream that matches the route for 10.0.0.1 # PG0 is in the default table # tx = self.create_stream_ip4(self.pg0, "10.0.0.1") rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(32)]) # # Add a non-recursive route with a 3 out labels # route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34)])]) route_10_0_0_2.add_vpp_config() tx = self.create_stream_ip4(self.pg0, "10.0.0.2", ip_ttl=44, ip_dscp=0xff) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34)], ip_ttl=43) # # Add a non-recursive route with a single out label in uniform mode # route_10_0_0_3 = VppIpRoute( self, "10.0.0.3", 32, [VppRoutePath(self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(32, mode=MplsLspMode.UNIFORM)])]) route_10_0_0_3.add_vpp_config() tx = self.create_stream_ip4(self.pg0, "10.0.0.3", ip_ttl=54, ip_dscp=0xbe) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(32, ttl=53, exp=5)]) # # Add a IPv6 non-recursive route with a single out label in # uniform mode # route_2001_3 = VppIpRoute( self, "2001::3", 128, [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index, labels=[VppMplsLabel(32, mode=MplsLspMode.UNIFORM)])]) route_2001_3.add_vpp_config() tx = self.create_stream_ip6(self.pg0, "2001::3", ip_ttl=54, ip_dscp=0xbe) rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled_ip6(self.pg0, rx, tx, [VppMplsLabel(32, ttl=53, exp=5)]) # # add a recursive path, with output label, via the 1 label route # 
route_11_0_0_1 = VppIpRoute(self, "11.0.0.1", 32, [VppRoutePath("10.0.0.1", 0xffffffff, labels=[VppMplsLabel(44)])]) route_11_0_0_1.add_vpp_config() # # a stream that matches the route for 11.0.0.1, should pick up # the label stack for 11.0.0.1 and 10.0.0.1 # tx = self.create_stream_ip4(self.pg0, "11.0.0.1") rx = self.send_and_expect(self.pg0, tx, self.pg0) self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(32),
#!/usr/bin/env python3
import sys
import shutil
import os
import fnmatch
import unittest
import argparse
import time
import threading
import signal
import psutil
import re
import multiprocessing
from multiprocessing import Process, Pipe, cpu_count
from multiprocessing.queues import Queue
from multiprocessing.managers import BaseManager
import framework
from framework import VppTestRunner, running_extended_tests, VppTestCase, \
get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
TEST_RUN
from debug import spawn_gdb
from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
colorize, single_line_delim
from discover_tests import discover_tests
from subprocess import check_output, CalledProcessError
from util import check_core_path, get_core_path, is_core_present
# Grace period granted to a child process to finish once a core dump has been
# seen in its test temporary directory (presumably seconds - confirm against
# the code that compares it). When it is exceeded the parent assumes the child
# is stuck (e.g. waiting for a shm mutex which will never get unlocked) and
# kills the child.
core_timeout = 3

# Minimum amount of shared memory required: 512MB.
min_req_shm = 512 * 1024 * 1024

# Shared memory budgeted for each extra process: 128MB.
shm_per_process = 128 * 1024 * 1024
class StreamQueue(Queue):
    """Multiprocessing queue that also offers a minimal stream interface.

    The extra ``write``/``flush``/``fileno`` methods let an instance be
    assigned to ``sys.stdout``/``sys.stderr``, as the child-process entry
    point in this file does.
    """

    def write(self, msg):
        """Enqueue *msg* instead of writing it to a real stream."""
        self.put(msg)

    def flush(self):
        """Flush the interpreter's original stdout and stderr streams."""
        for stream in (sys.__stdout__, sys.__stderr__):
            stream.flush()

    def fileno(self):
        """Return the file descriptor of the queue's writer connection."""
        writer = self._writer
        return writer.fileno()
class StreamQueueManager(BaseManager):
    """Manager used to create ``StreamQueue`` objects via its registry."""


# Expose StreamQueue under the type id 'StreamQueue' so that callers can
# obtain instances through ``manager.StreamQueue(...)``.
StreamQueueManager.register('StreamQueue', callable=StreamQueue)
class TestResult(dict):
    """Outcome of one test suite, stored as lists of test ids per category.

    The dict keys are the PASS, FAIL, ERROR, SKIP and TEST_RUN categories;
    each maps to the list of test ids recorded under that category.

    :param testcase_suite: iterable suite of test cases; it must also provide
        ``countTestCases()``.
    :param testcases_by_id: optional mapping of test id -> test case used to
        resolve human readable names. When omitted (None) every lookup
        simply falls back to the raw test id.
    """

    def __init__(self, testcase_suite, testcases_by_id=None):
        super(TestResult, self).__init__()
        for category in (PASS, FAIL, ERROR, SKIP, TEST_RUN):
            self[category] = []
        self.crashed = False
        self.testcase_suite = testcase_suite
        self.testcases = list(testcase_suite)
        # Stored exactly as given; readers go through _known_testcases() so
        # that the None default does not break membership tests.
        self.testcases_by_id = testcases_by_id

    def _known_testcases(self):
        """Return the id -> test case mapping, or an empty dict if unset."""
        return self.testcases_by_id or {}

    def was_successful(self):
        """Return True if nothing failed and every test ran to pass/skip.

        That is: no FAIL and no ERROR entries, and the number of passed plus
        skipped tests equals both the suite's test count and the number of
        tests recorded as run.
        """
        if self[FAIL] or self[ERROR]:
            return False
        finished = len(self[PASS]) + len(self[SKIP])
        return (finished == self.testcase_suite.countTestCases()
                == len(self[TEST_RUN]))

    def no_tests_run(self):
        """Return True if no test has been recorded as run."""
        return not self[TEST_RUN]

    def process_result(self, test_id, result):
        """Record *test_id* under the category *result*."""
        self[result].append(test_id)

    def suite_from_failed(self):
        """Build a suite of the tests that neither passed nor were skipped.

        :returns: whatever the module-level ``suite_from_failed`` returns for
            the ids to re-run, or None when there is nothing to re-run.
        """
        # Build the lookup set once instead of scanning two lists per test.
        done = set(self[PASS])
        done.update(self[SKIP])
        rerun_ids = set()
        for testcase in self.testcase_suite:
            tc_id = testcase.id()
            if tc_id not in done:
                rerun_ids.add(tc_id)
        if rerun_ids:
            # Inside a method this name resolves to the module-level function.
            return suite_from_failed(self.testcase_suite, rerun_ids)
        return None

    def get_testcase_names(self, test_id):
        """Return ``(testcase_name, test_name)`` for *test_id*.

        Ids of the form ``setUpClass (module.Class)`` or
        ``tearDownClass (module.Class)`` yield the fixture name as
        ``test_name``; all other ids are resolved through the known test
        cases, falling back to the raw id.
        """
        # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
        setup_teardown_match = re.match(
            r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
        if not setup_teardown_match:
            return (self._get_testcase_doc_name(test_id),
                    self._get_test_description(test_id))

        test_name, _, _, testcase_name = setup_teardown_match.groups()
        if len(testcase_name.split('.')) == 2:
            # Only "module.Class" is given: use the first known test id with
            # that prefix to look up the class documentation.
            for key in self._known_testcases():
                if key.startswith(testcase_name):
                    testcase_name = key
                    break
        testcase_name = self._get_testcase_doc_name(testcase_name)
        return testcase_name, test_name

    def _get_test_description(self, test_id):
        """Return the description of a known test, else *test_id* itself."""
        known = self._known_testcases()
        if test_id in known:
            return get_test_description(descriptions, known[test_id])
        return test_id

    def _get_testcase_doc_name(self, test_id):
        """Return the doc name of a known test's class, else *test_id*."""
        known = self._known_testcases()
        if test_id in known:
            return get_testcase_doc_name(known[test_id])
        return test_id
def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
                        finished_pipe, result_pipe, logger):
    """Run *suite* with a VppTestRunner and report the outcome over pipes.

    Used in this file as the target of the child ``Process``.

    :param suite: the test suite to run.
    :param keep_alive_pipe: pipe end handed to the runner; closed on exit.
    :param stdouterr_queue: object installed as both ``sys.stdout`` and
        ``sys.stderr`` for the duration of the process.
    :param finished_pipe: pipe end on which the final success flag is sent.
    :param result_pipe: pipe end handed to the runner.
    :param logger: logger whose first handler becomes
        ``VppTestCase.parallel_handler``.
    """
    # Everything printed from here on goes into the queue.
    sys.stdout = sys.stderr = stdouterr_queue
    VppTestCase.parallel_handler = logger.handlers[0]

    runner = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
                           descriptions=descriptions,
                           verbosity=verbose,
                           result_pipe=result_pipe,
                           failfast=failfast,
                           print_summary=False)
    outcome = runner.run(suite)

    finished_pipe.send(outcome.wasSuccessful())
    finished_pipe.close()
    keep_alive_pipe.close()
class TestCaseWrapper(object):
    """Run one test suite in a child process and keep parent-side state.

    Construction creates three one-way pipes (keep-alive, finished, result),
    a ``StreamQueue`` for the child's output, and immediately starts the
    child process running ``test_runner_wrapper``.

    :param testcase_suite: iterable suite of test cases to run.
    :param manager: object providing ``StreamQueue(...)`` (the registered
        manager in this file).
    """

    # Matches ids such as "setUpClass (module.Class)"; group 4 is the
    # dotted class path.
    _SETUP_TEARDOWN_RE = re.compile(
        r'((tearDownClass)|(setUpClass)) \((.+\..+)\)')

    def __init__(self, testcase_suite, manager):
        self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
            duplex=False)
        self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
        self.result_parent_end, self.result_child_end = Pipe(duplex=False)
        self.testcase_suite = testcase_suite

        if sys.version_info[0] == 2:
            self.stdouterr_queue = manager.StreamQueue()
        else:
            from multiprocessing import get_context
            self.stdouterr_queue = manager.StreamQueue(ctx=get_context())

        self.logger = get_parallel_logger(self.stdouterr_queue)
        # The child only receives the arguments listed here; the attributes
        # assigned after start() are bookkeeping for the parent side.
        self.child = Process(target=test_runner_wrapper,
                             args=(testcase_suite,
                                   self.keep_alive_child_end,
                                   self.stdouterr_queue,
                                   self.finished_child_end,
                                   self.result_child_end,
                                   self.logger)
                             )
        self.child.start()

        self.last_test_temp_dir = None
        self.last_test_vpp_binary = None
        self._last_test = None
        self.last_test_id = None
        self.vpp_pid = None
        self.last_heard = time.time()
        self.core_detected_at = None
        self.testcases_by_id = {}
        # class name -> (test name, vpp binary, temp dir) of the first test
        # of that class seen with a core.
        self.testclasess_with_core = {}
        for testcase in self.testcase_suite:
            self.testcases_by_id[testcase.id()] = testcase
        self.result = TestResult(testcase_suite, self.testcases_by_id)

    @property
    def last_test(self):
        """Human readable name of the most recently reported test."""
        return self._last_test

    @last_test.setter
    def last_test(self, test_id):
        """Remember *test_id* and derive a readable name for it.

        Known ids use the test's short description, falling back to
        ``str(testcase)``; unknown ids are stored verbatim.
        """
        self.last_test_id = test_id
        testcase = self.testcases_by_id.get(test_id) \
            if test_id in self.testcases_by_id else None
        if test_id in self.testcases_by_id:
            self._last_test = testcase.shortDescription()
            if not self._last_test:
                self._last_test = str(testcase)
        else:
            self._last_test = test_id

    def add_testclass_with_core(self):
        """Record the last test's class as having produced a core.

        Only the first entry per class name is kept. If the last test id is
        neither a known test nor a setUpClass/tearDownClass id (including
        when no test has been reported yet), the id itself, converted with
        ``str``, is used as the class key instead of raising.
        """
        if self.last_test_id in self.testcases_by_id:
            test = self.testcases_by_id[self.last_test_id]
            class_name = unittest.util.strclass(test.__class__)
            test_name = "'{}' ({})".format(get_test_description(descriptions,
                                                                test),
                                           self.last_test_id)
        else:
            test_name = self.last_test_id
            match = None
            if test_name:
                match = self._SETUP_TEARDOWN_RE.match(test_name)
            # Previously an unmatched or missing id crashed here with
            # AttributeError/TypeError; fall back to the raw id instead.
            class_name = match.group(4) if match else str(test_name)

        if class_name not in self.testclasess_with_core:
            self.testclasess_with_core[class_name] = (
                test_name,
                self.last_test_vpp_binary,
                self.last_test_temp_dir)

    def close_pipes(self):
        """Close both ends of all three pipes, child ends first."""
        for pipe_end in (self.keep_alive_child_end,
                         self.finished_child_end,
                         self.result_child_end,
                         self.keep_alive_parent_end,
                         self.finished_parent_end,
                         self.result_parent_end):
            pipe_end.close()

    def was_successful(self):
        """Return whether the collected result counts as successful."""
        return self.result.was_successful()
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
                             read_testcases):
    """Copy queued child output to ``sys.stdout``, one test case at a time.

    Loops while *read_testcases* is set or *unread_testcases* is non-empty.
    Test cases present in *finished_unread_testcases* are served first.
    For the chosen test case, items are read from its ``stdouterr_queue``
    and written out until a ``None`` item is received; the queue is then
    closed.

    :param unread_testcases: set of test cases whose output is still unread;
        entries are removed as they are picked.
    :param finished_unread_testcases: set of finished test cases whose output
        is still unread.
    :param read_testcases: event-like object providing ``is_set()``.
    """
    while read_testcases.is_set() or unread_testcases:
        current = None
        if finished_unread_testcases:
            current = finished_unread_testcases.pop()
            unread_testcases.remove(current)
        elif unread_testcases:
            current = unread_testcases.pop()

        if not current:
            # Nothing to read right now; re-check the loop condition.
            continue

        output_queue = current.stdouterr_queue
        chunk = ''
        while chunk is not None:
            sys.stdout.write(chunk)
            chunk = output_queue.get()

        output_queue.close()
        finished_unread_testcases.discard(current)
def handle_failed_suite(logger, last_test_temp_dir, vpp_pid):
    """Collect artifacts for a failed suite.

    When *last_test_temp_dir* is set, a ``<FAILED_DIR><basename>-FAILED``
    symlink to it is created (if not already present) and, if a core file
    exists there, it is reported and inspected with the ``file`` utility.
    When *vpp_pid* is set and ``/tmp/api_post_mortem.<pid>`` exists, that
    file is copied into *last_test_temp_dir*.

    :param logger: logger used for all reporting.
    :param last_test_temp_dir: temporary directory of the last test, or a
        false value if unknown.
    :param vpp_pid: pid of the vpp process, or a false value if unknown.
    """
    if last_test_temp_dir:
        # Need to create link in case of a timeout or core dump without failure
        lttd = os.path.basename(last_test_temp_dir)
        # NOTE(review): the path is built by plain concatenation, so
        # FAILED_DIR is assumed to be set and to end with a path separator.
        failed_dir = os.getenv('FAILED_DIR')
        link_path = '%s%s-FAILED' % (failed_dir, lttd)
        # os.path.exists() follows symlinks, so a stale dangling link used to
        # look "absent" and os.symlink() then failed with FileExistsError.
        # Drop such a link and test for presence without following it.
        if os.path.islink(link_path) and not os.path.exists(link_path):
            os.unlink(link_path)
        if not os.path.lexists(link_path):
            os.symlink(last_test_temp_dir, link_path)
        logger.error("Symlink to failed testcase directory: %s -> %s"
                     % (link_path, lttd))

        # Report core existence
        core_path = get_core_path(last_test_temp_dir)
        if os.path.exists(core_path):
            logger.error(
                "Core-file exists in test temporary directory: %s!" %
                core_path)
            check_core_path(logger, core_path)
            logger.debug("Running 'file %s':" % core_path)
            # Inspecting the core is best effort: log and carry on.
            try:
                info = check_output(["file", core_path])
                logger.debug(info)
            except CalledProcessError as e:
                logger.error("Subprocess returned with return code "
                             "while running `file' utility on core-file "
                             "returned: "
                             "rc=%s", e.returncode)
            except OSError as e:
                logger.error("Subprocess returned with OS error while "
                             "running 'file' utility "
                             "on core-file: "
                             "(%s) %s", e.errno, e.strerror)
            except Exception:
                logger.exception("Unexpected error running `file' utility "
                                 "on core-file")

    if vpp_pid:
        # Copy api post mortem
        api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
        if os.path.isfile(api_post_mortem_path):
            logger.error("Copying api_post_mortem.%d to %s" %
                         (vpp_pid, last_test_temp_dir))
            shutil.copy2(api_post_mortem_path, last_test_temp_dir)
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
    """Debug or compress the core file found in *tempdir*, if any.

    Does nothing when no core is present. Otherwise, if the module-level
    ``debug_core`` flag is set, gdb is spawned on the core; else, if
    ``compress_core`` is set, the core file is gzipped.

    :param vpp_binary: binary passed to gdb together with the core path.
    :param tempdir: test temporary directory to inspect.
    :param core_crash_test: name of the test reported as running last.
    """
    if not is_core_present(tempdir):
        return

    if debug_core:
        print('VPP core detected in %s. Last test running was %s' %
              (tempdir, core_crash_test))
        print(single_line_delim)
        spawn_gdb(vpp_binary, get_core_path(tempdir))
        print(single_line_delim)
        return

    if compress_core:
        print("Compressing core-file in test directory `%s'" % tempdir)
        os.system("gzip %s" % get_core_path(tempdir))
def handle_cores(failed_testcases):
    """Run core handling for every recorded core of the failed test cases.

    :param failed_testcases: iterable of objects with a
        ``testclasess_with_core`` mapping whose values are
        ``(test, vpp_binary, tempdir)`` tuples.
    """
    for wrapper in failed_testcases:
        cores = wrapper.testclasess_with_core
        if not cores:
            continue
        for entry in cores.values():
            test_name, binary, temp_dir = entry
            check_and_handle_core(binary, temp_dir, test_name)
def process_finished_testsuite(wrapped_testcase_suite,
                               finished_testcase_suites,
                               failed_wrapped_testcases,
                               results):
    """Book-keep a finished suite and tell the caller whether to stop.

    The suite's result is appended to *results* and the suite is added to
    *finished_testcase_suites*. An unsuccessful suite is additionally added
    to *failed_wrapped_testcases* and passed to ``handle_failed_suite``.

    :returns: True if the suite was unsuccessful and the module-level
        ``failfast`` flag is set, otherwise False.
    """
    results.append(wrapped_testcase_suite.result)
    finished_testcase_suites.add(wrapped_testcase_suite)

    # Decide on stopping before the failure handling, as that may raise.
    must_stop = bool(failfast and not wrapped_testcase_suite.was_successful())

    if wrapped_testcase_suite.was_successful():
        return must_stop

    failed_wrapped_testcases.add(wrapped_testcase_suite)
    handle_failed_suite(wrapped_testcase_suite.logger,
                        wrapped_testcase_suite.last_test_temp_dir,
                        wrapped_testcase_suite.vpp_pid)
    return must_stop
def run_forked(testcase_suites):
wrapped_testcase_suites = set()
# suites are unhashable, need to use list
results = []
unread_testcases = set()
finished_unread_testcases = set()
manager = StreamQueueManager()
manager.start()
for i in range(concurrent_tests):
if