From 6aaef596f68a514036d5212fc8697bdaf371e5af Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Thu, 7 Mar 2019 19:11:16 +0100 Subject: [HICN-99] Destroy in the correct order and in the correct event loop the attributes of connectors and sockets. Cleanup of prints. Change-Id: Ie7eef1d186e581aa950f47df20d57681dc33be55 Signed-off-by: Mauro Sardara --- .../hicn/transport/interfaces/full_duplex_socket.h | 4 +- .../hicn/transport/interfaces/socket_consumer.cc | 13 ++---- .../hicn/transport/interfaces/socket_consumer.h | 2 +- .../hicn/transport/interfaces/socket_producer.cc | 47 ++++++---------------- .../hicn/transport/interfaces/socket_producer.h | 11 ++--- 5 files changed, 23 insertions(+), 54 deletions(-) (limited to 'libtransport/src/hicn/transport/interfaces') diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h index b47432460..1d7ad3cb1 100644 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h +++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h @@ -104,9 +104,7 @@ class AsyncFullDuplexSocket : public AsyncSocket, AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service); AsyncFullDuplexSocket(const core::Prefix &locator); - ~AsyncFullDuplexSocket() { - TRANSPORT_LOGI("Adios AsyncFullDuplexSocket!!!"); - }; + ~AsyncFullDuplexSocket(){}; using ReadCallback = AsyncReader::ReadCallback; using WriteCallback = AsyncWriter::WriteCallback; diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index febe66853..ca9722849 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -62,25 +62,21 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) timer_interval_milliseconds_(0) { switch (protocol) { case TransportProtocolAlgorithms::CBR: - transport_protocol_ = std::make_shared(this); + transport_protocol_ = std::make_unique(this); break; case TransportProtocolAlgorithms::RTC: - transport_protocol_ = std::make_shared(this); + transport_protocol_ = std::make_unique(this); break; case TransportProtocolAlgorithms::RAAQM: default: - transport_protocol_ = std::make_shared(this); + transport_protocol_ = std::make_unique(this); break; } } ConsumerSocket::~ConsumerSocket() { stop(); - async_downloader_.stop(); - - transport_protocol_.reset(); - portal_.reset(); } void ConsumerSocket::connect() { portal_->connect(); } @@ -132,10 +128,9 @@ void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, void ConsumerSocket::stop() { if (transport_protocol_->isRunning()) { + std::cout << "Stopping transport protocol " << std::endl; transport_protocol_->stop(); } - - // is_running_ = false; } void ConsumerSocket::resume() { diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 0f1ad537c..a50aeb583 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -676,7 +676,7 @@ class ConsumerSocket : public BaseSocket { } protected: - std::shared_ptr transport_protocol_; + std::unique_ptr transport_protocol_; private: // context inner state variables diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 497c40c99..d89fc9367 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -15,6 +15,8 @@ #include +#include + namespace transport { namespace interface { @@ -30,15 +32,12 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) data_packet_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), output_buffer_(default_values::producer_socket_output_buffer_size), - async_thread_(), registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), signature_type_(SHA_256), hash_algorithm_(HashAlgorithm::SHA_256), input_buffer_capacity_(default_values::producer_socket_input_buffer_size), input_buffer_size_(0), - processing_thread_stop_(false), - listening_thread_stop_(false), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -49,18 +48,10 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) on_content_object_in_output_buffer_(VOID_HANDLER), on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), - on_content_produced_(VOID_HANDLER) { - listening_thread_stop_ = false; -} + on_content_produced_(VOID_HANDLER) {} ProducerSocket::~ProducerSocket() { - processing_thread_stop_ = true; - portal_->stopEventsLoop(true); - - if (processing_thread_.joinable()) { - processing_thread_.join(); - } - + stop(); if (listening_thread_.joinable()) { listening_thread_.join(); } @@ -308,8 +299,8 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf, if (is_last_manifest) { manifest->setFinalManifest(is_last_manifest); } + manifest->encode(); - // Time t0 = std::chrono::steady_clock::now(); identity_->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); } @@ -324,32 +315,20 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf, void ProducerSocket::asyncProduce(ContentObject &content_object) { if (!async_thread_.stopped()) { - // async_thread_.add(std::bind(&ProducerSocket::produce, this, - // content_object)); + auto co_ptr = std::static_pointer_cast( + content_object.shared_from_this()); + async_thread_.add([this, content_object = std::move(co_ptr)]() { + produce(*content_object); + }); } } -// void ProducerSocket::asyncProduce(const Name &suffix, -// const uint8_t *buf, -// size_t buffer_size, -// AsyncProduceCallback && handler) { -// if (!async_thread_.stopped()) { -// async_thread_.add([this, buffer = buf, size = buffer_size, cb = -// std::move(handler)] () { -// uint64_t bytes_written = produce(suff, buffer, size, 0, false); -// auto ec = std::make_errc(0); -// cb(*this, ec, bytes_written); -// }); -// } -// } - void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size) { if (!async_thread_.stopped()) { - async_thread_.add( - [this, suff = suffix, buffer = buf, size = buffer_size]() { - produce(suff, buffer, size, true); - }); + async_thread_.add([this, suffix, buffer = buf, size = buffer_size]() { + produce(suffix, buffer, size, 0, false); + }); } } diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 4f38fb30e..1fdbabe2e 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -525,6 +525,10 @@ class ProducerSocket : public Socket, return SOCKET_OPTION_GET; } + private: + // Threads + std::thread listening_thread_; + protected: asio::io_service internal_io_service_; asio::io_service &io_service_; @@ -538,7 +542,6 @@ class ProducerSocket : public Socket, private: utils::EventThread async_thread_; - int registration_status_; bool making_manifest_; @@ -560,12 +563,6 @@ class ProducerSocket : public Socket, std::atomic_size_t input_buffer_capacity_; std::atomic_size_t input_buffer_size_; - // threads - std::thread listening_thread_; - std::thread processing_thread_; - volatile bool processing_thread_stop_; - volatile bool listening_thread_stop_; - // callbacks protected: ProducerInterestCallback on_interest_input_; -- cgit 1.2.3-korg