summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/google/gopacket/tcpassembly/tcpreader
diff options
context:
space:
mode:
authorMilan Lenco <milan.lenco@pantheon.tech>2017-10-11 16:40:58 +0200
committerMilan Lenco <milan.lenco@pantheon.tech>2017-10-13 08:40:37 +0200
commit3f1edad4e6ba0a7876750aea55507fae14d8badf (patch)
treea473997249d9ba7deb70b1076d14e4c4ed029a43 /vendor/github.com/google/gopacket/tcpassembly/tcpreader
parent8b66677c2382a8e739d437621de4473d5ec0b9f1 (diff)
ODPM 266: Go-libmemif + 2 examples.
Change-Id: Icdb9b9eb2314eff6c96afe7996fcf2728291de4a Signed-off-by: Milan Lenco <milan.lenco@pantheon.tech>
Diffstat (limited to 'vendor/github.com/google/gopacket/tcpassembly/tcpreader')
-rw-r--r--vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader.go210
-rw-r--r--vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader_test.go129
2 files changed, 339 insertions, 0 deletions
diff --git a/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader.go b/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader.go
new file mode 100644
index 0000000..092b811
--- /dev/null
+++ b/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader.go
@@ -0,0 +1,210 @@
+// Copyright 2012 Google, Inc. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style license
+// that can be found in the LICENSE file in the root of the source
+// tree.
+
+// Package tcpreader provides an implementation for tcpassembly.Stream which presents
+// the caller with an io.Reader for easy processing.
+//
+// The assembly package handles packet data reordering, but its output is
+// library-specific, thus not usable by the majority of external Go libraries.
+// The io.Reader interface, on the other hand, is used throughout much of Go
+// code as an easy mechanism for reading in data streams and decoding them. For
+// example, the net/http package provides the ReadRequest function, which can
+// parase an HTTP request from a live data stream, just what we'd want when
+// sniffing HTTP traffic. Using ReaderStream, this is relatively easy to set
+// up:
+//
+// // Create our StreamFactory
+// type httpStreamFactory struct {}
+// func (f *httpStreamFactory) New(a, b gopacket.Flow) {
+// r := tcpreader.NewReaderStream(false)
+// go printRequests(r)
+// return &r
+// }
+// func printRequests(r io.Reader) {
+// // Convert to bufio, since that's what ReadRequest wants.
+// buf := bufio.NewReader(r)
+// for {
+// if req, err := http.ReadRequest(buf); err == io.EOF {
+// return
+// } else if err != nil {
+// log.Println("Error parsing HTTP requests:", err)
+// } else {
+// fmt.Println("HTTP REQUEST:", req)
+// fmt.Println("Body contains", tcpreader.DiscardBytesToEOF(req.Body), "bytes")
+// }
+// }
+// }
+//
+// Using just this code, we're able to reference a powerful, built-in library
+// for HTTP request parsing to do all the dirty-work of parsing requests from
+// the wire in real-time. Pass this stream factory to an tcpassembly.StreamPool,
+// start up an tcpassembly.Assembler, and you're good to go!
+package tcpreader
+
+import (
+ "errors"
+ "github.com/google/gopacket/tcpassembly"
+ "io"
+)
+
+var discardBuffer = make([]byte, 4096)
+
+// DiscardBytesToFirstError will read in all bytes up to the first error
+// reported by the given reader, then return the number of bytes discarded
+// and the error encountered.
+func DiscardBytesToFirstError(r io.Reader) (discarded int, err error) {
+ for {
+ n, e := r.Read(discardBuffer)
+ discarded += n
+ if e != nil {
+ return discarded, e
+ }
+ }
+}
+
+// DiscardBytesToEOF will read in all bytes from a Reader until it
+// encounters an io.EOF, then return the number of bytes. Be careful
+// of this... if used on a Reader that returns a non-io.EOF error
+// consistently, this will loop forever discarding that error while
+// it waits for an EOF.
+func DiscardBytesToEOF(r io.Reader) (discarded int) {
+ for {
+ n, e := DiscardBytesToFirstError(r)
+ discarded += n
+ if e == io.EOF {
+ return
+ }
+ }
+}
+
+// ReaderStream implements both tcpassembly.Stream and io.Reader. You can use it
+// as a building block to make simple, easy stream handlers.
+//
+// IMPORTANT: If you use a ReaderStream, you MUST read ALL BYTES from it,
+// quickly. Not reading available bytes will block TCP stream reassembly. It's
+// a common pattern to do this by starting a goroutine in the factory's New
+// method:
+//
+// type myStreamHandler struct {
+// r ReaderStream
+// }
+// func (m *myStreamHandler) run() {
+// // Do something here that reads all of the ReaderStream, or your assembly
+// // will block.
+// fmt.Println(tcpreader.DiscardBytesToEOF(&m.r))
+// }
+// func (f *myStreamFactory) New(a, b gopacket.Flow) tcpassembly.Stream {
+// s := &myStreamHandler{}
+// go s.run()
+// // Return the ReaderStream as the stream that assembly should populate.
+// return &s.r
+// }
+type ReaderStream struct {
+ ReaderStreamOptions
+ reassembled chan []tcpassembly.Reassembly
+ done chan bool
+ current []tcpassembly.Reassembly
+ closed bool
+ lossReported bool
+ first bool
+ initiated bool
+}
+
+// ReaderStreamOptions provides user-resettable options for a ReaderStream.
+type ReaderStreamOptions struct {
+ // LossErrors determines whether this stream will return
+ // ReaderStreamDataLoss errors from its Read function whenever it
+ // determines data has been lost.
+ LossErrors bool
+}
+
+// NewReaderStream returns a new ReaderStream object.
+func NewReaderStream() ReaderStream {
+ r := ReaderStream{
+ reassembled: make(chan []tcpassembly.Reassembly),
+ done: make(chan bool),
+ first: true,
+ initiated: true,
+ }
+ return r
+}
+
+// Reassembled implements tcpassembly.Stream's Reassembled function.
+func (r *ReaderStream) Reassembled(reassembly []tcpassembly.Reassembly) {
+ if !r.initiated {
+ panic("ReaderStream not created via NewReaderStream")
+ }
+ r.reassembled <- reassembly
+ <-r.done
+}
+
+// ReassemblyComplete implements tcpassembly.Stream's ReassemblyComplete function.
+func (r *ReaderStream) ReassemblyComplete() {
+ close(r.reassembled)
+ close(r.done)
+}
+
+// stripEmpty strips empty reassembly slices off the front of its current set of
+// slices.
+func (r *ReaderStream) stripEmpty() {
+ for len(r.current) > 0 && len(r.current[0].Bytes) == 0 {
+ r.current = r.current[1:]
+ r.lossReported = false
+ }
+}
+
+// DataLost is returned by the ReaderStream's Read function when it encounters
+// a Reassembly with Skip != 0.
+var DataLost = errors.New("lost data")
+
+// Read implements io.Reader's Read function.
+// Given a byte slice, it will either copy a non-zero number of bytes into
+// that slice and return the number of bytes and a nil error, or it will
+// leave slice p as is and return 0, io.EOF.
+func (r *ReaderStream) Read(p []byte) (int, error) {
+ if !r.initiated {
+ panic("ReaderStream not created via NewReaderStream")
+ }
+ var ok bool
+ r.stripEmpty()
+ for !r.closed && len(r.current) == 0 {
+ if r.first {
+ r.first = false
+ } else {
+ r.done <- true
+ }
+ if r.current, ok = <-r.reassembled; ok {
+ r.stripEmpty()
+ } else {
+ r.closed = true
+ }
+ }
+ if len(r.current) > 0 {
+ current := &r.current[0]
+ if r.LossErrors && !r.lossReported && current.Skip != 0 {
+ r.lossReported = true
+ return 0, DataLost
+ }
+ length := copy(p, current.Bytes)
+ current.Bytes = current.Bytes[length:]
+ return length, nil
+ }
+ return 0, io.EOF
+}
+
+// Close implements io.Closer's Close function, making ReaderStream a
+// io.ReadCloser. It discards all remaining bytes in the reassembly in a
+// manner that's safe for the assembler (IE: it doesn't block).
+func (r *ReaderStream) Close() error {
+ r.current = nil
+ r.closed = true
+ for {
+ if _, ok := <-r.reassembled; !ok {
+ return nil
+ }
+ r.done <- true
+ }
+}
diff --git a/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader_test.go b/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader_test.go
new file mode 100644
index 0000000..7da9fd9
--- /dev/null
+++ b/vendor/github.com/google/gopacket/tcpassembly/tcpreader/reader_test.go
@@ -0,0 +1,129 @@
+// Copyright 2012 Google, Inc. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style license
+// that can be found in the LICENSE file in the root of the source
+// tree.
+
+package tcpreader
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/google/gopacket/tcpassembly"
+ "io"
+ "net"
+ "testing"
+)
+
+var netFlow gopacket.Flow
+
+func init() {
+ netFlow, _ = gopacket.FlowFromEndpoints(
+ layers.NewIPEndpoint(net.IP{1, 2, 3, 4}),
+ layers.NewIPEndpoint(net.IP{5, 6, 7, 8}))
+}
+
+type readReturn struct {
+ data []byte
+ err error
+}
+type readSequence struct {
+ in []layers.TCP
+ want []readReturn
+}
+type testReaderFactory struct {
+ lossErrors bool
+ readSize int
+ ReaderStream
+ output chan []byte
+}
+
+func (t *testReaderFactory) New(a, b gopacket.Flow) tcpassembly.Stream {
+ return &t.ReaderStream
+}
+
+func testReadSequence(t *testing.T, lossErrors bool, readSize int, seq readSequence) {
+ f := &testReaderFactory{ReaderStream: NewReaderStream()}
+ f.ReaderStream.LossErrors = lossErrors
+ p := tcpassembly.NewStreamPool(f)
+ a := tcpassembly.NewAssembler(p)
+ buf := make([]byte, readSize)
+ go func() {
+ for i, test := range seq.in {
+ fmt.Println("Assembling", i)
+ a.Assemble(netFlow, &test)
+ fmt.Println("Assembly done")
+ }
+ }()
+ for i, test := range seq.want {
+ fmt.Println("Waiting for read", i)
+ n, err := f.Read(buf[:])
+ fmt.Println("Got read")
+ if n != len(test.data) {
+ t.Errorf("test %d want %d bytes, got %d bytes", i, len(test.data), n)
+ } else if err != test.err {
+ t.Errorf("test %d want err %v, got err %v", i, test.err, err)
+ } else if !bytes.Equal(buf[:n], test.data) {
+ t.Errorf("test %d\nwant: %v\n got: %v\n", i, test.data, buf[:n])
+ }
+ }
+ fmt.Println("All done reads")
+}
+
+func TestRead(t *testing.T) {
+ testReadSequence(t, false, 10, readSequence{
+ in: []layers.TCP{
+ {
+ SYN: true,
+ SrcPort: 1,
+ DstPort: 2,
+ Seq: 1000,
+ BaseLayer: layers.BaseLayer{Payload: []byte{1, 2, 3}},
+ },
+ {
+ FIN: true,
+ SrcPort: 1,
+ DstPort: 2,
+ Seq: 1004,
+ },
+ },
+ want: []readReturn{
+ {data: []byte{1, 2, 3}},
+ {err: io.EOF},
+ },
+ })
+}
+
+func TestReadSmallChunks(t *testing.T) {
+ testReadSequence(t, false, 2, readSequence{
+ in: []layers.TCP{
+ {
+ SYN: true,
+ SrcPort: 1,
+ DstPort: 2,
+ Seq: 1000,
+ BaseLayer: layers.BaseLayer{Payload: []byte{1, 2, 3}},
+ },
+ {
+ FIN: true,
+ SrcPort: 1,
+ DstPort: 2,
+ Seq: 1004,
+ },
+ },
+ want: []readReturn{
+ {data: []byte{1, 2}},
+ {data: []byte{3}},
+ {err: io.EOF},
+ },
+ })
+}
+
+func ExampleDiscardBytesToEOF() {
+ b := bytes.NewBuffer([]byte{1, 2, 3, 4, 5})
+ fmt.Println(DiscardBytesToEOF(b))
+ // Output:
+ // 5
+}