diff options
author | Milan Lenco <milan.lenco@pantheon.tech> | 2017-10-11 16:40:58 +0200 |
---|---|---|
committer | Milan Lenco <milan.lenco@pantheon.tech> | 2017-10-13 08:40:37 +0200 |
commit | 3f1edad4e6ba0a7876750aea55507fae14d8badf (patch) | |
tree | a473997249d9ba7deb70b1076d14e4c4ed029a43 /vendor/github.com/google/gopacket/tcpassembly/tcpreader | |
parent | 8b66677c2382a8e739d437621de4473d5ec0b9f1 (diff) |
ODPM 266: Go-libmemif + 2 examples.
Change-Id: Icdb9b9eb2314eff6c96afe7996fcf2728291de4a
Signed-off-by: Milan Lenco <milan.lenco@pantheon.tech>
Diffstat (limited to 'vendor/github.com/google/gopacket/tcpassembly/tcpreader')
-rw-r--r-- | vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader.go | 210 | ||||
-rw-r--r-- | vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader_test.go | 129 |
2 files changed, 339 insertions, 0 deletions
diff --git a/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader.go b/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader.go new file mode 100644 index 0000000..092b811 --- /dev/null +++ b/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader.go @@ -0,0 +1,210 @@ +// Copyright 2012 Google, Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style license +// that can be found in the LICENSE file in the root of the source +// tree. + +// Package tcpreader provides an implementation for tcpassembly.Stream which presents +// the caller with an io.Reader for easy processing. +// +// The assembly package handles packet data reordering, but its output is +// library-specific, thus not usable by the majority of external Go libraries. +// The io.Reader interface, on the other hand, is used throughout much of Go +// code as an easy mechanism for reading in data streams and decoding them. For +// example, the net/http package provides the ReadRequest function, which can +// parase an HTTP request from a live data stream, just what we'd want when +// sniffing HTTP traffic. Using ReaderStream, this is relatively easy to set +// up: +// +// // Create our StreamFactory +// type httpStreamFactory struct {} +// func (f *httpStreamFactory) New(a, b gopacket.Flow) { +// r := tcpreader.NewReaderStream(false) +// go printRequests(r) +// return &r +// } +// func printRequests(r io.Reader) { +// // Convert to bufio, since that's what ReadRequest wants. +// buf := bufio.NewReader(r) +// for { +// if req, err := http.ReadRequest(buf); err == io.EOF { +// return +// } else if err != nil { +// log.Println("Error parsing HTTP requests:", err) +// } else { +// fmt.Println("HTTP REQUEST:", req) +// fmt.Println("Body contains", tcpreader.DiscardBytesToEOF(req.Body), "bytes") +// } +// } +// } +// +// Using just this code, we're able to reference a powerful, built-in library +// for HTTP request parsing to do all the dirty-work of parsing requests from +// the wire in real-time. Pass this stream factory to an tcpassembly.StreamPool, +// start up an tcpassembly.Assembler, and you're good to go! +package tcpreader + +import ( + "errors" + "github.com/google/gopacket/tcpassembly" + "io" +) + +var discardBuffer = make([]byte, 4096) + +// DiscardBytesToFirstError will read in all bytes up to the first error +// reported by the given reader, then return the number of bytes discarded +// and the error encountered. +func DiscardBytesToFirstError(r io.Reader) (discarded int, err error) { + for { + n, e := r.Read(discardBuffer) + discarded += n + if e != nil { + return discarded, e + } + } +} + +// DiscardBytesToEOF will read in all bytes from a Reader until it +// encounters an io.EOF, then return the number of bytes. Be careful +// of this... if used on a Reader that returns a non-io.EOF error +// consistently, this will loop forever discarding that error while +// it waits for an EOF. +func DiscardBytesToEOF(r io.Reader) (discarded int) { + for { + n, e := DiscardBytesToFirstError(r) + discarded += n + if e == io.EOF { + return + } + } +} + +// ReaderStream implements both tcpassembly.Stream and io.Reader. You can use it +// as a building block to make simple, easy stream handlers. +// +// IMPORTANT: If you use a ReaderStream, you MUST read ALL BYTES from it, +// quickly. Not reading available bytes will block TCP stream reassembly. It's +// a common pattern to do this by starting a goroutine in the factory's New +// method: +// +// type myStreamHandler struct { +// r ReaderStream +// } +// func (m *myStreamHandler) run() { +// // Do something here that reads all of the ReaderStream, or your assembly +// // will block. +// fmt.Println(tcpreader.DiscardBytesToEOF(&m.r)) +// } +// func (f *myStreamFactory) New(a, b gopacket.Flow) tcpassembly.Stream { +// s := &myStreamHandler{} +// go s.run() +// // Return the ReaderStream as the stream that assembly should populate. +// return &s.r +// } +type ReaderStream struct { + ReaderStreamOptions + reassembled chan []tcpassembly.Reassembly + done chan bool + current []tcpassembly.Reassembly + closed bool + lossReported bool + first bool + initiated bool +} + +// ReaderStreamOptions provides user-resettable options for a ReaderStream. +type ReaderStreamOptions struct { + // LossErrors determines whether this stream will return + // ReaderStreamDataLoss errors from its Read function whenever it + // determines data has been lost. + LossErrors bool +} + +// NewReaderStream returns a new ReaderStream object. +func NewReaderStream() ReaderStream { + r := ReaderStream{ + reassembled: make(chan []tcpassembly.Reassembly), + done: make(chan bool), + first: true, + initiated: true, + } + return r +} + +// Reassembled implements tcpassembly.Stream's Reassembled function. +func (r *ReaderStream) Reassembled(reassembly []tcpassembly.Reassembly) { + if !r.initiated { + panic("ReaderStream not created via NewReaderStream") + } + r.reassembled <- reassembly + <-r.done +} + +// ReassemblyComplete implements tcpassembly.Stream's ReassemblyComplete function. +func (r *ReaderStream) ReassemblyComplete() { + close(r.reassembled) + close(r.done) +} + +// stripEmpty strips empty reassembly slices off the front of its current set of +// slices. +func (r *ReaderStream) stripEmpty() { + for len(r.current) > 0 && len(r.current[0].Bytes) == 0 { + r.current = r.current[1:] + r.lossReported = false + } +} + +// DataLost is returned by the ReaderStream's Read function when it encounters +// a Reassembly with Skip != 0. +var DataLost = errors.New("lost data") + +// Read implements io.Reader's Read function. +// Given a byte slice, it will either copy a non-zero number of bytes into +// that slice and return the number of bytes and a nil error, or it will +// leave slice p as is and return 0, io.EOF. +func (r *ReaderStream) Read(p []byte) (int, error) { + if !r.initiated { + panic("ReaderStream not created via NewReaderStream") + } + var ok bool + r.stripEmpty() + for !r.closed && len(r.current) == 0 { + if r.first { + r.first = false + } else { + r.done <- true + } + if r.current, ok = <-r.reassembled; ok { + r.stripEmpty() + } else { + r.closed = true + } + } + if len(r.current) > 0 { + current := &r.current[0] + if r.LossErrors && !r.lossReported && current.Skip != 0 { + r.lossReported = true + return 0, DataLost + } + length := copy(p, current.Bytes) + current.Bytes = current.Bytes[length:] + return length, nil + } + return 0, io.EOF +} + +// Close implements io.Closer's Close function, making ReaderStream a +// io.ReadCloser. It discards all remaining bytes in the reassembly in a +// manner that's safe for the assembler (IE: it doesn't block). +func (r *ReaderStream) Close() error { + r.current = nil + r.closed = true + for { + if _, ok := <-r.reassembled; !ok { + return nil + } + r.done <- true + } +} diff --git a/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader_test.go b/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader_test.go new file mode 100644 index 0000000..7da9fd9 --- /dev/null +++ b/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader_test.go @@ -0,0 +1,129 @@ +// Copyright 2012 Google, Inc. All rights reserved. +// +// Use of this source code is governed by a BSD-style license +// that can be found in the LICENSE file in the root of the source +// tree. + +package tcpreader + +import ( + "bytes" + "fmt" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/tcpassembly" + "io" + "net" + "testing" +) + +var netFlow gopacket.Flow + +func init() { + netFlow, _ = gopacket.FlowFromEndpoints( + layers.NewIPEndpoint(net.IP{1, 2, 3, 4}), + layers.NewIPEndpoint(net.IP{5, 6, 7, 8})) +} + +type readReturn struct { + data []byte + err error +} +type readSequence struct { + in []layers.TCP + want []readReturn +} +type testReaderFactory struct { + lossErrors bool + readSize int + ReaderStream + output chan []byte +} + +func (t *testReaderFactory) New(a, b gopacket.Flow) tcpassembly.Stream { + return &t.ReaderStream +} + +func testReadSequence(t *testing.T, lossErrors bool, readSize int, seq readSequence) { + f := &testReaderFactory{ReaderStream: NewReaderStream()} + f.ReaderStream.LossErrors = lossErrors + p := tcpassembly.NewStreamPool(f) + a := tcpassembly.NewAssembler(p) + buf := make([]byte, readSize) + go func() { + for i, test := range seq.in { + fmt.Println("Assembling", i) + a.Assemble(netFlow, &test) + fmt.Println("Assembly done") + } + }() + for i, test := range seq.want { + fmt.Println("Waiting for read", i) + n, err := f.Read(buf[:]) + fmt.Println("Got read") + if n != len(test.data) { + t.Errorf("test %d want %d bytes, got %d bytes", i, len(test.data), n) + } else if err != test.err { + t.Errorf("test %d want err %v, got err %v", i, test.err, err) + } else if !bytes.Equal(buf[:n], test.data) { + t.Errorf("test %d\nwant: %v\n got: %v\n", i, test.data, buf[:n]) + } + } + fmt.Println("All done reads") +} + +func TestRead(t *testing.T) { + testReadSequence(t, false, 10, readSequence{ + in: []layers.TCP{ + { + SYN: true, + SrcPort: 1, + DstPort: 2, + Seq: 1000, + BaseLayer: layers.BaseLayer{Payload: []byte{1, 2, 3}}, + }, + { + FIN: true, + SrcPort: 1, + DstPort: 2, + Seq: 1004, + }, + }, + want: []readReturn{ + {data: []byte{1, 2, 3}}, + {err: io.EOF}, + }, + }) +} + +func TestReadSmallChunks(t *testing.T) { + testReadSequence(t, false, 2, readSequence{ + in: []layers.TCP{ + { + SYN: true, + SrcPort: 1, + DstPort: 2, + Seq: 1000, + BaseLayer: layers.BaseLayer{Payload: []byte{1, 2, 3}}, + }, + { + FIN: true, + SrcPort: 1, + DstPort: 2, + Seq: 1004, + }, + }, + want: []readReturn{ + {data: []byte{1, 2}}, + {data: []byte{3}}, + {err: io.EOF}, + }, + }) +} + +func ExampleDiscardBytesToEOF() { + b := bytes.NewBuffer([]byte{1, 2, 3, 4, 5}) + fmt.Println(DiscardBytesToEOF(b)) + // Output: + // 5 +} |