aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/model/ExportResult.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/model/ExportResult.py')
-rw-r--r--resources/libraries/python/model/ExportResult.py221
1 files changed, 162 insertions, 59 deletions
diff --git a/resources/libraries/python/model/ExportResult.py b/resources/libraries/python/model/ExportResult.py
index 16c6b89fb3..f155848913 100644
--- a/resources/libraries/python/model/ExportResult.py
+++ b/resources/libraries/python/model/ExportResult.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022 Cisco and/or its affiliates.
+# Copyright (c) 2023 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -18,7 +18,7 @@ from robot.libraries.BuiltIn import BuiltIn
from resources.libraries.python.model.util import descend, get_export_data
-def export_dut_type_and_version(dut_type=u"unknown", dut_version=u"unknown"):
+def export_dut_type_and_version(dut_type="unknown", dut_version="unknown"):
"""Export the arguments as dut type and version.
Robot tends to convert "none" into None, hence the unusual default values.
@@ -32,32 +32,32 @@ def export_dut_type_and_version(dut_type=u"unknown", dut_version=u"unknown"):
:type dut_version: Optiona[str]
:raises RuntimeError: If value is neither in argument not robot variable.
"""
- if dut_type == u"unknown":
- dut_type = BuiltIn().get_variable_value(u"\\${DUT_TYPE}", u"unknown")
- if dut_type == u"unknown":
- raise RuntimeError(u"Dut type not provided.")
+ if dut_type == "unknown":
+ dut_type = BuiltIn().get_variable_value("\\${DUT_TYPE}", "unknown")
+ if dut_type == "unknown":
+ raise RuntimeError("Dut type not provided.")
else:
# We want to set a variable in higher level suite setup
# to be available to test setup several levels lower.
BuiltIn().set_suite_variable(
- u"\\${DUT_TYPE}", dut_type, u"children=True"
+ "\\${DUT_TYPE}", dut_type, "children=True"
)
- if dut_version == u"unknown":
+ if dut_version == "unknown":
dut_version = BuiltIn().get_variable_value(
- u"\\${DUT_VERSION}", u"unknown"
+ "\\${DUT_VERSION}", "unknown"
)
- if dut_type == u"unknown":
- raise RuntimeError(u"Dut version not provided.")
+ if dut_type == "unknown":
+ raise RuntimeError("Dut version not provided.")
else:
BuiltIn().set_suite_variable(
- u"\\${DUT_VERSION}", dut_version, u"children=True"
+ "\\${DUT_VERSION}", dut_version, "children=True"
)
data = get_export_data()
- data[u"dut_type"] = dut_type.lower()
- data[u"dut_version"] = dut_version
+ data["dut_type"] = dut_type.lower()
+ data["dut_version"] = dut_version
-def export_tg_type_and_version(tg_type=u"unknown", tg_version=u"unknown"):
+def export_tg_type_and_version(tg_type="unknown", tg_version="unknown"):
"""Export the arguments as tg type and version.
Robot tends to convert "none" into None, hence the unusual default values.
@@ -71,50 +71,57 @@ def export_tg_type_and_version(tg_type=u"unknown", tg_version=u"unknown"):
:type tg_version: Optiona[str]
:raises RuntimeError: If value is neither in argument not robot variable.
"""
- if tg_type == u"unknown":
- tg_type = BuiltIn().get_variable_value(u"\\${TG_TYPE}", u"unknown")
- if tg_type == u"unknown":
- raise RuntimeError(u"TG type not provided.")
+ if tg_type == "unknown":
+ tg_type = BuiltIn().get_variable_value("\\${TG_TYPE}", "unknown")
+ if tg_type == "unknown":
+ raise RuntimeError("TG type not provided!")
else:
# We want to set a variable in higher level suite setup
# to be available to test setup several levels lower.
BuiltIn().set_suite_variable(
- u"\\${TG_TYPE}", tg_type, u"children=True"
+ "\\${TG_TYPE}", tg_type, "children=True"
)
- if tg_version == u"unknown":
+ if tg_version == "unknown":
tg_version = BuiltIn().get_variable_value(
- u"\\${TG_VERSION}", u"unknown"
+ "\\${TG_VERSION}", "unknown"
)
- if tg_type == u"unknown":
- raise RuntimeError(u"TG version not provided.")
+ if tg_type == "unknown":
+ raise RuntimeError("TG version not provided!")
else:
BuiltIn().set_suite_variable(
- u"\\${TG_VERSION}", tg_version, u"children=True"
+ "\\${TG_VERSION}", tg_version, "children=True"
)
data = get_export_data()
- data[u"tg_type"] = tg_type.lower()
- data[u"tg_version"] = tg_version
+ data["tg_type"] = tg_type.lower()
+ data["tg_version"] = tg_version
-def append_mrr_value(mrr_value, unit):
+def append_mrr_value(mrr_value, mrr_unit, bandwidth_value=None,
+ bandwidth_unit="bps"):
"""Store mrr value to proper place so it is dumped into json.
The value is appended only when unit is not empty.
:param mrr_value: Forwarding rate from MRR trial.
- :param unit: Unit of measurement for the rate.
+ :param mrr_unit: Unit of measurement for the rate.
+ :param bandwidth_value: The same value recomputed into L1 bits per second.
:type mrr_value: float
- :type unit: str
+ :type mrr_unit: str
+ :type bandwidth_value: Optional[float]
+ :type bandwidth_unit: Optional[str]
"""
- if not unit:
+ if not mrr_unit:
return
data = get_export_data()
- data[u"result"][u"type"] = u"mrr"
- rate_node = descend(descend(data[u"result"], u"receive_rate"), "rate")
- rate_node[u"unit"] = str(unit)
- values_list = descend(rate_node, u"values", list)
- values_list.append(float(mrr_value))
- # TODO: Fill in the bandwidth part for pps?
+ data["result"]["type"] = "mrr"
+
+ for node_val, node_unit, node_name in ((mrr_value, mrr_unit, "rate"),
+ (bandwidth_value, bandwidth_unit, "bandwidth")):
+ if node_val is not None:
+ node = descend(descend(data["result"], "receive_rate"), node_name)
+ node["unit"] = str(node_unit)
+ values_list = descend(node, "values", list)
+ values_list.append(float(node_val))
def export_search_bound(text, value, unit, bandwidth=None):
@@ -140,18 +147,17 @@ def export_search_bound(text, value, unit, bandwidth=None):
"""
value = float(value)
text = str(text).lower()
- result_type = u"soak" if u"plrsearch" in text else u"ndrpdr"
- upper_or_lower = u"upper" if u"upper" in text else u"lower"
- ndr_or_pdr = u"ndr" if u"ndr" in text else u"pdr"
+ result_type = "soak" if "plrsearch" in text else "ndrpdr"
+ upper_or_lower = "upper" if "upper" in text else "lower"
+ ndr_or_pdr = "ndr" if "ndr" in text else "pdr"
- data = get_export_data()
- result_node = data[u"result"]
- result_node[u"type"] = result_type
+ result_node = get_export_data()["result"]
+ result_node["type"] = result_type
rate_item = dict(rate=dict(value=value, unit=unit))
if bandwidth:
- rate_item[u"bandwidth"] = dict(value=float(bandwidth), unit=u"bps")
- if result_type == u"soak":
- descend(result_node, u"critical_rate")[upper_or_lower] = rate_item
+ rate_item["bandwidth"] = dict(value=float(bandwidth), unit="bps")
+ if result_type == "soak":
+ descend(result_node, "critical_rate")[upper_or_lower] = rate_item
return
descend(result_node, ndr_or_pdr)[upper_or_lower] = rate_item
@@ -171,14 +177,14 @@ def _add_latency(result_node, percent, whichward, latency_string):
:type whichward: str
:latency_string: str
"""
- l_min, l_avg, l_max, l_hdrh = latency_string.split(u"/", 3)
+ l_min, l_avg, l_max, l_hdrh = latency_string.split("/", 3)
whichward_node = descend(result_node, f"latency_{whichward}")
percent_node = descend(whichward_node, f"pdr_{percent}")
- percent_node[u"min"] = int(l_min)
- percent_node[u"avg"] = int(l_avg)
- percent_node[u"max"] = int(l_max)
- percent_node[u"hdrh"] = l_hdrh
- percent_node[u"unit"] = u"us"
+ percent_node["min"] = int(l_min)
+ percent_node["avg"] = int(l_avg)
+ percent_node["max"] = int(l_max)
+ percent_node["hdrh"] = l_hdrh
+ percent_node["unit"] = "us"
def export_ndrpdr_latency(text, latency):
@@ -197,17 +203,114 @@ def export_ndrpdr_latency(text, latency):
:type text: str
:type latency: 1-tuple or 2-tuple of str
"""
- data = get_export_data()
- result_node = data[u"result"]
+ result_node = get_export_data()["result"]
percent = 0
- if u"90" in text:
+ if "90" in text:
percent = 90
- elif u"50" in text:
+ elif "50" in text:
percent = 50
- elif u"10" in text:
+ elif "10" in text:
percent = 10
- _add_latency(result_node, percent, u"forward", latency[0])
+ _add_latency(result_node, percent, "forward", latency[0])
# Else TRex does not support latency measurement for this traffic profile.
if len(latency) < 2:
return
- _add_latency(result_node, percent, u"reverse", latency[1])
+ _add_latency(result_node, percent, "reverse", latency[1])
+
+
+def export_reconf_result(packet_rate, packet_loss, bandwidth):
+ """Export the RECONF type results.
+
+ Result type is set to reconf.
+
+ :param packet_rate: Aggregate offered load in packets per second.
+ :param packet_loss: How many of the packets were dropped or unsent.
+ :param bandwidth: The offered load recomputed into L1 bits per second.
+ :type packet_rate: float
+ :type packet_loss: int
+ :type bandwidth: float
+ """
+ result_node = get_export_data()["result"]
+ result_node["type"] = "reconf"
+
+ time_loss = int(packet_loss) / float(packet_rate)
+ result_node["aggregate_rate"] = dict(
+ bandwidth=dict(
+ unit="bps",
+ value=float(bandwidth)
+ ),
+ rate=dict(
+ unit="pps",
+ value=float(packet_rate)
+ )
+ )
+ result_node["loss"] = dict(
+ packet=dict(
+ unit="packets",
+ value=int(packet_loss)
+ ),
+ time=dict(
+ unit="s",
+ value=time_loss
+ )
+ )
+
+
+def export_hoststack_results(
+ bandwidth, rate=None, rate_unit=None, latency=None,
+ failed_requests=None, completed_requests=None, retransmits=None,
+ duration=None
+):
+ """Export the HOSTSTACK type results.
+
+ Result type is set to hoststack.
+
+ :param bandwidth: Measured transfer rate using bps as a unit.
+ :param rate: Resulting rate measured by the test. [Optional]
+ :param rate_unit: CPS or RPS. [Optional]
+ :param latency: Measure latency. [Optional]
+ :param failed_requests: Number of failed requests. [Optional]
+ :param completed_requests: Number of completed requests. [Optional]
+ :param retransmits: Retransmitted TCP packets. [Optional]
+ :param duration: Measurment duration. [Optional]
+ :type bandwidth: float
+ :type rate: float
+ :type rate_unit: str
+ :type latency: float
+ :type failed_requests: int
+ :type completed_requests: int
+ :type retransmits: int
+ :type duration: float
+ """
+ result_node = get_export_data()["result"]
+ result_node["type"] = "hoststack"
+
+ result_node["bandwidth"] = dict(unit="bps", value=bandwidth)
+ if rate is not None:
+ result_node["rate"] = \
+ dict(unit=rate_unit, value=rate)
+ if latency is not None:
+ result_node["latency"] = \
+ dict(unit="ms", value=latency)
+ if failed_requests is not None:
+ result_node["failed_requests"] = \
+ dict(unit="requests", value=failed_requests)
+ if completed_requests is not None:
+ result_node["completed_requests"] = \
+ dict(unit="requests", value=completed_requests)
+ if retransmits is not None:
+ result_node["retransmits"] = \
+ dict(unit="packets", value=retransmits)
+ if duration is not None:
+ result_node["duration"] = \
+ dict(unit="s", value=duration)
+
+
+def append_telemetry(telemetry_item):
+ """Append telemetry entry to proper place so it is dumped into json.
+
+ :param telemetry_item: Telemetry entry.
+ :type telemetry_item: str
+ """
+ data = get_export_data()
+ data["telemetry"].append(telemetry_item)