diff options
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/socket_producer.cc')
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/socket_producer.cc | 70 |
1 files changed, 36 insertions, 34 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 8f8fc1a79..5cf40cd1c 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -38,6 +38,8 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), hash_algorithm_(HashAlgorithm::SHA_256), + suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), + suffix_content_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -151,33 +153,32 @@ uint32_t ProducerSocket::produce(Name content_name, return 0; } - // Copy the atomic variable to ensure always the same value during the a - // production + // Copy the atomic variables to ensure they keep the same value + // during the production std::size_t data_packet_size = data_packet_size_; uint32_t content_object_expiry_time = content_object_expiry_time_; - HashAlgorithm algo = hash_algorithm_; + HashAlgorithm hash_algo = hash_algorithm_; bool making_manifest = making_manifest_; + utils::SuffixContent suffix_content = suffix_content_; + utils::SuffixManifest suffix_manifest = suffix_manifest_; std::shared_ptr<utils::Identity> identity; getSocketOption(GeneralTransportOptions::IDENTITY, identity); auto buffer_size = buffer->length(); - const std::size_t hash_size = 32; - int bytes_segmented = 0; std::size_t header_size; std::size_t manifest_header_size = 0; std::size_t signature_length = 0; std::uint32_t final_block_number = 0; - uint64_t free_space_for_content = 0; core::Packet::Format format; - - uint32_t current_segment = start_offset; std::shared_ptr<ContentObjectManifest> manifest; bool is_last_manifest = false; std::unique_ptr<utils::CryptoHash> zero_hash; + suffix_content.updateSuffix(start_offset); + suffix_content.setUsingManifest(making_manifest); // TODO Manifest may still be used for indexing if (making_manifest && !identity) { @@ -210,35 +211,37 @@ uint32_t ProducerSocket::produce(Name content_name, } header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); - free_space_for_content = data_packet_size - header_size; - uint32_t number_of_segments = uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content))); - if (free_space_for_content * number_of_segments < buffer_size) { number_of_segments++; } // TODO allocate space for all the headers - if (making_manifest) { - auto segment_in_manifest = static_cast<float>( + uint32_t segment_in_manifest = static_cast<uint32_t>( std::floor(double(data_packet_size - manifest_header_size - ContentObjectManifest::getManifestHeaderSize()) / - (4.0 + 32.0)) - + ContentObjectManifest::getManifestEntrySize()) - 1.0); - auto number_of_manifests = static_cast<uint32_t>( + uint32_t number_of_manifests = static_cast<uint32_t>( std::ceil(float(number_of_segments) / segment_in_manifest)); final_block_number = number_of_segments + number_of_manifests - 1; + suffix_manifest.updateSuffix(start_offset); + suffix_manifest.setNbSegments(segment_in_manifest); + suffix_content.updateSuffix(start_offset + 1); + suffix_content.setNbSegments(segment_in_manifest); + manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(current_segment++), + content_name.setSuffix(suffix_manifest.getSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, - hash_algorithm_, is_last_manifest, content_name, + hash_algo, is_last_manifest, content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, identity->getSignatureLength())); manifest->setLifetime(content_object_expiry_time); + suffix_manifest++; if (is_last) { manifest->setFinalBlockNumber(final_block_number); @@ -249,7 +252,7 @@ uint32_t ProducerSocket::produce(Name content_name, uint8_t hash[hash_size]; std::memset(hash, 0, hash_size); zero_hash = std::make_unique<utils::CryptoHash>( - hash, hash_size, static_cast<utils::CryptoHashType>(algo)); + hash, hash_size, static_cast<utils::CryptoHashType>(hash_algo)); } for (unsigned int packaged_segments = 0; @@ -258,42 +261,41 @@ uint32_t ProducerSocket::produce(Name content_name, if (manifest->estimateManifestSize(2) > data_packet_size - manifest_header_size) { // Add next manifest - manifest->addSuffixHash(current_segment, *zero_hash); - + manifest->addSuffixHash(suffix_manifest.getSuffix(), *zero_hash); // Send the current manifest manifest->encode(); - identity->getSigner().sign(*manifest); - passContentObjectToCallbacks(manifest); // Create new manifest. The reference to the last manifest has been // acquired in the passContentObjectToCallbacks function, so we can // safely release this reference manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(current_segment), + content_name.setSuffix(suffix_manifest.getSuffix()), core::ManifestVersion::VERSION_1, - core::ManifestType::INLINE_MANIFEST, hash_algorithm_, - is_last_manifest, content_name, - core::NextSegmentCalculationStrategy::INCREMENTAL, + core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, + content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, identity->getSignatureLength())); manifest->setLifetime(content_object_expiry_time); + if (is_last) { manifest->setFinalBlockNumber(final_block_number); } else { manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max()); } - current_segment++; + + suffix_manifest++; } } auto content_object = std::make_shared<ContentObject>( - content_name.setSuffix(current_segment), format); + content_name.setSuffix(suffix_content.getSuffix()), format); content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); b->trimStart(free_space_for_content * packaged_segments); b->trimEnd(b->length()); + if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) { b->append(buffer_size - bytes_segmented); bytes_segmented += (int)(buffer_size - bytes_segmented); @@ -313,14 +315,14 @@ uint32_t ProducerSocket::produce(Name content_name, if (making_manifest) { using namespace std::chrono_literals; - utils::CryptoHash hash = content_object->computeDigest(hash_algorithm_); - manifest->addSuffixHash(current_segment, hash); + utils::CryptoHash hash = content_object->computeDigest(hash_algo); + manifest->addSuffixHash(suffix_content.getSuffix(), hash); } else if (identity) { identity->getSigner().sign(*content_object); } - current_segment++; passContentObjectToCallbacks(content_object); + suffix_content++; } if (making_manifest) { @@ -329,7 +331,7 @@ uint32_t ProducerSocket::produce(Name content_name, } if (!is_last) { - manifest->addSuffixHash(current_segment, *zero_hash); + manifest->addSuffixHash(suffix_content.getSuffix(), *zero_hash); } manifest->encode(); @@ -344,7 +346,7 @@ uint32_t ProducerSocket::produce(Name content_name, }); } - return current_segment - start_offset; + return suffix_content.getSuffix() - start_offset; } void ProducerSocket::asyncProduce(ContentObject &content_object) { @@ -880,4 +882,4 @@ asio::io_service &ProducerSocket::getIoService() { return io_service_; } } // namespace interface -} // end namespace transport
\ No newline at end of file +} // end namespace transport |