# Copyright (c) 2024 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of graphs for trending data.
"""
import logging
import plotly.graph_objects as go
import pandas as pd
from numpy import nan
from datetime import datetime
from pytz import UTC

from ..utils.constants import Constants as C
from ..utils.utils import get_color, get_hdrh_latencies
from ..utils.anomalies import classify_anomalies


def select_trending_data(data: pd.DataFrame, itm: dict) -> pd.DataFrame:
"""Select the data for graphs from the provided data frame.
:param data: Data frame with data for graphs.
:param itm: Item (in this case job name) which data will be selected from
the input data frame.
:type data: pandas.DataFrame
:type itm: dict
:returns: A data frame with selected data.
:rtype: pandas.DataFrame
"""
phy = itm["phy"].rsplit("-", maxsplit=2)
if len(phy) == 3:
topo_arch, nic, drv = phy
if drv == "dpdk":
drv = ""
else:
drv += "-"
drv = drv.replace("_", "-")
else:
return None
if itm["testtype"] in ("ndr", "pdr"):
test_type = "ndrpdr"
elif itm["testtype"] == "mrr":
test_type = "mrr"
elif itm["testtype"] == "soak":
test_type = "soak"
elif itm["area"] == "hoststack":
test_type = "hoststack"
df = data.loc[(
(data["test_type"] == test_type) &
(data["passed"] == True)
)]
df = df[df.job.str.endswith(topo_arch)]
core = str() if itm["dut"] == "trex" else itm["core"]
ttype = "ndrpdr" if itm["testtype"] in ("ndr", "pdr") else itm["testtype"]
df = df[df.test_id.str.contains(
f"^.*[.|-]{nic}.*{itm['framesize']}-{core}-{drv}{itm['test']}-{ttype}$",
regex=True
)].sort_values(by="start_time", ignore_index=True)
    return df


def graph_trending(
data: pd.DataFrame,
sel: dict,
layout: dict,
normalize: bool=False,
trials: bool=False
) -> tuple:
"""Generate the trending graph(s) - MRR, NDR, PDR and for PDR also Latences
(result_latency_forward_pdr_50_avg).
:param data: Data frame with test results.
:param sel: Selected tests.
:param layout: Layout of plot.ly graph.
:param normalize: If True, the data is normalized to CPU frquency
Constants.NORM_FREQUENCY.
:param trials: If True, MRR trials are displayed in the trending graph.
:type data: pandas.DataFrame
:type sel: dict
:type layout: dict
:type normalize: bool
:type: trials: bool
:returns: Trending graph(s)
:rtype: tuple(plotly.graph_objects.Figure, plotly.graph_objects.Figure)
"""
    if not sel:
        return None, None, None

    def _generate_trending_traces(
ttype: str,
name: str,
df: pd.DataFrame,
color: str,
nf: float
    ) -> tuple:
"""Generate the trending traces for the trending graph.
:param ttype: Test type (MRR, NDR, PDR).
:param name: The test name to be displayed as the graph title.
:param df: Data frame with test data.
:param color: The color of the trace (samples and trend line).
:param nf: The factor used for normalization of the results to
CPU frequency set to Constants.NORM_FREQUENCY.
:type ttype: str
:type name: str
:type df: pandas.DataFrame
:type color: str
:type nf: float
        :returns: Traces (samples, trending line, anomalies) and their units.
        :rtype: tuple(list, list)
"""
df = df.dropna(subset=[C.VALUE[ttype], ])
if df.empty:
return list(), list()
hover = list()
customdata = list()
customdata_samples = list()
name_lst = name.split("-")
for _, row in df.iterrows():
h_tput, h_band, h_lat, h_tput_trials, h_band_trials = \
str(), str(), str(), str(), str()
if ttype in ("mrr", "mrr-bandwidth"):
h_tput = (
f"tput avg [{row['result_receive_rate_rate_unit']}]: "
f"{row['result_receive_rate_rate_avg'] * nf:,.0f}
"
f"tput stdev [{row['result_receive_rate_rate_unit']}]: "
f"{row['result_receive_rate_rate_stdev'] * nf:,.0f}
"
)
if pd.notna(row["result_receive_rate_bandwidth_avg"]):
h_band = (
f"bandwidth avg "
f"[{row['result_receive_rate_bandwidth_unit']}]: "
f"{row['result_receive_rate_bandwidth_avg'] * nf:,.0f}"
"
"
f"bandwidth stdev "
f"[{row['result_receive_rate_bandwidth_unit']}]: "
f"{row['result_receive_rate_bandwidth_stdev']* nf:,.0f}"
"
"
)
if trials:
h_tput_trials = (
f"tput trials "
f"[{row['result_receive_rate_rate_unit']}]: "
)
for itm in row["result_receive_rate_rate_values"]:
h_tput_trials += f"{itm * nf:,.0f}; "
h_tput_trials = h_tput_trials[:-2] + "
"
if pd.notna(row["result_receive_rate_bandwidth_avg"]):
h_band_trials = (
f"bandwidth trials "
f"[{row['result_receive_rate_bandwidth_unit']}]: "
)
for itm in row["result_receive_rate_bandwidth_values"]:
h_band_trials += f"{itm * nf:,.0f}; "
h_band_trials = h_band_trials[:-2] + "
"
elif ttype in ("ndr", "ndr-bandwidth"):
h_tput = (
f"tput [{row['result_ndr_lower_rate_unit']}]: "
f"{row['result_ndr_lower_rate_value'] * nf:,.0f}
"
)
if pd.notna(row["result_ndr_lower_bandwidth_value"]):
h_band = (
f"bandwidth [{row['result_ndr_lower_bandwidth_unit']}]:"
f" {row['result_ndr_lower_bandwidth_value'] * nf:,.0f}"
"
"
)
elif ttype in ("pdr", "pdr-bandwidth", "latency"):
h_tput = (
f"tput [{row['result_pdr_lower_rate_unit']}]: "
f"{row['result_pdr_lower_rate_value'] * nf:,.0f}
"
)
if pd.notna(row["result_pdr_lower_bandwidth_value"]):
h_band = (
f"bandwidth [{row['result_pdr_lower_bandwidth_unit']}]:"
f" {row['result_pdr_lower_bandwidth_value'] * nf:,.0f}"
"
"
)
if pd.notna(row["result_latency_forward_pdr_50_avg"]):
h_lat = (
f"latency "
f"[{row['result_latency_forward_pdr_50_unit']}]: "
f"{row['result_latency_forward_pdr_50_avg'] / nf:,.0f}"
"
"
)
elif ttype in ("hoststack-cps", "hoststack-rps",
"hoststack-cps-bandwidth",
"hoststack-rps-bandwidth", "hoststack-latency"):
h_tput = (
f"tput [{row['result_rate_unit']}]: "
f"{row['result_rate_value'] * nf:,.0f}
"
)
h_band = (
f"bandwidth [{row['result_bandwidth_unit']}]: "
f"{row['result_bandwidth_value'] * nf:,.0f}
"
)
h_lat = (
f"latency [{row['result_latency_unit']}]: "
f"{row['result_latency_value'] / nf:,.0f}
"
)
elif ttype in ("hoststack-bps", ):
h_band = (
f"bandwidth [{row['result_bandwidth_unit']}]: "
f"{row['result_bandwidth_value'] * nf:,.0f}
"
)
elif ttype in ("soak", "soak-bandwidth"):
h_tput = (
f"tput [{row['result_critical_rate_lower_rate_unit']}]: "
f"{row['result_critical_rate_lower_rate_value'] * nf:,.0f}"
"
"
)
if pd.notna(row["result_critical_rate_lower_bandwidth_value"]):
bv = row['result_critical_rate_lower_bandwidth_value']
h_band = (
"bandwidth "
f"[{row['result_critical_rate_lower_bandwidth_unit']}]:"
f" {bv * nf:,.0f}"
"
"
)
try:
hosts = f"
hosts: {', '.join(row['hosts'])}"
except (KeyError, TypeError):
hosts = str()
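            # The infra part of the test name ends with the driver; if no
            # known driver is present, fall back to the first five items.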
for drv in C.DRIVERS:
if drv in name_lst:
split_idx = name_lst.index(drv) + 1
break
else:
split_idx = 5
hover_itm = (
f"dut: {name_lst[0]}
"
f"infra: {'-'.join(name_lst[1:split_idx])}
"
f"test: {'-'.join(name_lst[split_idx:])}
"
f"date: {row['start_time'].strftime('%Y-%m-%d %H:%M:%S')}
"
f"{h_tput}{h_tput_trials}{h_band}{h_band_trials}{h_lat}"
f"{row['dut_type']}-ref: {row['dut_version']}
"
f"csit-ref: {row['job']}/{row['build']}"
f"{hosts}"
)
hover.append(hover_itm)
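            # Attach per-point metadata as plotly customdata: HDRH latency
            # records for latency graphs, a telemetry flag and the test name
            # for the other graphs.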
if ttype == "latency":
customdata_samples.append(get_hdrh_latencies(row, name))
customdata.append({"name": name})
else:
customdata_samples.append(
{"name": name, "show_telemetry": True}
)
customdata.append({"name": name})
x_axis = df["start_time"].tolist()
if "latency" in ttype:
y_data = [(v / nf) for v in df[C.VALUE[ttype]].tolist()]
else:
y_data = [(v * nf) for v in df[C.VALUE[ttype]].tolist()]
units = df[C.UNIT[ttype]].unique().tolist()
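        # Classify anomalies (regressions / progressions) and compute the
        # trend line and its stdev from the time series of samples.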
try:
anomalies, trend_avg, trend_stdev = classify_anomalies(
{k: v for k, v in zip(x_axis, y_data)}
)
except ValueError as err:
logging.error(err)
return list(), list()
hover_trend = list()
for avg, stdev, (_, row) in zip(trend_avg, trend_stdev, df.iterrows()):
try:
hosts = f"
hosts: {', '.join(row['hosts'])}"
except (KeyError, TypeError):
hosts = str()
hover_itm = (
f"dut: {name_lst[0]}
"
f"infra: {'-'.join(name_lst[1:5])}
"
f"test: {'-'.join(name_lst[5:])}
"
f"date: {row['start_time'].strftime('%Y-%m-%d %H:%M:%S')}
"
f"trend [{row[C.UNIT[ttype]]}]: {avg:,.0f}
"
f"stdev [{row[C.UNIT[ttype]]}]: {stdev:,.0f}
"
f"{row['dut_type']}-ref: {row['dut_version']}
"
f"csit-ref: {row['job']}/{row['build']}"
f"{hosts}"
)
if ttype == "latency":
hover_itm = hover_itm.replace("[pps]", "[us]")
hover_trend.append(hover_itm)
traces = [
go.Scatter( # Samples
x=x_axis,
y=y_data,
name=name,
mode="markers",
marker={
"size": 5,
"color": color,
"symbol": "circle"
},
text=hover,
hoverinfo="text",
showlegend=True,
legendgroup=name,
customdata=customdata_samples
),
go.Scatter( # Trend line
x=x_axis,
y=trend_avg,
name=name,
mode="lines",
line={
"shape": "linear",
"width": 1,
"color": color,
},
text=hover_trend,
hoverinfo="text",
showlegend=False,
legendgroup=name,
customdata=customdata
)
]
if anomalies:
anomaly_x = list()
anomaly_y = list()
anomaly_color = list()
hover = list()
for idx, anomaly in enumerate(anomalies):
if anomaly in ("regression", "progression"):
anomaly_x.append(x_axis[idx])
anomaly_y.append(trend_avg[idx])
anomaly_color.append(C.ANOMALY_COLOR[anomaly])
hover_itm = (
f"dut: {name_lst[0]}
"
f"infra: {'-'.join(name_lst[1:5])}
"
f"test: {'-'.join(name_lst[5:])}
"
f"date: {x_axis[idx].strftime('%Y-%m-%d %H:%M:%S')}
"
f"trend [pps]: {trend_avg[idx]:,.0f}
"
f"classification: {anomaly}"
)
if ttype == "latency":
hover_itm = hover_itm.replace("[pps]", "[us]")
hover.append(hover_itm)
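            # Extend the color list with the colorscale anchor values so the
            # anomaly colors map consistently even when only one class of
            # anomaly is present.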
anomaly_color.extend([0.0, 0.5, 1.0])
traces.append(
go.Scatter(
x=anomaly_x,
y=anomaly_y,
mode="markers",
text=hover,
hoverinfo="text",
showlegend=False,
legendgroup=name,
name=name,
customdata=customdata,
marker={
"size": 15,
"symbol": "circle-open",
"color": anomaly_color,
"colorscale": C.COLORSCALE_LAT \
if ttype == "latency" else C.COLORSCALE_TPUT,
"showscale": True,
"line": {
"width": 2
},
"colorbar": {
"y": 0.5,
"len": 0.8,
"title": "Circles Marking Data Classification",
"titleside": "right",
"tickmode": "array",
"tickvals": [0.167, 0.500, 0.833],
"ticktext": C.TICK_TEXT_LAT \
if ttype == "latency" else C.TICK_TEXT_TPUT,
"ticks": "",
"ticklen": 0,
"tickangle": -90,
"thickness": 10
}
}
)
)
        return traces, units

    def _add_mrr_trials_traces(
ttype: str,
name: str,
df: pd.DataFrame,
color: str,
nf: float
) -> list:
"""Add the traces with mrr trials.
:param ttype: Test type (mrr, mrr-bandwidth).
:param name: The test name to be displayed in hover.
:param df: Data frame with test data.
:param color: The color of the trace.
:param nf: The factor used for normalization of the results to
CPU frequency set to Constants.NORM_FREQUENCY.
:type ttype: str
:type name: str
:type df: pandas.DataFrame
:type color: str
:type nf: float
:returns: list of Traces
:rtype: list
"""
traces = list()
x_axis = df["start_time"].tolist()
y_data = df[C.VALUE[ttype].replace("avg", "values")].tolist()
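        # Each MRR result holds a list of trial values; plot every trial index
        # (up to 10) as its own scatter trace, padding runs with fewer trials
        # with NaN.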
for idx_trial in range(10):
y_axis = list()
for idx_run in range(len(x_axis)):
try:
y_axis.append(y_data[idx_run][idx_trial] * nf)
except (IndexError, TypeError, ValueError):
y_axis.append(nan)
traces.append(go.Scatter(
x=x_axis,
y=y_axis,
name=name,
mode="markers",
marker={
"size": 2,
"color": color,
"symbol": "circle"
},
showlegend=True,
legendgroup=name,
hoverinfo="skip"
))
        return traces

    fig_tput = None
fig_lat = None
fig_band = None
start_times = list()
y_units = set()
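    # Build traces for each selected test and add them to the throughput,
    # bandwidth and latency figures as applicable.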
for idx, itm in enumerate(sel):
df = select_trending_data(data, itm)
if df is None or df.empty:
continue
start_times.append(df["start_time"][0])
if normalize:
phy = itm["phy"].rsplit("-", maxsplit=2)
topo_arch = phy[0] if len(phy) == 3 else str()
norm_factor = (C.NORM_FREQUENCY / C.FREQUENCY.get(topo_arch, 1.0)) \
if topo_arch else 1.0
else:
norm_factor = 1.0
if itm["area"] == "hoststack":
ttype = f"hoststack-{itm['testtype']}"
else:
ttype = itm["testtype"]
traces, units = _generate_trending_traces(
ttype,
itm["id"],
df,
get_color(idx),
norm_factor
)
if traces:
if not fig_tput:
fig_tput = go.Figure()
if trials and "mrr" in ttype:
traces.extend(_add_mrr_trials_traces(
ttype,
itm["id"],
df,
get_color(idx),
norm_factor
))
fig_tput.add_traces(traces)
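        # Add the bandwidth graph for test types that also report bandwidth.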
if ttype in C.TESTS_WITH_BANDWIDTH:
traces, _ = _generate_trending_traces(
f"{ttype}-bandwidth",
itm["id"],
df,
get_color(idx),
norm_factor
)
if traces:
if not fig_band:
fig_band = go.Figure()
if trials and "mrr" in ttype:
traces.extend(_add_mrr_trials_traces(
f"{ttype}-bandwidth",
itm["id"],
df,
get_color(idx),
norm_factor
))
fig_band.add_traces(traces)
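        # Add the latency graph for test types that report latency.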
if ttype in C.TESTS_WITH_LATENCY:
traces, _ = _generate_trending_traces(
"latency" if ttype == "pdr" else "hoststack-latency",
itm["id"],
df,
get_color(idx),
norm_factor
)
if traces:
if not fig_lat:
fig_lat = go.Figure()
fig_lat.add_traces(traces)
y_units.update(units)
    x_range = [
        min(start_times) if start_times else None,
        datetime.now(tz=UTC).strftime("%Y-%m-%d")
    ]
if fig_tput:
layout_tput = layout.get("plot-trending-tput", dict())
layout_tput["yaxis"]["title"] = \
f"Throughput [{'|'.join(sorted(y_units))}]"
layout_tput["xaxis"]["range"] = x_range
fig_tput.update_layout(layout_tput)
if fig_band:
layout_band = layout.get("plot-trending-bandwidth", dict())
layout_band["xaxis"]["range"] = x_range
fig_band.update_layout(layout_band)
if fig_lat:
layout_lat = layout.get("plot-trending-lat", dict())
layout_lat["xaxis"]["range"] = x_range
fig_lat.update_layout(layout_lat)
    return fig_tput, fig_band, fig_lat


def graph_tm_trending(
data: pd.DataFrame,
layout: dict,
all_in_one: bool=False
) -> tuple:
"""Generates one trending graph per test, each graph includes all selected
metrics.
:param data: Data frame with telemetry data.
:param layout: Layout of plot.ly graph.
:param all_in_one: If True, all telemetry traces are placed in one graph,
otherwise they are split to separate graphs grouped by test ID.
:type data: pandas.DataFrame
:type layout: dict
:type all_in_one: bool
:returns: List of generated graphs together with test names.
list(tuple(plotly.graph_objects.Figure(), str()), tuple(...), ...)
:rtype: list
"""
if data.empty:
        return list(), list()

    def _generate_traces(
data: pd.DataFrame,
test: str,
all_in_one: bool,
color_index: int
    ) -> tuple:
"""Generates a trending graph for given test with all metrics.
:param data: Data frame with telemetry data for the given test.
:param test: The name of the test.
:param all_in_one: If True, all telemetry traces are placed in one
graph, otherwise they are split to separate graphs grouped by
test ID.
:param color_index: The index of the test used if all_in_one is True.
:type data: pandas.DataFrame
:type test: str
:type all_in_one: bool
:type color_index: int
        :returns: Traces and unique metric names.
        :rtype: tuple(list, set)
"""
traces = list()
metrics = data.tm_metric.unique().tolist()
for idx, metric in enumerate(metrics):
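            # For NDR/PDR tests keep only the metrics labelled with the
            # matching rate ('ndr' or 'pdr').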
if "-pdr" in test and "='pdr'" not in metric:
continue
if "-ndr" in test and "='ndr'" not in metric:
continue
df = data.loc[(data["tm_metric"] == metric)]
x_axis = df["start_time"].tolist()
y_data = [float(itm) for itm in df["tm_value"].tolist()]
hover = list()
for i, (_, row) in enumerate(df.iterrows()):
if row["test_type"] == "mrr":
rate = (
f"mrr avg [{row[C.UNIT['mrr']]}]: "
f"{row[C.VALUE['mrr']]:,.0f}
"
f"mrr stdev [{row[C.UNIT['mrr']]}]: "
f"{row['result_receive_rate_rate_stdev']:,.0f}
"
)
elif row["test_type"] == "ndrpdr":
if "-pdr" in test:
rate = (
f"pdr [{row[C.UNIT['pdr']]}]: "
f"{row[C.VALUE['pdr']]:,.0f}
"
)
elif "-ndr" in test:
rate = (
f"ndr [{row[C.UNIT['ndr']]}]: "
f"{row[C.VALUE['ndr']]:,.0f}
"
)
else:
rate = str()
else:
rate = str()
hover.append(
f"date: "
f"{row['start_time'].strftime('%Y-%m-%d %H:%M:%S')}
"
f"value: {y_data[i]:,.2f}
"
f"{rate}"
f"{row['dut_type']}-ref: {row['dut_version']}
"
f"csit-ref: {row['job']}/{row['build']}
"
)
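            # Compute the trend line and anomalies only if at least one sample
            # is non-zero.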
if any(y_data):
anomalies, trend_avg, trend_stdev = classify_anomalies(
{k: v for k, v in zip(x_axis, y_data)}
)
hover_trend = list()
for avg, stdev, (_, row) in \
zip(trend_avg, trend_stdev, df.iterrows()):
hover_trend.append(
f"date: "
f"{row['start_time'].strftime('%Y-%m-%d %H:%M:%S')}
"
f"trend: {avg:,.2f}
"
f"stdev: {stdev:,.2f}
"
f"{row['dut_type']}-ref: {row['dut_version']}
"
f"csit-ref: {row['job']}/{row['build']}"
)
else:
anomalies = None
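            # In all-in-one mode the color must be unique across both tests
            # and metrics, and the metric name is prefixed with the test name.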
if all_in_one:
color = get_color(color_index * len(metrics) + idx)
metric_name = f"{test}
{metric}"
else:
color = get_color(idx)
metric_name = metric
traces.append(
go.Scatter( # Samples
x=x_axis,
y=y_data,
name=metric_name,
mode="markers",
marker={
"size": 5,
"color": color,
"symbol": "circle",
},
text=hover,
hoverinfo="text+name",
showlegend=True,
legendgroup=metric_name
)
)
if anomalies:
traces.append(
go.Scatter( # Trend line
x=x_axis,
y=trend_avg,
name=metric_name,
mode="lines",
line={
"shape": "linear",
"width": 1,
"color": color,
},
text=hover_trend,
hoverinfo="text+name",
showlegend=False,
legendgroup=metric_name
)
)
anomaly_x = list()
anomaly_y = list()
anomaly_color = list()
hover = list()
for idx, anomaly in enumerate(anomalies):
if anomaly in ("regression", "progression"):
anomaly_x.append(x_axis[idx])
anomaly_y.append(trend_avg[idx])
anomaly_color.append(C.ANOMALY_COLOR[anomaly])
hover_itm = (
f"date: {x_axis[idx].strftime('%Y-%m-%d %H:%M:%S')}"
f"
trend: {trend_avg[idx]:,.2f}"
f"
classification: {anomaly}"
)
hover.append(hover_itm)
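                # Colorscale anchor values keep the anomaly color mapping
                # stable regardless of which classifications occur.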
anomaly_color.extend([0.0, 0.5, 1.0])
traces.append(
go.Scatter(
x=anomaly_x,
y=anomaly_y,
mode="markers",
text=hover,
hoverinfo="text+name",
showlegend=False,
legendgroup=metric_name,
name=metric_name,
marker={
"size": 15,
"symbol": "circle-open",
"color": anomaly_color,
"colorscale": C.COLORSCALE_TPUT,
"showscale": True,
"line": {
"width": 2
},
"colorbar": {
"y": 0.5,
"len": 0.8,
"title": "Circles Marking Data Classification",
"titleside": "right",
"tickmode": "array",
"tickvals": [0.167, 0.500, 0.833],
"ticktext": C.TICK_TEXT_TPUT,
"ticks": "",
"ticklen": 0,
"tickangle": -90,
"thickness": 10
}
}
)
)
unique_metrics = set()
for itm in metrics:
unique_metrics.add(itm.split("{", 1)[0])
        return traces, unique_metrics

    tm_trending_graphs = list()
graph_layout = layout.get("plot-trending-telemetry", dict())
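    # Build one graph per test, or collect all traces into a single graph when
    # all_in_one is requested.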
    if all_in_one:
        all_traces = list()
        all_tests = list()
    all_metrics = set()
for idx, test in enumerate(data.test_name.unique()):
df = data.loc[(data["test_name"] == test)]
traces, metrics = _generate_traces(df, test, all_in_one, idx)
if traces:
all_metrics.update(metrics)
if all_in_one:
all_traces.extend(traces)
all_tests.append(test)
else:
graph = go.Figure()
graph.add_traces(traces)
graph.update_layout(graph_layout)
tm_trending_graphs.append((graph, [test, ], ))
if all_in_one:
graph = go.Figure()
graph.add_traces(all_traces)
graph.update_layout(graph_layout)
tm_trending_graphs.append((graph, all_tests, ))
return tm_trending_graphs, list(all_metrics)