1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
#!/router/bin/python
from control_plane_general_test import CControlPlaneGeneral_Test
from Client.trex_client import CTRexClient
import socket
from nose.tools import assert_raises, assert_equal, assert_not_equal
from common.trex_status_e import TRexStatus
from common.trex_exceptions import *
from enum import Enum
import time
class CTRexStartStop_Test(CControlPlaneGeneral_Test):
def __init__(self):
super(CTRexStartStop_Test, self).__init__()
self.valid_start_params = dict( c = 4,
m = 1.1,
d = 100,
f = 'avl/sfr_delay_10_1g.yaml',
nc = True,
p = True,
l = 1000)
def setUp(self):
pass
def test_mandatory_param_error(self):
start_params = dict( c = 4,
m = 1.1,
d = 70,
# f = 'avl/sfr_delay_10_1g.yaml', <-- f (mandatory) is not provided on purpose
nc = True,
p = True,
l = 1000)
assert_raises(TypeError, self.trex.start_trex, **start_params)
def test_parameter_name_error(self):
ret = self.trex.start_trex( c = 4,
wrong_key = 1.1, # <----- This key does not exists in TRex API
d = 70,
f = 'avl/sfr_delay_10_1g.yaml',
nc = True,
p = True,
l = 1000)
time.sleep(5)
# check for failure status
run_status = self.trex.get_running_status()
assert isinstance(run_status, dict)
assert_equal (run_status['state'], TRexStatus.Idle )
assert_equal (run_status['verbose'], "TRex run failed due to wrong input parameters, or due to reachability issues.")
assert_raises(TRexError, self.trex.get_running_info)
def test_too_early_sample(self):
ret = self.trex.start_trex(**self.valid_start_params)
assert ret==True
# issue get_running_info() too soon, without any(!) sleep
run_status = self.trex.get_running_status()
assert isinstance(run_status, dict)
assert_equal (run_status['state'], TRexStatus.Starting )
assert_raises(TRexWarning, self.trex.get_running_info)
ret = self.trex.stop_trex()
assert ret==True # make sure stop succeeded
assert self.trex.is_running() == False
def test_start_sampling_on_time(self):
ret = self.trex.start_trex(**self.valid_start_params)
assert ret==True
time.sleep(6)
run_status = self.trex.get_running_status()
assert isinstance(run_status, dict)
assert_equal (run_status['state'], TRexStatus.Running )
run_info = self.trex.get_running_info()
assert isinstance(run_info, dict)
ret = self.trex.stop_trex()
assert ret==True # make sure stop succeeded
assert self.trex.is_running() == False
def test_start_more_than_once_same_user(self):
assert self.trex.is_running() == False # first, make sure TRex is not running
ret = self.trex.start_trex(**self.valid_start_params) # start 1st TRex run
assert ret == True # make sure 1st run submitted successfuly
# time.sleep(1)
assert_raises(TRexInUseError, self.trex.start_trex, **self.valid_start_params) # try to start TRex again
ret = self.trex.stop_trex()
assert ret==True # make sure stop succeeded
assert self.trex.is_running() == False
def test_start_more_than_once_different_users(self):
assert self.trex.is_running() == False # first, make sure TRex is not running
ret = self.trex.start_trex(**self.valid_start_params) # start 1st TRex run
assert ret == True # make sure 1st run submitted successfuly
# time.sleep(1)
tmp_trex = CTRexClient(self.trex_server_name) # initialize another client connecting same server
assert_raises(TRexInUseError, tmp_trex.start_trex, **self.valid_start_params) # try to start TRex again
ret = self.trex.stop_trex()
assert ret==True # make sure stop succeeded
assert self.trex.is_running() == False
def test_simultaneous_sampling(self):
assert self.trex.is_running() == False # first, make sure TRex is not running
tmp_trex = CTRexClient(self.trex_server_name) # initialize another client connecting same server
ret = self.trex.start_trex(**self.valid_start_params) # start TRex run
assert ret == True # make sure 1st run submitted successfuly
time.sleep(6)
# now, sample server from both clients
while (self.trex.is_running()):
info_1 = self.trex.get_running_info()
info_2 = tmp_trex.get_running_info()
# make sure samples are consistent
if self.trex.get_result_obj().is_valid_hist():
assert tmp_trex.get_result_obj().is_valid_hist() == True
if self.trex.get_result_obj().is_done_warmup():
assert tmp_trex.get_result_obj().is_done_warmup() == True
# except TRexError as inst: # TRex might have stopped between is_running result and get_running_info() call
# # hence, ingore that case
# break
assert self.trex.is_running() == False
def test_fast_toggling(self):
assert self.trex.is_running() == False
for i in range(20):
ret = self.trex.start_trex(**self.valid_start_params) # start TRex run
assert ret == True
assert self.trex.is_running() == False # we expect the status to be 'Starting'
ret = self.trex.stop_trex()
assert ret == True
assert self.trex.is_running() == False
pass
def tearDown(self):
pass
class CBasicQuery_Test(CControlPlaneGeneral_Test):
def __init__(self):
super(CBasicQuery_Test, self).__init__()
pass
def setUp(self):
pass
def test_is_running(self):
assert self.trex.is_running() == False
def tearDown(self):
pass
|