summaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
authorOlivier Roques <oroques@cisco.com>2019-11-12 09:51:34 +0000
committerOlivier Roques <olvrqs@gmail.com>2019-11-14 10:48:26 +0000
commit92bc1a46f8e33cc664a95819fde45b9b5b4ac321 (patch)
tree3057a8a30788fb887cfa846a040a36edd2e79da7 /libtransport
parent34d819b0cf58fa3a96b5cae84b825b5a8a633cd4 (diff)
[HICN-393] Fix various issues related to manifests
The current manifest implementation is broken: 1. ManifestIndexingManager, responsible for validating manifests and segments and retrieving the next ones, assumes that all manifests have the same size. This assumption affects the retrieval of next manifests which is based on the number of segments the current manifest contains. Therefore when a non-full manifests arrives, the computed suffix of the next manifest is wrong and refer to a content instead, which results in an error. 2. Manifests are used to update a suffix queue which stores all the segments listed in manifests. This queue is used to retrieve content sequentially via a pointer indicating the next content to fetch. When the pointer reaches the end of the suffix queue, the consumer stops sending interests. The correct behavior would be to wait for a new manifest which would update the queue. This patch fixes these two issues: 1. Issue 1 was fixed by using SuffixManifest (HICN-392). This allows to set the capacity of a manifest at the start of the consumption instead of checking each time the size of the current manifest and then using that (non-constant) value to retrieve the next manifests. 2. Issue 2 was fixed by passing to ManifestIndexingManager a reference to an object capable of calling the scheduleNextInterest function, which is then called after a new manifest is retrieved to make sure interests for content kept being sent. This is not an optimal solution but rather a temporary one, until the retrieval of manifests is done at the transport level rather than in ManifestIndexingManager. This patch also changes the order of production: manifests are now sent before content. To do so, contents are added into a queue until the manifest is complete. Signed-off-by: Olivier Roques <olvrqs@gmail.com> Change-Id: I1a1bb92ca1cf2d3c745c1b65f6c7376f916c679b
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc19
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h3
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc27
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h11
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc2
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc5
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.h3
7 files changed, 50 insertions, 20 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 5cf40cd1c..7d7f4a0fe 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -267,6 +267,12 @@ uint32_t ProducerSocket::produce(Name content_name,
identity->getSigner().sign(*manifest);
passContentObjectToCallbacks(manifest);
+ // Send content objects stored in the queue
+ while (!content_queue_.empty()) {
+ passContentObjectToCallbacks(content_queue_.front());
+ content_queue_.pop();
+ }
+
// Create new manifest. The reference to the last manifest has been
// acquired in the passContentObjectToCallbacks function, so we can
// safely release this reference
@@ -317,11 +323,14 @@ uint32_t ProducerSocket::produce(Name content_name,
using namespace std::chrono_literals;
utils::CryptoHash hash = content_object->computeDigest(hash_algo);
manifest->addSuffixHash(suffix_content.getSuffix(), hash);
- } else if (identity) {
- identity->getSigner().sign(*content_object);
+ content_queue_.push(content_object);
+ } else {
+ if (identity) {
+ identity->getSigner().sign(*content_object);
+ }
+ passContentObjectToCallbacks(content_object);
}
- passContentObjectToCallbacks(content_object);
suffix_content++;
}
@@ -337,6 +346,10 @@ uint32_t ProducerSocket::produce(Name content_name,
manifest->encode();
identity->getSigner().sign(*manifest);
passContentObjectToCallbacks(manifest);
+ while (!content_queue_.empty()) {
+ passContentObjectToCallbacks(content_queue_.front());
+ content_queue_.pop();
+ }
}
if (on_content_produced_) {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 952587b70..1f1acd401 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -194,6 +194,9 @@ class ProducerSocket : public Socket<BasePortal>,
utils::SuffixManifest suffix_manifest_;
utils::SuffixContent suffix_content_;
+ // While manifests are being built, contents are stored in a queue
+ std::queue<std::shared_ptr<ContentObject>> content_queue_;
+
// callbacks
ProducerInterestCallback on_interest_input_;
ProducerInterestCallback on_interest_dropped_input_buffer_;
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
index cfefe46a1..9af6a5c3a 100644
--- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
+++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
@@ -26,12 +26,13 @@ namespace protocol {
using namespace interface;
ManifestIndexManager::ManifestIndexManager(
- interface::ConsumerSocket *icn_socket)
+ interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest)
: IncrementalIndexManager(icn_socket),
PacketManager<Interest>(1024),
next_reassembly_segment_(suffix_queue_.end()),
next_to_retrieve_segment_(suffix_queue_.end()),
- next_manifest_(0) {}
+ suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
+ next_interest_(next_interest) {}
bool ManifestIndexManager::onManifest(
core::ContentObject::Ptr &&content_object) {
@@ -84,10 +85,13 @@ bool ManifestIndexManager::onManifest(
if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) {
// Set the iterators to the beginning of the suffix queue
next_reassembly_segment_ = suffix_queue_.begin();
+ // Set number of segments in manifests assuming the first one is full
+ suffix_manifest_.setNbSegments(std::distance(_it, _end) - 1);
}
if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest() ||
- next_manifest_ > final_suffix_)) {
+ suffix_manifest_.getSuffix() >
+ final_suffix_)) {
break;
}
@@ -101,8 +105,6 @@ bool ManifestIndexManager::onManifest(
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
// Number of segments in manifest
- std::size_t segments_in_manifest = std::distance(
- manifest->getSuffixList().begin(), manifest->getSuffixList().end());
std::size_t segment_count = 0;
// Manifest namespace
@@ -110,11 +112,11 @@ bool ManifestIndexManager::onManifest(
// Send as many manifest as required for filling window.
do {
- segment_count += segments_in_manifest;
- next_manifest_ += (uint32_t)segments_in_manifest;
+ segment_count += suffix_manifest_.getNbSegments();
+ suffix_manifest_++;
Interest::Ptr interest = getPacket();
- name.setSuffix(next_manifest_);
+ name.setSuffix(suffix_manifest_.getSuffix());
interest->setName(name);
uint32_t interest_lifetime;
@@ -131,7 +133,7 @@ bool ManifestIndexManager::onManifest(
std::bind(&ManifestIndexManager::onManifestTimeout, this,
std::placeholders::_1));
} while (segment_count < current_window_size &&
- next_manifest_ < final_suffix_);
+ suffix_manifest_.getSuffix() < final_suffix_);
break;
}
@@ -147,8 +149,12 @@ bool ManifestIndexManager::onManifest(
return manifest_verified;
}
-void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i, ContentObject::Ptr &&c) {
+void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i,
+ ContentObject::Ptr &&c) {
onManifest(std::move(c));
+ if (next_interest_) {
+ next_interest_->scheduleNextInterests();
+ }
}
void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) {
@@ -239,6 +245,7 @@ uint32_t ManifestIndexManager::getNextReassemblySegment() {
void ManifestIndexManager::reset() {
IncrementalIndexManager::reset();
+ suffix_manifest_.reset(0);
suffix_queue_.clear();
suffix_hash_map_.clear();
}
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
index 5e1baa09c..5f74ef0bf 100644
--- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
+++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
@@ -17,6 +17,7 @@
#include <hicn/transport/interfaces/socket.h>
#include <hicn/transport/protocols/indexing_manager.h>
+#include <hicn/transport/utils/suffix_strategy.h>
#include <list>
@@ -32,7 +33,8 @@ class ManifestIndexManager : public IncrementalIndexManager,
using SuffixQueue = std::list<uint32_t>;
using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>;
- ManifestIndexManager(interface::ConsumerSocket *icn_socket);
+ ManifestIndexManager(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *next_interest);
virtual ~ManifestIndexManager() = default;
@@ -64,8 +66,11 @@ class ManifestIndexManager : public IncrementalIndexManager,
std::pair<std::vector<uint8_t>, core::HashAlgorithm>>
suffix_hash_map_;
- // Next Manifest
- std::uint32_t next_manifest_;
+ // Manifest Suffix
+ utils::SuffixManifest suffix_manifest_;
+
+ // (temporary) To call scheduleNextInterests() after receiving a manifest
+ TransportProtocol *next_interest_;
};
} // end namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index 779f9a9a1..e8d0e659a 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -28,7 +28,7 @@ using namespace interface;
RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
: TransportProtocol(icnet_socket),
- BaseReassembly(icnet_socket, this),
+ BaseReassembly(icnet_socket, this, this),
current_window_size_(1),
interests_in_flight_(0),
cur_path_(nullptr),
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
index a2062df93..c45d876a0 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.cc
+++ b/libtransport/src/hicn/transport/protocols/reassembly.cc
@@ -24,12 +24,13 @@ namespace transport {
namespace protocol {
BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket,
- ContentReassembledCallback *content_callback)
+ ContentReassembledCallback *content_callback,
+ TransportProtocol *next_interest)
: reassembly_consumer_socket_(icn_socket),
incremental_index_manager_(
std::make_unique<IncrementalIndexManager>(icn_socket)),
manifest_index_manager_(
- std::make_unique<ManifestIndexManager>(icn_socket)),
+ std::make_unique<ManifestIndexManager>(icn_socket, next_interest)),
index_manager_(incremental_index_manager_.get()),
index_(0),
read_buffer_(nullptr) {
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h
index 79f0ea4d2..b83aba20e 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.h
+++ b/libtransport/src/hicn/transport/protocols/reassembly.h
@@ -49,7 +49,8 @@ class Reassembly {
class BaseReassembly : public Reassembly {
public:
BaseReassembly(interface::ConsumerSocket *icn_socket,
- ContentReassembledCallback *content_callback);
+ ContentReassembledCallback *content_callback,
+ TransportProtocol *next_interest);
protected:
virtual void reassemble(ContentObject::Ptr &&content_object) override;