diff options
14 files changed, 154 insertions, 170 deletions
diff --git a/libtransport/src/hicn/transport/core/hicn_binary_api.c b/libtransport/src/hicn/transport/core/hicn_binary_api.c index d909578a2..3868c0a14 100644 --- a/libtransport/src/hicn/transport/core/hicn_binary_api.c +++ b/libtransport/src/hicn/transport/core/hicn_binary_api.c @@ -134,8 +134,6 @@ int hicn_binary_api_register_cons_app( CONTEXT_SAVE(context_store, api, mp) - TRANSPORT_LOGI("Message created"); - return vpp_binary_api_send_request_wait_reply(api->vpp_api, mp); } diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index cadf37e8c..d9051c23c 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -49,18 +49,19 @@ static constexpr uint32_t pool_size = 2048; class HandlerMemory { #ifdef __vpp__ - static constexpr std::size_t memory_size = 1024 * 512; + static constexpr std::size_t memory_size = 1024 * 1024; + public: - HandlerMemory() : index_(0) { } + HandlerMemory() : index_(0) {} - HandlerMemory(const HandlerMemory&) = delete; - HandlerMemory& operator=(const HandlerMemory&) = delete; + HandlerMemory(const HandlerMemory &) = delete; + HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void* allocate(std::size_t size) { + TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { return &storage_[index_++ % memory_size]; } - TRANSPORT_ALWAYS_INLINE void deallocate(void* pointer) { } + TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {} private: // Storage space used for handler-based custom memory allocation. @@ -68,16 +69,16 @@ class HandlerMemory { uint32_t index_; #else public: - HandlerMemory() { } + HandlerMemory() {} - HandlerMemory(const HandlerMemory&) = delete; - HandlerMemory& operator=(const HandlerMemory&) = delete; + HandlerMemory(const HandlerMemory &) = delete; + HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void* allocate(std::size_t size) { + TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { return ::operator new(size); } - TRANSPORT_ALWAYS_INLINE void deallocate(void* pointer) { + TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { ::operator delete(pointer); } #endif @@ -90,34 +91,36 @@ class HandlerAllocator { public: using value_type = T; - explicit HandlerAllocator(HandlerMemory& mem) - : memory_(mem) {} + explicit HandlerAllocator(HandlerMemory &mem) : memory_(mem) {} template <typename U> - HandlerAllocator(const HandlerAllocator<U>& other) noexcept - : memory_(other.memory_) { } + HandlerAllocator(const HandlerAllocator<U> &other) noexcept + : memory_(other.memory_) {} - TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator& other) const noexcept { + TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator &other) const + noexcept { return &memory_ == &other.memory_; } - TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator& other) const noexcept { + TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator &other) const + noexcept { return &memory_ != &other.memory_; } - TRANSPORT_ALWAYS_INLINE T* allocate(std::size_t n) const { - return static_cast<T*>(memory_.allocate(sizeof(T) * n)); + TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const { + return static_cast<T *>(memory_.allocate(sizeof(T) * n)); } - TRANSPORT_ALWAYS_INLINE void deallocate(T* p, std::size_t /*n*/) const { + TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const { return memory_.deallocate(p); } private: - template <typename> friend class HandlerAllocator; + template <typename> + friend class HandlerAllocator; // The underlying memory. - HandlerMemory& memory_; + HandlerMemory &memory_; }; // Wrapper class template for handler objects to allow handler memory @@ -129,35 +132,33 @@ class CustomAllocatorHandler { public: using allocator_type = HandlerAllocator<Handler>; - CustomAllocatorHandler(HandlerMemory& m, Handler h) - : memory_(m), - handler_(h) { } + CustomAllocatorHandler(HandlerMemory &m, Handler h) + : memory_(m), handler_(h) {} allocator_type get_allocator() const noexcept { return allocator_type(memory_); } - template <typename ...Args> - void operator()(Args&&... args) { + template <typename... Args> + void operator()(Args &&... args) { handler_(std::forward<Args>(args)...); } private: - HandlerMemory& memory_; + HandlerMemory &memory_; Handler handler_; }; // Helper function to wrap a handler object to add custom allocation. template <typename Handler> inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler( - HandlerMemory& m, Handler h) { + HandlerMemory &m, Handler h) { return CustomAllocatorHandler<Handler>(m, h); } class Pool { public: - Pool(asio::io_service &io_service) - : io_service_(io_service) { + Pool(asio::io_service &io_service) : io_service_(io_service) { increasePendingInterestPool(); increaseInterestPool(); increaseContentObjectPool(); @@ -223,7 +224,7 @@ class Pool { asio::io_service &io_service_; }; -} +} // namespace portal_details using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest::Ptr>; @@ -266,6 +267,7 @@ class Portal { typename ForwarderInt::ConnectorType>, ForwarderInt>::value, "ForwarderInt must inherit from ForwarderInterface!"); + public: class ConsumerCallback { public: @@ -290,7 +292,7 @@ class Portal { std::bind(&Portal::setLocalRoutes, this), io_service_, app_name_), forwarder_interface_(connector_), - packet_pool_(io_service) { } + packet_pool_(io_service) {} void setConsumerCallback(ConsumerCallback *consumer_callback) { consumer_callback_ = consumer_callback; @@ -310,7 +312,7 @@ class Portal { forwarder_interface_.connect(is_consumer); } - ~Portal() { stopEventsLoop(true); } + ~Portal() { killConnection(); } TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { auto it = @@ -338,11 +340,9 @@ class Portal { std::move(on_content_object_callback)); pending_interest->setOnTimeoutCallback( std::move(on_interest_timeout_callback)); - pending_interest->startCountdown( - portal_details::makeCustomAllocatorHandler( - async_callback_memory_, - std::bind(&Portal<ForwarderInt>::timerHandler, this, - std::placeholders::_1, hash))); + pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler( + async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler, + this, std::placeholders::_1, hash))); pending_interest_hash_table_.emplace( std::make_pair(hash, std::move(pending_interest))); } @@ -398,15 +398,13 @@ class Portal { forwarder_interface_.send(content_object); } - TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) { - if (kill_connection) { - forwarder_interface_.closeConnection(); + TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { + if (!io_service_.stopped()) { + io_service_.dispatch([this]() { + clear(); + io_service_.stop(); + }); } - - io_service_.post([this]() { - clear(); - io_service_.stop(); - }); } TRANSPORT_ALWAYS_INLINE void killConnection() { connector_.close(); } @@ -489,12 +487,10 @@ class Portal { if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { interest_ptr->on_content_object_callback_( - std::move(interest_ptr->getInterest()), - std::move(content_object)); + std::move(interest_ptr->getInterest()), std::move(content_object)); } else if (consumer_callback_) { consumer_callback_->onContentObject( - std::move(interest_ptr->getInterest()), - std::move(content_object)); + std::move(interest_ptr->getInterest()), std::move(content_object)); } } else { TRANSPORT_LOGW("No pending interests for current content (%s)", diff --git a/libtransport/src/hicn/transport/core/vpp_binary_api.c b/libtransport/src/hicn/transport/core/vpp_binary_api.c index 09ffb2ec6..d54ef257e 100644 --- a/libtransport/src/hicn/transport/core/vpp_binary_api.c +++ b/libtransport/src/hicn/transport/core/vpp_binary_api.c @@ -173,7 +173,7 @@ void vpp_binary_api_send_receive_ping(vpp_binary_api_t *api) { CONTEXT_SAVE(context_store, api, mp_ping); - TRANSPORT_LOGI("Sending ping id %u", mp_ping->_vl_msg_id); + TRANSPORT_LOGD("Sending ping id %u", mp_ping->_vl_msg_id); vpp_binary_api_send_request_wait_reply(api, mp_ping); } @@ -202,7 +202,7 @@ int vpp_binary_api_set_int_state(vpp_binary_api_t *api, uint32_t sw_index, CONTEXT_SAVE(context_store, api, mp); - TRANSPORT_LOGI("Sending set int flags id %u", mp->_vl_msg_id); + TRANSPORT_LOGD("Sending set int flags id %u", mp->_vl_msg_id); return vpp_binary_api_send_request_wait_reply(api, mp); } @@ -211,7 +211,7 @@ void vpp_binary_api_send_request(vpp_binary_api_t *api, void *request) { vl_generic_request_t *req = NULL; req = (vl_generic_request_t *)request; - TRANSPORT_LOGI("Sending a request to VPP (id=%d).\n", ntohs(req->_vl_msg_id)); + TRANSPORT_LOGD("Sending a request to VPP (id=%d).\n", ntohs(req->_vl_msg_id)); S(api, req); } diff --git a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc index 0d622f9a3..303b753be 100644 --- a/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc +++ b/libtransport/src/hicn/transport/core/vpp_forwarder_interface.cc @@ -86,12 +86,10 @@ uint32_t VPPForwarderInterface::getMemifConfiguration() { void VPPForwarderInterface::consumerConnection() { hicn_consumer_input_params input = {0}; - hicn_consumer_output_params output; + hicn_consumer_output_params output = {0}; ip_address_t ip4_address; ip_address_t ip6_address; - std::memset(&output, 0, sizeof(hicn_consumer_output_params)); - output.src4 = &ip4_address; output.src6 = &ip6_address; @@ -120,7 +118,7 @@ void VPPForwarderInterface::producerConnection() { void VPPForwarderInterface::connect(bool is_consumer) { std::lock_guard<std::mutex> connection_lock(global_lock_); - srand(time(NULL)); + srand(time(nullptr)); int secret = rand() % (1 << 10); std::stringstream app_name; app_name << "Libtransport_" << secret; @@ -209,18 +207,17 @@ void VPPForwarderInterface::registerRoute(Prefix &prefix) { void VPPForwarderInterface::closeConnection() { if (VPPForwarderInterface::api_) { + connector_.close(); + if (sw_if_index_ != uint32_t(~0)) { int ret = memif_binary_api_delete_memif(VPPForwarderInterface::memif_api_, sw_if_index_); - if (ret < 0) { TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_); } } vpp_binary_api_destroy(VPPForwarderInterface::api_); - connector_.close(); - VPPForwarderInterface::api_ = nullptr; } } diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h index b47432460..1d7ad3cb1 100644 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h +++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h @@ -104,9 +104,7 @@ class AsyncFullDuplexSocket : public AsyncSocket, AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service); AsyncFullDuplexSocket(const core::Prefix &locator); - ~AsyncFullDuplexSocket() { - TRANSPORT_LOGI("Adios AsyncFullDuplexSocket!!!"); - }; + ~AsyncFullDuplexSocket(){}; using ReadCallback = AsyncReader::ReadCallback; using WriteCallback = AsyncWriter::WriteCallback; diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index febe66853..ca9722849 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -62,25 +62,21 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) timer_interval_milliseconds_(0) { switch (protocol) { case TransportProtocolAlgorithms::CBR: - transport_protocol_ = std::make_shared<CbrTransportProtocol>(this); + transport_protocol_ = std::make_unique<CbrTransportProtocol>(this); break; case TransportProtocolAlgorithms::RTC: - transport_protocol_ = std::make_shared<RTCTransportProtocol>(this); + transport_protocol_ = std::make_unique<RTCTransportProtocol>(this); break; case TransportProtocolAlgorithms::RAAQM: default: - transport_protocol_ = std::make_shared<RaaqmTransportProtocol>(this); + transport_protocol_ = std::make_unique<RaaqmTransportProtocol>(this); break; } } ConsumerSocket::~ConsumerSocket() { stop(); - async_downloader_.stop(); - - transport_protocol_.reset(); - portal_.reset(); } void ConsumerSocket::connect() { portal_->connect(); } @@ -132,10 +128,9 @@ void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, void ConsumerSocket::stop() { if (transport_protocol_->isRunning()) { + std::cout << "Stopping transport protocol " << std::endl; transport_protocol_->stop(); } - - // is_running_ = false; } void ConsumerSocket::resume() { diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 0f1ad537c..a50aeb583 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -676,7 +676,7 @@ class ConsumerSocket : public BaseSocket { } protected: - std::shared_ptr<TransportProtocol> transport_protocol_; + std::unique_ptr<TransportProtocol> transport_protocol_; private: // context inner state variables diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 497c40c99..d89fc9367 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -15,6 +15,8 @@ #include <hicn/transport/interfaces/socket_producer.h> +#include <functional> + namespace transport { namespace interface { @@ -30,15 +32,12 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) data_packet_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), output_buffer_(default_values::producer_socket_output_buffer_size), - async_thread_(), registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), signature_type_(SHA_256), hash_algorithm_(HashAlgorithm::SHA_256), input_buffer_capacity_(default_values::producer_socket_input_buffer_size), input_buffer_size_(0), - processing_thread_stop_(false), - listening_thread_stop_(false), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -49,18 +48,10 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) on_content_object_in_output_buffer_(VOID_HANDLER), on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), - on_content_produced_(VOID_HANDLER) { - listening_thread_stop_ = false; -} + on_content_produced_(VOID_HANDLER) {} ProducerSocket::~ProducerSocket() { - processing_thread_stop_ = true; - portal_->stopEventsLoop(true); - - if (processing_thread_.joinable()) { - processing_thread_.join(); - } - + stop(); if (listening_thread_.joinable()) { listening_thread_.join(); } @@ -308,8 +299,8 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf, if (is_last_manifest) { manifest->setFinalManifest(is_last_manifest); } + manifest->encode(); - // Time t0 = std::chrono::steady_clock::now(); identity_->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); } @@ -324,32 +315,20 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf, void ProducerSocket::asyncProduce(ContentObject &content_object) { if (!async_thread_.stopped()) { - // async_thread_.add(std::bind(&ProducerSocket::produce, this, - // content_object)); + auto co_ptr = std::static_pointer_cast<ContentObject>( + content_object.shared_from_this()); + async_thread_.add([this, content_object = std::move(co_ptr)]() { + produce(*content_object); + }); } } -// void ProducerSocket::asyncProduce(const Name &suffix, -// const uint8_t *buf, -// size_t buffer_size, -// AsyncProduceCallback && handler) { -// if (!async_thread_.stopped()) { -// async_thread_.add([this, buffer = buf, size = buffer_size, cb = -// std::move(handler)] () { -// uint64_t bytes_written = produce(suff, buffer, size, 0, false); -// auto ec = std::make_errc(0); -// cb(*this, ec, bytes_written); -// }); -// } -// } - void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size) { if (!async_thread_.stopped()) { - async_thread_.add( - [this, suff = suffix, buffer = buf, size = buffer_size]() { - produce(suff, buffer, size, true); - }); + async_thread_.add([this, suffix, buffer = buf, size = buffer_size]() { + produce(suffix, buffer, size, 0, false); + }); } } diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 4f38fb30e..1fdbabe2e 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -525,6 +525,10 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_GET; } + private: + // Threads + std::thread listening_thread_; + protected: asio::io_service internal_io_service_; asio::io_service &io_service_; @@ -538,7 +542,6 @@ class ProducerSocket : public Socket<BasePortal>, private: utils::EventThread async_thread_; - int registration_status_; bool making_manifest_; @@ -560,12 +563,6 @@ class ProducerSocket : public Socket<BasePortal>, std::atomic_size_t input_buffer_capacity_; std::atomic_size_t input_buffer_size_; - // threads - std::thread listening_thread_; - std::thread processing_thread_; - volatile bool processing_thread_stop_; - volatile bool listening_thread_stop_; - // callbacks protected: ProducerInterestCallback on_interest_input_; diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h index 6a3b23753..6911eada5 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.h +++ b/libtransport/src/hicn/transport/protocols/protocol.h @@ -39,7 +39,7 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback, public: TransportProtocol(interface::ConsumerSocket *icn_socket); - virtual ~TransportProtocol() { stop(); }; + virtual ~TransportProtocol() = default; TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; } diff --git a/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc b/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc index 6df9e5656..a97e89500 100644 --- a/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc +++ b/libtransport/src/hicn/transport/utils/epoll_event_reactor.cc @@ -48,30 +48,6 @@ int EpollEventReactor::addFileDescriptor(int fd, uint32_t events) { return 0; } -int EpollEventReactor::addFileDescriptor(int fd, uint32_t events, - EventCallback &callback) { - auto it = event_callback_map_.find(fd); - - if (it == event_callback_map_.end()) { - event_callback_map_[fd] = callback; - return addFileDescriptor(fd, events); - } - - return 0; -} - -int EpollEventReactor::addFileDescriptor(int fd, uint32_t events, - EventCallback &&callback) { - auto it = event_callback_map_.find(fd); - - if (it == event_callback_map_.end()) { - event_callback_map_[fd] = callback; - return addFileDescriptor(fd, events); - } - - return 0; -} - int EpollEventReactor::modFileDescriptor(int fd, uint32_t events) { if (TRANSPORT_EXPECT_FALSE(fd < 0)) { TRANSPORT_LOGE("invalid fd %d", fd); @@ -109,6 +85,7 @@ int EpollEventReactor::delFileDescriptor(int fd) { return -1; } + utils::SpinLock::Acquire locked(event_callback_map_lock_); event_callback_map_.erase(fd); return 0; @@ -117,6 +94,8 @@ int EpollEventReactor::delFileDescriptor(int fd) { void EpollEventReactor::runEventLoop(int timeout) { Event evt[128]; int en = 0; + EventCallbackMap::iterator it; + EventCallback callback; // evt.events = EPOLLIN | EPOLLOUT; sigset_t sigset; @@ -134,11 +113,26 @@ void EpollEventReactor::runEventLoop(int timeout) { for (int i = 0; i < en; i++) { if (evt[i].data.fd > 0) { - auto it = event_callback_map_.find(evt[i].data.fd); + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + it = event_callback_map_.find(evt[i].data.fd); + } + if (TRANSPORT_EXPECT_FALSE(it == event_callback_map_.end())) { TRANSPORT_LOGE("unexpected event. fd %d", evt[i].data.fd); } else { - event_callback_map_[evt[i].data.fd](evt[i]); + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + callback = event_callback_map_[evt[i].data.fd]; + } + + callback(evt[i]); + + // In the callback the epoll event reactor could have been stopped, + // then we need to check whether the event loop is still running. + if (TRANSPORT_EXPECT_FALSE(!run_event_loop_)) { + return; + } } } else { TRANSPORT_LOGE("unexpected event. fd %d", evt[i].data.fd); @@ -150,6 +144,8 @@ void EpollEventReactor::runEventLoop(int timeout) { void EpollEventReactor::runOneEvent() { Event evt; int en = 0; + EventCallbackMap::iterator it; + EventCallback callback; // evt.events = EPOLLIN | EPOLLOUT; sigset_t sigset; @@ -165,11 +161,20 @@ void EpollEventReactor::runOneEvent() { } if (TRANSPORT_EXPECT_TRUE(evt.data.fd > 0)) { - auto it = event_callback_map_.find(evt.data.fd); + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + it = event_callback_map_.find(evt.data.fd); + } + if (TRANSPORT_EXPECT_FALSE(it == event_callback_map_.end())) { TRANSPORT_LOGE("unexpected event. fd %d", evt.data.fd); } else { - event_callback_map_[evt.data.fd](evt); + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + callback = event_callback_map_[evt.data.fd]; + } + + callback(evt); } } else { TRANSPORT_LOGE("unexpected event. fd %d", evt.data.fd); diff --git a/libtransport/src/hicn/transport/utils/epoll_event_reactor.h b/libtransport/src/hicn/transport/utils/epoll_event_reactor.h index dbb87c6c5..04c10fc7e 100644 --- a/libtransport/src/hicn/transport/utils/epoll_event_reactor.h +++ b/libtransport/src/hicn/transport/utils/epoll_event_reactor.h @@ -16,6 +16,7 @@ #pragma once #include <hicn/transport/utils/event_reactor.h> +#include <hicn/transport/utils/spinlock.h> #include <sys/epoll.h> #include <atomic> @@ -38,9 +39,22 @@ class EpollEventReactor : public EventReactor { ~EpollEventReactor(); - int addFileDescriptor(int fd, uint32_t events, EventCallback &callback); + template <typename EventHandler> + int addFileDescriptor(int fd, uint32_t events, EventHandler &&callback) { + auto it = event_callback_map_.find(fd); + int ret = 0; - int addFileDescriptor(int fd, uint32_t events, EventCallback &&callback); + if (it == event_callback_map_.end()) { + { + utils::SpinLock::Acquire locked(event_callback_map_lock_); + event_callback_map_[fd] = std::forward<EventHandler &&>(callback); + } + + ret = addFileDescriptor(fd, events); + } + + return ret; + } int delFileDescriptor(int fd); @@ -60,7 +74,7 @@ class EpollEventReactor : public EventReactor { int epoll_fd_; std::atomic_bool run_event_loop_; EventCallbackMap event_callback_map_; - std::mutex event_callback_map_mutex_; + utils::SpinLock event_callback_map_lock_; }; } // namespace utils diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 1b6fc67af..860e7a000 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -114,7 +114,8 @@ class HIperfClient { void processPayload(ConsumerSocket &c, std::size_t bytes_transferred, const std::error_code &ec) { Time t2 = std::chrono::steady_clock::now(); - TimeDuration dt = std::chrono::duration_cast<TimeDuration>(t2 - t_download_); + TimeDuration dt = + std::chrono::duration_cast<TimeDuration>(t2 - t_download_); long usec = (long)dt.count(); std::cout << "Content retrieved. Size: " << bytes_transferred << " [Bytes]" @@ -139,16 +140,19 @@ class HIperfClient { void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {} - void handleTimerExpiration(ConsumerSocket &c, const protocol::TransportStatistics &stats) { + void handleTimerExpiration(ConsumerSocket &c, + const protocol::TransportStatistics &stats) { const char separator = ' '; const int width = 20; utils::TimePoint t2 = utils::SteadyClock::now(); - auto exact_duration = std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_); + auto exact_duration = + std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_); std::stringstream interval; interval << total_duration_milliseconds_ / 1000 << "-" - << total_duration_milliseconds_ / 1000 + exact_duration.count() / 1000; + << total_duration_milliseconds_ / 1000 + + exact_duration.count() / 1000; std::stringstream bytes_transferred; bytes_transferred << std::fixed << std::setprecision(3) @@ -161,7 +165,8 @@ class HIperfClient { << std::setfill(separator) << "[Mbps]"; std::stringstream window; - window << stats.getAverageWindowSize() << std::setfill(separator) << "[Interest]"; + window << stats.getAverageWindowSize() << std::setfill(separator) + << "[Interest]"; std::stringstream avg_rtt; avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[us]"; @@ -279,9 +284,9 @@ class HIperfClient { ret = consumer_socket_->setSocketOption( ConsumerCallbacksOptions::STATS_SUMMARY, - (ConsumerTimerCallback)std::bind( - &HIperfClient::handleTimerExpiration, this, std::placeholders::_1, - std::placeholders::_2)); + (ConsumerTimerCallback)std::bind(&HIperfClient::handleTimerExpiration, + this, std::placeholders::_1, + std::placeholders::_2)); if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; @@ -317,13 +322,13 @@ class HIperfClient { private: ClientConfiguration configuration_; - std::unique_ptr<ConsumerSocket> consumer_socket_; Time t_stats_; Time t_download_; uint32_t total_duration_milliseconds_; uint64_t old_bytes_value_; asio::io_service io_service_; asio::signal_set signals_; + std::unique_ptr<ConsumerSocket> consumer_socket_; }; class HIperfServer { @@ -393,15 +398,16 @@ class HIperfServer { << std::endl; } - std::shared_ptr<utils::Identity> setProducerIdentity(std::string &keystore_name, - std::string &keystore_password, - HashAlgorithm &hash_algorithm) { + std::shared_ptr<utils::Identity> setProducerIdentity( + std::string &keystore_name, std::string &keystore_password, + HashAlgorithm &hash_algorithm) { if (access(keystore_name.c_str(), F_OK) != -1) { - return std::make_shared<utils::Identity>(keystore_name, keystore_password, hash_algorithm); + return std::make_shared<utils::Identity>(keystore_name, keystore_password, + hash_algorithm); } else { return std::make_shared<utils::Identity>(keystore_name, keystore_password, - CryptoSuite::RSA_SHA256, 1024, 365, - "producer-test"); + CryptoSuite::RSA_SHA256, 1024, + 365, "producer-test"); } } @@ -412,8 +418,8 @@ class HIperfServer { if (configuration_.sign) { auto identity = setProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); + configuration_.keystore_password, + configuration_.hash_algorithm); if (producer_socket_->setSocketOption(GeneralTransportOptions::IDENTITY, identity) == @@ -495,12 +501,12 @@ class HIperfServer { private: ServerConfiguration configuration_; - std::unique_ptr<ProducerSocket> producer_socket_; asio::io_service io_service_; asio::signal_set signals_; std::vector<std::shared_ptr<ContentObject>> content_objects_; std::uint16_t content_objects_index_; std::uint16_t mask_; + std::unique_ptr<ProducerSocket> producer_socket_; }; void usage() { @@ -545,8 +551,7 @@ void usage() { "parameter" << std::endl; std::cerr << "-M\t<Download for real>\t\t" - << "Store the content downloaded." - << std::endl; + << "Store the content downloaded." << std::endl; std::cerr << "-W\t<window_size>\t\t\t" << "Use a fixed congestion window " "for retrieving the data." diff --git a/utils/src/ping_client.cc b/utils/src/ping_client.cc index d31147c70..3811c6db8 100644 --- a/utils/src/ping_client.cc +++ b/utils/src/ping_client.cc @@ -273,7 +273,7 @@ class Client : interface::BasePortal::ConsumerCallback { std::cout << "Stop ping" << std::endl; std::cout << "Sent: " << sent_ << " Received: " << received_ << " Timeouts: " << timedout_ << std::endl; - portal_.stopEventsLoop(true); + portal_.stopEventsLoop(); } void reset() { |