1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
|
# Copyright (c) 2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module with keywords that publish parts of result structure."""
from robot.libraries.BuiltIn import BuiltIn
from resources.libraries.python.model.util import descend, get_export_data
def export_dut_type_and_version(dut_type=u"unknown", dut_version=u"unknown"):
    """Export the arguments as dut type and version.

    Robot tends to convert "none" into None, hence the unusual default values.

    If either argument is missing, the value from robot variable is used.
    If argument is present, the value is also stored to robot suite variable.

    The type is stored lowercased, the version is stored as is.

    :param dut_type: DUT type, e.g. VPP or DPDK.
    :param dut_version: DUT version as determined by the caller.
    :type dut_type: Optional[str]
    :type dut_version: Optional[str]
    :raises RuntimeError: If value is neither in argument nor robot variable.
    """
    if dut_type == u"unknown":
        dut_type = BuiltIn().get_variable_value(u"\\${DUT_TYPE}", u"unknown")
        if dut_type == u"unknown":
            raise RuntimeError(u"Dut type not provided.")
    else:
        # We want to set a variable in higher level suite setup
        # to be available to test setup several levels lower.
        BuiltIn().set_suite_variable(
            u"\\${DUT_TYPE}", dut_type, u"children=True"
        )
    if dut_version == u"unknown":
        dut_version = BuiltIn().get_variable_value(
            u"\\${DUT_VERSION}", u"unknown"
        )
        # Check the version here (not the type, which was validated above),
        # otherwise a missing version would be exported as "unknown".
        if dut_version == u"unknown":
            raise RuntimeError(u"Dut version not provided.")
    else:
        BuiltIn().set_suite_variable(
            u"\\${DUT_VERSION}", dut_version, u"children=True"
        )
    data = get_export_data()
    data[u"dut_type"] = dut_type.lower()
    data[u"dut_version"] = dut_version
def export_tg_type_and_version(tg_type=u"unknown", tg_version=u"unknown"):
    """Export the arguments as tg type and version.

    Robot tends to convert "none" into None, hence the unusual default values.

    If either argument is missing, the value from robot variable is used.
    If argument is present, the value is also stored to robot suite variable.

    The type is stored lowercased, the version is stored as is.

    :param tg_type: TG type, e.g. TREX.
    :param tg_version: TG version as determined by the caller.
    :type tg_type: Optional[str]
    :type tg_version: Optional[str]
    :raises RuntimeError: If value is neither in argument nor robot variable.
    """
    if tg_type == u"unknown":
        tg_type = BuiltIn().get_variable_value(u"\\${TG_TYPE}", u"unknown")
        if tg_type == u"unknown":
            raise RuntimeError(u"TG type not provided.")
    else:
        # We want to set a variable in higher level suite setup
        # to be available to test setup several levels lower.
        BuiltIn().set_suite_variable(
            u"\\${TG_TYPE}", tg_type, u"children=True"
        )
    if tg_version == u"unknown":
        tg_version = BuiltIn().get_variable_value(
            u"\\${TG_VERSION}", u"unknown"
        )
        # Check the version here (not the type, which was validated above),
        # otherwise a missing version would be exported as "unknown".
        if tg_version == u"unknown":
            raise RuntimeError(u"TG version not provided.")
    else:
        BuiltIn().set_suite_variable(
            u"\\${TG_VERSION}", tg_version, u"children=True"
        )
    data = get_export_data()
    data[u"tg_type"] = tg_type.lower()
    data[u"tg_version"] = tg_version
def append_mrr_value(mrr_value, unit):
    """Record one MRR trial result in the export data.

    Nothing is recorded when the unit is empty (falsy).
    Otherwise the result type is set to "mrr", the unit is stored
    as a string and the value is appended (as a float) to the list
    of values under result / receive_rate / rate.

    :param mrr_value: Forwarding rate from MRR trial.
    :param unit: Unit of measurement for the rate.
    :type mrr_value: float
    :type unit: str
    """
    if unit:
        export = get_export_data()
        export[u"result"][u"type"] = u"mrr"
        receive_rate = descend(export[u"result"], u"receive_rate")
        rate = descend(receive_rate, "rate")
        rate[u"unit"] = str(unit)
        descend(rate, u"values", list).append(float(mrr_value))
