diff options
author | pmikus <pmikus@cisco.com> | 2021-04-19 12:22:20 +0000 |
---|---|---|
committer | Peter Mikus <pmikus@cisco.com> | 2021-05-28 05:51:32 +0000 |
commit | d255d2545ee6cdc871bc35314fad72c3c48b225b (patch) | |
tree | f4a0a6462ec9cc856829caa0641e87bcaf37cc4c /resources/tools/telemetry | |
parent | 82863d5b8422b1b817d86bd6b1829a06a49feb02 (diff) |
Framework: Telemetry retake
Signed-off-by: pmikus <pmikus@cisco.com>
Change-Id: I2f019a083916aec9f7816266f6ad5b92dcc31fa0
Diffstat (limited to 'resources/tools/telemetry')
-rw-r--r-- | resources/tools/telemetry/__init__.py | 12 | ||||
-rwxr-xr-x | resources/tools/telemetry/__main__.py | 49 | ||||
-rw-r--r-- | resources/tools/telemetry/bundle_bpf.py | 112 | ||||
-rw-r--r-- | resources/tools/telemetry/bundle_vpp.py | 505 | ||||
-rw-r--r-- | resources/tools/telemetry/executor.py | 107 | ||||
-rw-r--r-- | resources/tools/telemetry/metrics.py | 619 | ||||
-rw-r--r-- | resources/tools/telemetry/parser.py | 107 | ||||
-rw-r--r-- | resources/tools/telemetry/serializer.py | 110 |
8 files changed, 1621 insertions, 0 deletions
diff --git a/resources/tools/telemetry/__init__.py b/resources/tools/telemetry/__init__.py new file mode 100644 index 0000000000..284a1ce972 --- /dev/null +++ b/resources/tools/telemetry/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/resources/tools/telemetry/__main__.py b/resources/tools/telemetry/__main__.py new file mode 100755 index 0000000000..2ab87b661a --- /dev/null +++ b/resources/tools/telemetry/__main__.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Telemetry Exporter.""" + +from argparse import ArgumentParser, RawDescriptionHelpFormatter + +from .executor import Executor + +def main(): + """ + Main entry function when called from cli + """ + parser = ArgumentParser( + description=u"Telemetry Exporter.", + formatter_class=RawDescriptionHelpFormatter + ) + parser.add_argument( + u"--config", required=True, type=str, + help=u"YAML configuration file." + ) + parser.add_argument( + u"--hook", required=False, type=str, + help=u"Process ID or socket." + ) + parser.add_argument( + u"--daemon", required=False, type=bool, + help=u"Run as daemon." + ) + args = parser.parse_args() + if args.daemon: + Executor(args.config).execute_daemon(args.hook) + else: + Executor(args.config).execute(args.hook) + +if __name__ == u"__main__": + main() diff --git a/resources/tools/telemetry/bundle_bpf.py b/resources/tools/telemetry/bundle_bpf.py new file mode 100644 index 0000000000..77bc9acf91 --- /dev/null +++ b/resources/tools/telemetry/bundle_bpf.py @@ -0,0 +1,112 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""BPF performance bundle.""" + +from logging import getLogger +import sys + +from bcc import BPF + + +class BundleBpf: + """ + Creates a BPF object. This is the main object for defining a BPF program, + and interacting with its output. 
+ + Syntax: BPF({text=BPF_program | src_file=filename} + [, usdt_contexts=[USDT_object, ...]] + [, cflags=[arg1, ...]] [, debug=int] + ) + + Exactly one of text or src_file must be supplied (not both). + """ + def __init__(self, program, serializer, hook): + """Initialize Bundle BPF Perf event class. + + :param program: BPF C code. + :param serializer: Metric serializer. + :param hook: Process ID. + :type program: dict + :type serializer: Serializer + :type hook: int + """ + self.obj = None + self.code = program[u"code"] + self.metrics = program[u"metrics"] + self.events = program[u"events"] + self.api_replies_list = list() + self.serializer = serializer + self.hook = hook + + self.obj = BPF(text=self.code) + + def attach(self, duration): + """ + Attach events to BPF. + + :param duration: Trial duration. + :type duration: int + """ + try: + for event in self.events: + self.obj.attach_perf_event( + ev_type=event[u"type"], + ev_config=event[u"name"], + fn_name=event[u"target"], + sample_period=duration + ) + except AttributeError: + getLogger(__name__).error(u"Cannot attach BPF events!") + sys.exit(1) + + def detach(self): + """ + Dettach events from BPF. + """ + try: + for event in self.events: + self.obj.detach_perf_event( + ev_type=event[u"type"], + ev_config=event[u"name"] + ) + except AttributeError: + getLogger(__name__).error(u"Cannot dettach BPF events!") + sys.exit(1) + + def fetch_data(self): + """ + Fetch data by invoking API calls to BPF. + """ + self.serializer.create(metrics=self.metrics) + for _, metric_list in self.metrics.items(): + for metric in metric_list: + for (key, val) in self.obj.get_table(metric[u"name"]).items(): + item = dict() + labels = dict() + item[u"name"] = metric[u"name"] + item[u"value"] = val.value + for label in metric[u"labels"]: + labels[label] = getattr(key, label) + item[u"labels"] = labels + self.api_replies_list.append(item) + getLogger(__name__).info(item) + + def process_data(self): + """ + Post process API replies. 
+ """ + for item in self.api_replies_list: + self.serializer.serialize( + metric=item[u"name"], labels=item[u"labels"], item=item + ) diff --git a/resources/tools/telemetry/bundle_vpp.py b/resources/tools/telemetry/bundle_vpp.py new file mode 100644 index 0000000000..01526fe83f --- /dev/null +++ b/resources/tools/telemetry/bundle_vpp.py @@ -0,0 +1,505 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""VPP execution bundle.""" + +from copy import deepcopy +from logging import getLogger +from re import fullmatch, sub +import struct +import sys + +from vpp_papi.vpp_papi import VPPApiClient as vpp_class + + +M_RUN_THREAD = ( + r"Thread\s" + r"(?P<thread_id>\d+)\s" + r"(?P<thread_name>\S+)\s.*" + r"(?P<thread_lcore>\d+).*" +) +M_RUN_SEPARATOR = ( + r"(-)+" +) +M_RUN_NODES = ( + r"(?P<name>\S+)\s+" + r"(?P<state>\S+\s\S+|\S+)\s+" + r"(?P<calls>\d+)\s+" + r"(?P<vectors>\d+)\s+" + r"(?P<suspends>\d+)\s+" + r"(?P<clocks>\S+)\s+" + r"(?P<vectors_calls>\S+)" +) +M_RUN_TIME = ( + r"Time\s\S+,\s\d+\ssec\sinternal\snode\svector\srate\s" + r"(?P<rate>\S+)\sloops/sec\s" + r"(?P<loops>\S+)" +) +M_INT_BEGIN = ( + r"(?P<name>\S+)\s+" + r"(?P<index>\S+)\s+" + r"(?P<state>\S+)\s+" + r"(?P<mtu>\S+)\s+" + r"(?P<counter>\S+\s\S+|\S+)\s+" + r"(?P<count>\d+)" +) +M_INT_CONT = ( + r"\s+" + r"(?P<counter>\S+\s\S+|\S+)\s+" + r"(?P<count>\d+)" +) +M_NODE_COUNTERS_THREAD = ( + r"Thread\s" + r"(?P<thread_id>\d+)\s\(" + 
r"(?P<thread_name>\S+)\):\s*" +) +M_NODE_COUNTERS = ( + r"\s*" + r"(?P<count>\d+)\s+" + r"(?P<name>\S+)\s+" + r"(?P<reason>(\S+\s)+)\s+" + r"(?P<severity>\S+)\s+" + r"(?P<index>\d+)\s*" +) +M_PMB_CS_HEADER = ( + r"\s*per-thread\s+context\s+switches.*" +) +M_PMB_CS = ( + r"(?P<thread_name>\S+)\s+\(" + r"(?P<thread_id>\S+)\)\s+\S+\s+" + r"(?P<context_switches>[\d\.]+)" +) +M_PMB_PF_HEADER = ( + r"\s*per-thread\s+page\s+faults.*" +) +M_PMB_PF = ( + r"(?P<thread_name>\S+)\s+\(" + r"(?P<thread_id>\S+)\)\s+\S+\s+" + r"(?P<minor_page_faults>[\d\.]+)\s+" + r"(?P<major_page_faults>[\d\.]+)" +) +M_PMB_THREAD = ( + r"\s*" + r"(?P<thread_name>\S+)\s+\(" + r"(?P<thread_id>\d+)\)\s*" +) +M_PMB_IC_HEADER = ( + r"\s*instructions/packet,\s+cycles/packet\s+and\s+IPC.*" +) +M_PMB_IC_NODE = ( + r"\s*" + r"(?P<node_name>\S+)\s+" + r"(?P<calls>[\d\.]+)\s+" + r"(?P<packets>[\d\.]+)\s+" + r"(?P<packets_per_call>[\d\.]+)\s+" + r"(?P<clocks_per_packets>[\d\.]+)\s+" + r"(?P<instructions_per_packets>[\d\.]+)\s+" + r"(?P<ipc>[\d\.]+)" +) +M_PMB_CM_HEADER = ( + r"\s*cache\s+hits\s+and\s+misses.*" +) +M_PMB_CM_NODE = ( + r"\s*" + r"(?P<node_name>\S+)\s+" + r"(?P<l1_hit>[\d\.]+)\s+" + r"(?P<l1_miss>[\d\.]+)\s+" + r"(?P<l2_hit>[\d\.]+)\s+" + r"(?P<l2_miss>[\d\.]+)\s+" + r"(?P<l3_hit>[\d\.]+)\s+" + r"(?P<l3_miss>[\d\.]+)" +) +M_PMB_LO_HEADER = ( + r"\s*load\s+operations.*" +) +M_PMB_LO_NODE = ( + r"\s*" + r"(?P<node_name>\S+)\s+" + r"(?P<calls>[\d\.]+)\s+" + r"(?P<packets>[\d\.]+)\s+" + r"(?P<one>[\d\.]+)\s+" + r"(?P<two>[\d\.]+)\s+" + r"(?P<three>[\d\.]+)" +) +M_PMB_BM_HEADER = ( + r"\s*Branches,\s+branches\s+taken\s+and\s+mis-predictions.*" +) +M_PMB_BM_NODE = ( + r"\s*" + r"(?P<node_name>\S+)\s+" + r"(?P<branches_per_call>[\d\.]+)\s+" + r"(?P<branches_per_packet>[\d\.]+)\s+" + r"(?P<taken_per_call>[\d\.]+)\s+" + r"(?P<taken_per_packet>[\d\.]+)\s+" + r"(?P<mis_predictions>[\d\.]+)" +) +M_PMB_PL_HEADER = ( + r"\s*Thread\s+power\s+licensing.*" +) +M_PMB_PL_NODE = ( + r"\s*" + 
r"(?P<node_name>\S+)\s+" + r"(?P<lvl0>[\d\.]+)\s+" + r"(?P<lvl1>[\d\.]+)\s+" + r"(?P<lvl2>[\d\.]+)\s+" + r"(?P<throttle>[\d\.]+)" +) +M_PMB_MB_HEADER = ( + r"\s*memory\s+reads\s+and\s+writes\s+per\s+memory\s+controller.*" +) +M_PMB_MB = ( + r"\s*" + r"(?P<name>\S+)\s+" + r"(?P<runtime>[\d\.]+)\s+" + r"(?P<reads_mbs>[\d\.]+)\s+" + r"(?P<writes_mbs>[\d\.]+)\s+" + r"(?P<total_mbs>[\d\.]+)" +) + + +class BundleVpp: + """ + Creates a VPP object. This is the main object for defining a VPP program, + and interacting with its output. + """ + def __init__(self, program, serializer, hook): + """ + Initialize Bundle VPP class. + + :param program: VPP instructions. + :param serializer: Metric serializer. + :param hook: VPP API socket. + :type program: dict + :type serializer: Serializer + :type hook: int + """ + self.obj = None + self.code = program[u"code"] + self.metrics = program[u"metrics"] + self.api_command_list = list() + self.api_replies_list = list() + self.serializer = serializer + + vpp_class.apidir = u"/usr/share/vpp/api" + self.obj = vpp_class( + use_socket=True, + server_address=hook, + async_thread=False, + read_timeout=14, + logger=getLogger(__name__) + ) + + def attach(self, duration): + """ + Attach events to VPP. + + :param duration: Trial duration. + :type duration: int + """ + try: + self.obj.connect(name=u"telemetry") + except (ConnectionRefusedError, OSError): + getLogger(__name__).error(u"Cannot connect to VPP!") + sys.exit(1) + + for command in self.code.splitlines(): + api_name = u"cli_inband" + api_args = dict(cmd=command.format(duration=duration)) + self.api_command_list.append( + dict(api_name=api_name, api_args=deepcopy(api_args)) + ) + + def detach(self): + """ + Detach from VPP. + """ + self.obj.disconnect() + + def fetch_data(self): + """ + Fetch data by invoking API calls to VPP socket. 
+ """ + for command in self.api_command_list: + try: + papi_fn = getattr(self.obj.api, command[u"api_name"]) + getLogger(__name__).info(command[u"api_args"][u"cmd"]) + replies = papi_fn(**command[u"api_args"]) + except (AttributeError, IOError, struct.error) as err: + raise AssertionError(err) + + if not isinstance(replies, list): + replies = [replies] + for reply in replies: + self.api_replies_list.append(reply) + reply = sub(r"\x1b[^m]*m", u"", reply.reply) + if reply: + getLogger(__name__).info(reply) + else: + getLogger(__name__).info(u"<no reply>") + self.serializer.create(metrics=self.metrics) + + def process_data(self): + """ + Post process command reply. + """ + for command in zip(self.api_command_list, self.api_replies_list): + self_fn = command[0][u"api_args"][u"cmd"] + try: + self_fn = getattr(self, self_fn.replace(u" ", u"_")) + self_fn(command[1].reply) + except AttributeError: + pass + + def show_interface(self, reply): + """ + Parse the show interface output. + + Output format: + { + "name": "rx_packets", + "labels": { + "name": "tap0", + "index": "0", + }, + "value": "31", + }, + + :param reply: API reply. + :type reply: str + """ + for line in reply.splitlines(): + item = dict() + labels = dict() + if fullmatch(M_INT_BEGIN, line): + ifc = fullmatch(M_INT_BEGIN, line).groupdict() + metric = ifc[u"counter"].replace(" ", "_").replace("-", "_") + item[u"name"] = metric + item[u"value"] = ifc[u"count"] + if fullmatch(M_INT_CONT, line): + ifc_cnt = fullmatch(M_INT_CONT, line).groupdict() + metric = ifc_cnt[u"counter"].replace(" ", "_").replace("-", "_") + item[u"name"] = metric + item[u"value"] = ifc_cnt[u"count"] + if fullmatch(M_INT_BEGIN, line) or fullmatch(M_INT_CONT, line): + labels[u"name"] = ifc[u"name"] + labels[u"index"] = ifc[u"index"] + item[u"labels"] = labels + self.serializer.serialize( + metric=metric, labels=labels, item=item + ) + + def show_runtime(self, reply): + """ + Parse the show runtime output. 
+ + Output format: + { + "name": "clocks", + "labels": { + "name": "virtio-input", + "state": "polling", + "thread_name": "vpp_wk_1", + "thread_id": "2", + "thread_lcore": "3", + }, + "value": "3.17e2", + }, + + :param reply: API reply. + :type reply: str + """ + for line in reply.splitlines(): + if fullmatch(M_RUN_THREAD, line): + thread = fullmatch(M_RUN_THREAD, line).groupdict() + if fullmatch(M_RUN_NODES, line): + nodes = fullmatch(M_RUN_NODES, line).groupdict() + for metric in self.serializer.metric_registry: + item = dict() + labels = dict() + item[u"name"] = metric + labels[u"name"] = nodes[u"name"] + labels[u"state"] = nodes[u"state"] + try: + labels[u"thread_name"] = thread[u"thread_name"] + labels[u"thread_id"] = thread[u"thread_id"] + labels[u"thread_lcore"] = thread[u"thread_lcore"] + except UnboundLocalError: + labels[u"thread_name"] = u"vpp_main" + labels[u"thread_id"] = u"0" + labels[u"thread_lcore"] = u"0" + item[u"labels"] = labels + item[u"value"] = nodes[metric] + self.serializer.serialize( + metric=metric, labels=labels, item=item + ) + + def show_node_counters_verbose(self, reply): + """ + Parse the show node conuter output. + + Output format: + { + "name": "node_counters", + "labels": { + "name": "dpdk-input", + "reason": "no_error", + "severity": "error", + "thread_name": "vpp_wk_1", + "thread_id": "2", + }, + "value": "1", + }, + + :param reply: API reply. 
+ :type reply: str + """ + for line in reply.splitlines(): + if fullmatch(M_NODE_COUNTERS_THREAD, line): + thread = fullmatch(M_NODE_COUNTERS_THREAD, line).groupdict() + if fullmatch(M_NODE_COUNTERS, line): + nodes = fullmatch(M_NODE_COUNTERS, line).groupdict() + for metric in self.serializer.metric_registry_registry: + item = dict() + labels = dict() + item[u"name"] = metric + labels[u"name"] = nodes[u"name"] + labels[u"reason"] = nodes[u"reason"] + labels[u"severity"] = nodes[u"severity"] + try: + labels[u"thread_name"] = thread[u"thread_name"] + labels[u"thread_id"] = thread[u"thread_id"] + except UnboundLocalError: + labels[u"thread_name"] = u"vpp_main" + labels[u"thread_id"] = u"0" + item[u"labels"] = labels + item[u"value"] = nodes[u"count"] + self.serializer.serialize( + metric=metric, labels=labels, item=item + ) + + def show_perfmon_statistics(self, reply): + """ + Parse the permon output. + + Output format: + { + "name": "clocks", + "labels": { + "name": "virtio-input", + "state": "polling", + "thread_name": "vpp_wk_1", + "thread_id": "2", + "thread_lcore": "3", + }, + "value": "3.17e2", + }, + + :param reply: API reply. 
+ :type reply: str + """ + def perfmon_threads(reply, regex_threads): + for line in reply.splitlines(): + if fullmatch(regex_threads, line): + threads = fullmatch(regex_threads, line).groupdict() + for metric in self.serializer.metric_registry: + item = dict() + labels = dict() + item[u"name"] = metric + labels[u"name"] = threads[u"thread_name"] + labels[u"id"] = threads[u"thread_id"] + item[u"labels"] = labels + item[u"value"] = threads[metric] + self.serializer.serialize( + metric=metric, labels=labels, item=item + ) + + def perfmon_nodes(reply, regex_threads, regex_nodes): + for line in reply.splitlines(): + if fullmatch(regex_threads, line): + thread = fullmatch(regex_threads, line).groupdict() + if fullmatch(regex_nodes, line): + node = fullmatch(regex_nodes, line).groupdict() + for metric in self.serializer.metric_registry: + item = dict() + labels = dict() + item[u"name"] = metric + labels[u"name"] = node[u"node_name"] + labels[u"thread_name"] = thread[u"thread_name"] + labels[u"thread_id"] = thread[u"thread_id"] + item[u"labels"] = labels + item[u"value"] = node[metric] + self.serializer.serialize( + metric=metric, labels=labels, item=item + ) + + def perfmon_system(reply, regex_line): + for line in reply.splitlines(): + if fullmatch(regex_line, line): + name = fullmatch(regex_line, line).groupdict() + for metric in self.serializer.metric_registry: + item = dict() + labels = dict() + item[u"name"] = metric + labels[u"name"] = name[u"name"] + item[u"labels"] = labels + item[u"value"] = name[metric] + self.serializer.serialize( + metric=metric, labels=labels, item=item + ) + + reply = sub(r"\x1b[^m]*m", u"", reply) + + if fullmatch(M_PMB_CS_HEADER, reply.splitlines()[0]): + perfmon_threads(reply, M_PMB_CS) + if fullmatch(M_PMB_PF_HEADER, reply.splitlines()[0]): + perfmon_threads(reply, M_PMB_PF) + if fullmatch(M_PMB_IC_HEADER, reply.splitlines()[0]): + perfmon_nodes(reply, M_PMB_THREAD, M_PMB_IC_NODE) + if fullmatch(M_PMB_CM_HEADER, reply.splitlines()[0]): + 
perfmon_nodes(reply, M_PMB_THREAD, M_PMB_CM_NODE) + if fullmatch(M_PMB_LO_HEADER, reply.splitlines()[0]): + perfmon_nodes(reply, M_PMB_THREAD, M_PMB_LO_NODE) + if fullmatch(M_PMB_BM_HEADER, reply.splitlines()[0]): + perfmon_nodes(reply, M_PMB_THREAD, M_PMB_BM_NODE) + if fullmatch(M_PMB_PL_HEADER, reply.splitlines()[0]): + perfmon_nodes(reply, M_PMB_THREAD, M_PMB_PL_NODE) + if fullmatch(M_PMB_MB_HEADER, reply.splitlines()[0]): + perfmon_system(reply, M_PMB_MB) + + def show_version(self, reply): + """ + Parse the version output. + + Output format: + { + "name": "version", + "labels": { + "version": "v21.06-rc0~596-g1ca6c65e5~b1065", + }, + "value": 1.0, + }, + + :param reply: API reply. + :type reply: str + """ + for metric in self.serializer.metric_registry: + version = reply.split()[1] + item = dict() + labels = dict() + item[u"name"] = metric + labels[u"version"] = version + item[u"labels"] = labels + item[u"value"] = 1.0 + self.serializer.serialize( + metric=metric, labels=labels, item=item + ) diff --git a/resources/tools/telemetry/executor.py b/resources/tools/telemetry/executor.py new file mode 100644 index 0000000000..75db4b6a40 --- /dev/null +++ b/resources/tools/telemetry/executor.py @@ -0,0 +1,107 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Config executor library.""" + +from importlib import import_module +from logging.config import dictConfig +from logging import getLogger +import sys + +from .parser import Parser +from .serializer import Serializer + + +class Executor: + """ + Executor class reponsible for executing configuration. + """ + def __init__(self, configuration_file): + """ + Config Executor init. + + :param configuration_file: Telemetry configuration file path. + :type configuration_file: str + """ + self.parser = Parser(configuration_file) + self.log = self.parser.config[u"logging"] + self.programs = self.parser.config[u"programs"] + self.scheduler = self.parser.config[u"scheduler"] + + dictConfig(self.log) + + def execute(self, hook=None): + """ + Main executor function will run programs from all bundles in a loop. + + Function call: + attach(duration) + fetch_data() + process_data() + detach() + + :param hook: Process ID or socket to attach. None by default. + :type hook: int + """ + for program in self.programs: + serializer = Serializer() + try: + package = program[u"name"] + name = f"telemetry.{package}" + package = package.replace("_", " ").title().replace(" ", "") + module = import_module( + name=name, + package=package + ) + bundle = getattr(module, package)( + program=program, + serializer=serializer, + hook=hook + ) + bundle.attach(duration=self.scheduler[u"duration"]) + bundle.fetch_data() + bundle.process_data() + bundle.detach() + except (ImportError, AttributeError) as exc: + raise ExecutorError( + f"Error executing bundle {package!r}! - {exc}" + ) + serializer.publish() + + def execute_daemon(self, hook=None): + """ + Daemon executor will execute endless loop. + + :param hook: Process ID to attach. None by default. + :type hook: int + """ + while True: + self.execute(hook=hook) + + +class ExecutorError(Exception): + """ + Creates a Executor Error Exception. This exception is supposed to handle + all the errors raised during executing. 
+ """ + def __init__(self, message): + """ + Execute Error Excpetion init. + + :param message: Exception error message. + :type message: str + """ + super().__init__() + self.message = message + getLogger(__name__).error(message) + sys.exit(1) diff --git a/resources/tools/telemetry/metrics.py b/resources/tools/telemetry/metrics.py new file mode 100644 index 0000000000..e5a66b3e0c --- /dev/null +++ b/resources/tools/telemetry/metrics.py @@ -0,0 +1,619 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Metric library.""" + +from collections import namedtuple +from threading import Lock +from time import monotonic +import re + + +class Value: + """ + A value storage protected by a mutex. + """ + def __init__(self): + """ + Initialize value to default and create a lock. + """ + self._value = 0.0 + self._lock = Lock() + self._timestamp = None + + def inc(self, amount): + """ + Increment value by amount under mutex. + Add a timestamp of capturing value. + + :param amount: Amount of increment. + :type amount: int or float + """ + with self._lock: + self._value += amount + self._timestamp = monotonic() + + def set(self, value): + """ + Set to a specific value under mutex. + Add a timestamp of capturing value. + + :param value: Amount of increment. + :type value: int or float + """ + with self._lock: + self._value = value + self._timestamp = monotonic() + + def get(self): + """ + Get a value under mutex. 
+ + :returns: Stored value. + :rtype: int or float + """ + with self._lock: + return self._value + + def get_timestamp(self): + """ + Get a timestamp under mutex. + + :returns: Stored timestamp. + :rtype: str + """ + with self._lock: + return self._timestamp + + +class Metric: + """ + A single metric parent and its samples. + """ + def __init__(self, name, documentation, typ): + """ + Initialize class and do basic sanitize. + + :param name: Full metric name. + :param documentation: Metric HELP string. + :param typ: Metric type [counter|gauge|info]. + :type name: str + :type documentation: str + :type typ: str + """ + self.metric_types = ( + u"counter", u"gauge", u"info" + ) + self.metric_sample = namedtuple( + u"Sample", [u"name", u"labels", u"value", u"timestamp"] + ) + + if not re.compile(r"^[a-zA-Z_:][a-zA-Z0-9_:]*$").match(name): + raise ValueError(f"Invalid metric name: {name}!") + if typ not in self.metric_types: + raise ValueError(f"Invalid metric type: {typ}!") + + self.name = name + self.documentation = documentation + self.type = typ + self.samples = [] + + def add_sample(self, name, labels, value, timestamp): + """ + Add a sample (entry) to the metric. + + :param name: Full metric name. + :param labels: Metric labels. + :param value: Metric value. + :param timestamp: Timestamp. Default to be when accessed. + :type name: str + :type lables: tuple + :type value: int or float + :type timestamp: float + """ + self.samples.append( + self.metric_sample(name, labels, value, timestamp) + ) + + def __eq__(self, other): + """ + Check equality of added metric. + + :param other: Metric to compare. + :type other: Metric + """ + return (isinstance(other, Metric) + and self.name == other.name + and self.documentation == other.documentation + and self.type == other.type + and self.samples == other.samples) + + def __repr__(self): + """ + Represantation as a string for a debug print. 
+ """ + return ( + f"Metric({self.name}, " + f"{self.documentation}, " + f"{self.type}, " + f"{self.samples})" + ) + + +class MetricBase: + """ + Abstract class for Metric implementation. + """ + _type = None + + def __init__( + self, name, documentation, labelnames=(), namespace="", + subsystem="", labelvalues=None, + ): + """ + Metric initialization. + + :param name: Metric name. + :param documentation: Metric HELP string. + :param labelnames: Metric label list. + :param namespace: Metric namespace (will be added as prefix). + :param subsystem: Metric susbsystem (will be added as prefix). + :param labelvalues: Metric label values. + :type name: str + :type documentation: str + :type labelnames: list + :type namespace: str + :type subsystem: str + :type labelvalues: list + """ + self._name = self.validate_name(name, namespace, subsystem) + self._labelnames = self.validate_labelnames(labelnames) + self._labelvalues = tuple(labelvalues or ()) + self._documentation = documentation + + if self._is_parent(): + self._lock = Lock() + self._metrics = {} + + if self._is_observable(): + self._metric_init() + + @staticmethod + def validate_name(name, namespace, subsystem): + """ + Construct metric full name and validate naming convention. + + :param name: Metric name. + :param namespace: Metric namespace (will be added as prefix). + :param subsystem: Metric susbsystem (will be added as prefix). + :type name: str + :type namespace: str + :type subsystem: str + :returns: Metric full name. + :rtype: str + :rasies ValueError: If name does not conform with naming conventions. + """ + full_name = u"" + full_name += f"{namespace}_" if namespace else u"" + full_name += f"{subsystem}_" if subsystem else u"" + full_name += name + + if not re.compile(r"^[a-zA-Z_:][a-zA-Z0-9_:]*$").match(full_name): + raise ValueError( + f"Invalid metric name: {full_name}!" 
+ ) + return full_name + + @staticmethod + def validate_labelnames(labelnames): + """ + Create label tuple and validate naming convention. + + :param labelnames: Metric label list. + :type labelnames: list + :returns: Label names. + :rtype: tuple + :rasies ValueError: If name does not conform with naming conventions. + """ + labelnames = tuple(labelnames) + for label in labelnames: + if not re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$").match(label): + raise ValueError(f"Invalid label metric name: {label}!") + if re.compile(r"^__.*$").match(label): + raise ValueError(f"Reserved label metric name: {label}!") + return labelnames + + def _is_observable(self): + """ + Check whether this metric is observable, i.e. + * a metric without label names and values, or + * the child of a labelled metric. + + :return: Observable + :rtype: bool + """ + return not self._labelnames or (self._labelnames and self._labelvalues) + + def _is_parent(self): + """ + Check whether metric is parent, i.e. + * a metric with label names but not its values. + + :return: Parent + :rtype: bool + """ + return self._labelnames and not self._labelvalues + + def _get_metric(self): + """ + Returns metric that will handle samples. + + :returns: Metric object. + :rtype: Metric + """ + return Metric(self._name, self._documentation, self._type) + + def describe(self): + """ + Returns metric that will handle samples. + + :returns: List of metric objects. + :rtype: list + """ + return [self._get_metric()] + + def collect(self): + """ + Returns metric with samples. + + :returns: List with metric object. + :rtype: list + """ + metric = self._get_metric() + for suffix, labels, value, timestamp in self.samples(): + metric.add_sample(self._name + suffix, labels, value, timestamp) + return [metric] + + def labels(self, *labelvalues, **labelkwargs): + """ + Return the child for the given labelset. + + :param labelvalues: Label values. + :param labelkwargs: Dictionary with label names and values. 
+ :type labelvalues: list + :type labelkwargs: dict + :returns: Metric with labels and values. + :rtype: Metric + :raises ValueError: If labels were not initialized. + :raises ValueError: If labels are already set (chaining). + :raises ValueError: If both parameters are passed. + :raises ValueError: If label values are not matching label names. + """ + if not self._labelnames: + raise ValueError( + f"No label names were set when constructing {self}!" + ) + + if self._labelvalues: + raise ValueError( + f"{self} already has labels set; can not chain .labels() calls!" + ) + + if labelvalues and labelkwargs: + raise ValueError( + u"Can't pass both *args and **kwargs!" + ) + + if labelkwargs: + if sorted(labelkwargs) != sorted(self._labelnames): + raise ValueError(u"Incorrect label names!") + labelvalues = tuple(labelkwargs[l] for l in self._labelnames) + else: + if len(labelvalues) != len(self._labelnames): + raise ValueError(u"Incorrect label count!") + labelvalues = tuple(l for l in labelvalues) + with self._lock: + if labelvalues not in self._metrics: + self._metrics[labelvalues] = self.__class__( + self._name, + documentation=self._documentation, + labelnames=self._labelnames, + labelvalues=labelvalues + ) + return self._metrics[labelvalues] + + def samples(self): + """ + Returns samples wheter an object is parent or child. + + :returns: List of Metric objects with values. + :rtype: list + """ + if self._is_parent(): + return self._multi_samples() + return self._child_samples() + + def _multi_samples(self): + """ + Returns parent and its childs with its values. + + :returns: List of Metric objects with values. 
+ :rtype: list + """ + with self._lock: + metrics = self._metrics.copy() + for labels, metric in metrics.items(): + series_labels = list(zip(self._labelnames, labels)) + for suffix, sample_labels, value, timestamp in metric.samples(): + yield ( + suffix, dict(series_labels + list(sample_labels.items())), + value, timestamp + ) + + def _child_samples(self): + """ + Returns child with its values. Should be implemented by child class. + + :raises NotImplementedError: If implementation in not in subclass. + """ + raise NotImplementedError( + f"_child_samples() must be implemented by {self}!" + ) + + def _metric_init(self): + """ + Initialize the metric object as a child. + + :raises NotImplementedError: If implementation in not in subclass. + """ + raise NotImplementedError( + f"_metric_init() must be implemented by {self}!" + ) + + def __str__(self): + """ + String for a debug print. + """ + return f"{self._type}:{self._name}" + + def __repr__(self): + """ + Represantation as a string for a debug print. + """ + metric_type = type(self) + return f"{metric_type.__module__}.{metric_type.__name__}({self._name})" + + +class Counter(MetricBase): + """ + A Counter tracks counts of events or running totals. + """ + _type = u"counter" + + def __init__(self, + name, + documentation, + labelnames=(), + namespace=u"", + subsystem=u"", + labelvalues=None + ): + """ + Initialize the Counter metric object. + + :param name: Metric name. + :param documentation: Metric HELP string. + :param labelnames: Metric label list. + :param namespace: Metric namespace (will be added as prefix). + :param subsystem: Metric susbsystem (will be added as prefix). + :param labelvalues: Metric label values. 
+ :type name: str + :type documentation: str + :type labelnames: list + :type namespace: str + :type subsystem: str + :type labelvalues: list + """ + super(Counter, self).__init__( + name=name, + documentation=documentation, + labelnames=labelnames, + namespace=namespace, + subsystem=subsystem, + labelvalues=labelvalues, + ) + + def _metric_init(self): + """ + Initialize counter value. + """ + self._value = Value() + + def inc(self, amount=1): + """ + Increment counter by the given amount. + + :param amount: Amount to increment. + :type amount: int or float + :raises ValueError: If amout is not positive. + """ + if amount < 0: + raise ValueError( + u"Counters can only be incremented by non-negative amounts." + ) + self._value.inc(amount) + + def _child_samples(self): + """ + Returns list of child samples. + + :returns: List of child samples. + :rtype: tuple + """ + return ((u"", {}, self._value.get(), self._value.get_timestamp()),) + + +class Gauge(MetricBase): + """ + Gauge metric, to report instantaneous values. + """ + _type = u"gauge" + + def __init__(self, + name, + documentation, + labelnames=(), + namespace=u"", + subsystem=u"", + labelvalues=None + ): + """ + Initialize the Gauge metric object. + + :param name: Metric name. + :param documentation: Metric HELP string. + :param labelnames: Metric label list. + :param namespace: Metric namespace (will be added as prefix). + :param subsystem: Metric susbsystem (will be added as prefix). + :param labelvalues: Metric label values. + :type name: str + :type documentation: str + :type labelnames: list + :type namespace: str + :type subsystem: str + :type labelvalues: list + """ + super(Gauge, self).__init__( + name=name, + documentation=documentation, + labelnames=labelnames, + namespace=namespace, + subsystem=subsystem, + labelvalues=labelvalues, + ) + + def _metric_init(self): + """ + Initialize gauge value. + """ + self._value = Value() + + def inc(self, amount=1): + """ + Increment gauge by the given amount. 
+ + :param amount: Amount to increment. + :type amount: int or float + """ + self._value.inc(amount) + + def dec(self, amount=1): + """ + Decrement gauge by the given amount. + + :param amount: Amount to decrement. + :type amount: int or float + """ + self._value.inc(-amount) + + def set(self, value): + """ + Set gauge to the given value. + + :param amount: Value to set. + :type amount: int or float + """ + self._value.set(float(value)) + + def _child_samples(self): + """ + Returns list of child samples. + + :returns: List of child samples. + :rtype: tuple + """ + return ((u"", {}, self._value.get(), self._value.get_timestamp()),) + + +class Info(MetricBase): + """ + Info metric, key-value pairs. + """ + _type = u"info" + + def __init__(self, + name, + documentation, + labelnames=(), + namespace=u"", + subsystem=u"", + labelvalues=None + ): + """ + Initialize the Info metric object. + + :param name: Metric name. + :param documentation: Metric HELP string. + :param labelnames: Metric label list. + :param namespace: Metric namespace (will be added as prefix). + :param subsystem: Metric susbsystem (will be added as prefix). + :param labelvalues: Metric label values. + :type name: str + :type documentation: str + :type labelnames: list + :type namespace: str + :type subsystem: str + :type labelvalues: list + """ + super(Info, self).__init__( + name=name, + documentation=documentation, + labelnames=labelnames, + namespace=namespace, + subsystem=subsystem, + labelvalues=labelvalues, + ) + + def _metric_init(self): + """ + Initialize gauge value and time it was created. + """ + self._labelname_set = set(self._labelnames) + self._lock = Lock() + self._value = {} + + def info(self, value): + """ + Set info to the given value. + + :param amount: Value to set. + :type amount: int or float + :raises ValueError: If lables are overlapping. 
+ """ + if self._labelname_set.intersection(value.keys()): + raise ValueError( + u"Overlapping labels for Info metric, " + f"metric: {self._labelnames} child: {value}!" + ) + with self._lock: + self._value = dict(value) + + def _child_samples(self): + """ + Returns list of child samples. + + :returns: List of child samples. + :rtype: tuple + """ + with self._lock: + return ((u"_info", self._value, 1.0, monotonic()),) diff --git a/resources/tools/telemetry/parser.py b/resources/tools/telemetry/parser.py new file mode 100644 index 0000000000..c6cc167066 --- /dev/null +++ b/resources/tools/telemetry/parser.py @@ -0,0 +1,107 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Configuration parsing library.""" + +from logging import getLogger +from pathlib import Path +import sys + +from yaml import safe_load, YAMLError + + +class Parser: + """ + Parser class reponsible for loading configuration. + """ + def __init__(self, configuration_file): + """ + Config Parser init. + + :param configuration_file: Telemetry configuration file path. 
+ :type configuration_file: str + """ + self.instance = None + self.config = None + self.suffix = Path(configuration_file).suffix[1:].capitalize() + + try: + self.instance = globals()[self.suffix+"Loader"](configuration_file) + except KeyError: + raise ParserError(u"Unsupported file format!") + + self.config = FileLoader(self.instance).load() + + +class FileLoader: + """ + Creates a File Loader object. This is the main object for interacting + with configuration file. + """ + def __init__(self, loader): + """ + File Loader class init. + + :param loader: Loader object responsible for handling file type. + :type loader: obj + """ + self.loader = loader + + def load(self): + """ + File format parser. + """ + return self.loader.load() + + +class YamlLoader: + """ + Creates a YAML Loader object. This is the main object for interacting + with YAML file. + """ + def __init__(self, configuration_file): + """ + YAML Loader class init. + + :param configuration_file: YAML configuration file path. + :type configuration_file: str + """ + self.configuration_file = configuration_file + + def load(self): + """ + YAML format parser. + """ + with open(self.configuration_file, u"r") as stream: + try: + return safe_load(stream) + except YAMLError as exc: + raise ParserError(str(exc)) + + +class ParserError(Exception): + """ + Creates a Parser Error Exception. This exception is supposed to handle + all the errors raised during processing. + """ + def __init__(self, message): + """ + Parser Error Excpetion init. + + :param message: Exception error message. + :type message: str + """ + super().__init__() + self.message = message + getLogger(__name__).error(self.message) + sys.exit(1) diff --git a/resources/tools/telemetry/serializer.py b/resources/tools/telemetry/serializer.py new file mode 100644 index 0000000000..e8315c832d --- /dev/null +++ b/resources/tools/telemetry/serializer.py @@ -0,0 +1,110 @@ +# Copyright (c) 2021 Cisco and/or its affiliates. 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Metric serializer library."""

