diff options
Diffstat (limited to 'netmodel/network')
-rw-r--r-- | netmodel/network/__init__.py | 0 | ||||
-rw-r--r-- | netmodel/network/fib.py | 63 | ||||
-rw-r--r-- | netmodel/network/flow.py | 57 | ||||
-rw-r--r-- | netmodel/network/flow_table.py | 150 | ||||
-rw-r--r-- | netmodel/network/interface.py | 281 | ||||
-rw-r--r-- | netmodel/network/packet.py | 429 | ||||
-rw-r--r-- | netmodel/network/prefix.py | 37 | ||||
-rw-r--r-- | netmodel/network/router.py | 257 |
8 files changed, 1274 insertions, 0 deletions
diff --git a/netmodel/network/__init__.py b/netmodel/network/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/netmodel/network/__init__.py diff --git a/netmodel/network/fib.py b/netmodel/network/fib.py new file mode 100644 index 00000000..e6b81607 --- /dev/null +++ b/netmodel/network/fib.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class FIBEntry: + def __init__(self, prefix, next_hops = None): + if next_hops is None: + next_hops = set() + + self._prefix = prefix + self._next_hops = next_hops + + def update(self, next_hops = None): + if not next_hops: + return + self._next_hops |= next_hops + + def remove(self, next_hops = None): + if not next_hops: + return + self._next_hops &= next_hops + +class FIB: + def __init__(self): + self._entries = dict() + + def add(self, prefix, next_hops = None): + self._entries[prefix] = FIBEntry(prefix, next_hops) + + def update(self, prefix, next_hops = None): + entry = self._entries.get(prefix) + if not entry: + raise Exception('prefix not found') + entry.update(next_hops) + + def remove(self, prefix, next_hops = None): + if next_hop: + entry = self._entries.get(prefix) + if not entry: + raise Exception('prefix not found') + entry.remove(next_hops) + return + + del self._entries[prefix] + + def get(self, object_name): + for entry in self._entries.values(): + if entry._prefix.object_name == object_name: + return next(iter(entry._next_hops)) diff --git a/netmodel/network/flow.py b/netmodel/network/flow.py new file mode 100644 index 00000000..bce4512c --- /dev/null +++ b/netmodel/network/flow.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class Flow: + def __init__(self, source, destination): + """ + Constructor. + Args: + source: The source Prefix of this Flow. + destination: The destination Prefix of this Flow. + """ + self.source = source + self.destination = destination + + def get_reverse(self): + """ + Make the reverse Flow of this Flow. + Returns: + The reverse Flow. + """ + return Flow(self.destination, self.source) + + def __eq__(self, other): + """ + Tests whether two Flows are equal or not. + Args: + other: A Flow instance. + Returns: + True iif self == other. + """ + if self.source and other.source and self.source != other.source: + return False + if self.destination and other.destination and self.destination != other.destination: + return False + return True + + def __hash__(self): + # Order is important + return hash((self.source, self.destination)) + + def __repr__(self): + return "<Flow %s -> %s>" % (self.source, self.destination) diff --git a/netmodel/network/flow_table.py b/netmodel/network/flow_table.py new file mode 100644 index 00000000..86c6e52e --- /dev/null +++ b/netmodel/network/flow_table.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from netmodel.model.query import ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE + +# Per-interface flow table +class SubFlowTable: + def __init__(self, interface): + # Interface to which this flow table is associated + self._interface = interface + + # Flow -> ingress interface + self._flows = dict() + + # Ingress interface -> list of subscription + self._subscriptions = dict() + + def add(self, packet, ingress_interface): + flow = packet.get_flow() + self._flows[flow] = ingress_interface + + def match(self, packet): + flow = packet.get_flow() + return self._flows.get(flow.get_reverse()) + +class Subscription: + def __init__(self, packet, ingress_list, egress_list): + """ + Args: + packet : subscription packet + ingress_list (List[Interface]) : list of ingress interface + egress_list (List[Interface]) : list of egress interface + """ + self._packet = packet + self._ingress_list = ingress_list + self._egress_list = egress_list + +class FlowTable: + """ + The flow table managed flow association between packets, as well as the + subscription state for each flow. + + Event management + ================ + + The flow table has to be notified from netmodel.network.interface becomes + up, and when it is deleted. This is handled through the following events: + + _on_interface_up + . resend any pending subscription on the interface + _on_interface_deleted + . delete any subscription + """ + + def __init__(self): + # Per interface (sub) flow tables + # flow are added when forwarded using FIB + # matched upon returning + self._sub_flow_tables = dict() + + # The Flow Table also maintains a list of subscriptions doubly indexed + # by both subscriptors, and egress interfaces + + # ingress_interface -> list of subscriptions + self._ingress_subscriptions = dict() + + # egress_interface -> list of subscriptions + self._egress_subscriptions = dict() + + + def match(self, packet, interface): + """ + Check whether the packet arriving on interface is a reply. + + Returns: + interface that originally requested the packet + None if not found + """ + sub_flow_table = self._sub_flow_tables.get(interface) + if not sub_flow_table: + return None + return sub_flow_table.match(packet) + + def add(self, packet, ingress_interface, interface): + sub_flow_table = self._sub_flow_tables.get(interface) + if not sub_flow_table: + sub_flow_table = SubFlowTable(interface) + self._sub_flow_tables[interface] = sub_flow_table + sub_flow_table.add(packet, ingress_interface) + + # If the flow is a subscription, we need to associate it to the list + query = packet.to_query() + if query.action == ACTION_SUBSCRIBE: + print('adding subscription', query.to_dict()) + # XXX we currently don't merge subscriptions, and assume a single + # next hop interface + s = Subscription(packet, [ingress_interface], [interface]) + + if ingress_interface: + if not ingress_interface in self._ingress_subscriptions: + self._ingress_subscriptions[ingress_interface] = list() + self._ingress_subscriptions[ingress_interface].append(s) + + if not interface in self._egress_subscriptions: + self._egress_subscriptions[interface] = list() + self._egress_subscriptions[interface].append(s) + + elif query.action == ACTION_UNSUBSCRIBE: + raise NotImplementedError + + + # Events + + def _on_interface_up(self, interface): + """ + Callback: an interface gets back up after downtime. + + Resend all pending subscriptions when an interface comes back up. + """ + subscriptions = self._egress_subscriptions.get(interface) + if not subscriptions: + return + for s in subscriptions: + interface.send(s._packet) + + def _on_interface_delete(self, interface): + """ + Callback: an interface has been deleted + + Cancel all subscriptions that have been issues from + netmodel.network.interface. + Remove all pointers to subscriptions pending on this interface + """ + if interface in self._ingress_subscriptions: + del self._ingress_subscriptions[interface] diff --git a/netmodel/network/interface.py b/netmodel/network/interface.py new file mode 100644 index 00000000..c9e31422 --- /dev/null +++ b/netmodel/network/interface.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import enum +import inspect +import logging +import pkgutil +import sys +import traceback + +import netmodel.interfaces as interfaces +from netmodel.network.packet import Packet +from netmodel.network.prefix import Prefix + +# Tag identifying an interface name +INTERFACE_NAME_PROPERTY = '__interface__' + +RECONNECTION_DELAY = 10 + +log = logging.getLogger(__name__) + +class InterfaceState(enum.Enum): + Up = 'up' + Down = 'down' + PendingUp = 'up (pending)' + PendingDown = 'down (pending)' + Error = 'error' + +def register_interfaces(): + Interface._factory = dict() + for loader, module_name, is_pkg in pkgutil.walk_packages(interfaces.__path__, + interfaces.__name__ + '.'): + # Note: we cannot skip files using is_pkg otherwise it will ignore + # interfaces defined outside of __init__.py + #if not is_pkg: + # continue + try: + module = loader.find_module(module_name).load_module(module_name) + for _, obj in inspect.getmembers(module): + if not inspect.isclass(obj): + continue + if not issubclass(obj, Interface): + continue + if obj is Interface: + continue + if not hasattr(obj, INTERFACE_NAME_PROPERTY): + log.warning('Ignored interface' + module_name + \ + 'with no ' + INTERFACE_NAME_PROPERTY + ' property') + continue + name = getattr(obj, INTERFACE_NAME_PROPERTY) + Interface._factory[name] = obj + + except ImportError as e: + log.warning('Interface {} automatically disabled. ' \ + 'Please install dependencies if you wish to use it: {}'\ + .format(module_name, e)) + + except Exception as e: + log.warning('Failed to load interface {}: {}'.format( + module_name, e)) + +#------------------------------------------------------------------------------ + +class Interface: + @classmethod + def get(cls, name): + if not hasattr(cls, '_factory'): + register_interfaces() + return cls._factory.get(name, None) + + STATE_DOWN = 0 + STATE_PENDING_UP = 1 + STATE_UP = 2 + STATE_PENDING_DOWN = 3 + + def __init__(self, *args, **kwargs): + self._callback = kwargs.pop('callback', None) + self._hook = kwargs.pop('hook', None) + + self._tx_buffer = list() + self._state = InterfaceState.Down + self._error = None + self._reconnecting = True + self._reconnection_delay = RECONNECTION_DELAY + + self._registered_objects = dict() + + # Callbacks + self._up_callbacks = list() + self._down_callbacks = list() + self._spawn_callbacks = list() + self._delete_callbacks = list() + + # Set upon registration + self._name = None + + def terminate(self): + self.set_state(InterfaceState.PendingDown) + + def __repr__(self): + return "<Interface %s>" % (self.__class__.__name__) + + def __hash__(self): + return hash(self._name) + + #--------------------------------------------------------------------------- + + def register_object(self, obj): + self._registered_objects[obj.__type__] = obj + + def get_prefixes(self): + return [ Prefix(v.__type__) for v in self._registered_objects.values() ] + + #--------------------------------------------------------------------------- + # State management, callbacks + #--------------------------------------------------------------------------- + + def set_state(self, state): + asyncio.ensure_future(self._set_state(state)) + + async def _set_state(self, state): + self._state = state + + if state == InterfaceState.PendingUp: + await self.pending_up_impl() + elif state == InterfaceState.PendingDown: + await self.pending_down_impl() + elif state == InterfaceState.Error: + pass + elif state == InterfaceState.Up: + log.info("Interface {} : new state UP.".format(self.__interface__,)) + if self._tx_buffer: + log.info("Platform %s: sending %d buffered packets." % + (self.__interface__, len(self._tx_buffer))) + while self._tx_buffer: + packet = self._tx_buffer.pop() + self.send_impl(packet) + # Trigger callbacks to inform interface is up + for cb, args, kwargs in self._up_callbacks: + cb(self, *args, **kwargs) + elif state == InterfaceState.Down: + log.info("Interface %s: new state DOWN." % (self.__interface__,)) + self._state = self.STATE_DOWN + # Trigger callbacks to inform interface is down + for cb, args, kwargs in self._down_callbacks: + cb(self, *args, **kwargs) + + def spawn_interface(self, interface): + #print('spawn interface', interface) + for cb, args, kwargs in self._spawn_callbacks: + cb(interface, *args, **kwargs) + + def delete_interface(self, interface): + for cb, args, kwargs in self._delete_callbacks: + cb(interface, *args, **kwargs) + + #-------------------------------------------------------------------------- + + def set_reconnecting(self, reconnecting): + self._reconnecting = reconnecting + + def get_interface_type(self): + return self.__interface__ + + def get_description(self): + return str(self) + + def get_status(self): + return 'UP' if self.is_up() else 'ERROR' if self.is_error() else 'DOWN' + + def is_up(self): + return self._state == InterfaceState.Up + + def is_down(self): + return not self.is_up() + + def is_error(self): + return self.is_down() and self._error is not None + + def reinit_impl(self): + pass + + def reinit(self, **platform_config): + self.set_down() + if platform_config: + self.reconnect_impl(self, **platform_config) + self.set_up() + + #-------------------------------------------------------------------------- + # Callback management + #-------------------------------------------------------------------------- + + def add_up_callback(self, callback, *args, **kwargs): + cb_tuple = (callback, args, kwargs) + self._up_callbacks.append(cb_tuple) + + def del_up_callback(self, callback): + self._up_callbacks = [cb for cb in self._up_callbacks \ + if cb[0] == callback] + + def add_down_callback(self, callback, *args, **kwargs): + cb_tuple = (callback, args, kwargs) + self._down_callbacks.append(cb_tuple) + + def del_down_callback(self, callback): + self._down_callbacks = [cb for cb in self._down_callbacks \ + if cb[0] == callback] + + def add_spawn_callback(self, callback, *args, **kwargs): + cb_tuple = (callback, args, kwargs) + self._spawn_callbacks.append(cb_tuple) + + def del_spawn_callback(self, callback): + self._spawn_callbacks = [cb for cb in self._spawn_callbacks \ + if cb[0] == callback] + + def add_delete_callback(self, callback, *args, **kwargs): + cb_tuple = (callback, args, kwargs) + self._delete_callbacks.append(cb_tuple) + + def del_delete_callback(self, callback): + self._delete_callbacks = [cb for cb in self._delete_callbacks \ + if cb[0] == callback] + + #-------------------------------------------------------------------------- + # Interface API + #-------------------------------------------------------------------------- + + async def pending_up_impl(self): + self.set_state(InterfaceState.Up) + + def send_impl(self, packet): + query = packet.to_query() + obj = self._registered_objects.get(query.object_name) + obj.get(query, self) + + def receive_impl(self, packet): + ingress_interface = self + cb = self._callback + if cb is None: + return + if self._hook: + new_packet = self._hook(packet) + if new_packet is not None: + cb(new_packet, ingress_interface=ingress_interface) + return + cb(packet, ingress_interface=ingress_interface) + + #-------------------------------------------------------------------------- + + def send(self, packet): + if self.is_up(): + self.send_impl(packet) + else: + self._tx_buffer.append(packet) + + def receive(self, packet): + """ + For packets received from outside (eg. a remote server). + """ + self.receive_impl(packet) + + def execute(self, query): + self.send(Packet.from_query(query)) + diff --git a/netmodel/network/packet.py b/netmodel/network/packet.py new file mode 100644 index 00000000..9552b0e7 --- /dev/null +++ b/netmodel/network/packet.py @@ -0,0 +1,429 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import copy +import enum +import logging +import pickle +import traceback +import uuid + +from types import GeneratorType + +from netmodel.network.flow import Flow +from netmodel.network.prefix import Prefix as Prefix_ +from netmodel.model.attribute import Attribute +from netmodel.model.type import String +from netmodel.model.query import Query +from netmodel.model.filter import Filter as Filter_ +from netmodel.model.field_names import FieldNames as FieldNames_ +from netmodel.model.object import Object + +log = logging.getLogger(__name__) + +class NetmodelException(Exception): + pass + +#------------------------------------------------------------------------------ + +NETMODEL_TLV_TYPELEN_STR = '!H' +NETMODEL_TLV_SIZE = 2 +NETMODEL_TLV_TYPE_MASK = 0xfe00 +NETMODEL_TLV_TYPE_SHIFT = 9 +NETMODEL_TLV_LENGTH_MASK = 0x01ff + +NETMODEL_TLV_OBJECT_NAME = 1 +NETMODEL_TLV_FIELD = 2 +NETMODEL_TLV_FIELDS = 3 +NETMODEL_TLV_PREDICATE = 4 +NETMODEL_TLV_FILTER = 5 +NETMODEL_TLV_SRC = 6 +NETMODEL_TLV_DST = 7 +NETMODEL_TLV_PROTOCOL = 8 +NETMODEL_TLV_FLAGS = 9 +NETMODEL_TLV_PAYLOAD = 10 +NETMODEL_TLV_PACKET = 11 + +SUCCESS = 0 +WARNING = 1 +ERROR = 2 + +FLAG_LAST = 1 << 0 +FLAG_REPLY = 1 << 1 + +#------------------------------------------------------------------------------ + +class PacketProtocol(enum.Enum): + Query = 'query' + Error = 'error' + +#------------------------------------------------------------------------------ + +class VICNTLV: + + _LEN_MIN = 0 + _LEN_MAX = 511 + tlv_type = None + + _tlv_parsers = {} # Attributes... + tlvs = [] + + def decode(self, buf): + (self.typelen, ) = struct.unpack( + NETMODEL_TLV_TYPELEN_STR, buf[:NETMODEL_TLV_SIZE]) + tlv_type = \ + (self.typelen & NETMODEL_TLV_TYPE_MASK) >> NETMODEL_TLV_TYPE_SHIFT + assert self.tlv_type == tlv_type + + self.len = self.typelen & NETMODEL_TLV_LENGTH_MASK + assert len(buf) >= self.len + NETMODEL_TLV_SIZE + + self.tlv_info = buf[NETMODEL_TLV_SIZE:] + self.tlv_info = self.tlv_info[:self.len] + + #-------------------------------------------------------------------------- + # Descriptor protocol + #-------------------------------------------------------------------------- + + @classmethod + def _get_tlv_parsers(cls): + if not cls._tlv_parsers: + cls._tlv_parsers = None + return cls._tlv_parsers + + + @staticmethod + def get_type(buf): + (typelen, ) = struct.unpack(NETMODEL_TLV_TYPELEN_STR, + buf[:NETMODEL_TLV_SIZE]) + return (typelen & NETMODEL_TLV_TYPE_MASK) >> NETMODEL_TLV_TYPE_SHIFT + + + def _len_valid(self): + return self._LEN_MIN <= self.len and self.len <= self._LEN_MAX + + #-------------------------------------------------------------------------- + + @classmethod + def _parser(cls, buf): + tlvs = [] + + while buf: + tlv_type = VICNTLV.get_type(buf) + tlv = cls._tlv_parsers[tlv_type](buf) + tlvs.append(tlv) + offset = NETMODEL_TLV_SIZE + tlv.len + buf = buf[offset:] + if tlv.tlv_type == NETMODEL_TLV_END: + break + assert len(buf) > 0 + + pkt = cls(tlvs) + + assert pkt._tlvs_len_valid() + assert pkt._tlvs_valid() + + return pkt, None, buf + + @classmethod + def parser(cls, buf): + try: + return cls._parser(buf) + except: + return None, None, buf + + def serialize(self, payload, prev): + data = bytearray() + for tlv in self.tlvs: + data += tlv.serialize() + + return data + + @classmethod + def set_type(cls, tlv_cls): + cls._tlv_parsers[tlv_cls.tlv_type] = tlv_cls + + @classmethod + def get_type(cls, tlv_type): + return cls._tlv_parsers[tlv_type] + + @classmethod + def set_tlv_type(cls, tlv_type): + def _set_type(tlv_cls): + tlv_cls.tlv_type = tlv_type + #cls.set_type(tlv_cls) + return tlv_cls + return _set_type + + def __len__(self): + return sum(NETMODEL_TLV_SIZE + tlv.len for tlv in self.tlvs) + +#------------------------------------------------------------------------------ + +@VICNTLV.set_tlv_type(NETMODEL_TLV_OBJECT_NAME) +class ObjectName(VICNTLV): pass + +@VICNTLV.set_tlv_type(NETMODEL_TLV_FIELD) +class Field(VICNTLV): + """Field == STR + """ + +@VICNTLV.set_tlv_type(NETMODEL_TLV_PREDICATE) +class Predicate(VICNTLV): + """Predicate == key, op, value + """ + +@VICNTLV.set_tlv_type(NETMODEL_TLV_FILTER) +class Filter(Filter_, VICNTLV): + """Filter == Array<Predicates> + """ + +@VICNTLV.set_tlv_type(NETMODEL_TLV_FIELDS) +class FieldNames(FieldNames_, VICNTLV): + """Fields == Array<Field> + """ + + +class Prefix(Object, Prefix_, VICNTLV): + object_name = ObjectName() + filter = Filter() + field_names = FieldNames() + + def __init__(self, *args, **kwargs): + Object.__init__(self) + Prefix_.__init__(self, *args, **kwargs) + VICNTLV.__init__(self) + + def get_tuple(self): + return (self.object_name, self.filter, self.field_names) + + def __eq__(self, other): + return self.get_tuple() == other.get_tuple() + + def __hash__(self): + return hash(self.get_tuple()) + +@VICNTLV.set_tlv_type(NETMODEL_TLV_SRC) +class Source(Prefix): + """Source address + """ + +@VICNTLV.set_tlv_type(NETMODEL_TLV_DST) +class Destination(Prefix): + """Destination address + """ + +@VICNTLV.set_tlv_type(NETMODEL_TLV_PROTOCOL) +class Protocol(Attribute, VICNTLV): + """Protocol + """ + + +@VICNTLV.set_tlv_type(NETMODEL_TLV_FLAGS) +class Flags(Attribute, VICNTLV): + """Flags: last, ... + """ + +@VICNTLV.set_tlv_type(NETMODEL_TLV_PAYLOAD) +class Payload(Attribute, VICNTLV): + """Payload + """ + +@VICNTLV.set_tlv_type(NETMODEL_TLV_PACKET) +class Packet(Object, VICNTLV): + """Base packet class + """ + source = Source() + destination = Destination(Prefix) + protocol = Protocol(String, default = 'query') + flags = Flags() + payload = Payload() + + # This should be dispatched across L3 L4 L7 + + def __init__(self, source = None, destination = None, protocol = None, + flags = 0, payload = None): + self.source = source + self.destination = destination + self.protocol = protocol + self.flags = flags + self.payload = payload + + def get_flow(self): + return Flow(self.source, self.destination) + + @staticmethod + def from_query(query, src_query = None, reply = False): + packet = Packet() + if src_query: + address = Prefix( + object_name = src_query.object_name, + filter = src_query.filter, + field_names = src_query.field_names, + aggregate = src_query.aggregate) + if reply: + packet.destination = address + else: + packet.source = address + + if query: + address = Prefix( + object_name = query.object_name, + filter = query.filter, + field_names = query.field_names, + aggregate = query.aggregate) + + if reply: + packet.source = address + else: + packet.destination = address + + packet.payload = (query.action, query.params) + + packet.protocol = 'sync' + packet.last = not query or query.last + packet.reply = reply + + return packet + + def to_query(self): + action, params = self.payload + + address = self.source if self.reply else self.destination + object_name = address.object_name + filter = address.filter + field_names = address.field_names + aggregate = address.aggregate + + return Query(action, object_name, filter, params, field_names, + aggregate = aggregate, last = self.last, reply = self.reply) + + @property + def last(self): + return self.flags & FLAG_LAST + + @last.setter + def last(self, last): + if last: + self.flags |= FLAG_LAST + else: + self.flags &= ~FLAG_LAST + + @property + def reply(self): + return self.flags & FLAG_REPLY + + @reply.setter + def reply(self, reply): + if reply: + self.flags |= FLAG_REPLY + else: + self.flags &= ~FLAG_REPLY + +class ErrorPacket(Packet): + """ + Analog with ICMP errors packets in IP networks + """ + + #-------------------------------------------------------------------------- + # Constructor + #-------------------------------------------------------------------------- + + def __init__(self, type = ERROR, code = ERROR, message = None, + traceback = None, **kwargs): + assert not traceback or isinstance(traceback, str) + + Packet.__init__(self, **kwargs) + self.protocol = PacketProtocol.Error + self.last = True + self._type = type + self._code = code + self._message = message + self._traceback = traceback + + #-------------------------------------------------------------------------- + # Static methods + #-------------------------------------------------------------------------- + + @staticmethod + def from_exception(packet, e): + if isinstance(e, NetmodelException): + error_packet = ErrorPacket( + type = e.TYPE, # eg. ERROR + code = e.CODE, # eg. BADARGS + message = str(e), #e.message, + traceback = traceback.format_exc(), + last = True + ) + else: + error_packet = ErrorPacket( + type = ERROR, + code = UNHANDLED_EXCEPTION, + message = str(e), + traceback = traceback.format_exc(), + last = True + ) + error_packet.set_source(packet.get_destination()) + error_packet.set_destination(packet.get_source()) + return error_packet + + def get_message(self): + """ + Returns: + The error message related to this ErrorPacket. + """ + return self._message + + def get_traceback(self): + """ + Returns: + The traceback related to this ErrorPacket. + """ + return self._traceback + + def get_origin(self): + """ + Returns: + A value among {code::CORE, code::GATEWAY} + identifying who is the origin of this ErrorPacket. + """ + return self._origin + + def get_code(self): + """ + Returns: + The error code of the Error carried by this ErrorPacket. + """ + return self._code + + def get_type(self): + """ + Returns: + The error type of the Error carried by this ErrorPacket. + """ + return self._type + + def __repr__(self): + """ + Returns: + The '%r' representation of this ERROR Packet. + """ + return "<Packet.%s: %s>" % ( + Packet.get_protocol_name(self.get_protocol()), + self.get_message() + ) diff --git a/netmodel/network/prefix.py b/netmodel/network/prefix.py new file mode 100644 index 00000000..00b5db71 --- /dev/null +++ b/netmodel/network/prefix.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class Prefix: + def __init__(self, object_name = None, filter = None, field_names = None, + aggregate = None): + self.object_name = object_name + self.filter = filter + self.field_names = field_names + self.aggregate = aggregate + + def __hash__(self): + return hash(self.get_tuple()) + + def get_tuple(self): + return (self.object_name, self.filter, self.field_names, + self.aggregate) + + def __repr__(self): + return '<Prefix {}>'.format(self.get_tuple()) + + __str__ = __repr__ diff --git a/netmodel/network/router.py b/netmodel/network/router.py new file mode 100644 index 00000000..84d69dca --- /dev/null +++ b/netmodel/network/router.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import logging +import random +import string +import traceback + +from netmodel.network.interface import Interface, InterfaceState +from netmodel.network.fib import FIB +from netmodel.network.flow_table import FlowTable +from netmodel.network.packet import Packet, ErrorPacket + +log = logging.getLogger(__name__) + +class Router: + + #-------------------------------------------------------------------------- + # Constructor, destructor, accessors + #-------------------------------------------------------------------------- + + def __init__(self, vicn_callback = None): + """ + Constructor. + Args: + allowed_capabilities: A Capabilities instance which defines which + operation can be performed by this Router. Pass None if there + is no restriction. + """ + # FIB + self._fib = FIB() + + # Per-interface flow table + self._flow_table = FlowTable() + + # interface_uuid -> interface + self._interfaces = dict() + + self._vicn_callback = vicn_callback + + def terminate(self): + for interface in self._interfaces.values(): + interface.terminate() + + # Accessors + + def get_fib(self): + return self._fib + + #-------------------------------------------------------------------------- + # Collection management + #-------------------------------------------------------------------------- + + def register_local_collection(self, cls): + self.get_interface(LOCAL_NAMESPACE).register_collection(cls, + LOCAL_NAMESPACE) + + def register_collection(self, cls, namespace=None): + self.get_interface(LOCAL_NAMESPACE).register_collection(cls, namespace) + + #-------------------------------------------------------------------------- + # Interface management + #-------------------------------------------------------------------------- + + def _register_interface(self, interface, name=None): + if not name: + name = 'interface-' + ''.join(random.choice(string.ascii_uppercase + + string.digits) for _ in range(3)) + interface.name = name + self._interfaces[name] = interface + + # Populate interface callbacks + interface.add_up_callback(self.on_interface_up) + interface.add_down_callback(self.on_interface_down) + interface.add_spawn_callback(self.on_interface_spawn) + interface.add_delete_callback(self.on_interface_delete) + + log.info('Successfully created interface {} with name {}'.format( + interface.__interface__, name)) + + interface.set_state(InterfaceState.PendingUp) + + for prefix in interface.get_prefixes(): + self._fib.add(prefix, [interface]) + + return interface + + def _unregister_interface(self, interface): + del self._interfaces[interface.name] + + # Interface events + + #-------------------------------------------------------------------------- + # Interface management + #-------------------------------------------------------------------------- + + def on_interface_up(self, interface): + """ + This callback is triggered when an interface becomes up. + + The router will request metadata. + The flow table is notified. + """ + self._flow_table._on_interface_up(interface) + + def on_interface_down(self, interface): + # We need to remove corresponding FIB entries + log.info("Router interface is now down") + + def on_interface_spawn(self, interface): + self._register_interface(interface) + + def on_interface_delete(self, interface): + """Callback : an interface has been deleted. + + - TODO : purge the FIB + - inform the flow table for managing pending subscriptions. + """ + self._unregister_interface(interface) + self._flow_table._on_interface_delete(interface) + + #--------------------------------------------------------------------------- + # Public API + #--------------------------------------------------------------------------- + + def add_interface(self, interface_type, name=None, namespace=None, + **platform_config): + """ + namespace is used to force appending of a namespace to the tables. + existing namespaces are thus ignored. + + # This is the public facing interface, which internally uses + # _register_interface. + """ + interface_cls = Interface.get(interface_type) + if interface_cls is None: + log.warning("Could not create a %(interface_type)s interface" % \ + locals()) + return None + + try: + # passes a callback to the Interface + # no hook + platform_config['callback'] = self._on_receive + interface = interface_cls(self, **platform_config) + except Exception as e: + traceback.print_exc() + raise Exception("Cannot create interface %s of type %s with parameters %r: %s" + % (name, interface_type, + platform_config, e)) + self._register_interface(interface, name) + return interface + + def is_interface_up(self, interface_name): + interface = self._interfaces.get(interface_name) + if not interface: + return False + return self._interfaces[interface_name].is_up() + + def del_platform(self, platform_name, rebuild = True): + """ + Remove a platform from this Router. This platform is no more + registered. The corresponding Announces are also removed. + Args: + platform_name: A String containing a platform name. + rebuild: True if the DbGraph must be rebuild. + Returns: + True if it altered this Router. + """ + ret = False + try: + del self._interfaces[platform_name] + ret = True + except KeyError: + pass + + self.disable_platform(platform_name, rebuild) + return ret + + def get_interface(self, platform_name): + """ + Retrieve the Interface instance corresponding to a platform. + Args: + platform_name: A String containing the name of the platform. + Raises: + ValueError: if platform_name is invalid. + RuntimeError: in case of failure. + Returns: + The corresponding Interface if found, None otherwise. + """ + if platform_name.lower() != platform_name: + raise ValueError("Invalid platform_name = %s, must be lower case" \ + % platform_name) + + if platform_name in self._interfaces: + return self._interfaces[platform_name] + + raise RuntimeError("%s is not yet registered" % platform_name) + + def get_interface_names(self): + return self._interfaces.keys() + + def get_interfaces(self): + return self._interfaces.values() + + #-------------------------------------------------------------------------- + # Packet operations + #-------------------------------------------------------------------------- + + def _on_receive(self, packet, ingress_interface): + """Handles reception of a new packet. + + An incoming packet is forwarder either: + - using the reverse path is there is a match with the ingress + interface flow table + - using the FIB if no match is found + """ + orig_interface = self._flow_table.match(packet, ingress_interface) + if orig_interface: + orig_interface.send(packet) + return + + if isinstance(packet, str): + # Workaround : internal command + if self._vicn_callback: + self._vicn_callback(packet) + return + + if packet.source is None and packet.destination is None: + log.warning('TODO: handle NULL packet, need source on all packets') + return + + # Get route from FIB + interface = self._fib.get(packet.destination.object_name) + if not interface: + return + + # Update flow table before sending + self._flow_table.add(packet, ingress_interface, interface) + + interface.send(packet) |