diff options
author | Peter Mikus <pmikus@cisco.com> | 2022-02-22 11:00:47 +0100 |
---|---|---|
committer | Peter Mikus <pmikus@cisco.com> | 2022-02-24 08:01:53 +0000 |
commit | 54fd337e30acc97434b33a6d0d3c19e4aa3051ab (patch) | |
tree | 5d131a1d7c0846a209e85380b66dfc7770cbe32e | |
parent | b2cb835b34c7404b2aaee3ec30700c67537da66d (diff) |
feat(uti): etl
Signed-off-by: Peter Mikus <pmikus@cisco.com>
Change-Id: I7cdcdcbf1e4986664d5d48357688185319f67b0c
25 files changed, 4726 insertions, 0 deletions
diff --git a/csit.infra.etl/.gitignore b/csit.infra.etl/.gitignore new file mode 100644 index 0000000000..bccc1450f2 --- /dev/null +++ b/csit.infra.etl/.gitignore @@ -0,0 +1 @@ +*.parquet
\ No newline at end of file diff --git a/csit.infra.etl/coverage_device.json b/csit.infra.etl/coverage_device.json new file mode 100644 index 0000000000..12989b30fa --- /dev/null +++ b/csit.infra.etl/coverage_device.json @@ -0,0 +1,77 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/coverage_mrr.json b/csit.infra.etl/coverage_mrr.json new file mode 100644 index 0000000000..13b80ec82e --- /dev/null +++ b/csit.infra.etl/coverage_mrr.json @@ -0,0 +1,151 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "result", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "receive_rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "stdev", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "values", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "double", + "type": "array" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "tags", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "string", + "type": "array" + } + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/coverage_ndrpdr.json b/csit.infra.etl/coverage_ndrpdr.json new file mode 100644 index 0000000000..f188321bfb --- /dev/null +++ b/csit.infra.etl/coverage_ndrpdr.json @@ -0,0 +1,679 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "result", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "latency_forward", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "pdr_0", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_10", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_50", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_90", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "latency_reverse", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "pdr_0", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_10", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_50", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_90", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "ndr", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "lower", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "upper", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "lower", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "upper", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "tags", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "string", + "type": "array" + } + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/coverage_rls2202.py b/csit.infra.etl/coverage_rls2202.py new file mode 100644 index 0000000000..97b0a12b88 --- /dev/null +++ b/csit.infra.etl/coverage_rls2202.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"coverage_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-coverage-2202" in path] + +for schema_name in ["mrr", "ndrpdr", "soak", "device"]: + out_sdf = process_json_to_dataframe(schema_name, filtered_paths) + out_sdf.show(truncate=False) + out_sdf.printSchema() + out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + + try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/coverage_rls2202", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) + except EmptyDataFrame: + pass diff --git a/csit.infra.etl/coverage_soak.json b/csit.infra.etl/coverage_soak.json new file mode 100644 index 0000000000..59eaec2e9d --- /dev/null +++ b/csit.infra.etl/coverage_soak.json @@ -0,0 +1,221 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "result", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "critical_rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "lower", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "upper", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "tags", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "string", + "type": "array" + } + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/iterative_mrr.json b/csit.infra.etl/iterative_mrr.json new file mode 100644 index 0000000000..13b80ec82e --- /dev/null +++ b/csit.infra.etl/iterative_mrr.json @@ -0,0 +1,151 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "result", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "receive_rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "stdev", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "values", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "double", + "type": "array" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "tags", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "string", + "type": "array" + } + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/iterative_ndrpdr.json b/csit.infra.etl/iterative_ndrpdr.json new file mode 100644 index 0000000000..f188321bfb --- /dev/null +++ b/csit.infra.etl/iterative_ndrpdr.json @@ -0,0 +1,679 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "result", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "latency_forward", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "pdr_0", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_10", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_50", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_90", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "latency_reverse", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "pdr_0", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_10", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_50", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_90", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "ndr", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "lower", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "upper", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "lower", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "upper", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "tags", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "string", + "type": "array" + } + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/iterative_rls2202.py b/csit.infra.etl/iterative_rls2202.py new file mode 100644 index 0000000000..13b6f4272c --- /dev/null +++ b/csit.infra.etl/iterative_rls2202.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"iterative_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "report-iterative-2202" in path] + +for schema_name in ["mrr", "ndrpdr", "soak"]: + out_sdf = process_json_to_dataframe(schema_name, filtered_paths) + out_sdf.show(truncate=False) + out_sdf.printSchema() + out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + + try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/iterative_rls2202", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) + except EmptyDataFrame: + pass diff --git a/csit.infra.etl/iterative_soak.json b/csit.infra.etl/iterative_soak.json new file mode 100644 index 0000000000..59eaec2e9d --- /dev/null +++ b/csit.infra.etl/iterative_soak.json @@ -0,0 +1,221 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "result", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "critical_rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "lower", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "upper", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "tags", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "string", + "type": "array" + } + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/local.py b/csit.infra.etl/local.py new file mode 100644 index 0000000000..79e18d1c64 --- /dev/null +++ b/csit.infra.etl/local.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the localhost""" + +from datetime import datetime +from json import dump, load +from pathlib import Path + +from awsglue.context import GlueContext +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +PATH="/app/tests" +SUFFIX="info.json" +IGNORE_SUFFIX=[ + "suite.info.json", + "setup.info.json", + "teardown.info.json", + "suite.output.info.json", + "setup.output.info.json", + "teardown.output.info.json" +] + + +def schema_dump(schema, option): + """Dumps Spark DataFrame schema into JSON file. + + :param schema: DataFrame schema. + :type schema: StructType + :param option: File name suffix for the DataFrame schema. + :type option: string + """ + with open(f"trending_{option}.json", "w", encoding="UTF-8") as f_schema: + dump(schema.jsonValue(), f_schema, indent=4, sort_keys=True) + + +def schema_load(option): + """Loads Spark DataFrame schema from JSON file. + + :param option: File name suffix for the DataFrame schema. + :type option: string + :returns: DataFrame schema. + :rtype: StructType + """ + with open(f"trending_{option}.json", "r", encoding="UTF-8") as f_schema: + return StructType.fromJson(load(f_schema)) + + +def schema_dump_from_json(option): + """Loads JSON with data and dumps Spark DataFrame schema into JSON file. + + :param option: File name suffix for the JSON data. + :type option: string + """ + schema_dump(spark \ + .read \ + .option("multiline", "true") \ + .json(f"data_{option}.json") \ + .schema, option + ) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + schema = schema_load(schema_name) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit("local")) \ + .withColumn("build", lit("unknown")) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = [] +for file in Path(PATH).glob(f"**/*{SUFFIX}"): + if file.name not in IGNORE_SUFFIX: + paths.append(str(file)) + +for schema_name in ["mrr", "ndrpdr", "soak"]: + out_sdf = process_json_to_dataframe(schema_name, paths) + out_sdf.show() + out_sdf.printSchema() + out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) \ + .write \ + .partitionBy("test_type", "year", "month", "day") \ + .mode("append") \ + .parquet("local.parquet") diff --git a/csit.infra.etl/stats.py b/csit.infra.etl/stats.py new file mode 100644 index 0000000000..ab8bcafdeb --- /dev/null +++ b/csit.infra.etl/stats.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import lit +from pyspark.sql.types import StructType + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="suite.info.json.gz" +IGNORE_SUFFIX=[] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "duration", + "version" + ] + + # load schemas + with open(f"stats_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if "tests/suite.info.json.gz" in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) \ + .withColumn("stats_type", lit(schema_name)) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +for schema_name in ["sra"]: + out_sdf = process_json_to_dataframe(schema_name, paths) + out_sdf.show(truncate=False) + out_sdf.printSchema() + out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + + try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/stats", + dataset=True, + partition_cols=["stats_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) + except EmptyDataFrame: + pass diff --git a/csit.infra.etl/stats_sra.json b/csit.infra.etl/stats_sra.json new file mode 100644 index 0000000000..5f792e9bfe --- /dev/null +++ b/csit.infra.etl/stats_sra.json @@ -0,0 +1,41 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "stats_type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/trending.py b/csit.infra.etl/trending.py new file mode 100644 index 0000000000..bc27aaa063 --- /dev/null +++ b/csit.infra.etl/trending.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ETL script running on top of the s3://""" + +from datetime import datetime, timedelta +from json import load +from os import environ +from pytz import utc + +import awswrangler as wr +from awswrangler.exceptions import EmptyDataFrame +from awsglue.context import GlueContext +from boto3 import session +from pyspark.context import SparkContext +from pyspark.sql.functions import col, lit, regexp_replace +from pyspark.sql.types import StructType + + +S3_LOGS_BUCKET="fdio-logs-s3-cloudfront-index" +S3_DOCS_BUCKET="fdio-docs-s3-cloudfront-index" +PATH=f"s3://{S3_LOGS_BUCKET}/vex-yul-rot-jenkins-1/csit-*-perf-*" +SUFFIX="info.json.gz" +IGNORE_SUFFIX=[ + "suite.info.json.gz", + "setup.info.json.gz", + "teardown.info.json.gz", + "suite.output.info.json.gz", + "setup.output.info.json.gz", + "teardown.output.info.json.gz" +] +LAST_MODIFIED_END=utc.localize( + datetime.strptime( + f"{datetime.now().year}-{datetime.now().month}-{datetime.now().day}", + "%Y-%m-%d" + ) +) +LAST_MODIFIED_BEGIN=LAST_MODIFIED_END - timedelta(1) + + +def flatten_frame(nested_sdf): + """Unnest Spark DataFrame in case there nested structered columns. + + :param nested_sdf: Spark DataFrame. + :type nested_sdf: DataFrame + :returns: Unnest DataFrame. + :rtype: DataFrame + """ + stack = [((), nested_sdf)] + columns = [] + while len(stack) > 0: + parents, sdf = stack.pop() + for column_name, column_type in sdf.dtypes: + if column_type[:6] == "struct": + projected_sdf = sdf.select(column_name + ".*") + stack.append((parents + (column_name,), projected_sdf)) + else: + columns.append( + col(".".join(parents + (column_name,))) \ + .alias("_".join(parents + (column_name,))) + ) + return nested_sdf.select(columns) + + +def process_json_to_dataframe(schema_name, paths): + """Processes JSON to Spark DataFrame. + + :param schema_name: Schema name. + :type schema_name: string + :param paths: S3 paths to process. + :type paths: list + :returns: Spark DataFrame. + :rtype: DataFrame + """ + drop_subset = [ + "dut_type", "dut_version", + "passed", + "test_name_long", "test_name_short", + "test_type", + "version" + ] + + # load schemas + with open(f"trending_{schema_name}.json", "r", encoding="UTF-8") as f_schema: + schema = StructType.fromJson(load(f_schema)) + + # create empty DF out of schemas + sdf = spark.createDataFrame([], schema) + + # filter list + filtered = [path for path in paths if schema_name in path] + + # select + for path in filtered: + print(path) + + sdf_loaded = spark \ + .read \ + .option("multiline", "true") \ + .schema(schema) \ + .json(path) \ + .withColumn("job", lit(path.split("/")[4])) \ + .withColumn("build", lit(path.split("/")[5])) + sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True) + + # drop rows with all nulls and drop rows with null in critical frames + sdf = sdf.na.drop(how="all") + sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset) + + # flatten frame + sdf = flatten_frame(sdf) + + return sdf + + +# create SparkContext and GlueContext +spark_context = SparkContext.getOrCreate() +spark_context.setLogLevel("WARN") +glue_context = GlueContext(spark_context) +spark = glue_context.spark_session + +# files of interest +paths = wr.s3.list_objects( + path=PATH, + suffix=SUFFIX, + last_modified_begin=LAST_MODIFIED_BEGIN, + last_modified_end=LAST_MODIFIED_END, + ignore_suffix=IGNORE_SUFFIX, + ignore_empty=True +) + +filtered_paths = [path for path in paths if "daily" in path or "weekly" in path] + +for schema_name in ["mrr", "ndrpdr", "soak"]: + out_sdf = process_json_to_dataframe(schema_name, filtered_paths) + out_sdf.show(truncate=False) + out_sdf.printSchema() + out_sdf = out_sdf \ + .withColumn("year", lit(datetime.now().year)) \ + .withColumn("month", lit(datetime.now().month)) \ + .withColumn("day", lit(datetime.now().day)) \ + .repartition(1) + + try: + wr.s3.to_parquet( + df=out_sdf.toPandas(), + path=f"s3://{S3_DOCS_BUCKET}/csit/parquet/trending", + dataset=True, + partition_cols=["test_type", "year", "month", "day"], + compression="snappy", + use_threads=True, + mode="overwrite_partitions", + boto3_session=session.Session( + aws_access_key_id=environ["OUT_AWS_ACCESS_KEY_ID"], + aws_secret_access_key=environ["OUT_AWS_SECRET_ACCESS_KEY"], + region_name=environ["OUT_AWS_DEFAULT_REGION"] + ) + ) + except EmptyDataFrame: + pass diff --git a/csit.infra.etl/trending_mrr.json b/csit.infra.etl/trending_mrr.json new file mode 100644 index 0000000000..4e222d33d5 --- /dev/null +++ b/csit.infra.etl/trending_mrr.json @@ -0,0 +1,169 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "hosts", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "string", + "type": "array" + } + }, + { + "metadata": {}, + "name": "tg_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "tg_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "result", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "receive_rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "stdev", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "values", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "double", + "type": "array" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "message", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/trending_ndrpdr.json b/csit.infra.etl/trending_ndrpdr.json new file mode 100644 index 0000000000..fd833aa84c --- /dev/null +++ b/csit.infra.etl/trending_ndrpdr.json @@ -0,0 +1,697 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "hosts", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "string", + "type": "array" + } + }, + { + "metadata": {}, + "name": "tg_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "tg_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "result", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "latency_forward", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "pdr_0", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_10", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_50", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_90", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "latency_reverse", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "pdr_0", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_10", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_50", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr_90", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "avg", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "hdrh", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "max", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "min", + "nullable": true, + "type": "long" + }, + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "ndr", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "lower", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "upper", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "pdr", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "lower", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "upper", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "message", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/csit.infra.etl/trending_soak.json b/csit.infra.etl/trending_soak.json new file mode 100644 index 0000000000..819d3142d3 --- /dev/null +++ b/csit.infra.etl/trending_soak.json @@ -0,0 +1,239 @@ +{ + "fields": [ + { + "metadata": {}, + "name": "job", + "nullable": false, + "type": "string" + }, + { + "metadata": {}, + "name": "build", + "nullable": false, + "type": "integer" + }, + { + "metadata": {}, + "name": "duration", + "nullable": true, + "type": "double" + }, + { + "metadata": {}, + "name": "dut_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "dut_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "hosts", + "nullable": true, + "type": { + "containsNull": true, + "elementType": "string", + "type": "array" + } + }, + { + "metadata": {}, + "name": "tg_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "tg_version", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "result", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "critical_rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "lower", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "upper", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "bandwidth", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "rate", + "nullable": true, + "type": { + "fields": [ + { + "metadata": {}, + "name": "unit", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "value", + "nullable": true, + "type": "double" + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "type", + "nullable": true, + "type": "string" + } + ], + "type": "struct" + } + }, + { + "metadata": {}, + "name": "start_time", + "nullable": true, + "type": "timestamp" + }, + { + "metadata": {}, + "name": "passed", + "nullable": true, + "type": "boolean" + }, + { + "metadata": {}, + "name": "test_id", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_long", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_name_short", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "test_type", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "message", + "nullable": true, + "type": "string" + }, + { + "metadata": {}, + "name": "version", + "nullable": true, + "type": "string" + } + ], + "type": "struct" +}
\ No newline at end of file diff --git a/fdio.infra.terraform/1n_nmd/etl/conf/nomad/etl.hcl.tftpl b/fdio.infra.terraform/1n_nmd/etl/conf/nomad/etl.hcl.tftpl new file mode 100644 index 0000000000..c1d186f681 --- /dev/null +++ b/fdio.infra.terraform/1n_nmd/etl/conf/nomad/etl.hcl.tftpl @@ -0,0 +1,318 @@ +job "${job_name}" { + # The "datacenters" parameter specifies the list of datacenters which should + # be considered when placing this task. This must be provided. + datacenters = "${datacenters}" + + # The "type" parameter controls the type of job, which impacts the scheduler's + # decision on placement. For a full list of job types and their differences, + # please see the online documentation. + # + # https://www.nomadproject.io/docs/jobspec/schedulers + # + type = "${type}" + + # The periodic stanza allows a job to run at fixed times, dates, or intervals. + # The easiest way to think about the periodic scheduler is "Nomad cron" or + # "distributed cron". + # + # https://www.nomadproject.io/docs/job-specification/periodic + # + periodic { + cron = "${cron}" + prohibit_overlap = "${prohibit_overlap}" + time_zone = "${time_zone}" + } + + # The "group" stanza defines a series of tasks that should be co-located on + # the same Nomad client. Any task within a group will be placed on the same + # client. + # + # https://www.nomadproject.io/docs/job-specification/group + # + group "${job_name}-master" { + # The restart stanza configures a tasks's behavior on task failure. Restarts + # happen on the client that is running the task. + # + # https://www.nomadproject.io/docs/job-specification/restart + # + restart { + mode = "fail" + } + + # The constraint allows restricting the set of eligible nodes. Constraints + # may filter on attributes or client metadata. + # + # For more information and examples on the "volume" stanza, please see + # the online documentation at: + # + # https://www.nomadproject.io/docs/job-specification/constraint + # + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + + constraint { + attribute = "$${node.class}" + value = "builder" + } + + # The "task" stanza creates an individual unit of work, such as a Docker + # container, web application, or batch processing. + # + # https://www.nomadproject.io/docs/job-specification/task.html + # + task "${job_name}-trending" { + # The artifact stanza instructs Nomad to fetch and unpack a remote + # resource, such as a file, tarball, or binary. + # + # https://www.nomadproject.io/docs/job-specification/artifact + # + artifact { + source = "git::https://github.com/pmikus/glue-etl-pyspark.git" + destination = "local/etl" + } + + # The "driver" parameter specifies the task driver that should be used to + # run the task. + driver = "docker" + + # The "config" stanza specifies the driver configuration, which is passed + # directly to the driver to start the task. The details of configurations + # are specific to each driver, so please see specific driver + # documentation for more information. + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "trending.py" + ] + work_dir = "/local/etl" + } + + # The env stanza configures a list of environment variables to populate + # the task's environment before starting. + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + + # The "resources" stanza describes the requirements a task needs to + # execute. Resource requirements include memory, network, cpu, and more. + # This ensures the task will execute on a machine that contains enough + # resource capacity. + # + # https://www.nomadproject.io/docs/job-specification/resources + # + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-stats" { + # The artifact stanza instructs Nomad to fetch and unpack a remote + # resource, such as a file, tarball, or binary. + # + # https://www.nomadproject.io/docs/job-specification/artifact + # + artifact { + source = "git::https://github.com/pmikus/glue-etl-pyspark.git" + destination = "local/etl" + } + + # The "driver" parameter specifies the task driver that should be used to + # run the task. + driver = "docker" + + # The "config" stanza specifies the driver configuration, which is passed + # directly to the driver to start the task. The details of configurations + # are specific to each driver, so please see specific driver + # documentation for more information. + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "10g", + "--executor-memory", "10g", + "stats.py" + ] + work_dir = "/local/etl" + } + + # The env stanza configures a list of environment variables to populate + # the task's environment before starting. + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + + # The "resources" stanza describes the requirements a task needs to + # execute. Resource requirements include memory, network, cpu, and more. + # This ensures the task will execute on a machine that contains enough + # resource capacity. + # + # https://www.nomadproject.io/docs/job-specification/resources + # + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } + group "${job_name}-rls2202" { + # The restart stanza configures a tasks's behavior on task failure. Restarts + # happen on the client that is running the task. + # + # https://www.nomadproject.io/docs/job-specification/restart + # + restart { + mode = "fail" + } + + # The constraint allows restricting the set of eligible nodes. Constraints + # may filter on attributes or client metadata. + # + # For more information and examples on the "volume" stanza, please see + # the online documentation at: + # + # https://www.nomadproject.io/docs/job-specification/constraint + # + constraint { + attribute = "$${attr.cpu.arch}" + operator = "!=" + value = "arm64" + } + + constraint { + attribute = "$${node.class}" + value = "builder" + } + + # The "task" stanza creates an individual unit of work, such as a Docker + # container, web application, or batch processing. + # + # https://www.nomadproject.io/docs/job-specification/task.html + # + task "${job_name}-coverage" { + # The artifact stanza instructs Nomad to fetch and unpack a remote + # resource, such as a file, tarball, or binary. + # + # https://www.nomadproject.io/docs/job-specification/artifact + # + artifact { + source = "git::https://github.com/pmikus/glue-etl-pyspark.git" + destination = "local/etl" + } + + # The "driver" parameter specifies the task driver that should be used to + # run the task. + driver = "docker" + + # The "config" stanza specifies the driver configuration, which is passed + # directly to the driver to start the task. The details of configurations + # are specific to each driver, so please see specific driver + # documentation for more information. + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "coverage_rls2202.py" + ] + work_dir = "/local/etl" + } + + # The env stanza configures a list of environment variables to populate + # the task's environment before starting. + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + + # The "resources" stanza describes the requirements a task needs to + # execute. Resource requirements include memory, network, cpu, and more. + # This ensures the task will execute on a machine that contains enough + # resource capacity. + # + # https://www.nomadproject.io/docs/job-specification/resources + # + resources { + cpu = ${cpu} + memory = ${memory} + } + } + task "${job_name}-iterative" { + # The artifact stanza instructs Nomad to fetch and unpack a remote + # resource, such as a file, tarball, or binary. + # + # https://www.nomadproject.io/docs/job-specification/artifact + # + artifact { + source = "git::https://github.com/pmikus/glue-etl-pyspark.git" + destination = "local/etl" + } + + # The "driver" parameter specifies the task driver that should be used to + # run the task. + driver = "docker" + + # The "config" stanza specifies the driver configuration, which is passed + # directly to the driver to start the task. The details of configurations + # are specific to each driver, so please see specific driver + # documentation for more information. + config { + image = "${image}" + command = "gluesparksubmit" + args = [ + "--driver-memory", "20g", + "--executor-memory", "20g", + "iterative_rls2202.py" + ] + work_dir = "/local/etl" + } + + # The env stanza configures a list of environment variables to populate + # the task's environment before starting. + env { + AWS_ACCESS_KEY_ID = "${aws_access_key_id}" + AWS_SECRET_ACCESS_KEY = "${aws_secret_access_key}" + AWS_DEFAULT_REGION = "${aws_default_region}" + OUT_AWS_ACCESS_KEY_ID = "${out_aws_access_key_id}" + OUT_AWS_SECRET_ACCESS_KEY = "${out_aws_secret_access_key}" + OUT_AWS_DEFAULT_REGION = "${out_aws_default_region}" + ${ envs } + } + + # The "resources" stanza describes the requirements a task needs to + # execute. Resource requirements include memory, network, cpu, and more. + # This ensures the task will execute on a machine that contains enough + # resource capacity. + # + # https://www.nomadproject.io/docs/job-specification/resources + # + resources { + cpu = ${cpu} + memory = ${memory} + } + } + } +} diff --git a/fdio.infra.terraform/1n_nmd/etl/fdio/main.tf b/fdio.infra.terraform/1n_nmd/etl/fdio/main.tf new file mode 100644 index 0000000000..3d2026f0f9 --- /dev/null +++ b/fdio.infra.terraform/1n_nmd/etl/fdio/main.tf @@ -0,0 +1,23 @@ +data "vault_generic_secret" "fdio_logs" { + path = "kv/secret/data/etl/fdio_logs" +} + +data "vault_generic_secret" "fdio_docs" { + path = "kv/secret/data/etl/fdio_docs" +} + +module "etl" { + providers = { + nomad = nomad.yul1 + } + source = "../" + + aws_access_key_id = data.vault_generic_secret.fdio_logs.data["access_key"] + aws_secret_access_key = data.vault_generic_secret.fdio_logs.data["secret_key"] + aws_default_region = data.vault_generic_secret.fdio_logs.data["region"] + out_aws_access_key_id = data.vault_generic_secret.fdio_docs.data["access_key"] + out_aws_secret_access_key = data.vault_generic_secret.fdio_docs.data["secret_key"] + out_aws_default_region = data.vault_generic_secret.fdio_docs.data["region"] + cron = "@daily" + datacenters = ["yul1"] +} diff --git a/fdio.infra.terraform/1n_nmd/etl/fdio/providers.tf b/fdio.infra.terraform/1n_nmd/etl/fdio/providers.tf new file mode 100644 index 0000000000..c6617da02b --- /dev/null +++ b/fdio.infra.terraform/1n_nmd/etl/fdio/providers.tf @@ -0,0 +1,13 @@ +provider "nomad" { + address = var.nomad_provider_address + alias = "yul1" + # ca_file = var.nomad_provider_ca_file + # cert_file = var.nomad_provider_cert_file + # key_file = var.nomad_provider_key_file +} + +provider "vault" { + address = var.vault_provider_address + skip_tls_verify = var.vault_provider_skip_tls_verify + token = var.vault_provider_token +} diff --git a/fdio.infra.terraform/1n_nmd/etl/fdio/variables.tf b/fdio.infra.terraform/1n_nmd/etl/fdio/variables.tf new file mode 100644 index 0000000000..0e0b3af622 --- /dev/null +++ b/fdio.infra.terraform/1n_nmd/etl/fdio/variables.tf @@ -0,0 +1,47 @@ +variable "nomad_acl" { + description = "Nomad ACLs enabled/disabled." + type = bool + default = false +} + +variable "nomad_provider_address" { + description = "FD.io Nomad cluster address." + type = string + default = "http://10.32.8.14:4646" +} + +variable "nomad_provider_ca_file" { + description = "A local file path to a PEM-encoded certificate authority." + type = string + default = "/etc/nomad.d/ssl/nomad-ca.pem" +} + +variable "nomad_provider_cert_file" { + description = "A local file path to a PEM-encoded certificate." + type = string + default = "/etc/nomad.d/ssl/nomad-cli.pem" +} + +variable "nomad_provider_key_file" { + description = "A local file path to a PEM-encoded private key." + type = string + default = "/etc/nomad.d/ssl/nomad-cli-key.pem" +} + +variable "vault_provider_address" { + description = "Vault cluster address." + type = string + default = "http://10.30.51.28:8200" +} + +variable "vault_provider_skip_tls_verify" { + description = "Verification of the Vault server's TLS certificate." + type = bool + default = false +} + +variable "vault_provider_token" { + description = "Vault root token." + type = string + sensitive = true +} diff --git a/fdio.infra.terraform/1n_nmd/etl/fdio/versions.tf b/fdio.infra.terraform/1n_nmd/etl/fdio/versions.tf new file mode 100644 index 0000000000..526e1d0df0 --- /dev/null +++ b/fdio.infra.terraform/1n_nmd/etl/fdio/versions.tf @@ -0,0 +1,17 @@ +terraform { + backend "consul" { + address = "10.32.8.14:8500" + scheme = "http" + path = "terraform/etl" + } + required_providers { + nomad = { + source = "hashicorp/nomad" + version = ">= 1.4.16" + } + vault = { + version = ">= 3.2.1" + } + } + required_version = ">= 1.1.4" +} diff --git a/fdio.infra.terraform/1n_nmd/etl/main.tf b/fdio.infra.terraform/1n_nmd/etl/main.tf new file mode 100644 index 0000000000..c477da81a8 --- /dev/null +++ b/fdio.infra.terraform/1n_nmd/etl/main.tf @@ -0,0 +1,33 @@ +locals { + datacenters = join(",", var.datacenters) + envs = join("\n", concat([], var.envs)) +} + +resource "nomad_job" "nomad_job_etl" { + jobspec = templatefile( + "${path.module}/conf/nomad/etl.hcl.tftpl", + { + aws_access_key_id = var.aws_access_key_id, + aws_secret_access_key = var.aws_secret_access_key, + aws_default_region = var.aws_default_region + cpu = var.cpu, + cron = var.cron, + datacenters = local.datacenters, + envs = local.envs, + image = var.image, + job_name = var.job_name, + memory = var.memory, + out_aws_access_key_id = var.out_aws_access_key_id, + out_aws_secret_access_key = var.out_aws_secret_access_key, + out_aws_default_region = var.out_aws_default_region + prohibit_overlap = var.prohibit_overlap, + time_zone = var.time_zone, + type = var.type, + use_vault_provider = var.vault_secret.use_vault_provider, + vault_kv_policy_name = var.vault_secret.vault_kv_policy_name, + vault_kv_path = var.vault_secret.vault_kv_path, + vault_kv_field_access_key = var.vault_secret.vault_kv_field_access_key, + vault_kv_field_secret_key = var.vault_secret.vault_kv_field_secret_key + }) + detach = false +} diff --git a/fdio.infra.terraform/1n_nmd/etl/variables.tf b/fdio.infra.terraform/1n_nmd/etl/variables.tf new file mode 100644 index 0000000000..3c6c12a943 --- /dev/null +++ b/fdio.infra.terraform/1n_nmd/etl/variables.tf @@ -0,0 +1,115 @@ +# Nomad +variable "datacenters" { + description = "Specifies the list of DCs to be considered placing this task." + type = list(string) + default = ["dc1"] +} + +# ETL +variable "aws_access_key_id" { + description = "AWS access key." + type = string + default = "aws" +} + +variable "aws_secret_access_key" { + description = "AWS secret key" + type = string + default = "aws" +} + +variable "aws_default_region" { + description = "AWS region" + type = string + default = "aws" +} + +variable "cpu" { + description = "Specifies the CPU required to run this task in MHz." + type = number + default = 10000 +} + +variable "cron" { + description = "Specifies a cron expression configuring the interval to launch." + type = string + default = "@daily" +} + +variable "envs" { + description = "Specifies ETL environment variables." + type = list(string) + default = [] +} + +variable "image" { + description = "Specifies the Docker image to run." + type = string + default = "pmikus/docker-ubuntu-focal-aws-glue:latest" +} + +variable "job_name" { + description = "Specifies a name for the job." + type = string + default = "etl" +} + +variable "memory" { + description = "Specifies the memory required in MB." + type = number + default = 20000 +} + +variable "out_aws_access_key_id" { + description = "AWS access key." + type = string + default = "aws" +} + +variable "out_aws_secret_access_key" { + description = "AWS secret key" + type = string + default = "aws" +} + +variable "out_aws_default_region" { + description = "AWS region" + type = string + default = "aws" +} + +variable "prohibit_overlap" { + description = "Specifies if this job should wait until previous completed." + type = bool + default = true +} + +variable "time_zone" { + description = "Specifies the time zone to evaluate the next launch interval." + type = string + default = "UTC" +} + +variable "type" { + description = "Specifies the Nomad scheduler to use." + type = string + default = "batch" +} + +variable "vault_secret" { + type = object({ + use_vault_provider = bool, + vault_kv_policy_name = string, + vault_kv_path = string, + vault_kv_field_access_key = string, + vault_kv_field_secret_key = string + }) + description = "Set of properties to be able to fetch secret from vault." + default = { + use_vault_provider = false + vault_kv_policy_name = "kv" + vault_kv_path = "secret/data/etl" + vault_kv_field_access_key = "access_key" + vault_kv_field_secret_key = "secret_key" + } +} diff --git a/fdio.infra.terraform/1n_nmd/etl/versions.tf b/fdio.infra.terraform/1n_nmd/etl/versions.tf new file mode 100644 index 0000000000..a01708f28a --- /dev/null +++ b/fdio.infra.terraform/1n_nmd/etl/versions.tf @@ -0,0 +1,9 @@ +terraform { + required_providers { + nomad = { + source = "hashicorp/nomad" + version = ">= 1.4.16" + } + } + required_version = ">= 1.1.4" +} |