aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-03-07 19:11:16 +0100
committerMauro Sardara <msardara@cisco.com>2019-03-08 13:32:22 +0100
commit6aaef596f68a514036d5212fc8697bdaf371e5af (patch)
treed09237bb6810c4aa5eff1a3033633e46bb44e3f6 /libtransport/src/hicn/transport/interfaces
parent3c6c43ef7bc7caa03540b2347e7f180d5b96ec23 (diff)
[HICN-99] Destroy in the correct order and in the correct event loop the attributes of connectors and sockets. Cleanup of prints.
Change-Id: Ie7eef1d186e581aa950f47df20d57681dc33be55 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces')
-rw-r--r--libtransport/src/hicn/transport/interfaces/full_duplex_socket.h4
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc13
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc47
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h11
5 files changed, 23 insertions, 54 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
index b47432460..1d7ad3cb1 100644
--- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
+++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
@@ -104,9 +104,7 @@ class AsyncFullDuplexSocket : public AsyncSocket,
AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service);
AsyncFullDuplexSocket(const core::Prefix &locator);
- ~AsyncFullDuplexSocket() {
- TRANSPORT_LOGI("Adios AsyncFullDuplexSocket!!!");
- };
+ ~AsyncFullDuplexSocket(){};
using ReadCallback = AsyncReader::ReadCallback;
using WriteCallback = AsyncWriter::WriteCallback;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index febe66853..ca9722849 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -62,25 +62,21 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service)
timer_interval_milliseconds_(0) {
switch (protocol) {
case TransportProtocolAlgorithms::CBR:
- transport_protocol_ = std::make_shared<CbrTransportProtocol>(this);
+ transport_protocol_ = std::make_unique<CbrTransportProtocol>(this);
break;
case TransportProtocolAlgorithms::RTC:
- transport_protocol_ = std::make_shared<RTCTransportProtocol>(this);
+ transport_protocol_ = std::make_unique<RTCTransportProtocol>(this);
break;
case TransportProtocolAlgorithms::RAAQM:
default:
- transport_protocol_ = std::make_shared<RaaqmTransportProtocol>(this);
+ transport_protocol_ = std::make_unique<RaaqmTransportProtocol>(this);
break;
}
}
ConsumerSocket::~ConsumerSocket() {
stop();
-
async_downloader_.stop();
-
- transport_protocol_.reset();
- portal_.reset();
}
void ConsumerSocket::connect() { portal_->connect(); }
@@ -132,10 +128,9 @@ void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest,
void ConsumerSocket::stop() {
if (transport_protocol_->isRunning()) {
+ std::cout << "Stopping transport protocol " << std::endl;
transport_protocol_->stop();
}
-
- // is_running_ = false;
}
void ConsumerSocket::resume() {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 0f1ad537c..a50aeb583 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -676,7 +676,7 @@ class ConsumerSocket : public BaseSocket {
}
protected:
- std::shared_ptr<TransportProtocol> transport_protocol_;
+ std::unique_ptr<TransportProtocol> transport_protocol_;
private:
// context inner state variables
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 497c40c99..d89fc9367 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -15,6 +15,8 @@
#include <hicn/transport/interfaces/socket_producer.h>
+#include <functional>
+
namespace transport {
namespace interface {
@@ -30,15 +32,12 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
data_packet_size_(default_values::content_object_packet_size),
content_object_expiry_time_(default_values::content_object_expiry_time),
output_buffer_(default_values::producer_socket_output_buffer_size),
- async_thread_(),
registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
signature_type_(SHA_256),
hash_algorithm_(HashAlgorithm::SHA_256),
input_buffer_capacity_(default_values::producer_socket_input_buffer_size),
input_buffer_size_(0),
- processing_thread_stop_(false),
- listening_thread_stop_(false),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
on_interest_inserted_input_buffer_(VOID_HANDLER),
@@ -49,18 +48,10 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
on_content_object_in_output_buffer_(VOID_HANDLER),
on_content_object_output_(VOID_HANDLER),
on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
- on_content_produced_(VOID_HANDLER) {
- listening_thread_stop_ = false;
-}
+ on_content_produced_(VOID_HANDLER) {}
ProducerSocket::~ProducerSocket() {
- processing_thread_stop_ = true;
- portal_->stopEventsLoop(true);
-
- if (processing_thread_.joinable()) {
- processing_thread_.join();
- }
-
+ stop();
if (listening_thread_.joinable()) {
listening_thread_.join();
}
@@ -308,8 +299,8 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf,
if (is_last_manifest) {
manifest->setFinalManifest(is_last_manifest);
}
+
manifest->encode();
- // Time t0 = std::chrono::steady_clock::now();
identity_->getSigner().sign(*manifest);
passContentObjectToCallbacks(manifest);
}
@@ -324,32 +315,20 @@ uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf,
void ProducerSocket::asyncProduce(ContentObject &content_object) {
if (!async_thread_.stopped()) {
- // async_thread_.add(std::bind(&ProducerSocket::produce, this,
- // content_object));
+ auto co_ptr = std::static_pointer_cast<ContentObject>(
+ content_object.shared_from_this());
+ async_thread_.add([this, content_object = std::move(co_ptr)]() {
+ produce(*content_object);
+ });
}
}
-// void ProducerSocket::asyncProduce(const Name &suffix,
-// const uint8_t *buf,
-// size_t buffer_size,
-// AsyncProduceCallback && handler) {
-// if (!async_thread_.stopped()) {
-// async_thread_.add([this, buffer = buf, size = buffer_size, cb =
-// std::move(handler)] () {
-// uint64_t bytes_written = produce(suff, buffer, size, 0, false);
-// auto ec = std::make_errc(0);
-// cb(*this, ec, bytes_written);
-// });
-// }
-// }
-
void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf,
size_t buffer_size) {
if (!async_thread_.stopped()) {
- async_thread_.add(
- [this, suff = suffix, buffer = buf, size = buffer_size]() {
- produce(suff, buffer, size, true);
- });
+ async_thread_.add([this, suffix, buffer = buf, size = buffer_size]() {
+ produce(suffix, buffer, size, 0, false);
+ });
}
}
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 4f38fb30e..1fdbabe2e 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -525,6 +525,10 @@ class ProducerSocket : public Socket<BasePortal>,
return SOCKET_OPTION_GET;
}
+ private:
+ // Threads
+ std::thread listening_thread_;
+
protected:
asio::io_service internal_io_service_;
asio::io_service &io_service_;
@@ -538,7 +542,6 @@ class ProducerSocket : public Socket<BasePortal>,
private:
utils::EventThread async_thread_;
-
int registration_status_;
bool making_manifest_;
@@ -560,12 +563,6 @@ class ProducerSocket : public Socket<BasePortal>,
std::atomic_size_t input_buffer_capacity_;
std::atomic_size_t input_buffer_size_;
- // threads
- std::thread listening_thread_;
- std::thread processing_thread_;
- volatile bool processing_thread_stop_;
- volatile bool listening_thread_stop_;
-
// callbacks
protected:
ProducerInterestCallback on_interest_input_;