diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_bytestream.cc')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_bytestream.cc | 231 |
1 files changed, 105 insertions, 126 deletions
diff --git a/libtransport/src/protocols/prod_protocol_bytestream.cc b/libtransport/src/protocols/prod_protocol_bytestream.cc index f659cb37c..2a3ec07e1 100644 --- a/libtransport/src/protocols/prod_protocol_bytestream.cc +++ b/libtransport/src/protocols/prod_protocol_bytestream.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -29,12 +29,7 @@ ByteStreamProductionProtocol::ByteStreamProductionProtocol( implementation::ProducerSocket *icn_socket) : ProductionProtocol(icn_socket) {} -ByteStreamProductionProtocol::~ByteStreamProductionProtocol() { - stop(); - if (listening_thread_.joinable()) { - listening_thread_.join(); - } -} +ByteStreamProductionProtocol::~ByteStreamProductionProtocol() { stop(); } uint32_t ByteStreamProductionProtocol::produceDatagram( const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { @@ -68,16 +63,16 @@ uint32_t ByteStreamProductionProtocol::produceStream( return 0; } - Name name(content_name); - - // Get the atomic variables to ensure they keep the same value - // during the production - // Total size of the data packet uint32_t data_packet_size; socket_->getSocketOption(GeneralTransportOptions::DATA_PACKET_SIZE, data_packet_size); + // Maximum size of a segment + uint32_t max_segment_size; + socket_->getSocketOption(GeneralTransportOptions::MAX_SEGMENT_SIZE, + max_segment_size); + // Expiry time uint32_t content_object_expiry_time; socket_->getSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, @@ -88,106 +83,90 @@ uint32_t ByteStreamProductionProtocol::produceStream( socket_->getSocketOption(GeneralTransportOptions::HASH_ALGORITHM, hash_algo); // Suffix calculation strategy - core::NextSegmentCalculationStrategy _suffix_strategy; + std::shared_ptr<utils::SuffixStrategy> suffix_strategy; socket_->getSocketOption(GeneralTransportOptions::SUFFIX_STRATEGY, - _suffix_strategy); - auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( - _suffix_strategy, start_offset); + suffix_strategy); + suffix_strategy->reset(start_offset); - auto buffer_size = buffer->length(); + // Default format + core::Packet::Format default_format; + socket_->getSocketOption(GeneralTransportOptions::PACKET_FORMAT, + default_format); + + Name name(content_name); + size_t buffer_size = buffer->length(); + size_t signature_length = signer_->getSignatureFieldSize(); + uint32_t final_block_number = start_offset; + + // Content-related + core::Packet::Format content_format; + uint32_t content_header_size; + uint64_t content_free_space; + uint32_t nb_segments; int bytes_segmented = 0; - std::size_t header_size; - std::size_t manifest_header_size = 0; - std::size_t signature_length = 0; - std::uint32_t final_block_number = start_offset; - uint64_t free_space_for_content = 0; - core::Packet::Format format; + // Manifest-related + core::Packet::Format manifest_format; + uint32_t manifest_header_size; + uint64_t manifest_free_space; + uint32_t nb_manifests; std::shared_ptr<core::ContentObjectManifest> manifest; + uint32_t manifest_capacity = making_manifest_; bool is_last_manifest = false; - - // TODO Manifest may still be used for indexing - if (making_manifest_ && !signer_) { - LOG(FATAL) << "Making manifests without setting producer identity."; - } - - core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; - core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC; - - if (name.getType() == HNT_CONTIGUOUS_V4 || name.getType() == HNT_IOV_V4) { - hf_format = core::Packet::Format::HF_INET_TCP; - hf_format_ah = core::Packet::Format::HF_INET_TCP_AH; - } else if (name.getType() == HNT_CONTIGUOUS_V6 || - name.getType() == HNT_IOV_V6) { - hf_format = core::Packet::Format::HF_INET6_TCP; - hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH; - } else { - throw errors::RuntimeException("Unknown name format."); - } - - format = hf_format; - if (making_manifest_) { - manifest_header_size = core::Packet::getHeaderSizeFromFormat( - signer_ ? hf_format_ah : hf_format, - signer_ ? signer_->getSignatureFieldSize() : 0); - } else if (signer_) { - format = hf_format_ah; - signature_length = signer_->getSignatureFieldSize(); + ParamsBytestream transport_params; + + manifest_format = Packet::toAHFormat(default_format); + content_format = + !making_manifest_ ? Packet::toAHFormat(default_format) : default_format; + + content_header_size = + core::Packet::getHeaderSizeFromFormat(content_format, signature_length); + manifest_header_size = + core::Packet::getHeaderSizeFromFormat(manifest_format, signature_length); + content_free_space = + std::min(max_segment_size, data_packet_size - content_header_size); + manifest_free_space = + std::min(max_segment_size, data_packet_size - manifest_header_size); + + // Compute the number of segments the data will be split into + nb_segments = + uint32_t(std::ceil(double(buffer_size) / double(content_free_space))); + if (content_free_space * nb_segments < buffer_size) { + nb_segments++; } - header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); - free_space_for_content = data_packet_size - header_size; - uint32_t number_of_segments = - uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content))); - if (free_space_for_content * number_of_segments < buffer_size) { - number_of_segments++; - } - - // TODO allocate space for all the headers if (making_manifest_) { - uint32_t segment_in_manifest = static_cast<uint32_t>( - std::floor(double(data_packet_size - manifest_header_size - - ContentObjectManifest::getManifestHeaderSize()) / - ContentObjectManifest::getManifestEntrySize()) - - 1.0); - uint32_t number_of_manifests = static_cast<uint32_t>( - std::ceil(float(number_of_segments) / segment_in_manifest)); - final_block_number += number_of_segments + number_of_manifests - 1; + nb_manifests = static_cast<uint32_t>( + std::ceil(float(nb_segments) / manifest_capacity)); + final_block_number += nb_segments + nb_manifests - 1; + transport_params.final_segment = + is_last ? final_block_number : utils::SuffixStrategy::MAX_SUFFIX; manifest.reset(ContentObjectManifest::createManifest( + manifest_format, name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, - hash_algo, is_last_manifest, name, _suffix_strategy, - signer_ ? signer_->getSignatureFieldSize() : 0)); - manifest->setLifetime(content_object_expiry_time); + is_last_manifest, name, hash_algo, signature_length)); - if (is_last) { - manifest->setFinalBlockNumber(final_block_number); - } else { - manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); - } + manifest->setLifetime(content_object_expiry_time); + manifest->setParamsBytestream(transport_params); } - for (unsigned int packaged_segments = 0; - packaged_segments < number_of_segments; packaged_segments++) { + auto self = shared_from_this(); + for (unsigned int packaged_segments = 0; packaged_segments < nb_segments; + packaged_segments++) { if (making_manifest_) { - if (manifest->estimateManifestSize(2) > - data_packet_size - manifest_header_size) { + if (manifest->estimateManifestSize(1) > manifest_free_space) { manifest->encode(); - - // If identity set, sign manifest - if (signer_) { - signer_->signPacket(manifest.get()); - } + signer_->signPacket(manifest.get()); // Send the current manifest - passContentObjectToCallbacks(manifest); - + passContentObjectToCallbacks(manifest, self); DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest->getName(); // Send content objects stored in the queue while (!content_queue_.empty()) { - passContentObjectToCallbacks(content_queue_.front()); + passContentObjectToCallbacks(content_queue_.front(), self); DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send content " << content_queue_.front()->getName(); content_queue_.pop(); @@ -195,86 +174,84 @@ uint32_t ByteStreamProductionProtocol::produceStream( // Create new manifest. The reference to the last manifest has been // acquired in the passContentObjectToCallbacks function, so we can - // safely release this reference + // safely release this reference. manifest.reset(ContentObjectManifest::createManifest( + manifest_format, name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, - core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, - name, _suffix_strategy, - signer_ ? signer_->getSignatureFieldSize() : 0)); + core::ManifestType::INLINE_MANIFEST, is_last_manifest, name, + hash_algo, signature_length)); manifest->setLifetime(content_object_expiry_time); - manifest->setFinalBlockNumber( - is_last ? final_block_number - : utils::SuffixStrategy::INVALID_SUFFIX); + manifest->setParamsBytestream(transport_params); } } - auto content_suffix = suffix_strategy->getNextContentSuffix(); + // Create content object + uint32_t content_suffix = suffix_strategy->getNextContentSuffix(); auto content_object = std::make_shared<ContentObject>( - name.setSuffix(content_suffix), format, - signer_ && !making_manifest_ ? signer_->getSignatureFieldSize() : 0); + name.setSuffix(content_suffix), content_format, + !making_manifest_ ? signature_length : 0); content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); - b->trimStart(free_space_for_content * packaged_segments); + b->trimStart(content_free_space * packaged_segments); b->trimEnd(b->length()); - if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) { + // Segment the input data + if (TRANSPORT_EXPECT_FALSE(packaged_segments == nb_segments - 1)) { b->append(buffer_size - bytes_segmented); bytes_segmented += (int)(buffer_size - bytes_segmented); if (is_last && making_manifest_) { is_last_manifest = true; } else if (is_last) { - content_object->setRst(); + content_object->setLast(); } } else { - b->append(free_space_for_content); - bytes_segmented += (int)(free_space_for_content); + b->append(content_free_space); + bytes_segmented += (int)(content_free_space); } + // Set the segmented data as payload content_object->appendPayload(std::move(b)); + // Either we sign the content object or we save its hash into the current + // manifest if (making_manifest_) { - using namespace std::chrono_literals; auth::CryptoHash hash = content_object->computeDigest(hash_algo); manifest->addSuffixHash(content_suffix, hash); content_queue_.push(content_object); } else { - if (signer_) { - signer_->signPacket(content_object.get()); - } - passContentObjectToCallbacks(content_object); + signer_->signPacket(content_object.get()); + passContentObjectToCallbacks(content_object, self); DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send content " << content_object->getName(); } } + // We send the manifest that hasn't been fully filled yet if (making_manifest_) { if (is_last_manifest) { - manifest->setFinalManifest(is_last_manifest); + manifest->setIsLast(is_last_manifest); } manifest->encode(); + signer_->signPacket(manifest.get()); - if (signer_) { - signer_->signPacket(manifest.get()); - } - - passContentObjectToCallbacks(manifest); + passContentObjectToCallbacks(manifest, self); DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest->getName(); while (!content_queue_.empty()) { - passContentObjectToCallbacks(content_queue_.front()); + passContentObjectToCallbacks(content_queue_.front(), self); DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send content " << content_queue_.front()->getName(); content_queue_.pop(); } } - portal_->getIoService().post([this]() { + portal_->getThread().add([this, self]() { std::shared_ptr<ContentObject> co; while (object_queue_for_callbacks_.pop(co)) { if (*on_new_segment_) { @@ -296,7 +273,7 @@ uint32_t ByteStreamProductionProtocol::produceStream( } }); - portal_->getIoService().dispatch([this, buffer_size]() { + portal_->getThread().add([this, buffer_size, self]() { if (*on_content_produced_) { on_content_produced_->operator()(*socket_->getInterface(), std::make_error_code(std::errc(0)), @@ -307,9 +284,10 @@ uint32_t ByteStreamProductionProtocol::produceStream( return suffix_strategy->getTotalCount(); } -void ByteStreamProductionProtocol::scheduleSendBurst() { - portal_->getIoService().post([this]() { - std::shared_ptr<ContentObject> co; +void ByteStreamProductionProtocol::scheduleSendBurst( + const std::shared_ptr<ByteStreamProductionProtocol> &self) { + portal_->getThread().add([this, self]() { + ContentObject::Ptr co; for (uint32_t i = 0; i < burst_size; i++) { if (object_queue_for_callbacks_.pop(co)) { @@ -321,11 +299,15 @@ void ByteStreamProductionProtocol::scheduleSendBurst() { on_content_object_to_sign_->operator()(*socket_->getInterface(), *co); } + output_buffer_.insert(co); + if (*on_content_object_in_output_buffer_) { on_content_object_in_output_buffer_->operator()( *socket_->getInterface(), *co); } + portal_->sendContentObject(*co); + if (*on_content_object_output_) { on_content_object_output_->operator()(*socket_->getInterface(), *co); } @@ -337,13 +319,12 @@ void ByteStreamProductionProtocol::scheduleSendBurst() { } void ByteStreamProductionProtocol::passContentObjectToCallbacks( - const std::shared_ptr<ContentObject> &content_object) { - output_buffer_.insert(content_object); - portal_->sendContentObject(*content_object); + const std::shared_ptr<ContentObject> &content_object, + const std::shared_ptr<ByteStreamProductionProtocol> &self) { object_queue_for_callbacks_.push(std::move(content_object)); if (object_queue_for_callbacks_.size() >= burst_size) { - scheduleSendBurst(); + scheduleSendBurst(self); } } @@ -376,7 +357,5 @@ void ByteStreamProductionProtocol::onInterest(Interest &interest) { } } -void ByteStreamProductionProtocol::onError(std::error_code ec) {} - } // namespace protocol } // end namespace transport |