aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/L2Util.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/L2Util.py')
-rw-r--r--resources/libraries/python/L2Util.py293
1 files changed, 155 insertions, 138 deletions
diff --git a/resources/libraries/python/L2Util.py b/resources/libraries/python/L2Util.py
index 4ca0c47308..a49c556653 100644
--- a/resources/libraries/python/L2Util.py
+++ b/resources/libraries/python/L2Util.py
@@ -13,9 +13,6 @@
"""L2 Utilities Library."""
-import binascii
-from textwrap import wrap
-
from enum import IntEnum
from resources.libraries.python.Constants import Constants
@@ -37,7 +34,7 @@ class L2VtrOp(IntEnum):
L2_VTR_TRANSLATE_2_2 = 8
-class L2Util(object):
+class L2Util:
"""Utilities for l2 configuration."""
@staticmethod
@@ -50,7 +47,7 @@ class L2Util(object):
:returns: Integer representation of MAC address.
:rtype: int
"""
- return int(mac_str.replace(':', ''), 16)
+ return int(mac_str.replace(u":", u""), 16)
@staticmethod
def int_to_mac(mac_int):
@@ -62,7 +59,9 @@ class L2Util(object):
:returns: String representation of MAC address.
:rtype: str
"""
- return ':'.join(wrap("{:012x}".format(mac_int), width=2))
+ return u":".join(
+ f"{hex(mac_int)[2:]:0>12}"[i:i+2] for i in range(0, 12, 2)
+ )
@staticmethod
def mac_to_bin(mac_str):
@@ -72,9 +71,9 @@ class L2Util(object):
:param mac_str: MAC address in string representation.
:type mac_str: str
:returns: Binary representation of MAC address.
- :rtype: binary
+ :rtype: bytes
"""
- return binascii.unhexlify(mac_str.replace(':', ''))
+ return bytes.fromhex(mac_str.replace(u":", u""))
@staticmethod
def bin_to_mac(mac_bin):
@@ -82,17 +81,15 @@ class L2Util(object):
(\x01\x02\x03\x04\x05\x06) to string format (e.g. 01:02:03:04:05:06).
:param mac_bin: MAC address in binary representation.
- :type mac_bin: binary
+ :type mac_bin: bytes
:returns: String representation of MAC address.
:rtype: str
"""
- mac_str = ':'.join(binascii.hexlify(mac_bin)[i:i + 2]
- for i in range(0, 12, 2))
- return str(mac_str.decode('ascii'))
+ return u":".join(mac_bin.hex()[i:i + 2] for i in range(0, 12, 2))
@staticmethod
- def vpp_add_l2fib_entry(node, mac, interface, bd_id, static_mac=1,
- filter_mac=0, bvi_mac=0):
+ def vpp_add_l2fib_entry(
+ node, mac, interface, bd_id, static_mac=1, filter_mac=0, bvi_mac=0):
""" Create a static L2FIB entry on a VPP node.
:param node: Node to add L2FIB entry on.
@@ -113,28 +110,29 @@ class L2Util(object):
:type filter_mac: int or str
:type bvi_mac: int or str
"""
-
- if isinstance(interface, basestring):
+ if isinstance(interface, str):
sw_if_index = Topology.get_interface_sw_index(node, interface)
else:
sw_if_index = interface
- cmd = 'l2fib_add_del'
- err_msg = 'Failed to add L2FIB entry on host {host}'.format(
- host=node['host'])
- args = dict(mac=L2Util.mac_to_bin(mac),
- bd_id=int(bd_id),
- sw_if_index=sw_if_index,
- is_add=1,
- static_mac=int(static_mac),
- filter_mac=int(filter_mac),
- bvi_mac=int(bvi_mac))
+ cmd = u"l2fib_add_del"
+ err_msg = f"Failed to add L2FIB entry on host {node[u'host']}"
+ args = dict(
+ mac=L2Util.mac_to_bin(mac),
+ bd_id=int(bd_id),
+ sw_if_index=sw_if_index,
+ is_add=1,
+ static_mac=int(static_mac),
+ filter_mac=int(filter_mac),
+ bvi_mac=int(bvi_mac)
+
+ )
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
@staticmethod
- def create_l2_bd(node, bd_id, flood=1, uu_flood=1, forward=1, learn=1,
- arp_term=0):
+ def create_l2_bd(
+ node, bd_id, flood=1, uu_flood=1, forward=1, learn=1, arp_term=0):
"""Create an L2 bridge domain on a VPP node.
:param node: Node where we wish to crate the L2 bridge domain.
@@ -157,17 +155,17 @@ class L2Util(object):
:type learn: int or str
:type arp_term: int or str
"""
-
- cmd = 'bridge_domain_add_del'
- err_msg = 'Failed to create L2 bridge domain on host {host}'.format(
- host=node['host'])
- args = dict(bd_id=int(bd_id),
- flood=int(flood),
- uu_flood=int(uu_flood),
- forward=int(forward),
- learn=int(learn),
- arp_term=int(arp_term),
- is_add=1)
+ cmd = u"bridge_domain_add_del"
+ err_msg = f"Failed to create L2 bridge domain on host {node[u'host']}"
+ args = dict(
+ bd_id=int(bd_id),
+ flood=int(flood),
+ uu_flood=int(uu_flood),
+ forward=int(forward),
+ learn=int(learn),
+ arp_term=int(arp_term),
+ is_add=1
+ )
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
@@ -189,17 +187,19 @@ class L2Util(object):
:type shg: int or str
:type port_type: int or str
"""
-
sw_if_index = Topology.get_interface_sw_index(node, interface)
- cmd = 'sw_interface_set_l2_bridge'
- err_msg = 'Failed to add interface {ifc} to L2 bridge domain on host ' \
- '{host}'.format(ifc=interface, host=node['host'])
- args = dict(rx_sw_if_index=sw_if_index,
- bd_id=int(bd_id),
- shg=int(shg),
- port_type=int(port_type),
- enable=1)
+ cmd = u"sw_interface_set_l2_bridge"
+ err_msg = f"Failed to add interface {interface} to L2 bridge domain " \
+ f"on host {node[u'host']}"
+ args = dict(
+ rx_sw_if_index=sw_if_index,
+ bd_id=int(bd_id),
+ shg=int(shg),
+ port_type=int(port_type),
+ enable=1
+ )
+
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
@@ -218,35 +218,40 @@ class L2Util(object):
:type port_2: str
:type learn: bool
"""
-
sw_if_index1 = Topology.get_interface_sw_index(node, port_1)
sw_if_index2 = Topology.get_interface_sw_index(node, port_2)
learn_int = 1 if learn else 0
- cmd1 = 'bridge_domain_add_del'
- args1 = dict(bd_id=int(bd_id),
- flood=1,
- uu_flood=1,
- forward=1,
- learn=learn_int,
- arp_term=0,
- is_add=1)
-
- cmd2 = 'sw_interface_set_l2_bridge'
- args2 = dict(rx_sw_if_index=sw_if_index1,
- bd_id=int(bd_id),
- shg=0,
- port_type=0,
- enable=1)
-
- args3 = dict(rx_sw_if_index=sw_if_index2,
- bd_id=int(bd_id),
- shg=0,
- port_type=0,
- enable=1)
-
- err_msg = 'Failed to add L2 bridge domain with 2 interfaces on host' \
- ' {host}'.format(host=node['host'])
+ cmd1 = u"bridge_domain_add_del"
+ args1 = dict(
+ bd_id=int(bd_id),
+ flood=1,
+ uu_flood=1,
+ forward=1,
+ learn=learn_int,
+ arp_term=0,
+ is_add=1
+ )
+
+ cmd2 = u"sw_interface_set_l2_bridge"
+ args2 = dict(
+ rx_sw_if_index=sw_if_index1,
+ bd_id=int(bd_id),
+ shg=0,
+ port_type=0,
+ enable=1
+ )
+
+ args3 = dict(
+ rx_sw_if_index=sw_if_index2,
+ bd_id=int(bd_id),
+ shg=0,
+ port_type=0,
+ enable=1
+ )
+
+ err_msg = f"Failed to add L2 bridge domain with 2 interfaces " \
+ f"on host {node[u'host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd1, **args1).add(cmd2, **args2).add(cmd2, **args3)
@@ -263,27 +268,29 @@ class L2Util(object):
:type interface1: str or int
:type interface2: str or int
"""
-
- if isinstance(interface1, basestring):
+ if isinstance(interface1, str):
sw_iface1 = Topology().get_interface_sw_index(node, interface1)
else:
sw_iface1 = interface1
- if isinstance(interface2, basestring):
+ if isinstance(interface2, str):
sw_iface2 = Topology().get_interface_sw_index(node, interface2)
else:
sw_iface2 = interface2
- cmd = 'sw_interface_set_l2_xconnect'
- args1 = dict(rx_sw_if_index=sw_iface1,
- tx_sw_if_index=sw_iface2,
- enable=1)
- args2 = dict(rx_sw_if_index=sw_iface2,
- tx_sw_if_index=sw_iface1,
- enable=1)
-
- err_msg = 'Failed to add L2 cross-connect between two interfaces on' \
- ' host {host}'.format(host=node['host'])
+ cmd = u"sw_interface_set_l2_xconnect"
+ args1 = dict(
+ rx_sw_if_index=sw_iface1,
+ tx_sw_if_index=sw_iface2,
+ enable=1
+ )
+ args2 = dict(
+ rx_sw_if_index=sw_iface2,
+ tx_sw_if_index=sw_iface1,
+ enable=1
+ )
+ err_msg = f"Failed to add L2 cross-connect between two interfaces " \
+ f"on host {node['host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args1).add(cmd, **args2).get_replies(err_msg)
@@ -299,27 +306,29 @@ class L2Util(object):
:type interface1: str or int
:type interface2: str or int
"""
-
- if isinstance(interface1, basestring):
+ if isinstance(interface1, str):
sw_iface1 = Topology().get_interface_sw_index(node, interface1)
else:
sw_iface1 = interface1
- if isinstance(interface2, basestring):
+ if isinstance(interface2, str):
sw_iface2 = Topology().get_interface_sw_index(node, interface2)
else:
sw_iface2 = interface2
- cmd = 'l2_patch_add_del'
- args1 = dict(rx_sw_if_index=sw_iface1,
- tx_sw_if_index=sw_iface2,
- is_add=1)
- args2 = dict(rx_sw_if_index=sw_iface2,
- tx_sw_if_index=sw_iface1,
- is_add=1)
-
- err_msg = 'Failed to add L2 patch between two interfaces on' \
- ' host {host}'.format(host=node['host'])
+ cmd = u"l2_patch_add_del"
+ args1 = dict(
+ rx_sw_if_index=sw_iface1,
+ tx_sw_if_index=sw_iface2,
+ is_add=1
+ )
+ args2 = dict(
+ rx_sw_if_index=sw_iface2,
+ tx_sw_if_index=sw_iface1,
+ is_add=1
+ )
+ err_msg = f"Failed to add L2 patch between two interfaces " \
+ f"on host {node['host']}"
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args1).add(cmd, **args2).get_replies(err_msg)
@@ -340,15 +349,17 @@ class L2Util(object):
:type if_2: str
:type set_up: bool
"""
-
- cmd = 'brctl addbr {0}'.format(br_name)
+ cmd = f"brctl addbr {br_name}"
exec_cmd_no_error(node, cmd, sudo=True)
- cmd = 'brctl addif {0} {1}'.format(br_name, if_1)
+
+ cmd = f"brctl addif {br_name} {if_1}"
exec_cmd_no_error(node, cmd, sudo=True)
- cmd = 'brctl addif {0} {1}'.format(br_name, if_2)
+
+ cmd = f"brctl addif {br_name} {if_2}"
exec_cmd_no_error(node, cmd, sudo=True)
+
if set_up:
- cmd = 'ip link set dev {0} up'.format(br_name)
+ cmd = f"ip link set dev {br_name} up"
exec_cmd_no_error(node, cmd, sudo=True)
@staticmethod
@@ -366,15 +377,15 @@ class L2Util(object):
:type br_name: str
:type set_down: bool
"""
-
if set_down:
- cmd = 'ip link set dev {0} down'.format(br_name)
+ cmd = f"ip link set dev {br_name} down"
exec_cmd_no_error(node, cmd, sudo=True)
- cmd = 'brctl delbr {0}'.format(br_name)
+
+ cmd = f"brctl delbr {br_name}"
exec_cmd_no_error(node, cmd, sudo=True)
@staticmethod
- def vpp_get_bridge_domain_data(node, bd_id=0xffffffff):
+ def vpp_get_bridge_domain_data(node, bd_id=Constants.BITWISE_NON_ZERO):
"""Get all bridge domain data from a VPP node. If a domain ID number is
provided, return only data for the matching bridge domain.
@@ -386,23 +397,27 @@ class L2Util(object):
or a single dictionary for the specified bridge domain.
:rtype: list or dict
"""
+ cmd = u"bridge_domain_dump"
+ args = dict(
+ bd_id=int(bd_id)
+ )
+ err_msg = f"Failed to get L2FIB dump on host {node[u'host']}"
- cmd = 'bridge_domain_dump'
- args = dict(bd_id=int(bd_id))
- err_msg = 'Failed to get L2FIB dump on host {host}'.format(
- host=node['host'])
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd, **args).get_details(err_msg)
- if bd_id == Constants.BITWISE_NON_ZERO:
- return details
+ retval = details if bd_id == Constants.BITWISE_NON_ZERO else None
+
for bridge_domain in details:
- if bridge_domain['bd_id'] == bd_id:
- return bridge_domain
+ if bridge_domain[u"bd_id"] == bd_id:
+ retval = bridge_domain
+
+ return retval
@staticmethod
- def l2_vlan_tag_rewrite(node, interface, tag_rewrite_method,
- push_dot1q=True, tag1_id=None, tag2_id=None):
+ def l2_vlan_tag_rewrite(
+ node, interface, tag_rewrite_method, push_dot1q=True, tag1_id=None,
+ tag2_id=None):
"""Rewrite tags in ethernet frame.
:param node: Node to rewrite tags.
@@ -419,27 +434,29 @@ class L2Util(object):
:type tag1_id: int
:type tag2_id: int
"""
-
tag1_id = int(tag1_id) if tag1_id else 0
tag2_id = int(tag2_id) if tag2_id else 0
- vtr_oper = getattr(L2VtrOp, 'L2_VTR_{}'.format(
- tag_rewrite_method.replace('-', '_').upper()))
+ vtr_oper = getattr(
+ L2VtrOp, f"L2_VTR_{tag_rewrite_method.replace(u'-', u'_').upper()}"
+ )
- if isinstance(interface, basestring):
+ if isinstance(interface, str):
iface_key = Topology.get_interface_by_name(node, interface)
sw_if_index = Topology.get_interface_sw_index(node, iface_key)
else:
sw_if_index = interface
- cmd = 'l2_interface_vlan_tag_rewrite'
- args = dict(sw_if_index=sw_if_index,
- vtr_op=int(vtr_oper),
- push_dot1q=int(push_dot1q),
- tag1=tag1_id,
- tag2=tag2_id)
- err_msg = 'Failed to set VLAN TAG rewrite on host {host}'.format(
- host=node['host'])
+ cmd = u"l2_interface_vlan_tag_rewrite"
+ args = dict(
+ sw_if_index=sw_if_index,
+ vtr_op=int(vtr_oper),
+ push_dot1q=int(push_dot1q),
+ tag1=tag1_id,
+ tag2=tag2_id
+ )
+ err_msg = f"Failed to set VLAN TAG rewrite on host {node['host']}"
+
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
@@ -454,16 +471,17 @@ class L2Util(object):
:returns: L2 FIB table.
:rtype: list
"""
+ cmd = u"l2_fib_table_dump"
+ args = dict(
+ bd_id=int(bd_id)
+ )
+ err_msg = f"Failed to get L2FIB dump on host {node['host']}"
- cmd = 'l2_fib_table_dump'
- args = dict(bd_id=int(bd_id))
- err_msg = 'Failed to get L2FIB dump on host {host}'.format(
- host=node['host'])
with PapiSocketExecutor(node) as papi_exec:
details = papi_exec.add(cmd, **args).get_details(err_msg)
for fib_item in details:
- fib_item['mac'] = L2Util.bin_to_mac(fib_item['mac'])
+ fib_item[u"mac"] = L2Util.bin_to_mac(fib_item[u"mac"])
return details
@@ -480,13 +498,12 @@ class L2Util(object):
:returns: L2 FIB entry
:rtype: dict
"""
-
bd_data = L2Util.vpp_get_bridge_domain_data(node)
- bd_id = bd_data[bd_index-1]['bd_id']
+ bd_id = bd_data[bd_index-1][u"bd_id"]
table = L2Util.get_l2_fib_table(node, bd_id)
for entry in table:
- if entry['mac'] == mac:
+ if entry[u"mac"] == mac:
return entry
return {}