From 229385955109b866a23c4ac2aa03d4d11044c39d Mon Sep 17 00:00:00 2001 From: "Enrico Loparco (eloparco)" Date: Thu, 24 Jun 2021 09:15:41 +0200 Subject: [HICN-708] Rebase with master Signed-off-by: Enrico Loparco (eloparco) Change-Id: I2122e1d61dd3b2e039972624ffbdbcb3c5610159 --- libtransport/src/protocols/raaqm.cc | 82 ++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 41 deletions(-) (limited to 'libtransport/src/protocols/raaqm.cc') diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 0a93dec44..bc8500227 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -13,8 +13,8 @@ * limitations under the License. */ +#include #include - #include #include #include @@ -36,7 +36,8 @@ RaaqmTransportProtocol::RaaqmTransportProtocol( interests_in_flight_(0), cur_path_(nullptr), t0_(utils::SteadyClock::now()), - rate_estimator_(nullptr) { + rate_estimator_(nullptr), + schedule_interests_(true) { init(); } @@ -116,10 +117,14 @@ void RaaqmTransportProtocol::reset() { // Reset protocol variables interests_in_flight_ = 0; t0_ = utils::SteadyClock::now(); -} -bool RaaqmTransportProtocol::verifyKeyPackets() { - return index_manager_->onKeyToVerify(); + // Optionally reset congestion window + bool reset_window; + socket_->getSocketOption(RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET, + reset_window); + if (reset_window) { + current_window_size_ = 1; + } } void RaaqmTransportProtocol::increaseWindow() { @@ -317,8 +322,8 @@ void RaaqmTransportProtocol::init() { is.close(); } -void RaaqmTransportProtocol::onContentObject( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { +void RaaqmTransportProtocol::onContentObject(Interest &interest, + ContentObject &content_object) { // Check whether makes sense to continue if (TRANSPORT_EXPECT_FALSE(!is_running_)) { return; @@ -326,54 +331,53 @@ void RaaqmTransportProtocol::onContentObject( // Call application-defined callbacks if (*on_content_object_input_) { - (*on_content_object_input_)(*socket_->getInterface(), *content_object); + (*on_content_object_input_)(*socket_->getInterface(), content_object); } if (*on_interest_satisfied_) { - (*on_interest_satisfied_)(*socket_->getInterface(), *interest); + (*on_interest_satisfied_)(*socket_->getInterface(), interest); } - if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - stats_->updateBytesRecv(content_object->payloadSize()); + if (content_object.getPayloadType() == PayloadType::DATA) { + stats_->updateBytesRecv(content_object.payloadSize()); } - onContentSegment(std::move(interest), std::move(content_object)); + onContentSegment(interest, content_object); scheduleNextInterests(); } -void RaaqmTransportProtocol::onContentSegment( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); +void RaaqmTransportProtocol::onContentSegment(Interest &interest, + ContentObject &content_object) { + uint32_t incremental_suffix = content_object.getName().getSuffix(); // Decrease in-flight interests interests_in_flight_--; // Update stats if (!interest_retransmissions_[incremental_suffix & mask]) { - afterContentReception(*interest, *content_object); + afterContentReception(interest, content_object); } - index_manager_->onContentObject(std::move(interest), - std::move(content_object)); + index_manager_->onContentObject(interest, content_object); } -void RaaqmTransportProtocol::onPacketDropped( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { +void RaaqmTransportProtocol::onPacketDropped(Interest &interest, + ContentObject &content_object) { uint32_t max_rtx = 0; socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - uint64_t segment = interest->getName().getSuffix(); + uint64_t segment = interest.getName().getSuffix(); if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < max_rtx)) { stats_->updateRetxCount(1); if (*on_interest_retransmission_) { - (*on_interest_retransmission_)(*socket_->getInterface(), *interest); + (*on_interest_retransmission_)(*socket_->getInterface(), interest); } if (*on_interest_output_) { - (*on_interest_output_)(*socket_->getInterface(), *interest); + (*on_interest_output_)(*socket_->getInterface(), interest); } if (!is_running_) { @@ -381,7 +385,7 @@ void RaaqmTransportProtocol::onPacketDropped( } interest_retransmissions_[segment & mask]++; - interest_to_retransmit_.push(std::move(interest)); + interest_to_retransmit_.push(interest.shared_from_this()); } else { TRANSPORT_LOGE( "Stop: received not trusted packet %llu times", @@ -432,10 +436,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { (*on_interest_retransmission_)(*socket_->getInterface(), *interest); } - if (*on_interest_output_) { - (*on_interest_output_)(*socket_->getInterface(), *interest); - } - if (!is_running_) { return; } @@ -451,7 +451,9 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::scheduleNextInterests() { - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + bool cancel = (!is_running_ && !is_first_) || !schedule_interests_; + if (TRANSPORT_EXPECT_FALSE(cancel)) { + schedule_interests_ = true; return; } @@ -460,7 +462,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() { // send at least one interest if there are retransmissions to perform and // there is no space left in the window sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Window full, retransmit one content interest"); interest_to_retransmit_.pop(); } @@ -470,22 +471,25 @@ void RaaqmTransportProtocol::scheduleNextInterests() { while (interests_in_flight_ < current_window_size_) { if (interest_to_retransmit_.size() > 0) { sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Retransmit content interest"); interest_to_retransmit_.pop(); } else { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + TRANSPORT_LOGI("Adios"); + break; + } + index = index_manager_->getNextSuffix(); if (index == IndexManager::invalid_index) { break; } sendInterest(index); - TRANSPORT_LOGD("Send content interest %u", index); } } } -bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { - auto interest = getPacket(); +void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { + auto interest = core::PacketManager<>::getInstance().getPacket(); core::Name *name; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); name->setSuffix((uint32_t)next_suffix); @@ -499,30 +503,26 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { if (*on_interest_output_) { on_interest_output_->operator()(*socket_->getInterface(), *interest); } - - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return false; - } - // This is set to ~0 so that the next interest_retransmissions_ + 1, // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); - sendInterest(std::move(interest)); - return true; + sendInterest(std::move(interest)); } void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { interests_in_flight_++; interest_retransmissions_[interest->getName().getSuffix() & mask]++; + TRANSPORT_LOGD("Send interest %s", interest->getName().toString().c_str()); portal_->sendInterest(std::move(interest)); } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { rate_estimator_->onDownloadFinished(); TransportProtocol::onContentReassembled(ec); + schedule_interests_ = false; } void RaaqmTransportProtocol::updateRtt(uint64_t segment) { -- cgit 1.2.3-korg