From d875ae92a7fa1eaab3bc2616aeeedfc64a81fea4 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Fri, 4 Sep 2020 08:21:02 +0000 Subject: [HICN-635] Expose protocol API in libtransport. Signed-off-by: Mauro Sardara Change-Id: I626acb5f79a85b167d50da5c07fa597a9ff4d239 --- libtransport/src/protocols/CMakeLists.txt | 4 + libtransport/src/protocols/cbr.cc | 12 - libtransport/src/protocols/cbr.h | 5 - libtransport/src/protocols/raaqm.cc | 289 +++------------------ libtransport/src/protocols/raaqm.h | 55 +--- libtransport/src/protocols/raaqm_data_path.cc | 10 - libtransport/src/protocols/raaqm_data_path.h | 10 - .../src/protocols/raaqm_transport_algorithm.cc | 255 ++++++++++++++++++ .../src/protocols/raaqm_transport_algorithm.h | 131 ++++++++++ libtransport/src/protocols/transport_algorithm.cc | 83 ++++++ libtransport/src/protocols/transport_algorithm.h | 68 +++++ 11 files changed, 592 insertions(+), 330 deletions(-) create mode 100644 libtransport/src/protocols/raaqm_transport_algorithm.cc create mode 100644 libtransport/src/protocols/raaqm_transport_algorithm.h create mode 100644 libtransport/src/protocols/transport_algorithm.cc create mode 100644 libtransport/src/protocols/transport_algorithm.h diff --git a/libtransport/src/protocols/CMakeLists.txt b/libtransport/src/protocols/CMakeLists.txt index 8bfbdd6ad..02e626840 100644 --- a/libtransport/src/protocols/CMakeLists.txt +++ b/libtransport/src/protocols/CMakeLists.txt @@ -32,6 +32,8 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/errors.h ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_transport_algorithm.h + ${CMAKE_CURRENT_SOURCE_DIR}/transport_algorithm.h ) list(APPEND SOURCE_FILES @@ -50,6 +52,8 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_transport_algorithm.cc + ${CMAKE_CURRENT_SOURCE_DIR}/transport_algorithm.cc ) set(RAAQM_CONFIG_INSTALL_PREFIX diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc index 0bffd7d18..fc8b53b8d 100644 --- a/libtransport/src/protocols/cbr.cc +++ b/libtransport/src/protocols/cbr.cc @@ -34,18 +34,6 @@ void CbrTransportProtocol::reset() { current_window_size_); } -void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {} - -void CbrTransportProtocol::afterContentReception( - const Interest &interest, const ContentObject &content_object) { - auto segment = content_object.getName().getSuffix(); - auto now = utils::SteadyClock::now(); - auto rtt = std::chrono::duration_cast( - now - interest_timepoints_[segment & mask]); - // Update stats - updateStats(segment, rtt.count(), now); -} - } // end namespace protocol } // end namespace transport diff --git a/libtransport/src/protocols/cbr.h b/libtransport/src/protocols/cbr.h index 20129f6a3..32788cd05 100644 --- a/libtransport/src/protocols/cbr.h +++ b/libtransport/src/protocols/cbr.h @@ -28,11 +28,6 @@ class CbrTransportProtocol : public RaaqmTransportProtocol { int start() override; void reset() override; - - private: - void afterContentReception(const Interest &interest, - const ContentObject &content_object) override; - void afterDataUnsatisfied(uint64_t segment) override; }; } // end namespace protocol diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 783d6194b..cdf878328 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -32,39 +32,36 @@ RaaqmTransportProtocol::RaaqmTransportProtocol( implementation::ConsumerSocket *icn_socket) : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), current_window_size_(1), - interests_in_flight_(0), - cur_path_(nullptr), + raaqm_algorithm_(nullptr), t0_(utils::SteadyClock::now()), - rate_estimator_(nullptr), schedule_interests_(true) { init(); } -RaaqmTransportProtocol::~RaaqmTransportProtocol() { - if (rate_estimator_) { - delete rate_estimator_; - } -} +RaaqmTransportProtocol::~RaaqmTransportProtocol() {} int RaaqmTransportProtocol::start() { - if (rate_estimator_) { - rate_estimator_->onStart(); - } - - if (!cur_path_) { + if (!raaqm_algorithm_) { // RAAQM double drop_factor; double minimum_drop_probability; uint32_t sample_number; uint32_t interest_lifetime; + double gamma = 0., max_window = 0., min_window = 0., beta; socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor); + socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY, minimum_drop_probability); socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER, sample_number); socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, interest_lifetime); + socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, + max_window); + socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, + min_window); // Rate Estimation double alpha = 0.0; @@ -78,20 +75,19 @@ int RaaqmTransportProtocol::start() { choice_param); if (choice_param == 1) { - rate_estimator_ = new ALaTcpEstimator(); + rate_estimator_ = std::make_unique(); } else { - rate_estimator_ = new SimpleEstimator(alpha, batching_param); + rate_estimator_ = + std::make_unique(alpha, batching_param); } socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, &rate_estimator_->observer_); - // Current path - auto cur_path = std::make_unique( - drop_factor, minimum_drop_probability, interest_lifetime * 1000, - sample_number); - cur_path_ = cur_path.get(); - path_table_[default_values::path_id] = std::move(cur_path); + raaqm_algorithm_ = std::make_unique( + stats_, rate_estimator_.get(), drop_factor, minimum_drop_probability, + gamma, beta, sample_number, interest_lifetime, beta_wifi_, drop_wifi_, + beta_lte_, drop_lte_, wifi_delay_, lte_delay_, max_window, min_window); } portal_->setConsumerCallback(this); @@ -109,6 +105,7 @@ void RaaqmTransportProtocol::reset() { std::queue empty; std::swap(interest_to_retransmit_, empty); stats_->reset(); + raaqm_algorithm_->reset(); // Reset reassembly component reassembly_protocol_->reInitialize(); @@ -122,65 +119,13 @@ bool RaaqmTransportProtocol::verifyKeyPackets() { return index_manager_->onKeyToVerify(); } -void RaaqmTransportProtocol::increaseWindow() { - // return; - double max_window_size = 0.; - socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, - max_window_size); - if (current_window_size_ < max_window_size) { - double gamma = 0.; - socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); - - current_window_size_ += gamma / current_window_size_; - socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size_); - } - rate_estimator_->onWindowIncrease(current_window_size_); -} - -void RaaqmTransportProtocol::decreaseWindow() { - // return; - double min_window_size = 0.; - socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, - min_window_size); - if (current_window_size_ > min_window_size) { - double beta = 0.; - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); - - current_window_size_ = current_window_size_ * beta; - if (current_window_size_ < min_window_size) { - current_window_size_ = min_window_size; - } - - socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size_); - } - rate_estimator_->onWindowDecrease(current_window_size_); -} - -void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { - // Decrease the window because the timeout happened - decreaseWindow(); -} - -void RaaqmTransportProtocol::afterContentReception( - const Interest &interest, const ContentObject &content_object) { - updatePathTable(content_object); - increaseWindow(); - updateRtt(interest.getName().getSuffix()); - rate_estimator_->onDataReceived((int)content_object.payloadSize() + - (int)content_object.headerSize()); - // Set drop probablility and window size accordingly - RAAQM(); -} - void RaaqmTransportProtocol::init() { std::ifstream is(RAAQM_CONFIG_PATH); std::string line; raaqm_autotune_ = false; - default_beta_ = default_values::beta_value; - default_drop_ = default_values::drop_factor; + double default_beta = default_values::beta_value; + double default_drop = default_values::drop_factor; beta_wifi_ = default_values::beta_value; drop_wifi_ = default_values::drop_factor; beta_lte_ = default_values::beta_value; @@ -236,17 +181,16 @@ void RaaqmTransportProtocol::init() { if (command == "beta") { std::string tmp; - line_s >> tmp >> default_beta_; - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, - default_beta_); + line_s >> tmp >> default_beta; + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_beta); continue; } if (command == "drop") { std::string tmp; - line_s >> tmp >> default_drop_; + line_s >> tmp >> default_drop; socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - default_drop_); + default_drop); continue; } @@ -285,6 +229,7 @@ void RaaqmTransportProtocol::init() { line_s >> tmp >> lte_delay_; continue; } + if (command == "alpha") { std::string tmp; double rate_alpha = 0.0; @@ -343,14 +288,27 @@ void RaaqmTransportProtocol::onContentObject( void RaaqmTransportProtocol::onContentSegment( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); + uint32_t suffix = content_object->getName().getSuffix(); // Decrease in-flight interests interests_in_flight_--; - // Update stats - if (!interest_retransmissions_[incremental_suffix & mask]) { - afterContentReception(*interest, *content_object); + // Get new value for the window + current_window_size_ = + raaqm_algorithm_->onContentObject(suffix, content_object->getPathLabel()); + + // Print stats if needed + if (*stats_summary_) { + auto now = std::chrono::steady_clock::now(); + auto dt = std::chrono::duration_cast(now - t0_); + + uint32_t timer_interval_milliseconds = 0; + socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, + timer_interval_milliseconds); + if (dt.count() > timer_interval_milliseconds) { + (*stats_summary_)(*socket_->getInterface(), *stats_); + t0_ = now; + } } index_manager_->onContentObject(std::move(interest), @@ -396,8 +354,6 @@ void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { } void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { - checkForStalePaths(); - const Name &n = interest->getName(); TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str()); @@ -419,8 +375,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { (*on_interest_timeout_)(*socket_->getInterface(), *interest); } - afterDataUnsatisfied(segment); - uint32_t max_rtx = 0; socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); @@ -502,7 +456,6 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { // This is set to ~0 so that the next interest_retransmissions_ + 1, // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; - interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); @@ -518,168 +471,10 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - rate_estimator_->onDownloadFinished(); TransportProtocol::onContentReassembled(ec); schedule_interests_ = false; } -void RaaqmTransportProtocol::updateRtt(uint64_t segment) { - if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { - throw std::runtime_error("RAAQM ERROR: no current path found, exit"); - } else { - auto now = utils::SteadyClock::now(); - utils::Microseconds rtt = std::chrono::duration_cast( - now - interest_timepoints_[segment & mask]); - - // Update stats - updateStats((uint32_t)segment, rtt.count(), now); - - if (rate_estimator_) { - rate_estimator_->onRttUpdate((double)rtt.count()); - } - - cur_path_->insertNewRtt(rtt.count(), now); - cur_path_->smoothTimer(); - - if (cur_path_->newPropagationDelayAvailable()) { - checkDropProbability(); - } - } -} - -void RaaqmTransportProtocol::RAAQM() { - if (!cur_path_) { - throw errors::RuntimeException("ERROR: no current path found, exit"); - exit(EXIT_FAILURE); - } else { - // Change drop probability according to RTT statistics - cur_path_->updateDropProb(); - - double coin = ((double)rand() / (RAND_MAX)); - if (coin <= cur_path_->getDropProb()) { - decreaseWindow(); - } - } -} - -void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, - utils::TimePoint &now) { - // Update RTT statistics - stats_->updateAverageRtt(rtt); - stats_->updateAverageWindowSize(current_window_size_); - - // Call statistics callback - if (*stats_summary_) { - auto dt = std::chrono::duration_cast(now - t0_); - - uint32_t timer_interval_milliseconds = 0; - socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, - timer_interval_milliseconds); - if (dt.count() > timer_interval_milliseconds) { - (*stats_summary_)(*socket_->getInterface(), *stats_); - t0_ = now; - } - } -} - -void RaaqmTransportProtocol::updatePathTable( - const ContentObject &content_object) { - uint32_t path_id = content_object.getPathLabel(); - - if (path_table_.find(path_id) == path_table_.end()) { - if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { - // Create a new path with some default param - - if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { - throw errors::RuntimeException( - "[RAAQM] No path initialized for path table, error could be in " - "default path initialization."); - } - - // Initiate the new path default param - auto new_path = std::make_unique( - *(path_table_.at(default_values::path_id))); - - // Insert the new path into hash table - path_table_[path_id] = std::move(new_path); - } else { - throw errors::RuntimeException( - "UNEXPECTED ERROR: when running,current path not found."); - } - } - - cur_path_ = path_table_[path_id].get(); - - size_t header_size = content_object.headerSize(); - size_t data_size = content_object.payloadSize(); - - // Update measurements for path - cur_path_->updateReceivedStats(header_size + data_size, data_size); -} - -void RaaqmTransportProtocol::checkDropProbability() { - if (!raaqm_autotune_) { - return; - } - - unsigned int max_pd = 0; - PathTable::iterator it; - for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->getPropagationDelay() > max_pd && - it->second->getPropagationDelay() != UINT_MAX && - !it->second->isStale()) { - max_pd = it->second->getPropagationDelay(); - } - } - - double drop_prob = 0; - double beta = 0; - if (max_pd < wifi_delay_) { // only ethernet paths - drop_prob = default_drop_; - beta = default_beta_; - } else if (max_pd < lte_delay_) { // at least one wifi path - drop_prob = drop_wifi_; - beta = beta_wifi_; - } else { // at least one lte path - drop_prob = drop_lte_; - beta = beta_lte_; - } - - double old_drop_prob = 0; - double old_beta = 0; - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); - socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob); - - if (drop_prob == old_drop_prob && beta == old_beta) { - return; - } - - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); - socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); - - for (it = path_table_.begin(); it != path_table_.end(); it++) { - it->second->setDropProb(drop_prob); - } -} - -void RaaqmTransportProtocol::checkForStalePaths() { - if (!raaqm_autotune_) { - return; - } - - bool stale = false; - PathTable::iterator it; - for (it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->isStale()) { - stale = true; - break; - } - } - if (stale) { - checkDropProbability(); - } -} - } // end namespace protocol } // namespace transport diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h index fce4194d4..2999b50cf 100644 --- a/libtransport/src/protocols/raaqm.h +++ b/libtransport/src/protocols/raaqm.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -29,8 +30,7 @@ namespace transport { namespace protocol { -class RaaqmTransportProtocol : public TransportProtocol, - public CWindowProtocol { +class RaaqmTransportProtocol : public TransportProtocol { public: RaaqmTransportProtocol(implementation::ConsumerSocket *icnet_socket); @@ -51,16 +51,6 @@ class RaaqmTransportProtocol : public TransportProtocol, using PathTable = std::unordered_map>; - void increaseWindow() override; - void decreaseWindow() override; - - virtual void afterContentReception(const Interest &interest, - const ContentObject &content_object); - virtual void afterDataUnsatisfied(uint64_t segment); - - virtual void updateStats(uint32_t suffix, uint64_t rtt, - utils::TimePoint &now); - private: void init(); @@ -86,48 +76,21 @@ class RaaqmTransportProtocol : public TransportProtocol, void updateRtt(uint64_t segment); - void RAAQM(); - - void updatePathTable(const ContentObject &content_object); - - void checkDropProbability(); - - void checkForStalePaths(); - - void printRtt(); - protected: - // Congestion window management - double current_window_size_; - // Protocol management - uint64_t interests_in_flight_; - std::array interest_retransmissions_; - std::array interest_timepoints_; std::queue interest_to_retransmit_; + std::array interest_retransmissions_; + uint32_t interests_in_flight_; + double current_window_size_; private: - /** - * Current download path - */ - RaaqmDataPath *cur_path_; - - /** - * Hash table for path: each entry is a pair path ID(key) - path object - */ - PathTable path_table_; - + std::unique_ptr raaqm_algorithm_; + std::unique_ptr rate_estimator_; // TimePoints for statistic utils::TimePoint t0_; - bool set_interest_filter_; - - // for rate-estimation at packet level - IcnRateEstimator *rate_estimator_; - - // params for autotuning + // Temporary placeholder for RAAQM algorithm + // parameters bool raaqm_autotune_; - double default_beta_; - double default_drop_; double beta_wifi_; double drop_wifi_; double beta_lte_; diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc index 8bbbadcf2..499b579f3 100644 --- a/libtransport/src/protocols/raaqm_data_path.cc +++ b/libtransport/src/protocols/raaqm_data_path.cc @@ -14,7 +14,6 @@ */ #include - #include namespace transport { @@ -66,15 +65,6 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt, return *this; } -RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size, - std::size_t data_size) { - packets_received_++; - m_packets_bytes_received_ += packet_size; - raw_data_bytes_received_ += data_size; - - return *this; -} - double RaaqmDataPath::getDropFactor() { return drop_factor_; } double RaaqmDataPath::getDropProb() { return drop_prob_; } diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h index 3f037bc76..d1234e743 100644 --- a/libtransport/src/protocols/raaqm_data_path.h +++ b/libtransport/src/protocols/raaqm_data_path.h @@ -16,7 +16,6 @@ #pragma once #include - #include #include @@ -47,15 +46,6 @@ class RaaqmDataPath { */ RaaqmDataPath &insertNewRtt(uint64_t new_rtt, const utils::TimePoint &now); - /** - * @brief Update the path statistics - * @param packet_size the size of the packet received, including the ICN - * header - * @param data_size the size of the data received, without the ICN header - */ - RaaqmDataPath &updateReceivedStats(std::size_t packet_size, - std::size_t data_size); - /** * @brief Get the value of the drop factor parameter */ diff --git a/libtransport/src/protocols/raaqm_transport_algorithm.cc b/libtransport/src/protocols/raaqm_transport_algorithm.cc new file mode 100644 index 000000000..09e3a47ab --- /dev/null +++ b/libtransport/src/protocols/raaqm_transport_algorithm.cc @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace transport { +namespace protocol { + +RaaqmTransportAlgorithm::RaaqmTransportAlgorithm( + interface::TransportStatistics *stats, IcnRateEstimator *rate_estimator, + double drop_factor, double minimum_drop_probability, double gamma, + double beta, uint32_t sample_number, uint32_t interest_lifetime, + double beta_wifi, double drop_wifi, double beta_lte, double drop_lte, + unsigned int wifi_delay, unsigned int lte_delay, double max_window, + double min_window) + : current_window_size_(1), + cur_path_(nullptr), + rate_estimator_(rate_estimator), + stats_(stats), + drop_factor_(drop_factor), + minimum_drop_probability_(minimum_drop_probability), + gamma_(gamma), + beta_(beta), + sample_number_(sample_number), + interest_lifetime_(interest_lifetime), + beta_wifi_(beta_wifi), + drop_wifi_(drop_wifi), + beta_lte_(beta_lte), + drop_lte_(drop_lte), + wifi_delay_(wifi_delay), + lte_delay_(lte_delay), + max_window_(max_window), + min_window_(min_window) {} + +RaaqmTransportAlgorithm::~RaaqmTransportAlgorithm() {} + +void RaaqmTransportAlgorithm::reset() { + if (rate_estimator_) { + rate_estimator_->onStart(); + } + + if (!cur_path_) { + // Current path + auto cur_path = std::make_unique( + drop_factor_, minimum_drop_probability_, interest_lifetime_ * 1000, + sample_number_); + cur_path_ = cur_path.get(); + path_table_[interface::default_values::path_id] = std::move(cur_path); + } +} + +void RaaqmTransportAlgorithm::increaseWindow() { + if (current_window_size_ < max_window_) { + current_window_size_ += gamma_ / current_window_size_; + } + + if (rate_estimator_) { + rate_estimator_->onWindowIncrease(current_window_size_); + } +} + +void RaaqmTransportAlgorithm::decreaseWindow() { + if (current_window_size_ > min_window_) { + current_window_size_ = current_window_size_ * beta_; + if (current_window_size_ < min_window_) { + current_window_size_ = min_window_; + } + } + + if (rate_estimator_) { + rate_estimator_->onWindowDecrease(current_window_size_); + } +} + +void RaaqmTransportAlgorithm::updateRtt(uint32_t suffix) { + if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { + throw std::runtime_error("RAAQM ERROR: no current path found, exit"); + } else { + auto now = utils::SteadyClock::now(); + utils::Microseconds rtt = std::chrono::duration_cast( + now - interest_timepoints_[suffix & mask]); + + updateStats(suffix, rtt.count(), now); + + if (rate_estimator_) { + rate_estimator_->onRttUpdate((double)rtt.count()); + } + + cur_path_->insertNewRtt(rtt.count(), now); + cur_path_->smoothTimer(); + + if (cur_path_->newPropagationDelayAvailable()) { + checkDropProbability(); + } + } +} + +void RaaqmTransportAlgorithm::RAAQM() { + if (!cur_path_) { + throw errors::RuntimeException("ERROR: no current path found, exit"); + exit(EXIT_FAILURE); + } else { + // Change drop probability according to RTT statistics + cur_path_->updateDropProb(); + + double coin = ((double)rand() / (RAND_MAX)); + if (coin <= cur_path_->getDropProb()) { + decreaseWindow(); + } + } +} + +void RaaqmTransportAlgorithm::updatePathTable(uint32_t path_label) { + uint32_t path_id = path_label; + + if (path_table_.find(path_id) == path_table_.end()) { + if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { + // Create a new path with some default param + + if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { + throw errors::RuntimeException( + "[RAAQM] No path initialized for path table, error could be in " + "default path initialization."); + } + + // Initiate the new path default param + auto new_path = std::make_unique( + *(path_table_.at(interface::default_values::path_id))); + + // Insert the new path into hash table + path_table_[path_id] = std::move(new_path); + } else { + throw errors::RuntimeException( + "UNEXPECTED ERROR: when running,current path not found."); + } + } + + cur_path_ = path_table_[path_id].get(); +} + +void RaaqmTransportAlgorithm::checkDropProbability() { + if (!raaqm_autotune_) { + return; + } + + unsigned int max_pd = 0; + PathTable::iterator it; + for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->getPropagationDelay() > max_pd && + it->second->getPropagationDelay() != UINT_MAX && + !it->second->isStale()) { + max_pd = it->second->getPropagationDelay(); + } + } + + double drop_prob = 0; + double beta = 0; + if (max_pd < wifi_delay_) { // only ethernet paths + drop_prob = drop_factor_; + beta = beta_; + } else if (max_pd < lte_delay_) { // at least one wifi path + drop_prob = drop_wifi_; + beta = beta_wifi_; + } else { // at least one lte path + drop_prob = drop_lte_; + beta = beta_lte_; + } + + double old_drop_prob = 0; + double old_beta = 0; + // socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); + // socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, + // old_drop_prob); + + if (drop_prob == old_drop_prob && beta == old_beta) { + return; + } + + // socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + // socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); + + for (it = path_table_.begin(); it != path_table_.end(); it++) { + it->second->setDropProb(drop_prob); + } +} + +void RaaqmTransportAlgorithm::checkForStalePaths() { + if (!raaqm_autotune_) { + return; + } + + bool stale = false; + PathTable::iterator it; + for (it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->isStale()) { + stale = true; + break; + } + } + if (stale) { + checkDropProbability(); + } +} + +void RaaqmTransportAlgorithm::updateStats(uint32_t suffix, uint64_t rtt, + utils::TimePoint &now) { + // Update RTT statistics + if (stats_) { + stats_->updateAverageRtt(rtt); + stats_->updateAverageWindowSize(current_window_size_); + } +} + +uint32_t RaaqmTransportAlgorithm::onContentObject(uint32_t suffix, + uint32_t path_label) { + updatePathTable(path_label); + increaseWindow(); + updateRtt(suffix); + + // Set drop probablility and window size accordingly + RAAQM(); + + return current_window_size_; +} + +uint32_t RaaqmTransportAlgorithm::onInterestTimeout(uint32_t suffix) { + checkForStalePaths(); + // Decrease the window because the timeout happened + decreaseWindow(); + + return current_window_size_; +} + +void RaaqmTransportAlgorithm::onInterestSent(uint32_t suffix) { + interest_timepoints_[suffix & mask] = utils::SteadyClock::now(); +} + +void RaaqmTransportAlgorithm::sessionEnd() { + rate_estimator_->onDownloadFinished(); +} + +} // namespace protocol +} // namespace transport \ No newline at end of file diff --git a/libtransport/src/protocols/raaqm_transport_algorithm.h b/libtransport/src/protocols/raaqm_transport_algorithm.h new file mode 100644 index 000000000..5b0b5c05c --- /dev/null +++ b/libtransport/src/protocols/raaqm_transport_algorithm.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace transport { + +namespace protocol { + +class RaaqmTransportAlgorithm : public TransportAlgorithm, + public CWindowProtocol { + public: + // TODO: Add windows size and other beta/drop parameters + RaaqmTransportAlgorithm(interface::TransportStatistics *stats, + IcnRateEstimator *rate_estimator, double drop_factor, + double minimum_drop_probability, double gamma, + double beta, uint32_t sample_number, + uint32_t interest_lifetime, double beta_wifi, + double drop_wifi, double beta_lte, double drop_lte, + unsigned int wifi_delay, unsigned int lte_delay, + double max_window, double min_window); + + ~RaaqmTransportAlgorithm() override; + + void reset() override; + + uint32_t onContentObject(uint32_t suffix, uint32_t path_label) override; + + uint32_t onInterestTimeout(uint32_t suffix) override; + + void onInterestSent(uint32_t suffix) override; + + void sessionEnd() override; + + protected: + static constexpr uint32_t buffer_size = + 1 << interface::default_values::log_2_default_buffer_size; + static constexpr uint16_t mask = buffer_size - 1; + using PathTable = + std::unordered_map>; + + void increaseWindow() override; + void decreaseWindow() override; + + virtual void updateStats(uint32_t suffix, uint64_t rtt, + utils::TimePoint &now); + + private: + void init(); + + void updateRtt(uint32_t suffix); + + void RAAQM(); + + void updatePathTable(uint32_t path_label); + + void checkDropProbability(); + + void checkForStalePaths(); + + protected: + // Congestion window management + double current_window_size_; + // Protocol management + uint64_t interests_in_flight_; + std::array interest_timepoints_; + + private: + /** + * Current download path + */ + RaaqmDataPath *cur_path_; + + /** + * Hash table for path: each entry is a pair path ID(key) - path object + */ + PathTable path_table_; + + // for rate-estimation at packet level + IcnRateEstimator *rate_estimator_; + interface::TransportStatistics *stats_; + + bool set_interest_filter_; + + // Params + double drop_factor_; + double minimum_drop_probability_; + double gamma_; + double beta_; + uint32_t sample_number_; + uint32_t interest_lifetime_; + + bool raaqm_autotune_; + double beta_wifi_; + double drop_wifi_; + double beta_lte_; + double drop_lte_; + unsigned int wifi_delay_; + unsigned int lte_delay_; + double max_window_; + double min_window_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/transport_algorithm.cc b/libtransport/src/protocols/transport_algorithm.cc new file mode 100644 index 000000000..2b8d60934 --- /dev/null +++ b/libtransport/src/protocols/transport_algorithm.cc @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +namespace { +allocator_t *algorithm_allocator = nullptr; +deallocator_t *algorithm_deallocator = nullptr; + +void *custom_allocate(size_t size) { + void *ret; + if (algorithm_allocator) { + ret = algorithm_allocator(size); + } else { + ret = new uint8_t[size]; + } + + return ret; +} + +void custom_deallocate(void *p) { + if (algorithm_deallocator) { + algorithm_deallocator(p); + } else { + delete[](char *)(p); + } +} + +} // namespace + +void transportAlgorithm_Init(allocator_t *allocator, + deallocator_t *deallocator) { + algorithm_allocator = allocator; + algorithm_deallocator = deallocator; +} + +void transportAlgorithm_Destroy(TransportAlgorithm *algorithm) { + custom_deallocate(algorithm); +} + +TransportAlgorithm *transportAlgorithm_CreateRaaqm( + double drop_factor, double minimum_drop_probability, double gamma, + double beta, uint32_t sample_number, uint32_t interest_lifetime, + double beta_wifi, double drop_wifi, double beta_lte, double drop_lte, + unsigned int wifi_delay, unsigned int lte_delay, double max_window, + double min_window) { + TransportAlgorithm *ret = nullptr; + ret = new ( + custom_allocate(sizeof(transport::protocol::RaaqmTransportAlgorithm))) + transport::protocol::RaaqmTransportAlgorithm( + nullptr, nullptr, drop_factor, minimum_drop_probability, gamma, beta, + sample_number, interest_lifetime, beta_wifi, drop_wifi, beta_lte, + drop_lte, wifi_delay, lte_delay, max_window, min_window); + + return ret; +} + +uint32_t transportAlgorithm_OnContentObject(TransportAlgorithm *algorithm, + uint32_t suffix, + uint32_t path_label) { + return algorithm->onContentObject(suffix, path_label); +} + +uint32_t transportAlgorithm_OnInterestTimeout(TransportAlgorithm *algorithm, + uint32_t suffix) { + return algorithm->onInterestTimeout(suffix); +} \ No newline at end of file diff --git a/libtransport/src/protocols/transport_algorithm.h b/libtransport/src/protocols/transport_algorithm.h new file mode 100644 index 000000000..5dacdb3e0 --- /dev/null +++ b/libtransport/src/protocols/transport_algorithm.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef transport_algorithm_h +#define transport_algorithm_h + +#include +#include +#include +#include + +/** + * TransportAlgorithm: class containing the protocol machinery + */ +#ifdef __cplusplus + +#include + +class TransportAlgorithm { + public: + virtual ~TransportAlgorithm() = default; + virtual void reset() = 0; + virtual uint32_t onContentObject(uint32_t suffix, uint32_t path_label) = 0; + virtual uint32_t onInterestTimeout(uint32_t suffix) = 0; + virtual void onInterestSent(uint32_t suffix) = 0; + virtual void sessionEnd() = 0; +}; + +using transport::interface::TransportProtocolAlgorithms; + +#else +typedef void *TransportAlgorithm; +#endif + +typedef void *(allocator_t)(size_t size); +typedef void *(deallocator_t)(void *ptr); + +extern "C" void transportAlgorithm_Init(allocator_t *allocator, + deallocator_t *deallocator); + +extern "C" TransportAlgorithm *transportAlgorithm_CreateRaaqm( + double drop_factor, double minimum_drop_probability, double gamma, + double beta, uint32_t sample_number, uint32_t interest_lifetime, + double beta_wifi, double drop_wifi, double beta_lte, double drop_lte, + unsigned int wifi_delay, unsigned int lte_delay, double max_window, + double min_window); + +extern "C" void transportAlgorithm_Destroy(TransportAlgorithm *algorithm); + +extern "C" uint32_t transportAlgorithm_OnContentObject( + TransportAlgorithm *algorithm, uint32_t suffix, uint32_t path_label); + +extern "C" uint32_t transportAlgorithm_OnInterestTimeout( + TransportAlgorithm *algorithm, uint32_t suffix); + +#endif /* transport_algorithm_h */ \ No newline at end of file -- cgit 1.2.3-korg