aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries')
-rw-r--r--resources/libraries/python/DUTSetup.py17
-rw-r--r--resources/libraries/python/DropRateSearch.py6
-rw-r--r--resources/libraries/python/FlowUtil.py20
-rw-r--r--resources/libraries/python/IPUtil.py4
-rw-r--r--resources/libraries/python/IPsecUtil.py14
-rw-r--r--resources/libraries/python/IncrementUtil.py2
-rw-r--r--resources/libraries/python/NodePath.py3
7 files changed, 25 insertions, 41 deletions
diff --git a/resources/libraries/python/DUTSetup.py b/resources/libraries/python/DUTSetup.py
index 504022381e..9d0a3a8ff7 100644
--- a/resources/libraries/python/DUTSetup.py
+++ b/resources/libraries/python/DUTSetup.py
@@ -408,11 +408,11 @@ class DUTSetup:
# sriov is not supported and we want 0 VFs
# no need to do anything
return
- else:
- raise RuntimeError(
- f"Can't configure {numvfs} VFs on {pf_pci_addr} device "
- f"on {node[u'host']} since it doesn't support SR-IOV."
- )
+
+ raise RuntimeError(
+ f"Can't configure {numvfs} VFs on {pf_pci_addr} device "
+ f"on {node[u'host']} since it doesn't support SR-IOV."
+ )
pci = pf_pci_addr.replace(u":", r"\:")
command = f"sh -c \"echo {numvfs} | " \
@@ -572,10 +572,9 @@ class DUTSetup:
# the directory doesn't exist which means the device is not bound
# to any driver
return None
- else:
- cmd = f"basename $(readlink -f {driver_path})"
- ret_val, _ = exec_cmd_no_error(node, cmd)
- return ret_val.strip()
+ cmd = f"basename $(readlink -f {driver_path})"
+ ret_val, _ = exec_cmd_no_error(node, cmd)
+ return ret_val.strip()
@staticmethod
def verify_kernel_module(node, module, force_load=False):
diff --git a/resources/libraries/python/DropRateSearch.py b/resources/libraries/python/DropRateSearch.py
index b1991ee046..2417df8c41 100644
--- a/resources/libraries/python/DropRateSearch.py
+++ b/resources/libraries/python/DropRateSearch.py
@@ -426,10 +426,8 @@ class DropRateSearch(metaclass=ABCMeta):
self._search_result = SearchResults.SUCCESS
self._search_result_rate = rate
return
- else:
- raise RuntimeError(u"Unknown search result")
- else:
- raise Exception(u"Unknown search direction")
+ raise RuntimeError(u"Unknown search result")
+ raise Exception(u"Unknown search direction")
def verify_search_result(self):
"""Fail if search was not successful.
diff --git a/resources/libraries/python/FlowUtil.py b/resources/libraries/python/FlowUtil.py
index 1f647a8e11..1dcb7e0fae 100644
--- a/resources/libraries/python/FlowUtil.py
+++ b/resources/libraries/python/FlowUtil.py
@@ -19,6 +19,8 @@ from resources.libraries.python.topology import Topology
from resources.libraries.python.ssh import exec_cmd_no_error
from resources.libraries.python.PapiExecutor import PapiSocketExecutor
+from vpp_papi import VppEnum
+
class FlowUtil:
"""Utilities for flow configuration."""
@@ -48,8 +50,6 @@ class FlowUtil:
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip4_n_tuple"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4_N_TUPLE
@@ -99,8 +99,6 @@ class FlowUtil:
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip6_n_tuple"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP6_N_TUPLE
@@ -147,8 +145,6 @@ class FlowUtil:
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip4"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4
@@ -191,8 +187,6 @@ class FlowUtil:
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip6"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP6
@@ -237,8 +231,6 @@ class FlowUtil:
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip4_gtpu"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4_GTPU
flow_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
@@ -273,8 +265,6 @@ class FlowUtil:
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
if proto == u"ESP":
flow = u"ip4_ipsec_esp"
flow_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ESP
@@ -312,8 +302,6 @@ class FlowUtil:
:returns: flow_index.
:rtype: int
"""
- from vpp_papi import VppEnum
-
flow = u"ip4_l2tpv3oip"
flow_proto = 115 # IP_API_PROTO_L2TP
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4_L2TPV3OIP
@@ -347,8 +335,6 @@ class FlowUtil:
:type value: int
:returns: flow_index.
"""
- from vpp_papi import VppEnum
-
flow = u"ip4_vxlan"
flow_type = VppEnum.vl_api_flow_type_t.FLOW_TYPE_IP4_VXLAN
flow_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
@@ -387,8 +373,6 @@ class FlowUtil:
:rtype: int
:raises ValueError: If action type is not supported.
"""
- from vpp_papi import VppEnum
-
cmd = u"flow_add"
if action == u"redirect-to-queue":
diff --git a/resources/libraries/python/IPUtil.py b/resources/libraries/python/IPUtil.py
index 57a738bb46..4a5a413fc8 100644
--- a/resources/libraries/python/IPUtil.py
+++ b/resources/libraries/python/IPUtil.py
@@ -155,8 +155,8 @@ class NetworkIncrement(ObjIncrement):
return f"{self._value.network_address}/{self._prefix_len}"
elif self._format == u"addr":
return f"{self._value.network_address}"
- else:
- raise RuntimeError(f"Unsupported format {self._format}")
+
+ raise RuntimeError(f"Unsupported format {self._format}")
class IPUtil:
diff --git a/resources/libraries/python/IPsecUtil.py b/resources/libraries/python/IPsecUtil.py
index acae5bd91d..e455dd7a88 100644
--- a/resources/libraries/python/IPsecUtil.py
+++ b/resources/libraries/python/IPsecUtil.py
@@ -787,11 +787,11 @@ class IPsecUtil:
:type action: IPsecUtil.PolicyAction
:type inbound: bool
:type bidirectional: bool
- :raises NotImplemented: When the action is PolicyAction.PROTECT.
+ :raises NotImplementedError: When the action is PolicyAction.PROTECT.
"""
if action == PolicyAction.PROTECT:
- raise NotImplemented('Policy action PROTECT is not supported.')
+ raise NotImplementedError('Policy action PROTECT is not supported.')
spd_id_dir1 = 1
spd_id_dir2 = 2
@@ -1014,7 +1014,7 @@ class IPsecUtil:
tmp_filename = f"/tmp/ipsec_spd_{spd_id}_add_del_entry.script"
with open(tmp_filename, 'w') as tmp_file:
- for i in range(n_entries):
+ for _ in range(n_entries):
direction = u'inbound' if inbound else u'outbound'
sa = f' sa {sa_id.inc_fmt()}' if sa_id is not None else ''
protocol = f' protocol {protocol}' if proto else ''
@@ -1041,7 +1041,7 @@ class IPsecUtil:
os.remove(tmp_filename)
return
- for i in range(n_entries):
+ for _ in range(n_entries):
IPsecUtil.vpp_ipsec_add_spd_entry(
node, spd_id, next(priority), action, inbound,
next(sa_id) if sa_id is not None else sa_id,
@@ -2208,7 +2208,8 @@ class IPsecUtil:
crypto_key, integ_alg, integ_key, tunnel_ip1, tunnel_ip2
)
IPsecUtil.vpp_ipsec_add_spd_entries(
- nodes[u"DUT2"], n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0),
+ nodes[u"DUT2"], n_tunnels, spd_id,
+ priority=ObjIncrement(p_lo, 0),
action=PolicyAction.PROTECT, inbound=True,
sa_id=ObjIncrement(sa_id_1, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip2))
@@ -2219,7 +2220,8 @@ class IPsecUtil:
crypto_key, integ_alg, integ_key, tunnel_ip2, tunnel_ip1
)
IPsecUtil.vpp_ipsec_add_spd_entries(
- nodes[u"DUT2"], n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0),
+ nodes[u"DUT2"], n_tunnels, spd_id,
+ priority=ObjIncrement(p_lo, 0),
action=PolicyAction.PROTECT, inbound=False,
sa_id=ObjIncrement(sa_id_2, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip1))
diff --git a/resources/libraries/python/IncrementUtil.py b/resources/libraries/python/IncrementUtil.py
index 544e5708f0..45260ae06f 100644
--- a/resources/libraries/python/IncrementUtil.py
+++ b/resources/libraries/python/IncrementUtil.py
@@ -14,7 +14,7 @@
"""Increment utilities library."""
-class ObjIncrement(object):
+class ObjIncrement():
"""
An iterator class used to generate incremented values in each iteration
or when inc_fmt is called.
diff --git a/resources/libraries/python/NodePath.py b/resources/libraries/python/NodePath.py
index d1f974aafe..b9b4fc94e6 100644
--- a/resources/libraries/python/NodePath.py
+++ b/resources/libraries/python/NodePath.py
@@ -217,7 +217,8 @@ class NodePath:
raise RuntimeError(u"No path for topology")
return self._path[-2]
- def compute_circular_topology(self, nodes, filter_list=None, nic_pfs=1,
+ def compute_circular_topology(
+ self, nodes, filter_list=None, nic_pfs=1,
always_same_link=False, topo_has_tg=True, topo_has_dut=True):
"""Return computed circular path.