summaryrefslogtreecommitdiffstats
path: root/test/vpp_srv6.py
blob: 7701cf35bc34f0f141623617dead97a3e2db3faf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""
  SRv6 LocalSIDs

  object abstractions for representing SRv6 localSIDs in VPP
"""

from vpp_object import *
from socket import inet_pton, inet_ntop, AF_INET, AF_INET6


class SRv6LocalSIDBehaviors():
    # from src/vnet/srv6/sr.h
    SR_BEHAVIOR_END = 1
    SR_BEHAVIOR_X = 2
    SR_BEHAVIOR_T = 3
    SR_BEHAVIOR_D_FIRST = 4   # Unused. Separator in between regular and D
    SR_BEHAVIOR_DX2 = 5
    SR_BEHAVIOR_DX6 = 6
    SR_BEHAVIOR_DX4 = 7
    SR_BEHAVIOR_DT6 = 8
    SR_BEHAVIOR_DT4 = 9
    SR_BEHAVIOR_LAST = 10      # Must always be the last one


class SRv6PolicyType():
    # from src/vnet/srv6/sr.h
    SR_POLICY_TYPE_DEFAULT = 0
    SR_POLICY_TYPE_SPRAY = 1


class SRv6PolicySteeringTypes():
    # from src/vnet/srv6/sr.h
    SR_STEER_L2 = 2
    SR_STEER_IPV4 = 4
    SR_STEER_IPV6 = 6


class VppSRv6LocalSID(VppObject):
    """
    SRv6 LocalSID
    """

    def __init__(self, test, localsid, behavior, nh_addr4, nh_addr6,
                 end_psp, sw_if_index, vlan_index, fib_table):
        self._test = test
        self.localsid = localsid
        # keep binary format in _localsid
        self.localsid["addr"] = inet_pton(AF_INET6, self.localsid["addr"])
        self.behavior = behavior
        self.nh_addr4 = inet_pton(AF_INET, nh_addr4)
        self.nh_addr6 = inet_pton(AF_INET6, nh_addr6)
        self.end_psp = end_psp
        self.sw_if_index = sw_if_index
        self.vlan_index = vlan_index
        self.fib_table = fib_table
        self._configured = False

    def add_vpp_config(self):
        self._test.vapi.sr_localsid_add_del(
            self.localsid,
            self.behavior,
            self.nh_addr4,
            self.nh_addr6,
            is_del=0,
            end_psp=self.end_psp,
            sw_if_index=self.sw_if_index,
            vlan_index=self.vlan_index,
            fib_table=self.fib_table)
        self._configured = True

    def remove_vpp_config(self):
        self._test.vapi.sr_localsid_add_del(
            self.localsid,
            self.behavior,
            self.nh_addr4,
            self.nh_addr6,
            is_del=1,
            end_psp=self.end_psp,
            sw_if_index=self.sw_if_index,
            vlan_index=self.vlan_index,
            fib_table=self.fib_table)
        self._configured = False

    def query_vpp_config(self):
        # sr_localsids_dump API is disabled
        # use _configured flag for now
        return self._configured

    def __str__(self):
        return self.object_id()

    def object_id(self):
        return ("%d;%s,%d"
                % (self.fib_table,
                   self.localsid,
                   self.behavior))


class VppSRv6Policy(VppObject):
    """
    SRv6 Policy
    """

    def __init__(self, test, bsid,
                 is_encap, sr_type, weight, fib_table,
                 segments, source):
        self._test = test
        self.bsid = bsid
        # keep binary format in _bsid
        self._bsid = inet_pton(AF_INET6, bsid)
        self.is_encap = is_encap
        self.sr_type = sr_type
        self.weight = weight
        self.fib_table = fib_table
        self.segments = segments
        # keep binary format in _segments
        self._segments = []
        for seg in segments:
            self._segments.extend(inet_pton(AF_INET6, seg))
        self.n_segments = len(segments)
        # source not passed to API
        # self.source = inet_pton(AF_INET6, source)
        self.source = source
        self._configured = False

    def add_vpp_config(self):
        self._test.vapi.sr_policy_add(
                     self._bsid,
                     self.weight,
                     self.is_encap,
                     self.sr_type,
                     self.fib_table,
                     self.n_segments,
                     self._segments)
        self._configured = True

    def remove_vpp_config(self):
        self._test.vapi.sr_policy_del(
                     self._bsid)
        self._configured = False

    def query_vpp_config(self):
        # no API to query SR Policies
        # use _configured flag for now
        return self._configured

    def __str__(self):
        return self.object_id()

    def object_id(self):
        return ("%d;%s-><%s>;%d"
                % (self.sr_type,
                   self.bsid,
                   ','.join(self.segments),
                   self.is_encap))


class VppSRv6Steering(VppObject):
    """
    SRv6 Steering
    """

    def __init__(self, test,
                 bsid,
                 prefix,
                 mask_width,
                 traffic_type,
                 sr_policy_index,
                 table_id,
                 sw_if_index):
        self._test = test
        self.bsid = bsid
        # keep binary format in _bsid
        self._bsid = inet_pton(AF_INET6, bsid)
        self.prefix = prefix
        # keep binary format in _prefix
        if ':' in prefix:
            # IPv6
            self._prefix = inet_pton(AF_INET6, prefix)
        else:
            # IPv4
            # API expects 16 octets (128 bits)
            # last 4 octets are used for IPv4
            # --> prepend 12 octets
            self._prefix = ('\x00' * 12) + inet_pton(AF_INET, prefix)
        self.mask_width = mask_width
        self.traffic_type = traffic_type
        self.sr_policy_index = sr_policy_index
        self.sw_if_index = sw_if_index
        self.table_id = table_id
        self._configured = False

    def add_vpp_config(self):
        self._test.vapi.sr_steering_add_del(
                     0,
                     self._bsid,
                     self.sr_policy_index,
                     self.table_id,
                     self._prefix,
                     self.mask_width,
                     self.sw_if_index,
                     self.traffic_type)
        self._configured = True

    def remove_vpp_config(self):
        self._test.vapi.sr_steering_add_del(
                     1,
                     self._bsid,
                     self.sr_policy_index,
                     self.table_id,
                     self._prefix,
                     self.mask_width,
                     self.sw_if_index,
                     self.traffic_type)
        self._configured = False

    def query_vpp_config(self):
        # no API to query steering entries
        # use _configured flag for now
        return self._configured

    def __str__(self):
        return self.object_id()

    def object_id(self):
        return ("%d;%d;%s/%d->%s"
                % (self.table_id,
                   self.traffic_type,
                   self.prefix,
                   self.mask_width,
                   self.bsid))
ss="bp">self, data, offset=0, result=None, ntc=False): length, length_field_size = self.length_field_packer.unpack(data, offset) if length == 0: return b'', 0 p = BaseTypes('u8', length) x, size = p.unpack(data, offset + length_field_size) x2 = x.split(b'\0', 1)[0] return (x2.decode('utf8'), size + length_field_size) types = {'u8': BaseTypes('u8'), 'u16': BaseTypes('u16'), 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'), 'u64': BaseTypes('u64'), 'f64': BaseTypes('f64'), 'bool': BaseTypes('bool'), 'string': String} def vpp_get_type(name): try: return types[name] except KeyError: return None class VPPSerializerValueError(ValueError): pass class FixedList_u8(object): def __init__(self, name, field_type, num): self.name = name self.num = num self.packer = BaseTypes(field_type, num) self.size = self.packer.size self.field_type = field_type def __call__(self, args): self.options = args return self def pack(self, data, kwargs=None): """Packs a fixed length bytestring. Left-pads with zeros if input data is too short.""" if not data: return b'\x00' * self.size if len(data) > self.num: raise VPPSerializerValueError( 'Fixed list length error for "{}", got: {}' ' expected: {}' .format(self.name, len(data), self.num)) return self.packer.pack(data) def unpack(self, data, offset=0, result=None, ntc=False): if len(data[offset:]) < self.num: raise VPPSerializerValueError( 'Invalid array length for "{}" got {}' ' expected {}' .format(self.name, len(data[offset:]), self.num)) if self.field_type == 'string': s = self.packer.unpack(data, offset) s2 = s[0].split(b'\0', 1)[0] return (s2.decode('utf-8'), self.num) return self.packer.unpack(data, offset) class FixedList(object): def __init__(self, name, field_type, num): self.num = num self.packer = types[field_type] self.size = self.packer.size * num self.name = name self.field_type = field_type def __call__(self, args): self.options = args return self def pack(self, list, kwargs): if len(list) != self.num: raise VPPSerializerValueError( 'Fixed list length error, got: {} expected: {}' .format(len(list), self.num)) b = bytes() for e in list: b += self.packer.pack(e) return b def unpack(self, data, offset=0, result=None, ntc=False): # Return a list of arguments result = [] total = 0 for e in range(self.num): x, size = self.packer.unpack(data, offset, ntc=ntc) result.append(x) offset += size total += size return result, total class VLAList(object): def __init__(self, name, field_type, len_field_name, index): self.name = name self.field_type = field_type self.index = index self.packer = types[field_type] self.size = self.packer.size self.length_field = len_field_name def __call__(self, args): self.options = args return self def pack(self, list, kwargs=None): if not list: return b"" if len(list) != kwargs[self.length_field]: raise VPPSerializerValueError( 'Variable length error, got: {} expected: {}' .format(len(list), kwargs[self.length_field])) b = bytes() # u8 array if self.packer.size == 1: return bytearray(list) for e in list: b += self.packer.pack(e) return b def unpack(self, data, offset=0, result=None, ntc=False): # Return a list of arguments total = 0 # u8 array if self.packer.size == 1: if result[self.index] == 0: return b'', 0 p = BaseTypes('u8', result[self.index]) return p.unpack(data, offset, ntc=ntc) r = [] for e in range(result[self.index]): x, size = self.packer.unpack(data, offset, ntc=ntc) r.append(x) offset += size total += size return r, total class VLAList_legacy(): def __init__(self, name, field_type): self.packer = types[field_type] self.size = self.packer.size def __call__(self, args): self.options = args return self def pack(self, list, kwargs=None): if self.packer.size == 1: return bytes(list) b = bytes() for e in list: b += self.packer.pack(e) return b def unpack(self, data, offset=0, result=None, ntc=False): total = 0 # Return a list of arguments if (len(data) - offset) % self.packer.size: raise VPPSerializerValueError( 'Legacy Variable Length Array length mismatch.') elements = int((len(data) - offset) / self.packer.size) r = [] for e in range(elements): x, size = self.packer.unpack(data, offset, ntc=ntc) r.append(x) offset += self.packer.size total += size return r, total class VPPEnumType(object): def __init__(self, name, msgdef): self.size = types['u32'].size self.enumtype = 'u32' e_hash = {} for f in msgdef: if type(f) is dict and 'enumtype' in f: if f['enumtype'] != 'u32': self.size = types[f['enumtype']].size self.enumtype = f['enumtype'] continue ename, evalue = f e_hash[ename] = evalue self.enum = IntFlag(name, e_hash) types[name] = self def __call__(self, args): self.options = args return self def __getattr__(self, name): return self.enum[name] def __nonzero__(self): return True def pack(self, data, kwargs=None): return types[self.enumtype].pack(data) def unpack(self, data, offset=0, result=None, ntc=False): x, size = types[self.enumtype].unpack(data, offset) return self.enum(x), size class VPPUnionType(object): def __init__(self, name, msgdef): self.name = name self.size = 0 self.maxindex = 0 fields = [] self.packers = collections.OrderedDict() for i, f in enumerate(msgdef): if type(f) is dict and 'crc' in f: self.crc = f['crc'] continue f_type, f_name = f if f_type not in types: logger.debug('Unknown union type {}'.format(f_type)) raise VPPSerializerValueError( 'Unknown message type {}'.format(f_type)) fields.append(f_name) size = types[f_type].size self.packers[f_name] = types[f_type] if size > self.size: self.size = size self.maxindex = i types[name] = self self.tuple = collections.namedtuple(name, fields, rename=True) def __call__(self, args): self.options = args return self # Union of variable length? def pack(self, data, kwargs=None): if not data: return b'\x00' * self.size for k, v in data.items(): logger.debug("Key: {} Value: {}".format(k, v)) b = self.packers[k].pack(v, kwargs) break r = bytearray(self.size) r[:len(b)] = b return r def unpack(self, data, offset=0, result=None, ntc=False): r = [] maxsize = 0 for k, p in self.packers.items(): x, size = p.unpack(data, offset, ntc=ntc) if size > maxsize: maxsize = size r.append(x) return self.tuple._make(r), maxsize class VPPTypeAlias(object): def __init__(self, name, msgdef): self.name = name t = vpp_get_type(msgdef['type']) if not t: raise ValueError() if 'length' in msgdef: if msgdef['length'] == 0: raise ValueError() if msgdef['type'] == 'u8': self.packer = FixedList_u8(name, msgdef['type'], msgdef['length']) self.size = self.packer.size else: self.packer = FixedList(name, msgdef['type'], msgdef['length']) else: self.packer = t self.size = t.size types[name] = self def __call__(self, args): self.options = args return self def pack(self, data, kwargs=None): if data and conversion_required(data, self.name): try: return conversion_packer(data, self.name) # Python 2 and 3 raises different exceptions from inet_pton except(OSError, socket.error, TypeError): pass return self.packer.pack(data, kwargs) def unpack(self, data, offset=0, result=None, ntc=False): t, size = self.packer.unpack(data, offset, result, ntc=ntc) if not ntc: return conversion_unpacker(t, self.name), size return t, size class VPPType(object): # Set everything up to be able to pack / unpack def __init__(self, name, msgdef): self.name = name self.msgdef = msgdef self.packers = [] self.fields = [] self.fieldtypes = [] self.field_by_name = {} size = 0 for i, f in enumerate(msgdef): if type(f) is dict and 'crc' in f: self.crc = f['crc'] continue f_type, f_name = f[:2] self.fields.append(f_name) self.field_by_name[f_name] = None self.fieldtypes.append(f_type) if f_type not in types: logger.debug('Unknown type {}'.format(f_type)) raise VPPSerializerValueError( 'Unknown message type {}'.format(f_type)) fieldlen = len(f) options = [x for x in f if type(x) is dict] if len(options): self.options = options[0] fieldlen -= 1 else: self.options = {} if fieldlen == 3: # list list_elements = f[2] if list_elements == 0: p = VLAList_legacy(f_name, f_type) self.packers.append(p) elif f_type == 'u8' or f_type == 'string': p = FixedList_u8(f_name, f_type, list_elements) self.packers.append(p) size += p.size else: p = FixedList(f_name, f_type, list_elements) self.packers.append(p) size += p.size elif fieldlen == 4: # Variable length list length_index = self.fields.index(f[3]) p = VLAList(f_name, f_type, f[3], length_index) self.packers.append(p) else: p = types[f_type](self.options) self.packers.append(p) size += p.size self.size = size self.tuple = collections.namedtuple(name, self.fields, rename=True) types[name] = self def __call__(self, args): self.options = args return self def pack(self, data, kwargs=None): if not kwargs: kwargs = data b = bytes() # Try one of the format functions if data and conversion_required(data, self.name): return conversion_packer(data, self.name) for i, a in enumerate(self.fields): if data and type(data) is not dict and a not in data: raise VPPSerializerValueError( "Invalid argument: {} expected {}.{}". format(data, self.name, a)) # Defaulting to zero. if not data or a not in data: # Default to 0 arg = None kwarg = None # No default for VLA else: arg = data[a] kwarg = kwargs[a] if a in kwargs else None if isinstance(self.packers[i], VPPType): b += self.packers[i].pack(arg, kwarg) else: b += self.packers[i].pack(arg, kwargs) return b def unpack(self, data, offset=0, result=None, ntc=False): # Return a list of arguments result = [] total = 0 for p in self.packers: x, size = p.unpack(data, offset, result, ntc) if type(x) is tuple and len(x) == 1: x = x[0] result.append(x) offset += size total += size t = self.tuple._make(result) if not ntc: t = conversion_unpacker(t, self.name) return t, total class VPPMessage(VPPType): pass