aboutsummaryrefslogtreecommitdiffstats
path: root/csit.infra.dash/app/cdash/data/data.py
diff options
context:
space:
mode:
Diffstat (limited to 'csit.infra.dash/app/cdash/data/data.py')
-rw-r--r--csit.infra.dash/app/cdash/data/data.py252
1 files changed, 80 insertions, 172 deletions
diff --git a/csit.infra.dash/app/cdash/data/data.py b/csit.infra.dash/app/cdash/data/data.py
index 7ddb44311a..8537cd8db1 100644
--- a/csit.infra.dash/app/cdash/data/data.py
+++ b/csit.infra.dash/app/cdash/data/data.py
@@ -15,13 +15,14 @@
"""
import logging
+import resource
import awswrangler as wr
+import pandas as pd
from yaml import load, FullLoader, YAMLError
from datetime import datetime, timedelta
from time import time
from pytz import UTC
-from pandas import DataFrame
from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
@@ -30,27 +31,24 @@ class Data:
applications.
"""
- def __init__(self, data_spec_file: str, debug: bool=False) -> None:
+ def __init__(self, data_spec_file: str) -> None:
"""Initialize the Data object.
:param data_spec_file: Path to file specifying the data to be read from
parquets.
- :param debug: If True, the debuf information is printed to stdout.
:type data_spec_file: str
- :type debug: bool
:raises RuntimeError: if it is not possible to open data_spec_file or it
is not a valid yaml file.
"""
# Inputs:
self._data_spec_file = data_spec_file
- self._debug = debug
# Specification of data to be read from parquets:
- self._data_spec = None
+ self._data_spec = list()
# Data frame to keep the data:
- self._data = None
+ self._data = pd.DataFrame()
# Read from files:
try:
@@ -71,48 +69,6 @@ class Data:
def data(self):
return self._data
- def _get_columns(self, parquet: str) -> list:
- """Get the list of columns from the data specification file to be read
- from parquets.
-
- :param parquet: The parquet's name.
- :type parquet: str
- :raises RuntimeError: if the parquet is not defined in the data
- specification file or it does not have any columns specified.
- :returns: List of columns.
- :rtype: list
- """
-
- try:
- return self._data_spec[parquet]["columns"]
- except KeyError as err:
- raise RuntimeError(
- f"The parquet {parquet} is not defined in the specification "
- f"file {self._data_spec_file} or it does not have any columns "
- f"specified.\n{err}"
- )
-
- def _get_path(self, parquet: str) -> str:
- """Get the path from the data specification file to be read from
- parquets.
-
- :param parquet: The parquet's name.
- :type parquet: str
- :raises RuntimeError: if the parquet is not defined in the data
- specification file or it does not have the path specified.
- :returns: Path.
- :rtype: str
- """
-
- try:
- return self._data_spec[parquet]["path"]
- except KeyError as err:
- raise RuntimeError(
- f"The parquet {parquet} is not defined in the specification "
- f"file {self._data_spec_file} or it does not have the path "
- f"specified.\n{err}"
- )
-
def _get_list_of_files(self,
path,
last_modified_begin=None,
@@ -147,8 +103,7 @@ class Data:
last_modified_begin=last_modified_begin,
last_modified_end=last_modified_end
)
- if self._debug:
- logging.info("\n".join(file_list))
+ logging.debug("\n".join(file_list))
except NoFilesFound as err:
logging.error(f"No parquets found.\n{err}")
except EmptyDataFrame as err:
@@ -156,13 +111,16 @@ class Data:
return file_list
- def _create_dataframe_from_parquet(self,
- path, partition_filter=None,
- columns=None,
- validate_schema=False,
- last_modified_begin=None,
- last_modified_end=None,
- days=None) -> DataFrame:
+ def _create_dataframe_from_parquet(
+ self,
+ path, partition_filter=None,
+ columns=None,
+ categories=list(),
+ validate_schema=False,
+ last_modified_begin=None,
+ last_modified_end=None,
+ days=None
+ ) -> pd.DataFrame:
"""Read parquet stored in S3 compatible storage and returns Pandas
Dataframe.
@@ -176,6 +134,8 @@ class Data:
extracted from S3. This function MUST return a bool, True to read
the partition or False to ignore it. Ignored if dataset=False.
:param columns: Names of columns to read from the file(s).
+ :param categories: List of columns names that should be returned as
+ pandas.Categorical.
:param validate_schema: Check that individual file schemas are all the
same / compatible. Schemas within a folder prefix should all be the
same. Disable if you have schemas that are different and want to
@@ -189,6 +149,7 @@ class Data:
:type path: Union[str, List[str]]
:type partition_filter: Callable[[Dict[str, str]], bool], optional
:type columns: List[str], optional
+ :type categories: List[str], optional
:type validate_schema: bool, optional
:type last_modified_begin: datetime, optional
:type last_modified_end: datetime, optional
@@ -209,142 +170,89 @@ class Data:
use_threads=True,
dataset=True,
columns=columns,
+ # categories=categories,
partition_filter=partition_filter,
last_modified_begin=last_modified_begin,
last_modified_end=last_modified_end
)
- if self._debug:
- df.info(verbose=True, memory_usage='deep')
- logging.info(
- f"\nCreation of dataframe {path} took: {time() - start}\n"
- )
+ df.info(verbose=True, memory_usage="deep")
+ logging.debug(
+ f"\nCreation of dataframe {path} took: {time() - start}\n"
+ )
except NoFilesFound as err:
logging.error(f"No parquets found.\n{err}")
except EmptyDataFrame as err:
logging.error(f"No data.\n{err}")
- self._data = df
return df
- def check_datasets(self, days: int=None):
- """Read structure from parquet.
+ def read_all_data(self, days: int=None) -> dict:
+ """Read all data necessary for all applications.
- :param days: Number of days back to the past for which the data will be
- read.
+ :param days: Number of days to filter. If None, all data will be
+ downloaded.
:type days: int
+ :returns: A dictionary where keys are names of parquets and values are
+ the pandas dataframes with fetched data.
+ :rtype: dict(str: pandas.DataFrame)
"""
- self._get_list_of_files(path=self._get_path("trending"), days=days)
- self._get_list_of_files(path=self._get_path("statistics"), days=days)
-
- def read_stats(self, days: int=None) -> tuple:
- """Read statistics from parquet.
- It reads from:
- - Suite Result Analysis (SRA) partition,
- - NDRPDR trending partition,
- - MRR trending partition.
+ self._data = dict()
+ self._data["trending"] = pd.DataFrame()
+ self._data["iterative"] = pd.DataFrame()
+ lst_trending = list()
+ lst_iterative = list()
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: tuple of pandas DataFrame-s with data read from specified
- parquets.
- :rtype: tuple of pandas DataFrame-s
- """
-
- l_stats = lambda part: True if part["stats_type"] == "sra" else False
- l_mrr = lambda part: True if part["test_type"] == "mrr" else False
- l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
-
- return (
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics"),
- partition_filter=l_stats,
- columns=self._get_columns("statistics"),
- days=days
- ),
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics-trending-mrr"),
- partition_filter=l_mrr,
- columns=self._get_columns("statistics-trending-mrr"),
- days=days
- ),
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics-trending-ndrpdr"),
- partition_filter=l_ndrpdr,
- columns=self._get_columns("statistics-trending-ndrpdr"),
- days=days
+ for data_set in self._data_spec:
+ logging.info(
+ f"Reading data for {data_set['data_type']} "
+ f"{data_set['partition_name']} {data_set.get('release', '')}"
)
- )
-
- def read_trending_mrr(self, days: int=None) -> DataFrame:
- """Read MRR data partition from parquet.
+ partition_filter = lambda part: True \
+ if part[data_set["partition"]] == data_set["partition_name"] \
+ else False
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
- lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("trending-mrr"),
- partition_filter=lambda_f,
- columns=self._get_columns("trending-mrr"),
- days=days
- )
-
- def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
- """Read NDRPDR data partition from iterative parquet.
-
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
+ data = self._create_dataframe_from_parquet(
+ path=data_set["path"],
+ partition_filter=partition_filter,
+ columns=data_set.get("columns", list()),
+ categories=data_set.get("categories", list()),
+ days=None if data_set["data_type"] == "iterative" else days
+ )
- lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+ if data_set["data_type"] == "statistics":
+ self._data["statistics"] = data
+ elif data_set["data_type"] == "trending":
+ lst_trending.append(data)
+ elif data_set["data_type"] == "iterative":
+ data["release"] = data_set["release"]
+ data["release"] = data["release"].astype("category")
+ lst_iterative.append(data)
+ else:
+ raise NotImplementedError(
+ f"The data type {data_set['data_type']} is not implemented."
+ )
- return self._create_dataframe_from_parquet(
- path=self._get_path("trending-ndrpdr"),
- partition_filter=lambda_f,
- columns=self._get_columns("trending-ndrpdr"),
- days=days
+ self._data["iterative"] = pd.concat(
+ lst_iterative,
+ ignore_index=True,
+ copy=False
)
-
- def read_iterative_mrr(self, release: str) -> DataFrame:
- """Read MRR data partition from iterative parquet.
-
- :param release: The CSIT release from which the data will be read.
- :type release: str
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
- lambda_f = lambda part: True if part["test_type"] == "mrr" else False
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-mrr").format(release=release),
- partition_filter=lambda_f,
- columns=self._get_columns("iterative-mrr")
+ self._data["trending"] = pd.concat(
+ lst_trending,
+ ignore_index=True,
+ copy=False
)
- def read_iterative_ndrpdr(self, release: str) -> DataFrame:
- """Read NDRPDR data partition from parquet.
-
- :param release: The CSIT release from which the data will be read.
- :type release: str
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
+ for key in self._data.keys():
+ logging.info(
+ f"\nData frame {key}:"
+ f"\n{self._data[key].memory_usage(deep=True)}\n"
+ )
+ self._data[key].info(verbose=True, memory_usage="deep")
- lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+ mem_alloc = \
+ resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1000
+ logging.info(f"Memory allocation: {mem_alloc:.0f}MB")
- return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-ndrpdr").format(release=release),
- partition_filter=lambda_f,
- columns=self._get_columns("iterative-ndrpdr")
- )
+ return self._data