diff options
Diffstat (limited to 'libtransport/src/core/memif_connector.cc')
-rw-r--r-- | libtransport/src/core/memif_connector.cc | 525 |
1 files changed, 265 insertions, 260 deletions
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc index 553aab42a..a224beb11 100644 --- a/libtransport/src/core/memif_connector.cc +++ b/libtransport/src/core/memif_connector.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,18 +13,16 @@ * limitations under the License. */ +#include <core/errors.h> #include <core/memif_connector.h> +#include <glog/logging.h> #include <hicn/transport/errors/not_implemented_exception.h> - -#ifdef __vpp__ - #include <sys/epoll.h> #include <cstdlib> -extern "C" { -#include <memif/libmemif.h> -}; +/* sstrncpy */ +#include <hicn/util/sstrncpy.h> #define CANCEL_TIMER 1 @@ -32,190 +30,174 @@ namespace transport { namespace core { -struct memif_connection { - uint16_t index; - /* memif conenction handle */ - memif_conn_handle_t conn; - /* transmit queue id */ - uint16_t tx_qid; - /* tx buffers */ - memif_buffer_t *tx_bufs; - /* allocated tx buffers counter */ - /* number of tx buffers pointing to shared memory */ - uint16_t tx_buf_num; - /* rx buffers */ - memif_buffer_t *rx_bufs; - /* allcoated rx buffers counter */ - /* number of rx buffers pointing to shared memory */ - uint16_t rx_buf_num; - /* interface ip address */ - uint8_t ip_addr[4]; -}; - -std::once_flag MemifConnector::flag_; -utils::EpollEventReactor MemifConnector::main_event_reactor_; - MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, - OnReconnect &&on_reconnect_callback, + PacketSentCallback &&packet_sent, + OnCloseCallback &&close_callback, + OnReconnectCallback &&on_reconnect, asio::io_service &io_service, std::string app_name) - : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), - memif_worker_(nullptr), + : Connector(std::move(receive_callback), std::move(packet_sent), + std::move(close_callback), std::move(on_reconnect)), + event_reactor_(), + memif_worker_(std::bind(&MemifConnector::threadMain, this)), timer_set_(false), - send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)), - disconnect_timer_( - std::make_unique<utils::FdDeadlineTimer>(event_reactor_)), + send_timer_(event_reactor_), + disconnect_timer_(event_reactor_), io_service_(io_service), - packet_counter_(0), - memif_connection_(std::make_unique<memif_connection_t>()), + work_(asio::make_work_guard(io_service_)), + memif_connection_({0}), tx_buf_counter_(0), is_reconnection_(false), data_available_(false), app_name_(app_name), - socket_filename_("") { - std::call_once(MemifConnector::flag_, &MemifConnector::init, this); -} - -MemifConnector::~MemifConnector() { close(); } - -void MemifConnector::init() { - /* initialize memory interface */ - int err = memif_init(controlFdUpdate, const_cast<char *>(app_name_.c_str()), - nullptr, nullptr, nullptr); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - TRANSPORT_LOGE("memif_init: %s", memif_strerror(err)); + socket_filename_(""), + buffer_size_(kbuf_size), + log2_ring_size_(klog2_ring_size), + max_memif_bufs_(1 << klog2_ring_size) {} + +MemifConnector::~MemifConnector() { + try { + close(); + } catch (errors::RuntimeException &e) { + // do nothing } } -void MemifConnector::connect(uint32_t memif_id, long memif_mode) { - state_ = ConnectorState::CONNECTING; +void MemifConnector::connect(uint32_t memif_id, long memif_mode, + const std::string &socket_filename, + std::size_t buffer_size, + std::size_t log2_ring_size) { + state_ = State::CONNECTING; memif_id_ = memif_id; - socket_filename_ = "/run/vpp/memif.sock"; - - createMemif(memif_id, memif_mode, nullptr); + socket_filename_ = socket_filename; + buffer_size_ = buffer_size; + log2_ring_size_ = log2_ring_size; + max_memif_bufs_ = 1 << log2_ring_size; + createMemif(memif_id, memif_mode); +} - work_ = std::make_unique<asio::io_service::work>(io_service_); +int MemifConnector::createMemif(uint32_t index, uint8_t is_master) { + int err = MEMIF_ERR_SUCCESS; - while (state_ != ConnectorState::CONNECTED) { - MemifConnector::main_event_reactor_.runOneEvent(); - } + memif_socket_args_t socket_args; + memif_conn_args_t args; + memset(&socket_args, 0, sizeof(memif_socket_args_t)); + memset(&args, 0, sizeof(memif_conn_args_t)); - int err; + // Setup memif socket first - /* get interrupt queue id */ - int fd = -1; - err = memif_get_queue_efd(memif_connection_->conn, 0, &fd); - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - TRANSPORT_LOGE("memif_get_queue_efd: %s", memif_strerror(err)); - return; + int rc = strcpy_s(socket_args.path, sizeof(socket_args.path) - 1, + socket_filename_.c_str()); + if (rc != EOK) { + std::string error = "Provided socket path is larger than " + + std::to_string(sizeof(socket_args.path)) + " bytes."; + throw errors::RuntimeException(error); } - // Remove fd from main epoll - main_event_reactor_.delFileDescriptor(fd); - - // Add fd to epoll of instance - event_reactor_.addFileDescriptor( - fd, EPOLLIN, [this](const utils::Event &evt) -> int { - return onInterrupt(memif_connection_->conn, this, 0); - }); + rc = strcpy_s(socket_args.app_name, sizeof(socket_args.app_name) - 1, + app_name_.c_str()); + if (rc != EOK) { + std::string error = "Provided app_name is larger than " + + std::to_string(sizeof(socket_args.app_name)) + + " bytes."; + throw errors::RuntimeException(error); + } - memif_worker_ = std::make_unique<std::thread>( - std::bind(&MemifConnector::threadMain, this)); -} + socket_args.on_control_fd_update = controlFdUpdate; + socket_args.alloc = nullptr; + socket_args.realloc = nullptr; + socket_args.free = nullptr; -int MemifConnector::createMemif(uint32_t index, uint8_t mode, char *s) { - memif_connection_t *c = memif_connection_.get(); + err = memif_create_socket(&args.socket, &socket_args, this); - /* setting memif connection arguments */ - memif_conn_args_t args; - memset(&args, 0, sizeof(args)); + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + throw errors::RuntimeException(memif_strerror(err)); + } - args.is_master = mode; - args.log2_ring_size = MEMIF_LOG2_RING_SIZE; - args.buffer_size = MEMIF_BUF_SIZE; + // Setup memif connection using provided memif_socket_handle_t + args.is_master = is_master; + args.log2_ring_size = log2_ring_size_; + args.buffer_size = buffer_size_; args.num_s2m_rings = 1; args.num_m2s_rings = 1; - strncpy((char *)args.interface_name, IF_NAME, strlen(IF_NAME) + 1); + strcpy_s((char *)args.interface_name, sizeof(args.interface_name), IF_NAME); args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP; - - int err; - - err = memif_create_socket(&args.socket, socket_filename_.c_str(), nullptr); + args.interface_id = index; + err = memif_create(&memif_connection_.conn, &args, onConnect, onDisconnect, + onInterrupt, this); if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { throw errors::RuntimeException(memif_strerror(err)); } - args.interface_id = index; - /* last argument for memif_create (void * private_ctx) is used by user - to identify connection. this context is returned with callbacks */ - - /* default interrupt */ - if (s == nullptr) { - err = memif_create(&c->conn, &args, onConnect, onDisconnect, onInterrupt, - this); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - throw errors::RuntimeException(memif_strerror(err)); - } - } - - c->index = (uint16_t)index; - c->tx_qid = 0; + memif_connection_.index = (uint16_t)index; + memif_connection_.tx_qid = 0; /* alloc memif buffers */ - c->rx_buf_num = 0; - c->rx_bufs = static_cast<memif_buffer_t *>( - malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS)); - c->tx_buf_num = 0; - c->tx_bufs = static_cast<memif_buffer_t *>( - malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS)); - - // memif_set_rx_mode (c->conn, MEMIF_RX_MODE_POLLING, 0); + memif_connection_.rx_buf_num = 0; + memif_connection_.rx_bufs = static_cast<memif_buffer_t *>( + malloc(sizeof(memif_buffer_t) * max_memif_bufs_)); + memif_connection_.tx_buf_num = 0; + memif_connection_.tx_bufs = static_cast<memif_buffer_t *>( + malloc(sizeof(memif_buffer_t) * max_memif_bufs_)); return 0; } int MemifConnector::deleteMemif() { - memif_connection_t *c = memif_connection_.get(); - - if (c->rx_bufs) { - free(c->rx_bufs); + if (memif_connection_.rx_bufs) { + free(memif_connection_.rx_bufs); } - c->rx_bufs = nullptr; - c->rx_buf_num = 0; + memif_connection_.rx_bufs = nullptr; + memif_connection_.rx_buf_num = 0; - if (c->tx_bufs) { - free(c->tx_bufs); + if (memif_connection_.tx_bufs) { + free(memif_connection_.tx_bufs); } - c->tx_bufs = nullptr; - c->tx_buf_num = 0; + memif_connection_.tx_bufs = nullptr; + memif_connection_.tx_buf_num = 0; int err; /* disconenct then delete memif connection */ - err = memif_delete(&c->conn); + err = memif_delete(&memif_connection_.conn); if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - TRANSPORT_LOGE("memif_delete: %s", memif_strerror(err)); + LOG(ERROR) << "memif_delete: " << memif_strerror(err); } - if (TRANSPORT_EXPECT_FALSE(c->conn != nullptr)) { - TRANSPORT_LOGE("memif delete fail"); + if (TRANSPORT_EXPECT_FALSE(memif_connection_.conn != nullptr)) { + LOG(ERROR) << "memif delete fail"; } + state_ = State::CLOSED; + return 0; } -int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) { +int MemifConnector::controlFdUpdate(memif_fd_event_t fde, void *private_ctx) { + auto self = reinterpret_cast<MemifConnector *>(private_ctx); + uint32_t evt = 0; + /* convert memif event definitions to epoll events */ + auto events = fde.type; + auto fd = fde.fd; + + if (events & MEMIF_FD_EVENT_ERROR) { + LOG(ERROR) << "memif fd event: Error"; + return -1; + } + if (events & MEMIF_FD_EVENT_DEL) { - return MemifConnector::main_event_reactor_.delFileDescriptor(fd); + DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: DEL fd " << fd; + return self->event_reactor_.delFileDescriptor(fd); } - uint32_t evt = 0; + if (events & MEMIF_FD_EVENT_MOD) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: MOD fd " << fd; + return self->event_reactor_.modFileDescriptor(fd, evt); + } if (events & MEMIF_FD_EVENT_READ) { evt |= EPOLLIN; @@ -225,13 +207,10 @@ int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) { evt |= EPOLLOUT; } - if (events & MEMIF_FD_EVENT_MOD) { - return MemifConnector::main_event_reactor_.modFileDescriptor(fd, evt); - } - - return MemifConnector::main_event_reactor_.addFileDescriptor( - fd, evt, [](const utils::Event &evt) -> int { - uint32_t event = 0; + DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: ADD fd " << fd; + return self->event_reactor_.addFileDescriptor( + fd, evt, [fde](const utils::Event &evt) -> int { + int event = 0; int memif_err = 0; if (evt.events & EPOLLIN) { @@ -246,82 +225,86 @@ int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) { event |= MEMIF_FD_EVENT_ERROR; } - memif_err = memif_control_fd_handler(evt.data.fd, event); + memif_err = memif_control_fd_handler(fde.private_ctx, + memif_fd_event_type_t(event)); if (TRANSPORT_EXPECT_FALSE(memif_err != MEMIF_ERR_SUCCESS)) { - TRANSPORT_LOGE("memif_control_fd_handler: %s", - memif_strerror(memif_err)); + LOG(ERROR) << "memif_control_fd_handler: " + << memif_strerror(memif_err); } return 0; }); } -int MemifConnector::bufferAlloc(long n, uint16_t qid) { - memif_connection_t *c = memif_connection_.get(); +uint16_t MemifConnector::bufferAlloc(long n, uint16_t qid, + std::error_code &ec) { int err; - uint16_t r; + uint16_t r = 0; /* set data pointer to shared memory and set buffer_len to shared mmeory * buffer len */ - err = memif_buffer_alloc(c->conn, qid, c->tx_bufs, n, &r, 2000); + err = memif_buffer_alloc(memif_connection_.conn, qid, + memif_connection_.tx_bufs, n, &r, buffer_size_); if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - TRANSPORT_LOGE("memif_buffer_alloc: %s", memif_strerror(err)); + ec = make_error_code(core_error::send_buffer_allocation_failed); } - c->tx_buf_num += r; + memif_connection_.tx_buf_num += r; return r; } -int MemifConnector::txBurst(uint16_t qid) { - memif_connection_t *c = memif_connection_.get(); - int err; - uint16_t r; +uint16_t MemifConnector::txBurst(uint16_t qid, std::error_code &ec) { + int err = MEMIF_ERR_SUCCESS; + ec = make_error_code(core_error::success); + uint16_t tx = 0; + /* inform peer memif interface about data in shared memory buffers */ /* mark memif buffers as free */ - err = memif_tx_burst(c->conn, qid, c->tx_bufs, c->tx_buf_num, &r); + err = memif_tx_burst(memif_connection_.conn, qid, memif_connection_.tx_bufs, + memif_connection_.tx_buf_num, &tx); if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err)); + ec = make_error_code(core_error::send_failed); } - // err = memif_refill_queue(c->conn, qid, r, 0); + memif_connection_.tx_buf_num -= tx; + return tx; +} - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err)); - c->tx_buf_num -= r; - return -1; +void MemifConnector::scheduleSend(std::uint64_t delay) { + if (!timer_set_) { + timer_set_ = true; + send_timer_.expiresFromNow(std::chrono::microseconds(delay)); + send_timer_.asyncWait( + std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1)); } - - c->tx_buf_num -= r; - return 0; } void MemifConnector::sendCallback(const std::error_code &ec) { timer_set_ = false; - if (TRANSPORT_EXPECT_TRUE(!ec && state_ == ConnectorState::CONNECTED)) { + if (TRANSPORT_EXPECT_TRUE(!ec && state_ == State::CONNECTED)) { doSend(); } } -void MemifConnector::processInputBuffer(std::uint16_t total_packets) { - Packet::MemBufPtr ptr; - - for (; total_packets > 0; total_packets--) { - if (input_buffer_.pop(ptr)) { - receive_callback_(std::move(ptr)); - } - } -} - /* informs user about connected status. private_ctx is used by user to identify connection (multiple connections WIP) */ int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) { - MemifConnector *connector = (MemifConnector *)private_ctx; - connector->state_ = ConnectorState::CONNECTED; + auto self = reinterpret_cast<MemifConnector *>(private_ctx); + self->state_ = State::CONNECTED; memif_refill_queue(conn, 0, -1, 0); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Memif " << self->app_name_ << " connected"; + + // We are connected. Notify higher layers. + self->io_service_.post([self]() { + self->on_reconnect_callback_(self, make_error_code(core_error::success)); + }); + + self->doSend(); + return 0; } @@ -329,54 +312,51 @@ int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) { identify connection (multiple connections WIP) */ int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) { MemifConnector *connector = (MemifConnector *)private_ctx; - connector->state_ = ConnectorState::CLOSED; + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Memif " << connector->app_name_ << " disconnected"; return 0; } -void MemifConnector::threadMain() { event_reactor_.runEventLoop(1000); } +void MemifConnector::threadMain() { event_reactor_.runEventLoop(200); } int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, uint16_t qid) { MemifConnector *connector = (MemifConnector *)private_ctx; - memif_connection_t *c = connector->memif_connection_.get(); + Details &c = connector->memif_connection_; + std::weak_ptr<MemifConnector> self = connector->shared_from_this(); + std::vector<::utils::MemBuf::Ptr> v; + std::error_code ec = make_error_code(core_error::success); + int err = MEMIF_ERR_SUCCESS, ret_val; - uint16_t total_packets = 0; - uint16_t rx; + uint16_t rx = 0; do { - err = memif_rx_burst(conn, qid, c->rx_bufs, MAX_MEMIF_BUFS, &rx); + err = memif_rx_burst(conn, qid, c.rx_bufs, max_burst, &rx); ret_val = err; if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_NOBUF)) { - TRANSPORT_LOGE("memif_rx_burst: %s", memif_strerror(err)); + ec = make_error_code(core_error::receive_failed); + LOG(ERROR) << "memif_rx_burst: " << memif_strerror(err); goto error; } - c->rx_buf_num += rx; + c.rx_buf_num += rx; if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) { - TRANSPORT_LOGE("socket stopped: ignoring %u packets", rx); + LOG(ERROR) << "socket stopped: ignoring " << rx << " packets"; goto error; } std::size_t packet_length; + v.reserve(rx); for (int i = 0; i < rx; i++) { - auto packet = connector->getPacket(); - packet_length = (c->rx_bufs + i)->len; - std::memcpy(packet->writableData(), - reinterpret_cast<const uint8_t *>((c->rx_bufs + i)->data), - packet_length); - packet->append(packet_length); - - if (!connector->input_buffer_.push(std::move(packet))) { - TRANSPORT_LOGE("Error pushing packet. Ring buffer full."); - - // TODO Here we should consider the possibility to signal the congestion - // to the application, that would react properly (e.g. slow down - // message) - } + auto buffer = connector->getRawBuffer(); + packet_length = (c.rx_bufs + i)->len; + std::memcpy(buffer.first, (c.rx_bufs + i)->data, packet_length); + auto packet = connector->getPacketFromBuffer(buffer.first, packet_length); + v.emplace_back(std::move(packet)); } /* mark memif buffers and shared memory buffers as free */ @@ -385,114 +365,139 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, err = memif_refill_queue(conn, qid, rx, 0); if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err)); + LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err); } - c->rx_buf_num -= rx; - total_packets += rx; + c.rx_buf_num -= rx; } while (ret_val == MEMIF_ERR_NOBUF); - connector->io_service_.post( - std::bind(&MemifConnector::processInputBuffer, connector, total_packets)); + connector->io_service_.post([self, buffers = std::move(v)]() { + if (auto c = self.lock()) { + c->receive_callback_(c.get(), buffers, + std::make_error_code(std::errc(0))); + } + }); return 0; error: - err = memif_refill_queue(c->conn, qid, rx, 0); + err = memif_refill_queue(c.conn, qid, rx, 0); if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err)); + LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err); } - c->rx_buf_num -= rx; + c.rx_buf_num -= rx; + + connector->io_service_.post([self, ec]() { + if (auto c = self.lock()) { + c->receive_callback_(c.get(), {}, ec); + } + }); return 0; } void MemifConnector::close() { - if (state_ != ConnectorState::CLOSED) { - disconnect_timer_->expiresFromNow(std::chrono::microseconds(50)); - disconnect_timer_->asyncWait([this](const std::error_code &ec) { + if (state_ != State::CLOSED) { + disconnect_timer_.expiresFromNow(std::chrono::microseconds(50)); + disconnect_timer_.asyncWait([this](const std::error_code &ec) { deleteMemif(); event_reactor_.stop(); - work_.reset(); }); + } - if (memif_worker_ && memif_worker_->joinable()) { - memif_worker_->join(); - } + if (memif_worker_.joinable()) { + memif_worker_.join(); } } -void MemifConnector::send(const Packet::MemBufPtr &packet) { +void MemifConnector::send(Packet &packet) { send(packet.shared_from_this()); } + +void MemifConnector::send(const utils::MemBuf::Ptr &buffer) { { utils::SpinLock::Acquire locked(write_msgs_lock_); - output_buffer_.push_back(packet); + output_buffer_.push_back(buffer); } #if CANCEL_TIMER - if (!timer_set_) { - timer_set_ = true; - send_timer_->expiresFromNow(std::chrono::microseconds(50)); - send_timer_->asyncWait( - std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1)); - } + scheduleSend(50); #endif } int MemifConnector::doSend() { std::size_t max = 0; - uint16_t n = 0; std::size_t size = 0; - - { - utils::SpinLock::Acquire locked(write_msgs_lock_); - size = output_buffer_.size(); + std::error_code ec = make_error_code(core_error::success); + int ret = 0; + uint64_t delay = 50; // microseconds + + utils::SpinLock::Acquire locked(write_msgs_lock_); + + // Check if there are pending buffers to send + if (memif_connection_.tx_buf_num > 0) { + ret = txBurst(memif_connection_.tx_qid, ec); + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + delay = 200; + goto done; + } } - do { - max = size < MAX_MEMIF_BUFS ? size : MAX_MEMIF_BUFS; - - if (TRANSPORT_EXPECT_FALSE( - (n = bufferAlloc(max, memif_connection_->tx_qid)) < 0)) { - TRANSPORT_LOGE("Error allocating buffers."); - return -1; - } + // Continue trying to send buffers in output_buffer_ + size = output_buffer_.size(); + max = size < max_burst ? size : max_burst; - for (uint16_t i = 0; i < n; i++) { - utils::SpinLock::Acquire locked(write_msgs_lock_); + ret = bufferAlloc(max, memif_connection_.tx_qid, ec); + if (TRANSPORT_EXPECT_FALSE(ec.operator bool() && ret == 0)) { + delay = 200; + goto done; + } - auto packet = output_buffer_.front().get(); - const utils::MemBuf *current = packet; - std::size_t offset = 0; - uint8_t *shared_buffer = - reinterpret_cast<uint8_t *>(memif_connection_->tx_bufs[i].data); - do { - std::memcpy(shared_buffer + offset, current->data(), current->length()); - offset += current->length(); - current = current->next(); - } while (current != packet); + // Fill allocated buffers and remove them from output_buffer_ + for (uint16_t i = 0; i < ret; i++) { + auto packet = output_buffer_.front().get(); + const utils::MemBuf *current = packet; + std::size_t offset = 0; + uint8_t *shared_buffer = + reinterpret_cast<uint8_t *>(memif_connection_.tx_bufs[i].data); + do { + std::memcpy(shared_buffer + offset, current->data(), current->length()); + offset += current->length(); + current = current->next(); + } while (current != packet); + + memif_connection_.tx_bufs[i].len = uint32_t(offset); + output_buffer_.pop_front(); + } - memif_connection_->tx_bufs[i].len = uint32_t(offset); + // Try to send them + ret = txBurst(memif_connection_.tx_qid, ec); + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + LOG(ERROR) << "Tx burst failed " << ec.message(); + delay = 200; + goto done; + } - output_buffer_.pop_front(); - } +done: + memif_refill_queue(memif_connection_.conn, memif_connection_.tx_qid, ret, 0); - txBurst(memif_connection_->tx_qid); + // If there are still packets to send, schedule another send + if (memif_connection_.tx_buf_num > 0 || !output_buffer_.empty()) { + scheduleSend(delay); + } - utils::SpinLock::Acquire locked(write_msgs_lock_); - size = output_buffer_.size(); - } while (size > 0); + // If error, signal to upper layers + if (ec.operator bool()) { + std::weak_ptr<MemifConnector> self = shared_from_this(); + io_service_.post([self, ec]() { + if (auto c = self.lock()) { + c->sent_callback_(c.get(), ec); + } + }); + } return 0; } -void MemifConnector::send(const uint8_t *packet, std::size_t len, - const PacketSentCallback &packet_sent) { - throw errors::NotImplementedException(); -} - } // end namespace core } // end namespace transport - -#endif // __vpp__ |