aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.cc22
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.h2
-rw-r--r--libtransport/src/hicn/transport/utils/epoll_event_reactor.cc10
-rw-r--r--libtransport/src/hicn/transport/utils/object_pool.h1
-rw-r--r--libtransport/src/hicn/transport/utils/ring_buffer.h2
5 files changed, 15 insertions, 22 deletions
diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc
index a650c3681..38b2a2a98 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.cc
+++ b/libtransport/src/hicn/transport/core/memif_connector.cc
@@ -298,17 +298,11 @@ int MemifConnector::txBurst(uint16_t qid) {
}
void MemifConnector::sendCallback(const std::error_code &ec) {
+ timer_set_ = false;
+
if (TRANSPORT_EXPECT_TRUE(!ec && !is_connecting_)) {
doSend();
}
-
- if (output_buffer_.size() > 0) {
- send_timer_->expiresFromNow(std::chrono::microseconds(50));
- send_timer_->asyncWait(
- std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1));
- } else {
- timer_set_ = false;
- }
}
void MemifConnector::processInputBuffer() {
@@ -378,6 +372,7 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
packet->append(packet_length);
if (!connector->input_buffer_.push(std::move(packet))) {
+
TRANSPORT_LOGI("Error pushing packet. Ring buffer full.");
// TODO Here we should consider the possibility to signal the congestion
@@ -442,7 +437,11 @@ void MemifConnector::close() {
void MemifConnector::enableBurst() { enable_burst_ = true; }
void MemifConnector::send(const Packet::MemBufPtr &packet) {
-#ifdef CANCEL_TIMER
+ {
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+ output_buffer_.push_back(packet);
+ }
+#if CANCEL_TIMER
if (!timer_set_) {
timer_set_ = true;
send_timer_->expiresFromNow(std::chrono::microseconds(50));
@@ -450,11 +449,6 @@ void MemifConnector::send(const Packet::MemBufPtr &packet) {
std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1));
}
#endif
-
- {
- utils::SpinLock::Acquire locked(write_msgs_lock_);
- output_buffer_.push_back(packet);
- }
}
int MemifConnector::doSend() {
diff --git a/libtransport/src/hicn/transport/core/memif_connector.h b/libtransport/src/hicn/transport/core/memif_connector.h
index 6e76b78da..3d2e8411d 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.h
+++ b/libtransport/src/hicn/transport/core/memif_connector.h
@@ -110,7 +110,7 @@ class MemifConnector : public Connector {
int epfd;
std::unique_ptr<std::thread> memif_worker_;
utils::EpollEventReactor event_reactor_;
- volatile bool timer_set_;
+ std::atomic_bool timer_set_;
std::unique_ptr<utils::FdDeadlineTimer> send_timer_;
asio::io_service &io_service_;
std::unique_ptr<asio::io_service::work> work_;
diff --git a/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc b/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc
index 81b471857..6df9e5656 100644
--- a/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc
+++ b/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc
@@ -51,10 +51,9 @@ int EpollEventReactor::addFileDescriptor(int fd, uint32_t events) {
int EpollEventReactor::addFileDescriptor(int fd, uint32_t events,
EventCallback &callback) {
auto it = event_callback_map_.find(fd);
- event_callback_map_[fd] = callback;
- if (it != event_callback_map_.end()) {
+
+ if (it == event_callback_map_.end()) {
event_callback_map_[fd] = callback;
- } else {
return addFileDescriptor(fd, events);
}
@@ -64,10 +63,9 @@ int EpollEventReactor::addFileDescriptor(int fd, uint32_t events,
int EpollEventReactor::addFileDescriptor(int fd, uint32_t events,
EventCallback &&callback) {
auto it = event_callback_map_.find(fd);
- event_callback_map_[fd] = callback;
- if (it != event_callback_map_.end()) {
+
+ if (it == event_callback_map_.end()) {
event_callback_map_[fd] = callback;
- } else {
return addFileDescriptor(fd, events);
}
diff --git a/libtransport/src/hicn/transport/utils/object_pool.h b/libtransport/src/hicn/transport/utils/object_pool.h
index 9fda214cd..e34730e81 100644
--- a/libtransport/src/hicn/transport/utils/object_pool.h
+++ b/libtransport/src/hicn/transport/utils/object_pool.h
@@ -62,6 +62,7 @@ class ObjectPool {
void add(T *object) {
utils::SpinLock::Acquire locked(object_pool_lock_);
+
if (TRANSPORT_EXPECT_TRUE(!destructor_)) {
object_pool_.emplace_back(makePtr(object));
}
diff --git a/libtransport/src/hicn/transport/utils/ring_buffer.h b/libtransport/src/hicn/transport/utils/ring_buffer.h
index 52bcd81c4..9babe56bd 100644
--- a/libtransport/src/hicn/transport/utils/ring_buffer.h
+++ b/libtransport/src/hicn/transport/utils/ring_buffer.h
@@ -86,7 +86,7 @@ bool CircularFifo<Element, Size>::push(Element&& item) {
// the tail must be accessed with at least acquire
template <typename Element, std::size_t Size>
bool CircularFifo<Element, Size>::pop(Element& item) {
- const auto current_head = head_.load(std::memory_order_relaxed);
+ const size_t current_head = head_.load(std::memory_order_relaxed);
if (current_head == tail_.load(std::memory_order_acquire)) {
return false; // empty queue
}