From 3bce9bfdce707313de4f9cccdc867abd9edf82df Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Fri, 7 Feb 2020 20:00:06 +0100 Subject: [HICN-508] [HICN-509] [HICN-506] Manifest rework Change-Id: I992205148910be008d66b5acb7f6f1365770f9e8 Signed-off-by: Mauro Sardara --- libtransport/src/hicn/transport/protocols/raaqm.cc | 118 ++++++++++----------- 1 file changed, 59 insertions(+), 59 deletions(-) (limited to 'libtransport/src/hicn/transport/protocols/raaqm.cc') diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index a57eb7cd9..641ae45c3 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -14,8 +14,9 @@ */ #include -#include +#include #include +#include #include #include @@ -26,9 +27,8 @@ namespace protocol { using namespace interface; -RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket) - : TransportProtocol(icnet_socket), - BaseReassembly(icnet_socket, this, this), +RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), current_window_size_(1), interests_in_flight_(0), cur_path_(nullptr), @@ -101,13 +101,14 @@ void RaaqmTransportProtocol::reset() { // Set first segment to retrieve core::Name *name; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + index_manager_->reset(); index_manager_->setFirstSuffix(name->getSuffix()); std::queue empty; std::swap(interest_to_retransmit_, empty); - stats_.reset(); + stats_->reset(); // Reset reassembly component - BaseReassembly::reset(); + reassembly_protocol_->reInitialize(); // Reset protocol variables interests_in_flight_ = 0; @@ -309,8 +310,6 @@ void RaaqmTransportProtocol::init() { void RaaqmTransportProtocol::onContentObject( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); - // Check whether makes sense to continue if (TRANSPORT_EXPECT_FALSE(!is_running_)) { return; @@ -331,27 +330,17 @@ void RaaqmTransportProtocol::onContentObject( (*callback_interest)(*socket_, *interest); } - if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() == - PayloadType::MANIFEST)) { - if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { - index_manager_ = manifest_index_manager_.get(); - interests_in_flight_--; - } - - index_manager_->onManifest(std::move(content_object)); - - } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - onContentSegment(std::move(interest), std::move(content_object)); + if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + stats_->updateBytesRecv(content_object->payloadSize()); } + onContentSegment(std::move(interest), std::move(content_object)); scheduleNextInterests(); } void RaaqmTransportProtocol::onContentSegment( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { uint32_t incremental_suffix = content_object->getName().getSuffix(); - bool virtual_download = false; - socket_->getSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, virtual_download); // Decrease in-flight interests interests_in_flight_--; @@ -361,28 +350,55 @@ void RaaqmTransportProtocol::onContentSegment( afterContentReception(*interest, *content_object); } - if (index_manager_->onContentObject(*content_object)) { - stats_.updateBytesRecv(content_object->payloadSize()); + index_manager_->onContentObject(std::move(interest), + std::move(content_object)); +} - if (!virtual_download) { - reassemble(std::move(content_object)); - } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix == - index_manager_->getFinalSuffix())) { - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - socket_->getSocketOption(READ_CALLBACK, &on_payload); +void RaaqmTransportProtocol::onPacketDropped( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t max_rtx = 0; + socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - if (on_payload) { - on_payload->readSuccess(stats_.getBytesRecv()); - } + uint64_t segment = interest->getName().getSuffix(); + ConsumerInterestCallback *callback = VOID_HANDLER; + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < + max_rtx)) { + stats_->updateRetxCount(1); + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &callback); + if (*callback) { + (*callback)(*socket_, *interest); } + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &callback); + if (*callback) { + (*callback)(*socket_, *interest); + } + + if (!is_running_) { + return; + } + + interest_retransmissions_[segment & mask]++; + + interest_to_retransmit_.push(std::move(interest)); } else { - // TODO Application policy check - // unverified_segments_.emplace( - // std::make_pair(incremental_suffix, std::move(content_object))); - TRANSPORT_LOGE("Received not trusted segment."); + TRANSPORT_LOGE( + "Stop: received not trusted packet %llu times", + (unsigned long long)interest_retransmissions_[segment & mask]); + onContentReassembled( + make_error_code(protocol_error::max_retransmissions_error)); } } +void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { + +} + void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { checkForStalePaths(); @@ -399,7 +415,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { uint64_t segment = n.getSuffix(); // Do not retransmit interests asking contents that do not exist. - if (segment >= index_manager_->getFinalSuffix()) { + if (segment > index_manager_->getFinalSuffix()) { return; } @@ -417,7 +433,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < max_rtx)) { - stats_.updateRetxCount(1); + stats_->updateRetxCount(1); callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, @@ -515,24 +531,8 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - socket_->getSocketOption(READ_CALLBACK, &on_payload); - - if (!on_payload) { - throw errors::RuntimeException( - "The read callback must be installed in the transport before " - "starting " - "the content retrieval."); - } - - if (!ec) { - on_payload->readSuccess(stats_.getBytesRecv()); - } else { - on_payload->readError(ec); - } - rate_estimator_->onDownloadFinished(); - stop(); + TransportProtocol::onContentReassembled(ec); } void RaaqmTransportProtocol::updateRtt(uint64_t segment) { @@ -567,7 +567,7 @@ void RaaqmTransportProtocol::RAAQM() { // Change drop probability according to RTT statistics cur_path_->updateDropProb(); - double coin = ((double) rand() / (RAND_MAX)); + double coin = ((double)rand() / (RAND_MAX)); if (coin <= cur_path_->getDropProb()) { decreaseWindow(); } @@ -577,8 +577,8 @@ void RaaqmTransportProtocol::RAAQM() { void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, utils::TimePoint &now) { // Update RTT statistics - stats_.updateAverageRtt(rtt); - stats_.updateAverageWindowSize(current_window_size_); + stats_->updateAverageRtt(rtt); + stats_->updateAverageWindowSize(current_window_size_); // Call statistics callback ConsumerTimerCallback *stats_callback = VOID_HANDLER; @@ -591,7 +591,7 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, timer_interval_milliseconds); if (dt.count() > timer_interval_milliseconds) { - (*stats_callback)(*socket_, stats_); + (*stats_callback)(*socket_, *stats_); t0_ = utils::SteadyClock::now(); } } -- cgit 1.2.3-korg