diff options
author | Tibor Frank <tifrank@cisco.com> | 2023-01-27 08:26:25 +0100 |
---|---|---|
committer | Peter Mikus <peter.mikus@protonmail.ch> | 2023-02-01 08:34:12 +0000 |
commit | c31372861134f29ae6eec8d98874e030e57ab5f1 (patch) | |
tree | e6a42ba2826dda42b52abcd7ad8297e11b9fd639 /csit.infra.dash/app/cdash/data | |
parent | 20432cc3b4321f16c82e22ac54d6bf979391ee71 (diff) |
C-Dash: Pre-load the data from parquets
Signed-off-by: Tibor Frank <tifrank@cisco.com>
Change-Id: I20792792469c10d1db2e891b76879ec8ced1b7d3
Diffstat (limited to 'csit.infra.dash/app/cdash/data')
-rw-r--r-- | csit.infra.dash/app/cdash/data/data.py | 252 | ||||
-rw-r--r-- | csit.infra.dash/app/cdash/data/data.yaml | 172 |
2 files changed, 220 insertions, 204 deletions
diff --git a/csit.infra.dash/app/cdash/data/data.py b/csit.infra.dash/app/cdash/data/data.py index 7ddb44311a..8537cd8db1 100644 --- a/csit.infra.dash/app/cdash/data/data.py +++ b/csit.infra.dash/app/cdash/data/data.py @@ -15,13 +15,14 @@ """ import logging +import resource import awswrangler as wr +import pandas as pd from yaml import load, FullLoader, YAMLError from datetime import datetime, timedelta from time import time from pytz import UTC -from pandas import DataFrame from awswrangler.exceptions import EmptyDataFrame, NoFilesFound @@ -30,27 +31,24 @@ class Data: applications. """ - def __init__(self, data_spec_file: str, debug: bool=False) -> None: + def __init__(self, data_spec_file: str) -> None: """Initialize the Data object. :param data_spec_file: Path to file specifying the data to be read from parquets. - :param debug: If True, the debuf information is printed to stdout. :type data_spec_file: str - :type debug: bool :raises RuntimeError: if it is not possible to open data_spec_file or it is not a valid yaml file. """ # Inputs: self._data_spec_file = data_spec_file - self._debug = debug # Specification of data to be read from parquets: - self._data_spec = None + self._data_spec = list() # Data frame to keep the data: - self._data = None + self._data = pd.DataFrame() # Read from files: try: @@ -71,48 +69,6 @@ class Data: def data(self): return self._data - def _get_columns(self, parquet: str) -> list: - """Get the list of columns from the data specification file to be read - from parquets. - - :param parquet: The parquet's name. - :type parquet: str - :raises RuntimeError: if the parquet is not defined in the data - specification file or it does not have any columns specified. - :returns: List of columns. - :rtype: list - """ - - try: - return self._data_spec[parquet]["columns"] - except KeyError as err: - raise RuntimeError( - f"The parquet {parquet} is not defined in the specification " - f"file {self._data_spec_file} or it does not have any columns " - f"specified.\n{err}" - ) - - def _get_path(self, parquet: str) -> str: - """Get the path from the data specification file to be read from - parquets. - - :param parquet: The parquet's name. - :type parquet: str - :raises RuntimeError: if the parquet is not defined in the data - specification file or it does not have the path specified. - :returns: Path. - :rtype: str - """ - - try: - return self._data_spec[parquet]["path"] - except KeyError as err: - raise RuntimeError( - f"The parquet {parquet} is not defined in the specification " - f"file {self._data_spec_file} or it does not have the path " - f"specified.\n{err}" - ) - def _get_list_of_files(self, path, last_modified_begin=None, @@ -147,8 +103,7 @@ class Data: last_modified_begin=last_modified_begin, last_modified_end=last_modified_end ) - if self._debug: - logging.info("\n".join(file_list)) + logging.debug("\n".join(file_list)) except NoFilesFound as err: logging.error(f"No parquets found.\n{err}") except EmptyDataFrame as err: @@ -156,13 +111,16 @@ class Data: return file_list - def _create_dataframe_from_parquet(self, - path, partition_filter=None, - columns=None, - validate_schema=False, - last_modified_begin=None, - last_modified_end=None, - days=None) -> DataFrame: + def _create_dataframe_from_parquet( + self, + path, partition_filter=None, + columns=None, + categories=list(), + validate_schema=False, + last_modified_begin=None, + last_modified_end=None, + days=None + ) -> pd.DataFrame: """Read parquet stored in S3 compatible storage and returns Pandas Dataframe. @@ -176,6 +134,8 @@ class Data: extracted from S3. This function MUST return a bool, True to read the partition or False to ignore it. Ignored if dataset=False. :param columns: Names of columns to read from the file(s). + :param categories: List of columns names that should be returned as + pandas.Categorical. :param validate_schema: Check that individual file schemas are all the same / compatible. Schemas within a folder prefix should all be the same. Disable if you have schemas that are different and want to @@ -189,6 +149,7 @@ class Data: :type path: Union[str, List[str]] :type partition_filter: Callable[[Dict[str, str]], bool], optional :type columns: List[str], optional + :type categories: List[str], optional :type validate_schema: bool, optional :type last_modified_begin: datetime, optional :type last_modified_end: datetime, optional @@ -209,142 +170,89 @@ class Data: use_threads=True, dataset=True, columns=columns, + # categories=categories, partition_filter=partition_filter, last_modified_begin=last_modified_begin, last_modified_end=last_modified_end ) - if self._debug: - df.info(verbose=True, memory_usage='deep') - logging.info( - f"\nCreation of dataframe {path} took: {time() - start}\n" - ) + df.info(verbose=True, memory_usage="deep") + logging.debug( + f"\nCreation of dataframe {path} took: {time() - start}\n" + ) except NoFilesFound as err: logging.error(f"No parquets found.\n{err}") except EmptyDataFrame as err: logging.error(f"No data.\n{err}") - self._data = df return df - def check_datasets(self, days: int=None): - """Read structure from parquet. + def read_all_data(self, days: int=None) -> dict: + """Read all data necessary for all applications. - :param days: Number of days back to the past for which the data will be - read. + :param days: Number of days to filter. If None, all data will be + downloaded. :type days: int + :returns: A dictionary where keys are names of parquets and values are + the pandas dataframes with fetched data. + :rtype: dict(str: pandas.DataFrame) """ - self._get_list_of_files(path=self._get_path("trending"), days=days) - self._get_list_of_files(path=self._get_path("statistics"), days=days) - - def read_stats(self, days: int=None) -> tuple: - """Read statistics from parquet. - It reads from: - - Suite Result Analysis (SRA) partition, - - NDRPDR trending partition, - - MRR trending partition. + self._data = dict() + self._data["trending"] = pd.DataFrame() + self._data["iterative"] = pd.DataFrame() + lst_trending = list() + lst_iterative = list() - :param days: Number of days back to the past for which the data will be - read. - :type days: int - :returns: tuple of pandas DataFrame-s with data read from specified - parquets. - :rtype: tuple of pandas DataFrame-s - """ - - l_stats = lambda part: True if part["stats_type"] == "sra" else False - l_mrr = lambda part: True if part["test_type"] == "mrr" else False - l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False - - return ( - self._create_dataframe_from_parquet( - path=self._get_path("statistics"), - partition_filter=l_stats, - columns=self._get_columns("statistics"), - days=days - ), - self._create_dataframe_from_parquet( - path=self._get_path("statistics-trending-mrr"), - partition_filter=l_mrr, - columns=self._get_columns("statistics-trending-mrr"), - days=days - ), - self._create_dataframe_from_parquet( - path=self._get_path("statistics-trending-ndrpdr"), - partition_filter=l_ndrpdr, - columns=self._get_columns("statistics-trending-ndrpdr"), - days=days + for data_set in self._data_spec: + logging.info( + f"Reading data for {data_set['data_type']} " + f"{data_set['partition_name']} {data_set.get('release', '')}" ) - ) - - def read_trending_mrr(self, days: int=None) -> DataFrame: - """Read MRR data partition from parquet. + partition_filter = lambda part: True \ + if part[data_set["partition"]] == data_set["partition_name"] \ + else False - :param days: Number of days back to the past for which the data will be - read. - :type days: int - :returns: Pandas DataFrame with read data. - :rtype: DataFrame - """ - - lambda_f = lambda part: True if part["test_type"] == "mrr" else False - - return self._create_dataframe_from_parquet( - path=self._get_path("trending-mrr"), - partition_filter=lambda_f, - columns=self._get_columns("trending-mrr"), - days=days - ) - - def read_trending_ndrpdr(self, days: int=None) -> DataFrame: - """Read NDRPDR data partition from iterative parquet. - - :param days: Number of days back to the past for which the data will be - read. - :type days: int - :returns: Pandas DataFrame with read data. - :rtype: DataFrame - """ + data = self._create_dataframe_from_parquet( + path=data_set["path"], + partition_filter=partition_filter, + columns=data_set.get("columns", list()), + categories=data_set.get("categories", list()), + days=None if data_set["data_type"] == "iterative" else days + ) - lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False + if data_set["data_type"] == "statistics": + self._data["statistics"] = data + elif data_set["data_type"] == "trending": + lst_trending.append(data) + elif data_set["data_type"] == "iterative": + data["release"] = data_set["release"] + data["release"] = data["release"].astype("category") + lst_iterative.append(data) + else: + raise NotImplementedError( + f"The data type {data_set['data_type']} is not implemented." + ) - return self._create_dataframe_from_parquet( - path=self._get_path("trending-ndrpdr"), - partition_filter=lambda_f, - columns=self._get_columns("trending-ndrpdr"), - days=days + self._data["iterative"] = pd.concat( + lst_iterative, + ignore_index=True, + copy=False ) - - def read_iterative_mrr(self, release: str) -> DataFrame: - """Read MRR data partition from iterative parquet. - - :param release: The CSIT release from which the data will be read. - :type release: str - :returns: Pandas DataFrame with read data. - :rtype: DataFrame - """ - - lambda_f = lambda part: True if part["test_type"] == "mrr" else False - - return self._create_dataframe_from_parquet( - path=self._get_path("iterative-mrr").format(release=release), - partition_filter=lambda_f, - columns=self._get_columns("iterative-mrr") + self._data["trending"] = pd.concat( + lst_trending, + ignore_index=True, + copy=False ) - def read_iterative_ndrpdr(self, release: str) -> DataFrame: - """Read NDRPDR data partition from parquet. - - :param release: The CSIT release from which the data will be read. - :type release: str - :returns: Pandas DataFrame with read data. - :rtype: DataFrame - """ + for key in self._data.keys(): + logging.info( + f"\nData frame {key}:" + f"\n{self._data[key].memory_usage(deep=True)}\n" + ) + self._data[key].info(verbose=True, memory_usage="deep") - lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False + mem_alloc = \ + resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000 + logging.info(f"Memory allocation: {mem_alloc:.0f}MB") - return self._create_dataframe_from_parquet( - path=self._get_path("iterative-ndrpdr").format(release=release), - partition_filter=lambda_f, - columns=self._get_columns("iterative-ndrpdr") - ) + return self._data diff --git a/csit.infra.dash/app/cdash/data/data.yaml b/csit.infra.dash/app/cdash/data/data.yaml index ec7f7ef1dd..846be6b628 100644 --- a/csit.infra.dash/app/cdash/data/data.yaml +++ b/csit.infra.dash/app/cdash/data/data.yaml @@ -1,11 +1,42 @@ -statistics: +- data_type: statistics + partition: stats_type + partition_name: sra path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/stats columns: - job - build - start_time - duration -statistics-trending-ndrpdr: + categories: + - job + - build +- data_type: trending + partition: test_type + partition_name: mrr + path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending + columns: + - job + - build + - dut_type + - dut_version + - hosts + - start_time + - passed + - test_id + - version + - result_receive_rate_rate_avg + - result_receive_rate_rate_stdev + - result_receive_rate_rate_unit + - telemetry + categories: + - job + - build + - dut_type + - dut_version + - version +- data_type: trending + partition: test_type + partition_name: ndrpdr path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending columns: - job @@ -16,10 +47,33 @@ statistics-trending-ndrpdr: - start_time - passed - test_id + - version + - result_pdr_lower_rate_unit - result_pdr_lower_rate_value + - result_ndr_lower_rate_unit - result_ndr_lower_rate_value -statistics-trending-mrr: - path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending + - result_latency_reverse_pdr_90_hdrh + - result_latency_reverse_pdr_50_hdrh + - result_latency_reverse_pdr_10_hdrh + - result_latency_reverse_pdr_0_hdrh + - result_latency_forward_pdr_90_hdrh + - result_latency_forward_pdr_50_avg + - result_latency_forward_pdr_50_hdrh + - result_latency_forward_pdr_50_unit + - result_latency_forward_pdr_10_hdrh + - result_latency_forward_pdr_0_hdrh + - telemetry + categories: + - job + - build + - dut_type + - dut_version + - version +- data_type: iterative + partition: test_type + partition_name: mrr + release: rls2206 + path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2206 columns: - job - build @@ -29,15 +83,47 @@ statistics-trending-mrr: - start_time - passed - test_id + - version - result_receive_rate_rate_avg -trending: - path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending + - result_receive_rate_rate_stdev + - result_receive_rate_rate_unit + - result_receive_rate_rate_values + categories: + - job + - build + - dut_type + - dut_version + - version +- data_type: iterative + partition: test_type + partition_name: mrr + release: rls2210 + path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2210 columns: - job - build + - dut_type + - dut_version + - hosts - start_time -trending-mrr: - path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending + - passed + - test_id + - version + - result_receive_rate_rate_avg + - result_receive_rate_rate_stdev + - result_receive_rate_rate_unit + - result_receive_rate_rate_values + categories: + - job + - build + - dut_type + - dut_version + - version +- data_type: iterative + partition: test_type + partition_name: mrr + release: rls2302 + path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2302 columns: - job - build @@ -51,9 +137,18 @@ trending-mrr: - result_receive_rate_rate_avg - result_receive_rate_rate_stdev - result_receive_rate_rate_unit - - telemetry -trending-ndrpdr: - path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/trending + - result_receive_rate_rate_values + categories: + - job + - build + - dut_type + - dut_version + - version +- data_type: iterative + partition: test_type + partition_name: ndrpdr + release: rls2206 + path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2206 columns: - job - build @@ -68,19 +163,19 @@ trending-ndrpdr: - result_pdr_lower_rate_value - result_ndr_lower_rate_unit - result_ndr_lower_rate_value - - result_latency_reverse_pdr_90_hdrh - - result_latency_reverse_pdr_50_hdrh - - result_latency_reverse_pdr_10_hdrh - - result_latency_reverse_pdr_0_hdrh - - result_latency_forward_pdr_90_hdrh - result_latency_forward_pdr_50_avg - - result_latency_forward_pdr_50_hdrh - result_latency_forward_pdr_50_unit - - result_latency_forward_pdr_10_hdrh - - result_latency_forward_pdr_0_hdrh - - telemetry -iterative-mrr: - path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_{release} + categories: + - job + - build + - dut_type + - dut_version + - version +- data_type: iterative + partition: test_type + partition_name: ndrpdr + release: rls2210 + path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2210 columns: - job - build @@ -91,12 +186,23 @@ iterative-mrr: - passed - test_id - version - - result_receive_rate_rate_avg - - result_receive_rate_rate_stdev - - result_receive_rate_rate_unit - - result_receive_rate_rate_values -iterative-ndrpdr: - path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_{release} + - result_pdr_lower_rate_unit + - result_pdr_lower_rate_value + - result_ndr_lower_rate_unit + - result_ndr_lower_rate_value + - result_latency_forward_pdr_50_avg + - result_latency_forward_pdr_50_unit + categories: + - job + - build + - dut_type + - dut_version + - version +- data_type: iterative + partition: test_type + partition_name: ndrpdr + release: rls2302 + path: s3://fdio-docs-s3-cloudfront-index/csit/parquet/iterative_rls2302 columns: - job - build @@ -113,7 +219,9 @@ iterative-ndrpdr: - result_ndr_lower_rate_value - result_latency_forward_pdr_50_avg - result_latency_forward_pdr_50_unit -# coverage-ndrpdr: -# path: str -# columns: -# - list + categories: + - job + - build + - dut_type + - dut_version + - version |