diff options
Diffstat (limited to 'libtransport/src/implementation')
-rw-r--r-- | libtransport/src/implementation/socket_consumer.h | 27 | ||||
-rw-r--r-- | libtransport/src/implementation/socket_producer.h | 20 | ||||
-rw-r--r-- | libtransport/src/implementation/tls_socket_consumer.cc | 6 |
3 files changed, 34 insertions, 19 deletions
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index 488f238ba..175678209 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -17,12 +17,11 @@ #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/statistics.h> #include <hicn/transport/security/verifier.h> - +#include <hicn/transport/utils/event_thread.h> #include <protocols/cbr.h> #include <protocols/protocol.h> #include <protocols/raaqm.h> #include <protocols/rtc.h> -#include <utils/event_thread.h> namespace transport { namespace implementation { @@ -32,10 +31,11 @@ using namespace interface; using ReadCallback = interface::ConsumerSocket::ReadCallback; class ConsumerSocket : public Socket<BasePortal> { - public: - ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) + private: + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, + std::shared_ptr<Portal> &&portal) : consumer_interface_(consumer), - portal_(std::make_shared<Portal>()), + portal_(portal), async_downloader_(), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), @@ -83,6 +83,17 @@ class ConsumerSocket : public Socket<BasePortal> { } } + public: + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) + : ConsumerSocket(consumer, protocol, std::make_shared<Portal>()) {} + + ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, + asio::io_service &io_service) + : ConsumerSocket(consumer, protocol, + std::make_shared<Portal>(io_service)) { + is_async_ = true; + } + ~ConsumerSocket() { stop(); async_downloader_.stop(); @@ -110,7 +121,7 @@ class ConsumerSocket : public Socket<BasePortal> { transport_protocol_->start(); - return CONSUMER_FINISHED; + return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED; } virtual int asyncConsume(const Name &name) { @@ -632,6 +643,10 @@ class ConsumerSocket : public Socket<BasePortal> { socket_option_value = key_content_; break; + case GeneralTransportOptions::ASYNC_MODE: + socket_option_value = is_async_; + break; + default: return SOCKET_OPTION_NOT_GET; } diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 8c5c453fc..a6f0f969e 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -16,11 +16,9 @@ #pragma once #include <hicn/transport/security/signer.h> - +#include <hicn/transport/utils/event_thread.h> #include <implementation/socket.h> - #include <utils/content_store.h> -#include <utils/event_thread.h> #include <utils/suffix_strategy.h> #include <atomic> @@ -206,12 +204,15 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + TRANSPORT_LOGD("Send manifest %s", + manifest->getName().toString().c_str()); // Send content objects stored in the queue while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); + TRANSPORT_LOGD( + "Send content %s", + content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } @@ -268,7 +269,8 @@ class ProducerSocket : public Socket<BasePortal>, signer->sign(*content_object); } passContentObjectToCallbacks(content_object); - TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str()); + TRANSPORT_LOGD("Send content %s", + content_object->getName().toString().c_str()); } } @@ -283,11 +285,13 @@ class ProducerSocket : public Socket<BasePortal>, } passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + TRANSPORT_LOGD("Send manifest %s", + manifest->getName().toString().c_str()); while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str()); + TRANSPORT_LOGD("Send content %s", + content_queue_.front()->getName().toString().c_str()); content_queue_.pop(); } } diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc index 7cf653848..1be6f41a7 100644 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -14,7 +14,6 @@ */ #include <implementation/tls_socket_consumer.h> - #include <openssl/bio.h> #include <openssl/ssl.h> #include <openssl/tls1.h> @@ -304,10 +303,7 @@ int TLSConsumerSocket::asyncConsume(const Name &name) { } if (!async_downloader_tls_.stopped()) { - async_downloader_tls_.add([this, name]() { - is_async_ = true; - download_content(name); - }); + async_downloader_tls_.add([this, name]() { download_content(name); }); } return CONSUMER_RUNNING; |