1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
"""
IP Routes
object abstractions for representing IP routes in VPP
"""
import socket
# from vnet/vnet/mpls/mpls_types.h
MPLS_IETF_MAX_LABEL = 0xfffff
MPLS_LABEL_INVALID = MPLS_IETF_MAX_LABEL + 1
class RoutePath:
def __init__(
self,
nh_addr,
nh_sw_if_index,
nh_table_id=0,
labels=[],
nh_via_label=MPLS_LABEL_INVALID,
is_ip6=0):
self.nh_itf = nh_sw_if_index
self.nh_table_id = nh_table_id
self.nh_via_label = nh_via_label
self.nh_labels = labels
if is_ip6:
self.nh_addr = socket.inet_pton(socket.AF_INET6, nh_addr)
else:
self.nh_addr = socket.inet_pton(socket.AF_INET, nh_addr)
class IpRoute:
"""
IP Route
"""
def __init__(self, test, dest_addr,
dest_addr_len, paths, table_id=0, is_ip6=0, is_local=0):
self._test = test
self.paths = paths
self.dest_addr_len = dest_addr_len
self.table_id = table_id
self.is_ip6 = is_ip6
self.is_local = is_local
if is_ip6:
self.dest_addr = socket.inet_pton(socket.AF_INET6, dest_addr)
else:
self.dest_addr = socket.inet_pton(socket.AF_INET, dest_addr)
def add_vpp_config(self):
if self.is_local:
self._test.vapi.ip_add_del_route(
self.dest_addr,
self.dest_addr_len,
socket.inet_pton(socket.AF_INET6, "::"),
0xffffffff,
is_local=1,
table_id=self.table_id,
is_ipv6=self.is_ip6)
else:
for path in self.paths:
self._test.vapi.ip_add_del_route(
self.dest_addr,
self.dest_addr_len,
path.nh_addr,
path.nh_itf,
table_id=self.table_id,
next_hop_out_label_stack=path.nh_labels,
next_hop_n_out_labels=len(
path.nh_labels),
next_hop_via_label=path.nh_via_label,
is_ipv6=self.is_ip6)
def remove_vpp_config(self):
if self.is_local:
self._test.vapi.ip_add_del_route(
self.dest_addr,
self.dest_addr_len,
socket.inet_pton(socket.AF_INET6, "::"),
0xffffffff,
is_local=1,
is_add=0,
table_id=self.table_id,
is_ipv6=self.is_ip6)
else:
for path in self.paths:
self._test.vapi.ip_add_del_route(self.dest_addr,
self.dest_addr_len,
path.nh_addr,
path.nh_itf,
table_id=self.table_id,
is_add=0)
class MplsIpBind:
"""
MPLS to IP Binding
"""
def __init__(self, test, local_label, dest_addr, dest_addr_len):
self._test = test
self.dest_addr = socket.inet_pton(socket.AF_INET, dest_addr)
self.dest_addr_len = dest_addr_len
self.local_label = local_label
def add_vpp_config(self):
self._test.vapi.mpls_ip_bind_unbind(self.local_label,
self.dest_addr,
self.dest_addr_len)
def remove_vpp_config(self):
self._test.vapi.mpls_ip_bind_unbind(self.local_label,
self.dest_addr,
self.dest_addr_len,
is_bind=0)
class MplsRoute:
"""
MPLS Route
"""
def __init__(self, test, local_label, eos_bit, paths, table_id=0):
self._test = test
self.paths = paths
self.local_label = local_label
self.eos_bit = eos_bit
self.table_id = table_id
def add_vpp_config(self):
for path in self.paths:
self._test.vapi.mpls_route_add_del(
self.local_label,
self.eos_bit,
1,
path.nh_addr,
path.nh_itf,
table_id=self.table_id,
next_hop_out_label_stack=path.nh_labels,
next_hop_n_out_labels=len(
path.nh_labels),
next_hop_via_label=path.nh_via_label,
next_hop_table_id=path.nh_table_id)
def remove_vpp_config(self):
for path in self.paths:
self._test.vapi.mpls_route_add_del(self.local_label,
self.eos_bit,
1,
path.nh_addr,
path.nh_itf,
table_id=self.table_id,
is_add=0)
|