summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRastislav Szabo <raszabo@cisco.com>2018-07-16 14:35:45 +0000
committerGerrit Code Review <gerrit@fd.io>2018-07-16 14:35:45 +0000
commitda815585c3f75c4ac073b0766dd668abf83844d8 (patch)
treea3dc0575e58dc826f4d6bcbe05b98c8a467c8a82
parenta679bbc78b98089f9b221bbe7de963f5274073bb (diff)
parent2e99b881938437dd6d6808bd233ce05d03af9f6e (diff)
Merge "Add support for jumbo frames to libmemif"
-rw-r--r--Makefile1
-rw-r--r--extras/libmemif/README.md6
-rw-r--r--extras/libmemif/adapter.go226
-rw-r--r--extras/libmemif/examples/jumbo-frames/jumbo-frames.go176
4 files changed, 372 insertions, 37 deletions
diff --git a/Makefile b/Makefile
index 7bed92c..ee06818 100644
--- a/Makefile
+++ b/Makefile
@@ -15,6 +15,7 @@ extras:
@cd extras/libmemif/examples/raw-data && go build -v
@cd extras/libmemif/examples/icmp-responder && go build -v
@cd extras/libmemif/examples/gopacket && go build -v
+ @cd extras/libmemif/examples/jumbo-frames && go build -v
clean:
@rm -f cmd/binapi-generator/binapi-generator
diff --git a/extras/libmemif/README.md b/extras/libmemif/README.md
index 854aa96..d663fd2 100644
--- a/extras/libmemif/README.md
+++ b/extras/libmemif/README.md
@@ -146,6 +146,12 @@ through each of the 3 queues. The received packets are printed to stdout.
Stop an instance of *raw-data* with an interrupt signal (^C).
+#### Jumbo Frames Raw data (libmemif <-> libmemif)
+
+*jumbo-frames* is simple example how to send larger and larger jumbo
+packets with libmemif adapter. This is simple copy of *raw-data* but with
+sending larger packets, so for more information read its code and documentation.
+
#### ICMP Responder
*icmp-responder* is a simple example showing how to answer APR and ICMP
diff --git a/extras/libmemif/adapter.go b/extras/libmemif/adapter.go
index a74c5cf..d5a1563 100644
--- a/extras/libmemif/adapter.go
+++ b/extras/libmemif/adapter.go
@@ -194,12 +194,20 @@ govpp_get_tx_queue_details (govpp_memif_details_t *md, int index)
return md->tx_queues[index];
}
-// Copy packet data into the selected buffer.
+// Copy packet data into the selected buffer with splitting when necessary
static void
-govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint16_t size)
+govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize)
{
- buffers[index].len = (size > buffers[index].len ? buffers[index].len : size);
- memcpy(buffers[index].data, data, (size_t)buffers[index].len);
+ int dataOffset = 0;
+
+ do {
+ buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize);
+ void * curData = (packetData + dataOffset);
+ memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len);
+ dataOffset += buffers[bufIndex].len;
+ bufIndex += 1;
+ packetSize -= buffers[bufIndex].len;
+ } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0);
}
// Get packet data from the selected buffer.
@@ -211,6 +219,26 @@ govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size)
return buffers[index].data;
}
+// Checks if memif buffer is chained
+static int
+govpp_is_buffer_chained(memif_buffer_t *buffers, int index)
+{
+ return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT;
+}
+
+// Allocate memif buffers and return pointer to next free buffer
+static int
+govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid,
+ memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf,
+ uint16_t count, uint16_t * count_out, uint16_t size)
+{
+ memif_buffer_t * offsetBufs = (bufs + offset);
+ int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size);
+ *count_out += offset;
+ *nextFreeBuf = offsetBufs;
+ return err;
+}
+
*/
import "C"
@@ -345,6 +373,7 @@ type Memif struct {
// Rx/Tx queues
ringSize int // number of items in each ring
+ bufferSize int // max buffer size
stopQPollFd int // event file descriptor used to stop pollRxQueue-s
wg sync.WaitGroup // wait group for all pollRxQueue-s
rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue
@@ -388,6 +417,7 @@ type MemifQueueDetails struct {
type CPacketBuffers struct {
buffers *C.memif_buffer_t
count int
+ rxChainBuf []RawPacketData
}
// Context is a global Go-libmemif runtime context.
@@ -400,6 +430,11 @@ type Context struct {
wg sync.WaitGroup /* wait-group for pollEvents() */
}
+type txPacketBuffer struct {
+ packets []RawPacketData
+ size int
+}
+
var (
// logger used by the adapter.
log *logger.Logger
@@ -517,12 +552,18 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
log2RingSize = 10
}
+ bufferSize := config.BufferSize
+ if bufferSize <= 0 {
+ bufferSize = 2048
+ }
+
// Create memif-wrapper for Go-libmemif.
memif = &Memif{
- MemifMeta: config.MemifMeta,
- callbacks: &MemifCallbacks{},
- ifIndex: context.nextMemifIndex,
- ringSize: 1 << log2RingSize,
+ MemifMeta: config.MemifMeta,
+ callbacks: &MemifCallbacks{},
+ ifIndex: context.nextMemifIndex,
+ ringSize: 1 << log2RingSize,
+ bufferSize: int(bufferSize),
}
// Initialize memif callbacks.
@@ -717,10 +758,6 @@ func (memif *Memif) GetDetails() (details *MemifDetails, err error) {
// Multiple TxBurst-s can run concurrently provided that each targets a different
// TX queue.
func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) {
- var sentCount C.uint16_t
- var allocated C.uint16_t
- var bufSize int
-
if len(packets) == 0 {
return 0, nil
}
@@ -729,16 +766,55 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
return 0, ErrQueueID
}
- // The largest packet in the set determines the packet buffer size.
+ var bufCount int
+ var buffers []*txPacketBuffer
+ cQueueID := C.uint16_t(queueID)
+
for _, packet := range packets {
- if len(packet) > int(bufSize) {
- bufSize = len(packet)
+ packetLen := len(packet)
+ log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen)
+
+ if packetLen > memif.bufferSize {
+ // Create jumbo buffer
+ buffer := &txPacketBuffer{
+ size: packetLen,
+ packets: []RawPacketData{packet},
+ }
+
+ buffers = append(buffers, buffer)
+
+ // Increment bufCount by number of splits in this jumbo
+ bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize
+ } else {
+ buffersLen := len(buffers)
+
+ // This is very first buffer so there is no data to append to, prepare empty one
+ if buffersLen == 0 {
+ buffers = []*txPacketBuffer{{}}
+ buffersLen = 1
+ }
+
+ lastBuffer := buffers[buffersLen-1]
+
+ // Last buffer is jumbo buffer, create new buffer
+ if lastBuffer.size > memif.bufferSize {
+ lastBuffer = &txPacketBuffer{}
+ buffers = append(buffers, lastBuffer)
+ }
+
+ // Determine buffer size by max packet size in buffer
+ if packetLen > lastBuffer.size {
+ lastBuffer.size = packetLen
+ }
+
+ lastBuffer.packets = append(lastBuffer.packets, packet)
+ bufCount += 1
}
}
// Reallocate Tx buffers if needed to fit the input packets.
- pb := memif.txQueueBufs[queueID]
- bufCount := len(packets)
+ log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount)
+ pb := &memif.txQueueBufs[queueID]
if pb.count < bufCount {
newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
if newBuffers == nil {
@@ -751,32 +827,79 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
}
// Allocate ring slots.
- cQueueID := C.uint16_t(queueID)
- errCode := C.memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount),
- &allocated, C.uint32_t(bufSize))
- err = getMemifError(int(errCode))
- if err == ErrNoBufRing {
- // Not enough ring slots, <count> will be less than bufCount.
- err = nil
- }
- if err != nil {
- return 0, err
- }
+ var allocated C.uint16_t
+ var subCount C.uint16_t
+ for _, buffer := range buffers {
+ packetCount := C.uint16_t(len(buffer.packets))
+ isJumbo := buffer.size > memif.bufferSize
+
+ log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v",
+ cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo)
+
+ var nextFreeBuff *C.memif_buffer_t
+ startOffset := allocated
+ errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff,
+ packetCount, &allocated, C.uint16_t(buffer.size))
+
+ err = getMemifError(int(errCode))
+ endEarly := err == ErrNoBufRing
+ if endEarly {
+ // Not enough ring slots, <count> will be less than packetCount.
+ err = nil
+ }
+ if err != nil {
+ return 0, err
+ }
+
+ // Copy packet data into the buffers.
+ nowAllocated := allocated - startOffset
+ toFill := nowAllocated
+ if !isJumbo {
+ // If this is not jumbo frame, only 1 packet needs to be copied each iteration
+ toFill = 1
+ }
+
+ // Iterate over all packets and try to fill them into allocated buffers
+ // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left
+ for i, packet := range buffer.packets {
+ if i >= int(nowAllocated) {
+ // There was less allocated buffers than actual packet count so exit early
+ break
+ }
+
+ packetData := unsafe.Pointer(&packet[0])
+ C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet)))
+ }
+
+ if isJumbo && nowAllocated > 0 {
+ // If we successfully allocated required amount of buffers for entire jumbo to be sent
+ // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think
+ // it only sent 1 packet so it does not need to know anything about jumbo frames
+ subCount += nowAllocated - 1
+ }
- // Copy packet data into the buffers.
- for i := 0; i < int(allocated); i++ {
- packetData := unsafe.Pointer(&packets[i][0])
- C.govpp_copy_packet_data(pb.buffers, C.int(i), packetData, C.uint16_t(len(packets[i])))
+ // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try
+ // to handle it next burst
+ if endEarly {
+ break
+ }
}
- errCode = C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
+ var sentCount C.uint16_t
+ errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
err = getMemifError(int(errCode))
if err != nil {
return 0, err
}
- count = uint16(sentCount)
- return count, nil
+ // Prevent negative values
+ realSent := uint16(sentCount) - uint16(subCount)
+ if subCount > sentCount {
+ sentCount = 0
+ }
+
+ log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated)
+ return realSent, nil
}
// RxBurst is used to receive multiple packets in one call from a selected queue.
@@ -799,7 +922,7 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
}
// Reallocate Rx buffers if needed to fit the output packets.
- pb := memif.rxQueueBufs[queueID]
+ pb := &memif.rxQueueBufs[queueID]
bufCount := int(count)
if pb.count < bufCount {
newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
@@ -824,11 +947,31 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
return packets, err
}
+ chained := len(pb.rxChainBuf) > 0
+ if chained {
+ // We had stored data from previous burst because last buffer in previous burst was chained
+ // so we need to continue appending to this data
+ packets = pb.rxChainBuf
+ pb.rxChainBuf = nil
+ }
+
// Copy packet data into the instances of RawPacketData.
for i := 0; i < int(recvCount); i++ {
var packetSize C.int
packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize)
- packets = append(packets, C.GoBytes(packetData, packetSize))
+ packetBytes := C.GoBytes(packetData, packetSize)
+
+ if chained {
+ // We have chained buffers, so start merging packet data with last read packet
+ prevPacket := packets[len(packets)-1]
+ packets[len(packets)-1] = append(prevPacket, packetBytes...)
+ } else {
+ packets = append(packets, packetBytes)
+ }
+
+ // Mark last buffer as chained based on property on current buffer so next buffers
+ // will try to append data to this one in case we got jumbo frame
+ chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0
}
if recvCount > 0 {
@@ -840,6 +983,15 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
packets = nil
}
+ if chained {
+ // We did not had enough space to process all chained buffers to the end so simply tell
+ // reader that it should not process any packets here and save them for next burst
+ // to finish reading the buffer chain
+ pb.rxChainBuf = packets
+ packets = nil
+ err = ErrNoBuf
+ }
+
return packets, err
}
diff --git a/extras/libmemif/examples/jumbo-frames/jumbo-frames.go b/extras/libmemif/examples/jumbo-frames/jumbo-frames.go
new file mode 100644
index 0000000..1bc943f
--- /dev/null
+++ b/extras/libmemif/examples/jumbo-frames/jumbo-frames.go
@@ -0,0 +1,176 @@
+// jumbo-frames is simple example how to send larger and larger jumbo packets with libmemif adapter. This is simple copy
+// of raw-data but with sending larger packets, so for more information read its code and docs.
+package main
+
+import (
+ "fmt"
+ "os"
+ "os/signal"
+ "sync"
+ "time"
+
+ "git.fd.io/govpp.git/extras/libmemif"
+)
+
+const (
+ Socket = "/tmp/jumbo-frames-example"
+ Secret = "secret"
+ ConnectionID = 1
+ NumQueues uint8 = 3
+)
+
+var wg sync.WaitGroup
+var stopCh chan struct{}
+
+func OnConnect(memif *libmemif.Memif) (err error) {
+ details, err := memif.GetDetails()
+ if err != nil {
+ fmt.Printf("libmemif.GetDetails() error: %v\n", err)
+ }
+ fmt.Printf("memif %s has been connected: %+v\n", memif.IfName, details)
+
+ stopCh = make(chan struct{})
+ var i uint8
+ for i = 0; i < uint8(len(details.RxQueues)); i++ {
+ wg.Add(1)
+ go ReadAndPrintPackets(memif, i)
+ }
+ for i = 0; i < uint8(len(details.TxQueues)); i++ {
+ wg.Add(1)
+ go SendPackets(memif, i)
+ }
+ return nil
+}
+
+func OnDisconnect(memif *libmemif.Memif) (err error) {
+ fmt.Printf("memif %s has been disconnected\n", memif.IfName)
+ close(stopCh)
+ wg.Wait()
+ return nil
+}
+
+func ReadAndPrintPackets(memif *libmemif.Memif, queueID uint8) {
+ defer wg.Done()
+
+ interruptCh, err := memif.GetQueueInterruptChan(queueID)
+ if err != nil {
+ switch err {
+ case libmemif.ErrQueueID:
+ fmt.Printf("libmemif.Memif.GetQueueInterruptChan() complains about invalid queue id!?")
+ default:
+ fmt.Printf("libmemif.Memif.GetQueueInterruptChan() error: %v\n", err)
+ }
+ return
+ }
+
+ counter := 0
+ for {
+ select {
+ case <-interruptCh:
+ counter++
+ for {
+ packets, err := memif.RxBurst(queueID, 10)
+ if err != nil {
+ fmt.Printf("libmemif.Memif.RxBurst() error: %v\n", err)
+ } else {
+ if len(packets) == 0 {
+ break
+ }
+ for _, packet := range packets {
+ fmt.Printf("Received packet queue=%d: %v in burst %d\n", queueID, len(packet), counter)
+ }
+ }
+ }
+ case <-stopCh:
+ return
+ }
+ }
+}
+
+func SendPackets(memif *libmemif.Memif, queueID uint8) {
+ defer wg.Done()
+
+ counter := 0
+ for {
+ select {
+ case <-time.After(3 * time.Second):
+ counter++
+ packetMul := counter % 100 + 1 // Limit max iterations to 100 to not go out of bounds
+ packets := []libmemif.RawPacketData{
+ make([]byte, 128*packetMul),
+ make([]byte, 256*packetMul),
+ make([]byte, 512*packetMul),
+ }
+ sent := 0
+ for {
+ count, err := memif.TxBurst(queueID, packets[sent:])
+ if err != nil {
+ fmt.Printf("libmemif.Memif.TxBurst() error: %v\n", err)
+ break
+ } else {
+ fmt.Printf("libmemif.Memif.TxBurst() has sent %d packets in burst %v.\n", count, counter)
+ sent += int(count)
+ if sent == len(packets) {
+ break
+ }
+ }
+ }
+ case <-stopCh:
+ return
+ }
+ }
+}
+
+func main() {
+ var isMaster = true
+ var appSuffix string
+ if len(os.Args) > 1 && (os.Args[1] == "--slave" || os.Args[1] == "-slave") {
+ isMaster = false
+ appSuffix = "-slave"
+ }
+
+ appName := "jumbo-frames" + appSuffix
+ fmt.Println("Initializing libmemif as ", appName)
+ err := libmemif.Init(appName)
+ if err != nil {
+ fmt.Printf("libmemif.Init() error: %v\n", err)
+ return
+ }
+ defer libmemif.Cleanup()
+
+ memifCallbacks := &libmemif.MemifCallbacks{
+ OnConnect: OnConnect,
+ OnDisconnect: OnDisconnect,
+ }
+
+ memifConfig := &libmemif.MemifConfig{
+ MemifMeta: libmemif.MemifMeta{
+ IfName: "memif1",
+ ConnID: ConnectionID,
+ SocketFilename: Socket,
+ Secret: Secret,
+ IsMaster: isMaster,
+ Mode: libmemif.IfModeEthernet,
+ },
+ MemifShmSpecs: libmemif.MemifShmSpecs{
+ NumRxQueues: NumQueues,
+ NumTxQueues: NumQueues,
+ BufferSize: 2048,
+ Log2RingSize: 10,
+ },
+ }
+
+ fmt.Printf("Callbacks: %+v\n", memifCallbacks)
+ fmt.Printf("Config: %+v\n", memifConfig)
+
+ memif, err := libmemif.CreateInterface(memifConfig, memifCallbacks)
+ if err != nil {
+ fmt.Printf("libmemif.CreateInterface() error: %v\n", err)
+ return
+ }
+ defer memif.Close()
+
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, os.Interrupt)
+ <-sigChan
+}