diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_bytestream.cc')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_bytestream.cc | 369 |
1 files changed, 369 insertions, 0 deletions
diff --git a/libtransport/src/protocols/prod_protocol_bytestream.cc b/libtransport/src/protocols/prod_protocol_bytestream.cc new file mode 100644 index 000000000..7f103e12b --- /dev/null +++ b/libtransport/src/protocols/prod_protocol_bytestream.cc @@ -0,0 +1,369 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/socket_producer.h> +#include <protocols/prod_protocol_bytestream.h> + +#include <atomic> + +namespace transport { + +namespace protocol { + +using namespace core; +using namespace implementation; + +ByteStreamProductionProtocol::ByteStreamProductionProtocol( + implementation::ProducerSocket *icn_socket) + : ProductionProtocol(icn_socket) {} + +ByteStreamProductionProtocol::~ByteStreamProductionProtocol() { stop(); } + +uint32_t ByteStreamProductionProtocol::produceDatagram( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { + throw errors::NotImplementedException(); +} + +uint32_t ByteStreamProductionProtocol::produceDatagram(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size) { + throw errors::NotImplementedException(); +} + +uint32_t ByteStreamProductionProtocol::produceStream(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size, + bool is_last, + uint32_t start_offset) { + if (!buffer_size) { + return 0; + } + + return produceStream(content_name, + utils::MemBuf::copyBuffer(buffer, buffer_size), is_last, + start_offset); +} + +uint32_t ByteStreamProductionProtocol::produceStream( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { + if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) { + return 0; + } + + // Total size of the data packet + uint32_t data_packet_size; + socket_->getSocketOption(GeneralTransportOptions::DATA_PACKET_SIZE, + data_packet_size); + + // Maximum size of a segment + uint32_t max_segment_size; + socket_->getSocketOption(GeneralTransportOptions::MAX_SEGMENT_SIZE, + max_segment_size); + + // Expiry time + uint32_t content_object_expiry_time; + socket_->getSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + content_object_expiry_time); + + // Hash algorithm + auth::CryptoHashType hash_algo; + socket_->getSocketOption(GeneralTransportOptions::HASH_ALGORITHM, hash_algo); + + // Suffix calculation strategy + std::shared_ptr<utils::SuffixStrategy> suffix_strategy; + socket_->getSocketOption(GeneralTransportOptions::SUFFIX_STRATEGY, + suffix_strategy); + suffix_strategy->reset(start_offset); + + // Default format + core::Packet::Format default_format; + socket_->getSocketOption(GeneralTransportOptions::PACKET_FORMAT, + default_format); + + Name name(content_name); + size_t buffer_size = buffer->length(); + size_t signature_length = signer_->getSignatureFieldSize(); + uint32_t final_block_number = start_offset; + + // Content-related + core::Packet::Format content_format; + uint32_t content_header_size; + uint64_t content_free_space; + uint32_t nb_segments; + int bytes_segmented = 0; + + // Manifest-related + core::Packet::Format manifest_format; + uint32_t manifest_header_size; + uint64_t manifest_free_space; + uint32_t nb_manifests; + std::shared_ptr<core::ContentObjectManifest> manifest; + uint32_t manifest_capacity = manifest_max_capacity_; + bool is_last_manifest = false; + ParamsBytestream transport_params; + + manifest_format = Packet::toAHFormat(default_format); + content_format = !manifest_max_capacity_ ? Packet::toAHFormat(default_format) + : default_format; + + content_header_size = (uint32_t)core::Packet::getHeaderSizeFromFormat( + content_format, signature_length); + manifest_header_size = (uint32_t)core::Packet::getHeaderSizeFromFormat( + manifest_format, signature_length); + content_free_space = + std::min(max_segment_size, data_packet_size - content_header_size); + manifest_free_space = + std::min(max_segment_size, data_packet_size - manifest_header_size); + + // Compute the number of segments the data will be split into + nb_segments = + uint32_t(std::ceil(double(buffer_size) / double(content_free_space))); + if (content_free_space * nb_segments < buffer_size) { + nb_segments++; + } + + if (manifest_max_capacity_) { + nb_manifests = static_cast<uint32_t>( + std::ceil(float(nb_segments) / manifest_capacity)); + final_block_number += nb_segments + nb_manifests - 1; + transport_params.final_segment = + is_last ? final_block_number : utils::SuffixStrategy::MAX_SUFFIX; + + manifest = ContentObjectManifest::createContentManifest( + manifest_format, + name.setSuffix(suffix_strategy->getNextManifestSuffix()), + signature_length); + manifest->setHeaders(core::ManifestType::INLINE_MANIFEST, + manifest_max_capacity_, hash_algo, is_last_manifest, + name); + manifest->setParamsBytestream(transport_params); + manifest->getPacket()->setLifetime(content_object_expiry_time); + } + + auto self = shared_from_this(); + for (unsigned int packaged_segments = 0; packaged_segments < nb_segments; + packaged_segments++) { + if (manifest_max_capacity_) { + if (manifest->Encoder::manifestSize(1) > manifest_free_space) { + manifest->encode(); + auto manifest_co = + std::dynamic_pointer_cast<ContentObject>(manifest->getPacket()); + + signer_->signPacket(manifest_co.get()); + + // Send the current manifest + passContentObjectToCallbacks(manifest_co, self); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Send manifest " << manifest_co->getName(); + + // Send content objects stored in the queue + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front(), self); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Send content " << content_queue_.front()->getName(); + content_queue_.pop(); + } + + // Create new manifest. The reference to the last manifest has been + // acquired in the passContentObjectToCallbacks function, so we can + // safely release this reference. + manifest = ContentObjectManifest::createContentManifest( + manifest_format, + name.setSuffix(suffix_strategy->getNextManifestSuffix()), + signature_length); + manifest->setHeaders(core::ManifestType::INLINE_MANIFEST, + manifest_max_capacity_, hash_algo, + is_last_manifest, name); + manifest->setParamsBytestream(transport_params); + manifest->getPacket()->setLifetime(content_object_expiry_time); + } + } + + // Create content object + uint32_t content_suffix = suffix_strategy->getNextContentSuffix(); + auto content_object = std::make_shared<ContentObject>( + name.setSuffix(content_suffix), content_format, + !manifest_max_capacity_ ? signature_length : 0); + content_object->setLifetime(content_object_expiry_time); + + auto b = buffer->cloneOne(); + b->trimStart(content_free_space * packaged_segments); + b->trimEnd(b->length()); + + // Segment the input data + if (TRANSPORT_EXPECT_FALSE(packaged_segments == nb_segments - 1)) { + b->append(buffer_size - bytes_segmented); + bytes_segmented += (int)(buffer_size - bytes_segmented); + + if (is_last && manifest_max_capacity_) { + is_last_manifest = true; + } else if (is_last) { + content_object->setLast(); + } + + } else { + b->append(content_free_space); + bytes_segmented += (int)(content_free_space); + } + + // Set the segmented data as payload + content_object->appendPayload(std::move(b)); + + // Either we sign the content object or we save its hash into the current + // manifest + if (manifest_max_capacity_) { + auth::CryptoHash hash = content_object->computeDigest(hash_algo); + manifest->addEntry(content_suffix, hash); + content_queue_.push(content_object); + } else { + signer_->signPacket(content_object.get()); + passContentObjectToCallbacks(content_object, self); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Send content " << content_object->getName(); + } + } + + // We send the manifest that hasn't been fully filled yet + if (manifest_max_capacity_) { + if (is_last_manifest) { + manifest->setIsLast(is_last_manifest); + } + + manifest->encode(); + auto manifest_co = + std::dynamic_pointer_cast<ContentObject>(manifest->getPacket()); + + signer_->signPacket(manifest_co.get()); + + passContentObjectToCallbacks(manifest_co, self); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send manifest " << manifest_co->getName(); + + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front(), self); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Send content " << content_queue_.front()->getName(); + content_queue_.pop(); + } + } + + portal_->getThread().add([this, self]() { + std::shared_ptr<ContentObject> co; + while (object_queue_for_callbacks_.pop(co)) { + if (*on_new_segment_) { + on_new_segment_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_to_sign_) { + on_content_object_to_sign_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()( + *socket_->getInterface(), *co); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *co); + } + } + }); + + portal_->getThread().add([this, buffer_size, self]() { + if (*on_content_produced_) { + on_content_produced_->operator()(*socket_->getInterface(), + std::make_error_code(std::errc(0)), + buffer_size); + } + }); + + return suffix_strategy->getTotalCount(); +} + +void ByteStreamProductionProtocol::scheduleSendBurst( + const std::shared_ptr<ByteStreamProductionProtocol> &self) { + portal_->getThread().add([this, self]() { + ContentObject::Ptr co; + + for (uint32_t i = 0; i < burst_size; i++) { + if (object_queue_for_callbacks_.pop(co)) { + if (*on_new_segment_) { + on_new_segment_->operator()(*socket_->getInterface(), *co); + } + + if (*on_content_object_to_sign_) { + on_content_object_to_sign_->operator()(*socket_->getInterface(), *co); + } + + output_buffer_.insert(co); + + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()( + *socket_->getInterface(), *co); + } + + portal_->sendContentObject(*co); + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *co); + } + } else { + break; + } + } + }); +} + +void ByteStreamProductionProtocol::passContentObjectToCallbacks( + const std::shared_ptr<ContentObject> &content_object, + const std::shared_ptr<ByteStreamProductionProtocol> &self) { + object_queue_for_callbacks_.push(std::move(content_object)); + + if (object_queue_for_callbacks_.size() >= burst_size) { + scheduleSendBurst(self); + } +} + +void ByteStreamProductionProtocol::onInterest(Interest &interest) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received interest for " << interest.getName(); + if (*on_interest_input_) { + on_interest_input_->operator()(*socket_->getInterface(), interest); + } + + const std::shared_ptr<ContentObject> content_object = + output_buffer_.find(interest.getName()); + + if (content_object) { + if (*on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(), + interest); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } + + portal_->sendContentObject(*content_object); + } else { + if (*on_interest_process_) { + on_interest_process_->operator()(*socket_->getInterface(), interest); + } + } +} + +} // namespace protocol +} // end namespace transport |