summaryrefslogtreecommitdiffstats
path: root/libtransport/src
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src')
-rw-r--r--libtransport/src/CMakeLists.txt4
-rw-r--r--libtransport/src/core/name.cc41
-rw-r--r--libtransport/src/implementation/socket_producer.h8
-rw-r--r--libtransport/src/protocols/CMakeLists.txt3
-rw-r--r--libtransport/src/protocols/cbr.cc12
-rw-r--r--libtransport/src/protocols/cbr.h5
-rw-r--r--libtransport/src/protocols/raaqm.cc289
-rw-r--r--libtransport/src/protocols/raaqm.h55
-rw-r--r--libtransport/src/protocols/raaqm_data_path.cc10
-rw-r--r--libtransport/src/protocols/raaqm_data_path.h10
-rw-r--r--libtransport/src/protocols/raaqm_transport_algorithm.cc255
-rw-r--r--libtransport/src/protocols/raaqm_transport_algorithm.h131
-rw-r--r--libtransport/src/protocols/transport_algorithm.cc82
13 files changed, 371 insertions, 534 deletions
diff --git a/libtransport/src/CMakeLists.txt b/libtransport/src/CMakeLists.txt
index f1544cf8d..c10f3da5a 100644
--- a/libtransport/src/CMakeLists.txt
+++ b/libtransport/src/CMakeLists.txt
@@ -68,7 +68,7 @@ if (DISABLE_SHARED_LIBRARIES)
DEPENDS ${DEPENDENCIES}
COMPONENT lib${LIBTRANSPORT}
INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
- HEADER_ROOT_DIR hicn/transport
+ INSTALL_ROOT_DIR hicn/transport
DEFINITIONS ${COMPILER_DEFINITIONS}
)
else ()
@@ -80,7 +80,7 @@ else ()
DEPENDS ${DEPENDENCIES}
COMPONENT lib${LIBTRANSPORT}
INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS}
- HEADER_ROOT_DIR hicn/transport
+ INSTALL_ROOT_DIR hicn/transport
DEFINITIONS ${COMPILER_DEFINITIONS}
)
endif ()
diff --git a/libtransport/src/core/name.cc b/libtransport/src/core/name.cc
index 3455460fc..811e93b87 100644
--- a/libtransport/src/core/name.cc
+++ b/libtransport/src/core/name.cc
@@ -13,13 +13,14 @@
* limitations under the License.
*/
-#include <core/manifest_format.h>
#include <hicn/transport/core/name.h>
#include <hicn/transport/errors/errors.h>
#include <hicn/transport/errors/tokenizer_exception.h>
#include <hicn/transport/utils/hash.h>
#include <hicn/transport/utils/string_tokenizer.h>
+#include <core/manifest_format.h>
+
namespace transport {
namespace core {
@@ -28,19 +29,28 @@ Name::Name() { name_ = {}; }
Name::Name(int family, const uint8_t *ip_address, std::uint32_t suffix)
: name_({}) {
+ name_.type = HNT_UNSPEC;
+ std::size_t length;
+ uint8_t *dst = NULL;
+
if (family == AF_INET) {
- name_.prefix.ip4.as_u32 = *(u32 *)(ip_address);
+ dst = name_.ip4.prefix_as_u8;
+ length = IPV4_ADDR_LEN;
+ name_.type = HNT_CONTIGUOUS_V4;
} else if (family == AF_INET6) {
- std::memcpy(&name_.prefix.ip6.as_u64[0], ip_address, IPV6_ADDR_LEN);
+ dst = name_.ip6.prefix_as_u8;
+ length = IPV6_ADDR_LEN;
+ name_.type = HNT_CONTIGUOUS_V6;
} else {
throw errors::RuntimeException("Specified name family does not exist.");
}
- name_.suffix = suffix;
+ std::memcpy(dst, ip_address, length);
+ *reinterpret_cast<std::uint32_t *>(dst + length) = suffix;
}
Name::Name(const char *name, uint32_t segment) {
- name_ = {};
+ name_.type = HNT_UNSPEC;
if (hicn_name_create(name, segment, &name_) < 0) {
throw errors::InvalidIpAddressException();
}
@@ -50,7 +60,7 @@ Name::Name(const std::string &uri, uint32_t segment)
: Name(uri.c_str(), segment) {}
Name::Name(const std::string &uri) {
- name_ = {};
+ name_.type = HNT_UNSPEC;
utils::StringTokenizer tokenizer(uri, "|");
std::string ip_address;
std::string seq_number;
@@ -115,7 +125,9 @@ uint32_t Name::getHash32(bool consider_suffix) const {
return hash;
}
-void Name::clear() { name_ = {}; };
+void Name::clear() { name_.type = HNT_UNSPEC; };
+
+Name::Type Name::getType() const { return name_.type; }
uint32_t Name::getSuffix() const {
uint32_t ret = 0;
@@ -136,7 +148,20 @@ Name &Name::setSuffix(uint32_t seq_number) {
}
std::shared_ptr<Sockaddr> Name::getAddress() const {
- Sockaddr *ret = (Sockaddr *)(new sockaddr_storage());
+ Sockaddr *ret = nullptr;
+
+ switch (name_.type) {
+ case HNT_CONTIGUOUS_V4:
+ case HNT_IOV_V4:
+ ret = (Sockaddr *)new Sockaddr4;
+ break;
+ case HNT_CONTIGUOUS_V6:
+ case HNT_IOV_V6:
+ ret = (Sockaddr *)new Sockaddr6;
+ break;
+ default:
+ throw errors::MalformedNameException();
+ }
if (hicn_name_to_sockaddr_address((hicn_name_t *)&name_, ret) < 0) {
throw errors::MalformedNameException();
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index 574723607..a6f0f969e 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -134,12 +134,16 @@ class ProducerSocket : public Socket<BasePortal>,
core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC;
- if (content_name.isIp4()) {
+ if (content_name.getType() == HNT_CONTIGUOUS_V4 ||
+ content_name.getType() == HNT_IOV_V4) {
hf_format = core::Packet::Format::HF_INET_TCP;
hf_format_ah = core::Packet::Format::HF_INET_TCP_AH;
- } else {
+ } else if (content_name.getType() == HNT_CONTIGUOUS_V6 ||
+ content_name.getType() == HNT_IOV_V6) {
hf_format = core::Packet::Format::HF_INET6_TCP;
hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH;
+ } else {
+ throw errors::RuntimeException("Unknown name format.");
}
format = hf_format;
diff --git a/libtransport/src/protocols/CMakeLists.txt b/libtransport/src/protocols/CMakeLists.txt
index 6dc1a0737..8bfbdd6ad 100644
--- a/libtransport/src/protocols/CMakeLists.txt
+++ b/libtransport/src/protocols/CMakeLists.txt
@@ -32,7 +32,6 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/errors.h
${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h
${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h
- ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_transport_algorithm.h
)
list(APPEND SOURCE_FILES
@@ -51,8 +50,6 @@ list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
${CMAKE_CURRENT_SOURCE_DIR}/errors.cc
${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_transport_algorithm.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/transport_algorithm.cc
)
set(RAAQM_CONFIG_INSTALL_PREFIX
diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc
index fc8b53b8d..0bffd7d18 100644
--- a/libtransport/src/protocols/cbr.cc
+++ b/libtransport/src/protocols/cbr.cc
@@ -34,6 +34,18 @@ void CbrTransportProtocol::reset() {
current_window_size_);
}
+void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {}
+
+void CbrTransportProtocol::afterContentReception(
+ const Interest &interest, const ContentObject &content_object) {
+ auto segment = content_object.getName().getSuffix();
+ auto now = utils::SteadyClock::now();
+ auto rtt = std::chrono::duration_cast<utils::Microseconds>(
+ now - interest_timepoints_[segment & mask]);
+ // Update stats
+ updateStats(segment, rtt.count(), now);
+}
+
} // end namespace protocol
} // end namespace transport
diff --git a/libtransport/src/protocols/cbr.h b/libtransport/src/protocols/cbr.h
index 32788cd05..20129f6a3 100644
--- a/libtransport/src/protocols/cbr.h
+++ b/libtransport/src/protocols/cbr.h
@@ -28,6 +28,11 @@ class CbrTransportProtocol : public RaaqmTransportProtocol {
int start() override;
void reset() override;
+
+ private:
+ void afterContentReception(const Interest &interest,
+ const ContentObject &content_object) override;
+ void afterDataUnsatisfied(uint64_t segment) override;
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index cdf878328..783d6194b 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -32,36 +32,39 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(
implementation::ConsumerSocket *icn_socket)
: TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)),
current_window_size_(1),
- raaqm_algorithm_(nullptr),
+ interests_in_flight_(0),
+ cur_path_(nullptr),
t0_(utils::SteadyClock::now()),
+ rate_estimator_(nullptr),
schedule_interests_(true) {
init();
}
-RaaqmTransportProtocol::~RaaqmTransportProtocol() {}
+RaaqmTransportProtocol::~RaaqmTransportProtocol() {
+ if (rate_estimator_) {
+ delete rate_estimator_;
+ }
+}
int RaaqmTransportProtocol::start() {
- if (!raaqm_algorithm_) {
+ if (rate_estimator_) {
+ rate_estimator_->onStart();
+ }
+
+ if (!cur_path_) {
// RAAQM
double drop_factor;
double minimum_drop_probability;
uint32_t sample_number;
uint32_t interest_lifetime;
- double gamma = 0., max_window = 0., min_window = 0., beta;
socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor);
- socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma);
- socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY,
minimum_drop_probability);
socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER,
sample_number);
socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
interest_lifetime);
- socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE,
- max_window);
- socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE,
- min_window);
// Rate Estimation
double alpha = 0.0;
@@ -75,19 +78,20 @@ int RaaqmTransportProtocol::start() {
choice_param);
if (choice_param == 1) {
- rate_estimator_ = std::make_unique<ALaTcpEstimator>();
+ rate_estimator_ = new ALaTcpEstimator();
} else {
- rate_estimator_ =
- std::make_unique<SimpleEstimator>(alpha, batching_param);
+ rate_estimator_ = new SimpleEstimator(alpha, batching_param);
}
socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER,
&rate_estimator_->observer_);
- raaqm_algorithm_ = std::make_unique<RaaqmTransportAlgorithm>(
- stats_, rate_estimator_.get(), drop_factor, minimum_drop_probability,
- gamma, beta, sample_number, interest_lifetime, beta_wifi_, drop_wifi_,
- beta_lte_, drop_lte_, wifi_delay_, lte_delay_, max_window, min_window);
+ // Current path
+ auto cur_path = std::make_unique<RaaqmDataPath>(
+ drop_factor, minimum_drop_probability, interest_lifetime * 1000,
+ sample_number);
+ cur_path_ = cur_path.get();
+ path_table_[default_values::path_id] = std::move(cur_path);
}
portal_->setConsumerCallback(this);
@@ -105,7 +109,6 @@ void RaaqmTransportProtocol::reset() {
std::queue<Interest::Ptr> empty;
std::swap(interest_to_retransmit_, empty);
stats_->reset();
- raaqm_algorithm_->reset();
// Reset reassembly component
reassembly_protocol_->reInitialize();
@@ -119,13 +122,65 @@ bool RaaqmTransportProtocol::verifyKeyPackets() {
return index_manager_->onKeyToVerify();
}
+void RaaqmTransportProtocol::increaseWindow() {
+ // return;
+ double max_window_size = 0.;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE,
+ max_window_size);
+ if (current_window_size_ < max_window_size) {
+ double gamma = 0.;
+ socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma);
+
+ current_window_size_ += gamma / current_window_size_;
+ socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+ }
+ rate_estimator_->onWindowIncrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::decreaseWindow() {
+ // return;
+ double min_window_size = 0.;
+ socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE,
+ min_window_size);
+ if (current_window_size_ > min_window_size) {
+ double beta = 0.;
+ socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
+
+ current_window_size_ = current_window_size_ * beta;
+ if (current_window_size_ < min_window_size) {
+ current_window_size_ = min_window_size;
+ }
+
+ socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+ }
+ rate_estimator_->onWindowDecrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ // Decrease the window because the timeout happened
+ decreaseWindow();
+}
+
+void RaaqmTransportProtocol::afterContentReception(
+ const Interest &interest, const ContentObject &content_object) {
+ updatePathTable(content_object);
+ increaseWindow();
+ updateRtt(interest.getName().getSuffix());
+ rate_estimator_->onDataReceived((int)content_object.payloadSize() +
+ (int)content_object.headerSize());
+ // Set drop probablility and window size accordingly
+ RAAQM();
+}
+
void RaaqmTransportProtocol::init() {
std::ifstream is(RAAQM_CONFIG_PATH);
std::string line;
raaqm_autotune_ = false;
- double default_beta = default_values::beta_value;
- double default_drop = default_values::drop_factor;
+ default_beta_ = default_values::beta_value;
+ default_drop_ = default_values::drop_factor;
beta_wifi_ = default_values::beta_value;
drop_wifi_ = default_values::drop_factor;
beta_lte_ = default_values::beta_value;
@@ -181,16 +236,17 @@ void RaaqmTransportProtocol::init() {
if (command == "beta") {
std::string tmp;
- line_s >> tmp >> default_beta;
- socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_beta);
+ line_s >> tmp >> default_beta_;
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
+ default_beta_);
continue;
}
if (command == "drop") {
std::string tmp;
- line_s >> tmp >> default_drop;
+ line_s >> tmp >> default_drop_;
socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
- default_drop);
+ default_drop_);
continue;
}
@@ -229,7 +285,6 @@ void RaaqmTransportProtocol::init() {
line_s >> tmp >> lte_delay_;
continue;
}
-
if (command == "alpha") {
std::string tmp;
double rate_alpha = 0.0;
@@ -288,27 +343,14 @@ void RaaqmTransportProtocol::onContentObject(
void RaaqmTransportProtocol::onContentSegment(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t suffix = content_object->getName().getSuffix();
+ uint32_t incremental_suffix = content_object->getName().getSuffix();
// Decrease in-flight interests
interests_in_flight_--;
- // Get new value for the window
- current_window_size_ =
- raaqm_algorithm_->onContentObject(suffix, content_object->getPathLabel());
-
- // Print stats if needed
- if (*stats_summary_) {
- auto now = std::chrono::steady_clock::now();
- auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
-
- uint32_t timer_interval_milliseconds = 0;
- socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
- timer_interval_milliseconds);
- if (dt.count() > timer_interval_milliseconds) {
- (*stats_summary_)(*socket_->getInterface(), *stats_);
- t0_ = now;
- }
+ // Update stats
+ if (!interest_retransmissions_[incremental_suffix & mask]) {
+ afterContentReception(*interest, *content_object);
}
index_manager_->onContentObject(std::move(interest),
@@ -354,6 +396,8 @@ void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) {
}
void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ checkForStalePaths();
+
const Name &n = interest->getName();
TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str());
@@ -375,6 +419,8 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
(*on_interest_timeout_)(*socket_->getInterface(), *interest);
}
+ afterDataUnsatisfied(segment);
+
uint32_t max_rtx = 0;
socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
@@ -456,6 +502,7 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
// This is set to ~0 so that the next interest_retransmissions_ + 1,
// performed by sendInterest, will result in 0
interest_retransmissions_[next_suffix & mask] = ~0;
+ interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
@@ -471,10 +518,168 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
+ rate_estimator_->onDownloadFinished();
TransportProtocol::onContentReassembled(ec);
schedule_interests_ = false;
}
+void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
+ if (TRANSPORT_EXPECT_FALSE(!cur_path_)) {
+ throw std::runtime_error("RAAQM ERROR: no current path found, exit");
+ } else {
+ auto now = utils::SteadyClock::now();
+ utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>(
+ now - interest_timepoints_[segment & mask]);
+
+ // Update stats
+ updateStats((uint32_t)segment, rtt.count(), now);
+
+ if (rate_estimator_) {
+ rate_estimator_->onRttUpdate((double)rtt.count());
+ }
+
+ cur_path_->insertNewRtt(rtt.count(), now);
+ cur_path_->smoothTimer();
+
+ if (cur_path_->newPropagationDelayAvailable()) {
+ checkDropProbability();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::RAAQM() {
+ if (!cur_path_) {
+ throw errors::RuntimeException("ERROR: no current path found, exit");
+ exit(EXIT_FAILURE);
+ } else {
+ // Change drop probability according to RTT statistics
+ cur_path_->updateDropProb();
+
+ double coin = ((double)rand() / (RAND_MAX));
+ if (coin <= cur_path_->getDropProb()) {
+ decreaseWindow();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
+ utils::TimePoint &now) {
+ // Update RTT statistics
+ stats_->updateAverageRtt(rtt);
+ stats_->updateAverageWindowSize(current_window_size_);
+
+ // Call statistics callback
+ if (*stats_summary_) {
+ auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
+
+ uint32_t timer_interval_milliseconds = 0;
+ socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
+ timer_interval_milliseconds);
+ if (dt.count() > timer_interval_milliseconds) {
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ t0_ = now;
+ }
+ }
+}
+
+void RaaqmTransportProtocol::updatePathTable(
+ const ContentObject &content_object) {
+ uint32_t path_id = content_object.getPathLabel();
+
+ if (path_table_.find(path_id) == path_table_.end()) {
+ if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) {
+ // Create a new path with some default param
+
+ if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) {
+ throw errors::RuntimeException(
+ "[RAAQM] No path initialized for path table, error could be in "
+ "default path initialization.");
+ }
+
+ // Initiate the new path default param
+ auto new_path = std::make_unique<RaaqmDataPath>(
+ *(path_table_.at(default_values::path_id)));
+
+ // Insert the new path into hash table
+ path_table_[path_id] = std::move(new_path);
+ } else {
+ throw errors::RuntimeException(
+ "UNEXPECTED ERROR: when running,current path not found.");
+ }
+ }
+
+ cur_path_ = path_table_[path_id].get();
+
+ size_t header_size = content_object.headerSize();
+ size_t data_size = content_object.payloadSize();
+
+ // Update measurements for path
+ cur_path_->updateReceivedStats(header_size + data_size, data_size);
+}
+
+void RaaqmTransportProtocol::checkDropProbability() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ unsigned int max_pd = 0;
+ PathTable::iterator it;
+ for (auto it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->getPropagationDelay() > max_pd &&
+ it->second->getPropagationDelay() != UINT_MAX &&
+ !it->second->isStale()) {
+ max_pd = it->second->getPropagationDelay();
+ }
+ }
+
+ double drop_prob = 0;
+ double beta = 0;
+ if (max_pd < wifi_delay_) { // only ethernet paths
+ drop_prob = default_drop_;
+ beta = default_beta_;
+ } else if (max_pd < lte_delay_) { // at least one wifi path
+ drop_prob = drop_wifi_;
+ beta = beta_wifi_;
+ } else { // at least one lte path
+ drop_prob = drop_lte_;
+ beta = beta_lte_;
+ }
+
+ double old_drop_prob = 0;
+ double old_beta = 0;
+ socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta);
+ socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob);
+
+ if (drop_prob == old_drop_prob && beta == old_beta) {
+ return;
+ }
+
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob);
+
+ for (it = path_table_.begin(); it != path_table_.end(); it++) {
+ it->second->setDropProb(drop_prob);
+ }
+}
+
+void RaaqmTransportProtocol::checkForStalePaths() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ bool stale = false;
+ PathTable::iterator it;
+ for (it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->isStale()) {
+ stale = true;
+ break;
+ }
+ }
+ if (stale) {
+ checkDropProbability();
+ }
+}
+
} // end namespace protocol
} // namespace transport
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
index 2999b50cf..fce4194d4 100644
--- a/libtransport/src/protocols/raaqm.h
+++ b/libtransport/src/protocols/raaqm.h
@@ -20,7 +20,6 @@
#include <protocols/congestion_window_protocol.h>
#include <protocols/protocol.h>
#include <protocols/raaqm_data_path.h>
-#include <protocols/raaqm_transport_algorithm.h>
#include <protocols/rate_estimation.h>
#include <queue>
@@ -30,7 +29,8 @@ namespace transport {
namespace protocol {
-class RaaqmTransportProtocol : public TransportProtocol {
+class RaaqmTransportProtocol : public TransportProtocol,
+ public CWindowProtocol {
public:
RaaqmTransportProtocol(implementation::ConsumerSocket *icnet_socket);
@@ -51,6 +51,16 @@ class RaaqmTransportProtocol : public TransportProtocol {
using PathTable =
std::unordered_map<uint32_t, std::unique_ptr<RaaqmDataPath>>;
+ void increaseWindow() override;
+ void decreaseWindow() override;
+
+ virtual void afterContentReception(const Interest &interest,
+ const ContentObject &content_object);
+ virtual void afterDataUnsatisfied(uint64_t segment);
+
+ virtual void updateStats(uint32_t suffix, uint64_t rtt,
+ utils::TimePoint &now);
+
private:
void init();
@@ -76,21 +86,48 @@ class RaaqmTransportProtocol : public TransportProtocol {
void updateRtt(uint64_t segment);
+ void RAAQM();
+
+ void updatePathTable(const ContentObject &content_object);
+
+ void checkDropProbability();
+
+ void checkForStalePaths();
+
+ void printRtt();
+
protected:
- std::queue<Interest::Ptr> interest_to_retransmit_;
- std::array<std::uint32_t, buffer_size> interest_retransmissions_;
- uint32_t interests_in_flight_;
+ // Congestion window management
double current_window_size_;
+ // Protocol management
+ uint64_t interests_in_flight_;
+ std::array<std::uint32_t, buffer_size> interest_retransmissions_;
+ std::array<utils::TimePoint, buffer_size> interest_timepoints_;
+ std::queue<Interest::Ptr> interest_to_retransmit_;
private:
- std::unique_ptr<TransportAlgorithm> raaqm_algorithm_;
- std::unique_ptr<IcnRateEstimator> rate_estimator_;
+ /**
+ * Current download path
+ */
+ RaaqmDataPath *cur_path_;
+
+ /**
+ * Hash table for path: each entry is a pair path ID(key) - path object
+ */
+ PathTable path_table_;
+
// TimePoints for statistic
utils::TimePoint t0_;
- // Temporary placeholder for RAAQM algorithm
- // parameters
+ bool set_interest_filter_;
+
+ // for rate-estimation at packet level
+ IcnRateEstimator *rate_estimator_;
+
+ // params for autotuning
bool raaqm_autotune_;
+ double default_beta_;
+ double default_drop_;
double beta_wifi_;
double drop_wifi_;
double beta_lte_;
diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc
index 499b579f3..8bbbadcf2 100644
--- a/libtransport/src/protocols/raaqm_data_path.cc
+++ b/libtransport/src/protocols/raaqm_data_path.cc
@@ -14,6 +14,7 @@
*/
#include <hicn/transport/utils/chrono_typedefs.h>
+
#include <protocols/raaqm_data_path.h>
namespace transport {
@@ -65,6 +66,15 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt,
return *this;
}
+RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size,
+ std::size_t data_size) {
+ packets_received_++;
+ m_packets_bytes_received_ += packet_size;
+ raw_data_bytes_received_ += data_size;
+
+ return *this;
+}
+
double RaaqmDataPath::getDropFactor() { return drop_factor_; }
double RaaqmDataPath::getDropProb() { return drop_prob_; }
diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h
index d1234e743..3f037bc76 100644
--- a/libtransport/src/protocols/raaqm_data_path.h
+++ b/libtransport/src/protocols/raaqm_data_path.h
@@ -16,6 +16,7 @@
#pragma once
#include <hicn/transport/utils/chrono_typedefs.h>
+
#include <utils/min_filter.h>
#include <chrono>
@@ -47,6 +48,15 @@ class RaaqmDataPath {
RaaqmDataPath &insertNewRtt(uint64_t new_rtt, const utils::TimePoint &now);
/**
+ * @brief Update the path statistics
+ * @param packet_size the size of the packet received, including the ICN
+ * header
+ * @param data_size the size of the data received, without the ICN header
+ */
+ RaaqmDataPath &updateReceivedStats(std::size_t packet_size,
+ std::size_t data_size);
+
+ /**
* @brief Get the value of the drop factor parameter
*/
double getDropFactor();
diff --git a/libtransport/src/protocols/raaqm_transport_algorithm.cc b/libtransport/src/protocols/raaqm_transport_algorithm.cc
deleted file mode 100644
index 09e3a47ab..000000000
--- a/libtransport/src/protocols/raaqm_transport_algorithm.cc
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <protocols/raaqm_transport_algorithm.h>
-
-namespace transport {
-namespace protocol {
-
-RaaqmTransportAlgorithm::RaaqmTransportAlgorithm(
- interface::TransportStatistics *stats, IcnRateEstimator *rate_estimator,
- double drop_factor, double minimum_drop_probability, double gamma,
- double beta, uint32_t sample_number, uint32_t interest_lifetime,
- double beta_wifi, double drop_wifi, double beta_lte, double drop_lte,
- unsigned int wifi_delay, unsigned int lte_delay, double max_window,
- double min_window)
- : current_window_size_(1),
- cur_path_(nullptr),
- rate_estimator_(rate_estimator),
- stats_(stats),
- drop_factor_(drop_factor),
- minimum_drop_probability_(minimum_drop_probability),
- gamma_(gamma),
- beta_(beta),
- sample_number_(sample_number),
- interest_lifetime_(interest_lifetime),
- beta_wifi_(beta_wifi),
- drop_wifi_(drop_wifi),
- beta_lte_(beta_lte),
- drop_lte_(drop_lte),
- wifi_delay_(wifi_delay),
- lte_delay_(lte_delay),
- max_window_(max_window),
- min_window_(min_window) {}
-
-RaaqmTransportAlgorithm::~RaaqmTransportAlgorithm() {}
-
-void RaaqmTransportAlgorithm::reset() {
- if (rate_estimator_) {
- rate_estimator_->onStart();
- }
-
- if (!cur_path_) {
- // Current path
- auto cur_path = std::make_unique<RaaqmDataPath>(
- drop_factor_, minimum_drop_probability_, interest_lifetime_ * 1000,
- sample_number_);
- cur_path_ = cur_path.get();
- path_table_[interface::default_values::path_id] = std::move(cur_path);
- }
-}
-
-void RaaqmTransportAlgorithm::increaseWindow() {
- if (current_window_size_ < max_window_) {
- current_window_size_ += gamma_ / current_window_size_;
- }
-
- if (rate_estimator_) {
- rate_estimator_->onWindowIncrease(current_window_size_);
- }
-}
-
-void RaaqmTransportAlgorithm::decreaseWindow() {
- if (current_window_size_ > min_window_) {
- current_window_size_ = current_window_size_ * beta_;
- if (current_window_size_ < min_window_) {
- current_window_size_ = min_window_;
- }
- }
-
- if (rate_estimator_) {
- rate_estimator_->onWindowDecrease(current_window_size_);
- }
-}
-
-void RaaqmTransportAlgorithm::updateRtt(uint32_t suffix) {
- if (TRANSPORT_EXPECT_FALSE(!cur_path_)) {
- throw std::runtime_error("RAAQM ERROR: no current path found, exit");
- } else {
- auto now = utils::SteadyClock::now();
- utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>(
- now - interest_timepoints_[suffix & mask]);
-
- updateStats(suffix, rtt.count(), now);
-
- if (rate_estimator_) {
- rate_estimator_->onRttUpdate((double)rtt.count());
- }
-
- cur_path_->insertNewRtt(rtt.count(), now);
- cur_path_->smoothTimer();
-
- if (cur_path_->newPropagationDelayAvailable()) {
- checkDropProbability();
- }
- }
-}
-
-void RaaqmTransportAlgorithm::RAAQM() {
- if (!cur_path_) {
- throw errors::RuntimeException("ERROR: no current path found, exit");
- exit(EXIT_FAILURE);
- } else {
- // Change drop probability according to RTT statistics
- cur_path_->updateDropProb();
-
- double coin = ((double)rand() / (RAND_MAX));
- if (coin <= cur_path_->getDropProb()) {
- decreaseWindow();
- }
- }
-}
-
-void RaaqmTransportAlgorithm::updatePathTable(uint32_t path_label) {
- uint32_t path_id = path_label;
-
- if (path_table_.find(path_id) == path_table_.end()) {
- if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) {
- // Create a new path with some default param
-
- if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) {
- throw errors::RuntimeException(
- "[RAAQM] No path initialized for path table, error could be in "
- "default path initialization.");
- }
-
- // Initiate the new path default param
- auto new_path = std::make_unique<RaaqmDataPath>(
- *(path_table_.at(interface::default_values::path_id)));
-
- // Insert the new path into hash table
- path_table_[path_id] = std::move(new_path);
- } else {
- throw errors::RuntimeException(
- "UNEXPECTED ERROR: when running,current path not found.");
- }
- }
-
- cur_path_ = path_table_[path_id].get();
-}
-
-void RaaqmTransportAlgorithm::checkDropProbability() {
- if (!raaqm_autotune_) {
- return;
- }
-
- unsigned int max_pd = 0;
- PathTable::iterator it;
- for (auto it = path_table_.begin(); it != path_table_.end(); ++it) {
- if (it->second->getPropagationDelay() > max_pd &&
- it->second->getPropagationDelay() != UINT_MAX &&
- !it->second->isStale()) {
- max_pd = it->second->getPropagationDelay();
- }
- }
-
- double drop_prob = 0;
- double beta = 0;
- if (max_pd < wifi_delay_) { // only ethernet paths
- drop_prob = drop_factor_;
- beta = beta_;
- } else if (max_pd < lte_delay_) { // at least one wifi path
- drop_prob = drop_wifi_;
- beta = beta_wifi_;
- } else { // at least one lte path
- drop_prob = drop_lte_;
- beta = beta_lte_;
- }
-
- double old_drop_prob = 0;
- double old_beta = 0;
- // socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta);
- // socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR,
- // old_drop_prob);
-
- if (drop_prob == old_drop_prob && beta == old_beta) {
- return;
- }
-
- // socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
- // socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob);
-
- for (it = path_table_.begin(); it != path_table_.end(); it++) {
- it->second->setDropProb(drop_prob);
- }
-}
-
-void RaaqmTransportAlgorithm::checkForStalePaths() {
- if (!raaqm_autotune_) {
- return;
- }
-
- bool stale = false;
- PathTable::iterator it;
- for (it = path_table_.begin(); it != path_table_.end(); ++it) {
- if (it->second->isStale()) {
- stale = true;
- break;
- }
- }
- if (stale) {
- checkDropProbability();
- }
-}
-
-void RaaqmTransportAlgorithm::updateStats(uint32_t suffix, uint64_t rtt,
- utils::TimePoint &now) {
- // Update RTT statistics
- if (stats_) {
- stats_->updateAverageRtt(rtt);
- stats_->updateAverageWindowSize(current_window_size_);
- }
-}
-
-uint32_t RaaqmTransportAlgorithm::onContentObject(uint32_t suffix,
- uint32_t path_label) {
- updatePathTable(path_label);
- increaseWindow();
- updateRtt(suffix);
-
- // Set drop probablility and window size accordingly
- RAAQM();
-
- return current_window_size_;
-}
-
-uint32_t RaaqmTransportAlgorithm::onInterestTimeout(uint32_t suffix) {
- checkForStalePaths();
- // Decrease the window because the timeout happened
- decreaseWindow();
-
- return current_window_size_;
-}
-
-void RaaqmTransportAlgorithm::onInterestSent(uint32_t suffix) {
- interest_timepoints_[suffix & mask] = utils::SteadyClock::now();
-}
-
-void RaaqmTransportAlgorithm::sessionEnd() {
- rate_estimator_->onDownloadFinished();
-}
-
-} // namespace protocol
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/protocols/raaqm_transport_algorithm.h b/libtransport/src/protocols/raaqm_transport_algorithm.h
deleted file mode 100644
index eaa65d2a6..000000000
--- a/libtransport/src/protocols/raaqm_transport_algorithm.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket_options_default_values.h>
-#include <hicn/transport/interfaces/statistics.h>
-#include <hicn/transport/protocols/transport_algorithm.h>
-#include <hicn/transport/utils/chrono_typedefs.h>
-#include <protocols/congestion_window_protocol.h>
-#include <protocols/raaqm_data_path.h>
-#include <protocols/rate_estimation.h>
-
-#include <array>
-#include <queue>
-#include <unordered_map>
-#include <vector>
-
-namespace transport {
-
-namespace protocol {
-
-class RaaqmTransportAlgorithm : public TransportAlgorithm,
- public CWindowProtocol {
- public:
- // TODO: Add windows size and other beta/drop parameters
- RaaqmTransportAlgorithm(interface::TransportStatistics *stats,
- IcnRateEstimator *rate_estimator, double drop_factor,
- double minimum_drop_probability, double gamma,
- double beta, uint32_t sample_number,
- uint32_t interest_lifetime, double beta_wifi,
- double drop_wifi, double beta_lte, double drop_lte,
- unsigned int wifi_delay, unsigned int lte_delay,
- double max_window, double min_window);
-
- ~RaaqmTransportAlgorithm() override;
-
- void reset() override;
-
- uint32_t onContentObject(uint32_t suffix, uint32_t path_label) override;
-
- uint32_t onInterestTimeout(uint32_t suffix) override;
-
- void onInterestSent(uint32_t suffix) override;
-
- void sessionEnd() override;
-
- protected:
- static constexpr uint32_t buffer_size =
- 1 << interface::default_values::log_2_default_buffer_size;
- static constexpr uint16_t mask = buffer_size - 1;
- using PathTable =
- std::unordered_map<uint32_t, std::unique_ptr<RaaqmDataPath>>;
-
- void increaseWindow() override;
- void decreaseWindow() override;
-
- virtual void updateStats(uint32_t suffix, uint64_t rtt,
- utils::TimePoint &now);
-
- private:
- void init();
-
- void updateRtt(uint32_t suffix);
-
- void RAAQM();
-
- void updatePathTable(uint32_t path_label);
-
- void checkDropProbability();
-
- void checkForStalePaths();
-
- protected:
- // Congestion window management
- double current_window_size_;
- // Protocol management
- uint64_t interests_in_flight_;
- std::array<utils::TimePoint, buffer_size> interest_timepoints_;
-
- private:
- /**
- * Current download path
- */
- RaaqmDataPath *cur_path_;
-
- /**
- * Hash table for path: each entry is a pair path ID(key) - path object
- */
- PathTable path_table_;
-
- // for rate-estimation at packet level
- IcnRateEstimator *rate_estimator_;
- interface::TransportStatistics *stats_;
-
- bool set_interest_filter_;
-
- // Params
- double drop_factor_;
- double minimum_drop_probability_;
- double gamma_;
- double beta_;
- uint32_t sample_number_;
- uint32_t interest_lifetime_;
-
- bool raaqm_autotune_;
- double beta_wifi_;
- double drop_wifi_;
- double beta_lte_;
- double drop_lte_;
- unsigned int wifi_delay_;
- unsigned int lte_delay_;
- double max_window_;
- double min_window_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/protocols/transport_algorithm.cc b/libtransport/src/protocols/transport_algorithm.cc
deleted file mode 100644
index 37dbf5453..000000000
--- a/libtransport/src/protocols/transport_algorithm.cc
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/protocols/transport_algorithm.h>
-#include <hicn/transport/utils/log.h>
-#include <protocols/raaqm_transport_algorithm.h>
-
-#include <stdexcept>
-
-namespace {
-allocator_t *algorithm_allocator = nullptr;
-deallocator_t *algorithm_deallocator = nullptr;
-
-void *custom_allocate(size_t size) {
- void *ret;
- if (algorithm_allocator) {
- ret = algorithm_allocator(size);
- } else {
- ret = new uint8_t[size];
- }
-
- return ret;
-}
-
-void custom_deallocate(void *p) {
- if (algorithm_deallocator) {
- algorithm_deallocator(p);
- } else {
- delete[](char *)(p);
- }
-}
-
-} // namespace
-
-extern "C" void transportAlgorithm_Init(allocator_t *allocator,
- deallocator_t *deallocator) {
- algorithm_allocator = allocator;
- algorithm_deallocator = deallocator;
-}
-
-extern "C" void transportAlgorithm_Destroy(TransportAlgorithm *algorithm) {
- custom_deallocate(algorithm);
-}
-
-extern "C" TransportAlgorithm *transportAlgorithm_CreateRaaqm(
- double drop_factor, double minimum_drop_probability, double gamma,
- double beta, uint32_t sample_number, uint32_t interest_lifetime,
- double beta_wifi, double drop_wifi, double beta_lte, double drop_lte,
- unsigned int wifi_delay, unsigned int lte_delay, double max_window,
- double min_window) {
- TransportAlgorithm *ret = nullptr;
- ret = new (
- custom_allocate(sizeof(transport::protocol::RaaqmTransportAlgorithm)))
- transport::protocol::RaaqmTransportAlgorithm(
- nullptr, nullptr, drop_factor, minimum_drop_probability, gamma, beta,
- sample_number, interest_lifetime, beta_wifi, drop_wifi, beta_lte,
- drop_lte, wifi_delay, lte_delay, max_window, min_window);
-
- return ret;
-}
-
-extern "C" uint32_t transportAlgorithm_OnContentObject(
- TransportAlgorithm *algorithm, uint32_t suffix, uint32_t path_label) {
- return algorithm->onContentObject(suffix, path_label);
-}
-
-extern "C" uint32_t transportAlgorithm_OnInterestTimeout(
- TransportAlgorithm *algorithm, uint32_t suffix) {
- return algorithm->onInterestTimeout(suffix);
-} \ No newline at end of file