| author | Peter Mikus <pmikus@cisco.com> | 2022-02-22 11:00:47 +0100 |
| --- | --- | --- |
| committer | Peter Mikus <pmikus@cisco.com> | 2022-02-24 08:01:53 +0000 |
| commit | 54fd337e30acc97434b33a6d0d3c19e4aa3051ab (patch) | |
| tree | 5d131a1d7c0846a209e85380b66dfc7770cbe32e /csit.infra.etl/local.py | |
| parent | b2cb835b34c7404b2aaee3ec30700c67537da66d (diff) | |
feat(uti): etl
Signed-off-by: Peter Mikus <pmikus@cisco.com>
Change-Id: I7cdcdcbf1e4986664d5d48357688185319f67b0c
Diffstat (limited to 'csit.infra.etl/local.py')
-rw-r--r-- | csit.infra.etl/local.py | 176 |
1 file changed, 176 insertions(+), 0 deletions(-)
diff --git a/csit.infra.etl/local.py b/csit.infra.etl/local.py
new file mode 100644
index 0000000000..79e18d1c64
--- /dev/null
+++ b/csit.infra.etl/local.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2022 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""ETL script running on top of the localhost."""
+
+from datetime import datetime
+from json import dump, load
+from pathlib import Path
+
+from awsglue.context import GlueContext
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, lit, regexp_replace
+from pyspark.sql.types import StructType
+
+
+PATH="/app/tests"
+SUFFIX="info.json"
+IGNORE_SUFFIX=[
+    "suite.info.json",
+    "setup.info.json",
+    "teardown.info.json",
+    "suite.output.info.json",
+    "setup.output.info.json",
+    "teardown.output.info.json"
+]
+
+
+def schema_dump(schema, option):
+    """Dumps Spark DataFrame schema into JSON file.
+
+    :param schema: DataFrame schema.
+    :type schema: StructType
+    :param option: File name suffix for the DataFrame schema.
+    :type option: string
+    """
+    with open(f"trending_{option}.json", "w", encoding="UTF-8") as f_schema:
+        dump(schema.jsonValue(), f_schema, indent=4, sort_keys=True)
+
+
+def schema_load(option):
+    """Loads Spark DataFrame schema from JSON file.
+
+    :param option: File name suffix for the DataFrame schema.
+    :type option: string
+    :returns: DataFrame schema.
+    :rtype: StructType
+    """
+    with open(f"trending_{option}.json", "r", encoding="UTF-8") as f_schema:
+        return StructType.fromJson(load(f_schema))
+
+
+def schema_dump_from_json(option):
+    """Loads JSON with data and dumps Spark DataFrame schema into JSON file.
+
+    :param option: File name suffix for the JSON data.
+    :type option: string
+    """
+    schema_dump(spark \
+        .read \
+        .option("multiline", "true") \
+        .json(f"data_{option}.json") \
+        .schema, option
+    )
+
+
+def flatten_frame(nested_sdf):
+    """Unnest Spark DataFrame in case there are nested structured columns.
+
+    :param nested_sdf: Spark DataFrame.
+    :type nested_sdf: DataFrame
+    :returns: Unnested DataFrame.
+    :rtype: DataFrame
+    """
+    stack = [((), nested_sdf)]
+    columns = []
+    while len(stack) > 0:
+        parents, sdf = stack.pop()
+        for column_name, column_type in sdf.dtypes:
+            if column_type[:6] == "struct":
+                projected_sdf = sdf.select(column_name + ".*")
+                stack.append((parents + (column_name,), projected_sdf))
+            else:
+                columns.append(
+                    col(".".join(parents + (column_name,))) \
+                    .alias("_".join(parents + (column_name,)))
+                )
+    return nested_sdf.select(columns)
+
+
+def process_json_to_dataframe(schema_name, paths):
+    """Processes JSON to Spark DataFrame.
+
+    :param schema_name: Schema name.
+    :type schema_name: string
+    :param paths: S3 paths to process.
+    :type paths: list
+    :returns: Spark DataFrame.
+    :rtype: DataFrame
+    """
+    drop_subset = [
+        "dut_type", "dut_version",
+        "passed",
+        "test_name_long", "test_name_short",
+        "test_type",
+        "version"
+    ]
+
+    # load schemas
+    schema = schema_load(schema_name)
+
+    # create empty DF out of schemas
+    sdf = spark.createDataFrame([], schema)
+
+    # filter list
+    filtered = [path for path in paths if schema_name in path]
+
+    # select
+    for path in filtered:
+        print(path)
+
+        sdf_loaded = spark \
+            .read \
+            .option("multiline", "true") \
+            .schema(schema) \
+            .json(path) \
+            .withColumn("job", lit("local")) \
+            .withColumn("build", lit("unknown"))
+        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)
+
+    # drop rows with all nulls and drop rows with null in critical columns
+    sdf = sdf.na.drop(how="all")
+    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)
+
+    # flatten frame
+    sdf = flatten_frame(sdf)
+
+    return sdf
+
+
+# create SparkContext and GlueContext
+spark_context = SparkContext.getOrCreate()
+spark_context.setLogLevel("WARN")
+glue_context = GlueContext(spark_context)
+spark = glue_context.spark_session
+
+# files of interest
+paths = []
+for file in Path(PATH).glob(f"**/*{SUFFIX}"):
+    if file.name not in IGNORE_SUFFIX:
+        paths.append(str(file))
+
+for schema_name in ["mrr", "ndrpdr", "soak"]:
+    out_sdf = process_json_to_dataframe(schema_name, paths)
+    out_sdf.show()
+    out_sdf.printSchema()
+    out_sdf \
+        .withColumn("year", lit(datetime.now().year)) \
+        .withColumn("month", lit(datetime.now().month)) \
+        .withColumn("day", lit(datetime.now().day)) \
+        .repartition(1) \
+        .write \
+        .partitionBy("test_type", "year", "month", "day") \
+        .mode("append") \
+        .parquet("local.parquet")
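
The most intricate part of the commit is `flatten_frame`, which walks a DataFrame's schema with an explicit stack and aliases every leaf column by its underscore-joined path, so a nested `test.result.rate` becomes a flat `test_result_rate`. Below is a minimal, standalone sketch of that behaviour, runnable with plain pyspark and no awsglue: the function body is lifted from the diff above, while the sample nested DataFrame and its column names are invented for illustration.

    #!/usr/bin/env python3
    # Standalone demo of the stack-based flattening from csit.infra.etl/local.py.
    # Assumes only pyspark is installed; a local SparkSession replaces the Glue one.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col


    def flatten_frame(nested_sdf):
        """Lifted from local.py: unnest struct columns into flat, renamed leaves."""
        stack = [((), nested_sdf)]
        columns = []
        while len(stack) > 0:
            parents, sdf = stack.pop()
            for column_name, column_type in sdf.dtypes:
                if column_type[:6] == "struct":
                    # descend into the struct; remember the path taken so far
                    projected_sdf = sdf.select(column_name + ".*")
                    stack.append((parents + (column_name,), projected_sdf))
                else:
                    # leaf: select by dotted path, alias by underscored path
                    columns.append(
                        col(".".join(parents + (column_name,)))
                        .alias("_".join(parents + (column_name,)))
                    )
        return nested_sdf.select(columns)


    spark = SparkSession.builder.master("local[1]").getOrCreate()

    # hypothetical nested record, loosely shaped like the test "info.json" inputs
    nested = spark.createDataFrame(
        [(("tc01", (12.5,)),)],
        "test struct<name:string, result:struct<rate:double>>"
    )
    flatten_frame(nested).show()
    # +---------+----------------+
    # |test_name|test_result_rate|
    # +---------+----------------+
    # |     tc01|            12.5|
    # +---------+----------------+

The iterative stack avoids recursion and handles arbitrarily deep struct nesting; because the leaves are selected from the original `nested_sdf` by their full dotted paths, a single final `select` produces the flat frame that the script then partitions and writes to Parquet.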