diff options
author | Mauro Sardara <msardara@cisco.com> | 2019-03-07 19:11:16 +0100 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2019-03-08 13:32:22 +0100 |
commit | 6aaef596f68a514036d5212fc8697bdaf371e5af (patch) | |
tree | d09237bb6810c4aa5eff1a3033633e46bb44e3f6 /libtransport | |
parent | 3c6c43ef7bc7caa03540b2347e7f180d5b96ec23 (diff) |
[HICN-99] Destroy in the correct order and in the correct event loop the attributes of connectors and sockets. Cleanup of prints.
Change-Id: Ie7eef1d186e581aa950f47df20d57681dc33be55
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport')
12 files changed, 128 insertions, 149 deletions
diff --git a/libtransport/src/hicn/transport/core/hicn_binary_api.c b/libtransport/src/hicn/transport/core/hicn_binary_api.c index d909578a2..3868c0a14 100644 --- a/libtransport/src/hicn/transport/core/hicn_binary_api.c +++ b/libtransport/src/hicn/transport/core/hicn_binary_api.c @@ -134,8 +134,6 @@ int hicn_binary_api_register_cons_app( CONTEXT_SAVE(context_store, api, mp) - TRANSPORT_LOGI("Message created"); - return vpp_binary_api_send_request_wait_reply(api->vpp_api, mp); } diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index cadf37e8c..d9051c23c 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -49,18 +49,19 @@ static constexpr uint32_t pool_size = 2048; class HandlerMemory { #ifdef __vpp__ - static constexpr std::size_t memory_size = 1024 * 512; + static constexpr std::size_t memory_size = 1024 * 1024; + public: - HandlerMemory() : index_(0) { } + HandlerMemory() : index_(0) {} - HandlerMemory(const HandlerMemory&) = delete; - HandlerMemory& operator=(const HandlerMemory&) = delete; + HandlerMemory(const HandlerMemory &) = delete; + HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void* allocate(std::size_t size) { + TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { return &storage_[index_++ % memory_size]; } - TRANSPORT_ALWAYS_INLINE void deallocate(void* pointer) { } + TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {} private: // Storage space used for handler-based custom memory allocation. @@ -68,16 +69,16 @@ class HandlerMemory { uint32_t index_; #else public: - HandlerMemory() { } + HandlerMemory() {} - HandlerMemory(const HandlerMemory&) = delete; - HandlerMemory& operator=(const HandlerMemory&) = delete; + HandlerMemory(const HandlerMemory &) = delete; + HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void* allocate(std::size_t size) { + TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { return ::operator new(size); } - TRANSPORT_ALWAYS_INLINE void deallocate(void* pointer) { + TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { ::operator delete(pointer); } #endif @@ -90,34 +91,36 @@ class HandlerAllocator { public: using value_type = T; - explicit HandlerAllocator(HandlerMemory& mem) - : memory_(mem) {} + explicit HandlerAllocator(HandlerMemory &mem) : memory_(mem) {} template <typename U> - HandlerAllocator(const HandlerAllocator<U>& other) noexcept - : memory_(other.memory_) { } + HandlerAllocator(const HandlerAllocator<U> &other) noexcept + : memory_(other.memory_) {} - TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator& other) const noexcept { + TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator &other) const + noexcept { return &memory_ == &other.memory_; } - TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator& other) const noexcept { + TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator &other) const + noexcept { return &memory_ != &other.memory_; } - TRANSPORT_ALWAYS_INLINE T* allocate(std::size_t n) const { - return static_cast<T*>(memory_.allocate(sizeof(T) * n)); + TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const { + return static_cast<T *>(memory_.allocate(sizeof(T) * n)); } - TRANSPORT_ALWAYS_INLINE void deallocate(T* p, std::size_t /*n*/) const { + TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const { return memory_.deallocate(p); } private: - template <typename> friend class HandlerAllocator; + template <typename> + friend class HandlerAllocator; // The underlying memory. - HandlerMemory& memory_; + HandlerMemory &memory_; }; // Wrapper class template for handler objects to allow handler memory @@ -129,35 +132,33 @@ class CustomAllocatorHandler { public: using allocator_type = HandlerAllocator<Handler>; - CustomAllocatorHandler(HandlerMemory& m, Handler h) - : memory_(m), - handler_(h) { } + CustomAllocatorHandler(HandlerMemory &m, Handler h) + : memory_(m), handler_(h) {} allocator_type get_allocator() const noexcept { return allocator_type(memory_); } - template <typename ...Args> - void operator()(Args&&... args) { + template <typename... Args> + void operator()(Args &&... args) { handler_(std::forward<Args>(args)...); } private: - HandlerMemory& memory_; + HandlerMemory &memory_; Handler handler_; }; // Helper function to wrap a handler object to add custom allocation. template <typename Handler> inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler( - HandlerMemory& m, Handler h) { + HandlerMemory &m, Handler h) { return CustomAllocatorHandler<Handler>(m, h); } class Pool { public: - Pool(asio::io_service &io_service) - : io_service_(io_service) { + Pool(asio::io_service &io_service) : io_service_(io_service) { increasePendingInterestPool(); increaseInterestPool(); increaseContentObjectPool(); @@ -223,7 +224,7 @@ class Pool { asio::io_service &io_service_; }; -} +} // namespace portal_details using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest::Ptr>; @@ -266,6 +267,7 @@ class Portal { typename ForwarderInt::ConnectorType>, ForwarderInt>::value, "ForwarderInt must inherit from ForwarderInterface!"); + public: class ConsumerCallback { public: @@ -290,7 +292,7 @@ class Portal { std::bind(&Portal::setLocalRoutes, this), io_service_, app_name_), forwarder_interface_(connector_), - packet_pool_(io_service) { } + packet_pool_(io_service) {} void setConsumerCallback(ConsumerCallback *consumer_callback) { consumer_callback_ = consumer_callback; @@ -310,7 +312,7 @@ class Portal { forwarder_interface_.connect(is_consumer); } - ~Portal() { stopEventsLoop(true); } + ~Portal() { killConnection(); } TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { auto it = @@ -338,11 +340,9 @@ class Portal { std::move(on_content_object_callback)); pending_interest->setOnTimeoutCallback( std::move(on_interest_timeout_callback)); - pending_interest->startCountdown( - portal_details::makeCustomAllocatorHandler( - async_callback_memory_, - std::bind(&Portal<ForwarderInt>::timerHandler, this, - std::placeholders::_1, hash))); + pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler( + async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler, + this, std::placeholders::_1, hash))); pending_interest_hash_table_.emplace( std::make_pair(hash, std::move(pending_interest))); } @@ -398,15 +398,13 @@ class Portal { forwarder_interface_.send(content_object); } - TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) { - if (kill_connection) { - forwarder_interface_.closeConnection(); + TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { + if (!io_service_.stopped()) { + io_service_.dispatch([this]() { + clear(); + io_service_.stop(); + }); } - - io_service_.post([this]() { - clear(); - io_service_.stop(); - }); } TRANSPORT_ALWAYS_INLINE void killConnection() { connector_.close(); } @@ -489,12 +487,10 @@ class Portal { if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { interest_ptr->on_content_object_callback_( - std::move(interest_ptr->getInterest()), - std::move(content_object)); + std::move(interest_ptr->getInterest()), std::move(content_object)); } else if (consumer_callback_) { consumer_callback_->onContentObject( - std::move(interest_ptr->getInterest()), - std::move(content_object)); + std::move(interest_ptr->getInterest()), std::move(content_object)); } } else { TRANSPORT_LOGW("No pending interests for current content (%s)", diff --git a/libtransport/src/hicn/transport/core/vpp_binary_api.c b/libtransport/src/hicn/transport/core/vpp_binary_api.c index 09ffb2ec6..d54ef257e 100644 --- a/libtransport/src/hicn/transport/core/vpp_binary_api.c +++ b/libtransport/src/hicn/transport/core/vpp_binary_api.c @@ -173,7 +173,7 @@ void vpp_binary_api_send_receive_ping(vpp_binary_api_t *api) { CONTEXT_SAVE(context_store, api, mp_ping); - TRANSPORT_LOGI("Sending ping id %u", mp_ping->_vl_msg_id); + TRANSPORT_LOGD("Sending ping id %u", mp_ping->_vl_msg_id); vpp_binary_api_send_request_wait_reply(api, mp_ping); } @@ -202,7 +202,7 @@ int vpp_binary_api_set_int_state(vpp_binary_api_t *api, uint32_t sw_index, CONTEXT_SAVE(context_store, api, mp); - TRANSPORT_LOGI("Sending set int flags id %u", mp->_vl_msg_id); + TRANSPORT_LOGD("Sending set int flags id %u", mp->_vl_msg_id); return vpp_binary_api_send_request_wait_reply(api, mp); } @@ -211,7 +211,7 @@ void vpp_binary_api_send_request(vpp_binary_api_t *api, void *request) { vl_generic_request_t *req = NULL; req = (vl_generic_request_t *)request; - TRANSPORT_LOGI("Sending a request to VPP (id=%d).\n", ntohs(req->_vl_msg_id)); + TRANSPORT_LOGD("Sending a request to VPP (id=%d).\n", ntohs(req->_vl_msg_id)); S(api, req); } diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc index 0d622f9a3..303b753be 100644 --- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc @@ -86,12 +86,10 @@ uint32_t VPPForwarderInterface::getMemifConfiguration() { void VPPForwarderInterface::consumerConnection() { hicn_consumer_input_params input = {0}; - hicn_consumer_output_params output; + hicn_consumer_output_params output = {0}; ip_address_t ip4_address; ip_address_t ip6_address; - std::memset(&output, 0, sizeof(hicn_consumer_output_params)); - output.src4 = &ip4_address; output.src6 = &ip6_address; @@ -120,7 +118,7 @@ void VPPForwarderInterface::producerConnection() { void VPPForwarderInterface::connect(bool is_consumer) { std::lock_guard<std::mutex> connection_lock(global_lock_); - srand(time(NULL)); + srand(time(nullptr)); int secret = rand() % (1 << 10); std::stringstream app_name; app_name << "Libtransport_" << secret; @@ -209,18 +207,17 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) { void VPPForwarderInterface::closeConnection() { if (VPPForwarderInterface::api_) { + connector_.close(); + if (sw_if_index_ != uint32_t(~0)) { int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_, sw_if_index_); - if (ret < 0) { TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_); } } vpp_binary_api_destroy(VPPForwarderInterface::api_); - connector_.close(); - VPPForwarderInterface::api_ = nullptr; } } diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h index b47432460..1d7ad3cb1 100644 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h +++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h @@ -104,9 +104,7 @@ class AsyncFullDuplexSocket : public AsyncSocket, AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service); AsyncFullDuplexSocket(const core::Prefix &locator); - ~AsyncFullDuplexSocket() { - TRANSPORT_LOGI("Adios AsyncFullDuplexSocket!!!"); - }; + ~AsyncFullDuplexSocket(){}; using ReadCallback = AsyncReader::ReadCallback; using WriteCallback = AsyncWriter::WriteCallback; diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index febe66853..ca9722849 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -62,25 +62,21 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) timer_interval_milliseconds_(0) { switch (protocol) { case TransportProtocolAlgorithms::CBR: - transport_protocol_ = std::make_shared<CbrTransportProtocol>(this); + transport_protocol_ = std::make_unique<CbrTransportProtocol>(this); break; case TransportProtocolAlgorithms::RTC: - transport_protocol_ = std::make_shared<RTCTransportProtocol>(this); + transport_protocol_ = std::make_unique<RTCTransportProtocol>(this); break; case TransportProtocolAlgorithms::RAAQM: default: - transport_protocol_ = std::make_shared<RaaqmTransportProtocol>(this); + transport_protocol_ = std::make_unique<RaaqmTransportProtocol>(this); break; } } ConsumerSocket::~ConsumerSocket() { stop(); - async_downloader_.stop(); - - transport_protocol_.reset(); - portal_.reset(); } void ConsumerSocket::connect() { portal_->connect(); } @@ -132,10 +128,9 @@ void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, void ConsumerSocket::stop() { if (transport_protocol_->isRunning()) { + std::cout << "Stopping transport protocol " << std::endl; transport_protocol_->stop(); } - - // is_running_ = false; } void ConsumerSocket::resume() { diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 0f1ad537c..a50aeb583 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -676,7 +676,7 @@ class ConsumerSocket : public BaseSocket { } protected: - std::shared_ptr<TransportProtocol> transport_protocol_; + std::unique_ptr<TransportProtocol> transport_protocol_; private: // context inner state variables diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 497c40c99..d89fc9367 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -15,6 +15,8 @@ #include <hicn/transport/interfaces/socket_producer.h> +#include <functional> + namespace transport { namespace interface { @@ -30,15 +32,12 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) data_packet_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), output_buffer_(default_values::producer_socket_output_buffer_size), - async_thread_(), registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), signature_type_(SHA_256), hash_algorithm_(HashAlgorithm::SHA_256), input_buffer_capacity_(default_values::producer_socket_input_buffer_size), input_buffer_size_(0), - processing_thread_stop_(false), - listening_thread_stop_(false), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -49,18 +48,10 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) on_content_object_in_output_buffer_(VOID_HANDLER), on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), - on_content_produced_(VOID_HANDLER) { - listening_thread_stop_ = false; -} + on_content_produced_(VOID_HANDLER) {} ProducerSocket::~ProducerSocket() { - processing_thread_stop_ = true; - portal_->stopEventsLoop(true); - - if (processing_thread_.joinable()) { - processing_thread_.join(); - } - + stop(); if (listening_thread_.joinable()) { listening_thread_.join(); } @@ -308,8 +299,8 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf, if (is_last_manifest) { manifest->setFinalManifest(is_last_manifest); } + manifest->encode(); - // Time t0 = std::chrono::steady_clock::now(); identity_->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); } @@ -324,32 +315,20 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf, void ProducerSocket::asyncProduce(ContentObject &content_object) { if (!async_thread_.stopped()) { - // async_thread_.add(std::bind(&ProducerSocket::produce, this, - // content_object)); + auto co_ptr = std::static_pointer_cast<ContentObject>( + content_object.shared_from_this()); + async_thread_.add([this, content_object = std::move(co_ptr)]() { + produce(*content_object); + }); } } -// void ProducerSocket::asyncProduce(const Name &suffix, -// const uint8_t *buf, -// size_t buffer_size, -// AsyncProduceCallback && handler) { -// if (!async_thread_.stopped()) { -// async_thread_.add([this, buffer = buf, size = buffer_size, cb = -// std::move(handler)] () { -// uint64_t bytes_written = produce(suff, buffer, size, 0, false); -// auto ec = std::make_errc(0); -// cb(*this, ec, bytes_written); -// }); -// } -// } - void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size) { if (!async_thread_.stopped()) { - async_thread_.add( - [this, suff = suffix, buffer = buf, size = buffer_size]() { - produce(suff, buffer, size, true); - }); + async_thread_.add([this, suffix, buffer = buf, size = buffer_size]() { + produce(suffix, buffer, size, 0, false); + }); } } diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 4f38fb30e..1fdbabe2e 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -525,6 +525,10 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_GET; } + private: + // Threads + std::thread listening_thread_; + protected: asio::io_service internal_io_service_; asio::io_service &io_service_; @@ -538,7 +542,6 @@ class ProducerSocket : public Socket<BasePortal>, private: utils::EventThread async_thread_; - int registration_status_; bool making_manifest_; @@ -560,12 +563,6 @@ class ProducerSocket : public Socket<BasePortal>, std::atomic_size_t input_buffer_capacity_; std::atomic_size_t input_buffer_size_; - // threads - std::thread listening_thread_; - std::thread processing_thread_; - volatile bool processing_thread_stop_; - volatile bool listening_thread_stop_; - // callbacks protected: ProducerInterestCallback on_interest_input_; diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h index 6a3b23753..6911eada5 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.h +++ b/libtransport/src/hicn/transport/protocols/protocol.h @@ -39,7 +39,7 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback, public: TransportProtocol(interface::ConsumerSocket *icn_socket); - virtual ~TransportProtocol() { stop(); }; + virtual ~TransportProtocol() = default; TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; } diff --git a/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc b/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc index 6df9e5656..a97e89500 100644 --- a/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc +++ b/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc @@ -48,30 +48,6 @@ int EpollEventReactor::addFileDescriptor(int fd, uint32_t events) { return 0; } -int EpollEventReactor::addFileDescriptor(int fd, uint32_t events, - EventCallback &callback) { - auto it = event_callback_map_.find(fd); - - if (it == event_callback_map_.end()) { - event_callback_map_[fd] = callback; - return addFileDescriptor(fd, events); - } - - return 0; -} - -int EpollEventReactor::addFileDescriptor(int fd, uint32_t events, - EventCallback &&callback) { - auto it = event_callback_map_.find(fd); - - if (it == event_callback_map_.end()) { - event_callback_map_[fd] = callback; - return addFileDescriptor(fd, events); - } - - return 0; -} - int EpollEventReactor::modFileDescriptor(int fd, uint32_t events) { if (TRANSPORT_EXPECT_FALSE(fd < 0)) { TRANSPORT_LOGE("invalid fd %d", fd); @@ -109,6 +85,7 @@ int EpollEventReactor::delFileDescriptor(int fd) { return -1; } + utils::SpinLock::Acquire locked(event_callback_map_lock_); event_callback_map_.erase(fd); return 0; @@ -117,6 +94,8 @@ int EpollEventReactor::delFileDescriptor(int fd) { void EpollEventReactor::runEventLoop(int timeout) { Event evt[128]; int en = 0; + EventCallbackMap::iterator it; + EventCallback callback; // evt.events = EPOLLIN | EPOLLOUT; sigset_t sigset; @@ -134,11 +113,26 @@ void EpollEventReactor::runEventLoop(int timeout) { for (int i = 0; i < en; i++) { if (evt[i].data.fd > 0) { - auto it = event_callback_map_.find(evt[i].data.fd); + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + it = event_callback_map_.find(evt[i].data.fd); + } + if (TRANSPORT_EXPECT_FALSE(it == event_callback_map_.end())) { TRANSPORT_LOGE("unexpected event. fd %d", evt[i].data.fd); } else { - event_callback_map_[evt[i].data.fd](evt[i]); + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + callback = event_callback_map_[evt[i].data.fd]; + } + + callback(evt[i]); + + // In the callback the epoll event reactor could have been stopped, + // then we need to check whether the event loop is still running. + if (TRANSPORT_EXPECT_FALSE(!run_event_loop_)) { + return; + } } } else { TRANSPORT_LOGE("unexpected event. fd %d", evt[i].data.fd); @@ -150,6 +144,8 @@ void EpollEventReactor::runEventLoop(int timeout) { void EpollEventReactor::runOneEvent() { Event evt; int en = 0; + EventCallbackMap::iterator it; + EventCallback callback; // evt.events = EPOLLIN | EPOLLOUT; sigset_t sigset; @@ -165,11 +161,20 @@ void EpollEventReactor::runOneEvent() { } if (TRANSPORT_EXPECT_TRUE(evt.data.fd > 0)) { - auto it = event_callback_map_.find(evt.data.fd); + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + it = event_callback_map_.find(evt.data.fd); + } + if (TRANSPORT_EXPECT_FALSE(it == event_callback_map_.end())) { TRANSPORT_LOGE("unexpected event. fd %d", evt.data.fd); } else { - event_callback_map_[evt.data.fd](evt); + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + callback = event_callback_map_[evt.data.fd]; + } + + callback(evt); } } else { TRANSPORT_LOGE("unexpected event. fd %d", evt.data.fd); diff --git a/libtransport/src/hicn/transport/utils/epoll_event_reactor.h b/libtransport/src/hicn/transport/utils/epoll_event_reactor.h index dbb87c6c5..04c10fc7e 100644 --- a/libtransport/src/hicn/transport/utils/epoll_event_reactor.h +++ b/libtransport/src/hicn/transport/utils/epoll_event_reactor.h @@ -16,6 +16,7 @@ #pragma once #include <hicn/transport/utils/event_reactor.h> +#include <hicn/transport/utils/spinlock.h> #include <sys/epoll.h> #include <atomic> @@ -38,9 +39,22 @@ class EpollEventReactor : public EventReactor { ~EpollEventReactor(); - int addFileDescriptor(int fd, uint32_t events, EventCallback &callback); + template <typename EventHandler> + int addFileDescriptor(int fd, uint32_t events, EventHandler &&callback) { + auto it = event_callback_map_.find(fd); + int ret = 0; - int addFileDescriptor(int fd, uint32_t events, EventCallback &&callback); + if (it == event_callback_map_.end()) { + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + event_callback_map_[fd] = std::forward<EventHandler &&>(callback); + } + + ret = addFileDescriptor(fd, events); + } + + return ret; + } int delFileDescriptor(int fd); @@ -60,7 +74,7 @@ class EpollEventReactor : public EventReactor { int epoll_fd_; std::atomic_bool run_event_loop_; EventCallbackMap event_callback_map_; - std::mutex event_callback_map_mutex_; + utils::SpinLock event_callback_map_lock_; }; } // namespace utils |