aboutsummaryrefslogtreecommitdiffstats
path: root/netmodel/network/flow_table.py
diff options
context:
space:
mode:
Diffstat (limited to 'netmodel/network/flow_table.py')
-rw-r--r--netmodel/network/flow_table.py150
1 files changed, 150 insertions, 0 deletions
diff --git a/netmodel/network/flow_table.py b/netmodel/network/flow_table.py
new file mode 100644
index 00000000..86c6e52e
--- /dev/null
+++ b/netmodel/network/flow_table.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from netmodel.model.query import ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE
+
+# Per-interface flow table
+class SubFlowTable:
+ def __init__(self, interface):
+ # Interface to which this flow table is associated
+ self._interface = interface
+
+ # Flow -> ingress interface
+ self._flows = dict()
+
+ # Ingress interface -> list of subscription
+ self._subscriptions = dict()
+
+ def add(self, packet, ingress_interface):
+ flow = packet.get_flow()
+ self._flows[flow] = ingress_interface
+
+ def match(self, packet):
+ flow = packet.get_flow()
+ return self._flows.get(flow.get_reverse())
+
+class Subscription:
+ def __init__(self, packet, ingress_list, egress_list):
+ """
+ Args:
+ packet : subscription packet
+ ingress_list (List[Interface]) : list of ingress interface
+ egress_list (List[Interface]) : list of egress interface
+ """
+ self._packet = packet
+ self._ingress_list = ingress_list
+ self._egress_list = egress_list
+
+class FlowTable:
+ """
+ The flow table managed flow association between packets, as well as the
+ subscription state for each flow.
+
+ Event management
+ ================
+
+ The flow table has to be notified from netmodel.network.interface becomes
+ up, and when it is deleted. This is handled through the following events:
+
+ _on_interface_up
+ . resend any pending subscription on the interface
+ _on_interface_deleted
+ . delete any subscription
+ """
+
+ def __init__(self):
+ # Per interface (sub) flow tables
+ # flow are added when forwarded using FIB
+ # matched upon returning
+ self._sub_flow_tables = dict()
+
+ # The Flow Table also maintains a list of subscriptions doubly indexed
+ # by both subscriptors, and egress interfaces
+
+ # ingress_interface -> list of subscriptions
+ self._ingress_subscriptions = dict()
+
+ # egress_interface -> list of subscriptions
+ self._egress_subscriptions = dict()
+
+
+ def match(self, packet, interface):
+ """
+ Check whether the packet arriving on interface is a reply.
+
+ Returns:
+ interface that originally requested the packet
+ None if not found
+ """
+ sub_flow_table = self._sub_flow_tables.get(interface)
+ if not sub_flow_table:
+ return None
+ return sub_flow_table.match(packet)
+
+ def add(self, packet, ingress_interface, interface):
+ sub_flow_table = self._sub_flow_tables.get(interface)
+ if not sub_flow_table:
+ sub_flow_table = SubFlowTable(interface)
+ self._sub_flow_tables[interface] = sub_flow_table
+ sub_flow_table.add(packet, ingress_interface)
+
+ # If the flow is a subscription, we need to associate it to the list
+ query = packet.to_query()
+ if query.action == ACTION_SUBSCRIBE:
+ print('adding subscription', query.to_dict())
+ # XXX we currently don't merge subscriptions, and assume a single
+ # next hop interface
+ s = Subscription(packet, [ingress_interface], [interface])
+
+ if ingress_interface:
+ if not ingress_interface in self._ingress_subscriptions:
+ self._ingress_subscriptions[ingress_interface] = list()
+ self._ingress_subscriptions[ingress_interface].append(s)
+
+ if not interface in self._egress_subscriptions:
+ self._egress_subscriptions[interface] = list()
+ self._egress_subscriptions[interface].append(s)
+
+ elif query.action == ACTION_UNSUBSCRIBE:
+ raise NotImplementedError
+
+
+ # Events
+
+ def _on_interface_up(self, interface):
+ """
+ Callback: an interface gets back up after downtime.
+
+ Resend all pending subscriptions when an interface comes back up.
+ """
+ subscriptions = self._egress_subscriptions.get(interface)
+ if not subscriptions:
+ return
+ for s in subscriptions:
+ interface.send(s._packet)
+
+ def _on_interface_delete(self, interface):
+ """
+ Callback: an interface has been deleted
+
+ Cancel all subscriptions that have been issues from
+ netmodel.network.interface.
+ Remove all pointers to subscriptions pending on this interface
+ """
+ if interface in self._ingress_subscriptions:
+ del self._ingress_subscriptions[interface]