aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--csit.infra.etl/coverage_device_rls2210.py (renamed from csit.infra.etl/coverage_rls2206.py)53
-rw-r--r--csit.infra.etl/coverage_mrr_rls2210.py170
-rw-r--r--csit.infra.etl/coverage_ndrpdr_rls2210.py170
-rw-r--r--csit.infra.etl/coverage_soak_rls2210.py170
-rw-r--r--csit.infra.etl/iterative_mrr_rls2210.py (renamed from csit.infra.etl/iterative_rls2206.py)53
-rw-r--r--csit.infra.etl/iterative_ndrpdr_rls2210.py170
-rw-r--r--csit.infra.etl/iterative_soak_rls2210.py170
-rw-r--r--csit.infra.etl/trending_mrr.py (renamed from csit.infra.etl/trending.py)51
-rw-r--r--csit.infra.etl/trending_ndrpdr.py171
-rw-r--r--csit.infra.etl/trending_soak.py171
-rw-r--r--fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl541
11 files changed, 1569 insertions, 321 deletions
diff --git a/csit.infra.etl/coverage_rls2206.py b/csit.infra.etl/coverage_device_rls2210.py
index 4e2619d924..9c9e1c9603 100644
--- a/csit.infra.etl/coverage_rls2206.py
+++ b/csit.infra.etl/coverage_device_rls2210.py
@@ -141,31 +141,30 @@ paths = wr.s3.list_objects(
ignore_empty=True
)
-filtered_paths = [path for path in paths if "report-coverage-2206" in path]
-
-for schema_name in ["mrr", "ndrpdr", "soak", "device"]:
- out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
- out_sdf.printSchema()
- out_sdf = out_sdf \
- .withColumn("year", lit(datetime.now().year)) \
- .withColumn("month", lit(datetime.now().month)) \
- .withColumn("day", lit(datetime.now().day)) \
- .repartition(1)
-
- try:
- wr.s3.to_parquet(
- df=out_sdf.toPandas(),
- path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2206",
- dataset=True,
- partition_cols=["test_type", "year", "month", "day"],
- compression="snappy",
- use_threads=True,
- mode="overwrite_partitions",
- boto3_session=session.Session(
- aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
- aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
- region_name=environ["OUT_AWS_DEFAULT_REGION"]
- )
+filtered_paths = [path for path in paths if "report-coverage-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
)
- except EmptyDataFrame:
- pass
+ )
+except EmptyDataFrame:
+ pass
diff --git a/csit.infra.etl/coverage_mrr_rls2210.py b/csit.infra.etl/coverage_mrr_rls2210.py
new file mode 100644
index 0000000000..9c9e1c9603
--- /dev/null
+++ b/csit.infra.etl/coverage_mrr_rls2210.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+ "suite.info.json.gz",
+ "setup.info.json.gz",
+ "teardown.info.json.gz",
+ "suite.output.info.json.gz",
+ "setup.output.info.json.gz",
+ "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+ datetime.strptime(
+ f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+ "%Y-%m-%d"
+ )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+ """Unnest Spark DataFrame in case there nested structered columns.
+
+ :param nested_sdf: Spark DataFrame.
+ :type nested_sdf: DataFrame
+ :returns: Unnest DataFrame.
+ :rtype: DataFrame
+ """
+ stack = [((), nested_sdf)]
+ columns = []
+ while len(stack) > 0:
+ parents, sdf = stack.pop()
+ for column_name, column_type in sdf.dtypes:
+ if column_type[:6] == "struct":
+ projected_sdf = sdf.select(column_name + ".*")
+ stack.append((parents + (column_name,), projected_sdf))
+ else:
+ columns.append(
+ col(".".join(parents + (column_name,))) \
+ .alias("_".join(parents + (column_name,)))
+ )
+ return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+ """Processes JSON to Spark DataFrame.
+
+ :param schema_name: Schema name.
+ :type schema_name: string
+ :param paths: S3 paths to process.
+ :type paths: list
+ :returns: Spark DataFrame.
+ :rtype: DataFrame
+ """
+ drop_subset = [
+ "dut_type", "dut_version",
+ "passed",
+ "test_name_long", "test_name_short",
+ "test_type",
+ "version"
+ ]
+
+ # load schemas
+ with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+ schema = StructType.fromJson(load(f_schema))
+
+ # create empty DF out of schemas
+ sdf = spark.createDataFrame([], schema)
+
+ # filter list
+ filtered = [path for path in paths if schema_name in path]
+
+ # select
+ for path in filtered:
+ print(path)
+
+ sdf_loaded = spark \
+ .read \
+ .option("multiline", "true") \
+ .schema(schema) \
+ .json(path) \
+ .withColumn("job", lit(path.split("/")[4])) \
+ .withColumn("build", lit(path.split("/")[5]))
+ sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+ # drop rows with all nulls and drop rows with null in critical frames
+ sdf = sdf.na.drop(how="all")
+ sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+ # flatten frame
+ sdf = flatten_frame(sdf)
+
+ return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+ path=PATH,
+ suffix=SUFFIX,
+ last_modified_begin=LAST_MODIFIED_BEGIN,
+ last_modified_end=LAST_MODIFIED_END,
+ ignore_suffix=IGNORE_SUFFIX,
+ ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-coverage-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
+ )
+ )
+except EmptyDataFrame:
+ pass
diff --git a/csit.infra.etl/coverage_ndrpdr_rls2210.py b/csit.infra.etl/coverage_ndrpdr_rls2210.py
new file mode 100644
index 0000000000..9c9e1c9603
--- /dev/null
+++ b/csit.infra.etl/coverage_ndrpdr_rls2210.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+ "suite.info.json.gz",
+ "setup.info.json.gz",
+ "teardown.info.json.gz",
+ "suite.output.info.json.gz",
+ "setup.output.info.json.gz",
+ "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+ datetime.strptime(
+ f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+ "%Y-%m-%d"
+ )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+ """Unnest Spark DataFrame in case there nested structered columns.
+
+ :param nested_sdf: Spark DataFrame.
+ :type nested_sdf: DataFrame
+ :returns: Unnest DataFrame.
+ :rtype: DataFrame
+ """
+ stack = [((), nested_sdf)]
+ columns = []
+ while len(stack) > 0:
+ parents, sdf = stack.pop()
+ for column_name, column_type in sdf.dtypes:
+ if column_type[:6] == "struct":
+ projected_sdf = sdf.select(column_name + ".*")
+ stack.append((parents + (column_name,), projected_sdf))
+ else:
+ columns.append(
+ col(".".join(parents + (column_name,))) \
+ .alias("_".join(parents + (column_name,)))
+ )
+ return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+ """Processes JSON to Spark DataFrame.
+
+ :param schema_name: Schema name.
+ :type schema_name: string
+ :param paths: S3 paths to process.
+ :type paths: list
+ :returns: Spark DataFrame.
+ :rtype: DataFrame
+ """
+ drop_subset = [
+ "dut_type", "dut_version",
+ "passed",
+ "test_name_long", "test_name_short",
+ "test_type",
+ "version"
+ ]
+
+ # load schemas
+ with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+ schema = StructType.fromJson(load(f_schema))
+
+ # create empty DF out of schemas
+ sdf = spark.createDataFrame([], schema)
+
+ # filter list
+ filtered = [path for path in paths if schema_name in path]
+
+ # select
+ for path in filtered:
+ print(path)
+
+ sdf_loaded = spark \
+ .read \
+ .option("multiline", "true") \
+ .schema(schema) \
+ .json(path) \
+ .withColumn("job", lit(path.split("/")[4])) \
+ .withColumn("build", lit(path.split("/")[5]))
+ sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+ # drop rows with all nulls and drop rows with null in critical frames
+ sdf = sdf.na.drop(how="all")
+ sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+ # flatten frame
+ sdf = flatten_frame(sdf)
+
+ return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+ path=PATH,
+ suffix=SUFFIX,
+ last_modified_begin=LAST_MODIFIED_BEGIN,
+ last_modified_end=LAST_MODIFIED_END,
+ ignore_suffix=IGNORE_SUFFIX,
+ ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-coverage-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
+ )
+ )
+except EmptyDataFrame:
+ pass
diff --git a/csit.infra.etl/coverage_soak_rls2210.py b/csit.infra.etl/coverage_soak_rls2210.py
new file mode 100644
index 0000000000..9c9e1c9603
--- /dev/null
+++ b/csit.infra.etl/coverage_soak_rls2210.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+ "suite.info.json.gz",
+ "setup.info.json.gz",
+ "teardown.info.json.gz",
+ "suite.output.info.json.gz",
+ "setup.output.info.json.gz",
+ "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+ datetime.strptime(
+ f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+ "%Y-%m-%d"
+ )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+ """Unnest Spark DataFrame in case there nested structered columns.
+
+ :param nested_sdf: Spark DataFrame.
+ :type nested_sdf: DataFrame
+ :returns: Unnest DataFrame.
+ :rtype: DataFrame
+ """
+ stack = [((), nested_sdf)]
+ columns = []
+ while len(stack) > 0:
+ parents, sdf = stack.pop()
+ for column_name, column_type in sdf.dtypes:
+ if column_type[:6] == "struct":
+ projected_sdf = sdf.select(column_name + ".*")
+ stack.append((parents + (column_name,), projected_sdf))
+ else:
+ columns.append(
+ col(".".join(parents + (column_name,))) \
+ .alias("_".join(parents + (column_name,)))
+ )
+ return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+ """Processes JSON to Spark DataFrame.
+
+ :param schema_name: Schema name.
+ :type schema_name: string
+ :param paths: S3 paths to process.
+ :type paths: list
+ :returns: Spark DataFrame.
+ :rtype: DataFrame
+ """
+ drop_subset = [
+ "dut_type", "dut_version",
+ "passed",
+ "test_name_long", "test_name_short",
+ "test_type",
+ "version"
+ ]
+
+ # load schemas
+ with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+ schema = StructType.fromJson(load(f_schema))
+
+ # create empty DF out of schemas
+ sdf = spark.createDataFrame([], schema)
+
+ # filter list
+ filtered = [path for path in paths if schema_name in path]
+
+ # select
+ for path in filtered:
+ print(path)
+
+ sdf_loaded = spark \
+ .read \
+ .option("multiline", "true") \
+ .schema(schema) \
+ .json(path) \
+ .withColumn("job", lit(path.split("/")[4])) \
+ .withColumn("build", lit(path.split("/")[5]))
+ sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+ # drop rows with all nulls and drop rows with null in critical frames
+ sdf = sdf.na.drop(how="all")
+ sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+ # flatten frame
+ sdf = flatten_frame(sdf)
+
+ return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+ path=PATH,
+ suffix=SUFFIX,
+ last_modified_begin=LAST_MODIFIED_BEGIN,
+ last_modified_end=LAST_MODIFIED_END,
+ ignore_suffix=IGNORE_SUFFIX,
+ ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-coverage-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2210",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
+ )
+ )
+except EmptyDataFrame:
+ pass
diff --git a/csit.infra.etl/iterative_rls2206.py b/csit.infra.etl/iterative_mrr_rls2210.py
index 88c644b625..b7a8dbcbfa 100644
--- a/csit.infra.etl/iterative_rls2206.py
+++ b/csit.infra.etl/iterative_mrr_rls2210.py
@@ -141,31 +141,30 @@ paths = wr.s3.list_objects(
ignore_empty=True
)
-filtered_paths = [path for path in paths if "report-iterative-2206" in path]
-
-for schema_name in ["mrr", "ndrpdr", "soak"]:
- out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
- out_sdf.printSchema()
- out_sdf = out_sdf \
- .withColumn("year", lit(datetime.now().year)) \
- .withColumn("month", lit(datetime.now().month)) \
- .withColumn("day", lit(datetime.now().day)) \
- .repartition(1)
-
- try:
- wr.s3.to_parquet(
- df=out_sdf.toPandas(),
- path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2206",
- dataset=True,
- partition_cols=["test_type", "year", "month", "day"],
- compression="snappy",
- use_threads=True,
- mode="overwrite_partitions",
- boto3_session=session.Session(
- aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
- aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
- region_name=environ["OUT_AWS_DEFAULT_REGION"]
- )
+filtered_paths = [path for path in paths if "report-iterative-2210" in path]
+
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2210",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
)
- except EmptyDataFrame:
- pass
+ )
+except EmptyDataFrame:
+ pass
diff --git a/csit.infra.etl/iterative_ndrpdr_rls2210.py b/csit.infra.etl/iterative_ndrpdr_rls2210.py
new file mode 100644
index 0000000000..70ab8158a8
--- /dev/null
+++ b/csit.infra.etl/iterative_ndrpdr_rls2210.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+ "suite.info.json.gz",
+ "setup.info.json.gz",
+ "teardown.info.json.gz",
+ "suite.output.info.json.gz",
+ "setup.output.info.json.gz",
+ "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+ datetime.strptime(
+ f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+ "%Y-%m-%d"
+ )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+ """Unnest Spark DataFrame in case there nested structered columns.
+
+ :param nested_sdf: Spark DataFrame.
+ :type nested_sdf: DataFrame
+ :returns: Unnest DataFrame.
+ :rtype: DataFrame
+ """
+ stack = [((), nested_sdf)]
+ columns = []
+ while len(stack) > 0:
+ parents, sdf = stack.pop()
+ for column_name, column_type in sdf.dtypes:
+ if column_type[:6] == "struct":
+ projected_sdf = sdf.select(column_name + ".*")
+ stack.append((parents + (column_name,), projected_sdf))
+ else:
+ columns.append(
+ col(".".join(parents + (column_name,))) \
+ .alias("_".join(parents + (column_name,)))
+ )
+ return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+ """Processes JSON to Spark DataFrame.
+
+ :param schema_name: Schema name.
+ :type schema_name: string
+ :param paths: S3 paths to process.
+ :type paths: list
+ :returns: Spark DataFrame.
+ :rtype: DataFrame
+ """
+ drop_subset = [
+ "dut_type", "dut_version",
+ "passed",
+ "test_name_long", "test_name_short",
+ "test_type",
+ "version"
+ ]
+
+ # load schemas
+ with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+ schema = StructType.fromJson(load(f_schema))
+
+ # create empty DF out of schemas
+ sdf = spark.createDataFrame([], schema)
+
+ # filter list
+ filtered = [path for path in paths if schema_name in path]
+
+ # select
+ for path in filtered:
+ print(path)
+
+ sdf_loaded = spark \
+ .read \
+ .option("multiline", "true") \
+ .schema(schema) \
+ .json(path) \
+ .withColumn("job", lit(path.split("/")[4])) \
+ .withColumn("build", lit(path.split("/")[5]))
+ sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+ # drop rows with all nulls and drop rows with null in critical frames
+ sdf = sdf.na.drop(how="all")
+ sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+ # flatten frame
+ sdf = flatten_frame(sdf)
+
+ return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+ path=PATH,
+ suffix=SUFFIX,
+ last_modified_begin=LAST_MODIFIED_BEGIN,
+ last_modified_end=LAST_MODIFIED_END,
+ ignore_suffix=IGNORE_SUFFIX,
+ ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-iterative-2210" in path]
+
+out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2210",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
+ )
+ )
+except EmptyDataFrame:
+ pass
diff --git a/csit.infra.etl/iterative_soak_rls2210.py b/csit.infra.etl/iterative_soak_rls2210.py
new file mode 100644
index 0000000000..b74d7b44dc
--- /dev/null
+++ b/csit.infra.etl/iterative_soak_rls2210.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+ "suite.info.json.gz",
+ "setup.info.json.gz",
+ "teardown.info.json.gz",
+ "suite.output.info.json.gz",
+ "setup.output.info.json.gz",
+ "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+ datetime.strptime(
+ f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+ "%Y-%m-%d"
+ )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+ """Unnest Spark DataFrame in case there nested structered columns.
+
+ :param nested_sdf: Spark DataFrame.
+ :type nested_sdf: DataFrame
+ :returns: Unnest DataFrame.
+ :rtype: DataFrame
+ """
+ stack = [((), nested_sdf)]
+ columns = []
+ while len(stack) > 0:
+ parents, sdf = stack.pop()
+ for column_name, column_type in sdf.dtypes:
+ if column_type[:6] == "struct":
+ projected_sdf = sdf.select(column_name + ".*")
+ stack.append((parents + (column_name,), projected_sdf))
+ else:
+ columns.append(
+ col(".".join(parents + (column_name,))) \
+ .alias("_".join(parents + (column_name,)))
+ )
+ return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+ """Processes JSON to Spark DataFrame.
+
+ :param schema_name: Schema name.
+ :type schema_name: string
+ :param paths: S3 paths to process.
+ :type paths: list
+ :returns: Spark DataFrame.
+ :rtype: DataFrame
+ """
+ drop_subset = [
+ "dut_type", "dut_version",
+ "passed",
+ "test_name_long", "test_name_short",
+ "test_type",
+ "version"
+ ]
+
+ # load schemas
+ with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+ schema = StructType.fromJson(load(f_schema))
+
+ # create empty DF out of schemas
+ sdf = spark.createDataFrame([], schema)
+
+ # filter list
+ filtered = [path for path in paths if schema_name in path]
+
+ # select
+ for path in filtered:
+ print(path)
+
+ sdf_loaded = spark \
+ .read \
+ .option("multiline", "true") \
+ .schema(schema) \
+ .json(path) \
+ .withColumn("job", lit(path.split("/")[4])) \
+ .withColumn("build", lit(path.split("/")[5]))
+ sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+ # drop rows with all nulls and drop rows with null in critical frames
+ sdf = sdf.na.drop(how="all")
+ sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+ # flatten frame
+ sdf = flatten_frame(sdf)
+
+ return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+ path=PATH,
+ suffix=SUFFIX,
+ last_modified_begin=LAST_MODIFIED_BEGIN,
+ last_modified_end=LAST_MODIFIED_END,
+ ignore_suffix=IGNORE_SUFFIX,
+ ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "report-iterative-2210" in path]
+
+out_sdf = process_json_to_dataframe("soak", filtered_paths)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2210",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
+ )
+ )
+except EmptyDataFrame:
+ pass
diff --git a/csit.infra.etl/trending.py b/csit.infra.etl/trending_mrr.py
index bc27aaa063..1ba8c69b1b 100644
--- a/csit.infra.etl/trending.py
+++ b/csit.infra.etl/trending_mrr.py
@@ -143,30 +143,29 @@ paths = wr.s3.list_objects(
filtered_paths = [path for path in paths if "daily" in path or "weekly" in path]
-for schema_name in ["mrr", "ndrpdr", "soak"]:
- out_sdf = process_json_to_dataframe(schema_name, filtered_paths)
- out_sdf.show(truncate=False)
- out_sdf.printSchema()
- out_sdf = out_sdf \
- .withColumn("year", lit(datetime.now().year)) \
- .withColumn("month", lit(datetime.now().month)) \
- .withColumn("day", lit(datetime.now().day)) \
- .repartition(1)
-
- try:
- wr.s3.to_parquet(
- df=out_sdf.toPandas(),
- path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
- dataset=True,
- partition_cols=["test_type", "year", "month", "day"],
- compression="snappy",
- use_threads=True,
- mode="overwrite_partitions",
- boto3_session=session.Session(
- aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
- aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
- region_name=environ["OUT_AWS_DEFAULT_REGION"]
- )
+out_sdf = process_json_to_dataframe("mrr", filtered_paths)
+out_sdf.show(truncate=False)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
)
- except EmptyDataFrame:
- pass
+ )
+except EmptyDataFrame:
+ pass
diff --git a/csit.infra.etl/trending_ndrpdr.py b/csit.infra.etl/trending_ndrpdr.py
new file mode 100644
index 0000000000..d3c51ba757
--- /dev/null
+++ b/csit.infra.etl/trending_ndrpdr.py
@@ -0,0 +1,171 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+ "suite.info.json.gz",
+ "setup.info.json.gz",
+ "teardown.info.json.gz",
+ "suite.output.info.json.gz",
+ "setup.output.info.json.gz",
+ "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+ datetime.strptime(
+ f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+ "%Y-%m-%d"
+ )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+ """Unnest Spark DataFrame in case there nested structered columns.
+
+ :param nested_sdf: Spark DataFrame.
+ :type nested_sdf: DataFrame
+ :returns: Unnest DataFrame.
+ :rtype: DataFrame
+ """
+ stack = [((), nested_sdf)]
+ columns = []
+ while len(stack) > 0:
+ parents, sdf = stack.pop()
+ for column_name, column_type in sdf.dtypes:
+ if column_type[:6] == "struct":
+ projected_sdf = sdf.select(column_name + ".*")
+ stack.append((parents + (column_name,), projected_sdf))
+ else:
+ columns.append(
+ col(".".join(parents + (column_name,))) \
+ .alias("_".join(parents + (column_name,)))
+ )
+ return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+ """Processes JSON to Spark DataFrame.
+
+ :param schema_name: Schema name.
+ :type schema_name: string
+ :param paths: S3 paths to process.
+ :type paths: list
+ :returns: Spark DataFrame.
+ :rtype: DataFrame
+ """
+ drop_subset = [
+ "dut_type", "dut_version",
+ "passed",
+ "test_name_long", "test_name_short",
+ "test_type",
+ "version"
+ ]
+
+ # load schemas
+ with open(f"trending_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+ schema = StructType.fromJson(load(f_schema))
+
+ # create empty DF out of schemas
+ sdf = spark.createDataFrame([], schema)
+
+ # filter list
+ filtered = [path for path in paths if schema_name in path]
+
+ # select
+ for path in filtered:
+ print(path)
+
+ sdf_loaded = spark \
+ .read \
+ .option("multiline", "true") \
+ .schema(schema) \
+ .json(path) \
+ .withColumn("job", lit(path.split("/")[4])) \
+ .withColumn("build", lit(path.split("/")[5]))
+ sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+ # drop rows with all nulls and drop rows with null in critical frames
+ sdf = sdf.na.drop(how="all")
+ sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+ # flatten frame
+ sdf = flatten_frame(sdf)
+
+ return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+ path=PATH,
+ suffix=SUFFIX,
+ last_modified_begin=LAST_MODIFIED_BEGIN,
+ last_modified_end=LAST_MODIFIED_END,
+ ignore_suffix=IGNORE_SUFFIX,
+ ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "daily" in path or "weekly" in path]
+
+out_sdf = process_json_to_dataframe("ndrpdr", filtered_paths)
+out_sdf.show(truncate=False)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
+ )
+ )
+except EmptyDataFrame:
+ pass
diff --git a/csit.infra.etl/trending_soak.py b/csit.infra.etl/trending_soak.py
new file mode 100644
index 0000000000..e54cf9f18a
--- /dev/null
+++ b/csit.infra.etl/trending_soak.py
@@ -0,0 +1,171 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the s3://"""
+
+from datetime import datetime, timedelta
+from json import load
+from os import environ
+from pytz import utc
+
+import awswrangler as wr
+from awswrangler.exceptions import EmptyDataFrame
+from awsglue.context import GlueContext
+from boto3 import session
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index"
+S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index"
+PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*"
+SUFFIX="info.json.gz"
+IGNORE_SUFFIX=[
+ "suite.info.json.gz",
+ "setup.info.json.gz",
+ "teardown.info.json.gz",
+ "suite.output.info.json.gz",
+ "setup.output.info.json.gz",
+ "teardown.output.info.json.gz"
+]
+LAST_MODIFIED_END=utc.localize(
+ datetime.strptime(
+ f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}",
+ "%Y-%m-%d"
+ )
+)
+LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1)
+
+
+def flatten_frame(nested_sdf):
+ """Unnest Spark DataFrame in case there nested structered columns.
+
+ :param nested_sdf: Spark DataFrame.
+ :type nested_sdf: DataFrame
+ :returns: Unnest DataFrame.
+ :rtype: DataFrame
+ """
+ stack = [((), nested_sdf)]
+ columns = []
+ while len(stack) > 0:
+ parents, sdf = stack.pop()
+ for column_name, column_type in sdf.dtypes:
+ if column_type[:6] == "struct":
+ projected_sdf = sdf.select(column_name + ".*")
+ stack.append((parents + (column_name,), projected_sdf))
+ else:
+ columns.append(
+ col(".".join(parents + (column_name,))) \
+ .alias("_".join(parents + (column_name,)))
+ )
+ return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+ """Processes JSON to Spark DataFrame.
+
+ :param schema_name: Schema name.
+ :type schema_name: string
+ :param paths: S3 paths to process.
+ :type paths: list
+ :returns: Spark DataFrame.
+ :rtype: DataFrame
+ """
+ drop_subset = [
+ "dut_type", "dut_version",
+ "passed",
+ "test_name_long", "test_name_short",
+ "test_type",
+ "version"
+ ]
+
+ # load schemas
+ with open(f"trending_{schema_name}.json", "r", encoding="UTF-8") as f_schema:
+ schema = StructType.fromJson(load(f_schema))
+
+ # create empty DF out of schemas
+ sdf = spark.createDataFrame([], schema)
+
+ # filter list
+ filtered = [path for path in paths if schema_name in path]
+
+ # select
+ for path in filtered:
+ print(path)
+
+ sdf_loaded = spark \
+ .read \
+ .option("multiline", "true") \
+ .schema(schema) \
+ .json(path) \
+ .withColumn("job", lit(path.split("/")[4])) \
+ .withColumn("build", lit(path.split("/")[5]))
+ sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+ # drop rows with all nulls and drop rows with null in critical frames
+ sdf = sdf.na.drop(how="all")
+ sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+ # flatten frame
+ sdf = flatten_frame(sdf)
+
+ return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = wr.s3.list_objects(
+ path=PATH,
+ suffix=SUFFIX,
+ last_modified_begin=LAST_MODIFIED_BEGIN,
+ last_modified_end=LAST_MODIFIED_END,
+ ignore_suffix=IGNORE_SUFFIX,
+ ignore_empty=True
+)
+
+filtered_paths = [path for path in paths if "daily" in path or "weekly" in path]
+
+out_sdf = process_json_to_dataframe("soak", filtered_paths)
+out_sdf.show(truncate=False)
+out_sdf.printSchema()
+out_sdf = out_sdf \
+ .withColumn("year", lit(datetime.now().year)) \
+ .withColumn("month", lit(datetime.now().month)) \
+ .withColumn("day", lit(datetime.now().day)) \
+ .repartition(1)
+
+try:
+ wr.s3.to_parquet(
+ df=out_sdf.toPandas(),
+ path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending",
+ dataset=True,
+ partition_cols=["test_type", "year", "month", "day"],
+ compression="snappy",
+ use_threads=True,
+ mode="overwrite_partitions",
+ boto3_session=session.Session(
+ aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"],
+ aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"],
+ region_name=environ["OUT_AWS_DEFAULT_REGION"]
+ )
+ )
+except EmptyDataFrame:
+ pass
diff --git a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl
index 0d0ecfa318..0abb0e5d51 100644
--- a/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl
+++ b/fdio.infra.terraform/terraform-nomad-pyspark-etl/conf/nomad/etl.hcl.tftpl
@@ -1,100 +1,100 @@
job "${job_name}" {
- # The "datacenters" parameter specifies the list of datacenters which should
- # be considered when placing this task. This must be provided.
datacenters = "${datacenters}"
-
- # The "type" parameter controls the type of job, which impacts the scheduler's
- # decision on placement. For a full list of job types and their differences,
- # please see the online documentation.
- #
- # https://www.nomadproject.io/docs/jobspec/schedulers
- #
type = "${type}"
-
- # The periodic stanza allows a job to run at fixed times, dates, or intervals.
- # The easiest way to think about the periodic scheduler is "Nomad cron" or
- # "distributed cron".
- #
- # https://www.nomadproject.io/docs/job-specification/periodic
- #
periodic {
cron = "${cron}"
prohibit_overlap = "${prohibit_overlap}"
time_zone = "${time_zone}"
}
-
- # The "group" stanza defines a series of tasks that should be co-located on
- # the same Nomad client. Any task within a group will be placed on the same
- # client.
- #
- # https://www.nomadproject.io/docs/job-specification/group
- #
group "${job_name}-master" {
- # The restart stanza configures a tasks's behavior on task failure. Restarts
- # happen on the client that is running the task.
- #
- # https://www.nomadproject.io/docs/job-specification/restart
- #
restart {
mode = "fail"
}
-
- # The constraint allows restricting the set of eligible nodes. Constraints
- # may filter on attributes or client metadata.
- #
- # For more information and examples on the "volume" stanza, please see
- # the online documentation at:
- #
- # https://www.nomadproject.io/docs/job-specification/constraint
- #
constraint {
attribute = "$${attr.cpu.arch}"
operator = "!="
value = "arm64"
}
-
constraint {
attribute = "$${node.class}"
value = "builder"
}
-
- # The "task" stanza creates an individual unit of work, such as a Docker
- # container, web application, or batch processing.
- #
- # https://www.nomadproject.io/docs/job-specification/task.html
- #
- task "${job_name}-trending" {
- # The artifact stanza instructs Nomad to fetch and unpack a remote
- # resource, such as a file, tarball, or binary.
- #
- # https://www.nomadproject.io/docs/job-specification/artifact
- #
+ task "${job_name}-trending-mrr" {
artifact {
source = "git::https://github.com/FDio/csit"
destination = "local/csit"
}
-
- # The "driver" parameter specifies the task driver that should be used to
- # run the task.
driver = "docker"
-
- # The "config" stanza specifies the driver configuration, which is passed
- # directly to the driver to start the task. The details of configurations
- # are specific to each driver, so please see specific driver
- # documentation for more information.
config {
image = "${image}"
command = "gluesparksubmit"
args = [
"--driver-memory", "30g",
"--executor-memory", "30g",
- "trending.py"
+ "trending_mrr.py"
+ ]
+ work_dir = "/local/csit/csit.infra.etl"
+ }
+ env {
+ AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
+ AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
+ AWS_DEFAULT_REGION = "${aws_default_region}"
+ OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
+ OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+ OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
+ ${ envs }
+ }
+ resources {
+ cpu = ${cpu}
+ memory = ${memory}
+ }
+ }
+ task "${job_name}-trending-ndrpdr" {
+ artifact {
+ source = "git::https://github.com/FDio/csit"
+ destination = "local/csit"
+ }
+ driver = "docker"
+ config {
+ image = "${image}"
+ command = "gluesparksubmit"
+ args = [
+ "--driver-memory", "30g",
+ "--executor-memory", "30g",
+ "trending_ndrpdr.py"
+ ]
+ work_dir = "/local/csit/csit.infra.etl"
+ }
+ env {
+ AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
+ AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
+ AWS_DEFAULT_REGION = "${aws_default_region}"
+ OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
+ OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+ OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
+ ${ envs }
+ }
+ resources {
+ cpu = ${cpu}
+ memory = ${memory}
+ }
+ }
+ task "${job_name}-trending-soak" {
+ artifact {
+ source = "git::https://github.com/FDio/csit"
+ destination = "local/csit"
+ }
+ driver = "docker"
+ config {
+ image = "${image}"
+ command = "gluesparksubmit"
+ args = [
+ "--driver-memory", "30g",
+ "--executor-memory", "30g",
+ "trending_soak.py"
]
work_dir = "/local/csit/csit.infra.etl"
}
-
- # The env stanza configures a list of environment variables to populate
- # the task's environment before starting.
env {
AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
@@ -104,38 +104,17 @@ job "${job_name}" {
OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
${ envs }
}
-
- # The "resources" stanza describes the requirements a task needs to
- # execute. Resource requirements include memory, network, cpu, and more.
- # This ensures the task will execute on a machine that contains enough
- # resource capacity.
- #
- # https://www.nomadproject.io/docs/job-specification/resources
- #
resources {
cpu = ${cpu}
memory = ${memory}
}
}
task "${job_name}-stats" {
- # The artifact stanza instructs Nomad to fetch and unpack a remote
- # resource, such as a file, tarball, or binary.
- #
- # https://www.nomadproject.io/docs/job-specification/artifact
- #
artifact {
source = "git::https://github.com/FDio/csit"
destination = "local/csit"
}
-
- # The "driver" parameter specifies the task driver that should be used to
- # run the task.
driver = "docker"
-
- # The "config" stanza specifies the driver configuration, which is passed
- # directly to the driver to start the task. The details of configurations
- # are specific to each driver, so please see specific driver
- # documentation for more information.
config {
image = "${image}"
command = "gluesparksubmit"
@@ -146,9 +125,6 @@ job "${job_name}" {
]
work_dir = "/local/csit/csit.infra.etl"
}
-
- # The env stanza configures a list of environment variables to populate
- # the task's environment before starting.
env {
AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
@@ -158,165 +134,248 @@ job "${job_name}" {
OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
${ envs }
}
-
- # The "resources" stanza describes the requirements a task needs to
- # execute. Resource requirements include memory, network, cpu, and more.
- # This ensures the task will execute on a machine that contains enough
- # resource capacity.
- #
- # https://www.nomadproject.io/docs/job-specification/resources
- #
resources {
cpu = ${cpu}
memory = ${memory}
}
}
}
-# group "${job_name}-rls2206" {
-# # The restart stanza configures a tasks's behavior on task failure. Restarts
-# # happen on the client that is running the task.
-# #
-# # https://www.nomadproject.io/docs/job-specification/restart
-# #
-# restart {
-# mode = "fail"
-# }
-#
-# # The constraint allows restricting the set of eligible nodes. Constraints
-# # may filter on attributes or client metadata.
-# #
-# # For more information and examples on the "volume" stanza, please see
-# # the online documentation at:
-# #
-# # https://www.nomadproject.io/docs/job-specification/constraint
-# #
-# constraint {
-# attribute = "$${attr.cpu.arch}"
-# operator = "!="
-# value = "arm64"
-# }
-#
-# constraint {
-# attribute = "$${node.class}"
-# value = "builder"
-# }
-#
-# # The "task" stanza creates an individual unit of work, such as a Docker
-# # container, web application, or batch processing.
-# #
-# # https://www.nomadproject.io/docs/job-specification/task.html
-# #
-# task "${job_name}-coverage" {
-# # The artifact stanza instructs Nomad to fetch and unpack a remote
-# # resource, such as a file, tarball, or binary.
-# #
-# # https://www.nomadproject.io/docs/job-specification/artifact
-# #
-# artifact {
-# source = "git::https://github.com/FDio/csit"
-# destination = "local/csit"
-# }
-#
-# # The "driver" parameter specifies the task driver that should be used to
-# # run the task.
-# driver = "docker"
-#
-# # The "config" stanza specifies the driver configuration, which is passed
-# # directly to the driver to start the task. The details of configurations
-# # are specific to each driver, so please see specific driver
-# # documentation for more information.
-# config {
-# image = "${image}"
-# command = "gluesparksubmit"
-# args = [
-# "--driver-memory", "20g",
-# "--executor-memory", "20g",
-# "--executor-cores", "2",
-# "--master", "local[2]",
-# "coverage_rls2206.py"
-# ]
-# work_dir = "/local/csit/csit.infra.etl"
-# }
-#
-# # The env stanza configures a list of environment variables to populate
-# # the task's environment before starting.
-# env {
-# AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
-# AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
-# AWS_DEFAULT_REGION = "${aws_default_region}"
-# OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
-# OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
-# OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
-# ${ envs }
-# }
-#
-# # The "resources" stanza describes the requirements a task needs to
-# # execute. Resource requirements include memory, network, cpu, and more.
-# # This ensures the task will execute on a machine that contains enough
-# # resource capacity.
-# #
-# # https://www.nomadproject.io/docs/job-specification/resources
-# #
-# resources {
-# cpu = ${cpu}
-# memory = ${memory}
-# }
-# }
-# task "${job_name}-iterative" {
-# # The artifact stanza instructs Nomad to fetch and unpack a remote
-# # resource, such as a file, tarball, or binary.
-# #
-# # https://www.nomadproject.io/docs/job-specification/artifact
-# #
-# artifact {
-# source = "git::https://github.com/FDio/csit"
-# destination = "local/csit"
-# }
-#
-# # The "driver" parameter specifies the task driver that should be used to
-# # run the task.
-# driver = "docker"
-#
-# # The "config" stanza specifies the driver configuration, which is passed
-# # directly to the driver to start the task. The details of configurations
-# # are specific to each driver, so please see specific driver
-# # documentation for more information.
-# config {
-# image = "${image}"
-# command = "gluesparksubmit"
-# args = [
-# "--driver-memory", "20g",
-# "--executor-memory", "20g",
-# "--executor-cores", "2",
-# "--master", "local[2]",
-# "iterative_rls2206.py"
-# ]
-# work_dir = "/local/csit/csit.infra.etl"
-# }
-#
-# # The env stanza configures a list of environment variables to populate
-# # the task's environment before starting.
-# env {
-# AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
-# AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
-# AWS_DEFAULT_REGION = "${aws_default_region}"
-# OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
-# OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
-# OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
-# ${ envs }
-# }
-#
-# # The "resources" stanza describes the requirements a task needs to
-# # execute. Resource requirements include memory, network, cpu, and more.
-# # This ensures the task will execute on a machine that contains enough
-# # resource capacity.
-# #
-# # https://www.nomadproject.io/docs/job-specification/resources
-# #
-# resources {
-# cpu = ${cpu}
-# memory = ${memory}
-# }
-# }
-# }
+ group "${job_name}-rls2210" {
+ restart {
+ mode = "fail"
+ }
+ constraint {
+ attribute = "$${attr.cpu.arch}"
+ operator = "!="
+ value = "arm64"
+ }
+ constraint {
+ attribute = "$${node.class}"
+ value = "builder"
+ }
+ task "${job_name}-coverage-device" {
+ artifact {
+ source = "git::https://github.com/FDio/csit"
+ destination = "local/csit"
+ }
+ driver = "docker"
+ config {
+ image = "${image}"
+ command = "gluesparksubmit"
+ args = [
+ "--driver-memory", "20g",
+ "--executor-memory", "20g",
+ "--executor-cores", "2",
+ "--master", "local[2]",
+ "coverage_device_rls2210.py"
+ ]
+ work_dir = "/local/csit/csit.infra.etl"
+ }
+ env {
+ AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
+ AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
+ AWS_DEFAULT_REGION = "${aws_default_region}"
+ OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
+ OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+ OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
+ ${ envs }
+ }
+ resources {
+ cpu = ${cpu}
+ memory = ${memory}
+ }
+ }
+ task "${job_name}-coverage-mrr" {
+ artifact {
+ source = "git::https://github.com/FDio/csit"
+ destination = "local/csit"
+ }
+ driver = "docker"
+ config {
+ image = "${image}"
+ command = "gluesparksubmit"
+ args = [
+ "--driver-memory", "20g",
+ "--executor-memory", "20g",
+ "--executor-cores", "2",
+ "--master", "local[2]",
+ "coverage_mrr_rls2210.py"
+ ]
+ work_dir = "/local/csit/csit.infra.etl"
+ }
+ env {
+ AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
+ AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
+ AWS_DEFAULT_REGION = "${aws_default_region}"
+ OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
+ OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+ OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
+ ${ envs }
+ }
+ resources {
+ cpu = ${cpu}
+ memory = ${memory}
+ }
+ }
+ task "${job_name}-coverage-ndrpdr" {
+ artifact {
+ source = "git::https://github.com/FDio/csit"
+ destination = "local/csit"
+ }
+ driver = "docker"
+ config {
+ image = "${image}"
+ command = "gluesparksubmit"
+ args = [
+ "--driver-memory", "20g",
+ "--executor-memory", "20g",
+ "--executor-cores", "2",
+ "--master", "local[2]",
+ "coverage_ndrpdr_rls2210.py"
+ ]
+ work_dir = "/local/csit/csit.infra.etl"
+ }
+ env {
+ AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
+ AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
+ AWS_DEFAULT_REGION = "${aws_default_region}"
+ OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
+ OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+ OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
+ ${ envs }
+ }
+ resources {
+ cpu = ${cpu}
+ memory = ${memory}
+ }
+ }
+ task "${job_name}-coverage-soak" {
+ artifact {
+ source = "git::https://github.com/FDio/csit"
+ destination = "local/csit"
+ }
+ driver = "docker"
+ config {
+ image = "${image}"
+ command = "gluesparksubmit"
+ args = [
+ "--driver-memory", "20g",
+ "--executor-memory", "20g",
+ "--executor-cores", "2",
+ "--master", "local[2]",
+ "coverage_soak_rls2210.py"
+ ]
+ work_dir = "/local/csit/csit.infra.etl"
+ }
+ env {
+ AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
+ AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
+ AWS_DEFAULT_REGION = "${aws_default_region}"
+ OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
+ OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+ OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
+ ${ envs }
+ }
+ resources {
+ cpu = ${cpu}
+ memory = ${memory}
+ }
+ }
+ task "${job_name}-iterative-mrr" {
+ artifact {
+ source = "git::https://github.com/FDio/csit"
+ destination = "local/csit"
+ }
+ driver = "docker"
+ config {
+ image = "${image}"
+ command = "gluesparksubmit"
+ args = [
+ "--driver-memory", "20g",
+ "--executor-memory", "20g",
+ "--executor-cores", "2",
+ "--master", "local[2]",
+ "iterative_mrr_rls2210.py"
+ ]
+ work_dir = "/local/csit/csit.infra.etl"
+ }
+ env {
+ AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
+ AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
+ AWS_DEFAULT_REGION = "${aws_default_region}"
+ OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
+ OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+ OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
+ ${ envs }
+ }
+ resources {
+ cpu = ${cpu}
+ memory = ${memory}
+ }
+ }
+ task "${job_name}-iterative-ndrpdr" {
+ artifact {
+ source = "git::https://github.com/FDio/csit"
+ destination = "local/csit"
+ }
+ driver = "docker"
+ config {
+ image = "${image}"
+ command = "gluesparksubmit"
+ args = [
+ "--driver-memory", "20g",
+ "--executor-memory", "20g",
+ "--executor-cores", "2",
+ "--master", "local[2]",
+ "iterative_ndrpdr_rls2210.py"
+ ]
+ work_dir = "/local/csit/csit.infra.etl"
+ }
+ env {
+ AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
+ AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
+ AWS_DEFAULT_REGION = "${aws_default_region}"
+ OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
+ OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+ OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
+ ${ envs }
+ }
+ resources {
+ cpu = ${cpu}
+ memory = ${memory}
+ }
+ }
+ task "${job_name}-iterative-soak" {
+ artifact {
+ source = "git::https://github.com/FDio/csit"
+ destination = "local/csit"
+ }
+ driver = "docker"
+ config {
+ image = "${image}"
+ command = "gluesparksubmit"
+ args = [
+ "--driver-memory", "20g",
+ "--executor-memory", "20g",
+ "--executor-cores", "2",
+ "--master", "local[2]",
+ "iterative_soak_rls2210.py"
+ ]
+ work_dir = "/local/csit/csit.infra.etl"
+ }
+ env {
+ AWS_ACCESS_KEY_ID = "${aws_access_key_id}"
+ AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}"
+ AWS_DEFAULT_REGION = "${aws_default_region}"
+ OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}"
+ OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}"
+ OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}"
+ ${ envs }
+ }
+ resources {
+ cpu = ${cpu}
+ memory = ${memory}
+ }
+ }
+ }
}