aboutsummaryrefslogtreecommitdiffstats
path: root/csit.infra.dash/app/pal/data/data.py
diff options
context:
space:
mode:
authorpmikus <peter.mikus@protonmail.ch>2022-09-19 08:49:01 +0200
committerPeter Mikus <peter.mikus@protonmail.ch>2022-09-19 06:54:43 +0000
commitd6a60b5043c6f7c3dfc45853feb68d0aca5a4a5f (patch)
tree022343584d4e00a0da8ef12eb4e713d67344fc95 /csit.infra.dash/app/pal/data/data.py
parentd2ef7bc01df66f6a27f25d061db064cf4a463267 (diff)
feat(uti): Move directory
Signed-off-by: pmikus <peter.mikus@protonmail.ch> Change-Id: I7300ecfe756baaf3fbeedb020070f882cfaca445
Diffstat (limited to 'csit.infra.dash/app/pal/data/data.py')
-rw-r--r--csit.infra.dash/app/pal/data/data.py351
1 files changed, 351 insertions, 0 deletions
diff --git a/csit.infra.dash/app/pal/data/data.py b/csit.infra.dash/app/pal/data/data.py
new file mode 100644
index 0000000000..77fd113a9c
--- /dev/null
+++ b/csit.infra.dash/app/pal/data/data.py
@@ -0,0 +1,351 @@
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Prepare data for Plotly Dash applications.
+"""
+
+import logging
+import awswrangler as wr
+
+from yaml import load, FullLoader, YAMLError
+from datetime import datetime, timedelta
+from time import time
+from pytz import UTC
+from pandas import DataFrame
+from awswrangler.exceptions import EmptyDataFrame, NoFilesFound
+
+
+class Data:
+ """Gets the data from parquets and stores it for further use by dash
+ applications.
+ """
+
+ def __init__(self, data_spec_file: str, debug: bool=False) -> None:
+ """Initialize the Data object.
+
+ :param data_spec_file: Path to file specifying the data to be read from
+ parquets.
+ :param debug: If True, the debuf information is printed to stdout.
+ :type data_spec_file: str
+ :type debug: bool
+ :raises RuntimeError: if it is not possible to open data_spec_file or it
+ is not a valid yaml file.
+ """
+
+ # Inputs:
+ self._data_spec_file = data_spec_file
+ self._debug = debug
+
+ # Specification of data to be read from parquets:
+ self._data_spec = None
+
+ # Data frame to keep the data:
+ self._data = None
+
+ # Read from files:
+ try:
+ with open(self._data_spec_file, "r") as file_read:
+ self._data_spec = load(file_read, Loader=FullLoader)
+ except IOError as err:
+ raise RuntimeError(
+ f"Not possible to open the file {self._data_spec_file,}\n{err}"
+ )
+ except YAMLError as err:
+ raise RuntimeError(
+ f"An error occurred while parsing the specification file "
+ f"{self._data_spec_file,}\n"
+ f"{err}"
+ )
+
+ @property
+ def data(self):
+ return self._data
+
+ def _get_columns(self, parquet: str) -> list:
+ """Get the list of columns from the data specification file to be read
+ from parquets.
+
+ :param parquet: The parquet's name.
+ :type parquet: str
+ :raises RuntimeError: if the parquet is not defined in the data
+ specification file or it does not have any columns specified.
+ :returns: List of columns.
+ :rtype: list
+ """
+
+ try:
+ return self._data_spec[parquet]["columns"]
+ except KeyError as err:
+ raise RuntimeError(
+ f"The parquet {parquet} is not defined in the specification "
+ f"file {self._data_spec_file} or it does not have any columns "
+ f"specified.\n{err}"
+ )
+
+ def _get_path(self, parquet: str) -> str:
+ """Get the path from the data specification file to be read from
+ parquets.
+
+ :param parquet: The parquet's name.
+ :type parquet: str
+ :raises RuntimeError: if the parquet is not defined in the data
+ specification file or it does not have the path specified.
+ :returns: Path.
+ :rtype: str
+ """
+
+ try:
+ return self._data_spec[parquet]["path"]
+ except KeyError as err:
+ raise RuntimeError(
+ f"The parquet {parquet} is not defined in the specification "
+ f"file {self._data_spec_file} or it does not have the path "
+ f"specified.\n{err}"
+ )
+
+ def _get_list_of_files(self,
+ path,
+ last_modified_begin=None,
+ last_modified_end=None,
+ days=None) -> list:
+ """Get list of interested files stored in S3 compatible storage and
+ returns it.
+
+ :param path: S3 prefix (accepts Unix shell-style wildcards)
+ (e.g. s3://bucket/prefix) or list of S3 objects paths
+ (e.g. [s3://bucket/key0, s3://bucket/key1]).
+ :param last_modified_begin: Filter the s3 files by the Last modified
+ date of the object. The filter is applied only after list all s3
+ files.
+ :param last_modified_end: Filter the s3 files by the Last modified date
+ of the object. The filter is applied only after list all s3 files.
+ :param days: Number of days to filter.
+ :type path: Union[str, List[str]]
+ :type last_modified_begin: datetime, optional
+ :type last_modified_end: datetime, optional
+ :type days: integer, optional
+ :returns: List of file names.
+ :rtype: List
+ """
+ if days:
+ last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
+ try:
+ file_list = wr.s3.list_objects(
+ path=path,
+ suffix="parquet",
+ last_modified_begin=last_modified_begin,
+ last_modified_end=last_modified_end
+ )
+ if self._debug:
+ logging.info("\n".join(file_list))
+ except NoFilesFound as err:
+ logging.error(f"No parquets found.\n{err}")
+ except EmptyDataFrame as err:
+ logging.error(f"No data.\n{err}")
+
+ return file_list
+
+ def _create_dataframe_from_parquet(self,
+ path, partition_filter=None,
+ columns=None,
+ validate_schema=False,
+ last_modified_begin=None,
+ last_modified_end=None,
+ days=None) -> DataFrame:
+ """Read parquet stored in S3 compatible storage and returns Pandas
+ Dataframe.
+
+ :param path: S3 prefix (accepts Unix shell-style wildcards)
+ (e.g. s3://bucket/prefix) or list of S3 objects paths
+ (e.g. [s3://bucket/key0, s3://bucket/key1]).
+ :param partition_filter: Callback Function filters to apply on PARTITION
+ columns (PUSH-DOWN filter). This function MUST receive a single
+ argument (Dict[str, str]) where keys are partitions names and values
+ are partitions values. Partitions values will be always strings
+ extracted from S3. This function MUST return a bool, True to read
+ the partition or False to ignore it. Ignored if dataset=False.
+ :param columns: Names of columns to read from the file(s).
+ :param validate_schema: Check that individual file schemas are all the
+ same / compatible. Schemas within a folder prefix should all be the
+ same. Disable if you have schemas that are different and want to
+ disable this check.
+ :param last_modified_begin: Filter the s3 files by the Last modified
+ date of the object. The filter is applied only after list all s3
+ files.
+ :param last_modified_end: Filter the s3 files by the Last modified date
+ of the object. The filter is applied only after list all s3 files.
+ :param days: Number of days to filter.
+ :type path: Union[str, List[str]]
+ :type partition_filter: Callable[[Dict[str, str]], bool], optional
+ :type columns: List[str], optional
+ :type validate_schema: bool, optional
+ :type last_modified_begin: datetime, optional
+ :type last_modified_end: datetime, optional
+ :type days: integer, optional
+ :returns: Pandas DataFrame or None if DataFrame cannot be fetched.
+ :rtype: DataFrame
+ """
+ df = None
+ start = time()
+ if days:
+ last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
+ try:
+ df = wr.s3.read_parquet(
+ path=path,
+ path_suffix="parquet",
+ ignore_empty=True,
+ validate_schema=validate_schema,
+ use_threads=True,
+ dataset=True,
+ columns=columns,
+ partition_filter=partition_filter,
+ last_modified_begin=last_modified_begin,
+ last_modified_end=last_modified_end
+ )
+ if self._debug:
+ df.info(verbose=True, memory_usage='deep')
+ logging.info(
+ u"\n"
+ f"Creation of dataframe {path} took: {time() - start}"
+ u"\n"
+ )
+ except NoFilesFound as err:
+ logging.error(f"No parquets found.\n{err}")
+ except EmptyDataFrame as err:
+ logging.error(f"No data.\n{err}")
+
+ self._data = df
+ return df
+
+ def check_datasets(self, days: int=None):
+ """Read structure from parquet.
+
+ :param days: Number of days back to the past for which the data will be
+ read.
+ :type days: int
+ """
+ self._get_list_of_files(path=self._get_path("trending"), days=days)
+ self._get_list_of_files(path=self._get_path("statistics"), days=days)
+
+ def read_stats(self, days: int=None) -> tuple:
+ """Read statistics from parquet.
+
+ It reads from:
+ - Suite Result Analysis (SRA) partition,
+ - NDRPDR trending partition,
+ - MRR trending partition.
+
+ :param days: Number of days back to the past for which the data will be
+ read.
+ :type days: int
+ :returns: tuple of pandas DataFrame-s with data read from specified
+ parquets.
+ :rtype: tuple of pandas DataFrame-s
+ """
+
+ l_stats = lambda part: True if part["stats_type"] == "sra" else False
+ l_mrr = lambda part: True if part["test_type"] == "mrr" else False
+ l_ndrpdr = lambda part: True if part["test_type"] == "ndrpdr" else False
+
+ return (
+ self._create_dataframe_from_parquet(
+ path=self._get_path("statistics"),
+ partition_filter=l_stats,
+ columns=self._get_columns("statistics"),
+ days=days
+ ),
+ self._create_dataframe_from_parquet(
+ path=self._get_path("statistics-trending-mrr"),
+ partition_filter=l_mrr,
+ columns=self._get_columns("statistics-trending-mrr"),
+ days=days
+ ),
+ self._create_dataframe_from_parquet(
+ path=self._get_path("statistics-trending-ndrpdr"),
+ partition_filter=l_ndrpdr,
+ columns=self._get_columns("statistics-trending-ndrpdr"),
+ days=days
+ )
+ )
+
+ def read_trending_mrr(self, days: int=None) -> DataFrame:
+ """Read MRR data partition from parquet.
+
+ :param days: Number of days back to the past for which the data will be
+ read.
+ :type days: int
+ :returns: Pandas DataFrame with read data.
+ :rtype: DataFrame
+ """
+
+ lambda_f = lambda part: True if part["test_type"] == "mrr" else False
+
+ return self._create_dataframe_from_parquet(
+ path=self._get_path("trending-mrr"),
+ partition_filter=lambda_f,
+ columns=self._get_columns("trending-mrr"),
+ days=days
+ )
+
+ def read_trending_ndrpdr(self, days: int=None) -> DataFrame:
+ """Read NDRPDR data partition from iterative parquet.
+
+ :param days: Number of days back to the past for which the data will be
+ read.
+ :type days: int
+ :returns: Pandas DataFrame with read data.
+ :rtype: DataFrame
+ """
+
+ lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+
+ return self._create_dataframe_from_parquet(
+ path=self._get_path("trending-ndrpdr"),
+ partition_filter=lambda_f,
+ columns=self._get_columns("trending-ndrpdr"),
+ days=days
+ )
+
+ def read_iterative_mrr(self, release: str) -> DataFrame:
+ """Read MRR data partition from iterative parquet.
+
+ :param release: The CSIT release from which the data will be read.
+ :type release: str
+ :returns: Pandas DataFrame with read data.
+ :rtype: DataFrame
+ """
+
+ lambda_f = lambda part: True if part["test_type"] == "mrr" else False
+
+ return self._create_dataframe_from_parquet(
+ path=self._get_path("iterative-mrr").format(release=release),
+ partition_filter=lambda_f,
+ columns=self._get_columns("iterative-mrr")
+ )
+
+ def read_iterative_ndrpdr(self, release: str) -> DataFrame:
+ """Read NDRPDR data partition from parquet.
+
+ :param release: The CSIT release from which the data will be read.
+ :type release: str
+ :returns: Pandas DataFrame with read data.
+ :rtype: DataFrame
+ """
+
+ lambda_f = lambda part: True if part["test_type"] == "ndrpdr" else False
+
+ return self._create_dataframe_from_parquet(
+ path=self._get_path("iterative-ndrpdr").format(release=release),
+ partition_filter=lambda_f,
+ columns=self._get_columns("iterative-ndrpdr")
+ )