aboutsummaryrefslogtreecommitdiffstats
path: root/GPL/traffic_scripts
diff options
context:
space:
mode:
authorpmikus <pmikus@cisco.com>2021-06-10 08:02:29 +0000
committerpmikus <pmikus@cisco.com>2021-06-10 11:05:48 +0000
commit7829fea4a2c8936513fa95215b7d84997f814a69 (patch)
treea75d02efa5a270fbe31a845bcc3fc66de76bb827 /GPL/traffic_scripts
parent8b25b4e89bdba964f2a3d602b8c47f551a084724 (diff)
FIX: Pylint reduce
Signed-off-by: pmikus <pmikus@cisco.com> Change-Id: I909942dbb920df7f0fe15c0c92cda92c3cd8d8ad
Diffstat (limited to 'GPL/traffic_scripts')
-rw-r--r--GPL/traffic_scripts/PacketVerifier.py1
-rw-r--r--GPL/traffic_scripts/geneve_tunnel.py6
-rw-r--r--GPL/traffic_scripts/ipsec_interface.py12
-rw-r--r--GPL/traffic_scripts/ipsec_policy.py6
-rw-r--r--GPL/traffic_scripts/lisp/lisp_check.py27
-rw-r--r--GPL/traffic_scripts/lisp/lispgpe_check.py24
-rw-r--r--GPL/traffic_scripts/nat.py24
-rw-r--r--GPL/traffic_scripts/policer.py7
-rw-r--r--GPL/traffic_scripts/send_icmp_wait_for_reply.py34
-rw-r--r--GPL/traffic_scripts/send_ip_check_headers.py26
-rw-r--r--GPL/traffic_scripts/srv6_encap.py38
-rw-r--r--GPL/traffic_scripts/vxlan.py4
12 files changed, 139 insertions, 70 deletions
diff --git a/GPL/traffic_scripts/PacketVerifier.py b/GPL/traffic_scripts/PacketVerifier.py
index 89b8c3c206..974906e439 100644
--- a/GPL/traffic_scripts/PacketVerifier.py
+++ b/GPL/traffic_scripts/PacketVerifier.py
@@ -76,6 +76,7 @@
import os
import select
+import time
from scapy.all import ETH_P_IP, ETH_P_IPV6, ETH_P_ALL, ETH_P_ARP
from scapy.config import conf
diff --git a/GPL/traffic_scripts/geneve_tunnel.py b/GPL/traffic_scripts/geneve_tunnel.py
index 1270aa6663..19dae6d5c9 100644
--- a/GPL/traffic_scripts/geneve_tunnel.py
+++ b/GPL/traffic_scripts/geneve_tunnel.py
@@ -284,9 +284,9 @@ def main():
# read another packet in the queue if the current one is
# ICMPv6MLReport2
continue
- else:
- # otherwise process the current packet
- break
+
+ # otherwise process the current packet
+ break
check_geneve(
rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, geneve_tunnel_mac,
diff --git a/GPL/traffic_scripts/ipsec_interface.py b/GPL/traffic_scripts/ipsec_interface.py
index ee157260fa..d49e8bc57e 100644
--- a/GPL/traffic_scripts/ipsec_interface.py
+++ b/GPL/traffic_scripts/ipsec_interface.py
@@ -243,9 +243,9 @@ def main():
if rx_pkt_recv.haslayer(ICMPv6ND_NS):
# read another packet in the queue if the current one is ICMPv6ND_NS
continue
- else:
- # otherwise process the current packet
- break
+
+ # otherwise process the current packet
+ break
check_ipsec(
rx_pkt_recv, ip_layer, rx_src_mac, rx_dst_mac, src_tun, dst_tun, src_ip,
@@ -275,9 +275,9 @@ def main():
# read another packet in the queue if the current one is
# ICMPv6MLReport2
continue
- else:
- # otherwise process the current packet
- break
+
+ # otherwise process the current packet
+ break
check_ip(tx_pkt_recv, ip_layer, tx_dst_mac, tx_src_mac, dst_ip, src_ip)
diff --git a/GPL/traffic_scripts/ipsec_policy.py b/GPL/traffic_scripts/ipsec_policy.py
index 4acf0a144b..b0d4545114 100644
--- a/GPL/traffic_scripts/ipsec_policy.py
+++ b/GPL/traffic_scripts/ipsec_policy.py
@@ -209,9 +209,9 @@ def main():
# read another packet in the queue if the current one is
# ICMPv6MLReport2
continue
- else:
- # otherwise process the current packet
- break
+
+ # otherwise process the current packet
+ break
check_ip(rx_pkt_recv, ip_layer, src_ip, dst_ip)
diff --git a/GPL/traffic_scripts/lisp/lisp_check.py b/GPL/traffic_scripts/lisp/lisp_check.py
index 88d4ad5619..f7b4c6a797 100644
--- a/GPL/traffic_scripts/lisp/lisp_check.py
+++ b/GPL/traffic_scripts/lisp/lisp_check.py
@@ -66,17 +66,33 @@ class LispInnerIPv6(IPv6):
name = u"Lisp Inner Layer - IPv6"
-def valid_ipv4(ip):
+def valid_ipv4(ip_address):
+ """Check IPv4 address.
+
+ :param ip_address: IPv4 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
+ :rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
+ """
try:
- ipaddress.IPv4Address(ip)
+ ipaddress.IPv4Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
-def valid_ipv6(ip):
+def valid_ipv6(ip_address):
+ """Check IPv6 address.
+
+ :param ip_address: IPv6 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
+ :rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
+ """
try:
- ipaddress.IPv6Address(ip)
+ ipaddress.IPv6Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -85,7 +101,8 @@ def valid_ipv6(ip):
def main():
"""Send IP ICMP packet from one traffic generator interface to the other.
- :raises RuntimeError: If the received packet is not correct."""
+ :raises RuntimeError: If the received packet is not correct.
+ """
args = TrafficScriptArg(
[
diff --git a/GPL/traffic_scripts/lisp/lispgpe_check.py b/GPL/traffic_scripts/lisp/lispgpe_check.py
index 79d2ccfab6..f8fa595867 100644
--- a/GPL/traffic_scripts/lisp/lispgpe_check.py
+++ b/GPL/traffic_scripts/lisp/lispgpe_check.py
@@ -92,17 +92,33 @@ class LispGPEInnerNSH(Packet):
"""
-def valid_ipv4(ip):
+def valid_ipv4(ip_address):
+ """Check IPv4 address.
+
+ :param ip_address: IPv4 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
+ :rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
+ """
try:
- ipaddress.IPv4Address(ip)
+ ipaddress.IPv4Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
-def valid_ipv6(ip):
+def valid_ipv6(ip_address):
+ """Check IPv6 address.
+
+ :param ip_address: IPv6 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
+ :rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
+ """
try:
- ipaddress.IPv6Address(ip)
+ ipaddress.IPv6Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
diff --git a/GPL/traffic_scripts/nat.py b/GPL/traffic_scripts/nat.py
index 319d617f7c..43715fb383 100644
--- a/GPL/traffic_scripts/nat.py
+++ b/GPL/traffic_scripts/nat.py
@@ -38,17 +38,33 @@ from .PacketVerifier import RxQueue, TxQueue
from .TrafficScriptArg import TrafficScriptArg
-def valid_ipv4(ip):
+def valid_ipv4(ip_address):
+ """Check IPv4 address.
+
+ :param ip_address: IPv4 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
+ :rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
+ """
try:
- ipaddress.IPv4Address(ip)
+ ipaddress.IPv4Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
-def valid_ipv6(ip):
+def valid_ipv6(ip_address):
+ """Check IPv6 address.
+
+ :param ip_address: IPv6 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
+ :rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
+ """
try:
- ipaddress.IPv6Address(ip)
+ ipaddress.IPv6Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
diff --git a/GPL/traffic_scripts/policer.py b/GPL/traffic_scripts/policer.py
index ef78f279ad..46131da2a0 100644
--- a/GPL/traffic_scripts/policer.py
+++ b/GPL/traffic_scripts/policer.py
@@ -26,7 +26,6 @@
"""Traffic script for IPsec verification."""
import sys
-import logging
from ipaddress import ip_address
from scapy.layers.l2 import Ether
@@ -117,9 +116,9 @@ def main():
# read another packet in the queue if the current one is
# ICMPv6MLReport2
continue
- else:
- # otherwise process the current packet
- break
+
+ # otherwise process the current packet
+ break
if pkt_recv is None:
raise RuntimeError(u"Rx timeout")
diff --git a/GPL/traffic_scripts/send_icmp_wait_for_reply.py b/GPL/traffic_scripts/send_icmp_wait_for_reply.py
index 966fa60a9c..fefa60f44e 100644
--- a/GPL/traffic_scripts/send_icmp_wait_for_reply.py
+++ b/GPL/traffic_scripts/send_icmp_wait_for_reply.py
@@ -38,33 +38,33 @@ from .PacketVerifier import RxQueue, TxQueue
from .TrafficScriptArg import TrafficScriptArg
-def valid_ipv4(ip):
- """Check if IP address has the correct IPv4 address format.
+def valid_ipv4(ip_address):
+ """Check IPv4 address.
- :param ip: IP address.
- :type ip: str
- :return: True in case of correct IPv4 address format,
- otherwise return False.
+ :param ip_address: IPv4 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
:rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
"""
try:
- ipaddress.IPv4Address(ip)
+ ipaddress.IPv4Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
-def valid_ipv6(ip):
- """Check if IP address has the correct IPv6 address format.
+def valid_ipv6(ip_address):
+ """Check IPv6 address.
- :param ip: IP address.
- :type ip: str
- :return: True in case of correct IPv6 address format,
- otherwise return False.
+ :param ip_address: IPv6 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
:rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
"""
try:
- ipaddress.IPv6Address(ip)
+ ipaddress.IPv6Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -105,9 +105,9 @@ def main():
raise ValueError(u"IP not in correct format")
icmp_request = (
- Ether(src=src_mac, dst=dst_mac) /
- ip_layer(src=src_ip, dst=dst_ip) /
- icmp_req()
+ Ether(src=src_mac, dst=dst_mac) /
+ ip_layer(src=src_ip, dst=dst_ip) /
+ icmp_req()
)
# Send created packet on the interface
diff --git a/GPL/traffic_scripts/send_ip_check_headers.py b/GPL/traffic_scripts/send_ip_check_headers.py
index 40268d99c2..e052b12190 100644
--- a/GPL/traffic_scripts/send_ip_check_headers.py
+++ b/GPL/traffic_scripts/send_ip_check_headers.py
@@ -42,17 +42,33 @@ from .PacketVerifier import RxQueue, TxQueue
from .TrafficScriptArg import TrafficScriptArg
-def valid_ipv4(ip):
+def valid_ipv4(ip_address):
+ """Check IPv4 address.
+
+ :param ip_address: IPv4 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
+ :rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
+ """
try:
- ipaddress.IPv4Address(ip)
+ ipaddress.IPv4Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
-def valid_ipv6(ip):
+def valid_ipv6(ip_address):
+ """Check IPv6 address.
+
+ :param ip_address: IPv6 address to check.
+ :type ip_address: str
+ :returns: True if IP address is correct.
+ :rtype: bool
+ :raises AttributeError, AddressValueError: If IP address is not valid.
+ """
try:
- ipaddress.IPv6Address(ip)
+ ipaddress.IPv6Address(ip_address)
return True
except (AttributeError, ipaddress.AddressValueError):
return False
@@ -90,7 +106,7 @@ def main():
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
- sent_packets =list()
+ sent_packets = list()
pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
if encaps_tx == u"Dot1q":
diff --git a/GPL/traffic_scripts/srv6_encap.py b/GPL/traffic_scripts/srv6_encap.py
index 9cdfccf432..ce62d79974 100644
--- a/GPL/traffic_scripts/srv6_encap.py
+++ b/GPL/traffic_scripts/srv6_encap.py
@@ -255,9 +255,9 @@ def main():
if rx_pkt_recv.haslayer(ICMPv6ND_NS):
# read another packet in the queue if the current one is ICMPv6ND_NS
continue
- else:
- # otherwise process the current packet
- break
+
+ # otherwise process the current packet
+ break
check_srv6(
rx_pkt_recv, rx_src_mac, rx_dst_mac, src_ip, dst_ip, dir0_srcsid,
@@ -270,20 +270,20 @@ def main():
ip_pkt[Raw].load += (b"\0" * (size_limit - 14 - len(ip_pkt)))
rx_pkt_send = (
- Ether(src=rx_dst_mac, dst=rx_src_mac) /
- IPv6(src=dir1_srcsid, dst=dir1_dstsid1) /
- IPv6ExtHdrSegmentRouting(
- segleft=1 if dir1_dstsid3 == u"None" else 2,
- lastentry=1 if dir1_dstsid3 == u"None" else 2,
- addresses=[dir1_dstsid2, dir1_dstsid1]
- if dir1_dstsid3 == u"None"
- else [dir1_dstsid3, dir1_dstsid2, dir1_dstsid1]
- ) /
- ip_pkt
+ Ether(src=rx_dst_mac, dst=rx_src_mac) /
+ IPv6(src=dir1_srcsid, dst=dir1_dstsid1) /
+ IPv6ExtHdrSegmentRouting(
+ segleft=1 if dir1_dstsid3 == u"None" else 2,
+ lastentry=1 if dir1_dstsid3 == u"None" else 2,
+ addresses=[dir1_dstsid2, dir1_dstsid1]
+ if dir1_dstsid3 == u"None"
+ else [dir1_dstsid3, dir1_dstsid2, dir1_dstsid1]
+ ) /
+ ip_pkt
) if dir1_dstsid2 != u"None" else (
- Ether(src=rx_dst_mac, dst=rx_src_mac) /
- IPv6(src=dir1_srcsid, dst=dir1_dstsid1) /
- ip_pkt
+ Ether(src=rx_dst_mac, dst=rx_src_mac) /
+ IPv6(src=dir1_srcsid, dst=dir1_dstsid1) /
+ ip_pkt
)
rx_txq.send(rx_pkt_send)
@@ -300,9 +300,9 @@ def main():
# read another packet in the queue if the current one is
# ICMPv6MLReport2
continue
- else:
- # otherwise process the current packet
- break
+
+ # otherwise process the current packet
+ break
if decap == u"True":
check_ip(tx_pkt_recv, tx_dst_mac, tx_src_mac, dst_ip, src_ip)
diff --git a/GPL/traffic_scripts/vxlan.py b/GPL/traffic_scripts/vxlan.py
index 2acd2a2c93..25d2d60ed8 100644
--- a/GPL/traffic_scripts/vxlan.py
+++ b/GPL/traffic_scripts/vxlan.py
@@ -21,6 +21,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Traffic script for vxlan verification."""
+
from scapy.fields import BitField, XByteField, X3BytesField
from scapy.layers.inet import UDP
from scapy.layers.l2 import Ether
@@ -28,6 +30,8 @@ from scapy.packet import Packet, bind_layers
class VXLAN(Packet):
+ """Custom scapy layer override for VXLAN."""
+
name = u"VXLAN"
fields_desc = [
BitField(u"flags", 0x08000000, 32),