From 2e99b881938437dd6d6808bd233ce05d03af9f6e Mon Sep 17 00:00:00 2001 From: Tomas Slusny Date: Tue, 26 Jun 2018 16:53:48 +0200 Subject: Add support for jumbo frames to libmemif Add support for frames larger than maximum buffer size to libmemif adapter. Change-Id: I8b2d9fe7e05328194cd0aafd6e3ab40392ebbfed Signed-off-by: Tomas Slusny --- Makefile | 1 + extras/libmemif/README.md | 6 + extras/libmemif/adapter.go | 226 +++++++++++++++++---- .../libmemif/examples/jumbo-frames/jumbo-frames.go | 176 ++++++++++++++++ 4 files changed, 372 insertions(+), 37 deletions(-) create mode 100644 extras/libmemif/examples/jumbo-frames/jumbo-frames.go diff --git a/Makefile b/Makefile index 214f632..ad9aa5e 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,7 @@ extras: @cd extras/libmemif/examples/raw-data && go build -v @cd extras/libmemif/examples/icmp-responder && go build -v @cd extras/libmemif/examples/gopacket && go build -v + @cd extras/libmemif/examples/jumbo-frames && go build -v clean: @rm -f cmd/binapi-generator/binapi-generator diff --git a/extras/libmemif/README.md b/extras/libmemif/README.md index 854aa96..d663fd2 100644 --- a/extras/libmemif/README.md +++ b/extras/libmemif/README.md @@ -146,6 +146,12 @@ through each of the 3 queues. The received packets are printed to stdout. Stop an instance of *raw-data* with an interrupt signal (^C). +#### Jumbo Frames Raw data (libmemif <-> libmemif) + +*jumbo-frames* is simple example how to send larger and larger jumbo +packets with libmemif adapter. This is simple copy of *raw-data* but with +sending larger packets, so for more information read its code and documentation. + #### ICMP Responder *icmp-responder* is a simple example showing how to answer APR and ICMP diff --git a/extras/libmemif/adapter.go b/extras/libmemif/adapter.go index a74c5cf..d5a1563 100644 --- a/extras/libmemif/adapter.go +++ b/extras/libmemif/adapter.go @@ -194,12 +194,20 @@ govpp_get_tx_queue_details (govpp_memif_details_t *md, int index) return md->tx_queues[index]; } -// Copy packet data into the selected buffer. +// Copy packet data into the selected buffer with splitting when necessary static void -govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint16_t size) +govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize) { - buffers[index].len = (size > buffers[index].len ? buffers[index].len : size); - memcpy(buffers[index].data, data, (size_t)buffers[index].len); + int dataOffset = 0; + + do { + buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize); + void * curData = (packetData + dataOffset); + memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len); + dataOffset += buffers[bufIndex].len; + bufIndex += 1; + packetSize -= buffers[bufIndex].len; + } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0); } // Get packet data from the selected buffer. @@ -211,6 +219,26 @@ govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size) return buffers[index].data; } +// Checks if memif buffer is chained +static int +govpp_is_buffer_chained(memif_buffer_t *buffers, int index) +{ + return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT; +} + +// Allocate memif buffers and return pointer to next free buffer +static int +govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid, + memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf, + uint16_t count, uint16_t * count_out, uint16_t size) +{ + memif_buffer_t * offsetBufs = (bufs + offset); + int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size); + *count_out += offset; + *nextFreeBuf = offsetBufs; + return err; +} + */ import "C" @@ -345,6 +373,7 @@ type Memif struct { // Rx/Tx queues ringSize int // number of items in each ring + bufferSize int // max buffer size stopQPollFd int // event file descriptor used to stop pollRxQueue-s wg sync.WaitGroup // wait group for all pollRxQueue-s rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue @@ -388,6 +417,7 @@ type MemifQueueDetails struct { type CPacketBuffers struct { buffers *C.memif_buffer_t count int + rxChainBuf []RawPacketData } // Context is a global Go-libmemif runtime context. @@ -400,6 +430,11 @@ type Context struct { wg sync.WaitGroup /* wait-group for pollEvents() */ } +type txPacketBuffer struct { + packets []RawPacketData + size int +} + var ( // logger used by the adapter. log *logger.Logger @@ -517,12 +552,18 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem log2RingSize = 10 } + bufferSize := config.BufferSize + if bufferSize <= 0 { + bufferSize = 2048 + } + // Create memif-wrapper for Go-libmemif. memif = &Memif{ - MemifMeta: config.MemifMeta, - callbacks: &MemifCallbacks{}, - ifIndex: context.nextMemifIndex, - ringSize: 1 << log2RingSize, + MemifMeta: config.MemifMeta, + callbacks: &MemifCallbacks{}, + ifIndex: context.nextMemifIndex, + ringSize: 1 << log2RingSize, + bufferSize: int(bufferSize), } // Initialize memif callbacks. @@ -717,10 +758,6 @@ func (memif *Memif) GetDetails() (details *MemifDetails, err error) { // Multiple TxBurst-s can run concurrently provided that each targets a different // TX queue. func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) { - var sentCount C.uint16_t - var allocated C.uint16_t - var bufSize int - if len(packets) == 0 { return 0, nil } @@ -729,16 +766,55 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1 return 0, ErrQueueID } - // The largest packet in the set determines the packet buffer size. + var bufCount int + var buffers []*txPacketBuffer + cQueueID := C.uint16_t(queueID) + for _, packet := range packets { - if len(packet) > int(bufSize) { - bufSize = len(packet) + packetLen := len(packet) + log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen) + + if packetLen > memif.bufferSize { + // Create jumbo buffer + buffer := &txPacketBuffer{ + size: packetLen, + packets: []RawPacketData{packet}, + } + + buffers = append(buffers, buffer) + + // Increment bufCount by number of splits in this jumbo + bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize + } else { + buffersLen := len(buffers) + + // This is very first buffer so there is no data to append to, prepare empty one + if buffersLen == 0 { + buffers = []*txPacketBuffer{{}} + buffersLen = 1 + } + + lastBuffer := buffers[buffersLen-1] + + // Last buffer is jumbo buffer, create new buffer + if lastBuffer.size > memif.bufferSize { + lastBuffer = &txPacketBuffer{} + buffers = append(buffers, lastBuffer) + } + + // Determine buffer size by max packet size in buffer + if packetLen > lastBuffer.size { + lastBuffer.size = packetLen + } + + lastBuffer.packets = append(lastBuffer.packets, packet) + bufCount += 1 } } // Reallocate Tx buffers if needed to fit the input packets. - pb := memif.txQueueBufs[queueID] - bufCount := len(packets) + log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount) + pb := &memif.txQueueBufs[queueID] if pb.count < bufCount { newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t))) if newBuffers == nil { @@ -751,32 +827,79 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1 } // Allocate ring slots. - cQueueID := C.uint16_t(queueID) - errCode := C.memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount), - &allocated, C.uint32_t(bufSize)) - err = getMemifError(int(errCode)) - if err == ErrNoBufRing { - // Not enough ring slots, will be less than bufCount. - err = nil - } - if err != nil { - return 0, err - } + var allocated C.uint16_t + var subCount C.uint16_t + for _, buffer := range buffers { + packetCount := C.uint16_t(len(buffer.packets)) + isJumbo := buffer.size > memif.bufferSize + + log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v", + cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo) + + var nextFreeBuff *C.memif_buffer_t + startOffset := allocated + errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff, + packetCount, &allocated, C.uint16_t(buffer.size)) + + err = getMemifError(int(errCode)) + endEarly := err == ErrNoBufRing + if endEarly { + // Not enough ring slots, will be less than packetCount. + err = nil + } + if err != nil { + return 0, err + } + + // Copy packet data into the buffers. + nowAllocated := allocated - startOffset + toFill := nowAllocated + if !isJumbo { + // If this is not jumbo frame, only 1 packet needs to be copied each iteration + toFill = 1 + } + + // Iterate over all packets and try to fill them into allocated buffers + // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left + for i, packet := range buffer.packets { + if i >= int(nowAllocated) { + // There was less allocated buffers than actual packet count so exit early + break + } + + packetData := unsafe.Pointer(&packet[0]) + C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet))) + } + + if isJumbo && nowAllocated > 0 { + // If we successfully allocated required amount of buffers for entire jumbo to be sent + // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think + // it only sent 1 packet so it does not need to know anything about jumbo frames + subCount += nowAllocated - 1 + } - // Copy packet data into the buffers. - for i := 0; i < int(allocated); i++ { - packetData := unsafe.Pointer(&packets[i][0]) - C.govpp_copy_packet_data(pb.buffers, C.int(i), packetData, C.uint16_t(len(packets[i]))) + // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try + // to handle it next burst + if endEarly { + break + } } - errCode = C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount) + var sentCount C.uint16_t + errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount) err = getMemifError(int(errCode)) if err != nil { return 0, err } - count = uint16(sentCount) - return count, nil + // Prevent negative values + realSent := uint16(sentCount) - uint16(subCount) + if subCount > sentCount { + sentCount = 0 + } + + log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated) + return realSent, nil } // RxBurst is used to receive multiple packets in one call from a selected queue. @@ -799,7 +922,7 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat } // Reallocate Rx buffers if needed to fit the output packets. - pb := memif.rxQueueBufs[queueID] + pb := &memif.rxQueueBufs[queueID] bufCount := int(count) if pb.count < bufCount { newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t))) @@ -824,11 +947,31 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat return packets, err } + chained := len(pb.rxChainBuf) > 0 + if chained { + // We had stored data from previous burst because last buffer in previous burst was chained + // so we need to continue appending to this data + packets = pb.rxChainBuf + pb.rxChainBuf = nil + } + // Copy packet data into the instances of RawPacketData. for i := 0; i < int(recvCount); i++ { var packetSize C.int packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize) - packets = append(packets, C.GoBytes(packetData, packetSize)) + packetBytes := C.GoBytes(packetData, packetSize) + + if chained { + // We have chained buffers, so start merging packet data with last read packet + prevPacket := packets[len(packets)-1] + packets[len(packets)-1] = append(prevPacket, packetBytes...) + } else { + packets = append(packets, packetBytes) + } + + // Mark last buffer as chained based on property on current buffer so next buffers + // will try to append data to this one in case we got jumbo frame + chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0 } if recvCount > 0 { @@ -840,6 +983,15 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat packets = nil } + if chained { + // We did not had enough space to process all chained buffers to the end so simply tell + // reader that it should not process any packets here and save them for next burst + // to finish reading the buffer chain + pb.rxChainBuf = packets + packets = nil + err = ErrNoBuf + } + return packets, err } diff --git a/extras/libmemif/examples/jumbo-frames/jumbo-frames.go b/extras/libmemif/examples/jumbo-frames/jumbo-frames.go new file mode 100644 index 0000000..1bc943f --- /dev/null +++ b/extras/libmemif/examples/jumbo-frames/jumbo-frames.go @@ -0,0 +1,176 @@ +// jumbo-frames is simple example how to send larger and larger jumbo packets with libmemif adapter. This is simple copy +// of raw-data but with sending larger packets, so for more information read its code and docs. +package main + +import ( + "fmt" + "os" + "os/signal" + "sync" + "time" + + "git.fd.io/govpp.git/extras/libmemif" +) + +const ( + Socket = "/tmp/jumbo-frames-example" + Secret = "secret" + ConnectionID = 1 + NumQueues uint8 = 3 +) + +var wg sync.WaitGroup +var stopCh chan struct{} + +func OnConnect(memif *libmemif.Memif) (err error) { + details, err := memif.GetDetails() + if err != nil { + fmt.Printf("libmemif.GetDetails() error: %v\n", err) + } + fmt.Printf("memif %s has been connected: %+v\n", memif.IfName, details) + + stopCh = make(chan struct{}) + var i uint8 + for i = 0; i < uint8(len(details.RxQueues)); i++ { + wg.Add(1) + go ReadAndPrintPackets(memif, i) + } + for i = 0; i < uint8(len(details.TxQueues)); i++ { + wg.Add(1) + go SendPackets(memif, i) + } + return nil +} + +func OnDisconnect(memif *libmemif.Memif) (err error) { + fmt.Printf("memif %s has been disconnected\n", memif.IfName) + close(stopCh) + wg.Wait() + return nil +} + +func ReadAndPrintPackets(memif *libmemif.Memif, queueID uint8) { + defer wg.Done() + + interruptCh, err := memif.GetQueueInterruptChan(queueID) + if err != nil { + switch err { + case libmemif.ErrQueueID: + fmt.Printf("libmemif.Memif.GetQueueInterruptChan() complains about invalid queue id!?") + default: + fmt.Printf("libmemif.Memif.GetQueueInterruptChan() error: %v\n", err) + } + return + } + + counter := 0 + for { + select { + case <-interruptCh: + counter++ + for { + packets, err := memif.RxBurst(queueID, 10) + if err != nil { + fmt.Printf("libmemif.Memif.RxBurst() error: %v\n", err) + } else { + if len(packets) == 0 { + break + } + for _, packet := range packets { + fmt.Printf("Received packet queue=%d: %v in burst %d\n", queueID, len(packet), counter) + } + } + } + case <-stopCh: + return + } + } +} + +func SendPackets(memif *libmemif.Memif, queueID uint8) { + defer wg.Done() + + counter := 0 + for { + select { + case <-time.After(3 * time.Second): + counter++ + packetMul := counter % 100 + 1 // Limit max iterations to 100 to not go out of bounds + packets := []libmemif.RawPacketData{ + make([]byte, 128*packetMul), + make([]byte, 256*packetMul), + make([]byte, 512*packetMul), + } + sent := 0 + for { + count, err := memif.TxBurst(queueID, packets[sent:]) + if err != nil { + fmt.Printf("libmemif.Memif.TxBurst() error: %v\n", err) + break + } else { + fmt.Printf("libmemif.Memif.TxBurst() has sent %d packets in burst %v.\n", count, counter) + sent += int(count) + if sent == len(packets) { + break + } + } + } + case <-stopCh: + return + } + } +} + +func main() { + var isMaster = true + var appSuffix string + if len(os.Args) > 1 && (os.Args[1] == "--slave" || os.Args[1] == "-slave") { + isMaster = false + appSuffix = "-slave" + } + + appName := "jumbo-frames" + appSuffix + fmt.Println("Initializing libmemif as ", appName) + err := libmemif.Init(appName) + if err != nil { + fmt.Printf("libmemif.Init() error: %v\n", err) + return + } + defer libmemif.Cleanup() + + memifCallbacks := &libmemif.MemifCallbacks{ + OnConnect: OnConnect, + OnDisconnect: OnDisconnect, + } + + memifConfig := &libmemif.MemifConfig{ + MemifMeta: libmemif.MemifMeta{ + IfName: "memif1", + ConnID: ConnectionID, + SocketFilename: Socket, + Secret: Secret, + IsMaster: isMaster, + Mode: libmemif.IfModeEthernet, + }, + MemifShmSpecs: libmemif.MemifShmSpecs{ + NumRxQueues: NumQueues, + NumTxQueues: NumQueues, + BufferSize: 2048, + Log2RingSize: 10, + }, + } + + fmt.Printf("Callbacks: %+v\n", memifCallbacks) + fmt.Printf("Config: %+v\n", memifConfig) + + memif, err := libmemif.CreateInterface(memifConfig, memifCallbacks) + if err != nil { + fmt.Printf("libmemif.CreateInterface() error: %v\n", err) + return + } + defer memif.Close() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + <-sigChan +} -- cgit 1.2.3-korg