aboutsummaryrefslogtreecommitdiffstats
path: root/resources/tools
diff options
context:
space:
mode:
authorpmikus <pmikus@cisco.com>2021-04-19 12:22:20 +0000
committerPeter Mikus <pmikus@cisco.com>2021-05-28 05:51:32 +0000
commitd255d2545ee6cdc871bc35314fad72c3c48b225b (patch)
treef4a0a6462ec9cc856829caa0641e87bcaf37cc4c /resources/tools
parent82863d5b8422b1b817d86bd6b1829a06a49feb02 (diff)
Framework: Telemetry retake
Signed-off-by: pmikus <pmikus@cisco.com> Change-Id: I2f019a083916aec9f7816266f6ad5b92dcc31fa0
Diffstat (limited to 'resources/tools')
-rw-r--r--resources/tools/telemetry/__init__.py12
-rwxr-xr-xresources/tools/telemetry/__main__.py49
-rw-r--r--resources/tools/telemetry/bundle_bpf.py112
-rw-r--r--resources/tools/telemetry/bundle_vpp.py505
-rw-r--r--resources/tools/telemetry/executor.py107
-rw-r--r--resources/tools/telemetry/metrics.py619
-rw-r--r--resources/tools/telemetry/parser.py107
-rw-r--r--resources/tools/telemetry/serializer.py110
8 files changed, 1621 insertions, 0 deletions
diff --git a/resources/tools/telemetry/__init__.py b/resources/tools/telemetry/__init__.py
new file mode 100644
index 0000000000..284a1ce972
--- /dev/null
+++ b/resources/tools/telemetry/__init__.py
@@ -0,0 +1,12 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/resources/tools/telemetry/__main__.py b/resources/tools/telemetry/__main__.py
new file mode 100755
index 0000000000..2ab87b661a
--- /dev/null
+++ b/resources/tools/telemetry/__main__.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Telemetry Exporter."""
+
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+
+from .executor import Executor
+
+def main():
+ """
+ Main entry function when called from cli
+ """
+ parser = ArgumentParser(
+ description=u"Telemetry Exporter.",
+ formatter_class=RawDescriptionHelpFormatter
+ )
+ parser.add_argument(
+ u"--config", required=True, type=str,
+ help=u"YAML configuration file."
+ )
+ parser.add_argument(
+ u"--hook", required=False, type=str,
+ help=u"Process ID or socket."
+ )
+ parser.add_argument(
+ u"--daemon", required=False, type=bool,
+ help=u"Run as daemon."
+ )
+ args = parser.parse_args()
+ if args.daemon:
+ Executor(args.config).execute_daemon(args.hook)
+ else:
+ Executor(args.config).execute(args.hook)
+
+if __name__ == u"__main__":
+ main()
diff --git a/resources/tools/telemetry/bundle_bpf.py b/resources/tools/telemetry/bundle_bpf.py
new file mode 100644
index 0000000000..77bc9acf91
--- /dev/null
+++ b/resources/tools/telemetry/bundle_bpf.py
@@ -0,0 +1,112 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""BPF performance bundle."""
+
+from logging import getLogger
+import sys
+
+from bcc import BPF
+
+
+class BundleBpf:
+ """
+ Creates a BPF object. This is the main object for defining a BPF program,
+ and interacting with its output.
+
+ Syntax: BPF({text=BPF_program | src_file=filename}
+ [, usdt_contexts=[USDT_object, ...]]
+ [, cflags=[arg1, ...]] [, debug=int]
+ )
+
+ Exactly one of text or src_file must be supplied (not both).
+ """
+ def __init__(self, program, serializer, hook):
+ """Initialize Bundle BPF Perf event class.
+
+ :param program: BPF C code.
+ :param serializer: Metric serializer.
+ :param hook: Process ID.
+ :type program: dict
+ :type serializer: Serializer
+ :type hook: int
+ """
+ self.obj = None
+ self.code = program[u"code"]
+ self.metrics = program[u"metrics"]
+ self.events = program[u"events"]
+ self.api_replies_list = list()
+ self.serializer = serializer
+ self.hook = hook
+
+ self.obj = BPF(text=self.code)
+
+ def attach(self, duration):
+ """
+ Attach events to BPF.
+
+ :param duration: Trial duration.
+ :type duration: int
+ """
+ try:
+ for event in self.events:
+ self.obj.attach_perf_event(
+ ev_type=event[u"type"],
+ ev_config=event[u"name"],
+ fn_name=event[u"target"],
+ sample_period=duration
+ )
+ except AttributeError:
+ getLogger(__name__).error(u"Cannot attach BPF events!")
+ sys.exit(1)
+
+ def detach(self):
+ """
+ Dettach events from BPF.
+ """
+ try:
+ for event in self.events:
+ self.obj.detach_perf_event(
+ ev_type=event[u"type"],
+ ev_config=event[u"name"]
+ )
+ except AttributeError:
+ getLogger(__name__).error(u"Cannot dettach BPF events!")
+ sys.exit(1)
+
+ def fetch_data(self):
+ """
+ Fetch data by invoking API calls to BPF.
+ """
+ self.serializer.create(metrics=self.metrics)
+ for _, metric_list in self.metrics.items():
+ for metric in metric_list:
+ for (key, val) in self.obj.get_table(metric[u"name"]).items():
+ item = dict()
+ labels = dict()
+ item[u"name"] = metric[u"name"]
+ item[u"value"] = val.value
+ for label in metric[u"labels"]:
+ labels[label] = getattr(key, label)
+ item[u"labels"] = labels
+ self.api_replies_list.append(item)
+ getLogger(__name__).info(item)
+
+ def process_data(self):
+ """
+ Post process API replies.
+ """
+ for item in self.api_replies_list:
+ self.serializer.serialize(
+ metric=item[u"name"], labels=item[u"labels"], item=item
+ )
diff --git a/resources/tools/telemetry/bundle_vpp.py b/resources/tools/telemetry/bundle_vpp.py
new file mode 100644
index 0000000000..01526fe83f
--- /dev/null
+++ b/resources/tools/telemetry/bundle_vpp.py
@@ -0,0 +1,505 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""VPP execution bundle."""
+
+from copy import deepcopy
+from logging import getLogger
+from re import fullmatch, sub
+import struct
+import sys
+
+from vpp_papi.vpp_papi import VPPApiClient as vpp_class
+
+
+M_RUN_THREAD = (
+ r"Thread\s"
+ r"(?P<thread_id>\d+)\s"
+ r"(?P<thread_name>\S+)\s.*"
+ r"(?P<thread_lcore>\d+).*"
+)
+M_RUN_SEPARATOR = (
+ r"(-)+"
+)
+M_RUN_NODES = (
+ r"(?P<name>\S+)\s+"
+ r"(?P<state>\S+\s\S+|\S+)\s+"
+ r"(?P<calls>\d+)\s+"
+ r"(?P<vectors>\d+)\s+"
+ r"(?P<suspends>\d+)\s+"
+ r"(?P<clocks>\S+)\s+"
+ r"(?P<vectors_calls>\S+)"
+)
+M_RUN_TIME = (
+ r"Time\s\S+,\s\d+\ssec\sinternal\snode\svector\srate\s"
+ r"(?P<rate>\S+)\sloops/sec\s"
+ r"(?P<loops>\S+)"
+)
+M_INT_BEGIN = (
+ r"(?P<name>\S+)\s+"
+ r"(?P<index>\S+)\s+"
+ r"(?P<state>\S+)\s+"
+ r"(?P<mtu>\S+)\s+"
+ r"(?P<counter>\S+\s\S+|\S+)\s+"
+ r"(?P<count>\d+)"
+)
+M_INT_CONT = (
+ r"\s+"
+ r"(?P<counter>\S+\s\S+|\S+)\s+"
+ r"(?P<count>\d+)"
+)
+M_NODE_COUNTERS_THREAD = (
+ r"Thread\s"
+ r"(?P<thread_id>\d+)\s\("
+ r"(?P<thread_name>\S+)\):\s*"
+)
+M_NODE_COUNTERS = (
+ r"\s*"
+ r"(?P<count>\d+)\s+"
+ r"(?P<name>\S+)\s+"
+ r"(?P<reason>(\S+\s)+)\s+"
+ r"(?P<severity>\S+)\s+"
+ r"(?P<index>\d+)\s*"
+)
+M_PMB_CS_HEADER = (
+ r"\s*per-thread\s+context\s+switches.*"
+)
+M_PMB_CS = (
+ r"(?P<thread_name>\S+)\s+\("
+ r"(?P<thread_id>\S+)\)\s+\S+\s+"
+ r"(?P<context_switches>[\d\.]+)"
+)
+M_PMB_PF_HEADER = (
+ r"\s*per-thread\s+page\s+faults.*"
+)
+M_PMB_PF = (
+ r"(?P<thread_name>\S+)\s+\("
+ r"(?P<thread_id>\S+)\)\s+\S+\s+"
+ r"(?P<minor_page_faults>[\d\.]+)\s+"
+ r"(?P<major_page_faults>[\d\.]+)"
+)
+M_PMB_THREAD = (
+ r"\s*"
+ r"(?P<thread_name>\S+)\s+\("
+ r"(?P<thread_id>\d+)\)\s*"
+)
+M_PMB_IC_HEADER = (
+ r"\s*instructions/packet,\s+cycles/packet\s+and\s+IPC.*"
+)
+M_PMB_IC_NODE = (
+ r"\s*"
+ r"(?P<node_name>\S+)\s+"
+ r"(?P<calls>[\d\.]+)\s+"
+ r"(?P<packets>[\d\.]+)\s+"
+ r"(?P<packets_per_call>[\d\.]+)\s+"
+ r"(?P<clocks_per_packets>[\d\.]+)\s+"
+ r"(?P<instructions_per_packets>[\d\.]+)\s+"
+ r"(?P<ipc>[\d\.]+)"
+)
+M_PMB_CM_HEADER = (
+ r"\s*cache\s+hits\s+and\s+misses.*"
+)
+M_PMB_CM_NODE = (
+ r"\s*"
+ r"(?P<node_name>\S+)\s+"
+ r"(?P<l1_hit>[\d\.]+)\s+"
+ r"(?P<l1_miss>[\d\.]+)\s+"
+ r"(?P<l2_hit>[\d\.]+)\s+"
+ r"(?P<l2_miss>[\d\.]+)\s+"
+ r"(?P<l3_hit>[\d\.]+)\s+"
+ r"(?P<l3_miss>[\d\.]+)"
+)
+M_PMB_LO_HEADER = (
+ r"\s*load\s+operations.*"
+)
+M_PMB_LO_NODE = (
+ r"\s*"
+ r"(?P<node_name>\S+)\s+"
+ r"(?P<calls>[\d\.]+)\s+"
+ r"(?P<packets>[\d\.]+)\s+"
+ r"(?P<one>[\d\.]+)\s+"
+ r"(?P<two>[\d\.]+)\s+"
+ r"(?P<three>[\d\.]+)"
+)
+M_PMB_BM_HEADER = (
+ r"\s*Branches,\s+branches\s+taken\s+and\s+mis-predictions.*"
+)
+M_PMB_BM_NODE = (
+ r"\s*"
+ r"(?P<node_name>\S+)\s+"
+ r"(?P<branches_per_call>[\d\.]+)\s+"
+ r"(?P<branches_per_packet>[\d\.]+)\s+"
+ r"(?P<taken_per_call>[\d\.]+)\s+"
+ r"(?P<taken_per_packet>[\d\.]+)\s+"
+ r"(?P<mis_predictions>[\d\.]+)"
+)
+M_PMB_PL_HEADER = (
+ r"\s*Thread\s+power\s+licensing.*"
+)
+M_PMB_PL_NODE = (
+ r"\s*"
+ r"(?P<node_name>\S+)\s+"
+ r"(?P<lvl0>[\d\.]+)\s+"
+ r"(?P<lvl1>[\d\.]+)\s+"
+ r"(?P<lvl2>[\d\.]+)\s+"
+ r"(?P<throttle>[\d\.]+)"
+)
+M_PMB_MB_HEADER = (
+ r"\s*memory\s+reads\s+and\s+writes\s+per\s+memory\s+controller.*"
+)
+M_PMB_MB = (
+ r"\s*"
+ r"(?P<name>\S+)\s+"
+ r"(?P<runtime>[\d\.]+)\s+"
+ r"(?P<reads_mbs>[\d\.]+)\s+"
+ r"(?P<writes_mbs>[\d\.]+)\s+"
+ r"(?P<total_mbs>[\d\.]+)"
+)
+
+
+class BundleVpp:
+ """
+ Creates a VPP object. This is the main object for defining a VPP program,
+ and interacting with its output.
+ """
+ def __init__(self, program, serializer, hook):
+ """
+ Initialize Bundle VPP class.
+
+ :param program: VPP instructions.
+ :param serializer: Metric serializer.
+ :param hook: VPP API socket.
+ :type program: dict
+ :type serializer: Serializer
+ :type hook: int
+ """
+ self.obj = None
+ self.code = program[u"code"]
+ self.metrics = program[u"metrics"]
+ self.api_command_list = list()
+ self.api_replies_list = list()
+ self.serializer = serializer
+
+ vpp_class.apidir = u"/usr/share/vpp/api"
+ self.obj = vpp_class(
+ use_socket=True,
+ server_address=hook,
+ async_thread=False,
+ read_timeout=14,
+ logger=getLogger(__name__)
+ )
+
+ def attach(self, duration):
+ """
+ Attach events to VPP.
+
+ :param duration: Trial duration.
+ :type duration: int
+ """
+ try:
+ self.obj.connect(name=u"telemetry")
+ except (ConnectionRefusedError, OSError):
+ getLogger(__name__).error(u"Cannot connect to VPP!")
+ sys.exit(1)
+
+ for command in self.code.splitlines():
+ api_name = u"cli_inband"
+ api_args = dict(cmd=command.format(duration=duration))
+ self.api_command_list.append(
+ dict(api_name=api_name, api_args=deepcopy(api_args))
+ )
+
+ def detach(self):
+ """
+ Detach from VPP.
+ """
+ self.obj.disconnect()
+
+ def fetch_data(self):
+ """
+ Fetch data by invoking API calls to VPP socket.
+ """
+ for command in self.api_command_list:
+ try:
+ papi_fn = getattr(self.obj.api, command[u"api_name"])
+ getLogger(__name__).info(command[u"api_args"][u"cmd"])
+ replies = papi_fn(**command[u"api_args"])
+ except (AttributeError, IOError, struct.error) as err:
+ raise AssertionError(err)
+
+ if not isinstance(replies, list):
+ replies = [replies]
+ for reply in replies:
+ self.api_replies_list.append(reply)
+ reply = sub(r"\x1b[^m]*m", u"", reply.reply)
+ if reply:
+ getLogger(__name__).info(reply)
+ else:
+ getLogger(__name__).info(u"<no reply>")
+ self.serializer.create(metrics=self.metrics)
+
+ def process_data(self):
+ """
+ Post process command reply.
+ """
+ for command in zip(self.api_command_list, self.api_replies_list):
+ self_fn = command[0][u"api_args"][u"cmd"]
+ try:
+ self_fn = getattr(self, self_fn.replace(u" ", u"_"))
+ self_fn(command[1].reply)
+ except AttributeError:
+ pass
+
+ def show_interface(self, reply):
+ """
+ Parse the show interface output.
+
+ Output format:
+ {
+ "name": "rx_packets",
+ "labels": {
+ "name": "tap0",
+ "index": "0",
+ },
+ "value": "31",
+ },
+
+ :param reply: API reply.
+ :type reply: str
+ """
+ for line in reply.splitlines():
+ item = dict()
+ labels = dict()
+ if fullmatch(M_INT_BEGIN, line):
+ ifc = fullmatch(M_INT_BEGIN, line).groupdict()
+ metric = ifc[u"counter"].replace(" ", "_").replace("-", "_")
+ item[u"name"] = metric
+ item[u"value"] = ifc[u"count"]
+ if fullmatch(M_INT_CONT, line):
+ ifc_cnt = fullmatch(M_INT_CONT, line).groupdict()
+ metric = ifc_cnt[u"counter"].replace(" ", "_").replace("-", "_")
+ item[u"name"] = metric
+ item[u"value"] = ifc_cnt[u"count"]
+ if fullmatch(M_INT_BEGIN, line) or fullmatch(M_INT_CONT, line):
+ labels[u"name"] = ifc[u"name"]
+ labels[u"index"] = ifc[u"index"]
+ item[u"labels"] = labels
+ self.serializer.serialize(
+ metric=metric, labels=labels, item=item
+ )
+
+ def show_runtime(self, reply):
+ """
+ Parse the show runtime output.
+
+ Output format:
+ {
+ "name": "clocks",
+ "labels": {
+ "name": "virtio-input",
+ "state": "polling",
+ "thread_name": "vpp_wk_1",
+ "thread_id": "2",
+ "thread_lcore": "3",
+ },
+ "value": "3.17e2",
+ },
+
+ :param reply: API reply.
+ :type reply: str
+ """
+ for line in reply.splitlines():
+ if fullmatch(M_RUN_THREAD, line):
+ thread = fullmatch(M_RUN_THREAD, line).groupdict()
+ if fullmatch(M_RUN_NODES, line):
+ nodes = fullmatch(M_RUN_NODES, line).groupdict()
+ for metric in self.serializer.metric_registry:
+ item = dict()
+ labels = dict()
+ item[u"name"] = metric
+ labels[u"name"] = nodes[u"name"]
+ labels[u"state"] = nodes[u"state"]
+ try:
+ labels[u"thread_name"] = thread[u"thread_name"]
+ labels[u"thread_id"] = thread[u"thread_id"]
+ labels[u"thread_lcore"] = thread[u"thread_lcore"]
+ except UnboundLocalError:
+ labels[u"thread_name"] = u"vpp_main"
+ labels[u"thread_id"] = u"0"
+ labels[u"thread_lcore"] = u"0"
+ item[u"labels"] = labels
+ item[u"value"] = nodes[metric]
+ self.serializer.serialize(
+ metric=metric, labels=labels, item=item
+ )
+
+ def show_node_counters_verbose(self, reply):
+ """
+ Parse the show node conuter output.
+
+ Output format:
+ {
+ "name": "node_counters",
+ "labels": {
+ "name": "dpdk-input",
+ "reason": "no_error",
+ "severity": "error",
+ "thread_name": "vpp_wk_1",
+ "thread_id": "2",
+ },
+ "value": "1",
+ },
+
+ :param reply: API reply.
+ :type reply: str
+ """
+ for line in reply.splitlines():
+ if fullmatch(M_NODE_COUNTERS_THREAD, line):
+ thread = fullmatch(M_NODE_COUNTERS_THREAD, line).groupdict()
+ if fullmatch(M_NODE_COUNTERS, line):
+ nodes = fullmatch(M_NODE_COUNTERS, line).groupdict()
+ for metric in self.serializer.metric_registry_registry:
+ item = dict()
+ labels = dict()
+ item[u"name"] = metric
+ labels[u"name"] = nodes[u"name"]
+ labels[u"reason"] = nodes[u"reason"]
+ labels[u"severity"] = nodes[u"severity"]
+ try:
+ labels[u"thread_name"] = thread[u"thread_name"]
+ labels[u"thread_id"] = thread[u"thread_id"]
+ except UnboundLocalError:
+ labels[u"thread_name"] = u"vpp_main"
+ labels[u"thread_id"] = u"0"
+ item[u"labels"] = labels
+ item[u"value"] = nodes[u"count"]
+ self.serializer.serialize(
+ metric=metric, labels=labels, item=item
+ )
+
+ def show_perfmon_statistics(self, reply):
+ """
+ Parse the permon output.
+
+ Output format:
+ {
+ "name": "clocks",
+ "labels": {
+ "name": "virtio-input",
+ "state": "polling",
+ "thread_name": "vpp_wk_1",
+ "thread_id": "2",
+ "thread_lcore": "3",
+ },
+ "value": "3.17e2",
+ },
+
+ :param reply: API reply.
+ :type reply: str
+ """
+ def perfmon_threads(reply, regex_threads):
+ for line in reply.splitlines():
+ if fullmatch(regex_threads, line):
+ threads = fullmatch(regex_threads, line).groupdict()
+ for metric in self.serializer.metric_registry:
+ item = dict()
+ labels = dict()
+ item[u"name"] = metric
+ labels[u"name"] = threads[u"thread_name"]
+ labels[u"id"] = threads[u"thread_id"]
+ item[u"labels"] = labels
+ item[u"value"] = threads[metric]
+ self.serializer.serialize(
+ metric=metric, labels=labels, item=item
+ )
+
+ def perfmon_nodes(reply, regex_threads, regex_nodes):
+ for line in reply.splitlines():
+ if fullmatch(regex_threads, line):
+ thread = fullmatch(regex_threads, line).groupdict()
+ if fullmatch(regex_nodes, line):
+ node = fullmatch(regex_nodes, line).groupdict()
+ for metric in self.serializer.metric_registry:
+ item = dict()
+ labels = dict()
+ item[u"name"] = metric
+ labels[u"name"] = node[u"node_name"]
+ labels[u"thread_name"] = thread[u"thread_name"]
+ labels[u"thread_id"] = thread[u"thread_id"]
+ item[u"labels"] = labels
+ item[u"value"] = node[metric]
+ self.serializer.serialize(
+ metric=metric, labels=labels, item=item
+ )
+
+ def perfmon_system(reply, regex_line):
+ for line in reply.splitlines():
+ if fullmatch(regex_line, line):
+ name = fullmatch(regex_line, line).groupdict()
+ for metric in self.serializer.metric_registry:
+ item = dict()
+ labels = dict()
+ item[u"name"] = metric
+ labels[u"name"] = name[u"name"]
+ item[u"labels"] = labels
+ item[u"value"] = name[metric]
+ self.serializer.serialize(
+ metric=metric, labels=labels, item=item
+ )
+
+ reply = sub(r"\x1b[^m]*m", u"", reply)
+
+ if fullmatch(M_PMB_CS_HEADER, reply.splitlines()[0]):
+ perfmon_threads(reply, M_PMB_CS)
+ if fullmatch(M_PMB_PF_HEADER, reply.splitlines()[0]):
+ perfmon_threads(reply, M_PMB_PF)
+ if fullmatch(M_PMB_IC_HEADER, reply.splitlines()[0]):
+ perfmon_nodes(reply, M_PMB_THREAD, M_PMB_IC_NODE)
+ if fullmatch(M_PMB_CM_HEADER, reply.splitlines()[0]):
+ perfmon_nodes(reply, M_PMB_THREAD, M_PMB_CM_NODE)
+ if fullmatch(M_PMB_LO_HEADER, reply.splitlines()[0]):
+ perfmon_nodes(reply, M_PMB_THREAD, M_PMB_LO_NODE)
+ if fullmatch(M_PMB_BM_HEADER, reply.splitlines()[0]):
+ perfmon_nodes(reply, M_PMB_THREAD, M_PMB_BM_NODE)
+ if fullmatch(M_PMB_PL_HEADER, reply.splitlines()[0]):
+ perfmon_nodes(reply, M_PMB_THREAD, M_PMB_PL_NODE)
+ if fullmatch(M_PMB_MB_HEADER, reply.splitlines()[0]):
+ perfmon_system(reply, M_PMB_MB)
+
+ def show_version(self, reply):
+ """
+ Parse the version output.
+
+ Output format:
+ {
+ "name": "version",
+ "labels": {
+ "version": "v21.06-rc0~596-g1ca6c65e5~b1065",
+ },
+ "value": 1.0,
+ },
+
+ :param reply: API reply.
+ :type reply: str
+ """
+ for metric in self.serializer.metric_registry:
+ version = reply.split()[1]
+ item = dict()
+ labels = dict()
+ item[u"name"] = metric
+ labels[u"version"] = version
+ item[u"labels"] = labels
+ item[u"value"] = 1.0
+ self.serializer.serialize(
+ metric=metric, labels=labels, item=item
+ )
diff --git a/resources/tools/telemetry/executor.py b/resources/tools/telemetry/executor.py
new file mode 100644
index 0000000000..75db4b6a40
--- /dev/null
+++ b/resources/tools/telemetry/executor.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Config executor library."""
+
+from importlib import import_module
+from logging.config import dictConfig
+from logging import getLogger
+import sys
+
+from .parser import Parser
+from .serializer import Serializer
+
+
+class Executor:
+ """
+ Executor class reponsible for executing configuration.
+ """
+ def __init__(self, configuration_file):
+ """
+ Config Executor init.
+
+ :param configuration_file: Telemetry configuration file path.
+ :type configuration_file: str
+ """
+ self.parser = Parser(configuration_file)
+ self.log = self.parser.config[u"logging"]
+ self.programs = self.parser.config[u"programs"]
+ self.scheduler = self.parser.config[u"scheduler"]
+
+ dictConfig(self.log)
+
+ def execute(self, hook=None):
+ """
+ Main executor function will run programs from all bundles in a loop.
+
+ Function call:
+ attach(duration)
+ fetch_data()
+ process_data()
+ detach()
+
+ :param hook: Process ID or socket to attach. None by default.
+ :type hook: int
+ """
+ for program in self.programs:
+ serializer = Serializer()
+ try:
+ package = program[u"name"]
+ name = f"telemetry.{package}"
+ package = package.replace("_", " ").title().replace(" ", "")
+ module = import_module(
+ name=name,
+ package=package
+ )
+ bundle = getattr(module, package)(
+ program=program,
+ serializer=serializer,
+ hook=hook
+ )
+ bundle.attach(duration=self.scheduler[u"duration"])
+ bundle.fetch_data()
+ bundle.process_data()
+ bundle.detach()
+ except (ImportError, AttributeError) as exc:
+ raise ExecutorError(
+ f"Error executing bundle {package!r}! - {exc}"
+ )
+ serializer.publish()
+
+ def execute_daemon(self, hook=None):
+ """
+ Daemon executor will execute endless loop.
+
+ :param hook: Process ID to attach. None by default.
+ :type hook: int
+ """
+ while True:
+ self.execute(hook=hook)
+
+
+class ExecutorError(Exception):
+ """
+ Creates a Executor Error Exception. This exception is supposed to handle
+ all the errors raised during executing.
+ """
+ def __init__(self, message):
+ """
+ Execute Error Excpetion init.
+
+ :param message: Exception error message.
+ :type message: str
+ """
+ super().__init__()
+ self.message = message
+ getLogger(__name__).error(message)
+ sys.exit(1)
diff --git a/resources/tools/telemetry/metrics.py b/resources/tools/telemetry/metrics.py
new file mode 100644
index 0000000000..e5a66b3e0c
--- /dev/null
+++ b/resources/tools/telemetry/metrics.py
@@ -0,0 +1,619 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Metric library."""
+
+from collections import namedtuple
+from threading import Lock
+from time import monotonic
+import re
+
+
+class Value:
+ """
+ A value storage protected by a mutex.
+ """
+ def __init__(self):
+ """
+ Initialize value to default and create a lock.
+ """
+ self._value = 0.0
+ self._lock = Lock()
+ self._timestamp = None
+
+ def inc(self, amount):
+ """
+ Increment value by amount under mutex.
+ Add a timestamp of capturing value.
+
+ :param amount: Amount of increment.
+ :type amount: int or float
+ """
+ with self._lock:
+ self._value += amount
+ self._timestamp = monotonic()
+
+ def set(self, value):
+ """
+ Set to a specific value under mutex.
+ Add a timestamp of capturing value.
+
+ :param value: Amount of increment.
+ :type value: int or float
+ """
+ with self._lock:
+ self._value = value
+ self._timestamp = monotonic()
+
+ def get(self):
+ """
+ Get a value under mutex.
+
+ :returns: Stored value.
+ :rtype: int or float
+ """
+ with self._lock:
+ return self._value
+
+ def get_timestamp(self):
+ """
+ Get a timestamp under mutex.
+
+ :returns: Stored timestamp.
+ :rtype: str
+ """
+ with self._lock:
+ return self._timestamp
+
+
+class Metric:
+ """
+ A single metric parent and its samples.
+ """
+ def __init__(self, name, documentation, typ):
+ """
+ Initialize class and do basic sanitize.
+
+ :param name: Full metric name.
+ :param documentation: Metric HELP string.
+ :param typ: Metric type [counter|gauge|info].
+ :type name: str
+ :type documentation: str
+ :type typ: str
+ """
+ self.metric_types = (
+ u"counter", u"gauge", u"info"
+ )
+ self.metric_sample = namedtuple(
+ u"Sample", [u"name", u"labels", u"value", u"timestamp"]
+ )
+
+ if not re.compile(r"^[a-zA-Z_:][a-zA-Z0-9_:]*$").match(name):
+ raise ValueError(f"Invalid metric name: {name}!")
+ if typ not in self.metric_types:
+ raise ValueError(f"Invalid metric type: {typ}!")
+
+ self.name = name
+ self.documentation = documentation
+ self.type = typ
+ self.samples = []
+
+ def add_sample(self, name, labels, value, timestamp):
+ """
+ Add a sample (entry) to the metric.
+
+ :param name: Full metric name.
+ :param labels: Metric labels.
+ :param value: Metric value.
+ :param timestamp: Timestamp. Default to be when accessed.
+ :type name: str
+ :type lables: tuple
+ :type value: int or float
+ :type timestamp: float
+ """
+ self.samples.append(
+ self.metric_sample(name, labels, value, timestamp)
+ )
+
+ def __eq__(self, other):
+ """
+ Check equality of added metric.
+
+ :param other: Metric to compare.
+ :type other: Metric
+ """
+ return (isinstance(other, Metric)
+ and self.name == other.name
+ and self.documentation == other.documentation
+ and self.type == other.type
+ and self.samples == other.samples)
+
+ def __repr__(self):
+ """
+ Represantation as a string for a debug print.
+ """
+ return (
+ f"Metric({self.name}, "
+ f"{self.documentation}, "
+ f"{self.type}, "
+ f"{self.samples})"
+ )
+
+
+class MetricBase:
+ """
+ Abstract class for Metric implementation.
+ """
+ _type = None
+
+ def __init__(
+ self, name, documentation, labelnames=(), namespace="",
+ subsystem="", labelvalues=None,
+ ):
+ """
+ Metric initialization.
+
+ :param name: Metric name.
+ :param documentation: Metric HELP string.
+ :param labelnames: Metric label list.
+ :param namespace: Metric namespace (will be added as prefix).
+ :param subsystem: Metric susbsystem (will be added as prefix).
+ :param labelvalues: Metric label values.
+ :type name: str
+ :type documentation: str
+ :type labelnames: list
+ :type namespace: str
+ :type subsystem: str
+ :type labelvalues: list
+ """
+ self._name = self.validate_name(name, namespace, subsystem)
+ self._labelnames = self.validate_labelnames(labelnames)
+ self._labelvalues = tuple(labelvalues or ())
+ self._documentation = documentation
+
+ if self._is_parent():
+ self._lock = Lock()
+ self._metrics = {}
+
+ if self._is_observable():
+ self._metric_init()
+
+ @staticmethod
+ def validate_name(name, namespace, subsystem):
+ """
+ Construct metric full name and validate naming convention.
+
+ :param name: Metric name.
+ :param namespace: Metric namespace (will be added as prefix).
+ :param subsystem: Metric susbsystem (will be added as prefix).
+ :type name: str
+ :type namespace: str
+ :type subsystem: str
+ :returns: Metric full name.
+ :rtype: str
+ :rasies ValueError: If name does not conform with naming conventions.
+ """
+ full_name = u""
+ full_name += f"{namespace}_" if namespace else u""
+ full_name += f"{subsystem}_" if subsystem else u""
+ full_name += name
+
+ if not re.compile(r"^[a-zA-Z_:][a-zA-Z0-9_:]*$").match(full_name):
+ raise ValueError(
+ f"Invalid metric name: {full_name}!"
+ )
+ return full_name
+
+ @staticmethod
+ def validate_labelnames(labelnames):
+ """
+ Create label tuple and validate naming convention.
+
+ :param labelnames: Metric label list.
+ :type labelnames: list
+ :returns: Label names.
+ :rtype: tuple
+ :rasies ValueError: If name does not conform with naming conventions.
+ """
+ labelnames = tuple(labelnames)
+ for label in labelnames:
+ if not re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$").match(label):
+ raise ValueError(f"Invalid label metric name: {label}!")
+ if re.compile(r"^__.*$").match(label):
+ raise ValueError(f"Reserved label metric name: {label}!")
+ return labelnames
+
+ def _is_observable(self):
+ """
+ Check whether this metric is observable, i.e.
+ * a metric without label names and values, or
+ * the child of a labelled metric.
+
+ :return: Observable
+ :rtype: bool
+ """
+ return not self._labelnames or (self._labelnames and self._labelvalues)
+
+ def _is_parent(self):
+ """
+ Check whether metric is parent, i.e.
+ * a metric with label names but not its values.
+
+ :return: Parent
+ :rtype: bool
+ """
+ return self._labelnames and not self._labelvalues
+
+ def _get_metric(self):
+ """
+ Returns metric that will handle samples.
+
+ :returns: Metric object.
+ :rtype: Metric
+ """
+ return Metric(self._name, self._documentation, self._type)
+
+ def describe(self):
+ """
+ Returns metric that will handle samples.
+
+ :returns: List of metric objects.
+ :rtype: list
+ """
+ return [self._get_metric()]
+
+ def collect(self):
+ """
+ Returns metric with samples.
+
+ :returns: List with metric object.
+ :rtype: list
+ """
+ metric = self._get_metric()
+ for suffix, labels, value, timestamp in self.samples():
+ metric.add_sample(self._name + suffix, labels, value, timestamp)
+ return [metric]
+
+ def labels(self, *labelvalues, **labelkwargs):
+ """
+ Return the child for the given labelset.
+
+ :param labelvalues: Label values.
+ :param labelkwargs: Dictionary with label names and values.
+ :type labelvalues: list
+ :type labelkwargs: dict
+ :returns: Metric with labels and values.
+ :rtype: Metric
+ :raises ValueError: If labels were not initialized.
+ :raises ValueError: If labels are already set (chaining).
+ :raises ValueError: If both parameters are passed.
+ :raises ValueError: If label values are not matching label names.
+ """
+ if not self._labelnames:
+ raise ValueError(
+ f"No label names were set when constructing {self}!"
+ )
+
+ if self._labelvalues:
+ raise ValueError(
+ f"{self} already has labels set; can not chain .labels() calls!"
+ )
+
+ if labelvalues and labelkwargs:
+ raise ValueError(
+ u"Can't pass both *args and **kwargs!"
+ )
+
+ if labelkwargs:
+ if sorted(labelkwargs) != sorted(self._labelnames):
+ raise ValueError(u"Incorrect label names!")
+ labelvalues = tuple(labelkwargs[l] for l in self._labelnames)
+ else:
+ if len(labelvalues) != len(self._labelnames):
+ raise ValueError(u"Incorrect label count!")
+ labelvalues = tuple(l for l in labelvalues)
+ with self._lock:
+ if labelvalues not in self._metrics:
+ self._metrics[labelvalues] = self.__class__(
+ self._name,
+ documentation=self._documentation,
+ labelnames=self._labelnames,
+ labelvalues=labelvalues
+ )
+ return self._metrics[labelvalues]
+
+ def samples(self):
+ """
+ Returns samples wheter an object is parent or child.
+
+ :returns: List of Metric objects with values.
+ :rtype: list
+ """
+ if self._is_parent():
+ return self._multi_samples()
+ return self._child_samples()
+
+ def _multi_samples(self):
+ """
+ Returns parent and its childs with its values.
+
+ :returns: List of Metric objects with values.
+ :rtype: list
+ """
+ with self._lock:
+ metrics = self._metrics.copy()
+ for labels, metric in metrics.items():
+ series_labels = list(zip(self._labelnames, labels))
+ for suffix, sample_labels, value, timestamp in metric.samples():
+ yield (
+ suffix, dict(series_labels + list(sample_labels.items())),
+ value, timestamp
+ )
+
+ def _child_samples(self):
+ """
+ Returns child with its values. Should be implemented by child class.
+
+ :raises NotImplementedError: If implementation in not in subclass.
+ """
+ raise NotImplementedError(
+ f"_child_samples() must be implemented by {self}!"
+ )
+
+ def _metric_init(self):
+ """
+ Initialize the metric object as a child.
+
+ :raises NotImplementedError: If implementation in not in subclass.
+ """
+ raise NotImplementedError(
+ f"_metric_init() must be implemented by {self}!"
+ )
+
+ def __str__(self):
+ """
+ String for a debug print.
+ """
+ return f"{self._type}:{self._name}"
+
+ def __repr__(self):
+ """
+ Represantation as a string for a debug print.
+ """
+ metric_type = type(self)
+ return f"{metric_type.__module__}.{metric_type.__name__}({self._name})"
+
+
+class Counter(MetricBase):
+ """
+ A Counter tracks counts of events or running totals.
+ """
+ _type = u"counter"
+
+ def __init__(self,
+ name,
+ documentation,
+ labelnames=(),
+ namespace=u"",
+ subsystem=u"",
+ labelvalues=None
+ ):
+ """
+ Initialize the Counter metric object.
+
+ :param name: Metric name.
+ :param documentation: Metric HELP string.
+ :param labelnames: Metric label list.
+ :param namespace: Metric namespace (will be added as prefix).
+ :param subsystem: Metric susbsystem (will be added as prefix).
+ :param labelvalues: Metric label values.
+ :type name: str
+ :type documentation: str
+ :type labelnames: list
+ :type namespace: str
+ :type subsystem: str
+ :type labelvalues: list
+ """
+ super(Counter, self).__init__(
+ name=name,
+ documentation=documentation,
+ labelnames=labelnames,
+ namespace=namespace,
+ subsystem=subsystem,
+ labelvalues=labelvalues,
+ )
+
+ def _metric_init(self):
+ """
+ Initialize counter value.
+ """
+ self._value = Value()
+
+ def inc(self, amount=1):
+ """
+ Increment counter by the given amount.
+
+ :param amount: Amount to increment.
+ :type amount: int or float
+ :raises ValueError: If amout is not positive.
+ """
+ if amount < 0:
+ raise ValueError(
+ u"Counters can only be incremented by non-negative amounts."
+ )
+ self._value.inc(amount)
+
+ def _child_samples(self):
+ """
+ Returns list of child samples.
+
+ :returns: List of child samples.
+ :rtype: tuple
+ """
+ return ((u"", {}, self._value.get(), self._value.get_timestamp()),)
+
+
+class Gauge(MetricBase):
+ """
+ Gauge metric, to report instantaneous values.
+ """
+ _type = u"gauge"
+
+ def __init__(self,
+ name,
+ documentation,
+ labelnames=(),
+ namespace=u"",
+ subsystem=u"",
+ labelvalues=None
+ ):
+ """
+ Initialize the Gauge metric object.
+
+ :param name: Metric name.
+ :param documentation: Metric HELP string.
+ :param labelnames: Metric label list.
+ :param namespace: Metric namespace (will be added as prefix).
+ :param subsystem: Metric susbsystem (will be added as prefix).
+ :param labelvalues: Metric label values.
+ :type name: str
+ :type documentation: str
+ :type labelnames: list
+ :type namespace: str
+ :type subsystem: str
+ :type labelvalues: list
+ """
+ super(Gauge, self).__init__(
+ name=name,
+ documentation=documentation,
+ labelnames=labelnames,
+ namespace=namespace,
+ subsystem=subsystem,
+ labelvalues=labelvalues,
+ )
+
+ def _metric_init(self):
+ """
+ Initialize gauge value.
+ """
+ self._value = Value()
+
+ def inc(self, amount=1):
+ """
+ Increment gauge by the given amount.
+
+ :param amount: Amount to increment.
+ :type amount: int or float
+ """
+ self._value.inc(amount)
+
+ def dec(self, amount=1):
+ """
+ Decrement gauge by the given amount.
+
+ :param amount: Amount to decrement.
+ :type amount: int or float
+ """
+ self._value.inc(-amount)
+
+ def set(self, value):
+ """
+ Set gauge to the given value.
+
+ :param amount: Value to set.
+ :type amount: int or float
+ """
+ self._value.set(float(value))
+
+ def _child_samples(self):
+ """
+ Returns list of child samples.
+
+ :returns: List of child samples.
+ :rtype: tuple
+ """
+ return ((u"", {}, self._value.get(), self._value.get_timestamp()),)
+
+
+class Info(MetricBase):
+ """
+ Info metric, key-value pairs.
+ """
+ _type = u"info"
+
+ def __init__(self,
+ name,
+ documentation,
+ labelnames=(),
+ namespace=u"",
+ subsystem=u"",
+ labelvalues=None
+ ):
+ """
+ Initialize the Info metric object.
+
+ :param name: Metric name.
+ :param documentation: Metric HELP string.
+ :param labelnames: Metric label list.
+ :param namespace: Metric namespace (will be added as prefix).
+ :param subsystem: Metric susbsystem (will be added as prefix).
+ :param labelvalues: Metric label values.
+ :type name: str
+ :type documentation: str
+ :type labelnames: list
+ :type namespace: str
+ :type subsystem: str
+ :type labelvalues: list
+ """
+ super(Info, self).__init__(
+ name=name,
+ documentation=documentation,
+ labelnames=labelnames,
+ namespace=namespace,
+ subsystem=subsystem,
+ labelvalues=labelvalues,
+ )
+
+ def _metric_init(self):
+ """
+ Initialize gauge value and time it was created.
+ """
+ self._labelname_set = set(self._labelnames)
+ self._lock = Lock()
+ self._value = {}
+
+ def info(self, value):
+ """
+ Set info to the given value.
+
+ :param amount: Value to set.
+ :type amount: int or float
+ :raises ValueError: If lables are overlapping.
+ """
+ if self._labelname_set.intersection(value.keys()):
+ raise ValueError(
+ u"Overlapping labels for Info metric, "
+ f"metric: {self._labelnames} child: {value}!"
+ )
+ with self._lock:
+ self._value = dict(value)
+
+ def _child_samples(self):
+ """
+ Returns list of child samples.
+
+ :returns: List of child samples.
+ :rtype: tuple
+ """
+ with self._lock:
+ return ((u"_info", self._value, 1.0, monotonic()),)
diff --git a/resources/tools/telemetry/parser.py b/resources/tools/telemetry/parser.py
new file mode 100644
index 0000000000..c6cc167066
--- /dev/null
+++ b/resources/tools/telemetry/parser.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Configuration parsing library."""
+
+from logging import getLogger
+from pathlib import Path
+import sys
+
+from yaml import safe_load, YAMLError
+
+
+class Parser:
+ """
+ Parser class reponsible for loading configuration.
+ """
+ def __init__(self, configuration_file):
+ """
+ Config Parser init.
+
+ :param configuration_file: Telemetry configuration file path.
+ :type configuration_file: str
+ """
+ self.instance = None
+ self.config = None
+ self.suffix = Path(configuration_file).suffix[1:].capitalize()
+
+ try:
+ self.instance = globals()[self.suffix+"Loader"](configuration_file)
+ except KeyError:
+ raise ParserError(u"Unsupported file format!")
+
+ self.config = FileLoader(self.instance).load()
+
+
+class FileLoader:
+ """
+ Creates a File Loader object. This is the main object for interacting
+ with configuration file.
+ """
+ def __init__(self, loader):
+ """
+ File Loader class init.
+
+ :param loader: Loader object responsible for handling file type.
+ :type loader: obj
+ """
+ self.loader = loader
+
+ def load(self):
+ """
+ File format parser.
+ """
+ return self.loader.load()
+
+
+class YamlLoader:
+ """
+ Creates a YAML Loader object. This is the main object for interacting
+ with YAML file.
+ """
+ def __init__(self, configuration_file):
+ """
+ YAML Loader class init.
+
+ :param configuration_file: YAML configuration file path.
+ :type configuration_file: str
+ """
+ self.configuration_file = configuration_file
+
+ def load(self):
+ """
+ YAML format parser.
+ """
+ with open(self.configuration_file, u"r") as stream:
+ try:
+ return safe_load(stream)
+ except YAMLError as exc:
+ raise ParserError(str(exc))
+
+
+class ParserError(Exception):
+ """
+ Creates a Parser Error Exception. This exception is supposed to handle
+ all the errors raised during processing.
+ """
+ def __init__(self, message):
+ """
+ Parser Error Excpetion init.
+
+ :param message: Exception error message.
+ :type message: str
+ """
+ super().__init__()
+ self.message = message
+ getLogger(__name__).error(self.message)
+ sys.exit(1)
diff --git a/resources/tools/telemetry/serializer.py b/resources/tools/telemetry/serializer.py
new file mode 100644
index 0000000000..e8315c832d
--- /dev/null
+++ b/resources/tools/telemetry/serializer.py
@@ -0,0 +1,110 @@
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Config executor library."""
+
+from importlib import import_module
+from logging import getLogger
+
+
+class Serializer:
+ """
+ Executor class reponsible for executing configuration.
+ """
+ def __init__(self):
+ """
+ Config Executor init.=
+ """
+ self.metric_registry = dict()
+
+ def create(self, metrics):
+ """
+ Create metrics based on input configuration.
+
+ :param metrics: Metric list to create.
+ :type metrics: list
+ """
+ for metric_type, metric_list in metrics.items():
+ for metric in metric_list:
+ module = import_module(
+ name=u"telemetry.metrics", package=metric_type.capitalize()
+ )
+ self.metric_registry[metric[u"name"]] = getattr(
+ module, metric_type.capitalize()
+ )(**metric)
+
+ def serialize(self, metric, labels, item):
+ """
+ Serialize metric into destination format.
+
+ :param metrics: Metric name.
+ :param labels: Metric labels.
+ :param item: Metric dict.
+ :type metrics: str
+ :type labels: dict
+ :type item: dict
+ """
+ if type(self.metric_registry[metric]).__name__ == u"Counter":
+ self.metric_registry[metric].labels(**labels).inc(
+ float(item[u"value"])
+ )
+ if type(self.metric_registry[metric]).__name__ == u"Gauge":
+ self.metric_registry[metric].labels(**labels).set(
+ float(item[u"value"])
+ )
+ if type(self.metric_registry[metric]).__name__ == u"Info":
+ self.metric_registry[metric].labels(**labels).info(
+ item[u"value"]
+ )
+
+ def publish(self):
+ """
+ Publish metric into logger.
+ """
+ output = []
+ for _, metric_list in self.metric_registry.items():
+ for metric in metric_list.collect():
+ mname = metric.name
+ mtype = metric.type
+
+ # Adjust from OpenMetrics into Prometheus format.
+ mname = f"{mname}_total" if mtype == u"counter" else mname
+ mname = f"{mname}_info" if mtype == u"info" else mname
+ if mtype in (u"info", u"stateset"):
+ mtype = u"gauge"
+ if mtype in (u"gaugehistogram", u"histogram"):
+ mtype = u"histogram"
+
+ mdocumentation = metric.documentation.replace(u"\\", r"\\")
+ mdocumentation = mdocumentation.replace(u"\n", r"\n")
+ output.append(f"# HELP {mname} {mdocumentation}\n")
+ output.append(f"# TYPE {mname} {mtype}\n")
+
+ for line in metric.samples:
+ if line.labels:
+ llabel = []
+ for k, value in sorted(line.labels.items()):
+ value = value.replace(u"\\", r"\\")
+ value = value.replace(u"\n", r"\n")
+ value = value.replace(u'"', r'\"')
+ llabel.append(f'{k}="{value}"')
+ labelstr = f"{{{','.join(llabel)}}}"
+ else:
+ labelstr = u""
+
+ timestamp = f" {int(float(line.timestamp) * 1000):d}" \
+ if line.timestamp else u""
+ output.append(
+ f"{line.name}{labelstr} {line.value}{timestamp}\n"
+ )
+ getLogger(u"prom").info(u"".join(output))