aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/Classify.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/Classify.py')
-rw-r--r--resources/libraries/python/Classify.py103
1 files changed, 75 insertions, 28 deletions
diff --git a/resources/libraries/python/Classify.py b/resources/libraries/python/Classify.py
index 1938688900..2b9ab3d11c 100644
--- a/resources/libraries/python/Classify.py
+++ b/resources/libraries/python/Classify.py
@@ -16,7 +16,7 @@
import binascii
import re
-from socket import AF_INET, AF_INET6, inet_aton, inet_pton
+from ipaddress import ip_address
from robot.api import logger
@@ -40,9 +40,16 @@ class Classify(object):
:returns MAC ACL mask in hexstring format.
:rtype: str
"""
+ if ether_type:
+ end = 28
+ elif src_mac:
+ end = 24
+ else:
+ end = 12
+
return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac.replace(':', ''), src_mac.replace(':', ''), ether_type)).\
- rstrip('0')
+ dst_mac.replace(':', ''), src_mac.replace(':', ''),
+ ether_type))[0:end]
@staticmethod
def _build_ip_mask(proto='', src_ip='', dst_ip='', src_port='',
@@ -62,8 +69,19 @@ class Classify(object):
:returns: IP mask in hexstring format.
:rtype: str
"""
+ if dst_port:
+ end = 48
+ elif src_port:
+ end = 44
+ elif dst_ip:
+ end = 40
+ elif src_ip:
+ end = 32
+ else:
+ end = 20
+
return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
- proto, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
+ proto, src_ip, dst_ip, src_port, dst_port))[0:end]
@staticmethod
def _build_ip6_mask(next_hdr='', src_ip='', dst_ip='', src_port='',
@@ -83,8 +101,19 @@ class Classify(object):
:returns: IPv6 ACL mask in hexstring format.
:rtype: str
"""
+ if dst_port:
+ end = 88
+ elif src_port:
+ end = 84
+ elif dst_ip:
+ end = 80
+ elif src_ip:
+ end = 48
+ else:
+ end = 14
+
return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
- next_hdr, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
+ next_hdr, src_ip, dst_ip, src_port, dst_port))[0:end]
@staticmethod
def _build_mac_match(dst_mac='', src_mac='', ether_type=''):
@@ -99,13 +128,16 @@ class Classify(object):
:returns: MAC ACL match data in hexstring format.
:rtype: str
"""
- if dst_mac:
- dst_mac = dst_mac.replace(':', '')
- if src_mac:
- src_mac = src_mac.replace(':', '')
+ if ether_type:
+ end = 28
+ elif src_mac:
+ end = 24
+ else:
+ end = 12
return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
- dst_mac, src_mac, ether_type)).rstrip('0')
+ dst_mac.replace(':', ''), src_mac.replace(':', ''),
+ ether_type))[0:end]
@staticmethod
def _build_ip_match(proto=0, src_ip='', dst_ip='', src_port=0, dst_port=0):
@@ -125,13 +157,23 @@ class Classify(object):
:rtype: str
"""
if src_ip:
- src_ip = binascii.hexlify(inet_aton(src_ip))
+ src_ip = binascii.hexlify(ip_address(unicode(src_ip)).packed)
if dst_ip:
- dst_ip = binascii.hexlify(inet_aton(dst_ip))
+ dst_ip = binascii.hexlify(ip_address(unicode(dst_ip)).packed)
+ if dst_port:
+ end = 48
+ elif src_port:
+ end = 44
+ elif dst_ip:
+ end = 40
+ elif src_ip:
+ end = 32
+ else:
+ end = 20
return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:])).rstrip('0')
+ hex(dst_port)[2:]))[0:end]
@staticmethod
def _build_ip6_match(next_hdr=0, src_ip='', dst_ip='', src_port=0,
@@ -139,9 +181,9 @@ class Classify(object):
"""Build IPv6 ACL match data in hexstring format.
:param next_hdr: Next header number with valid option "x".
- :param src_ip: Source ip6 address with format of "xxx:xxxx::xxxx".
+ :param src_ip: Source ip6 address with format of "xxxx:xxxx::xxxx".
:param dst_ip: Destination ip6 address with format of
- "xxx:xxxx::xxxx".
+ "xxxx:xxxx::xxxx".
:param src_port: Source port number "x".
:param dst_port: Destination port number "x".
:type next_hdr: int
@@ -153,13 +195,23 @@ class Classify(object):
:rtype: str
"""
if src_ip:
- src_ip = binascii.hexlify(inet_pton(AF_INET6, src_ip))
+ src_ip = binascii.hexlify(ip_address(unicode(src_ip)).packed)
if dst_ip:
- dst_ip = binascii.hexlify(inet_pton(AF_INET6, dst_ip))
+ dst_ip = binascii.hexlify(ip_address(unicode(dst_ip)).packed)
+ if dst_port:
+ end = 88
+ elif src_port:
+ end = 84
+ elif dst_ip:
+ end = 80
+ elif src_ip:
+ end = 48
+ else:
+ end = 14
return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
hex(next_hdr)[2:], src_ip, dst_ip, hex(src_port)[2:],
- hex(dst_port)[2:])).rstrip('0')
+ hex(dst_port)[2:]))[0:end]
@staticmethod
def _classify_add_del_table(node, is_add, mask, match_n_vectors=1,
@@ -415,10 +467,8 @@ class Classify(object):
ip4=Classify._build_ip_mask,
ip6=Classify._build_ip6_mask
)
- if ip_version == "ip4":
- ip_addr = binascii.hexlify(inet_aton(ip_addr))
- elif ip_version == "ip6":
- ip_addr = binascii.hexlify(inet_pton(AF_INET6, ip_addr))
+ if ip_version == "ip4" or ip_version == "ip6":
+ ip_addr = binascii.hexlify(ip_address(unicode(ip_addr)).packed)
else:
raise ValueError("IP version {ver} is not supported.".
format(ver=ip_version))
@@ -785,15 +835,13 @@ class Classify(object):
groups = re.search(reg_ex_src_ip, rule)
if groups:
grp = groups.group(1).split(' ')[1].split('/')
- acl_rule["src_ip_addr"] = str(inet_pton(
- AF_INET6 if acl_rule["is_ipv6"] else AF_INET, grp[0]))
+ acl_rule["src_ip_addr"] = ip_address(unicode(grp[0])).packed
acl_rule["src_ip_prefix_len"] = int(grp[1])
groups = re.search(reg_ex_dst_ip, rule)
if groups:
grp = groups.group(1).split(' ')[1].split('/')
- acl_rule["dst_ip_addr"] = str(inet_pton(
- AF_INET6 if acl_rule["is_ipv6"] else AF_INET, grp[0]))
+ acl_rule["dst_ip_addr"] = ip_address(unicode(grp[0])).packed
acl_rule["dst_ip_prefix_len"] = int(grp[1])
groups = re.search(reg_ex_sport, rule)
@@ -846,8 +894,7 @@ class Classify(object):
groups = re.search(reg_ex_ip, rule)
if groups:
grp = groups.group(1).split(' ')[1].split('/')
- acl_rule["src_ip_addr"] = str(inet_pton(
- AF_INET6 if acl_rule["is_ipv6"] else AF_INET, grp[0]))
+ acl_rule["src_ip_addr"] = ip_address(unicode(grp[0])).packed
acl_rule["src_ip_prefix_len"] = int(grp[1])
acl_rules.append(acl_rule)