aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/prod_protocol_bytestream.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_bytestream.cc')
-rw-r--r--libtransport/src/protocols/prod_protocol_bytestream.cc74
1 files changed, 33 insertions, 41 deletions
diff --git a/libtransport/src/protocols/prod_protocol_bytestream.cc b/libtransport/src/protocols/prod_protocol_bytestream.cc
index 6bd989fe4..f659cb37c 100644
--- a/libtransport/src/protocols/prod_protocol_bytestream.cc
+++ b/libtransport/src/protocols/prod_protocol_bytestream.cc
@@ -87,11 +87,6 @@ uint32_t ByteStreamProductionProtocol::produceStream(
auth::CryptoHashType hash_algo;
socket_->getSocketOption(GeneralTransportOptions::HASH_ALGORITHM, hash_algo);
- // Use manifest
- bool making_manifest;
- socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
- making_manifest);
-
// Suffix calculation strategy
core::NextSegmentCalculationStrategy _suffix_strategy;
socket_->getSocketOption(GeneralTransportOptions::SUFFIX_STRATEGY,
@@ -99,9 +94,6 @@ uint32_t ByteStreamProductionProtocol::produceStream(
auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
_suffix_strategy, start_offset);
- std::shared_ptr<auth::Signer> signer;
- socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer);
-
auto buffer_size = buffer->length();
int bytes_segmented = 0;
std::size_t header_size;
@@ -115,8 +107,8 @@ uint32_t ByteStreamProductionProtocol::produceStream(
bool is_last_manifest = false;
// TODO Manifest may still be used for indexing
- if (making_manifest && !signer) {
- TRANSPORT_LOGE("Making manifests without setting producer identity.");
+ if (making_manifest_ && !signer_) {
+ LOG(FATAL) << "Making manifests without setting producer identity.";
}
core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
@@ -134,13 +126,13 @@ uint32_t ByteStreamProductionProtocol::produceStream(
}
format = hf_format;
- if (making_manifest) {
+ if (making_manifest_) {
manifest_header_size = core::Packet::getHeaderSizeFromFormat(
- signer ? hf_format_ah : hf_format,
- signer ? signer->getSignatureSize() : 0);
- } else if (signer) {
+ signer_ ? hf_format_ah : hf_format,
+ signer_ ? signer_->getSignatureFieldSize() : 0);
+ } else if (signer_) {
format = hf_format_ah;
- signature_length = signer->getSignatureSize();
+ signature_length = signer_->getSignatureFieldSize();
}
header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length);
@@ -152,7 +144,7 @@ uint32_t ByteStreamProductionProtocol::produceStream(
}
// TODO allocate space for all the headers
- if (making_manifest) {
+ if (making_manifest_) {
uint32_t segment_in_manifest = static_cast<uint32_t>(
std::floor(double(data_packet_size - manifest_header_size -
ContentObjectManifest::getManifestHeaderSize()) /
@@ -166,7 +158,7 @@ uint32_t ByteStreamProductionProtocol::produceStream(
name.setSuffix(suffix_strategy->getNextManifestSuffix()),
core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
hash_algo, is_last_manifest, name, _suffix_strategy,
- signer ? signer->getSignatureSize() : 0));
+ signer_ ? signer_->getSignatureFieldSize() : 0));
manifest->setLifetime(content_object_expiry_time);
if (is_last) {
@@ -178,27 +170,26 @@ uint32_t ByteStreamProductionProtocol::produceStream(
for (unsigned int packaged_segments = 0;
packaged_segments < number_of_segments; packaged_segments++) {
- if (making_manifest) {
+ if (making_manifest_) {
if (manifest->estimateManifestSize(2) >
data_packet_size - manifest_header_size) {
manifest->encode();
// If identity set, sign manifest
- if (signer) {
- signer->signPacket(manifest.get());
+ if (signer_) {
+ signer_->signPacket(manifest.get());
}
// Send the current manifest
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s",
- manifest->getName().toString().c_str());
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest->getName();
// Send content objects stored in the queue
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %s",
- content_queue_.front()->getName().toString().c_str());
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Send content " << content_queue_.front()->getName();
content_queue_.pop();
}
@@ -209,7 +200,8 @@ uint32_t ByteStreamProductionProtocol::produceStream(
name.setSuffix(suffix_strategy->getNextManifestSuffix()),
core::ManifestVersion::VERSION_1,
core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
- name, _suffix_strategy, signer ? signer->getSignatureSize() : 0));
+ name, _suffix_strategy,
+ signer_ ? signer_->getSignatureFieldSize() : 0));
manifest->setLifetime(content_object_expiry_time);
manifest->setFinalBlockNumber(
@@ -221,7 +213,7 @@ uint32_t ByteStreamProductionProtocol::produceStream(
auto content_suffix = suffix_strategy->getNextContentSuffix();
auto content_object = std::make_shared<ContentObject>(
name.setSuffix(content_suffix), format,
- signer && !making_manifest ? signer->getSignatureSize() : 0);
+ signer_ && !making_manifest_ ? signer_->getSignatureFieldSize() : 0);
content_object->setLifetime(content_object_expiry_time);
auto b = buffer->cloneOne();
@@ -232,7 +224,7 @@ uint32_t ByteStreamProductionProtocol::produceStream(
b->append(buffer_size - bytes_segmented);
bytes_segmented += (int)(buffer_size - bytes_segmented);
- if (is_last && making_manifest) {
+ if (is_last && making_manifest_) {
is_last_manifest = true;
} else if (is_last) {
content_object->setRst();
@@ -245,39 +237,39 @@ uint32_t ByteStreamProductionProtocol::produceStream(
content_object->appendPayload(std::move(b));
- if (making_manifest) {
+ if (making_manifest_) {
using namespace std::chrono_literals;
auth::CryptoHash hash = content_object->computeDigest(hash_algo);
manifest->addSuffixHash(content_suffix, hash);
content_queue_.push(content_object);
} else {
- if (signer) {
- signer->signPacket(content_object.get());
+ if (signer_) {
+ signer_->signPacket(content_object.get());
}
passContentObjectToCallbacks(content_object);
- TRANSPORT_LOGD("Send content %s",
- content_object->getName().toString().c_str());
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Send content " << content_object->getName();
}
}
- if (making_manifest) {
+ if (making_manifest_) {
if (is_last_manifest) {
manifest->setFinalManifest(is_last_manifest);
}
manifest->encode();
- if (signer) {
- signer->signPacket(manifest.get());
+ if (signer_) {
+ signer_->signPacket(manifest.get());
}
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest->getName();
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %s",
- content_queue_.front()->getName().toString().c_str());
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Send content " << content_queue_.front()->getName();
content_queue_.pop();
}
}
@@ -356,14 +348,14 @@ void ByteStreamProductionProtocol::passContentObjectToCallbacks(
}
void ByteStreamProductionProtocol::onInterest(Interest &interest) {
- TRANSPORT_LOGD("Received interest for %s",
- interest.getName().toString().c_str());
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received interest for " << interest.getName();
if (*on_interest_input_) {
on_interest_input_->operator()(*socket_->getInterface(), interest);
}
const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(interest);
+ output_buffer_.find(interest.getName());
if (content_object) {
if (*on_interest_satisfied_output_buffer_) {