aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/core/memif_connector.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/core/memif_connector.cc')
-rw-r--r--libtransport/src/core/memif_connector.cc525
1 files changed, 265 insertions, 260 deletions
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc
index 553aab42a..a224beb11 100644
--- a/libtransport/src/core/memif_connector.cc
+++ b/libtransport/src/core/memif_connector.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,18 +13,16 @@
* limitations under the License.
*/
+#include <core/errors.h>
#include <core/memif_connector.h>
+#include <glog/logging.h>
#include <hicn/transport/errors/not_implemented_exception.h>
-
-#ifdef __vpp__
-
#include <sys/epoll.h>
#include <cstdlib>
-extern "C" {
-#include <memif/libmemif.h>
-};
+/* sstrncpy */
+#include <hicn/util/sstrncpy.h>
#define CANCEL_TIMER 1
@@ -32,190 +30,174 @@ namespace transport {
namespace core {
-struct memif_connection {
- uint16_t index;
- /* memif conenction handle */
- memif_conn_handle_t conn;
- /* transmit queue id */
- uint16_t tx_qid;
- /* tx buffers */
- memif_buffer_t *tx_bufs;
- /* allocated tx buffers counter */
- /* number of tx buffers pointing to shared memory */
- uint16_t tx_buf_num;
- /* rx buffers */
- memif_buffer_t *rx_bufs;
- /* allcoated rx buffers counter */
- /* number of rx buffers pointing to shared memory */
- uint16_t rx_buf_num;
- /* interface ip address */
- uint8_t ip_addr[4];
-};
-
-std::once_flag MemifConnector::flag_;
-utils::EpollEventReactor MemifConnector::main_event_reactor_;
-
MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
- OnReconnect &&on_reconnect_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
asio::io_service &io_service,
std::string app_name)
- : Connector(std::move(receive_callback), std::move(on_reconnect_callback)),
- memif_worker_(nullptr),
+ : Connector(std::move(receive_callback), std::move(packet_sent),
+ std::move(close_callback), std::move(on_reconnect)),
+ event_reactor_(),
+ memif_worker_(std::bind(&MemifConnector::threadMain, this)),
timer_set_(false),
- send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
- disconnect_timer_(
- std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
+ send_timer_(event_reactor_),
+ disconnect_timer_(event_reactor_),
io_service_(io_service),
- packet_counter_(0),
- memif_connection_(std::make_unique<memif_connection_t>()),
+ work_(asio::make_work_guard(io_service_)),
+ memif_connection_({0}),
tx_buf_counter_(0),
is_reconnection_(false),
data_available_(false),
app_name_(app_name),
- socket_filename_("") {
- std::call_once(MemifConnector::flag_, &MemifConnector::init, this);
-}
-
-MemifConnector::~MemifConnector() { close(); }
-
-void MemifConnector::init() {
- /* initialize memory interface */
- int err = memif_init(controlFdUpdate, const_cast<char *>(app_name_.c_str()),
- nullptr, nullptr, nullptr);
-
- if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
- TRANSPORT_LOGE("memif_init: %s", memif_strerror(err));
+ socket_filename_(""),
+ buffer_size_(kbuf_size),
+ log2_ring_size_(klog2_ring_size),
+ max_memif_bufs_(1 << klog2_ring_size) {}
+
+MemifConnector::~MemifConnector() {
+ try {
+ close();
+ } catch (errors::RuntimeException &e) {
+ // do nothing
}
}
-void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
- state_ = ConnectorState::CONNECTING;
+void MemifConnector::connect(uint32_t memif_id, long memif_mode,
+ const std::string &socket_filename,
+ std::size_t buffer_size,
+ std::size_t log2_ring_size) {
+ state_ = State::CONNECTING;
memif_id_ = memif_id;
- socket_filename_ = "/run/vpp/memif.sock";
-
- createMemif(memif_id, memif_mode, nullptr);
+ socket_filename_ = socket_filename;
+ buffer_size_ = buffer_size;
+ log2_ring_size_ = log2_ring_size;
+ max_memif_bufs_ = 1 << log2_ring_size;
+ createMemif(memif_id, memif_mode);
+}
- work_ = std::make_unique<asio::io_service::work>(io_service_);
+int MemifConnector::createMemif(uint32_t index, uint8_t is_master) {
+ int err = MEMIF_ERR_SUCCESS;
- while (state_ != ConnectorState::CONNECTED) {
- MemifConnector::main_event_reactor_.runOneEvent();
- }
+ memif_socket_args_t socket_args;
+ memif_conn_args_t args;
+ memset(&socket_args, 0, sizeof(memif_socket_args_t));
+ memset(&args, 0, sizeof(memif_conn_args_t));
- int err;
+ // Setup memif socket first
- /* get interrupt queue id */
- int fd = -1;
- err = memif_get_queue_efd(memif_connection_->conn, 0, &fd);
- if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
- TRANSPORT_LOGE("memif_get_queue_efd: %s", memif_strerror(err));
- return;
+ int rc = strcpy_s(socket_args.path, sizeof(socket_args.path) - 1,
+ socket_filename_.c_str());
+ if (rc != EOK) {
+ std::string error = "Provided socket path is larger than " +
+ std::to_string(sizeof(socket_args.path)) + " bytes.";
+ throw errors::RuntimeException(error);
}
- // Remove fd from main epoll
- main_event_reactor_.delFileDescriptor(fd);
-
- // Add fd to epoll of instance
- event_reactor_.addFileDescriptor(
- fd, EPOLLIN, [this](const utils::Event &evt) -> int {
- return onInterrupt(memif_connection_->conn, this, 0);
- });
+ rc = strcpy_s(socket_args.app_name, sizeof(socket_args.app_name) - 1,
+ app_name_.c_str());
+ if (rc != EOK) {
+ std::string error = "Provided app_name is larger than " +
+ std::to_string(sizeof(socket_args.app_name)) +
+ " bytes.";
+ throw errors::RuntimeException(error);
+ }
- memif_worker_ = std::make_unique<std::thread>(
- std::bind(&MemifConnector::threadMain, this));
-}
+ socket_args.on_control_fd_update = controlFdUpdate;
+ socket_args.alloc = nullptr;
+ socket_args.realloc = nullptr;
+ socket_args.free = nullptr;
-int MemifConnector::createMemif(uint32_t index, uint8_t mode, char *s) {
- memif_connection_t *c = memif_connection_.get();
+ err = memif_create_socket(&args.socket, &socket_args, this);
- /* setting memif connection arguments */
- memif_conn_args_t args;
- memset(&args, 0, sizeof(args));
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ throw errors::RuntimeException(memif_strerror(err));
+ }
- args.is_master = mode;
- args.log2_ring_size = MEMIF_LOG2_RING_SIZE;
- args.buffer_size = MEMIF_BUF_SIZE;
+ // Setup memif connection using provided memif_socket_handle_t
+ args.is_master = is_master;
+ args.log2_ring_size = log2_ring_size_;
+ args.buffer_size = buffer_size_;
args.num_s2m_rings = 1;
args.num_m2s_rings = 1;
- strncpy((char *)args.interface_name, IF_NAME, strlen(IF_NAME) + 1);
+ strcpy_s((char *)args.interface_name, sizeof(args.interface_name), IF_NAME);
args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
-
- int err;
-
- err = memif_create_socket(&args.socket, socket_filename_.c_str(), nullptr);
+ args.interface_id = index;
+ err = memif_create(&memif_connection_.conn, &args, onConnect, onDisconnect,
+ onInterrupt, this);
if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
throw errors::RuntimeException(memif_strerror(err));
}
- args.interface_id = index;
- /* last argument for memif_create (void * private_ctx) is used by user
- to identify connection. this context is returned with callbacks */
-
- /* default interrupt */
- if (s == nullptr) {
- err = memif_create(&c->conn, &args, onConnect, onDisconnect, onInterrupt,
- this);
-
- if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
- throw errors::RuntimeException(memif_strerror(err));
- }
- }
-
- c->index = (uint16_t)index;
- c->tx_qid = 0;
+ memif_connection_.index = (uint16_t)index;
+ memif_connection_.tx_qid = 0;
/* alloc memif buffers */
- c->rx_buf_num = 0;
- c->rx_bufs = static_cast<memif_buffer_t *>(
- malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS));
- c->tx_buf_num = 0;
- c->tx_bufs = static_cast<memif_buffer_t *>(
- malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS));
-
- // memif_set_rx_mode (c->conn, MEMIF_RX_MODE_POLLING, 0);
+ memif_connection_.rx_buf_num = 0;
+ memif_connection_.rx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * max_memif_bufs_));
+ memif_connection_.tx_buf_num = 0;
+ memif_connection_.tx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * max_memif_bufs_));
return 0;
}
int MemifConnector::deleteMemif() {
- memif_connection_t *c = memif_connection_.get();
-
- if (c->rx_bufs) {
- free(c->rx_bufs);
+ if (memif_connection_.rx_bufs) {
+ free(memif_connection_.rx_bufs);
}
- c->rx_bufs = nullptr;
- c->rx_buf_num = 0;
+ memif_connection_.rx_bufs = nullptr;
+ memif_connection_.rx_buf_num = 0;
- if (c->tx_bufs) {
- free(c->tx_bufs);
+ if (memif_connection_.tx_bufs) {
+ free(memif_connection_.tx_bufs);
}
- c->tx_bufs = nullptr;
- c->tx_buf_num = 0;
+ memif_connection_.tx_bufs = nullptr;
+ memif_connection_.tx_buf_num = 0;
int err;
/* disconenct then delete memif connection */
- err = memif_delete(&c->conn);
+ err = memif_delete(&memif_connection_.conn);
if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
- TRANSPORT_LOGE("memif_delete: %s", memif_strerror(err));
+ LOG(ERROR) << "memif_delete: " << memif_strerror(err);
}
- if (TRANSPORT_EXPECT_FALSE(c->conn != nullptr)) {
- TRANSPORT_LOGE("memif delete fail");
+ if (TRANSPORT_EXPECT_FALSE(memif_connection_.conn != nullptr)) {
+ LOG(ERROR) << "memif delete fail";
}
+ state_ = State::CLOSED;
+
return 0;
}
-int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) {
+int MemifConnector::controlFdUpdate(memif_fd_event_t fde, void *private_ctx) {
+ auto self = reinterpret_cast<MemifConnector *>(private_ctx);
+ uint32_t evt = 0;
+
/* convert memif event definitions to epoll events */
+ auto events = fde.type;
+ auto fd = fde.fd;
+
+ if (events & MEMIF_FD_EVENT_ERROR) {
+ LOG(ERROR) << "memif fd event: Error";
+ return -1;
+ }
+
if (events & MEMIF_FD_EVENT_DEL) {
- return MemifConnector::main_event_reactor_.delFileDescriptor(fd);
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: DEL fd " << fd;
+ return self->event_reactor_.delFileDescriptor(fd);
}
- uint32_t evt = 0;
+ if (events & MEMIF_FD_EVENT_MOD) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: MOD fd " << fd;
+ return self->event_reactor_.modFileDescriptor(fd, evt);
+ }
if (events & MEMIF_FD_EVENT_READ) {
evt |= EPOLLIN;
@@ -225,13 +207,10 @@ int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) {
evt |= EPOLLOUT;
}
- if (events & MEMIF_FD_EVENT_MOD) {
- return MemifConnector::main_event_reactor_.modFileDescriptor(fd, evt);
- }
-
- return MemifConnector::main_event_reactor_.addFileDescriptor(
- fd, evt, [](const utils::Event &evt) -> int {
- uint32_t event = 0;
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: ADD fd " << fd;
+ return self->event_reactor_.addFileDescriptor(
+ fd, evt, [fde](const utils::Event &evt) -> int {
+ int event = 0;
int memif_err = 0;
if (evt.events & EPOLLIN) {
@@ -246,82 +225,86 @@ int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) {
event |= MEMIF_FD_EVENT_ERROR;
}
- memif_err = memif_control_fd_handler(evt.data.fd, event);
+ memif_err = memif_control_fd_handler(fde.private_ctx,
+ memif_fd_event_type_t(event));
if (TRANSPORT_EXPECT_FALSE(memif_err != MEMIF_ERR_SUCCESS)) {
- TRANSPORT_LOGE("memif_control_fd_handler: %s",
- memif_strerror(memif_err));
+ LOG(ERROR) << "memif_control_fd_handler: "
+ << memif_strerror(memif_err);
}
return 0;
});
}
-int MemifConnector::bufferAlloc(long n, uint16_t qid) {
- memif_connection_t *c = memif_connection_.get();
+uint16_t MemifConnector::bufferAlloc(long n, uint16_t qid,
+ std::error_code &ec) {
int err;
- uint16_t r;
+ uint16_t r = 0;
/* set data pointer to shared memory and set buffer_len to shared mmeory
* buffer len */
- err = memif_buffer_alloc(c->conn, qid, c->tx_bufs, n, &r, 2000);
+ err = memif_buffer_alloc(memif_connection_.conn, qid,
+ memif_connection_.tx_bufs, n, &r, buffer_size_);
if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
- TRANSPORT_LOGE("memif_buffer_alloc: %s", memif_strerror(err));
+ ec = make_error_code(core_error::send_buffer_allocation_failed);
}
- c->tx_buf_num += r;
+ memif_connection_.tx_buf_num += r;
return r;
}
-int MemifConnector::txBurst(uint16_t qid) {
- memif_connection_t *c = memif_connection_.get();
- int err;
- uint16_t r;
+uint16_t MemifConnector::txBurst(uint16_t qid, std::error_code &ec) {
+ int err = MEMIF_ERR_SUCCESS;
+ ec = make_error_code(core_error::success);
+ uint16_t tx = 0;
+
/* inform peer memif interface about data in shared memory buffers */
/* mark memif buffers as free */
- err = memif_tx_burst(c->conn, qid, c->tx_bufs, c->tx_buf_num, &r);
+ err = memif_tx_burst(memif_connection_.conn, qid, memif_connection_.tx_bufs,
+ memif_connection_.tx_buf_num, &tx);
if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
- TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err));
+ ec = make_error_code(core_error::send_failed);
}
- // err = memif_refill_queue(c->conn, qid, r, 0);
+ memif_connection_.tx_buf_num -= tx;
+ return tx;
+}
- if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
- TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err));
- c->tx_buf_num -= r;
- return -1;
+void MemifConnector::scheduleSend(std::uint64_t delay) {
+ if (!timer_set_) {
+ timer_set_ = true;
+ send_timer_.expiresFromNow(std::chrono::microseconds(delay));
+ send_timer_.asyncWait(
+ std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1));
}
-
- c->tx_buf_num -= r;
- return 0;
}
void MemifConnector::sendCallback(const std::error_code &ec) {
timer_set_ = false;
- if (TRANSPORT_EXPECT_TRUE(!ec && state_ == ConnectorState::CONNECTED)) {
+ if (TRANSPORT_EXPECT_TRUE(!ec && state_ == State::CONNECTED)) {
doSend();
}
}
-void MemifConnector::processInputBuffer(std::uint16_t total_packets) {
- Packet::MemBufPtr ptr;
-
- for (; total_packets > 0; total_packets--) {
- if (input_buffer_.pop(ptr)) {
- receive_callback_(std::move(ptr));
- }
- }
-}
-
/* informs user about connected status. private_ctx is used by user to identify
connection (multiple connections WIP) */
int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
- MemifConnector *connector = (MemifConnector *)private_ctx;
- connector->state_ = ConnectorState::CONNECTED;
+ auto self = reinterpret_cast<MemifConnector *>(private_ctx);
+ self->state_ = State::CONNECTED;
memif_refill_queue(conn, 0, -1, 0);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Memif " << self->app_name_ << " connected";
+
+ // We are connected. Notify higher layers.
+ self->io_service_.post([self]() {
+ self->on_reconnect_callback_(self, make_error_code(core_error::success));
+ });
+
+ self->doSend();
+
return 0;
}
@@ -329,54 +312,51 @@ int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
identify connection (multiple connections WIP) */
int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) {
MemifConnector *connector = (MemifConnector *)private_ctx;
- connector->state_ = ConnectorState::CLOSED;
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Memif " << connector->app_name_ << " disconnected";
return 0;
}
-void MemifConnector::threadMain() { event_reactor_.runEventLoop(1000); }
+void MemifConnector::threadMain() { event_reactor_.runEventLoop(200); }
int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
uint16_t qid) {
MemifConnector *connector = (MemifConnector *)private_ctx;
- memif_connection_t *c = connector->memif_connection_.get();
+ Details &c = connector->memif_connection_;
+ std::weak_ptr<MemifConnector> self = connector->shared_from_this();
+ std::vector<::utils::MemBuf::Ptr> v;
+ std::error_code ec = make_error_code(core_error::success);
+
int err = MEMIF_ERR_SUCCESS, ret_val;
- uint16_t total_packets = 0;
- uint16_t rx;
+ uint16_t rx = 0;
do {
- err = memif_rx_burst(conn, qid, c->rx_bufs, MAX_MEMIF_BUFS, &rx);
+ err = memif_rx_burst(conn, qid, c.rx_bufs, max_burst, &rx);
ret_val = err;
if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS &&
err != MEMIF_ERR_NOBUF)) {
- TRANSPORT_LOGE("memif_rx_burst: %s", memif_strerror(err));
+ ec = make_error_code(core_error::receive_failed);
+ LOG(ERROR) << "memif_rx_burst: " << memif_strerror(err);
goto error;
}
- c->rx_buf_num += rx;
+ c.rx_buf_num += rx;
if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) {
- TRANSPORT_LOGE("socket stopped: ignoring %u packets", rx);
+ LOG(ERROR) << "socket stopped: ignoring " << rx << " packets";
goto error;
}
std::size_t packet_length;
+ v.reserve(rx);
for (int i = 0; i < rx; i++) {
- auto packet = connector->getPacket();
- packet_length = (c->rx_bufs + i)->len;
- std::memcpy(packet->writableData(),
- reinterpret_cast<const uint8_t *>((c->rx_bufs + i)->data),
- packet_length);
- packet->append(packet_length);
-
- if (!connector->input_buffer_.push(std::move(packet))) {
- TRANSPORT_LOGE("Error pushing packet. Ring buffer full.");
-
- // TODO Here we should consider the possibility to signal the congestion
- // to the application, that would react properly (e.g. slow down
- // message)
- }
+ auto buffer = connector->getRawBuffer();
+ packet_length = (c.rx_bufs + i)->len;
+ std::memcpy(buffer.first, (c.rx_bufs + i)->data, packet_length);
+ auto packet = connector->getPacketFromBuffer(buffer.first, packet_length);
+ v.emplace_back(std::move(packet));
}
/* mark memif buffers and shared memory buffers as free */
@@ -385,114 +365,139 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
err = memif_refill_queue(conn, qid, rx, 0);
if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
- TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err));
+ LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err);
}
- c->rx_buf_num -= rx;
- total_packets += rx;
+ c.rx_buf_num -= rx;
} while (ret_val == MEMIF_ERR_NOBUF);
- connector->io_service_.post(
- std::bind(&MemifConnector::processInputBuffer, connector, total_packets));
+ connector->io_service_.post([self, buffers = std::move(v)]() {
+ if (auto c = self.lock()) {
+ c->receive_callback_(c.get(), buffers,
+ std::make_error_code(std::errc(0)));
+ }
+ });
return 0;
error:
- err = memif_refill_queue(c->conn, qid, rx, 0);
+ err = memif_refill_queue(c.conn, qid, rx, 0);
if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
- TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err));
+ LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err);
}
- c->rx_buf_num -= rx;
+ c.rx_buf_num -= rx;
+
+ connector->io_service_.post([self, ec]() {
+ if (auto c = self.lock()) {
+ c->receive_callback_(c.get(), {}, ec);
+ }
+ });
return 0;
}
void MemifConnector::close() {
- if (state_ != ConnectorState::CLOSED) {
- disconnect_timer_->expiresFromNow(std::chrono::microseconds(50));
- disconnect_timer_->asyncWait([this](const std::error_code &ec) {
+ if (state_ != State::CLOSED) {
+ disconnect_timer_.expiresFromNow(std::chrono::microseconds(50));
+ disconnect_timer_.asyncWait([this](const std::error_code &ec) {
deleteMemif();
event_reactor_.stop();
- work_.reset();
});
+ }
- if (memif_worker_ && memif_worker_->joinable()) {
- memif_worker_->join();
- }
+ if (memif_worker_.joinable()) {
+ memif_worker_.join();
}
}
-void MemifConnector::send(const Packet::MemBufPtr &packet) {
+void MemifConnector::send(Packet &packet) { send(packet.shared_from_this()); }
+
+void MemifConnector::send(const utils::MemBuf::Ptr &buffer) {
{
utils::SpinLock::Acquire locked(write_msgs_lock_);
- output_buffer_.push_back(packet);
+ output_buffer_.push_back(buffer);
}
#if CANCEL_TIMER
- if (!timer_set_) {
- timer_set_ = true;
- send_timer_->expiresFromNow(std::chrono::microseconds(50));
- send_timer_->asyncWait(
- std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1));
- }
+ scheduleSend(50);
#endif
}
int MemifConnector::doSend() {
std::size_t max = 0;
- uint16_t n = 0;
std::size_t size = 0;
-
- {
- utils::SpinLock::Acquire locked(write_msgs_lock_);
- size = output_buffer_.size();
+ std::error_code ec = make_error_code(core_error::success);
+ int ret = 0;
+ uint64_t delay = 50; // microseconds
+
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+
+ // Check if there are pending buffers to send
+ if (memif_connection_.tx_buf_num > 0) {
+ ret = txBurst(memif_connection_.tx_qid, ec);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ delay = 200;
+ goto done;
+ }
}
- do {
- max = size < MAX_MEMIF_BUFS ? size : MAX_MEMIF_BUFS;
-
- if (TRANSPORT_EXPECT_FALSE(
- (n = bufferAlloc(max, memif_connection_->tx_qid)) < 0)) {
- TRANSPORT_LOGE("Error allocating buffers.");
- return -1;
- }
+ // Continue trying to send buffers in output_buffer_
+ size = output_buffer_.size();
+ max = size < max_burst ? size : max_burst;
- for (uint16_t i = 0; i < n; i++) {
- utils::SpinLock::Acquire locked(write_msgs_lock_);
+ ret = bufferAlloc(max, memif_connection_.tx_qid, ec);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool() && ret == 0)) {
+ delay = 200;
+ goto done;
+ }
- auto packet = output_buffer_.front().get();
- const utils::MemBuf *current = packet;
- std::size_t offset = 0;
- uint8_t *shared_buffer =
- reinterpret_cast<uint8_t *>(memif_connection_->tx_bufs[i].data);
- do {
- std::memcpy(shared_buffer + offset, current->data(), current->length());
- offset += current->length();
- current = current->next();
- } while (current != packet);
+ // Fill allocated buffers and remove them from output_buffer_
+ for (uint16_t i = 0; i < ret; i++) {
+ auto packet = output_buffer_.front().get();
+ const utils::MemBuf *current = packet;
+ std::size_t offset = 0;
+ uint8_t *shared_buffer =
+ reinterpret_cast<uint8_t *>(memif_connection_.tx_bufs[i].data);
+ do {
+ std::memcpy(shared_buffer + offset, current->data(), current->length());
+ offset += current->length();
+ current = current->next();
+ } while (current != packet);
+
+ memif_connection_.tx_bufs[i].len = uint32_t(offset);
+ output_buffer_.pop_front();
+ }
- memif_connection_->tx_bufs[i].len = uint32_t(offset);
+ // Try to send them
+ ret = txBurst(memif_connection_.tx_qid, ec);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ LOG(ERROR) << "Tx burst failed " << ec.message();
+ delay = 200;
+ goto done;
+ }
- output_buffer_.pop_front();
- }
+done:
+ memif_refill_queue(memif_connection_.conn, memif_connection_.tx_qid, ret, 0);
- txBurst(memif_connection_->tx_qid);
+ // If there are still packets to send, schedule another send
+ if (memif_connection_.tx_buf_num > 0 || !output_buffer_.empty()) {
+ scheduleSend(delay);
+ }
- utils::SpinLock::Acquire locked(write_msgs_lock_);
- size = output_buffer_.size();
- } while (size > 0);
+ // If error, signal to upper layers
+ if (ec.operator bool()) {
+ std::weak_ptr<MemifConnector> self = shared_from_this();
+ io_service_.post([self, ec]() {
+ if (auto c = self.lock()) {
+ c->sent_callback_(c.get(), ec);
+ }
+ });
+ }
return 0;
}
-void MemifConnector::send(const uint8_t *packet, std::size_t len,
- const PacketSentCallback &packet_sent) {
- throw errors::NotImplementedException();
-}
-
} // end namespace core
} // end namespace transport
-
-#endif // __vpp__