summaryrefslogtreecommitdiffstats
path: root/INFO.yaml
blob: 06f749d368e1368fc679a2e593ffb5d9aa9b73d2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
---
project: 'vpp'
project_creation_date: '2015-12-08'
project_category: ''
lifecycle_state: 'Incubation'
project_lead: &vpp_ptl
    name: 'Dave Barach'
    email: 'openvpp@barachs.net'
    id: 'dbarach'
    company: ''
    timezone: ''
primary_contact: *vpp_ptl
issue_tracking:
    type: 'jira'
    url: 'https://jira.fd.io/projects/vpp'
    key: 'VPP'
mailing_list:
    type: 'groups.io'
    url: 'https://lists.fd.io/g/vpp-dev'
    tag: '<[sub-project_name]>'
realtime_discussion:
    type: 'irc'
    server: 'freenode.net'
    channel: 'fdio-vpp'
meetings:
    - type: 'zoom'
      agenda: 'n/a'
      url: 'https://wiki.fd.io/view/VPP/Meeting'
      server: 'n/a'
      channel: 'fdio-vpp'
      repeats: 'weekly'
      time: '08:00 PT'
repositories:
    - 'vpp'
committers:
    - <<: *vpp_ptl
    - name: 'Dave Barach'
      company: 'cisco'
      email: 'openvpp@barachs.net'
      id: 'dbarach'
      timezone: ''
    - name: 'Florin Coras'
      company: 'cisco'
      email: 'florin.coras@gmail.com'
      id: 'florin.coras'
      timezone: ''
    - name: 'Benoit Ganne'
      company: 'cisco'
      email: 'bganne@cisco.com'
      id: 'bganne'
      timezone: ''
    - name: 'John Lo'
      company: 'cisco'
      email: 'loj@cisco.com'
      id: 'lojohn'
      timezone: ''
    - name: 'Chris Luke'
      company: 'comcast'
      email: 'chris_luke@comcast.com'
      id: 'chrisluke'
      timezone: ''
    - name: 'Damjan Marion'
      company: 'cisco'
      email: 'damarion@cisco.com'
      id: 'dmarion'
      timezone: ''
    - name: 'Neale Ranns'
      company: 'cisco'
      email: 'nranns@cisco.com'
      id: 'nranns'
      timezone: ''
    - name: 'Matthew Smith'
      company: 'netgate'
      email: 'mgsmith@netgate.com'
      id: 'mgsmith'
      timezone: ''
    - name: 'Ole Trøan'
      company: 'employees'
      email: 'otroan@employees.org'
      id: 'otroan'
      timezone: ''
    - name: 'Paul Vinciguerra'
      company: 'vinciconsulting'
      email: 'pvinci@vinciconsulting.com'
      id: 'pvinci'
      timezone: ''
    - name: 'Dave Wallace'
      company: 'gmail'
      email: 'dwallacelf@gmail.com'
      id: 'dwallacelf'
      timezone: ''
    - name: 'Ed Warnicke'
      company: 'gmail'
      email: 'hagbard@gmail.com'
      id: 'hagbard'
      timezone: ''
    - name: 'Andrew Yourtchenko'
      company: 'cisco'
      email: 'ayourtch@cisco.com'
      id: 'ayourtch'
      timezone: ''
tsc:
    # yamllint disable rule:line-length
    approval: ''
    changes:
        - type: 'removal'
          name: ''
          link: ''
        - type: 'promotion'
          name: ''
          link: ''
def schema_load(option):
    """Load a Spark DataFrame schema from a JSON file.

    Reads ``trending_<option>.json`` from the working directory and
    deserializes it into a Spark schema object.

    :param option: File name suffix for the DataFrame schema.
    :type option: string
    :returns: DataFrame schema.
    :rtype: StructType
    """
    with open(f"trending_{option}.json", "r", encoding="UTF-8") as f_schema:
        return StructType.fromJson(load(f_schema))


