From d875ae92a7fa1eaab3bc2616aeeedfc64a81fea4 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Fri, 4 Sep 2020 08:21:02 +0000 Subject: [HICN-635] Expose protocol API in libtransport. Signed-off-by: Mauro Sardara Change-Id: I626acb5f79a85b167d50da5c07fa597a9ff4d239 --- libtransport/src/protocols/raaqm.cc | 289 ++++++------------------------------ 1 file changed, 42 insertions(+), 247 deletions(-) (limited to 'libtransport/src/protocols/raaqm.cc') diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 783d6194b..cdf878328 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -32,39 +32,36 @@ RaaqmTransportProtocol::RaaqmTransportProtocol( implementation::ConsumerSocket *icn_socket) : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), current_window_size_(1), - interests_in_flight_(0), - cur_path_(nullptr), + raaqm_algorithm_(nullptr), t0_(utils::SteadyClock::now()), - rate_estimator_(nullptr), schedule_interests_(true) { init(); } -RaaqmTransportProtocol::~RaaqmTransportProtocol() { - if (rate_estimator_) { - delete rate_estimator_; - } -} +RaaqmTransportProtocol::~RaaqmTransportProtocol() {} int RaaqmTransportProtocol::start() { - if (rate_estimator_) { - rate_estimator_->onStart(); - } - - if (!cur_path_) { + if (!raaqm_algorithm_) { // RAAQM double drop_factor; double minimum_drop_probability; uint32_t sample_number; uint32_t interest_lifetime; + double gamma = 0., max_window = 0., min_window = 0., beta; socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor); + socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY, minimum_drop_probability); socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER, sample_number); socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, interest_lifetime); + socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, + max_window); + socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, + min_window); // Rate Estimation double alpha = 0.0; @@ -78,20 +75,19 @@ int RaaqmTransportProtocol::start() { choice_param); if (choice_param == 1) { - rate_estimator_ = new ALaTcpEstimator(); + rate_estimator_ = std::make_unique(); } else { - rate_estimator_ = new SimpleEstimator(alpha, batching_param); + rate_estimator_ = + std::make_unique(alpha, batching_param); } socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, &rate_estimator_->observer_); - // Current path - auto cur_path = std::make_unique( - drop_factor, minimum_drop_probability, interest_lifetime * 1000, - sample_number); - cur_path_ = cur_path.get(); - path_table_[default_values::path_id] = std::move(cur_path); + raaqm_algorithm_ = std::make_unique( + stats_, rate_estimator_.get(), drop_factor, minimum_drop_probability, + gamma, beta, sample_number, interest_lifetime, beta_wifi_, drop_wifi_, + beta_lte_, drop_lte_, wifi_delay_, lte_delay_, max_window, min_window); } portal_->setConsumerCallback(this); @@ -109,6 +105,7 @@ void RaaqmTransportProtocol::reset() { std::queue empty; std::swap(interest_to_retransmit_, empty); stats_->reset(); + raaqm_algorithm_->reset(); // Reset reassembly component reassembly_protocol_->reInitialize(); @@ -122,65 +119,13 @@ bool RaaqmTransportProtocol::verifyKeyPackets() { return index_manager_->onKeyToVerify(); } -void RaaqmTransportProtocol::increaseWindow() { - // return; - double max_window_size = 0.; - socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, - max_window_size); - if (current_window_size_ < max_window_size) { - double gamma = 0.; - socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); - - current_window_size_ += gamma / current_window_size_; - socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size_); - } - rate_estimator_->onWindowIncrease(current_window_size_); -} - -void RaaqmTransportProtocol::decreaseWindow() { - // return; - double min_window_size = 0.; - socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, - min_window_size); - if (current_window_size_ > min_window_size) { - double beta = 0.; - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); - - current_window_size_ = current_window_size_ * beta; - if (current_window_size_ < min_window_size) { - current_window_size_ = min_window_size; - } - - socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size_); - } - rate_estimator_->onWindowDecrease(current_window_size_); -} - -void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { - // Decrease the window because the timeout happened - decreaseWindow(); -} - -void RaaqmTransportProtocol::afterContentReception( - const Interest &interest, const ContentObject &content_object) { - updatePathTable(content_object); - increaseWindow(); - updateRtt(interest.getName().getSuffix()); - rate_estimator_->onDataReceived((int)content_object.payloadSize() + - (int)content_object.headerSize()); - // Set drop probablility and window size accordingly - RAAQM(); -} - void RaaqmTransportProtocol::init() { std::ifstream is(RAAQM_CONFIG_PATH); std::string line; raaqm_autotune_ = false; - default_beta_ = default_values::beta_value; - default_drop_ = default_values::drop_factor; + double default_beta = default_values::beta_value; + double default_drop = default_values::drop_factor; beta_wifi_ = default_values::beta_value; drop_wifi_ = default_values::drop_factor; beta_lte_ = default_values::beta_value; @@ -236,17 +181,16 @@ void RaaqmTransportProtocol::init() { if (command == "beta") { std::string tmp; - line_s >> tmp >> default_beta_; - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, - default_beta_); + line_s >> tmp >> default_beta; + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_beta); continue; } if (command == "drop") { std::string tmp; - line_s >> tmp >> default_drop_; + line_s >> tmp >> default_drop; socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - default_drop_); + default_drop); continue; } @@ -285,6 +229,7 @@ void RaaqmTransportProtocol::init() { line_s >> tmp >> lte_delay_; continue; } + if (command == "alpha") { std::string tmp; double rate_alpha = 0.0; @@ -343,14 +288,27 @@ void RaaqmTransportProtocol::onContentObject( void RaaqmTransportProtocol::onContentSegment( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); + uint32_t suffix = content_object->getName().getSuffix(); // Decrease in-flight interests interests_in_flight_--; - // Update stats - if (!interest_retransmissions_[incremental_suffix & mask]) { - afterContentReception(*interest, *content_object); + // Get new value for the window + current_window_size_ = + raaqm_algorithm_->onContentObject(suffix, content_object->getPathLabel()); + + // Print stats if needed + if (*stats_summary_) { + auto now = std::chrono::steady_clock::now(); + auto dt = std::chrono::duration_cast(now - t0_); + + uint32_t timer_interval_milliseconds = 0; + socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, + timer_interval_milliseconds); + if (dt.count() > timer_interval_milliseconds) { + (*stats_summary_)(*socket_->getInterface(), *stats_); + t0_ = now; + } } index_manager_->onContentObject(std::move(interest), @@ -396,8 +354,6 @@ void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { } void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { - checkForStalePaths(); - const Name &n = interest->getName(); TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str()); @@ -419,8 +375,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { (*on_interest_timeout_)(*socket_->getInterface(), *interest); } - afterDataUnsatisfied(segment); - uint32_t max_rtx = 0; socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); @@ -502,7 +456,6 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { // This is set to ~0 so that the next interest_retransmissions_ + 1, // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; - interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); @@ -518,168 +471,10 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - rate_estimator_->onDownloadFinished(); TransportProtocol::onContentReassembled(ec); schedule_interests_ = false; } -void RaaqmTransportProtocol::updateRtt(uint64_t segment) { - if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { - throw std::runtime_error("RAAQM ERROR: no current path found, exit"); - } else { - auto now = utils::SteadyClock::now(); - utils::Microseconds rtt = std::chrono::duration_cast( - now - interest_timepoints_[segment & mask]); - - // Update stats - updateStats((uint32_t)segment, rtt.count(), now); - - if (rate_estimator_) { - rate_estimator_->onRttUpdate((double)rtt.count()); - } - - cur_path_->insertNewRtt(rtt.count(), now); - cur_path_->smoothTimer(); - - if (cur_path_->newPropagationDelayAvailable()) { - checkDropProbability(); - } - } -} - -void RaaqmTransportProtocol::RAAQM() { - if (!cur_path_) { - throw errors::RuntimeException("ERROR: no current path found, exit"); - exit(EXIT_FAILURE); - } else { - // Change drop probability according to RTT statistics - cur_path_->updateDropProb(); - - double coin = ((double)rand() / (RAND_MAX)); - if (coin <= cur_path_->getDropProb()) { - decreaseWindow(); - } - } -} - -void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, - utils::TimePoint &now) { - // Update RTT statistics - stats_->updateAverageRtt(rtt); - stats_->updateAverageWindowSize(current_window_size_); - - // Call statistics callback - if (*stats_summary_) { - auto dt = std::chrono::duration_cast(now - t0_); - - uint32_t timer_interval_milliseconds = 0; - socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, - timer_interval_milliseconds); - if (dt.count() > timer_interval_milliseconds) { - (*stats_summary_)(*socket_->getInterface(), *stats_); - t0_ = now; - } - } -} - -void RaaqmTransportProtocol::updatePathTable( - const ContentObject &content_object) { - uint32_t path_id = content_object.getPathLabel(); - - if (path_table_.find(path_id) == path_table_.end()) { - if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { - // Create a new path with some default param - - if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { - throw errors::RuntimeException( - "[RAAQM] No path initialized for path table, error could be in " - "default path initialization."); - } - - // Initiate the new path default param - auto new_path = std::make_unique( - *(path_table_.at(default_values::path_id))); - - // Insert the new path into hash table - path_table_[path_id] = std::move(new_path); - } else { - throw errors::RuntimeException( - "UNEXPECTED ERROR: when running,current path not found."); - } - } - - cur_path_ = path_table_[path_id].get(); - - size_t header_size = content_object.headerSize(); - size_t data_size = content_object.payloadSize(); - - // Update measurements for path - cur_path_->updateReceivedStats(header_size + data_size, data_size); -} - -void RaaqmTransportProtocol::checkDropProbability() { - if (!raaqm_autotune_) { - return; - } - - unsigned int max_pd = 0; - PathTable::iterator it; - for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->getPropagationDelay() > max_pd && - it->second->getPropagationDelay() != UINT_MAX && - !it->second->isStale()) { - max_pd = it->second->getPropagationDelay(); - } - } - - double drop_prob = 0; - double beta = 0; - if (max_pd < wifi_delay_) { // only ethernet paths - drop_prob = default_drop_; - beta = default_beta_; - } else if (max_pd < lte_delay_) { // at least one wifi path - drop_prob = drop_wifi_; - beta = beta_wifi_; - } else { // at least one lte path - drop_prob = drop_lte_; - beta = beta_lte_; - } - - double old_drop_prob = 0; - double old_beta = 0; - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); - socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob); - - if (drop_prob == old_drop_prob && beta == old_beta) { - return; - } - - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); - socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); - - for (it = path_table_.begin(); it != path_table_.end(); it++) { - it->second->setDropProb(drop_prob); - } -} - -void RaaqmTransportProtocol::checkForStalePaths() { - if (!raaqm_autotune_) { - return; - } - - bool stale = false; - PathTable::iterator it; - for (it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->isStale()) { - stale = true; - break; - } - } - if (stale) { - checkDropProbability(); - } -} - } // end namespace protocol } // namespace transport -- cgit 1.2.3-korg