From bac3da61644515f05663789b122554dc77549286 Mon Sep 17 00:00:00 2001 From: Luca Muscariello Date: Thu, 17 Jan 2019 13:47:57 +0100 Subject: This is the first commit of the hicn project Change-Id: I6f2544ad9b9f8891c88cc4bcce3cf19bd3cc863f Signed-off-by: Luca Muscariello --- .../src/hicn/transport/protocols/CMakeLists.txt | 46 ++ libtransport/src/hicn/transport/protocols/cbr.cc | 47 ++ libtransport/src/hicn/transport/protocols/cbr.h | 48 ++ .../src/hicn/transport/protocols/consumer.conf | 21 + .../hicn/transport/protocols/download_observer.h | 32 + .../src/hicn/transport/protocols/protocol.cc | 45 ++ .../src/hicn/transport/protocols/protocol.h | 79 ++ libtransport/src/hicn/transport/protocols/raaqm.cc | 416 +++++++++++ libtransport/src/hicn/transport/protocols/raaqm.h | 94 +++ .../hicn/transport/protocols/raaqm_data_path.cc | 158 ++++ .../src/hicn/transport/protocols/raaqm_data_path.h | 230 ++++++ .../hicn/transport/protocols/rate_estimation.cc | 353 +++++++++ .../src/hicn/transport/protocols/rate_estimation.h | 175 +++++ libtransport/src/hicn/transport/protocols/rtc.cc | 813 +++++++++++++++++++++ libtransport/src/hicn/transport/protocols/rtc.h | 210 ++++++ .../src/hicn/transport/protocols/rtc_data_path.cc | 85 +++ .../src/hicn/transport/protocols/rtc_data_path.h | 62 ++ .../hicn/transport/protocols/test/CMakeLists.txt | 10 + .../protocols/test/test_transport_producer.cc | 80 ++ libtransport/src/hicn/transport/protocols/vegas.cc | 630 ++++++++++++++++ libtransport/src/hicn/transport/protocols/vegas.h | 161 ++++ .../transport/protocols/vegas_rto_estimator.cc | 57 ++ .../hicn/transport/protocols/vegas_rto_estimator.h | 48 ++ 23 files changed, 3900 insertions(+) create mode 100755 libtransport/src/hicn/transport/protocols/CMakeLists.txt create mode 100755 libtransport/src/hicn/transport/protocols/cbr.cc create mode 100755 libtransport/src/hicn/transport/protocols/cbr.h create mode 100755 libtransport/src/hicn/transport/protocols/consumer.conf create mode 100755 libtransport/src/hicn/transport/protocols/download_observer.h create mode 100755 libtransport/src/hicn/transport/protocols/protocol.cc create mode 100755 libtransport/src/hicn/transport/protocols/protocol.h create mode 100755 libtransport/src/hicn/transport/protocols/raaqm.cc create mode 100755 libtransport/src/hicn/transport/protocols/raaqm.h create mode 100755 libtransport/src/hicn/transport/protocols/raaqm_data_path.cc create mode 100755 libtransport/src/hicn/transport/protocols/raaqm_data_path.h create mode 100755 libtransport/src/hicn/transport/protocols/rate_estimation.cc create mode 100755 libtransport/src/hicn/transport/protocols/rate_estimation.h create mode 100755 libtransport/src/hicn/transport/protocols/rtc.cc create mode 100755 libtransport/src/hicn/transport/protocols/rtc.h create mode 100755 libtransport/src/hicn/transport/protocols/rtc_data_path.cc create mode 100755 libtransport/src/hicn/transport/protocols/rtc_data_path.h create mode 100755 libtransport/src/hicn/transport/protocols/test/CMakeLists.txt create mode 100755 libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc create mode 100755 libtransport/src/hicn/transport/protocols/vegas.cc create mode 100755 libtransport/src/hicn/transport/protocols/vegas.h create mode 100755 libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc create mode 100755 libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h (limited to 'libtransport/src/hicn/transport/protocols') diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt new file mode 100755 index 000000000..1c3b76c24 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/CMakeLists.txt @@ -0,0 +1,46 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h + ${CMAKE_CURRENT_SOURCE_DIR}/download_observer.h + ${CMAKE_CURRENT_SOURCE_DIR}/vegas.h + ${CMAKE_CURRENT_SOURCE_DIR}/protocol.h + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h + ${CMAKE_CURRENT_SOURCE_DIR}/vegas_rto_estimator.h + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h + ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h +) + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/vegas.cc + ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc + ${CMAKE_CURRENT_SOURCE_DIR}/vegas_rto_estimator.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc + ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc +) + +set(TRANSPORT_CONFIG + ${CMAKE_CURRENT_SOURCE_DIR}/consumer.conf +) + +set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/cbr.cc b/libtransport/src/hicn/transport/protocols/cbr.cc new file mode 100755 index 000000000..3da4819c3 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/cbr.cc @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace transport { + +namespace protocol { + +using namespace interface; + +CbrTransportProtocol::CbrTransportProtocol(BaseSocket *icnet_socket) + : VegasTransportProtocol(icnet_socket) {} + +void CbrTransportProtocol::start( + utils::SharableVector &receive_buffer) { + current_window_size_ = socket_->current_window_size_; + VegasTransportProtocol::start(receive_buffer); +} + +void CbrTransportProtocol::changeInterestLifetime(uint64_t segment) { return; } + +void CbrTransportProtocol::increaseWindow() {} + +void CbrTransportProtocol::decreaseWindow() {} + +void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {} + +void CbrTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) {} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/cbr.h b/libtransport/src/hicn/transport/protocols/cbr.h new file mode 100755 index 000000000..0a572292a --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/cbr.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace transport { + +namespace protocol { + +class CbrTransportProtocol : public VegasTransportProtocol { + public: + CbrTransportProtocol(interface::BaseSocket *icnet_socket); + + void start(utils::SharableVector &receive_buffer) override; + + private: + void afterContentReception(const Interest &interest, + const ContentObject &content_object) override; + + void afterDataUnsatisfied(uint64_t segment) override; + + void increaseWindow() override; + + void decreaseWindow() override; + + void changeInterestLifetime(uint64_t segment) override; +}; + +} // end namespace protocol + +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/consumer.conf b/libtransport/src/hicn/transport/protocols/consumer.conf new file mode 100755 index 000000000..1a366f32f --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/consumer.conf @@ -0,0 +1,21 @@ +; this file contais the parameters for RAAQM +autotune = no +lifetime = 500 +retransmissions = 128 +beta = 0.99 +drop = 0.003 +beta_wifi_ = 0.99 +drop_wifi_ = 0.6 +beta_lte_ = 0.99 +drop_lte_ = 0.003 +wifi_delay_ = 200 +lte_delay_ = 9000 + +alpha = 0.95 +batching_parameter = 200 + +;Choice of rate estimator: +;0 --> an estimation each $(batching_parameter) packets +;1 --> an estimation "a la TCP", estimation at the end of the download of the segment + +rate_estimator = 0 diff --git a/libtransport/src/hicn/transport/protocols/download_observer.h b/libtransport/src/hicn/transport/protocols/download_observer.h new file mode 100755 index 000000000..6d24fe6fd --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/download_observer.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace transport { + +namespace protocol { + +class IcnObserver { + public: + virtual ~IcnObserver(){}; + + virtual void notifyStats(double throughput) = 0; + virtual void notifyDownloadTime(double downloadTime) = 0; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc new file mode 100755 index 000000000..ea4fd6dbf --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/protocol.cc @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace transport { + +namespace protocol { + +TransportProtocol::TransportProtocol(interface::BaseSocket *icn_socket) + : socket_(dynamic_cast(icn_socket)), + is_running_(false), + interest_pool_() { + // Create pool of interests + increasePoolSize(); +} + +TransportProtocol::~TransportProtocol() {} + +void TransportProtocol::updatePortal() { portal_ = socket_->portal_; } + +bool TransportProtocol::isRunning() { return is_running_; } + +void TransportProtocol::increasePoolSize(std::size_t size) { + for (std::size_t i = 0; i < size; i++) { + interest_pool_.add(new Interest()); + } +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h new file mode 100755 index 000000000..56c57e025 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/protocol.h @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace transport { + +namespace protocol { + +using namespace core; + +class TransportProtocolCallback { + virtual void onContentObject(const core::Interest &interest, + const core::ContentObject &content_object) = 0; + virtual void onTimeout(const core::Interest &interest) = 0; +}; + +class TransportProtocol : public interface::BasePortal::ConsumerCallback { + static constexpr std::size_t interest_pool_size = 4096; + + public: + TransportProtocol(interface::BaseSocket *icn_socket); + + virtual ~TransportProtocol(); + + void updatePortal(); + + bool isRunning(); + + virtual void start(utils::SharableVector &content_buffer) = 0; + + virtual void stop() = 0; + + virtual void resume() = 0; + + protected: + virtual void increasePoolSize(std::size_t size = interest_pool_size); + + TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() { + auto result = interest_pool_.get(); + + while (TRANSPORT_EXPECT_FALSE(!result.first)) { + // Add packets to the pool + increasePoolSize(); + result = interest_pool_.get(); + } + + return std::move(result.second); + } + // Consumer Callback + virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0; + virtual void onTimeout(Interest::Ptr &&i) = 0; + + protected: + interface::ConsumerSocket *socket_; + std::shared_ptr portal_; + volatile bool is_running_; + utils::ObjectPool interest_pool_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc new file mode 100755 index 000000000..cd22ecfdc --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -0,0 +1,416 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +namespace transport { + +namespace protocol { + +using namespace interface; + +RaaqmTransportProtocol::RaaqmTransportProtocol(BaseSocket *icnet_socket) + : VegasTransportProtocol(icnet_socket), rate_estimator_(NULL) { + init(); +} + +RaaqmTransportProtocol::~RaaqmTransportProtocol() { + if (this->rate_estimator_) { + delete this->rate_estimator_; + } +} + +void RaaqmTransportProtocol::init() { + std::ifstream is(RAAQM_CONFIG_PATH); + + std::string line; + + socket_->beta_ = default_values::beta_value; + socket_->drop_factor_ = default_values::drop_factor; + socket_->interest_lifetime_ = default_values::interest_lifetime; + socket_->max_retransmissions_ = + default_values::transport_protocol_max_retransmissions; + raaqm_autotune_ = false; + default_beta_ = default_values::beta_value; + default_drop_ = default_values::drop_factor; + beta_wifi_ = default_values::beta_value; + drop_wifi_ = default_values::drop_factor; + beta_lte_ = default_values::beta_value; + drop_lte_ = default_values::drop_factor; + wifi_delay_ = 1000; + lte_delay_ = 15000; + + if (!is) { + TRANSPORT_LOGW("WARNING: RAAQM parameters not found, set default values"); + return; + } + + while (getline(is, line)) { + std::string command; + std::istringstream line_s(line); + + line_s >> command; + + if (command == ";") { + continue; + } + + if (command == "autotune") { + std::string tmp; + std::string val; + line_s >> tmp >> val; + if (val == "yes") { + raaqm_autotune_ = true; + } else { + raaqm_autotune_ = false; + } + continue; + } + + if (command == "lifetime") { + std::string tmp; + uint32_t lifetime; + line_s >> tmp >> lifetime; + socket_->interest_lifetime_ = lifetime; + continue; + } + + if (command == "retransmissions") { + std::string tmp; + uint32_t rtx; + line_s >> tmp >> rtx; + socket_->max_retransmissions_ = rtx; + continue; + } + + if (command == "beta") { + std::string tmp; + line_s >> tmp >> default_beta_; + socket_->beta_ = default_beta_; + continue; + } + + if (command == "drop") { + std::string tmp; + line_s >> tmp >> default_drop_; + socket_->drop_factor_ = default_drop_; + continue; + } + + if (command == "beta_wifi_") { + std::string tmp; + line_s >> tmp >> beta_wifi_; + continue; + } + + if (command == "drop_wifi_") { + std::string tmp; + line_s >> tmp >> drop_wifi_; + continue; + } + + if (command == "beta_lte_") { + std::string tmp; + line_s >> tmp >> beta_lte_; + continue; + } + + if (command == "drop_lte_") { + std::string tmp; + line_s >> tmp >> drop_lte_; + continue; + } + + if (command == "wifi_delay_") { + std::string tmp; + line_s >> tmp >> wifi_delay_; + continue; + } + + if (command == "lte_delay_") { + std::string tmp; + line_s >> tmp >> lte_delay_; + continue; + } + if (command == "alpha") { + std::string tmp; + double rate_alpha = 0.0; + line_s >> tmp >> rate_alpha; + socket_->rate_estimation_alpha_ = rate_alpha; + continue; + } + + if (command == "batching_parameter") { + std::string tmp; + uint32_t batching_param = 0; + line_s >> tmp >> batching_param; + socket_->rate_estimation_batching_parameter_ = batching_param; + continue; + } + + if (command == "rate_estimator") { + std::string tmp; + uint32_t choice_param = 0; + line_s >> tmp >> choice_param; + socket_->rate_estimation_choice_ = choice_param; + continue; + } + } + is.close(); +} + +void RaaqmTransportProtocol::start( + utils::SharableVector &content_buffer) { + if (this->rate_estimator_) { + this->rate_estimator_->onStart(); + } + + if (!cur_path_) { + double drop_factor; + double minimum_drop_probability; + uint32_t sample_number; + uint32_t interest_lifetime; + // double beta; + + drop_factor = socket_->drop_factor_; + minimum_drop_probability = socket_->minimum_drop_probability_; + sample_number = socket_->sample_number_; + interest_lifetime = socket_->interest_lifetime_; + // beta = socket_->beta_; + + double alpha = 0.0; + uint32_t batching_param = 0; + uint32_t choice_param = 0; + alpha = socket_->rate_estimation_alpha_; + batching_param = socket_->rate_estimation_batching_parameter_; + choice_param = socket_->rate_estimation_choice_; + + if (choice_param == 1) { + this->rate_estimator_ = new ALaTcpEstimator(); + } else { + this->rate_estimator_ = new SimpleEstimator(alpha, batching_param); + } + + this->rate_estimator_->observer_ = socket_->rate_estimation_observer_; + + cur_path_ = std::make_shared( + drop_factor, minimum_drop_probability, interest_lifetime * 1000, + sample_number); + path_table_[default_values::path_id] = cur_path_; + } + + VegasTransportProtocol::start(content_buffer); +} + +void RaaqmTransportProtocol::copyContent(const ContentObject &content_object) { + if (TRANSPORT_EXPECT_FALSE( + (content_object.getName().getSuffix() == final_block_number_) || + !(is_running_))) { + this->rate_estimator_->onDownloadFinished(); + } + VegasTransportProtocol::copyContent(content_object); +} + +void RaaqmTransportProtocol::updatePathTable( + const ContentObject &content_object) { + uint32_t path_id = content_object.getPathLabel(); + + if (path_table_.find(path_id) == path_table_.end()) { + if (cur_path_) { + // Create a new path with some default param + if (path_table_.empty()) { + throw errors::RuntimeException( + "No path initialized for path table, error could be in default " + "path initialization."); + } else { + // Initiate the new path default param + std::shared_ptr new_path = + std::make_shared( + *(path_table_.at(default_values::path_id))); + // Insert the new path into hash table + path_table_[path_id] = new_path; + } + } else { + throw errors::RuntimeException( + "UNEXPECTED ERROR: when running,current path not found."); + } + } + + cur_path_ = path_table_[path_id]; + + size_t header_size = content_object.headerSize(); + size_t data_size = content_object.payloadSize(); + + // Update measurements for path + cur_path_->updateReceivedStats(header_size + data_size, data_size); +} + +void RaaqmTransportProtocol::updateRtt(uint64_t segment) { + if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { + throw std::runtime_error("ERROR: no current path found, exit"); + } else { + std::chrono::microseconds rtt; + + std::chrono::steady_clock::duration duration = + std::chrono::steady_clock::now() - + interest_timepoints_[segment & mask_]; + rtt = std::chrono::duration_cast(duration); + + if (this->rate_estimator_) { + this->rate_estimator_->onRttUpdate(rtt.count()); + } + cur_path_->insertNewRtt(rtt.count()); + cur_path_->smoothTimer(); + + if (cur_path_->newPropagationDelayAvailable()) { + check_drop_probability(); + } + } +} + +void RaaqmTransportProtocol::changeInterestLifetime(uint64_t segment) { + return; +} + +void RaaqmTransportProtocol::check_drop_probability() { + if (!raaqm_autotune_) { + return; + } + + unsigned int max_pd = 0; + std::unordered_map>::iterator it; + for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->getPropagationDelay() > max_pd && + it->second->getPropagationDelay() != UINT_MAX && + !it->second->isStale()) { + max_pd = it->second->getPropagationDelay(); + } + } + + double drop_prob = 0; + double beta = 0; + if (max_pd < wifi_delay_) { // only ethernet paths + drop_prob = default_drop_; + beta = default_beta_; + } else if (max_pd < lte_delay_) { // at least one wifi path + drop_prob = drop_wifi_; + beta = beta_wifi_; + } else { // at least one lte path + drop_prob = drop_lte_; + beta = beta_lte_; + } + + double old_drop_prob = 0; + double old_beta = 0; + old_beta = socket_->beta_; + old_drop_prob = socket_->drop_factor_; + + if (drop_prob == old_drop_prob && beta == old_beta) { + return; + } + + socket_->beta_ = beta; + socket_->drop_factor_ = drop_prob; + + for (it = path_table_.begin(); it != path_table_.end(); it++) { + it->second->setDropProb(drop_prob); + } +} + +void RaaqmTransportProtocol::check_for_stale_paths() { + if (!raaqm_autotune_) { + return; + } + + bool stale = false; + std::unordered_map>::iterator it; + for (it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->isStale()) { + stale = true; + break; + } + } + if (stale) { + check_drop_probability(); + } +} + +void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { + check_for_stale_paths(); + VegasTransportProtocol::onTimeout(std::move(interest)); +} + +void RaaqmTransportProtocol::increaseWindow() { + double max_window_size = socket_->max_window_size_; + if (current_window_size_ < max_window_size) { + double gamma = socket_->gamma_; + + current_window_size_ += gamma / current_window_size_; + socket_->current_window_size_ = current_window_size_; + } + this->rate_estimator_->onWindowIncrease(current_window_size_); +} + +void RaaqmTransportProtocol::decreaseWindow() { + double min_window_size = socket_->min_window_size_; + if (current_window_size_ > min_window_size) { + double beta = socket_->beta_; + + current_window_size_ = current_window_size_ * beta; + if (current_window_size_ < min_window_size) { + current_window_size_ = min_window_size; + } + + socket_->current_window_size_ = current_window_size_; + } + this->rate_estimator_->onWindowDecrease(current_window_size_); +} + +void RaaqmTransportProtocol::RAAQM() { + if (!cur_path_) { + throw errors::RuntimeException("ERROR: no current path found, exit"); + exit(EXIT_FAILURE); + } else { + // Change drop probability according to RTT statistics + cur_path_->updateDropProb(); + + if (rand() % 10000 <= cur_path_->getDropProb() * 10000) { + decreaseWindow(); + } + } +} + +void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + // Decrease the window because the timeout happened + decreaseWindow(); +} + +void RaaqmTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) { + updatePathTable(content_object); + increaseWindow(); + updateRtt(interest.getName().getSuffix()); + this->rate_estimator_->onDataReceived((int)content_object.payloadSize() + + content_object.headerSize()); + // Set drop probablility and window size accordingly + RAAQM(); +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h new file mode 100755 index 000000000..6ca410251 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/raaqm.h @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace transport { + +namespace protocol { + +class RaaqmTransportProtocol : public VegasTransportProtocol { + public: + RaaqmTransportProtocol(interface::BaseSocket *icnet_socket); + + ~RaaqmTransportProtocol(); + + void start(utils::SharableVector &content_buffer) override; + + protected: + void copyContent(const ContentObject &content_object) override; + + private: + void init(); + + void afterContentReception(const Interest &interest, + const ContentObject &content_object) override; + + void afterDataUnsatisfied(uint64_t segment) override; + + void increaseWindow() override; + + void updateRtt(uint64_t segment); + + void decreaseWindow() override; + + void changeInterestLifetime(uint64_t segment) override; + + void onTimeout(Interest::Ptr &&interest) override; + + void RAAQM(); + + void updatePathTable(const ContentObject &content_object); + + void check_drop_probability(); + + void check_for_stale_paths(); + + void printRtt(); + + /** + * Current download path + */ + std::shared_ptr cur_path_; + + /** + * Hash table for path: each entry is a pair path ID(key) - path object + */ + std::unordered_map> path_table_; + + bool set_interest_filter_; + // for rate-estimation at packet level + IcnRateEstimator *rate_estimator_; + + // params for autotuning + bool raaqm_autotune_; + double default_beta_; + double default_drop_; + double beta_wifi_; + double drop_wifi_; + double beta_lte_; + double drop_lte_; + unsigned int wifi_delay_; + unsigned int lte_delay_; +}; + +} // end namespace protocol + +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc new file mode 100755 index 000000000..f876cf4f8 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace transport { + +namespace protocol { + +RaaqmDataPath::RaaqmDataPath(double drop_factor, + double minimum_drop_probability, + unsigned new_timer, unsigned int samples, + uint64_t new_rtt, uint64_t new_rtt_min, + uint64_t new_rtt_max, unsigned new_pd) + + : drop_factor_(drop_factor), + minimum_drop_probability_(minimum_drop_probability), + timer_(new_timer), + samples_(samples), + rtt_(new_rtt), + rtt_min_(new_rtt_min), + rtt_max_(new_rtt_max), + prop_delay_(new_pd), + new_prop_delay_(false), + drop_prob_(0), + packets_received_(0), + last_packets_received_(0), + m_packets_bytes_received_(0), + last_packets_bytes_received_(0), + raw_data_bytes_received_(0), + last_raw_data_bytes_received_(0), + rtt_samples_(samples_), + average_rtt_(0), + alpha_(ALPHA) { + gettimeofday(&m_last_received_pkt_, 0); +} + +RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) { + rtt_ = new_rtt; + rtt_samples_.pushBack(new_rtt); + + rtt_max_ = rtt_samples_.rBegin(); + rtt_min_ = rtt_samples_.begin(); + + if (rtt_min_ < prop_delay_) { + new_prop_delay_ = true; + prop_delay_ = rtt_min_; + } + + gettimeofday(&m_last_received_pkt_, 0); + + return *this; +} + +RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size, + std::size_t data_size) { + packets_received_++; + m_packets_bytes_received_ += packet_size; + raw_data_bytes_received_ += data_size; + + return *this; +} + +double RaaqmDataPath::getDropFactor() { return drop_factor_; } + +double RaaqmDataPath::getDropProb() { return drop_prob_; } + +RaaqmDataPath &RaaqmDataPath::setDropProb(double dropProb) { + drop_prob_ = dropProb; + + return *this; +} + +double RaaqmDataPath::getMinimumDropProbability() { + return minimum_drop_probability_; +} + +double RaaqmDataPath::getTimer() { return timer_; } + +RaaqmDataPath &RaaqmDataPath::smoothTimer() { + timer_ = (1 - TIMEOUT_SMOOTHER) * timer_ + + (TIMEOUT_SMOOTHER)*rtt_ * (TIMEOUT_RATIO); + + return *this; +} + +double RaaqmDataPath::getRtt() { return rtt_; } + +double RaaqmDataPath::getAverageRtt() { return average_rtt_; } + +double RaaqmDataPath::getRttMax() { return rtt_max_; } + +double RaaqmDataPath::getRttMin() { return rtt_min_; } + +unsigned RaaqmDataPath::getSampleValue() { return samples_; } + +unsigned RaaqmDataPath::getRttQueueSize() { + return static_cast(rtt_samples_.size()); +} + +RaaqmDataPath &RaaqmDataPath::updateDropProb() { + drop_prob_ = 0.0; + + if (getSampleValue() == getRttQueueSize()) { + if (rtt_max_ == rtt_min_) { + drop_prob_ = minimum_drop_probability_; + } else { + drop_prob_ = minimum_drop_probability_ + + drop_factor_ * (rtt_ - rtt_min_) / (rtt_max_ - rtt_min_); + } + } + + return *this; +} + +double RaaqmDataPath::getMicroSeconds(struct timeval &time) { + return (double)(time.tv_sec) * 1000000 + (double)(time.tv_usec); +} + +void RaaqmDataPath::setAlpha(double alpha) { + if (alpha >= 0 && alpha <= 1) { + alpha_ = alpha; + } +} + +bool RaaqmDataPath::newPropagationDelayAvailable() { + bool r = new_prop_delay_; + new_prop_delay_ = false; + return r; +} + +unsigned int RaaqmDataPath::getPropagationDelay() { return prop_delay_; } + +bool RaaqmDataPath::isStale() { + struct timeval now; + gettimeofday(&now, 0); + double time = getMicroSeconds(now) - getMicroSeconds(m_last_received_pkt_); + if (time > 2000000) { + return true; + } + return false; +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h new file mode 100755 index 000000000..6f63940c9 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include + +#define TIMEOUT_SMOOTHER 0.1 +#define TIMEOUT_RATIO 10 +#define ALPHA 0.8 + +namespace transport { + +namespace protocol { + +class RaaqmDataPath { + public: + RaaqmDataPath(double drop_factor, double minimum_drop_probability, + unsigned new_timer, unsigned int samples, + uint64_t new_rtt = 1000, uint64_t new_rtt_min = 1000, + uint64_t new_rtt_max = 1000, unsigned new_pd = UINT_MAX); + + public: + /* + * @brief Add a new RTT to the RTT queue of the path, check if RTT queue is + * full, and thus need overwrite. Also it maintains the validity of min and + * max of RTT. + * @param new_rtt is the value of the new RTT + */ + RaaqmDataPath &insertNewRtt(uint64_t new_rtt); + + /** + * @brief Update the path statistics + * @param packet_size the size of the packet received, including the ICN + * header + * @param data_size the size of the data received, without the ICN header + */ + RaaqmDataPath &updateReceivedStats(std::size_t packet_size, + std::size_t data_size); + + /** + * @brief Get the value of the drop factor parameter + */ + double getDropFactor(); + + /** + * @brief Get the value of the drop probability + */ + double getDropProb(); + + /** + * @brief Set the value pf the drop probability + * @param drop_prob is the value of the drop probability + */ + RaaqmDataPath &setDropProb(double drop_prob); + + /** + * @brief Get the minimum drop probability + */ + double getMinimumDropProbability(); + + /** + * @brief Get last RTT + */ + double getRtt(); + + /** + * @brief Get average RTT + */ + double getAverageRtt(); + + /** + * @brief Get the current m_timer value + */ + double getTimer(); + + /** + * @brief Smooth he value of the m_timer accordingly with the last RTT + * measured + */ + RaaqmDataPath &smoothTimer(); + + /** + * @brief Get the maximum RTT among the last samples + */ + double getRttMax(); + + /** + * @brief Get the minimum RTT among the last samples + */ + double getRttMin(); + + /** + * @brief Get the number of saved samples + */ + unsigned getSampleValue(); + + /** + * @brief Get the size og the RTT queue + */ + unsigned getRttQueueSize(); + + /* + * @brief Change drop probability according to RTT statistics + * Invoked in RAAQM(), before control window size update. + */ + RaaqmDataPath &updateDropProb(); + + /** + * @brief This function convert the time from struct timeval to its value in + * microseconds + */ + static double getMicroSeconds(struct timeval &time); + + void setAlpha(double alpha); + + /** + * @brief Returns the smallest RTT registered so far for this path + */ + + unsigned int getPropagationDelay(); + + bool newPropagationDelayAvailable(); + + bool isStale(); + + private: + /** + * The value of the drop factor + */ + double drop_factor_; + + /** + * The minumum drop probability + */ + double minimum_drop_probability_; + + /** + * The timer, expressed in milliseconds + */ + double timer_; + + /** + * The number of samples to store for computing the protocol measurements + */ + const unsigned int samples_; + + /** + * The last, the minimum and the maximum value of the RTT (among the last + * m_samples samples) + */ + uint64_t rtt_, rtt_min_, rtt_max_, prop_delay_; + + bool new_prop_delay_; + + /** + * The current drop probability + */ + double drop_prob_; + + /** + * The number of packets received in this path + */ + intmax_t packets_received_; + + /** + * The first packet received after the statistics print + */ + intmax_t last_packets_received_; + + /** + * Total number of bytes received including the ICN header + */ + intmax_t m_packets_bytes_received_; + + /** + * The amount of packet bytes received at the last path summary computation + */ + intmax_t last_packets_bytes_received_; + + /** + * Total number of bytes received without including the ICN header + */ + intmax_t raw_data_bytes_received_; + + /** + * The amount of raw dat bytes received at the last path summary computation + */ + intmax_t last_raw_data_bytes_received_; + + class byArrival; + + class byOrder; + + /** + * Double ended queue for the RTTs + */ + + typedef utils::MinFilter RTTQueue; + + RTTQueue rtt_samples_; + + /** + * Time of the last call to the path reporter method + */ + struct timeval m_last_received_pkt_; + + double average_rtt_; + double alpha_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.cc b/libtransport/src/hicn/transport/protocols/rate_estimation.cc new file mode 100755 index 000000000..e313bf9f6 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/rate_estimation.cc @@ -0,0 +1,353 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace transport { + +namespace protocol { + +void *Timer(void *data) { + InterRttEstimator *estimator = (InterRttEstimator *)data; + + double dat_rtt, my_avg_win, my_avg_rtt; + int my_win_change, number_of_packets, max_packet_size; + + pthread_mutex_lock(&(estimator->mutex_)); + dat_rtt = estimator->rtt_; + pthread_mutex_unlock(&(estimator->mutex_)); + + while (estimator->is_running_) { + usleep(KV * dat_rtt); + + pthread_mutex_lock(&(estimator->mutex_)); + + dat_rtt = estimator->rtt_; + my_avg_win = estimator->avg_win_; + my_avg_rtt = estimator->avg_rtt_; + my_win_change = estimator->win_change_; + number_of_packets = estimator->number_of_packets_; + max_packet_size = estimator->max_packet_size_; + estimator->avg_rtt_ = estimator->rtt_; + estimator->avg_win_ = 0; + estimator->win_change_ = 0; + estimator->number_of_packets_ = 1; + + pthread_mutex_unlock(&(estimator->mutex_)); + + if (number_of_packets == 0 || my_win_change == 0) { + continue; + } + if (estimator->estimation_ == 0) { + estimator->estimation_ = (my_avg_win * 8.0 * max_packet_size * 1000000.0 / + (1.0 * my_win_change)) / + (my_avg_rtt / (1.0 * number_of_packets)); + } + + estimator->estimation_ = + estimator->alpha_ * estimator->estimation_ + + (1 - estimator->alpha_) * ((my_avg_win * 8.0 * max_packet_size * + 1000000.0 / (1.0 * my_win_change)) / + (my_avg_rtt / (1.0 * number_of_packets))); + + if (estimator->observer_) { + estimator->observer_->notifyStats(estimator->estimation_); + } + } + + return nullptr; +} + +InterRttEstimator::InterRttEstimator(double alpha_arg) { + this->estimated_ = false; + this->observer_ = NULL; + this->alpha_ = alpha_arg; + this->thread_is_running_ = false; + this->my_th_ = NULL; + this->is_running_ = true; + this->avg_rtt_ = 0.0; + this->estimation_ = 0.0; + this->avg_win_ = 0.0; + this->rtt_ = 0.0; + this->win_change_ = 0; + this->number_of_packets_ = 0; + this->max_packet_size_ = 0; + this->win_current_ = 1.0; + + pthread_mutex_init(&(this->mutex_), NULL); + gettimeofday(&(this->start_time_), 0); + gettimeofday(&(this->begin_batch_), 0); +} + +InterRttEstimator::~InterRttEstimator() { + this->is_running_ = false; + if (this->my_th_) { + pthread_join(*(this->my_th_), NULL); + } + this->my_th_ = NULL; + pthread_mutex_destroy(&(this->mutex_)); +} + +void InterRttEstimator::onRttUpdate(double rtt) { + pthread_mutex_lock(&(this->mutex_)); + this->rtt_ = rtt; + this->number_of_packets_++; + this->avg_rtt_ += rtt; + pthread_mutex_unlock(&(this->mutex_)); + + if (!thread_is_running_) { + my_th_ = (pthread_t *)malloc(sizeof(pthread_t)); + if (!my_th_) { + TRANSPORT_LOGE("Error allocating thread."); + my_th_ = NULL; + } + if (/*int err = */ pthread_create(my_th_, NULL, transport::protocol::Timer, + (void *)this)) { + TRANSPORT_LOGE("Error creating the thread"); + my_th_ = NULL; + } + thread_is_running_ = true; + } +} + +void InterRttEstimator::onWindowIncrease(double win_current) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - + RaaqmDataPath::getMicroSeconds(this->begin_batch_); + + pthread_mutex_lock(&(this->mutex_)); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + pthread_mutex_unlock(&(this->mutex_)); + + gettimeofday(&(this->begin_batch_), 0); +} + +void InterRttEstimator::onWindowDecrease(double win_current) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - + RaaqmDataPath::getMicroSeconds(this->begin_batch_); + + pthread_mutex_lock(&(this->mutex_)); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + pthread_mutex_unlock(&(this->mutex_)); + + gettimeofday(&(this->begin_batch_), 0); +} + +ALaTcpEstimator::ALaTcpEstimator() { + this->estimation_ = 0.0; + this->observer_ = NULL; + gettimeofday(&(this->start_time_), 0); + this->totalSize_ = 0.0; +} + +void ALaTcpEstimator::onStart() { + this->totalSize_ = 0.0; + gettimeofday(&(this->start_time_), 0); +} + +void ALaTcpEstimator::onDownloadFinished() { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - + RaaqmDataPath::getMicroSeconds(this->start_time_); + this->estimation_ = this->totalSize_ * 8 * 1000000 / delay; + if (observer_) { + observer_->notifyStats(this->estimation_); + } +} + +void ALaTcpEstimator::onDataReceived(int packet_size) { + this->totalSize_ += packet_size; +} + +SimpleEstimator::SimpleEstimator(double alphaArg, int batching_param) { + this->estimation_ = 0.0; + this->estimated_ = false; + this->observer_ = NULL; + this->batching_param_ = batching_param; + this->total_size_ = 0.0; + this->number_of_packets_ = 0; + this->base_alpha_ = alphaArg; + this->alpha_ = alphaArg; + gettimeofday(&(this->start_time_), 0); + gettimeofday(&(this->begin_batch_), 0); +} + +void SimpleEstimator::onStart() { + this->estimated_ = false; + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + gettimeofday(&(this->begin_batch_), 0); + gettimeofday(&(this->start_time_), 0); +} + +void SimpleEstimator::onDownloadFinished() { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - + RaaqmDataPath::getMicroSeconds(this->start_time_); + if (observer_) { + observer_->notifyDownloadTime(delay); + } + if (!this->estimated_) { + // Assuming all packets carry max_packet_size_ bytes of data + // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds + if (this->estimation_) { + this->estimation_ = + alpha_ * this->estimation_ + + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); + } else { + this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); + } + if (observer_) { + observer_->notifyStats(this->estimation_); + } + this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) / + ((double)this->batching_param_)); + } else { + if (this->number_of_packets_ >= + (int)(75.0 * (double)this->batching_param_ / 100.0)) { + delay = RaaqmDataPath::getMicroSeconds(end) - + RaaqmDataPath::getMicroSeconds(this->begin_batch_); + // Assuming all packets carry max_packet_size_ bytes of data + // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds + if (this->estimation_) { + this->estimation_ = + alpha_ * this->estimation_ + + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); + } else { + this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); + } + if (observer_) { + observer_->notifyStats(this->estimation_); + } + this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) / + ((double)this->batching_param_)); + } + } + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + gettimeofday(&(this->begin_batch_), 0); + gettimeofday(&(this->start_time_), 0); +} + +void SimpleEstimator::onDataReceived(int packet_size) { + this->total_size_ += packet_size; +} + +void SimpleEstimator::onRttUpdate(double rtt) { + this->number_of_packets_++; + + if (number_of_packets_ == this->batching_param_) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - + RaaqmDataPath::getMicroSeconds(this->begin_batch_); + // Assuming all packets carry max_packet_size_ bytes of data + // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds + if (this->estimation_) { + this->estimation_ = + alpha_ * this->estimation_ + + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); + } else { + this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); + } + if (observer_) { + observer_->notifyStats(this->estimation_); + } + this->alpha_ = this->base_alpha_; + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + gettimeofday(&(this->begin_batch_), 0); + } +} + +BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg, + int param) { + this->estimated_ = false; + this->observer_ = NULL; + this->alpha_ = alpha_arg; + this->batching_param_ = param; + this->number_of_packets_ = 0; + this->avg_win_ = 0.0; + this->avg_rtt_ = 0.0; + this->win_change_ = 0.0; + this->max_packet_size_ = 0; + this->estimation_ = 0.0; + this->win_current_ = 1.0; + gettimeofday(&(this->begin_batch_), 0); + gettimeofday(&(this->start_time_), 0); +} + +void BatchingPacketsEstimator::onRttUpdate(double rtt) { + this->number_of_packets_++; + this->avg_rtt_ += rtt; + + if (number_of_packets_ == this->batching_param_) { + if (estimation_ == 0) { + estimation_ = (avg_win_ * 8.0 * max_packet_size_ * 1000000.0 / + (1.0 * win_change_)) / + (avg_rtt_ / (1.0 * number_of_packets_)); + } else { + estimation_ = alpha_ * estimation_ + + (1 - alpha_) * ((avg_win_ * 8.0 * max_packet_size_ * + 1000000.0 / (1.0 * win_change_)) / + (avg_rtt_ / (1.0 * number_of_packets_))); + } + + if (observer_) { + observer_->notifyStats(estimation_); + } + + this->number_of_packets_ = 0; + this->avg_win_ = 0.0; + this->avg_rtt_ = 0.0; + this->win_change_ = 0.0; + } +} + +void BatchingPacketsEstimator::onWindowIncrease(double win_current) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - + RaaqmDataPath::getMicroSeconds(this->begin_batch_); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + gettimeofday(&(this->begin_batch_), 0); +} + +void BatchingPacketsEstimator::onWindowDecrease(double win_current) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - + RaaqmDataPath::getMicroSeconds(this->begin_batch_); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + gettimeofday(&(this->begin_batch_), 0); +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.h b/libtransport/src/hicn/transport/protocols/rate_estimation.h new file mode 100755 index 000000000..b889efe12 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/rate_estimation.h @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include + +#define BATCH 50 +#define KV 20 +#define ALPHA 0.8 +#define RATE_CHOICE 0 + +namespace transport { + +namespace protocol { + +class IcnRateEstimator { + public: + IcnRateEstimator(){}; + + virtual ~IcnRateEstimator(){}; + + virtual void onRttUpdate(double rtt){}; + + virtual void onDataReceived(int packetSize){}; + + virtual void onWindowIncrease(double winCurrent){}; + + virtual void onWindowDecrease(double winCurrent){}; + + virtual void onStart(){}; + + virtual void onDownloadFinished(){}; + + virtual void setObserver(IcnObserver *observer) { + this->observer_ = observer; + }; + IcnObserver *observer_; + struct timeval start_time_; + struct timeval begin_batch_; + double base_alpha_; + double alpha_; + double estimation_; + int number_of_packets_; + // this boolean is to make sure at least one estimation of the BW is done + bool estimated_; +}; + +// A rate estimator RTT-based. Computes EWMA(WinSize)/EWMA(RTT) + +class InterRttEstimator : public IcnRateEstimator { + public: + InterRttEstimator(double alpha_arg); + + ~InterRttEstimator(); + + void onRttUpdate(double rtt); + + void onDataReceived(int packet_size) { + if (packet_size > this->max_packet_size_) { + this->max_packet_size_ = packet_size; + } + }; + + void onWindowIncrease(double win_current); + + void onWindowDecrease(double win_current); + + void onStart(){}; + + void onDownloadFinished(){}; + + // private: should be done by using getters + pthread_t *my_th_; + bool thread_is_running_; + double rtt_; + bool is_running_; + pthread_mutex_t mutex_; + double avg_rtt_; + double avg_win_; + int max_packet_size_; + double win_change_; + double win_current_; +}; + +// A rate estimator, Batching Packets based. Computes EWMA(WinSize)/EWMA(RTT) + +class BatchingPacketsEstimator : public IcnRateEstimator { + public: + BatchingPacketsEstimator(double alpha_arg, int batchingParam); + + void onRttUpdate(double rtt); + + void onDataReceived(int packet_size) { + if (packet_size > this->max_packet_size_) { + this->max_packet_size_ = packet_size; + } + }; + + void onWindowIncrease(double win_current); + + void onWindowDecrease(double win_current); + + void onStart(){}; + + void onDownloadFinished(){}; + + private: + int batching_param_; + double avg_rtt_; + double avg_win_; + double win_change_; + int max_packet_size_; + double win_current_; +}; + +// Segment Estimator + +class ALaTcpEstimator : public IcnRateEstimator { + public: + ALaTcpEstimator(); + + void onDataReceived(int packet_size); + void onStart(); + void onDownloadFinished(); + + private: + double totalSize_; +}; + +// A Rate estimator, this one is the simplest: counting batching_param_ packets +// and then divide the sum of the size of these packets by the time taken to DL +// them. Should be the one used + +class SimpleEstimator : public IcnRateEstimator { + public: + SimpleEstimator(double alpha, int batching_param); + + void onRttUpdate(double rtt); + + void onDataReceived(int packet_size); + + void onWindowIncrease(double win_current){}; + + void onWindowDecrease(double win_current){}; + + void onStart(); + + void onDownloadFinished(); + + private: + int batching_param_; + double total_size_; +}; + +void *Timer(void *data); + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc new file mode 100755 index 000000000..1f42cf230 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -0,0 +1,813 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +/* + * TODO + * 2) start/constructor/rest variable implementation + * 3) interest retransmission: now I always recover, we should recover only if + * we have enough time 4) returnContentToUser: rememeber to remove the first + * 32bits from the payload + */ + +namespace transport { + +namespace protocol { + +using namespace interface; + +RTCTransportProtocol::RTCTransportProtocol(BaseSocket *icnet_socket) + : TransportProtocol(icnet_socket), + inflightInterests_(1 << default_values::log_2_default_buffer_size), + modMask_((1 << default_values::log_2_default_buffer_size) - 1) { + icnet_socket->getSocketOption(PORTAL, portal_); + reset(); +} + +RTCTransportProtocol::~RTCTransportProtocol() { + if (is_running_) { + stop(); + } +} + +void RTCTransportProtocol::start( + utils::SharableVector &content_buffer) { + + if(is_running_) + return; + + is_running_ = true; + content_buffer_ = content_buffer.shared_from_this(); + + reset(); + scheduleNextInterest(); + + portal_->runEventsLoop(); + is_running_ = false; +} + +void RTCTransportProtocol::stop() { + if(!is_running_) + return; + + is_running_ = false; + portal_->stopEventsLoop(); +} + +void RTCTransportProtocol::resume(){ + if(is_running_) + return; + + is_running_ = true; + + lastRoundBegin_ = std::chrono::steady_clock::now(); + inflightInterestsCount_ = 0; + if(content_buffer_) + content_buffer_->clear(); + + scheduleNextInterest(); + + portal_->runEventsLoop(); + + is_running_ = false; +} + +void RTCTransportProtocol::onRTCPPacket(uint8_t *packet, size_t len) { + //#define MASK_RTCP_VERSION 192 + //#define MASK_TYPE_CODE 31 + size_t read = 0; + uint8_t *offset = packet; + while (read < len) { + if ((((*offset) & MASK_RTCP_VERSION) >> 6) != RTCP_VERSION) { + TRANSPORT_LOGE("error while parsing RTCP packet, version unkwown"); + return; + } + processRtcpHeader(offset); + uint16_t RTCPlen = (ntohs(*(((uint16_t *)offset) + 1)) + 1) * 4; + offset += RTCPlen; + read += RTCPlen; + } +} + +// private +void RTCTransportProtocol::reset() { + // controller var + lastRoundBegin_ = std::chrono::steady_clock::now(); + currentState_ = RTC_SYNC_STATE; + + // cwin var + currentCWin_ = INITIAL_CWIN; + maxCWin_ = INITIAL_CWIN_MAX; + + // names/packets var + actualSegment_ = 0; + inflightInterestsCount_ = 0; + while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop(); + nackedByProducer_.clear(); + nackedByProducerMaxSize_ = 512; + if (content_buffer_) content_buffer_->clear(); + + holes_.clear(); + lastReceived_ = 0; + + // stats + receivedBytes_ = 0; + sentInterest_ = 0; + receivedData_ = 0; + packetLost_ = 0; + avgPacketSize_ = INIT_PACKET_SIZE; + gotNack_ = false; + gotFutureNack_ = 0; + roundsWithoutNacks_ = 0; + pathTable_.clear(); + // roundCounter_ = 0; + // minRTTwin_.clear(); + // for (int i = 0; i < MIN_RTT_WIN; i++) + // minRTTwin_.push_back(UINT_MAX); + minRtt_ = UINT_MAX; + + // CC var + estimatedBw_ = 0.0; + lossRate_ = 0.0; + queuingDelay_ = 0.0; + protocolState_ = RTC_NORMAL_STATE; + + producerPathLabel_ = 0; + socket_->setSocketOption( + GeneralTransportOptions::INTEREST_LIFETIME, + (uint32_t) + RTC_INTEREST_LIFETIME); // XXX this should bedone by the application +} + +uint32_t max(uint32_t a, uint32_t b) { + if (a > b) + return a; + else + return b; +} + +uint32_t min(uint32_t a, uint32_t b) { + if (a < b) + return a; + else + return b; +} + +void RTCTransportProtocol::checkRound() { + uint32_t duration = std::chrono::duration_cast( + std::chrono::steady_clock::now() - lastRoundBegin_) + .count(); + if (duration >= ROUND_LEN) { + lastRoundBegin_ = std::chrono::steady_clock::now(); + updateStats(duration); // update stats and window + } +} + +void RTCTransportProtocol::updateDelayStats( + const ContentObject &content_object) { + uint32_t segmentNumber = content_object.getName().getSuffix(); + uint32_t pkt = segmentNumber & modMask_; + + if (inflightInterests_[pkt].transmissionTime == + 0) // this is always the case if we have a retransmitted packet (timeout + // or RTCP) + return; + + uint32_t pathLabel = content_object.getPathLabel(); + + if (pathTable_.find(pathLabel) == pathTable_.end()) { + // found a new path + std::shared_ptr newPath = std::make_shared(); + pathTable_[pathLabel] = newPath; + } + + // RTT measurements are useful both from NACKs and data packets + uint64_t RTT = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + inflightInterests_[pkt].transmissionTime; + + pathTable_[pathLabel]->insertRttSample(RTT); + + // we collect OWD only for datapackets + if (content_object.getPayload().length() != NACK_HEADER_SIZE) { + uint64_t *senderTimeStamp = (uint64_t *)content_object.getPayload().data(); + + int64_t OWD = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count() - + *senderTimeStamp; + + pathTable_[pathLabel]->insertOwdSample(OWD); + } +} + +void RTCTransportProtocol::updateStats(uint32_t round_duration) { + if (receivedBytes_ != 0) { + double bytesPerSec = (double)(receivedBytes_ * ((double)MILLI_IN_A_SEC / + (double)round_duration)); + estimatedBw_ = (estimatedBw_ * ESTIMATED_BW_ALPHA) + + ((1 - ESTIMATED_BW_ALPHA) * bytesPerSec); + } + + auto it = pathTable_.find(producerPathLabel_); + if (it == pathTable_.end()) return; + + // double maxAvgRTT = it->second->getAverageRtt(); + // double minRTT = it->second->getMinRtt(); + minRtt_ = it->second->getMinRtt(); + queuingDelay_ = it->second->getQueuingDealy(); + + if (minRtt_ == 0) minRtt_ = 1; + + for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) { + it->second->roundEnd(); + } + + // this is inefficient but the window is supposed to be small, so it + // probably makes sense to leave it like this + // if(minRTT == 0) + // minRTT = 1; + + // minRTTwin_[roundCounter_ % MIN_RTT_WIN] = minRTT; + // minRtt_ = minRTT; + // for (int i = 0; i < MIN_RTT_WIN; i++) + // if(minRtt_ > minRTTwin_[i]) + // minRtt_ = minRTTwin_[i]; + + // roundCounter_++; + + // std::cout << "min RTT " << minRtt_ << " queuing " << queuingDelay_ << + // std::endl; + + if (sentInterest_ != 0 && currentState_ == RTC_NORMAL_STATE) { + double lossRate = (double)((double)packetLost_ / (double)sentInterest_); + lossRate_ = lossRate_ * ESTIMATED_LOSSES_ALPHA + + (lossRate * (1 - ESTIMATED_LOSSES_ALPHA)); + } + + if (avgPacketSize_ == 0) avgPacketSize_ = INIT_PACKET_SIZE; + + uint32_t BDP = + ceil((estimatedBw_ * (double)((double)minRtt_ / (double)MILLI_IN_A_SEC) * + BANDWIDTH_SLACK_FACTOR) / + avgPacketSize_); + uint32_t BW = ceil(estimatedBw_); + computeMaxWindow(BW, BDP); + + // bound also by interest lifitime* production rate + if (!gotNack_) { + roundsWithoutNacks_++; + if (currentState_ == RTC_SYNC_STATE && + roundsWithoutNacks_ >= ROUNDS_IN_SYNC_BEFORE_SWITCH) { + currentState_ = RTC_NORMAL_STATE; + } + } else { + roundsWithoutNacks_ = 0; + } + + updateCCState(); + updateWindow(); + + // in any case we reset all the counters + + gotNack_ = false; + gotFutureNack_ = 0; + receivedBytes_ = 0; + sentInterest_ = 0; + receivedData_ = 0; + packetLost_ = 0; +} + +void RTCTransportProtocol::updateCCState() { + // TODO +} + +void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, + uint32_t BDPWin) { + if (productionRate == + 0) // we have no info about the producer, keep the previous maxCWin + return; + + uint32_t interestLifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interestLifetime); + uint32_t maxWaintingInterest = ceil( + (productionRate / avgPacketSize_) * + (double)((double)(interestLifetime * INTEREST_LIFETIME_REDUCTION_FACTOR) / + (double)MILLI_IN_A_SEC)); + + if (currentState_ == RTC_SYNC_STATE) { + // in this case we do not limit the window with the BDP, beacuse most likly + // it is wrong + maxCWin_ = maxWaintingInterest; + return; + } + + // currentState = RTC_NORMAL_STATE + if (BDPWin != 0) { + maxCWin_ = ceil((double)BDPWin + ((double)BDPWin / 10.0)); // BDP + 10% + } else { + maxCWin_ = min(maxWaintingInterest, maxCWin_); + } +} + +void RTCTransportProtocol::updateWindow() { + if (currentState_ == RTC_SYNC_STATE) return; + + if (currentCWin_ < maxCWin_ * 0.7) { + currentCWin_ = min(maxCWin_, currentCWin_ * WIN_INCREASE_FACTOR); + } else if (currentCWin_ > maxCWin_) { + currentCWin_ = max(currentCWin_ * WIN_DECREASE_FACTOR, MIN_CWIN); + } +} + +void RTCTransportProtocol::decreaseWindow() { + // this is used only in SYNC mode + if (currentState_ == RTC_NORMAL_STATE) return; + + if (gotFutureNack_ == 1) + currentCWin_ = + min((currentCWin_ - 1), ceil((double)maxCWin_ * 0.66)); // 2/3 + else + currentCWin_--; + + currentCWin_ = max(currentCWin_, MIN_CWIN); +} + +void RTCTransportProtocol::increaseWindow() { + // this is used only in SYNC mode + if (currentState_ == RTC_NORMAL_STATE) return; + + // we need to be carefull to do not increase the window to much + if (currentCWin_ < ((double)maxCWin_ * 0.5)) { + currentCWin_ = currentCWin_ + 1; // exponential + } else { + currentCWin_ = min( + maxCWin_, ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear + } +} + +void RTCTransportProtocol::sendInterest() { + Name interest_name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + interest_name); + bool isRTX = false; + // uint32_t sentInt = 0; + + if (interestRetransmissions_.size() > 0) { + // handle retransmission + // here we have two possibile retransmissions: retransmissions due to + // timeouts and retransmissions due to RTCP NACKs. we will send the interest + // anyway, even if it is pending (this is possible only in the second case) + uint32_t rtxSeg = interestRetransmissions_.front(); + interestRetransmissions_.pop(); + + std::unordered_map::const_iterator res = + holes_.find(rtxSeg); + if (res != holes_.end()) { + // this packet is already managed by as an hole + // we don't need to send it again + return; + } + + // a packet recovery means that there was a loss + packetLost_++; + + uint32_t pkt = rtxSeg & modMask_; + interest_name.setSuffix(rtxSeg); + + // if the interest is not pending anymore we encrease the retrasnmission + // counter in order to avoid to handle a recovered packt as a normal one + if (!portal_->interestIsPending(interest_name)) { + inflightInterests_[pkt].retransmissions++; + } + + inflightInterests_[pkt].transmissionTime = 0; + isRTX = true; + } else { + // in this case we send the packet only if it is not pending yet + interest_name.setSuffix(actualSegment_); + if (portal_->interestIsPending(interest_name)) { + actualSegment_++; + return; + } + + // sentInt = actualSegment_; + uint32_t pkt = actualSegment_ & modMask_; + inflightInterests_[pkt].transmissionTime = + std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + inflightInterests_[pkt].retransmissions = 0; + actualSegment_++; + } + + auto interest = getInterest(); + interest->setName(interest_name); + + uint32_t interestLifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interestLifetime); + interest->setLifetime(uint32_t(interestLifetime)); + + ConsumerInterestCallback on_interest_output = VOID_HANDLER; + + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + on_interest_output); + + if (on_interest_output != VOID_HANDLER) { + on_interest_output(*dynamic_cast(socket_), *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + using namespace std::placeholders; + portal_->sendInterest(std::move(interest)); + + sentInterest_++; + + if (!isRTX) { + inflightInterestsCount_++; + } +} + +void RTCTransportProtocol::scheduleNextInterest() { + checkRound(); + if(!is_running_) + return; + + uint32_t MAX_RECOVER = + 40; // if the packet is more than MAX_RECOVER seq in the past we drop it + uint64_t TIME_BEFORE_RECOVERY = 10; // this should be proporsional to the RTT + + // holes are important only in NORMAL state + if (currentState_ == RTC_NORMAL_STATE) { + for (std::unordered_map::iterator it = holes_.begin(); + it != holes_.end();) { + if (it->first < lastReceived_ - MAX_RECOVER) { + // the packet is to hold, remove it + it = holes_.erase(it); + } else { + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t sinceLastTry = now - it->second; + + if (sinceLastTry > TIME_BEFORE_RECOVERY || it->second == 0) { + // a recovery means a packet lost + packetLost_++; + // update last sent time + it->second = now; + + Name interest_name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + interest_name); + + uint32_t pkt = it->first & modMask_; + interest_name.setSuffix(it->first); + + if (!portal_->interestIsPending(interest_name)) { + inflightInterests_[pkt].retransmissions++; + } + + inflightInterests_[pkt].transmissionTime = 0; + // XXX + // code refactoring: + // from here on this is a copy and paste of the code inside + // sendInterest this should go inside an other method + auto interest = getInterest(); + uint32_t interestLifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interestLifetime); + interest->setLifetime(uint32_t(interestLifetime)); + + ConsumerInterestCallback on_interest_output = VOID_HANDLER; + + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + on_interest_output); + if (on_interest_output != VOID_HANDLER) + on_interest_output(*dynamic_cast(socket_), + *interest); + + if (TRANSPORT_EXPECT_FALSE(!is_running_)) return; + + using namespace std::placeholders; + portal_->sendInterest(std::move(interest)); + + sentInterest_++; + } + ++it; + } + // as usual check the round at each packet + checkRound(); + } + } + + while (interestRetransmissions_.size() > 0) { + sendInterest(); + checkRound(); + } + + while (inflightInterestsCount_ < currentCWin_) { + sendInterest(); + checkRound(); + } +} + +void RTCTransportProtocol::scheduleAppNackRtx(std::vector &nacks) { + for (uint32_t i = 0; i < nacks.size(); i++) { + if (nackedByProducer_.find(nacks[i]) != nackedByProducer_.end()) { + continue; + } + // packetLost_++; + // XXX here I need to avoid the retrasmission for packet that were nacked by + // the network + interestRetransmissions_.push(nacks[i]); + } + + scheduleNextInterest(); +} +void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { + // packetLost_++; + + uint32_t segmentNumber = interest->getName().getSuffix(); + uint32_t pkt = segmentNumber & modMask_; + + if (inflightInterests_[pkt].retransmissions == 0) { + inflightInterestsCount_--; + } + + if (inflightInterests_[pkt].retransmissions < MAX_RTX) { + interestRetransmissions_.push(segmentNumber); + } + + scheduleNextInterest(); +} + +void RTCTransportProtocol::onNack(const ContentObject &content_object) { + uint32_t *payload = (uint32_t *)content_object.getPayload().data(); + uint32_t productionSeg = *payload; + uint32_t productionRate = *(++payload); + uint32_t nackSegment = content_object.getName().getSuffix(); + + // we synch the estimated production rate with the actual one + estimatedBw_ = (double)productionRate; + + // if(inflightInterests_[segmentNumber % + // default_values::default_buffer_size].retransmissions != 0){ ignore nacks for + // retransmissions + // return; + //} + + gotNack_ = true; + + if (productionSeg > nackSegment) { + // we are asking for stuff produced in the past + actualSegment_ = max(productionSeg + 1, actualSegment_); + if (currentState_ == RTC_NORMAL_STATE) { + currentState_ = RTC_SYNC_STATE; + // if we switch in SYNC mode we do not care about holes + // se we reset the data structure. going back to NORMAL + // mode will anable again the holes_ check. + holes_.clear(); + lastReceived_ = 0; + } + + computeMaxWindow(productionRate, 0); + increaseWindow(); + + if (nackedByProducer_.size() >= nackedByProducerMaxSize_) + nackedByProducer_.erase(nackedByProducer_.begin()); + nackedByProducer_.insert(nackSegment); + + } else if (productionSeg < nackSegment) { + gotFutureNack_++; + // we are asking stuff in the future + // example + // 10 12 13 14 15 16 17 + // ^ ^ ^ + // in prod nack actual + // in this example we sent up to segment 17 and we get a nack for segment 15 + // this means that we will get nack also for 16 17 + // and valid data for 13 14 + // so the next segment to ask is 15, because 13 and 14 will can back anyway + // we go back only in the case that the actual segment is really bigger than + // nack segment, other we do nothing + + actualSegment_ = min(actualSegment_, nackSegment); + + computeMaxWindow(productionRate, 0); + decreaseWindow(); + + if (currentState_ == RTC_SYNC_STATE) { + currentState_ = RTC_NORMAL_STATE; + } + } // equal should not happen +} + +void RTCTransportProtocol::onContentObject( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t payload_size = content_object->getPayload().length(); + uint32_t segmentNumber = content_object->getName().getSuffix(); + uint32_t pkt = segmentNumber & modMask_; + + // try to recover holes + // we can recover haoles with valid data, nacks or retransmitted packets + bool recoveredHole = false; + std::unordered_map::const_iterator res = + holes_.find(segmentNumber); + if (res != holes_.end()) { + holes_.erase(res); + recoveredHole = true; + } + + if (payload_size == NACK_HEADER_SIZE) { + // Nacks always come form the producer, so we set the producerePathLabel_; + producerPathLabel_ = content_object->getPathLabel(); + if (inflightInterests_[pkt].retransmissions == 0) { + inflightInterestsCount_--; + onNack(*content_object); + updateDelayStats(*content_object); + } + + } else { + receivedData_++; + + avgPacketSize_ = + (ESTIMATED_PACKET_SIZE * avgPacketSize_) + + ((1 - ESTIMATED_PACKET_SIZE) * content_object->getPayload().length()); + + if (inflightInterests_[pkt].retransmissions == 0) { + inflightInterestsCount_--; + // we count only non retransmitted data in order to take into accunt only + // the transmition rate of the producer + receivedBytes_ += + content_object->headerSize() + content_object->payloadSize(); + updateDelayStats(*content_object); + + // handle holes + // the packet sequence make sense only in case of valid data (no nacks, no + // rtx) in RTC_NORMAL_STATE we should get all the packets in order, so if + // segmentNumber != lastReceived + 1 something happened + // if recoveredHole == true this is a packet recovered so we should do + // nothing + if (currentState_ == RTC_NORMAL_STATE && recoveredHole == false) { + if ((segmentNumber != lastReceived_ + 1) && + segmentNumber > lastReceived_) { + // we have holes in the sequence + for (uint32_t seq = lastReceived_ + 1; seq < segmentNumber; seq++) { + // the hole exists we do not insert it again + std::unordered_map::const_iterator res = + holes_.find(seq); + if (res == holes_.end()) + holes_.insert(std::make_pair(seq, 0)); // 0 means never sent + } + } + } + + // this if should be always true + if (segmentNumber > lastReceived_) { + lastReceived_ = segmentNumber; + } + } + + returnContentToUser(*content_object); + increaseWindow(); + } + + scheduleNextInterest(); +} + +void RTCTransportProtocol::returnContentToUser( + const ContentObject &content_object) { + // return content to the user + Array a = content_object.getPayload(); + + uint8_t *start = ((uint8_t *)a.data()) + TIMESTAMP_SIZE; + unsigned size = a.length() - TIMESTAMP_SIZE; + + // set offset between hICN and RTP packets + uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1)); + RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq; + + content_buffer_->insert(content_buffer_->end(), start, start + size); + + ConsumerContentCallback on_payload = VOID_HANDLER; + socket_->getSocketOption(CONTENT_RETRIEVED, on_payload); + if (on_payload != VOID_HANDLER) { + on_payload(*dynamic_cast(socket_), size, + std::make_error_code(std::errc(0))); + } +} + +uint32_t RTCTransportProtocol::hICN2RTP(uint32_t hicn_seq) { + return RTPhICN_offset_ - hicn_seq; +} + +uint32_t RTCTransportProtocol::RTP2hICN(uint32_t rtp_seq) { + return RTPhICN_offset_ + rtp_seq; +} + +void RTCTransportProtocol::processRtcpHeader(uint8_t *offset) { + uint8_t pkt_type = (*(offset + 1)); + switch (pkt_type) { + case RTCP_RR: // Receiver report + TRANSPORT_LOGI("got RR packet\n"); + break; + case RTCP_SR: // Sender report + TRANSPORT_LOGI("got SR packet\n"); + break; + case RTCP_SDES: // Description + processSDES(offset); + break; + case RTCP_RTPFB: // Transport layer FB message + processGenericNack(offset); + break; + case RTCP_PSFB: + processPli(offset); + break; + default: + errorParsingRtcpHeader(offset); + } +} + +void RTCTransportProtocol::errorParsingRtcpHeader(uint8_t *offset) { + uint8_t pt = (*(offset + 1)); + uint8_t code = ((*offset) & MASK_TYPE_CODE); + TRANSPORT_LOGE("Received unknwnon RTCP packet. Payload type = %u, code = %u", + pt, code); +} + +void RTCTransportProtocol::processSDES(uint8_t *offset) { + uint8_t code = ((*offset) & MASK_TYPE_CODE); + switch (code) { + case RTCP_SDES_CNAME: + TRANSPORT_LOGI("got SDES packet: CNAME\n"); + break; + default: + errorParsingRtcpHeader(offset); + } +} + +void RTCTransportProtocol::processPli(uint8_t *offset) { + if (((*offset) & MASK_TYPE_CODE) != RTCP_PSFB_PLI) { + errorParsingRtcpHeader(offset); + return; + } + + TRANSPORT_LOGI("got PLI packet\n"); +} + +void RTCTransportProtocol::processGenericNack(uint8_t *offset) { + if (((*offset) & MASK_TYPE_CODE) != RTCP_RTPFB_GENERIC_NACK) { + errorParsingRtcpHeader(offset); + return; + } + + std::vector nacks; + + uint16_t header_lines = + ntohs(*(((uint16_t *)offset) + 1)) - + 2; // 2 is the number of header 32-bits words - 1 (RFC 4885) + uint8_t *payload = offset + RTPC_NACK_HEADER; // 12 bytes + for (uint16_t l = header_lines; l > 0; l--) { + nacks.push_back(RTP2hICN(ntohs(*((uint16_t *)payload)))); + + uint16_t BLP = ntohs(*(((uint16_t *)payload) + 1)); + + for (int bit = 0; bit < 15; bit++) { // 16 bits word to scan + if ((BLP >> bit) & 1) { + nacks.push_back(RTP2hICN((ntohs(*((uint16_t *)payload)) + bit + 1) % + MAX_RTCP_SEQ_NUMBER)); + } + } + + payload += 4; // go to the next line + } + + portal_->getIoService().post(std::bind( + &RTCTransportProtocol::scheduleAppNackRtx, this, std::move(nacks))); +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h new file mode 100755 index 000000000..249af6b99 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiTC_SYNC_STATE + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +// algorithm state +#define RTC_SYNC_STATE 0 +#define RTC_NORMAL_STATE 1 +#define ROUNDS_IN_SYNC_BEFORE_SWITCH 3 + +// packet constants +#define INIT_PACKET_SIZE 1300 // bytes +#define HICN_PACKET_HEADER_SIZE 60 // bytes ipv6+tcp +#define NACK_HEADER_SIZE 8 // bytes +#define TIMESTAMP_SIZE 8 // bytes +#define RTC_INTEREST_LIFETIME 1000 // ms + +// controller constant +#define ROUND_LEN \ + 200 // ms interval of time on which we take decisions / measurements +#define MAX_RTX 128 +#define MIN_RTT_WIN 30 // rounds + +// cwin +#define INITIAL_CWIN 1 // packets +#define INITIAL_CWIN_MAX 100000 // packets +#define MIN_CWIN 5 // packets + +// statistics constants +#define BANDWIDTH_SLACK_FACTOR 1.5 +#define ESTIMATED_BW_ALPHA 0.7 +#define ESTIMATED_PACKET_SIZE 0.7 +#define ESTIMATED_LOSSES_ALPHA 0.8 +#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8 + +//#define MAX_LOSS_RATE 0.05 +//#define MAX_QUEUING_DELAY 200 //ms + +// cwin +#define INITIAL_CWIN 1 +#define MIN_CWIN 5 +#define WIN_DECREASE_FACTOR 0.8 +#define WIN_INCREASE_FACTOR 1.1 + +// protocol state +//#define RTC_CONGESTED_STATE 10 +//#define RTC_LOSSY_STATE 20 +//#define RTC_DELAY_STATE 30 +//#define RTC_NORMAL_STATE 40 + +// other constants +#define NANO_IN_A_SEC 1000000000 +#define MICRO_IN_A_SEC 1000000 +#define MILLI_IN_A_SEC 1000 + +// RTCP +#define MASK_RTCP_VERSION 192 +#define MASK_TYPE_CODE \ + 31 // this is RC in the RR/SR packet or FMT int the early feedback packets +#define RTPC_NACK_HEADER 12 // bytes +#define MAX_RTCP_SEQ_NUMBER 0xffff +#define RTCP_VERSION 2 +// RTCP TYPES +#define RTCP_SR 200 +#define RTCP_RR 201 +#define RTCP_SDES 202 +#define RTCP_RTPFB 205 +#define RTCP_PSFB 206 +// RTCP RC/FMT +#define RTCP_SDES_CNAME 1 +#define RTCP_RTPFB_GENERIC_NACK 1 +#define RTCP_PSFB_PLI 1 + +namespace transport { + +namespace protocol { + +struct sentInterest { + uint64_t transmissionTime; + uint8_t retransmissions; +}; + +class RTCTransportProtocol : public TransportProtocol { + public: + RTCTransportProtocol(interface::BaseSocket *icnet_socket); + + ~RTCTransportProtocol(); + + void start(utils::SharableVector &content_buffer); + + void stop(); + + void resume(); + + void onRTCPPacket(uint8_t *packet, size_t len); + + private: + // algo functions + void reset(); + void checkRound(); + + // CC functions + void updateDelayStats(const ContentObject &content_object); + void updateStats(uint32_t round_duration); + void updateCCState(); + void computeMaxWindow(uint32_t productionRate, uint32_t BDPWin); + void updateWindow(); + void decreaseWindow(); + void increaseWindow(); + void resetPreviousWindow(); + + // packet functions + void sendInterest(); + void scheduleNextInterest(); + void scheduleAppNackRtx(std::vector &nacks); + void onTimeout(Interest::Ptr &&interest); + void onNack(const ContentObject &content_object); + void onContentObject(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object); + void returnContentToUser(const ContentObject &content_object); + + // RTCP functions + uint32_t hICN2RTP(uint32_t hicn_seq); + uint32_t RTP2hICN(uint32_t rtp_seq); + void processRtcpHeader(uint8_t *offset); + void errorParsingRtcpHeader(uint8_t *offset); + void processSDES(uint8_t *offset); + void processGenericNack(uint8_t *offset); + void processPli(uint8_t *offset); + + // controller var + std::chrono::steady_clock::time_point lastRoundBegin_; + // bool allPacketsInSync_; + // unsigned numberOfRoundsInSync_; + // unsigned numberOfCatchUpRounds_; + // bool catchUpPhase_; + unsigned currentState_; + + // uint32_t inProduction_; + + // cwin var + uint32_t currentCWin_; + uint32_t maxCWin_; + // uint32_t previousCWin_; + + // names/packets var + uint32_t actualSegment_; + int32_t RTPhICN_offset_; + uint32_t inflightInterestsCount_; + std::queue interestRetransmissions_; + std::vector inflightInterests_; + uint32_t nackedByProducerMaxSize_; + std::set + nackedByProducer_; // this is used to avoid retransmissions from the + // application for pakets for which we already got a + // past NACK by the producer these packet are too old, + // they will never be retrived + std::shared_ptr> content_buffer_; + uint32_t modMask_; + + // stats + uint32_t receivedBytes_; + uint32_t sentInterest_; + uint32_t receivedData_; + uint32_t packetLost_; + double avgPacketSize_; + bool gotNack_; + uint32_t gotFutureNack_; + uint32_t roundsWithoutNacks_; + uint32_t producerPathLabel_; // XXX we pick only one path lable for the + // producer for now, assuming the usage of a + // single path this should be extended to a + // vector + std::unordered_map> pathTable_; + uint32_t roundCounter_; + // std::vector minRTTwin_; + uint64_t minRtt_; + + std::unordered_map holes_; + uint32_t lastReceived_; + + // CC var + double estimatedBw_; + double lossRate_; + double queuingDelay_; + unsigned protocolState_; +}; + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc new file mode 100755 index 000000000..6c9605fb2 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace transport { + +namespace protocol { + +RTCDataPath::RTCDataPath() + : min_rtt(UINT_MAX), + prev_min_rtt(UINT_MAX), + min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the + // real OWD, but the measured one, that depends on the + // clock of sender and receiver. the only meaningful + // value is is the queueing delay. for this reason we + // keep both RTT (for the windowd calculation) and OWD + // (for congestion/quality control) + prev_min_owd(INT_MAX), + avg_owd(0.0), + queuing_delay(0.0), + RTThistory_(HISTORY_LEN), + OWDhistory_(HISTORY_LEN){}; + +void RTCDataPath::insertRttSample(uint64_t rtt) { + // for the rtt we only keep track of the min one + if (rtt < min_rtt) min_rtt = rtt; +} + +void RTCDataPath::insertOwdSample(int64_t owd) { + // for owd we use both min and avg + if (owd < min_owd) min_owd = owd; + + avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC); +} + +void RTCDataPath::roundEnd() { + // compute queuing delay + queuing_delay = avg_owd - getMinOwd(); + + // reset min_rtt and add it to the history + if (min_rtt != UINT_MAX) { + prev_min_rtt = min_rtt; + } else { + // this may happen if we do not receive any packet + // from this path in the last round. in this case + // we use the measure from the previuos round + min_rtt = prev_min_rtt; + } + + RTThistory_.pushBack(min_rtt); + min_rtt = UINT_MAX; + + // do the same for min owd + if (min_owd != INT_MAX) { + prev_min_owd = min_owd; + } else { + min_owd = prev_min_owd; + } + + OWDhistory_.pushBack(min_owd); + min_owd = INT_MAX; +} + +double RTCDataPath::getQueuingDealy() { return queuing_delay; } + +uint64_t RTCDataPath::getMinRtt() { return RTThistory_.begin(); } + +int64_t RTCDataPath::getMinOwd() { return OWDhistory_.begin(); } + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.h b/libtransport/src/hicn/transport/protocols/rtc_data_path.h new file mode 100755 index 000000000..ace16ff12 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/rtc_data_path.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#define ALPHA_RTC 0.125 +#define HISTORY_LEN 30 + +namespace transport { + +namespace protocol { + +class RTCDataPath { + public: + RTCDataPath(); + + public: + void insertRttSample(uint64_t rtt); + void insertOwdSample(int64_t owd); + + uint64_t getMinRtt(); + + double getQueuingDealy(); + + void roundEnd(); + + private: + int64_t getMinOwd(); + + uint64_t min_rtt; + uint64_t prev_min_rtt; + + int64_t min_owd; + int64_t prev_min_owd; + + double avg_owd; + + double queuing_delay; + + utils::MinFilter RTThistory_; + utils::MinFilter OWDhistory_; +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt new file mode 100755 index 000000000..6f9fdb9aa --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt @@ -0,0 +1,10 @@ +# Enable gcov output for the tests +add_definitions(--coverage) +set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage") + +set(TestsExpectedToPass + test_transport_producer) + +foreach(test ${TestsExpectedToPass}) + AddTest(${test}) +endforeach() \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc b/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc new file mode 100755 index 000000000..204f2cbe2 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "../socket_producer.h" +#include "literals.h" + +#include +#include + +namespace transport { + +namespace protocol { + +namespace { +// The fixture for testing class Foo. +class ProducerTest : public ::testing::Test { + protected: + ProducerTest() : name_("b001::123|321"), producer_(io_service_) { + // You can do set-up work for each test here. + } + + virtual ~ProducerTest() { + // You can do clean-up work that doesn't throw exceptions here. + } + + // If the constructor and destructor are not enough for setting up + // and cleaning up each test, you can define the following methods: + + virtual void SetUp() { + // Code here will be called immediately after the constructor (right + // before each test). + } + + virtual void TearDown() { + // Code here will be called immediately after each test (right + // before the destructor). + } + + Name name_; + asio::io_service io_service_; + ProducerSocket producer_; +}; + +} // namespace + +// Tests that the Foo::Bar() method does Abc. +TEST_F(ProducerTest, ProduceContent) { + std::string content(250000, '?'); + + producer_.registerPrefix(Prefix("b001::/64")); + producer_.produce(name_, reinterpret_cast(content.data()), + content.size(), true); + producer_.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + 500000000_U32); + producer_.attach(); + producer_.serveForever(); +} + +} // namespace protocol + +} // namespace transport + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/vegas.cc b/libtransport/src/hicn/transport/protocols/vegas.cc new file mode 100755 index 000000000..b6d79bfcc --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/vegas.cc @@ -0,0 +1,630 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include + +namespace transport { + +namespace protocol { + +using namespace interface; + +VegasTransportProtocol::VegasTransportProtocol(BaseSocket *icnet_socket) + : TransportProtocol(icnet_socket), + is_final_block_number_discovered_(false), + final_block_number_(std::numeric_limits::max()), + last_reassembled_segment_(0), + content_buffer_size_(0), + current_window_size_(default_values::min_window_size), + interests_in_flight_(0), + next_suffix_(0), + interest_retransmissions_(1 << default_values::log_2_default_buffer_size), + interest_timepoints_(1 << default_values::log_2_default_buffer_size), + retx_count_(0), + receive_buffer_(1 << default_values::log_2_default_buffer_size), + unverified_segments_(1 << default_values::log_2_default_buffer_size), + verified_manifests_(1 << default_values::log_2_default_buffer_size), + mask_((1 << default_values::log_2_default_buffer_size) - 1), + incremental_suffix_index_(0), + suffix_queue_completed_(false), + download_with_manifest_(false), + next_manifest_interval_(0_U16), + interest_tx_(0), + interest_count_(0), + byte_count_(0), + average_rtt_(0.0) { + portal_ = socket_->portal_; + incremental_suffix_index_++; +} + +VegasTransportProtocol::~VegasTransportProtocol() { stop(); } + +void VegasTransportProtocol::reset() { + portal_->setConsumerCallback(this); + + is_final_block_number_discovered_ = false; + interest_pool_index_ = 0; + final_block_number_ = std::numeric_limits::max(); + next_suffix_ = 0; + interests_in_flight_ = 0; + last_reassembled_segment_ = 0; + content_buffer_size_ = 0; + content_buffer_->clear(); + interest_retransmissions_.clear(); + interest_retransmissions_.resize( + 1 << default_values::log_2_default_buffer_size, 0); + interest_timepoints_.clear(); + interest_timepoints_.resize(1 << default_values::log_2_default_buffer_size, + std::chrono::steady_clock::time_point()); + receive_buffer_.clear(); + unverified_segments_.clear(); + verified_manifests_.clear(); + next_manifest_interval_ = 0; + next_manifest_ = 0; + download_with_manifest_ = false; + incremental_suffix_index_ = 0; + + interest_tx_ = 0; + interest_count_ = 0; + byte_count_ = 0; + average_rtt_ = 0; + + // asio::io_service &io_service = portal_->getIoService(); + + // if (io_service.stopped()) { + // io_service.reset(); + // } +} + +void VegasTransportProtocol::start( + utils::SharableVector &content_buffer) { + + if(is_running_) + return; + + socket_->t0_ = std::chrono::steady_clock::now(); + + is_running_ = true; + content_buffer_ = content_buffer.shared_from_this(); + + reset(); + + sendInterest(next_suffix_++); + portal_->runEventsLoop(); + removeAllPendingInterests(); + is_running_ = false; + +} + +void VegasTransportProtocol::resume(){ + if(is_running_) + return; + + is_running_ = true; + sendInterest(next_suffix_++); + portal_->runEventsLoop(); + removeAllPendingInterests(); + is_running_ = false; +} + +void VegasTransportProtocol::sendInterest(std::uint64_t next_suffix) { + auto interest = getInterest(); + socket_->network_name_.setSuffix(next_suffix); + interest->setName(socket_->network_name_); + + interest->setLifetime(uint32_t(socket_->interest_lifetime_)); + + if (socket_->on_interest_output_ != VOID_HANDLER) { + socket_->on_interest_output_(*socket_, *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + interests_in_flight_++; + interest_retransmissions_[next_suffix & mask_] = 0; + interest_timepoints_[next_suffix & mask_] = std::chrono::steady_clock::now(); + + using namespace std::placeholders; + portal_->sendInterest(std::move(interest)); +} + +void VegasTransportProtocol::stop() { + is_running_ = false; + portal_->stopEventsLoop(); +} + +void VegasTransportProtocol::onContentSegment( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t incremental_suffix = content_object->getName().getSuffix(); + bool virtual_download = socket_->virtual_download_; + + if (verifyContentObject(*content_object)) { + byte_count_ += content_object->getPayload().length(); + + if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { + is_final_block_number_discovered_ = true; + final_block_number_ = incremental_suffix; + } + + if (!virtual_download) { + receive_buffer_.emplace( + std::make_pair(incremental_suffix, std::move(content_object))); + reassemble(); + } else if (TRANSPORT_EXPECT_FALSE(is_final_block_number_discovered_ && + incremental_suffix == + final_block_number_)) { + returnContentToUser(); + } + } else { + unverified_segments_.emplace( + std::make_pair(incremental_suffix, std::move(content_object))); + } +} + +void VegasTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) { + increaseWindow(); +} + +void VegasTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + decreaseWindow(); +} + +void VegasTransportProtocol::scheduleNextInterests() { + if (is_running_) { + uint32_t next_suffix; + while (interests_in_flight_ < current_window_size_) { + if (download_with_manifest_) { + if (suffix_queue_.size() * 2 < current_window_size_ && + next_manifest_ < final_block_number_ && next_manifest_interval_) { + next_manifest_ += next_manifest_interval_; + sendInterest(next_manifest_); + continue; + } + + if (suffix_queue_.pop(next_suffix)) { + // next_suffix = suffix_queue_.front(); + sendInterest(next_suffix); + // suffix_queue_.pop_front(); + } else { + if (!suffix_queue_completed_) { + TRANSPORT_LOGE("Empty queue!!!!!!"); + } + break; + } + } else { + if (is_final_block_number_discovered_) { + if (next_suffix_ > final_block_number_) { + return; + } + } + + sendInterest(next_suffix_++); + } + } + } +} + +void VegasTransportProtocol::decreaseWindow() { + if (current_window_size_ > socket_->min_window_size_) { + current_window_size_ = std::ceil(current_window_size_ / 2); + socket_->current_window_size_ = current_window_size_; + } +} + +void VegasTransportProtocol::increaseWindow() { + if (current_window_size_ < socket_->max_window_size_) { + current_window_size_++; + socket_->max_window_size_ = current_window_size_; + } +}; + +void VegasTransportProtocol::changeInterestLifetime(uint64_t segment) { + std::chrono::steady_clock::duration duration = + std::chrono::steady_clock::now() - interest_timepoints_[segment]; + rtt_estimator_.addMeasurement( + std::chrono::duration_cast(duration)); + + RtoEstimator::Duration rto = rtt_estimator_.computeRto(); + std::chrono::milliseconds lifetime = + std::chrono::duration_cast(rto); + + socket_->interest_lifetime_ = lifetime.count(); +} + +void VegasTransportProtocol::returnContentToUser() { + if (socket_->on_payload_retrieved_ != VOID_HANDLER) { + socket_->on_payload_retrieved_(*socket_, byte_count_, + std::make_error_code(std::errc(0))); + } + + stop(); +} + +void VegasTransportProtocol::onManifest( + std::unique_ptr &&manifest) { + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + download_with_manifest_ = true; + + uint32_t segment = manifest->getName().getSuffix(); + + if (verifyManifest(*manifest)) { + manifest->decode(); + + if (TRANSPORT_EXPECT_TRUE(manifest->getVersion() == + core::ManifestVersion::VERSION_1)) { + switch (manifest->getManifestType()) { + case core::ManifestType::INLINE_MANIFEST: { + auto _it = manifest->getSuffixList().begin(); + auto _end = --manifest->getSuffixList().end(); + + if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest())) { + _end++; + } + + // Get final block number + is_final_block_number_discovered_ = true; + final_block_number_ = manifest->getFinalBlockNumber(); + + for (; _it != _end; _it++) { + suffix_hash_map_[_it->first] = std::make_pair( + std::vector(_it->second, _it->second + 32), + manifest->getHashAlgorithm()); + suffix_queue_.push(_it->first); + } + + next_manifest_interval_ = manifest->getSuffixList().size(); + + if (manifest->isFinalManifest()) { + suffix_queue_completed_ = true; + // Give it a try + if (verifier_thread_) { + asio::io_service &io_service = portal_->getIoService(); + io_service.post([this]() { scheduleNextInterests(); }); + } + } + + break; + } + case core::ManifestType::FLIC_MANIFEST: { + throw errors::NotImplementedException(); + } + case core::ManifestType::FINAL_CHUNK_NUMBER: { + throw errors::NotImplementedException(); + } + } + } + + if (!socket_->virtual_download_) { + receive_buffer_.emplace( + std::make_pair(segment, std::move(manifest->getPacket()))); + reassemble(); + } else { + if (segment >= final_block_number_) { + stop(); + } + } + } +} + +bool VegasTransportProtocol::verifyManifest( + const ContentObjectManifest &manifest) { + if (!socket_->verify_signature_) { + return true; + } + + bool is_data_secure = false; + + if (socket_->on_content_object_verification_ == VOID_HANDLER) { + is_data_secure = static_cast(socket_->verifier_.verify(manifest)); + } else if (socket_->on_content_object_verification_(*socket_, manifest)) { + is_data_secure = true; + } + + if (TRANSPORT_EXPECT_FALSE(!is_data_secure)) { + TRANSPORT_LOGE("Verification failed for %s\n", + manifest.getName().toString().c_str()); + } + + return is_data_secure; +} + +// TODO Add the name in the digest computation! +void VegasTransportProtocol::onContentObject( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t incremental_suffix = content_object->getName().getSuffix(); + + std::chrono::microseconds rtt; + Time now = std::chrono::steady_clock::now(); + std::chrono::steady_clock::duration duration = + now - interest_timepoints_[incremental_suffix & mask_]; + rtt = std::chrono::duration_cast(duration); + + average_rtt_ = (0.7 * average_rtt_) + (0.3 * (double)rtt.count()); + + if (socket_->on_timer_expires_ != VOID_HANDLER) { + auto dt = std::chrono::duration_cast(now - socket_->t0_); + if (dt.count() > socket_->timer_interval_milliseconds_) { + socket_->on_timer_expires_(*socket_, byte_count_, dt, + current_window_size_, retx_count_, + std::round(average_rtt_)); + socket_->t0_ = std::chrono::steady_clock::now(); + } + } + + interests_in_flight_--; + + if (TRANSPORT_EXPECT_FALSE(!is_running_ || incremental_suffix == ~0_U64 || + receive_buffer_.find(incremental_suffix) != + receive_buffer_.end())) { + return; + } + + changeInterestLifetime(incremental_suffix); + + if (socket_->on_content_object_input_ != VOID_HANDLER) { + socket_->on_content_object_input_(*socket_, *content_object); + } + + if (socket_->on_interest_satisfied_ != VOID_HANDLER) { + socket_->on_interest_satisfied_(*socket_, *interest); + } + + if (!interest_retransmissions_[incremental_suffix & mask_]) { + afterContentReception(*interest, *content_object); + } + + if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() == + PayloadType::MANIFEST)) { + // TODO Fix manifest!! + auto manifest = + std::make_unique(std::move(content_object)); + + if (verifier_thread_ && incremental_suffix != 0) { + // verifier_thread_->add(std::bind(&VegasTransportProtocol::onManifest, + // this, std::move(manifest))); + } else { + onManifest(std::move(manifest)); + } + } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + if (verifier_thread_) { + // verifier_thread_->add(std::bind(&VegasTransportProtocol::onContentSegment, + // this, std::move(content_object))); + } else { + onContentSegment(std::move(interest), std::move(content_object)); + } + } + + scheduleNextInterests(); +} + +bool VegasTransportProtocol::verifyContentObject( + const ContentObject &content_object) { + if (!dynamic_cast(socket_)->verify_signature_) { + return true; + } + + uint64_t segment = content_object.getName().getSuffix(); + + bool ret = false; + + if (download_with_manifest_) { + auto it = suffix_hash_map_.find(segment); + if (it != suffix_hash_map_.end()) { + auto hash_type = static_cast(it->second.second); + auto data_packet_digest = content_object.computeDigest(it->second.second); + auto data_packet_digest_bytes = + data_packet_digest.getDigest().data(); + std::vector &manifest_digest_bytes = it->second.first; + + if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes, + manifest_digest_bytes.data(), + hash_type)) { + suffix_hash_map_.erase(it); + ret = true; + } else { + throw errors::RuntimeException( + "Verification failure policy has to be implemented."); + } + } + } else { + ret = static_cast( + dynamic_cast(socket_)->verifier_.verify( + content_object)); + + if (!ret) { + throw errors::RuntimeException( + "Verification failure policy has to be implemented."); + } + } + + return ret; + ; +} + +void VegasTransportProtocol::onTimeout(Interest::Ptr &&interest) { + TRANSPORT_LOGW("Timeout on %s", interest->getName().toString().c_str()); + + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + interests_in_flight_--; + + uint64_t segment = interest->getName().getSuffix(); + + // Do not retransmit interests asking contents that do not exist. + if (is_final_block_number_discovered_) { + if (segment > final_block_number_) { + return; + } + } + + if (socket_->on_interest_timeout_ != VOID_HANDLER) { + socket_->on_interest_timeout_(*socket_, *interest); + } + + afterDataUnsatisfied(segment); + + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask_] < + socket_->max_retransmissions_)) { + retx_count_++; + + if (socket_->on_interest_retransmission_ != VOID_HANDLER) { + socket_->on_interest_retransmission_(*socket_, *interest); + } + + if (socket_->on_interest_output_ != VOID_HANDLER) { + socket_->on_interest_output_(*socket_, *interest); + } + + if (!is_running_) { + return; + } + + // retransmit + interests_in_flight_++; + interest_retransmissions_[segment & mask_]++; + + using namespace std::placeholders; + portal_->sendInterest(std::move(interest)); + } else { + TRANSPORT_LOGE("Stop: reached max retx limit."); + partialDownload(); + stop(); + } +} + +void VegasTransportProtocol::copyContent(const ContentObject &content_object) { + Array a = content_object.getPayload(); + + content_buffer_->insert(content_buffer_->end(), (uint8_t *)a.data(), + (uint8_t *)a.data() + a.length()); + + bool download_completed = + is_final_block_number_discovered_ && + content_object.getName().getSuffix() == final_block_number_; + + if (TRANSPORT_EXPECT_FALSE(download_completed || !is_running_)) { + // asio::io_service& io_service = portal_->getIoService(); + // io_service.post([this] () { + returnContentToUser(); + // }); + } +} + +void VegasTransportProtocol::reassemble() { + uint64_t index = last_reassembled_segment_; + auto it = receive_buffer_.find(index); + + do { + if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) { + copyContent(*it->second); + receive_buffer_.erase(it); + } + + index = ++last_reassembled_segment_; + it = receive_buffer_.find(index); + } while (it != receive_buffer_.end()); +} + +void VegasTransportProtocol::partialDownload() { + if (!socket_->virtual_download_) { + reassemble(); + } + + if (socket_->on_payload_retrieved_ != VOID_HANDLER) { + socket_->on_payload_retrieved_( + *socket_, byte_count_, + std::make_error_code(std::errc(std::errc::io_error))); + } +} + +// TODO Check vegas protocol +// void VegasTransportProtocol::checkForFastRetransmission(const Interest +// &interest) { +// uint64_t segNumber = interest.getName().getSuffix(); +// received_segments_[segNumber] = true; +// fast_retransmitted_segments.erase(segNumber); + +// uint64_t possibly_lost_segment = 0; +// uint64_t highest_received_segment = received_segments_.rbegin()->first; + +// for (uint64_t i = 0; i <= highest_received_segment; i++) { +// if (received_segments_.find(i) == received_segments_.end()) { +// if (fast_retransmitted_segments.find(i) == +// fast_retransmitted_segments.end()) { +// possibly_lost_segment = i; +// uint8_t out_of_order_segments = 0; +// for (uint64_t j = i; j <= highest_received_segment; j++) { +// if (received_segments_.find(j) != received_segments_.end()) { +// out_of_order_segments++; +// if (out_of_order_segments >= +// default_values::max_out_of_order_segments) { +// fast_retransmitted_segments[possibly_lost_segment] = true; +// fastRetransmit(interest, possibly_lost_segment); +// } +// } +// } +// } +// } +// } +// } + +// void VegasTransportProtocol::fastRetransmit(const Interest &interest, +// uint32_t chunk_number) { +// if (interest_retransmissions_[chunk_number & mask_] < +// socket_->max_retransmissions_) { +// Name name = interest.getName(); +// name.setSuffix(chunk_number); + +// std::shared_ptr retx_interest = +// std::make_shared(name); + +// if (socket_->on_interest_retransmission_ != VOID_HANDLER) { +// socket_->on_interest_retransmission_(*socket_, *retx_interest); +// } + +// if (socket_->on_interest_output_ != VOID_HANDLER) { +// socket_->on_interest_output_(*socket_, *retx_interest); +// } + +// if (!is_running_) { +// return; +// } + +// interests_in_flight_++; +// interest_retransmissions_[chunk_number & mask_]++; + +// using namespace std::placeholders; +// portal_->sendInterest(std::move(retx_interest)); +// } +// } + +void VegasTransportProtocol::removeAllPendingInterests() { portal_->clear(); } + +} // end namespace protocol + +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/vegas.h b/libtransport/src/hicn/transport/protocols/vegas.h new file mode 100755 index 000000000..7791ffc94 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/vegas.h @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace transport { + +namespace protocol { + +typedef utils::CircularFifo SuffixQueue; +typedef std::chrono::time_point Time; +typedef std::chrono::milliseconds TimeDuration; + +class VegasTransportProtocol : public TransportProtocol { + public: + VegasTransportProtocol(interface::BaseSocket *icnet_socket); + + virtual ~VegasTransportProtocol(); + + virtual void start(utils::SharableVector &content_buffer) override; + + void stop() override; + + void resume() override; + + protected: + void reset(); + + void sendInterest(std::uint64_t next_suffix); + + void onContentSegment(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object); + + bool verifyContentObject(const ContentObject &content_object); + + bool verifyManifest(const interface::ContentObjectManifest &manifest); + + virtual void onTimeout(Interest::Ptr &&interest) override; + + void onManifest(std::unique_ptr &&manifest); + + void onContentObject(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) override; + + virtual void changeInterestLifetime(uint64_t segment); + + void scheduleNextInterests(); + + virtual void decreaseWindow(); + + virtual void increaseWindow(); + + virtual void afterContentReception(const Interest &interest, + const ContentObject &content_object); + + virtual void afterDataUnsatisfied(uint64_t segment); + + void reassemble(); + + void returnContentToUser(); + + void partialDownload(); + + virtual void copyContent(const ContentObject &content_object); + + // virtual void checkForFastRetransmission(const Interest &interest); + + // void fastRetransmit(const Interest &interest, uint32_t chunk_number); + + void removeAllPendingInterests(); + + protected: + void handleTimeout(const std::error_code &ec); + + // reassembly variables + volatile bool is_final_block_number_discovered_; + std::atomic final_block_number_; + uint64_t last_reassembled_segment_; + std::shared_ptr> content_buffer_; + size_t content_buffer_size_; + + // transmission variablesis_final_block_number_discovered_ + double current_window_size_; + double pending_window_size_; + uint64_t interests_in_flight_; + uint64_t next_suffix_; + std::vector interest_retransmissions_; + std::vector interest_timepoints_; + RtoEstimator rtt_estimator_; + + uint32_t retx_count_; + + // buffers + std::unordered_map + receive_buffer_; // verified segments by segment number + std::unordered_map + unverified_segments_; // used with embedded manifests + std::unordered_map + verified_manifests_; // by segment number + + std::uint16_t interest_pool_index_; + std::uint16_t mask_; + + // suffix randomization: since the suffixes in the manifests could not be in a + // sequential order, we need to map those suffixes into an ordered sequence. + std::unordered_map + incremental_suffix_to_real_suffix_map_; + std::unordered_map + real_suffix_to_incremental_suffix_map_; + std::uint32_t incremental_suffix_index_; + + // verification + std::unordered_map, HashAlgorithm>> + suffix_hash_map_; + + // Fast Retransmission + std::map received_segments_; + std::unordered_map fast_retransmitted_segments; + + // Suffix queue + volatile bool suffix_queue_completed_; + SuffixQueue suffix_queue_; + + volatile bool download_with_manifest_; + uint32_t next_manifest_; + std::atomic next_manifest_interval_; + + std::unique_ptr verifier_thread_; + + uint32_t interest_tx_; + uint32_t interest_count_; + + uint64_t byte_count_; + double average_rtt_; + + std::unordered_map sign_time_; +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc new file mode 100755 index 000000000..f5f797bbe --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +namespace transport { + +namespace protocol { + +using namespace interface; + +RtoEstimator::RtoEstimator(Duration min_rto) + : smoothed_rtt_(RtoEstimator::getInitialRtt().count()), + rtt_variation_(0), + first_measurement_(true), + last_rto_(min_rto.count()) {} + +void RtoEstimator::addMeasurement(Duration rtt) { + double duration = static_cast(rtt.count()); + if (first_measurement_) { + smoothed_rtt_ = duration; + rtt_variation_ = duration / 2; + first_measurement_ = false; + } else { + rtt_variation_ = (1 - default_values::beta) * rtt_variation_ + + default_values::beta * std::abs(smoothed_rtt_ - duration); + smoothed_rtt_ = (1 - default_values::alpha) * smoothed_rtt_ + + default_values::alpha * duration; + } +} + +RtoEstimator::Duration RtoEstimator::computeRto() const { + double rto = smoothed_rtt_ + + std::max(double(default_values::clock_granularity.count()), + default_values::k* rtt_variation_); + return Duration(static_cast(rto)); +} + +} // end namespace protocol + +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h new file mode 100755 index 000000000..e84afc49c --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +// Implementation inspired from RFC6298 +// (https://tools.ietf.org/search/rfc6298#ref-JK88) + +namespace transport { + +namespace protocol { + +class RtoEstimator { + public: + typedef std::chrono::microseconds Duration; + + static Duration getInitialRtt() { return std::chrono::seconds(1); } + + RtoEstimator(Duration min_rto = std::chrono::seconds(1)); + + void addMeasurement(Duration measure); + + Duration computeRto() const; + + private: + double smoothed_rtt_; + double rtt_variation_; + bool first_measurement_; + double last_rto_; +}; + +} // end namespace protocol + +} // end namespace transport \ No newline at end of file -- cgit 1.2.3-korg