diff options
author | pmikus <pmikus@cisco.com> | 2021-06-10 08:02:29 +0000 |
---|---|---|
committer | pmikus <pmikus@cisco.com> | 2021-06-10 11:05:48 +0000 |
commit | 7829fea4a2c8936513fa95215b7d84997f814a69 (patch) | |
tree | a75d02efa5a270fbe31a845bcc3fc66de76bb827 /GPL/traffic_scripts | |
parent | 8b25b4e89bdba964f2a3d602b8c47f551a084724 (diff) |
FIX: Pylint reduce
Signed-off-by: pmikus <pmikus@cisco.com>
Change-Id: I909942dbb920df7f0fe15c0c92cda92c3cd8d8ad
Diffstat (limited to 'GPL/traffic_scripts')
-rw-r--r-- | GPL/traffic_scripts/PacketVerifier.py | 1 | ||||
-rw-r--r-- | GPL/traffic_scripts/geneve_tunnel.py | 6 | ||||
-rw-r--r-- | GPL/traffic_scripts/ipsec_interface.py | 12 | ||||
-rw-r--r-- | GPL/traffic_scripts/ipsec_policy.py | 6 | ||||
-rw-r--r-- | GPL/traffic_scripts/lisp/lisp_check.py | 27 | ||||
-rw-r--r-- | GPL/traffic_scripts/lisp/lispgpe_check.py | 24 | ||||
-rw-r--r-- | GPL/traffic_scripts/nat.py | 24 | ||||
-rw-r--r-- | GPL/traffic_scripts/policer.py | 7 | ||||
-rw-r--r-- | GPL/traffic_scripts/send_icmp_wait_for_reply.py | 34 | ||||
-rw-r--r-- | GPL/traffic_scripts/send_ip_check_headers.py | 26 | ||||
-rw-r--r-- | GPL/traffic_scripts/srv6_encap.py | 38 | ||||
-rw-r--r-- | GPL/traffic_scripts/vxlan.py | 4 |
12 files changed, 139 insertions, 70 deletions
diff --git a/GPL/traffic_scripts/PacketVerifier.py b/GPL/traffic_scripts/PacketVerifier.py index 89b8c3c206..974906e439 100644 --- a/GPL/traffic_scripts/PacketVerifier.py +++ b/GPL/traffic_scripts/PacketVerifier.py @@ -76,6 +76,7 @@ import os import select +import time from scapy.all import ETH_P_IP, ETH_P_IPV6, ETH_P_ALL, ETH_P_ARP from scapy.config import conf diff --git a/GPL/traffic_scripts/geneve_tunnel.py b/GPL/traffic_scripts/geneve_tunnel.py index 1270aa6663..19dae6d5c9 100644 --- a/GPL/traffic_scripts/geneve_tunnel.py +++ b/GPL/traffic_scripts/geneve_tunnel.py @@ -284,9 +284,9 @@ def main(): # read another packet in the queue if the current one is # ICMPv6MLReport2 continue - else: - # otherwise process the current packet - break + + # otherwise process the current packet + break check_geneve( rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, geneve_tunnel_mac, diff --git a/GPL/traffic_scripts/ipsec_interface.py b/GPL/traffic_scripts/ipsec_interface.py index ee157260fa..d49e8bc57e 100644 --- a/GPL/traffic_scripts/ipsec_interface.py +++ b/GPL/traffic_scripts/ipsec_interface.py @@ -243,9 +243,9 @@ def main(): if rx_pkt_recv.haslayer(ICMPv6ND_NS): # read another packet in the queue if the current one is ICMPv6ND_NS continue - else: - # otherwise process the current packet - break + + # otherwise process the current packet + break check_ipsec( rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, src_tun, dst_tun, src_ip, @@ -275,9 +275,9 @@ def main(): # read another packet in the queue if the current one is # ICMPv6MLReport2 continue - else: - # otherwise process the current packet - break + + # otherwise process the current packet + break check_ip(tx_pkt_recv, ip_layer, tx_dst_mac, tx_src_mac, dst_ip, src_ip) diff --git a/GPL/traffic_scripts/ipsec_policy.py b/GPL/traffic_scripts/ipsec_policy.py index 4acf0a144b..b0d4545114 100644 --- a/GPL/traffic_scripts/ipsec_policy.py +++ b/GPL/traffic_scripts/ipsec_policy.py @@ -209,9 +209,9 @@ def main(): # read another packet in the queue if the current one is # ICMPv6MLReport2 continue - else: - # otherwise process the current packet - break + + # otherwise process the current packet + break check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip) diff --git a/GPL/traffic_scripts/lisp/lisp_check.py b/GPL/traffic_scripts/lisp/lisp_check.py index 88d4ad5619..f7b4c6a797 100644 --- a/GPL/traffic_scripts/lisp/lisp_check.py +++ b/GPL/traffic_scripts/lisp/lisp_check.py @@ -66,17 +66,33 @@ class LispInnerIPv6(IPv6): name = u"Lisp Inner Layer - IPv6" -def valid_ipv4(ip): +def valid_ipv4(ip_address): + """Check IPv4 address. + + :param ip_address: IPv4 address to check. + :type ip_address: str + :returns: True if IP address is correct. + :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. + """ try: - ipaddress.IPv4Address(ip) + ipaddress.IPv4Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False -def valid_ipv6(ip): +def valid_ipv6(ip_address): + """Check IPv6 address. + + :param ip_address: IPv6 address to check. + :type ip_address: str + :returns: True if IP address is correct. + :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. + """ try: - ipaddress.IPv6Address(ip) + ipaddress.IPv6Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -85,7 +101,8 @@ def valid_ipv6(ip): def main(): """Send IP ICMP packet from one traffic generator interface to the other. - :raises RuntimeError: If the received packet is not correct.""" + :raises RuntimeError: If the received packet is not correct. + """ args = TrafficScriptArg( [ diff --git a/GPL/traffic_scripts/lisp/lispgpe_check.py b/GPL/traffic_scripts/lisp/lispgpe_check.py index 79d2ccfab6..f8fa595867 100644 --- a/GPL/traffic_scripts/lisp/lispgpe_check.py +++ b/GPL/traffic_scripts/lisp/lispgpe_check.py @@ -92,17 +92,33 @@ class LispGPEInnerNSH(Packet): """ -def valid_ipv4(ip): +def valid_ipv4(ip_address): + """Check IPv4 address. + + :param ip_address: IPv4 address to check. + :type ip_address: str + :returns: True if IP address is correct. + :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. + """ try: - ipaddress.IPv4Address(ip) + ipaddress.IPv4Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False -def valid_ipv6(ip): +def valid_ipv6(ip_address): + """Check IPv6 address. + + :param ip_address: IPv6 address to check. + :type ip_address: str + :returns: True if IP address is correct. + :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. + """ try: - ipaddress.IPv6Address(ip) + ipaddress.IPv6Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False diff --git a/GPL/traffic_scripts/nat.py b/GPL/traffic_scripts/nat.py index 319d617f7c..43715fb383 100644 --- a/GPL/traffic_scripts/nat.py +++ b/GPL/traffic_scripts/nat.py @@ -38,17 +38,33 @@ from .PacketVerifier import RxQueue, TxQueue from .TrafficScriptArg import TrafficScriptArg -def valid_ipv4(ip): +def valid_ipv4(ip_address): + """Check IPv4 address. + + :param ip_address: IPv4 address to check. + :type ip_address: str + :returns: True if IP address is correct. + :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. + """ try: - ipaddress.IPv4Address(ip) + ipaddress.IPv4Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False -def valid_ipv6(ip): +def valid_ipv6(ip_address): + """Check IPv6 address. + + :param ip_address: IPv6 address to check. + :type ip_address: str + :returns: True if IP address is correct. + :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. + """ try: - ipaddress.IPv6Address(ip) + ipaddress.IPv6Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False diff --git a/GPL/traffic_scripts/policer.py b/GPL/traffic_scripts/policer.py index ef78f279ad..46131da2a0 100644 --- a/GPL/traffic_scripts/policer.py +++ b/GPL/traffic_scripts/policer.py @@ -26,7 +26,6 @@ """Traffic script for IPsec verification.""" import sys -import logging from ipaddress import ip_address from scapy.layers.l2 import Ether @@ -117,9 +116,9 @@ def main(): # read another packet in the queue if the current one is # ICMPv6MLReport2 continue - else: - # otherwise process the current packet - break + + # otherwise process the current packet + break if pkt_recv is None: raise RuntimeError(u"Rx timeout") diff --git a/GPL/traffic_scripts/send_icmp_wait_for_reply.py b/GPL/traffic_scripts/send_icmp_wait_for_reply.py index 966fa60a9c..fefa60f44e 100644 --- a/GPL/traffic_scripts/send_icmp_wait_for_reply.py +++ b/GPL/traffic_scripts/send_icmp_wait_for_reply.py @@ -38,33 +38,33 @@ from .PacketVerifier import RxQueue, TxQueue from .TrafficScriptArg import TrafficScriptArg -def valid_ipv4(ip): - """Check if IP address has the correct IPv4 address format. +def valid_ipv4(ip_address): + """Check IPv4 address. - :param ip: IP address. - :type ip: str - :return: True in case of correct IPv4 address format, - otherwise return False. + :param ip_address: IPv4 address to check. + :type ip_address: str + :returns: True if IP address is correct. :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. """ try: - ipaddress.IPv4Address(ip) + ipaddress.IPv4Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False -def valid_ipv6(ip): - """Check if IP address has the correct IPv6 address format. +def valid_ipv6(ip_address): + """Check IPv6 address. - :param ip: IP address. - :type ip: str - :return: True in case of correct IPv6 address format, - otherwise return False. + :param ip_address: IPv6 address to check. + :type ip_address: str + :returns: True if IP address is correct. :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. """ try: - ipaddress.IPv6Address(ip) + ipaddress.IPv6Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -105,9 +105,9 @@ def main(): raise ValueError(u"IP not in correct format") icmp_request = ( - Ether(src=src_mac, dst=dst_mac) / - ip_layer(src=src_ip, dst=dst_ip) / - icmp_req() + Ether(src=src_mac, dst=dst_mac) / + ip_layer(src=src_ip, dst=dst_ip) / + icmp_req() ) # Send created packet on the interface diff --git a/GPL/traffic_scripts/send_ip_check_headers.py b/GPL/traffic_scripts/send_ip_check_headers.py index 40268d99c2..e052b12190 100644 --- a/GPL/traffic_scripts/send_ip_check_headers.py +++ b/GPL/traffic_scripts/send_ip_check_headers.py @@ -42,17 +42,33 @@ from .PacketVerifier import RxQueue, TxQueue from .TrafficScriptArg import TrafficScriptArg -def valid_ipv4(ip): +def valid_ipv4(ip_address): + """Check IPv4 address. + + :param ip_address: IPv4 address to check. + :type ip_address: str + :returns: True if IP address is correct. + :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. + """ try: - ipaddress.IPv4Address(ip) + ipaddress.IPv4Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False -def valid_ipv6(ip): +def valid_ipv6(ip_address): + """Check IPv6 address. + + :param ip_address: IPv6 address to check. + :type ip_address: str + :returns: True if IP address is correct. + :rtype: bool + :raises AttributeError, AddressValueError: If IP address is not valid. + """ try: - ipaddress.IPv6Address(ip) + ipaddress.IPv6Address(ip_address) return True except (AttributeError, ipaddress.AddressValueError): return False @@ -90,7 +106,7 @@ def main(): rxq = RxQueue(rx_if) txq = TxQueue(tx_if) - sent_packets =list() + sent_packets = list() pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac) if encaps_tx == u"Dot1q": diff --git a/GPL/traffic_scripts/srv6_encap.py b/GPL/traffic_scripts/srv6_encap.py index 9cdfccf432..ce62d79974 100644 --- a/GPL/traffic_scripts/srv6_encap.py +++ b/GPL/traffic_scripts/srv6_encap.py @@ -255,9 +255,9 @@ def main(): if rx_pkt_recv.haslayer(ICMPv6ND_NS): # read another packet in the queue if the current one is ICMPv6ND_NS continue - else: - # otherwise process the current packet - break + + # otherwise process the current packet + break check_srv6( rx_pkt_recv, rx_src_mac, rx_dst_mac, src_ip, dst_ip, dir0_srcsid, @@ -270,20 +270,20 @@ def main(): ip_pkt[Raw].load += (b"\0" * (size_limit - 14 - len(ip_pkt))) rx_pkt_send = ( - Ether(src=rx_dst_mac, dst=rx_src_mac) / - IPv6(src=dir1_srcsid, dst=dir1_dstsid1) / - IPv6ExtHdrSegmentRouting( - segleft=1 if dir1_dstsid3 == u"None" else 2, - lastentry=1 if dir1_dstsid3 == u"None" else 2, - addresses=[dir1_dstsid2, dir1_dstsid1] - if dir1_dstsid3 == u"None" - else [dir1_dstsid3, dir1_dstsid2, dir1_dstsid1] - ) / - ip_pkt + Ether(src=rx_dst_mac, dst=rx_src_mac) / + IPv6(src=dir1_srcsid, dst=dir1_dstsid1) / + IPv6ExtHdrSegmentRouting( + segleft=1 if dir1_dstsid3 == u"None" else 2, + lastentry=1 if dir1_dstsid3 == u"None" else 2, + addresses=[dir1_dstsid2, dir1_dstsid1] + if dir1_dstsid3 == u"None" + else [dir1_dstsid3, dir1_dstsid2, dir1_dstsid1] + ) / + ip_pkt ) if dir1_dstsid2 != u"None" else ( - Ether(src=rx_dst_mac, dst=rx_src_mac) / - IPv6(src=dir1_srcsid, dst=dir1_dstsid1) / - ip_pkt + Ether(src=rx_dst_mac, dst=rx_src_mac) / + IPv6(src=dir1_srcsid, dst=dir1_dstsid1) / + ip_pkt ) rx_txq.send(rx_pkt_send) @@ -300,9 +300,9 @@ def main(): # read another packet in the queue if the current one is # ICMPv6MLReport2 continue - else: - # otherwise process the current packet - break + + # otherwise process the current packet + break if decap == u"True": check_ip(tx_pkt_recv, tx_dst_mac, tx_src_mac, dst_ip, src_ip) diff --git a/GPL/traffic_scripts/vxlan.py b/GPL/traffic_scripts/vxlan.py index 2acd2a2c93..25d2d60ed8 100644 --- a/GPL/traffic_scripts/vxlan.py +++ b/GPL/traffic_scripts/vxlan.py @@ -21,6 +21,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""Traffic script for vxlan verification.""" + from scapy.fields import BitField, XByteField, X3BytesField from scapy.layers.inet import UDP from scapy.layers.l2 import Ether @@ -28,6 +30,8 @@ from scapy.packet import Packet, bind_layers class VXLAN(Packet): + """Custom scapy layer override for VXLAN.""" + name = u"VXLAN" fields_desc = [ BitField(u"flags", 0x08000000, 32), |