1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
# Copyright (c) 2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""BPF performance bundle."""
from logging import getLogger
import sys
from bcc import BPF
from .constants import Constants
class BundleBpf:
    """Telemetry bundle backed by a BPF program.

    The bundle compiles the supplied BPF C code through ``bcc.BPF``, attaches
    and detaches the configured perf events, reads the BPF tables named by
    the configured metrics and passes the collected items to the metric
    serializer.
    """

    # Items whose "name" label equals this value are dropped in fetch_data().
    # NOTE(review): presumably this hides the collector's own interpreter
    # process - confirm.
    _IGNORED_PROCESS = "python3"

    # Titles of the columns printed for every table, in output order.
    _COLUMNS = ("cpu", "pid", "name", "value")

    def __init__(self, program, serializer, hook):
        """Initialize Bundle BPF Perf event class.

        :param program: BPF program definition. The keys ``code`` (BPF C
            code), ``metrics`` and ``events`` are read from it.
        :param serializer: Metric serializer.
        :param hook: Process ID. Stored on the instance; not used by the
            methods of this class.
        :type program: dict
        :type serializer: Serializer
        :type hook: int
        """
        self.code = program["code"]
        self.metrics = program["metrics"]
        self.events = program["events"]
        self.api_replies_list = []
        self.serializer = serializer
        self.hook = hook
        self.obj = BPF(text=self.code)

    def attach(self, sample_period):
        """
        Attach events to BPF.

        Logs the name of the failing event and exits the process with
        ``Constants.err_linux_attach`` when attaching raises AttributeError.

        :param sample_period: A "sampling" event is one that generates
            an overflow notification every N events, where N is given by
            sample_period.
        :type sample_period: int
        """
        for event in self.events:
            # The try block is scoped to a single event so that the error
            # message always refers to the event that actually failed.
            try:
                self.obj.attach_perf_event(
                    ev_type=event["type"],
                    ev_config=event["name"],
                    fn_name=event["target"],
                    sample_period=sample_period
                )
            except AttributeError:
                getLogger("console_stderr").error(
                    f"Could not attach BPF event: {event['name']}"
                )
                sys.exit(Constants.err_linux_attach)

    def detach(self):
        """
        Detach events from BPF.

        Logs an error and exits the process with
        ``Constants.err_linux_detach`` when detaching raises AttributeError.
        """
        for event in self.events:
            try:
                self.obj.detach_perf_event(
                    ev_type=event["type"],
                    ev_config=event["name"]
                )
            except AttributeError:
                getLogger("console_stderr").error(
                    "Could not detach BPF events!"
                )
                sys.exit(Constants.err_linux_detach)

    def fetch_data(self):
        """
        Fetch data by invoking API calls to BPF.

        Every collected item is appended to ``self.api_replies_list`` (the
        list is not cleared, so repeated calls accumulate) and a textual
        report is logged to the ``console_stdout`` logger. The report starts
        with the names of the tables that were read, followed by one aligned
        table per metric name, ordered by metric name and then by CPU.
        """
        self.serializer.create(metrics=self.metrics)

        preamble = []
        previous_name = ""
        collected = []
        for metric_list in self.metrics.values():
            for metric in metric_list:
                # List each table once per run of consecutive equal names.
                if previous_name != metric["name"]:
                    previous_name = metric["name"]
                    preamble.append(f"{previous_name}\n")
                for item in self._read_table(metric):
                    self.api_replies_list.append(item)
                    collected.append(item)

        text = "".join(preamble) + self._format_tables(collected)
        getLogger("console_stdout").info(text)

    def _read_table(self, metric):
        """Yield one item per entry of the BPF table backing the metric.

        :param metric: Metric definition; the keys ``name`` (BPF table name)
            and ``labelnames`` (attributes read from every table key) are
            used.
        :type metric: dict
        :returns: Generator of dicts with keys ``name``, ``value`` and
            ``labels``.
        :rtype: generator
        """
        table_name = metric["name"]
        for key, val in self.obj.get_table(table_name).items():
            labels = {
                label: getattr(key, label) for label in metric["labelnames"]
            }
            # Undecodable bytes are replaced instead of raising, so that a
            # single odd name cannot abort the whole collection.
            labels["name"] = labels["name"].decode("utf-8", errors="replace")
            if labels["name"] == self._IGNORED_PROCESS:
                continue
            yield {"name": table_name, "value": val.value, "labels": labels}

    @classmethod
    def _format_tables(cls, items):
        """Render the items as aligned text tables, one per metric name.

        :param items: Items as produced by ``_read_table``.
        :type items: list
        :returns: Text with a ``===name===`` banner, a column header and the
            data rows for every metric name; empty string without items.
        :rtype: str
        """
        ordered = sorted(
            items, key=lambda item: (item["name"], item["labels"]["cpu"])
        )
        rows = [
            (
                item["name"],
                (
                    str(item["labels"]["cpu"]),
                    str(item["labels"]["pid"]),
                    str(item["labels"]["name"]),
                    str(item["value"]),
                ),
            )
            for item in ordered
        ]

        # Column widths are shared by all tables: the widest cell or title.
        widths = [len(title) for title in cls._COLUMNS]
        for _, cells in rows:
            widths = [
                max(width, len(cell)) for width, cell in zip(widths, cells)
            ]

        def format_row(cells):
            # Header and data rows share this layout, which keeps the titles
            # aligned with the values below them.
            padded = (cell.ljust(width) for cell, width in zip(cells, widths))
            return " ".join(padded) + "\n"

        parts = []
        # Tracked separately from the collection loop so that the banner and
        # header are emitted for the first table as well.
        current_table = None
        for table_name, cells in rows:
            if table_name != current_table:
                current_table = table_name
                parts.append(f"\n==={table_name}===\n")
                parts.append(format_row(cls._COLUMNS))
            parts.append(format_row(cells))
        return "".join(parts)

    def process_data(self):
        """
        Post process API replies.

        Passes every item collected so far to the serializer.
        """
        for item in self.api_replies_list:
            self.serializer.serialize(
                metric=item["name"], labels=item["labels"], item=item
            )
|