1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
from vpp_object import VppObject
class VppDHCPProxy(VppObject):
    """Test-side handle for a DHCP proxy configuration in VPP.

    The object stores the proxy parameters and adds or removes them through
    ``test.vapi.dhcp_proxy_config``. When looking itself up in the proxy
    dump it matches entries on the RX VRF id only.
    """

    def __init__(
        self,
        test,
        dhcp_server,
        dhcp_src_address,
        rx_vrf_id=0,
        server_vrf_id=0,
    ):
        """Record the parameters; nothing is sent to VPP here.

        Args:
            test: Test case object; must expose ``vapi``, ``registry`` and
                ``logger`` (all are used by the methods below).
            dhcp_server: Passed as ``dhcp_server`` to ``dhcp_proxy_config``.
            dhcp_src_address: Passed as ``dhcp_src_address`` to
                ``dhcp_proxy_config``.
            rx_vrf_id: Passed as ``rx_vrf_id``; also the key used to find
                this proxy in the dump.
            server_vrf_id: Passed as ``server_vrf_id``.
        """
        self._test = test
        self._dhcp_server = dhcp_server
        self._dhcp_src_address = dhcp_src_address
        self._rx_vrf_id = rx_vrf_id
        self._server_vrf_id = server_vrf_id

    def set_proxy(self, dhcp_server, dhcp_src_address, rx_vrf_id=0, server_vrf_id=0):
        """Replace the stored proxy parameters.

        Raises:
            Exception: If a proxy for the currently stored RX VRF id is
                still present in VPP.
        """
        # The presence check runs against the *old* rx_vrf_id, before any
        # attribute is overwritten.
        if self.query_vpp_config():
            raise Exception("Vpp config present")
        self._dhcp_server, self._dhcp_src_address = dhcp_server, dhcp_src_address
        self._rx_vrf_id, self._server_vrf_id = rx_vrf_id, server_vrf_id

    def _proxy_params(self):
        """Return the keyword arguments shared by the add and remove calls."""
        return {
            "rx_vrf_id": self._rx_vrf_id,
            "server_vrf_id": self._server_vrf_id,
            "dhcp_server": self._dhcp_server,
            "dhcp_src_address": self._dhcp_src_address,
        }

    def add_vpp_config(self):
        """Add the proxy in VPP, then register this object with the test registry."""
        test = self._test
        test.vapi.dhcp_proxy_config(is_add=1, **self._proxy_params())
        # NOTE(review): registration presumably lets the framework remove the
        # config at teardown - confirm against the registry implementation.
        test.registry.register(self, test.logger)

    def remove_vpp_config(self):
        """Remove the proxy from VPP using the stored parameters."""
        self._test.vapi.dhcp_proxy_config(is_add=0, **self._proxy_params())

    def get_vpp_dump(self):
        """Return the first dump entry whose ``rx_vrf_id`` matches, or ``None``."""
        entries = self._test.vapi.dhcp_proxy_dump()
        return next(
            (item for item in entries if item.rx_vrf_id == self._rx_vrf_id),
            None,
        )

    def query_vpp_config(self):
        """Return ``True`` when a matching (truthy) dump entry exists in VPP."""
        return bool(self.get_vpp_dump())

    def object_id(self):
        """Return the identifier string ``dhcp-proxy-<rx_vrf_id>``."""
        return "dhcp-proxy-%d" % self._rx_vrf_id
class VppDHCPClient(VppObject):
    """Test-side handle for a DHCP client configuration in VPP.

    The object stores the client parameters and adds or removes them through
    ``test.vapi.dhcp_client_config``. When looking itself up in the client
    dump it matches entries on ``client.sw_if_index`` only.
    """

    def __init__(
        self,
        test,
        sw_if_index,
        hostname,
        id=None,
        want_dhcp_event=False,
        set_broadcast_flag=True,
        dscp=None,
        pid=None,
    ):
        """Record the parameters; nothing is sent to VPP here.

        Args:
            test: Test case object; must expose ``vapi``, ``registry`` and
                ``logger`` (all are used by the methods below).
            sw_if_index: Interface index the client is bound to; also the
                key used to find this client in the dump.
            hostname: Sent as the ``hostname`` field of the client dict.
            id: Optional client id string. It is ASCII-encoded before being
                sent; a falsy value is sent as ``None``.
            want_dhcp_event: Sent unchanged as ``want_dhcp_event``.
            set_broadcast_flag: Sent unchanged as ``set_broadcast_flag``.
            dscp: Sent unchanged as ``dscp``.
            pid: Sent unchanged as ``pid``.
        """
        self._test = test
        self._sw_if_index = sw_if_index
        self._hostname = hostname
        self._id = id
        self._want_dhcp_event = want_dhcp_event
        self._set_broadcast_flag = set_broadcast_flag
        self._dscp = dscp
        self._pid = pid

    def set_client(
        self,
        sw_if_index,
        hostname,
        id=None,
        want_dhcp_event=False,
        set_broadcast_flag=True,
        dscp=None,
        pid=None,
    ):
        """Replace the stored client parameters.

        Arguments have the same meaning as in ``__init__``.

        Raises:
            Exception: If a client for the currently stored ``sw_if_index``
                is still present in VPP.
        """
        # The presence check uses the *old* sw_if_index, before any attribute
        # is overwritten.
        if self.query_vpp_config():
            raise Exception("Vpp config present")
        self._sw_if_index = sw_if_index
        self._hostname = hostname
        self._id = id
        self._want_dhcp_event = want_dhcp_event
        self._set_broadcast_flag = set_broadcast_flag
        self._dscp = dscp
        self._pid = pid

    def add_vpp_config(self):
        """Add the client in VPP, then register this object with the test registry."""
        # Named client_id rather than id so the builtin is not shadowed.
        client_id = self._id.encode("ascii") if self._id else None
        client = {
            "sw_if_index": self._sw_if_index,
            "hostname": self._hostname,
            "id": client_id,
            "want_dhcp_event": self._want_dhcp_event,
            "set_broadcast_flag": self._set_broadcast_flag,
            "dscp": self._dscp,
            "pid": self._pid,
        }
        self._test.vapi.dhcp_client_config(is_add=1, client=client)
        # NOTE(review): registration presumably lets the framework remove the
        # config at teardown - confirm against the registry implementation.
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Remove the client from VPP.

        Only ``sw_if_index`` and ``hostname`` are sent with the delete request.
        """
        client = {"sw_if_index": self._sw_if_index, "hostname": self._hostname}
        self._test.vapi.dhcp_client_config(client=client, is_add=0)

    def get_vpp_dump(self):
        """Return the first dump entry for this ``sw_if_index``, or ``None``."""
        for entry in self._test.vapi.dhcp_client_dump():
            if entry.client.sw_if_index == self._sw_if_index:
                return entry
        return None

    def query_vpp_config(self):
        """Return ``True`` when a matching (truthy) dump entry exists in VPP."""
        return bool(self.get_vpp_dump())

    def object_id(self):
        """Return the identifier string ``dhcp-client-<hostname>/<sw_if_index>``."""
        return "dhcp-client-%s/%d" % (self._hostname, self._sw_if_index)
|