summaryrefslogtreecommitdiffstats
path: root/doc/ELKConnect.py
blob: fb7e2c29e8d2641f3903d140bd43bfaf522a9872 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import sys
import os
import json
import datetime
import time

# The elasticsearch client and its urllib3 dependency are loaded from
# <directory of this file>/../scripts/external_libs, so both library
# directories have to be on sys.path before `import elasticsearch` runs.
ext_path = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, 'scripts', 'external_libs')
)
elk_path, urllib_path = (
    os.path.join(ext_path, lib_dir) for lib_dir in ('elasticsearch', 'urllib3')
)

# Append each directory once; entries already on sys.path are left in place.
sys.path.extend(
    lib_path for lib_path in (elk_path, urllib_path) if lib_path not in sys.path
)

import elasticsearch
import elasticsearch.helpers


class ELKManager:
    """Fetch stateless test results from Elasticsearch and group them.

    For every configured setup name the manager queries the index for
    documents from the last two weeks whose ``test.type`` is ``stateless``,
    then groups the hits by setup name and test name, with each group
    ordered by timestamp.

    Attributes:
        hostname: Elasticsearch host passed to the client.
        index: Name of the index that is queried.
        port: Elasticsearch port passed to the client.
        setup_names: Setup names that are queried, one query per name.
        es: The ``elasticsearch.Elasticsearch`` client instance.
        all_data_raw: ``{setup_name: [hit, ...]}`` as filled by
            ``fetch_all_data``.
        all_data_parsed: ``{setup_name: {test_name: [record, ...]}}`` as
            filled by ``parse_raw_data``; each record is the tuple
            ``(test_name, timestamp, mpps_pc, build_id)``.
    """

    def __init__(self, hostname, index='trex_perf-000004', port=9200, setup_names=None):
        """Create the Elasticsearch client and empty result containers.

        Args:
            hostname: Host name or address of the Elasticsearch server.
            index: Index to query.
            port: TCP port of the Elasticsearch server.
            setup_names: Optional iterable of setup names to query. When
                omitted, the built-in default list of setups is used.
        """
        self.hostname = hostname
        self.index = index
        self.port = port
        if setup_names is None:
            setup_names = ['trex07', 'trex08', 'trex09', 'trex11', 'kiwi02']
        # Copy so later changes to the caller's sequence do not leak in.
        self.setup_names = list(setup_names)
        self.es = elasticsearch.Elasticsearch([{"host": hostname, "port": port}])
        self.all_data_raw = {}
        self.all_data_parsed = {}

    @staticmethod
    def time_res_calculation():
        """Return the query time window as epoch milliseconds.

        Returns:
            A ``(now_ms, two_weeks_ago_ms)`` tuple of ints, where the second
            value is exactly two weeks before the first.
        """
        milli_since_epoch = int(time.time() * 1000)
        # Subtract a fixed duration from the epoch value instead of feeding a
        # UTC struct_time to time.mktime(): mktime interprets its argument as
        # local time, which skews the window by the host's UTC offset.
        two_weeks_milli = int(datetime.timedelta(weeks=2).total_seconds() * 1000)
        return milli_since_epoch, milli_since_epoch - two_weeks_milli

    def fetch_all_data(self):
        """Query Elasticsearch once per setup and store the raw hits.

        The result replaces ``self.all_data_raw`` with a dict that maps each
        name in ``self.setup_names`` to the list of hits returned for it.
        """
        milli_since_epoch, two_weeks_ago_epoch_milli = self.time_res_calculation()
        res = {}
        for setup_name in self.setup_names:
            query = {
                # Only the fields consumed by parse_raw_data are requested.
                "_source": ["info.setup.name", "test.name", "test.mpps_pc", "timestamp", "build_id"],
                "size": 10000,
                "query": {
                    "bool": {
                        "filter": [
                            {"range": {
                                "timestamp": {"gte": two_weeks_ago_epoch_milli, "lte": milli_since_epoch,
                                              "format": "epoch_millis"}}},
                            {"term": {"info.setup.name": setup_name}},
                            {"term": {"test.type": "stateless"}}
                        ]
                    }
                }
            }
            # scan() yields hits lazily; materialise them so they can be
            # parsed (and re-parsed) after the request has finished.
            res[setup_name] = list(elasticsearch.helpers.scan(self.es, index=self.index, query=query, size=10000))
        self.all_data_raw = res

    def parse_raw_data(self):
        """Group the raw hits by setup and test name, sorted by timestamp.

        ``self.all_data_parsed`` is rebuilt from ``self.all_data_raw`` on
        every call, so parsing the same raw data twice does not duplicate
        records. Records are grouped under the setup name stored in each
        document, not under the key of ``all_data_raw``.

        Raises:
            KeyError: If a hit lacks one of the expected ``_source`` fields.
        """
        parsed = {}
        for hits in self.all_data_raw.values():
            for hit in hits:
                source = hit['_source']
                setup_name = source['info']['setup']['name']
                test_name = source['test']['name']
                record = (test_name, source['timestamp'], source['test']['mpps_pc'], source['build_id'])
                parsed.setdefault(setup_name, {}).setdefault(test_name, []).append(record)
        # Sort once, after every hit has been collected.
        self.all_data_parsed = self.sorted(parsed)

    @staticmethod
    def sorted(parsed_data):
        """Return a copy of ``parsed_data`` with every record list sorted.

        Args:
            parsed_data: ``{setup_name: {test_name: [record, ...]}}`` where
                each record is ``(test_name, timestamp, result, build_id)``.

        Returns:
            A new dict of the same shape whose record lists are ordered by
            the timestamp element (index 1). The input is not modified.
        """
        sorted_tests_data = {}
        for setup_name, setup_tests_data in parsed_data.items():
            per_test = {}
            for test_name, records in setup_tests_data.items():
                # Inside the method body `sorted` is the builtin, not this
                # method: class-level names are not visible from functions.
                per_test[test_name] = sorted(records, key=lambda record: record[1])
            sorted_tests_data[setup_name] = per_test
        return sorted_tests_data

    def fetch_and_parse(self):
        """Fetch the raw data, parse it, and return the parsed result.

        Returns:
            ``self.all_data_parsed`` after a fresh fetch and parse.
        """
        self.fetch_all_data()
        self.parse_raw_data()
        return self.all_data_parsed