diff options
Diffstat (limited to 'libtransport/src/protocols/raaqm.cc')
-rw-r--r-- | libtransport/src/protocols/raaqm.cc | 289 |
1 files changed, 247 insertions, 42 deletions
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index cdf878328..783d6194b 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -32,36 +32,39 @@ RaaqmTransportProtocol::RaaqmTransportProtocol( implementation::ConsumerSocket *icn_socket) : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), current_window_size_(1), - raaqm_algorithm_(nullptr), + interests_in_flight_(0), + cur_path_(nullptr), t0_(utils::SteadyClock::now()), + rate_estimator_(nullptr), schedule_interests_(true) { init(); } -RaaqmTransportProtocol::~RaaqmTransportProtocol() {} +RaaqmTransportProtocol::~RaaqmTransportProtocol() { + if (rate_estimator_) { + delete rate_estimator_; + } +} int RaaqmTransportProtocol::start() { - if (!raaqm_algorithm_) { + if (rate_estimator_) { + rate_estimator_->onStart(); + } + + if (!cur_path_) { // RAAQM double drop_factor; double minimum_drop_probability; uint32_t sample_number; uint32_t interest_lifetime; - double gamma = 0., max_window = 0., min_window = 0., beta; socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor); - socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY, minimum_drop_probability); socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER, sample_number); socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, interest_lifetime); - socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, - max_window); - socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, - min_window); // Rate Estimation double alpha = 0.0; @@ -75,19 +78,20 @@ int RaaqmTransportProtocol::start() { choice_param); if (choice_param == 1) { - rate_estimator_ = std::make_unique<ALaTcpEstimator>(); + rate_estimator_ = new ALaTcpEstimator(); } else { - rate_estimator_ = - std::make_unique<SimpleEstimator>(alpha, batching_param); + rate_estimator_ = new SimpleEstimator(alpha, batching_param); } socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, &rate_estimator_->observer_); - raaqm_algorithm_ = std::make_unique<RaaqmTransportAlgorithm>( - stats_, rate_estimator_.get(), drop_factor, minimum_drop_probability, - gamma, beta, sample_number, interest_lifetime, beta_wifi_, drop_wifi_, - beta_lte_, drop_lte_, wifi_delay_, lte_delay_, max_window, min_window); + // Current path + auto cur_path = std::make_unique<RaaqmDataPath>( + drop_factor, minimum_drop_probability, interest_lifetime * 1000, + sample_number); + cur_path_ = cur_path.get(); + path_table_[default_values::path_id] = std::move(cur_path); } portal_->setConsumerCallback(this); @@ -105,7 +109,6 @@ void RaaqmTransportProtocol::reset() { std::queue<Interest::Ptr> empty; std::swap(interest_to_retransmit_, empty); stats_->reset(); - raaqm_algorithm_->reset(); // Reset reassembly component reassembly_protocol_->reInitialize(); @@ -119,13 +122,65 @@ bool RaaqmTransportProtocol::verifyKeyPackets() { return index_manager_->onKeyToVerify(); } +void RaaqmTransportProtocol::increaseWindow() { + // return; + double max_window_size = 0.; + socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, + max_window_size); + if (current_window_size_ < max_window_size) { + double gamma = 0.; + socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); + + current_window_size_ += gamma / current_window_size_; + socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); + } + rate_estimator_->onWindowIncrease(current_window_size_); +} + +void RaaqmTransportProtocol::decreaseWindow() { + // return; + double min_window_size = 0.; + socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, + min_window_size); + if (current_window_size_ > min_window_size) { + double beta = 0.; + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + + current_window_size_ = current_window_size_ * beta; + if (current_window_size_ < min_window_size) { + current_window_size_ = min_window_size; + } + + socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); + } + rate_estimator_->onWindowDecrease(current_window_size_); +} + +void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + // Decrease the window because the timeout happened + decreaseWindow(); +} + +void RaaqmTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) { + updatePathTable(content_object); + increaseWindow(); + updateRtt(interest.getName().getSuffix()); + rate_estimator_->onDataReceived((int)content_object.payloadSize() + + (int)content_object.headerSize()); + // Set drop probablility and window size accordingly + RAAQM(); +} + void RaaqmTransportProtocol::init() { std::ifstream is(RAAQM_CONFIG_PATH); std::string line; raaqm_autotune_ = false; - double default_beta = default_values::beta_value; - double default_drop = default_values::drop_factor; + default_beta_ = default_values::beta_value; + default_drop_ = default_values::drop_factor; beta_wifi_ = default_values::beta_value; drop_wifi_ = default_values::drop_factor; beta_lte_ = default_values::beta_value; @@ -181,16 +236,17 @@ void RaaqmTransportProtocol::init() { if (command == "beta") { std::string tmp; - line_s >> tmp >> default_beta; - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_beta); + line_s >> tmp >> default_beta_; + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, + default_beta_); continue; } if (command == "drop") { std::string tmp; - line_s >> tmp >> default_drop; + line_s >> tmp >> default_drop_; socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - default_drop); + default_drop_); continue; } @@ -229,7 +285,6 @@ void RaaqmTransportProtocol::init() { line_s >> tmp >> lte_delay_; continue; } - if (command == "alpha") { std::string tmp; double rate_alpha = 0.0; @@ -288,27 +343,14 @@ void RaaqmTransportProtocol::onContentObject( void RaaqmTransportProtocol::onContentSegment( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t suffix = content_object->getName().getSuffix(); + uint32_t incremental_suffix = content_object->getName().getSuffix(); // Decrease in-flight interests interests_in_flight_--; - // Get new value for the window - current_window_size_ = - raaqm_algorithm_->onContentObject(suffix, content_object->getPathLabel()); - - // Print stats if needed - if (*stats_summary_) { - auto now = std::chrono::steady_clock::now(); - auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); - - uint32_t timer_interval_milliseconds = 0; - socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, - timer_interval_milliseconds); - if (dt.count() > timer_interval_milliseconds) { - (*stats_summary_)(*socket_->getInterface(), *stats_); - t0_ = now; - } + // Update stats + if (!interest_retransmissions_[incremental_suffix & mask]) { + afterContentReception(*interest, *content_object); } index_manager_->onContentObject(std::move(interest), @@ -354,6 +396,8 @@ void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { } void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { + checkForStalePaths(); + const Name &n = interest->getName(); TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str()); @@ -375,6 +419,8 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { (*on_interest_timeout_)(*socket_->getInterface(), *interest); } + afterDataUnsatisfied(segment); + uint32_t max_rtx = 0; socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); @@ -456,6 +502,7 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { // This is set to ~0 so that the next interest_retransmissions_ + 1, // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; + interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); @@ -471,10 +518,168 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { + rate_estimator_->onDownloadFinished(); TransportProtocol::onContentReassembled(ec); schedule_interests_ = false; } +void RaaqmTransportProtocol::updateRtt(uint64_t segment) { + if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { + throw std::runtime_error("RAAQM ERROR: no current path found, exit"); + } else { + auto now = utils::SteadyClock::now(); + utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>( + now - interest_timepoints_[segment & mask]); + + // Update stats + updateStats((uint32_t)segment, rtt.count(), now); + + if (rate_estimator_) { + rate_estimator_->onRttUpdate((double)rtt.count()); + } + + cur_path_->insertNewRtt(rtt.count(), now); + cur_path_->smoothTimer(); + + if (cur_path_->newPropagationDelayAvailable()) { + checkDropProbability(); + } + } +} + +void RaaqmTransportProtocol::RAAQM() { + if (!cur_path_) { + throw errors::RuntimeException("ERROR: no current path found, exit"); + exit(EXIT_FAILURE); + } else { + // Change drop probability according to RTT statistics + cur_path_->updateDropProb(); + + double coin = ((double)rand() / (RAND_MAX)); + if (coin <= cur_path_->getDropProb()) { + decreaseWindow(); + } + } +} + +void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, + utils::TimePoint &now) { + // Update RTT statistics + stats_->updateAverageRtt(rtt); + stats_->updateAverageWindowSize(current_window_size_); + + // Call statistics callback + if (*stats_summary_) { + auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); + + uint32_t timer_interval_milliseconds = 0; + socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, + timer_interval_milliseconds); + if (dt.count() > timer_interval_milliseconds) { + (*stats_summary_)(*socket_->getInterface(), *stats_); + t0_ = now; + } + } +} + +void RaaqmTransportProtocol::updatePathTable( + const ContentObject &content_object) { + uint32_t path_id = content_object.getPathLabel(); + + if (path_table_.find(path_id) == path_table_.end()) { + if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { + // Create a new path with some default param + + if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { + throw errors::RuntimeException( + "[RAAQM] No path initialized for path table, error could be in " + "default path initialization."); + } + + // Initiate the new path default param + auto new_path = std::make_unique<RaaqmDataPath>( + *(path_table_.at(default_values::path_id))); + + // Insert the new path into hash table + path_table_[path_id] = std::move(new_path); + } else { + throw errors::RuntimeException( + "UNEXPECTED ERROR: when running,current path not found."); + } + } + + cur_path_ = path_table_[path_id].get(); + + size_t header_size = content_object.headerSize(); + size_t data_size = content_object.payloadSize(); + + // Update measurements for path + cur_path_->updateReceivedStats(header_size + data_size, data_size); +} + +void RaaqmTransportProtocol::checkDropProbability() { + if (!raaqm_autotune_) { + return; + } + + unsigned int max_pd = 0; + PathTable::iterator it; + for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->getPropagationDelay() > max_pd && + it->second->getPropagationDelay() != UINT_MAX && + !it->second->isStale()) { + max_pd = it->second->getPropagationDelay(); + } + } + + double drop_prob = 0; + double beta = 0; + if (max_pd < wifi_delay_) { // only ethernet paths + drop_prob = default_drop_; + beta = default_beta_; + } else if (max_pd < lte_delay_) { // at least one wifi path + drop_prob = drop_wifi_; + beta = beta_wifi_; + } else { // at least one lte path + drop_prob = drop_lte_; + beta = beta_lte_; + } + + double old_drop_prob = 0; + double old_beta = 0; + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); + socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob); + + if (drop_prob == old_drop_prob && beta == old_beta) { + return; + } + + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); + + for (it = path_table_.begin(); it != path_table_.end(); it++) { + it->second->setDropProb(drop_prob); + } +} + +void RaaqmTransportProtocol::checkForStalePaths() { + if (!raaqm_autotune_) { + return; + } + + bool stale = false; + PathTable::iterator it; + for (it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->isStale()) { + stale = true; + break; + } + } + if (stale) { + checkDropProbability(); + } +} + } // end namespace protocol } // namespace transport |