diff options
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/raaqm.cc')
-rw-r--r-- | libtransport/src/hicn/transport/protocols/raaqm.cc | 712 |
1 files changed, 0 insertions, 712 deletions
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc deleted file mode 100644 index 984470edb..000000000 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ /dev/null @@ -1,712 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/protocols/indexer.h> -#include <hicn/transport/protocols/raaqm.h> -#include <hicn/transport/protocols/errors.h> - -#include <cstdlib> -#include <fstream> - -namespace transport { - -namespace protocol { - -using namespace interface; - -RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icn_socket) - : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), - current_window_size_(1), - interests_in_flight_(0), - cur_path_(nullptr), - t0_(utils::SteadyClock::now()), - rate_estimator_(nullptr) { - init(); -} - -RaaqmTransportProtocol::~RaaqmTransportProtocol() { - if (rate_estimator_) { - delete rate_estimator_; - } -} - -int RaaqmTransportProtocol::start() { - if (rate_estimator_) { - rate_estimator_->onStart(); - } - - if (!cur_path_) { - // RAAQM - double drop_factor; - double minimum_drop_probability; - uint32_t sample_number; - uint32_t interest_lifetime; - - socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor); - socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY, - minimum_drop_probability); - socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER, - sample_number); - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interest_lifetime); - - // Rate Estimation - double alpha = 0.0; - uint32_t batching_param = 0; - uint32_t choice_param = 0; - socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, - alpha); - socket_->getSocketOption( - RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param); - socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, - choice_param); - - if (choice_param == 1) { - rate_estimator_ = new ALaTcpEstimator(); - } else { - rate_estimator_ = new SimpleEstimator(alpha, batching_param); - } - - socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, - &rate_estimator_->observer_); - - // Current path - auto cur_path = std::make_unique<RaaqmDataPath>( - drop_factor, minimum_drop_probability, interest_lifetime * 1000, - sample_number); - cur_path_ = cur_path.get(); - path_table_[default_values::path_id] = std::move(cur_path); - } - - portal_->setConsumerCallback(this); - return TransportProtocol::start(); -} - -void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); } - -void RaaqmTransportProtocol::reset() { - // Set first segment to retrieve - core::Name *name; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); - index_manager_->reset(); - index_manager_->setFirstSuffix(name->getSuffix()); - std::queue<Interest::Ptr> empty; - std::swap(interest_to_retransmit_, empty); - stats_->reset(); - - // Reset reassembly component - reassembly_protocol_->reInitialize(); - - // Reset protocol variables - interests_in_flight_ = 0; - t0_ = utils::SteadyClock::now(); -} - -bool RaaqmTransportProtocol::verifyKeyPackets() { - return index_manager_->onKeyToVerify(); -} - -void RaaqmTransportProtocol::increaseWindow() { - // return; - double max_window_size = 0.; - socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, - max_window_size); - if (current_window_size_ < max_window_size) { - double gamma = 0.; - socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); - - current_window_size_ += gamma / current_window_size_; - socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size_); - } - rate_estimator_->onWindowIncrease(current_window_size_); -} - -void RaaqmTransportProtocol::decreaseWindow() { - // return; - double min_window_size = 0.; - socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, - min_window_size); - if (current_window_size_ > min_window_size) { - double beta = 0.; - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); - - current_window_size_ = current_window_size_ * beta; - if (current_window_size_ < min_window_size) { - current_window_size_ = min_window_size; - } - - socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size_); - } - rate_estimator_->onWindowDecrease(current_window_size_); -} - -void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { - // Decrease the window because the timeout happened - decreaseWindow(); -} - -void RaaqmTransportProtocol::afterContentReception( - const Interest &interest, const ContentObject &content_object) { - updatePathTable(content_object); - increaseWindow(); - updateRtt(interest.getName().getSuffix()); - rate_estimator_->onDataReceived((int)content_object.payloadSize() + - (int)content_object.headerSize()); - // Set drop probablility and window size accordingly - RAAQM(); -} - -void RaaqmTransportProtocol::init() { - std::ifstream is(RAAQM_CONFIG_PATH); - - std::string line; - raaqm_autotune_ = false; - default_beta_ = default_values::beta_value; - default_drop_ = default_values::drop_factor; - beta_wifi_ = default_values::beta_value; - drop_wifi_ = default_values::drop_factor; - beta_lte_ = default_values::beta_value; - drop_lte_ = default_values::drop_factor; - wifi_delay_ = 1000; - lte_delay_ = 15000; - - if (!is) { - TRANSPORT_LOGW( - "WARNING: RAAQM parameters not found at %s, set default values", - RAAQM_CONFIG_PATH); - return; - } - - while (getline(is, line)) { - std::string command; - std::istringstream line_s(line); - - line_s >> command; - - if (command == ";") { - continue; - } - - if (command == "autotune") { - std::string tmp; - std::string val; - line_s >> tmp >> val; - if (val == "yes") { - raaqm_autotune_ = true; - } else { - raaqm_autotune_ = false; - } - continue; - } - - if (command == "lifetime") { - std::string tmp; - uint32_t lifetime; - line_s >> tmp >> lifetime; - socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - continue; - } - - if (command == "retransmissions") { - std::string tmp; - uint32_t rtx; - line_s >> tmp >> rtx; - socket_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, rtx); - continue; - } - - if (command == "beta") { - std::string tmp; - line_s >> tmp >> default_beta_; - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, - default_beta_); - continue; - } - - if (command == "drop") { - std::string tmp; - line_s >> tmp >> default_drop_; - socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - default_drop_); - continue; - } - - if (command == "beta_wifi_") { - std::string tmp; - line_s >> tmp >> beta_wifi_; - continue; - } - - if (command == "drop_wifi_") { - std::string tmp; - line_s >> tmp >> drop_wifi_; - continue; - } - - if (command == "beta_lte_") { - std::string tmp; - line_s >> tmp >> beta_lte_; - continue; - } - - if (command == "drop_lte_") { - std::string tmp; - line_s >> tmp >> drop_lte_; - continue; - } - - if (command == "wifi_delay_") { - std::string tmp; - line_s >> tmp >> wifi_delay_; - continue; - } - - if (command == "lte_delay_") { - std::string tmp; - line_s >> tmp >> lte_delay_; - continue; - } - if (command == "alpha") { - std::string tmp; - double rate_alpha = 0.0; - line_s >> tmp >> rate_alpha; - socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, - rate_alpha); - continue; - } - - if (command == "batching_parameter") { - std::string tmp; - uint32_t batching_param = 0; - line_s >> tmp >> batching_param; - socket_->setSocketOption( - RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, - batching_param); - continue; - } - - if (command == "rate_estimator") { - std::string tmp; - uint32_t choice_param = 0; - line_s >> tmp >> choice_param; - socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, - choice_param); - continue; - } - } - - is.close(); -} - -void RaaqmTransportProtocol::onContentObject( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - // Check whether makes sense to continue - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { - return; - } - - // Call application-defined callbacks - ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - &callback_content_object); - if (*callback_content_object) { - (*callback_content_object)(*socket_, *content_object); - } - - ConsumerInterestCallback *callback_interest = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, - &callback_interest); - if (*callback_interest) { - (*callback_interest)(*socket_, *interest); - } - - if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - stats_->updateBytesRecv(content_object->payloadSize()); - } - - onContentSegment(std::move(interest), std::move(content_object)); - scheduleNextInterests(); -} - -void RaaqmTransportProtocol::onContentSegment( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); - - // Decrease in-flight interests - interests_in_flight_--; - - // Update stats - if (!interest_retransmissions_[incremental_suffix & mask]) { - afterContentReception(*interest, *content_object); - } - - index_manager_->onContentObject(std::move(interest), - std::move(content_object)); -} - -void RaaqmTransportProtocol::onPacketDropped( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t max_rtx = 0; - socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - - uint64_t segment = interest->getName().getSuffix(); - ConsumerInterestCallback *callback = VOID_HANDLER; - if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < - max_rtx)) { - stats_->updateRetxCount(1); - - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - if (!is_running_) { - return; - } - - interest_retransmissions_[segment & mask]++; - - interest_to_retransmit_.push(std::move(interest)); - } else { - TRANSPORT_LOGE( - "Stop: received not trusted packet %llu times", - (unsigned long long)interest_retransmissions_[segment & mask]); - onContentReassembled( - make_error_code(protocol_error::max_retransmissions_error)); - } -} - -void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { - -} - -void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { - checkForStalePaths(); - - const Name &n = interest->getName(); - - TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str()); - - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { - return; - } - - interests_in_flight_--; - - uint64_t segment = n.getSuffix(); - - // Do not retransmit interests asking contents that do not exist. - if (segment > index_manager_->getFinalSuffix()) { - return; - } - - ConsumerInterestCallback *callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - afterDataUnsatisfied(segment); - - uint32_t max_rtx = 0; - socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - - if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < - max_rtx)) { - stats_->updateRetxCount(1); - - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - if (!is_running_) { - return; - } - - interest_retransmissions_[segment & mask]++; - - interest_to_retransmit_.push(std::move(interest)); - - scheduleNextInterests(); - } else { - TRANSPORT_LOGE("Stop: reached max retx limit."); - onContentReassembled(std::make_error_code(std::errc(std::errc::io_error))); - } -} - -void RaaqmTransportProtocol::scheduleNextInterests() { - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return; - } - - if (TRANSPORT_EXPECT_FALSE(interests_in_flight_ >= current_window_size_ && - interest_to_retransmit_.size() > 0)) { - // send at least one interest if there are retransmissions to perform and - // there is no space left in the window - sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Window full, retransmit one content interest"); - interest_to_retransmit_.pop(); - } - - uint32_t index = IndexManager::invalid_index; - - // Send the interest needed for filling the window - while (interests_in_flight_ < current_window_size_) { - if (interest_to_retransmit_.size() > 0) { - sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Retransmit content interest"); - interest_to_retransmit_.pop(); - } else { - index = index_manager_->getNextSuffix(); - if (index == IndexManager::invalid_index) { - TRANSPORT_LOGE("INVALID INDEX %d", index); - break; - } - - sendInterest(index); - TRANSPORT_LOGD("Send content interest %u", index); - } - } -} - -void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { - auto interest = getPacket(); - core::Name *name; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); - name->setSuffix((uint32_t)next_suffix); - interest->setName(*name); - - uint32_t interest_lifetime; - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interest_lifetime); - interest->setLifetime(interest_lifetime); - - ConsumerInterestCallback *callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - callback->operator()(*socket_, *interest); - } - - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return; - } - - // This is set to ~0 so that the next interest_retransmissions_ + 1, - // performed by sendInterest, will result in 0 - interest_retransmissions_[next_suffix & mask] = ~0; - interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); - sendInterest(std::move(interest)); -} - -void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { - interests_in_flight_++; - interest_retransmissions_[interest->getName().getSuffix() & mask]++; - - portal_->sendInterest(std::move(interest)); -} - -void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - rate_estimator_->onDownloadFinished(); - TransportProtocol::onContentReassembled(ec); -} - -void RaaqmTransportProtocol::updateRtt(uint64_t segment) { - if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { - throw std::runtime_error("RAAQM ERROR: no current path found, exit"); - } else { - auto now = utils::SteadyClock::now(); - utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>( - now - interest_timepoints_[segment & mask]); - - // Update stats - updateStats((uint32_t)segment, rtt.count(), now); - - if (rate_estimator_) { - rate_estimator_->onRttUpdate((double)rtt.count()); - } - - cur_path_->insertNewRtt(rtt.count()); - cur_path_->smoothTimer(); - - if (cur_path_->newPropagationDelayAvailable()) { - checkDropProbability(); - } - } -} - -void RaaqmTransportProtocol::RAAQM() { - if (!cur_path_) { - throw errors::RuntimeException("ERROR: no current path found, exit"); - exit(EXIT_FAILURE); - } else { - // Change drop probability according to RTT statistics - cur_path_->updateDropProb(); - - double coin = ((double)rand() / (RAND_MAX)); - if (coin <= cur_path_->getDropProb()) { - decreaseWindow(); - } - } -} - -void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, - utils::TimePoint &now) { - // Update RTT statistics - stats_->updateAverageRtt(rtt); - stats_->updateAverageWindowSize(current_window_size_); - - // Call statistics callback - ConsumerTimerCallback *stats_callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_callback); - if (*stats_callback) { - auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); - - uint32_t timer_interval_milliseconds = 0; - socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, - timer_interval_milliseconds); - if (dt.count() > timer_interval_milliseconds) { - (*stats_callback)(*socket_, *stats_); - t0_ = utils::SteadyClock::now(); - } - } -} - -void RaaqmTransportProtocol::updatePathTable( - const ContentObject &content_object) { - uint32_t path_id = content_object.getPathLabel(); - - if (path_table_.find(path_id) == path_table_.end()) { - if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { - // Create a new path with some default param - - if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { - throw errors::RuntimeException( - "[RAAQM] No path initialized for path table, error could be in " - "default path initialization."); - } - - // Initiate the new path default param - auto new_path = std::make_unique<RaaqmDataPath>( - *(path_table_.at(default_values::path_id))); - - // Insert the new path into hash table - path_table_[path_id] = std::move(new_path); - } else { - throw errors::RuntimeException( - "UNEXPECTED ERROR: when running,current path not found."); - } - } - - cur_path_ = path_table_[path_id].get(); - - size_t header_size = content_object.headerSize(); - size_t data_size = content_object.payloadSize(); - - // Update measurements for path - cur_path_->updateReceivedStats(header_size + data_size, data_size); -} - -void RaaqmTransportProtocol::checkDropProbability() { - if (!raaqm_autotune_) { - return; - } - - unsigned int max_pd = 0; - PathTable::iterator it; - for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->getPropagationDelay() > max_pd && - it->second->getPropagationDelay() != UINT_MAX && - !it->second->isStale()) { - max_pd = it->second->getPropagationDelay(); - } - } - - double drop_prob = 0; - double beta = 0; - if (max_pd < wifi_delay_) { // only ethernet paths - drop_prob = default_drop_; - beta = default_beta_; - } else if (max_pd < lte_delay_) { // at least one wifi path - drop_prob = drop_wifi_; - beta = beta_wifi_; - } else { // at least one lte path - drop_prob = drop_lte_; - beta = beta_lte_; - } - - double old_drop_prob = 0; - double old_beta = 0; - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); - socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob); - - if (drop_prob == old_drop_prob && beta == old_beta) { - return; - } - - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); - socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); - - for (it = path_table_.begin(); it != path_table_.end(); it++) { - it->second->setDropProb(drop_prob); - } -} - -void RaaqmTransportProtocol::checkForStalePaths() { - if (!raaqm_autotune_) { - return; - } - - bool stale = false; - PathTable::iterator it; - for (it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->isStale()) { - stale = true; - break; - } - } - if (stale) { - checkDropProbability(); - } -} - -} // end namespace protocol - -} // namespace transport |