def schema_dump_from_json(option):
    """Load JSON with data and dump the inferred Spark DataFrame schema
    into a JSON file.

    Reads ``data_<option>.json``, lets Spark infer its schema, and hands
    the schema to ``schema_dump`` (defined elsewhere in this module) for
    serialization.

    :param option: File name suffix for the JSON data.
    :type option: string
    """
    inferred_schema = spark \
        .read \
        .option("multiline", "true") \
        .json(f"data_{option}.json") \
        .schema
    schema_dump(inferred_schema, option)


def flatten_frame(nested_sdf):
    """Unnest a Spark DataFrame in case there are nested structured columns.

    Walks the column tree iteratively (explicit stack, so arbitrarily deep
    structs are handled without recursion limits) and builds one flat
    column per leaf, named by joining the path with underscores.

    :param nested_sdf: Spark DataFrame.
    :type nested_sdf: DataFrame
    :returns: Unnested DataFrame.
    :rtype: DataFrame
    """
    stack = [((), nested_sdf)]
    columns = []
    while stack:
        parents, sdf = stack.pop()
        for column_name, column_type in sdf.dtypes:
            if column_type.startswith("struct"):
                # Descend into the struct; its fields are visited with the
                # extended parent path.
                projected_sdf = sdf.select(column_name + ".*")
                stack.append((parents + (column_name,), projected_sdf))
            else:
                # Leaf column: select by dotted path, alias with underscores.
                columns.append(
                    col(".".join(parents + (column_name,)))
                    .alias("_".join(parents + (column_name,)))
                )
    return nested_sdf.select(columns)


def process_json_to_dataframe(schema_name, paths):
    """Process JSON files into a single flattened Spark DataFrame.

    Loads every path whose name contains ``schema_name`` against the
    matching pre-dumped schema, unions the results, drops rows missing
    critical fields, and flattens nested struct columns.

    :param schema_name: Schema name (also used to filter ``paths``).
    :type schema_name: string
    :param paths: S3 paths to process.
    :type paths: list
    :returns: Spark DataFrame.
    :rtype: DataFrame
    """
    # Rows missing any of these fields are unusable downstream.
    drop_subset = [
        "dut_type", "dut_version",
        "passed",
        "test_name_long", "test_name_short",
        "test_type",
        "version"
    ]

    # Load schema and start from an empty frame so unionByName always has
    # a well-typed left-hand side, even if no path matches.
    schema = schema_load(schema_name)
    sdf = spark.createDataFrame([], schema)

    # Only process files belonging to this schema.
    filtered = [path for path in paths if schema_name in path]

    for path in filtered:
        print(path)
        sdf_loaded = spark \
            .read \
            .option("multiline", "true") \
            .schema(schema) \
            .json(path) \
            .withColumn("job", lit("local")) \
            .withColumn("build", lit("unknown"))
        # allowMissingColumns tolerates per-file schema drift.
        sdf = sdf.unionByName(sdf_loaded, allowMissingColumns=True)

    # Drop all-null rows, then rows with nulls in the critical columns.
    sdf = sdf.na.drop(how="all")
    sdf = sdf.na.drop(how="any", thresh=None, subset=drop_subset)

    # Flatten nested structs into underscore-joined top-level columns.
    sdf = flatten_frame(sdf)

    return sdf


# create SparkContext and GlueContext
spark_context = SparkContext.getOrCreate()
spark_context.setLogLevel("WARN")
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session

# files of interest
paths = [
    str(file)
    for file in Path(PATH).glob(f"**/*{SUFFIX}")
    if file.name not in IGNORE_SUFFIX
]

for schema_name in ["mrr", "ndrpdr", "soak"]:
    out_sdf = process_json_to_dataframe(schema_name, paths)
    out_sdf.show()
    out_sdf.printSchema()

    # Capture the date once so year/month/day partition columns are
    # consistent even if the run straddles midnight.
    now = datetime.now()
    out_sdf \
        .withColumn("year", lit(now.year)) \
        .withColumn("month", lit(now.month)) \
        .withColumn("day", lit(now.day)) \
        .repartition(1) \
        .write \
        .partitionBy("test_type", "year", "month", "day") \
        .mode("append") \
        .parquet("local.parquet")