diff options
Diffstat (limited to 'netmodel/network')
-rw-r--r-- | netmodel/network/fib.py | 10 | ||||
-rw-r--r-- | netmodel/network/flow_table.py | 145 | ||||
-rw-r--r-- | netmodel/network/interface.py | 20 | ||||
-rw-r--r-- | netmodel/network/packet.py | 34 | ||||
-rw-r--r-- | netmodel/network/prefix.py | 13 | ||||
-rw-r--r-- | netmodel/network/router.py | 35 |
6 files changed, 181 insertions, 76 deletions
diff --git a/netmodel/network/fib.py b/netmodel/network/fib.py index e6b81607..11b90b22 100644 --- a/netmodel/network/fib.py +++ b/netmodel/network/fib.py @@ -39,7 +39,10 @@ class FIB: self._entries = dict() def add(self, prefix, next_hops = None): - self._entries[prefix] = FIBEntry(prefix, next_hops) + if prefix not in self._entries: + self._entries[prefix] = FIBEntry(prefix, next_hops) + else: + self._entries[prefix].update(next_hops) def update(self, prefix, next_hops = None): entry = self._entries.get(prefix) @@ -54,10 +57,11 @@ class FIB: raise Exception('prefix not found') entry.remove(next_hops) return - + del self._entries[prefix] def get(self, object_name): for entry in self._entries.values(): if entry._prefix.object_name == object_name: - return next(iter(entry._next_hops)) + return entry._next_hops + return None diff --git a/netmodel/network/flow_table.py b/netmodel/network/flow_table.py index 99e42e99..2da40d1c 100644 --- a/netmodel/network/flow_table.py +++ b/netmodel/network/flow_table.py @@ -16,7 +16,10 @@ # limitations under the License. # -from netmodel.model.query import ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE +import copy +from collections import defaultdict, Counter + +from netmodel.model.query import Query, ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE # Per-interface flow table class SubFlowTable: @@ -25,21 +28,33 @@ class SubFlowTable: self._interface = interface # Flow -> ingress interface - self._flows = dict() - - # Ingress interface -> list of subscription - self._subscriptions = dict() + self._flows = defaultdict(set) def add(self, packet, ingress_interface): flow = packet.get_flow() - self._flows[flow] = ingress_interface + self._flows[flow].add(ingress_interface) def match(self, packet): flow = packet.get_flow() return self._flows.get(flow.get_reverse()) + def _on_interface_delete(self, interface): + """ + Returns: + False is the flow table is empty. + """ + to_remove = set() + for flow, ingress_interfaces in self._flows.items(): + ingress_interfaces.discard(interface) + if not ingress_interfaces: + to_remove.add(flow) + for flow in to_remove: + del self._flows[flow] + + return len(self._flows) > 0 + class Subscription: - def __init__(self, packet, ingress_list, egress_list): + def __init__(self, packet, ingress_set, egress_set): """ Args: packet : subscription packet @@ -47,8 +62,22 @@ class Subscription: egress_list (List[Interface]) : list of egress interface """ self._packet = packet - self._ingress_list = ingress_list - self._egress_list = egress_list + #self._ingress_set = ingress_set + #self._egress_set = egress_set + + def get_tuple(self): + return (self._packet,) + + def __eq__(self, other): + return self.get_tuple() == other.get_tuple() + + def __hash__(self): + return hash(self.get_tuple()) + + def __repr__(self): + return '<Subscription {}>'.format(self._packet.to_query()) + + __str__ = __repr__ class FlowTable: """ @@ -76,12 +105,30 @@ class FlowTable: # The Flow Table also maintains a list of subscriptions doubly indexed # by both subscriptors, and egress interfaces - # ingress_interface -> list of subscriptions - self._ingress_subscriptions = dict() - - # egress_interface -> list of subscriptions - self._egress_subscriptions = dict() - + # ingress_interface -> bag of subscriptions + self._ingress_subscriptions = defaultdict(Counter) + + # egress_interface -> bag of subscriptions + self._egress_subscriptions = defaultdict(Counter) + + def dump(self, msg=''): + print("="*80) + print("FLOW TABLE {}".format(msg)) + print("-" * 80) + print("SubFlowTables") + for interface, flow_table in self._sub_flow_tables.items(): + for k, v in flow_table._flows.items(): + print(interface, "\t", k, "\t", v) + print("-" * 80) + print("Ingress subscriptions") + for interface, subscriptions in self._ingress_subscriptions.items(): + print(interface, "\t", subscriptions) + print("-" * 80) + print("Egress subscriptions") + for interface, subscriptions in self._egress_subscriptions.items(): + print(interface, "\t", subscriptions) + print("=" * 80) + print("") def match(self, packet, interface): """ @@ -95,34 +142,30 @@ class FlowTable: if not sub_flow_table: return None return sub_flow_table.match(packet) - - def add(self, packet, ingress_interface, interface): - sub_flow_table = self._sub_flow_tables.get(interface) - if not sub_flow_table: - sub_flow_table = SubFlowTable(interface) - self._sub_flow_tables[interface] = sub_flow_table - sub_flow_table.add(packet, ingress_interface) + + def add(self, packet, ingress_interface, interfaces): + for interface in interfaces: + sub_flow_table = self._sub_flow_tables.get(interface) + if not sub_flow_table: + sub_flow_table = SubFlowTable(interface) + self._sub_flow_tables[interface] = sub_flow_table + sub_flow_table.add(packet, ingress_interface) # If the flow is a subscription, we need to associate it to the list query = packet.to_query() if query.action == ACTION_SUBSCRIBE: # XXX we currently don't merge subscriptions, and assume a single # next hop interface - s = Subscription(packet, [ingress_interface], [interface]) + s = Subscription(packet, set([ingress_interface]), interfaces) if ingress_interface: - if not ingress_interface in self._ingress_subscriptions: - self._ingress_subscriptions[ingress_interface] = list() - self._ingress_subscriptions[ingress_interface].append(s) - - if not interface in self._egress_subscriptions: - self._egress_subscriptions[interface] = list() - self._egress_subscriptions[interface].append(s) + self._ingress_subscriptions[ingress_interface] += Counter([s]) + for interface in interfaces: + self._egress_subscriptions[interface] += Counter([s]) elif query.action == ACTION_UNSUBSCRIBE: raise NotImplementedError - # Events def _on_interface_up(self, interface): @@ -141,9 +184,47 @@ class FlowTable: """ Callback: an interface has been deleted - Cancel all subscriptions that have been issues from + Cancel all subscriptions that have been issues from netmodel.network.interface. Remove all pointers to subscriptions pending on this interface """ + if interface in self._ingress_subscriptions: + # If the interface we delete at the origin of the subscription, + # let's also remove corresponding egress subscriptions + subs = self._ingress_subscriptions[interface] + if not subs: + return + + to_remove = set() + for _interface, subscriptions in self._egress_subscriptions.items(): + + removed = subs & subscriptions + if removed: + for s in removed: + # We found a subscription of this interface on an other + # interface; send unsubscribe... + action, params = s._packet.payload + p = copy.deepcopy(s._packet) + p.payload = (ACTION_UNSUBSCRIBE, params) + _interface.send(p) + # ... and remove them + subscriptions -= removed + + # if the interface has no more subscription remove it. + if not subscriptions: + to_remove.add(_interface) + + for i in to_remove: + del self._egress_subscriptions[i] + del self._ingress_subscriptions[interface] + + # Remove interface from flow table destination + to_remove = set() + for _interface, sub_flow_table in self._sub_flow_tables.items(): + remove = sub_flow_table._on_interface_delete(interface) + if not remove: + to_remove.add(_interface) + for _interface in to_remove: + del self._sub_flow_tables[_interface] diff --git a/netmodel/network/interface.py b/netmodel/network/interface.py index c9e31422..3bad4c41 100644 --- a/netmodel/network/interface.py +++ b/netmodel/network/interface.py @@ -44,7 +44,7 @@ class InterfaceState(enum.Enum): def register_interfaces(): Interface._factory = dict() - for loader, module_name, is_pkg in pkgutil.walk_packages(interfaces.__path__, + for loader, module_name, is_pkg in pkgutil.walk_packages(interfaces.__path__, interfaces.__name__ + '.'): # Note: we cannot skip files using is_pkg otherwise it will ignore # interfaces defined outside of __init__.py @@ -95,12 +95,12 @@ class Interface: self._tx_buffer = list() self._state = InterfaceState.Down - self._error = None + self._error = None self._reconnecting = True self._reconnection_delay = RECONNECTION_DELAY self._registered_objects = dict() - + # Callbacks self._up_callbacks = list() self._down_callbacks = list() @@ -119,7 +119,7 @@ class Interface: def __hash__(self): return hash(self._name) - #--------------------------------------------------------------------------- + #--------------------------------------------------------------------------- def register_object(self, obj): self._registered_objects[obj.__type__] = obj @@ -127,9 +127,9 @@ class Interface: def get_prefixes(self): return [ Prefix(v.__type__) for v in self._registered_objects.values() ] - #--------------------------------------------------------------------------- + #--------------------------------------------------------------------------- # State management, callbacks - #--------------------------------------------------------------------------- + #--------------------------------------------------------------------------- def set_state(self, state): asyncio.ensure_future(self._set_state(state)) @@ -170,7 +170,7 @@ class Interface: for cb, args, kwargs in self._delete_callbacks: cb(interface, *args, **kwargs) - #-------------------------------------------------------------------------- + #-------------------------------------------------------------------------- def set_reconnecting(self, reconnecting): self._reconnecting = reconnecting @@ -253,13 +253,13 @@ class Interface: def receive_impl(self, packet): ingress_interface = self cb = self._callback - if cb is None: - return if self._hook: new_packet = self._hook(packet) - if new_packet is not None: + if cb is not None and new_packet is not None: cb(new_packet, ingress_interface=ingress_interface) return + if cb is None: + return cb(packet, ingress_interface=ingress_interface) #-------------------------------------------------------------------------- diff --git a/netmodel/network/packet.py b/netmodel/network/packet.py index 9552b0e7..7edcccc4 100644 --- a/netmodel/network/packet.py +++ b/netmodel/network/packet.py @@ -109,7 +109,7 @@ class VICNTLV: @staticmethod def get_type(buf): - (typelen, ) = struct.unpack(NETMODEL_TLV_TYPELEN_STR, + (typelen, ) = struct.unpack(NETMODEL_TLV_TYPELEN_STR, buf[:NETMODEL_TLV_SIZE]) return (typelen & NETMODEL_TLV_TYPE_MASK) >> NETMODEL_TLV_TYPE_SHIFT @@ -179,7 +179,7 @@ class VICNTLV: class ObjectName(VICNTLV): pass @VICNTLV.set_tlv_type(NETMODEL_TLV_FIELD) -class Field(VICNTLV): +class Field(VICNTLV): """Field == STR """ @@ -217,7 +217,7 @@ class Prefix(Object, Prefix_, VICNTLV): def __hash__(self): return hash(self.get_tuple()) - + @VICNTLV.set_tlv_type(NETMODEL_TLV_SRC) class Source(Prefix): """Source address @@ -251,12 +251,12 @@ class Packet(Object, VICNTLV): source = Source() destination = Destination(Prefix) protocol = Protocol(String, default = 'query') - flags = Flags() - payload = Payload() + flags = Flags(String) + payload = Payload(String) # This should be dispatched across L3 L4 L7 - def __init__(self, source = None, destination = None, protocol = None, + def __init__(self, source = None, destination = None, protocol = None, flags = 0, payload = None): self.source = source self.destination = destination @@ -272,8 +272,8 @@ class Packet(Object, VICNTLV): packet = Packet() if src_query: address = Prefix( - object_name = src_query.object_name, - filter = src_query.filter, + object_name = src_query.object_name, + filter = src_query.filter, field_names = src_query.field_names, aggregate = src_query.aggregate) if reply: @@ -283,8 +283,8 @@ class Packet(Object, VICNTLV): if query: address = Prefix( - object_name = query.object_name, - filter = query.filter, + object_name = query.object_name, + filter = query.filter, field_names = query.field_names, aggregate = query.aggregate) @@ -310,7 +310,7 @@ class Packet(Object, VICNTLV): field_names = address.field_names aggregate = address.aggregate - return Query(action, object_name, filter, params, field_names, + return Query(action, object_name, filter, params, field_names, aggregate = aggregate, last = self.last, reply = self.reply) @property @@ -335,6 +335,16 @@ class Packet(Object, VICNTLV): else: self.flags &= ~FLAG_REPLY + def get_tuple(self): + return (self.source, self.destination, self.protocol, self.flags, + self.payload) + + def __eq__(self, other): + return self.get_tuple() == other.get_tuple() + + def __hash__(self): + return hash(self.get_tuple()) + class ErrorPacket(Packet): """ Analog with ICMP errors packets in IP networks @@ -344,7 +354,7 @@ class ErrorPacket(Packet): # Constructor #-------------------------------------------------------------------------- - def __init__(self, type = ERROR, code = ERROR, message = None, + def __init__(self, type = ERROR, code = ERROR, message = None, traceback = None, **kwargs): assert not traceback or isinstance(traceback, str) diff --git a/netmodel/network/prefix.py b/netmodel/network/prefix.py index 00b5db71..d444a56d 100644 --- a/netmodel/network/prefix.py +++ b/netmodel/network/prefix.py @@ -17,20 +17,23 @@ # class Prefix: - def __init__(self, object_name = None, filter = None, field_names = None, + def __init__(self, object_name = None, filter = None, field_names = None, aggregate = None): self.object_name = object_name self.filter = filter self.field_names = field_names self.aggregate = aggregate - def __hash__(self): - return hash(self.get_tuple()) - def get_tuple(self): - return (self.object_name, self.filter, self.field_names, + return (self.object_name, self.filter, self.field_names, self.aggregate) + def __eq__(self, other): + return self.get_tuple() == other.get_tuple() + + def __hash__(self): + return hash(self.get_tuple()) + def __repr__(self): return '<Prefix {}>'.format(self.get_tuple()) diff --git a/netmodel/network/router.py b/netmodel/network/router.py index 84d69dca..871cefe0 100644 --- a/netmodel/network/router.py +++ b/netmodel/network/router.py @@ -68,7 +68,7 @@ class Router: #-------------------------------------------------------------------------- def register_local_collection(self, cls): - self.get_interface(LOCAL_NAMESPACE).register_collection(cls, + self.get_interface(LOCAL_NAMESPACE).register_collection(cls, LOCAL_NAMESPACE) def register_collection(self, cls, namespace=None): @@ -97,7 +97,7 @@ class Router: interface.set_state(InterfaceState.PendingUp) for prefix in interface.get_prefixes(): - self._fib.add(prefix, [interface]) + self._fib.add(prefix, set([interface])) return interface @@ -112,8 +112,8 @@ class Router: def on_interface_up(self, interface): """ - This callback is triggered when an interface becomes up. - + This callback is triggered when an interface becomes up. + The router will request metadata. The flow table is notified. """ @@ -139,7 +139,7 @@ class Router: # Public API #--------------------------------------------------------------------------- - def add_interface(self, interface_type, name=None, namespace=None, + def add_interface(self, interface_type, name=None, namespace=None, **platform_config): """ namespace is used to force appending of a namespace to the tables. @@ -161,7 +161,7 @@ class Router: interface = interface_cls(self, **platform_config) except Exception as e: traceback.print_exc() - raise Exception("Cannot create interface %s of type %s with parameters %r: %s" + raise Exception("Cannot create interface %s of type %s with parameters %r: %s" % (name, interface_type, platform_config, e)) self._register_interface(interface, name) @@ -231,9 +231,10 @@ class Router: interface flow table - using the FIB if no match is found """ - orig_interface = self._flow_table.match(packet, ingress_interface) - if orig_interface: - orig_interface.send(packet) + orig_interfaces = self._flow_table.match(packet, ingress_interface) + if orig_interfaces: + for orig_interface in orig_interfaces: + orig_interface.send(packet) return if isinstance(packet, str): @@ -247,11 +248,17 @@ class Router: return # Get route from FIB - interface = self._fib.get(packet.destination.object_name) - if not interface: + if packet.destination is None: + log.warning("Ignored reply packet with no match in flow table {}".format( + packet.to_query())) + return + interfaces = self._fib.get(packet.destination.object_name) + if not interfaces: + log.error('No match in FIB for {}'.format( + packet.destination.object_name)) return # Update flow table before sending - self._flow_table.add(packet, ingress_interface, interface) - - interface.send(packet) + self._flow_table.add(packet, ingress_interface, interfaces) + for interface in interfaces: + interface.send(packet) |