diff options
6 files changed, 60 insertions, 41 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 7d7f4a0fe..6782000ac 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -165,7 +165,6 @@ uint32_t ProducerSocket::produce(Name content_name, getSocketOption(GeneralTransportOptions::IDENTITY, identity); auto buffer_size = buffer->length(); - const std::size_t hash_size = 32; int bytes_segmented = 0; std::size_t header_size; std::size_t manifest_header_size = 0; @@ -176,7 +175,6 @@ uint32_t ProducerSocket::produce(Name content_name, core::Packet::Format format; std::shared_ptr<ContentObjectManifest> manifest; bool is_last_manifest = false; - std::unique_ptr<utils::CryptoHash> zero_hash; suffix_content.updateSuffix(start_offset); suffix_content.setUsingManifest(making_manifest); @@ -249,10 +247,6 @@ uint32_t ProducerSocket::produce(Name content_name, manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max()); } - uint8_t hash[hash_size]; - std::memset(hash, 0, hash_size); - zero_hash = std::make_unique<utils::CryptoHash>( - hash, hash_size, static_cast<utils::CryptoHashType>(hash_algo)); } for (unsigned int packaged_segments = 0; @@ -260,8 +254,6 @@ uint32_t ProducerSocket::produce(Name content_name, if (making_manifest) { if (manifest->estimateManifestSize(2) > data_packet_size - manifest_header_size) { - // Add next manifest - manifest->addSuffixHash(suffix_manifest.getSuffix(), *zero_hash); // Send the current manifest manifest->encode(); identity->getSigner().sign(*manifest); @@ -339,10 +331,6 @@ uint32_t ProducerSocket::produce(Name content_name, manifest->setFinalManifest(is_last_manifest); } - if (!is_last) { - manifest->addSuffixHash(suffix_content.getSuffix(), *zero_hash); - } - manifest->encode(); identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc index 0c3fd76cf..ea13bf9e6 100644 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc @@ -29,9 +29,11 @@ ManifestIndexManager::ManifestIndexManager( interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest) : IncrementalIndexManager(icn_socket), PacketManager<Interest>(1024), - next_reassembly_segment_(suffix_queue_.end()), next_to_retrieve_segment_(suffix_queue_.end()), suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), + next_reassembly_segment_( + core::NextSegmentCalculationStrategy::INCREMENTAL, 1, true), + ignored_segments_(), next_interest_(next_interest) {} bool ManifestIndexManager::onManifest( @@ -51,13 +53,10 @@ bool ManifestIndexManager::onManifest( switch (manifest->getManifestType()) { case core::ManifestType::INLINE_MANIFEST: { auto _it = manifest->getSuffixList().begin(); - auto _end = --manifest->getSuffixList().end(); + auto _end = manifest->getSuffixList().end(); + size_t nb_segments = std::distance(_it, _end); final_suffix_ = manifest->getFinalBlockNumber(); // final block number - if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest())) { - _end++; - } - suffix_hash_map_[_it->first] = std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32), manifest->getHashAlgorithm()); @@ -80,15 +79,31 @@ bool ManifestIndexManager::onManifest( } if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) { - // Set the iterators to the beginning of the suffix queue - next_reassembly_segment_ = suffix_queue_.begin(); - // Set number of segments in manifests assuming the first one is full - suffix_manifest_.setNbSegments( - std::distance(manifest->getSuffixList().begin(), - manifest->getSuffixList().end()) - - 1); - suffix_manifest_.setSuffixStrategy( - manifest->getNextSegmentCalculationStrategy()); + core::NextSegmentCalculationStrategy strategy = + manifest->getNextSegmentCalculationStrategy(); + + suffix_manifest_.reset(0); + suffix_manifest_.setNbSegments(nb_segments); + suffix_manifest_.setSuffixStrategy(strategy); + TRANSPORT_LOGD("Capacity of 1st manifest %zu", + suffix_manifest_.getNbSegments()); + + next_reassembly_segment_.reset(*suffix_queue_.begin()); + next_reassembly_segment_.setNbSegments(nb_segments); + suffix_manifest_.setSuffixStrategy(strategy); + } + + // If the manifest is not full, we add the suffixes of missing segments + // to the list of segments to ignore when computing the next reassembly + // index. + if (TRANSPORT_EXPECT_FALSE( + suffix_manifest_.getNbSegments() - nb_segments > 0)) { + auto start = manifest->getSuffixList().begin(); + auto last = --_end; + for (uint32_t i = last->first + 1; + i < start->first + suffix_manifest_.getNbSegments(); i++) { + ignored_segments_.push_back(i); + } } if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest()) == 0) { @@ -242,16 +257,28 @@ bool ManifestIndexManager::isFinalSuffixDiscovered() { } uint32_t ManifestIndexManager::getNextReassemblySegment() { - if (TRANSPORT_EXPECT_FALSE(next_reassembly_segment_ == suffix_queue_.end())) { - return invalid_index; - } + uint32_t current_reassembly_segment; + + while (true) { + current_reassembly_segment = next_reassembly_segment_.getSuffix(); + next_reassembly_segment_++; + + if (TRANSPORT_EXPECT_FALSE(current_reassembly_segment > final_suffix_)) { + return invalid_index; + } + + if (ignored_segments_.empty()) break; + + auto is_ignored = + std::find(ignored_segments_.begin(), ignored_segments_.end(), + current_reassembly_segment); + + if (is_ignored == ignored_segments_.end()) break; - if (TRANSPORT_EXPECT_TRUE(next_reassembly_segment_ != - suffix_queue_.begin())) { - suffix_queue_.erase(std::prev(next_reassembly_segment_)); + ignored_segments_.erase(is_ignored); } - return *next_reassembly_segment_++; + return current_reassembly_segment; } void ManifestIndexManager::reset() { diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h index cb88940d5..645b20e9a 100644 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h @@ -59,17 +59,20 @@ class ManifestIndexManager : public IncrementalIndexManager, protected: SuffixQueue suffix_queue_; - SuffixQueue::iterator next_reassembly_segment_; SuffixQueue::iterator next_to_retrieve_segment_; + utils::SuffixManifest suffix_manifest_; + utils::SuffixContent next_reassembly_segment_; + + // Holds segments that should not be requested. Useful when + // computing the next reassembly segment because some manifests + // may be incomplete. + std::vector<uint32_t> ignored_segments_; // Hash verification std::unordered_map<uint32_t, std::pair<std::vector<uint8_t>, core::HashAlgorithm>> suffix_hash_map_; - // Manifest Suffix - utils::SuffixManifest suffix_manifest_; - // (temporary) To call scheduleNextInterests() after receiving a manifest TransportProtocol *next_interest_; }; diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index 21a59bc9a..a57eb7cd9 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -567,7 +567,8 @@ void RaaqmTransportProtocol::RAAQM() { // Change drop probability according to RTT statistics cur_path_->updateDropProb(); - if (std::rand() % 10000 <= cur_path_->getDropProb() * 10000) { + double coin = ((double) rand() / (RAND_MAX)); + if (coin <= cur_path_->getDropProb()) { decreaseWindow(); } } diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h index b83aba20e..e859ca294 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -70,7 +70,7 @@ class BaseReassembly : public Reassembly { IndexVerificationManager *index_manager_; std::unordered_map<std::uint32_t, ContentObject::Ptr> received_packets_; - uint64_t index_; + uint32_t index_; std::unique_ptr<utils::MemBuf> read_buffer_; }; diff --git a/libtransport/src/hicn/transport/utils/min_filter.h b/libtransport/src/hicn/transport/utils/min_filter.h index acb081edc..dcfd5652d 100644 --- a/libtransport/src/hicn/transport/utils/min_filter.h +++ b/libtransport/src/hicn/transport/utils/min_filter.h @@ -35,7 +35,7 @@ class MinFilter { template <typename R> TRANSPORT_ALWAYS_INLINE void pushBack(R&& value) { - if (by_arrival_.size() > size_) { + if (by_arrival_.size() >= size_) { by_order_.erase(by_arrival_.back()); by_arrival_.pop_back(); } |