diff options
author | Olivier Roques <oroques@cisco.com> | 2019-11-12 09:51:34 +0000 |
---|---|---|
committer | Olivier Roques <olvrqs@gmail.com> | 2019-11-14 10:48:26 +0000 |
commit | 92bc1a46f8e33cc664a95819fde45b9b5b4ac321 (patch) | |
tree | 3057a8a30788fb887cfa846a040a36edd2e79da7 /libtransport | |
parent | 34d819b0cf58fa3a96b5cae84b825b5a8a633cd4 (diff) |
[HICN-393] Fix various issues related to manifests
The current manifest implementation is broken:
1. ManifestIndexingManager, responsible for validating manifests and
segments and retrieving the next ones, assumes that all manifests
have the same size. This assumption affects the retrieval of next
manifests which is based on the number of segments the current
manifest contains. Therefore when a non-full manifests arrives,
the computed suffix of the next manifest is wrong and refer to a
content instead, which results in an error.
2. Manifests are used to update a suffix queue which stores all
the segments listed in manifests. This queue is used to retrieve
content sequentially via a pointer indicating the next content to
fetch. When the pointer reaches the end of the suffix queue, the
consumer stops sending interests. The correct behavior would be to
wait for a new manifest which would update the queue.
This patch fixes these two issues:
1. Issue 1 was fixed by using SuffixManifest (HICN-392). This allows
to set the capacity of a manifest at the start of the consumption
instead of checking each time the size of the current manifest and
then using that (non-constant) value to retrieve the next manifests.
2. Issue 2 was fixed by passing to ManifestIndexingManager a reference
to an object capable of calling the scheduleNextInterest function,
which is then called after a new manifest is retrieved to make sure
interests for content kept being sent. This is not an optimal solution
but rather a temporary one, until the retrieval of manifests is done
at the transport level rather than in ManifestIndexingManager.
This patch also changes the order of production: manifests are now
sent before content. To do so, contents are added into a queue until
the manifest is complete.
Signed-off-by: Olivier Roques <olvrqs@gmail.com>
Change-Id: I1a1bb92ca1cf2d3c745c1b65f6c7376f916c679b
Diffstat (limited to 'libtransport')
7 files changed, 50 insertions, 20 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 5cf40cd1c..7d7f4a0fe 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -267,6 +267,12 @@ uint32_t ProducerSocket::produce(Name content_name, identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); + // Send content objects stored in the queue + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + content_queue_.pop(); + } + // Create new manifest. The reference to the last manifest has been // acquired in the passContentObjectToCallbacks function, so we can // safely release this reference @@ -317,11 +323,14 @@ uint32_t ProducerSocket::produce(Name content_name, using namespace std::chrono_literals; utils::CryptoHash hash = content_object->computeDigest(hash_algo); manifest->addSuffixHash(suffix_content.getSuffix(), hash); - } else if (identity) { - identity->getSigner().sign(*content_object); + content_queue_.push(content_object); + } else { + if (identity) { + identity->getSigner().sign(*content_object); + } + passContentObjectToCallbacks(content_object); } - passContentObjectToCallbacks(content_object); suffix_content++; } @@ -337,6 +346,10 @@ uint32_t ProducerSocket::produce(Name content_name, manifest->encode(); identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + content_queue_.pop(); + } } if (on_content_produced_) { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 952587b70..1f1acd401 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -194,6 +194,9 @@ class ProducerSocket : public Socket<BasePortal>, utils::SuffixManifest suffix_manifest_; utils::SuffixContent suffix_content_; + // While manifests are being built, contents are stored in a queue + std::queue<std::shared_ptr<ContentObject>> content_queue_; + // callbacks ProducerInterestCallback on_interest_input_; ProducerInterestCallback on_interest_dropped_input_buffer_; diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc index cfefe46a1..9af6a5c3a 100644 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc @@ -26,12 +26,13 @@ namespace protocol { using namespace interface; ManifestIndexManager::ManifestIndexManager( - interface::ConsumerSocket *icn_socket) + interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest) : IncrementalIndexManager(icn_socket), PacketManager<Interest>(1024), next_reassembly_segment_(suffix_queue_.end()), next_to_retrieve_segment_(suffix_queue_.end()), - next_manifest_(0) {} + suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), + next_interest_(next_interest) {} bool ManifestIndexManager::onManifest( core::ContentObject::Ptr &&content_object) { @@ -84,10 +85,13 @@ bool ManifestIndexManager::onManifest( if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) { // Set the iterators to the beginning of the suffix queue next_reassembly_segment_ = suffix_queue_.begin(); + // Set number of segments in manifests assuming the first one is full + suffix_manifest_.setNbSegments(std::distance(_it, _end) - 1); } if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest() || - next_manifest_ > final_suffix_)) { + suffix_manifest_.getSuffix() > + final_suffix_)) { break; } @@ -101,8 +105,6 @@ bool ManifestIndexManager::onManifest( socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); // Number of segments in manifest - std::size_t segments_in_manifest = std::distance( - manifest->getSuffixList().begin(), manifest->getSuffixList().end()); std::size_t segment_count = 0; // Manifest namespace @@ -110,11 +112,11 @@ bool ManifestIndexManager::onManifest( // Send as many manifest as required for filling window. do { - segment_count += segments_in_manifest; - next_manifest_ += (uint32_t)segments_in_manifest; + segment_count += suffix_manifest_.getNbSegments(); + suffix_manifest_++; Interest::Ptr interest = getPacket(); - name.setSuffix(next_manifest_); + name.setSuffix(suffix_manifest_.getSuffix()); interest->setName(name); uint32_t interest_lifetime; @@ -131,7 +133,7 @@ bool ManifestIndexManager::onManifest( std::bind(&ManifestIndexManager::onManifestTimeout, this, std::placeholders::_1)); } while (segment_count < current_window_size && - next_manifest_ < final_suffix_); + suffix_manifest_.getSuffix() < final_suffix_); break; } @@ -147,8 +149,12 @@ bool ManifestIndexManager::onManifest( return manifest_verified; } -void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i, ContentObject::Ptr &&c) { +void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i, + ContentObject::Ptr &&c) { onManifest(std::move(c)); + if (next_interest_) { + next_interest_->scheduleNextInterests(); + } } void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) { @@ -239,6 +245,7 @@ uint32_t ManifestIndexManager::getNextReassemblySegment() { void ManifestIndexManager::reset() { IncrementalIndexManager::reset(); + suffix_manifest_.reset(0); suffix_queue_.clear(); suffix_hash_map_.clear(); } diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h index 5e1baa09c..5f74ef0bf 100644 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h @@ -17,6 +17,7 @@ #include <hicn/transport/interfaces/socket.h> #include <hicn/transport/protocols/indexing_manager.h> +#include <hicn/transport/utils/suffix_strategy.h> #include <list> @@ -32,7 +33,8 @@ class ManifestIndexManager : public IncrementalIndexManager, using SuffixQueue = std::list<uint32_t>; using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>; - ManifestIndexManager(interface::ConsumerSocket *icn_socket); + ManifestIndexManager(interface::ConsumerSocket *icn_socket, + TransportProtocol *next_interest); virtual ~ManifestIndexManager() = default; @@ -64,8 +66,11 @@ class ManifestIndexManager : public IncrementalIndexManager, std::pair<std::vector<uint8_t>, core::HashAlgorithm>> suffix_hash_map_; - // Next Manifest - std::uint32_t next_manifest_; + // Manifest Suffix + utils::SuffixManifest suffix_manifest_; + + // (temporary) To call scheduleNextInterests() after receiving a manifest + TransportProtocol *next_interest_; }; } // end namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index 779f9a9a1..e8d0e659a 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -28,7 +28,7 @@ using namespace interface; RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket) : TransportProtocol(icnet_socket), - BaseReassembly(icnet_socket, this), + BaseReassembly(icnet_socket, this, this), current_window_size_(1), interests_in_flight_(0), cur_path_(nullptr), diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc index a2062df93..c45d876a0 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -24,12 +24,13 @@ namespace transport { namespace protocol { BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, - ContentReassembledCallback *content_callback) + ContentReassembledCallback *content_callback, + TransportProtocol *next_interest) : reassembly_consumer_socket_(icn_socket), incremental_index_manager_( std::make_unique<IncrementalIndexManager>(icn_socket)), manifest_index_manager_( - std::make_unique<ManifestIndexManager>(icn_socket)), + std::make_unique<ManifestIndexManager>(icn_socket, next_interest)), index_manager_(incremental_index_manager_.get()), index_(0), read_buffer_(nullptr) { diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h index 79f0ea4d2..b83aba20e 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -49,7 +49,8 @@ class Reassembly { class BaseReassembly : public Reassembly { public: BaseReassembly(interface::ConsumerSocket *icn_socket, - ContentReassembledCallback *content_callback); + ContentReassembledCallback *content_callback, + TransportProtocol *next_interest); protected: virtual void reassemble(ContentObject::Ptr &&content_object) override; |