aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc19
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h3
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc27
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h11
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc2
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc5
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.h3
7 files changed, 50 insertions, 20 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 5cf40cd1c..7d7f4a0fe 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -267,6 +267,12 @@ uint32_t ProducerSocket::produce(Name content_name,
identity->getSigner().sign(*manifest);
passContentObjectToCallbacks(manifest);
+ // Send content objects stored in the queue
+ while (!content_queue_.empty()) {
+ passContentObjectToCallbacks(content_queue_.front());
+ content_queue_.pop();
+ }
+
// Create new manifest. The reference to the last manifest has been
// acquired in the passContentObjectToCallbacks function, so we can
// safely release this reference
@@ -317,11 +323,14 @@ uint32_t ProducerSocket::produce(Name content_name,
using namespace std::chrono_literals;
utils::CryptoHash hash = content_object->computeDigest(hash_algo);
manifest->addSuffixHash(suffix_content.getSuffix(), hash);
- } else if (identity) {
- identity->getSigner().sign(*content_object);
+ content_queue_.push(content_object);
+ } else {
+ if (identity) {
+ identity->getSigner().sign(*content_object);
+ }
+ passContentObjectToCallbacks(content_object);
}
- passContentObjectToCallbacks(content_object);
suffix_content++;
}
@@ -337,6 +346,10 @@ uint32_t ProducerSocket::produce(Name content_name,
manifest->encode();
identity->getSigner().sign(*manifest);
passContentObjectToCallbacks(manifest);
+ while (!content_queue_.empty()) {
+ passContentObjectToCallbacks(content_queue_.front());
+ content_queue_.pop();
+ }
}
if (on_content_produced_) {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 952587b70..1f1acd401 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -194,6 +194,9 @@ class ProducerSocket : public Socket<BasePortal>,
utils::SuffixManifest suffix_manifest_;
utils::SuffixContent suffix_content_;
+ // While manifests are being built, contents are stored in a queue
+ std::queue<std::shared_ptr<ContentObject>> content_queue_;
+
// callbacks
ProducerInterestCallback on_interest_input_;
ProducerInterestCallback on_interest_dropped_input_buffer_;
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
index cfefe46a1..9af6a5c3a 100644
--- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
+++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
@@ -26,12 +26,13 @@ namespace protocol {
using namespace interface;
ManifestIndexManager::ManifestIndexManager(
- interface::ConsumerSocket *icn_socket)
+ interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest)
: IncrementalIndexManager(icn_socket),
PacketManager<Interest>(1024),
next_reassembly_segment_(suffix_queue_.end()),
next_to_retrieve_segment_(suffix_queue_.end()),
- next_manifest_(0) {}
+ suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
+ next_interest_(next_interest) {}
bool ManifestIndexManager::onManifest(
core::ContentObject::Ptr &&content_object) {
@@ -84,10 +85,13 @@ bool ManifestIndexManager::onManifest(
if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) {
// Set the iterators to the beginning of the suffix queue
next_reassembly_segment_ = suffix_queue_.begin();
+ // Set number of segments in manifests assuming the first one is full
+ suffix_manifest_.setNbSegments(std::distance(_it, _end) - 1);
}
if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest() ||
- next_manifest_ > final_suffix_)) {
+ suffix_manifest_.getSuffix() >
+ final_suffix_)) {
break;
}
@@ -101,8 +105,6 @@ bool ManifestIndexManager::onManifest(
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
// Number of segments in manifest
- std::size_t segments_in_manifest = std::distance(
- manifest->getSuffixList().begin(), manifest->getSuffixList().end());
std::size_t segment_count = 0;
// Manifest namespace
@@ -110,11 +112,11 @@ bool ManifestIndexManager::onManifest(
// Send as many manifest as required for filling window.
do {
- segment_count += segments_in_manifest;
- next_manifest_ += (uint32_t)segments_in_manifest;
+ segment_count += suffix_manifest_.getNbSegments();
+ suffix_manifest_++;
Interest::Ptr interest = getPacket();
- name.setSuffix(next_manifest_);
+ name.setSuffix(suffix_manifest_.getSuffix());
interest->setName(name);
uint32_t interest_lifetime;
@@ -131,7 +133,7 @@ bool ManifestIndexManager::onManifest(
std::bind(&ManifestIndexManager::onManifestTimeout, this,
std::placeholders::_1));
} while (segment_count < current_window_size &&
- next_manifest_ < final_suffix_);
+ suffix_manifest_.getSuffix() < final_suffix_);
break;
}
@@ -147,8 +149,12 @@ bool ManifestIndexManager::onManifest(
return manifest_verified;
}
-void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i, ContentObject::Ptr &&c) {
+void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i,
+ ContentObject::Ptr &&c) {
onManifest(std::move(c));
+ if (next_interest_) {
+ next_interest_->scheduleNextInterests();
+ }
}
void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) {
@@ -239,6 +245,7 @@ uint32_t ManifestIndexManager::getNextReassemblySegment() {
void ManifestIndexManager::reset() {
IncrementalIndexManager::reset();
+ suffix_manifest_.reset(0);
suffix_queue_.clear();
suffix_hash_map_.clear();
}
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
index 5e1baa09c..5f74ef0bf 100644
--- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
+++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
@@ -17,6 +17,7 @@
#include <hicn/transport/interfaces/socket.h>
#include <hicn/transport/protocols/indexing_manager.h>
+#include <hicn/transport/utils/suffix_strategy.h>
#include <list>
@@ -32,7 +33,8 @@ class ManifestIndexManager : public IncrementalIndexManager,
using SuffixQueue = std::list<uint32_t>;
using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>;
- ManifestIndexManager(interface::ConsumerSocket *icn_socket);
+ ManifestIndexManager(interface::ConsumerSocket *icn_socket,
+ TransportProtocol *next_interest);
virtual ~ManifestIndexManager() = default;
@@ -64,8 +66,11 @@ class ManifestIndexManager : public IncrementalIndexManager,
std::pair<std::vector<uint8_t>, core::HashAlgorithm>>
suffix_hash_map_;
- // Next Manifest
- std::uint32_t next_manifest_;
+ // Manifest Suffix
+ utils::SuffixManifest suffix_manifest_;
+
+ // (temporary) To call scheduleNextInterests() after receiving a manifest
+ TransportProtocol *next_interest_;
};
} // end namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index 779f9a9a1..e8d0e659a 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -28,7 +28,7 @@ using namespace interface;
RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
: TransportProtocol(icnet_socket),
- BaseReassembly(icnet_socket, this),
+ BaseReassembly(icnet_socket, this, this),
current_window_size_(1),
interests_in_flight_(0),
cur_path_(nullptr),
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
index a2062df93..c45d876a0 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.cc
+++ b/libtransport/src/hicn/transport/protocols/reassembly.cc
@@ -24,12 +24,13 @@ namespace transport {
namespace protocol {
BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket,
- ContentReassembledCallback *content_callback)
+ ContentReassembledCallback *content_callback,
+ TransportProtocol *next_interest)
: reassembly_consumer_socket_(icn_socket),
incremental_index_manager_(
std::make_unique<IncrementalIndexManager>(icn_socket)),
manifest_index_manager_(
- std::make_unique<ManifestIndexManager>(icn_socket)),
+ std::make_unique<ManifestIndexManager>(icn_socket, next_interest)),
index_manager_(incremental_index_manager_.get()),
index_(0),
read_buffer_(nullptr) {
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h
index 79f0ea4d2..b83aba20e 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.h
+++ b/libtransport/src/hicn/transport/protocols/reassembly.h
@@ -49,7 +49,8 @@ class Reassembly {
class BaseReassembly : public Reassembly {
public:
BaseReassembly(interface::ConsumerSocket *icn_socket,
- ContentReassembledCallback *content_callback);
+ ContentReassembledCallback *content_callback,
+ TransportProtocol *next_interest);
protected:
virtual void reassemble(ContentObject::Ptr &&content_object) override;