Diffstat (limited to 'csit.infra.dash/app/cdash/data/data.py')
 csit.infra.dash/app/cdash/data/data.py | 67 ++++++++++++++++++++---------
 1 file changed, 47 insertions(+), 20 deletions(-)
diff --git a/csit.infra.dash/app/cdash/data/data.py b/csit.infra.dash/app/cdash/data/data.py
index 8537cd8db1..c8d5907200 100644
--- a/csit.infra.dash/app/cdash/data/data.py
+++ b/csit.infra.dash/app/cdash/data/data.py
@@ -48,7 +48,12 @@ class Data:
self._data_spec = list()
# Data frame to keep the data:
- self._data = pd.DataFrame()
+ self._data = {
+ "statistics": pd.DataFrame(),
+ "trending": pd.DataFrame(),
+ "iterative": pd.DataFrame(),
+ "coverage": pd.DataFrame()
+ }
# Read from files:
try:
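Note: the single self._data frame becomes a dictionary keyed by data type, so each dashboard view can take exactly the slice it needs instead of filtering one merged frame. A minimal sketch of the resulting access pattern; the get_slice helper is illustrative, not part of the patch:

    import pandas as pd

    # Mirrors the structure introduced above; keys are the four data types.
    data = {
        "statistics": pd.DataFrame(),
        "trending": pd.DataFrame(),
        "iterative": pd.DataFrame(),
        "coverage": pd.DataFrame()
    }

    def get_slice(data: dict, data_type: str) -> pd.DataFrame:
        # Illustrative helper: unknown keys yield an empty frame, matching
        # the "empty DataFrame instead of None" convention used below.
        return data.get(data_type, pd.DataFrame())

    trending = get_slice(data, "trending")
    print(trending.empty)  # True until the readers populate the slice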
@@ -69,11 +74,13 @@ class Data:
def data(self):
return self._data
- def _get_list_of_files(self,
- path,
- last_modified_begin=None,
- last_modified_end=None,
- days=None) -> list:
+ @staticmethod
+ def _get_list_of_files(
+ path,
+ last_modified_begin=None,
+ last_modified_end=None,
+ days=None
+ ) -> list:
"""Get list of interested files stored in S3 compatible storage and
returns it.
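Note: _get_list_of_files no longer reads instance state, so making it a staticmethod is the idiomatic signal that it depends only on its arguments. The method body is not shown in this hunk; a minimal sketch of what such a listing helper could look like with awswrangler (the library used elsewhere in this file), assuming wr.s3.list_objects and its last_modified window parameters:

    from datetime import datetime, timedelta, timezone
    import awswrangler as wr

    class Data:
        @staticmethod
        def _get_list_of_files(path, last_modified_begin=None,
                               last_modified_end=None, days=None) -> list:
            # Sketch only: derive the window start from "days" if given,
            # then let awswrangler filter objects by modification time.
            if days:
                last_modified_begin = \
                    datetime.now(tz=timezone.utc) - timedelta(days=days)
            return wr.s3.list_objects(
                path=path,
                last_modified_begin=last_modified_begin,
                last_modified_end=last_modified_end
            )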
@@ -111,11 +118,11 @@ class Data:
return file_list
+ @staticmethod
def _create_dataframe_from_parquet(
- self,
path, partition_filter=None,
columns=None,
- categories=list(),
+ categories=None,
validate_schema=False,
last_modified_begin=None,
last_modified_end=None,
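Note: replacing the categories=list() default with categories=None sidesteps a classic Python pitfall: default values are evaluated once, at function definition time, so a mutable default is shared across calls. A self-contained illustration:

    def bad(items=[]):            # one list object, created at def time
        items.append("x")
        return items

    def good(items=None):         # fresh list on every call
        if items is None:
            items = []
        items.append("x")
        return items

    bad()
    print(bad())    # ['x', 'x'] - the default leaked state between calls
    good()
    print(good())   # ['x']      - calls stay independent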
@@ -157,7 +164,7 @@ class Data:
:returns: Pandas DataFrame or None if DataFrame cannot be fetched.
:rtype: DataFrame
"""
- df = None
+ df = pd.DataFrame()
start = time()
if days:
last_modified_begin = datetime.now(tz=UTC) - timedelta(days=days)
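Note: initialising df to an empty DataFrame instead of None gives every caller a uniform type: the result can be tested with df.empty and combined with other frames without None guards. A short sketch of the difference:

    import pandas as pd

    df = pd.DataFrame()            # the new default when nothing is read

    # No isinstance/None checks needed downstream:
    if df.empty:
        print("no data fetched")

    # Safe to combine with other frames as-is:
    merged = pd.concat([df, pd.DataFrame({"a": [1]})], ignore_index=True)
    print(len(merged))             # 1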
@@ -170,7 +177,7 @@ class Data:
use_threads=True,
dataset=True,
columns=columns,
- # categories=categories,
+ categories=categories,
partition_filter=partition_filter,
last_modified_begin=last_modified_begin,
last_modified_end=last_modified_end
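Note: re-enabling the categories argument asks the reader to decode the listed columns as pandas category dtype: each distinct string is stored once, with compact integer codes per row. For low-cardinality columns such as test names or releases this can shrink memory by orders of magnitude. An illustration of the effect, with made-up data:

    import pandas as pd

    s = pd.Series(["ndrpdr"] * 100_000)       # made-up repeated value
    print(s.memory_usage(deep=True))          # several MB as object dtype
    print(s.astype("category").memory_usage(deep=True))  # ~100 kB as codes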
@@ -180,9 +187,19 @@ class Data:
f"\nCreation of dataframe {path} took: {time() - start}\n"
)
except NoFilesFound as err:
- logging.error(f"No parquets found.\n{err}")
+ logging.error(
+ f"No parquets found in specified time period.\n"
+ f"Nr of days: {days}\n"
+ f"last_modified_begin: {last_modified_begin}\n"
+ f"{err}"
+ )
except EmptyDataFrame as err:
- logging.error(f"No data.\n{err}")
+ logging.error(
+ f"No data in parquets in specified time period.\n"
+ f"Nr of days: {days}\n"
+ f"last_modified_begin: {last_modified_begin}\n"
+ f"{err}"
+ )
return df
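Note: the expanded handlers separate two different failure modes: no parquet files fell inside the modification window (NoFilesFound) versus files matched but held no rows (EmptyDataFrame), and both messages now record the window that produced the miss. A minimal sketch of the same degrade-to-empty pattern as a standalone helper, assuming awswrangler's exception classes:

    import logging

    import awswrangler as wr
    import pandas as pd
    from awswrangler.exceptions import EmptyDataFrame, NoFilesFound

    def read_or_empty(path: str, **kwargs) -> pd.DataFrame:
        # Sketch: log the failure mode, but always hand back a DataFrame.
        try:
            return wr.s3.read_parquet(path=path, dataset=True, **kwargs)
        except NoFilesFound as err:
            logging.error(f"No parquets found under {path}.\n{err}")
        except EmptyDataFrame as err:
            logging.error(f"Parquets under {path} hold no data.\n{err}")
        return pd.DataFrame()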
@@ -197,11 +214,9 @@ class Data:
:rtype: dict(str: pandas.DataFrame)
"""
- self._data = dict()
- self._data["trending"] = pd.DataFrame()
- self._data["iterative"] = pd.DataFrame()
lst_trending = list()
lst_iterative = list()
+ lst_coverage = list()
for data_set in self._data_spec:
logging.info(
@@ -211,13 +226,16 @@ class Data:
partition_filter = lambda part: True \
if part[data_set["partition"]] == data_set["partition_name"] \
else False
-
- data = self._create_dataframe_from_parquet(
+ if data_set["data_type"] in ("trending", "statistics"):
+ time_period = days
+ else:
+ time_period = None
+ data = Data._create_dataframe_from_parquet(
path=data_set["path"],
partition_filter=partition_filter,
- columns=data_set.get("columns", list()),
- categories=data_set.get("categories", list()),
- days=None if data_set["data_type"] == "iterative" else days
+ columns=data_set.get("columns", None),
+ categories=data_set.get("categories", None),
+ days=time_period
)
if data_set["data_type"] == "statistics":
@@ -228,6 +246,10 @@ class Data:
data["release"] = data_set["release"]
data["release"] = data["release"].astype("category")
lst_iterative.append(data)
+ elif data_set["data_type"] == "coverage":
+ data["release"] = data_set["release"]
+ data["release"] = data["release"].astype("category")
+ lst_coverage.append(data)
else:
raise NotImplementedError(
f"The data type {data_set['data_type']} is not implemented."
@@ -243,6 +265,11 @@ class Data:
ignore_index=True,
copy=False
)
+ self._data["coverage"] = pd.concat(
+ lst_coverage,
+ ignore_index=True,
+ copy=False
+ )
for key in self._data.keys():
logging.info(
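Note: frames are collected in plain lists and concatenated once per data type. A single pd.concat is linear in the total size, whereas concatenating inside the loop would recopy the accumulated frame on every iteration; ignore_index=True renumbers the rows and copy=False lets pandas reuse input blocks where possible. The pattern in isolation:

    import pandas as pd

    frames = []
    for i in range(3):                 # stand-in for the data-set loop
        frames.append(pd.DataFrame({"release": [f"rls{i}"], "result": [i]}))

    # One concat at the end, not one per iteration:
    coverage = pd.concat(frames, ignore_index=True, copy=False)
    print(len(coverage))               # 3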