diff options
7 files changed, 50 insertions, 20 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 5cf40cd1c..7d7f4a0fe 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -267,6 +267,12 @@ uint32_t ProducerSocket::produce(Name content_name, identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); + // Send content objects stored in the queue + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + content_queue_.pop(); + } + // Create new manifest. The reference to the last manifest has been // acquired in the passContentObjectToCallbacks function, so we can // safely release this reference @@ -317,11 +323,14 @@ uint32_t ProducerSocket::produce(Name content_name, using namespace std::chrono_literals; utils::CryptoHash hash = content_object->computeDigest(hash_algo); manifest->addSuffixHash(suffix_content.getSuffix(), hash); - } else if (identity) { - identity->getSigner().sign(*content_object); + content_queue_.push(content_object); + } else { + if (identity) { + identity->getSigner().sign(*content_object); + } + passContentObjectToCallbacks(content_object); } - passContentObjectToCallbacks(content_object); suffix_content++; } @@ -337,6 +346,10 @@ uint32_t ProducerSocket::produce(Name content_name, manifest->encode(); identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); + while (!content_queue_.empty()) { + passContentObjectToCallbacks(content_queue_.front()); + content_queue_.pop(); + } } if (on_content_produced_) { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 952587b70..1f1acd401 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -194,6 +194,9 @@ class ProducerSocket : public Socket<BasePortal>, utils::SuffixManifest suffix_manifest_; utils::SuffixContent suffix_content_; + // While manifests are being built, contents are stored in a queue + std::queue<std::shared_ptr<ContentObject>> content_queue_; + // callbacks ProducerInterestCallback on_interest_input_; ProducerInterestCallback on_interest_dropped_input_buffer_; diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc index cfefe46a1..9af6a5c3a 100644 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc @@ -26,12 +26,13 @@ namespace protocol { using namespace interface; ManifestIndexManager::ManifestIndexManager( - interface::ConsumerSocket *icn_socket) + interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest) : IncrementalIndexManager(icn_socket), PacketManager<Interest>(1024), next_reassembly_segment_(suffix_queue_.end()), next_to_retrieve_segment_(suffix_queue_.end()), - next_manifest_(0) {} + suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), + next_interest_(next_interest) {} bool ManifestIndexManager::onManifest( core::ContentObject::Ptr &&content_object) { @@ -84,10 +85,13 @@ bool ManifestIndexManager::onManifest( if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) { // Set the iterators to the beginning of the suffix queue next_reassembly_segment_ = suffix_queue_.begin(); + // Set number of segments in manifests assuming the first one is full + suffix_manifest_.setNbSegments(std::distance(_it, _end) - 1); } if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest() || - next_manifest_ > final_suffix_)) { + suffix_manifest_.getSuffix() > + final_suffix_)) { break; } @@ -101,8 +105,6 @@ bool ManifestIndexManager::onManifest( socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); // Number of segments in manifest - std::size_t segments_in_manifest = std::distance( - manifest->getSuffixList().begin(), manifest->getSuffixList().end()); std::size_t segment_count = 0; // Manifest namespace @@ -110,11 +112,11 @@ bool ManifestIndexManager::onManifest( // Send as many manifest as required for filling window. do { - segment_count += segments_in_manifest; - next_manifest_ += (uint32_t)segments_in_manifest; + segment_count += suffix_manifest_.getNbSegments(); + suffix_manifest_++; Interest::Ptr interest = getPacket(); - name.setSuffix(next_manifest_); + name.setSuffix(suffix_manifest_.getSuffix()); interest->setName(name); uint32_t interest_lifetime; @@ -131,7 +133,7 @@ bool ManifestIndexManager::onManifest( std::bind(&ManifestIndexManager::onManifestTimeout, this, std::placeholders::_1)); } while (segment_count < current_window_size && - next_manifest_ < final_suffix_); + suffix_manifest_.getSuffix() < final_suffix_); break; } @@ -147,8 +149,12 @@ bool ManifestIndexManager::onManifest( return manifest_verified; } -void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i, ContentObject::Ptr &&c) { +void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i, + ContentObject::Ptr &&c) { onManifest(std::move(c)); + if (next_interest_) { + next_interest_->scheduleNextInterests(); + } } void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) { @@ -239,6 +245,7 @@ uint32_t ManifestIndexManager::getNextReassemblySegment() { void ManifestIndexManager::reset() { IncrementalIndexManager::reset(); + suffix_manifest_.reset(0); suffix_queue_.clear(); suffix_hash_map_.clear(); } diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h index 5e1baa09c..5f74ef0bf 100644 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h @@ -17,6 +17,7 @@ #include <hicn/transport/interfaces/socket.h> #include <hicn/transport/protocols/indexing_manager.h> +#include <hicn/transport/utils/suffix_strategy.h> #include <list> @@ -32,7 +33,8 @@ class ManifestIndexManager : public IncrementalIndexManager, using SuffixQueue = std::list<uint32_t>; using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>; - ManifestIndexManager(interface::ConsumerSocket *icn_socket); + ManifestIndexManager(interface::ConsumerSocket *icn_socket, + TransportProtocol *next_interest); virtual ~ManifestIndexManager() = default; @@ -64,8 +66,11 @@ class ManifestIndexManager : public IncrementalIndexManager, std::pair<std::vector<uint8_t>, core::HashAlgorithm>> suffix_hash_map_; - // Next Manifest - std::uint32_t next_manifest_; + // Manifest Suffix + utils::SuffixManifest suffix_manifest_; + + // (temporary) To call scheduleNextInterests() after receiving a manifest + TransportProtocol *next_interest_; }; } // end namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index 779f9a9a1..e8d0e659a 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -28,7 +28,7 @@ using namespace interface; RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket) : TransportProtocol(icnet_socket), - BaseReassembly(icnet_socket, this), + BaseReassembly(icnet_socket, this, this), current_window_size_(1), interests_in_flight_(0), cur_path_(nullptr), diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc index a2062df93..c45d876a0 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -24,12 +24,13 @@ namespace transport { namespace protocol { BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, - ContentReassembledCallback *content_callback) + ContentReassembledCallback *content_callback, + TransportProtocol *next_interest) : reassembly_consumer_socket_(icn_socket), incremental_index_manager_( std::make_unique<IncrementalIndexManager>(icn_socket)), manifest_index_manager_( - std::make_unique<ManifestIndexManager>(icn_socket)), + std::make_unique<ManifestIndexManager>(icn_socket, next_interest)), index_manager_(incremental_index_manager_.get()), index_(0), read_buffer_(nullptr) { diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h index 79f0ea4d2..b83aba20e 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -49,7 +49,8 @@ class Reassembly { class BaseReassembly : public Reassembly { public: BaseReassembly(interface::ConsumerSocket *icn_socket, - ContentReassembledCallback *content_callback); + ContentReassembledCallback *content_callback, + TransportProtocol *next_interest); protected: virtual void reassemble(ContentObject::Ptr &&content_object) override; |