Diffstat (limited to 'vendor/github.com/google/gopacket/tcpassembly/assembly.go')
-rw-r--r--	vendor/github.com/google/gopacket/tcpassembly/assembly.go	788
1 file changed, 0 insertions, 788 deletions
diff --git a/vendor/github.com/google/gopacket/tcpassembly/assembly.go b/vendor/github.com/google/gopacket/tcpassembly/assembly.go
deleted file mode 100644
index 50f6487..0000000
--- a/vendor/github.com/google/gopacket/tcpassembly/assembly.go
+++ /dev/null
@@ -1,788 +0,0 @@
-// Copyright 2012 Google, Inc. All rights reserved.
-//
-// Use of this source code is governed by a BSD-style license
-// that can be found in the LICENSE file in the root of the source
-// tree.
-
-// Package tcpassembly provides TCP stream re-assembly.
-//
-// The tcpassembly package implements uni-directional TCP reassembly, for use in
-// packet-sniffing applications. The caller reads packets off the wire, then
-// presents them to an Assembler in the form of gopacket layers.TCP packets
-// (github.com/google/gopacket, github.com/google/gopacket/layers).
-//
-// The Assembler uses a user-supplied StreamFactory to create a user-defined
-// Stream implementation for each new TCP stream, then passes packet data in
-// stream order to that object. A concurrency-safe StreamPool keeps
-// track of all current Streams being reassembled, so multiple Assemblers may
-// run at once to assemble packets while taking advantage of multiple cores.
-package tcpassembly
-
-import (
- "flag"
- "fmt"
- "github.com/google/gopacket"
- "github.com/google/gopacket/layers"
- "log"
- "sync"
- "time"
-)
-
-var memLog = flag.Bool("assembly_memuse_log", false, "If true, the github.com/google/gopacket/tcpassembly library will log information regarding its memory use every once in a while.")
-var debugLog = flag.Bool("assembly_debug_log", false, "If true, the github.com/google/gopacket/tcpassembly library will log verbose debugging information (at least one line per packet)")
-
-const invalidSequence = -1
-const uint32Max = 0xFFFFFFFF
-
-// Sequence is a TCP sequence number. It provides a few convenience functions
-// for handling TCP wrap-around. The sequence should always be in the range
-// [0,0xFFFFFFFF]... its other bits are simply used in wrap-around calculations
-// and should never be set.
-type Sequence int64
-
-// Difference defines an ordering for comparing TCP sequences that's safe for
-// roll-overs. It returns:
-// > 0 : if t comes after s
-// < 0 : if t comes before s
-// 0 : if t == s
-// The number returned is the sequence difference, so 4.Difference(8) will
-// return 4.
-//
-// It handles rollovers by considering any sequence in the first quarter of the
-// uint32 space to be after any sequence in the last quarter of that space, thus
-// wrapping the uint32 space.
-func (s Sequence) Difference(t Sequence) int {
- if s > uint32Max-uint32Max/4 && t < uint32Max/4 {
- t += uint32Max
- } else if t > uint32Max-uint32Max/4 && s < uint32Max/4 {
- s += uint32Max
- }
- return int(t - s)
-}
-
-// Add adds an integer to a sequence and returns the resulting sequence.
-func (s Sequence) Add(t int) Sequence {
- return (s + Sequence(t)) & uint32Max
-}
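A minimal, hypothetical sketch (not part of the deleted file) of how these two methods behave, including the wrap-around case; the import path is taken from the file's location:

package main

import (
	"fmt"

	"github.com/google/gopacket/tcpassembly"
)

func main() {
	// The simple case from the doc comment: 4.Difference(8) returns 4.
	fmt.Println(tcpassembly.Sequence(4).Difference(8)) // 4

	// Add wraps around the 32-bit sequence space.
	var s tcpassembly.Sequence = 0xFFFFFFF0
	t := s.Add(0x20)
	fmt.Println(t) // 16

	// Difference treats a sequence that wrapped as coming "after": the sign
	// gives the ordering even though t's raw value is smaller than s's.
	fmt.Println(s.Difference(t) > 0) // true
	fmt.Println(t.Difference(s) < 0) // true
}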
-
-// Reassembly objects are passed by an Assembler into Streams using the
-// Reassembled call. Callers should not need to create these structs themselves
-// except for testing.
-type Reassembly struct {
- // Bytes is the next set of bytes in the stream. May be empty.
- Bytes []byte
- // Skip is set to non-zero if bytes were skipped between this and the
- // last Reassembly. If this is the first packet in a connection and we
- // didn't see the start, we have no idea how many bytes we skipped, so
- // we set it to -1. Otherwise, it's set to the number of bytes skipped.
- Skip int
- // Start is set if this set of bytes has a TCP SYN accompanying it.
- Start bool
- // End is set if this set of bytes has a TCP FIN or RST accompanying it.
- End bool
- // Seen is the timestamp this set of bytes was pulled off the wire.
- Seen time.Time
-}
-
-const pageBytes = 1900
-
-// page is used to store TCP data we're not ready for yet (out-of-order
-// packets). Unused pages are stored in and returned from a pageCache, which
-// avoids memory allocation. Used pages are stored in a doubly-linked list in
-// a connection.
-type page struct {
- Reassembly
- seq Sequence
- index int
- prev, next *page
- buf [pageBytes]byte
-}
-
-// pageCache is a concurrency-unsafe store of page objects we use to avoid
-// memory allocation as much as we can. It grows but never shrinks.
-type pageCache struct {
- free []*page
- pcSize int
- size, used int
- pages [][]page
- pageRequests int64
-}
-
-const initialAllocSize = 1024
-
-func newPageCache() *pageCache {
- pc := &pageCache{
- free: make([]*page, 0, initialAllocSize),
- pcSize: initialAllocSize,
- }
- pc.grow()
- return pc
-}
-
-// grow exponentially increases the size of our page cache as much as necessary.
-func (c *pageCache) grow() {
- pages := make([]page, c.pcSize)
- c.pages = append(c.pages, pages)
- c.size += c.pcSize
- for i := range pages {
- c.free = append(c.free, &pages[i])
- }
- if *memLog {
- log.Println("PageCache: created", c.pcSize, "new pages")
- }
- c.pcSize *= 2
-}
-
-// next returns a clean, ready-to-use page object.
-func (c *pageCache) next(ts time.Time) (p *page) {
- if *memLog {
- c.pageRequests++
- if c.pageRequests&0xFFFF == 0 {
- log.Println("PageCache:", c.pageRequests, "requested,", c.used, "used,", len(c.free), "free")
- }
- }
- if len(c.free) == 0 {
- c.grow()
- }
- i := len(c.free) - 1
- p, c.free = c.free[i], c.free[:i]
- p.prev = nil
- p.next = nil
- p.Seen = ts
- p.Bytes = p.buf[:0]
- c.used++
- return p
-}
-
-// replace returns a page to the pageCache.
-func (c *pageCache) replace(p *page) {
- c.used--
- c.free = append(c.free, p)
-}
-
-// Stream is implemented by the caller to handle incoming reassembled
-// TCP data. Callers create a StreamFactory, then StreamPool uses
-// it to create a new Stream for every TCP stream.
-//
-// assembly will, in order:
-// 1) Create the stream via StreamFactory.New
-// 2) Call Reassembled 0 or more times, passing in reassembled TCP data in order
-// 3) Call ReassemblyComplete one time, after which the stream is dereferenced by assembly.
-type Stream interface {
-	// Reassembled is called zero or more times. assembly guarantees
-	// that the set of all Reassembly objects passed in during all
-	// calls is presented in the order they appear in the TCP stream.
-	// Reassembly objects are reused after each Reassembled call,
-	// so it's important to copy anything you need out of them
-	// (specifically out of Reassembly.Bytes) before you return from
-	// the Reassembled call.
- Reassembled([]Reassembly)
- // ReassemblyComplete is called when assembly decides there is
- // no more data for this Stream, either because a FIN or RST packet
- // was seen, or because the stream has timed out without any new
- // packet data (due to a call to FlushOlderThan).
- ReassemblyComplete()
-}
-
-// StreamFactory is used by assembly to create a new stream for each
-// new TCP session.
-type StreamFactory interface {
- // New should return a new stream for the given TCP key.
- New(netFlow, tcpFlow gopacket.Flow) Stream
-}
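To illustrate this contract, here is a hedged sketch of a StreamFactory whose streams simply count reassembled bytes. The names byteCounter and counterFactory are invented for the example (they are not part of the library), and the file is written as part of the same hypothetical package main as the wiring sketch shown after DefaultAssemblerOptions below:

package main

import (
	"log"

	"github.com/google/gopacket"
	"github.com/google/gopacket/tcpassembly"
)

// byteCounter implements tcpassembly.Stream.
type byteCounter struct {
	net, transport gopacket.Flow
	bytes          int64
}

// Reassembled may be called many times. Reassembly.Bytes is only valid for the
// duration of the call, so anything that must outlive it has to be copied out.
func (b *byteCounter) Reassembled(rs []tcpassembly.Reassembly) {
	for _, r := range rs {
		if r.Skip != 0 {
			// Skip > 0: that many bytes were lost; Skip == -1: unknown,
			// because the start of the connection was never seen.
			log.Printf("%v:%v skipped %d bytes", b.net, b.transport, r.Skip)
		}
		b.bytes += int64(len(r.Bytes))
	}
}

// ReassemblyComplete is called exactly once, after the final Reassembled call.
func (b *byteCounter) ReassemblyComplete() {
	log.Printf("%v:%v closed after %d bytes", b.net, b.transport, b.bytes)
}

// counterFactory implements tcpassembly.StreamFactory.
type counterFactory struct{}

func (counterFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
	return &byteCounter{net: net, transport: transport}
}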
-
-func (p *StreamPool) connections() []*connection {
- p.mu.RLock()
- conns := make([]*connection, 0, len(p.conns))
- for _, conn := range p.conns {
- conns = append(conns, conn)
- }
- p.mu.RUnlock()
- return conns
-}
-
-// FlushOptions provide options for flushing connections.
-type FlushOptions struct {
- T time.Time // If nonzero, only connections with data older than T are flushed
- CloseAll bool // If true, ALL connections are closed post flush, not just those that correctly see FIN/RST.
-}
-
-// FlushWithOptions finds any streams waiting for packets older than
-// the given time, and pushes through the data they have (i.e. tells
-// them to stop waiting and skip the data they're waiting for).
-//
-// Each Stream maintains a list of zero or more sets of bytes it has received
-// out-of-order. For example, if it has processed up through sequence number
-// 10, it might have bytes [15-20), [20-25), [30-50) in its list. Each set of
-// bytes also has the timestamp it was originally viewed. A flush call will
-// look at the smallest subsequent set of bytes, in this case [15-20), and if
-// its timestamp is older than the passed-in time, it will push it and all
-// contiguous byte-sets out to the Stream's Reassembled function. In this case,
-// it will push [15-20), but also [20-25), since that's contiguous. It will
-// only push [30-50) if its timestamp is also older than the passed-in time,
-// otherwise it will wait until the next FlushOlderThan to see if bytes [25-30)
-// come in.
-//
-// If it pushes all bytes (or there were no sets of bytes to begin with)
-// AND the connection has not received any bytes since the passed-in time,
-// the connection will be closed.
-//
-// If CloseAll is set, it will close out connections that have been drained.
-// Regardless of the CloseAll setting, connections stale for the specified
-// time will be closed.
-//
-// Returns the number of connections flushed, and of those, the number closed
-// because of the flush.
-func (a *Assembler) FlushWithOptions(opt FlushOptions) (flushed, closed int) {
- conns := a.connPool.connections()
- closes := 0
- flushes := 0
- for _, conn := range conns {
- flushed := false
- conn.mu.Lock()
- if conn.closed {
- // Already closed connection, nothing to do here.
- conn.mu.Unlock()
- continue
- }
- for conn.first != nil && conn.first.Seen.Before(opt.T) {
- a.skipFlush(conn)
- flushed = true
- if conn.closed {
- closes++
- break
- }
- }
- if opt.CloseAll && !conn.closed && conn.first == nil && conn.lastSeen.Before(opt.T) {
- flushed = true
- a.closeConnection(conn)
- closes++
- }
- if flushed {
- flushes++
- }
- conn.mu.Unlock()
- }
- return flushes, closes
-}
-
-// FlushOlderThan calls FlushWithOptions with the CloseAll option set to true.
-func (a *Assembler) FlushOlderThan(t time.Time) (flushed, closed int) {
- return a.FlushWithOptions(FlushOptions{CloseAll: true, T: t})
-}
-
-// FlushAll flushes all remaining data in every remaining connection out to its
-// stream and closes that connection. It returns the total number of
-// connections flushed/closed by the call.
-func (a *Assembler) FlushAll() (closed int) {
- conns := a.connPool.connections()
- closed = len(conns)
- for _, conn := range conns {
- conn.mu.Lock()
- for !conn.closed {
- a.skipFlush(conn)
- }
- conn.mu.Unlock()
- }
- return
-}
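The three flushing entry points are easy to mix up. The fragment below (same hypothetical program as the other sketches; flushLoop, the two-minute window, and the done channel are all invented) shows one reasonable division of labour between FlushWithOptions and FlushAll:

package main

import (
	"log"
	"time"

	"github.com/google/gopacket/tcpassembly"
)

// flushLoop periodically pushes through data stuck behind old gaps and, on
// shutdown, drains whatever is left.
func flushLoop(assembler *tcpassembly.Assembler, done <-chan struct{}) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Push through byte-sets that have waited more than two minutes;
			// see the FlushWithOptions doc comment above for exactly when a
			// connection gets closed as part of this.
			flushed, closed := assembler.FlushWithOptions(tcpassembly.FlushOptions{
				T: time.Now().Add(-2 * time.Minute),
			})
			log.Printf("flush: %d connections flushed, %d closed", flushed, closed)
		case <-done:
			// FlushAll pushes out all buffered data and closes every remaining
			// connection, so ReassemblyComplete fires for each of them.
			log.Printf("shutdown: closed %d connections", assembler.FlushAll())
			return
		}
	}
}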
-
-type key [2]gopacket.Flow
-
-func (k *key) String() string {
- return fmt.Sprintf("%s:%s", k[0], k[1])
-}
-
-// StreamPool stores all streams created by Assemblers, allowing multiple
-// assemblers to work together on stream processing while enforcing the fact
-// that a single stream receives its data serially. It is safe
-// for concurrency, usable by multiple Assemblers at once.
-//
-// StreamPool handles the creation and storage of Stream objects used by one or
-// more Assembler objects. When a new TCP stream is found by an Assembler, it
-// creates an associated Stream by calling its StreamFactory's New method.
-// Thereafter (until the stream is closed), that Stream object will receive
-// assembled TCP data via Assembler's calls to the stream's Reassembled
-// function.
-//
-// Like the Assembler, StreamPool attempts to minimize allocation. Unlike the
-// Assembler, though, it does have to do some locking to make sure that the
-// connection objects it stores are accessible to multiple Assemblers.
-type StreamPool struct {
- conns map[key]*connection
- users int
- mu sync.RWMutex
- factory StreamFactory
- free []*connection
- all [][]connection
- nextAlloc int
- newConnectionCount int64
-}
-
-func (p *StreamPool) grow() {
- conns := make([]connection, p.nextAlloc)
- p.all = append(p.all, conns)
- for i := range conns {
- p.free = append(p.free, &conns[i])
- }
- if *memLog {
- log.Println("StreamPool: created", p.nextAlloc, "new connections")
- }
- p.nextAlloc *= 2
-}
-
-// NewStreamPool creates a new connection pool. Streams will
-// be created as necessary using the passed-in StreamFactory.
-func NewStreamPool(factory StreamFactory) *StreamPool {
- return &StreamPool{
- conns: make(map[key]*connection, initialAllocSize),
- free: make([]*connection, 0, initialAllocSize),
- factory: factory,
- nextAlloc: initialAllocSize,
- }
-}
-
-const assemblerReturnValueInitialSize = 16
-
-// NewAssembler creates a new assembler. Pass in the StreamPool
-// to use; it may be shared across assemblers.
-//
-// This sets some sane defaults for the assembler options;
-// see DefaultAssemblerOptions for details.
-func NewAssembler(pool *StreamPool) *Assembler {
- pool.mu.Lock()
- pool.users++
- pool.mu.Unlock()
- return &Assembler{
- ret: make([]Reassembly, assemblerReturnValueInitialSize),
- pc: newPageCache(),
- connPool: pool,
- AssemblerOptions: DefaultAssemblerOptions,
- }
-}
-
-// DefaultAssemblerOptions provides default options for an assembler.
-// These options are used by default when calling NewAssembler, so if
-// modified before a NewAssembler call they'll affect the resulting Assembler.
-//
-// Note that the default options can result in ever-increasing memory usage
-// unless one of the Flush* methods is called on a regular basis.
-var DefaultAssemblerOptions = AssemblerOptions{
- MaxBufferedPagesPerConnection: 0, // unlimited
- MaxBufferedPagesTotal: 0, // unlimited
-}
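To make the lifecycle concrete, here is a hedged end-to-end sketch continuing the hypothetical package main from the byteCounter example earlier. The interface name "eth0", the snaplen, the buffering limits, and the flush interval are arbitrary assumptions, and github.com/google/gopacket/pcap is assumed to be available:

package main

import (
	"log"
	"time"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/google/gopacket/tcpassembly"
)

func main() {
	handle, err := pcap.OpenLive("eth0", 1600, true, pcap.BlockForever)
	if err != nil {
		log.Fatal(err)
	}
	defer handle.Close()

	pool := tcpassembly.NewStreamPool(counterFactory{})
	assembler := tcpassembly.NewAssembler(pool)
	// Cap buffered out-of-order pages so memory stays bounded even without
	// frequent flushing (the defaults are unlimited).
	assembler.MaxBufferedPagesPerConnection = 100
	assembler.MaxBufferedPagesTotal = 10000

	packets := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
	ticker := time.Tick(time.Minute)
	for {
		select {
		case packet := <-packets:
			if packet == nil {
				return // channel closed: capture ended
			}
			tcp, ok := packet.TransportLayer().(*layers.TCP)
			if !ok || packet.NetworkLayer() == nil {
				continue
			}
			// Pass the capture timestamp so FlushOlderThan compares against
			// packet time rather than processing time.
			assembler.AssembleWithTimestamp(
				packet.NetworkLayer().NetworkFlow(), tcp,
				packet.Metadata().Timestamp)
		case <-ticker:
			// Flush and close connections idle for more than two minutes.
			assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
		}
	}
}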
-
-type connection struct {
- key key
- pages int
- first, last *page
- nextSeq Sequence
- created, lastSeen time.Time
- stream Stream
- closed bool
- mu sync.Mutex
-}
-
-func (c *connection) reset(k key, s Stream, ts time.Time) {
- c.key = k
- c.pages = 0
- c.first, c.last = nil, nil
- c.nextSeq = invalidSequence
- c.created = ts
- c.stream = s
- c.closed = false
-}
-
-// AssemblerOptions controls the behavior of each assembler. Modify the
-// options of each assembler you create to change their behavior.
-type AssemblerOptions struct {
- // MaxBufferedPagesTotal is an upper limit on the total number of pages to
- // buffer while waiting for out-of-order packets. Once this limit is
- // reached, the assembler will degrade to flushing every connection it
- // gets a packet for. If <= 0, this is ignored.
- MaxBufferedPagesTotal int
- // MaxBufferedPagesPerConnection is an upper limit on the number of pages
- // buffered for a single connection. Should this limit be reached for a
- // particular connection, the smallest sequence number will be flushed, along
- // with any contiguous data. If <= 0, this is ignored.
- MaxBufferedPagesPerConnection int
-}
-
-// Assembler handles reassembling TCP streams. It is not safe for
-// concurrency... after passing a packet in via the Assemble call, the caller
-// must wait for that call to return before calling Assemble again. Callers can
-// get around this by creating multiple assemblers that share a StreamPool. In
-// that case, each individual stream will still be handled serially (each stream
-// has an individual mutex associated with it), however multiple assemblers can
-// assemble different connections concurrently.
-//
-// The Assembler provides (hopefully) fast TCP stream re-assembly for sniffing
-// applications written in Go. The Assembler uses the following techniques to
-// keep packet processing as fast as possible:
-//
-// Avoids Lock Contention
-//
-// Assemblers lock connections, but each connection has an individual lock, and
-// rarely will two Assemblers be looking at the same connection. Assemblers
-// lock the StreamPool when looking up connections, but they use Reader
-// locks initially, and only force a write lock if they need to create a new
-// connection or close one down. These happen much less frequently than
-// individual packet handling.
-//
-// Each assembler runs in its own goroutine, and the only state shared between
-// goroutines is through the StreamPool. Thus all internal Assembler state
-// can be handled without any locking.
-//
-// NOTE: If you can guarantee that packets going to a set of Assemblers will
-// contain information on different connections per Assembler (for example,
-// they're already hashed by PF_RING hashing or some other hashing mechanism),
-// then we recommend you use a separate StreamPool per Assembler, thus
-// avoiding all lock contention. Only when different Assemblers could receive
-// packets for the same Stream should a StreamPool be shared between them.
-//
-// Avoids Memory Copying
-//
-// In the common case, handling of a single TCP packet should result in zero
-// memory allocations. The Assembler will look up the connection, figure out
-// that the packet has arrived in order, and immediately pass that packet on to
-// the appropriate connection's handling code. Only if a packet arrives out of
-// order are its contents copied and stored in memory for later.
-//
-// Avoids Memory Allocation
-//
-// Assemblers try very hard to not use memory allocation unless absolutely
-// necessary. Packet data for sequential packets is passed directly to streams
-// with no copying or allocation. Packet data for out-of-order packets is
-// copied into reusable pages, and new pages are only allocated rarely when the
-// page cache runs out. Page caches are Assembler-specific, thus not used
-// concurrently and requiring no locking.
-//
-// Internal representations for connection objects are also reused over time.
-// Because of this, the most common memory allocation done by the Assembler is
-// generally what's done by the caller in StreamFactory.New. If no allocation
-// is done there, then very little allocation is done ever, mostly to handle
-// large increases in bandwidth or numbers of connections.
-//
-// TODO: The page caches used by an Assembler will grow to the size necessary
-// to handle a workload, and currently will never shrink. This means that
-// traffic spikes can result in large memory usage which isn't garbage
-// collected when typical traffic levels return.
-type Assembler struct {
- AssemblerOptions
- ret []Reassembly
- pc *pageCache
- connPool *StreamPool
-}
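The locking note above can be summarised in code. The sketch below is illustrative only (the shard channels, how packets get hashed onto them, and runAssemblers itself are all assumptions): one Assembler per goroutine, all sharing one StreamPool. If the shards are already partitioned by connection (for example by PF_RING hashing), giving each goroutine its own NewStreamPool instead removes even that shared lock, as the comment recommends.

package main

import (
	"sync"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/tcpassembly"
)

// runAssemblers starts one Assembler per input shard. Each Assembler keeps its
// own page cache and return buffer; only the shared StreamPool is locked, and
// only when connections are created, looked up, or closed.
func runAssemblers(pool *tcpassembly.StreamPool, shards []<-chan gopacket.Packet) {
	var wg sync.WaitGroup
	for _, shard := range shards {
		wg.Add(1)
		go func(packets <-chan gopacket.Packet) {
			defer wg.Done()
			assembler := tcpassembly.NewAssembler(pool)
			for packet := range packets {
				tcp, ok := packet.TransportLayer().(*layers.TCP)
				if !ok || packet.NetworkLayer() == nil {
					continue
				}
				assembler.AssembleWithTimestamp(
					packet.NetworkLayer().NetworkFlow(), tcp,
					packet.Metadata().Timestamp)
			}
			assembler.FlushAll()
		}(shard)
	}
	wg.Wait()
}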
-
-func (p *StreamPool) newConnection(k key, s Stream, ts time.Time) (c *connection) {
- if *memLog {
- p.newConnectionCount++
- if p.newConnectionCount&0x7FFF == 0 {
- log.Println("StreamPool:", p.newConnectionCount, "requests,", len(p.conns), "used,", len(p.free), "free")
- }
- }
- if len(p.free) == 0 {
- p.grow()
- }
- index := len(p.free) - 1
- c, p.free = p.free[index], p.free[:index]
- c.reset(k, s, ts)
- return c
-}
-
-// getConnection returns a connection. If end is true and a connection
-// does not already exist, returns nil. This allows us to check for a
-// connection without actually creating one if it doesn't already exist.
-func (p *StreamPool) getConnection(k key, end bool, ts time.Time) *connection {
- p.mu.RLock()
- conn := p.conns[k]
- p.mu.RUnlock()
- if end || conn != nil {
- return conn
- }
- s := p.factory.New(k[0], k[1])
- p.mu.Lock()
- conn = p.newConnection(k, s, ts)
- if conn2 := p.conns[k]; conn2 != nil {
- p.mu.Unlock()
- return conn2
- }
- p.conns[k] = conn
- p.mu.Unlock()
- return conn
-}
-
-// Assemble calls AssembleWithTimestamp with the current timestamp, useful for
-// packets being read directly off the wire.
-func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP) {
- a.AssembleWithTimestamp(netFlow, t, time.Now())
-}
-
-// AssembleWithTimestamp reassembles the given TCP packet into its appropriate
-// stream.
-//
-// The timestamp passed in must be the timestamp the packet was seen.
-// For packets read off the wire, time.Now() should be fine. For packets read
-// from PCAP files, CaptureInfo.Timestamp should be passed in. This timestamp
-// will affect which streams are flushed by a call to FlushOlderThan.
-//
-// Each Assemble call results in, in order:
-//
-// zero or one calls to StreamFactory.New, creating a stream
-// zero or one calls to Reassembled on a single stream
-// zero or one calls to ReassemblyComplete on the same stream
-func (a *Assembler) AssembleWithTimestamp(netFlow gopacket.Flow, t *layers.TCP, timestamp time.Time) {
- // Ignore empty TCP packets
- if !t.SYN && !t.FIN && !t.RST && len(t.LayerPayload()) == 0 {
- if *debugLog {
- log.Println("ignoring useless packet")
- }
- return
- }
-
- a.ret = a.ret[:0]
- key := key{netFlow, t.TransportFlow()}
- var conn *connection
-	// This for loop handles a race condition where a connection will close,
-	// lock the connection pool, and remove itself, but is handed out to another
-	// Assemble call before it manages to lock the pool. This should loop 0-1
-	// times for the VAST majority of cases.
- for {
- conn = a.connPool.getConnection(
- key, !t.SYN && len(t.LayerPayload()) == 0, timestamp)
- if conn == nil {
- if *debugLog {
- log.Printf("%v got empty packet on otherwise empty connection", key)
- }
- return
- }
- conn.mu.Lock()
- if !conn.closed {
- break
- }
- conn.mu.Unlock()
- }
- if conn.lastSeen.Before(timestamp) {
- conn.lastSeen = timestamp
- }
- seq, bytes := Sequence(t.Seq), t.Payload
- if conn.nextSeq == invalidSequence {
- if t.SYN {
- if *debugLog {
- log.Printf("%v saw first SYN packet, returning immediately, seq=%v", key, seq)
- }
- a.ret = append(a.ret, Reassembly{
- Bytes: bytes,
- Skip: 0,
- Start: true,
- Seen: timestamp,
- })
- conn.nextSeq = seq.Add(len(bytes) + 1)
- } else {
- if *debugLog {
- log.Printf("%v waiting for start, storing into connection", key)
- }
- a.insertIntoConn(t, conn, timestamp)
- }
- } else if diff := conn.nextSeq.Difference(seq); diff > 0 {
- if *debugLog {
- log.Printf("%v gap in sequence numbers (%v, %v) diff %v, storing into connection", key, conn.nextSeq, seq, diff)
- }
- a.insertIntoConn(t, conn, timestamp)
- } else {
- bytes, conn.nextSeq = byteSpan(conn.nextSeq, seq, bytes)
- if *debugLog {
- log.Printf("%v found contiguous data (%v, %v), returning immediately", key, seq, conn.nextSeq)
- }
- a.ret = append(a.ret, Reassembly{
- Bytes: bytes,
- Skip: 0,
- End: t.RST || t.FIN,
- Seen: timestamp,
- })
- }
- if len(a.ret) > 0 {
- a.sendToConnection(conn)
- }
- conn.mu.Unlock()
-}
-
-func byteSpan(expected, received Sequence, bytes []byte) (toSend []byte, next Sequence) {
- if expected == invalidSequence {
- return bytes, received.Add(len(bytes))
- }
- span := int(received.Difference(expected))
- if span <= 0 {
- return bytes, received.Add(len(bytes))
- } else if len(bytes) < span {
- return nil, expected
- }
- return bytes[span:], expected.Add(len(bytes) - span)
-}
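Since byteSpan is where overlap with already-delivered data gets trimmed, a small in-package test sketch (hypothetical; it would live in something like an assembly_test.go) makes the arithmetic concrete:

package tcpassembly

import "testing"

// With everything up to sequence 10 already delivered, a segment starting at 8
// with 6 payload bytes overlaps by 2: byteSpan drops the overlapping prefix and
// advances the expected sequence past the remaining 4 bytes.
func TestByteSpanTrimsOverlap(t *testing.T) {
	toSend, next := byteSpan(10, 8, []byte("abcdef"))
	if string(toSend) != "cdef" || next != 14 {
		t.Fatalf("got %q with next %v, want \"cdef\" with next 14", toSend, next)
	}
}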
-
-// sendToConnection sends the current values in a.ret to the connection, closing
-// the connection if the last thing sent had End set.
-func (a *Assembler) sendToConnection(conn *connection) {
- a.addContiguous(conn)
- if conn.stream == nil {
- panic("why?")
- }
- conn.stream.Reassembled(a.ret)
- if a.ret[len(a.ret)-1].End {
- a.closeConnection(conn)
- }
-}
-
-// addContiguous appends contiguous byte-sets from the front of the connection
-// to the pending return array (a.ret).
-func (a *Assembler) addContiguous(conn *connection) {
- for conn.first != nil && conn.nextSeq.Difference(conn.first.seq) <= 0 {
- a.addNextFromConn(conn)
- }
-}
-
-// skipFlush skips over the bytes we're waiting for and flushes the first
-// contiguous sets of bytes we do have to the stream. If no bytes are
-// pending, it closes the connection.
-func (a *Assembler) skipFlush(conn *connection) {
- if *debugLog {
- log.Printf("%v skipFlush %v", conn.key, conn.nextSeq)
- }
- if conn.first == nil {
- a.closeConnection(conn)
- return
- }
- a.ret = a.ret[:0]
- a.addNextFromConn(conn)
- a.addContiguous(conn)
- a.sendToConnection(conn)
-}
-
-func (p *StreamPool) remove(conn *connection) {
- p.mu.Lock()
- delete(p.conns, conn.key)
- p.free = append(p.free, conn)
- p.mu.Unlock()
-}
-
-func (a *Assembler) closeConnection(conn *connection) {
- if *debugLog {
- log.Printf("%v closing", conn.key)
- }
- conn.stream.ReassemblyComplete()
- conn.closed = true
- a.connPool.remove(conn)
- for p := conn.first; p != nil; p = p.next {
- a.pc.replace(p)
- }
-}
-
-// traverseConn traverses our doubly-linked list of pages for the correct
-// position to put the given sequence number. Note that it traverses backwards,
-// starting at the highest sequence number and going down, since we assume the
-// common case is that TCP packets for a stream will appear in-order, with
-// minimal loss or packet reordering.
-func (c *connection) traverseConn(seq Sequence) (prev, current *page) {
- prev = c.last
- for prev != nil && prev.seq.Difference(seq) < 0 {
- current = prev
- prev = current.prev
- }
- return
-}
-
-// pushBetween inserts the doubly-linked list first-...-last in between the
-// nodes prev-next in another doubly-linked list. If prev is nil, makes first
-// the new first page in the connection's list. If next is nil, makes last the
-// new last page in the list. first/last may point to the same page.
-func (c *connection) pushBetween(prev, next, first, last *page) {
- // Maintain our doubly linked list
- if next == nil || c.last == nil {
- c.last = last
- } else {
- last.next = next
- next.prev = last
- }
- if prev == nil || c.first == nil {
- c.first = first
- } else {
- first.prev = prev
- prev.next = first
- }
-}
-
-func (a *Assembler) insertIntoConn(t *layers.TCP, conn *connection, ts time.Time) {
- if conn.first != nil && conn.first.seq == conn.nextSeq {
- panic("wtf")
- }
- p, p2, numPages := a.pagesFromTCP(t, ts)
- prev, current := conn.traverseConn(Sequence(t.Seq))
- conn.pushBetween(prev, current, p, p2)
- conn.pages += numPages
- if (a.MaxBufferedPagesPerConnection > 0 && conn.pages >= a.MaxBufferedPagesPerConnection) ||
- (a.MaxBufferedPagesTotal > 0 && a.pc.used >= a.MaxBufferedPagesTotal) {
- if *debugLog {
- log.Printf("%v hit max buffer size: %+v, %v, %v", conn.key, a.AssemblerOptions, conn.pages, a.pc.used)
- }
- a.addNextFromConn(conn)
- }
-}
-
-// pagesFromTCP creates a page (or set of pages) from a TCP packet. Note that
-// it should NEVER receive a SYN packet, since it does not account for the
-// sequence number the SYN itself consumes.
-//
-// It returns the first and last page in its doubly-linked list of new pages.
-func (a *Assembler) pagesFromTCP(t *layers.TCP, ts time.Time) (p, p2 *page, numPages int) {
- first := a.pc.next(ts)
- current := first
- numPages++
- seq, bytes := Sequence(t.Seq), t.Payload
- for {
- length := min(len(bytes), pageBytes)
- current.Bytes = current.buf[:length]
- copy(current.Bytes, bytes)
- current.seq = seq
- bytes = bytes[length:]
- if len(bytes) == 0 {
- break
- }
- seq = seq.Add(length)
- current.next = a.pc.next(ts)
- current.next.prev = current
- current = current.next
- numPages++
- }
- current.End = t.RST || t.FIN
- return first, current, numPages
-}
-
-// addNextFromConn pops the first page off a connection and adds it to the
-// return array.
-func (a *Assembler) addNextFromConn(conn *connection) {
- if conn.nextSeq == invalidSequence {
- conn.first.Skip = -1
- } else if diff := conn.nextSeq.Difference(conn.first.seq); diff > 0 {
- conn.first.Skip = int(diff)
- }
- conn.first.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, conn.first.seq, conn.first.Bytes)
- if *debugLog {
- log.Printf("%v adding from conn (%v, %v)", conn.key, conn.first.seq, conn.nextSeq)
- }
- a.ret = append(a.ret, conn.first.Reassembly)
- a.pc.replace(conn.first)
- if conn.first == conn.last {
- conn.first = nil
- conn.last = nil
- } else {
- conn.first = conn.first.next
- conn.first.prev = nil
- }
- conn.pages--
-}
-
-func min(a, b int) int {
- if a < b {
- return a
- }
- return b
-}