1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
|
import time
# a generic abstract class for implementing RX services using the server
class RXServiceAPI(object):
    """
    Generic abstract base class for implementing RX services using the server.

    A concrete service overrides the virtual methods below. 'execute' then
    validates the port, installs the streams returned by 'generate_request',
    attaches an RX queue, starts the port and hands every fetched packet to
    'on_pkt_rx' until that hook returns a result or the retries run out.
    """

    # specify for which layer this service is
    LAYER_MODE_ANY = 0
    LAYER_MODE_L2 = 1
    LAYER_MODE_L3 = 2

    # RX queue polling policy used by 'wait_for_rx_response':
    # how many times the queue is fetched per iteration and how long to
    # wait between two consecutive fetches. Subclasses may override.
    RX_POLL_ATTEMPTS = 5
    RX_POLL_INTERVAL_SEC = 0.1

    def __init__(self, port, layer_mode = LAYER_MODE_ANY, queue_size = 100):
        """
            :parameters:
                'port'       - the port object the service runs on
                'layer_mode' - one of the LAYER_MODE_* constants
                'queue_size' - size passed to the port when setting the RX queue
        """
        self.port = port
        self.queue_size = queue_size
        self.layer_mode = layer_mode

        # set by 'execute_iteration' on every successful start;
        # defined here so the attribute always exists
        self.start_ts = None

    ################### virtual methods ######################

    def get_name(self):
        """
            returns the name of the service

            :returns:
                str
        """
        raise NotImplementedError()

    def pre_execute(self):
        """
            specific class code called before executing

            :returns:
                RC object
        """
        raise NotImplementedError()

    def generate_request(self):
        """
            generate a request to be sent to the server

            :returns:
                list of streams
        """
        raise NotImplementedError()

    def on_pkt_rx(self, pkt, start_ts):
        """
            called for each packet arriving on RX

            :parameters:
                'pkt'      - the packet received
                'start_ts' - the time recorded when 'start' was called

            :returns:
                None for fetching more packets
                RC object for terminating
        """
        raise NotImplementedError()

    def on_timeout_err(self, retries):
        """
            called when a timeout occurs

            :parameters:
                retries - how many times the service was retrying before failing

            :returns:
                RC object
        """
        raise NotImplementedError()

    ##################### API ######################

    def execute(self, retries = 0):
        """
            run the service on the port

            :parameters:
                retries - how many extra iterations to attempt after the
                          first one before reporting a timeout

            :returns:
                the first failing (falsy) result of the preparation steps,
                otherwise the result of the service itself
                ('on_pkt_rx' result or 'on_timeout_err' result)
        """
        # sanity check
        rc = self.__sanity()
        if not rc:
            return rc

        # first cleanup
        rc = self.port.remove_all_streams()
        if not rc:
            return rc

        # start the iteration
        try:
            # add the stream(s)
            # NOTE(review): the result of 'add_streams' is not checked,
            # unlike the other port calls here - confirm this is intended
            self.port.add_streams(self.generate_request())

            rc = self.port.set_rx_queue(size = self.queue_size)
            if not rc:
                return rc

            return self.__execute_internal(retries)

        finally:
            # best effort restore - runs on every exit path of the 'try'
            self.port.remove_rx_queue()
            self.port.remove_all_streams()

    ##################### Internal ######################

    def __sanity(self):
        """
            verify the port state allows running the service and call the
            service specific 'pre_execute'

            :returns:
                True when all checks passed, otherwise the failing result
                (port error or falsy 'pre_execute' result)
        """
        if not self.port.is_service_mode_on():
            return self.port.err('port service mode must be enabled for performing {0}. Please enable service mode'.format(self.get_name()))

        if self.layer_mode == RXServiceAPI.LAYER_MODE_L2:
            if not self.port.is_l2_mode():
                return self.port.err('{0} - requires L2 mode configuration'.format(self.get_name()))

        elif self.layer_mode == RXServiceAPI.LAYER_MODE_L3:
            if not self.port.is_l3_mode():
                return self.port.err('{0} - requires L3 mode configuration'.format(self.get_name()))

        # the port must be idle
        if self.port.is_active():
            return self.port.err('{0} - port is active, please stop the port before executing command'.format(self.get_name()))

        # call the specific class implementation
        rc = self.pre_execute()
        if not rc:
            return rc

        return True

    # main resolve function
    def __execute_internal(self, retries):
        """
            run iterations until one of them yields a result

            :parameters:
                retries - number of additional iterations after the first

            :returns:
                the first non-None iteration result, or the result of
                'on_timeout_err' once the retries are exhausted
        """
        # retry for 'retries'
        index = 0
        while True:
            rc = self.execute_iteration()
            if rc is not None:
                return rc

            if index >= retries:
                return self.on_timeout_err(retries)

            index += 1
            time.sleep(0.1)

    def execute_iteration(self):
        """
            perform a single iteration: start the port, wait until it is
            no longer active and look for a response on the RX queue

            :returns:
                the falsy start result if starting failed, otherwise the
                result of 'wait_for_rx_response'
        """
        mult = {'op': 'abs', 'type' : 'percentage', 'value': 100}
        rc = self.port.start(mul = mult, force = False, duration = -1, mask = 0xffffffff)
        if not rc:
            return rc

        # save the start timestamp
        self.start_ts = rc.data()['ts']

        # block until traffic finishes
        while self.port.is_active():
            time.sleep(0.01)

        return self.wait_for_rx_response()

    def wait_for_rx_response(self):
        """
            poll the RX queue up to RX_POLL_ATTEMPTS times and pass every
            fetched packet to 'on_pkt_rx'

            :returns:
                the falsy fetch result if fetching the queue failed
                the first non-None result of 'on_pkt_rx'
                None if no packet produced a result in any attempt
        """
        attempts = self.RX_POLL_ATTEMPTS

        for attempt in range(attempts):
            # fetch the queue
            rx_pkts = self.port.get_rx_queue_pkts()

            # might be an error
            if not rx_pkts:
                return rx_pkts

            # for each packet - examine it
            for pkt in rx_pkts.data():
                rc = self.on_pkt_rx(pkt, self.start_ts)
                if rc is not None:
                    return rc

            # wait only when another fetch follows - sleeping after the
            # last attempt would just delay the timeout
            if attempt < attempts - 1:
                time.sleep(self.RX_POLL_INTERVAL_SEC)

        return None
|