aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-03-07 19:11:16 +0100
committerMauro Sardara <msardara@cisco.com>2019-03-08 13:32:22 +0100
commit6aaef596f68a514036d5212fc8697bdaf371e5af (patch)
treed09237bb6810c4aa5eff1a3033633e46bb44e3f6 /libtransport
parent3c6c43ef7bc7caa03540b2347e7f180d5b96ec23 (diff)
[HICN-99] Destroy in the correct order and in the correct event loop the attributes of connectors and sockets. Cleanup of prints.
Change-Id: Ie7eef1d186e581aa950f47df20d57681dc33be55 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/core/hicn_binary_api.c2
-rw-r--r--libtransport/src/hicn/transport/core/portal.h98
-rw-r--r--libtransport/src/hicn/transport/core/vpp_binary_api.c6
-rw-r--r--libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc11
-rw-r--r--libtransport/src/hicn/transport/interfaces/full_duplex_socket.h4
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc13
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc47
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h11
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.h2
-rw-r--r--libtransport/src/hicn/transport/utils/epoll_event_reactor.cc61
-rw-r--r--libtransport/src/hicn/transport/utils/epoll_event_reactor.h20
12 files changed, 128 insertions, 149 deletions
diff --git a/libtransport/src/hicn/transport/core/hicn_binary_api.c b/libtransport/src/hicn/transport/core/hicn_binary_api.c
index d909578a2..3868c0a14 100644
--- a/libtransport/src/hicn/transport/core/hicn_binary_api.c
+++ b/libtransport/src/hicn/transport/core/hicn_binary_api.c
@@ -134,8 +134,6 @@ int hicn_binary_api_register_cons_app(
CONTEXT_SAVE(context_store, api, mp)
- TRANSPORT_LOGI("Message created");
-
return vpp_binary_api_send_request_wait_reply(api->vpp_api, mp);
}
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index cadf37e8c..d9051c23c 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -49,18 +49,19 @@ static constexpr uint32_t pool_size = 2048;
class HandlerMemory {
#ifdef __vpp__
- static constexpr std::size_t memory_size = 1024 * 512;
+ static constexpr std::size_t memory_size = 1024 * 1024;
+
public:
- HandlerMemory() : index_(0) { }
+ HandlerMemory() : index_(0) {}
- HandlerMemory(const HandlerMemory&) = delete;
- HandlerMemory& operator=(const HandlerMemory&) = delete;
+ HandlerMemory(const HandlerMemory &) = delete;
+ HandlerMemory &operator=(const HandlerMemory &) = delete;
- TRANSPORT_ALWAYS_INLINE void* allocate(std::size_t size) {
+ TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
return &storage_[index_++ % memory_size];
}
- TRANSPORT_ALWAYS_INLINE void deallocate(void* pointer) { }
+ TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {}
private:
// Storage space used for handler-based custom memory allocation.
@@ -68,16 +69,16 @@ class HandlerMemory {
uint32_t index_;
#else
public:
- HandlerMemory() { }
+ HandlerMemory() {}
- HandlerMemory(const HandlerMemory&) = delete;
- HandlerMemory& operator=(const HandlerMemory&) = delete;
+ HandlerMemory(const HandlerMemory &) = delete;
+ HandlerMemory &operator=(const HandlerMemory &) = delete;
- TRANSPORT_ALWAYS_INLINE void* allocate(std::size_t size) {
+ TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
return ::operator new(size);
}
- TRANSPORT_ALWAYS_INLINE void deallocate(void* pointer) {
+ TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
::operator delete(pointer);
}
#endif
@@ -90,34 +91,36 @@ class HandlerAllocator {
public:
using value_type = T;
- explicit HandlerAllocator(HandlerMemory& mem)
- : memory_(mem) {}
+ explicit HandlerAllocator(HandlerMemory &mem) : memory_(mem) {}
template <typename U>
- HandlerAllocator(const HandlerAllocator<U>& other) noexcept
- : memory_(other.memory_) { }
+ HandlerAllocator(const HandlerAllocator<U> &other) noexcept
+ : memory_(other.memory_) {}
- TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator& other) const noexcept {
+ TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator &other) const
+ noexcept {
return &memory_ == &other.memory_;
}
- TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator& other) const noexcept {
+ TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator &other) const
+ noexcept {
return &memory_ != &other.memory_;
}
- TRANSPORT_ALWAYS_INLINE T* allocate(std::size_t n) const {
- return static_cast<T*>(memory_.allocate(sizeof(T) * n));
+ TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const {
+ return static_cast<T *>(memory_.allocate(sizeof(T) * n));
}
- TRANSPORT_ALWAYS_INLINE void deallocate(T* p, std::size_t /*n*/) const {
+ TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const {
return memory_.deallocate(p);
}
private:
- template <typename> friend class HandlerAllocator;
+ template <typename>
+ friend class HandlerAllocator;
// The underlying memory.
- HandlerMemory& memory_;
+ HandlerMemory &memory_;
};
// Wrapper class template for handler objects to allow handler memory
@@ -129,35 +132,33 @@ class CustomAllocatorHandler {
public:
using allocator_type = HandlerAllocator<Handler>;
- CustomAllocatorHandler(HandlerMemory& m, Handler h)
- : memory_(m),
- handler_(h) { }
+ CustomAllocatorHandler(HandlerMemory &m, Handler h)
+ : memory_(m), handler_(h) {}
allocator_type get_allocator() const noexcept {
return allocator_type(memory_);
}
- template <typename ...Args>
- void operator()(Args&&... args) {
+ template <typename... Args>
+ void operator()(Args &&... args) {
handler_(std::forward<Args>(args)...);
}
private:
- HandlerMemory& memory_;
+ HandlerMemory &memory_;
Handler handler_;
};
// Helper function to wrap a handler object to add custom allocation.
template <typename Handler>
inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler(
- HandlerMemory& m, Handler h) {
+ HandlerMemory &m, Handler h) {
return CustomAllocatorHandler<Handler>(m, h);
}
class Pool {
public:
- Pool(asio::io_service &io_service)
- : io_service_(io_service) {
+ Pool(asio::io_service &io_service) : io_service_(io_service) {
increasePendingInterestPool();
increaseInterestPool();
increaseContentObjectPool();
@@ -223,7 +224,7 @@ class Pool {
asio::io_service &io_service_;
};
-}
+} // namespace portal_details
using PendingInterestHashTable =
std::unordered_map<uint32_t, PendingInterest::Ptr>;
@@ -266,6 +267,7 @@ class Portal {
typename ForwarderInt::ConnectorType>,
ForwarderInt>::value,
"ForwarderInt must inherit from ForwarderInterface!");
+
public:
class ConsumerCallback {
public:
@@ -290,7 +292,7 @@ class Portal {
std::bind(&Portal::setLocalRoutes, this), io_service_,
app_name_),
forwarder_interface_(connector_),
- packet_pool_(io_service) { }
+ packet_pool_(io_service) {}
void setConsumerCallback(ConsumerCallback *consumer_callback) {
consumer_callback_ = consumer_callback;
@@ -310,7 +312,7 @@ class Portal {
forwarder_interface_.connect(is_consumer);
}
- ~Portal() { stopEventsLoop(true); }
+ ~Portal() { killConnection(); }
TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
auto it =
@@ -338,11 +340,9 @@ class Portal {
std::move(on_content_object_callback));
pending_interest->setOnTimeoutCallback(
std::move(on_interest_timeout_callback));
- pending_interest->startCountdown(
- portal_details::makeCustomAllocatorHandler(
- async_callback_memory_,
- std::bind(&Portal<ForwarderInt>::timerHandler, this,
- std::placeholders::_1, hash)));
+ pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler(
+ async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler,
+ this, std::placeholders::_1, hash)));
pending_interest_hash_table_.emplace(
std::make_pair(hash, std::move(pending_interest)));
}
@@ -398,15 +398,13 @@ class Portal {
forwarder_interface_.send(content_object);
}
- TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) {
- if (kill_connection) {
- forwarder_interface_.closeConnection();
+ TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
+ if (!io_service_.stopped()) {
+ io_service_.dispatch([this]() {
+ clear();
+ io_service_.stop();
+ });
}
-
- io_service_.post([this]() {
- clear();
- io_service_.stop();
- });
}
TRANSPORT_ALWAYS_INLINE void killConnection() { connector_.close(); }
@@ -489,12 +487,10 @@ class Portal {
if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
interest_ptr->on_content_object_callback_(
- std::move(interest_ptr->getInterest()),
- std::move(content_object));
+ std::move(interest_ptr->getInterest()), std::move(content_object));
} else if (consumer_callback_) {
consumer_callback_->onContentObject(
- std::move(interest_ptr->getInterest()),
- std::move(content_object));
+ std::move(interest_ptr->getInterest()), std::move(content_object));
}
} else {
TRANSPORT_LOGW("No pending interests for current content (%s)",
diff --git a/libtransport/src/hicn/transport/core/vpp_binary_api.c b/libtransport/src/hicn/transport/core/vpp_binary_api.c
index 09ffb2ec6..d54ef257e 100644
--- a/libtransport/src/hicn/transport/core/vpp_binary_api.c
+++ b/libtransport/src/hicn/transport/core/vpp_binary_api.c
@@ -173,7 +173,7 @@ void vpp_binary_api_send_receive_ping(vpp_binary_api_t *api) {
CONTEXT_SAVE(context_store, api, mp_ping);
- TRANSPORT_LOGI("Sending ping id %u", mp_ping->_vl_msg_id);
+ TRANSPORT_LOGD("Sending ping id %u", mp_ping->_vl_msg_id);
vpp_binary_api_send_request_wait_reply(api, mp_ping);
}
@@ -202,7 +202,7 @@ int vpp_binary_api_set_int_state(vpp_binary_api_t *api, uint32_t sw_index,
CONTEXT_SAVE(context_store, api, mp);
- TRANSPORT_LOGI("Sending set int flags id %u", mp->_vl_msg_id);
+ TRANSPORT_LOGD("Sending set int flags id %u", mp->_vl_msg_id);
return vpp_binary_api_send_request_wait_reply(api, mp);
}
@@ -211,7 +211,7 @@ void vpp_binary_api_send_request(vpp_binary_api_t *api, void *request) {
vl_generic_request_t *req = NULL;
req = (vl_generic_request_t *)request;
- TRANSPORT_LOGI("Sending a request to VPP (id=%d).\n", ntohs(req->_vl_msg_id));
+ TRANSPORT_LOGD("Sending a request to VPP (id=%d).\n", ntohs(req->_vl_msg_id));
S(api, req);
}
diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc
index 0d622f9a3..303b753be 100644
--- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc
+++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc
@@ -86,12 +86,10 @@ uint32_t VPPForwarderInterface::getMemifConfiguration() {
void VPPForwarderInterface::consumerConnection() {
hicn_consumer_input_params input = {0};
- hicn_consumer_output_params output;
+ hicn_consumer_output_params output = {0};
ip_address_t ip4_address;
ip_address_t ip6_address;
- std::memset(&output, 0, sizeof(hicn_consumer_output_params));
-
output.src4 = &ip4_address;
output.src6 = &ip6_address;
@@ -120,7 +118,7 @@ void VPPForwarderInterface::producerConnection() {
void VPPForwarderInterface::connect(bool is_consumer) {
std::lock_guard<std::mutex> connection_lock(global_lock_);
- srand(time(NULL));
+ srand(time(nullptr));
int secret = rand() % (1 << 10);
std::stringstream app_name;
app_name << "Libtransport_" << secret;
@@ -209,18 +207,17 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) {
void VPPForwarderInterface::closeConnection() {
if (VPPForwarderInterface::api_) {
+ connector_.close();
+
if (sw_if_index_ != uint32_t(~0)) {
int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_,
sw_if_index_);
-
if (ret < 0) {
TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_);
}
}
vpp_binary_api_destroy(VPPForwarderInterface::api_);
- connector_.close();
-
VPPForwarderInterface::api_ = nullptr;
}
}
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
index b47432460..1d7ad3cb1 100644
--- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
+++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
@@ -104,9 +104,7 @@ class AsyncFullDuplexSocket : public AsyncSocket,
AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service);
AsyncFullDuplexSocket(const core::Prefix &locator);
- ~AsyncFullDuplexSocket() {
- TRANSPORT_LOGI("Adios AsyncFullDuplexSocket!!!");
- };
+ ~AsyncFullDuplexSocket(){};
using ReadCallback = AsyncReader::ReadCallback;
using WriteCallback = AsyncWriter::WriteCallback;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index febe66853..ca9722849 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -62,25 +62,21 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service)
timer_interval_milliseconds_(0) {
switch (protocol) {
case TransportProtocolAlgorithms::CBR:
- transport_protocol_ = std::make_shared<CbrTransportProtocol>(this);
+ transport_protocol_ = std::make_unique<CbrTransportProtocol>(this);
break;
case TransportProtocolAlgorithms::RTC:
- transport_protocol_ = std::make_shared<RTCTransportProtocol>(this);
+ transport_protocol_ = std::make_unique<RTCTransportProtocol>(this);
break;
case TransportProtocolAlgorithms::RAAQM:
default:
- transport_protocol_ = std::make_shared<RaaqmTransportProtocol>(this);
+ transport_protocol_ = std::make_unique<RaaqmTransportProtocol>(this);
break;
}
}
ConsumerSocket::~ConsumerSocket() {
stop();
-
async_downloader_.stop();
-
- transport_protocol_.reset();
- portal_.reset();
}
void ConsumerSocket::connect() { portal_->connect(); }
@@ -132,10 +128,9 @@ void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest,
void ConsumerSocket::stop() {
if (transport_protocol_->isRunning()) {
+ std::cout << "Stopping transport protocol " << std::endl;
transport_protocol_->stop();
}
-
- // is_running_ = false;
}
void ConsumerSocket::resume() {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 0f1ad537c..a50aeb583 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -676,7 +676,7 @@ class ConsumerSocket : public BaseSocket {
}
protected:
- std::shared_ptr<TransportProtocol> transport_protocol_;
+ std::unique_ptr<TransportProtocol> transport_protocol_;
private:
// context inner state variables
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 497c40c99..d89fc9367 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -15,6 +15,8 @@
#include <hicn/transport/interfaces/socket_producer.h>
+#include <functional>
+
namespace transport {
namespace interface {
@@ -30,15 +32,12 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
data_packet_size_(default_values::content_object_packet_size),
content_object_expiry_time_(default_values::content_object_expiry_time),
output_buffer_(default_values::producer_socket_output_buffer_size),
- async_thread_(),
registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
signature_type_(SHA_256),
hash_algorithm_(HashAlgorithm::SHA_256),
input_buffer_capacity_(default_values::producer_socket_input_buffer_size),
input_buffer_size_(0),
- processing_thread_stop_(false),
- listening_thread_stop_(false),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
on_interest_inserted_input_buffer_(VOID_HANDLER),
@@ -49,18 +48,10 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
on_content_object_in_output_buffer_(VOID_HANDLER),
on_content_object_output_(VOID_HANDLER),
on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
- on_content_produced_(VOID_HANDLER) {
- listening_thread_stop_ = false;
-}
+ on_content_produced_(VOID_HANDLER) {}
ProducerSocket::~ProducerSocket() {
- processing_thread_stop_ = true;
- portal_->stopEventsLoop(true);
-
- if (processing_thread_.joinable()) {
- processing_thread_.join();
- }
-
+ stop();
if (listening_thread_.joinable()) {
listening_thread_.join();
}
@@ -308,8 +299,8 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf,
if (is_last_manifest) {
manifest->setFinalManifest(is_last_manifest);
}
+
manifest->encode();
- // Time t0 = std::chrono::steady_clock::now();
identity_->getSigner().sign(*manifest);
passContentObjectToCallbacks(manifest);
}
@@ -324,32 +315,20 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf,
void ProducerSocket::asyncProduce(ContentObject &content_object) {
if (!async_thread_.stopped()) {
- // async_thread_.add(std::bind(&ProducerSocket::produce, this,
- // content_object));
+ auto co_ptr = std::static_pointer_cast<ContentObject>(
+ content_object.shared_from_this());
+ async_thread_.add([this, content_object = std::move(co_ptr)]() {
+ produce(*content_object);
+ });
}
}
-// void ProducerSocket::asyncProduce(const Name &suffix,
-// const uint8_t *buf,
-// size_t buffer_size,
-// AsyncProduceCallback && handler) {
-// if (!async_thread_.stopped()) {
-// async_thread_.add([this, buffer = buf, size = buffer_size, cb =
-// std::move(handler)] () {
-// uint64_t bytes_written = produce(suff, buffer, size, 0, false);
-// auto ec = std::make_errc(0);
-// cb(*this, ec, bytes_written);
-// });
-// }
-// }
-
void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf,
size_t buffer_size) {
if (!async_thread_.stopped()) {
- async_thread_.add(
- [this, suff = suffix, buffer = buf, size = buffer_size]() {
- produce(suff, buffer, size, true);
- });
+ async_thread_.add([this, suffix, buffer = buf, size = buffer_size]() {
+ produce(suffix, buffer, size, 0, false);
+ });
}
}
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 4f38fb30e..1fdbabe2e 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -525,6 +525,10 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_GET;
}
+ private:
+ // Threads
+ std::thread listening_thread_;
+
protected:
asio::io_service internal_io_service_;
asio::io_service &io_service_;
@@ -538,7 +542,6 @@ class ProducerSocket : public Socket<BasePortal>,
private:
utils::EventThread async_thread_;
-
int registration_status_;
bool making_manifest_;
@@ -560,12 +563,6 @@ class ProducerSocket : public Socket<BasePortal>,
std::atomic_size_t input_buffer_capacity_;
std::atomic_size_t input_buffer_size_;
- // threads
- std::thread listening_thread_;
- std::thread processing_thread_;
- volatile bool processing_thread_stop_;
- volatile bool listening_thread_stop_;
-
// callbacks
protected:
ProducerInterestCallback on_interest_input_;
diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h
index 6a3b23753..6911eada5 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.h
+++ b/libtransport/src/hicn/transport/protocols/protocol.h
@@ -39,7 +39,7 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback,
public:
TransportProtocol(interface::ConsumerSocket *icn_socket);
- virtual ~TransportProtocol() { stop(); };
+ virtual ~TransportProtocol() = default;
TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; }
diff --git a/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc b/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc
index 6df9e5656..a97e89500 100644
--- a/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc
+++ b/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc
@@ -48,30 +48,6 @@ int EpollEventReactor::addFileDescriptor(int fd, uint32_t events) {
return 0;
}
-int EpollEventReactor::addFileDescriptor(int fd, uint32_t events,
- EventCallback &callback) {
- auto it = event_callback_map_.find(fd);
-
- if (it == event_callback_map_.end()) {
- event_callback_map_[fd] = callback;
- return addFileDescriptor(fd, events);
- }
-
- return 0;
-}
-
-int EpollEventReactor::addFileDescriptor(int fd, uint32_t events,
- EventCallback &&callback) {
- auto it = event_callback_map_.find(fd);
-
- if (it == event_callback_map_.end()) {
- event_callback_map_[fd] = callback;
- return addFileDescriptor(fd, events);
- }
-
- return 0;
-}
-
int EpollEventReactor::modFileDescriptor(int fd, uint32_t events) {
if (TRANSPORT_EXPECT_FALSE(fd < 0)) {
TRANSPORT_LOGE("invalid fd %d", fd);
@@ -109,6 +85,7 @@ int EpollEventReactor::delFileDescriptor(int fd) {
return -1;
}
+ utils::SpinLock::Acquire locked(event_callback_map_lock_);
event_callback_map_.erase(fd);
return 0;
@@ -117,6 +94,8 @@ int EpollEventReactor::delFileDescriptor(int fd) {
void EpollEventReactor::runEventLoop(int timeout) {
Event evt[128];
int en = 0;
+ EventCallbackMap::iterator it;
+ EventCallback callback;
// evt.events = EPOLLIN | EPOLLOUT;
sigset_t sigset;
@@ -134,11 +113,26 @@ void EpollEventReactor::runEventLoop(int timeout) {
for (int i = 0; i < en; i++) {
if (evt[i].data.fd > 0) {
- auto it = event_callback_map_.find(evt[i].data.fd);
+ {
+ utils::SpinLock::Acquire locked(event_callback_map_lock_);
+ it = event_callback_map_.find(evt[i].data.fd);
+ }
+
if (TRANSPORT_EXPECT_FALSE(it == event_callback_map_.end())) {
TRANSPORT_LOGE("unexpected event. fd %d", evt[i].data.fd);
} else {
- event_callback_map_[evt[i].data.fd](evt[i]);
+ {
+ utils::SpinLock::Acquire locked(event_callback_map_lock_);
+ callback = event_callback_map_[evt[i].data.fd];
+ }
+
+ callback(evt[i]);
+
+ // In the callback the epoll event reactor could have been stopped,
+ // then we need to check whether the event loop is still running.
+ if (TRANSPORT_EXPECT_FALSE(!run_event_loop_)) {
+ return;
+ }
}
} else {
TRANSPORT_LOGE("unexpected event. fd %d", evt[i].data.fd);
@@ -150,6 +144,8 @@ void EpollEventReactor::runEventLoop(int timeout) {
void EpollEventReactor::runOneEvent() {
Event evt;
int en = 0;
+ EventCallbackMap::iterator it;
+ EventCallback callback;
// evt.events = EPOLLIN | EPOLLOUT;
sigset_t sigset;
@@ -165,11 +161,20 @@ void EpollEventReactor::runOneEvent() {
}
if (TRANSPORT_EXPECT_TRUE(evt.data.fd > 0)) {
- auto it = event_callback_map_.find(evt.data.fd);
+ {
+ utils::SpinLock::Acquire locked(event_callback_map_lock_);
+ it = event_callback_map_.find(evt.data.fd);
+ }
+
if (TRANSPORT_EXPECT_FALSE(it == event_callback_map_.end())) {
TRANSPORT_LOGE("unexpected event. fd %d", evt.data.fd);
} else {
- event_callback_map_[evt.data.fd](evt);
+ {
+ utils::SpinLock::Acquire locked(event_callback_map_lock_);
+ callback = event_callback_map_[evt.data.fd];
+ }
+
+ callback(evt);
}
} else {
TRANSPORT_LOGE("unexpected event. fd %d", evt.data.fd);
diff --git a/libtransport/src/hicn/transport/utils/epoll_event_reactor.h b/libtransport/src/hicn/transport/utils/epoll_event_reactor.h
index dbb87c6c5..04c10fc7e 100644
--- a/libtransport/src/hicn/transport/utils/epoll_event_reactor.h
+++ b/libtransport/src/hicn/transport/utils/epoll_event_reactor.h
@@ -16,6 +16,7 @@
#pragma once
#include <hicn/transport/utils/event_reactor.h>
+#include <hicn/transport/utils/spinlock.h>
#include <sys/epoll.h>
#include <atomic>
@@ -38,9 +39,22 @@ class EpollEventReactor : public EventReactor {
~EpollEventReactor();
- int addFileDescriptor(int fd, uint32_t events, EventCallback &callback);
+ template <typename EventHandler>
+ int addFileDescriptor(int fd, uint32_t events, EventHandler &&callback) {
+ auto it = event_callback_map_.find(fd);
+ int ret = 0;
- int addFileDescriptor(int fd, uint32_t events, EventCallback &&callback);
+ if (it == event_callback_map_.end()) {
+ {
+ utils::SpinLock::Acquire locked(event_callback_map_lock_);
+ event_callback_map_[fd] = std::forward<EventHandler &&>(callback);
+ }
+
+ ret = addFileDescriptor(fd, events);
+ }
+
+ return ret;
+ }
int delFileDescriptor(int fd);
@@ -60,7 +74,7 @@ class EpollEventReactor : public EventReactor {
int epoll_fd_;
std::atomic_bool run_event_loop_;
EventCallbackMap event_callback_map_;
- std::mutex event_callback_map_mutex_;
+ utils::SpinLock event_callback_map_lock_;
};
} // namespace utils