aboutsummaryrefslogtreecommitdiffstats
path: root/icnet/transport/icnet_transport_raaqm.cc
diff options
context:
space:
mode:
Diffstat (limited to 'icnet/transport/icnet_transport_raaqm.cc')
-rw-r--r--icnet/transport/icnet_transport_raaqm.cc466
1 files changed, 466 insertions, 0 deletions
diff --git a/icnet/transport/icnet_transport_raaqm.cc b/icnet/transport/icnet_transport_raaqm.cc
new file mode 100644
index 00000000..9a1b77b4
--- /dev/null
+++ b/icnet/transport/icnet_transport_raaqm.cc
@@ -0,0 +1,466 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_transport_raaqm.h"
+
+namespace icnet {
+
+RaaqmTransportProtocol::RaaqmTransportProtocol(Socket *icnet_socket)
+ : VegasTransportProtocol(icnet_socket), rate_estimator_(NULL) {
+ init();
+}
+
+RaaqmTransportProtocol::~RaaqmTransportProtocol() {
+ if (this->rate_estimator_) {
+ delete this->rate_estimator_;
+ }
+}
+
+void RaaqmTransportProtocol::init() {
+ std::ifstream is(RAAQM_CONFIG_PATH);
+
+ std::string line;
+
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_values::beta_value);
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, default_values::drop_factor);
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, default_values::interest_lifetime);
+ socket_->setSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ default_values::transport_protocol_max_retransmissions);
+ raaqm_autotune_ = false;
+ default_beta_ = default_values::beta_value;
+ default_drop_ = default_values::drop_factor;
+ beta_wifi_ = default_values::beta_value;
+ drop_wifi_ = default_values::drop_factor;
+ beta_lte_ = default_values::beta_value;
+ drop_lte_ = default_values::drop_factor;
+ wifi_delay_ = 1000;
+ lte_delay_ = 15000;
+ avg_rtt_ = 0.0;
+
+ if (!is) {
+ std::cout << "WARNING: RAAQM parameters not found, set default values" << std::endl;
+ return;
+ }
+
+ std::cout << "Setting RAAQM parameters:" << std::endl;
+ while (getline(is, line)) {
+ std::string command;
+ std::istringstream line_s(line);
+
+ line_s >> command;
+
+ if (command == ";") {
+ continue;
+ }
+
+ if (command == "autotune") {
+ std::string tmp;
+ std::string val;
+ line_s >> tmp >> val;
+ if (val == "yes") {
+ raaqm_autotune_ = true;
+ } else {
+ raaqm_autotune_ = false;
+ }
+ std::cout << "params: autotune = " << raaqm_autotune_ << std::endl;
+ continue;
+ }
+
+ if (command == "lifetime") {
+ std::string tmp;
+ int lifetime;
+ line_s >> tmp >> lifetime;
+ std::cout << "params: lifetime = " << lifetime << std::endl;
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
+ continue;
+ }
+
+ if (command == "retransmissions") {
+ std::string tmp;
+ int rtx;
+ line_s >> tmp >> rtx;
+ std::cout << "params: retransmissions = " << rtx << std::endl;
+ socket_->setSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, rtx);
+ continue;
+ }
+
+ if (command == "beta") {
+ std::string tmp;
+ line_s >> tmp >> default_beta_;
+ std::cout << "params: beta = " << default_beta_ << std::endl;
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_beta_);
+ continue;
+ }
+
+ if (command == "drop") {
+ std::string tmp;
+ line_s >> tmp >> default_drop_;
+ std::cout << "params: drop = " << default_drop_ << std::endl;
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, default_drop_);
+ continue;
+ }
+
+ if (command == "beta_wifi_") {
+ std::string tmp;
+ line_s >> tmp >> beta_wifi_;
+ std::cout << "params: beta_wifi_ = " << beta_wifi_ << std::endl;
+ continue;
+ }
+
+ if (command == "drop_wifi_") {
+ std::string tmp;
+ line_s >> tmp >> drop_wifi_;
+ std::cout << "params: drop_wifi_ = " << drop_wifi_ << std::endl;
+ continue;
+ }
+
+ if (command == "beta_lte_") {
+ std::string tmp;
+ line_s >> tmp >> beta_lte_;
+ std::cout << "params: beta_lte_ = " << beta_lte_ << std::endl;
+ continue;
+ }
+
+ if (command == "drop_lte_") {
+ std::string tmp;
+ line_s >> tmp >> drop_lte_;
+ std::cout << "params: drop_lte_ = " << drop_lte_ << std::endl;
+ continue;
+ }
+
+ if (command == "wifi_delay_") {
+ std::string tmp;
+ line_s >> tmp >> wifi_delay_;
+ std::cout << "params: wifi_delay_ = " << wifi_delay_ << std::endl;
+ continue;
+ }
+
+ if (command == "lte_delay_") {
+ std::string tmp;
+ line_s >> tmp >> lte_delay_;
+ std::cout << "params: lte_delay_ = " << lte_delay_ << std::endl;
+ continue;
+ }
+ if (command == "alpha") {
+ std::string tmp;
+ double rate_alpha = 0.0;
+ line_s >> tmp >> rate_alpha;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, rate_alpha);
+ std::cout << "params: alpha = " << rate_alpha << std::endl;
+ continue;
+ }
+
+ if (command == "batching_parameter") {
+ std::string tmp;
+ int batching_param = 0;
+ line_s >> tmp >> batching_param;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param);
+ std::cout << "params: batching = " << batching_param << std::endl;
+ continue;
+ }
+
+ if (command == "rate_estimator") {
+ std::string tmp;
+ int choice_param = 0;
+ line_s >> tmp >> choice_param;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, choice_param);
+ std::cout << "params: choice = " << choice_param << std::endl;
+ continue;
+ }
+ }
+ is.close();
+ std::cout << "init done" << std::endl;
+}
+
+void RaaqmTransportProtocol::reset() {
+ is_final_block_number_discovered_ = false;
+ final_block_number_ = std::numeric_limits<uint64_t>::max();
+ segment_number_ = 0;
+ interests_in_flight_ = 0;
+ last_reassembled_segment_ = 0;
+ content_buffer_size_ = 0;
+ content_buffer_.clear();
+ interest_retransmissions_.clear();
+ receive_buffer_.clear();
+ unverified_segments_.clear();
+ verified_manifests_.clear();
+}
+
+void RaaqmTransportProtocol::start() {
+ if (this->rate_estimator_) {
+ this->rate_estimator_->onStart();
+ }
+
+ if (!cur_path_) {
+
+ double drop_factor;
+ double minimum_drop_probability;
+ int sample_number;
+ int interest_lifetime;
+ double beta;
+
+ socket_->getSocketOption(DROP_FACTOR, drop_factor);
+ socket_->getSocketOption(MINIMUM_DROP_PROBABILITY, minimum_drop_probability);
+ socket_->getSocketOption(SAMPLE_NUMBER, sample_number);
+ socket_->getSocketOption(INTEREST_LIFETIME, interest_lifetime);
+ socket_->getSocketOption(BETA_VALUE, beta);
+
+ std::cout << "Drop Factor: " << drop_factor << std::endl;
+ std::cout << "Minimum drop prob: " << minimum_drop_probability << std::endl;
+ std::cout << "Sample number: " << sample_number << std::endl;
+ std::cout << "lifetime: " << interest_lifetime << std::endl;
+ std::cout << "beta: " << beta << std::endl;
+
+ double alpha = 0.0;
+ int batching_param = 0;
+ int choice_param = 0;
+ socket_->getSocketOption(RATE_ESTIMATION_ALPHA, alpha);
+ socket_->getSocketOption(RATE_ESTIMATION_BATCH_PARAMETER, batching_param);
+ socket_->getSocketOption(RATE_ESTIMATION_CHOICE, choice_param);
+ if (choice_param == 1) {
+ this->rate_estimator_ = new ALaTcpEstimator();
+ } else {
+ this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ }
+
+ socket_->getSocketOption(RATE_ESTIMATION_OBSERVER, &(this->rate_estimator_->observer_));
+
+ cur_path_ =
+ std::make_shared<RaaqmDataPath>(drop_factor, minimum_drop_probability, interest_lifetime * 1000, sample_number);
+ path_table_[default_values::path_id] = cur_path_;
+ }
+
+ VegasTransportProtocol::start();
+}
+
+void RaaqmTransportProtocol::copyContent(ContentObject &content_object) {
+ if ((content_object.getName().get(-1).toSegment() == final_block_number_) || !(is_running_)) {
+ this->rate_estimator_->onDownloadFinished();
+ }
+ VegasTransportProtocol::copyContent(content_object);
+}
+
+void RaaqmTransportProtocol::updatePathTable(const ContentObject &content_object) {
+ unsigned char path_id = content_object.getPathLabel();
+
+ if (path_table_.find(path_id) == path_table_.end()) {
+ if (cur_path_) {
+ // Create a new path with some default param
+ if (path_table_.empty()) {
+ std::cerr << "No path initialized for path table, error could be in default path initialization." << std::endl;
+ exit(EXIT_FAILURE);
+ } else {
+ // Initiate the new path default param
+ std::shared_ptr<RaaqmDataPath>
+ new_path = std::make_shared<RaaqmDataPath>(*(path_table_.at(default_values::path_id)));
+ // Insert the new path into hash table
+ path_table_[path_id] = new_path;
+ }
+ } else {
+ std::cerr << "UNEXPECTED ERROR: when running,current path not found." << std::endl;
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ cur_path_ = path_table_[path_id];
+
+ size_t packet_size = content_object.getPacketSize();
+ size_t data_size = content_object.getContent().size();
+
+ // Update measurements for path
+ cur_path_->updateReceivedStats(packet_size, data_size);
+}
+
+void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
+ if (!cur_path_) {
+ throw std::runtime_error("ERROR: no current path found, exit");
+ } else {
+ std::chrono::microseconds rtt;
+
+ std::chrono::steady_clock::duration duration =
+ std::chrono::steady_clock::now() - interest_timepoints_[segment % default_values::default_buffer_size];
+ rtt = std::chrono::duration_cast<std::chrono::microseconds>(duration);
+ if (this->rate_estimator_) {
+ this->rate_estimator_->onRttUpdate(rtt.count());
+ }
+ cur_path_->insertNewRtt(rtt.count());
+ cur_path_->smoothTimer();
+
+ avg_rtt_ = (avg_rtt_ * 0.99) + ((double) rtt.count() * 0.01);
+ if (cur_path_->newPropagationDelayAvailable()) {
+ check_drop_probability();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::changeInterestLifetime(uint64_t segment) {
+ return;
+}
+
+void RaaqmTransportProtocol::check_drop_probability() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ unsigned int max_pd = 0;
+ std::unordered_map<unsigned char, std::shared_ptr<RaaqmDataPath >>::iterator it;
+ for (it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->getPropagationDelay() > max_pd && it->second->getPropagationDelay() != UINT_MAX
+ && !it->second->isStale()) {
+ max_pd = it->second->getPropagationDelay();
+ }
+ }
+
+ double drop_prob = 0;
+ double beta = 0;
+ if (max_pd < wifi_delay_) { //only ethernet paths
+ drop_prob = default_drop_;
+ beta = default_beta_;
+ } else if (max_pd < lte_delay_) { //at least one wifi path
+ drop_prob = drop_wifi_;
+ beta = beta_wifi_;
+ } else { //at least one lte path
+ drop_prob = drop_lte_;
+ beta = beta_lte_;
+ }
+
+ double old_drop_prob = 0;
+ double old_beta = 0;
+ socket_->getSocketOption(BETA_VALUE, old_beta);
+ socket_->getSocketOption(DROP_FACTOR, old_drop_prob);
+
+ if (drop_prob == old_drop_prob && beta == old_beta) {
+ return;
+ }
+
+ std::cout << "*************[RAAQM TUNING] new beta = " << beta << " new drop = " << drop_prob << " max pd = "
+ << max_pd << std::endl;
+
+ socket_->setSocketOption(BETA_VALUE, beta);
+ socket_->setSocketOption(DROP_FACTOR, drop_prob);
+
+ for (it = path_table_.begin(); it != path_table_.end(); it++) {
+ it->second->setDropProb(drop_prob);
+ }
+}
+
+void RaaqmTransportProtocol::check_for_stale_paths() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ bool stale = false;
+ std::unordered_map<unsigned char, std::shared_ptr<RaaqmDataPath >>::iterator it;
+ for (it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->isStale()) {
+ stale = true;
+ break;
+ }
+ }
+ if (stale) {
+ check_drop_probability();
+ }
+}
+
+void RaaqmTransportProtocol::onTimeout(const Interest &interest) {
+ check_for_stale_paths();
+ VegasTransportProtocol::onTimeout(interest);
+}
+
+void RaaqmTransportProtocol::increaseWindow() {
+ double max_window_size = -1;
+ socket_->getSocketOption(MAX_WINDOW_SIZE, max_window_size);
+ if (current_window_size_ < max_window_size) // don't expand window above max level
+ {
+ double gamma = -1;
+ socket_->getSocketOption(GAMMA_VALUE, gamma);
+
+ current_window_size_ += gamma / current_window_size_;
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_);
+ }
+ this->rate_estimator_->onWindowIncrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::decreaseWindow() {
+ double min_window_size = -1;
+ socket_->getSocketOption(MIN_WINDOW_SIZE, min_window_size);
+ if (current_window_size_ > min_window_size) // don't shrink window below minimum level
+ {
+ double beta = -1;
+ socket_->getSocketOption(BETA_VALUE, beta);
+
+ current_window_size_ = current_window_size_ * beta;
+ if (current_window_size_ < min_window_size) {
+ current_window_size_ = min_window_size;
+ }
+
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_);
+ }
+ this->rate_estimator_->onWindowDecrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::RAAQM() {
+ if (!cur_path_) {
+ std::cerr << "ERROR: no current path found, exit" << std::endl;
+ exit(EXIT_FAILURE);
+ } else {
+ // Change drop probability according to RTT statistics
+ cur_path_->updateDropProb();
+
+ if (rand() % 10000 <= cur_path_->getDropProb() * 10000) {
+ decreaseWindow();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ // Decrease the window because the timeout happened
+ decreaseWindow();
+}
+
+void RaaqmTransportProtocol::afterContentReception(const Interest &interest, const ContentObject &content_object) {
+ updatePathTable(content_object);
+ increaseWindow();
+ updateRtt(interest.getName().get(-1).toSegment());
+ this->rate_estimator_->onDataReceived((int) content_object.getPacketSize());
+ // Set drop probablility and window size accordingly
+ RAAQM();
+}
+
+void RaaqmTransportProtocol::checkForFastRetransmission(const Interest &interest) {
+}
+
+#if 0
+void
+RaaqmTransportProtocol::onInterest(const Interest &interest)
+{
+ bool mobility = interest.get_MobilityLossFlag();
+
+ if(mobility){
+ const Name &name = interest.getName();
+ uint64_t segment = name[-1].toSegment();
+ timeval now;
+ gettimeofday(&now, 0);
+ std::cout << (long) now.tv_sec << "." << (unsigned) now.tv_usec << " RAAQM: M-Interest " <<
+ segment << " " << interest.getName() << std::endl;
+ NackSet::iterator it = m_nackSet.find(segment);
+ if(it == m_nackSet.end()){
+ m_nackSet.insert(segment);
+ }
+ }
+}
+#endif
+
+} // end namespace icnet