From 2ebf9e56d0dd200fa09979505a2da070b39da63f Mon Sep 17 00:00:00 2001
From: pmikus
Date: Wed, 22 May 2024 14:30:49 +0200
Subject: feat(etl): Release pipelines

Signed-off-by: pmikus
Change-Id: I4ce20267b4747bf1901b6175e0ec5936b583a510
---
 csit.infra.etl/trending_hoststack.py | 23 ++++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

(limited to 'csit.infra.etl/trending_hoststack.py')

diff --git a/csit.infra.etl/trending_hoststack.py b/csit.infra.etl/trending_hoststack.py
index 85cab5a179..45cb5c9bf5 100644
--- a/csit.infra.etl/trending_hoststack.py
+++ b/csit.infra.etl/trending_hoststack.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 
-# Copyright (c) 2023 Cisco and/or its affiliates.
+# Copyright (c) 2024 Cisco and/or its affiliates.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at:
@@ -29,8 +29,8 @@ from pyspark.sql.functions import col, lit, regexp_replace
 from pyspark.sql.types import StructType
 
 
-S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
-S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+S3_LOGS_BUCKET=environ.get("S3_LOGS_BUCKET", "fdio-logs-s3-cloudfront-index")
+S3_DOCS_BUCKET=environ.get("S3_DOCS_BUCKET", "fdio-docs-s3-cloudfront-index")
 PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
 SUFFIX="info.json.gz"
 IGNORE_SUFFIX=[
@@ -144,7 +144,6 @@ paths = wr.s3.list_objects(
 
 filtered_paths = [path for path in paths if "daily" in path or "weekly" in path]
 out_sdf = process_json_to_dataframe("hoststack", filtered_paths)
-out_sdf.show(truncate=False)
 out_sdf.printSchema()
 out_sdf = out_sdf \
     .withColumn("year", lit(datetime.now().year)) \
@@ -152,6 +151,16 @@ out_sdf = out_sdf \
     .withColumn("day", lit(datetime.now().day)) \
     .repartition(1)
 
+try:
+    boto3_session = session.Session(
+        aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+        aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+        region_name=environ["OUT_AWS_DEFAULT_REGION"]
+    )
+except KeyError:
+    boto3_session = session.Session()
+
 try:
     wr.s3.to_parquet(
         df=out_sdf.toPandas(),
@@ -161,11 +170,7 @@ try:
         compression="snappy",
         use_threads=True,
         mode="overwrite_partitions",
-        boto3_session=session.Session(
-            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
-            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
-            region_name=environ["OUT_AWS_DEFAULT_REGION"]
-        )
+        boto3_session=boto3_session
     )
 except EmptyDataFrame:
     pass
--
cgit
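
Context for the change above: the pipeline now takes its bucket names from the
S3_LOGS_BUCKET / S3_DOCS_BUCKET environment variables (falling back to the
fd.io defaults) and builds the output boto3 session from the OUT_AWS_*
variables when they are present, otherwise from the default credential chain.
What follows is a minimal standalone sketch of that pattern, assuming only the
variable names visible in the diff; it is illustrative and not the pipeline's
actual code path.

    #!/usr/bin/env python3
    # Sketch of the environment-driven configuration introduced by the patch.
    # Assumes only the names visible in the diff (S3_LOGS_BUCKET,
    # S3_DOCS_BUCKET, OUT_AWS_*); not the pipeline itself.

    from os import environ

    from boto3 import session

    # Bucket names: use the environment override when set, else the defaults.
    S3_LOGS_BUCKET = environ.get("S3_LOGS_BUCKET", "fdio-logs-s3-cloudfront-index")
    S3_DOCS_BUCKET = environ.get("S3_DOCS_BUCKET", "fdio-docs-s3-cloudfront-index")

    # Output credentials: prefer explicit OUT_AWS_* values; if any of them is
    # missing, environ[...] raises KeyError and the default boto3 credential
    # chain (instance profile, shared config, environment) is used instead.
    try:
        boto3_session = session.Session(
            aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
            region_name=environ["OUT_AWS_DEFAULT_REGION"]
        )
    except KeyError:
        boto3_session = session.Session()

    if __name__ == "__main__":
        print(f"logs bucket: {S3_LOGS_BUCKET}")
        print(f"docs bucket: {S3_DOCS_BUCKET}")
        print(f"session region: {boto3_session.region_name}")

In the patch, the resulting session object is handed to wr.s3.to_parquet()
through its boto3_session parameter, which is why the inline
session.Session(...) construction was removed from the write call.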