diff options
Diffstat (limited to 'test/test_srv6_as.py')
-rwxr-xr-x[-rw-r--r--] | test/test_srv6_as.py | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/test/test_srv6_as.py b/test/test_srv6_as.py index 0e94bd97ce6..0fa838edd10 100644..100755 --- a/test/test_srv6_as.py +++ b/test/test_srv6_as.py @@ -162,6 +162,93 @@ class TestSRv6(VppTestCase): test_sid_index=0, rewrite_src_addr='a1::') + def test_SRv6_End_AS_L2_noSRH(self): + """ Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite. + """ + self.run_SRv6_End_AS_L2( + sid_list=['a1::', 'a2::a6', 'a3::'], + test_sid_index=1, + rewrite_src_addr='a2::') + + def test_SRv6_End_AS_L2_SRH(self): + """ Test SRv6 End.AS behavior with L2 traffic and SRH rewrite. + """ + self.run_SRv6_End_AS_L2( + sid_list=['a1::a6', 'a2::', 'a3::'], + test_sid_index=0, + rewrite_src_addr='a1::') + + def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr): + """ Run SRv6 End.AS test with L2 traffic. + """ + self.rewrite_src_addr = rewrite_src_addr + self.rewrite_sid_list = sid_list[test_sid_index + 1::] + + # send traffic to one destination interface + # source and destination interfaces are IPv6 only + self.setup_interfaces(ipv6=[True, False]) + + # configure route to next segment + route = VppIpRoute(self, sid_list[test_sid_index + 1], 128, + [VppRoutePath(self.pg0.remote_ip6, + self.pg0.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure SRv6 localSID behavior + cli_str = "sr localsid address " + sid_list[test_sid_index] \ + + " behavior end.as" \ + + " oif " + self.pg1.name \ + + " iif " + self.pg1.name \ + + " src " + self.rewrite_src_addr + for s in self.rewrite_sid_list: + cli_str += " next " + s + self.vapi.cli(cli_str) + + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # send one packet per packet size + count = len(self.pg_packet_sizes) + + # prepare L2 in SRv6 headers + packet_header1 = self.create_packet_header_IPv6_SRH_L2( + sidlist=sid_list[::-1], + segleft=len(sid_list) - test_sid_index - 1, + vlan=0) + + # generate packets (pg0->pg1) + pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1, + self.pg_packet_sizes, count) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts1, self.pg1, + self.compare_rx_tx_packet_End_AS_L2_out) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # prepare L2 header for returning packets + packet_header2 = self.create_packet_header_L2() + + # generate returning packets (pg1->pg0) + pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2, + self.pg_packet_sizes, count) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg1, pkts2, self.pg0, + self.compare_rx_tx_packet_End_AS_L2_in) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + self.vapi.cli("sr localsid del address " + sid_list[test_sid_index]) + + # cleanup interfaces + self.teardown_interfaces() + def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr): """ Run SRv6 End.AS test with IPv6 traffic. """ @@ -407,6 +494,53 @@ class TestSRv6(VppTestCase): self.logger.debug("packet verification: SUCCESS") + def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.AS + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + rx_srh = None + + tx_ether = tx_pkt.getlayer(Ether) + + # expected segment-list (SRH order) + tx_seglist = self.rewrite_sid_list[::-1] + + # received ip.src should be equal to SR Policy source + self.assertEqual(rx_ip.src, self.rewrite_src_addr) + # received ip.dst should be equal to expected sidlist[lastentry] + self.assertEqual(rx_ip.dst, tx_seglist[-1]) + + if len(tx_seglist) > 1: + # rx'ed packet should have SRH + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + # get SRH + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + # rx'ed seglist should be equal to seglist + self.assertEqual(rx_srh.addresses, tx_seglist) + # segleft should be equal to size seglist-1 + self.assertEqual(rx_srh.segleft, len(tx_seglist)-1) + # segleft should be equal to lastentry + self.assertEqual(rx_srh.segleft, rx_srh.lastentry) + # nh should be "No Next Header" (59) + self.assertEqual(rx_srh.nh, 59) + # get payload + payload = rx_srh.payload + else: + # rx'ed packet should NOT have SRH + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + # get payload + payload = rx_ip.payload + + # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt + self.assertEqual(Ether(str(payload)), tx_ether) + + self.logger.debug("packet verification: SUCCESS") + def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt): """ Compare input and output packet after passing End.AS with IPv6 @@ -462,6 +596,29 @@ class TestSRv6(VppTestCase): self.logger.debug("packet verification: SUCCESS") + def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.AS with L2 + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + + # get IPv4 header of rx'ed packet + rx_eth = rx_pkt.getlayer(Ether) + + tx_ip = tx_pkt.getlayer(IPv6) + # we can't just get the 2nd Ether layer + # get the Raw content and dissect it as Ether + tx_eth1 = Ether(str(tx_pkt[Raw])) + + # verify if rx'ed packet has no SRH + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + + # the whole rx_eth pkt should be equal to tx_eth1 + self.assertEqual(rx_eth, tx_eth1) + + self.logger.debug("packet verification: SUCCESS") + def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count): """Create SRv6 input packet stream for defined interface. @@ -604,6 +761,48 @@ class TestSRv6(VppTestCase): UDP(sport=1234, dport=1234)) return p + def create_packet_header_L2(self, vlan=0): + """Create packet header: L2 header + + :param vlan: if vlan!=0 then add 802.1q header + """ + # Note: the dst addr ('00:55:44:33:22:11') is used in + # the compare function compare_rx_tx_packet_T_Encaps_L2 + # to detect presence of L2 in SRH payload + p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + p /= Dot1Q(vlan=vlan, type=etype) + else: + p.type = etype + return p + + def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0): + """Create packet header: L2 encapsulated in SRv6: + IPv6 header with SRH, L2 + + :param list sidlist: segment list of outer IPv6 SRH + :param int segleft: segments-left field of outer IPv6 SRH + :param vlan: L2 vlan; if vlan!=0 then add 802.1q header + + Outer IPv6 destination address is set to sidlist[segleft] + IPv6 source address is 1234::1 + """ + eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + eth /= Dot1Q(vlan=vlan, type=etype) + else: + eth.type = etype + + p = (IPv6(src='1234::1', dst=sidlist[segleft]) / + IPv6ExtHdrSegmentRouting(addresses=sidlist, + segleft=segleft, nh=59) / + eth) + return p + def get_payload_info(self, packet): """ Extract the payload_info from the packet """ |