summaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/implementation')
-rw-r--r--libtransport/src/implementation/socket_consumer.h27
-rw-r--r--libtransport/src/implementation/socket_producer.h20
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.cc6
3 files changed, 34 insertions, 19 deletions
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index 488f238ba..175678209 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -17,12 +17,11 @@
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/interfaces/statistics.h>
#include <hicn/transport/security/verifier.h>
-
+#include <hicn/transport/utils/event_thread.h>
#include <protocols/cbr.h>
#include <protocols/protocol.h>
#include <protocols/raaqm.h>
#include <protocols/rtc.h>
-#include <utils/event_thread.h>
namespace transport {
namespace implementation {
@@ -32,10 +31,11 @@ using namespace interface;
using ReadCallback = interface::ConsumerSocket::ReadCallback;
class ConsumerSocket : public Socket<BasePortal> {
- public:
- ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
+ private:
+ ConsumerSocket(interface::ConsumerSocket *consumer, int protocol,
+ std::shared_ptr<Portal> &&portal)
: consumer_interface_(consumer),
- portal_(std::make_shared<Portal>()),
+ portal_(portal),
async_downloader_(),
interest_lifetime_(default_values::interest_lifetime),
min_window_size_(default_values::min_window_size),
@@ -83,6 +83,17 @@ class ConsumerSocket : public Socket<BasePortal> {
}
}
+ public:
+ ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
+ : ConsumerSocket(consumer, protocol, std::make_shared<Portal>()) {}
+
+ ConsumerSocket(interface::ConsumerSocket *consumer, int protocol,
+ asio::io_service &io_service)
+ : ConsumerSocket(consumer, protocol,
+ std::make_shared<Portal>(io_service)) {
+ is_async_ = true;
+ }
+
~ConsumerSocket() {
stop();
async_downloader_.stop();
@@ -110,7 +121,7 @@ class ConsumerSocket : public Socket<BasePortal> {
transport_protocol_->start();
- return CONSUMER_FINISHED;
+ return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED;
}
virtual int asyncConsume(const Name &name) {
@@ -632,6 +643,10 @@ class ConsumerSocket : public Socket<BasePortal> {
socket_option_value = key_content_;
break;
+ case GeneralTransportOptions::ASYNC_MODE:
+ socket_option_value = is_async_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index 8c5c453fc..a6f0f969e 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -16,11 +16,9 @@
#pragma once
#include <hicn/transport/security/signer.h>
-
+#include <hicn/transport/utils/event_thread.h>
#include <implementation/socket.h>
-
#include <utils/content_store.h>
-#include <utils/event_thread.h>
#include <utils/suffix_strategy.h>
#include <atomic>
@@ -206,12 +204,15 @@ class ProducerSocket : public Socket<BasePortal>,
}
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
+ TRANSPORT_LOGD("Send manifest %s",
+ manifest->getName().toString().c_str());
// Send content objects stored in the queue
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str());
+ TRANSPORT_LOGD(
+ "Send content %s",
+ content_queue_.front()->getName().toString().c_str());
content_queue_.pop();
}
@@ -268,7 +269,8 @@ class ProducerSocket : public Socket<BasePortal>,
signer->sign(*content_object);
}
passContentObjectToCallbacks(content_object);
- TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str());
+ TRANSPORT_LOGD("Send content %s",
+ content_object->getName().toString().c_str());
}
}
@@ -283,11 +285,13 @@ class ProducerSocket : public Socket<BasePortal>,
}
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
+ TRANSPORT_LOGD("Send manifest %s",
+ manifest->getName().toString().c_str());
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str());
+ TRANSPORT_LOGD("Send content %s",
+ content_queue_.front()->getName().toString().c_str());
content_queue_.pop();
}
}
diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc
index 7cf653848..1be6f41a7 100644
--- a/libtransport/src/implementation/tls_socket_consumer.cc
+++ b/libtransport/src/implementation/tls_socket_consumer.cc
@@ -14,7 +14,6 @@
*/
#include <implementation/tls_socket_consumer.h>
-
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/tls1.h>
@@ -304,10 +303,7 @@ int TLSConsumerSocket::asyncConsume(const Name &name) {
}
if (!async_downloader_tls_.stopped()) {
- async_downloader_tls_.add([this, name]() {
- is_async_ = true;
- download_content(name);
- });
+ async_downloader_tls_.add([this, name]() { download_content(name); });
}
return CONSUMER_RUNNING;