From 2fcb8ce4599ab94178aae1ac9f9ff800fd25cd0a Mon Sep 17 00:00:00 2001 From: Alberto Compagno Date: Wed, 23 Oct 2019 15:41:59 +0200 Subject: [HICN-354] Fixed bug on raaqm when reassemblying packets Moved rescheduleOnIOService in the header file to allow its usage together with inheritance Change-Id: I15e4b92535e1478d0dd09828d2d13e2b77e000b3 Signed-off-by: Alberto Compagno --- .../hicn/transport/interfaces/socket_consumer.cc | 31 ----------- .../hicn/transport/interfaces/socket_consumer.h | 29 +++++++++- .../hicn/transport/interfaces/socket_producer.cc | 64 ---------------------- .../hicn/transport/interfaces/socket_producer.h | 41 +++++++++++--- libtransport/src/hicn/transport/protocols/raaqm.cc | 2 +- 5 files changed, 61 insertions(+), 106 deletions(-) (limited to 'libtransport/src') diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index 14cd27b6b..64b60101d 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -121,37 +121,6 @@ asio::io_service &ConsumerSocket::getIoService() { return portal_->getIoService(); } -// If the thread calling lambda_func is not the same of io_service, this -// function reschedule the function on it -template -int ConsumerSocket::rescheduleOnIOService(int socket_option_key, - arg2 socket_option_value, - Lambda lambda_func) { - // To enforce type check - std::function func = lambda_func; - int result = SOCKET_OPTION_SET; - if (transport_protocol_->isRunning()) { - std::mutex mtx; - /* Condition variable for the wait */ - std::condition_variable cv; - bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, - &result, &done, &func]() { - std::unique_lock lck(mtx); - done = true; - result = func(socket_option_key, socket_option_value); - }); - std::unique_lock lck(mtx); - if (!done) { - cv.wait(lck); - } - } else { - result = func(socket_option_key, socket_option_value); - } - - return result; -} - int ConsumerSocket::setSocketOption(int socket_option_key, ReadCallback *socket_option_value) { // Reschedule the function on the io_service to avoid race condition in case diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index e3620b269..eceee2d34 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -282,9 +282,36 @@ class ConsumerSocket : public BaseSocket { ConsumerTimerCallback **socket_option_value); protected: + // If the thread calling lambda_func is not the same of io_service, this + // function reschedule the function on it template int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, - Lambda lambda_func); + Lambda lambda_func) { + // To enforce type check + std::function func = lambda_func; + int result = SOCKET_OPTION_SET; + if (transport_protocol_->isRunning()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, + &result, &done, &func]() { + std::unique_lock lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + cv.notify_all(); + }); + std::unique_lock lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; + } private: asio::io_service internal_io_service_; diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index f90197490..8f8fc1a79 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -391,70 +391,6 @@ void ProducerSocket::onInterest(Interest &interest) { } } -// If the thread calling lambda_func is not the same of io_service, this -// function reschedule the function on it -template -int ProducerSocket::rescheduleOnIOService(int socket_option_key, - arg2 socket_option_value, - Lambda lambda_func) { - // To enforce type check - std::function func = lambda_func; - int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != listening_thread_.get_id()) { - std::mutex mtx; - /* Condition variable for the wait */ - std::condition_variable cv; - bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, - &result, &done, &func]() { - std::unique_lock lck(mtx); - done = true; - result = func(socket_option_key, socket_option_value); - }); - std::unique_lock lck(mtx); - if (!done) { - cv.wait(lck); - } - } else { - result = func(socket_option_key, socket_option_value); - } - - return result; -} - -// If the thread calling lambda_func is not the same of io_service, this -// function reschedule the function on it -template -int ProducerSocket::rescheduleOnIOServiceWithReference( - int socket_option_key, arg2 &socket_option_value, Lambda lambda_func) { - // To enforce type check - std::function func = lambda_func; - int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != this->listening_thread_.get_id()) { - std::mutex mtx; - /* Condition variable for the wait */ - std::condition_variable cv; - std::unique_lock lck(mtx); - bool done = false; - io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, - &cv, &result, &done, &func]() { - std::unique_lock lck(mtx); - done = true; - result = func(socket_option_key, socket_option_value); - - if (!done) { - cv.wait(lck); - } - }); - } else { - result = func(socket_option_key, socket_option_value); - } - - return result; -} - int ProducerSocket::setSocketOption(int socket_option_key, uint32_t socket_option_value) { switch (socket_option_key) { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 5c617d761..709a2582b 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -160,15 +160,6 @@ class ProducerSocket : public Socket, virtual int getSocketOption(int socket_option_key, std::string &socket_option_value); - template - int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, - Lambda lambda_func); - - template - int rescheduleOnIOServiceWithReference(int socket_option_key, - arg2 &socket_option_value, - Lambda lambda_func); - protected: // Threads std::thread listening_thread_; @@ -215,6 +206,38 @@ class ProducerSocket : public Socket, ProducerContentCallback on_content_produced_; + // If the thread calling lambda_func is not the same of io_service, this + // function reschedule the function on it + template + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, + &mtx, &cv, &result, &done, &func]() { + std::unique_lock lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + cv.notify_all(); + }); + std::unique_lock lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; + } + private: void listen(); diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index c816158f9..779f9a9a1 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -519,7 +519,7 @@ void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; socket_->getSocketOption(READ_CALLBACK, &on_payload); - if (on_payload) { + if (!on_payload) { throw errors::RuntimeException( "The read callback must be installed in the transport before " "starting " -- cgit 1.2.3-korg