from importlib import import_module
from logging import getLogger


class Serializer:
    """
    Serializer class responsible for creating metric objects, feeding them
    with values and publishing them in text format.
    """
    def __init__(self):
        """
        Serializer init. Creates an empty metric registry
        (metric name -> metric object).
        """
        self.metric_registry = dict()

    def create(self, metrics):
        """
        Create metrics based on input configuration.

        :param metrics: Metric definitions keyed by metric type
            (e.g. "counter"), each value is a list of keyword argument
            dicts for the metric class constructor.
        :type metrics: dict
        """
        for metric_type, metric_list in metrics.items():
            class_name = metric_type.capitalize()
            # Absolute import; resolved once per metric type. The "package"
            # argument is only meaningful for relative imports.
            module = import_module(u"telemetry.metrics")
            metric_class = getattr(module, class_name)
            for metric in metric_list:
                self.metric_registry[metric[u"name"]] = metric_class(**metric)

    def serialize(self, metric, labels, item):
        """
        Store the item value into the registered metric.

        Counter is incremented by the value, Gauge is set to the value and
        Info receives the value as is. Other metric types are ignored.

        :param metric: Metric name.
        :param labels: Metric labels.
        :param item: Metric dict (key "value" is used).
        :type metric: str
        :type labels: dict
        :type item: dict
        :raises KeyError: If the metric is not in the registry.
        """
        registered = self.metric_registry[metric]
        # Dispatch on class name; metric classes are loaded dynamically in
        # create(), so they are not importable here for isinstance().
        kind = type(registered).__name__
        if kind == u"Counter":
            registered.labels(**labels).inc(float(item[u"value"]))
        elif kind == u"Gauge":
            registered.labels(**labels).set(float(item[u"value"]))
        elif kind == u"Info":
            registered.labels(**labels).info(item[u"value"])

    @staticmethod
    def _escape_label_value(value):
        """
        Escape label value for the text exposition format.

        :param value: Label value; non-string values (e.g. integer CPU index)
            are converted to string first.
        :type value: object
        :returns: Escaped value.
        :rtype: str
        """
        # Backslash must be escaped first, otherwise the escapes added for
        # newline and quote would get escaped again.
        value = str(value).replace(u"\\", r"\\")
        value = value.replace(u"\n", r"\n")
        return value.replace(u'"', r'\"')

    def publish(self):
        """
        Publish all registered metrics into logger named "prom" as a single
        message in Prometheus text format.
        """
        output = []
        for registered in self.metric_registry.values():
            for metric in registered.collect():
                mname = metric.name
                mtype = metric.type

                # Adjust from OpenMetrics into Prometheus format.
                mname = f"{mname}_total" if mtype == u"counter" else mname
                mname = f"{mname}_info" if mtype == u"info" else mname
                if mtype in (u"info", u"stateset"):
                    mtype = u"gauge"
                if mtype in (u"gaugehistogram", u"histogram"):
                    mtype = u"histogram"

                mdocumentation = metric.documentation.replace(u"\\", r"\\")
                mdocumentation = mdocumentation.replace(u"\n", r"\n")
                output.append(f"# HELP {mname} {mdocumentation}\n")
                output.append(f"# TYPE {mname} {mtype}\n")

                for line in metric.samples:
                    if line.labels:
                        # Sorted for a stable, reproducible output.
                        llabel = [
                            f'{k}="{self._escape_label_value(value)}"'
                            for k, value in sorted(line.labels.items())
                        ]
                        labelstr = f"{{{','.join(llabel)}}}"
                    else:
                        labelstr = u""

                    # Timestamp is published in milliseconds.
                    timestamp = f" {int(float(line.timestamp) * 1000):d}" \
                        if line.timestamp else u""
                    output.append(
                        f"{line.name}{labelstr} {line.value}{timestamp}\n"
                    )
        getLogger(u"prom").info(u"".join(output))