diff options
Diffstat (limited to 'libtransport/src')
-rw-r--r-- | libtransport/src/CMakeLists.txt | 4 | ||||
-rw-r--r-- | libtransport/src/core/name.cc | 41 | ||||
-rw-r--r-- | libtransport/src/implementation/socket_producer.h | 8 | ||||
-rw-r--r-- | libtransport/src/protocols/CMakeLists.txt | 3 | ||||
-rw-r--r-- | libtransport/src/protocols/cbr.cc | 12 | ||||
-rw-r--r-- | libtransport/src/protocols/cbr.h | 5 | ||||
-rw-r--r-- | libtransport/src/protocols/raaqm.cc | 289 | ||||
-rw-r--r-- | libtransport/src/protocols/raaqm.h | 55 | ||||
-rw-r--r-- | libtransport/src/protocols/raaqm_data_path.cc | 10 | ||||
-rw-r--r-- | libtransport/src/protocols/raaqm_data_path.h | 10 | ||||
-rw-r--r-- | libtransport/src/protocols/raaqm_transport_algorithm.cc | 255 | ||||
-rw-r--r-- | libtransport/src/protocols/raaqm_transport_algorithm.h | 131 | ||||
-rw-r--r-- | libtransport/src/protocols/transport_algorithm.cc | 82 |
13 files changed, 371 insertions, 534 deletions
diff --git a/libtransport/src/CMakeLists.txt b/libtransport/src/CMakeLists.txt index f1544cf8d..c10f3da5a 100644 --- a/libtransport/src/CMakeLists.txt +++ b/libtransport/src/CMakeLists.txt @@ -68,7 +68,7 @@ if (DISABLE_SHARED_LIBRARIES) DEPENDS ${DEPENDENCIES} COMPONENT lib${LIBTRANSPORT} INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} - HEADER_ROOT_DIR hicn/transport + INSTALL_ROOT_DIR hicn/transport DEFINITIONS ${COMPILER_DEFINITIONS} ) else () @@ -80,7 +80,7 @@ else () DEPENDS ${DEPENDENCIES} COMPONENT lib${LIBTRANSPORT} INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} - HEADER_ROOT_DIR hicn/transport + INSTALL_ROOT_DIR hicn/transport DEFINITIONS ${COMPILER_DEFINITIONS} ) endif () diff --git a/libtransport/src/core/name.cc b/libtransport/src/core/name.cc index 3455460fc..811e93b87 100644 --- a/libtransport/src/core/name.cc +++ b/libtransport/src/core/name.cc @@ -13,13 +13,14 @@ * limitations under the License. */ -#include <core/manifest_format.h> #include <hicn/transport/core/name.h> #include <hicn/transport/errors/errors.h> #include <hicn/transport/errors/tokenizer_exception.h> #include <hicn/transport/utils/hash.h> #include <hicn/transport/utils/string_tokenizer.h> +#include <core/manifest_format.h> + namespace transport { namespace core { @@ -28,19 +29,28 @@ Name::Name() { name_ = {}; } Name::Name(int family, const uint8_t *ip_address, std::uint32_t suffix) : name_({}) { + name_.type = HNT_UNSPEC; + std::size_t length; + uint8_t *dst = NULL; + if (family == AF_INET) { - name_.prefix.ip4.as_u32 = *(u32 *)(ip_address); + dst = name_.ip4.prefix_as_u8; + length = IPV4_ADDR_LEN; + name_.type = HNT_CONTIGUOUS_V4; } else if (family == AF_INET6) { - std::memcpy(&name_.prefix.ip6.as_u64[0], ip_address, IPV6_ADDR_LEN); + dst = name_.ip6.prefix_as_u8; + length = IPV6_ADDR_LEN; + name_.type = HNT_CONTIGUOUS_V6; } else { throw errors::RuntimeException("Specified name family does not exist."); } - name_.suffix = suffix; + std::memcpy(dst, ip_address, length); + *reinterpret_cast<std::uint32_t *>(dst + length) = suffix; } Name::Name(const char *name, uint32_t segment) { - name_ = {}; + name_.type = HNT_UNSPEC; if (hicn_name_create(name, segment, &name_) < 0) { throw errors::InvalidIpAddressException(); } @@ -50,7 +60,7 @@ Name::Name(const std::string &uri, uint32_t segment) : Name(uri.c_str(), segment) {} Name::Name(const std::string &uri) { - name_ = {}; + name_.type = HNT_UNSPEC; utils::StringTokenizer tokenizer(uri, "|"); std::string ip_address; std::string seq_number; @@ -115,7 +125,9 @@ uint32_t Name::getHash32(bool consider_suffix) const { return hash; } -void Name::clear() { name_ = {}; }; +void Name::clear() { name_.type = HNT_UNSPEC; }; + +Name::Type Name::getType() const { return name_.type; } uint32_t Name::getSuffix() const { uint32_t ret = 0; @@ -136,7 +148,20 @@ Name &Name::setSuffix(uint32_t seq_number) { } std::shared_ptr<Sockaddr> Name::getAddress() const { - Sockaddr *ret = (Sockaddr *)(new sockaddr_storage()); + Sockaddr *ret = nullptr; + + switch (name_.type) { + case HNT_CONTIGUOUS_V4: + case HNT_IOV_V4: + ret = (Sockaddr *)new Sockaddr4; + break; + case HNT_CONTIGUOUS_V6: + case HNT_IOV_V6: + ret = (Sockaddr *)new Sockaddr6; + break; + default: + throw errors::MalformedNameException(); + } if (hicn_name_to_sockaddr_address((hicn_name_t *)&name_, ret) < 0) { throw errors::MalformedNameException(); diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 574723607..a6f0f969e 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -134,12 +134,16 @@ class ProducerSocket : public Socket<BasePortal>, core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC; - if (content_name.isIp4()) { + if (content_name.getType() == HNT_CONTIGUOUS_V4 || + content_name.getType() == HNT_IOV_V4) { hf_format = core::Packet::Format::HF_INET_TCP; hf_format_ah = core::Packet::Format::HF_INET_TCP_AH; - } else { + } else if (content_name.getType() == HNT_CONTIGUOUS_V6 || + content_name.getType() == HNT_IOV_V6) { hf_format = core::Packet::Format::HF_INET6_TCP; hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH; + } else { + throw errors::RuntimeException("Unknown name format."); } format = hf_format; diff --git a/libtransport/src/protocols/CMakeLists.txt b/libtransport/src/protocols/CMakeLists.txt index 6dc1a0737..8bfbdd6ad 100644 --- a/libtransport/src/protocols/CMakeLists.txt +++ b/libtransport/src/protocols/CMakeLists.txt @@ -32,7 +32,6 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/errors.h ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h - ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_transport_algorithm.h ) list(APPEND SOURCE_FILES @@ -51,8 +50,6 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc - ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_transport_algorithm.cc - ${CMAKE_CURRENT_SOURCE_DIR}/transport_algorithm.cc ) set(RAAQM_CONFIG_INSTALL_PREFIX diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc index fc8b53b8d..0bffd7d18 100644 --- a/libtransport/src/protocols/cbr.cc +++ b/libtransport/src/protocols/cbr.cc @@ -34,6 +34,18 @@ void CbrTransportProtocol::reset() { current_window_size_); } +void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {} + +void CbrTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) { + auto segment = content_object.getName().getSuffix(); + auto now = utils::SteadyClock::now(); + auto rtt = std::chrono::duration_cast<utils::Microseconds>( + now - interest_timepoints_[segment & mask]); + // Update stats + updateStats(segment, rtt.count(), now); +} + } // end namespace protocol } // end namespace transport diff --git a/libtransport/src/protocols/cbr.h b/libtransport/src/protocols/cbr.h index 32788cd05..20129f6a3 100644 --- a/libtransport/src/protocols/cbr.h +++ b/libtransport/src/protocols/cbr.h @@ -28,6 +28,11 @@ class CbrTransportProtocol : public RaaqmTransportProtocol { int start() override; void reset() override; + + private: + void afterContentReception(const Interest &interest, + const ContentObject &content_object) override; + void afterDataUnsatisfied(uint64_t segment) override; }; } // end namespace protocol diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index cdf878328..783d6194b 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -32,36 +32,39 @@ RaaqmTransportProtocol::RaaqmTransportProtocol( implementation::ConsumerSocket *icn_socket) : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), current_window_size_(1), - raaqm_algorithm_(nullptr), + interests_in_flight_(0), + cur_path_(nullptr), t0_(utils::SteadyClock::now()), + rate_estimator_(nullptr), schedule_interests_(true) { init(); } -RaaqmTransportProtocol::~RaaqmTransportProtocol() {} +RaaqmTransportProtocol::~RaaqmTransportProtocol() { + if (rate_estimator_) { + delete rate_estimator_; + } +} int RaaqmTransportProtocol::start() { - if (!raaqm_algorithm_) { + if (rate_estimator_) { + rate_estimator_->onStart(); + } + + if (!cur_path_) { // RAAQM double drop_factor; double minimum_drop_probability; uint32_t sample_number; uint32_t interest_lifetime; - double gamma = 0., max_window = 0., min_window = 0., beta; socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor); - socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY, minimum_drop_probability); socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER, sample_number); socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, interest_lifetime); - socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, - max_window); - socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, - min_window); // Rate Estimation double alpha = 0.0; @@ -75,19 +78,20 @@ int RaaqmTransportProtocol::start() { choice_param); if (choice_param == 1) { - rate_estimator_ = std::make_unique<ALaTcpEstimator>(); + rate_estimator_ = new ALaTcpEstimator(); } else { - rate_estimator_ = - std::make_unique<SimpleEstimator>(alpha, batching_param); + rate_estimator_ = new SimpleEstimator(alpha, batching_param); } socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, &rate_estimator_->observer_); - raaqm_algorithm_ = std::make_unique<RaaqmTransportAlgorithm>( - stats_, rate_estimator_.get(), drop_factor, minimum_drop_probability, - gamma, beta, sample_number, interest_lifetime, beta_wifi_, drop_wifi_, - beta_lte_, drop_lte_, wifi_delay_, lte_delay_, max_window, min_window); + // Current path + auto cur_path = std::make_unique<RaaqmDataPath>( + drop_factor, minimum_drop_probability, interest_lifetime * 1000, + sample_number); + cur_path_ = cur_path.get(); + path_table_[default_values::path_id] = std::move(cur_path); } portal_->setConsumerCallback(this); @@ -105,7 +109,6 @@ void RaaqmTransportProtocol::reset() { std::queue<Interest::Ptr> empty; std::swap(interest_to_retransmit_, empty); stats_->reset(); - raaqm_algorithm_->reset(); // Reset reassembly component reassembly_protocol_->reInitialize(); @@ -119,13 +122,65 @@ bool RaaqmTransportProtocol::verifyKeyPackets() { return index_manager_->onKeyToVerify(); } +void RaaqmTransportProtocol::increaseWindow() { + // return; + double max_window_size = 0.; + socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, + max_window_size); + if (current_window_size_ < max_window_size) { + double gamma = 0.; + socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); + + current_window_size_ += gamma / current_window_size_; + socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); + } + rate_estimator_->onWindowIncrease(current_window_size_); +} + +void RaaqmTransportProtocol::decreaseWindow() { + // return; + double min_window_size = 0.; + socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, + min_window_size); + if (current_window_size_ > min_window_size) { + double beta = 0.; + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + + current_window_size_ = current_window_size_ * beta; + if (current_window_size_ < min_window_size) { + current_window_size_ = min_window_size; + } + + socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); + } + rate_estimator_->onWindowDecrease(current_window_size_); +} + +void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + // Decrease the window because the timeout happened + decreaseWindow(); +} + +void RaaqmTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) { + updatePathTable(content_object); + increaseWindow(); + updateRtt(interest.getName().getSuffix()); + rate_estimator_->onDataReceived((int)content_object.payloadSize() + + (int)content_object.headerSize()); + // Set drop probablility and window size accordingly + RAAQM(); +} + void RaaqmTransportProtocol::init() { std::ifstream is(RAAQM_CONFIG_PATH); std::string line; raaqm_autotune_ = false; - double default_beta = default_values::beta_value; - double default_drop = default_values::drop_factor; + default_beta_ = default_values::beta_value; + default_drop_ = default_values::drop_factor; beta_wifi_ = default_values::beta_value; drop_wifi_ = default_values::drop_factor; beta_lte_ = default_values::beta_value; @@ -181,16 +236,17 @@ void RaaqmTransportProtocol::init() { if (command == "beta") { std::string tmp; - line_s >> tmp >> default_beta; - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_beta); + line_s >> tmp >> default_beta_; + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, + default_beta_); continue; } if (command == "drop") { std::string tmp; - line_s >> tmp >> default_drop; + line_s >> tmp >> default_drop_; socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - default_drop); + default_drop_); continue; } @@ -229,7 +285,6 @@ void RaaqmTransportProtocol::init() { line_s >> tmp >> lte_delay_; continue; } - if (command == "alpha") { std::string tmp; double rate_alpha = 0.0; @@ -288,27 +343,14 @@ void RaaqmTransportProtocol::onContentObject( void RaaqmTransportProtocol::onContentSegment( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t suffix = content_object->getName().getSuffix(); + uint32_t incremental_suffix = content_object->getName().getSuffix(); // Decrease in-flight interests interests_in_flight_--; - // Get new value for the window - current_window_size_ = - raaqm_algorithm_->onContentObject(suffix, content_object->getPathLabel()); - - // Print stats if needed - if (*stats_summary_) { - auto now = std::chrono::steady_clock::now(); - auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); - - uint32_t timer_interval_milliseconds = 0; - socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, - timer_interval_milliseconds); - if (dt.count() > timer_interval_milliseconds) { - (*stats_summary_)(*socket_->getInterface(), *stats_); - t0_ = now; - } + // Update stats + if (!interest_retransmissions_[incremental_suffix & mask]) { + afterContentReception(*interest, *content_object); } index_manager_->onContentObject(std::move(interest), @@ -354,6 +396,8 @@ void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { } void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { + checkForStalePaths(); + const Name &n = interest->getName(); TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str()); @@ -375,6 +419,8 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { (*on_interest_timeout_)(*socket_->getInterface(), *interest); } + afterDataUnsatisfied(segment); + uint32_t max_rtx = 0; socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); @@ -456,6 +502,7 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { // This is set to ~0 so that the next interest_retransmissions_ + 1, // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; + interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); @@ -471,10 +518,168 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { + rate_estimator_->onDownloadFinished(); TransportProtocol::onContentReassembled(ec); schedule_interests_ = false; } +void RaaqmTransportProtocol::updateRtt(uint64_t segment) { + if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { + throw std::runtime_error("RAAQM ERROR: no current path found, exit"); + } else { + auto now = utils::SteadyClock::now(); + utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>( + now - interest_timepoints_[segment & mask]); + + // Update stats + updateStats((uint32_t)segment, rtt.count(), now); + + if (rate_estimator_) { + rate_estimator_->onRttUpdate((double)rtt.count()); + } + + cur_path_->insertNewRtt(rtt.count(), now); + cur_path_->smoothTimer(); + + if (cur_path_->newPropagationDelayAvailable()) { + checkDropProbability(); + } + } +} + +void RaaqmTransportProtocol::RAAQM() { + if (!cur_path_) { + throw errors::RuntimeException("ERROR: no current path found, exit"); + exit(EXIT_FAILURE); + } else { + // Change drop probability according to RTT statistics + cur_path_->updateDropProb(); + + double coin = ((double)rand() / (RAND_MAX)); + if (coin <= cur_path_->getDropProb()) { + decreaseWindow(); + } + } +} + +void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, + utils::TimePoint &now) { + // Update RTT statistics + stats_->updateAverageRtt(rtt); + stats_->updateAverageWindowSize(current_window_size_); + + // Call statistics callback + if (*stats_summary_) { + auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); + + uint32_t timer_interval_milliseconds = 0; + socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, + timer_interval_milliseconds); + if (dt.count() > timer_interval_milliseconds) { + (*stats_summary_)(*socket_->getInterface(), *stats_); + t0_ = now; + } + } +} + +void RaaqmTransportProtocol::updatePathTable( + const ContentObject &content_object) { + uint32_t path_id = content_object.getPathLabel(); + + if (path_table_.find(path_id) == path_table_.end()) { + if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { + // Create a new path with some default param + + if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { + throw errors::RuntimeException( + "[RAAQM] No path initialized for path table, error could be in " + "default path initialization."); + } + + // Initiate the new path default param + auto new_path = std::make_unique<RaaqmDataPath>( + *(path_table_.at(default_values::path_id))); + + // Insert the new path into hash table + path_table_[path_id] = std::move(new_path); + } else { + throw errors::RuntimeException( + "UNEXPECTED ERROR: when running,current path not found."); + } + } + + cur_path_ = path_table_[path_id].get(); + + size_t header_size = content_object.headerSize(); + size_t data_size = content_object.payloadSize(); + + // Update measurements for path + cur_path_->updateReceivedStats(header_size + data_size, data_size); +} + +void RaaqmTransportProtocol::checkDropProbability() { + if (!raaqm_autotune_) { + return; + } + + unsigned int max_pd = 0; + PathTable::iterator it; + for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->getPropagationDelay() > max_pd && + it->second->getPropagationDelay() != UINT_MAX && + !it->second->isStale()) { + max_pd = it->second->getPropagationDelay(); + } + } + + double drop_prob = 0; + double beta = 0; + if (max_pd < wifi_delay_) { // only ethernet paths + drop_prob = default_drop_; + beta = default_beta_; + } else if (max_pd < lte_delay_) { // at least one wifi path + drop_prob = drop_wifi_; + beta = beta_wifi_; + } else { // at least one lte path + drop_prob = drop_lte_; + beta = beta_lte_; + } + + double old_drop_prob = 0; + double old_beta = 0; + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); + socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob); + + if (drop_prob == old_drop_prob && beta == old_beta) { + return; + } + + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); + + for (it = path_table_.begin(); it != path_table_.end(); it++) { + it->second->setDropProb(drop_prob); + } +} + +void RaaqmTransportProtocol::checkForStalePaths() { + if (!raaqm_autotune_) { + return; + } + + bool stale = false; + PathTable::iterator it; + for (it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->isStale()) { + stale = true; + break; + } + } + if (stale) { + checkDropProbability(); + } +} + } // end namespace protocol } // namespace transport diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h index 2999b50cf..fce4194d4 100644 --- a/libtransport/src/protocols/raaqm.h +++ b/libtransport/src/protocols/raaqm.h @@ -20,7 +20,6 @@ #include <protocols/congestion_window_protocol.h> #include <protocols/protocol.h> #include <protocols/raaqm_data_path.h> -#include <protocols/raaqm_transport_algorithm.h> #include <protocols/rate_estimation.h> #include <queue> @@ -30,7 +29,8 @@ namespace transport { namespace protocol { -class RaaqmTransportProtocol : public TransportProtocol { +class RaaqmTransportProtocol : public TransportProtocol, + public CWindowProtocol { public: RaaqmTransportProtocol(implementation::ConsumerSocket *icnet_socket); @@ -51,6 +51,16 @@ class RaaqmTransportProtocol : public TransportProtocol { using PathTable = std::unordered_map<uint32_t, std::unique_ptr<RaaqmDataPath>>; + void increaseWindow() override; + void decreaseWindow() override; + + virtual void afterContentReception(const Interest &interest, + const ContentObject &content_object); + virtual void afterDataUnsatisfied(uint64_t segment); + + virtual void updateStats(uint32_t suffix, uint64_t rtt, + utils::TimePoint &now); + private: void init(); @@ -76,21 +86,48 @@ class RaaqmTransportProtocol : public TransportProtocol { void updateRtt(uint64_t segment); + void RAAQM(); + + void updatePathTable(const ContentObject &content_object); + + void checkDropProbability(); + + void checkForStalePaths(); + + void printRtt(); + protected: - std::queue<Interest::Ptr> interest_to_retransmit_; - std::array<std::uint32_t, buffer_size> interest_retransmissions_; - uint32_t interests_in_flight_; + // Congestion window management double current_window_size_; + // Protocol management + uint64_t interests_in_flight_; + std::array<std::uint32_t, buffer_size> interest_retransmissions_; + std::array<utils::TimePoint, buffer_size> interest_timepoints_; + std::queue<Interest::Ptr> interest_to_retransmit_; private: - std::unique_ptr<TransportAlgorithm> raaqm_algorithm_; - std::unique_ptr<IcnRateEstimator> rate_estimator_; + /** + * Current download path + */ + RaaqmDataPath *cur_path_; + + /** + * Hash table for path: each entry is a pair path ID(key) - path object + */ + PathTable path_table_; + // TimePoints for statistic utils::TimePoint t0_; - // Temporary placeholder for RAAQM algorithm - // parameters + bool set_interest_filter_; + + // for rate-estimation at packet level + IcnRateEstimator *rate_estimator_; + + // params for autotuning bool raaqm_autotune_; + double default_beta_; + double default_drop_; double beta_wifi_; double drop_wifi_; double beta_lte_; diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc index 499b579f3..8bbbadcf2 100644 --- a/libtransport/src/protocols/raaqm_data_path.cc +++ b/libtransport/src/protocols/raaqm_data_path.cc @@ -14,6 +14,7 @@ */ #include <hicn/transport/utils/chrono_typedefs.h> + #include <protocols/raaqm_data_path.h> namespace transport { @@ -65,6 +66,15 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt, return *this; } +RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size, + std::size_t data_size) { + packets_received_++; + m_packets_bytes_received_ += packet_size; + raw_data_bytes_received_ += data_size; + + return *this; +} + double RaaqmDataPath::getDropFactor() { return drop_factor_; } double RaaqmDataPath::getDropProb() { return drop_prob_; } diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h index d1234e743..3f037bc76 100644 --- a/libtransport/src/protocols/raaqm_data_path.h +++ b/libtransport/src/protocols/raaqm_data_path.h @@ -16,6 +16,7 @@ #pragma once #include <hicn/transport/utils/chrono_typedefs.h> + #include <utils/min_filter.h> #include <chrono> @@ -47,6 +48,15 @@ class RaaqmDataPath { RaaqmDataPath &insertNewRtt(uint64_t new_rtt, const utils::TimePoint &now); /** + * @brief Update the path statistics + * @param packet_size the size of the packet received, including the ICN + * header + * @param data_size the size of the data received, without the ICN header + */ + RaaqmDataPath &updateReceivedStats(std::size_t packet_size, + std::size_t data_size); + + /** * @brief Get the value of the drop factor parameter */ double getDropFactor(); diff --git a/libtransport/src/protocols/raaqm_transport_algorithm.cc b/libtransport/src/protocols/raaqm_transport_algorithm.cc deleted file mode 100644 index 09e3a47ab..000000000 --- a/libtransport/src/protocols/raaqm_transport_algorithm.cc +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <protocols/raaqm_transport_algorithm.h> - -namespace transport { -namespace protocol { - -RaaqmTransportAlgorithm::RaaqmTransportAlgorithm( - interface::TransportStatistics *stats, IcnRateEstimator *rate_estimator, - double drop_factor, double minimum_drop_probability, double gamma, - double beta, uint32_t sample_number, uint32_t interest_lifetime, - double beta_wifi, double drop_wifi, double beta_lte, double drop_lte, - unsigned int wifi_delay, unsigned int lte_delay, double max_window, - double min_window) - : current_window_size_(1), - cur_path_(nullptr), - rate_estimator_(rate_estimator), - stats_(stats), - drop_factor_(drop_factor), - minimum_drop_probability_(minimum_drop_probability), - gamma_(gamma), - beta_(beta), - sample_number_(sample_number), - interest_lifetime_(interest_lifetime), - beta_wifi_(beta_wifi), - drop_wifi_(drop_wifi), - beta_lte_(beta_lte), - drop_lte_(drop_lte), - wifi_delay_(wifi_delay), - lte_delay_(lte_delay), - max_window_(max_window), - min_window_(min_window) {} - -RaaqmTransportAlgorithm::~RaaqmTransportAlgorithm() {} - -void RaaqmTransportAlgorithm::reset() { - if (rate_estimator_) { - rate_estimator_->onStart(); - } - - if (!cur_path_) { - // Current path - auto cur_path = std::make_unique<RaaqmDataPath>( - drop_factor_, minimum_drop_probability_, interest_lifetime_ * 1000, - sample_number_); - cur_path_ = cur_path.get(); - path_table_[interface::default_values::path_id] = std::move(cur_path); - } -} - -void RaaqmTransportAlgorithm::increaseWindow() { - if (current_window_size_ < max_window_) { - current_window_size_ += gamma_ / current_window_size_; - } - - if (rate_estimator_) { - rate_estimator_->onWindowIncrease(current_window_size_); - } -} - -void RaaqmTransportAlgorithm::decreaseWindow() { - if (current_window_size_ > min_window_) { - current_window_size_ = current_window_size_ * beta_; - if (current_window_size_ < min_window_) { - current_window_size_ = min_window_; - } - } - - if (rate_estimator_) { - rate_estimator_->onWindowDecrease(current_window_size_); - } -} - -void RaaqmTransportAlgorithm::updateRtt(uint32_t suffix) { - if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { - throw std::runtime_error("RAAQM ERROR: no current path found, exit"); - } else { - auto now = utils::SteadyClock::now(); - utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>( - now - interest_timepoints_[suffix & mask]); - - updateStats(suffix, rtt.count(), now); - - if (rate_estimator_) { - rate_estimator_->onRttUpdate((double)rtt.count()); - } - - cur_path_->insertNewRtt(rtt.count(), now); - cur_path_->smoothTimer(); - - if (cur_path_->newPropagationDelayAvailable()) { - checkDropProbability(); - } - } -} - -void RaaqmTransportAlgorithm::RAAQM() { - if (!cur_path_) { - throw errors::RuntimeException("ERROR: no current path found, exit"); - exit(EXIT_FAILURE); - } else { - // Change drop probability according to RTT statistics - cur_path_->updateDropProb(); - - double coin = ((double)rand() / (RAND_MAX)); - if (coin <= cur_path_->getDropProb()) { - decreaseWindow(); - } - } -} - -void RaaqmTransportAlgorithm::updatePathTable(uint32_t path_label) { - uint32_t path_id = path_label; - - if (path_table_.find(path_id) == path_table_.end()) { - if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { - // Create a new path with some default param - - if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { - throw errors::RuntimeException( - "[RAAQM] No path initialized for path table, error could be in " - "default path initialization."); - } - - // Initiate the new path default param - auto new_path = std::make_unique<RaaqmDataPath>( - *(path_table_.at(interface::default_values::path_id))); - - // Insert the new path into hash table - path_table_[path_id] = std::move(new_path); - } else { - throw errors::RuntimeException( - "UNEXPECTED ERROR: when running,current path not found."); - } - } - - cur_path_ = path_table_[path_id].get(); -} - -void RaaqmTransportAlgorithm::checkDropProbability() { - if (!raaqm_autotune_) { - return; - } - - unsigned int max_pd = 0; - PathTable::iterator it; - for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->getPropagationDelay() > max_pd && - it->second->getPropagationDelay() != UINT_MAX && - !it->second->isStale()) { - max_pd = it->second->getPropagationDelay(); - } - } - - double drop_prob = 0; - double beta = 0; - if (max_pd < wifi_delay_) { // only ethernet paths - drop_prob = drop_factor_; - beta = beta_; - } else if (max_pd < lte_delay_) { // at least one wifi path - drop_prob = drop_wifi_; - beta = beta_wifi_; - } else { // at least one lte path - drop_prob = drop_lte_; - beta = beta_lte_; - } - - double old_drop_prob = 0; - double old_beta = 0; - // socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); - // socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, - // old_drop_prob); - - if (drop_prob == old_drop_prob && beta == old_beta) { - return; - } - - // socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); - // socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); - - for (it = path_table_.begin(); it != path_table_.end(); it++) { - it->second->setDropProb(drop_prob); - } -} - -void RaaqmTransportAlgorithm::checkForStalePaths() { - if (!raaqm_autotune_) { - return; - } - - bool stale = false; - PathTable::iterator it; - for (it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->isStale()) { - stale = true; - break; - } - } - if (stale) { - checkDropProbability(); - } -} - -void RaaqmTransportAlgorithm::updateStats(uint32_t suffix, uint64_t rtt, - utils::TimePoint &now) { - // Update RTT statistics - if (stats_) { - stats_->updateAverageRtt(rtt); - stats_->updateAverageWindowSize(current_window_size_); - } -} - -uint32_t RaaqmTransportAlgorithm::onContentObject(uint32_t suffix, - uint32_t path_label) { - updatePathTable(path_label); - increaseWindow(); - updateRtt(suffix); - - // Set drop probablility and window size accordingly - RAAQM(); - - return current_window_size_; -} - -uint32_t RaaqmTransportAlgorithm::onInterestTimeout(uint32_t suffix) { - checkForStalePaths(); - // Decrease the window because the timeout happened - decreaseWindow(); - - return current_window_size_; -} - -void RaaqmTransportAlgorithm::onInterestSent(uint32_t suffix) { - interest_timepoints_[suffix & mask] = utils::SteadyClock::now(); -} - -void RaaqmTransportAlgorithm::sessionEnd() { - rate_estimator_->onDownloadFinished(); -} - -} // namespace protocol -} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/protocols/raaqm_transport_algorithm.h b/libtransport/src/protocols/raaqm_transport_algorithm.h deleted file mode 100644 index eaa65d2a6..000000000 --- a/libtransport/src/protocols/raaqm_transport_algorithm.h +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/interfaces/socket_options_default_values.h> -#include <hicn/transport/interfaces/statistics.h> -#include <hicn/transport/protocols/transport_algorithm.h> -#include <hicn/transport/utils/chrono_typedefs.h> -#include <protocols/congestion_window_protocol.h> -#include <protocols/raaqm_data_path.h> -#include <protocols/rate_estimation.h> - -#include <array> -#include <queue> -#include <unordered_map> -#include <vector> - -namespace transport { - -namespace protocol { - -class RaaqmTransportAlgorithm : public TransportAlgorithm, - public CWindowProtocol { - public: - // TODO: Add windows size and other beta/drop parameters - RaaqmTransportAlgorithm(interface::TransportStatistics *stats, - IcnRateEstimator *rate_estimator, double drop_factor, - double minimum_drop_probability, double gamma, - double beta, uint32_t sample_number, - uint32_t interest_lifetime, double beta_wifi, - double drop_wifi, double beta_lte, double drop_lte, - unsigned int wifi_delay, unsigned int lte_delay, - double max_window, double min_window); - - ~RaaqmTransportAlgorithm() override; - - void reset() override; - - uint32_t onContentObject(uint32_t suffix, uint32_t path_label) override; - - uint32_t onInterestTimeout(uint32_t suffix) override; - - void onInterestSent(uint32_t suffix) override; - - void sessionEnd() override; - - protected: - static constexpr uint32_t buffer_size = - 1 << interface::default_values::log_2_default_buffer_size; - static constexpr uint16_t mask = buffer_size - 1; - using PathTable = - std::unordered_map<uint32_t, std::unique_ptr<RaaqmDataPath>>; - - void increaseWindow() override; - void decreaseWindow() override; - - virtual void updateStats(uint32_t suffix, uint64_t rtt, - utils::TimePoint &now); - - private: - void init(); - - void updateRtt(uint32_t suffix); - - void RAAQM(); - - void updatePathTable(uint32_t path_label); - - void checkDropProbability(); - - void checkForStalePaths(); - - protected: - // Congestion window management - double current_window_size_; - // Protocol management - uint64_t interests_in_flight_; - std::array<utils::TimePoint, buffer_size> interest_timepoints_; - - private: - /** - * Current download path - */ - RaaqmDataPath *cur_path_; - - /** - * Hash table for path: each entry is a pair path ID(key) - path object - */ - PathTable path_table_; - - // for rate-estimation at packet level - IcnRateEstimator *rate_estimator_; - interface::TransportStatistics *stats_; - - bool set_interest_filter_; - - // Params - double drop_factor_; - double minimum_drop_probability_; - double gamma_; - double beta_; - uint32_t sample_number_; - uint32_t interest_lifetime_; - - bool raaqm_autotune_; - double beta_wifi_; - double drop_wifi_; - double beta_lte_; - double drop_lte_; - unsigned int wifi_delay_; - unsigned int lte_delay_; - double max_window_; - double min_window_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/protocols/transport_algorithm.cc b/libtransport/src/protocols/transport_algorithm.cc deleted file mode 100644 index 37dbf5453..000000000 --- a/libtransport/src/protocols/transport_algorithm.cc +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/protocols/transport_algorithm.h> -#include <hicn/transport/utils/log.h> -#include <protocols/raaqm_transport_algorithm.h> - -#include <stdexcept> - -namespace { -allocator_t *algorithm_allocator = nullptr; -deallocator_t *algorithm_deallocator = nullptr; - -void *custom_allocate(size_t size) { - void *ret; - if (algorithm_allocator) { - ret = algorithm_allocator(size); - } else { - ret = new uint8_t[size]; - } - - return ret; -} - -void custom_deallocate(void *p) { - if (algorithm_deallocator) { - algorithm_deallocator(p); - } else { - delete[](char *)(p); - } -} - -} // namespace - -extern "C" void transportAlgorithm_Init(allocator_t *allocator, - deallocator_t *deallocator) { - algorithm_allocator = allocator; - algorithm_deallocator = deallocator; -} - -extern "C" void transportAlgorithm_Destroy(TransportAlgorithm *algorithm) { - custom_deallocate(algorithm); -} - -extern "C" TransportAlgorithm *transportAlgorithm_CreateRaaqm( - double drop_factor, double minimum_drop_probability, double gamma, - double beta, uint32_t sample_number, uint32_t interest_lifetime, - double beta_wifi, double drop_wifi, double beta_lte, double drop_lte, - unsigned int wifi_delay, unsigned int lte_delay, double max_window, - double min_window) { - TransportAlgorithm *ret = nullptr; - ret = new ( - custom_allocate(sizeof(transport::protocol::RaaqmTransportAlgorithm))) - transport::protocol::RaaqmTransportAlgorithm( - nullptr, nullptr, drop_factor, minimum_drop_probability, gamma, beta, - sample_number, interest_lifetime, beta_wifi, drop_wifi, beta_lte, - drop_lte, wifi_delay, lte_delay, max_window, min_window); - - return ret; -} - -extern "C" uint32_t transportAlgorithm_OnContentObject( - TransportAlgorithm *algorithm, uint32_t suffix, uint32_t path_label) { - return algorithm->onContentObject(suffix, path_label); -} - -extern "C" uint32_t transportAlgorithm_OnInterestTimeout( - TransportAlgorithm *algorithm, uint32_t suffix) { - return algorithm->onInterestTimeout(suffix); -}
\ No newline at end of file |