From b0b7f8dfdd37d45de0f3fcf758798e3bfbb9381d Mon Sep 17 00:00:00 2001
From: pmikus
Date: Thu, 17 Aug 2023 12:26:28 +0000
Subject: feat(terraform): ETL for 2310

Signed-off-by: pmikus
Change-Id: I3013b8be6a7f06d2f1f3b8320e7cb6f057a47491
---
 csit.infra.etl/coverage_device_rls2306.py     | 170 --------------------
 csit.infra.etl/coverage_device_rls2310.py     | 170 ++++++++++++++++++++
 csit.infra.etl/coverage_hoststack_rls2306.py  | 171 ---------------------
 csit.infra.etl/coverage_hoststack_rls2310.py  | 171 +++++++++++++++++++++
 csit.infra.etl/coverage_mrr_rls2306.py        | 170 --------------------
 csit.infra.etl/coverage_mrr_rls2310.py        | 170 ++++++++++++++++++++
 csit.infra.etl/coverage_ndrpdr_rls2306.py     | 170 --------------------
 csit.infra.etl/coverage_ndrpdr_rls2310.py     | 170 ++++++++++++++++++++
 csit.infra.etl/coverage_reconf_rls2306.py     | 171 ---------------------
 csit.infra.etl/coverage_reconf_rls2310.py     | 171 +++++++++++++++++++++
 csit.infra.etl/coverage_soak_rls2306.py       | 170 --------------------
 csit.infra.etl/coverage_soak_rls2310.py       | 170 ++++++++++++++++++++
 csit.infra.etl/iterative_hoststack_rls2306.py | 171 ---------------------
 csit.infra.etl/iterative_hoststack_rls2310.py | 171 +++++++++++++++++++++
 csit.infra.etl/iterative_mrr_rls2306.py       | 170 --------------------
 csit.infra.etl/iterative_mrr_rls2310.py       | 170 ++++++++++++++++++++
 csit.infra.etl/iterative_ndrpdr_rls2306.py    | 170 --------------------
 csit.infra.etl/iterative_ndrpdr_rls2310.py    | 170 ++++++++++++++++++++
 csit.infra.etl/iterative_reconf_rls2306.py    | 171 ---------------------
 csit.infra.etl/iterative_reconf_rls2310.py    | 171 +++++++++++++++++++++
 csit.infra.etl/iterative_soak_rls2306.py      | 170 --------------------
 csit.infra.etl/iterative_soak_rls2310.py      | 170 ++++++++++++++++++++
 .../terraform-nomad-pyspark-etl/README.md     |  10 +-
 .../nomad/etl-coverage-device-rls2306.hcl.tftpl    | 55 -------
 .../nomad/etl-coverage-device-rls2310.hcl.tftpl    | 55 +++++++
 .../nomad/etl-coverage-hoststack-rls2306.hcl.tftpl | 55 -------
 .../nomad/etl-coverage-hoststack-rls2310.hcl.tftpl | 55 +++++++
 .../conf/nomad/etl-coverage-mrr-rls2306.hcl.tftpl  | 55 -------
 .../conf/nomad/etl-coverage-mrr-rls2310.hcl.tftpl  | 55 +++++++
 .../nomad/etl-coverage-ndrpdr-rls2306.hcl.tftpl    | 55 -------
 .../nomad/etl-coverage-ndrpdr-rls2310.hcl.tftpl    | 55 +++++++
 .../nomad/etl-coverage-reconf-rls2306.hcl.tftpl    | 55 -------
 .../nomad/etl-coverage-reconf-rls2310.hcl.tftpl    | 55 +++++++
 .../conf/nomad/etl-coverage-soak-rls2306.hcl.tftpl | 55 -------
 .../conf/nomad/etl-coverage-soak-rls2310.hcl.tftpl | 55 +++++++
 .../etl-iterative-hoststack-rls2306.hcl.tftpl      | 55 -------
 .../etl-iterative-hoststack-rls2310.hcl.tftpl      | 55 +++++++
 .../conf/nomad/etl-iterative-mrr-rls2306.hcl.tftpl | 55 -------
 .../conf/nomad/etl-iterative-mrr-rls2310.hcl.tftpl | 55 +++++++
 .../nomad/etl-iterative-ndrpdr-rls2306.hcl.tftpl   | 55 -------
 .../nomad/etl-iterative-ndrpdr-rls2310.hcl.tftpl   | 55 +++++++
 .../nomad/etl-iterative-reconf-rls2306.hcl.tftpl   | 55 -------
 .../nomad/etl-iterative-reconf-rls2310.hcl.tftpl   | 55 +++++++
 .../nomad/etl-iterative-soak-rls2306.hcl.tftpl     | 55 -------
 .../nomad/etl-iterative-soak-rls2310.hcl.tftpl     | 55 +++++++
 .../terraform-nomad-pyspark-etl/fdio/main.tf       | 44 +++---
 .../terraform-nomad-pyspark-etl/fdio/versions.tf   |  4 +-
 .../terraform-nomad-pyspark-etl/versions.tf        |  4 +-
 48 files changed, 2510 insertions(+), 2510 deletions(-)
 delete mode 100644 csit.infra.etl/coverage_device_rls2306.py
 create mode 100644 csit.infra.etl/coverage_device_rls2310.py
 delete mode 100644 csit.infra.etl/coverage_hoststack_rls2306.py
 create mode 100644 csit.infra.etl/coverage_hoststack_rls2310.py
 delete mode 100644 csit.infra.etl/coverage_mrr_rls2306.py
 create mode 100644 csit.infra.etl/coverage_mrr_rls2310.py
 delete mode 100644 csit.infra.etl/coverage_ndrpdr_rls2306.py
 create mode 100644 csit.infra.etl/coverage_ndrpdr_rls2310.py
 delete mode 100644 csit.infra.etl/coverage_reconf_rls2306.py
 create mode 100644 csit.infra.etl/coverage_reconf_rls2310.py
 delete mode 100644 csit.infra.etl/coverage_soak_rls2306.py
 create mode 100644 csit.infra.etl/coverage_soak_rls2310.py
 delete mode 100644 csit.infra.etl/iterative_hoststack_rls2306.py
 create mode 100644 csit.infra.etl/iterative_hoststack_rls2310.py
 delete mode 100644 csit.infra.etl/iterative_mrr_rls2306.py
 create mode 100644 csit.infra.etl/iterative_mrr_rls2310.py
 delete mode 100644 csit.infra.etl/iterative_ndrpdr_rls2306.py
 create mode 100644 csit.infra.etl/iterative_ndrpdr_rls2310.py
 delete mode 100644 csit.infra.etl/iterative_reconf_rls2306.py
 create mode 100644 csit.infra.etl/iterative_reconf_rls2310.py
 delete mode 100644 csit.infra.etl/iterative_soak_rls2306.py
 create mode 100644 csit.infra.etl/iterative_soak_rls2310.py
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-device-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-device-rls2310.hcl.tftpl
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-hoststack-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-hoststack-rls2310.hcl.tftpl
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-mrr-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-mrr-rls2310.hcl.tftpl
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-ndrpdr-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-ndrpdr-rls2310.hcl.tftpl
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-reconf-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-reconf-rls2310.hcl.tftpl
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-soak-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-soak-rls2310.hcl.tftpl
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-hoststack-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-hoststack-rls2310.hcl.tftpl
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-mrr-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-mrr-rls2310.hcl.tftpl
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-ndrpdr-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-ndrpdr-rls2310.hcl.tftpl
 delete mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-reconf-rls2306.hcl.tftpl
 create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-reconf-rls2310.hcl.tftpl
 delete
mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-soak-rls2306.hcl.tftpl create mode 100644 fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-soak-rls2310.hcl.tftpl diff --git a/csit.infra.etl/coverage_device_rls2306.py b/csit.infra.etl/coverage_device_rls2306.py deleted file mode 100644 index 2d4f59b29c..0000000000 --- a/csit.infra.etl/coverage_device_rls2306.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-vpp-device-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-coverage-2306" in path] - -out_sdf = process_json_to_dataframe("device", filtered_paths) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/coverage_device_rls2310.py b/csit.infra.etl/coverage_device_rls2310.py new file mode 100644 index 0000000000..509a73f0de --- /dev/null +++ b/csit.infra.etl/coverage_device_rls2310.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-vpp-device-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2310" in path] + +out_sdf = process_json_to_dataframe("device", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/coverage_hoststack_rls2306.py b/csit.infra.etl/coverage_hoststack_rls2306.py deleted file mode 100644 index 9a5f7a5bc9..0000000000 --- a/csit.infra.etl/coverage_hoststack_rls2306.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-coverage-2306" in path] - -out_sdf = process_json_to_dataframe("hoststack", filtered_paths) -out_sdf.show(truncate=False) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/coverage_hoststack_rls2310.py b/csit.infra.etl/coverage_hoststack_rls2310.py new file mode 100644 index 0000000000..7c9e950bf7 --- /dev/null +++ b/csit.infra.etl/coverage_hoststack_rls2310.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2310" in path] + +out_sdf = process_json_to_dataframe("hoststack", filtered_paths) +out_sdf.show(truncate=False) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/coverage_mrr_rls2306.py b/csit.infra.etl/coverage_mrr_rls2306.py deleted file mode 100644 index 02ea40ef2b..0000000000 --- a/csit.infra.etl/coverage_mrr_rls2306.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-coverage-2306" in path] - -out_sdf = process_json_to_dataframe("mrr", filtered_paths) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/coverage_mrr_rls2310.py b/csit.infra.etl/coverage_mrr_rls2310.py new file mode 100644 index 0000000000..4f8b5c8799 --- /dev/null +++ b/csit.infra.etl/coverage_mrr_rls2310.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2310" in path] + +out_sdf = process_json_to_dataframe("mrr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/coverage_ndrpdr_rls2306.py b/csit.infra.etl/coverage_ndrpdr_rls2306.py deleted file mode 100644 index 5987e3eaa1..0000000000 --- a/csit.infra.etl/coverage_ndrpdr_rls2306.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-coverage-2306" in path] - -out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/coverage_ndrpdr_rls2310.py b/csit.infra.etl/coverage_ndrpdr_rls2310.py new file mode 100644 index 0000000000..f2b30ed4fa --- /dev/null +++ b/csit.infra.etl/coverage_ndrpdr_rls2310.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2310" in path] + +out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/coverage_reconf_rls2306.py b/csit.infra.etl/coverage_reconf_rls2306.py deleted file mode 100644 index 32885744d7..0000000000 --- a/csit.infra.etl/coverage_reconf_rls2306.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-coverage-2306" in path] - -out_sdf = process_json_to_dataframe("reconf", filtered_paths) -out_sdf.show(truncate=False) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/coverage_reconf_rls2310.py b/csit.infra.etl/coverage_reconf_rls2310.py new file mode 100644 index 0000000000..aff458c30a --- /dev/null +++ b/csit.infra.etl/coverage_reconf_rls2310.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2310" in path] + +out_sdf = process_json_to_dataframe("reconf", filtered_paths) +out_sdf.show(truncate=False) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/coverage_soak_rls2306.py b/csit.infra.etl/coverage_soak_rls2306.py deleted file mode 100644 index fd4b9fed35..0000000000 --- a/csit.infra.etl/coverage_soak_rls2306.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-coverage-2306" in path] - -out_sdf = process_json_to_dataframe("soak", filtered_paths) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/coverage_soak_rls2310.py b/csit.infra.etl/coverage_soak_rls2310.py new file mode 100644 index 0000000000..2e1a739d74 --- /dev/null +++ b/csit.infra.etl/coverage_soak_rls2310.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2310" in path] + +out_sdf = process_json_to_dataframe("soak", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/iterative_hoststack_rls2306.py b/csit.infra.etl/iterative_hoststack_rls2306.py deleted file mode 100644 index 0b179dea88..0000000000 --- a/csit.infra.etl/iterative_hoststack_rls2306.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
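The LAST_MODIFIED_BEGIN/END pair bounds each run to objects modified during the previous calendar day. An equivalent, more direct construction of the same window, shown only as a sketch:

from datetime import datetime, timedelta
from pytz import utc

# Today's date (per the container clock) at 00:00, tagged as UTC, matching
# the strptime-based construction used in the scripts.
now = datetime.now()
last_modified_end = utc.localize(datetime(now.year, now.month, now.day))
# The daily window starts 24 hours earlier.
last_modified_begin = last_modified_end - timedelta(days=1)

print(last_modified_begin, "->", last_modified_end)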
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-iterative-2306" in path] - -out_sdf = process_json_to_dataframe("hoststack", filtered_paths) -out_sdf.show(truncate=False) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/iterative_hoststack_rls2310.py b/csit.infra.etl/iterative_hoststack_rls2310.py new file mode 100644 index 0000000000..7b2984694a --- /dev/null +++ b/csit.infra.etl/iterative_hoststack_rls2310.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-iterative-2310" in path] + +out_sdf = process_json_to_dataframe("hoststack", filtered_paths) +out_sdf.show(truncate=False) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/iterative_mrr_rls2306.py b/csit.infra.etl/iterative_mrr_rls2306.py deleted file mode 100644 index 33ffff7ce4..0000000000 --- a/csit.infra.etl/iterative_mrr_rls2306.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
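The job and build columns added by the loader are positional fields of the object key. A sketch with a hypothetical key following the s3://<bucket>/<executor>/<job>/<build>/... layout that the split("/") indices assume:

# Hypothetical key; only the path layout matters here.
path = (
    "s3://fdio-logs-s3-cloudfront-index/vex-yul-rot-jenkins-1/"
    "csit-vpp-perf-report-iterative-2310-2n-icx/3/tests/test.info.json.gz"
)
parts = path.split("/")
job, build = parts[4], parts[5]
print(job)    # csit-vpp-perf-report-iterative-2310-2n-icx
print(build)  # 3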
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-iterative-2306" in path] - -out_sdf = process_json_to_dataframe("mrr", filtered_paths) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/iterative_mrr_rls2310.py b/csit.infra.etl/iterative_mrr_rls2310.py new file mode 100644 index 0000000000..12a4c26fb6 --- /dev/null +++ b/csit.infra.etl/iterative_mrr_rls2310.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-iterative-2310" in path] + +out_sdf = process_json_to_dataframe("mrr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/iterative_ndrpdr_rls2306.py b/csit.infra.etl/iterative_ndrpdr_rls2306.py deleted file mode 100644 index 9b3a36be18..0000000000 --- a/csit.infra.etl/iterative_ndrpdr_rls2306.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
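The accumulation loop relies on unionByName with allowMissingColumns=True, so a build whose JSON omits optional fields still merges, with the gaps filled as nulls. A minimal sketch, assuming local pyspark 3.1 or newer (where allowMissingColumns is available):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("union-demo").getOrCreate()

complete = spark.createDataFrame([(True, "vpp")], ["passed", "dut_type"])
partial = spark.createDataFrame([(False,)], ["passed"])

# The row from `partial` gets dut_type = null instead of failing the union.
merged = complete.unionByName(partial, allowMissingColumns=True)
merged.show()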
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-iterative-2306" in path] - -out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/iterative_ndrpdr_rls2310.py b/csit.infra.etl/iterative_ndrpdr_rls2310.py new file mode 100644 index 0000000000..d368c4a8fc --- /dev/null +++ b/csit.infra.etl/iterative_ndrpdr_rls2310.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-iterative-2310" in path] + +out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/iterative_reconf_rls2306.py b/csit.infra.etl/iterative_reconf_rls2306.py deleted file mode 100644 index a45ac66a83..0000000000 --- a/csit.infra.etl/iterative_reconf_rls2306.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
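The wr.s3.to_parquet call writes a hive-partitioned dataset keyed by test_type/year/month/day and overwrites only the partitions it touches, so a daily run replaces just that day's slice. The same layout can be previewed locally with pandas and pyarrow; the frame below uses placeholder values, not data from this patch:

import pandas as pd

df = pd.DataFrame({
    "test_type": ["mrr", "ndrpdr"],
    "year": [2023, 2023],
    "month": [10, 10],
    "day": [17, 17],
    "result_rate_avg": [12.3e6, 8.9e6],  # placeholder metric column
})

# Produces parquet_demo/test_type=mrr/year=2023/month=10/day=17/<part>.parquet, etc.
df.to_parquet(
    "parquet_demo",
    partition_cols=["test_type", "year", "month", "day"],
    compression="snappy",
)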
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-iterative-2306" in path] - -out_sdf = process_json_to_dataframe("reconf", filtered_paths) -out_sdf.show(truncate=False) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/iterative_reconf_rls2310.py b/csit.infra.etl/iterative_reconf_rls2310.py new file mode 100644 index 0000000000..ef5e604af3 --- /dev/null +++ b/csit.infra.etl/iterative_reconf_rls2310.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-iterative-2310" in path] + +out_sdf = process_json_to_dataframe("reconf", filtered_paths) +out_sdf.show(truncate=False) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/csit.infra.etl/iterative_soak_rls2306.py b/csit.infra.etl/iterative_soak_rls2306.py deleted file mode 100644 index ccc5012199..0000000000 --- a/csit.infra.etl/iterative_soak_rls2306.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2023 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
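Functionally, each 2310 script differs from its 2306 counterpart only in the release tag used to filter job paths and in the matching output prefix. A sketch of that filter with hypothetical keys:

paths = [
    ".../csit-vpp-perf-report-iterative-2306-2n-icx/5/test.info.json.gz",
    ".../csit-vpp-perf-report-iterative-2310-2n-icx/1/test.info.json.gz",
]

# Only builds from the 2310 report-iterative jobs survive.
filtered_paths = [path for path in paths if "report-iterative-2310" in path]
print(filtered_paths)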
- -"""ETL script running on top of the s3://""" - -from datetime import datetime, timedelta -from json import load -from os import environ -from pytz import utc - -import awswrangler as wr -from awswrangler.exceptions import EmptyDataFrame -from awsglue.context import GlueContext -from boto3 import session -from pyspark.context import SparkContext -from pyspark.sql.functions import col, lit, regexp_replace -from pyspark.sql.types import StructType - - -S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" -S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" -PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" -SUFFIX="info.json.gz" -IGNORE_SUFFIX=[ - "suite.info.json.gz", - "setup.info.json.gz", - "teardown.info.json.gz", - "suite.output.info.json.gz", - "setup.output.info.json.gz", - "teardown.output.info.json.gz" -] -LAST_MODIFIED_END=utc.localize( - datetime.strptime( - f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", - "%Y-%m-%d" - ) -) -LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) - - -def flatten_frame(nested_sdf): - """Unnest Spark DataFrame in case there nested structered columns. - - :param nested_sdf: Spark DataFrame. - :type nested_sdf: DataFrame - :returns: Unnest DataFrame. - :rtype: DataFrame - """ - stack = [((), nested_sdf)] - columns = [] - while len(stack) > 0: - parents, sdf = stack.pop() - for column_name, column_type in sdf.dtypes: - if column_type[:6] == "struct": - projected_sdf = sdf.select(column_name + ".*") - stack.append((parents + (column_name,), projected_sdf)) - else: - columns.append( - col(".".join(parents + (column_name,))) \ - .alias("_".join(parents + (column_name,))) - ) - return nested_sdf.select(columns) - - -def process_json_to_dataframe(schema_name, paths): - """Processes JSON to Spark DataFrame. - - :param schema_name: Schema name. - :type schema_name: string - :param paths: S3 paths to process. - :type paths: list - :returns: Spark DataFrame. 
- :rtype: DataFrame - """ - drop_subset = [ - "dut_type", "dut_version", - "passed", - "test_name_long", "test_name_short", - "test_type", - "version" - ] - - # load schemas - with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: - schema = StructType.fromJson(load(f_schema)) - - # create empty DF out of schemas - sdf = spark.createDataFrame([], schema) - - # filter list - filtered = [path for path in paths if schema_name in path] - - # select - for path in filtered: - print(path) - - sdf_loaded = spark \ - .read \ - .option("multiline", "true") \ - .schema(schema) \ - .json(path) \ - .withColumn("job", lit(path.split("/")[4])) \ - .withColumn("build", lit(path.split("/")[5])) - sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) - - # drop rows with all nulls and drop rows with null in critical frames - sdf = sdf.na.drop(how="all") - sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) - - # flatten frame - sdf = flatten_frame(sdf) - - return sdf - - -# create SparkContext and GlueContext -spark_context = SparkContext.getOrCreate() -spark_context.setLogLevel("WARN") -glue_context = GlueContext(spark_context) -spark = glue_context.spark_session - -# files of interest -paths = wr.s3.list_objects( - path=PATH, - suffix=SUFFIX, - last_modified_begin=LAST_MODIFIED_BEGIN, - last_modified_end=LAST_MODIFIED_END, - ignore_suffix=IGNORE_SUFFIX, - ignore_empty=True -) - -filtered_paths = [path for path in paths if "report-iterative-2306" in path] - -out_sdf = process_json_to_dataframe("soak", filtered_paths) -out_sdf.printSchema() -out_sdf = out_sdf \ - .withColumn("year", lit(datetime.now().year)) \ - .withColumn("month", lit(datetime.now().month)) \ - .withColumn("day", lit(datetime.now().day)) \ - .repartition(1) - -try: - wr.s3.to_parquet( - df=out_sdf.toPandas(), - path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2306", - dataset=True, - partition_cols=["test_type", "year", "month", "day"], - compression="snappy", - use_threads=True, - mode="overwrite_partitions", - boto3_session=session.Session( - aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], - aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], - region_name=environ["OUT_AWS_DEFAULT_REGION"] - ) - ) -except EmptyDataFrame: - pass diff --git a/csit.infra.etl/iterative_soak_rls2310.py b/csit.infra.etl/iterative_soak_rls2310.py new file mode 100644 index 0000000000..cfe733ba9a --- /dev/null +++ b/csit.infra.etl/iterative_soak_rls2310.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2023 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. 
+ :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-iterative-2310" in path] + +out_sdf = process_json_to_dataframe("soak", filtered_paths) +out_sdf.printSchema() +out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + +try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2310", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) +except EmptyDataFrame: + pass diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/README.md b/fdio.infra.terraform/terraform-nomad-pyspark-etl/README.md index 1147ddb16a..d61c8778d4 100644 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/README.md +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/README.md @@ -3,14 +3,14 @@ | Name | Version | |------|---------| -| [terraform](#requirement\_terraform) | >= 1.1.4 | -| [nomad](#requirement\_nomad) | >= 1.4.16 | +| [terraform](#requirement\_terraform) | >= 1.5.4 | +| [nomad](#requirement\_nomad) | >= 1.4.20 | ## Providers | Name | Version | |------|---------| -| [nomad](#provider\_nomad) | >= 1.4.16 | +| [nomad](#provider\_nomad) | >= 1.4.20 | ## Modules @@ -20,7 +20,7 @@ No modules. | Name | Type | |------|------| -| [nomad_job.nomad_job_etl](https://registry.terraform.io/providers/hashicorp/nomad/latest/docs/resources/job) | resource | +| [nomad_job.nomad_job](https://registry.terraform.io/providers/hashicorp/nomad/latest/docs/resources/job) | resource | ## Inputs @@ -35,7 +35,7 @@ No modules. | [envs](#input\_envs) | Specifies ETL environment variables. | `list(string)` | `[]` | no | | [image](#input\_image) | Specifies the Docker image to run. 
| `string` | `"pmikus/docker-ubuntu-focal-aws-glue:latest"` | no | | [job\_name](#input\_job\_name) | Specifies a name for the job. | `string` | `"etl"` | no | -| [memory](#input\_memory) | Specifies the memory required in MB. | `number` | `20000` | no | +| [memory](#input\_memory) | Specifies the memory required in MB. | `number` | `50000` | no | | [out\_aws\_access\_key\_id](#input\_out\_aws\_access\_key\_id) | AWS access key. | `string` | `"aws"` | no | | [out\_aws\_default\_region](#input\_out\_aws\_default\_region) | AWS region | `string` | `"aws"` | no | | [out\_aws\_secret\_access\_key](#input\_out\_aws\_secret\_access\_key) | AWS secret key | `string` | `"aws"` | no | diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-device-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-device-rls2306.hcl.tftpl deleted file mode 100644 index c769659bc6..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-device-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "coverage_device_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-device-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-device-rls2310.hcl.tftpl new file mode 100644 index 0000000000..217197667a --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-device-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_device_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = 
"${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-hoststack-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-hoststack-rls2306.hcl.tftpl deleted file mode 100644 index 3567f0258e..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-hoststack-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "coverage_hoststack_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-hoststack-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-hoststack-rls2310.hcl.tftpl new file mode 100644 index 0000000000..6aa5268df1 --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-hoststack-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_hoststack_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = 
"${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-mrr-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-mrr-rls2306.hcl.tftpl deleted file mode 100644 index edfd995980..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-mrr-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "coverage_mrr_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-mrr-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-mrr-rls2310.hcl.tftpl new file mode 100644 index 0000000000..6358dbe690 --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-mrr-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_mrr_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-ndrpdr-rls2306.hcl.tftpl 
b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-ndrpdr-rls2306.hcl.tftpl deleted file mode 100644 index 298853b15e..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-ndrpdr-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "coverage_ndrpdr_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-ndrpdr-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-ndrpdr-rls2310.hcl.tftpl new file mode 100644 index 0000000000..c2400ad444 --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-ndrpdr-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_ndrpdr_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-reconf-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-reconf-rls2306.hcl.tftpl deleted file mode 100644 index 82630ba64d..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-reconf-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job 
"${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "coverage_reconf_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-reconf-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-reconf-rls2310.hcl.tftpl new file mode 100644 index 0000000000..3bea4e77b4 --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-reconf-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_reconf_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-soak-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-soak-rls2306.hcl.tftpl deleted file mode 100644 index f32a7cf024..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-soak-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = 
"arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "coverage_soak_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-soak-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-soak-rls2310.hcl.tftpl new file mode 100644 index 0000000000..8ca898be37 --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-coverage-soak-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "coverage_soak_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} \ No newline at end of file diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-hoststack-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-hoststack-rls2306.hcl.tftpl deleted file mode 100644 index c4ebf032d8..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-hoststack-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - 
"--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "iterative_hoststack_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-hoststack-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-hoststack-rls2310.hcl.tftpl new file mode 100644 index 0000000000..285fcac9c0 --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-hoststack-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "iterative_hoststack_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-mrr-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-mrr-rls2306.hcl.tftpl deleted file mode 100644 index d4e0f9598f..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-mrr-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "iterative_mrr_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - 
OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-mrr-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-mrr-rls2310.hcl.tftpl new file mode 100644 index 0000000000..764dfe7147 --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-mrr-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "iterative_mrr_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-ndrpdr-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-ndrpdr-rls2306.hcl.tftpl deleted file mode 100644 index e18ebbc6e2..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-ndrpdr-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "iterative_ndrpdr_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-ndrpdr-rls2310.hcl.tftpl 
b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-ndrpdr-rls2310.hcl.tftpl new file mode 100644 index 0000000000..5ea75a49d9 --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-ndrpdr-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "iterative_ndrpdr_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-reconf-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-reconf-rls2306.hcl.tftpl deleted file mode 100644 index 46b5ba2c80..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-reconf-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "iterative_reconf_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-reconf-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-reconf-rls2310.hcl.tftpl new file mode 100644 index 0000000000..17ad83e25a --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-reconf-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = 
"${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "iterative_reconf_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-soak-rls2306.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-soak-rls2306.hcl.tftpl deleted file mode 100644 index 7df5aef88f..0000000000 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-soak-rls2306.hcl.tftpl +++ /dev/null @@ -1,55 +0,0 @@ -job "${job_name}" { - datacenters = "${datacenters}" - type = "${type}" - periodic { - cron = "${cron}" - prohibit_overlap = "${prohibit_overlap}" - time_zone = "${time_zone}" - } - group "${job_name}" { - restart { - mode = "fail" - } - constraint { - attribute = "$${attr.cpu.arch}" - operator = "!=" - value = "arm64" - } - constraint { - attribute = "$${node.class}" - value = "builder" - } - task "${job_name}" { - artifact { - source = "git::https://github.com/FDio/csit" - destination = "local/csit" - } - driver = "docker" - config { - image = "${image}" - command = "gluesparksubmit" - args = [ - "--driver-memory", "20g", - "--executor-memory", "20g", - "--executor-cores", "2", - "--master", "local[2]", - "iterative_soak_rls2306.py" - ] - work_dir = "/local/csit/csit.infra.etl" - } - env { - AWS_ACCESS_KEY_ID = "${aws_access_key_id}" - AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" - AWS_DEFAULT_REGION = "${aws_default_region}" - OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" - OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" - OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" - ${ envs } - } - resources { - cpu = ${cpu} - memory = ${memory} - } - } - } -} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-soak-rls2310.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-soak-rls2310.hcl.tftpl new file mode 100644 index 0000000000..69753701ce --- /dev/null +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl-iterative-soak-rls2310.hcl.tftpl @@ -0,0 +1,55 @@ +job "${job_name}" { + datacenters = "${datacenters}" + type = "${type}" + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + group "${job_name}" { + restart { + mode = "fail" + } + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + constraint { + attribute = "$${node.class}" + value = "builder" + } + task "${job_name}" { + 
artifact { + source = "git::https://github.com/FDio/csit" + destination = "local/csit" + } + driver = "docker" + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "--executor-cores", "2", + "--master", "local[2]", + "iterative_soak_rls2310.py" + ] + work_dir = "/local/csit/csit.infra.etl" + } + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/fdio/main.tf b/fdio.infra.terraform/terraform-nomad-pyspark-etl/fdio/main.tf index ec1a38d375..cb6b3a0021 100644 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/fdio/main.tf +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/fdio/main.tf @@ -74,7 +74,7 @@ module "etl-trending-ndrpdr" { job_name = "etl-trending-ndrpdr" } -module "etl-iterative-hoststack-rls2306" { +module "etl-iterative-hoststack-rls2310" { providers = { nomad = nomad.yul1 } @@ -88,10 +88,10 @@ module "etl-iterative-hoststack-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-iterative-hoststack-rls2306" + job_name = "etl-iterative-hoststack-rls2310" } -module "etl-iterative-mrr-rls2306" { +module "etl-iterative-mrr-rls2310" { providers = { nomad = nomad.yul1 } @@ -105,10 +105,10 @@ module "etl-iterative-mrr-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-iterative-mrr-rls2306" + job_name = "etl-iterative-mrr-rls2310" } -module "etl-iterative-ndrpdr-rls2306" { +module "etl-iterative-ndrpdr-rls2310" { providers = { nomad = nomad.yul1 } @@ -122,10 +122,10 @@ module "etl-iterative-ndrpdr-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-iterative-ndrpdr-rls2306" + job_name = "etl-iterative-ndrpdr-rls2310" } -module "etl-iterative-reconf-rls2306" { +module "etl-iterative-reconf-rls2310" { providers = { nomad = nomad.yul1 } @@ -139,10 +139,10 @@ module "etl-iterative-reconf-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-iterative-reconf-rls2306" + job_name = "etl-iterative-reconf-rls2310" } -module "etl-iterative-soak-rls2306" { +module "etl-iterative-soak-rls2310" { providers = { nomad = nomad.yul1 } @@ -156,10 +156,10 @@ module "etl-iterative-soak-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-iterative-soak-rls2306" + job_name = "etl-iterative-soak-rls2310" } -module "etl-coverage-device-rls2306" { +module "etl-coverage-device-rls2310" { providers = { nomad = nomad.yul1 } @@ -173,10 +173,10 @@ module "etl-coverage-device-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-coverage-device-rls2306" + job_name = "etl-coverage-device-rls2310" } -module "etl-coverage-hoststack-rls2306" { +module "etl-coverage-hoststack-rls2310" { 
providers = { nomad = nomad.yul1 } @@ -190,10 +190,10 @@ module "etl-coverage-hoststack-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-coverage-hoststack-rls2306" + job_name = "etl-coverage-hoststack-rls2310" } -module "etl-coverage-mrr-rls2306" { +module "etl-coverage-mrr-rls2310" { providers = { nomad = nomad.yul1 } @@ -207,10 +207,10 @@ module "etl-coverage-mrr-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-coverage-mrr-rls2306" + job_name = "etl-coverage-mrr-rls2310" } -module "etl-coverage-ndrpdr-rls2306" { +module "etl-coverage-ndrpdr-rls2310" { providers = { nomad = nomad.yul1 } @@ -224,10 +224,10 @@ module "etl-coverage-ndrpdr-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-coverage-ndrpdr-rls2306" + job_name = "etl-coverage-ndrpdr-rls2310" } -module "etl-coverage-reconf-rls2306" { +module "etl-coverage-reconf-rls2310" { providers = { nomad = nomad.yul1 } @@ -241,10 +241,10 @@ module "etl-coverage-reconf-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-coverage-reconf-rls2306" + job_name = "etl-coverage-reconf-rls2310" } -module "etl-coverage-soak-rls2306" { +module "etl-coverage-soak-rls2310" { providers = { nomad = nomad.yul1 } @@ -258,5 +258,5 @@ module "etl-coverage-soak-rls2306" { out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] cron = "@daily" datacenters = ["yul1"] - job_name = "etl-coverage-soak-rls2306" + job_name = "etl-coverage-soak-rls2310" } diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/fdio/versions.tf b/fdio.infra.terraform/terraform-nomad-pyspark-etl/fdio/versions.tf index 2a5c83f29f..0c05e76d65 100644 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/fdio/versions.tf +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/fdio/versions.tf @@ -7,11 +7,11 @@ terraform { required_providers { nomad = { source = "hashicorp/nomad" - version = ">= 1.4.19" + version = ">= 1.4.20" } vault = { version = ">= 3.12.0" } } - required_version = ">= 1.3.7" + required_version = ">= 1.5.4" } diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/versions.tf b/fdio.infra.terraform/terraform-nomad-pyspark-etl/versions.tf index a319c35908..f40435fe77 100644 --- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/versions.tf +++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/versions.tf @@ -2,8 +2,8 @@ terraform { required_providers { nomad = { source = "hashicorp/nomad" - version = ">= 1.4.19" + version = ">= 1.4.20" } } - required_version = ">= 1.3.7" + required_version = ">= 1.5.4" } -- cgit 1.2.3-korg