aboutsummaryrefslogtreecommitdiffstats
path: root/extras/libmemif/adapter.go
diff options
context:
space:
mode:
Diffstat (limited to 'extras/libmemif/adapter.go')
-rw-r--r--extras/libmemif/adapter.go226
1 files changed, 189 insertions, 37 deletions
diff --git a/extras/libmemif/adapter.go b/extras/libmemif/adapter.go
index a74c5cf..d5a1563 100644
--- a/extras/libmemif/adapter.go
+++ b/extras/libmemif/adapter.go
@@ -194,12 +194,20 @@ govpp_get_tx_queue_details (govpp_memif_details_t *md, int index)
return md->tx_queues[index];
}
-// Copy packet data into the selected buffer.
+// Copy packet data into the selected buffer with splitting when necessary
static void
-govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint16_t size)
+govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize)
{
- buffers[index].len = (size > buffers[index].len ? buffers[index].len : size);
- memcpy(buffers[index].data, data, (size_t)buffers[index].len);
+ int dataOffset = 0;
+
+ do {
+ buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize);
+ void * curData = (packetData + dataOffset);
+ memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len);
+ dataOffset += buffers[bufIndex].len;
+ bufIndex += 1;
+ packetSize -= buffers[bufIndex].len;
+ } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0);
}
// Get packet data from the selected buffer.
@@ -211,6 +219,26 @@ govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size)
return buffers[index].data;
}
+// Checks if memif buffer is chained
+static int
+govpp_is_buffer_chained(memif_buffer_t *buffers, int index)
+{
+ return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT;
+}
+
+// Allocate memif buffers and return pointer to next free buffer
+static int
+govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid,
+ memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf,
+ uint16_t count, uint16_t * count_out, uint16_t size)
+{
+ memif_buffer_t * offsetBufs = (bufs + offset);
+ int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size);
+ *count_out += offset;
+ *nextFreeBuf = offsetBufs;
+ return err;
+}
+
*/
import "C"
@@ -345,6 +373,7 @@ type Memif struct {
// Rx/Tx queues
ringSize int // number of items in each ring
+ bufferSize int // max buffer size
stopQPollFd int // event file descriptor used to stop pollRxQueue-s
wg sync.WaitGroup // wait group for all pollRxQueue-s
rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue
@@ -388,6 +417,7 @@ type MemifQueueDetails struct {
type CPacketBuffers struct {
buffers *C.memif_buffer_t
count int
+ rxChainBuf []RawPacketData
}
// Context is a global Go-libmemif runtime context.
@@ -400,6 +430,11 @@ type Context struct {
wg sync.WaitGroup /* wait-group for pollEvents() */
}
+type txPacketBuffer struct {
+ packets []RawPacketData
+ size int
+}
+
var (
// logger used by the adapter.
log *logger.Logger
@@ -517,12 +552,18 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
log2RingSize = 10
}
+ bufferSize := config.BufferSize
+ if bufferSize <= 0 {
+ bufferSize = 2048
+ }
+
// Create memif-wrapper for Go-libmemif.
memif = &Memif{
- MemifMeta: config.MemifMeta,
- callbacks: &MemifCallbacks{},
- ifIndex: context.nextMemifIndex,
- ringSize: 1 << log2RingSize,
+ MemifMeta: config.MemifMeta,
+ callbacks: &MemifCallbacks{},
+ ifIndex: context.nextMemifIndex,
+ ringSize: 1 << log2RingSize,
+ bufferSize: int(bufferSize),
}
// Initialize memif callbacks.
@@ -717,10 +758,6 @@ func (memif *Memif) GetDetails() (details *MemifDetails, err error) {
// Multiple TxBurst-s can run concurrently provided that each targets a different
// TX queue.
func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) {
- var sentCount C.uint16_t
- var allocated C.uint16_t
- var bufSize int
-
if len(packets) == 0 {
return 0, nil
}
@@ -729,16 +766,55 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
return 0, ErrQueueID
}
- // The largest packet in the set determines the packet buffer size.
+ var bufCount int
+ var buffers []*txPacketBuffer
+ cQueueID := C.uint16_t(queueID)
+
for _, packet := range packets {
- if len(packet) > int(bufSize) {
- bufSize = len(packet)
+ packetLen := len(packet)
+ log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen)
+
+ if packetLen > memif.bufferSize {
+ // Create jumbo buffer
+ buffer := &txPacketBuffer{
+ size: packetLen,
+ packets: []RawPacketData{packet},
+ }
+
+ buffers = append(buffers, buffer)
+
+ // Increment bufCount by number of splits in this jumbo
+ bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize
+ } else {
+ buffersLen := len(buffers)
+
+ // This is very first buffer so there is no data to append to, prepare empty one
+ if buffersLen == 0 {
+ buffers = []*txPacketBuffer{{}}
+ buffersLen = 1
+ }
+
+ lastBuffer := buffers[buffersLen-1]
+
+ // Last buffer is jumbo buffer, create new buffer
+ if lastBuffer.size > memif.bufferSize {
+ lastBuffer = &txPacketBuffer{}
+ buffers = append(buffers, lastBuffer)
+ }
+
+ // Determine buffer size by max packet size in buffer
+ if packetLen > lastBuffer.size {
+ lastBuffer.size = packetLen
+ }
+
+ lastBuffer.packets = append(lastBuffer.packets, packet)
+ bufCount += 1
}
}
// Reallocate Tx buffers if needed to fit the input packets.
- pb := memif.txQueueBufs[queueID]
- bufCount := len(packets)
+ log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount)
+ pb := &memif.txQueueBufs[queueID]
if pb.count < bufCount {
newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
if newBuffers == nil {
@@ -751,32 +827,79 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
}
// Allocate ring slots.
- cQueueID := C.uint16_t(queueID)
- errCode := C.memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount),
- &allocated, C.uint32_t(bufSize))
- err = getMemifError(int(errCode))
- if err == ErrNoBufRing {
- // Not enough ring slots, <count> will be less than bufCount.
- err = nil
- }
- if err != nil {
- return 0, err
- }
+ var allocated C.uint16_t
+ var subCount C.uint16_t
+ for _, buffer := range buffers {
+ packetCount := C.uint16_t(len(buffer.packets))
+ isJumbo := buffer.size > memif.bufferSize
+
+ log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v",
+ cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo)
+
+ var nextFreeBuff *C.memif_buffer_t
+ startOffset := allocated
+ errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff,
+ packetCount, &allocated, C.uint16_t(buffer.size))
+
+ err = getMemifError(int(errCode))
+ endEarly := err == ErrNoBufRing
+ if endEarly {
+ // Not enough ring slots, <count> will be less than packetCount.
+ err = nil
+ }
+ if err != nil {
+ return 0, err
+ }
+
+ // Copy packet data into the buffers.
+ nowAllocated := allocated - startOffset
+ toFill := nowAllocated
+ if !isJumbo {
+ // If this is not jumbo frame, only 1 packet needs to be copied each iteration
+ toFill = 1
+ }
+
+ // Iterate over all packets and try to fill them into allocated buffers
+ // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left
+ for i, packet := range buffer.packets {
+ if i >= int(nowAllocated) {
+ // There was less allocated buffers than actual packet count so exit early
+ break
+ }
+
+ packetData := unsafe.Pointer(&packet[0])
+ C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet)))
+ }
+
+ if isJumbo && nowAllocated > 0 {
+ // If we successfully allocated required amount of buffers for entire jumbo to be sent
+ // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think
+ // it only sent 1 packet so it does not need to know anything about jumbo frames
+ subCount += nowAllocated - 1
+ }
- // Copy packet data into the buffers.
- for i := 0; i < int(allocated); i++ {
- packetData := unsafe.Pointer(&packets[i][0])
- C.govpp_copy_packet_data(pb.buffers, C.int(i), packetData, C.uint16_t(len(packets[i])))
+ // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try
+ // to handle it next burst
+ if endEarly {
+ break
+ }
}
- errCode = C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
+ var sentCount C.uint16_t
+ errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
err = getMemifError(int(errCode))
if err != nil {
return 0, err
}
- count = uint16(sentCount)
- return count, nil
+ // Prevent negative values
+ realSent := uint16(sentCount) - uint16(subCount)
+ if subCount > sentCount {
+ sentCount = 0
+ }
+
+ log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated)
+ return realSent, nil
}
// RxBurst is used to receive multiple packets in one call from a selected queue.
@@ -799,7 +922,7 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
}
// Reallocate Rx buffers if needed to fit the output packets.
- pb := memif.rxQueueBufs[queueID]
+ pb := &memif.rxQueueBufs[queueID]
bufCount := int(count)
if pb.count < bufCount {
newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
@@ -824,11 +947,31 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
return packets, err
}
+ chained := len(pb.rxChainBuf) > 0
+ if chained {
+ // We had stored data from previous burst because last buffer in previous burst was chained
+ // so we need to continue appending to this data
+ packets = pb.rxChainBuf
+ pb.rxChainBuf = nil
+ }
+
// Copy packet data into the instances of RawPacketData.
for i := 0; i < int(recvCount); i++ {
var packetSize C.int
packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize)
- packets = append(packets, C.GoBytes(packetData, packetSize))
+ packetBytes := C.GoBytes(packetData, packetSize)
+
+ if chained {
+ // We have chained buffers, so start merging packet data with last read packet
+ prevPacket := packets[len(packets)-1]
+ packets[len(packets)-1] = append(prevPacket, packetBytes...)
+ } else {
+ packets = append(packets, packetBytes)
+ }
+
+ // Mark last buffer as chained based on property on current buffer so next buffers
+ // will try to append data to this one in case we got jumbo frame
+ chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0
}
if recvCount > 0 {
@@ -840,6 +983,15 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
packets = nil
}
+ if chained {
+ // We did not had enough space to process all chained buffers to the end so simply tell
+ // reader that it should not process any packets here and save them for next burst
+ // to finish reading the buffer chain
+ pb.rxChainBuf = packets
+ packets = nil
+ err = ErrNoBuf
+ }
+
return packets, err
}