diff options
Diffstat (limited to 'netmodel/interfaces/vpp/__init__.py')
-rw-r--r-- | netmodel/interfaces/vpp/__init__.py | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/netmodel/interfaces/vpp/__init__.py b/netmodel/interfaces/vpp/__init__.py new file mode 100644 index 00000000..34d106fd --- /dev/null +++ b/netmodel/interfaces/vpp/__init__.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 + +# Dependency: vpp-api-python + +import asyncio +import asyncio.subprocess +import collections +import copy +import logging +import pyparsing as pp +import socket +import time + +from netmodel.model.attribute import Attribute +from netmodel.model.filter import Filter +from netmodel.model.query import Query, ACTION2STR, ACTION_UPDATE +from netmodel.model.query import ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE +from netmodel.model.object import Object +from netmodel.model.type import Double, String +from netmodel.network.interface import Interface as BaseInterface +from netmodel.network.packet import Packet + +log = logging.getLogger(__name__) + +DEFAULT_INTERVAL = 1 # s +KEY_FIELD = 'device_name' + +def parse(s): + kw_name = pp.Keyword('Name') + kw_idx = pp.Keyword('Idx') + kw_state = pp.Keyword('State') + kw_counter = pp.Keyword('Counter') + kw_count = pp.Keyword('Count') + + kw_up = pp.CaselessKeyword('up') + kw_down = pp.CaselessKeyword('down') + kw_rx_packets = pp.CaselessKeyword('rx packets') + kw_rx_bytes = pp.CaselessKeyword('rx bytes') + kw_tx_packets = pp.CaselessKeyword('tx packets') + kw_tx_bytes = pp.CaselessKeyword('tx bytes') + kw_drops = pp.CaselessKeyword('drops') + kw_ip4 = pp.CaselessKeyword('ip4') + kw_ip6 = pp.CaselessKeyword('ip6') + kw_tx_error = pp.CaselessKeyword('tx-error') + kw_rx_miss = pp.CaselessKeyword('rx-miss') + + header = kw_name + kw_idx + kw_state + kw_counter + kw_count + + interface = (pp.Word(pp.alphanums + '/' + '-').setResultsName('device_name') + \ + pp.Word(pp.nums).setResultsName('index') + \ + pp.oneOf(['up', 'down']).setResultsName('state') + \ + pp.Optional(kw_rx_packets + pp.Word(pp.nums).setResultsName('rx_packets')) + \ + pp.Optional(kw_rx_bytes + pp.Word(pp.nums).setResultsName('rx_bytes')) + \ + pp.Optional(kw_tx_packets + pp.Word(pp.nums).setResultsName('tx_packets')) + \ + pp.Optional(kw_tx_bytes + pp.Word(pp.nums).setResultsName('tx_bytes')) + \ + pp.Optional(kw_drops + pp.Word(pp.nums).setResultsName('drops')) + \ + pp.Optional(kw_ip4 + pp.Word(pp.nums).setResultsName('ip4')) + \ + pp.Optional(kw_ip6 + pp.Word(pp.nums).setResultsName('ip6')) + \ + pp.Optional(kw_rx_miss + pp.Word(pp.nums).setResultsName('rx_miss')) + \ + pp.Optional(kw_tx_error + pp.Word(pp.nums).setResultsName('tx_error')) + ).setParseAction(lambda t: t.asDict()) + + bnf = ( + header.suppress() + + pp.OneOrMore(interface) + ).setParseAction(lambda t: t.asList()) + + return bnf.parseString(s, parseAll = True).asList() + +class VPPInterface(Object): + __type__ = 'vpp_interface' + + node = Attribute(String) + device_name = Attribute(String) + bw_upstream = Attribute(Double) # bytes + bw_downstream = Attribute(Double) # bytes + +class VPPCtlInterface(BaseInterface): + __interface__ = 'vppctl' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.register_object(VPPInterface) + + # Set of monitored interfaces + self._interfaces = collections.defaultdict(int) + self._running = False + + # interface -> (time, rx, tx) + self._last = dict() + + async def _tick(self): + while self._running: + try: + create = asyncio.create_subprocess_exec( + 'vppctl_wrapper', 'show', 'int', + stdout=asyncio.subprocess.PIPE, + ) + proc = await create + await proc.wait() + stdout = await proc.stdout.read() + + if proc.returncode: + log.error("error") + return + + interfaces = parse(stdout.decode()) + last = copy.copy(self._last) + self._last = dict() + now = time.time() + for interface in interfaces: + if not interface['device_name'] in self._interfaces: + continue + tx = float(interface['tx_bytes']) + rx = float(interface['rx_bytes']) + self._last[interface['device_name']] = (now, rx, tx) + + if not interface['device_name'] in last: + continue + prev_now, prev_rx, prev_tx = last[interface['device_name']] + + # Per interface throughput computation + ret = { + 'node' : socket.gethostname(), + 'device_name' : interface['device_name'], + 'bw_upstream' : (tx - prev_tx) / (now - prev_now), + 'bw_downstream' : (rx - prev_rx) / (now - prev_now), + } + + f_list = [[KEY_FIELD, '==', interface['device_name']]] + del interface['device_name'] + query = Query(ACTION_UPDATE, VPPInterface.__type__, + filter = Filter.from_list(f_list), + params = ret) + self.receive(Packet.from_query(query, reply = True)) + except Exception as e: + import traceback; traceback.print_exc() + log.error("Could not perform measurement {}".format(e)) + + await asyncio.sleep(DEFAULT_INTERVAL) + + #-------------------------------------------------------------------------- + # Router interface + #-------------------------------------------------------------------------- + + def send_impl(self, packet): + query = packet.to_query() + + if query.action not in (ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE): + log.warning("Ignore unknown action {}".format( + ACTION2STR[query.action])) + return + + # We currently simply extract it from the filter + interfaces = set([p.value for p in query.filter if p.key == KEY_FIELD]) + + for interface in interfaces: + if query.action == ACTION_SUBSCRIBE: + self._interfaces[interface] += 1 + else: + self._interfaces[interface] -= 1 + + all_interfaces = set([k for k, v in self._interfaces.items() if v > 0]) + + if all_interfaces and not self._running: + self._running = True + asyncio.ensure_future(self._tick()) + elif not all_interfaces and self._running: + self._running = False + + +#------------------------------------------------------------------------------- + +if __name__ == '__main__': + x=""" Name Idx State Counter Count + TenGigabitEthernetc/0/1 1 up rx packets 3511586 + rx bytes 4785592030 + tx packets 3511678 + tx bytes 313021701 + drops 7 + ip4 161538 + ip6 3350047 + tx-error 2 + host-bh1 4 up rx packets 5 + rx bytes 394 + tx packets 10 + tx bytes 860 + drops 4 + ip6 4 + host-bh2 6 up rx packets 3164301 + rx bytes 287315869 + tx packets 3164238 + tx bytes 4290944332 + drops 4 + ip4 161539 + ip6 3002759 + host-bh3 7 up rx packets 33066 + rx bytes 2446928 + tx packets 33060 + tx bytes 47058708 + drops 5 + ip6 33065 + host-bh4 5 up rx packets 114407 + rx bytes 8466166 + tx packets 114412 + tx bytes 162905294 + drops 7 + ip6 114406 + host-bh5 3 up rx packets 150574 + rx bytes 11142524 + tx packets 150578 + tx bytes 214407016 + drops 7 + ip6 150573 + host-bh6 2 up rx packets 49380 + rx bytes 3654160 + tx packets 49368 + tx bytes 70283976 + drops 9 + ip6 49377 + local0 0 down drops 3 + """ + + r = parse(x) + print(r) |