author     pmikus <peter.mikus@protonmail.ch>  2022-10-05 08:58:31 +0200
committer  pmikus <peter.mikus@protonmail.ch>  2022-10-05 08:58:31 +0200
commit     af8e703eb180e46ca65ff0c165a21f2261896548 (patch)
tree       e477719c9010ca3e8ed3ffa63ffe293a2734d358 /csit.infra.dash/app/pal/data/data.py
parent     4d095b586bc4e249ab4e30e1a3f17b310f52a229 (diff)
fix(cdash): Rename
Signed-off-by: pmikus <peter.mikus@protonmail.ch>
Change-Id: Ia6dff2674a28b42ebfbe91230587f1e175ae1137
Diffstat (limited to 'csit.infra.dash/app/pal/data/data.py')
-rw-r--r--  csit.infra.dash/app/pal/data/data.py  351
1 file changed, 0 insertions(+), 351 deletions(-)
diff --git a/csit.infra.dash/app/pal/data/data.py b/csit.infra.dash/app/pal/data/data.py
deleted file mode 100644
index 77fd113a9c..0000000000
--- a/csit.infra.dash/app/pal/data/data.py
+++ /dev/null
@@ -1,351 +0,0 @@
-# Copyright (c) 2022 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Prepare data for Plotly Dash applications.
-"""
-
-import logging
-import awswrangler as wr
-
-from yaml import load, FullLoader, YAMLError
-from datetime import datetime, timedelta
-from time import time
-from pytz import UTC
-from pandas import DataFrame
-from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
-
-
-class Data:
-    """Gets the data from parquets and stores it for further use by Dash
-    applications.
- """
-
-    def __init__(self, data_spec_file: str, debug: bool = False) -> None:
- """Initialize the Data object.
-
- :param data_spec_file: Path to file specifying the data to be read from
- parquets.
-        :param debug: If True, the debug information is printed to stdout.
- :type data_spec_file: str
- :type debug: bool
- :raises RuntimeError: if it is not possible to open data_spec_file or it
- is not a valid yaml file.
- """
-
- # Inputs:
- self._data_spec_file = data_spec_file
- self._debug = debug
-
- # Specification of data to be read from parquets:
- self._data_spec = None
-
- # Data frame to keep the data:
- self._data = None
-
- # Read from files:
- try:
- with open(self._data_spec_file, "r") as file_read:
- self._data_spec = load(file_read, Loader=FullLoader)
- except IOError as err:
- raise RuntimeError(
-                f"Not possible to open the file {self._data_spec_file}\n{err}"
- )
- except YAMLError as err:
- raise RuntimeError(
- f"An error occurred while parsing the specification file "
-                f"{self._data_spec_file}\n"
- f"{err}"
- )
-
-    @property
-    def data(self):
-        """Return the DataFrame stored by the last read operation."""
-        return self._data
-
- def _get_columns(self, parquet: str) -> list:
-        """Get the list of columns to be read from the parquet, as defined in
-        the data specification file.
-
- :param parquet: The parquet's name.
- :type parquet: str
- :raises RuntimeError: if the parquet is not defined in the data
- specification file or it does not have any columns specified.
- :returns: List of columns.
- :rtype: list
- """
-
- try:
- return self._data_spec[parquet]["columns"]
- except KeyError as err:
- raise RuntimeError(
- f"The parquet {parquet} is not defined in the specification "
- f"file {self._data_spec_file} or it does not have any columns "
- f"specified.\n{err}"
- )
-
- def _get_path(self, parquet: str) -> str:
-        """Get the path to the parquet to be read, as defined in the data
-        specification file.
-
- :param parquet: The parquet's name.
- :type parquet: str
- :raises RuntimeError: if the parquet is not defined in the data
- specification file or it does not have the path specified.
- :returns: Path.
- :rtype: str
- """
-
- try:
- return self._data_spec[parquet]["path"]
- except KeyError as err:
- raise RuntimeError(
- f"The parquet {parquet} is not defined in the specification "
- f"file {self._data_spec_file} or it does not have the path "
- f"specified.\n{err}"
- )
-
- def _get_list_of_files(self,
- path,
- last_modified_begin=None,
- last_modified_end=None,
- days=None) -> list:
-        """Get the list of matching files stored in S3-compatible storage and
-        return it.
-
- :param path: S3 prefix (accepts Unix shell-style wildcards)
- (e.g. s3://bucket/prefix) or list of S3 objects paths
- (e.g. [s3://bucket/key0, s3://bucket/key1]).
-        :param last_modified_begin: Filter the S3 files by the last modified
-            date of the object. The filter is applied only after listing all
-            S3 files.
-        :param last_modified_end: Filter the S3 files by the last modified
-            date of the object. The filter is applied only after listing all
-            S3 files.
-        :param days: Number of days back from now to look for files. If set,
-            it overrides last_modified_begin.
- :type path: Union[str, List[str]]
- :type last_modified_begin: datetime, optional
- :type last_modified_end: datetime, optional
- :type days: integer, optional
- :returns: List of file names.
- :rtype: List
- """
-        # Initialize so the return below is safe if the listing fails.
-        file_list = list()
-        if days:
-            last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
-        try:
- file_list = wr.s3.list_objects(
- path=path,
- suffix="parquet",
- last_modified_begin=last_modified_begin,
- last_modified_end=last_modified_end
- )
- if self._debug:
- logging.info("\n".join(file_list))
- except NoFilesFound as err:
- logging.error(f"No parquets found.\n{err}")
- except EmptyDataFrame as err:
- logging.error(f"No data.\n{err}")
-
- return file_list
-
- def _create_dataframe_from_parquet(self,
- path, partition_filter=None,
- columns=None,
- validate_schema=False,
- last_modified_begin=None,
- last_modified_end=None,
- days=None) -> DataFrame:
-        """Read a parquet stored in S3-compatible storage and return a pandas
-        DataFrame.
-
- :param path: S3 prefix (accepts Unix shell-style wildcards)
- (e.g. s3://bucket/prefix) or list of S3 objects paths
- (e.g. [s3://bucket/key0, s3://bucket/key1]).
-        :param partition_filter: Callback function filter to apply on PARTITION
-            columns (push-down filter). The function MUST receive a single
-            argument (Dict[str, str]) where keys are partition names and values
-            are partition values. Partition values are always strings extracted
-            from S3. The function MUST return a bool: True to read the
-            partition, False to ignore it. Ignored if dataset=False.
- :param columns: Names of columns to read from the file(s).
-        :param validate_schema: Check that the schemas of the individual files
-            are all the same / compatible. Schemas within a folder prefix
-            should all be the same. Disable if the schemas differ and the
-            check is not wanted.
-        :param last_modified_begin: Filter the S3 files by the last modified
-            date of the object. The filter is applied only after listing all
-            S3 files.
-        :param last_modified_end: Filter the S3 files by the last modified
-            date of the object. The filter is applied only after listing all
-            S3 files.
-        :param days: Number of days back from now to look for files. If set,
-            it overrides last_modified_begin.
- :type path: Union[str, List[str]]
- :type partition_filter: Callable[[Dict[str, str]], bool], optional
- :type columns: List[str], optional
- :type validate_schema: bool, optional
- :type last_modified_begin: datetime, optional
- :type last_modified_end: datetime, optional
- :type days: integer, optional
- :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
- :rtype: DataFrame
- """
- df = None
- start = time()
- if days:
- last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
- try:
- df = wr.s3.read_parquet(
- path=path,
- path_suffix="parquet",
- ignore_empty=True,
- validate_schema=validate_schema,
- use_threads=True,
- dataset=True,
- columns=columns,
- partition_filter=partition_filter,
- last_modified_begin=last_modified_begin,
- last_modified_end=last_modified_end
- )
- if self._debug:
- df.info(verbose=True, memory_usage='deep')
-                logging.info(
-                    f"\nCreation of dataframe {path} took: "
-                    f"{time() - start:.2f}s\n"
-                )
- except NoFilesFound as err:
- logging.error(f"No parquets found.\n{err}")
- except EmptyDataFrame as err:
- logging.error(f"No data.\n{err}")
-
- self._data = df
- return df
-
-    def check_datasets(self, days: int = None):
-        """Check the datasets by listing the parquet files stored in them.
-
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- """
- self._get_list_of_files(path=self._get_path("trending"), days=days)
- self._get_list_of_files(path=self._get_path("statistics"), days=days)
-
-    def read_stats(self, days: int = None) -> tuple:
- """Read statistics from parquet.
-
- It reads from:
- - Suite Result Analysis (SRA) partition,
- - NDRPDR trending partition,
- - MRR trending partition.
-
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: tuple of pandas DataFrame-s with data read from specified
- parquets.
- :rtype: tuple of pandas DataFrame-s
- """
-
-        l_stats = lambda part: part["stats_type"] == "sra"
-        l_mrr = lambda part: part["test_type"] == "mrr"
-        l_ndrpdr = lambda part: part["test_type"] == "ndrpdr"
-
- return (
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics"),
- partition_filter=l_stats,
- columns=self._get_columns("statistics"),
- days=days
- ),
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics-trending-mrr"),
- partition_filter=l_mrr,
- columns=self._get_columns("statistics-trending-mrr"),
- days=days
- ),
- self._create_dataframe_from_parquet(
- path=self._get_path("statistics-trending-ndrpdr"),
- partition_filter=l_ndrpdr,
- columns=self._get_columns("statistics-trending-ndrpdr"),
- days=days
- )
- )
-
-    def read_trending_mrr(self, days: int = None) -> DataFrame:
-        """Read MRR data partition from trending parquet.
-
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
-        lambda_f = lambda part: part["test_type"] == "mrr"
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("trending-mrr"),
- partition_filter=lambda_f,
- columns=self._get_columns("trending-mrr"),
- days=days
- )
-
-    def read_trending_ndrpdr(self, days: int = None) -> DataFrame:
-        """Read NDRPDR data partition from trending parquet.
-
- :param days: Number of days back to the past for which the data will be
- read.
- :type days: int
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
-        lambda_f = lambda part: part["test_type"] == "ndrpdr"
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("trending-ndrpdr"),
- partition_filter=lambda_f,
- columns=self._get_columns("trending-ndrpdr"),
- days=days
- )
-
- def read_iterative_mrr(self, release: str) -> DataFrame:
- """Read MRR data partition from iterative parquet.
-
- :param release: The CSIT release from which the data will be read.
- :type release: str
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
-        lambda_f = lambda part: part["test_type"] == "mrr"
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-mrr").format(release=release),
- partition_filter=lambda_f,
- columns=self._get_columns("iterative-mrr")
- )
-
- def read_iterative_ndrpdr(self, release: str) -> DataFrame:
-        """Read NDRPDR data partition from iterative parquet.
-
- :param release: The CSIT release from which the data will be read.
- :type release: str
- :returns: Pandas DataFrame with read data.
- :rtype: DataFrame
- """
-
-        lambda_f = lambda part: part["test_type"] == "ndrpdr"
-
- return self._create_dataframe_from_parquet(
- path=self._get_path("iterative-ndrpdr").format(release=release),
- partition_filter=lambda_f,
- columns=self._get_columns("iterative-ndrpdr")
- )
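For reference, below is a minimal sketch of how this class was typically driven before its removal. The spec file name, import path, bucket paths, column names, and release string are illustrative assumptions; only the constructor signature, the "path"/"columns" keys per parquet entry, the optional days look-back, and the {release} placeholder in iterative paths follow from the code above.

    # Hypothetical data spec (data_spec.yaml), shaped after the keys the
    # class reads -- "path" and "columns" per parquet entry, with a
    # "{release}" placeholder in iterative paths:
    #
    #   trending-mrr:
    #     path: s3://example-bucket/trending
    #     columns: [job, build, start_time]
    #   iterative-mrr:
    #     path: s3://example-bucket/iterative_{release}
    #     columns: [job, build]

    from data.data import Data  # import path as it stood before this commit

    data = Data(data_spec_file="data_spec.yaml", debug=True)

    # Trending readers take an optional look-back window in days:
    df_mrr = data.read_trending_mrr(days=180)

    # Iterative readers substitute the release into the spec's path template:
    df_iter = data.read_iterative_mrr(release="rls2206")

    # The readers log errors and return None when no parquet files are found:
    if df_mrr is not None:
        print(df_mrr.head())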