diff options
author | Jan Gelety <jgelety@cisco.com> | 2020-03-31 02:59:44 +0200 |
---|---|---|
committer | Jan Gelety <jgelety@cisco.com> | 2020-04-28 15:16:18 +0200 |
commit | 99b2e6a806652ea5e55f98f0a9fe31c45e6466d6 (patch) | |
tree | 758e91721724efc7daf9b07f2372ae3a637f9277 /resources/libraries/python | |
parent | de3a83186108b8f217ad2f313f05fd04f7c0ddf7 (diff) |
CSIT-1597 API cleanup: lisp
- cover API changes in VPP: https://gerrit.fd.io/r/c/vpp/+/24663
- update vpp stable to version 20.05-rc0~637
- remove unused L1 and L2 lisp KWs
Change-Id: I2672b6a375ad70c82f331dcc991c145e868108b9
Signed-off-by: Jan Gelety <jgelety@cisco.com>
Diffstat (limited to 'resources/libraries/python')
-rw-r--r-- | resources/libraries/python/LispSetup.py | 483 | ||||
-rw-r--r-- | resources/libraries/python/LispUtil.py | 427 |
2 files changed, 110 insertions, 800 deletions
diff --git a/resources/libraries/python/LispSetup.py b/resources/libraries/python/LispSetup.py index 666b6e636d..59cb1a808d 100644 --- a/resources/libraries/python/LispSetup.py +++ b/resources/libraries/python/LispSetup.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2019 Cisco and/or its affiliates. +# Copyright (c) 2016-2020 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -13,11 +13,66 @@ """Library to set up Lisp in topology.""" +from enum import IntEnum + from ipaddress import ip_address -from resources.libraries.python.L2Util import L2Util +from resources.libraries.python.IPUtil import IPUtil from resources.libraries.python.PapiExecutor import PapiSocketExecutor -from resources.libraries.python.topology import NodeType + + +class EidType(IntEnum): + """EID types.""" + PREFIX = 0 + MAC = 1 + NSH = 2 + + +class LispEid: + """Class for lisp eid.""" + + @staticmethod + def create_eid(eid, prefix_len): + """Create lisp eid object. + + :param eid: Eid value. + :param prefix_len: Prefix len if the eid is IP address. + :type eid: str + :type prefix_len: int + """ + eid_addr = dict(prefix=IPUtil.create_prefix_object( + ip_address(eid), prefix_len) + ) if prefix_len else dict(mac=str(eid)) + + return dict( + type=getattr( + EidType, u"PREFIX" if prefix_len else u"MAC" + ).value, + address=eid_addr + ) + + +class LispRemoteLocator: + """Class for lisp remote locator.""" + + @staticmethod + def create_rloc(ip_addr, priority=0, weight=0): + """Create lisp remote locator object. + + :param ip_addr: IP/IPv6 address. + :param priority: Priority of the remote locator. + :param weight: Weight of the remote locator. + :type ip_addr: str + :type priority: int + :type weight: int + """ + return [ + dict( + priority=priority, + weight=weight, + ip_address=ip_address(ip_addr) + ) + ] class LispStatus: @@ -32,14 +87,14 @@ class LispStatus: :type node: dict :type state: str """ - args = dict(is_en=0 if state == u"disable" else 1) - + args = dict(is_enable=bool(state == u"enable")) cmd = u"lisp_enable_disable" err_msg = f"Failed to set LISP status on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) + class LispRemoteMapping: """Class for lisp remote mapping API.""" @@ -66,39 +121,15 @@ class LispRemoteMapping: :type rloc: str :type is_mac: bool """ - if not is_mac: - eid_type = 0 if ip_address(deid).version == 4 else 1 - eid_packed = ip_address(deid).packed - seid_packed = ip_address(seid).packed - eid_len = deid_prefix - seid_len = seid_prefix - else: - eid_type = 2 - eid_packed = L2Util.mac_to_bin(deid) - seid_packed = L2Util.mac_to_bin(seid) - eid_len = 0 - seid_len = 0 - - rlocs = [ - dict( - is_ip4=1 if ip_address(rloc).version == 4 else 0, - addr=ip_address(rloc).packed - ) - ] - args = dict( - is_add=1, - is_src_dst=1, + is_add=True, + is_src_dst=True, vni=int(vni), - eid_type=eid_type, - eid=eid_packed, - eid_len=eid_len, - seid=seid_packed, - seid_len=seid_len, + deid=LispEid.create_eid(deid, deid_prefix if not is_mac else None), + seid=LispEid.create_eid(seid, seid_prefix if not is_mac else None), rloc_num=1, - rlocs=rlocs + rlocs=LispRemoteLocator.create_rloc(rloc) ) - cmd = u"lisp_add_del_remote_mapping" err_msg = f"Failed to add remote mapping on host {node[u'host']}" @@ -126,47 +157,22 @@ class LispRemoteMapping: :type rloc: str """ # used only with IPs - is_mac = False - - if not is_mac: - eid_type = 0 if ip_address(deid).version == 4 else 1 - eid_packed = ip_address(deid).packed - seid_packed = ip_address(seid).packed - eid_len = deid_prefix - seid_len = seid_prefix - else: - eid_type = 2 - eid_packed = L2Util.mac_to_bin(deid) - seid_packed = L2Util.mac_to_bin(seid) - eid_len = 0 - seid_len = 0 - - rlocs = [ - dict( - is_ip4=1 if ip_address(str(rloc)).version == 4 else 0, - addr=ip_address(str(rloc)).packed - ) - ] - args = dict( - is_add=0, - is_src_dst=1, + is_add=False, + is_src_dst=True, vni=int(vni), - eid_type=eid_type, - eid=eid_packed, - eid_len=eid_len, - seid=seid_packed, - seid_len=seid_len, + deid=LispEid.create_eid(deid, deid_prefix), + seid=LispEid.create_eid(seid, seid_prefix), rloc_num=1, - rlocs=rlocs + rlocs=LispRemoteLocator.create_rloc(rloc) ) - cmd = u"lisp_add_del_remote_mapping" err_msg = f"Failed to delete remote mapping on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) + class LispAdjacency: """Class for lisp adjacency API.""" @@ -190,29 +196,12 @@ class LispAdjacency: :type seid_prefix: int :type is_mac: bool """ - if not is_mac: - eid_type = 0 if ip_address(deid).version == 4 else 1 - reid = ip_address(deid).packed - leid = ip_address(seid).packed - reid_len = deid_prefix - leid_len = seid_prefix - else: - eid_type = 2 - reid = L2Util.mac_to_bin(deid) - leid = L2Util.mac_to_bin(seid) - reid_len = 0 - leid_len = 0 - args = dict( - is_add=1, + is_add=True, vni=int(vni), - eid_type=eid_type, - reid=reid, - reid_len=reid_len, - leid=leid, - leid_len=leid_len + reid=LispEid.create_eid(deid, deid_prefix if not is_mac else None), + leid=LispEid.create_eid(seid, seid_prefix if not is_mac else None) ) - cmd = u"lisp_add_del_adjacency" err_msg = f"Failed to add lisp adjacency on host {node[u'host']}" @@ -238,39 +227,21 @@ class LispAdjacency: :type seid_prefix: int """ # used only with IPs - is_mac = False - - if not is_mac: - eid_type = 0 if ip_address(deid).version == 4 else 1 - reid = ip_address(deid).packed - leid = ip_address(seid).packed - reid_len = deid_prefix - leid_len = seid_prefix - else: - eid_type = 2 - reid = L2Util.mac_to_bin(deid) - leid = L2Util.mac_to_bin(seid) - reid_len = 0 - leid_len = 0 - args = dict( - is_add=0, + is_add=False, vni=int(vni), - eid_type=eid_type, - reid=reid, - reid_len=reid_len, - leid=leid, - leid_len=leid_len + eid=LispEid.create_eid(deid, deid_prefix), + leid=LispEid.create_eid(seid, seid_prefix) ) - cmd = u"lisp_add_del_adjacency" err_msg = f"Failed to delete lisp adjacency on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) + class LispGpeStatus: - """Clas for LISP GPE status manipulation.""" + """Class for LISP GPE status manipulation.""" @staticmethod def vpp_lisp_gpe_enable_disable(node, state): @@ -281,76 +252,13 @@ class LispGpeStatus: :type node: dict :type state: str """ - args = dict(is_en=0 if state == u"disable" else 1) - + args = dict(is_enable=bool(state == u"enable")) cmd = u"gpe_enable_disable" err_msg = f"Failed to set LISP GPE status on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) -class LispGpeForwardEntry: - """The functionality needed for these methods is not implemented in VPP - (VAT). Bug https://jira.fd.io/browse/VPP-334 was open to cover this issue. - - TODO: Implement when VPP-334 is fixed. - """ - - @staticmethod - def add_lisp_gpe_forward_entry(node, *args): - """Not implemented""" - # TODO: Implement when VPP-334 is fixed. - - @staticmethod - def del_lisp_gpe_forward_entry(node, *args): - """Not implemented""" - # TODO: Implement when VPP-334 is fixed. - - -class LispMapResolver: - """Class for Lisp map resolver API.""" - - @staticmethod - def vpp_add_map_resolver(node, map_resolver_ip): - """Set lisp map resolver on the VPP node in topology. - - :param node: VPP node. - :param map_resolver_ip: IP address of the map resolver. - :type node: dict - :type map_resolver_ip: str - """ - args = dict( - is_add=1, - is_ipv6=0 if ip_address(map_resolver_ip).version == 4 else 1, - ip_address=ip_address(map_resolver_ip).packed - ) - - cmd = u"lisp_add_del_map_resolver" - err_msg = f"Failed to add map resolver on host {node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - papi_exec.add(cmd, **args).get_reply(err_msg) - - @staticmethod - def vpp_del_map_resolver(node, map_resolver_ip): - """Unset lisp map resolver on the VPP node in topology. - - :param node: VPP node. - :param map_resolver_ip: IP address of the map resolver. - :type node: dict - :type map_resolver_ip: str - """ - args = dict( - is_add=0, - is_ipv6=0 if ip_address(map_resolver_ip).version == 4 else 1, - ip_address=ip_address(map_resolver_ip).packed - ) - - cmd = u"lisp_add_del_map_resolver" - err_msg = f"Failed to delete map resolver on host {node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - papi_exec.add(cmd, **args).get_reply(err_msg) class LispLocalEid: """Class for Lisp local eid API.""" @@ -362,31 +270,21 @@ class LispLocalEid: :param node: VPP node. :param locator_set_name: Name of the locator_set. - :param vni: vni value. + :param vni: Vni value. :param eid: Eid value. - :param prefix_len: prefix len if the eid is IP address. + :param prefix_len: Prefix len if the eid is IP address. :type node: dict :type locator_set_name: str :type vni: int :type eid: str :type prefix_len: int """ - if prefix_len: - eid_type = 0 if ip_address(eid).version == 4 else 1 - eid_packed = ip_address(eid).packed - else: - eid_type = 2 - eid_packed = L2Util.mac_to_bin(eid) - args = dict( - is_add=1, - eid_type=eid_type, - eid=eid_packed, - prefix_len=prefix_len, + is_add=True, + eid=LispEid.create_eid(eid, prefix_len), locator_set_name=locator_set_name, vni=int(vni) ) - cmd = u"lisp_add_del_local_eid" err_msg = f"Failed to add local eid on host {node[u'host']}" @@ -396,41 +294,32 @@ class LispLocalEid: @staticmethod def vpp_del_lisp_local_eid( node, locator_set_name, vni, eid, prefix_len=None): - """Set lisp eid addres on the VPP node in topology. + """Set lisp eid address on the VPP node in topology. :param node: VPP node. :param locator_set_name: Name of the locator_set. - :param vni: vni value. + :param vni: Vni value. :param eid: Eid value. - :param prefix_len: prefix len if the eid is IP address. + :param prefix_len: Prefix len if the eid is IP address. :type node: dict :type locator_set_name: str :type vni: int :type eid: str :type prefix_len: int """ - if prefix_len: - eid_type = 0 if ip_address(eid).version == 4 else 1 - eid_packed = ip_address(eid).packed - else: - eid_type = 2 - eid_packed = L2Util.mac_to_bin(eid) - args = dict( - is_add=0, - eid_type=eid_type, - eid=eid_packed, - prefix_len=prefix_len, + is_add=False, + eid=LispEid.create_eid(eid, prefix_len), locator_set_name=locator_set_name, vni=int(vni) ) - cmd = u"lisp_add_del_local_eid" err_msg = f"Failed to delete local eid on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) + class LispLocator: """Class for the Lisp Locator API.""" @@ -440,9 +329,9 @@ class LispLocator: :param node: VPP node. :param locator_name: Name of the locator_set. - :param sw_if_index: sw_if_index if the interface. - :param priority: priority of the locator. - :param weight: weight of the locator. + :param sw_if_index: Sw_if_index if the interface. + :param priority: Priority of the locator. + :param weight: Weight of the locator. :type node: dict :type locator_name: str :type sw_if_index: int @@ -451,13 +340,12 @@ class LispLocator: """ args = dict( - is_add=1, + is_add=True, locator_set_name=locator_name, sw_if_index=sw_if_index, priority=priority, weight=weight ) - cmd = u"lisp_add_del_locator" err_msg = f"Failed to add locator on host {node[u'host']}" @@ -470,9 +358,9 @@ class LispLocator: :param node: VPP node. :param locator_name: Name of the locator_set. - :param sw_if_index: sw_if_index if the interface. - :param priority: priority of the locator. - :param weight: weight of the locator. + :param sw_if_index: Sw_if_index if the interface. + :param priority: Priority of the locator. + :param weight: Weight of the locator. :type node: dict :type locator_name: str :type sw_if_index: int @@ -480,19 +368,19 @@ class LispLocator: :type weight: int """ args = dict( - is_add=0, + is_add=False, locator_set_name=locator_name, sw_if_index=sw_if_index, priority=priority, weight=weight ) - cmd = u"lisp_add_del_locator" err_msg = f"Failed to delete locator on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) + class LispLocatorSet: """Class for Lisp Locator Set API.""" @@ -506,12 +394,11 @@ class LispLocatorSet: :type name: str """ args = dict( - is_add=1, + is_add=True, locator_set_name=name, locator_num=0, locators=[] ) - cmd = u"lisp_add_del_locator_set" err_msg = f"Failed to add locator set on host {node[u'host']}" @@ -528,154 +415,17 @@ class LispLocatorSet: :type name: str """ args = dict( - is_add=0, + is_add=False, locator_set_name=name, locator_num=0, locators=[] ) - cmd = u"lisp_add_del_locator_set" err_msg = f"Failed to delete locator set on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) -class LispSetup: - """Lisp setup in topology.""" - - @staticmethod - def vpp_set_lisp_locator_set(node, locator_set_list): - """Set lisp locator_sets on VPP node in topology. - - :param node: VPP node. - :param locator_set_list: List of locator_set. - :type node: dict - :type locator_set_list: list - """ - - if node[u"type"] != NodeType.DUT: - raise ValueError(u"Node is not DUT") - - lisp_locator = LispLocator() - lisp_locator_set = LispLocatorSet() - for locator_set in locator_set_list: - locator_set_name = locator_set.get(u"locator-set") - locator_list = locator_set.get(u"locator") - lisp_locator_set.vpp_add_lisp_locator_set(node, locator_set_name) - for locator in locator_list: - sw_if_index = locator.get(u"locator-index") - priority = locator.get(u"priority") - weight = locator.get(u"weight") - lisp_locator.vpp_add_lisp_locator( - node, locator_set_name, sw_if_index, priority, weight - ) - - @staticmethod - def vpp_unset_lisp_locator_set(node, locator_set_list): - """Unset lisp locator_sets on VPP node in topology. - - :param node: VPP node. - :param locator_set_list: List of locator_set. - :type node: dict - :type locator_set_list: list - """ - if node[u"type"] != NodeType.DUT: - raise ValueError(u"Lisp locator set, node is not DUT") - - lisp_locator = LispLocator() - lisp_locator_set = LispLocatorSet() - for locator_set in locator_set_list: - locator_set_name = locator_set.get(u"locator-set") - locator_list = locator_set.get(u"locator") - for locator in locator_list: - sw_if_index = locator.get(u"locator-index") - priority = locator.get(u"priority") - weight = locator.get(u"weight") - lisp_locator.vpp_del_lisp_locator( - node, locator_set_name, sw_if_index, priority, weight - ) - - lisp_locator_set.vpp_del_lisp_locator_set(node, locator_set_name) - - @staticmethod - def vpp_set_lisp_eid_table(node, eid_table): - """Set lisp eid tables on VPP node in topology. - - :param node: VPP node. - :param eid_table: Dictionary containing information of eid_table. - :type node: dict - :type eid_table: dict - """ - if node[u"type"] != NodeType.DUT: - raise ValueError(u"Node is not DUT") - - lisp_locator_set = LispLocatorSet() - lisp_eid = LispLocalEid() - for eid in eid_table: - vni = eid.get(u"vni") - eid_address = eid.get(u"eid") - eid_prefix_len = eid.get(u"eid-prefix-len") - locator_set_name = eid.get(u"locator-set") - lisp_locator_set.vpp_add_lisp_locator_set(node, locator_set_name) - lisp_eid.vpp_add_lisp_local_eid( - node, locator_set_name, vni, eid_address, eid_prefix_len - ) - - @staticmethod - def vpp_unset_lisp_eid_table(node, eid_table): - """Unset lisp eid tables on VPP node in topology. - - :param node: VPP node. - :param eid_table: Dictionary containing information of eid_table. - :type node: dict - :type eid_table: dict - """ - if node[u"type"] != NodeType.DUT: - raise ValueError(u"Node is not DUT") - - locator_set_list = [] - lisp_locator_set = LispLocatorSet() - lisp_eid = LispLocalEid() - for eid in eid_table: - vni = eid.get(u"vni") - eid_address = eid.get(u"eid") - eid_prefix_len = eid.get(u"eid-prefix-len") - locator_set_name = eid.get(u"locator-set") - if locator_set_name not in locator_set_list: - locator_set_list.append(locator_set_name) - - lisp_eid.vpp_del_lisp_local_eid( - node, locator_set_name, vni, eid_address, eid_prefix_len - ) - - for locator_set_name in locator_set_list: - lisp_locator_set.vpp_del_lisp_locator_set(node, locator_set_name) - - @staticmethod - def vpp_set_lisp_map_resolver(node, map_resolver): - """Set lisp map resolvers on VPP node in topology. - - :param node: VPP node. - :param map_resolver: Dictionary containing information of map resolver. - :type node: dict - :type map_resolver: dict - """ - lisp_map_res = LispMapResolver() - for map_ip in map_resolver: - lisp_map_res.vpp_add_map_resolver(node, map_ip.get(u"map resolver")) - - @staticmethod - def vpp_unset_lisp_map_resolver(node, map_resolver): - """Unset lisp map resolvers on VPP node in topology. - - :param node: VPP node. - :param map_resolver: Dictionary containing information of map resolver. - :type node: dict - :type map_resolver: dict - """ - lisp_map_res = LispMapResolver() - for map_ip in map_resolver: - lisp_map_res.vpp_del_map_resolver(node, map_ip.get(u"map resolver")) class LispEidTableMap: """ @@ -697,28 +447,15 @@ class LispEidTableMap: :type vrf: int """ # adding default mapping vni=0, vrf=0 needs to be skipped - skip = False - - if bd_id: - is_l2 = 1 - dp_table = bd_id - else: - is_l2 = 0 - dp_table = vrf - # skip adding default mapping - if (int(vrf) == 0) and (int(vni) == 0): - skip = True - - args = dict( - is_add=1, - vni=int(vni), - dp_table=int(dp_table), - is_l2=is_l2 - ) - - cmd = u"lisp_eid_table_add_del_map" - err_msg = f"Failed to add eid table map on host {node[u'host']}" + if bd_id is not None or int(vrf if vrf else 0) or int(vni): + args = dict( + is_add=True, + vni=int(vni), + dp_table=int(bd_id) if bd_id is not None else int(vrf), + is_l2=bool(bd_id is not None) + ) + cmd = u"lisp_eid_table_add_del_map" + err_msg = f"Failed to add eid table map on host {node[u'host']}" - if not skip: with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) diff --git a/resources/libraries/python/LispUtil.py b/resources/libraries/python/LispUtil.py deleted file mode 100644 index a77f4ad854..0000000000 --- a/resources/libraries/python/LispUtil.py +++ /dev/null @@ -1,427 +0,0 @@ -# Copyright (c) 2019 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Lisp utilities library.""" - -from ipaddress import IPv4Address, IPv6Address -from robot.api import logger - -from resources.libraries.python.L2Util import L2Util -from resources.libraries.python.PapiExecutor import PapiSocketExecutor -from resources.libraries.python.topology import Topology - -class LispUtil: - """Implements keywords for Lisp tests.""" - - @staticmethod - def vpp_show_lisp_state(node): - """Get lisp state from VPP node. - - :param node: VPP node. - :type node: dict - :returns: Lisp gpe state. - :rtype: dict - """ - cmd = u"show_lisp_status" - err_msg = f"Failed to get LISP status on host {node['host']}" - - with PapiSocketExecutor(node) as papi_exec: - reply = papi_exec.add(cmd).get_reply(err_msg) - - data = dict() - data[u"feature_status"] = u"enabled" if reply[u"feature_status"] \ - else u"disabled" - data[u"gpe_status"] = u"enabled" if reply[u"gpe_status"] \ - else u"disabled" - return data - - @staticmethod - def vpp_show_lisp_locator_set(node, items_filter): - """Get lisp locator_set from VPP node. - - :param node: VPP node. - :param items_filter: Filter which specifies which items should be - retrieved - local, remote, empty string = both. - :type node: dict - :type items_filter: str - :returns: Lisp locator_set data as python list. - :rtype: list - """ - ifilter = {u"_": 0, u"_local": 1, u"_remote": 2} - args = dict( - filter=ifilter[u"_" + items_filter] - ) - - cmd = u"lisp_locator_set_dump" - err_msg = f"Failed to get LISP locator set on host {node['host']}" - - try: - with PapiSocketExecutor(node) as papi_exec: - details = papi_exec.add(cmd, **args).get_details(err_msg) - data = list() - for locator in details: - data.append( - {u"ls_name": locator[u"ls_name"].rstrip(b'\0'), - u"ls_index": locator[u"ls_index"]} - ) - return data - except (ValueError, LookupError) as err: - logger.warn(f"Failed to get LISP locator set {err}") - return list() - - @staticmethod - def vpp_show_lisp_eid_table(node): - """Get lisp eid table from VPP node. - - :param node: VPP node. - :type node: dict - :returns: Lisp eid table as python list. - :rtype: list - """ - cmd = u"lisp_eid_table_dump" - err_msg = f"Failed to get LISP eid table on host {node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - details = papi_exec.add(cmd).get_details(err_msg) - - data = list() - for eid_details in details: - eid = u"Bad eid type" - if eid_details[u"eid_type"] == 0: - prefix = str(eid_details[u"eid_prefix_len"]) - eid = str(IPv4Address(eid_details[u"eid"][0:4])) + u"/" + \ - prefix - elif eid_details[u"eid_type"] == 1: - prefix = str(eid_details[u"eid_prefix_len"]) - eid = str(IPv6Address(eid_details[u"eid"])) + u"/" + prefix - elif eid_details[u"eid_type"] == 2: - eid = str(L2Util.bin_to_mac(eid_details[u"eid"][0:6])) - data.append( - { - u"action": eid_details[u"action"], - u"is_local": eid_details[u"is_local"], - u"eid": eid, - u"vni": eid_details[u"vni"], - u"ttl": eid_details[u"ttl"], - u"authoritative": eid_details[u"authoritative"] - } - ) - return data - - @staticmethod - def vpp_show_lisp_map_resolver(node): - """Get lisp map resolver from VPP node. - - :param node: VPP node. - :type node: dict - :returns: Lisp map resolver as python list. - :rtype: list - """ - cmd = u"lisp_map_resolver_dump" - err_msg = f"Failed to get LISP map resolver on host {node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - details = papi_exec.add(cmd).get_details(err_msg) - - data = list() - for resolver in details: - address = u"Bad is_ipv6 flag" - if resolver[u"is_ipv6"] == 0: - address = str(IPv4Address(resolver[u"ip_address"][0:4])) - elif resolver[u"is_ipv6"] == 1: - address = str(IPv6Address(resolver[u"ip_address"])) - data.append({u"map resolver": address}) - return data - - @staticmethod - def vpp_show_lisp_map_register(node): - """Get LISP Map Register from VPP node. - - :param node: VPP node. - :type node: dict - :returns: LISP Map Register as python dict. - :rtype: dict - """ - cmd = u"show_lisp_map_register_state" - err_msg = f"Failed to get LISP map register state on host " \ - f"{node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - reply = papi_exec.add(cmd).get_reply(err_msg) - - data = dict() - data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled" - logger.info(data) - return data - - @staticmethod - def vpp_show_lisp_map_request_mode(node): - """Get LISP Map Request mode from VPP node. - - :param node: VPP node. - :type node: dict - :returns: LISP Map Request mode as python dict. - :rtype: dict - """ - cmd = u"show_lisp_map_request_mode" - err_msg = f"Failed to get LISP map request mode on host {node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - reply = papi_exec.add(cmd).get_reply(err_msg) - - data = dict() - data[u"map_request_mode"] = u"src-dst" if reply[u"mode"] \ - else u"dst-only" - logger.info(data) - return data - - @staticmethod - def vpp_show_lisp_map_server(node): - """Get LISP Map Server from VPP node. - - :param node: VPP node. - :type node: dict - :returns: LISP Map Server as python list. - :rtype: list - """ - cmd = u"lisp_map_server_dump" - err_msg = f"Failed to get LISP map server on host {node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - details = papi_exec.add(cmd).get_details(err_msg) - - data = list() - for server in details: - address = u"Bad is_ipv6 flag" - if server[u"is_ipv6"] == 0: - address = str(IPv4Address(server[u"ip_address"][0:4])) - elif server[u"is_ipv6"] == 1: - address = str(IPv6Address(server[u"ip_address"])) - data.append({u"map-server": address}) - logger.info(data) - return data - - @staticmethod - def vpp_show_lisp_petr_config(node): - """Get LISP PETR configuration from VPP node. - - :param node: VPP node. - :type node: dict - :returns: LISP PETR configuration as python dict. - :rtype: dict - """ - # Note: VAT is returning ipv6 address instead of ipv4 - cmd = u"show_lisp_use_petr" - err_msg = f"Failed to get LISP petr config on host {node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - reply = papi_exec.add(cmd).get_reply(err_msg) - - data = dict() - data[u"status"] = u"enabled" if reply[u"status"] else u"disabled" - address = u"Bad is_ip4 flag" - if reply[u"is_ip4"] == 0: - address = str(IPv6Address(reply[u"address"])) - elif reply[u"is_ip4"] == 1: - address = str(IPv4Address(reply[u"address"][0:4])) - data[u"address"] = address - logger.info(data) - return data - - @staticmethod - def vpp_show_lisp_rloc_config(node): - """Get LISP RLOC configuration from VPP node. - - :param node: VPP node. - :type node: dict - :returns: LISP RLOC configuration as python dict. - :rtype: dict - """ - cmd = u"show_lisp_rloc_probe_state" - err_msg = f"Failed to get LISP rloc config on host {node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - reply = papi_exec.add(cmd).get_reply(err_msg) - - data = dict() - data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled" - logger.info(data) - return data - - @staticmethod - def vpp_show_lisp_pitr(node): - """Get Lisp PITR feature config from VPP node. - - :param node: VPP node. - :type node: dict - :returns: Lisp PITR config data. - :rtype: dict - """ - cmd = u"show_lisp_pitr" - err_msg = f"Failed to get LISP pitr on host {node[u'host']}" - - with PapiSocketExecutor(node) as papi_exec: - reply = papi_exec.add(cmd).get_reply(err_msg) - - data = dict() - data[u"status"] = u"enabled" if reply[u"status"] else u"disabled" - return data - - @staticmethod - def lisp_should_be_equal(lisp_val1, lisp_val2): - """Fail if the lisp values are not equal. - - :param lisp_val1: First lisp value. - :param lisp_val2: Second lisp value. - :type lisp_val1: list - :type lisp_val2: list - """ - len1 = len(lisp_val1) - len2 = len(lisp_val2) - - if len1 != len2: - raise RuntimeError( - f"Values are not same. Value 1 {lisp_val1} \n" - f"Value 2 {lisp_val2}." - ) - - for tmp in lisp_val1: - if tmp not in lisp_val2: - raise RuntimeError( - f"Value {tmp} not found in vpp:\n{lisp_val2}" - ) - - def lisp_locator_s_should_be_equal(self, locator_set1, locator_set2): - """Fail if the lisp values are not equal. - - :param locator_set1: Generate lisp value. - :param locator_set2: Lisp value from VPP. - :type locator_set1: list - :type locator_set2: list - """ - locator_set_list = list() - - for item in locator_set1: - if item not in locator_set_list: - locator_set_list.append(item) - - self.lisp_should_be_equal(locator_set_list, locator_set2) - - @staticmethod - def generate_unique_lisp_locator_set_data(node, locator_set_number): - """Generate a list of lisp locator_set we want set to VPP and - then check if it is set correctly. All locator_sets are unique. - - :param node: VPP node. - :param locator_set_number: Generate n locator_set. - :type node: dict - :type locator_set_number: str - :returns: list of lisp locator_set, list of lisp locator_set expected - from VAT. - :rtype: tuple - """ - topo = Topology() - locator_set_list = list() - locator_set_list_vat = list() - - i = 0 - for num in range(0, int(locator_set_number)): - locator_list = list() - for interface in list(node[u"interfaces"].values()): - link = interface.get(u"link") - i += 1 - if link is None: - continue - - if_name = topo.get_interface_by_link_name(node, link) - sw_if_index = topo.get_interface_sw_index(node, if_name) - if if_name is not None: - locator = { - u"locator-index": sw_if_index, - u"priority": i, - u"weight": i - } - locator_list.append(locator) - - l_name = f"ls{num}" - locator_set = { - u"locator-set": l_name, - u"locator": locator_list - } - locator_set_list.append(locator_set) - - locator_set_vat = { - u"ls_name": l_name, - u"ls_index": num - } - locator_set_list_vat.append(locator_set_vat) - - return locator_set_list, locator_set_list_vat - - @staticmethod - def generate_duplicate_lisp_locator_set_data(node, locator_set_number): - """Generate a list of lisp locator_set we want set to VPP and - then check if it is set correctly. Some locator_sets are duplicated. - - :param node: VPP node. - :param locator_set_number: Generate n locator_set. - :type node: dict - :type locator_set_number: str - :returns: list of lisp locator_set, list of lisp locator_set expected - from VAT. - :rtype: tuple - """ - topo = Topology() - locator_set_list = list() - locator_set_list_vat = list() - - i = 0 - for num in range(0, int(locator_set_number)): - locator_list = [] - for interface in list(node[u"interfaces"].values()): - link = interface.get(u"link") - i += 1 - if link is None: - continue - - if_name = topo.get_interface_by_link_name(node, link) - sw_if_index = topo.get_interface_sw_index(node, if_name) - if if_name is not None: - l_name = f"ls{num}" - locator = { - u"locator-index": sw_if_index, - u"priority": i, - u"weight": i - } - locator_list.append(locator) - locator_set = { - u"locator-set": l_name, - u"locator": locator_list - } - locator_set_list.append(locator_set) - - locator_set_vat = { - u"ls_name": l_name, - u"ls_index": num - } - locator_set_list_vat.append(locator_set_vat) - - return locator_set_list, locator_set_list_vat - - def lisp_is_empty(self, lisp_params): - """Check if the input param are empty. - - :param lisp_params: Should be empty list. - :type lisp_params: list - """ - self.lisp_should_be_equal([], lisp_params) |