author    Tibor Frank <tifrank@cisco.com>    2023-01-27 08:26:25 +0100
committer Peter Mikus <peter.mikus@protonmail.ch>    2023-02-01 08:34:12 +0000
commit    c31372861134f29ae6eec8d98874e030e57ab5f1 (patch)
tree      e6a42ba2826dda42b52abcd7ad8297e11b9fd639 /csit.infra.dash/app/cdash/data
parent    20432cc3b4321f16c82e22ac54d6bf979391ee71 (diff)
C-Dash: Pre-load the data from parquets
Signed-off-by: Tibor Frank <tifrank@cisco.com>
Change-Id: I20792792469c10d1db2e891b76879ec8ced1b7d3
Diffstat (limited to 'csit.infra.dash/app/cdash/data')
-rw-r--r--  csit.infra.dash/app/cdash/data/data.py    252
-rw-r--r--  csit.infra.dash/app/cdash/data/data.yaml   172
2 files changed, 220 insertions, 204 deletions
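The refactor replaces the per-parquet reader methods (read_stats, read_trending_mrr, read_iterative_ndrpdr, ...) with a single read_all_data() entry point that returns a dictionary of dataframes. A minimal sketch of the new call site, assuming a local spec path and an arbitrary day count (neither is part of this commit):

    from cdash.data.data import Data

    # Hypothetical usage; the real call site lives in the C-Dash app startup.
    data = Data(data_spec_file="cdash/data/data.yaml")
    frames = data.read_all_data(days=180)  # trending/statistics limited to 180 days
    trending = frames["trending"]    # one concatenated frame for all trending specs
    iterative = frames["iterative"]  # full history, with a categorical "release" column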
diff --git a/csit.infra.dash/app/cdash/data/data.py b/csit.infra.dash/app/cdash/data/data.py
index 7ddb44311a..8537cd8db1 100644
--- a/csit.infra.dash/app/cdash/data/data.py
+++ b/csit.infra.dash/app/cdash/data/data.py
@@ -15,13 +15,14 @@
"""
import logging
+import resource
import awswrangler as wr
+import pandas as pd
from yaml import load, FullLoader, YAMLError
from datetime import datetime, timedelta
from time import time
from pytz import UTC
-from pandas import DataFrame
from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
@@ -30,27 +31,24 @@ class Data:
applications.
"""
- def __init__(self, data_spec_file: str, debug: bool=False) -> None:
+ def __init__(self, data_spec_file: str) -> None:
"""Initialize the Data object.
:param data_spec_file: Path to file specifying the data to be read from
parquets.
- :param debug: If True, the debuf information is printed to stdout.
:type data_spec_file: str
- :type debug: bool
:raises RuntimeError: if it is not possible to open data_spec_file or it
is not a valid yaml file.
"""
# Inputs:
self._data_spec_file = data_spec_file
- self._debug = debug
# Specification of data to be read from parquets:
- self._data_spec = None
+ self._data_spec = list()
# Data frame to keep the data:
- self._data = None
+ self._data = pd.DataFrame()
# Read from files:
try:
@@ -71,48 +69,6 @@ class Data:
def data(self):
return self._data
- def _get_columns(self, parquet: str) -> list:
- """Get the list of columns from the data specification file to be read
- from parquets.
-
- :param parquet: The parquet's name.
- :type parquet: str
- :raises RuntimeError: if the parquet is not defined in the data
- specification file or it does not have any columns specified.
- :returns: List of columns.
- :rtype: list
- """
-
- try:
- return self._data_spec[parquet]["columns"]
- except KeyError as err:
- raise RuntimeError(
- f"The parquet {parquet} is not defined in the specification "
- f"file {self._data_spec_file} or it does not have any columns "
- f"specified.\n{err}"
- )
-
- def _get_path(self, parquet: str) -> str:
- """Get the path from the data specification file to be read from
- parquets.
-
- :param parquet: The parquet's name.
- :type parquet: str
- :raises RuntimeError: if the parquet is not defined in the data
- specification file or it does not have the path specified.
- :returns: Path.
- :rtype: str
- """
-
- try:
- return self._data_spec[parquet]["path"]
- except KeyError as err:
- raise RuntimeError(
- f"The parquet {parquet} is not defined in the specification "
- f"file {self._data_spec_file} or it does not have the path "
- f"specified.\n{err}"
- )
-
def _get_list_of_files(self,
path,
last_modified_begin=None,
@@ -147,8 +103,7 @@ class Data:
last_modified_begin=last_modified_begin,
last_modified_end=last_modified_end
)
- if self._debug:
- logging.info("\n".join(file_list))
+ logging.debug("\n".join(file_list))
except NoFilesFound as err:
logging.error(f"No parquets found.\n{err}")
except EmptyDataFrame as err:
@@ -156,13 +111,16 @@ class Data:
return file_list
- def _create_dataframe_from_parquet(self,
- path, partition_filter=None,
- columns=None,
- validate_schema=False,
- last_modified_begin=None,
- last_modified_end=None,
- days=None) -> DataFrame:
+ def _create_dataframe_from_parquet(
+ self,
+ path, partition_filter=None,
+ columns=None,
+ categories=list(),
+ validate_schema=False,
+ last_modified_begin=None,
+ last_modified_end=None,
+ days=None
+ ) -> pd.DataFrame:
"""Read parquet stored in S3 compatible storage and returns Pandas
Dataframe.
@@ -176,6 +134,8 @@ class Data:
extracted from S3. This function MUST return a bool, True to read
the partition or False to ignore it. Ignored if dataset=False.
:param columns: Names of columns to read from the file(s).
+ :param categories: List of column names that should be returned as
+ pandas.Categorical.
:param validate_schema: Check that individual file schemas are all the
same / compatible. Schemas within a folder prefix should all be the
same. Disable if you have schemas that are different and want to
@@ -189,6 +149,7 @@ class Data:
:type path: Union[str, List[str]]
:type partition_filter: Callable[[Dict[str, str]], bool], optional
:type columns: List[str], optional
+ :type categories: List[str], optional
:type validate_schema: bool, optional
:type last_modified_begin: datetime, optional
:type last_modified_end: datetime, optional
@@ -209,142 +170,89 @@ class Data:
use_threads=True,
dataset=True,
columns=columns,
+ # categories=categories,
partition_filter=partition_filter,
last_modified_begin=last_modified_begin,
last_modified_end=last_modified_end
)
- if self._debug:
- df.info(verbose=True, memory_usage='deep')
- logging.info(
- f"\nCreation of dataframe {path} took: {time() - start}\n"
- )
+ df.info(verbose=True, memory_usage="deep")
+ logging.debug(
+ f"\nCreation of dataframe {path} took: {time() - start}\n"
+ )
except NoFilesFound as err:
logging.error(f"No parquets found.\n{err}")
except EmptyDataFrame as err:
logging.error(f"No data.\n{err}")
- self._data = df
return df
- def check_datasets(self, days: int=None):
- """Read structure from parquet.
+ def read_all_data(self, days: int=None) -> dict:
+ """Read all data necessary for all applications.
- :param days: Number of days back to the past for which the data will be
- read.
+ :param days: Number of days back to the past for which the data will be
+ read. If None, all available data is read.
:type days: int
+ :returns: A dictionary where keys are names of parquets and values are
+ the pandas dataframes with fetched data.
+ :rtype: dict(str: pandas.DataFrame)
"""
- self._get_list_of_files(path=self._get_path("trending"), days=days)
- self._get_list_of_files(path=self._get_path("statistics"), days=days)
-
- def read_stats(self, days: int=None) -> tuple:
- """Read statistics from parquet.
- It reads from:
- - Suite Result Analysis (SRA) partition,
- - NDRPDR trending partition,
- - MRR trending partition.
+ self._data = dict()
+ self._data["trending"] = pd.DataFrame()
+ self._data["iterative"] = pd.DataFrame()
+ lst_trending = list()
+ lst_iterative = list()
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: tuple of pandas DataFrame-s with data read from specified
- parquets.
- :rtype: tuple of pandas DataFrame-s
- """
-
- l_stats = lambda part: True if part["stats_type"] == "sra" else False
- l_mrr = lambda part: True if part["test_type"] == "mrr" else False
- l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
-
- return (
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics"),
- partition_filter=l_stats,
- columns=self._get_columns("statistics"),
- days=days
- ),
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics-trending-mrr"),
- partition_filter=l_mrr,
- columns=self._get_columns("statistics-trending-mrr"),
- days=days
- ),
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics-trending-ndrpdr"),
- partition_filter=l_ndrpdr,
- columns=self._get_columns("statistics-trending-ndrpdr"),
- days=days
+ for data_set in self._data_spec:
+ logging.info(
+ f"Reading data for {data_set['data_type']} "
+ f"{data_set['partition_name']} {data_set.get('release', '')}"
)
- )
-
- def read_trending_mrr(self, days: int=None) -> DataFrame:
- """Read MRR data partition from parquet.
+ partition_filter = lambda part: True \
+ if part[data_set["partition"]] == data_set["partition_name"] \
+ else False
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
- lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("trending-mrr"),
- partition_filter=lambda_f,
- columns=self._get_columns("trending-mrr"),
- days=days
- )
-
- def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
- """Read NDRPDR data partition from iterative parquet.
-
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
+ data = self._create_dataframe_from_parquet(
+ path=data_set["path"],
+ partition_filter=partition_filter,
+ columns=data_set.get("columns", list()),
+ categories=data_set.get("categories", list()),
+ days=None if data_set["data_type"] == "iterative" else days
+ )
- lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+ if data_set["data_type"] == "statistics":
+ self._data["statistics"] = data
+ elif data_set["data_type"] == "trending":
+ lst_trending.append(data)
+ elif data_set["data_type"] == "iterative":
+ data["release"] = data_set["release"]
+ data["release"] = data["release"].astype("category")
+ lst_iterative.append(data)
+ else:
+ raise NotImplementedError(
+ f"The data type {data_set['data_type']} is not implemented."
+ )
- return self._create_dataframe_from_parquet(
- path=self._get_path("trending-ndrpdr"),
- partition_filter=lambda_f,
- columns=self._get_columns("trending-ndrpdr"),
- days=days
+ self._data["iterative"] = pd.concat(
+ lst_iterative,
+ ignore_index=True,
+ copy=False
)
-
- def read_iterative_mrr(self, release: str) -> DataFrame:
- """Read MRR data partition from iterative parquet.
-
- :param release: The CSIT release from which the data will be read.
- :type release: str
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
- lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-mrr").format(release=release),
- partition_filter=lambda_f,
- columns=self._get_columns("iterative-mrr")
+ self._data["trending"] = pd.concat(
+ lst_trending,
+ ignore_index=True,
+ copy=False
)
- def read_iterative_ndrpdr(self, release: str) -> DataFrame:
- """Read NDRPDR data partition from parquet.
-
- :param release: The CSIT release from which the data will be read.
- :type release: str
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
+ for key in self._data.keys():
+ logging.info(
+ f"\nData frame {key}:"
+ f"\n{self._data[key].memory_usage(deep=True)}\n"
+ )
+ self._data[key].info(verbose=True, memory_usage="deep")
- lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+ mem_alloc = \
+ resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
+ logging.info(f"Memory allocation: {mem_alloc:.0f}MB")
- return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-ndrpdr").format(release=release),
- partition_filter=lambda_f,
- columns=self._get_columns("iterative-ndrpdr")
- )
+ return self._data
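Each entry in the new specification drives an awswrangler partition filter: with dataset=True, wr.s3.read_parquet() discovers Hive-style partitions (e.g. .../test_type=mrr/...) and calls the filter with each partition's key/value pairs, reading only the matches. A hedged sketch of the mechanism, with an illustrative bucket path and column list:

    import awswrangler as wr

    # Illustrative path and columns; the real values come from data.yaml.
    df = wr.s3.read_parquet(
        path="s3://example-bucket/csit/parquet/trending",
        use_threads=True,
        dataset=True,  # enables partition discovery and partition_filter
        partition_filter=lambda part: part["test_type"] == "mrr",
        columns=["job", "build", "result_receive_rate_rate_avg"]
    )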
diff --git a/csit.infra.dash/app/cdash/data/data.yaml b/csit.infra.dash/app/cdash/data/data.yaml
index ec7f7ef1dd..846be6b628 100644
--- a/csit.infra.dash/app/cdash/data/data.yaml
+++ b/csit.infra.dash/app/cdash/data/data.yaml
@@ -1,11 +1,42 @@
-statistics:
+- data_type: statistics
+ partition: stats_type
+ partition_name: sra
path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/stats
columns:
- job
- build
- start_time
- duration
-statistics-trending-ndrpdr:
+ categories:
+ - job
+ - build
+- data_type: trending
+ partition: test_type
+ partition_name: mrr
+ path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending
+ columns:
+ - job
+ - build
+ - dut_type
+ - dut_version
+ - hosts
+ - start_time
+ - passed
+ - test_id
+ - version
+ - result_receive_rate_rate_avg
+ - result_receive_rate_rate_stdev
+ - result_receive_rate_rate_unit
+ - telemetry
+ categories:
+ - job
+ - build
+ - dut_type
+ - dut_version
+ - version
+- data_type: trending
+ partition: test_type
+ partition_name: ndrpdr
path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending
columns:
- job
@@ -16,10 +47,33 @@ statistics-trending-ndrpdr:
- start_time
- passed
- test_id
+ - version
+ - result_pdr_lower_rate_unit
- result_pdr_lower_rate_value
+ - result_ndr_lower_rate_unit
- result_ndr_lower_rate_value
-statistics-trending-mrr:
- path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending
+ - result_latency_reverse_pdr_90_hdrh
+ - result_latency_reverse_pdr_50_hdrh
+ - result_latency_reverse_pdr_10_hdrh
+ - result_latency_reverse_pdr_0_hdrh
+ - result_latency_forward_pdr_90_hdrh
+ - result_latency_forward_pdr_50_avg
+ - result_latency_forward_pdr_50_hdrh
+ - result_latency_forward_pdr_50_unit
+ - result_latency_forward_pdr_10_hdrh
+ - result_latency_forward_pdr_0_hdrh
+ - telemetry
+ categories:
+ - job
+ - build
+ - dut_type
+ - dut_version
+ - version
+- data_type: iterative
+ partition: test_type
+ partition_name: mrr
+ release: rls2206
+ path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2206
columns:
- job
- build
@@ -29,15 +83,47 @@ statistics-trending-mrr:
- start_time
- passed
- test_id
+ - version
- result_receive_rate_rate_avg
-trending:
- path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending
+ - result_receive_rate_rate_stdev
+ - result_receive_rate_rate_unit
+ - result_receive_rate_rate_values
+ categories:
+ - job
+ - build
+ - dut_type
+ - dut_version
+ - version
+- data_type: iterative
+ partition: test_type
+ partition_name: mrr
+ release: rls2210
+ path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2210
columns:
- job
- build
+ - dut_type
+ - dut_version
+ - hosts
- start_time
-trending-mrr:
- path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending
+ - passed
+ - test_id
+ - version
+ - result_receive_rate_rate_avg
+ - result_receive_rate_rate_stdev
+ - result_receive_rate_rate_unit
+ - result_receive_rate_rate_values
+ categories:
+ - job
+ - build
+ - dut_type
+ - dut_version
+ - version
+- data_type: iterative
+ partition: test_type
+ partition_name: mrr
+ release: rls2302
+ path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2302
columns:
- job
- build
@@ -51,9 +137,18 @@ trending-mrr:
- result_receive_rate_rate_avg
- result_receive_rate_rate_stdev
- result_receive_rate_rate_unit
- - telemetry
-trending-ndrpdr:
- path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending
+ - result_receive_rate_rate_values
+ categories:
+ - job
+ - build
+ - dut_type
+ - dut_version
+ - version
+- data_type: iterative
+ partition: test_type
+ partition_name: ndrpdr
+ release: rls2206
+ path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2206
columns:
- job
- build
@@ -68,19 +163,19 @@ trending-ndrpdr:
- result_pdr_lower_rate_value
- result_ndr_lower_rate_unit
- result_ndr_lower_rate_value
- - result_latency_reverse_pdr_90_hdrh
- - result_latency_reverse_pdr_50_hdrh
- - result_latency_reverse_pdr_10_hdrh
- - result_latency_reverse_pdr_0_hdrh
- - result_latency_forward_pdr_90_hdrh
- result_latency_forward_pdr_50_avg
- - result_latency_forward_pdr_50_hdrh
- result_latency_forward_pdr_50_unit
- - result_latency_forward_pdr_10_hdrh
- - result_latency_forward_pdr_0_hdrh
- - telemetry
-iterative-mrr:
- path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_{release}
+ categories:
+ - job
+ - build
+ - dut_type
+ - dut_version
+ - version
+- data_type: iterative
+ partition: test_type
+ partition_name: ndrpdr
+ release: rls2210
+ path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2210
columns:
- job
- build
@@ -91,12 +186,23 @@ iterative-mrr:
- passed
- test_id
- version
- - result_receive_rate_rate_avg
- - result_receive_rate_rate_stdev
- - result_receive_rate_rate_unit
- - result_receive_rate_rate_values
-iterative-ndrpdr:
- path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_{release}
+ - result_pdr_lower_rate_unit
+ - result_pdr_lower_rate_value
+ - result_ndr_lower_rate_unit
+ - result_ndr_lower_rate_value
+ - result_latency_forward_pdr_50_avg
+ - result_latency_forward_pdr_50_unit
+ categories:
+ - job
+ - build
+ - dut_type
+ - dut_version
+ - version
+- data_type: iterative
+ partition: test_type
+ partition_name: ndrpdr
+ release: rls2302
+ path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2302
columns:
- job
- build
@@ -113,7 +219,9 @@ iterative-ndrpdr:
- result_ndr_lower_rate_value
- result_latency_forward_pdr_50_avg
- result_latency_forward_pdr_50_unit
-# coverage-ndrpdr:
-# path: str
-# columns:
-# - list
+ categories:
+ - job
+ - build
+ - dut_type
+ - dut_version
+ - version
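With the top-level mapping replaced by a YAML list, loading the file yields a plain list of dicts that read_all_data() iterates directly; each entry is self-contained (data_type, partition, partition_name, path, columns, categories, and release for iterative data). A small standalone sketch of consuming the spec, assuming the same loader Data.__init__ uses:

    from yaml import load, FullLoader

    with open("cdash/data/data.yaml", "rt") as f:
        spec = load(f, Loader=FullLoader)  # a list of dicts after this change

    # For example, every iterative release named in the spec:
    releases = sorted({e["release"] for e in spec if e["data_type"] == "iterative"})
    # -> ['rls2206', 'rls2210', 'rls2302']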