aboutsummaryrefslogtreecommitdiffstats
path: root/netmodel/network
diff options
context:
space:
mode:
Diffstat (limited to 'netmodel/network')
-rw-r--r--netmodel/network/fib.py10
-rw-r--r--netmodel/network/flow_table.py145
-rw-r--r--netmodel/network/interface.py20
-rw-r--r--netmodel/network/packet.py34
-rw-r--r--netmodel/network/prefix.py13
-rw-r--r--netmodel/network/router.py35
6 files changed, 181 insertions, 76 deletions
diff --git a/netmodel/network/fib.py b/netmodel/network/fib.py
index e6b81607..11b90b22 100644
--- a/netmodel/network/fib.py
+++ b/netmodel/network/fib.py
@@ -39,7 +39,10 @@ class FIB:
self._entries = dict()
def add(self, prefix, next_hops = None):
- self._entries[prefix] = FIBEntry(prefix, next_hops)
+ if prefix not in self._entries:
+ self._entries[prefix] = FIBEntry(prefix, next_hops)
+ else:
+ self._entries[prefix].update(next_hops)
def update(self, prefix, next_hops = None):
entry = self._entries.get(prefix)
@@ -54,10 +57,11 @@ class FIB:
raise Exception('prefix not found')
entry.remove(next_hops)
return
-
+
del self._entries[prefix]
def get(self, object_name):
for entry in self._entries.values():
if entry._prefix.object_name == object_name:
- return next(iter(entry._next_hops))
+ return entry._next_hops
+ return None
diff --git a/netmodel/network/flow_table.py b/netmodel/network/flow_table.py
index 99e42e99..2da40d1c 100644
--- a/netmodel/network/flow_table.py
+++ b/netmodel/network/flow_table.py
@@ -16,7 +16,10 @@
# limitations under the License.
#
-from netmodel.model.query import ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE
+import copy
+from collections import defaultdict, Counter
+
+from netmodel.model.query import Query, ACTION_SUBSCRIBE, ACTION_UNSUBSCRIBE
# Per-interface flow table
class SubFlowTable:
@@ -25,21 +28,33 @@ class SubFlowTable:
self._interface = interface
# Flow -> ingress interface
- self._flows = dict()
-
- # Ingress interface -> list of subscription
- self._subscriptions = dict()
+ self._flows = defaultdict(set)
def add(self, packet, ingress_interface):
flow = packet.get_flow()
- self._flows[flow] = ingress_interface
+ self._flows[flow].add(ingress_interface)
def match(self, packet):
flow = packet.get_flow()
return self._flows.get(flow.get_reverse())
+ def _on_interface_delete(self, interface):
+ """
+ Returns:
+ False is the flow table is empty.
+ """
+ to_remove = set()
+ for flow, ingress_interfaces in self._flows.items():
+ ingress_interfaces.discard(interface)
+ if not ingress_interfaces:
+ to_remove.add(flow)
+ for flow in to_remove:
+ del self._flows[flow]
+
+ return len(self._flows) > 0
+
class Subscription:
- def __init__(self, packet, ingress_list, egress_list):
+ def __init__(self, packet, ingress_set, egress_set):
"""
Args:
packet : subscription packet
@@ -47,8 +62,22 @@ class Subscription:
egress_list (List[Interface]) : list of egress interface
"""
self._packet = packet
- self._ingress_list = ingress_list
- self._egress_list = egress_list
+ #self._ingress_set = ingress_set
+ #self._egress_set = egress_set
+
+ def get_tuple(self):
+ return (self._packet,)
+
+ def __eq__(self, other):
+ return self.get_tuple() == other.get_tuple()
+
+ def __hash__(self):
+ return hash(self.get_tuple())
+
+ def __repr__(self):
+ return '<Subscription {}>'.format(self._packet.to_query())
+
+ __str__ = __repr__
class FlowTable:
"""
@@ -76,12 +105,30 @@ class FlowTable:
# The Flow Table also maintains a list of subscriptions doubly indexed
# by both subscriptors, and egress interfaces
- # ingress_interface -> list of subscriptions
- self._ingress_subscriptions = dict()
-
- # egress_interface -> list of subscriptions
- self._egress_subscriptions = dict()
-
+ # ingress_interface -> bag of subscriptions
+ self._ingress_subscriptions = defaultdict(Counter)
+
+ # egress_interface -> bag of subscriptions
+ self._egress_subscriptions = defaultdict(Counter)
+
+ def dump(self, msg=''):
+ print("="*80)
+ print("FLOW TABLE {}".format(msg))
+ print("-" * 80)
+ print("SubFlowTables")
+ for interface, flow_table in self._sub_flow_tables.items():
+ for k, v in flow_table._flows.items():
+ print(interface, "\t", k, "\t", v)
+ print("-" * 80)
+ print("Ingress subscriptions")
+ for interface, subscriptions in self._ingress_subscriptions.items():
+ print(interface, "\t", subscriptions)
+ print("-" * 80)
+ print("Egress subscriptions")
+ for interface, subscriptions in self._egress_subscriptions.items():
+ print(interface, "\t", subscriptions)
+ print("=" * 80)
+ print("")
def match(self, packet, interface):
"""
@@ -95,34 +142,30 @@ class FlowTable:
if not sub_flow_table:
return None
return sub_flow_table.match(packet)
-
- def add(self, packet, ingress_interface, interface):
- sub_flow_table = self._sub_flow_tables.get(interface)
- if not sub_flow_table:
- sub_flow_table = SubFlowTable(interface)
- self._sub_flow_tables[interface] = sub_flow_table
- sub_flow_table.add(packet, ingress_interface)
+
+ def add(self, packet, ingress_interface, interfaces):
+ for interface in interfaces:
+ sub_flow_table = self._sub_flow_tables.get(interface)
+ if not sub_flow_table:
+ sub_flow_table = SubFlowTable(interface)
+ self._sub_flow_tables[interface] = sub_flow_table
+ sub_flow_table.add(packet, ingress_interface)
# If the flow is a subscription, we need to associate it to the list
query = packet.to_query()
if query.action == ACTION_SUBSCRIBE:
# XXX we currently don't merge subscriptions, and assume a single
# next hop interface
- s = Subscription(packet, [ingress_interface], [interface])
+ s = Subscription(packet, set([ingress_interface]), interfaces)
if ingress_interface:
- if not ingress_interface in self._ingress_subscriptions:
- self._ingress_subscriptions[ingress_interface] = list()
- self._ingress_subscriptions[ingress_interface].append(s)
-
- if not interface in self._egress_subscriptions:
- self._egress_subscriptions[interface] = list()
- self._egress_subscriptions[interface].append(s)
+ self._ingress_subscriptions[ingress_interface] += Counter([s])
+ for interface in interfaces:
+ self._egress_subscriptions[interface] += Counter([s])
elif query.action == ACTION_UNSUBSCRIBE:
raise NotImplementedError
-
# Events
def _on_interface_up(self, interface):
@@ -141,9 +184,47 @@ class FlowTable:
"""
Callback: an interface has been deleted
- Cancel all subscriptions that have been issues from
+ Cancel all subscriptions that have been issues from
netmodel.network.interface.
Remove all pointers to subscriptions pending on this interface
"""
+
if interface in self._ingress_subscriptions:
+ # If the interface we delete at the origin of the subscription,
+ # let's also remove corresponding egress subscriptions
+ subs = self._ingress_subscriptions[interface]
+ if not subs:
+ return
+
+ to_remove = set()
+ for _interface, subscriptions in self._egress_subscriptions.items():
+
+ removed = subs & subscriptions
+ if removed:
+ for s in removed:
+ # We found a subscription of this interface on an other
+ # interface; send unsubscribe...
+ action, params = s._packet.payload
+ p = copy.deepcopy(s._packet)
+ p.payload = (ACTION_UNSUBSCRIBE, params)
+ _interface.send(p)
+ # ... and remove them
+ subscriptions -= removed
+
+ # if the interface has no more subscription remove it.
+ if not subscriptions:
+ to_remove.add(_interface)
+
+ for i in to_remove:
+ del self._egress_subscriptions[i]
+
del self._ingress_subscriptions[interface]
+
+ # Remove interface from flow table destination
+ to_remove = set()
+ for _interface, sub_flow_table in self._sub_flow_tables.items():
+ remove = sub_flow_table._on_interface_delete(interface)
+ if not remove:
+ to_remove.add(_interface)
+ for _interface in to_remove:
+ del self._sub_flow_tables[_interface]
diff --git a/netmodel/network/interface.py b/netmodel/network/interface.py
index c9e31422..3bad4c41 100644
--- a/netmodel/network/interface.py
+++ b/netmodel/network/interface.py
@@ -44,7 +44,7 @@ class InterfaceState(enum.Enum):
def register_interfaces():
Interface._factory = dict()
- for loader, module_name, is_pkg in pkgutil.walk_packages(interfaces.__path__,
+ for loader, module_name, is_pkg in pkgutil.walk_packages(interfaces.__path__,
interfaces.__name__ + '.'):
# Note: we cannot skip files using is_pkg otherwise it will ignore
# interfaces defined outside of __init__.py
@@ -95,12 +95,12 @@ class Interface:
self._tx_buffer = list()
self._state = InterfaceState.Down
- self._error = None
+ self._error = None
self._reconnecting = True
self._reconnection_delay = RECONNECTION_DELAY
self._registered_objects = dict()
-
+
# Callbacks
self._up_callbacks = list()
self._down_callbacks = list()
@@ -119,7 +119,7 @@ class Interface:
def __hash__(self):
return hash(self._name)
- #---------------------------------------------------------------------------
+ #---------------------------------------------------------------------------
def register_object(self, obj):
self._registered_objects[obj.__type__] = obj
@@ -127,9 +127,9 @@ class Interface:
def get_prefixes(self):
return [ Prefix(v.__type__) for v in self._registered_objects.values() ]
- #---------------------------------------------------------------------------
+ #---------------------------------------------------------------------------
# State management, callbacks
- #---------------------------------------------------------------------------
+ #---------------------------------------------------------------------------
def set_state(self, state):
asyncio.ensure_future(self._set_state(state))
@@ -170,7 +170,7 @@ class Interface:
for cb, args, kwargs in self._delete_callbacks:
cb(interface, *args, **kwargs)
- #--------------------------------------------------------------------------
+ #--------------------------------------------------------------------------
def set_reconnecting(self, reconnecting):
self._reconnecting = reconnecting
@@ -253,13 +253,13 @@ class Interface:
def receive_impl(self, packet):
ingress_interface = self
cb = self._callback
- if cb is None:
- return
if self._hook:
new_packet = self._hook(packet)
- if new_packet is not None:
+ if cb is not None and new_packet is not None:
cb(new_packet, ingress_interface=ingress_interface)
return
+ if cb is None:
+ return
cb(packet, ingress_interface=ingress_interface)
#--------------------------------------------------------------------------
diff --git a/netmodel/network/packet.py b/netmodel/network/packet.py
index 9552b0e7..7edcccc4 100644
--- a/netmodel/network/packet.py
+++ b/netmodel/network/packet.py
@@ -109,7 +109,7 @@ class VICNTLV:
@staticmethod
def get_type(buf):
- (typelen, ) = struct.unpack(NETMODEL_TLV_TYPELEN_STR,
+ (typelen, ) = struct.unpack(NETMODEL_TLV_TYPELEN_STR,
buf[:NETMODEL_TLV_SIZE])
return (typelen & NETMODEL_TLV_TYPE_MASK) >> NETMODEL_TLV_TYPE_SHIFT
@@ -179,7 +179,7 @@ class VICNTLV:
class ObjectName(VICNTLV): pass
@VICNTLV.set_tlv_type(NETMODEL_TLV_FIELD)
-class Field(VICNTLV):
+class Field(VICNTLV):
"""Field == STR
"""
@@ -217,7 +217,7 @@ class Prefix(Object, Prefix_, VICNTLV):
def __hash__(self):
return hash(self.get_tuple())
-
+
@VICNTLV.set_tlv_type(NETMODEL_TLV_SRC)
class Source(Prefix):
"""Source address
@@ -251,12 +251,12 @@ class Packet(Object, VICNTLV):
source = Source()
destination = Destination(Prefix)
protocol = Protocol(String, default = 'query')
- flags = Flags()
- payload = Payload()
+ flags = Flags(String)
+ payload = Payload(String)
# This should be dispatched across L3 L4 L7
- def __init__(self, source = None, destination = None, protocol = None,
+ def __init__(self, source = None, destination = None, protocol = None,
flags = 0, payload = None):
self.source = source
self.destination = destination
@@ -272,8 +272,8 @@ class Packet(Object, VICNTLV):
packet = Packet()
if src_query:
address = Prefix(
- object_name = src_query.object_name,
- filter = src_query.filter,
+ object_name = src_query.object_name,
+ filter = src_query.filter,
field_names = src_query.field_names,
aggregate = src_query.aggregate)
if reply:
@@ -283,8 +283,8 @@ class Packet(Object, VICNTLV):
if query:
address = Prefix(
- object_name = query.object_name,
- filter = query.filter,
+ object_name = query.object_name,
+ filter = query.filter,
field_names = query.field_names,
aggregate = query.aggregate)
@@ -310,7 +310,7 @@ class Packet(Object, VICNTLV):
field_names = address.field_names
aggregate = address.aggregate
- return Query(action, object_name, filter, params, field_names,
+ return Query(action, object_name, filter, params, field_names,
aggregate = aggregate, last = self.last, reply = self.reply)
@property
@@ -335,6 +335,16 @@ class Packet(Object, VICNTLV):
else:
self.flags &= ~FLAG_REPLY
+ def get_tuple(self):
+ return (self.source, self.destination, self.protocol, self.flags,
+ self.payload)
+
+ def __eq__(self, other):
+ return self.get_tuple() == other.get_tuple()
+
+ def __hash__(self):
+ return hash(self.get_tuple())
+
class ErrorPacket(Packet):
"""
Analog with ICMP errors packets in IP networks
@@ -344,7 +354,7 @@ class ErrorPacket(Packet):
# Constructor
#--------------------------------------------------------------------------
- def __init__(self, type = ERROR, code = ERROR, message = None,
+ def __init__(self, type = ERROR, code = ERROR, message = None,
traceback = None, **kwargs):
assert not traceback or isinstance(traceback, str)
diff --git a/netmodel/network/prefix.py b/netmodel/network/prefix.py
index 00b5db71..d444a56d 100644
--- a/netmodel/network/prefix.py
+++ b/netmodel/network/prefix.py
@@ -17,20 +17,23 @@
#
class Prefix:
- def __init__(self, object_name = None, filter = None, field_names = None,
+ def __init__(self, object_name = None, filter = None, field_names = None,
aggregate = None):
self.object_name = object_name
self.filter = filter
self.field_names = field_names
self.aggregate = aggregate
- def __hash__(self):
- return hash(self.get_tuple())
-
def get_tuple(self):
- return (self.object_name, self.filter, self.field_names,
+ return (self.object_name, self.filter, self.field_names,
self.aggregate)
+ def __eq__(self, other):
+ return self.get_tuple() == other.get_tuple()
+
+ def __hash__(self):
+ return hash(self.get_tuple())
+
def __repr__(self):
return '<Prefix {}>'.format(self.get_tuple())
diff --git a/netmodel/network/router.py b/netmodel/network/router.py
index 84d69dca..871cefe0 100644
--- a/netmodel/network/router.py
+++ b/netmodel/network/router.py
@@ -68,7 +68,7 @@ class Router:
#--------------------------------------------------------------------------
def register_local_collection(self, cls):
- self.get_interface(LOCAL_NAMESPACE).register_collection(cls,
+ self.get_interface(LOCAL_NAMESPACE).register_collection(cls,
LOCAL_NAMESPACE)
def register_collection(self, cls, namespace=None):
@@ -97,7 +97,7 @@ class Router:
interface.set_state(InterfaceState.PendingUp)
for prefix in interface.get_prefixes():
- self._fib.add(prefix, [interface])
+ self._fib.add(prefix, set([interface]))
return interface
@@ -112,8 +112,8 @@ class Router:
def on_interface_up(self, interface):
"""
- This callback is triggered when an interface becomes up.
-
+ This callback is triggered when an interface becomes up.
+
The router will request metadata.
The flow table is notified.
"""
@@ -139,7 +139,7 @@ class Router:
# Public API
#---------------------------------------------------------------------------
- def add_interface(self, interface_type, name=None, namespace=None,
+ def add_interface(self, interface_type, name=None, namespace=None,
**platform_config):
"""
namespace is used to force appending of a namespace to the tables.
@@ -161,7 +161,7 @@ class Router:
interface = interface_cls(self, **platform_config)
except Exception as e:
traceback.print_exc()
- raise Exception("Cannot create interface %s of type %s with parameters %r: %s"
+ raise Exception("Cannot create interface %s of type %s with parameters %r: %s"
% (name, interface_type,
platform_config, e))
self._register_interface(interface, name)
@@ -231,9 +231,10 @@ class Router:
interface flow table
- using the FIB if no match is found
"""
- orig_interface = self._flow_table.match(packet, ingress_interface)
- if orig_interface:
- orig_interface.send(packet)
+ orig_interfaces = self._flow_table.match(packet, ingress_interface)
+ if orig_interfaces:
+ for orig_interface in orig_interfaces:
+ orig_interface.send(packet)
return
if isinstance(packet, str):
@@ -247,11 +248,17 @@ class Router:
return
# Get route from FIB
- interface = self._fib.get(packet.destination.object_name)
- if not interface:
+ if packet.destination is None:
+ log.warning("Ignored reply packet with no match in flow table {}".format(
+ packet.to_query()))
+ return
+ interfaces = self._fib.get(packet.destination.object_name)
+ if not interfaces:
+ log.error('No match in FIB for {}'.format(
+ packet.destination.object_name))
return
# Update flow table before sending
- self._flow_table.add(packet, ingress_interface, interface)
-
- interface.send(packet)
+ self._flow_table.add(packet, ingress_interface, interfaces)
+ for interface in interfaces:
+ interface.send(packet)