def export_search_bound(text, value, unit, bandwidth=None):
    """Store one search bound (value and unit) into the export data.

    Works for both NDRPDR and SOAK tests; the caller's text decides which.
    The text containing "plrsearch" (case insensitive) means soak,
    anything else means ndrpdr. The result type is overwritten accordingly.

    The text also selects the bound: "upper" in text means upper bound
    (lower otherwise), "ndr" in text means NDR (PDR otherwise).
    For soak, the bound is stored under "critical_rate" instead.

    Any previous value for the same bound is silently overwritten.
    Bandwidth is stored only when a truthy value is given.

    :param text: Info from Robot caller to determine bound type.
    :param value: The bound value in packets (or connections) per second.
    :param unit: Rate unit the bound is measured (or estimated) in.
    :param bandwidth: The same value recomputed into L1 bits per second.
    :type text: str
    :type value: float
    :type unit: str
    :type bandwidth: Optional[float]
    """
    rate_value = float(value)
    lowered = str(text).lower()
    is_soak = u"plrsearch" in lowered
    bound_key = u"upper" if u"upper" in lowered else u"lower"
    result = get_export_data()[u"result"]
    result[u"type"] = u"soak" if is_soak else u"ndrpdr"
    bound = {u"rate": {u"value": rate_value, u"unit": unit}}
    if bandwidth:
        bound[u"bandwidth"] = {u"value": float(bandwidth), u"unit": u"bps"}
    # Pick the parent node name; soak has a single "critical_rate" node.
    if is_soak:
        parent_key = u"critical_rate"
    elif u"ndr" in lowered:
        parent_key = u"ndr"
    else:
        parent_key = u"pdr"
    descend(result, parent_key)[bound_key] = bound
def _add_latency(result_node, percent, whichward, latency_string):
    """Parse a latency string and store its parts under the matching node.

    Internal helper for export_ndrpdr_latency, separated out
    because it can be called several times per test.

    The string is split on "/" into min, avg, max and hdrh parts
    (at most three splits, so the hdrh part may itself contain slashes).
    The values land under latency_{whichward} / pdr_{percent},
    with the unit set to "us".

    :param result_node: Tree node to descend from.
    :param percent: Percent value to use in node key (90, 50, 10, 0).
    :param whichward: "forward" or "reverse".
    :param latency_string: Unidir output from TRex utility, min/avg/max/hdrh.
    :type result_node: dict
    :type percent: int
    :type whichward: str
    :type latency_string: str
    :raises ValueError: If the string has fewer than four parts,
        or min/avg/max is not an integer.
    """
    lat_min, lat_avg, lat_max, lat_hdrh = latency_string.split(u"/", 3)
    node = descend(
        descend(result_node, f"latency_{whichward}"), f"pdr_{percent}"
    )
    numeric_parts = ((u"min", lat_min), (u"avg", lat_avg), (u"max", lat_max))
    for key, raw in numeric_parts:
        node[key] = int(raw)
    node[u"hdrh"] = lat_hdrh
    node[u"unit"] = u"us"
def export_ndrpdr_latency(text, latency):
    """Store NDRPDR hdrh latency data into the export data.

    The load (as percentage of PDR) is taken from the caller's text:
    the first of "90", "50", "10" found in the text wins, zero otherwise.
    Previous values for the same load and direction are overwritten.

    The first latency item is stored as forward data.
    The second item, if present, is stored as reverse data;
    when missing, the test is assumed to be unidirectional.

    :param text: Info from Robot caller to determine load.
    :param latency: Output from TRex utility, min/avg/max/hdrh.
    :type text: str
    :type latency: 1-tuple or 2-tuple of str
    """
    result = get_export_data()[u"result"]
    # Checked in this order; fall back to zero when no number matches.
    load_percent = next(
        (number for number in (90, 50, 10) if str(number) in text), 0
    )
    _add_latency(result, load_percent, u"forward", latency[0])
    if len(latency) >= 2:
        _add_latency(result, load_percent, u"reverse", latency[1])
def append_telemetry(telemetry_item):
    """Add one telemetry entry to the "telemetry" list of the export data.

    :param telemetry_item: Telemetry entry.
    :type telemetry_item: str
    """
    telemetry_list = get_export_data()[u"telemetry"]
    telemetry_list.append(telemetry_item)
|