aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols/raaqm.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/raaqm.cc')
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc575
1 files changed, 418 insertions, 157 deletions
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index 3e04689e9..f5eb48bd8 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -14,8 +14,10 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/manifest_indexing_manager.h>
#include <hicn/transport/protocols/raaqm.h>
+#include <cstdlib>
#include <fstream>
namespace transport {
@@ -24,8 +26,14 @@ namespace protocol {
using namespace interface;
-RaaqmTransportProtocol::RaaqmTransportProtocol(BaseSocket *icnet_socket)
- : VegasTransportProtocol(icnet_socket), rate_estimator_(NULL) {
+RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
+ : TransportProtocol(icnet_socket),
+ BaseReassembly(icnet_socket, this),
+ current_window_size_(1),
+ interests_in_flight_(0),
+ cur_path_(nullptr),
+ t0_(utils::SteadyClock::now()),
+ rate_estimator_(nullptr) {
init();
}
@@ -35,16 +43,123 @@ RaaqmTransportProtocol::~RaaqmTransportProtocol() {
}
}
+int RaaqmTransportProtocol::start() {
+ if (this->rate_estimator_) {
+ this->rate_estimator_->onStart();
+ }
+
+ if (!cur_path_) {
+ // RAAQM
+ double drop_factor;
+ double minimum_drop_probability;
+ uint32_t sample_number;
+ uint32_t interest_lifetime;
+
+ socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor);
+ socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY,
+ minimum_drop_probability);
+ socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER,
+ sample_number);
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interest_lifetime);
+
+ // Rate Estimation
+ double alpha = 0.0;
+ uint32_t batching_param = 0;
+ uint32_t choice_param = 0;
+ socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA,
+ alpha);
+ socket_->getSocketOption(
+ RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param);
+ socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE,
+ choice_param);
+
+ if (choice_param == 1) {
+ this->rate_estimator_ = new ALaTcpEstimator();
+ } else {
+ this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ }
+
+ socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER,
+ &this->rate_estimator_->observer_);
+
+ // Current path
+ auto cur_path = std::make_unique<RaaqmDataPath>(
+ drop_factor, minimum_drop_probability, interest_lifetime * 1000,
+ sample_number);
+ cur_path_ = cur_path.get();
+ path_table_[default_values::path_id] = std::move(cur_path);
+ }
+
+ portal_->setConsumerCallback(this);
+ return TransportProtocol::start();
+}
+
+void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); }
+
+void RaaqmTransportProtocol::reset() {
+ // Reset reassembly component
+ BaseReassembly::reset();
+
+ // Reset protocol variables
+ interests_in_flight_ = 0;
+ t0_ = utils::SteadyClock::now();
+}
+
+void RaaqmTransportProtocol::increaseWindow() {
+ double max_window_size = 0.;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE,
+ max_window_size);
+ if (current_window_size_ < max_window_size) {
+ double gamma = 0.;
+ socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma);
+
+ current_window_size_ += gamma / current_window_size_;
+ socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+ }
+ this->rate_estimator_->onWindowIncrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::decreaseWindow() {
+ double min_window_size = 0.;
+ socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE,
+ min_window_size);
+ if (current_window_size_ > min_window_size) {
+ double beta = 0.;
+ socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
+
+ current_window_size_ = current_window_size_ * beta;
+ if (current_window_size_ < min_window_size) {
+ current_window_size_ = min_window_size;
+ }
+
+ socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+ }
+ this->rate_estimator_->onWindowDecrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ // Decrease the window because the timeout happened
+ decreaseWindow();
+}
+
+void RaaqmTransportProtocol::afterContentReception(
+ const Interest &interest, const ContentObject &content_object) {
+ updatePathTable(content_object);
+ increaseWindow();
+ updateRtt(interest.getName().getSuffix());
+ this->rate_estimator_->onDataReceived((int)content_object.payloadSize() +
+ content_object.headerSize());
+ // Set drop probablility and window size accordingly
+ RAAQM();
+}
+
void RaaqmTransportProtocol::init() {
std::ifstream is(RAAQM_CONFIG_PATH);
std::string line;
-
- socket_->beta_ = default_values::beta_value;
- socket_->drop_factor_ = default_values::drop_factor;
- socket_->interest_lifetime_ = default_values::interest_lifetime;
- socket_->max_retransmissions_ =
- default_values::transport_protocol_max_retransmissions;
raaqm_autotune_ = false;
default_beta_ = default_values::beta_value;
default_drop_ = default_values::drop_factor;
@@ -56,7 +171,9 @@ void RaaqmTransportProtocol::init() {
lte_delay_ = 15000;
if (!is) {
- TRANSPORT_LOGW("WARNING: RAAQM parameters not found, set default values");
+ TRANSPORT_LOGW(
+ "WARNING: RAAQM parameters not found at %s, set default values",
+ RAAQM_CONFIG_PATH);
return;
}
@@ -86,7 +203,8 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
uint32_t lifetime;
line_s >> tmp >> lifetime;
- socket_->interest_lifetime_ = lifetime;
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
continue;
}
@@ -94,21 +212,23 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
uint32_t rtx;
line_s >> tmp >> rtx;
- socket_->max_retransmissions_ = rtx;
+ socket_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, rtx);
continue;
}
if (command == "beta") {
std::string tmp;
line_s >> tmp >> default_beta_;
- socket_->beta_ = default_beta_;
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
+ default_beta_);
continue;
}
if (command == "drop") {
std::string tmp;
line_s >> tmp >> default_drop_;
- socket_->drop_factor_ = default_drop_;
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
+ default_drop_);
continue;
}
@@ -151,7 +271,8 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
double rate_alpha = 0.0;
line_s >> tmp >> rate_alpha;
- socket_->rate_estimation_alpha_ = rate_alpha;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA,
+ rate_alpha);
continue;
}
@@ -159,7 +280,9 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
uint32_t batching_param = 0;
line_s >> tmp >> batching_param;
- socket_->rate_estimation_batching_parameter_ = batching_param;
+ socket_->setSocketOption(
+ RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER,
+ batching_param);
continue;
}
@@ -167,133 +290,332 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
uint32_t choice_param = 0;
line_s >> tmp >> choice_param;
- socket_->rate_estimation_choice_ = choice_param;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE,
+ choice_param);
continue;
}
}
+
is.close();
}
-void RaaqmTransportProtocol::start(
- utils::SharableVector<uint8_t> &content_buffer) {
- if (this->rate_estimator_) {
- this->rate_estimator_->onStart();
+void RaaqmTransportProtocol::onContentObject(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t incremental_suffix = content_object->getName().getSuffix();
+
+ // Check whether makes sense to continue
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
}
- if (!cur_path_) {
- double drop_factor;
- double minimum_drop_probability;
- uint32_t sample_number;
- uint32_t interest_lifetime;
- // double beta;
+ // Call application-defined callbacks
+ ConsumerContentObjectCallback *callback_content_object = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &callback_content_object);
+ if (*callback_content_object != VOID_HANDLER) {
+ (*callback_content_object)(*socket_, *content_object);
+ }
- drop_factor = socket_->drop_factor_;
- minimum_drop_probability = socket_->minimum_drop_probability_;
- sample_number = socket_->sample_number_;
- interest_lifetime = socket_->interest_lifetime_;
- // beta = socket_->beta_;
+ ConsumerInterestCallback *callback_interest = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
+ &callback_interest);
+ if (*callback_content_object != VOID_HANDLER) {
+ (*callback_interest)(*socket_, *interest);
+ }
- double alpha = 0.0;
- uint32_t batching_param = 0;
- uint32_t choice_param = 0;
- alpha = socket_->rate_estimation_alpha_;
- batching_param = socket_->rate_estimation_batching_parameter_;
- choice_param = socket_->rate_estimation_choice_;
+ if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() ==
+ PayloadType::MANIFEST)) {
+ if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
+ index_manager_ = manifest_index_manager_.get();
+ interests_in_flight_--;
+ }
- if (choice_param == 1) {
- this->rate_estimator_ = new ALaTcpEstimator();
- } else {
- this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ index_manager_->onManifest(std::move(content_object));
+
+ } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
+ index_manager_ = incremental_index_manager_.get();
}
- this->rate_estimator_->observer_ = socket_->rate_estimation_observer_;
+ onContentSegment(std::move(interest), std::move(content_object));
+ }
- cur_path_ = std::make_shared<RaaqmDataPath>(
- drop_factor, minimum_drop_probability, interest_lifetime * 1000,
- sample_number);
- path_table_[default_values::path_id] = cur_path_;
+ if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
+ BaseReassembly::index_ = index_manager_->getNextReassemblySegment();
}
- VegasTransportProtocol::start(content_buffer);
+ scheduleNextInterests();
}
-void RaaqmTransportProtocol::copyContent(const ContentObject &content_object) {
- if (TRANSPORT_EXPECT_FALSE(
- (content_object.getName().getSuffix() == final_block_number_) ||
- !(is_running_))) {
- this->rate_estimator_->onDownloadFinished();
+void RaaqmTransportProtocol::onContentSegment(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t incremental_suffix = content_object->getName().getSuffix();
+ bool virtual_download = false;
+ socket_->getSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, virtual_download);
+
+ // Decrease in-flight interests
+ interests_in_flight_--;
+
+ // Update stats
+ if (!interest_retransmissions_[incremental_suffix & mask]) {
+ afterContentReception(*interest, *content_object);
+ }
+
+ if (index_manager_->onContentObject(*content_object)) {
+ stats_.updateBytesRecv(content_object->payloadSize());
+
+ if (!virtual_download) {
+ reassemble(std::move(content_object));
+ } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix ==
+ index_manager_->getFinalSuffix())) {
+ onContentReassembled(std::make_error_code(std::errc(0)));
+ }
+ } else {
+ // TODO Application policy check
+ // unverified_segments_.emplace(
+ // std::make_pair(incremental_suffix, std::move(content_object)));
+ TRANSPORT_LOGE("Received not trusted segment.");
}
- VegasTransportProtocol::copyContent(content_object);
}
-void RaaqmTransportProtocol::updatePathTable(
- const ContentObject &content_object) {
- uint32_t path_id = content_object.getPathLabel();
+void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ checkForStalePaths();
- if (path_table_.find(path_id) == path_table_.end()) {
- if (cur_path_) {
- // Create a new path with some default param
- if (path_table_.empty()) {
- throw errors::RuntimeException(
- "No path initialized for path table, error could be in default "
- "path initialization.");
- } else {
- // Initiate the new path default param
- std::shared_ptr<RaaqmDataPath> new_path =
- std::make_shared<RaaqmDataPath>(
- *(path_table_.at(default_values::path_id)));
- // Insert the new path into hash table
- path_table_[path_id] = new_path;
- }
+ const Name &n = interest->getName();
+
+ TRANSPORT_LOGW("Timeout on %s", n.toString().c_str());
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ interests_in_flight_--;
+
+ uint64_t segment = n.getSuffix();
+
+ // Do not retransmit interests asking contents that do not exist.
+ if (segment >= index_manager_->getFinalSuffix()) {
+ return;
+ }
+
+ ConsumerInterestCallback *callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
+ &callback);
+ if (*callback != VOID_HANDLER) {
+ (*callback)(*socket_, *interest);
+ }
+
+ afterDataUnsatisfied(segment);
+
+ uint32_t max_rtx = 0;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
+
+ if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
+ max_rtx)) {
+ stats_.updateRetxCount(1);
+
+ callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &callback);
+ if (*callback != VOID_HANDLER) {
+ (*callback)(*socket_, *interest);
+ }
+
+ callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if ((*callback) != VOID_HANDLER) {
+ (*callback)(*socket_, *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interest_retransmissions_[segment & mask]++;
+
+ interest_to_retransmit_.push(std::move(interest));
+
+ scheduleNextInterests();
+ } else {
+ TRANSPORT_LOGE("Stop: reached max retx limit.");
+ onContentReassembled(std::make_error_code(std::errc(std::errc::io_error)));
+ }
+}
+
+void RaaqmTransportProtocol::scheduleNextInterests() {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ uint32_t index = IndexManager::invalid_index;
+ // Send the interest needed for filling the window
+ while (interests_in_flight_ < current_window_size_) {
+ if (interest_to_retransmit_.size() > 0) {
+ sendInterest(std::move(interest_to_retransmit_.front()));
+ interest_to_retransmit_.pop();
} else {
- throw errors::RuntimeException(
- "UNEXPECTED ERROR: when running,current path not found.");
+ index = index_manager_->getNextSuffix();
+ if (index == IndexManager::invalid_index) {
+ break;
+ }
+ sendInterest(index);
}
}
+}
- cur_path_ = path_table_[path_id];
+void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+ auto interest = getPacket();
+ core::Name *name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ name->setSuffix(next_suffix);
+ interest->setName(*name);
+
+ uint32_t interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interest_lifetime);
+ interest->setLifetime(interest_lifetime);
+
+ ConsumerInterestCallback *callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if (*callback != VOID_HANDLER) {
+ callback->operator()(*socket_, *interest);
+ }
- size_t header_size = content_object.headerSize();
- size_t data_size = content_object.payloadSize();
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
- // Update measurements for path
- cur_path_->updateReceivedStats(header_size + data_size, data_size);
+ interest_retransmissions_[next_suffix & mask] = ~0;
+ interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
+ sendInterest(std::move(interest));
+}
+
+void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
+ interests_in_flight_++;
+ interest_retransmissions_[interest->getName().getSuffix() & mask]++;
+
+ portal_->sendInterest(std::move(interest));
+}
+
+void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
+ interface::ConsumerContentCallback *on_payload = nullptr;
+ socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload);
+ if (*on_payload != VOID_HANDLER) {
+ std::shared_ptr<std::vector<uint8_t>> content_buffer;
+ socket_->getSocketOption(
+ interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer);
+ (*on_payload)(*socket_, content_buffer->size(), ec);
+ }
+
+ this->rate_estimator_->onDownloadFinished();
+ stop();
}
void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
if (TRANSPORT_EXPECT_FALSE(!cur_path_)) {
- throw std::runtime_error("ERROR: no current path found, exit");
+ throw std::runtime_error("RAAQM ERROR: no current path found, exit");
} else {
- std::chrono::microseconds rtt;
+ auto now = utils::SteadyClock::now();
+ utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>(
+ now - interest_timepoints_[segment & mask]);
- std::chrono::steady_clock::duration duration =
- std::chrono::steady_clock::now() -
- interest_timepoints_[segment & mask_];
- rtt = std::chrono::duration_cast<std::chrono::microseconds>(duration);
+ // Update stats
+ updateStats(segment, rtt.count(), now);
if (this->rate_estimator_) {
this->rate_estimator_->onRttUpdate((double)rtt.count());
}
+
cur_path_->insertNewRtt(rtt.count());
cur_path_->smoothTimer();
if (cur_path_->newPropagationDelayAvailable()) {
- check_drop_probability();
+ checkDropProbability();
}
}
}
-void RaaqmTransportProtocol::changeInterestLifetime(uint64_t segment) {
- return;
+void RaaqmTransportProtocol::RAAQM() {
+ if (!cur_path_) {
+ throw errors::RuntimeException("ERROR: no current path found, exit");
+ exit(EXIT_FAILURE);
+ } else {
+ // Change drop probability according to RTT statistics
+ cur_path_->updateDropProb();
+
+ if (std::rand() % 10000 <= cur_path_->getDropProb() * 10000) {
+ decreaseWindow();
+ }
+ }
}
-void RaaqmTransportProtocol::check_drop_probability() {
+void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
+ utils::TimePoint &now) {
+ // Update RTT statistics
+ stats_.updateAverageRtt(rtt);
+ stats_.updateAverageWindowSize(current_window_size_);
+
+ // Call statistics callback
+ ConsumerTimerCallback *stats_callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_callback);
+ if (*stats_callback != VOID_HANDLER) {
+ auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
+
+ uint32_t timer_interval_milliseconds = 0;
+ socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
+ timer_interval_milliseconds);
+ if (dt.count() > timer_interval_milliseconds) {
+ (*stats_callback)(*socket_, stats_);
+ t0_ = utils::SteadyClock::now();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::updatePathTable(
+ const ContentObject &content_object) {
+ uint32_t path_id = content_object.getPathLabel();
+
+ if (path_table_.find(path_id) == path_table_.end()) {
+ if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) {
+ // Create a new path with some default param
+
+ if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) {
+ throw errors::RuntimeException(
+ "[RAAQM] No path initialized for path table, error could be in "
+ "default path initialization.");
+ }
+
+ // Initiate the new path default param
+ auto new_path = std::make_unique<RaaqmDataPath>(
+ *(path_table_.at(default_values::path_id)));
+
+ // Insert the new path into hash table
+ path_table_[path_id] = std::move(new_path);
+ } else {
+ throw errors::RuntimeException(
+ "UNEXPECTED ERROR: when running,current path not found.");
+ }
+ }
+
+ cur_path_ = path_table_[path_id].get();
+
+ size_t header_size = content_object.headerSize();
+ size_t data_size = content_object.payloadSize();
+
+ // Update measurements for path
+ cur_path_->updateReceivedStats(header_size + data_size, data_size);
+}
+
+void RaaqmTransportProtocol::checkDropProbability() {
if (!raaqm_autotune_) {
return;
}
unsigned int max_pd = 0;
- std::unordered_map<uint32_t, std::shared_ptr<RaaqmDataPath>>::iterator it;
+ PathTable::iterator it;
for (auto it = path_table_.begin(); it != path_table_.end(); ++it) {
if (it->second->getPropagationDelay() > max_pd &&
it->second->getPropagationDelay() != UINT_MAX &&
@@ -317,28 +639,28 @@ void RaaqmTransportProtocol::check_drop_probability() {
double old_drop_prob = 0;
double old_beta = 0;
- old_beta = socket_->beta_;
- old_drop_prob = socket_->drop_factor_;
+ socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta);
+ socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob);
if (drop_prob == old_drop_prob && beta == old_beta) {
return;
}
- socket_->beta_ = beta;
- socket_->drop_factor_ = drop_prob;
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob);
for (it = path_table_.begin(); it != path_table_.end(); it++) {
it->second->setDropProb(drop_prob);
}
}
-void RaaqmTransportProtocol::check_for_stale_paths() {
+void RaaqmTransportProtocol::checkForStalePaths() {
if (!raaqm_autotune_) {
return;
}
bool stale = false;
- std::unordered_map<uint32_t, std::shared_ptr<RaaqmDataPath>>::iterator it;
+ PathTable::iterator it;
for (it = path_table_.begin(); it != path_table_.end(); ++it) {
if (it->second->isStale()) {
stale = true;
@@ -346,71 +668,10 @@ void RaaqmTransportProtocol::check_for_stale_paths() {
}
}
if (stale) {
- check_drop_probability();
+ checkDropProbability();
}
}
-void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
- check_for_stale_paths();
- VegasTransportProtocol::onTimeout(std::move(interest));
-}
-
-void RaaqmTransportProtocol::increaseWindow() {
- double max_window_size = socket_->max_window_size_;
- if (current_window_size_ < max_window_size) {
- double gamma = socket_->gamma_;
-
- current_window_size_ += gamma / current_window_size_;
- socket_->current_window_size_ = current_window_size_;
- }
- this->rate_estimator_->onWindowIncrease(current_window_size_);
-}
-
-void RaaqmTransportProtocol::decreaseWindow() {
- double min_window_size = socket_->min_window_size_;
- if (current_window_size_ > min_window_size) {
- double beta = socket_->beta_;
-
- current_window_size_ = current_window_size_ * beta;
- if (current_window_size_ < min_window_size) {
- current_window_size_ = min_window_size;
- }
-
- socket_->current_window_size_ = current_window_size_;
- }
- this->rate_estimator_->onWindowDecrease(current_window_size_);
-}
-
-void RaaqmTransportProtocol::RAAQM() {
- if (!cur_path_) {
- throw errors::RuntimeException("ERROR: no current path found, exit");
- exit(EXIT_FAILURE);
- } else {
- // Change drop probability according to RTT statistics
- cur_path_->updateDropProb();
-
- if (rand() % 10000 <= cur_path_->getDropProb() * 10000) {
- decreaseWindow();
- }
- }
-}
-
-void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
- // Decrease the window because the timeout happened
- decreaseWindow();
-}
-
-void RaaqmTransportProtocol::afterContentReception(
- const Interest &interest, const ContentObject &content_object) {
- updatePathTable(content_object);
- increaseWindow();
- updateRtt(interest.getName().getSuffix());
- this->rate_estimator_->onDataReceived(
- (int)(content_object.payloadSize() + content_object.headerSize()));
- // Set drop probablility and window size accordingly
- RAAQM();
-}
-
} // end namespace protocol
} // end namespace transport