aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols/raaqm.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/raaqm.cc')
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc118
1 files changed, 59 insertions, 59 deletions
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index a57eb7cd9..641ae45c3 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -14,8 +14,9 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/manifest_indexing_manager.h>
+#include <hicn/transport/protocols/indexer.h>
#include <hicn/transport/protocols/raaqm.h>
+#include <hicn/transport/protocols/errors.h>
#include <cstdlib>
#include <fstream>
@@ -26,9 +27,8 @@ namespace protocol {
using namespace interface;
-RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
- : TransportProtocol(icnet_socket),
- BaseReassembly(icnet_socket, this, this),
+RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)),
current_window_size_(1),
interests_in_flight_(0),
cur_path_(nullptr),
@@ -101,13 +101,14 @@ void RaaqmTransportProtocol::reset() {
// Set first segment to retrieve
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ index_manager_->reset();
index_manager_->setFirstSuffix(name->getSuffix());
std::queue<Interest::Ptr> empty;
std::swap(interest_to_retransmit_, empty);
- stats_.reset();
+ stats_->reset();
// Reset reassembly component
- BaseReassembly::reset();
+ reassembly_protocol_->reInitialize();
// Reset protocol variables
interests_in_flight_ = 0;
@@ -309,8 +310,6 @@ void RaaqmTransportProtocol::init() {
void RaaqmTransportProtocol::onContentObject(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
-
// Check whether makes sense to continue
if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
return;
@@ -331,27 +330,17 @@ void RaaqmTransportProtocol::onContentObject(
(*callback_interest)(*socket_, *interest);
}
- if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() ==
- PayloadType::MANIFEST)) {
- if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
- index_manager_ = manifest_index_manager_.get();
- interests_in_flight_--;
- }
-
- index_manager_->onManifest(std::move(content_object));
-
- } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- onContentSegment(std::move(interest), std::move(content_object));
+ if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ stats_->updateBytesRecv(content_object->payloadSize());
}
+ onContentSegment(std::move(interest), std::move(content_object));
scheduleNextInterests();
}
void RaaqmTransportProtocol::onContentSegment(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
uint32_t incremental_suffix = content_object->getName().getSuffix();
- bool virtual_download = false;
- socket_->getSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, virtual_download);
// Decrease in-flight interests
interests_in_flight_--;
@@ -361,28 +350,55 @@ void RaaqmTransportProtocol::onContentSegment(
afterContentReception(*interest, *content_object);
}
- if (index_manager_->onContentObject(*content_object)) {
- stats_.updateBytesRecv(content_object->payloadSize());
+ index_manager_->onContentObject(std::move(interest),
+ std::move(content_object));
+}
- if (!virtual_download) {
- reassemble(std::move(content_object));
- } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix ==
- index_manager_->getFinalSuffix())) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
+void RaaqmTransportProtocol::onPacketDropped(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t max_rtx = 0;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
- if (on_payload) {
- on_payload->readSuccess(stats_.getBytesRecv());
- }
+ uint64_t segment = interest->getName().getSuffix();
+ ConsumerInterestCallback *callback = VOID_HANDLER;
+ if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
+ max_rtx)) {
+ stats_->updateRetxCount(1);
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_, *interest);
}
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_, *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interest_retransmissions_[segment & mask]++;
+
+ interest_to_retransmit_.push(std::move(interest));
} else {
- // TODO Application policy check
- // unverified_segments_.emplace(
- // std::make_pair(incremental_suffix, std::move(content_object)));
- TRANSPORT_LOGE("Received not trusted segment.");
+ TRANSPORT_LOGE(
+ "Stop: received not trusted packet %llu times",
+ (unsigned long long)interest_retransmissions_[segment & mask]);
+ onContentReassembled(
+ make_error_code(protocol_error::max_retransmissions_error));
}
}
+void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) {
+
+}
+
void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
checkForStalePaths();
@@ -399,7 +415,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
uint64_t segment = n.getSuffix();
// Do not retransmit interests asking contents that do not exist.
- if (segment >= index_manager_->getFinalSuffix()) {
+ if (segment > index_manager_->getFinalSuffix()) {
return;
}
@@ -417,7 +433,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
- stats_.updateRetxCount(1);
+ stats_->updateRetxCount(1);
callback = VOID_HANDLER;
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
@@ -515,24 +531,8 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
-
- if (!on_payload) {
- throw errors::RuntimeException(
- "The read callback must be installed in the transport before "
- "starting "
- "the content retrieval.");
- }
-
- if (!ec) {
- on_payload->readSuccess(stats_.getBytesRecv());
- } else {
- on_payload->readError(ec);
- }
-
rate_estimator_->onDownloadFinished();
- stop();
+ TransportProtocol::onContentReassembled(ec);
}
void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
@@ -567,7 +567,7 @@ void RaaqmTransportProtocol::RAAQM() {
// Change drop probability according to RTT statistics
cur_path_->updateDropProb();
- double coin = ((double) rand() / (RAND_MAX));
+ double coin = ((double)rand() / (RAND_MAX));
if (coin <= cur_path_->getDropProb()) {
decreaseWindow();
}
@@ -577,8 +577,8 @@ void RaaqmTransportProtocol::RAAQM() {
void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
utils::TimePoint &now) {
// Update RTT statistics
- stats_.updateAverageRtt(rtt);
- stats_.updateAverageWindowSize(current_window_size_);
+ stats_->updateAverageRtt(rtt);
+ stats_->updateAverageWindowSize(current_window_size_);
// Call statistics callback
ConsumerTimerCallback *stats_callback = VOID_HANDLER;
@@ -591,7 +591,7 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
timer_interval_milliseconds);
if (dt.count() > timer_interval_milliseconds) {
- (*stats_callback)(*socket_, stats_);
+ (*stats_callback)(*socket_, *stats_);
t0_ = utils::SteadyClock::now();
}
}