aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/raaqm.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/raaqm.cc')
-rw-r--r--libtransport/src/protocols/raaqm.cc235
1 files changed, 99 insertions, 136 deletions
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 783d6194b..bcbc15aef 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,10 +13,11 @@
* limitations under the License.
*/
+#include <hicn/transport/core/global_object_pool.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <implementation/socket_consumer.h>
#include <protocols/errors.h>
-#include <protocols/indexer.h>
+#include <protocols/index_manager_bytestream.h>
#include <protocols/raaqm.h>
#include <cstdlib>
@@ -30,12 +31,14 @@ using namespace interface;
RaaqmTransportProtocol::RaaqmTransportProtocol(
implementation::ConsumerSocket *icn_socket)
- : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)),
+ : TransportProtocol(icn_socket, new IndexManager(icn_socket, this),
+ new ByteStreamReassembly(icn_socket, this)),
current_window_size_(1),
interests_in_flight_(0),
cur_path_(nullptr),
- t0_(utils::SteadyClock::now()),
+ t0_(utils::SteadyTime::Clock::now()),
rate_estimator_(nullptr),
+ dis_(0, 1.0),
schedule_interests_(true) {
init();
}
@@ -46,11 +49,34 @@ RaaqmTransportProtocol::~RaaqmTransportProtocol() {
}
}
-int RaaqmTransportProtocol::start() {
+void RaaqmTransportProtocol::reset() {
+ // Set first segment to retrieve
+ TransportProtocol::reset();
+ core::Name *name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ indexer_verifier_->setFirstSuffix(name->getSuffix());
+ std::queue<uint32_t> empty;
+ std::swap(interest_to_retransmit_, empty);
+ stats_->reset();
+
+ // Reset protocol variables
+ interests_in_flight_ = 0;
+ t0_ = utils::SteadyTime::Clock::now();
+
+ // Optionally reset congestion window
+ bool reset_window;
+ socket_->getSocketOption(RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET,
+ reset_window);
+ if (reset_window) {
+ current_window_size_ = 1;
+ }
+
+ // Reset rate estimator
if (rate_estimator_) {
rate_estimator_->onStart();
}
+ // If not cur_path exists, create one
if (!cur_path_) {
// RAAQM
double drop_factor;
@@ -93,33 +119,6 @@ int RaaqmTransportProtocol::start() {
cur_path_ = cur_path.get();
path_table_[default_values::path_id] = std::move(cur_path);
}
-
- portal_->setConsumerCallback(this);
- return TransportProtocol::start();
-}
-
-void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); }
-
-void RaaqmTransportProtocol::reset() {
- // Set first segment to retrieve
- core::Name *name;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
- index_manager_->reset();
- index_manager_->setFirstSuffix(name->getSuffix());
- std::queue<Interest::Ptr> empty;
- std::swap(interest_to_retransmit_, empty);
- stats_->reset();
-
- // Reset reassembly component
- reassembly_protocol_->reInitialize();
-
- // Reset protocol variables
- interests_in_flight_ = 0;
- t0_ = utils::SteadyClock::now();
-}
-
-bool RaaqmTransportProtocol::verifyKeyPackets() {
- return index_manager_->onKeyToVerify();
}
void RaaqmTransportProtocol::increaseWindow() {
@@ -189,9 +188,8 @@ void RaaqmTransportProtocol::init() {
lte_delay_ = 15000;
if (!is) {
- TRANSPORT_LOGW(
- "WARNING: RAAQM parameters not found at %s, set default values",
- RAAQM_CONFIG_PATH);
+ LOG(WARNING) << "RAAQM parameters not found at " << RAAQM_CONFIG_PATH
+ << ", set default values";
return;
}
@@ -317,75 +315,66 @@ void RaaqmTransportProtocol::init() {
is.close();
}
-void RaaqmTransportProtocol::onContentObject(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- // Check whether makes sense to continue
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
- return;
- }
+void RaaqmTransportProtocol::onContentObjectReceived(
+ Interest &interest, ContentObject &content_object, std::error_code &ec) {
+ uint32_t incremental_suffix = content_object.getName().getSuffix();
// Call application-defined callbacks
if (*on_content_object_input_) {
- (*on_content_object_input_)(*socket_->getInterface(), *content_object);
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
}
if (*on_interest_satisfied_) {
- (*on_interest_satisfied_)(*socket_->getInterface(), *interest);
- }
-
- if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- stats_->updateBytesRecv(content_object->payloadSize());
+ (*on_interest_satisfied_)(*socket_->getInterface(), interest);
}
- onContentSegment(std::move(interest), std::move(content_object));
- scheduleNextInterests();
-}
+ ec = make_error_code(protocol_error::success);
-void RaaqmTransportProtocol::onContentSegment(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
+ if (content_object.getPayloadType() == PayloadType::DATA) {
+ stats_->updateBytesRecv(content_object.payloadSize());
+ }
// Decrease in-flight interests
interests_in_flight_--;
// Update stats
if (!interest_retransmissions_[incremental_suffix & mask]) {
- afterContentReception(*interest, *content_object);
+ afterContentReception(interest, content_object);
}
- index_manager_->onContentObject(std::move(interest),
- std::move(content_object));
+ // Schedule next interests
+ scheduleNextInterests();
}
-void RaaqmTransportProtocol::onPacketDropped(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+void RaaqmTransportProtocol::onPacketDropped(Interest &interest,
+ ContentObject &content_object,
+ const std::error_code &reason) {
uint32_t max_rtx = 0;
socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
- uint64_t segment = interest->getName().getSuffix();
+ uint64_t segment = interest.getName().getSuffix();
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
stats_->updateRetxCount(1);
if (*on_interest_retransmission_) {
- (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
+ (*on_interest_retransmission_)(*socket_->getInterface(), interest);
}
if (*on_interest_output_) {
- (*on_interest_output_)(*socket_->getInterface(), *interest);
+ (*on_interest_output_)(*socket_->getInterface(), interest);
}
- if (!is_running_) {
+ if (!isRunning()) {
return;
}
interest_retransmissions_[segment & mask]++;
- interest_to_retransmit_.push(std::move(interest));
+ interest_to_retransmit_.push((unsigned int)segment);
} else {
- TRANSPORT_LOGE(
- "Stop: received not trusted packet %llu times",
- (unsigned long long)interest_retransmissions_[segment & mask]);
+ LOG(ERROR) << "Stop: received not trusted packet "
+ << interest_retransmissions_[segment & mask] << " times";
onContentReassembled(
make_error_code(protocol_error::max_retransmissions_error));
}
@@ -395,23 +384,27 @@ void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) {
}
-void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
- checkForStalePaths();
-
- const Name &n = interest->getName();
-
- TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str());
+void RaaqmTransportProtocol::sendInterest(
+ const Name &interest_name,
+ std::array<uint32_t, MAX_AGGREGATED_INTEREST> *additional_suffixes,
+ uint32_t len) {
+ interests_in_flight_++;
+ interest_retransmissions_[interest_name.getSuffix() & mask]++;
+ interest_timepoints_[interest_name.getSuffix() & mask] =
+ utils::SteadyTime::Clock::now();
+ TransportProtocol::sendInterest(interest_name, additional_suffixes, len);
+}
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
- return;
- }
+void RaaqmTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
+ const Name &n) {
+ checkForStalePaths();
interests_in_flight_--;
uint64_t segment = n.getSuffix();
// Do not retransmit interests asking contents that do not exist.
- if (segment > index_manager_->getFinalSuffix()) {
+ if (segment > indexer_verifier_->getFinalSuffix()) {
return;
}
@@ -432,32 +425,34 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
(*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- if (!is_running_) {
+ if (!isRunning()) {
return;
}
- interest_retransmissions_[segment & mask]++;
- interest_to_retransmit_.push(std::move(interest));
-
+ interest_to_retransmit_.push((unsigned int)segment);
scheduleNextInterests();
} else {
- TRANSPORT_LOGE("Stop: reached max retx limit.");
+ LOG(ERROR) << "Stop: reached max retx limit.";
onContentReassembled(std::make_error_code(std::errc(std::errc::io_error)));
}
}
void RaaqmTransportProtocol::scheduleNextInterests() {
- bool cancel = (!is_running_ && !is_first_) || !schedule_interests_;
+ bool cancel = (!isRunning() && !is_first_) || !schedule_interests_;
if (TRANSPORT_EXPECT_FALSE(cancel)) {
schedule_interests_ = true;
return;
}
+ core::Name *name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+
if (TRANSPORT_EXPECT_FALSE(interests_in_flight_ >= current_window_size_ &&
interest_to_retransmit_.size() > 0)) {
// send at least one interest if there are retransmissions to perform and
// there is no space left in the window
- sendInterest(std::move(interest_to_retransmit_.front()));
+ auto suffix = interest_to_retransmit_.front();
+ sendInterest(name->setSuffix(suffix));
interest_to_retransmit_.pop();
}
@@ -466,58 +461,26 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
// Send the interest needed for filling the window
while (interests_in_flight_ < current_window_size_) {
if (interest_to_retransmit_.size() > 0) {
- sendInterest(std::move(interest_to_retransmit_.front()));
+ auto suffix = interest_to_retransmit_.front();
+ sendInterest(name->setSuffix(suffix));
interest_to_retransmit_.pop();
} else {
- index = index_manager_->getNextSuffix();
+ if (TRANSPORT_EXPECT_FALSE(!isRunning() && !is_first_)) {
+ break;
+ }
+
+ index = indexer_verifier_->getNextSuffix();
if (index == IndexManager::invalid_index) {
break;
}
- sendInterest(index);
+ interest_retransmissions_[index & mask] = ~0;
+ sendInterest(name->setSuffix(index));
}
}
}
-bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
- auto interest = getPacket();
- core::Name *name;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
- name->setSuffix((uint32_t)next_suffix);
- interest->setName(*name);
-
- uint32_t interest_lifetime;
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- interest_lifetime);
- interest->setLifetime(interest_lifetime);
-
- if (*on_interest_output_) {
- on_interest_output_->operator()(*socket_->getInterface(), *interest);
- }
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return false;
- }
-
- // This is set to ~0 so that the next interest_retransmissions_ + 1,
- // performed by sendInterest, will result in 0
- interest_retransmissions_[next_suffix & mask] = ~0;
- interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
-
- sendInterest(std::move(interest));
-
- return true;
-}
-
-void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
- interests_in_flight_++;
- interest_retransmissions_[interest->getName().getSuffix() & mask]++;
-
- TRANSPORT_LOGD("Send interest %s", interest->getName().toString().c_str());
- portal_->sendInterest(std::move(interest));
-}
-
-void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
+void RaaqmTransportProtocol::onContentReassembled(const std::error_code &ec) {
rate_estimator_->onDownloadFinished();
TransportProtocol::onContentReassembled(ec);
schedule_interests_ = false;
@@ -527,18 +490,18 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
if (TRANSPORT_EXPECT_FALSE(!cur_path_)) {
throw std::runtime_error("RAAQM ERROR: no current path found, exit");
} else {
- auto now = utils::SteadyClock::now();
- utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>(
- now - interest_timepoints_[segment & mask]);
+ auto now = utils::SteadyTime::Clock::now();
+ auto rtt = utils::SteadyTime::getDurationUs(
+ interest_timepoints_[segment & mask], now);
// Update stats
- updateStats((uint32_t)segment, rtt.count(), now);
+ updateStats((uint32_t)segment, rtt, now);
if (rate_estimator_) {
- rate_estimator_->onRttUpdate((double)rtt.count());
+ rate_estimator_->onRttUpdate(rtt);
}
- cur_path_->insertNewRtt(rtt.count(), now);
+ cur_path_->insertNewRtt(rtt, now);
cur_path_->smoothTimer();
if (cur_path_->newPropagationDelayAvailable()) {
@@ -550,27 +513,27 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
void RaaqmTransportProtocol::RAAQM() {
if (!cur_path_) {
throw errors::RuntimeException("ERROR: no current path found, exit");
- exit(EXIT_FAILURE);
} else {
// Change drop probability according to RTT statistics
cur_path_->updateDropProb();
- double coin = ((double)rand() / (RAND_MAX));
+ double coin = dis_(gen_);
if (coin <= cur_path_->getDropProb()) {
decreaseWindow();
}
}
}
-void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
- utils::TimePoint &now) {
+void RaaqmTransportProtocol::updateStats(
+ uint32_t suffix, const utils::SteadyTime::Microseconds &rtt,
+ utils::SteadyTime::TimePoint &now) {
// Update RTT statistics
stats_->updateAverageRtt(rtt);
stats_->updateAverageWindowSize(current_window_size_);
// Call statistics callback
if (*stats_summary_) {
- auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
+ auto dt = utils::SteadyTime::getDurationMs(t0_, now);
uint32_t timer_interval_milliseconds = 0;
socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,