diff options
author | Peter Mikus <pmikus@cisco.com> | 2022-08-05 08:01:59 +0000 |
---|---|---|
committer | Peter Mikus <pmikus@cisco.com> | 2022-08-05 08:01:59 +0000 |
commit | 22a45eda880bee367ff8937d8e764cd41905a0cd (patch) | |
tree | 336ccc164c84a4924084debc097205fc1e69b9e8 /resources/tools/dash/app/pal/data/data.py | |
parent | 1ec5035f813e726b4998be28a4df81327606ca85 (diff) |
feat(uti): Add some more debug tools
Signed-off-by: Peter Mikus <pmikus@cisco.com>
Change-Id: I1dfe1782334c8415fe5dbcdba24781947076639d
Diffstat (limited to 'resources/tools/dash/app/pal/data/data.py')
-rw-r--r-- | resources/tools/dash/app/pal/data/data.py | 54 |
1 files changed, 54 insertions, 0 deletions
diff --git a/resources/tools/dash/app/pal/data/data.py b/resources/tools/dash/app/pal/data/data.py index 0956333e34..77fd113a9c 100644 --- a/resources/tools/dash/app/pal/data/data.py +++ b/resources/tools/dash/app/pal/data/data.py @@ -113,6 +113,48 @@ class Data: f"specified.\n{err}" ) + def _get_list_of_files(self, + path, + last_modified_begin=None, + last_modified_end=None, + days=None) -> list: + """Get list of interested files stored in S3 compatible storage and + returns it. + + :param path: S3 prefix (accepts Unix shell-style wildcards) + (e.g. s3://bucket/prefix) or list of S3 objects paths + (e.g. [s3://bucket/key0, s3://bucket/key1]). + :param last_modified_begin: Filter the s3 files by the Last modified + date of the object. The filter is applied only after list all s3 + files. + :param last_modified_end: Filter the s3 files by the Last modified date + of the object. The filter is applied only after list all s3 files. + :param days: Number of days to filter. + :type path: Union[str, List[str]] + :type last_modified_begin: datetime, optional + :type last_modified_end: datetime, optional + :type days: integer, optional + :returns: List of file names. + :rtype: List + """ + if days: + last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days) + try: + file_list = wr.s3.list_objects( + path=path, + suffix="parquet", + last_modified_begin=last_modified_begin, + last_modified_end=last_modified_end + ) + if self._debug: + logging.info("\n".join(file_list)) + except NoFilesFound as err: + logging.error(f"No parquets found.\n{err}") + except EmptyDataFrame as err: + logging.error(f"No data.\n{err}") + + return file_list + def _create_dataframe_from_parquet(self, path, partition_filter=None, columns=None, @@ -142,12 +184,14 @@ class Data: files. :param last_modified_end: Filter the s3 files by the Last modified date of the object. The filter is applied only after list all s3 files. + :param days: Number of days to filter. :type path: Union[str, List[str]] :type partition_filter: Callable[[Dict[str, str]], bool], optional :type columns: List[str], optional :type validate_schema: bool, optional :type last_modified_begin: datetime, optional :type last_modified_end: datetime, optional + :type days: integer, optional :returns: Pandas DataFrame or None if DataFrame cannot be fetched. :rtype: DataFrame """ @@ -183,6 +227,16 @@ class Data: self._data = df return df + def check_datasets(self, days: int=None): + """Read structure from parquet. + + :param days: Number of days back to the past for which the data will be + read. + :type days: int + """ + self._get_list_of_files(path=self._get_path("trending"), days=days) + self._get_list_of_files(path=self._get_path("statistics"), days=days) + def read_stats(self, days: int=None) -> tuple: """Read statistics from parquet. |