1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
|
// Copyright 2012 Google, Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style license
// that can be found in the LICENSE file in the root of the source
// tree.
// +build linux
// Package afpacket provides Go bindings for MMap'd AF_PACKET socket reading.
package afpacket
// Couldn't have done this without:
// http://lxr.free-electrons.com/source/Documentation/networking/packet_mmap.txt
// http://codemonkeytips.blogspot.co.uk/2011/07/asynchronous-packet-socket-reading-with.html
import (
"errors"
"fmt"
"net"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
"golang.org/x/net/bpf"
"golang.org/x/sys/unix"
"github.com/google/gopacket"
)
/*
#include <linux/if_packet.h> // AF_PACKET, sockaddr_ll
#include <linux/if_ether.h> // ETH_P_ALL
#include <sys/socket.h> // socket()
#include <unistd.h> // close()
#include <arpa/inet.h> // htons()
#include <sys/mman.h> // mmap(), munmap()
#include <poll.h> // poll()
*/
import "C"
var (
	// pageSize is the host's memory page size.
	pageSize = unix.Getpagesize()
	// tpacketAlignment is the kernel's TPACKET_ALIGNMENT, used by tpacketAlign.
	tpacketAlignment = uint(C.TPACKET_ALIGNMENT)
)

// Sentinel errors produced while waiting for packets; ZeroCopyReadPacketData
// (and the read methods built on it) return them unchanged.
var (
	// ErrPoll is returned when poll reports POLLERR on the socket.
	ErrPoll = errors.New("packet poll failed")
	// ErrTimeout is returned when poll times out before a packet is ready.
	ErrTimeout = errors.New("packet poll timeout expired")
)
// tpacketAlign rounds v up to the next multiple of tpacketAlignment, like the
// kernel's TPACKET_ALIGN macro: (v + A - 1) & ~(A - 1).
// tpacketAlignment is assumed to be a power of two (true for the kernel's
// TPACKET_ALIGNMENT).
func tpacketAlign(v int) int {
	// &^ (A-1) clears the low bits below the alignment. The previous mask,
	// (^A)-1, is a different value and zeroed unrelated bits
	// (e.g. tpacketAlign(1) returned 0 instead of 16).
	return int((uint(v) + tpacketAlignment - 1) &^ (tpacketAlignment - 1))
}
// Stats is a set of counters detailing the work TPacket has done so far.
// A TPacket keeps its own copy updated with sync/atomic; TPacket.Stats
// returns a snapshot of it.
type Stats struct {
	// Packets is the total number of packets returned to the caller.
	Packets int64
	// Polls is the number of blocking syscalls made waiting for packets.
	// With TPacket one syscall can (and often does) return many results, so
	// this is usually well below Packets. It is not guaranteed to be <=
	// Packets, though: polls that end in an error or that only yield an empty
	// block are counted as well.
	Polls int64
}
type (
	// SocketStats holds the kernel's PACKET_STATISTICS counters
	// (struct tpacket_stats), as used with TPacket versions 1 and 2.
	SocketStats C.struct_tpacket_stats

	// SocketStatsV3 holds the PACKET_STATISTICS counters used with
	// TPacketVersion3 (struct tpacket_stats_v3); compared with SocketStats
	// it adds the tp_freeze_q_cnt counter.
	SocketStatsV3 C.struct_tpacket_stats_v3
)
// TPacket implements packet receiving for Linux AF_PACKET versions 1, 2, and 3.
// Create one with NewTPacket and release it with Close.
type TPacket struct {
	// fd is the AF_PACKET socket file descriptor; Close sets it to -1.
	fd int
	// ring is the mmap'd ring buffer shared by tpacket and the kernel
	// (created by setUpRing, unmapped by Close).
	ring []byte
	// rawring points at the first byte of ring; ring slots are located by
	// offsetting from it.
	rawring unsafe.Pointer
	// opts contains read-only options for the TPacket object.
	opts options
	mu   sync.Mutex // guards the packet-read state below (offset through v3); tpVersion is only written during NewTPacket
	// offset is the index of the current ring slot (a frame for v1/v2, a block for v3).
	offset int
	// current is the current header.
	current header
	// pollset holds the pollfd used for the poll() call in pollForFirstPacket.
	pollset unix.PollFd
	// shouldReleasePacket is set to true whenever we return packet data, to make sure we remember to release that data back to the kernel.
	shouldReleasePacket bool
	// headerNextNeeded is set to true when the header needs to move to the next packet. It is cleared after a poll error, since nothing new was read.
	headerNextNeeded bool
	// tpVersion is the version of TPacket actually in use, set by setRequestedTPacketVersion.
	tpVersion OptTPacketVersion
	// v3 is reused by getTPacketHeader for TPacket v3: it returns a pointer
	// to this field instead of allocating a v3wrapper on every call, so the
	// returned header is only valid until the next call.
	v3 v3wrapper
	statsMu sync.Mutex // guards socketStats and socketStatsV3 below (stats is accessed with sync/atomic instead)
	// stats is simple statistics on TPacket's run, read and updated with
	// sync/atomic.
	// NOTE(review): 64-bit atomic operations require 8-byte alignment on
	// 32-bit platforms; confirm this field's offset guarantees that there.
	stats Stats
	// socketStats accumulates PACKET_STATISTICS readings for TPacket v1/v2.
	socketStats SocketStats
	// socketStatsV3 is the same as socketStats for TPacket v3, with an extra field tp_freeze_q_cnt.
	socketStatsV3 SocketStatsV3
}
// Compile-time check that *TPacket satisfies gopacket.ZeroCopyPacketDataSource.
var _ gopacket.ZeroCopyPacketDataSource = (*TPacket)(nil)
// bindToInterface binds the TPacket socket to the interface named ifaceName,
// capturing every ethertype (ETH_P_ALL). An empty name leaves the interface
// index at 0, which listens on all interfaces.
func (h *TPacket) bindToInterface(ifaceName string) error {
	var index int
	if len(ifaceName) > 0 {
		ni, err := net.InterfaceByName(ifaceName)
		if err != nil {
			return fmt.Errorf("InterfaceByName: %v", err)
		}
		index = ni.Index
	}
	return unix.Bind(h.fd, &unix.SockaddrLinklayer{
		Ifindex:  index,
		Protocol: htons(uint16(unix.ETH_P_ALL)),
	})
}
// setTPacketVersion sets the socket's PACKET_VERSION option to version.
// It returns a wrapped error if the kernel rejects it.
func (h *TPacket) setTPacketVersion(version OptTPacketVersion) error {
	err := unix.SetsockoptInt(h.fd, unix.SOL_PACKET, unix.PACKET_VERSION, int(version))
	if err == nil {
		return nil
	}
	return fmt.Errorf("setsockopt packet_version: %v", err)
}
// setRequestedTPacketVersion negotiates the TPacket version with the kernel.
// With TPacketVersionHighestAvailable, versions 3, 2 and 1 are tried in that
// order; otherwise only the explicitly requested version is tried. The first
// version the kernel accepts is stored in h.tpVersion.
func (h *TPacket) setRequestedTPacketVersion() error {
	for _, v := range [...]OptTPacketVersion{TPacketVersion3, TPacketVersion2, TPacketVersion1} {
		wanted := h.opts.version == TPacketVersionHighestAvailable || h.opts.version == v
		if wanted && h.setTPacketVersion(v) == nil {
			h.tpVersion = v
			return nil
		}
	}
	return errors.New("no known tpacket versions work on this machine")
}
// setUpRing requests a PACKET_RX_RING sized from h.opts, using the request
// layout that matches h.tpVersion, and then mmaps that ring into this process.
// On success h.ring is the shared mapping (unmapped by Close) and h.rawring
// points at its first byte.
func (h *TPacket) setUpRing() (err error) {
	frameCount := h.opts.framesPerBlock * h.opts.numBlocks
	totalSize := int(frameCount * h.opts.frameSize)

	switch h.tpVersion {
	case TPacketVersion1, TPacketVersion2:
		var req C.struct_tpacket_req
		req.tp_block_size = C.uint(h.opts.blockSize)
		req.tp_block_nr = C.uint(h.opts.numBlocks)
		req.tp_frame_size = C.uint(h.opts.frameSize)
		req.tp_frame_nr = C.uint(frameCount)
		if serr := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_RX_RING, unsafe.Pointer(&req), unsafe.Sizeof(req)); serr != nil {
			return fmt.Errorf("setsockopt packet_rx_ring: %v", serr)
		}
	case TPacketVersion3:
		var req C.struct_tpacket_req3
		req.tp_block_size = C.uint(h.opts.blockSize)
		req.tp_block_nr = C.uint(h.opts.numBlocks)
		req.tp_frame_size = C.uint(h.opts.frameSize)
		req.tp_frame_nr = C.uint(frameCount)
		// The block retire timeout is passed in whole milliseconds.
		req.tp_retire_blk_tov = C.uint(h.opts.blockTimeout / time.Millisecond)
		if serr := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_RX_RING, unsafe.Pointer(&req), unsafe.Sizeof(req)); serr != nil {
			return fmt.Errorf("setsockopt packet_rx_ring v3: %v", serr)
		}
	default:
		return errors.New("invalid tpVersion")
	}

	if h.ring, err = unix.Mmap(h.fd, 0, totalSize, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED); err != nil {
		return err
	}
	if h.ring == nil {
		return errors.New("no ring")
	}
	h.rawring = unsafe.Pointer(&h.ring[0])
	return nil
}
// Close unmaps the ring buffer, closes the socket and removes the finalizer
// registered by NewTPacket. Calling it again is a no-op. The TPacket must not
// be used after Close.
func (h *TPacket) Close() {
	if h.fd != -1 {
		// Best-effort teardown: Close has no error result, so failures from
		// Munmap/Close are deliberately dropped.
		if h.ring != nil {
			_ = unix.Munmap(h.ring)
			h.ring = nil
		}
		_ = unix.Close(h.fd)
		h.fd = -1
		runtime.SetFinalizer(h, nil)
	}
}
// NewTPacket returns a new TPacket object for reading packets off the wire.
// Its behavior may be modified by passing in any/all of afpacket.Opt* to this
// function.
//
// It opens an AF_PACKET socket, binds it to the configured interface,
// negotiates a TPacket version, maps the RX ring and resets the socket
// statistics. If any step fails, the partially built TPacket is closed and
// (nil, err) is returned. If this function succeeds, the user should be sure
// to Close the returned TPacket when finished with it; a finalizer also
// closes it if it becomes unreachable.
func NewTPacket(opts ...interface{}) (h *TPacket, err error) {
	h = &TPacket{}
	if h.opts, err = parseOptions(opts...); err != nil {
		return nil, err
	}
	fd, err := unix.Socket(unix.AF_PACKET, int(h.opts.socktype), int(htons(unix.ETH_P_ALL)))
	if err != nil {
		return nil, err
	}
	h.fd = fd

	// Setup steps run in order; the first failure aborts construction.
	steps := [...]func() error{
		func() error { return h.bindToInterface(h.opts.iface) },
		h.setRequestedTPacketVersion,
		h.setUpRing,
		// Clear stat counter from socket.
		h.InitSocketStats,
	}
	for _, step := range steps {
		if err = step(); err != nil {
			h.Close()
			return nil, err
		}
	}

	runtime.SetFinalizer(h, (*TPacket).Close)
	return h, nil
}
// SetBPF attaches a classic BPF filter to the underlying socket
// (SO_ATTACH_FILTER).
// It returns an error if filter is empty or longer than the 65535
// instructions a sock_fprog can describe, or if the kernel rejects it.
func (h *TPacket) SetBPF(filter []bpf.RawInstruction) error {
	// Guard before taking &filter[0], which would panic on an empty slice.
	if len(filter) == 0 {
		return errors.New("filter is empty")
	}
	if len(filter) > int(^uint16(0)) {
		return errors.New("filter too large")
	}
	// bpf.RawInstruction and unix.SockFilter share the kernel's
	// struct sock_filter layout, so the slice can be handed over in place.
	p := unix.SockFprog{
		Len:    uint16(len(filter)),
		Filter: (*unix.SockFilter)(unsafe.Pointer(&filter[0])),
	}
	return setsockopt(h.fd, unix.SOL_SOCKET, unix.SO_ATTACH_FILTER, unsafe.Pointer(&p), unix.SizeofSockFprog)
}
// releaseCurrentPacket clears the status of the current ring slot via
// clearStatus (presumably handing it back to the kernel — see the header
// implementations), advances h.offset to the next slot and records that
// nothing is pending release. It always returns nil.
func (h *TPacket) releaseCurrentPacket() error {
	cur := h.current
	cur.clearStatus()
	h.shouldReleasePacket = false
	h.offset++
	return nil
}
// ZeroCopyReadPacketData reads the next packet off the wire, and returns its data.
// The slice returned by ZeroCopyReadPacketData points to bytes owned by the
// TPacket. Each call to ZeroCopyReadPacketData invalidates any data previously
// returned by ZeroCopyReadPacketData. Care must be taken not to keep pointers
// to old bytes when using ZeroCopyReadPacketData... if you need to keep data past
// the next time you call ZeroCopyReadPacketData, use ReadPacketData, which copies
// the bytes into a new buffer for you.
//
//	tp, _ := NewTPacket(...)
//	data1, _, _ := tp.ZeroCopyReadPacketData()
//	// do everything you want with data1 here, copying bytes out of it if you'd like to keep them around.
//	data2, _, _ := tp.ZeroCopyReadPacketData() // invalidates bytes in data1
//
// Calls are serialized by h.mu, which is held while waiting for packets.
// Errors from waiting (for example ErrTimeout) are returned unchanged.
func (h *TPacket) ZeroCopyReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
	h.mu.Lock()
	// Deferred so the lock is released even if a panic escapes below.
	defer h.mu.Unlock()

	// Step to the next packet within the current slot if possible. Otherwise
	// release the slot, move to the next one and wait for it; slots reporting
	// length 0 (empty blocks) are skipped by re-checking the loop condition.
	for h.current == nil || !h.headerNextNeeded || !h.current.next() {
		if h.shouldReleasePacket {
			h.releaseCurrentPacket()
		}
		h.current = h.getTPacketHeader()
		if err = h.pollForFirstPacket(h.current); err != nil {
			// Nothing new was read, so the next call must not step past it.
			h.headerNextNeeded = false
			return nil, ci, err
		}
		if h.current.getLength() != 0 {
			break
		}
	}

	data = h.current.getData()
	ci.Timestamp = h.current.getTime()
	ci.CaptureLength = len(data)
	ci.Length = h.current.getLength()
	ci.InterfaceIndex = h.current.getIfaceIndex()
	atomic.AddInt64(&h.stats.Packets, 1)
	h.headerNextNeeded = true
	return data, ci, nil
}
// Stats returns a snapshot of the packet and poll counters the TPacket has
// accumulated so far. The counters are read atomically; the error is always nil.
func (h *TPacket) Stats() (Stats, error) {
	var snapshot Stats
	snapshot.Polls = atomic.LoadInt64(&h.stats.Polls)
	snapshot.Packets = atomic.LoadInt64(&h.stats.Packets)
	return snapshot, nil
}
// InitSocketStats resets this socket's statistics. Reading PACKET_STATISTICS
// clears the kernel-side counters, so it performs one read and discards the
// result, then zeroes the running totals that SocketStats accumulates.
// NewTPacket calls it once after the ring is set up. It returns the
// getsockopt error, if any.
func (h *TPacket) InitSocketStats() error {
	// statsMu guards socketStats/socketStatsV3, which SocketStats also updates.
	h.statsMu.Lock()
	defer h.statsMu.Unlock()

	// NOTE(review): &slt reaches getsockopt as a uintptr; the unsafe.Pointer
	// rules only sanction that inside the syscall expression itself — confirm
	// getsockopt's implementation keeps this safe.
	if h.tpVersion == TPacketVersion3 {
		var ssv3 SocketStatsV3
		slt := C.socklen_t(unsafe.Sizeof(h.socketStatsV3))
		if err := getsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_STATISTICS, unsafe.Pointer(&ssv3), uintptr(unsafe.Pointer(&slt))); err != nil {
			return err
		}
		h.socketStatsV3 = SocketStatsV3{}
		return nil
	}

	var ss SocketStats
	slt := C.socklen_t(unsafe.Sizeof(h.socketStats))
	if err := getsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_STATISTICS, unsafe.Pointer(&ss), uintptr(unsafe.Pointer(&slt))); err != nil {
		return err
	}
	h.socketStats = SocketStats{}
	return nil
}
// SocketStats reads the socket's PACKET_STATISTICS counters, adds them to the
// running totals kept on the TPacket and returns those totals. Only the struct
// matching the TPacket version in use is updated; the other is returned as-is.
// On error both structs are returned zero-valued.
func (h *TPacket) SocketStats() (SocketStats, SocketStatsV3, error) {
	h.statsMu.Lock()
	defer h.statsMu.Unlock()
	// Reading the counters clears them in the kernel, so each reading is a
	// delta that must be folded into the saved totals.
	if h.tpVersion != TPacketVersion3 {
		var delta SocketStats
		size := C.socklen_t(unsafe.Sizeof(h.socketStats))
		if err := getsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_STATISTICS, unsafe.Pointer(&delta), uintptr(unsafe.Pointer(&size))); err != nil {
			return SocketStats{}, SocketStatsV3{}, err
		}
		h.socketStats.tp_packets += delta.tp_packets
		h.socketStats.tp_drops += delta.tp_drops
		return h.socketStats, h.socketStatsV3, nil
	}

	var delta SocketStatsV3
	size := C.socklen_t(unsafe.Sizeof(h.socketStatsV3))
	if err := getsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_STATISTICS, unsafe.Pointer(&delta), uintptr(unsafe.Pointer(&size))); err != nil {
		return SocketStats{}, SocketStatsV3{}, err
	}
	h.socketStatsV3.tp_packets += delta.tp_packets
	h.socketStatsV3.tp_drops += delta.tp_drops
	h.socketStatsV3.tp_freeze_q_cnt += delta.tp_freeze_q_cnt
	return h.socketStats, h.socketStatsV3, nil
}
// ReadPacketDataTo reads the next packet and copies as much of it as fits into
// the caller's data slice. ci.CaptureLength is set to the number of bytes
// copied: the smaller of len(data) and the captured packet size.
func (h *TPacket) ReadPacketDataTo(data []byte) (ci gopacket.CaptureInfo, err error) {
	src, info, rerr := h.ZeroCopyReadPacketData()
	if rerr != nil {
		return info, rerr
	}
	info.CaptureLength = copy(data, src)
	return info, nil
}
// ReadPacketData reads the next packet and returns a freshly allocated copy of
// its bytes, which stays valid after later reads (unlike
// ZeroCopyReadPacketData). This implements gopacket.PacketDataSource.
func (h *TPacket) ReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
	view, info, rerr := h.ZeroCopyReadPacketData()
	if rerr != nil {
		return view, info, rerr
	}
	owned := make([]byte, len(view))
	copy(owned, view)
	return owned, info, nil
}
// getTPacketHeader returns a header view of the ring slot at h.offset,
// wrapping h.offset back to 0 once it runs past the end of the ring.
// For TPacket v1/v2 a slot is one frame; for v3 it is one block, and the
// returned header is h.v3 re-initialised for that block (valid only until the
// next call). It panics on an unknown h.tpVersion, which
// setRequestedTPacketVersion never stores.
func (h *TPacket) getTPacketHeader() header {
	// The address arithmetic is kept inside each unsafe.Pointer conversion
	// (the uintptr+offset pattern the unsafe rules allow) instead of
	// round-tripping through a uintptr variable, which go vet flags.
	switch h.tpVersion {
	case TPacketVersion1:
		if h.offset >= h.opts.framesPerBlock*h.opts.numBlocks {
			h.offset = 0
		}
		return (*v1header)(unsafe.Pointer(uintptr(h.rawring) + uintptr(h.opts.frameSize*h.offset)))
	case TPacketVersion2:
		if h.offset >= h.opts.framesPerBlock*h.opts.numBlocks {
			h.offset = 0
		}
		return (*v2header)(unsafe.Pointer(uintptr(h.rawring) + uintptr(h.opts.frameSize*h.offset)))
	case TPacketVersion3:
		// TPacket3 uses each block to return values, instead of each frame.
		// Hence we need to rotate when we hit #blocks, not #frames.
		if h.offset >= h.opts.numBlocks {
			h.offset = 0
		}
		h.v3 = initV3Wrapper(unsafe.Pointer(uintptr(h.rawring) + uintptr(h.opts.frameSize*h.offset*h.opts.framesPerBlock)))
		return &h.v3
	}
	panic("handle tpacket version is invalid")
}
// pollForFirstPacket waits until hdr's status has TP_STATUS_USER set, calling
// poll(2) on the socket (timeout h.opts.pollTimeout) while it does not.
// It returns:
//   - ErrTimeout when poll times out;
//   - ErrPoll when poll reports POLLERR;
//   - any other poll error.
//
// Polls interrupted by a signal (EINTR) are retried. On success it sets
// h.shouldReleasePacket so the slot is released on the next read.
func (h *TPacket) pollForFirstPacket(hdr header) error {
	tm := int(h.opts.pollTimeout / time.Millisecond)
	// unix.Poll writes Revents into the slice it is given. Poll through a
	// one-element slice that aliases h.pollset, so that the Revents checked
	// below are the ones the kernel filled in. Polling a fresh copy would
	// leave h.pollset.Revents at 0 and hide POLLERR. The alias also avoids
	// allocating a slice per poll.
	fds := (*[1]unix.PollFd)(unsafe.Pointer(&h.pollset))[:]
	for hdr.getStatus()&C.TP_STATUS_USER == 0 {
		h.pollset = unix.PollFd{Fd: int32(h.fd), Events: unix.POLLIN}
		n, err := unix.Poll(fds, tm)
		if n == 0 {
			return ErrTimeout
		}
		atomic.AddInt64(&h.stats.Polls, 1)
		if h.pollset.Revents&unix.POLLERR != 0 {
			return ErrPoll
		}
		if err == unix.EINTR {
			// Interrupted by a signal (e.g. the Go runtime's preemption
			// signals), not a real failure: poll again.
			continue
		}
		if err != nil {
			return err
		}
	}
	h.shouldReleasePacket = true
	return nil
}
// FanoutType selects how packets are distributed among the sockets of a
// fanout group; pass it to TPacket.SetFanout.
type FanoutType int

