#!/usr/bin/python
# Copyright 2012 Google, Inc. All rights reserved.

"""TestCreator creates test templates from pcap files."""

import argparse
import base64
import glob
import re
import string
import subprocess
import sys

# Matches one hex-dump line of `tcpdump -XX` output and captures the hex
# column (groups of 2 or 4 hex digits, each followed by one whitespace char).
# Format:
#   0x0010:  0000 0020 3aff 3ffe 0000 0000 0000 0000  ....:.?.........
# The ASCII column is separated from the hex column by two spaces, so the
# capture stops before it.
_HEX_LINE_RE = re.compile(r'\s+0x[a-f\d]+:\s+((?:[\da-f]{2,4}\s)*)',
                          re.IGNORECASE)


class Packet(object):
    """Helper class encapsulating packet from a pcap file.

    Attributes are set in __init__:
      packet_lines: the tcpdump text lines for this packet (summary line
        first, followed by its hex-dump lines).
      data: the raw packet bytes decoded from the hex-dump lines.
    """

    def __init__(self, packet_lines):
        self.packet_lines = packet_lines
        self.data = self._DecodeText(packet_lines)

    @classmethod
    def _DecodeText(cls, packet_lines):
        """Decodes the hex-dump lines of one packet into its raw bytes.

        Args:
          packet_lines: text lines for a single packet; the first line is
            skipped and the rest are parsed as hex-dump lines.

        Returns:
          The packet payload as a byte string (`str` on Python 2, `bytes`
          on Python 3).
        """
        packet_bytes = bytearray()
        # First line is timestamp and stuff, skip it.
        for line in packet_lines[1:]:
            m = _HEX_LINE_RE.match(line)
            if m is None:
                # Not a hex-dump line; ignore it.
                continue
            for hexpart in m.group(1).split():
                # b16decode only accepts upper-case hex digits.
                packet_bytes += base64.b16decode(hexpart.upper())
        return bytes(packet_bytes)

    def Test(self, name, link_type):
        """Yields a test using this packet, as a set of lines.

        Args:
          name: suffix used for the generated Go identifiers
            (testPacket<name>, TestPacket<name>, BenchmarkDecodePacket<name>).
          link_type: suffix appended to `LinkType` / `LayerType` in the
            generated Go code.

        Yields:
          Lines of Go source, without trailing newlines.
        """
        yield '// testPacket%s is the packet:' % name
        for line in self.packet_lines:
            yield '// ' + line
        yield 'var testPacket%s = []byte{' % name
        # bytearray iterates as integers on both Python 2 and Python 3.
        data = bytearray(self.data)
        for offset in range(0, len(data), 16):
            row = data[offset:offset + 16]
            yield '\t' + ''.join('0x%02x, ' % value for value in row)
        yield '}'
        yield 'func TestPacket%s(t *testing.T) {' % name
        yield ('\tp := gopacket.NewPacket(testPacket%s, LinkType%s, '
               'gopacket.Default)' % (name, link_type))
        yield '\tif p.ErrorLayer() != nil {'
        yield '\t\tt.Error("Failed to decode packet:", p.ErrorLayer().Error())'
        yield '\t}'
        yield ('\tcheckLayers(p, []gopacket.LayerType{LayerType%s, '
               'FILL_ME_IN_WITH_ACTUAL_LAYERS}, t)' % link_type)
        yield '}'
        yield 'func BenchmarkDecodePacket%s(b *testing.B) {' % name
        yield '\tfor i := 0; i < b.N; i++ {'
        yield ('\t\tgopacket.NewPacket(testPacket%s, LinkType%s, '
               'gopacket.NoCopy)' % (name, link_type))
        yield '\t}'
        yield '}'


def GetTcpdumpOutput(filename):
    """Runs tcpdump on the given file, returning output as string.

    Args:
      filename: path of the pcap file to read.

    Returns:
      The text tcpdump wrote to standard output.

    Raises:
      subprocess.CalledProcessError: if tcpdump exits with a non-zero status.
      OSError: if the tcpdump executable cannot be started.
    """
    output = subprocess.check_output(
        ['tcpdump', '-XX', '-s', '0', '-n', '-r', filename])
    if not isinstance(output, str):
        # Python 3 returns bytes; the line parsing below needs text.
        output = output.decode('utf-8', 'replace')
    return output


def TcpdumpOutputToPackets(output):
    """Splits tcpdump text output into packets, yielding Packet objects.

    A line that does not start with whitespace begins a new packet; lines
    that start with whitespace belong to the packet before them.

    Args:
      output: tcpdump output as text, e.g. from GetTcpdumpOutput.

    Yields:
      One Packet per packet found in the output.
    """
    pdata = []
    for line in output.splitlines():
        if not line:
            # Empty lines carry no data, and indexing line[0] would raise.
            continue
        if line[0] not in string.whitespace and pdata:
            yield Packet(pdata)
            pdata = []
        pdata.append(line)
    if pdata:
        yield Packet(pdata)


def main(argv=None):
    """Command-line entry point: prints generated tests to standard out.

    Args:
      argv: argument list to parse; None means use sys.argv[1:].
    """

    class CustomHelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
        """Help formatter that prints a tool description above the usage."""

        def _format_usage(self, usage, actions, groups, prefix=None):
            header = ('TestCreator creates gopacket tests using a pcap file.\n\n'
                      'Tests are written to standard out... they can then be \n'
                      'copied into the file of your choice and modified as \n'
                      'you see fit.\n\n')
            return header + argparse.ArgumentDefaultsHelpFormatter._format_usage(
                self, usage, actions, groups, prefix)

    parser = argparse.ArgumentParser(formatter_class=CustomHelpFormatter)
    parser.add_argument('--link_type', default='Ethernet',
                        help='the link type (default: %(default)s)')
    # argparse %-formats help strings, so a literal percent sign must be
    # written as "%%".
    parser.add_argument('--name', default='Packet%d',
                        help='the test name pattern, must have "%%d" inside it')
    parser.add_argument('files', metavar='file.pcap', type=str, nargs='+',
                        help='the files to process')
    args = parser.parse_args(argv)

    # Fail early with a usage error instead of a traceback in the middle of
    # the output when the pattern cannot be formatted with a packet index.
    try:
        _ = args.name % 0
    except (TypeError, ValueError):
        parser.error('--name must contain a single "%d" placeholder')

    for arg in args.files:
        paths = glob.glob(arg)
        if not paths:
            sys.stderr.write('warning: no files match %r\n' % arg)
        for path in paths:
            packets = TcpdumpOutputToPackets(GetTcpdumpOutput(path))
            for i, packet in enumerate(packets):
                print('\n'.join(packet.Test(args.name % i, args.link_type)))


if __name__ == '__main__':
    main()