diff options
Diffstat (limited to 'extras/libmemif/adapter.go')
-rw-r--r-- | extras/libmemif/adapter.go | 226 |
1 files changed, 189 insertions, 37 deletions
diff --git a/extras/libmemif/adapter.go b/extras/libmemif/adapter.go index a74c5cf..d5a1563 100644 --- a/extras/libmemif/adapter.go +++ b/extras/libmemif/adapter.go @@ -194,12 +194,20 @@ govpp_get_tx_queue_details (govpp_memif_details_t *md, int index) return md->tx_queues[index]; } -// Copy packet data into the selected buffer. +// Copy packet data into the selected buffer with splitting when necessary static void -govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint16_t size) +govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize) { - buffers[index].len = (size > buffers[index].len ? buffers[index].len : size); - memcpy(buffers[index].data, data, (size_t)buffers[index].len); + int dataOffset = 0; + + do { + buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize); + void * curData = (packetData + dataOffset); + memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len); + dataOffset += buffers[bufIndex].len; + bufIndex += 1; + packetSize -= buffers[bufIndex].len; + } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0); } // Get packet data from the selected buffer. @@ -211,6 +219,26 @@ govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size) return buffers[index].data; } +// Checks if memif buffer is chained +static int +govpp_is_buffer_chained(memif_buffer_t *buffers, int index) +{ + return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT; +} + +// Allocate memif buffers and return pointer to next free buffer +static int +govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid, + memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf, + uint16_t count, uint16_t * count_out, uint16_t size) +{ + memif_buffer_t * offsetBufs = (bufs + offset); + int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size); + *count_out += offset; + *nextFreeBuf = offsetBufs; + return err; +} + */ import "C" @@ -345,6 +373,7 @@ type Memif struct { // Rx/Tx queues ringSize int // number of items in each ring + bufferSize int // max buffer size stopQPollFd int // event file descriptor used to stop pollRxQueue-s wg sync.WaitGroup // wait group for all pollRxQueue-s rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue @@ -388,6 +417,7 @@ type MemifQueueDetails struct { type CPacketBuffers struct { buffers *C.memif_buffer_t count int + rxChainBuf []RawPacketData } // Context is a global Go-libmemif runtime context. @@ -400,6 +430,11 @@ type Context struct { wg sync.WaitGroup /* wait-group for pollEvents() */ } +type txPacketBuffer struct { + packets []RawPacketData + size int +} + var ( // logger used by the adapter. log *logger.Logger @@ -517,12 +552,18 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem log2RingSize = 10 } + bufferSize := config.BufferSize + if bufferSize <= 0 { + bufferSize = 2048 + } + // Create memif-wrapper for Go-libmemif. memif = &Memif{ - MemifMeta: config.MemifMeta, - callbacks: &MemifCallbacks{}, - ifIndex: context.nextMemifIndex, - ringSize: 1 << log2RingSize, + MemifMeta: config.MemifMeta, + callbacks: &MemifCallbacks{}, + ifIndex: context.nextMemifIndex, + ringSize: 1 << log2RingSize, + bufferSize: int(bufferSize), } // Initialize memif callbacks. @@ -717,10 +758,6 @@ func (memif *Memif) GetDetails() (details *MemifDetails, err error) { // Multiple TxBurst-s can run concurrently provided that each targets a different // TX queue. func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) { - var sentCount C.uint16_t - var allocated C.uint16_t - var bufSize int - if len(packets) == 0 { return 0, nil } @@ -729,16 +766,55 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1 return 0, ErrQueueID } - // The largest packet in the set determines the packet buffer size. + var bufCount int + var buffers []*txPacketBuffer + cQueueID := C.uint16_t(queueID) + for _, packet := range packets { - if len(packet) > int(bufSize) { - bufSize = len(packet) + packetLen := len(packet) + log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen) + + if packetLen > memif.bufferSize { + // Create jumbo buffer + buffer := &txPacketBuffer{ + size: packetLen, + packets: []RawPacketData{packet}, + } + + buffers = append(buffers, buffer) + + // Increment bufCount by number of splits in this jumbo + bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize + } else { + buffersLen := len(buffers) + + // This is very first buffer so there is no data to append to, prepare empty one + if buffersLen == 0 { + buffers = []*txPacketBuffer{{}} + buffersLen = 1 + } + + lastBuffer := buffers[buffersLen-1] + + // Last buffer is jumbo buffer, create new buffer + if lastBuffer.size > memif.bufferSize { + lastBuffer = &txPacketBuffer{} + buffers = append(buffers, lastBuffer) + } + + // Determine buffer size by max packet size in buffer + if packetLen > lastBuffer.size { + lastBuffer.size = packetLen + } + + lastBuffer.packets = append(lastBuffer.packets, packet) + bufCount += 1 } } // Reallocate Tx buffers if needed to fit the input packets. - pb := memif.txQueueBufs[queueID] - bufCount := len(packets) + log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount) + pb := &memif.txQueueBufs[queueID] if pb.count < bufCount { newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t))) if newBuffers == nil { @@ -751,32 +827,79 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1 } // Allocate ring slots. - cQueueID := C.uint16_t(queueID) - errCode := C.memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount), - &allocated, C.uint32_t(bufSize)) - err = getMemifError(int(errCode)) - if err == ErrNoBufRing { - // Not enough ring slots, <count> will be less than bufCount. - err = nil - } - if err != nil { - return 0, err - } + var allocated C.uint16_t + var subCount C.uint16_t + for _, buffer := range buffers { + packetCount := C.uint16_t(len(buffer.packets)) + isJumbo := buffer.size > memif.bufferSize + + log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v", + cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo) + + var nextFreeBuff *C.memif_buffer_t + startOffset := allocated + errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff, + packetCount, &allocated, C.uint16_t(buffer.size)) + + err = getMemifError(int(errCode)) + endEarly := err == ErrNoBufRing + if endEarly { + // Not enough ring slots, <count> will be less than packetCount. + err = nil + } + if err != nil { + return 0, err + } + + // Copy packet data into the buffers. + nowAllocated := allocated - startOffset + toFill := nowAllocated + if !isJumbo { + // If this is not jumbo frame, only 1 packet needs to be copied each iteration + toFill = 1 + } + + // Iterate over all packets and try to fill them into allocated buffers + // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left + for i, packet := range buffer.packets { + if i >= int(nowAllocated) { + // There was less allocated buffers than actual packet count so exit early + break + } + + packetData := unsafe.Pointer(&packet[0]) + C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet))) + } + + if isJumbo && nowAllocated > 0 { + // If we successfully allocated required amount of buffers for entire jumbo to be sent + // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think + // it only sent 1 packet so it does not need to know anything about jumbo frames + subCount += nowAllocated - 1 + } - // Copy packet data into the buffers. - for i := 0; i < int(allocated); i++ { - packetData := unsafe.Pointer(&packets[i][0]) - C.govpp_copy_packet_data(pb.buffers, C.int(i), packetData, C.uint16_t(len(packets[i]))) + // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try + // to handle it next burst + if endEarly { + break + } } - errCode = C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount) + var sentCount C.uint16_t + errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount) err = getMemifError(int(errCode)) if err != nil { return 0, err } - count = uint16(sentCount) - return count, nil + // Prevent negative values + realSent := uint16(sentCount) - uint16(subCount) + if subCount > sentCount { + sentCount = 0 + } + + log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated) + return realSent, nil } // RxBurst is used to receive multiple packets in one call from a selected queue. @@ -799,7 +922,7 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat } // Reallocate Rx buffers if needed to fit the output packets. - pb := memif.rxQueueBufs[queueID] + pb := &memif.rxQueueBufs[queueID] bufCount := int(count) if pb.count < bufCount { newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t))) @@ -824,11 +947,31 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat return packets, err } + chained := len(pb.rxChainBuf) > 0 + if chained { + // We had stored data from previous burst because last buffer in previous burst was chained + // so we need to continue appending to this data + packets = pb.rxChainBuf + pb.rxChainBuf = nil + } + // Copy packet data into the instances of RawPacketData. for i := 0; i < int(recvCount); i++ { var packetSize C.int packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize) - packets = append(packets, C.GoBytes(packetData, packetSize)) + packetBytes := C.GoBytes(packetData, packetSize) + + if chained { + // We have chained buffers, so start merging packet data with last read packet + prevPacket := packets[len(packets)-1] + packets[len(packets)-1] = append(prevPacket, packetBytes...) + } else { + packets = append(packets, packetBytes) + } + + // Mark last buffer as chained based on property on current buffer so next buffers + // will try to append data to this one in case we got jumbo frame + chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0 } if recvCount > 0 { @@ -840,6 +983,15 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat packets = nil } + if chained { + // We did not had enough space to process all chained buffers to the end so simply tell + // reader that it should not process any packets here and save them for next burst + // to finish reading the buffer chain + pb.rxChainBuf = packets + packets = nil + err = ErrNoBuf + } + return packets, err } |