aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/TestConfig.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/TestConfig.py')
-rw-r--r--resources/libraries/python/TestConfig.py313
1 files changed, 167 insertions, 146 deletions
diff --git a/resources/libraries/python/TestConfig.py b/resources/libraries/python/TestConfig.py
index 48f67e5a0f..9f83fbb5fc 100644
--- a/resources/libraries/python/TestConfig.py
+++ b/resources/libraries/python/TestConfig.py
@@ -25,7 +25,7 @@ from resources.libraries.python.topology import Topology
from resources.libraries.python.VatExecutor import VatExecutor
-class TestConfig(object):
+class TestConfig:
"""Contains special test configurations implemented in python for faster
execution."""
@@ -69,17 +69,20 @@ class TestConfig(object):
# configure IPs, create VXLAN interfaces and VLAN sub-interfaces
vxlan_count = TestConfig.vpp_create_vxlan_and_vlan_interfaces(
node, node_vxlan_if, node_vlan_if, n_tunnels, vni_start,
- src_ip_start, dst_ip_start, ip_step)
+ src_ip_start, dst_ip_start, ip_step
+ )
# update topology with VXLAN interfaces and VLAN sub-interfaces data
# and put interfaces up
TestConfig.vpp_put_vxlan_and_vlan_interfaces_up(
- node, vxlan_count, node_vlan_if)
+ node, vxlan_count, node_vlan_if
+ )
# configure bridge domains, ARPs and routes
TestConfig.vpp_put_vxlan_and_vlan_interfaces_to_bridge_domain(
node, node_vxlan_if, vxlan_count, op_node, op_node_if, dst_ip_start,
- ip_step, bd_id_start)
+ ip_step, bd_id_start
+ )
@staticmethod
def vpp_create_vxlan_and_vlan_interfaces(
@@ -110,47 +113,49 @@ class TestConfig(object):
:returns: Number of created VXLAN interfaces.
:rtype: int
"""
- src_ip_addr_start = ip_address(unicode(src_ip_start))
- dst_ip_addr_start = ip_address(unicode(dst_ip_start))
+ src_ip_start = ip_address(src_ip_start)
+ dst_ip_start = ip_address(dst_ip_start)
if vxlan_count > 10:
commands = list()
- tmp_fn = '/tmp/create_vxlan_interfaces.config'
- for i in xrange(0, vxlan_count):
+ for i in range(0, vxlan_count):
try:
- src_ip = src_ip_addr_start + i * ip_step
- dst_ip = dst_ip_addr_start + i * ip_step
+ src_ip = src_ip_start + i * ip_step
+ dst_ip = dst_ip_start + i * ip_step
except AddressValueError:
- logger.warn("Can't do more iterations - IP address limit "
- "has been reached.")
+ logger.warn(
+ u"Can't do more iterations - IP address limit "
+ u"has been reached."
+ )
vxlan_count = i
break
commands.append(
- 'sw_interface_add_del_address sw_if_index {sw_idx} '
- '{ip}/{ip_len}\n'.format(
- sw_idx=Topology.get_interface_sw_index(
- node, node_vxlan_if),
- ip=src_ip,
- ip_len=128 if src_ip.version == 6 else 32))
+ f"sw_interface_add_del_address sw_if_index "
+ f"{Topology.get_interface_sw_index(node, node_vxlan_if)} "
+ f"{src_ip}/{128 if src_ip.version == 6 else 32}\n"
+ )
commands.append(
- 'vxlan_add_del_tunnel src {srcip} dst {dstip} vni {vni}\n'\
- .format(srcip=src_ip, dstip=dst_ip,
- vni=vni_start + i))
+ f"vxlan_add_del_tunnel src {src_ip} dst {dst_ip} "
+ f"vni {vni_start + i}\n"
+ )
commands.append(
- 'create_vlan_subif sw_if_index {sw_idx} vlan {vlan}\n'\
- .format(sw_idx=Topology.get_interface_sw_index(
- node, node_vlan_if), vlan=i + 1))
- VatExecutor().write_and_execute_script(node, tmp_fn, commands)
+ f"create_vlan_subif sw_if_index "
+ f"{Topology.get_interface_sw_index(node, node_vlan_if)} "
+ f"vlan {i + 1}\n"
+ )
+ VatExecutor().write_and_execute_script(
+ node, u"/tmp/create_vxlan_interfaces.config", commands
+ )
return vxlan_count
- cmd1 = 'sw_interface_add_del_address'
+ cmd1 = u"sw_interface_add_del_address"
args1 = dict(
sw_if_index=InterfaceUtil.get_interface_index(node, node_vxlan_if),
is_add=True,
del_all=False,
prefix=None
)
- cmd2 = 'vxlan_add_del_tunnel'
+ cmd2 = u"vxlan_add_del_tunnel"
args2 = dict(
is_add=1,
is_ipv6=0,
@@ -162,7 +167,7 @@ class TestConfig(object):
decap_next_index=Constants.BITWISE_NON_ZERO,
vni=None
)
- cmd3 = 'create_vlan_subif'
+ cmd3 = u"create_vlan_subif"
args3 = dict(
sw_if_index=InterfaceUtil.get_interface_index(
node, node_vlan_if),
@@ -170,22 +175,25 @@ class TestConfig(object):
)
with PapiSocketExecutor(node) as papi_exec:
- for i in xrange(0, vxlan_count):
+ for i in range(0, vxlan_count):
try:
- src_ip = src_ip_addr_start + i * ip_step
- dst_ip = dst_ip_addr_start + i * ip_step
+ src_ip = src_ip_start + i * ip_step
+ dst_ip = dst_ip_start + i * ip_step
except AddressValueError:
- logger.warn("Can't do more iterations - IP address limit "
- "has been reached.")
+ logger.warn(
+ u"Can't do more iterations - IP address limit "
+ u"has been reached."
+ )
vxlan_count = i
break
- args1['prefix'] = IPUtil.create_prefix_object(
- src_ip, 128 if src_ip_addr_start.version == 6 else 32)
- args2['src_address'] = getattr(src_ip, 'packed')
- args2['dst_address'] = getattr(dst_ip, 'packed')
- args2['vni'] = int(vni_start) + i
- args3['vlan_id'] = i + 1
- history = False if 1 < i < vxlan_count else True
+ args1[u"prefix"] = IPUtil.create_prefix_object(
+ src_ip, 128 if src_ip_start.version == 6 else 32
+ )
+ args2[u"src_address"] = getattr(src_ip, u"packed")
+ args2[u"dst_address"] = getattr(dst_ip, u"packed")
+ args2[u"vni"] = int(vni_start) + i
+ args3[u"vlan_id"] = i + 1
+ history = bool(not 1 < i < vxlan_count - 1)
papi_exec.add(cmd1, history=history, **args1).\
add(cmd2, history=history, **args2).\
add(cmd3, history=history, **args3)
@@ -207,50 +215,51 @@ class TestConfig(object):
:type vxlan_count: int
:type node_vlan_if: str
"""
- if_data = InterfaceUtil.vpp_get_interface_data(node)
- vlan_if_name = Topology.get_interface_name(node, node_vlan_if)
-
if vxlan_count > 10:
- tmp_fn = '/tmp/put_subinterfaces_up.config'
commands = list()
- for i in xrange(0, vxlan_count):
- vxlan_subif_key = Topology.add_new_port(node, 'vxlan_tunnel')
- vxlan_subif_name = 'vxlan_tunnel{nr}'.format(nr=i)
- vxlan_found = False
+ for i in range(0, vxlan_count):
+ vxlan_subif_key = Topology.add_new_port(node, u"vxlan_tunnel")
+ vxlan_subif_name = f"vxlan_tunnel{i}"
+ founds = dict(vxlan=False, vlan=False)
vxlan_subif_idx = None
- vlan_subif_key = Topology.add_new_port(node, 'vlan_subif')
- vlan_subif_name = '{if_name}.{vlan}'.format(
- if_name=vlan_if_name, vlan=i + 1)
- vlan_found = False
+ vlan_subif_key = Topology.add_new_port(node, u"vlan_subif")
+ vlan_subif_name = \
+ f"{Topology.get_interface_name(node, node_vlan_if)}.{i + 1}"
vlan_idx = None
- for data in if_data:
- if_name = data['interface_name']
- if not vxlan_found and if_name == vxlan_subif_name:
- vxlan_subif_idx = data['sw_if_index']
- vxlan_found = True
- elif not vlan_found and if_name == vlan_subif_name:
- vlan_idx = data['sw_if_index']
- vlan_found = True
- if vxlan_found and vlan_found:
+ for data in InterfaceUtil.vpp_get_interface_data(node):
+ if_name = data[u"interface_name"]
+ if not founds[u"vxlan"] and if_name == vxlan_subif_name:
+ vxlan_subif_idx = data[u"sw_if_index"]
+ founds[u"vxlan"] = True
+ elif not founds[u"vlan"] and if_name == vlan_subif_name:
+ vlan_idx = data[u"sw_if_index"]
+ founds[u"vlan"] = True
+ if founds[u"vxlan"] and founds[u"vlan"]:
break
Topology.update_interface_sw_if_index(
node, vxlan_subif_key, vxlan_subif_idx)
Topology.update_interface_name(
node, vxlan_subif_key, vxlan_subif_name)
commands.append(
- 'sw_interface_set_flags sw_if_index {sw_idx} admin-up '
- 'link-up\n'.format(sw_idx=vxlan_subif_idx))
+ f"sw_interface_set_flags sw_if_index {vxlan_subif_idx} "
+ f"admin-up link-up\n"
+ )
Topology.update_interface_sw_if_index(
- node, vlan_subif_key, vlan_idx)
+ node, vlan_subif_key, vlan_idx
+ )
Topology.update_interface_name(
- node, vlan_subif_key, vlan_subif_name)
+ node, vlan_subif_key, vlan_subif_name
+ )
commands.append(
- 'sw_interface_set_flags sw_if_index {sw_idx} admin-up '
- 'link-up\n'.format(sw_idx=vlan_idx))
- VatExecutor().write_and_execute_script(node, tmp_fn, commands)
+ f"sw_interface_set_flags sw_if_index {vlan_idx} admin-up "
+ f"link-up\n"
+ )
+ VatExecutor().write_and_execute_script(
+ node, u"/tmp/put_subinterfaces_up.config", commands
+ )
return
- cmd = 'sw_interface_set_flags'
+ cmd = u"sw_interface_set_flags"
args1 = dict(
sw_if_index=None,
flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value
@@ -261,38 +270,41 @@ class TestConfig(object):
)
with PapiSocketExecutor(node) as papi_exec:
- for i in xrange(0, vxlan_count):
- vxlan_subif_key = Topology.add_new_port(node, 'vxlan_tunnel')
- vxlan_subif_name = 'vxlan_tunnel{nr}'.format(nr=i)
- vxlan_found = False
+ for i in range(0, vxlan_count):
+ vxlan_subif_key = Topology.add_new_port(node, u"vxlan_tunnel")
+ vxlan_subif_name = f"vxlan_tunnel{i}"
+ founds = dict(vxlan=False, vlan=False)
vxlan_subif_idx = None
- vlan_subif_key = Topology.add_new_port(node, 'vlan_subif')
- vlan_subif_name = '{if_name}.{vlan}'.format(
- if_name=vlan_if_name, vlan=i+1)
- vlan_found = False
+ vlan_subif_key = Topology.add_new_port(node, u"vlan_subif")
+ vlan_subif_name = \
+ f"{Topology.get_interface_name(node, node_vlan_if)}.{i+1}"
vlan_idx = None
- for data in if_data:
- if not vxlan_found \
- and data['interface_name'] == vxlan_subif_name:
- vxlan_subif_idx = data['sw_if_index']
- vxlan_found = True
- elif not vlan_found \
- and data['interface_name'] == vlan_subif_name:
- vlan_idx = data['sw_if_index']
- vlan_found = True
- if vxlan_found and vlan_found:
+ for data in InterfaceUtil.vpp_get_interface_data(node):
+ if not founds[u"vxlan"] \
+ and data[u"interface_name"] == vxlan_subif_name:
+ vxlan_subif_idx = data[u"sw_if_index"]
+ founds[u"vxlan"] = True
+ elif not founds[u"vlan"] \
+ and data[u"interface_name"] == vlan_subif_name:
+ vlan_idx = data[u"sw_if_index"]
+ founds[u"vlan"] = True
+ if founds[u"vxlan"] and founds[u"vlan"]:
break
Topology.update_interface_sw_if_index(
- node, vxlan_subif_key, vxlan_subif_idx)
+ node, vxlan_subif_key, vxlan_subif_idx
+ )
Topology.update_interface_name(
- node, vxlan_subif_key, vxlan_subif_name)
- args1['sw_if_index'] = vxlan_subif_idx
+ node, vxlan_subif_key, vxlan_subif_name
+ )
+ args1[u"sw_if_index"] = vxlan_subif_idx
Topology.update_interface_sw_if_index(
- node, vlan_subif_key, vlan_idx)
+ node, vlan_subif_key, vlan_idx
+ )
Topology.update_interface_name(
- node, vlan_subif_key, vlan_subif_name)
- args2['sw_if_index'] = vlan_idx
- history = False if 1 < i < vxlan_count else True
+ node, vlan_subif_key, vlan_subif_name
+ )
+ args2[u"sw_if_index"] = vlan_idx
+ history = bool(not 1 < i < vxlan_count - 1)
papi_exec.add(cmd, history=history, **args1). \
add(cmd, history=history, **args2)
papi_exec.add(cmd, **args1).add(cmd, **args2)
@@ -325,92 +337,101 @@ class TestConfig(object):
:type ip_step: int
:type bd_id_start: int
"""
- dst_ip_addr_start = ip_address(unicode(dst_ip_start))
+ dst_ip_start = ip_address(dst_ip_start)
if vxlan_count > 1:
sw_idx_vxlan = Topology.get_interface_sw_index(node, node_vxlan_if)
- tmp_fn = '/tmp/configure_routes_and_bridge_domains.config'
commands = list()
- for i in xrange(0, vxlan_count):
- dst_ip = dst_ip_addr_start + i * ip_step
+ for i in range(0, vxlan_count):
+ dst_ip = dst_ip_start + i * ip_step
commands.append(
- 'ip_neighbor_add_del sw_if_index {sw_idx} dst {ip} '
- 'mac {mac}\n'.format(
- sw_idx=sw_idx_vxlan,
- ip=dst_ip,
- mac=Topology.get_interface_mac(op_node, op_node_if)))
+ f"ip_neighbor_add_del sw_if_index {sw_idx_vxlan} "
+ f"dst {dst_ip} "
+ f"mac {Topology.get_interface_mac(op_node, op_node_if)}\n"
+ )
commands.append(
- 'ip_route_add_del {ip}/{ip_len} count 1 via {ip} '
- 'sw_if_index {sw_idx}\n'.format(
- ip=dst_ip,
- ip_len=128 if dst_ip.version == 6 else 32,
- sw_idx=sw_idx_vxlan))
+ f"ip_route_add_del "
+ f"{dst_ip}/{128 if dst_ip.version == 6 else 32} count 1 "
+ f"via {dst_ip} sw_if_index {sw_idx_vxlan}\n"
+ )
+ sw_idx_vxlan = Topology.get_interface_sw_index(
+ node, f"vxlan_tunnel{i + 1}"
+ )
commands.append(
- 'sw_interface_set_l2_bridge sw_if_index {sw_idx} '
- 'bd_id {bd_id} shg 0 enable\n'.format(
- sw_idx=Topology.get_interface_sw_index(
- node, 'vxlan_tunnel{nr}'.format(nr=i + 1)),
- bd_id=bd_id_start + i))
+ f"sw_interface_set_l2_bridge sw_if_index {sw_idx_vxlan} "
+ f"bd_id {bd_id_start + i} shg 0 enable\n"
+ )
+ sw_idx_vlan = Topology.get_interface_sw_index(
+ node, f"vlan_subif{i + 1}"
+ )
commands.append(
- 'sw_interface_set_l2_bridge sw_if_index {sw_idx} '
- 'bd_id {bd_id} shg 0 enable\n'.format(
- sw_idx=Topology.get_interface_sw_index(
- node, 'vlan_subif{nr}'.format(nr=i + 1)),
- bd_id=bd_id_start + i))
- VatExecutor().write_and_execute_script(node, tmp_fn, commands)
+ f"sw_interface_set_l2_bridge sw_if_index {sw_idx_vlan} "
+ f"bd_id {bd_id_start + i} shg 0 enable\n"
+ )
+ VatExecutor().write_and_execute_script(
+ node, u"/tmp/configure_routes_and_bridge_domains.config",
+ commands
+ )
return
- cmd1 = 'ip_neighbor_add_del'
+ cmd1 = u"ip_neighbor_add_del"
neighbor = dict(
sw_if_index=Topology.get_interface_sw_index(node, node_vxlan_if),
flags=0,
mac_address=Topology.get_interface_mac(op_node, op_node_if),
- ip_address='')
+ ip_address=u""
+ )
args1 = dict(
is_add=1,
- neighbor=neighbor)
- cmd2 = 'ip_route_add_del'
+ neighbor=neighbor
+ )
+ cmd2 = u"ip_route_add_del"
kwargs = dict(
interface=node_vxlan_if,
- gateway=str(dst_ip_addr_start))
+ gateway=str(dst_ip_start)
+ )
route = IPUtil.compose_vpp_route_structure(
- node,
- str(dst_ip_addr_start),
- 128 if dst_ip_addr_start.version == 6 else 32,
- **kwargs)
+ node, str(dst_ip_start),
+ 128 if dst_ip_start.version == 6 else 32, **kwargs
+ )
args2 = dict(
is_add=1,
is_multipath=0,
- route=route)
- cmd3 = 'sw_interface_set_l2_bridge'
+ route=route
+ )
+ cmd3 = u"sw_interface_set_l2_bridge"
args3 = dict(
rx_sw_if_index=None,
bd_id=None,
shg=0,
port_type=0,
- enable=1)
+ enable=1
+ )
args4 = dict(
rx_sw_if_index=None,
bd_id=None,
shg=0,
port_type=0,
- enable=1)
+ enable=1
+ )
with PapiSocketExecutor(node) as papi_exec:
- for i in xrange(0, vxlan_count):
- dst_ip = dst_ip_addr_start + i * ip_step
- args1['neighbor']['ip_address'] = str(dst_ip)
- args2['route']['prefix']['address']['un'] = \
- IPUtil.union_addr(dst_ip)
- args2['route']['paths'][0]['nh']['address'] = \
- IPUtil.union_addr(dst_ip)
- args3['rx_sw_if_index'] = Topology.get_interface_sw_index(
- node, 'vxlan_tunnel{nr}'.format(nr=i+1))
- args3['bd_id'] = int(bd_id_start+i)
- args4['rx_sw_if_index'] = Topology.get_interface_sw_index(
- node, 'vlan_subif{nr}'.format(nr=i+1))
- args4['bd_id'] = int(bd_id_start+i)
- history = False if 1 < i < vxlan_count else True
+ for i in range(0, vxlan_count):
+ args1[u"neighbor"][u"ip_address"] = \
+ str(dst_ip_start + i * ip_step)
+ args2[u"route"][u"prefix"][u"address"][u"un"] = \
+ IPUtil.union_addr(dst_ip_start + i * ip_step)
+ args2[u"route"][u"paths"][0][u"nh"][u"address"] = \
+ IPUtil.union_addr(dst_ip_start + i * ip_step)
+ args3[u"rx_sw_if_index"] = Topology.get_interface_sw_index(
+ node, f"vxlan_tunnel{i+1}"
+ )
+ args3[u"bd_id"] = int(bd_id_start+i)
+ args4[u"rx_sw_if_index"] = Topology.get_interface_sw_index(
+ node, f"vlan_subif{i+1}"
+ )
+ args4[u"bd_id"] = int(bd_id_start+i)
+ history = bool(not 1 < i < vxlan_count - 1)
papi_exec.add(cmd1, history=history, **args1). \
add(cmd2, history=history, **args2). \
add(cmd3, history=history, **args3). \