aboutsummaryrefslogtreecommitdiffstats
path: root/test/gtest/test_scapy_gen.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/gtest/test_scapy_gen.py')
-rw-r--r--test/gtest/test_scapy_gen.py96
1 files changed, 96 insertions, 0 deletions
diff --git a/test/gtest/test_scapy_gen.py b/test/gtest/test_scapy_gen.py
new file mode 100644
index 0000000..aecbe5f
--- /dev/null
+++ b/test/gtest/test_scapy_gen.py
@@ -0,0 +1,96 @@
+# Copyright (c) 2016 Intel Corporation.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+from socket import inet_pton
+import logging
+logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
+from scapy.all import *
+from scapy.layers.inet import IP
+from scapy.layers.inet6 import IPv6
+from scapy.layers.inet import UDP
+from random import shuffle
+
+src_mac = "00:00:00:00:de:ad"
+dst_mac = "00:00:de:ad:be:ef"
+eth_hdr_len = len(Ether())
+ip_hdr_len = len(IP())
+ipv6_hdr_len = len(IPv6())
+udp_hdr_len = len(UDP())
+udpv4_hdr_len = eth_hdr_len + ip_hdr_len + udp_hdr_len
+udpv6_hdr_len = eth_hdr_len + ipv6_hdr_len + udp_hdr_len
+
+
+def write_pkts(pkts, pcap_path):
+ try:
+ pktdump = PcapWriter(pcap_path, append=False, sync=True)
+ if len(pkts) > 0:
+ pktdump.write(pkts)
+ except IOError:
+ pass
+
+
+def read_pkts(pcap_path):
+ try:
+ pkts_ref = PcapReader(pcap_path)
+ pkts = pkts_ref.read_all()
+ return list(pkts)
+ except IOError:
+ pkts = []
+ return pkts
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Generate packets for"
+ "TLDK rx/tx tests")
+ parser.add_argument("l_ip")
+ parser.add_argument("r_ip")
+ parser.add_argument("l_port", type=int)
+ parser.add_argument("r_port", type=int)
+ parser.add_argument("nb_pkts", type=int)
+ parser.add_argument("file")
+ parser.add_argument("-bc3", "--bad_chksum_l3", default=None, type=int)
+ parser.add_argument("-bc4", "--bad_chksum_l4", default=None, type=int)
+ parser.add_argument("-f", "--fragment")
+ parser.add_argument("-r", "--rand-pkt-size")
+
+ args = parser.parse_args()
+
+ ip_ver = ""
+ try:
+ inet_pton(socket.AF_INET, args.l_ip)
+ ip_ver = "ipv4"
+ except socket.error:
+ ip_ver = "ipv6"
+
+ pkts = read_pkts(args.file)
+
+ if "ipv4" in ip_ver:
+ for i in range(0, args.nb_pkts):
+ pkt = Ether(dst=dst_mac, src=src_mac) /\
+ IP(src=args.l_ip, dst=args.r_ip, frag=0, chksum=args.bad_chksum_l3) /\
+ UDP(sport=args.l_port, dport=args.r_port, chksum=args.bad_chksum_l4) /\
+ Raw(RandString(size=(100 - udpv4_hdr_len)))
+ pkts.append(pkt)
+ else:
+ for i in range(0, args.nb_pkts):
+ pkt = Ether(dst=dst_mac, src=src_mac) /\
+ IPv6(src=args.l_ip, dst=args.r_ip) /\
+ UDP(sport=args.l_port, dport=args.r_port, chksum=args.bad_chksum_l4) / \
+ Raw(RandString(size=(100 - udpv6_hdr_len)))
+ pkts.append(pkt)
+
+ shuffle(pkts)
+ write_pkts(pkts, args.file)
+
+main()