1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
"""
SRv6 objects
Object abstractions for representing SRv6 LocalSIDs, policies and steering
entries in VPP
"""
from vpp_object import VppObject
from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
class SRv6LocalSIDBehaviors:
    """Numeric codes identifying SRv6 LocalSID behaviors.

    The values are copied from src/vnet/srv6/sr.h and must be kept in sync
    with that header.
    """

    # Regular behaviors
    SR_BEHAVIOR_END = 1
    SR_BEHAVIOR_X = 2
    SR_BEHAVIOR_T = 3

    # Unused. Separator in between regular and D
    SR_BEHAVIOR_D_FIRST = 4

    # D behaviors
    SR_BEHAVIOR_DX2 = 5
    SR_BEHAVIOR_DX6 = 6
    SR_BEHAVIOR_DX4 = 7
    SR_BEHAVIOR_DT6 = 8
    SR_BEHAVIOR_DT4 = 9

    SR_BEHAVIOR_END_UN_PERF = 10
    SR_BEHAVIOR_END_UN = 11

    # Must always be the last one
    SR_BEHAVIOR_LAST = 12
class SRv6PolicyType:
    """Numeric codes for the SR policy types.

    The values are copied from src/vnet/srv6/sr.h.
    """

    SR_POLICY_TYPE_DEFAULT = 0
    SR_POLICY_TYPE_SPRAY = 1
class SRv6PolicySteeringTypes:
    """Numeric codes for the traffic types that can be steered into a policy.

    The values are copied from src/vnet/srv6/sr.h.
    """

    SR_STEER_L2 = 2
    SR_STEER_IPV4 = 4
    SR_STEER_IPV6 = 6
class VppSRv6LocalSID(VppObject):
    """
    SRv6 LocalSID

    Holds the parameters of one LocalSID and adds it to / removes it from VPP
    through the test's ``vapi.sr_localsid_add_del`` call.
    """

    def __init__(
        self,
        test,
        localsid,
        behavior,
        nh_addr,
        end_psp,
        sw_if_index,
        vlan_index,
        fib_table,
    ):
        """Record the LocalSID parameters; nothing is sent to VPP yet.

        :param test: test case object; its ``vapi`` attribute is used to
            issue the API calls.
        :param localsid: forwarded as ``localsid`` to the API and used in
            :meth:`object_id`.
        :param behavior: forwarded as ``behavior``; formatted with ``%d`` in
            :meth:`object_id`.
        :param nh_addr: forwarded as ``nh_addr``.
        :param end_psp: forwarded as ``end_psp``.
        :param sw_if_index: forwarded as ``sw_if_index``.
        :param vlan_index: forwarded as ``vlan_index``.
        :param fib_table: forwarded as ``fib_table``; formatted with ``%d`` in
            :meth:`object_id`.
        """
        self._test = test
        # Local bookkeeping only; see query_vpp_config().
        self._configured = False

        self.localsid = localsid
        self.behavior = behavior
        self.nh_addr = nh_addr
        self.end_psp = end_psp
        self.sw_if_index = sw_if_index
        self.vlan_index = vlan_index
        self.fib_table = fib_table

    def _localsid_add_del(self, is_del):
        """Call sr_localsid_add_del for this LocalSID with the given is_del."""
        self._test.vapi.sr_localsid_add_del(
            localsid=self.localsid,
            behavior=self.behavior,
            nh_addr=self.nh_addr,
            is_del=is_del,
            end_psp=self.end_psp,
            sw_if_index=self.sw_if_index,
            vlan_index=self.vlan_index,
            fib_table=self.fib_table,
        )

    def add_vpp_config(self):
        """Add the LocalSID to VPP and mark it as configured."""
        self._localsid_add_del(0)
        self._configured = True

    def remove_vpp_config(self):
        """Delete the LocalSID from VPP and mark it as not configured."""
        self._localsid_add_del(1)
        self._configured = False

    def query_vpp_config(self):
        """Return whether this object believes it is configured in VPP."""
        # sr_localsids_dump API is disabled, so VPP itself is not queried;
        # use the _configured flag for now
        return self._configured

    def object_id(self):
        """Return the identifier string ``<fib_table>;<localsid>,<behavior>``."""
        id_fields = (self.fib_table, self.localsid, self.behavior)
        return "%d;%s,%d" % id_fields
