aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces/socket_producer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/socket_producer.cc')
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc70
1 files changed, 36 insertions, 34 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 8f8fc1a79..5cf40cd1c 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -38,6 +38,8 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
registration_status_(REGISTRATION_NOT_ATTEMPTED),
making_manifest_(false),
hash_algorithm_(HashAlgorithm::SHA_256),
+ suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
+ suffix_content_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
on_interest_inserted_input_buffer_(VOID_HANDLER),
@@ -151,33 +153,32 @@ uint32_t ProducerSocket::produce(Name content_name,
return 0;
}
- // Copy the atomic variable to ensure always the same value during the a
- // production
+ // Copy the atomic variables to ensure they keep the same value
+ // during the production
std::size_t data_packet_size = data_packet_size_;
uint32_t content_object_expiry_time = content_object_expiry_time_;
- HashAlgorithm algo = hash_algorithm_;
+ HashAlgorithm hash_algo = hash_algorithm_;
bool making_manifest = making_manifest_;
+ utils::SuffixContent suffix_content = suffix_content_;
+ utils::SuffixManifest suffix_manifest = suffix_manifest_;
std::shared_ptr<utils::Identity> identity;
getSocketOption(GeneralTransportOptions::IDENTITY, identity);
auto buffer_size = buffer->length();
-
const std::size_t hash_size = 32;
-
int bytes_segmented = 0;
std::size_t header_size;
std::size_t manifest_header_size = 0;
std::size_t signature_length = 0;
std::uint32_t final_block_number = 0;
-
uint64_t free_space_for_content = 0;
core::Packet::Format format;
-
- uint32_t current_segment = start_offset;
std::shared_ptr<ContentObjectManifest> manifest;
bool is_last_manifest = false;
std::unique_ptr<utils::CryptoHash> zero_hash;
+ suffix_content.updateSuffix(start_offset);
+ suffix_content.setUsingManifest(making_manifest);
// TODO Manifest may still be used for indexing
if (making_manifest && !identity) {
@@ -210,35 +211,37 @@ uint32_t ProducerSocket::produce(Name content_name,
}
header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length);
-
free_space_for_content = data_packet_size - header_size;
-
uint32_t number_of_segments =
uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content)));
-
if (free_space_for_content * number_of_segments < buffer_size) {
number_of_segments++;
}
// TODO allocate space for all the headers
-
if (making_manifest) {
- auto segment_in_manifest = static_cast<float>(
+ uint32_t segment_in_manifest = static_cast<uint32_t>(
std::floor(double(data_packet_size - manifest_header_size -
ContentObjectManifest::getManifestHeaderSize()) /
- (4.0 + 32.0)) -
+ ContentObjectManifest::getManifestEntrySize()) -
1.0);
- auto number_of_manifests = static_cast<uint32_t>(
+ uint32_t number_of_manifests = static_cast<uint32_t>(
std::ceil(float(number_of_segments) / segment_in_manifest));
final_block_number = number_of_segments + number_of_manifests - 1;
+ suffix_manifest.updateSuffix(start_offset);
+ suffix_manifest.setNbSegments(segment_in_manifest);
+ suffix_content.updateSuffix(start_offset + 1);
+ suffix_content.setNbSegments(segment_in_manifest);
+
manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(current_segment++),
+ content_name.setSuffix(suffix_manifest.getSuffix()),
core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
- hash_algorithm_, is_last_manifest, content_name,
+ hash_algo, is_last_manifest, content_name,
core::NextSegmentCalculationStrategy::INCREMENTAL,
identity->getSignatureLength()));
manifest->setLifetime(content_object_expiry_time);
+ suffix_manifest++;
if (is_last) {
manifest->setFinalBlockNumber(final_block_number);
@@ -249,7 +252,7 @@ uint32_t ProducerSocket::produce(Name content_name,
uint8_t hash[hash_size];
std::memset(hash, 0, hash_size);
zero_hash = std::make_unique<utils::CryptoHash>(
- hash, hash_size, static_cast<utils::CryptoHashType>(algo));
+ hash, hash_size, static_cast<utils::CryptoHashType>(hash_algo));
}
for (unsigned int packaged_segments = 0;
@@ -258,42 +261,41 @@ uint32_t ProducerSocket::produce(Name content_name,
if (manifest->estimateManifestSize(2) >
data_packet_size - manifest_header_size) {
// Add next manifest
- manifest->addSuffixHash(current_segment, *zero_hash);
-
+ manifest->addSuffixHash(suffix_manifest.getSuffix(), *zero_hash);
// Send the current manifest
manifest->encode();
-
identity->getSigner().sign(*manifest);
-
passContentObjectToCallbacks(manifest);
// Create new manifest. The reference to the last manifest has been
// acquired in the passContentObjectToCallbacks function, so we can
// safely release this reference
manifest.reset(ContentObjectManifest::createManifest(
- content_name.setSuffix(current_segment),
+ content_name.setSuffix(suffix_manifest.getSuffix()),
core::ManifestVersion::VERSION_1,
- core::ManifestType::INLINE_MANIFEST, hash_algorithm_,
- is_last_manifest, content_name,
- core::NextSegmentCalculationStrategy::INCREMENTAL,
+ core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
+ content_name, core::NextSegmentCalculationStrategy::INCREMENTAL,
identity->getSignatureLength()));
manifest->setLifetime(content_object_expiry_time);
+
if (is_last) {
manifest->setFinalBlockNumber(final_block_number);
} else {
manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max());
}
- current_segment++;
+
+ suffix_manifest++;
}
}
auto content_object = std::make_shared<ContentObject>(
- content_name.setSuffix(current_segment), format);
+ content_name.setSuffix(suffix_content.getSuffix()), format);
content_object->setLifetime(content_object_expiry_time);
auto b = buffer->cloneOne();
b->trimStart(free_space_for_content * packaged_segments);
b->trimEnd(b->length());
+
if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) {
b->append(buffer_size - bytes_segmented);
bytes_segmented += (int)(buffer_size - bytes_segmented);
@@ -313,14 +315,14 @@ uint32_t ProducerSocket::produce(Name content_name,
if (making_manifest) {
using namespace std::chrono_literals;
- utils::CryptoHash hash = content_object->computeDigest(hash_algorithm_);
- manifest->addSuffixHash(current_segment, hash);
+ utils::CryptoHash hash = content_object->computeDigest(hash_algo);
+ manifest->addSuffixHash(suffix_content.getSuffix(), hash);
} else if (identity) {
identity->getSigner().sign(*content_object);
}
- current_segment++;
passContentObjectToCallbacks(content_object);
+ suffix_content++;
}
if (making_manifest) {
@@ -329,7 +331,7 @@ uint32_t ProducerSocket::produce(Name content_name,
}
if (!is_last) {
- manifest->addSuffixHash(current_segment, *zero_hash);
+ manifest->addSuffixHash(suffix_content.getSuffix(), *zero_hash);
}
manifest->encode();
@@ -344,7 +346,7 @@ uint32_t ProducerSocket::produce(Name content_name,
});
}
- return current_segment - start_offset;
+ return suffix_content.getSuffix() - start_offset;
}
void ProducerSocket::asyncProduce(ContentObject &content_object) {
@@ -880,4 +882,4 @@ asio::io_service &ProducerSocket::getIoService() { return io_service_; }
} // namespace interface
-} // end namespace transport \ No newline at end of file
+} // end namespace transport