about summary refs log tree commit diff stats
path: root/resources/tools/dash/app/pal/data/data.py
diff options
context:
space:
mode:
authorPeter Mikus <pmikus@cisco.com>2022-08-05 08:01:59 +0000
committerPeter Mikus <pmikus@cisco.com>2022-08-05 08:01:59 +0000
commit22a45eda880bee367ff8937d8e764cd41905a0cd (patch)
tree336ccc164c84a4924084debc097205fc1e69b9e8 /resources/tools/dash/app/pal/data/data.py
parent1ec5035f813e726b4998be28a4df81327606ca85 (diff)
feat(uti): Add some more debug tools
Signed-off-by: Peter Mikus <pmikus@cisco.com> Change-Id: I1dfe1782334c8415fe5dbcdba24781947076639d
Diffstat (limited to 'resources/tools/dash/app/pal/data/data.py')
-rw-r--r--resources/tools/dash/app/pal/data/data.py54
1 files changed, 54 insertions, 0 deletions
diff --git a/resources/tools/dash/app/pal/data/data.py b/resources/tools/dash/app/pal/data/data.py
index 0956333e34..77fd113a9c 100644
--- a/resources/tools/dash/app/pal/data/data.py
+++ b/resources/tools/dash/app/pal/data/data.py
@@ -113,6 +113,48 @@ class Data:
f"specified.\n{err}"
)
def _get_list_of_files(self,
        path,
        last_modified_begin=None,
        last_modified_end=None,
        days=None) -> list:
    """Get list of interested files stored in S3 compatible storage and
    returns it.

    :param path: S3 prefix (accepts Unix shell-style wildcards)
        (e.g. s3://bucket/prefix) or list of S3 objects paths
        (e.g. [s3://bucket/key0, s3://bucket/key1]).
    :param last_modified_begin: Filter the s3 files by the Last modified
        date of the object. The filter is applied only after list all s3
        files.
    :param last_modified_end: Filter the s3 files by the Last modified date
        of the object. The filter is applied only after list all s3 files.
    :param days: Number of days to filter.
    :type path: Union[str, List[str]]
    :type last_modified_begin: datetime, optional
    :type last_modified_end: datetime, optional
    :type days: integer, optional
    :returns: List of file names; empty if listing fails or finds nothing.
    :rtype: List
    """
    # Fix: pre-bind the result so the function degrades gracefully to an
    # empty list instead of raising UnboundLocalError at "return file_list"
    # when wr.s3.list_objects() raises one of the handled exceptions.
    file_list = list()

    if days:
        # A "days" value overrides any explicit begin timestamp.
        last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
    try:
        file_list = wr.s3.list_objects(
            path=path,
            suffix="parquet",
            last_modified_begin=last_modified_begin,
            last_modified_end=last_modified_end
        )
        if self._debug:
            logging.info("\n".join(file_list))
    except NoFilesFound as err:
        logging.error(f"No parquets found.\n{err}")
    except EmptyDataFrame as err:
        logging.error(f"No data.\n{err}")

    return file_list
+
def _create_dataframe_from_parquet(self,
path, partition_filter=None,
columns=None,
@@ -142,12 +184,14 @@ class Data:
files.
:param last_modified_end: Filter the s3 files by the Last modified date
of the object. The filter is applied only after list all s3 files.
+ :param days: Number of days to filter.
:type path: Union[str, List[str]]
:type partition_filter: Callable[[Dict[str, str]], bool], optional
:type columns: List[str], optional
:type validate_schema: bool, optional
:type last_modified_begin: datetime, optional
:type last_modified_end: datetime, optional
+ :type days: integer, optional
:returns: Pandas DataFrame or None if DataFrame cannot be fetched.
:rtype: DataFrame
"""
@@ -183,6 +227,16 @@ class Data:
self._data = df
return df
def check_datasets(self, days: int=None):
    """Debug helper: list (and thereby log) the parquet files backing the
    "trending" and "statistics" datasets.

    :param days: Number of days back to the past for which the data will be
        read.
    :type days: int
    """
    for dataset in ("trending", "statistics"):
        self._get_list_of_files(path=self._get_path(dataset), days=days)
+
def read_stats(self, days: int=None) -> tuple:
"""Read statistics from parquet.