aboutsummaryrefslogtreecommitdiffstats
path: root/netmodel/network/flow_table.py
diff options
context:
space:
mode:
Diffstat (limited to 'netmodel/network/flow_table.py')
-rw-r--r--netmodel/network/flow_table.py145
1 files changed, 113 insertions, 32 deletions
diff --git a/netmodel/network/flow_table.py b/netmodel/network/flow_table.py
index 99e42e99..2da40d1c 100644
--- a/netmodel/network/flow_table.py
+++ b/netmodel/network/flow_table.py
@@ -16,7 +16,10 @@
# limitations under the License.
#
-from netmodel.model.query import ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE
+import copy
+from collections import defaultdict, Counter
+
+from netmodel.model.query import Query, ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE
# Per-interface flow table
class SubFlowTable:
@@ -25,21 +28,33 @@ class SubFlowTable:
self._interface = interface
# Flow -> ingress interface
- self._flows = dict()
-
- # Ingress interface -> list of subscription
- self._subscriptions = dict()
+ self._flows = defaultdict(set)
def add(self, packet, ingress_interface):
flow = packet.get_flow()
- self._flows[flow] = ingress_interface
+ self._flows[flow].add(ingress_interface)
def match(self, packet):
flow = packet.get_flow()
return self._flows.get(flow.get_reverse())
+ def _on_interface_delete(self, interface):
+ """
+ Returns:
+ False is the flow table is empty.
+ """
+ to_remove = set()
+ for flow, ingress_interfaces in self._flows.items():
+ ingress_interfaces.discard(interface)
+ if not ingress_interfaces:
+ to_remove.add(flow)
+ for flow in to_remove:
+ del self._flows[flow]
+
+ return len(self._flows) > 0
+
class Subscription:
- def __init__(self, packet, ingress_list, egress_list):
+ def __init__(self, packet, ingress_set, egress_set):
"""
Args:
packet : subscription packet
@@ -47,8 +62,22 @@ class Subscription:
egress_list (List[Interface]) : list of egress interface
"""
self._packet = packet
- self._ingress_list = ingress_list
- self._egress_list = egress_list
+ #self._ingress_set = ingress_set
+ #self._egress_set = egress_set
+
+ def get_tuple(self):
+ return (self._packet,)
+
+ def __eq__(self, other):
+ return self.get_tuple() == other.get_tuple()
+
+ def __hash__(self):
+ return hash(self.get_tuple())
+
+ def __repr__(self):
+ return '<Subscription {}>'.format(self._packet.to_query())
+
+ __str__ = __repr__
class FlowTable:
"""
@@ -76,12 +105,30 @@ class FlowTable:
# The Flow Table also maintains a list of subscriptions doubly indexed
# by both subscriptors, and egress interfaces
- # ingress_interface -> list of subscriptions
- self._ingress_subscriptions = dict()
-
- # egress_interface -> list of subscriptions
- self._egress_subscriptions = dict()
-
+ # ingress_interface -> bag of subscriptions
+ self._ingress_subscriptions = defaultdict(Counter)
+
+ # egress_interface -> bag of subscriptions
+ self._egress_subscriptions = defaultdict(Counter)
+
+ def dump(self, msg=''):
+ print("="*80)
+ print("FLOW TABLE {}".format(msg))
+ print("-" * 80)
+ print("SubFlowTables")
+ for interface, flow_table in self._sub_flow_tables.items():
+ for k, v in flow_table._flows.items():
+ print(interface, "\t", k, "\t", v)
+ print("-" * 80)
+ print("Ingress subscriptions")
+ for interface, subscriptions in self._ingress_subscriptions.items():
+ print(interface, "\t", subscriptions)
+ print("-" * 80)
+ print("Egress subscriptions")
+ for interface, subscriptions in self._egress_subscriptions.items():
+ print(interface, "\t", subscriptions)
+ print("=" * 80)
+ print("")
def match(self, packet, interface):
"""
@@ -95,34 +142,30 @@ class FlowTable:
if not sub_flow_table:
return None
return sub_flow_table.match(packet)
-
- def add(self, packet, ingress_interface, interface):
- sub_flow_table = self._sub_flow_tables.get(interface)
- if not sub_flow_table:
- sub_flow_table = SubFlowTable(interface)
- self._sub_flow_tables[interface] = sub_flow_table
- sub_flow_table.add(packet, ingress_interface)
+
+ def add(self, packet, ingress_interface, interfaces):
+ for interface in interfaces:
+ sub_flow_table = self._sub_flow_tables.get(interface)
+ if not sub_flow_table:
+ sub_flow_table = SubFlowTable(interface)
+ self._sub_flow_tables[interface] = sub_flow_table
+ sub_flow_table.add(packet, ingress_interface)
# If the flow is a subscription, we need to associate it to the list
query = packet.to_query()
if query.action == ACTION_SUBSCRIBE:
# XXX we currently don't merge subscriptions, and assume a single
# next hop interface
- s = Subscription(packet, [ingress_interface], [interface])
+ s = Subscription(packet, set([ingress_interface]), interfaces)
if ingress_interface:
- if not ingress_interface in self._ingress_subscriptions:
- self._ingress_subscriptions[ingress_interface] = list()
- self._ingress_subscriptions[ingress_interface].append(s)
-
- if not interface in self._egress_subscriptions:
- self._egress_subscriptions[interface] = list()
- self._egress_subscriptions[interface].append(s)
+ self._ingress_subscriptions[ingress_interface] += Counter([s])
+ for interface in interfaces:
+ self._egress_subscriptions[interface] += Counter([s])
elif query.action == ACTION_UNSUBSCRIBE:
raise NotImplementedError
-
# Events
def _on_interface_up(self, interface):
@@ -141,9 +184,47 @@ class FlowTable:
"""
Callback: an interface has been deleted
- Cancel all subscriptions that have been issues from
+ Cancel all subscriptions that have been issues from
netmodel.network.interface.
Remove all pointers to subscriptions pending on this interface
"""
+
if interface in self._ingress_subscriptions:
+ # If the interface we delete at the origin of the subscription,
+ # let's also remove corresponding egress subscriptions
+ subs = self._ingress_subscriptions[interface]
+ if not subs:
+ return
+
+ to_remove = set()
+ for _interface, subscriptions in self._egress_subscriptions.items():
+
+ removed = subs & subscriptions
+ if removed:
+ for s in removed:
+ # We found a subscription of this interface on an other
+ # interface; send unsubscribe...
+ action, params = s._packet.payload
+ p = copy.deepcopy(s._packet)
+ p.payload = (ACTION_UNSUBSCRIBE, params)
+ _interface.send(p)
+ # ... and remove them
+ subscriptions -= removed
+
+ # if the interface has no more subscription remove it.
+ if not subscriptions:
+ to_remove.add(_interface)
+
+ for i in to_remove:
+ del self._egress_subscriptions[i]
+
del self._ingress_subscriptions[interface]
+
+ # Remove interface from flow table destination
+ to_remove = set()
+ for _interface, sub_flow_table in self._sub_flow_tables.items():
+ remove = sub_flow_table._on_interface_delete(interface)
+ if not remove:
+ to_remove.add(_interface)
+ for _interface in to_remove:
+ del self._sub_flow_tables[_interface]