diff options
-rwxr-xr-x | src/vnet/srv6/sr_localsid.c | 225 | ||||
-rwxr-xr-x | src/vnet/srv6/sr_policy_rewrite.c | 6 | ||||
-rwxr-xr-x | src/vnet/srv6/sr_steering.c | 2 | ||||
-rw-r--r-- | test/patches/scapy-2.3.3/inet6.py.patch | 185 | ||||
-rw-r--r-- | test/test_srv6.py | 1997 | ||||
-rw-r--r-- | test/vpp_papi_provider.py | 118 | ||||
-rw-r--r-- | test/vpp_srv6.py | 238 |
7 files changed, 2739 insertions, 32 deletions
diff --git a/src/vnet/srv6/sr_localsid.c b/src/vnet/srv6/sr_localsid.c index adeb5c03..1be68334 100755 --- a/src/vnet/srv6/sr_localsid.c +++ b/src/vnet/srv6/sr_localsid.c @@ -587,12 +587,11 @@ VLIB_CLI_COMMAND (clear_sr_localsid_counters_command, static) = { */ typedef struct { - u32 localsid_index; - ip6_address_t src, out_dst; + ip6_address_t localsid; + u16 behavior; u8 sr[256]; u8 num_segments; u8 segments_left; - //With SRv6 header update include flags here. } sr_localsid_trace_t; #define foreach_sr_localsid_error \ @@ -643,16 +642,12 @@ format_sr_localsid_trace (u8 * s, va_list * args) { CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); - ip6_sr_main_t *sm = &sr_main; sr_localsid_trace_t *t = va_arg (*args, sr_localsid_trace_t *); - ip6_sr_localsid_t *ls = - pool_elt_at_index (sm->localsids, t->localsid_index); - s = format (s, "SR-LOCALSID:\n\tLocalsid: %U\n", format_ip6_address, - &ls->localsid); - switch (ls->behavior) + &t->localsid); + switch (t->behavior) { case SR_BEHAVIOR_END: s = format (s, "\tBehavior: End\n"); @@ -686,7 +681,7 @@ format_sr_localsid_trace (u8 * s, va_list * args) { if (t->num_segments > 0) { - s = format (s, "\tSegments left: %d\n", t->num_segments); + s = format (s, "\tSegments left: %d\n", t->segments_left); s = format (s, "\tSID list: [in ietf order]"); int i = 0; for (i = 0; i < t->num_segments; i++) @@ -882,7 +877,7 @@ end_decaps_srh_processing (vlib_node_runtime_t * node, } /** - * @brief SR LocalSID graph node. Supports all default SR Endpoint variants + * @brief SR LocalSID graph node. Supports all default SR Endpoint variants with decaps */ static uword sr_localsid_d_fn (vlib_main_t * vm, vlib_node_runtime_t * node, @@ -975,7 +970,97 @@ sr_localsid_d_fn (vlib_main_t * vm, vlib_node_runtime_t * node, end_decaps_srh_processing (node, b2, ip2, sr2, ls2, &next2); end_decaps_srh_processing (node, b3, ip3, sr3, ls3, &next3); - //TODO: trace. + if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED)) + { + sr_localsid_trace_t *tr = + vlib_add_trace (vm, node, b0, sizeof (*tr)); + tr->num_segments = 0; + clib_memcpy (tr->localsid.as_u8, ls0->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls0->behavior; + if (ip0 == vlib_buffer_get_current (b0)) + { + if (ip0->protocol == IP_PROTOCOL_IPV6_ROUTE + && sr0->type == ROUTING_HEADER_TYPE_SR) + { + clib_memcpy (tr->sr, sr0->segments, sr0->length * 8); + tr->num_segments = + sr0->length * 8 / sizeof (ip6_address_t); + tr->segments_left = sr0->segments_left; + } + } + else + tr->num_segments = 0xFF; + } + + if (PREDICT_FALSE (b1->flags & VLIB_BUFFER_IS_TRACED)) + { + sr_localsid_trace_t *tr = + vlib_add_trace (vm, node, b1, sizeof (*tr)); + tr->num_segments = 0; + clib_memcpy (tr->localsid.as_u8, ls1->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls1->behavior; + if (ip1 == vlib_buffer_get_current (b1)) + { + if (ip1->protocol == IP_PROTOCOL_IPV6_ROUTE + && sr1->type == ROUTING_HEADER_TYPE_SR) + { + clib_memcpy (tr->sr, sr1->segments, sr1->length * 8); + tr->num_segments = + sr1->length * 8 / sizeof (ip6_address_t); + tr->segments_left = sr1->segments_left; + } + } + else + tr->num_segments = 0xFF; + } + + if (PREDICT_FALSE (b2->flags & VLIB_BUFFER_IS_TRACED)) + { + sr_localsid_trace_t *tr = + vlib_add_trace (vm, node, b2, sizeof (*tr)); + tr->num_segments = 0; + clib_memcpy (tr->localsid.as_u8, ls2->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls2->behavior; + if (ip2 == vlib_buffer_get_current (b2)) + { + if (ip2->protocol == IP_PROTOCOL_IPV6_ROUTE + && sr2->type == ROUTING_HEADER_TYPE_SR) + { + clib_memcpy (tr->sr, sr2->segments, sr2->length * 8); + tr->num_segments = + sr2->length * 8 / sizeof (ip6_address_t); + tr->segments_left = sr2->segments_left; + } + } + else + tr->num_segments = 0xFF; + } + + if (PREDICT_FALSE (b3->flags & VLIB_BUFFER_IS_TRACED)) + { + sr_localsid_trace_t *tr = + vlib_add_trace (vm, node, b3, sizeof (*tr)); + tr->num_segments = 0; + clib_memcpy (tr->localsid.as_u8, ls3->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls3->behavior; + if (ip3 == vlib_buffer_get_current (b3)) + { + if (ip3->protocol == IP_PROTOCOL_IPV6_ROUTE + && sr3->type == ROUTING_HEADER_TYPE_SR) + { + clib_memcpy (tr->sr, sr3->segments, sr3->length * 8); + tr->num_segments = + sr3->length * 8 / sizeof (ip6_address_t); + tr->segments_left = sr3->segments_left; + } + } + else + tr->num_segments = 0xFF; + } vlib_increment_combined_counter (((next0 == @@ -1043,14 +1128,11 @@ sr_localsid_d_fn (vlib_main_t * vm, vlib_node_runtime_t * node, sr_localsid_trace_t *tr = vlib_add_trace (vm, node, b0, sizeof (*tr)); tr->num_segments = 0; - tr->localsid_index = ls0 - sm->localsids; - + clib_memcpy (tr->localsid.as_u8, ls0->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls0->behavior; if (ip0 == vlib_buffer_get_current (b0)) { - clib_memcpy (tr->src.as_u8, ip0->src_address.as_u8, - sizeof (tr->src.as_u8)); - clib_memcpy (tr->out_dst.as_u8, ip0->dst_address.as_u8, - sizeof (tr->out_dst.as_u8)); if (ip0->protocol == IP_PROTOCOL_IPV6_ROUTE && sr0->type == ROUTING_HEADER_TYPE_SR) { @@ -1098,7 +1180,7 @@ VLIB_REGISTER_NODE (sr_localsid_d_node) = { /* *INDENT-ON* */ /** - * @brief SR LocalSID graph node. Supports all default SR Endpoint variants + * @brief SR LocalSID graph node. Supports all default SR Endpoint without decaps */ static uword sr_localsid_fn (vlib_main_t * vm, vlib_node_runtime_t * node, @@ -1195,7 +1277,97 @@ sr_localsid_fn (vlib_main_t * vm, vlib_node_runtime_t * node, end_srh_processing (node, b3, ip3, sr3, ls3, &next3, ls3->end_psp, prev3); - //TODO: proper trace. + if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED)) + { + sr_localsid_trace_t *tr = + vlib_add_trace (vm, node, b0, sizeof (*tr)); + tr->num_segments = 0; + clib_memcpy (tr->localsid.as_u8, ls0->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls0->behavior; + if (ip0 == vlib_buffer_get_current (b0)) + { + if (ip0->protocol == IP_PROTOCOL_IPV6_ROUTE + && sr0->type == ROUTING_HEADER_TYPE_SR) + { + clib_memcpy (tr->sr, sr0->segments, sr0->length * 8); + tr->num_segments = + sr0->length * 8 / sizeof (ip6_address_t); + tr->segments_left = sr0->segments_left; + } + } + else + tr->num_segments = 0xFF; + } + + if (PREDICT_FALSE (b1->flags & VLIB_BUFFER_IS_TRACED)) + { + sr_localsid_trace_t *tr = + vlib_add_trace (vm, node, b1, sizeof (*tr)); + tr->num_segments = 0; + clib_memcpy (tr->localsid.as_u8, ls1->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls1->behavior; + if (ip1 == vlib_buffer_get_current (b1)) + { + if (ip1->protocol == IP_PROTOCOL_IPV6_ROUTE + && sr1->type == ROUTING_HEADER_TYPE_SR) + { + clib_memcpy (tr->sr, sr1->segments, sr1->length * 8); + tr->num_segments = + sr1->length * 8 / sizeof (ip6_address_t); + tr->segments_left = sr1->segments_left; + } + } + else + tr->num_segments = 0xFF; + } + + if (PREDICT_FALSE (b2->flags & VLIB_BUFFER_IS_TRACED)) + { + sr_localsid_trace_t *tr = + vlib_add_trace (vm, node, b2, sizeof (*tr)); + tr->num_segments = 0; + clib_memcpy (tr->localsid.as_u8, ls2->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls2->behavior; + if (ip2 == vlib_buffer_get_current (b2)) + { + if (ip2->protocol == IP_PROTOCOL_IPV6_ROUTE + && sr2->type == ROUTING_HEADER_TYPE_SR) + { + clib_memcpy (tr->sr, sr2->segments, sr2->length * 8); + tr->num_segments = + sr2->length * 8 / sizeof (ip6_address_t); + tr->segments_left = sr2->segments_left; + } + } + else + tr->num_segments = 0xFF; + } + + if (PREDICT_FALSE (b3->flags & VLIB_BUFFER_IS_TRACED)) + { + sr_localsid_trace_t *tr = + vlib_add_trace (vm, node, b3, sizeof (*tr)); + tr->num_segments = 0; + clib_memcpy (tr->localsid.as_u8, ls3->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls3->behavior; + if (ip3 == vlib_buffer_get_current (b3)) + { + if (ip3->protocol == IP_PROTOCOL_IPV6_ROUTE + && sr3->type == ROUTING_HEADER_TYPE_SR) + { + clib_memcpy (tr->sr, sr3->segments, sr3->length * 8); + tr->num_segments = + sr3->length * 8 / sizeof (ip6_address_t); + tr->segments_left = sr3->segments_left; + } + } + else + tr->num_segments = 0xFF; + } vlib_increment_combined_counter (((next0 == @@ -1262,14 +1434,11 @@ sr_localsid_fn (vlib_main_t * vm, vlib_node_runtime_t * node, sr_localsid_trace_t *tr = vlib_add_trace (vm, node, b0, sizeof (*tr)); tr->num_segments = 0; - tr->localsid_index = ls0 - sm->localsids; - + clib_memcpy (tr->localsid.as_u8, ls0->localsid.as_u8, + sizeof (tr->localsid.as_u8)); + tr->behavior = ls0->behavior; if (ip0 == vlib_buffer_get_current (b0)) { - clib_memcpy (tr->src.as_u8, ip0->src_address.as_u8, - sizeof (tr->src.as_u8)); - clib_memcpy (tr->out_dst.as_u8, ip0->dst_address.as_u8, - sizeof (tr->out_dst.as_u8)); if (ip0->protocol == IP_PROTOCOL_IPV6_ROUTE && sr0->type == ROUTING_HEADER_TYPE_SR) { @@ -1280,9 +1449,7 @@ sr_localsid_fn (vlib_main_t * vm, vlib_node_runtime_t * node, } } else - { - tr->num_segments = 0xFF; - } + tr->num_segments = 0xFF; } vlib_increment_combined_counter diff --git a/src/vnet/srv6/sr_policy_rewrite.c b/src/vnet/srv6/sr_policy_rewrite.c index 7a37a66b..f427bbf3 100755 --- a/src/vnet/srv6/sr_policy_rewrite.c +++ b/src/vnet/srv6/sr_policy_rewrite.c @@ -672,7 +672,8 @@ sr_policy_del (ip6_address_t * bsid, u32 index) segment_list = pool_elt_at_index (sm->sid_lists, *sl_index); vec_free (segment_list->segments); vec_free (segment_list->rewrite); - vec_free (segment_list->rewrite_bsid); + if (!sr_policy->is_encap) + vec_free (segment_list->rewrite_bsid); pool_put_index (sm->sid_lists, *sl_index); } @@ -766,7 +767,8 @@ sr_policy_mod (ip6_address_t * bsid, u32 index, u32 fib_table, segment_list = pool_elt_at_index (sm->sid_lists, sl_index); vec_free (segment_list->segments); vec_free (segment_list->rewrite); - vec_free (segment_list->rewrite_bsid); + if (!sr_policy->is_encap) + vec_free (segment_list->rewrite_bsid); pool_put_index (sm->sid_lists, sl_index); vec_del1 (sr_policy->segments_lists, sl_index_iterate - sr_policy->segments_lists); diff --git a/src/vnet/srv6/sr_steering.c b/src/vnet/srv6/sr_steering.c index 704adaa7..57fe21f6 100755 --- a/src/vnet/srv6/sr_steering.c +++ b/src/vnet/srv6/sr_steering.c @@ -165,7 +165,7 @@ sr_steering_policy (int is_del, ip6_address_t * bsid, u32 sr_policy_index, sm->fib_table_ip4 = (u32) ~ 0; } - return 1; + return 0; } else /* It means user requested to update an existing SR steering policy */ { diff --git a/test/patches/scapy-2.3.3/inet6.py.patch b/test/patches/scapy-2.3.3/inet6.py.patch new file mode 100644 index 00000000..f98e7091 --- /dev/null +++ b/test/patches/scapy-2.3.3/inet6.py.patch @@ -0,0 +1,185 @@ +diff --git a/scapy/layers/inet6.py b/scapy/layers/inet6.py +--- a/scapy/layers/inet6.py 2017-06-01 14:04:18.160881034 +0200 ++++ b/scapy/layers/inet6.py 2017-06-02 09:08:40.133800208 +0200 +@@ -369,6 +369,8 @@ + return Raw + elif self.nh == 135 and len(p) > 3: # Mobile IPv6 + return _mip6_mhtype2cls.get(ord(p[2]), MIP6MH_Generic) ++ elif self.nh == 43 and ord(p[2]) == 4: # Segment Routing header ++ return IPv6ExtHdrSegmentRouting + else: + return get_cls(ipv6nhcls.get(self.nh,"Raw"), "Raw") + +@@ -430,6 +432,14 @@ + sd = strxor(sd, a) + sd = inet_ntop(socket.AF_INET6, sd) + ++ if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting): ++ # With segment routing header (rh == 4), the destination is ++ # the first address of the IPv6 addresses list ++ try: ++ sd = self.addresses[0] ++ except IndexError: ++ sd = self.dst ++ + if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment): + nh = self.payload.nh + +@@ -489,6 +499,8 @@ + return self.payload.answers(other.payload.payload) + elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting): + return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting ++ elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting): ++ return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting + elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt): + return self.payload.payload.answers(other.payload.payload) + elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance +@@ -919,6 +931,148 @@ + pkt = pkt[:3]+struct.pack("B", len(self.addresses))+pkt[4:] + return _IPv6ExtHdr.post_build(self, pkt, pay) + ++######################### Segment Routing Header ############################ ++ ++# This implementation is based on draft 06, available at: ++# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06 ++ ++class IPv6ExtHdrSegmentRoutingTLV(Packet): ++ name = "IPv6 Option Header Segment Routing - Generic TLV" ++ fields_desc = [ ByteField("type", 0), ++ ByteField("len", 0), ++ ByteField("reserved", 0), ++ ByteField("flags", 0), ++ StrLenField("value", "", length_from=lambda pkt: pkt.len) ] ++ ++ def extract_padding(self, p): ++ return "",p ++ ++ registered_sr_tlv = {} ++ @classmethod ++ def register_variant(cls): ++ cls.registered_sr_tlv[cls.type.default] = cls ++ ++ @classmethod ++ def dispatch_hook(cls, pkt=None, *args, **kargs): ++ if pkt: ++ tmp_type = ord(pkt[0]) ++ return cls.registered_sr_tlv.get(tmp_type, cls) ++ return cls ++ ++ ++class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV): ++ name = "IPv6 Option Header Segment Routing - Ingress Node TLV" ++ fields_desc = [ ByteField("type", 1), ++ ByteField("len", 18), ++ ByteField("reserved", 0), ++ ByteField("flags", 0), ++ IP6Field("ingress_node", "::1") ] ++ ++ ++class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV): ++ name = "IPv6 Option Header Segment Routing - Egress Node TLV" ++ fields_desc = [ ByteField("type", 2), ++ ByteField("len", 18), ++ ByteField("reserved", 0), ++ ByteField("flags", 0), ++ IP6Field("egress_node", "::1") ] ++ ++ ++class IPv6ExtHdrSegmentRoutingTLVPadding(IPv6ExtHdrSegmentRoutingTLV): ++ name = "IPv6 Option Header Segment Routing - Padding TLV" ++ fields_desc = [ ByteField("type", 4), ++ FieldLenField("len", None, length_of="padding", fmt="B"), ++ StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len) ] ++ ++ ++class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr): ++ # 0 1 2 3 ++ # 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++ #+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ++ #| Next Header | Hdr Ext Len | Routing Type | Segments Left | ++ #+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ++ #| Last Entry | Flags | Tag | ++ #+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ++ #| | ++ #| Segment List[0] (128 bits IPv6 address) | ++ #| | ++ #| | ++ #+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ++ #| | ++ #| | ++ # ... ++ #| | ++ #| | ++ #+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ++ #| | ++ #| Segment List[n] (128 bits IPv6 address) | ++ #| | ++ #| | ++ #+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ++ #// // ++ #// Optional Type Length Value objects (variable) // ++ #// // ++ #+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ ++ # ++ # 0 1 2 3 4 5 6 7 ++ # +-+-+-+-+-+-+-+-+ ++ # |U|P|O|A|H| U | ++ # +-+-+-+-+-+-+-+-+ ++ ++ name = "IPv6 Segment Routing Extension Header" ++ fields_desc = [ ByteEnumField("nh", 59, ipv6nh), ++ ByteField("len", None), ++ ByteField("type", 4), ++ ByteField("segleft", None), ++ ByteField("lastentry", None), ++ BitField("unused1", 0, 1), ++ BitField("protected", 0, 1), ++ BitField("oam", 0, 1), ++ BitField("alert", 0, 1), ++ BitField("hmac", 0, 1), ++ BitField("unused2", 0, 3), ++ ShortField("tag", 0), ++ IP6ListField("addresses", ["::1"], ++ count_from=lambda pkt: pkt.lastentry+1), ++ PacketListField("tlv_objects", [], IPv6ExtHdrSegmentRoutingTLV, ++ length_from=lambda pkt: 8*pkt.len - 16*(pkt.lastentry+1)) ] ++ ++ overload_fields = { IPv6: { "nh": 43 } } ++ ++ def post_build(self, pkt, pay): ++ ++ if self.len is None: ++ ++ # The extension must be align on 8 bytes ++ tmp_mod = (len(pkt) - 8) % 8 ++ if tmp_mod == 1: ++ warning("IPv6ExtHdrSegmentRouting(): can't pad 1 byte !") ++ elif tmp_mod >= 2: ++ #Add the padding extension ++ tmp_pad = b"\x00" * (tmp_mod-2) ++ tlv = IPv6ExtHdrSegmentRoutingTLVPadding(padding=tmp_pad) ++ pkt += str(tlv) ++ ++ tmp_len = (len(pkt) - 8) / 8 ++ pkt = pkt[:1] + struct.pack("B", tmp_len)+ pkt[2:] ++ ++ if self.segleft is None: ++ tmp_len = len(self.addresses) ++ if tmp_len: ++ tmp_len -= 1 ++ pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:] ++ ++ if self.lastentry is None: ++ #km - changed to contain n-1 ++ tmp_len = len(self.addresses) ++ if tmp_len: ++ tmp_len -= 1 ++ #pkt = pkt[:4] + struct.pack("B", len(self.addresses)) + pkt[5:] ++ pkt = pkt[:4] + struct.pack("B", tmp_len) + pkt[5:] ++ ++ return _IPv6ExtHdr.post_build(self, pkt, pay) ++ ++ + ########################### Fragmentation Header ############################ + + class IPv6ExtHdrFragment(_IPv6ExtHdr): diff --git a/test/test_srv6.py b/test/test_srv6.py new file mode 100644 index 00000000..a31b30eb --- /dev/null +++ b/test/test_srv6.py @@ -0,0 +1,1997 @@ +#!/usr/bin/env python + +import unittest +from socket import AF_INET6 + +from framework import VppTestCase, VppTestRunner +from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto +from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \ + SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes + +from scapy.packet import Raw +from scapy.layers.l2 import Ether, Dot1Q +from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting +from scapy.layers.inet import IP, UDP + +from scapy.utils import inet_pton, inet_ntop + +from util import ppp + + +class TestSRv6(VppTestCase): + """ SRv6 Test Case """ + + @classmethod + def setUpClass(self): + super(TestSRv6, self).setUpClass() + + def setUp(self): + """ Perform test setup before each test case. + """ + super(TestSRv6, self).setUp() + + # packet sizes, inclusive L2 overhead + self.pg_packet_sizes = [64, 512, 1518, 9018] + + # reset packet_infos + self.reset_packet_infos() + + def tearDown(self): + """ Clean up test setup after each test case. + """ + self.teardown_interfaces() + + super(TestSRv6, self).tearDown() + + def configure_interface(self, + interface, + ipv6=False, ipv4=False, + ipv6_table_id=0, ipv4_table_id=0): + """ Configure interface. + :param ipv6: configure IPv6 on interface + :param ipv4: configure IPv4 on interface + :param ipv6_table_id: FIB table_id for IPv6 + :param ipv4_table_id: FIB table_id for IPv4 + """ + self.logger.debug("Configuring interface %s" % (interface.name)) + if ipv6: + self.logger.debug("Configuring IPv6") + interface.set_table_ip6(ipv6_table_id) + interface.config_ip6() + interface.resolve_ndp(timeout=5) + if ipv4: + self.logger.debug("Configuring IPv4") + interface.set_table_ip4(ipv4_table_id) + interface.config_ip4() + interface.resolve_arp() + interface.admin_up() + + def setup_interfaces(self, ipv6=[], ipv4=[], + ipv6_table_id=[], ipv4_table_id=[]): + """ Create and configure interfaces. + + :param ipv6: list of interface IPv6 capabilities + :param ipv4: list of interface IPv4 capabilities + :param ipv6_table_id: list of intf IPv6 FIB table_ids + :param ipv4_table_id: list of intf IPv4 FIB table_ids + :returns: List of created interfaces. + """ + # how many interfaces? + if len(ipv6): + count = len(ipv6) + else: + count = len(ipv4) + self.logger.debug("Creating and configuring %d interfaces" % (count)) + + # fill up ipv6 and ipv4 lists if needed + # not enabled (False) is the default + if len(ipv6) < count: + ipv6 += (count - len(ipv6)) * [False] + if len(ipv4) < count: + ipv4 += (count - len(ipv4)) * [False] + + # fill up table_id lists if needed + # table_id 0 (global) is the default + if len(ipv6_table_id) < count: + ipv6_table_id += (count - len(ipv6_table_id)) * [0] + if len(ipv4_table_id) < count: + ipv4_table_id += (count - len(ipv4_table_id)) * [0] + + # create 'count' pg interfaces + self.create_pg_interfaces(range(count)) + + # setup all interfaces + for i in range(count): + intf = self.pg_interfaces[i] + self.configure_interface(intf, + ipv6[i], ipv4[i], + ipv6_table_id[i], ipv4_table_id[i]) + + if any(ipv6): + self.logger.debug(self.vapi.cli("show ip6 neighbors")) + if any(ipv4): + self.logger.debug(self.vapi.cli("show ip arp")) + self.logger.debug(self.vapi.cli("show interface")) + self.logger.debug(self.vapi.cli("show hardware")) + + return self.pg_interfaces + + def teardown_interfaces(self): + """ Unconfigure and bring down interface. + """ + self.logger.debug("Tearing down interfaces") + # tear down all interfaces + # AFAIK they cannot be deleted + for i in self.pg_interfaces: + self.logger.debug("Tear down interface %s" % (i.name)) + i.admin_down() + i.unconfig() + + def test_SRv6_T_Encaps(self): + """ Test SRv6 Transit.Encaps behavior for IPv6. + """ + # send traffic to one destination interface + # source and destination are IPv6 only + self.setup_interfaces(ipv6=[True, True]) + + # configure FIB entries + route = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg1.remote_ip6, + self.pg1.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure encaps IPv6 source address + # needs to be done before SR Policy config + # TODO: API? + self.vapi.cli("set sr encaps source addr a3::") + + bsid = 'a3::9999:1' + # configure SRv6 Policy + # Note: segment list order: first -> last + sr_policy = VppSRv6Policy( + self, bsid=bsid, + is_encap=1, + sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT, + weight=1, fib_table=0, + segments=['a4::', 'a5::', 'a6::c7'], + source='a3::') + sr_policy.add_vpp_config() + self.sr_policy = sr_policy + + # log the sr policies + self.logger.info(self.vapi.cli("show sr policies")) + + # steer IPv6 traffic to a7::/64 into SRv6 Policy + # use the bsid of the above self.sr_policy + pol_steering = VppSRv6Steering( + self, + bsid=self.sr_policy.bsid, + prefix="a7::", mask_width=64, + traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6, + sr_policy_index=0, table_id=0, + sw_if_index=0) + pol_steering.add_vpp_config() + + # log the sr steering policies + self.logger.info(self.vapi.cli("show sr steering policies")) + + # create packets + count = len(self.pg_packet_sizes) + dst_inner = 'a7::1234' + pkts = [] + + # create IPv6 packets without SRH + packet_header = self.create_packet_header_IPv6(dst_inner) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # create IPv6 packets with SRH + # packets with segments-left 1, active segment a7:: + packet_header = self.create_packet_header_IPv6_SRH( + sidlist=['a8::', 'a7::', 'a6::'], + segleft=1) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # create IPv6 packets with SRH and IPv6 + # packets with segments-left 1, active segment a7:: + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a8::', 'a7::', 'a6::'], + segleft=1) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_T_Encaps) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SR steering + pol_steering.remove_vpp_config() + self.logger.info(self.vapi.cli("show sr steering policies")) + + # remove SR Policies + self.sr_policy.remove_vpp_config() + self.logger.info(self.vapi.cli("show sr policies")) + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_T_Insert(self): + """ Test SRv6 Transit.Insert behavior (IPv6 only). + """ + # send traffic to one destination interface + # source and destination are IPv6 only + self.setup_interfaces(ipv6=[True, True]) + + # configure FIB entries + route = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg1.remote_ip6, + self.pg1.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure encaps IPv6 source address + # needs to be done before SR Policy config + # TODO: API? + self.vapi.cli("set sr encaps source addr a3::") + + bsid = 'a3::9999:1' + # configure SRv6 Policy + # Note: segment list order: first -> last + sr_policy = VppSRv6Policy( + self, bsid=bsid, + is_encap=0, + sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT, + weight=1, fib_table=0, + segments=['a4::', 'a5::', 'a6::c7'], + source='a3::') + sr_policy.add_vpp_config() + self.sr_policy = sr_policy + + # log the sr policies + self.logger.info(self.vapi.cli("show sr policies")) + + # steer IPv6 traffic to a7::/64 into SRv6 Policy + # use the bsid of the above self.sr_policy + pol_steering = VppSRv6Steering( + self, + bsid=self.sr_policy.bsid, + prefix="a7::", mask_width=64, + traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6, + sr_policy_index=0, table_id=0, + sw_if_index=0) + pol_steering.add_vpp_config() + + # log the sr steering policies + self.logger.info(self.vapi.cli("show sr steering policies")) + + # create packets + count = len(self.pg_packet_sizes) + dst_inner = 'a7::1234' + pkts = [] + + # create IPv6 packets without SRH + packet_header = self.create_packet_header_IPv6(dst_inner) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # create IPv6 packets with SRH + # packets with segments-left 1, active segment a7:: + packet_header = self.create_packet_header_IPv6_SRH( + sidlist=['a8::', 'a7::', 'a6::'], + segleft=1) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_T_Insert) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SR steering + pol_steering.remove_vpp_config() + self.logger.info(self.vapi.cli("show sr steering policies")) + + # remove SR Policies + self.sr_policy.remove_vpp_config() + self.logger.info(self.vapi.cli("show sr policies")) + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_T_Encaps_IPv4(self): + """ Test SRv6 Transit.Encaps behavior for IPv4. + """ + # send traffic to one destination interface + # source interface is IPv4 only + # destination interface is IPv6 only + self.setup_interfaces(ipv6=[False, True], ipv4=[True, False]) + + # configure FIB entries + route = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg1.remote_ip6, + self.pg1.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure encaps IPv6 source address + # needs to be done before SR Policy config + # TODO: API? + self.vapi.cli("set sr encaps source addr a3::") + + bsid = 'a3::9999:1' + # configure SRv6 Policy + # Note: segment list order: first -> last + sr_policy = VppSRv6Policy( + self, bsid=bsid, + is_encap=1, + sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT, + weight=1, fib_table=0, + segments=['a4::', 'a5::', 'a6::c7'], + source='a3::') + sr_policy.add_vpp_config() + self.sr_policy = sr_policy + + # log the sr policies + self.logger.info(self.vapi.cli("show sr policies")) + + # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy + # use the bsid of the above self.sr_policy + pol_steering = VppSRv6Steering( + self, + bsid=self.sr_policy.bsid, + prefix="7.1.1.0", mask_width=24, + traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4, + sr_policy_index=0, table_id=0, + sw_if_index=0) + pol_steering.add_vpp_config() + + # log the sr steering policies + self.logger.info(self.vapi.cli("show sr steering policies")) + + # create packets + count = len(self.pg_packet_sizes) + dst_inner = '7.1.1.123' + pkts = [] + + # create IPv4 packets + packet_header = self.create_packet_header_IPv4(dst_inner) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_T_Encaps_IPv4) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SR steering + pol_steering.remove_vpp_config() + self.logger.info(self.vapi.cli("show sr steering policies")) + + # remove SR Policies + self.sr_policy.remove_vpp_config() + self.logger.info(self.vapi.cli("show sr policies")) + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + @unittest.skip("VPP crashes after running this test") + def test_SRv6_T_Encaps_L2(self): + """ Test SRv6 Transit.Encaps behavior for L2. + """ + # send traffic to one destination interface + # source interface is IPv4 only TODO? + # destination interface is IPv6 only + self.setup_interfaces(ipv6=[False, True], ipv4=[False, False]) + + # configure FIB entries + route = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg1.remote_ip6, + self.pg1.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure encaps IPv6 source address + # needs to be done before SR Policy config + # TODO: API? + self.vapi.cli("set sr encaps source addr a3::") + + bsid = 'a3::9999:1' + # configure SRv6 Policy + # Note: segment list order: first -> last + sr_policy = VppSRv6Policy( + self, bsid=bsid, + is_encap=1, + sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT, + weight=1, fib_table=0, + segments=['a4::', 'a5::', 'a6::c7'], + source='a3::') + sr_policy.add_vpp_config() + self.sr_policy = sr_policy + + # log the sr policies + self.logger.info(self.vapi.cli("show sr policies")) + + # steer L2 traffic into SRv6 Policy + # use the bsid of the above self.sr_policy + pol_steering = VppSRv6Steering( + self, + bsid=self.sr_policy.bsid, + prefix="::", mask_width=0, + traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2, + sr_policy_index=0, table_id=0, + sw_if_index=self.pg0.sw_if_index) + pol_steering.add_vpp_config() + + # log the sr steering policies + self.logger.info(self.vapi.cli("show sr steering policies")) + + # create packets + count = len(self.pg_packet_sizes) + pkts = [] + + # create L2 packets without dot1q header + packet_header = self.create_packet_header_L2() + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # create L2 packets with dot1q header + packet_header = self.create_packet_header_L2(vlan=123) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_T_Encaps_L2) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SR steering + pol_steering.remove_vpp_config() + self.logger.info(self.vapi.cli("show sr steering policies")) + + # remove SR Policies + self.sr_policy.remove_vpp_config() + self.logger.info(self.vapi.cli("show sr policies")) + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_End(self): + """ Test SRv6 End (without PSP) behavior. + """ + # send traffic to one destination interface + # source and destination interfaces are IPv6 only + self.setup_interfaces(ipv6=[True, True]) + + # configure FIB entries + route = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg1.remote_ip6, + self.pg1.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure SRv6 localSID End without PSP behavior + localsid = VppSRv6LocalSID( + self, localsid_addr='A3::0', + behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END, + nh_addr='::', + end_psp=0, + sw_if_index=0, + vlan_index=0, + fib_table=0) + localsid.add_vpp_config() + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # create IPv6 packets with SRH (SL=2, SL=1, SL=0) + # send one packet per SL value per packet size + # SL=0 packet with localSID End with USP needs 2nd SRH + count = len(self.pg_packet_sizes) + dst_inner = 'a4::1234' + pkts = [] + + # packets with segments-left 2, active segment a3:: + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a5::', 'a4::', 'a3::'], + segleft=2) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # packets with segments-left 1, active segment a3:: + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a4::', 'a3::', 'a2::'], + segleft=1) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # TODO: test behavior with SL=0 packet (needs 2*SRH?) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_End) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + localsid.remove_vpp_config() + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_End_with_PSP(self): + """ Test SRv6 End with PSP behavior. + """ + # send traffic to one destination interface + # source and destination interfaces are IPv6 only + self.setup_interfaces(ipv6=[True, True]) + + # configure FIB entries + route = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg1.remote_ip6, + self.pg1.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure SRv6 localSID End with PSP behavior + localsid = VppSRv6LocalSID( + self, localsid_addr='A3::0', + behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END, + nh_addr='::', + end_psp=1, + sw_if_index=0, + vlan_index=0, + fib_table=0) + localsid.add_vpp_config() + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # create IPv6 packets with SRH (SL=2, SL=1) + # send one packet per SL value per packet size + # SL=0 packet with localSID End with PSP is dropped + count = len(self.pg_packet_sizes) + dst_inner = 'a4::1234' + pkts = [] + + # packets with segments-left 2, active segment a3:: + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a5::', 'a4::', 'a3::'], + segleft=2) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # packets with segments-left 1, active segment a3:: + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a4::', 'a3::', 'a2::'], + segleft=1) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_End_PSP) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + localsid.remove_vpp_config() + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_End_X(self): + """ Test SRv6 End.X (without PSP) behavior. + """ + # create three interfaces (1 source, 2 destinations) + # source and destination interfaces are IPv6 only + self.setup_interfaces(ipv6=[True, True, True]) + + # configure FIB entries + # a4::/64 via pg1 and pg2 + route = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg1.remote_ip6, + self.pg1.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6), + VppRoutePath(self.pg2.remote_ip6, + self.pg2.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + self.logger.debug(self.vapi.cli("show ip6 fib")) + + # configure SRv6 localSID End.X without PSP behavior + # End.X points to interface pg1 + localsid = VppSRv6LocalSID( + self, localsid_addr='A3::C4', + behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X, + nh_addr=self.pg1.remote_ip6, + end_psp=0, + sw_if_index=self.pg1.sw_if_index, + vlan_index=0, + fib_table=0) + localsid.add_vpp_config() + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # create IPv6 packets with SRH (SL=2, SL=1) + # send one packet per SL value per packet size + # SL=0 packet with localSID End with PSP is dropped + count = len(self.pg_packet_sizes) + dst_inner = 'a4::1234' + pkts = [] + + # packets with segments-left 2, active segment a3::c4 + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a5::', 'a4::', 'a3::c4'], + segleft=2) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # packets with segments-left 1, active segment a3::c4 + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a4::', 'a3::c4', 'a2::'], + segleft=1) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + # using same comparison function as End (no PSP) + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_End) + + # assert nothing was received on the other interface (pg2) + self.pg2.assert_nothing_captured("mis-directed packet(s)") + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + localsid.remove_vpp_config() + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_End_X_with_PSP(self): + """ Test SRv6 End.X with PSP behavior. + """ + # create three interfaces (1 source, 2 destinations) + # source and destination interfaces are IPv6 only + self.setup_interfaces(ipv6=[True, True, True]) + + # configure FIB entries + # a4::/64 via pg1 and pg2 + route = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg1.remote_ip6, + self.pg1.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6), + VppRoutePath(self.pg2.remote_ip6, + self.pg2.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure SRv6 localSID End with PSP behavior + localsid = VppSRv6LocalSID( + self, localsid_addr='A3::C4', + behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X, + nh_addr=self.pg1.remote_ip6, + end_psp=1, + sw_if_index=self.pg1.sw_if_index, + vlan_index=0, + fib_table=0) + localsid.add_vpp_config() + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # create IPv6 packets with SRH (SL=2, SL=1) + # send one packet per SL value per packet size + # SL=0 packet with localSID End with PSP is dropped + count = len(self.pg_packet_sizes) + dst_inner = 'a4::1234' + pkts = [] + + # packets with segments-left 2, active segment a3:: + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a5::', 'a4::', 'a3::c4'], + segleft=2) + # create traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # packets with segments-left 1, active segment a3:: + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a4::', 'a3::c4', 'a2::'], + segleft=1) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + # using same comparison function as End with PSP + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_End_PSP) + + # assert nothing was received on the other interface (pg2) + self.pg2.assert_nothing_captured("mis-directed packet(s)") + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + localsid.remove_vpp_config() + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_End_DX6(self): + """ Test SRv6 End.DX6 behavior. + """ + # send traffic to one destination interface + # source and destination interfaces are IPv6 only + self.setup_interfaces(ipv6=[True, True]) + + # configure SRv6 localSID End.DX6 behavior + localsid = VppSRv6LocalSID( + self, localsid_addr='a3::c4', + behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6, + nh_addr=self.pg1.remote_ip6, + end_psp=0, + sw_if_index=self.pg1.sw_if_index, + vlan_index=0, + fib_table=0) + localsid.add_vpp_config() + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # create IPv6 packets with SRH (SL=0) + # send one packet per packet size + count = len(self.pg_packet_sizes) + dst_inner = 'a4::1234' # inner header destination address + pkts = [] + + # packets with SRH, segments-left 0, active segment a3::c4 + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a3::c4', 'a2::', 'a1::'], + segleft=0) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # packets without SRH, IPv6 in IPv6 + # outer IPv6 dest addr is the localsid End.DX6 + packet_header = self.create_packet_header_IPv6_IPv6( + dst_inner, + dst_outer='a3::c4') + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_End_DX6) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + localsid.remove_vpp_config() + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_End_DT6(self): + """ Test SRv6 End.DT6 behavior. + """ + # create three interfaces (1 source, 2 destinations) + # all interfaces are IPv6 only + # source interface in global FIB (0) + # destination interfaces in global and vrf + vrf_1 = 1 + self.setup_interfaces(ipv6=[True, True, True], + ipv6_table_id=[0, 0, vrf_1]) + + # configure FIB entries + # a4::/64 is reachable + # via pg1 in table 0 (global) + # and via pg2 in table vrf_1 + route0 = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg1.remote_ip6, + self.pg1.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6, + nh_table_id=0)], + table_id=0, + is_ip6=1) + route0.add_vpp_config() + route1 = VppIpRoute(self, "a4::", 64, + [VppRoutePath(self.pg2.remote_ip6, + self.pg2.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6, + nh_table_id=vrf_1)], + table_id=vrf_1, + is_ip6=1) + route1.add_vpp_config() + self.logger.debug(self.vapi.cli("show ip6 fib")) + + # configure SRv6 localSID End.DT6 behavior + # Note: + # fib_table: where the localsid is installed + # sw_if_index: in T-variants of localsid this is the vrf table_id + localsid = VppSRv6LocalSID( + self, localsid_addr='a3::c4', + behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6, + nh_addr='::', + end_psp=0, + sw_if_index=vrf_1, + vlan_index=0, + fib_table=0) + localsid.add_vpp_config() + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # create IPv6 packets with SRH (SL=0) + # send one packet per packet size + count = len(self.pg_packet_sizes) + dst_inner = 'a4::1234' # inner header destination address + pkts = [] + + # packets with SRH, segments-left 0, active segment a3::c4 + packet_header = self.create_packet_header_IPv6_SRH_IPv6( + dst_inner, + sidlist=['a3::c4', 'a2::', 'a1::'], + segleft=0) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header, + self.pg_packet_sizes, count)) + + # packets without SRH, IPv6 in IPv6 + # outer IPv6 dest addr is the localsid End.DT6 + packet_header = self.create_packet_header_IPv6_IPv6( + dst_inner, + dst_outer='a3::c4') + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + # using same comparison function as End.DX6 + self.send_and_verify_pkts(self.pg0, pkts, self.pg2, + self.compare_rx_tx_packet_End_DX6) + + # assert nothing was received on the other interface (pg2) + self.pg1.assert_nothing_captured("mis-directed packet(s)") + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + localsid.remove_vpp_config() + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_End_DX4(self): + """ Test SRv6 End.DX4 behavior. + """ + # send traffic to one destination interface + # source interface is IPv6 only + # destination interface is IPv4 only + self.setup_interfaces(ipv6=[True, False], ipv4=[False, True]) + + # configure SRv6 localSID End.DX4 behavior + localsid = VppSRv6LocalSID( + self, localsid_addr='a3::c4', + behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4, + nh_addr=self.pg1.remote_ip4, + end_psp=0, + sw_if_index=self.pg1.sw_if_index, + vlan_index=0, + fib_table=0) + localsid.add_vpp_config() + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # send one packet per packet size + count = len(self.pg_packet_sizes) + dst_inner = '4.1.1.123' # inner header destination address + pkts = [] + + # packets with SRH, segments-left 0, active segment a3::c4 + packet_header = self.create_packet_header_IPv6_SRH_IPv4( + dst_inner, + sidlist=['a3::c4', 'a2::', 'a1::'], + segleft=0) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # packets without SRH, IPv4 in IPv6 + # outer IPv6 dest addr is the localsid End.DX4 + packet_header = self.create_packet_header_IPv6_IPv4( + dst_inner, + dst_outer='a3::c4') + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_End_DX4) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + localsid.remove_vpp_config() + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_End_DT4(self): + """ Test SRv6 End.DT4 behavior. + """ + # create three interfaces (1 source, 2 destinations) + # source interface is IPv6-only + # destination interfaces are IPv4 only + # source interface in global FIB (0) + # destination interfaces in global and vrf + vrf_1 = 1 + self.setup_interfaces(ipv6=[True, False, False], + ipv4=[False, True, True], + ipv6_table_id=[0, 0, 0], + ipv4_table_id=[0, 0, vrf_1]) + + # configure FIB entries + # 4.1.1.0/24 is reachable + # via pg1 in table 0 (global) + # and via pg2 in table vrf_1 + route0 = VppIpRoute(self, "4.1.1.0", 24, + [VppRoutePath(self.pg1.remote_ip4, + self.pg1.sw_if_index, + nh_table_id=0)], + table_id=0, + is_ip6=0) + route0.add_vpp_config() + route1 = VppIpRoute(self, "4.1.1.0", 24, + [VppRoutePath(self.pg2.remote_ip4, + self.pg2.sw_if_index, + nh_table_id=vrf_1)], + table_id=vrf_1, + is_ip6=0) + route1.add_vpp_config() + self.logger.debug(self.vapi.cli("show ip fib")) + + # configure SRv6 localSID End.DT6 behavior + # Note: + # fib_table: where the localsid is installed + # sw_if_index: in T-variants of localsid: vrf table_id + localsid = VppSRv6LocalSID( + self, localsid_addr='a3::c4', + behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4, + nh_addr='::', + end_psp=0, + sw_if_index=vrf_1, + vlan_index=0, + fib_table=0) + localsid.add_vpp_config() + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # create IPv6 packets with SRH (SL=0) + # send one packet per packet size + count = len(self.pg_packet_sizes) + dst_inner = '4.1.1.123' # inner header destination address + pkts = [] + + # packets with SRH, segments-left 0, active segment a3::c4 + packet_header = self.create_packet_header_IPv6_SRH_IPv4( + dst_inner, + sidlist=['a3::c4', 'a2::', 'a1::'], + segleft=0) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header, + self.pg_packet_sizes, count)) + + # packets without SRH, IPv6 in IPv6 + # outer IPv6 dest addr is the localsid End.DX4 + packet_header = self.create_packet_header_IPv6_IPv4( + dst_inner, + dst_outer='a3::c4') + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + # using same comparison function as End.DX4 + self.send_and_verify_pkts(self.pg0, pkts, self.pg2, + self.compare_rx_tx_packet_End_DX4) + + # assert nothing was received on the other interface (pg2) + self.pg1.assert_nothing_captured("mis-directed packet(s)") + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + localsid.remove_vpp_config() + + # remove FIB entries + # done by tearDown + + # cleanup interfaces + self.teardown_interfaces() + + def test_SRv6_End_DX2(self): + """ Test SRv6 End.DX2 behavior. + """ + # send traffic to one destination interface + # source interface is IPv6 only + self.setup_interfaces(ipv6=[True, False], ipv4=[False, False]) + + # configure SRv6 localSID End.DX2 behavior + localsid = VppSRv6LocalSID( + self, localsid_addr='a3::c4', + behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2, + nh_addr='::', + end_psp=0, + sw_if_index=self.pg1.sw_if_index, + vlan_index=0, + fib_table=0) + localsid.add_vpp_config() + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # send one packet per packet size + count = len(self.pg_packet_sizes) + pkts = [] + + # packets with SRH, segments-left 0, active segment a3::c4 + # L2 has no dot1q header + packet_header = self.create_packet_header_IPv6_SRH_L2( + sidlist=['a3::c4', 'a2::', 'a1::'], + segleft=0, + vlan=0) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # packets with SRH, segments-left 0, active segment a3::c4 + # L2 has dot1q header + packet_header = self.create_packet_header_IPv6_SRH_L2( + sidlist=['a3::c4', 'a2::', 'a1::'], + segleft=0, + vlan=123) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # packets without SRH, L2 in IPv6 + # outer IPv6 dest addr is the localsid End.DX2 + # L2 has no dot1q header + packet_header = self.create_packet_header_IPv6_L2( + dst_outer='a3::c4', + vlan=0) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # packets without SRH, L2 in IPv6 + # outer IPv6 dest addr is the localsid End.DX2 + # L2 has dot1q header + packet_header = self.create_packet_header_IPv6_L2( + dst_outer='a3::c4', + vlan=123) + # add to traffic stream pg0->pg1 + pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, + self.pg_packet_sizes, count)) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts, self.pg1, + self.compare_rx_tx_packet_End_DX2) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + localsid.remove_vpp_config() + + # cleanup interfaces + self.teardown_interfaces() + + def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing T.Encaps + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + # T.Encaps updates the headers as follows: + # SR Policy seglist (S3, S2, S1) + # SR Policy source C + # IPv6: + # in: IPv6(A, B2) + # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2) + # IPv6 + SRH: + # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1) + # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1) + + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + rx_srh = None + + tx_ip = tx_pkt.getlayer(IPv6) + + # expected segment-list + seglist = self.sr_policy.segments + # reverse list to get order as in SRH + tx_seglist = seglist[::-1] + + # get source address of SR Policy + sr_policy_source = self.sr_policy.source + + # rx'ed packet should have SRH + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + # get SRH + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + + # received ip.src should be equal to SR Policy source + self.assertEqual(rx_ip.src, sr_policy_source) + # received ip.dst should be equal to expected sidlist[lastentry] + self.assertEqual(rx_ip.dst, tx_seglist[-1]) + # rx'ed seglist should be equal to expected seglist + self.assertEqual(rx_srh.addresses, tx_seglist) + # segleft should be equal to size expected seglist-1 + self.assertEqual(rx_srh.segleft, len(tx_seglist)-1) + # segleft should be equal to lastentry + self.assertEqual(rx_srh.segleft, rx_srh.lastentry) + + # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt + # except for the hop-limit field + # -> update tx'ed hlim to the expected hlim + tx_ip.hlim = tx_ip.hlim - 1 + + self.assertEqual(rx_srh.payload, tx_ip) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing T.Encaps for IPv4 + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + # T.Encaps for IPv4 updates the headers as follows: + # SR Policy seglist (S3, S2, S1) + # SR Policy source C + # IPv4: + # in: IPv4(A, B2) + # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2) + + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + rx_srh = None + + tx_ip = tx_pkt.getlayer(IP) + + # expected segment-list + seglist = self.sr_policy.segments + # reverse list to get order as in SRH + tx_seglist = seglist[::-1] + + # get source address of SR Policy + sr_policy_source = self.sr_policy.source + + # checks common to cases tx with and without SRH + # rx'ed packet should have SRH and IPv4 header + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + self.assertTrue(rx_ip.payload.haslayer(IP)) + # get SRH + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + + # received ip.src should be equal to SR Policy source + self.assertEqual(rx_ip.src, sr_policy_source) + # received ip.dst should be equal to sidlist[lastentry] + self.assertEqual(rx_ip.dst, tx_seglist[-1]) + # rx'ed seglist should be equal to seglist + self.assertEqual(rx_srh.addresses, tx_seglist) + # segleft should be equal to size seglist-1 + self.assertEqual(rx_srh.segleft, len(tx_seglist)-1) + # segleft should be equal to lastentry + self.assertEqual(rx_srh.segleft, rx_srh.lastentry) + + # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt + # except for the ttl field and ip checksum + # -> adjust tx'ed ttl to expected ttl + tx_ip.ttl = tx_ip.ttl - 1 + # -> set tx'ed ip checksum to None and let scapy recompute + tx_ip.chksum = None + # read back the pkt (with str()) to force computing these fields + # probably other ways to accomplish this are possible + tx_ip = IP(str(tx_ip)) + + self.assertEqual(rx_srh.payload, tx_ip) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing T.Encaps for L2 + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + # T.Encaps for L2 updates the headers as follows: + # SR Policy seglist (S3, S2, S1) + # SR Policy source C + # L2: + # in: L2 + # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2 + + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + rx_srh = None + + tx_ether = tx_pkt.getlayer(Ether) + + # expected segment-list + seglist = self.sr_policy.segments + # reverse list to get order as in SRH + tx_seglist = seglist[::-1] + + # get source address of SR Policy + sr_policy_source = self.sr_policy.source + + # rx'ed packet should have SRH + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + # get SRH + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + + # received ip.src should be equal to SR Policy source + self.assertEqual(rx_ip.src, sr_policy_source) + # received ip.dst should be equal to sidlist[lastentry] + self.assertEqual(rx_ip.dst, tx_seglist[-1]) + # rx'ed seglist should be equal to seglist + self.assertEqual(rx_srh.addresses, tx_seglist) + # segleft should be equal to size seglist-1 + self.assertEqual(rx_srh.segleft, len(tx_seglist)-1) + # segleft should be equal to lastentry + self.assertEqual(rx_srh.segleft, rx_srh.lastentry) + # nh should be "No Next Header" (59) + self.assertEqual(rx_srh.nh, 59) + + # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt + self.assertEqual(Ether(str(rx_srh.payload)), tx_ether) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing T.Insert + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + # T.Insert updates the headers as follows: + # IPv6: + # in: IPv6(A, B2) + # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3) + # IPv6 + SRH: + # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1) + # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1) + + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + rx_srh = None + rx_ip2 = None + rx_srh2 = None + rx_ip3 = None + rx_udp = rx_pkt[UDP] + + tx_ip = tx_pkt.getlayer(IPv6) + tx_srh = None + tx_ip2 = None + # some packets have been tx'ed with an SRH, some without it + # get SRH if tx'ed packet has it + if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting): + tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + tx_ip2 = tx_pkt.getlayer(IPv6, 2) + tx_udp = tx_pkt[UDP] + + # expected segment-list (make copy of SR Policy segment list) + seglist = self.sr_policy.segments[:] + # expected seglist has initial dest addr as last segment + seglist.append(tx_ip.dst) + # reverse list to get order as in SRH + tx_seglist = seglist[::-1] + + # get source address of SR Policy + sr_policy_source = self.sr_policy.source + + # checks common to cases tx with and without SRH + # rx'ed packet should have SRH and only one IPv6 header + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + self.assertFalse(rx_ip.payload.haslayer(IPv6)) + # get SRH + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + + # rx'ed ip.src should be equal to tx'ed ip.src + self.assertEqual(rx_ip.src, tx_ip.src) + # rx'ed ip.dst should be equal to sidlist[lastentry] + self.assertEqual(rx_ip.dst, tx_seglist[-1]) + + # rx'ed seglist should be equal to expected seglist + self.assertEqual(rx_srh.addresses, tx_seglist) + # segleft should be equal to size(expected seglist)-1 + self.assertEqual(rx_srh.segleft, len(tx_seglist)-1) + # segleft should be equal to lastentry + self.assertEqual(rx_srh.segleft, rx_srh.lastentry) + + if tx_srh: # packet was tx'ed with SRH + # packet should have 2nd SRH + self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting)) + # get 2nd SRH + rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2) + + # rx'ed srh2.addresses should be equal to tx'ed srh.addresses + self.assertEqual(rx_srh2.addresses, tx_srh.addresses) + # rx'ed srh2.segleft should be equal to tx'ed srh.segleft + self.assertEqual(rx_srh2.segleft, tx_srh.segleft) + # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry + self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry) + + else: # packet was tx'ed without SRH + # rx packet should have no other SRH + self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting)) + + # UDP layer should be unchanged + self.assertEqual(rx_udp, tx_udp) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End (without PSP) + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + # End (no PSP) updates the headers as follows: + # IPv6 + SRH: + # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2) + # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1) + + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + rx_srh = None + rx_ip2 = None + rx_udp = rx_pkt[UDP] + + tx_ip = tx_pkt.getlayer(IPv6) + # we know the packet has been tx'ed + # with an inner IPv6 header and an SRH + tx_ip2 = tx_pkt.getlayer(IPv6, 2) + tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + tx_udp = tx_pkt[UDP] + + # common checks, regardless of tx segleft value + # rx'ed packet should have 2nd IPv6 header + self.assertTrue(rx_ip.payload.haslayer(IPv6)) + # get second (inner) IPv6 header + rx_ip2 = rx_pkt.getlayer(IPv6, 2) + + if tx_ip.segleft > 0: + # SRH should NOT have been popped: + # End SID without PSP does not pop SRH if segleft>0 + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + + # received ip.src should be equal to expected ip.src + self.assertEqual(rx_ip.src, tx_ip.src) + # sidlist should be unchanged + self.assertEqual(rx_srh.addresses, tx_srh.addresses) + # segleft should have been decremented + self.assertEqual(rx_srh.segleft, tx_srh.segleft-1) + # received ip.dst should be equal to sidlist[segleft] + self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft]) + # lastentry should be unchanged + self.assertEqual(rx_srh.lastentry, tx_srh.lastentry) + # inner IPv6 packet (ip2) should be unchanged + self.assertEqual(rx_ip2.src, tx_ip2.src) + self.assertEqual(rx_ip2.dst, tx_ip2.dst) + # else: # tx_ip.segleft == 0 + # TODO: Does this work with 2 SRHs in ingress packet? + + # UDP layer should be unchanged + self.assertEqual(rx_udp, tx_udp) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End with PSP + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + # End (PSP) updates the headers as follows: + # IPv6 + SRH (SL>1): + # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2) + # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1) + # IPv6 + SRH (SL=1): + # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1) + # out: IPv6(A, S3) + + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + rx_srh = None + rx_ip2 = None + rx_udp = rx_pkt[UDP] + + tx_ip = tx_pkt.getlayer(IPv6) + # we know the packet has been tx'ed + # with an inner IPv6 header and an SRH + tx_ip2 = tx_pkt.getlayer(IPv6, 2) + tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + tx_udp = tx_pkt[UDP] + + # common checks, regardless of tx segleft value + self.assertTrue(rx_ip.payload.haslayer(IPv6)) + rx_ip2 = rx_pkt.getlayer(IPv6, 2) + # inner IPv6 packet (ip2) should be unchanged + self.assertEqual(rx_ip2.src, tx_ip2.src) + self.assertEqual(rx_ip2.dst, tx_ip2.dst) + + if tx_ip.segleft > 1: + # SRH should NOT have been popped: + # End SID with PSP does not pop SRH if segleft>1 + # rx'ed packet should have SRH + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + + # received ip.src should be equal to expected ip.src + self.assertEqual(rx_ip.src, tx_ip.src) + # sidlist should be unchanged + self.assertEqual(rx_srh.addresses, tx_srh.addresses) + # segleft should have been decremented + self.assertEqual(rx_srh.segleft, tx_srh.segleft-1) + # received ip.dst should be equal to sidlist[segleft] + self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft]) + # lastentry should be unchanged + self.assertEqual(rx_srh.lastentry, tx_srh.lastentry) + + else: # tx_ip.segleft <= 1 + # SRH should have been popped: + # End SID with PSP and segleft=1 pops SRH + # the two IPv6 headers are still present + # outer IPv6 header has DA == last segment of popped SRH + # SRH should not be present + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + # outer IPv6 header ip.src should be equal to tx'ed ip.src + self.assertEqual(rx_ip.src, tx_ip.src) + # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1] + self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1]) + + # UDP layer should be unchanged + self.assertEqual(rx_udp, tx_udp) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.DX6 + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + # End.DX6 updates the headers as follows: + # IPv6 + SRH (SL=0): + # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D) + # out: IPv6(B, D) + # IPv6: + # in: IPv6(A, S3)IPv6(B, D) + # out: IPv6(B, D) + + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + + tx_ip = tx_pkt.getlayer(IPv6) + tx_ip2 = tx_pkt.getlayer(IPv6, 2) + + # verify if rx'ed packet has no SRH + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + + # the whole rx_ip pkt should be equal to tx_ip2 + # except for the hlim field + # -> adjust tx'ed hlim to expected hlim + tx_ip2.hlim = tx_ip2.hlim - 1 + + self.assertEqual(rx_ip, tx_ip2) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.DX4 + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + # End.DX4 updates the headers as follows: + # IPv6 + SRH (SL=0): + # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D) + # out: IPv4(B, D) + # IPv6: + # in: IPv6(A, S3)IPv4(B, D) + # out: IPv4(B, D) + + # get IPv4 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IP) + + tx_ip = tx_pkt.getlayer(IPv6) + tx_ip2 = tx_pkt.getlayer(IP) + + # verify if rx'ed packet has no SRH + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + + # the whole rx_ip pkt should be equal to tx_ip2 + # except for the ttl field and ip checksum + # -> adjust tx'ed ttl to expected ttl + tx_ip2.ttl = tx_ip2.ttl - 1 + # -> set tx'ed ip checksum to None and let scapy recompute + tx_ip2.chksum = None + # read back the pkt (with str()) to force computing these fields + # probably other ways to accomplish this are possible + tx_ip2 = IP(str(tx_ip2)) + + self.assertEqual(rx_ip, tx_ip2) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.DX2 + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + # End.DX2 updates the headers as follows: + # IPv6 + SRH (SL=0): + # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2 + # out: L2 + # IPv6: + # in: IPv6(A, S3)L2 + # out: L2 + + # get IPv4 header of rx'ed packet + rx_eth = rx_pkt.getlayer(Ether) + + tx_ip = tx_pkt.getlayer(IPv6) + # we can't just get the 2nd Ether layer + # get the Raw content and dissect it as Ether + tx_eth1 = Ether(str(tx_pkt[Raw])) + + # verify if rx'ed packet has no SRH + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + + # the whole rx_eth pkt should be equal to tx_eth1 + self.assertEqual(rx_eth, tx_eth1) + + self.logger.debug("packet verification: SUCCESS") + + def create_stream(self, src_if, dst_if, packet_header, packet_sizes, + count): + """Create SRv6 input packet stream for defined interface. + + :param VppInterface src_if: Interface to create packet stream for + :param VppInterface dst_if: destination interface of packet stream + :param packet_header: Layer3 scapy packet headers, + L2 is added when not provided, + Raw(payload) with packet_info is added + :param list packet_sizes: packet stream pckt sizes,sequentially applied + to packets in stream have + :param int count: number of packets in packet stream + :return: list of packets + """ + self.logger.info("Creating packets") + pkts = [] + for i in range(0, count-1): + payload_info = self.create_packet_info(src_if, dst_if) + self.logger.debug( + "Creating packet with index %d" % (payload_info.index)) + payload = self.info_to_payload(payload_info) + # add L2 header if not yet provided in packet_header + if packet_header.getlayer(0).name == 'Ethernet': + p = (packet_header / + Raw(payload)) + else: + p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / + packet_header / + Raw(payload)) + size = packet_sizes[i % len(packet_sizes)] + self.logger.debug("Packet size %d" % (size)) + self.extend_packet(p, size) + # we need to store the packet with the automatic fields computed + # read back the dumped packet (with str()) + # to force computing these fields + # probably other ways are possible + p = Ether(str(p)) + payload_info.data = p.copy() + self.logger.debug(ppp("Created packet:", p)) + pkts.append(p) + self.logger.info("Done creating packets") + return pkts + + def send_and_verify_pkts(self, input, pkts, output, compare_func): + """Send packets and verify received packets using compare_func + + :param input: ingress interface of DUT + :param pkts: list of packets to transmit + :param output: egress interface of DUT + :param compare_func: function to compare in and out packets + """ + # add traffic stream to input interface + input.add_stream(pkts) + + # enable capture on all interfaces + self.pg_enable_capture(self.pg_interfaces) + + # start traffic + self.logger.info("Starting traffic") + self.pg_start() + + # get output capture + self.logger.info("Getting packet capture") + capture = output.get_capture() + + # assert nothing was captured on input interface + input.assert_nothing_captured() + + # verify captured packets + self.verify_captured_pkts(output, capture, compare_func) + + def create_packet_header_IPv6(self, dst): + """Create packet header: IPv6 header, UDP header + + :param dst: IPv6 destination address + + IPv6 source address is 1234::1 + UDP source port and destination port are 1234 + """ + + p = (IPv6(src='1234::1', dst=dst) / + UDP(sport=1234, dport=1234)) + return p + + def create_packet_header_IPv6_SRH(self, sidlist, segleft): + """Create packet header: IPv6 header with SRH, UDP header + + :param list sidlist: segment list + :param int segleft: segments-left field value + + IPv6 destination address is set to sidlist[segleft] + IPv6 source addresses are 1234::1 and 4321::1 + UDP source port and destination port are 1234 + """ + + p = (IPv6(src='1234::1', dst=sidlist[segleft]) / + IPv6ExtHdrSegmentRouting(addresses=sidlist) / + UDP(sport=1234, dport=1234)) + return p + + def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft): + """Create packet header: IPv6 encapsulated in SRv6: + IPv6 header with SRH, IPv6 header, UDP header + + :param ipv6address dst: inner IPv6 destination address + :param list sidlist: segment list of outer IPv6 SRH + :param int segleft: segments-left field of outer IPv6 SRH + + Outer IPv6 destination address is set to sidlist[segleft] + IPv6 source addresses are 1234::1 and 4321::1 + UDP source port and destination port are 1234 + """ + + p = (IPv6(src='1234::1', dst=sidlist[segleft]) / + IPv6ExtHdrSegmentRouting(addresses=sidlist, + segleft=segleft, nh=41) / + IPv6(src='4321::1', dst=dst) / + UDP(sport=1234, dport=1234)) + return p + + def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer): + """Create packet header: IPv6 encapsulated in IPv6: + IPv6 header, IPv6 header, UDP header + + :param ipv6address dst_inner: inner IPv6 destination address + :param ipv6address dst_outer: outer IPv6 destination address + + IPv6 source addresses are 1234::1 and 4321::1 + UDP source port and destination port are 1234 + """ + + p = (IPv6(src='1234::1', dst=dst_outer) / + IPv6(src='4321::1', dst=dst_inner) / + UDP(sport=1234, dport=1234)) + return p + + def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1, + sidlist2, segleft2): + """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH: + IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header + + :param ipv6address dst: inner IPv6 destination address + :param list sidlist1: segment list of outer IPv6 SRH + :param int segleft1: segments-left field of outer IPv6 SRH + :param list sidlist2: segment list of inner IPv6 SRH + :param int segleft2: segments-left field of inner IPv6 SRH + + Outer IPv6 destination address is set to sidlist[segleft] + IPv6 source addresses are 1234::1 and 4321::1 + UDP source port and destination port are 1234 + """ + + p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) / + IPv6ExtHdrSegmentRouting(addresses=sidlist1, + segleft=segleft1, nh=43) / + IPv6ExtHdrSegmentRouting(addresses=sidlist2, + segleft=segleft2, nh=41) / + IPv6(src='4321::1', dst=dst) / + UDP(sport=1234, dport=1234)) + return p + + def create_packet_header_IPv4(self, dst): + """Create packet header: IPv4 header, UDP header + + :param dst: IPv4 destination address + + IPv4 source address is 123.1.1.1 + UDP source port and destination port are 1234 + """ + + p = (IP(src='123.1.1.1', dst=dst) / + UDP(sport=1234, dport=1234)) + return p + + def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer): + """Create packet header: IPv4 encapsulated in IPv6: + IPv6 header, IPv4 header, UDP header + + :param ipv4address dst_inner: inner IPv4 destination address + :param ipv6address dst_outer: outer IPv6 destination address + + IPv6 source address is 1234::1 + IPv4 source address is 123.1.1.1 + UDP source port and destination port are 1234 + """ + + p = (IPv6(src='1234::1', dst=dst_outer) / + IP(src='123.1.1.1', dst=dst_inner) / + UDP(sport=1234, dport=1234)) + return p + + def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft): + """Create packet header: IPv4 encapsulated in SRv6: + IPv6 header with SRH, IPv4 header, UDP header + + :param ipv4address dst: inner IPv4 destination address + :param list sidlist: segment list of outer IPv6 SRH + :param int segleft: segments-left field of outer IPv6 SRH + + Outer IPv6 destination address is set to sidlist[segleft] + IPv6 source address is 1234::1 + IPv4 source address is 123.1.1.1 + UDP source port and destination port are 1234 + """ + + p = (IPv6(src='1234::1', dst=sidlist[segleft]) / + IPv6ExtHdrSegmentRouting(addresses=sidlist, + segleft=segleft, nh=4) / + IP(src='123.1.1.1', dst=dst) / + UDP(sport=1234, dport=1234)) + return p + + def create_packet_header_L2(self, vlan=0): + """Create packet header: L2 header + + :param vlan: if vlan!=0 then add 802.1q header + """ + # Note: the dst addr ('00:55:44:33:22:11') is used in + # the compare function compare_rx_tx_packet_T_Encaps_L2 + # to detect presence of L2 in SRH payload + p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + p /= Dot1Q(vlan=vlan, type=etype) + else: + p.type = etype + return p + + def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0): + """Create packet header: L2 encapsulated in SRv6: + IPv6 header with SRH, L2 + + :param list sidlist: segment list of outer IPv6 SRH + :param int segleft: segments-left field of outer IPv6 SRH + :param vlan: L2 vlan; if vlan!=0 then add 802.1q header + + Outer IPv6 destination address is set to sidlist[segleft] + IPv6 source address is 1234::1 + """ + eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + eth /= Dot1Q(vlan=vlan, type=etype) + else: + eth.type = etype + + p = (IPv6(src='1234::1', dst=sidlist[segleft]) / + IPv6ExtHdrSegmentRouting(addresses=sidlist, + segleft=segleft, nh=59) / + eth) + return p + + def create_packet_header_IPv6_L2(self, dst_outer, vlan=0): + """Create packet header: L2 encapsulated in IPv6: + IPv6 header, L2 + + :param ipv6address dst_outer: outer IPv6 destination address + :param vlan: L2 vlan; if vlan!=0 then add 802.1q header + """ + eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + eth /= Dot1Q(vlan=vlan, type=etype) + else: + eth.type = etype + + p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth) + return p + + def get_payload_info(self, packet): + """ Extract the payload_info from the packet + """ + # in most cases, payload_info is in packet[Raw] + # but packet[Raw] gives the complete payload + # (incl L2 header) for the T.Encaps L2 case + try: + payload_info = self.payload_to_info(str(packet[Raw])) + + except: + # remote L2 header from packet[Raw]: + # take packet[Raw], convert it to an Ether layer + # and then extract Raw from it + payload_info = self.payload_to_info( + str(Ether(str(packet[Raw]))[Raw])) + + return payload_info + + def verify_captured_pkts(self, dst_if, capture, compare_func): + """ + Verify captured packet stream for specified interface. + Compare ingress with egress packets using the specified compare fn + + :param dst_if: egress interface of DUT + :param capture: captured packets + :param compare_func: function to compare in and out packet + """ + self.logger.info("Verifying capture on interface %s using function %s" + % (dst_if.name, compare_func.func_name)) + + last_info = dict() + for i in self.pg_interfaces: + last_info[i.sw_if_index] = None + dst_sw_if_index = dst_if.sw_if_index + + for packet in capture: + try: + # extract payload_info from packet's payload + payload_info = self.get_payload_info(packet) + packet_index = payload_info.index + + self.logger.debug("Verifying packet with index %d" + % (packet_index)) + # packet should have arrived on the expected interface + self.assertEqual(payload_info.dst, dst_sw_if_index) + self.logger.debug( + "Got packet on interface %s: src=%u (idx=%u)" % + (dst_if.name, payload_info.src, packet_index)) + + # search for payload_info with same src and dst if_index + # this will give us the transmitted packet + next_info = self.get_next_packet_info_for_interface2( + payload_info.src, dst_sw_if_index, + last_info[payload_info.src]) + last_info[payload_info.src] = next_info + # next_info should not be None + self.assertTrue(next_info is not None) + # index of tx and rx packets should be equal + self.assertEqual(packet_index, next_info.index) + # data field of next_info contains the tx packet + txed_packet = next_info.data + + self.logger.debug(ppp("Transmitted packet:", + txed_packet)) # ppp=Pretty Print Packet + + self.logger.debug(ppp("Received packet:", packet)) + + # compare rcvd packet with expected packet using compare_func + compare_func(txed_packet, packet) + + except: + print packet.command() + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + + # have all expected packets arrived? + for i in self.pg_interfaces: + remaining_packet = self.get_next_packet_info_for_interface2( + i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]) + self.assertTrue(remaining_packet is None, + "Interface %s: Packet expected from interface %s " + "didn't arrive" % (dst_if.name, i.name)) + + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) diff --git a/test/vpp_papi_provider.py b/test/vpp_papi_provider.py index 1daa2a9e..95de0be6 100644 --- a/test/vpp_papi_provider.py +++ b/test/vpp_papi_provider.py @@ -2108,3 +2108,121 @@ class VppPapiProvider(object): 'client_ip': client_ip, 'decap_vrf_id': decap_vrf_id, 'client_mac': client_mac}) + + def sr_localsid_add_del(self, + localsid_addr, + behavior, + nh_addr, + is_del=0, + end_psp=0, + sw_if_index=0xFFFFFFFF, + vlan_index=0, + fib_table=0, + ): + """ Add/del IPv6 SR local-SID. + + :param localsid_addr: + :param behavior: END=1; END.X=2; END.DX2=4; END.DX6=5; + :param behavior: END.DX4=6; END.DT6=7; END.DT4=8 + :param nh_addr: + :param is_del: (Default value = 0) + :param end_psp: (Default value = 0) + :param sw_if_index: (Default value = 0xFFFFFFFF) + :param vlan_index: (Default value = 0) + :param fib_table: (Default value = 0) + """ + return self.api( + self.papi.sr_localsid_add_del, + {'is_del': is_del, + 'localsid_addr': localsid_addr, + 'end_psp': end_psp, + 'behavior': behavior, + 'sw_if_index': sw_if_index, + 'vlan_index': vlan_index, + 'fib_table': fib_table, + 'nh_addr': nh_addr + } + ) + + def sr_policy_add( + self, + bsid_addr, + weight=1, + is_encap=1, + type=0, + fib_table=0, + n_segments=0, + segments=[]): + """ + :param bsid_addr: bindingSID of the SR Policy + :param weight: weight of the sid list. optional. (default: 1) + :param is_encap: (bool) whether SR policy should Encap or SRH insert \ + (default: Encap) + :param type: type/behavior of the SR policy. (default or spray) \ + (default: default) + :param fib_table: VRF where to install the FIB entry for the BSID \ + (default: 0) + :param n_segments: number of segments \ + (default: 0) + :param segments: a vector of IPv6 address composing the segment list \ + (default: []) + """ + return self.api( + self.papi.sr_policy_add, + {'bsid_addr': bsid_addr, + 'weight': weight, + 'is_encap': is_encap, + 'type': type, + 'fib_table': fib_table, + 'n_segments': n_segments, + 'segments': segments + } + ) + + def sr_policy_del( + self, + bsid_addr, + sr_policy_index=0): + """ + :param bsid: bindingSID of the SR Policy + :param sr_policy_index: index of the sr policy (default: 0) + """ + return self.api( + self.papi.sr_policy_del, + {'bsid_addr': bsid_addr, + 'sr_policy_index': sr_policy_index + }) + + def sr_steering_add_del( + self, + is_del, + bsid_addr, + sr_policy_index, + table_id, + prefix_addr, + mask_width, + sw_if_index, + traffic_type): + """ + Steer traffic L2 and L3 traffic through a given SR policy + + :param is_del: delete or add + :param bsid_addr: bindingSID of the SR Policy (alt to sr_policy_index) + :param sr_policy: is the index of the SR Policy (alt to bsid) + :param table_id: is the VRF where to install the FIB entry for the BSID + :param prefix_addr: is the IPv4/v6 address for L3 traffic type + :param mask_width: is the mask for L3 traffic type + :param sw_if_index: is the incoming interface for L2 traffic + :param traffic_type: type of traffic (IPv4: 4, IPv6: 6, L2: 2) + """ + return self.api( + self.papi.sr_steering_add_del, + {'is_del': is_del, + 'bsid_addr': bsid_addr, + 'sr_policy_index': sr_policy_index, + 'table_id': table_id, + 'prefix_addr': prefix_addr, + 'mask_width': mask_width, + 'sw_if_index': sw_if_index, + 'traffic_type': traffic_type + }) diff --git a/test/vpp_srv6.py b/test/vpp_srv6.py new file mode 100644 index 00000000..28ff4b85 --- /dev/null +++ b/test/vpp_srv6.py @@ -0,0 +1,238 @@ +""" + SRv6 LocalSIDs + + object abstractions for representing SRv6 localSIDs in VPP +""" + +from vpp_object import * +from socket import inet_pton, inet_ntop, AF_INET, AF_INET6 + + +class SRv6LocalSIDBehaviors(): + # from src/vnet/srv6/sr.h + SR_BEHAVIOR_END = 1 + SR_BEHAVIOR_X = 2 + SR_BEHAVIOR_T = 3 + SR_BEHAVIOR_D_FIRST = 4 # Unused. Separator in between regular and D + SR_BEHAVIOR_DX2 = 5 + SR_BEHAVIOR_DX6 = 6 + SR_BEHAVIOR_DX4 = 7 + SR_BEHAVIOR_DT6 = 8 + SR_BEHAVIOR_DT4 = 9 + SR_BEHAVIOR_LAST = 10 # Must always be the last one + + +class SRv6PolicyType(): + # from src/vnet/srv6/sr.h + SR_POLICY_TYPE_DEFAULT = 0 + SR_POLICY_TYPE_SPRAY = 1 + + +class SRv6PolicySteeringTypes(): + # from src/vnet/srv6/sr.h + SR_STEER_L2 = 2 + SR_STEER_IPV4 = 4 + SR_STEER_IPV6 = 6 + + +class VppSRv6LocalSID(VppObject): + """ + SRv6 LocalSID + """ + + def __init__(self, test, localsid_addr, behavior, nh_addr, end_psp, + sw_if_index, vlan_index, fib_table): + self._test = test + self.localsid_addr = localsid_addr + # keep binary format in _localsid_addr + self._localsid_addr = inet_pton(AF_INET6, self.localsid_addr) + self.behavior = behavior + self.nh_addr = nh_addr + # keep binary format in _nh_addr + if ':' in nh_addr: + # IPv6 + self._nh_addr = inet_pton(AF_INET6, nh_addr) + else: + # IPv4 + # API expects 16 octets (128 bits) + # last 4 octets are used for IPv4 + # --> prepend 12 octets + self._nh_addr = ('\x00' * 12) + inet_pton(AF_INET, nh_addr) + self.end_psp = end_psp + self.sw_if_index = sw_if_index + self.vlan_index = vlan_index + self.fib_table = fib_table + self._configured = False + + def add_vpp_config(self): + self._test.vapi.sr_localsid_add_del( + self._localsid_addr, + self.behavior, + self._nh_addr, + is_del=0, + end_psp=self.end_psp, + sw_if_index=self.sw_if_index, + vlan_index=self.vlan_index, + fib_table=self.fib_table) + self._configured = True + + def remove_vpp_config(self): + self._test.vapi.sr_localsid_add_del( + self._localsid_addr, + self.behavior, + self._nh_addr, + is_del=1, + end_psp=self.end_psp, + sw_if_index=self.sw_if_index, + vlan_index=self.vlan_index, + fib_table=self.fib_table) + self._configured = False + + def query_vpp_config(self): + # sr_localsids_dump API is disabled + # use _configured flag for now + return self._configured + + def __str__(self): + return self.object_id() + + def object_id(self): + return ("%d;%s,%d" + % (self.fib_table, + self.localsid_addr, + self.behavior)) + + +class VppSRv6Policy(VppObject): + """ + SRv6 Policy + """ + + def __init__(self, test, bsid, + is_encap, sr_type, weight, fib_table, + segments, source): + self._test = test + self.bsid = bsid + # keep binary format in _bsid + self._bsid = inet_pton(AF_INET6, bsid) + self.is_encap = is_encap + self.sr_type = sr_type + self.weight = weight + self.fib_table = fib_table + self.segments = segments + # keep binary format in _segments + self._segments = [] + for seg in segments: + self._segments.extend(inet_pton(AF_INET6, seg)) + self.n_segments = len(segments) + # source not passed to API + # self.source = inet_pton(AF_INET6, source) + self.source = source + self._configured = False + + def add_vpp_config(self): + self._test.vapi.sr_policy_add( + self._bsid, + self.weight, + self.is_encap, + self.sr_type, + self.fib_table, + self.n_segments, + self._segments) + self._configured = True + + def remove_vpp_config(self): + self._test.vapi.sr_policy_del( + self._bsid) + self._configured = False + + def query_vpp_config(self): + # no API to query SR Policies + # use _configured flag for now + return self._configured + + def __str__(self): + return self.object_id() + + def object_id(self): + return ("%d;%s-><%s>;%d" + % (self.sr_type, + self.bsid, + ','.join(self.segments), + self.is_encap)) + + +class VppSRv6Steering(VppObject): + """ + SRv6 Steering + """ + + def __init__(self, test, + bsid, + prefix, + mask_width, + traffic_type, + sr_policy_index, + table_id, + sw_if_index): + self._test = test + self.bsid = bsid + # keep binary format in _bsid + self._bsid = inet_pton(AF_INET6, bsid) + self.prefix = prefix + # keep binary format in _prefix + if ':' in prefix: + # IPv6 + self._prefix = inet_pton(AF_INET6, prefix) + else: + # IPv4 + # API expects 16 octets (128 bits) + # last 4 octets are used for IPv4 + # --> prepend 12 octets + self._prefix = ('\x00' * 12) + inet_pton(AF_INET, prefix) + self.mask_width = mask_width + self.traffic_type = traffic_type + self.sr_policy_index = sr_policy_index + self.sw_if_index = sw_if_index + self.table_id = table_id + self._configured = False + + def add_vpp_config(self): + self._test.vapi.sr_steering_add_del( + 0, + self._bsid, + self.sr_policy_index, + self.table_id, + self._prefix, + self.mask_width, + self.sw_if_index, + self.traffic_type) + self._configured = True + + def remove_vpp_config(self): + self._test.vapi.sr_steering_add_del( + 1, + self._bsid, + self.sr_policy_index, + self.table_id, + self._prefix, + self.mask_width, + self.sw_if_index, + self.traffic_type) + self._configured = False + + def query_vpp_config(self): + # no API to query steering entries + # use _configured flag for now + return self._configured + + def __str__(self): + return self.object_id() + + def object_id(self): + return ("%d;%d;%s/%d->%s" + % (self.table_id, + self.traffic_type, + self.prefix, + self.mask_width, + self.bsid)) |