diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_bytestream.cc')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_bytestream.cc | 390 |
1 files changed, 390 insertions, 0 deletions
diff --git a/libtransport/src/protocols/prod_protocol_bytestream.cc b/libtransport/src/protocols/prod_protocol_bytestream.cc new file mode 100644 index 000000000..6bd989fe4 --- /dev/null +++ b/libtransport/src/protocols/prod_protocol_bytestream.cc @@ -0,0 +1,390 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/socket_producer.h> +#include <protocols/prod_protocol_bytestream.h> + +#include <atomic> + +namespace transport { + +namespace protocol { + +using namespace core; +using namespace implementation; + +ByteStreamProductionProtocol::ByteStreamProductionProtocol( + implementation::ProducerSocket *icn_socket) + : ProductionProtocol(icn_socket) {} + +ByteStreamProductionProtocol::~ByteStreamProductionProtocol() { + stop(); + if (listening_thread_.joinable()) { + listening_thread_.join(); + } +} + +uint32_t ByteStreamProductionProtocol::produceDatagram( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { + throw errors::NotImplementedException(); +} + +uint32_t ByteStreamProductionProtocol::produceDatagram(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size) { + throw errors::NotImplementedException(); +} + +uint32_t ByteStreamProductionProtocol::produceStream(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size, + bool is_last, + uint32_t start_offset) { + if (!buffer_size) { + return 0; + } + + return produceStream(content_name, + utils::MemBuf::copyBuffer(buffer, buffer_size), is_last, + start_offset); +} + +uint32_t ByteStreamProductionProtocol::produceStream( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { + if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) { + return 0; + } + + Name name(content_name); + + // Get the atomic variables to ensure they keep the same value + // during the production + + // Total size of the data packet + uint32_t data_packet_size; + socket_->getSocketOption(GeneralTransportOptions::DATA_PACKET_SIZE, + data_packet_size); + + // Expiry time + uint32_t content_object_expiry_time; + socket_->getSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + content_object_expiry_time); + + // Hash algorithm + auth::CryptoHashType hash_algo; + socket_->getSocketOption(GeneralTransportOptions::HASH_ALGORITHM, hash_algo); + + // Use manifest + bool making_manifest; + socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + making_manifest); + + // Suffix calculation strategy + core::NextSegmentCalculationStrategy _suffix_strategy; + socket_->getSocketOption(GeneralTransportOptions::SUFFIX_STRATEGY, + _suffix_strategy); + auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( + _suffix_strategy, start_offset); + + std::shared_ptr<auth::Signer> signer; + socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer); + + auto buffer_size = buffer->length(); + int bytes_segmented = 0; + std::size_t header_size; + std::size_t manifest_header_size = 0; + std::size_t signature_length = 0; + std::uint32_t final_block_number = start_offset; + uint64_t free_space_for_content = 0; + + core::Packet::Format format; + std::shared_ptr<core::ContentObjectManifest> manifest; + bool is_last_manifest = false; + + // TODO Manifest may still be used for indexing + if (making_manifest && !signer) { + TRANSPORT_LOGE("Making manifests without setting producer identity."); + } + + core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; + core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC; + + if (name.getType() == HNT_CONTIGUOUS_V4 || name.getType() == HNT_IOV_V4) { + hf_format = core::Packet::Format::HF_INET_TCP; + hf_format_ah = core::Packet::Format::HF_INET_TCP_AH; + } else if (name.getType() == HNT_CONTIGUOUS_V6 || + name.getType() == HNT_IOV_V6) { + hf_format = core::Packet::Format::HF_INET6_TCP; + hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH; + } else { + throw errors::RuntimeException("Unknown name format."); + } + + format = hf_format; + if (making_manifest) { + manifest_header_size = core::Packet::getHeaderSizeFromFormat( + signer ? hf_format_ah : hf_format, + signer ? signer->getSignatureSize() : 0); + } else if (signer) { + format = hf_format_ah; + signature_length = signer->getSignatureSize(); + } + + header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); + free_space_for_content = data_packet_size - header_size; + uint32_t number_of_segments = + uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content))); + if (free_space_for_content * number_of_segments < buffer_size) { + number_of_segments++; + } + + // TODO allocate space for all the headers + if (making_manifest) { + uint32_t segment_in_manifest = static_cast<uint32_t>( + std::floor(double(data_packet_size - manifest_header_size - + ContentObjectManifest::getManifestHeaderSize()) / + ContentObjectManifest::getManifestEntrySize()) - + 1.0); + uint32_t number_of_manifests = static_cast<uint32_t>( + std::ceil(float(number_of_segments) / segment_in_manifest)); + final_block_number += number_of_segments + number_of_manifests - 1; + + manifest.reset(ContentObjectManifest::createManifest( + name.setSuffix(suffix_strategy->getNextManifestSuffix()), + core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, + hash_algo, is_last_manifest, name, _suffix_strategy, + signer ? signer->getSignatureSize() : 0)); + manifest->setLifetime(content_object_expiry_time); + + if (is_last) { + manifest->setFinalBlockNumber(final_block_number); + } else { + manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); + } + } + + for (unsigned int packaged_segments = 0; + packaged_segments < number_of_segments; packaged_segments++) { + if (making_manifest) { + if (manifest->estimateManifestSize(2) > + data_packet_size - manifest_header_size) { + manifest->encode(); + + // If identity set, sign manifest + if (signer) { + signer->signPacket(manifest.get()); + } + + // Send the current manifest + passContentObjectToCallbacks(manifest); + + TRANSPORT_LOGD("Send manifest %s", + manifest->getName().toString().c_str()); + + // Send content objects stored in the queue + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %s", + content_queue_.front()->getName().toString().c_str()); + content_queue_.pop(); + } + + // Create new manifest. The reference to the last manifest has been + // acquired in the passContentObjectToCallbacks function, so we can + // safely release this reference + manifest.reset(ContentObjectManifest::createManifest( + name.setSuffix(suffix_strategy->getNextManifestSuffix()), + core::ManifestVersion::VERSION_1, + core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, + name, _suffix_strategy, signer ? signer->getSignatureSize() : 0)); + + manifest->setLifetime(content_object_expiry_time); + manifest->setFinalBlockNumber( + is_last ? final_block_number + : utils::SuffixStrategy::INVALID_SUFFIX); + } + } + + auto content_suffix = suffix_strategy->getNextContentSuffix(); + auto content_object = std::make_shared<ContentObject>( + name.setSuffix(content_suffix), format, + signer && !making_manifest ? signer->getSignatureSize() : 0); + content_object->setLifetime(content_object_expiry_time); + + auto b = buffer->cloneOne(); + b->trimStart(free_space_for_content * packaged_segments); + b->trimEnd(b->length()); + + if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) { + b->append(buffer_size - bytes_segmented); + bytes_segmented += (int)(buffer_size - bytes_segmented); + + if (is_last && making_manifest) { + is_last_manifest = true; + } else if (is_last) { + content_object->setRst(); + } + + } else { + b->append(free_space_for_content); + bytes_segmented += (int)(free_space_for_content); + } + + content_object->appendPayload(std::move(b)); + + if (making_manifest) { + using namespace std::chrono_literals; + auth::CryptoHash hash = content_object->computeDigest(hash_algo); + manifest->addSuffixHash(content_suffix, hash); + content_queue_.push(content_object); + } else { + if (signer) { + signer->signPacket(content_object.get()); + } + passContentObjectToCallbacks(content_object); + TRANSPORT_LOGD("Send content %s", + content_object->getName().toString().c_str()); + } + } + + if (making_manifest) { + if (is_last_manifest) { + manifest->setFinalManifest(is_last_manifest); + } + + manifest->encode(); + + if (signer) { + signer->signPacket(manifest.get()); + } + + passContentObjectToCallbacks(manifest); + TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str()); + + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + TRANSPORT_LOGD("Send content %s", + content_queue_.front()->getName().toString().c_str()); + content_queue_.pop(); + } + } + + portal_->getIoService().post([this]() { + std::shared_ptr<ContentObject> co; + while (object_queue_for_callbacks_.pop(co)) { + if (*on_new_segment_) { + on_new_segment_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_to_sign_) { + on_content_object_to_sign_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()( + *socket_->getInterface(), *co); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *co); + } + } + }); + + portal_->getIoService().dispatch([this, buffer_size]() { + if (*on_content_produced_) { + on_content_produced_->operator()(*socket_->getInterface(), + std::make_error_code(std::errc(0)), + buffer_size); + } + }); + + return suffix_strategy->getTotalCount(); +} + +void ByteStreamProductionProtocol::scheduleSendBurst() { + portal_->getIoService().post([this]() { + std::shared_ptr<ContentObject> co; + + for (uint32_t i = 0; i < burst_size; i++) { + if (object_queue_for_callbacks_.pop(co)) { + if (*on_new_segment_) { + on_new_segment_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_to_sign_) { + on_content_object_to_sign_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()( + *socket_->getInterface(), *co); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *co); + } + } else { + break; + } + } + }); +} + +void ByteStreamProductionProtocol::passContentObjectToCallbacks( + const std::shared_ptr<ContentObject> &content_object) { + output_buffer_.insert(content_object); + portal_->sendContentObject(*content_object); + object_queue_for_callbacks_.push(std::move(content_object)); + + if (object_queue_for_callbacks_.size() >= burst_size) { + scheduleSendBurst(); + } +} + +void ByteStreamProductionProtocol::onInterest(Interest &interest) { + TRANSPORT_LOGD("Received interest for %s", + interest.getName().toString().c_str()); + if (*on_interest_input_) { + on_interest_input_->operator()(*socket_->getInterface(), interest); + } + + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(interest); + + if (content_object) { + if (*on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(), + interest); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } + + portal_->sendContentObject(*content_object); + } else { + if (*on_interest_process_) { + on_interest_process_->operator()(*socket_->getInterface(), interest); + } + } +} + +void ByteStreamProductionProtocol::onError(std::error_code ec) {} + +} // namespace protocol +} // end namespace transport |