diff options
Diffstat (limited to 'test/gtest/test_scapy_gen.py')
-rw-r--r-- | test/gtest/test_scapy_gen.py | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/test/gtest/test_scapy_gen.py b/test/gtest/test_scapy_gen.py new file mode 100644 index 0000000..aecbe5f --- /dev/null +++ b/test/gtest/test_scapy_gen.py @@ -0,0 +1,96 @@ +# Copyright (c) 2016 Intel Corporation. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +from socket import inet_pton +import logging +logging.getLogger("scapy.runtime").setLevel(logging.ERROR) +from scapy.all import * +from scapy.layers.inet import IP +from scapy.layers.inet6 import IPv6 +from scapy.layers.inet import UDP +from random import shuffle + +src_mac = "00:00:00:00:de:ad" +dst_mac = "00:00:de:ad:be:ef" +eth_hdr_len = len(Ether()) +ip_hdr_len = len(IP()) +ipv6_hdr_len = len(IPv6()) +udp_hdr_len = len(UDP()) +udpv4_hdr_len = eth_hdr_len + ip_hdr_len + udp_hdr_len +udpv6_hdr_len = eth_hdr_len + ipv6_hdr_len + udp_hdr_len + + +def write_pkts(pkts, pcap_path): + try: + pktdump = PcapWriter(pcap_path, append=False, sync=True) + if len(pkts) > 0: + pktdump.write(pkts) + except IOError: + pass + + +def read_pkts(pcap_path): + try: + pkts_ref = PcapReader(pcap_path) + pkts = pkts_ref.read_all() + return list(pkts) + except IOError: + pkts = [] + return pkts + + +def main(): + parser = argparse.ArgumentParser(description="Generate packets for" + "TLDK rx/tx tests") + parser.add_argument("l_ip") + parser.add_argument("r_ip") + parser.add_argument("l_port", type=int) + parser.add_argument("r_port", type=int) + parser.add_argument("nb_pkts", type=int) + parser.add_argument("file") + parser.add_argument("-bc3", "--bad_chksum_l3", default=None, type=int) + parser.add_argument("-bc4", "--bad_chksum_l4", default=None, type=int) + parser.add_argument("-f", "--fragment") + parser.add_argument("-r", "--rand-pkt-size") + + args = parser.parse_args() + + ip_ver = "" + try: + inet_pton(socket.AF_INET, args.l_ip) + ip_ver = "ipv4" + except socket.error: + ip_ver = "ipv6" + + pkts = read_pkts(args.file) + + if "ipv4" in ip_ver: + for i in range(0, args.nb_pkts): + pkt = Ether(dst=dst_mac, src=src_mac) /\ + IP(src=args.l_ip, dst=args.r_ip, frag=0, chksum=args.bad_chksum_l3) /\ + UDP(sport=args.l_port, dport=args.r_port, chksum=args.bad_chksum_l4) /\ + Raw(RandString(size=(100 - udpv4_hdr_len))) + pkts.append(pkt) + else: + for i in range(0, args.nb_pkts): + pkt = Ether(dst=dst_mac, src=src_mac) /\ + IPv6(src=args.l_ip, dst=args.r_ip) /\ + UDP(sport=args.l_port, dport=args.r_port, chksum=args.bad_chksum_l4) / \ + Raw(RandString(size=(100 - udpv6_hdr_len))) + pkts.append(pkt) + + shuffle(pkts) + write_pkts(pkts, args.file) + +main() |