about | summary | refs | log | tree | commit | diff | stats
path: root/csit.infra.etl/stats.py
diff options
context:
space:
mode:
Diffstat (limited to 'csit.infra.etl/stats.py')
-rw-r--r-- csit.infra.etl/stats.py 22
1 files changed, 14 insertions, 8 deletions
diff --git a/csit.infra.etl/stats.py b/csit.infra.etl/stats.py
index 5d44caa25d..08ce4a9d0d 100644
--- a/csit.infra.etl/stats.py
+++ b/csit.infra.etl/stats.py
@@ -28,8 +28,9 @@ from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType
-S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
-S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+
+S3_LOGS_BUCKET=environ.get("S3_LOGS_BUCKET", "fdio-logs-s3-cloudfront-index")
+S3_DOCS_BUCKET=environ.get("S3_DOCS_BUCKET", "fdio-docs-s3-cloudfront-index")
PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
SUFFIX="suite.info.json.gz"
IGNORE_SUFFIX=[]
@@ -106,7 +107,6 @@ paths = wr.s3.list_objects(
for schema_name in ["sra"]:
out_sdf = process_json_to_dataframe(schema_name, paths)
- out_sdf.show(truncate=False)
out_sdf.printSchema()
out_sdf = out_sdf \
.withColumn("year", lit(datetime.now().year)) \
@@ -115,6 +115,16 @@ for schema_name in ["sra"]:
.repartition(1)
try:
+ boto3_session = session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
+ )
+ except KeyError:
+ boto3_session = session.Session()
+ )
+
+ try:
wr.s3.to_parquet(
df=out_sdf.toPandas(),
path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/stats",
@@ -123,11 +133,7 @@ for schema_name in ["sra"]:
compression="snappy",
use_threads=True,
mode="overwrite_partitions",
- boto3_session=session.Session(
- aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
- aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
- region_name=environ["OUT_AWS_DEFAULT_REGION"]
- )
+ boto3_session=boto3_session
)
except EmptyDataFrame:
pass