// FanoutType values. The numbers match the kernel's PACKET_FANOUT_* modes.
const (
	// FanoutHash spreads packets by flow hash (PACKET_FANOUT_HASH).
	FanoutHash FanoutType = 0
	// FanoutLoadBalance spreads packets round-robin (PACKET_FANOUT_LB).
	FanoutLoadBalance FanoutType = 1
	// FanoutCPU picks the socket by receiving CPU (PACKET_FANOUT_CPU).
	FanoutCPU FanoutType = 2
	// FanoutHashWithDefrag is FanoutHash with the defrag flag (0x8000) set.
	// It appears that defrag only works with FanoutHash, see:
	// http://lxr.free-electrons.com/source/net/packet/af_packet.c#L1204
	FanoutHashWithDefrag FanoutType = FanoutHash | 0x8000
)
// SetFanout joins this TPacket's socket to fanout group id using mode t.
// Use of Fanout requires creating multiple TPacket objects and passing the
// same id/type to a SetFanout call on each. Note that this can be done
// cross-process, so if two different processes both call SetFanout with the
// same type/id, they'll share packets between them. The same should work for
// multiple TPacket objects within the same process.
func (h *TPacket) SetFanout(t FanoutType, id uint16) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	// Packed option value: mode (and flags) in the high 16 bits, group id in
	// the low 16 bits.
	arg := C.int(t)<<16 | C.int(id)
	return setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_FANOUT, unsafe.Pointer(&arg), unsafe.Sizeof(arg))
}
// WritePacketData transmits a raw packet by writing pkt to the socket.
// The byte count reported by write is not checked; only its error is returned.
func (h *TPacket) WritePacketData(pkt []byte) error {
	if _, err := unix.Write(h.fd, pkt); err != nil {
		return err
	}
	return nil
}
|