aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces/socket_producer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/socket_producer.cc')
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc148
1 files changed, 90 insertions, 58 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 4fef5d1e2..26a7208b6 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -14,7 +14,7 @@
*/
#include <hicn/transport/interfaces/socket_producer.h>
-#include <hicn/transport/utils/identity.h>
+#include <hicn/transport/utils/signer.h>
#include <functional>
@@ -35,6 +35,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
data_packet_size_(default_values::content_object_packet_size),
content_object_expiry_time_(default_values::content_object_expiry_time),
output_buffer_(default_values::producer_socket_output_buffer_size),
+ async_thread_(),
registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
hash_algorithm_(HashAlgorithm::SHA_256),
@@ -96,51 +97,47 @@ void ProducerSocket::listen() {
void ProducerSocket::passContentObjectToCallbacks(
const std::shared_ptr<ContentObject> &content_object) {
if (content_object) {
- if (on_new_segment_) {
- io_service_.dispatch([this, content_object]() {
+ io_service_.dispatch([this, content_object]() {
+ if (on_new_segment_) {
on_new_segment_(*this, *content_object);
- });
- }
+ }
- if (on_content_object_to_sign_) {
- io_service_.dispatch([this, content_object]() {
+ if (on_content_object_to_sign_) {
on_content_object_to_sign_(*this, *content_object);
- });
- }
+ }
- if (on_content_object_in_output_buffer_) {
- io_service_.dispatch([this, content_object]() {
+ if (on_content_object_in_output_buffer_) {
on_content_object_in_output_buffer_(*this, *content_object);
- });
- }
+ }
+ });
output_buffer_.insert(content_object);
- if (on_content_object_output_) {
- io_service_.dispatch([this, content_object]() {
+ io_service_.dispatch([this, content_object]() {
+ if (on_content_object_output_) {
on_content_object_output_(*this, *content_object);
- });
- }
+ }
+ });
portal_->sendContentObject(*content_object);
}
}
void ProducerSocket::produce(ContentObject &content_object) {
- if (on_content_object_in_output_buffer_) {
- io_service_.dispatch([this, &content_object]() {
+ io_service_.dispatch([this, &content_object]() {
+ if (on_content_object_in_output_buffer_) {
on_content_object_in_output_buffer_(*this, content_object);
- });
- }
+ }
+ });
output_buffer_.insert(std::static_pointer_cast<ContentObject>(
content_object.shared_from_this()));
- if (on_content_object_output_) {
- io_service_.dispatch([this, &content_object]() {
+ io_service_.dispatch([this, &content_object]() {
+ if (on_content_object_output_) {
on_content_object_output_(*this, content_object);
- });
- }
+ }
+ });
portal_->sendContentObject(content_object);
}
@@ -160,8 +157,8 @@ uint32_t ProducerSocket::produce(Name content_name,
bool making_manifest = making_manifest_;
auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
suffix_strategy_, start_offset);
- std::shared_ptr<utils::Identity> identity;
- getSocketOption(GeneralTransportOptions::IDENTITY, identity);
+ std::shared_ptr<utils::Signer> signer;
+ getSocketOption(GeneralTransportOptions::SIGNER, signer);
auto buffer_size = buffer->length();
int bytes_segmented = 0;
@@ -176,7 +173,7 @@ uint32_t ProducerSocket::produce(Name content_name,
bool is_last_manifest = false;
// TODO Manifest may still be used for indexing
- if (making_manifest && !identity) {
+ if (making_manifest && !signer) {
TRANSPORT_LOGD("Making manifests without setting producer identity.");
}
@@ -197,11 +194,11 @@ uint32_t ProducerSocket::produce(Name content_name,
format = hf_format;
if (making_manifest) {
manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- identity ? hf_format_ah : hf_format,
- identity ? identity->getSignatureLength() : 0);
- } else if (identity) {
+ signer ? hf_format_ah : hf_format,
+ signer ? signer->getSignatureLength() : 0);
+ } else if (signer) {
format = hf_format_ah;
- signature_length = identity->getSignatureLength();
+ signature_length = signer->getSignatureLength();
}
header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length);
@@ -227,7 +224,7 @@ uint32_t ProducerSocket::produce(Name content_name,
content_name.setSuffix(suffix_strategy->getNextManifestSuffix()),
core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
hash_algo, is_last_manifest, content_name, suffix_strategy_,
- identity ? identity->getSignatureLength() : 0));
+ signer ? signer->getSignatureLength() : 0));
manifest->setLifetime(content_object_expiry_time);
if (is_last) {
@@ -237,6 +234,7 @@ uint32_t ProducerSocket::produce(Name content_name,
}
}
+ TRANSPORT_LOGD("--------- START PRODUCE ----------");
for (unsigned int packaged_segments = 0;
packaged_segments < number_of_segments; packaged_segments++) {
if (making_manifest) {
@@ -246,15 +244,18 @@ uint32_t ProducerSocket::produce(Name content_name,
manifest->encode();
// If identity set, sign manifest
- if (identity) {
- identity->getSigner().sign(*manifest);
+ if (signer) {
+ signer->sign(*manifest);
}
passContentObjectToCallbacks(manifest);
+ TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
// Send content objects stored in the queue
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
+ TRANSPORT_LOGD("Send content %u",
+ content_queue_.front()->getName().getSuffix());
content_queue_.pop();
}
@@ -266,7 +267,7 @@ uint32_t ProducerSocket::produce(Name content_name,
core::ManifestVersion::VERSION_1,
core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
content_name, suffix_strategy_,
- identity ? identity->getSignatureLength() : 0));
+ signer ? signer->getSignatureLength() : 0));
manifest->setLifetime(content_object_expiry_time);
manifest->setFinalBlockNumber(
@@ -307,10 +308,11 @@ uint32_t ProducerSocket::produce(Name content_name,
manifest->addSuffixHash(content_suffix, hash);
content_queue_.push(content_object);
} else {
- if (identity) {
- identity->getSigner().sign(*content_object);
+ if (signer) {
+ signer->sign(*content_object);
}
passContentObjectToCallbacks(content_object);
+ TRANSPORT_LOGD("Send content %u", content_object->getName().getSuffix());
}
}
@@ -320,24 +322,28 @@ uint32_t ProducerSocket::produce(Name content_name,
}
manifest->encode();
- if (identity) {
- identity->getSigner().sign(*manifest);
+ if (signer) {
+ signer->sign(*manifest);
}
passContentObjectToCallbacks(manifest);
+ TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
+ TRANSPORT_LOGD("Send content %u",
+ content_queue_.front()->getName().getSuffix());
content_queue_.pop();
}
}
- if (on_content_produced_) {
- io_service_.dispatch([this, buffer_size]() {
+ io_service_.dispatch([this, buffer_size]() {
+ if (on_content_produced_) {
on_content_produced_(*this, std::make_error_code(std::errc(0)),
buffer_size);
- });
- }
+ }
+ });
+ TRANSPORT_LOGD("--------- END PRODUCE ------------");
return suffix_strategy->getTotalCount();
}
@@ -346,16 +352,42 @@ void ProducerSocket::asyncProduce(ContentObject &content_object) {
auto co_ptr = std::static_pointer_cast<ContentObject>(
content_object.shared_from_this());
async_thread_.add([this, content_object = std::move(co_ptr)]() {
- produce(*content_object);
+ ProducerSocket::produce(*content_object);
});
}
}
void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf,
- size_t buffer_size) {
+ size_t buffer_size, bool is_last,
+ uint32_t *start_offset) {
if (!async_thread_.stopped()) {
- async_thread_.add([this, suffix, buffer = buf, size = buffer_size]() {
- produce(suffix, buffer, size, 0, false);
+ async_thread_.add([this, suffix, buffer = buf, size = buffer_size, is_last,
+ start_offset]() {
+ if (start_offset != NULL) {
+ *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last,
+ *start_offset);
+ } else {
+ ProducerSocket::produce(suffix, buffer, size, is_last, 0);
+ }
+ });
+ }
+}
+
+void ProducerSocket::asyncProduce(Name content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t offset,
+ uint32_t **last_segment) {
+ if (!async_thread_.stopped()) {
+ auto a = buffer.release();
+ async_thread_.add([this, content_name, a, is_last, offset, last_segment]() {
+ auto buf = std::unique_ptr<utils::MemBuf>(a);
+ if (last_segment != NULL) {
+ **last_segment =
+ offset + ProducerSocket::produce(content_name, std::move(buf),
+ is_last, offset);
+ } else {
+ ProducerSocket::produce(content_name, std::move(buf), is_last, offset);
+ }
});
}
}
@@ -392,8 +424,8 @@ int ProducerSocket::setSocketOption(int socket_option_key,
if (socket_option_value < default_values::max_content_object_size &&
socket_option_value > 0) {
data_packet_size_ = socket_option_value;
- break;
}
+ break;
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
output_buffer_.setLimit(socket_option_value);
@@ -632,12 +664,12 @@ int ProducerSocket::setSocketOption(int socket_option_key,
int ProducerSocket::setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Identity> &socket_option_value) {
+ const std::shared_ptr<utils::Signer> &socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::IDENTITY: {
- utils::SpinLock::Acquire locked(identity_lock_);
- identity_.reset();
- identity_ = socket_option_value;
+ case GeneralTransportOptions::SIGNER: {
+ utils::SpinLock::Acquire locked(signer_lock_);
+ signer_.reset();
+ signer_ = socket_option_value;
} break;
default:
return SOCKET_OPTION_NOT_SET;
@@ -844,11 +876,11 @@ int ProducerSocket::getSocketOption(int socket_option_key,
int ProducerSocket::getSocketOption(
int socket_option_key,
- std::shared_ptr<utils::Identity> &socket_option_value) {
+ std::shared_ptr<utils::Signer> &socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::IDENTITY: {
- utils::SpinLock::Acquire locked(identity_lock_);
- socket_option_value = identity_;
+ case GeneralTransportOptions::SIGNER: {
+ utils::SpinLock::Acquire locked(signer_lock_);
+ socket_option_value = signer_;
} break;
default:
return SOCKET_OPTION_NOT_GET;