diff options
Diffstat (limited to 'libtransport/src/protocols/raaqm.cc')
-rw-r--r-- | libtransport/src/protocols/raaqm.cc | 243 |
1 files changed, 99 insertions, 144 deletions
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 5023adf2e..bcbc15aef 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,10 +13,11 @@ * limitations under the License. */ +#include <hicn/transport/core/global_object_pool.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <implementation/socket_consumer.h> #include <protocols/errors.h> -#include <protocols/indexer.h> +#include <protocols/index_manager_bytestream.h> #include <protocols/raaqm.h> #include <cstdlib> @@ -30,12 +31,14 @@ using namespace interface; RaaqmTransportProtocol::RaaqmTransportProtocol( implementation::ConsumerSocket *icn_socket) - : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), + : TransportProtocol(icn_socket, new IndexManager(icn_socket, this), + new ByteStreamReassembly(icn_socket, this)), current_window_size_(1), interests_in_flight_(0), cur_path_(nullptr), - t0_(utils::SteadyClock::now()), + t0_(utils::SteadyTime::Clock::now()), rate_estimator_(nullptr), + dis_(0, 1.0), schedule_interests_(true) { init(); } @@ -46,11 +49,34 @@ RaaqmTransportProtocol::~RaaqmTransportProtocol() { } } -int RaaqmTransportProtocol::start() { +void RaaqmTransportProtocol::reset() { + // Set first segment to retrieve + TransportProtocol::reset(); + core::Name *name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + indexer_verifier_->setFirstSuffix(name->getSuffix()); + std::queue<uint32_t> empty; + std::swap(interest_to_retransmit_, empty); + stats_->reset(); + + // Reset protocol variables + interests_in_flight_ = 0; + t0_ = utils::SteadyTime::Clock::now(); + + // Optionally reset congestion window + bool reset_window; + socket_->getSocketOption(RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET, + reset_window); + if (reset_window) { + current_window_size_ = 1; + } + + // Reset rate estimator if (rate_estimator_) { rate_estimator_->onStart(); } + // If not cur_path exists, create one if (!cur_path_) { // RAAQM double drop_factor; @@ -93,41 +119,6 @@ int RaaqmTransportProtocol::start() { cur_path_ = cur_path.get(); path_table_[default_values::path_id] = std::move(cur_path); } - - portal_->setConsumerCallback(this); - return TransportProtocol::start(); -} - -void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); } - -void RaaqmTransportProtocol::reset() { - // Set first segment to retrieve - core::Name *name; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); - index_manager_->reset(); - index_manager_->setFirstSuffix(name->getSuffix()); - std::queue<Interest::Ptr> empty; - std::swap(interest_to_retransmit_, empty); - stats_->reset(); - - // Reset reassembly component - reassembly_protocol_->reInitialize(); - - // Reset protocol variables - interests_in_flight_ = 0; - t0_ = utils::SteadyClock::now(); - - // Optionally reset congestion window - bool reset_window; - socket_->getSocketOption(RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET, - reset_window); - if (reset_window) { - current_window_size_ = 1; - } -} - -bool RaaqmTransportProtocol::verifyKeyPackets() { - return index_manager_->onKeyToVerify(); } void RaaqmTransportProtocol::increaseWindow() { @@ -197,9 +188,8 @@ void RaaqmTransportProtocol::init() { lte_delay_ = 15000; if (!is) { - TRANSPORT_LOGW( - "WARNING: RAAQM parameters not found at %s, set default values", - RAAQM_CONFIG_PATH); + LOG(WARNING) << "RAAQM parameters not found at " << RAAQM_CONFIG_PATH + << ", set default values"; return; } @@ -325,75 +315,66 @@ void RaaqmTransportProtocol::init() { is.close(); } -void RaaqmTransportProtocol::onContentObject( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - // Check whether makes sense to continue - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { - return; - } +void RaaqmTransportProtocol::onContentObjectReceived( + Interest &interest, ContentObject &content_object, std::error_code &ec) { + uint32_t incremental_suffix = content_object.getName().getSuffix(); // Call application-defined callbacks if (*on_content_object_input_) { - (*on_content_object_input_)(*socket_->getInterface(), *content_object); + (*on_content_object_input_)(*socket_->getInterface(), content_object); } if (*on_interest_satisfied_) { - (*on_interest_satisfied_)(*socket_->getInterface(), *interest); - } - - if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - stats_->updateBytesRecv(content_object->payloadSize()); + (*on_interest_satisfied_)(*socket_->getInterface(), interest); } - onContentSegment(std::move(interest), std::move(content_object)); - scheduleNextInterests(); -} + ec = make_error_code(protocol_error::success); -void RaaqmTransportProtocol::onContentSegment( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); + if (content_object.getPayloadType() == PayloadType::DATA) { + stats_->updateBytesRecv(content_object.payloadSize()); + } // Decrease in-flight interests interests_in_flight_--; // Update stats if (!interest_retransmissions_[incremental_suffix & mask]) { - afterContentReception(*interest, *content_object); + afterContentReception(interest, content_object); } - index_manager_->onContentObject(std::move(interest), - std::move(content_object)); + // Schedule next interests + scheduleNextInterests(); } -void RaaqmTransportProtocol::onPacketDropped( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { +void RaaqmTransportProtocol::onPacketDropped(Interest &interest, + ContentObject &content_object, + const std::error_code &reason) { uint32_t max_rtx = 0; socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - uint64_t segment = interest->getName().getSuffix(); + uint64_t segment = interest.getName().getSuffix(); if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < max_rtx)) { stats_->updateRetxCount(1); if (*on_interest_retransmission_) { - (*on_interest_retransmission_)(*socket_->getInterface(), *interest); + (*on_interest_retransmission_)(*socket_->getInterface(), interest); } if (*on_interest_output_) { - (*on_interest_output_)(*socket_->getInterface(), *interest); + (*on_interest_output_)(*socket_->getInterface(), interest); } - if (!is_running_) { + if (!isRunning()) { return; } interest_retransmissions_[segment & mask]++; - interest_to_retransmit_.push(std::move(interest)); + interest_to_retransmit_.push((unsigned int)segment); } else { - TRANSPORT_LOGE( - "Stop: received not trusted packet %llu times", - (unsigned long long)interest_retransmissions_[segment & mask]); + LOG(ERROR) << "Stop: received not trusted packet " + << interest_retransmissions_[segment & mask] << " times"; onContentReassembled( make_error_code(protocol_error::max_retransmissions_error)); } @@ -403,23 +384,27 @@ void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { } -void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { - checkForStalePaths(); - - const Name &n = interest->getName(); - - TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str()); +void RaaqmTransportProtocol::sendInterest( + const Name &interest_name, + std::array<uint32_t, MAX_AGGREGATED_INTEREST> *additional_suffixes, + uint32_t len) { + interests_in_flight_++; + interest_retransmissions_[interest_name.getSuffix() & mask]++; + interest_timepoints_[interest_name.getSuffix() & mask] = + utils::SteadyTime::Clock::now(); + TransportProtocol::sendInterest(interest_name, additional_suffixes, len); +} - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { - return; - } +void RaaqmTransportProtocol::onInterestTimeout(Interest::Ptr &interest, + const Name &n) { + checkForStalePaths(); interests_in_flight_--; uint64_t segment = n.getSuffix(); // Do not retransmit interests asking contents that do not exist. - if (segment > index_manager_->getFinalSuffix()) { + if (segment > indexer_verifier_->getFinalSuffix()) { return; } @@ -440,32 +425,34 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { (*on_interest_retransmission_)(*socket_->getInterface(), *interest); } - if (!is_running_) { + if (!isRunning()) { return; } - interest_retransmissions_[segment & mask]++; - interest_to_retransmit_.push(std::move(interest)); - + interest_to_retransmit_.push((unsigned int)segment); scheduleNextInterests(); } else { - TRANSPORT_LOGE("Stop: reached max retx limit."); + LOG(ERROR) << "Stop: reached max retx limit."; onContentReassembled(std::make_error_code(std::errc(std::errc::io_error))); } } void RaaqmTransportProtocol::scheduleNextInterests() { - bool cancel = (!is_running_ && !is_first_) || !schedule_interests_; + bool cancel = (!isRunning() && !is_first_) || !schedule_interests_; if (TRANSPORT_EXPECT_FALSE(cancel)) { schedule_interests_ = true; return; } + core::Name *name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + if (TRANSPORT_EXPECT_FALSE(interests_in_flight_ >= current_window_size_ && interest_to_retransmit_.size() > 0)) { // send at least one interest if there are retransmissions to perform and // there is no space left in the window - sendInterest(std::move(interest_to_retransmit_.front())); + auto suffix = interest_to_retransmit_.front(); + sendInterest(name->setSuffix(suffix)); interest_to_retransmit_.pop(); } @@ -474,58 +461,26 @@ void RaaqmTransportProtocol::scheduleNextInterests() { // Send the interest needed for filling the window while (interests_in_flight_ < current_window_size_) { if (interest_to_retransmit_.size() > 0) { - sendInterest(std::move(interest_to_retransmit_.front())); + auto suffix = interest_to_retransmit_.front(); + sendInterest(name->setSuffix(suffix)); interest_to_retransmit_.pop(); } else { - index = index_manager_->getNextSuffix(); + if (TRANSPORT_EXPECT_FALSE(!isRunning() && !is_first_)) { + break; + } + + index = indexer_verifier_->getNextSuffix(); if (index == IndexManager::invalid_index) { break; } - sendInterest(index); + interest_retransmissions_[index & mask] = ~0; + sendInterest(name->setSuffix(index)); } } } -bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { - auto interest = getPacket(); - core::Name *name; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); - name->setSuffix((uint32_t)next_suffix); - interest->setName(*name); - - uint32_t interest_lifetime; - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interest_lifetime); - interest->setLifetime(interest_lifetime); - - if (*on_interest_output_) { - on_interest_output_->operator()(*socket_->getInterface(), *interest); - } - - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return false; - } - - // This is set to ~0 so that the next interest_retransmissions_ + 1, - // performed by sendInterest, will result in 0 - interest_retransmissions_[next_suffix & mask] = ~0; - interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); - - sendInterest(std::move(interest)); - - return true; -} - -void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { - interests_in_flight_++; - interest_retransmissions_[interest->getName().getSuffix() & mask]++; - - TRANSPORT_LOGD("Send interest %s", interest->getName().toString().c_str()); - portal_->sendInterest(std::move(interest)); -} - -void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { +void RaaqmTransportProtocol::onContentReassembled(const std::error_code &ec) { rate_estimator_->onDownloadFinished(); TransportProtocol::onContentReassembled(ec); schedule_interests_ = false; @@ -535,18 +490,18 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) { if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { throw std::runtime_error("RAAQM ERROR: no current path found, exit"); } else { - auto now = utils::SteadyClock::now(); - utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>( - now - interest_timepoints_[segment & mask]); + auto now = utils::SteadyTime::Clock::now(); + auto rtt = utils::SteadyTime::getDurationUs( + interest_timepoints_[segment & mask], now); // Update stats - updateStats((uint32_t)segment, rtt.count(), now); + updateStats((uint32_t)segment, rtt, now); if (rate_estimator_) { - rate_estimator_->onRttUpdate((double)rtt.count()); + rate_estimator_->onRttUpdate(rtt); } - cur_path_->insertNewRtt(rtt.count(), now); + cur_path_->insertNewRtt(rtt, now); cur_path_->smoothTimer(); if (cur_path_->newPropagationDelayAvailable()) { @@ -558,27 +513,27 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) { void RaaqmTransportProtocol::RAAQM() { if (!cur_path_) { throw errors::RuntimeException("ERROR: no current path found, exit"); - exit(EXIT_FAILURE); } else { // Change drop probability according to RTT statistics cur_path_->updateDropProb(); - double coin = ((double)rand() / (RAND_MAX)); + double coin = dis_(gen_); if (coin <= cur_path_->getDropProb()) { decreaseWindow(); } } } -void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, - utils::TimePoint &now) { +void RaaqmTransportProtocol::updateStats( + uint32_t suffix, const utils::SteadyTime::Microseconds &rtt, + utils::SteadyTime::TimePoint &now) { // Update RTT statistics stats_->updateAverageRtt(rtt); stats_->updateAverageWindowSize(current_window_size_); // Call statistics callback if (*stats_summary_) { - auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); + auto dt = utils::SteadyTime::getDurationMs(t0_, now); uint32_t timer_interval_milliseconds = 0; socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, |