diff options
Diffstat (limited to 'netmodel/network/flow_table.py')
-rw-r--r-- | netmodel/network/flow_table.py | 145 |
1 files changed, 113 insertions, 32 deletions
diff --git a/netmodel/network/flow_table.py b/netmodel/network/flow_table.py index 99e42e99..2da40d1c 100644 --- a/netmodel/network/flow_table.py +++ b/netmodel/network/flow_table.py @@ -16,7 +16,10 @@ # limitations under the License. # -from netmodel.model.query import ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE +import copy +from collections import defaultdict, Counter + +from netmodel.model.query import Query, ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE # Per-interface flow table class SubFlowTable: @@ -25,21 +28,33 @@ class SubFlowTable: self._interface = interface # Flow -> ingress interface - self._flows = dict() - - # Ingress interface -> list of subscription - self._subscriptions = dict() + self._flows = defaultdict(set) def add(self, packet, ingress_interface): flow = packet.get_flow() - self._flows[flow] = ingress_interface + self._flows[flow].add(ingress_interface) def match(self, packet): flow = packet.get_flow() return self._flows.get(flow.get_reverse()) + def _on_interface_delete(self, interface): + """ + Returns: + False is the flow table is empty. + """ + to_remove = set() + for flow, ingress_interfaces in self._flows.items(): + ingress_interfaces.discard(interface) + if not ingress_interfaces: + to_remove.add(flow) + for flow in to_remove: + del self._flows[flow] + + return len(self._flows) > 0 + class Subscription: - def __init__(self, packet, ingress_list, egress_list): + def __init__(self, packet, ingress_set, egress_set): """ Args: packet : subscription packet @@ -47,8 +62,22 @@ class Subscription: egress_list (List[Interface]) : list of egress interface """ self._packet = packet - self._ingress_list = ingress_list - self._egress_list = egress_list + #self._ingress_set = ingress_set + #self._egress_set = egress_set + + def get_tuple(self): + return (self._packet,) + + def __eq__(self, other): + return self.get_tuple() == other.get_tuple() + + def __hash__(self): + return hash(self.get_tuple()) + + def __repr__(self): + return '<Subscription {}>'.format(self._packet.to_query()) + + __str__ = __repr__ class FlowTable: """ @@ -76,12 +105,30 @@ class FlowTable: # The Flow Table also maintains a list of subscriptions doubly indexed # by both subscriptors, and egress interfaces - # ingress_interface -> list of subscriptions - self._ingress_subscriptions = dict() - - # egress_interface -> list of subscriptions - self._egress_subscriptions = dict() - + # ingress_interface -> bag of subscriptions + self._ingress_subscriptions = defaultdict(Counter) + + # egress_interface -> bag of subscriptions + self._egress_subscriptions = defaultdict(Counter) + + def dump(self, msg=''): + print("="*80) + print("FLOW TABLE {}".format(msg)) + print("-" * 80) + print("SubFlowTables") + for interface, flow_table in self._sub_flow_tables.items(): + for k, v in flow_table._flows.items(): + print(interface, "\t", k, "\t", v) + print("-" * 80) + print("Ingress subscriptions") + for interface, subscriptions in self._ingress_subscriptions.items(): + print(interface, "\t", subscriptions) + print("-" * 80) + print("Egress subscriptions") + for interface, subscriptions in self._egress_subscriptions.items(): + print(interface, "\t", subscriptions) + print("=" * 80) + print("") def match(self, packet, interface): """ @@ -95,34 +142,30 @@ class FlowTable: if not sub_flow_table: return None return sub_flow_table.match(packet) - - def add(self, packet, ingress_interface, interface): - sub_flow_table = self._sub_flow_tables.get(interface) - if not sub_flow_table: - sub_flow_table = SubFlowTable(interface) - self._sub_flow_tables[interface] = sub_flow_table - sub_flow_table.add(packet, ingress_interface) + + def add(self, packet, ingress_interface, interfaces): + for interface in interfaces: + sub_flow_table = self._sub_flow_tables.get(interface) + if not sub_flow_table: + sub_flow_table = SubFlowTable(interface) + self._sub_flow_tables[interface] = sub_flow_table + sub_flow_table.add(packet, ingress_interface) # If the flow is a subscription, we need to associate it to the list query = packet.to_query() if query.action == ACTION_SUBSCRIBE: # XXX we currently don't merge subscriptions, and assume a single # next hop interface - s = Subscription(packet, [ingress_interface], [interface]) + s = Subscription(packet, set([ingress_interface]), interfaces) if ingress_interface: - if not ingress_interface in self._ingress_subscriptions: - self._ingress_subscriptions[ingress_interface] = list() - self._ingress_subscriptions[ingress_interface].append(s) - - if not interface in self._egress_subscriptions: - self._egress_subscriptions[interface] = list() - self._egress_subscriptions[interface].append(s) + self._ingress_subscriptions[ingress_interface] += Counter([s]) + for interface in interfaces: + self._egress_subscriptions[interface] += Counter([s]) elif query.action == ACTION_UNSUBSCRIBE: raise NotImplementedError - # Events def _on_interface_up(self, interface): @@ -141,9 +184,47 @@ class FlowTable: """ Callback: an interface has been deleted - Cancel all subscriptions that have been issues from + Cancel all subscriptions that have been issues from netmodel.network.interface. Remove all pointers to subscriptions pending on this interface """ + if interface in self._ingress_subscriptions: + # If the interface we delete at the origin of the subscription, + # let's also remove corresponding egress subscriptions + subs = self._ingress_subscriptions[interface] + if not subs: + return + + to_remove = set() + for _interface, subscriptions in self._egress_subscriptions.items(): + + removed = subs & subscriptions + if removed: + for s in removed: + # We found a subscription of this interface on an other + # interface; send unsubscribe... + action, params = s._packet.payload + p = copy.deepcopy(s._packet) + p.payload = (ACTION_UNSUBSCRIBE, params) + _interface.send(p) + # ... and remove them + subscriptions -= removed + + # if the interface has no more subscription remove it. + if not subscriptions: + to_remove.add(_interface) + + for i in to_remove: + del self._egress_subscriptions[i] + del self._ingress_subscriptions[interface] + + # Remove interface from flow table destination + to_remove = set() + for _interface, sub_flow_table in self._sub_flow_tables.items(): + remove = sub_flow_table._on_interface_delete(interface) + if not remove: + to_remove.add(_interface) + for _interface in to_remove: + del self._sub_flow_tables[_interface] |