diff options
author | pmikus <peter.mikus@protonmail.ch> | 2024-05-22 14:30:49 +0200 |
---|---|---|
committer | Peter Mikus <peter.mikus@protonmail.ch> | 2024-05-22 12:56:34 +0000 |
commit | 2ebf9e56d0dd200fa09979505a2da070b39da63f (patch) | |
tree | 6e6ca12c8458faa1c6559bf7afd1bbfc8b9abdd6 /csit.infra.etl/stats.py | |
parent | e07a1a535c97e8b7e26e78e9ec3d8c6593407f70 (diff) |
feat(etl): Release pipelines
Signed-off-by: pmikus <peter.mikus@protonmail.ch>
Change-Id: I4ce20267b4747bf1901b6175e0ec5936b583a510
Diffstat (limited to 'csit.infra.etl/stats.py')
-rw-r--r-- | csit.infra.etl/stats.py | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/csit.infra.etl/stats.py b/csit.infra.etl/stats.py index 5d44caa25d..08ce4a9d0d 100644 --- a/csit.infra.etl/stats.py +++ b/csit.infra.etl/stats.py @@ -28,8 +28,9 @@ from pyspark.context import SparkContext from pyspark.sql.functions import lit from pyspark.sql.types import StructType -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" + +S3_LOGS_BUCKET=environ.get("S3_LOGS_BUCKET", "fdio-logs-s3-cloudfront-index") +S3_DOCS_BUCKET=environ.get("S3_DOCS_BUCKET", "fdio-docs-s3-cloudfront-index") PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" SUFFIX="suite.info.json.gz" IGNORE_SUFFIX=[] @@ -106,7 +107,6 @@ paths = wr.s3.list_objects( for schema_name in ["sra"]: out_sdf = process_json_to_dataframe(schema_name, paths) - out_sdf.show(truncate=False) out_sdf.printSchema() out_sdf = out_sdf \ .withColumn("year", lit(datetime.now().year)) \ @@ -115,6 +115,16 @@ for schema_name in ["sra"]: .repartition(1) try: + boto3_session = session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + except KeyError: + boto3_session = session.Session() + ) + + try: wr.s3.to_parquet( df=out_sdf.toPandas(), path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/stats", @@ -123,11 +133,7 @@ for schema_name in ["sra"]: compression="snappy", use_threads=True, mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) + boto3_session=boto3_session ) except EmptyDataFrame: pass |