aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python')
-rw-r--r--resources/libraries/python/LispSetup.py483
-rw-r--r--resources/libraries/python/LispUtil.py427
2 files changed, 110 insertions, 800 deletions
diff --git a/resources/libraries/python/LispSetup.py b/resources/libraries/python/LispSetup.py
index 666b6e636d..59cb1a808d 100644
--- a/resources/libraries/python/LispSetup.py
+++ b/resources/libraries/python/LispSetup.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016-2019 Cisco and/or its affiliates.
+# Copyright (c) 2016-2020 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -13,11 +13,66 @@
"""Library to set up Lisp in topology."""
+from enum import IntEnum
+
from ipaddress import ip_address
-from resources.libraries.python.L2Util import L2Util
+from resources.libraries.python.IPUtil import IPUtil
from resources.libraries.python.PapiExecutor import PapiSocketExecutor
-from resources.libraries.python.topology import NodeType
+
+
+class EidType(IntEnum):
+ """EID types."""
+ PREFIX = 0
+ MAC = 1
+ NSH = 2
+
+
+class LispEid:
+ """Class for lisp eid."""
+
+ @staticmethod
+ def create_eid(eid, prefix_len):
+ """Create lisp eid object.
+
+ :param eid: Eid value.
+ :param prefix_len: Prefix len if the eid is IP address.
+ :type eid: str
+ :type prefix_len: int
+ """
+ eid_addr = dict(prefix=IPUtil.create_prefix_object(
+ ip_address(eid), prefix_len)
+ ) if prefix_len else dict(mac=str(eid))
+
+ return dict(
+ type=getattr(
+ EidType, u"PREFIX" if prefix_len else u"MAC"
+ ).value,
+ address=eid_addr
+ )
+
+
+class LispRemoteLocator:
+ """Class for lisp remote locator."""
+
+ @staticmethod
+ def create_rloc(ip_addr, priority=0, weight=0):
+ """Create lisp remote locator object.
+
+ :param ip_addr: IP/IPv6 address.
+ :param priority: Priority of the remote locator.
+ :param weight: Weight of the remote locator.
+ :type ip_addr: str
+ :type priority: int
+ :type weight: int
+ """
+ return [
+ dict(
+ priority=priority,
+ weight=weight,
+ ip_address=ip_address(ip_addr)
+ )
+ ]
class LispStatus:
@@ -32,14 +87,14 @@ class LispStatus:
:type node: dict
:type state: str
"""
- args = dict(is_en=0 if state == u"disable" else 1)
-
+ args = dict(is_enable=bool(state == u"enable"))
cmd = u"lisp_enable_disable"
err_msg = f"Failed to set LISP status on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
+
class LispRemoteMapping:
"""Class for lisp remote mapping API."""
@@ -66,39 +121,15 @@ class LispRemoteMapping:
:type rloc: str
:type is_mac: bool
"""
- if not is_mac:
- eid_type = 0 if ip_address(deid).version == 4 else 1
- eid_packed = ip_address(deid).packed
- seid_packed = ip_address(seid).packed
- eid_len = deid_prefix
- seid_len = seid_prefix
- else:
- eid_type = 2
- eid_packed = L2Util.mac_to_bin(deid)
- seid_packed = L2Util.mac_to_bin(seid)
- eid_len = 0
- seid_len = 0
-
- rlocs = [
- dict(
- is_ip4=1 if ip_address(rloc).version == 4 else 0,
- addr=ip_address(rloc).packed
- )
- ]
-
args = dict(
- is_add=1,
- is_src_dst=1,
+ is_add=True,
+ is_src_dst=True,
vni=int(vni),
- eid_type=eid_type,
- eid=eid_packed,
- eid_len=eid_len,
- seid=seid_packed,
- seid_len=seid_len,
+ deid=LispEid.create_eid(deid, deid_prefix if not is_mac else None),
+ seid=LispEid.create_eid(seid, seid_prefix if not is_mac else None),
rloc_num=1,
- rlocs=rlocs
+ rlocs=LispRemoteLocator.create_rloc(rloc)
)
-
cmd = u"lisp_add_del_remote_mapping"
err_msg = f"Failed to add remote mapping on host {node[u'host']}"
@@ -126,47 +157,22 @@ class LispRemoteMapping:
:type rloc: str
"""
# used only with IPs
- is_mac = False
-
- if not is_mac:
- eid_type = 0 if ip_address(deid).version == 4 else 1
- eid_packed = ip_address(deid).packed
- seid_packed = ip_address(seid).packed
- eid_len = deid_prefix
- seid_len = seid_prefix
- else:
- eid_type = 2
- eid_packed = L2Util.mac_to_bin(deid)
- seid_packed = L2Util.mac_to_bin(seid)
- eid_len = 0
- seid_len = 0
-
- rlocs = [
- dict(
- is_ip4=1 if ip_address(str(rloc)).version == 4 else 0,
- addr=ip_address(str(rloc)).packed
- )
- ]
-
args = dict(
- is_add=0,
- is_src_dst=1,
+ is_add=False,
+ is_src_dst=True,
vni=int(vni),
- eid_type=eid_type,
- eid=eid_packed,
- eid_len=eid_len,
- seid=seid_packed,
- seid_len=seid_len,
+ deid=LispEid.create_eid(deid, deid_prefix),
+ seid=LispEid.create_eid(seid, seid_prefix),
rloc_num=1,
- rlocs=rlocs
+ rlocs=LispRemoteLocator.create_rloc(rloc)
)
-
cmd = u"lisp_add_del_remote_mapping"
err_msg = f"Failed to delete remote mapping on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
+
class LispAdjacency:
"""Class for lisp adjacency API."""
@@ -190,29 +196,12 @@ class LispAdjacency:
:type seid_prefix: int
:type is_mac: bool
"""
- if not is_mac:
- eid_type = 0 if ip_address(deid).version == 4 else 1
- reid = ip_address(deid).packed
- leid = ip_address(seid).packed
- reid_len = deid_prefix
- leid_len = seid_prefix
- else:
- eid_type = 2
- reid = L2Util.mac_to_bin(deid)
- leid = L2Util.mac_to_bin(seid)
- reid_len = 0
- leid_len = 0
-
args = dict(
- is_add=1,
+ is_add=True,
vni=int(vni),
- eid_type=eid_type,
- reid=reid,
- reid_len=reid_len,
- leid=leid,
- leid_len=leid_len
+ reid=LispEid.create_eid(deid, deid_prefix if not is_mac else None),
+ leid=LispEid.create_eid(seid, seid_prefix if not is_mac else None)
)
-
cmd = u"lisp_add_del_adjacency"
err_msg = f"Failed to add lisp adjacency on host {node[u'host']}"
@@ -238,39 +227,21 @@ class LispAdjacency:
:type seid_prefix: int
"""
# used only with IPs
- is_mac = False
-
- if not is_mac:
- eid_type = 0 if ip_address(deid).version == 4 else 1
- reid = ip_address(deid).packed
- leid = ip_address(seid).packed
- reid_len = deid_prefix
- leid_len = seid_prefix
- else:
- eid_type = 2
- reid = L2Util.mac_to_bin(deid)
- leid = L2Util.mac_to_bin(seid)
- reid_len = 0
- leid_len = 0
-
args = dict(
- is_add=0,
+ is_add=False,
vni=int(vni),
- eid_type=eid_type,
- reid=reid,
- reid_len=reid_len,
- leid=leid,
- leid_len=leid_len
+ eid=LispEid.create_eid(deid, deid_prefix),
+ leid=LispEid.create_eid(seid, seid_prefix)
)
-
cmd = u"lisp_add_del_adjacency"
err_msg = f"Failed to delete lisp adjacency on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
+
class LispGpeStatus:
- """Clas for LISP GPE status manipulation."""
+ """Class for LISP GPE status manipulation."""
@staticmethod
def vpp_lisp_gpe_enable_disable(node, state):
@@ -281,76 +252,13 @@ class LispGpeStatus:
:type node: dict
:type state: str
"""
- args = dict(is_en=0 if state == u"disable" else 1)
-
+ args = dict(is_enable=bool(state == u"enable"))
cmd = u"gpe_enable_disable"
err_msg = f"Failed to set LISP GPE status on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
-class LispGpeForwardEntry:
- """The functionality needed for these methods is not implemented in VPP
- (VAT). Bug https://jira.fd.io/browse/VPP-334 was open to cover this issue.
-
- TODO: Implement when VPP-334 is fixed.
- """
-
- @staticmethod
- def add_lisp_gpe_forward_entry(node, *args):
- """Not implemented"""
- # TODO: Implement when VPP-334 is fixed.
-
- @staticmethod
- def del_lisp_gpe_forward_entry(node, *args):
- """Not implemented"""
- # TODO: Implement when VPP-334 is fixed.
-
-
-class LispMapResolver:
- """Class for Lisp map resolver API."""
-
- @staticmethod
- def vpp_add_map_resolver(node, map_resolver_ip):
- """Set lisp map resolver on the VPP node in topology.
-
- :param node: VPP node.
- :param map_resolver_ip: IP address of the map resolver.
- :type node: dict
- :type map_resolver_ip: str
- """
- args = dict(
- is_add=1,
- is_ipv6=0 if ip_address(map_resolver_ip).version == 4 else 1,
- ip_address=ip_address(map_resolver_ip).packed
- )
-
- cmd = u"lisp_add_del_map_resolver"
- err_msg = f"Failed to add map resolver on host {node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_reply(err_msg)
-
- @staticmethod
- def vpp_del_map_resolver(node, map_resolver_ip):
- """Unset lisp map resolver on the VPP node in topology.
-
- :param node: VPP node.
- :param map_resolver_ip: IP address of the map resolver.
- :type node: dict
- :type map_resolver_ip: str
- """
- args = dict(
- is_add=0,
- is_ipv6=0 if ip_address(map_resolver_ip).version == 4 else 1,
- ip_address=ip_address(map_resolver_ip).packed
- )
-
- cmd = u"lisp_add_del_map_resolver"
- err_msg = f"Failed to delete map resolver on host {node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- papi_exec.add(cmd, **args).get_reply(err_msg)
class LispLocalEid:
"""Class for Lisp local eid API."""
@@ -362,31 +270,21 @@ class LispLocalEid:
:param node: VPP node.
:param locator_set_name: Name of the locator_set.
- :param vni: vni value.
+ :param vni: Vni value.
:param eid: Eid value.
- :param prefix_len: prefix len if the eid is IP address.
+ :param prefix_len: Prefix len if the eid is IP address.
:type node: dict
:type locator_set_name: str
:type vni: int
:type eid: str
:type prefix_len: int
"""
- if prefix_len:
- eid_type = 0 if ip_address(eid).version == 4 else 1
- eid_packed = ip_address(eid).packed
- else:
- eid_type = 2
- eid_packed = L2Util.mac_to_bin(eid)
-
args = dict(
- is_add=1,
- eid_type=eid_type,
- eid=eid_packed,
- prefix_len=prefix_len,
+ is_add=True,
+ eid=LispEid.create_eid(eid, prefix_len),
locator_set_name=locator_set_name,
vni=int(vni)
)
-
cmd = u"lisp_add_del_local_eid"
err_msg = f"Failed to add local eid on host {node[u'host']}"
@@ -396,41 +294,32 @@ class LispLocalEid:
@staticmethod
def vpp_del_lisp_local_eid(
node, locator_set_name, vni, eid, prefix_len=None):
- """Set lisp eid addres on the VPP node in topology.
+ """Set lisp eid address on the VPP node in topology.
:param node: VPP node.
:param locator_set_name: Name of the locator_set.
- :param vni: vni value.
+ :param vni: Vni value.
:param eid: Eid value.
- :param prefix_len: prefix len if the eid is IP address.
+ :param prefix_len: Prefix len if the eid is IP address.
:type node: dict
:type locator_set_name: str
:type vni: int
:type eid: str
:type prefix_len: int
"""
- if prefix_len:
- eid_type = 0 if ip_address(eid).version == 4 else 1
- eid_packed = ip_address(eid).packed
- else:
- eid_type = 2
- eid_packed = L2Util.mac_to_bin(eid)
-
args = dict(
- is_add=0,
- eid_type=eid_type,
- eid=eid_packed,
- prefix_len=prefix_len,
+ is_add=False,
+ eid=LispEid.create_eid(eid, prefix_len),
locator_set_name=locator_set_name,
vni=int(vni)
)
-
cmd = u"lisp_add_del_local_eid"
err_msg = f"Failed to delete local eid on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
+
class LispLocator:
"""Class for the Lisp Locator API."""
@@ -440,9 +329,9 @@ class LispLocator:
:param node: VPP node.
:param locator_name: Name of the locator_set.
- :param sw_if_index: sw_if_index if the interface.
- :param priority: priority of the locator.
- :param weight: weight of the locator.
+ :param sw_if_index: Sw_if_index if the interface.
+ :param priority: Priority of the locator.
+ :param weight: Weight of the locator.
:type node: dict
:type locator_name: str
:type sw_if_index: int
@@ -451,13 +340,12 @@ class LispLocator:
"""
args = dict(
- is_add=1,
+ is_add=True,
locator_set_name=locator_name,
sw_if_index=sw_if_index,
priority=priority,
weight=weight
)
-
cmd = u"lisp_add_del_locator"
err_msg = f"Failed to add locator on host {node[u'host']}"
@@ -470,9 +358,9 @@ class LispLocator:
:param node: VPP node.
:param locator_name: Name of the locator_set.
- :param sw_if_index: sw_if_index if the interface.
- :param priority: priority of the locator.
- :param weight: weight of the locator.
+ :param sw_if_index: Sw_if_index if the interface.
+ :param priority: Priority of the locator.
+ :param weight: Weight of the locator.
:type node: dict
:type locator_name: str
:type sw_if_index: int
@@ -480,19 +368,19 @@ class LispLocator:
:type weight: int
"""
args = dict(
- is_add=0,
+ is_add=False,
locator_set_name=locator_name,
sw_if_index=sw_if_index,
priority=priority,
weight=weight
)
-
cmd = u"lisp_add_del_locator"
err_msg = f"Failed to delete locator on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
+
class LispLocatorSet:
"""Class for Lisp Locator Set API."""
@@ -506,12 +394,11 @@ class LispLocatorSet:
:type name: str
"""
args = dict(
- is_add=1,
+ is_add=True,
locator_set_name=name,
locator_num=0,
locators=[]
)
-
cmd = u"lisp_add_del_locator_set"
err_msg = f"Failed to add locator set on host {node[u'host']}"
@@ -528,154 +415,17 @@ class LispLocatorSet:
:type name: str
"""
args = dict(
- is_add=0,
+ is_add=False,
locator_set_name=name,
locator_num=0,
locators=[]
)
-
cmd = u"lisp_add_del_locator_set"
err_msg = f"Failed to delete locator set on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
-class LispSetup:
- """Lisp setup in topology."""
-
- @staticmethod
- def vpp_set_lisp_locator_set(node, locator_set_list):
- """Set lisp locator_sets on VPP node in topology.
-
- :param node: VPP node.
- :param locator_set_list: List of locator_set.
- :type node: dict
- :type locator_set_list: list
- """
-
- if node[u"type"] != NodeType.DUT:
- raise ValueError(u"Node is not DUT")
-
- lisp_locator = LispLocator()
- lisp_locator_set = LispLocatorSet()
- for locator_set in locator_set_list:
- locator_set_name = locator_set.get(u"locator-set")
- locator_list = locator_set.get(u"locator")
- lisp_locator_set.vpp_add_lisp_locator_set(node, locator_set_name)
- for locator in locator_list:
- sw_if_index = locator.get(u"locator-index")
- priority = locator.get(u"priority")
- weight = locator.get(u"weight")
- lisp_locator.vpp_add_lisp_locator(
- node, locator_set_name, sw_if_index, priority, weight
- )
-
- @staticmethod
- def vpp_unset_lisp_locator_set(node, locator_set_list):
- """Unset lisp locator_sets on VPP node in topology.
-
- :param node: VPP node.
- :param locator_set_list: List of locator_set.
- :type node: dict
- :type locator_set_list: list
- """
- if node[u"type"] != NodeType.DUT:
- raise ValueError(u"Lisp locator set, node is not DUT")
-
- lisp_locator = LispLocator()
- lisp_locator_set = LispLocatorSet()
- for locator_set in locator_set_list:
- locator_set_name = locator_set.get(u"locator-set")
- locator_list = locator_set.get(u"locator")
- for locator in locator_list:
- sw_if_index = locator.get(u"locator-index")
- priority = locator.get(u"priority")
- weight = locator.get(u"weight")
- lisp_locator.vpp_del_lisp_locator(
- node, locator_set_name, sw_if_index, priority, weight
- )
-
- lisp_locator_set.vpp_del_lisp_locator_set(node, locator_set_name)
-
- @staticmethod
- def vpp_set_lisp_eid_table(node, eid_table):
- """Set lisp eid tables on VPP node in topology.
-
- :param node: VPP node.
- :param eid_table: Dictionary containing information of eid_table.
- :type node: dict
- :type eid_table: dict
- """
- if node[u"type"] != NodeType.DUT:
- raise ValueError(u"Node is not DUT")
-
- lisp_locator_set = LispLocatorSet()
- lisp_eid = LispLocalEid()
- for eid in eid_table:
- vni = eid.get(u"vni")
- eid_address = eid.get(u"eid")
- eid_prefix_len = eid.get(u"eid-prefix-len")
- locator_set_name = eid.get(u"locator-set")
- lisp_locator_set.vpp_add_lisp_locator_set(node, locator_set_name)
- lisp_eid.vpp_add_lisp_local_eid(
- node, locator_set_name, vni, eid_address, eid_prefix_len
- )
-
- @staticmethod
- def vpp_unset_lisp_eid_table(node, eid_table):
- """Unset lisp eid tables on VPP node in topology.
-
- :param node: VPP node.
- :param eid_table: Dictionary containing information of eid_table.
- :type node: dict
- :type eid_table: dict
- """
- if node[u"type"] != NodeType.DUT:
- raise ValueError(u"Node is not DUT")
-
- locator_set_list = []
- lisp_locator_set = LispLocatorSet()
- lisp_eid = LispLocalEid()
- for eid in eid_table:
- vni = eid.get(u"vni")
- eid_address = eid.get(u"eid")
- eid_prefix_len = eid.get(u"eid-prefix-len")
- locator_set_name = eid.get(u"locator-set")
- if locator_set_name not in locator_set_list:
- locator_set_list.append(locator_set_name)
-
- lisp_eid.vpp_del_lisp_local_eid(
- node, locator_set_name, vni, eid_address, eid_prefix_len
- )
-
- for locator_set_name in locator_set_list:
- lisp_locator_set.vpp_del_lisp_locator_set(node, locator_set_name)
-
- @staticmethod
- def vpp_set_lisp_map_resolver(node, map_resolver):
- """Set lisp map resolvers on VPP node in topology.
-
- :param node: VPP node.
- :param map_resolver: Dictionary containing information of map resolver.
- :type node: dict
- :type map_resolver: dict
- """
- lisp_map_res = LispMapResolver()
- for map_ip in map_resolver:
- lisp_map_res.vpp_add_map_resolver(node, map_ip.get(u"map resolver"))
-
- @staticmethod
- def vpp_unset_lisp_map_resolver(node, map_resolver):
- """Unset lisp map resolvers on VPP node in topology.
-
- :param node: VPP node.
- :param map_resolver: Dictionary containing information of map resolver.
- :type node: dict
- :type map_resolver: dict
- """
- lisp_map_res = LispMapResolver()
- for map_ip in map_resolver:
- lisp_map_res.vpp_del_map_resolver(node, map_ip.get(u"map resolver"))
class LispEidTableMap:
"""
@@ -697,28 +447,15 @@ class LispEidTableMap:
:type vrf: int
"""
# adding default mapping vni=0, vrf=0 needs to be skipped
- skip = False
-
- if bd_id:
- is_l2 = 1
- dp_table = bd_id
- else:
- is_l2 = 0
- dp_table = vrf
- # skip adding default mapping
- if (int(vrf) == 0) and (int(vni) == 0):
- skip = True
-
- args = dict(
- is_add=1,
- vni=int(vni),
- dp_table=int(dp_table),
- is_l2=is_l2
- )
-
- cmd = u"lisp_eid_table_add_del_map"
- err_msg = f"Failed to add eid table map on host {node[u'host']}"
+ if bd_id is not None or int(vrf if vrf else 0) or int(vni):
+ args = dict(
+ is_add=True,
+ vni=int(vni),
+ dp_table=int(bd_id) if bd_id is not None else int(vrf),
+ is_l2=bool(bd_id is not None)
+ )
+ cmd = u"lisp_eid_table_add_del_map"
+ err_msg = f"Failed to add eid table map on host {node[u'host']}"
- if not skip:
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
diff --git a/resources/libraries/python/LispUtil.py b/resources/libraries/python/LispUtil.py
deleted file mode 100644
index a77f4ad854..0000000000
--- a/resources/libraries/python/LispUtil.py
+++ /dev/null
@@ -1,427 +0,0 @@
-# Copyright (c) 2019 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Lisp utilities library."""
-
-from ipaddress import IPv4Address, IPv6Address
-from robot.api import logger
-
-from resources.libraries.python.L2Util import L2Util
-from resources.libraries.python.PapiExecutor import PapiSocketExecutor
-from resources.libraries.python.topology import Topology
-
-class LispUtil:
- """Implements keywords for Lisp tests."""
-
- @staticmethod
- def vpp_show_lisp_state(node):
- """Get lisp state from VPP node.
-
- :param node: VPP node.
- :type node: dict
- :returns: Lisp gpe state.
- :rtype: dict
- """
- cmd = u"show_lisp_status"
- err_msg = f"Failed to get LISP status on host {node['host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- reply = papi_exec.add(cmd).get_reply(err_msg)
-
- data = dict()
- data[u"feature_status"] = u"enabled" if reply[u"feature_status"] \
- else u"disabled"
- data[u"gpe_status"] = u"enabled" if reply[u"gpe_status"] \
- else u"disabled"
- return data
-
- @staticmethod
- def vpp_show_lisp_locator_set(node, items_filter):
- """Get lisp locator_set from VPP node.
-
- :param node: VPP node.
- :param items_filter: Filter which specifies which items should be
- retrieved - local, remote, empty string = both.
- :type node: dict
- :type items_filter: str
- :returns: Lisp locator_set data as python list.
- :rtype: list
- """
- ifilter = {u"_": 0, u"_local": 1, u"_remote": 2}
- args = dict(
- filter=ifilter[u"_" + items_filter]
- )
-
- cmd = u"lisp_locator_set_dump"
- err_msg = f"Failed to get LISP locator set on host {node['host']}"
-
- try:
- with PapiSocketExecutor(node) as papi_exec:
- details = papi_exec.add(cmd, **args).get_details(err_msg)
- data = list()
- for locator in details:
- data.append(
- {u"ls_name": locator[u"ls_name"].rstrip(b'\0'),
- u"ls_index": locator[u"ls_index"]}
- )
- return data
- except (ValueError, LookupError) as err:
- logger.warn(f"Failed to get LISP locator set {err}")
- return list()
-
- @staticmethod
- def vpp_show_lisp_eid_table(node):
- """Get lisp eid table from VPP node.
-
- :param node: VPP node.
- :type node: dict
- :returns: Lisp eid table as python list.
- :rtype: list
- """
- cmd = u"lisp_eid_table_dump"
- err_msg = f"Failed to get LISP eid table on host {node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- details = papi_exec.add(cmd).get_details(err_msg)
-
- data = list()
- for eid_details in details:
- eid = u"Bad eid type"
- if eid_details[u"eid_type"] == 0:
- prefix = str(eid_details[u"eid_prefix_len"])
- eid = str(IPv4Address(eid_details[u"eid"][0:4])) + u"/" + \
- prefix
- elif eid_details[u"eid_type"] == 1:
- prefix = str(eid_details[u"eid_prefix_len"])
- eid = str(IPv6Address(eid_details[u"eid"])) + u"/" + prefix
- elif eid_details[u"eid_type"] == 2:
- eid = str(L2Util.bin_to_mac(eid_details[u"eid"][0:6]))
- data.append(
- {
- u"action": eid_details[u"action"],
- u"is_local": eid_details[u"is_local"],
- u"eid": eid,
- u"vni": eid_details[u"vni"],
- u"ttl": eid_details[u"ttl"],
- u"authoritative": eid_details[u"authoritative"]
- }
- )
- return data
-
- @staticmethod
- def vpp_show_lisp_map_resolver(node):
- """Get lisp map resolver from VPP node.
-
- :param node: VPP node.
- :type node: dict
- :returns: Lisp map resolver as python list.
- :rtype: list
- """
- cmd = u"lisp_map_resolver_dump"
- err_msg = f"Failed to get LISP map resolver on host {node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- details = papi_exec.add(cmd).get_details(err_msg)
-
- data = list()
- for resolver in details:
- address = u"Bad is_ipv6 flag"
- if resolver[u"is_ipv6"] == 0:
- address = str(IPv4Address(resolver[u"ip_address"][0:4]))
- elif resolver[u"is_ipv6"] == 1:
- address = str(IPv6Address(resolver[u"ip_address"]))
- data.append({u"map resolver": address})
- return data
-
- @staticmethod
- def vpp_show_lisp_map_register(node):
- """Get LISP Map Register from VPP node.
-
- :param node: VPP node.
- :type node: dict
- :returns: LISP Map Register as python dict.
- :rtype: dict
- """
- cmd = u"show_lisp_map_register_state"
- err_msg = f"Failed to get LISP map register state on host " \
- f"{node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- reply = papi_exec.add(cmd).get_reply(err_msg)
-
- data = dict()
- data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled"
- logger.info(data)
- return data
-
- @staticmethod
- def vpp_show_lisp_map_request_mode(node):
- """Get LISP Map Request mode from VPP node.
-
- :param node: VPP node.
- :type node: dict
- :returns: LISP Map Request mode as python dict.
- :rtype: dict
- """
- cmd = u"show_lisp_map_request_mode"
- err_msg = f"Failed to get LISP map request mode on host {node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- reply = papi_exec.add(cmd).get_reply(err_msg)
-
- data = dict()
- data[u"map_request_mode"] = u"src-dst" if reply[u"mode"] \
- else u"dst-only"
- logger.info(data)
- return data
-
- @staticmethod
- def vpp_show_lisp_map_server(node):
- """Get LISP Map Server from VPP node.
-
- :param node: VPP node.
- :type node: dict
- :returns: LISP Map Server as python list.
- :rtype: list
- """
- cmd = u"lisp_map_server_dump"
- err_msg = f"Failed to get LISP map server on host {node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- details = papi_exec.add(cmd).get_details(err_msg)
-
- data = list()
- for server in details:
- address = u"Bad is_ipv6 flag"
- if server[u"is_ipv6"] == 0:
- address = str(IPv4Address(server[u"ip_address"][0:4]))
- elif server[u"is_ipv6"] == 1:
- address = str(IPv6Address(server[u"ip_address"]))
- data.append({u"map-server": address})
- logger.info(data)
- return data
-
- @staticmethod
- def vpp_show_lisp_petr_config(node):
- """Get LISP PETR configuration from VPP node.
-
- :param node: VPP node.
- :type node: dict
- :returns: LISP PETR configuration as python dict.
- :rtype: dict
- """
- # Note: VAT is returning ipv6 address instead of ipv4
- cmd = u"show_lisp_use_petr"
- err_msg = f"Failed to get LISP petr config on host {node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- reply = papi_exec.add(cmd).get_reply(err_msg)
-
- data = dict()
- data[u"status"] = u"enabled" if reply[u"status"] else u"disabled"
- address = u"Bad is_ip4 flag"
- if reply[u"is_ip4"] == 0:
- address = str(IPv6Address(reply[u"address"]))
- elif reply[u"is_ip4"] == 1:
- address = str(IPv4Address(reply[u"address"][0:4]))
- data[u"address"] = address
- logger.info(data)
- return data
-
- @staticmethod
- def vpp_show_lisp_rloc_config(node):
- """Get LISP RLOC configuration from VPP node.
-
- :param node: VPP node.
- :type node: dict
- :returns: LISP RLOC configuration as python dict.
- :rtype: dict
- """
- cmd = u"show_lisp_rloc_probe_state"
- err_msg = f"Failed to get LISP rloc config on host {node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- reply = papi_exec.add(cmd).get_reply(err_msg)
-
- data = dict()
- data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled"
- logger.info(data)
- return data
-
- @staticmethod
- def vpp_show_lisp_pitr(node):
- """Get Lisp PITR feature config from VPP node.
-
- :param node: VPP node.
- :type node: dict
- :returns: Lisp PITR config data.
- :rtype: dict
- """
- cmd = u"show_lisp_pitr"
- err_msg = f"Failed to get LISP pitr on host {node[u'host']}"
-
- with PapiSocketExecutor(node) as papi_exec:
- reply = papi_exec.add(cmd).get_reply(err_msg)
-
- data = dict()
- data[u"status"] = u"enabled" if reply[u"status"] else u"disabled"
- return data
-
- @staticmethod
- def lisp_should_be_equal(lisp_val1, lisp_val2):
- """Fail if the lisp values are not equal.
-
- :param lisp_val1: First lisp value.
- :param lisp_val2: Second lisp value.
- :type lisp_val1: list
- :type lisp_val2: list
- """
- len1 = len(lisp_val1)
- len2 = len(lisp_val2)
-
- if len1 != len2:
- raise RuntimeError(
- f"Values are not same. Value 1 {lisp_val1} \n"
- f"Value 2 {lisp_val2}."
- )
-
- for tmp in lisp_val1:
- if tmp not in lisp_val2:
- raise RuntimeError(
- f"Value {tmp} not found in vpp:\n{lisp_val2}"
- )
-
- def lisp_locator_s_should_be_equal(self, locator_set1, locator_set2):
- """Fail if the lisp values are not equal.
-
- :param locator_set1: Generate lisp value.
- :param locator_set2: Lisp value from VPP.
- :type locator_set1: list
- :type locator_set2: list
- """
- locator_set_list = list()
-
- for item in locator_set1:
- if item not in locator_set_list:
- locator_set_list.append(item)
-
- self.lisp_should_be_equal(locator_set_list, locator_set2)
-
- @staticmethod
- def generate_unique_lisp_locator_set_data(node, locator_set_number):
- """Generate a list of lisp locator_set we want set to VPP and
- then check if it is set correctly. All locator_sets are unique.
-
- :param node: VPP node.
- :param locator_set_number: Generate n locator_set.
- :type node: dict
- :type locator_set_number: str
- :returns: list of lisp locator_set, list of lisp locator_set expected
- from VAT.
- :rtype: tuple
- """
- topo = Topology()
- locator_set_list = list()
- locator_set_list_vat = list()
-
- i = 0
- for num in range(0, int(locator_set_number)):
- locator_list = list()
- for interface in list(node[u"interfaces"].values()):
- link = interface.get(u"link")
- i += 1
- if link is None:
- continue
-
- if_name = topo.get_interface_by_link_name(node, link)
- sw_if_index = topo.get_interface_sw_index(node, if_name)
- if if_name is not None:
- locator = {
- u"locator-index": sw_if_index,
- u"priority": i,
- u"weight": i
- }
- locator_list.append(locator)
-
- l_name = f"ls{num}"
- locator_set = {
- u"locator-set": l_name,
- u"locator": locator_list
- }
- locator_set_list.append(locator_set)
-
- locator_set_vat = {
- u"ls_name": l_name,
- u"ls_index": num
- }
- locator_set_list_vat.append(locator_set_vat)
-
- return locator_set_list, locator_set_list_vat
-
- @staticmethod
- def generate_duplicate_lisp_locator_set_data(node, locator_set_number):
- """Generate a list of lisp locator_set we want set to VPP and
- then check if it is set correctly. Some locator_sets are duplicated.
-
- :param node: VPP node.
- :param locator_set_number: Generate n locator_set.
- :type node: dict
- :type locator_set_number: str
- :returns: list of lisp locator_set, list of lisp locator_set expected
- from VAT.
- :rtype: tuple
- """
- topo = Topology()
- locator_set_list = list()
- locator_set_list_vat = list()
-
- i = 0
- for num in range(0, int(locator_set_number)):
- locator_list = []
- for interface in list(node[u"interfaces"].values()):
- link = interface.get(u"link")
- i += 1
- if link is None:
- continue
-
- if_name = topo.get_interface_by_link_name(node, link)
- sw_if_index = topo.get_interface_sw_index(node, if_name)
- if if_name is not None:
- l_name = f"ls{num}"
- locator = {
- u"locator-index": sw_if_index,
- u"priority": i,
- u"weight": i
- }
- locator_list.append(locator)
- locator_set = {
- u"locator-set": l_name,
- u"locator": locator_list
- }
- locator_set_list.append(locator_set)
-
- locator_set_vat = {
- u"ls_name": l_name,
- u"ls_index": num
- }
- locator_set_list_vat.append(locator_set_vat)
-
- return locator_set_list, locator_set_list_vat
-
- def lisp_is_empty(self, lisp_params):
- """Check if the input param are empty.
-
- :param lisp_params: Should be empty list.
- :type lisp_params: list
- """
- self.lisp_should_be_equal([], lisp_params)