aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/VPPUtil.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/VPPUtil.py')
-rw-r--r--resources/libraries/python/VPPUtil.py122
1 files changed, 55 insertions, 67 deletions
diff --git a/resources/libraries/python/VPPUtil.py b/resources/libraries/python/VPPUtil.py
index 72b6142306..72325d8169 100644
--- a/resources/libraries/python/VPPUtil.py
+++ b/resources/libraries/python/VPPUtil.py
@@ -22,7 +22,7 @@ from resources.libraries.python.ssh import exec_cmd_no_error
from resources.libraries.python.topology import NodeType
-class VPPUtil(object):
+class VPPUtil:
"""General class for any VPP related methods/functions."""
@staticmethod
@@ -38,20 +38,20 @@ class VPPUtil(object):
:type additional_cmds: tuple
"""
def_setting_tb_displayed = {
- 'IPv6 FIB': 'ip6 fib',
- 'IPv4 FIB': 'ip fib',
- 'Interface IP': 'int addr',
- 'Interfaces': 'int',
- 'ARP': 'ip arp',
- 'Errors': 'err'
+ u"IPv6 FIB": u"ip6 fib",
+ u"IPv4 FIB": u"ip fib",
+ u"Interface IP": u"int addr",
+ u"Interfaces": u"int",
+ u"ARP": u"ip arp",
+ u"Errors": u"err"
}
if additional_cmds:
for cmd in additional_cmds:
- def_setting_tb_displayed['Custom Setting: {}'.format(cmd)] = cmd
+ def_setting_tb_displayed[f"Custom Setting: {cmd}"] = cmd
for _, cmd in def_setting_tb_displayed.items():
- command = 'vppctl sh {cmd}'.format(cmd=cmd)
+ command = f"vppctl sh {cmd}"
exec_cmd_no_error(node, command, timeout=30, sudo=True)
@staticmethod
@@ -71,7 +71,7 @@ class VPPUtil(object):
:type nodes: dict
"""
for node in nodes.values():
- if node['type'] == NodeType.DUT:
+ if node[u"type"] == NodeType.DUT:
VPPUtil.restart_vpp_service(node)
@staticmethod
@@ -91,7 +91,7 @@ class VPPUtil(object):
:type nodes: dict
"""
for node in nodes.values():
- if node['type'] == NodeType.DUT:
+ if node[u"type"] == NodeType.DUT:
VPPUtil.stop_vpp_service(node)
@staticmethod
@@ -101,9 +101,8 @@ class VPPUtil(object):
:param node: Topology node.
:type node: dict
"""
- cmd = 'command -v vpp'
- exec_cmd_no_error(
- node, cmd, message='VPP is not installed!')
+ cmd = u"command -v vpp"
+ exec_cmd_no_error(node, cmd, message=u"VPP is not installed!")
@staticmethod
def verify_vpp_started(node):
@@ -112,15 +111,16 @@ class VPPUtil(object):
:param node: Topology node.
:type node: dict
"""
- cmd = 'echo "show pci" | sudo socat - UNIX-CONNECT:/run/vpp/cli.sock'
+ cmd = u"echo \"show pci\" | sudo socat - UNIX-CONNECT:/run/vpp/cli.sock"
exec_cmd_no_error(
- node, cmd, sudo=False, message='VPP failed to start!', retries=120)
+ node, cmd, sudo=False, message=u"VPP failed to start!", retries=120
+ )
- cmd = ('vppctl show pci 2>&1 | '
- 'fgrep -v "Connection refused" | '
- 'fgrep -v "No such file or directory"')
+ cmd = u"vppctl show pci 2>&1 | fgrep -v \"Connection refused\" | " \
+ u"fgrep -v \"No such file or directory\""
exec_cmd_no_error(
- node, cmd, sudo=True, message='VPP failed to start!', retries=120)
+ node, cmd, sudo=True, message=u"VPP failed to start!", retries=120
+ )
@staticmethod
def verify_vpp(node):
@@ -133,9 +133,9 @@ class VPPUtil(object):
"""
VPPUtil.verify_vpp_installed(node)
try:
- # Verify responsivness of vppctl.
+ # Verify responsiveness of vppctl.
VPPUtil.verify_vpp_started(node)
- # Verify responsivness of PAPI.
+ # Verify responsiveness of PAPI.
VPPUtil.show_log(node)
VPPUtil.vpp_show_version(node)
finally:
@@ -149,33 +149,23 @@ class VPPUtil(object):
:type nodes: dict
"""
for node in nodes.values():
- if node['type'] == NodeType.DUT:
+ if node[u"type"] == NodeType.DUT:
VPPUtil.verify_vpp(node)
@staticmethod
- def vpp_show_version(node, verbose=True):
+ def vpp_show_version(node):
"""Run "show_version" PAPI command.
:param node: Node to run command on.
- :param verbose: Show version, compile date and compile location if True
- otherwise show only version.
:type node: dict
- :type verbose: bool
:returns: VPP version.
:rtype: str
"""
- cmd = 'show_version'
+ cmd = u"show_version"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd).get_reply()
- return_version = reply['version'].rstrip('\0x00')
- version = 'VPP version: {ver}\n'.format(ver=return_version)
- if verbose:
- version += ('Compile date: {date}\n'
- 'Compile location: {cl}\n'.
- format(date=reply['build_date'].rstrip('\0x00'),
- cl=reply['build_directory'].rstrip('\0x00')))
- logger.info(version)
- return return_version
+ logger.info(f"VPP version: {reply[u'version']}\n")
+ return f"{reply[u'version']}"
@staticmethod
def show_vpp_version_on_all_duts(nodes):
@@ -185,7 +175,7 @@ class VPPUtil(object):
:type nodes: dict
"""
for node in nodes.values():
- if node['type'] == NodeType.DUT:
+ if node[u"type"] == NodeType.DUT:
VPPUtil.vpp_show_version(node)
@staticmethod
@@ -196,29 +186,27 @@ class VPPUtil(object):
:type node: dict
"""
- cmd = 'sw_interface_dump'
+ cmd = u"sw_interface_dump"
args = dict(
name_filter_valid=False,
- name_filter=''
+ name_filter=u""
)
- err_msg = 'Failed to get interface dump on host {host}'.format(
- host=node['host'])
+ err_msg = f"Failed to get interface dump on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd, **args).get_details(err_msg)
for if_dump in details:
- if_dump['l2_address'] = str(if_dump['l2_address'])
- if_dump['b_dmac'] = str(if_dump['b_dmac'])
- if_dump['b_smac'] = str(if_dump['b_smac'])
- if_dump['flags'] = if_dump['flags'].value
- if_dump['type'] = if_dump['type'].value
- if_dump['link_duplex'] = if_dump['link_duplex'].value
- if_dump['sub_if_flags'] = if_dump['sub_if_flags'].value \
- if hasattr(if_dump['sub_if_flags'], 'value') \
- else int(if_dump['sub_if_flags'])
+ if_dump[u"l2_address"] = str(if_dump[u"l2_address"])
+ if_dump[u"b_dmac"] = str(if_dump[u"b_dmac"])
+ if_dump[u"b_smac"] = str(if_dump[u"b_smac"])
+ if_dump[u"flags"] = if_dump[u"flags"].value
+ if_dump[u"type"] = if_dump[u"type"].value
+ if_dump[u"link_duplex"] = if_dump[u"link_duplex"].value
+ if_dump[u"sub_if_flags"] = if_dump[u"sub_if_flags"].value \
+ if hasattr(if_dump[u"sub_if_flags"], u"value") \
+ else int(if_dump[u"sub_if_flags"])
# TODO: return only base data
- logger.trace('Interface data of host {host}:\n{details}'.format(
- host=node['host'], details=details))
+ logger.trace(f"Interface data of host {node[u'host']}:\n{details}")
@staticmethod
def vpp_enable_traces_on_dut(node, fail_on_error=False):
@@ -231,10 +219,10 @@ class VPPUtil(object):
:type fail_on_error: bool
"""
cmds = [
- "trace add dpdk-input 50",
- "trace add vhost-user-input 50",
- "trace add memif-input 50",
- "trace add avf-input 50"
+ u"trace add dpdk-input 50",
+ u"trace add vhost-user-input 50",
+ u"trace add memif-input 50",
+ u"trace add avf-input 50"
]
for cmd in cmds:
@@ -255,7 +243,7 @@ class VPPUtil(object):
:type fail_on_error: bool
"""
for node in nodes.values():
- if node['type'] == NodeType.DUT:
+ if node[u"type"] == NodeType.DUT:
VPPUtil.vpp_enable_traces_on_dut(node, fail_on_error)
@staticmethod
@@ -275,7 +263,7 @@ class VPPUtil(object):
:type nodes: dict
"""
for node in nodes.values():
- if node['type'] == NodeType.DUT:
+ if node[u"type"] == NodeType.DUT:
VPPUtil.vpp_enable_elog_traces(node)
@staticmethod
@@ -285,7 +273,7 @@ class VPPUtil(object):
:param node: Topology node.
:type node: dict
"""
- PapiSocketExecutor.run_cli_cmd(node, "show event-logger")
+ PapiSocketExecutor.run_cli_cmd(node, u"show event-logger")
@staticmethod
def show_event_logger_on_all_duts(nodes):
@@ -295,7 +283,7 @@ class VPPUtil(object):
:type nodes: dict
"""
for node in nodes.values():
- if node['type'] == NodeType.DUT:
+ if node[u"type"] == NodeType.DUT:
VPPUtil.show_event_logger(node)
@staticmethod
@@ -305,7 +293,7 @@ class VPPUtil(object):
:param node: Topology node.
:type node: dict
"""
- PapiSocketExecutor.run_cli_cmd(node, "show logging")
+ PapiSocketExecutor.run_cli_cmd(node, u"show logging")
@staticmethod
def show_log_on_all_duts(nodes):
@@ -315,7 +303,7 @@ class VPPUtil(object):
:type nodes: dict
"""
for node in nodes.values():
- if node['type'] == NodeType.DUT:
+ if node[u"type"] == NodeType.DUT:
VPPUtil.show_log(node)
@staticmethod
@@ -327,19 +315,19 @@ class VPPUtil(object):
:returns: VPP thread data.
:rtype: list
"""
- cmd = 'show_threads'
+ cmd = u"show_threads"
with PapiSocketExecutor(node) as papi_exec:
reply = papi_exec.add(cmd).get_reply()
threads_data = list()
- for thread in reply["thread_data"]:
+ for thread in reply[u"thread_data"]:
thread_data = list()
for item in thread:
- if isinstance(item, unicode):
+ if isinstance(item, str):
item = item.rstrip('\x00')
thread_data.append(item)
threads_data.append(thread_data)
- logger.trace("show threads:\n{threads}".format(threads=threads_data))
+ logger.trace(f"show threads:\n{threads_data}")
return threads_data