aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/LispUtil.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/LispUtil.py')
-rw-r--r--resources/libraries/python/LispUtil.py272
1 files changed, 137 insertions, 135 deletions
diff --git a/resources/libraries/python/LispUtil.py b/resources/libraries/python/LispUtil.py
index 114cd72202..a77f4ad854 100644
--- a/resources/libraries/python/LispUtil.py
+++ b/resources/libraries/python/LispUtil.py
@@ -13,19 +13,16 @@
"""Lisp utilities library."""
-from robot.api import logger
from ipaddress import IPv4Address, IPv6Address
+from robot.api import logger
-from resources.libraries.python.topology import Topology
-from resources.libraries.python.PapiExecutor import PapiSocketExecutor
from resources.libraries.python.L2Util import L2Util
+from resources.libraries.python.PapiExecutor import PapiSocketExecutor
+from resources.libraries.python.topology import Topology
-class LispUtil(object):
+class LispUtil:
"""Implements keywords for Lisp tests."""
- def __init__(self):
- pass
-
@staticmethod
def vpp_show_lisp_state(node):
"""Get lisp state from VPP node.
@@ -35,17 +32,17 @@ class LispUtil(object):
:returns: Lisp gpe state.
:rtype: dict
"""
- cmd = 'show_lisp_status'
- err_msg = "Failed to get LISP status on host {host}".format(
- host=node['host'])
+ cmd = u"show_lisp_status"
+ err_msg = f"Failed to get LISP status on host {node['host']}"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd).get_reply(err_msg)
data = dict()
- data["feature_status"] = "enabled" if reply["feature_status"] else \
- "disabled"
- data["gpe_status"] = "enabled" if reply["gpe_status"] else "disabled"
+ data[u"feature_status"] = u"enabled" if reply[u"feature_status"] \
+ else u"disabled"
+ data[u"gpe_status"] = u"enabled" if reply[u"gpe_status"] \
+ else u"disabled"
return data
@staticmethod
@@ -60,24 +57,27 @@ class LispUtil(object):
:returns: Lisp locator_set data as python list.
:rtype: list
"""
+ ifilter = {u"_": 0, u"_local": 1, u"_remote": 2}
+ args = dict(
+ filter=ifilter[u"_" + items_filter]
+ )
- ifilter = {"_": 0, "_local": 1, "_remote": 2}
- args = dict(filter=ifilter["_" + items_filter])
-
- cmd = 'lisp_locator_set_dump'
- err_msg = "Failed to get LISP locator set on host {host}".format(
- host=node['host'])
+ cmd = u"lisp_locator_set_dump"
+ err_msg = f"Failed to get LISP locator set on host {node['host']}"
try:
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd, **args).get_details(err_msg)
- data = []
+ data = list()
for locator in details:
- data.append({"ls_name": locator["ls_name"].rstrip('\x00'),
- "ls_index": locator["ls_index"]})
+ data.append(
+ {u"ls_name": locator[u"ls_name"].rstrip(b'\0'),
+ u"ls_index": locator[u"ls_index"]}
+ )
return data
- except (ValueError, LookupError):
- return []
+ except (ValueError, LookupError) as err:
+ logger.warn(f"Failed to get LISP locator set {err}")
+ return list()
@staticmethod
def vpp_show_lisp_eid_table(node):
@@ -88,31 +88,34 @@ class LispUtil(object):
:returns: Lisp eid table as python list.
:rtype: list
"""
-
- cmd = 'lisp_eid_table_dump'
- err_msg = "Failed to get LISP eid table on host {host}".format(
- host=node['host'])
+ cmd = u"lisp_eid_table_dump"
+ err_msg = f"Failed to get LISP eid table on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd).get_details(err_msg)
- data = []
+ data = list()
for eid_details in details:
- eid = 'Bad eid type'
- if eid_details["eid_type"] == 0:
- prefix = str(eid_details["eid_prefix_len"])
- eid = str(IPv4Address(eid_details["eid"][0:4])) + "/" + prefix
- elif eid_details["eid_type"] == 1:
- prefix = str(eid_details["eid_prefix_len"])
- eid = str(IPv6Address(eid_details["eid"])) + "/" + prefix
- elif eid_details["eid_type"] == 2:
- eid = str(L2Util.bin_to_mac(eid_details["eid"][0:6]))
- data.append({"action": eid_details["action"],
- "is_local": eid_details["is_local"],
- "eid": eid,
- "vni": eid_details["vni"],
- "ttl": eid_details["ttl"],
- "authoritative": eid_details["authoritative"]})
+ eid = u"Bad eid type"
+ if eid_details[u"eid_type"] == 0:
+ prefix = str(eid_details[u"eid_prefix_len"])
+ eid = str(IPv4Address(eid_details[u"eid"][0:4])) + u"/" + \
+ prefix
+ elif eid_details[u"eid_type"] == 1:
+ prefix = str(eid_details[u"eid_prefix_len"])
+ eid = str(IPv6Address(eid_details[u"eid"])) + u"/" + prefix
+ elif eid_details[u"eid_type"] == 2:
+ eid = str(L2Util.bin_to_mac(eid_details[u"eid"][0:6]))
+ data.append(
+ {
+ u"action": eid_details[u"action"],
+ u"is_local": eid_details[u"is_local"],
+ u"eid": eid,
+ u"vni": eid_details[u"vni"],
+ u"ttl": eid_details[u"ttl"],
+ u"authoritative": eid_details[u"authoritative"]
+ }
+ )
return data
@staticmethod
@@ -124,22 +127,20 @@ class LispUtil(object):
:returns: Lisp map resolver as python list.
:rtype: list
"""
-
- cmd = 'lisp_map_resolver_dump'
- err_msg = "Failed to get LISP map resolver on host {host}".format(
- host=node['host'])
+ cmd = u"lisp_map_resolver_dump"
+ err_msg = f"Failed to get LISP map resolver on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd).get_details(err_msg)
- data = []
+ data = list()
for resolver in details:
- address = 'Bad is_ipv6 flag'
- if resolver["is_ipv6"] == 0:
- address = str(IPv4Address(resolver["ip_address"][0:4]))
- elif resolver["is_ipv6"] == 1:
- address = str(IPv6Address(resolver["ip_address"]))
- data.append({"map resolver": address})
+ address = u"Bad is_ipv6 flag"
+ if resolver[u"is_ipv6"] == 0:
+ address = str(IPv4Address(resolver[u"ip_address"][0:4]))
+ elif resolver[u"is_ipv6"] == 1:
+ address = str(IPv6Address(resolver[u"ip_address"]))
+ data.append({u"map resolver": address})
return data
@staticmethod
@@ -151,16 +152,15 @@ class LispUtil(object):
:returns: LISP Map Register as python dict.
:rtype: dict
"""
-
- cmd = 'show_lisp_map_register_state'
- err_msg = "Failed to get LISP map register state on host {host}".format(
- host=node['host'])
+ cmd = u"show_lisp_map_register_state"
+ err_msg = f"Failed to get LISP map register state on host " \
+ f"{node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd).get_reply(err_msg)
data = dict()
- data["state"] = "enabled" if reply["is_enabled"] else "disabled"
+ data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled"
logger.info(data)
return data
@@ -173,16 +173,15 @@ class LispUtil(object):
:returns: LISP Map Request mode as python dict.
:rtype: dict
"""
-
- cmd = 'show_lisp_map_request_mode'
- err_msg = "Failed to get LISP map request mode on host {host}".format(
- host=node['host'])
+ cmd = u"show_lisp_map_request_mode"
+ err_msg = f"Failed to get LISP map request mode on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd).get_reply(err_msg)
data = dict()
- data["map_request_mode"] = "src-dst" if reply["mode"] else "dst-only"
+ data[u"map_request_mode"] = u"src-dst" if reply[u"mode"] \
+ else u"dst-only"
logger.info(data)
return data
@@ -195,22 +194,20 @@ class LispUtil(object):
:returns: LISP Map Server as python list.
:rtype: list
"""
-
- cmd = 'lisp_map_server_dump'
- err_msg = "Failed to get LISP map server on host {host}".format(
- host=node['host'])
+ cmd = u"lisp_map_server_dump"
+ err_msg = f"Failed to get LISP map server on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd).get_details(err_msg)
- data = []
+ data = list()
for server in details:
- address = 'Bad is_ipv6 flag'
- if server["is_ipv6"] == 0:
- address = str(IPv4Address(server["ip_address"][0:4]))
- elif server["is_ipv6"] == 1:
- address = str(IPv6Address(server["ip_address"]))
- data.append({"map-server": address})
+ address = u"Bad is_ipv6 flag"
+ if server[u"is_ipv6"] == 0:
+ address = str(IPv4Address(server[u"ip_address"][0:4]))
+ elif server[u"is_ipv6"] == 1:
+ address = str(IPv6Address(server[u"ip_address"]))
+ data.append({u"map-server": address})
logger.info(data)
return data
@@ -223,24 +220,21 @@ class LispUtil(object):
:returns: LISP PETR configuration as python dict.
:rtype: dict
"""
-
-# Note: VAT is returning ipv6 address instead of ipv4
-
- cmd = 'show_lisp_use_petr'
- err_msg = "Failed to get LISP petr config on host {host}".format(
- host=node['host'])
+ # Note: VAT is returning ipv6 address instead of ipv4
+ cmd = u"show_lisp_use_petr"
+ err_msg = f"Failed to get LISP petr config on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd).get_reply(err_msg)
data = dict()
- data["status"] = "enabled" if reply["status"] else "disabled"
- address = 'Bad is_ip4 flag'
- if reply["is_ip4"] == 0:
- address = str(IPv6Address(reply["address"]))
- elif reply["is_ip4"] == 1:
- address = str(IPv4Address(reply["address"][0:4]))
- data["address"] = address
+ data[u"status"] = u"enabled" if reply[u"status"] else u"disabled"
+ address = u"Bad is_ip4 flag"
+ if reply[u"is_ip4"] == 0:
+ address = str(IPv6Address(reply[u"address"]))
+ elif reply[u"is_ip4"] == 1:
+ address = str(IPv4Address(reply[u"address"][0:4]))
+ data[u"address"] = address
logger.info(data)
return data
@@ -253,16 +247,14 @@ class LispUtil(object):
:returns: LISP RLOC configuration as python dict.
:rtype: dict
"""
-
- cmd = 'show_lisp_rloc_probe_state'
- err_msg = "Failed to get LISP rloc config on host {host}".format(
- host=node['host'])
+ cmd = u"show_lisp_rloc_probe_state"
+ err_msg = f"Failed to get LISP rloc config on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd).get_reply(err_msg)
data = dict()
- data["state"] = "enabled" if reply["is_enabled"] else "disabled"
+ data[u"state"] = u"enabled" if reply[u"is_enabled"] else u"disabled"
logger.info(data)
return data
@@ -275,16 +267,14 @@ class LispUtil(object):
:returns: Lisp PITR config data.
:rtype: dict
"""
-
- cmd = 'show_lisp_pitr'
- err_msg = "Failed to get LISP pitr on host {host}".format(
- host=node['host'])
+ cmd = u"show_lisp_pitr"
+ err_msg = f"Failed to get LISP pitr on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd).get_reply(err_msg)
data = dict()
- data["status"] = "enabled" if reply["status"] else "disabled"
+ data[u"status"] = u"enabled" if reply[u"status"] else u"disabled"
return data
@staticmethod
@@ -296,19 +286,20 @@ class LispUtil(object):
:type lisp_val1: list
:type lisp_val2: list
"""
-
len1 = len(lisp_val1)
len2 = len(lisp_val2)
+
if len1 != len2:
- raise RuntimeError('Values are not same. '
- 'Value 1 {} \n'
- 'Value 2 {}.'.format(lisp_val1,
- lisp_val2))
+ raise RuntimeError(
+ f"Values are not same. Value 1 {lisp_val1} \n"
+ f"Value 2 {lisp_val2}."
+ )
for tmp in lisp_val1:
if tmp not in lisp_val2:
- raise RuntimeError('Value {} not found in vpp:\n'
- '{}'.format(tmp, lisp_val2))
+ raise RuntimeError(
+ f"Value {tmp} not found in vpp:\n{lisp_val2}"
+ )
def lisp_locator_s_should_be_equal(self, locator_set1, locator_set2):
"""Fail if the lisp values are not equal.
@@ -318,11 +309,12 @@ class LispUtil(object):
:type locator_set1: list
:type locator_set2: list
"""
+ locator_set_list = list()
- locator_set_list = []
for item in locator_set1:
if item not in locator_set_list:
locator_set_list.append(item)
+
self.lisp_should_be_equal(locator_set_list, locator_set2)
@staticmethod
@@ -338,16 +330,15 @@ class LispUtil(object):
from VAT.
:rtype: tuple
"""
-
topo = Topology()
+ locator_set_list = list()
+ locator_set_list_vat = list()
- locator_set_list = []
- locator_set_list_vat = []
i = 0
for num in range(0, int(locator_set_number)):
- locator_list = []
- for interface in node['interfaces'].values():
- link = interface.get('link')
+ locator_list = list()
+ for interface in list(node[u"interfaces"].values()):
+ link = interface.get(u"link")
i += 1
if link is None:
continue
@@ -355,18 +346,24 @@ class LispUtil(object):
if_name = topo.get_interface_by_link_name(node, link)
sw_if_index = topo.get_interface_sw_index(node, if_name)
if if_name is not None:
- locator = {'locator-index': sw_if_index,
- 'priority': i,
- 'weight': i}
+ locator = {
+ u"locator-index": sw_if_index,
+ u"priority": i,
+ u"weight": i
+ }
locator_list.append(locator)
- l_name = 'ls{0}'.format(num)
- locator_set = {'locator-set': l_name,
- 'locator': locator_list}
+ l_name = f"ls{num}"
+ locator_set = {
+ u"locator-set": l_name,
+ u"locator": locator_list
+ }
locator_set_list.append(locator_set)
- locator_set_vat = {"ls_name": l_name,
- "ls_index": num}
+ locator_set_vat = {
+ u"ls_name": l_name,
+ u"ls_index": num
+ }
locator_set_list_vat.append(locator_set_vat)
return locator_set_list, locator_set_list_vat
@@ -384,15 +381,15 @@ class LispUtil(object):
from VAT.
:rtype: tuple
"""
-
topo = Topology()
- locator_set_list = []
- locator_set_list_vat = []
+ locator_set_list = list()
+ locator_set_list_vat = list()
+
i = 0
for num in range(0, int(locator_set_number)):
locator_list = []
- for interface in node['interfaces'].values():
- link = interface.get('link')
+ for interface in list(node[u"interfaces"].values()):
+ link = interface.get(u"link")
i += 1
if link is None:
continue
@@ -400,17 +397,23 @@ class LispUtil(object):
if_name = topo.get_interface_by_link_name(node, link)
sw_if_index = topo.get_interface_sw_index(node, if_name)
if if_name is not None:
- l_name = 'ls{0}'.format(num)
- locator = {'locator-index': sw_if_index,
- 'priority': i,
- 'weight': i}
+ l_name = f"ls{num}"
+ locator = {
+ u"locator-index": sw_if_index,
+ u"priority": i,
+ u"weight": i
+ }
locator_list.append(locator)
- locator_set = {'locator-set': l_name,
- 'locator': locator_list}
+ locator_set = {
+ u"locator-set": l_name,
+ u"locator": locator_list
+ }
locator_set_list.append(locator_set)
- locator_set_vat = {"ls_name": l_name,
- "ls_index": num}
+ locator_set_vat = {
+ u"ls_name": l_name,
+ u"ls_index": num
+ }
locator_set_list_vat.append(locator_set_vat)
return locator_set_list, locator_set_list_vat
@@ -421,5 +424,4 @@ class LispUtil(object):
:param lisp_params: Should be empty list.
:type lisp_params: list
"""
-
self.lisp_should_be_equal([], lisp_params)