From ba8541cad3a4069886444abbd1848b6ef3fff72c Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 22 Feb 2017 14:37:37 +0100 Subject: Initial Commit: libicnet Change-Id: I10a72cb0d84b76553a85c168416b847f6a4ff5f6 Signed-off-by: Mauro Sardara --- icnet/transport/icnet_transport_raaqm.cc | 466 +++++++++++++++++++++++++++++++ 1 file changed, 466 insertions(+) create mode 100644 icnet/transport/icnet_transport_raaqm.cc (limited to 'icnet/transport/icnet_transport_raaqm.cc') diff --git a/icnet/transport/icnet_transport_raaqm.cc b/icnet/transport/icnet_transport_raaqm.cc new file mode 100644 index 00000000..9a1b77b4 --- /dev/null +++ b/icnet/transport/icnet_transport_raaqm.cc @@ -0,0 +1,466 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_transport_raaqm.h" + +namespace icnet { + +RaaqmTransportProtocol::RaaqmTransportProtocol(Socket *icnet_socket) + : VegasTransportProtocol(icnet_socket), rate_estimator_(NULL) { + init(); +} + +RaaqmTransportProtocol::~RaaqmTransportProtocol() { + if (this->rate_estimator_) { + delete this->rate_estimator_; + } +} + +void RaaqmTransportProtocol::init() { + std::ifstream is(RAAQM_CONFIG_PATH); + + std::string line; + + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_values::beta_value); + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, default_values::drop_factor); + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, default_values::interest_lifetime); + socket_->setSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + default_values::transport_protocol_max_retransmissions); + raaqm_autotune_ = false; + default_beta_ = default_values::beta_value; + default_drop_ = default_values::drop_factor; + beta_wifi_ = default_values::beta_value; + drop_wifi_ = default_values::drop_factor; + beta_lte_ = default_values::beta_value; + drop_lte_ = default_values::drop_factor; + wifi_delay_ = 1000; + lte_delay_ = 15000; + avg_rtt_ = 0.0; + + if (!is) { + std::cout << "WARNING: RAAQM parameters not found, set default values" << std::endl; + return; + } + + std::cout << "Setting RAAQM parameters:" << std::endl; + while (getline(is, line)) { + std::string command; + std::istringstream line_s(line); + + line_s >> command; + + if (command == ";") { + continue; + } + + if (command == "autotune") { + std::string tmp; + std::string val; + line_s >> tmp >> val; + if (val == "yes") { + raaqm_autotune_ = true; + } else { + raaqm_autotune_ = false; + } + std::cout << "params: autotune = " << raaqm_autotune_ << std::endl; + continue; + } + + if (command == "lifetime") { + std::string tmp; + int lifetime; + line_s >> tmp >> lifetime; + std::cout << "params: lifetime = " << lifetime << std::endl; + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + continue; + } + + if (command == "retransmissions") { + std::string tmp; + int rtx; + line_s >> tmp >> rtx; + std::cout << "params: retransmissions = " << rtx << std::endl; + socket_->setSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, rtx); + continue; + } + + if (command == "beta") { + std::string tmp; + line_s >> tmp >> default_beta_; + std::cout << "params: beta = " << default_beta_ << std::endl; + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_beta_); + continue; + } + + if (command == "drop") { + std::string tmp; + line_s >> tmp >> default_drop_; + std::cout << "params: drop = " << default_drop_ << std::endl; + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, default_drop_); + continue; + } + + if (command == "beta_wifi_") { + std::string tmp; + line_s >> tmp >> beta_wifi_; + std::cout << "params: beta_wifi_ = " << beta_wifi_ << std::endl; + continue; + } + + if (command == "drop_wifi_") { + std::string tmp; + line_s >> tmp >> drop_wifi_; + std::cout << "params: drop_wifi_ = " << drop_wifi_ << std::endl; + continue; + } + + if (command == "beta_lte_") { + std::string tmp; + line_s >> tmp >> beta_lte_; + std::cout << "params: beta_lte_ = " << beta_lte_ << std::endl; + continue; + } + + if (command == "drop_lte_") { + std::string tmp; + line_s >> tmp >> drop_lte_; + std::cout << "params: drop_lte_ = " << drop_lte_ << std::endl; + continue; + } + + if (command == "wifi_delay_") { + std::string tmp; + line_s >> tmp >> wifi_delay_; + std::cout << "params: wifi_delay_ = " << wifi_delay_ << std::endl; + continue; + } + + if (command == "lte_delay_") { + std::string tmp; + line_s >> tmp >> lte_delay_; + std::cout << "params: lte_delay_ = " << lte_delay_ << std::endl; + continue; + } + if (command == "alpha") { + std::string tmp; + double rate_alpha = 0.0; + line_s >> tmp >> rate_alpha; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, rate_alpha); + std::cout << "params: alpha = " << rate_alpha << std::endl; + continue; + } + + if (command == "batching_parameter") { + std::string tmp; + int batching_param = 0; + line_s >> tmp >> batching_param; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param); + std::cout << "params: batching = " << batching_param << std::endl; + continue; + } + + if (command == "rate_estimator") { + std::string tmp; + int choice_param = 0; + line_s >> tmp >> choice_param; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, choice_param); + std::cout << "params: choice = " << choice_param << std::endl; + continue; + } + } + is.close(); + std::cout << "init done" << std::endl; +} + +void RaaqmTransportProtocol::reset() { + is_final_block_number_discovered_ = false; + final_block_number_ = std::numeric_limits::max(); + segment_number_ = 0; + interests_in_flight_ = 0; + last_reassembled_segment_ = 0; + content_buffer_size_ = 0; + content_buffer_.clear(); + interest_retransmissions_.clear(); + receive_buffer_.clear(); + unverified_segments_.clear(); + verified_manifests_.clear(); +} + +void RaaqmTransportProtocol::start() { + if (this->rate_estimator_) { + this->rate_estimator_->onStart(); + } + + if (!cur_path_) { + + double drop_factor; + double minimum_drop_probability; + int sample_number; + int interest_lifetime; + double beta; + + socket_->getSocketOption(DROP_FACTOR, drop_factor); + socket_->getSocketOption(MINIMUM_DROP_PROBABILITY, minimum_drop_probability); + socket_->getSocketOption(SAMPLE_NUMBER, sample_number); + socket_->getSocketOption(INTEREST_LIFETIME, interest_lifetime); + socket_->getSocketOption(BETA_VALUE, beta); + + std::cout << "Drop Factor: " << drop_factor << std::endl; + std::cout << "Minimum drop prob: " << minimum_drop_probability << std::endl; + std::cout << "Sample number: " << sample_number << std::endl; + std::cout << "lifetime: " << interest_lifetime << std::endl; + std::cout << "beta: " << beta << std::endl; + + double alpha = 0.0; + int batching_param = 0; + int choice_param = 0; + socket_->getSocketOption(RATE_ESTIMATION_ALPHA, alpha); + socket_->getSocketOption(RATE_ESTIMATION_BATCH_PARAMETER, batching_param); + socket_->getSocketOption(RATE_ESTIMATION_CHOICE, choice_param); + if (choice_param == 1) { + this->rate_estimator_ = new ALaTcpEstimator(); + } else { + this->rate_estimator_ = new SimpleEstimator(alpha, batching_param); + } + + socket_->getSocketOption(RATE_ESTIMATION_OBSERVER, &(this->rate_estimator_->observer_)); + + cur_path_ = + std::make_shared(drop_factor, minimum_drop_probability, interest_lifetime * 1000, sample_number); + path_table_[default_values::path_id] = cur_path_; + } + + VegasTransportProtocol::start(); +} + +void RaaqmTransportProtocol::copyContent(ContentObject &content_object) { + if ((content_object.getName().get(-1).toSegment() == final_block_number_) || !(is_running_)) { + this->rate_estimator_->onDownloadFinished(); + } + VegasTransportProtocol::copyContent(content_object); +} + +void RaaqmTransportProtocol::updatePathTable(const ContentObject &content_object) { + unsigned char path_id = content_object.getPathLabel(); + + if (path_table_.find(path_id) == path_table_.end()) { + if (cur_path_) { + // Create a new path with some default param + if (path_table_.empty()) { + std::cerr << "No path initialized for path table, error could be in default path initialization." << std::endl; + exit(EXIT_FAILURE); + } else { + // Initiate the new path default param + std::shared_ptr + new_path = std::make_shared(*(path_table_.at(default_values::path_id))); + // Insert the new path into hash table + path_table_[path_id] = new_path; + } + } else { + std::cerr << "UNEXPECTED ERROR: when running,current path not found." << std::endl; + exit(EXIT_FAILURE); + } + } + + cur_path_ = path_table_[path_id]; + + size_t packet_size = content_object.getPacketSize(); + size_t data_size = content_object.getContent().size(); + + // Update measurements for path + cur_path_->updateReceivedStats(packet_size, data_size); +} + +void RaaqmTransportProtocol::updateRtt(uint64_t segment) { + if (!cur_path_) { + throw std::runtime_error("ERROR: no current path found, exit"); + } else { + std::chrono::microseconds rtt; + + std::chrono::steady_clock::duration duration = + std::chrono::steady_clock::now() - interest_timepoints_[segment % default_values::default_buffer_size]; + rtt = std::chrono::duration_cast(duration); + if (this->rate_estimator_) { + this->rate_estimator_->onRttUpdate(rtt.count()); + } + cur_path_->insertNewRtt(rtt.count()); + cur_path_->smoothTimer(); + + avg_rtt_ = (avg_rtt_ * 0.99) + ((double) rtt.count() * 0.01); + if (cur_path_->newPropagationDelayAvailable()) { + check_drop_probability(); + } + } +} + +void RaaqmTransportProtocol::changeInterestLifetime(uint64_t segment) { + return; +} + +void RaaqmTransportProtocol::check_drop_probability() { + if (!raaqm_autotune_) { + return; + } + + unsigned int max_pd = 0; + std::unordered_map>::iterator it; + for (it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->getPropagationDelay() > max_pd && it->second->getPropagationDelay() != UINT_MAX + && !it->second->isStale()) { + max_pd = it->second->getPropagationDelay(); + } + } + + double drop_prob = 0; + double beta = 0; + if (max_pd < wifi_delay_) { //only ethernet paths + drop_prob = default_drop_; + beta = default_beta_; + } else if (max_pd < lte_delay_) { //at least one wifi path + drop_prob = drop_wifi_; + beta = beta_wifi_; + } else { //at least one lte path + drop_prob = drop_lte_; + beta = beta_lte_; + } + + double old_drop_prob = 0; + double old_beta = 0; + socket_->getSocketOption(BETA_VALUE, old_beta); + socket_->getSocketOption(DROP_FACTOR, old_drop_prob); + + if (drop_prob == old_drop_prob && beta == old_beta) { + return; + } + + std::cout << "*************[RAAQM TUNING] new beta = " << beta << " new drop = " << drop_prob << " max pd = " + << max_pd << std::endl; + + socket_->setSocketOption(BETA_VALUE, beta); + socket_->setSocketOption(DROP_FACTOR, drop_prob); + + for (it = path_table_.begin(); it != path_table_.end(); it++) { + it->second->setDropProb(drop_prob); + } +} + +void RaaqmTransportProtocol::check_for_stale_paths() { + if (!raaqm_autotune_) { + return; + } + + bool stale = false; + std::unordered_map>::iterator it; + for (it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->isStale()) { + stale = true; + break; + } + } + if (stale) { + check_drop_probability(); + } +} + +void RaaqmTransportProtocol::onTimeout(const Interest &interest) { + check_for_stale_paths(); + VegasTransportProtocol::onTimeout(interest); +} + +void RaaqmTransportProtocol::increaseWindow() { + double max_window_size = -1; + socket_->getSocketOption(MAX_WINDOW_SIZE, max_window_size); + if (current_window_size_ < max_window_size) // don't expand window above max level + { + double gamma = -1; + socket_->getSocketOption(GAMMA_VALUE, gamma); + + current_window_size_ += gamma / current_window_size_; + socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_); + } + this->rate_estimator_->onWindowIncrease(current_window_size_); +} + +void RaaqmTransportProtocol::decreaseWindow() { + double min_window_size = -1; + socket_->getSocketOption(MIN_WINDOW_SIZE, min_window_size); + if (current_window_size_ > min_window_size) // don't shrink window below minimum level + { + double beta = -1; + socket_->getSocketOption(BETA_VALUE, beta); + + current_window_size_ = current_window_size_ * beta; + if (current_window_size_ < min_window_size) { + current_window_size_ = min_window_size; + } + + socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_); + } + this->rate_estimator_->onWindowDecrease(current_window_size_); +} + +void RaaqmTransportProtocol::RAAQM() { + if (!cur_path_) { + std::cerr << "ERROR: no current path found, exit" << std::endl; + exit(EXIT_FAILURE); + } else { + // Change drop probability according to RTT statistics + cur_path_->updateDropProb(); + + if (rand() % 10000 <= cur_path_->getDropProb() * 10000) { + decreaseWindow(); + } + } +} + +void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + // Decrease the window because the timeout happened + decreaseWindow(); +} + +void RaaqmTransportProtocol::afterContentReception(const Interest &interest, const ContentObject &content_object) { + updatePathTable(content_object); + increaseWindow(); + updateRtt(interest.getName().get(-1).toSegment()); + this->rate_estimator_->onDataReceived((int) content_object.getPacketSize()); + // Set drop probablility and window size accordingly + RAAQM(); +} + +void RaaqmTransportProtocol::checkForFastRetransmission(const Interest &interest) { +} + +#if 0 +void +RaaqmTransportProtocol::onInterest(const Interest &interest) +{ + bool mobility = interest.get_MobilityLossFlag(); + + if(mobility){ + const Name &name = interest.getName(); + uint64_t segment = name[-1].toSegment(); + timeval now; + gettimeofday(&now, 0); + std::cout << (long) now.tv_sec << "." << (unsigned) now.tv_usec << " RAAQM: M-Interest " << + segment << " " << interest.getName() << std::endl; + NackSet::iterator it = m_nackSet.find(segment); + if(it == m_nackSet.end()){ + m_nackSet.insert(segment); + } + } +} +#endif + +} // end namespace icnet -- cgit 1.2.3-korg