class VppSRv6Policy(VppObject):
    """
    SRv6 Policy

    Holds the parameters of one SR policy, adds it to VPP through
    ``vapi.sr_policy_add`` and removes it through ``vapi.sr_policy_del``.
    """

    def __init__(
        self, test, bsid, is_encap, sr_type, weight, fib_table, segments, source
    ):
        """Record the policy parameters; nothing is sent to VPP yet.

        :param test: test case object; its ``vapi`` attribute is used to
            issue the API calls.
        :param bsid: forwarded as ``bsid`` on add and as the sole argument
            on delete.
        :param is_encap: forwarded as ``is_encap``; formatted with ``%d`` in
            :meth:`object_id`.
        :param sr_type: forwarded as ``is_spray``; formatted with ``%d`` in
            :meth:`object_id`.
        :param weight: forwarded as ``weight``.
        :param fib_table: forwarded as ``fib_table``.
        :param segments: sized sequence of segments, forwarded as the
            ``sids`` list. Its length is captured once, here.
        :param source: stored on the object but not passed to the API.
        """
        self._test = test
        self.bsid = bsid
        self.is_encap = is_encap
        self.sr_type = sr_type
        self.weight = weight
        self.fib_table = fib_table
        self.segments = segments
        self.n_segments = len(segments)
        # source not passed to API
        self.source = source
        # Local bookkeeping only; see query_vpp_config().
        self._configured = False

    def add_vpp_config(self):
        """Add the policy to VPP and mark it as configured."""
        self._test.vapi.sr_policy_add(
            bsid=self.bsid,
            weight=self.weight,
            is_encap=self.is_encap,
            is_spray=self.sr_type,
            fib_table=self.fib_table,
            sids={"num_sids": self.n_segments, "sids": self.segments},
        )
        self._configured = True

    def remove_vpp_config(self):
        """Delete the policy (by BSID) from VPP and mark it as not configured."""
        self._test.vapi.sr_policy_del(self.bsid)
        self._configured = False

    def query_vpp_config(self):
        """Return whether this object believes it is configured in VPP."""
        # no API to query SR Policies
        # use _configured flag for now
        return self._configured

    def object_id(self):
        """Return ``<sr_type>;<bsid>-><seg1,seg2,...>;<is_encap>``."""
        # str.join() only accepts str items; stringify each segment so that
        # non-str segments (e.g. ipaddress objects) do not raise TypeError.
        # For str segments the result is unchanged.
        segment_list = ",".join(str(segment) for segment in self.segments)
        return "%d;%s-><%s>;%d" % (
            self.sr_type,
            self.bsid,
            segment_list,
            self.is_encap,
        )
class VppSRv6Steering(VppObject):
    """
    SRv6 Steering

    Holds the parameters of one steering entry and adds it to / removes it
    from VPP through the test's ``vapi.sr_steering_add_del`` call.
    """

    def __init__(
        self,
        test,
        bsid,
        prefix,
        mask_width,
        traffic_type,
        sr_policy_index,
        table_id,
        sw_if_index,
    ):
        """Record the steering parameters; nothing is sent to VPP yet.

        :param test: test case object; its ``vapi`` attribute is used to
            issue the API calls.
        :param bsid: forwarded as ``bsid`` and used in :meth:`object_id`.
        :param prefix: forwarded as ``prefix["address"]``.
        :param mask_width: forwarded as ``prefix["len"]``; formatted with
            ``%d`` in :meth:`object_id`.
        :param traffic_type: forwarded as ``traffic_type``; formatted with
            ``%d`` in :meth:`object_id`.
        :param sr_policy_index: forwarded as ``sr_policy_index``.
        :param table_id: forwarded as ``table_id``; formatted with ``%d`` in
            :meth:`object_id`.
        :param sw_if_index: forwarded as ``sw_if_index``.
        """
        self._test = test
        # Local bookkeeping only; see query_vpp_config().
        self._configured = False

        self.bsid = bsid
        self.prefix = prefix
        self.mask_width = mask_width
        self.traffic_type = traffic_type
        self.sr_policy_index = sr_policy_index
        self.table_id = table_id
        self.sw_if_index = sw_if_index

    def _steering_add_del(self, is_del):
        """Call sr_steering_add_del for this entry with the given is_del."""
        self._test.vapi.sr_steering_add_del(
            is_del=is_del,
            bsid=self.bsid,
            sr_policy_index=self.sr_policy_index,
            table_id=self.table_id,
            prefix={"address": self.prefix, "len": self.mask_width},
            sw_if_index=self.sw_if_index,
            traffic_type=self.traffic_type,
        )

    def add_vpp_config(self):
        """Add the steering entry to VPP and mark it as configured."""
        self._steering_add_del(0)
        self._configured = True

    def remove_vpp_config(self):
        """Delete the steering entry from VPP and mark it as not configured."""
        self._steering_add_del(1)
        self._configured = False

    def query_vpp_config(self):
        """Return whether this object believes it is configured in VPP."""
        # no API to query steering entries, so VPP itself is not queried;
        # use the _configured flag for now
        return self._configured

    def object_id(self):
        """Return ``<table_id>;<traffic_type>;<prefix>/<mask_width>-><bsid>``."""
        id_fields = (
            self.table_id,
            self.traffic_type,
            self.prefix,
            self.mask_width,
            self.bsid,
        )
        return "%d;%d;%s/%d->%s" % id_fields
|