aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/protocols')
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/CMakeLists.txt46
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/cbr.cc47
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/cbr.h48
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/consumer.conf21
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/download_observer.h32
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/protocol.cc45
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/protocol.h79
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/raaqm.cc416
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/raaqm.h94
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/raaqm_data_path.cc158
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/raaqm_data_path.h230
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/rate_estimation.cc353
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/rate_estimation.h175
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/rtc.cc813
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/rtc.h210
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/rtc_data_path.cc85
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/rtc_data_path.h62
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/test/CMakeLists.txt10
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/test/test_transport_producer.cc80
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/vegas.cc630
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/vegas.h161
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc57
-rwxr-xr-xlibtransport/src/hicn/transport/protocols/vegas_rto_estimator.h48
23 files changed, 3900 insertions, 0 deletions
diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt
new file mode 100755
index 000000000..1c3b76c24
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/CMakeLists.txt
@@ -0,0 +1,46 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/download_observer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/vegas.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/protocol.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/vegas_rto_estimator.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
+)
+
+list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/vegas.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/vegas_rto_estimator.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
+)
+
+set(TRANSPORT_CONFIG
+ ${CMAKE_CURRENT_SOURCE_DIR}/consumer.conf
+)
+
+set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/cbr.cc b/libtransport/src/hicn/transport/protocols/cbr.cc
new file mode 100755
index 000000000..3da4819c3
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/cbr.cc
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/cbr.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+CbrTransportProtocol::CbrTransportProtocol(BaseSocket *icnet_socket)
+ : VegasTransportProtocol(icnet_socket) {}
+
+void CbrTransportProtocol::start(
+ utils::SharableVector<uint8_t> &receive_buffer) {
+ current_window_size_ = socket_->current_window_size_;
+ VegasTransportProtocol::start(receive_buffer);
+}
+
+void CbrTransportProtocol::changeInterestLifetime(uint64_t segment) { return; }
+
+void CbrTransportProtocol::increaseWindow() {}
+
+void CbrTransportProtocol::decreaseWindow() {}
+
+void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {}
+
+void CbrTransportProtocol::afterContentReception(
+ const Interest &interest, const ContentObject &content_object) {}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/cbr.h b/libtransport/src/hicn/transport/protocols/cbr.h
new file mode 100755
index 000000000..0a572292a
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/cbr.h
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/protocols/raaqm_data_path.h>
+#include <hicn/transport/protocols/rate_estimation.h>
+#include <hicn/transport/protocols/vegas.h>
+#include <hicn/transport/protocols/vegas_rto_estimator.h>
+
+namespace transport {
+
+namespace protocol {
+
+class CbrTransportProtocol : public VegasTransportProtocol {
+ public:
+ CbrTransportProtocol(interface::BaseSocket *icnet_socket);
+
+ void start(utils::SharableVector<uint8_t> &receive_buffer) override;
+
+ private:
+ void afterContentReception(const Interest &interest,
+ const ContentObject &content_object) override;
+
+ void afterDataUnsatisfied(uint64_t segment) override;
+
+ void increaseWindow() override;
+
+ void decreaseWindow() override;
+
+ void changeInterestLifetime(uint64_t segment) override;
+};
+
+} // end namespace protocol
+
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/consumer.conf b/libtransport/src/hicn/transport/protocols/consumer.conf
new file mode 100755
index 000000000..1a366f32f
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/consumer.conf
@@ -0,0 +1,21 @@
+; this file contais the parameters for RAAQM
+autotune = no
+lifetime = 500
+retransmissions = 128
+beta = 0.99
+drop = 0.003
+beta_wifi_ = 0.99
+drop_wifi_ = 0.6
+beta_lte_ = 0.99
+drop_lte_ = 0.003
+wifi_delay_ = 200
+lte_delay_ = 9000
+
+alpha = 0.95
+batching_parameter = 200
+
+;Choice of rate estimator:
+;0 --> an estimation each $(batching_parameter) packets
+;1 --> an estimation "a la TCP", estimation at the end of the download of the segment
+
+rate_estimator = 0
diff --git a/libtransport/src/hicn/transport/protocols/download_observer.h b/libtransport/src/hicn/transport/protocols/download_observer.h
new file mode 100755
index 000000000..6d24fe6fd
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/download_observer.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace transport {
+
+namespace protocol {
+
+class IcnObserver {
+ public:
+ virtual ~IcnObserver(){};
+
+ virtual void notifyStats(double throughput) = 0;
+ virtual void notifyDownloadTime(double downloadTime) = 0;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc
new file mode 100755
index 000000000..ea4fd6dbf
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/protocol.cc
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/protocol.h>
+
+namespace transport {
+
+namespace protocol {
+
+TransportProtocol::TransportProtocol(interface::BaseSocket *icn_socket)
+ : socket_(dynamic_cast<interface::ConsumerSocket *>(icn_socket)),
+ is_running_(false),
+ interest_pool_() {
+ // Create pool of interests
+ increasePoolSize();
+}
+
+TransportProtocol::~TransportProtocol() {}
+
+void TransportProtocol::updatePortal() { portal_ = socket_->portal_; }
+
+bool TransportProtocol::isRunning() { return is_running_; }
+
+void TransportProtocol::increasePoolSize(std::size_t size) {
+ for (std::size_t i = 0; i < size; i++) {
+ interest_pool_.add(new Interest());
+ }
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h
new file mode 100755
index 000000000..56c57e025
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/protocol.h
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket.h>
+#include <hicn/transport/utils/object_pool.h>
+#include <hicn/transport/utils/sharable_vector.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+
+class TransportProtocolCallback {
+ virtual void onContentObject(const core::Interest &interest,
+ const core::ContentObject &content_object) = 0;
+ virtual void onTimeout(const core::Interest &interest) = 0;
+};
+
+class TransportProtocol : public interface::BasePortal::ConsumerCallback {
+ static constexpr std::size_t interest_pool_size = 4096;
+
+ public:
+ TransportProtocol(interface::BaseSocket *icn_socket);
+
+ virtual ~TransportProtocol();
+
+ void updatePortal();
+
+ bool isRunning();
+
+ virtual void start(utils::SharableVector<uint8_t> &content_buffer) = 0;
+
+ virtual void stop() = 0;
+
+ virtual void resume() = 0;
+
+ protected:
+ virtual void increasePoolSize(std::size_t size = interest_pool_size);
+
+ TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() {
+ auto result = interest_pool_.get();
+
+ while (TRANSPORT_EXPECT_FALSE(!result.first)) {
+ // Add packets to the pool
+ increasePoolSize();
+ result = interest_pool_.get();
+ }
+
+ return std::move(result.second);
+ }
+ // Consumer Callback
+ virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0;
+ virtual void onTimeout(Interest::Ptr &&i) = 0;
+
+ protected:
+ interface::ConsumerSocket *socket_;
+ std::shared_ptr<interface::BasePortal> portal_;
+ volatile bool is_running_;
+ utils::ObjectPool<Interest> interest_pool_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
new file mode 100755
index 000000000..cd22ecfdc
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -0,0 +1,416 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/raaqm.h>
+
+#include <fstream>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+RaaqmTransportProtocol::RaaqmTransportProtocol(BaseSocket *icnet_socket)
+ : VegasTransportProtocol(icnet_socket), rate_estimator_(NULL) {
+ init();
+}
+
+RaaqmTransportProtocol::~RaaqmTransportProtocol() {
+ if (this->rate_estimator_) {
+ delete this->rate_estimator_;
+ }
+}
+
+void RaaqmTransportProtocol::init() {
+ std::ifstream is(RAAQM_CONFIG_PATH);
+
+ std::string line;
+
+ socket_->beta_ = default_values::beta_value;
+ socket_->drop_factor_ = default_values::drop_factor;
+ socket_->interest_lifetime_ = default_values::interest_lifetime;
+ socket_->max_retransmissions_ =
+ default_values::transport_protocol_max_retransmissions;
+ raaqm_autotune_ = false;
+ default_beta_ = default_values::beta_value;
+ default_drop_ = default_values::drop_factor;
+ beta_wifi_ = default_values::beta_value;
+ drop_wifi_ = default_values::drop_factor;
+ beta_lte_ = default_values::beta_value;
+ drop_lte_ = default_values::drop_factor;
+ wifi_delay_ = 1000;
+ lte_delay_ = 15000;
+
+ if (!is) {
+ TRANSPORT_LOGW("WARNING: RAAQM parameters not found, set default values");
+ return;
+ }
+
+ while (getline(is, line)) {
+ std::string command;
+ std::istringstream line_s(line);
+
+ line_s >> command;
+
+ if (command == ";") {
+ continue;
+ }
+
+ if (command == "autotune") {
+ std::string tmp;
+ std::string val;
+ line_s >> tmp >> val;
+ if (val == "yes") {
+ raaqm_autotune_ = true;
+ } else {
+ raaqm_autotune_ = false;
+ }
+ continue;
+ }
+
+ if (command == "lifetime") {
+ std::string tmp;
+ uint32_t lifetime;
+ line_s >> tmp >> lifetime;
+ socket_->interest_lifetime_ = lifetime;
+ continue;
+ }
+
+ if (command == "retransmissions") {
+ std::string tmp;
+ uint32_t rtx;
+ line_s >> tmp >> rtx;
+ socket_->max_retransmissions_ = rtx;
+ continue;
+ }
+
+ if (command == "beta") {
+ std::string tmp;
+ line_s >> tmp >> default_beta_;
+ socket_->beta_ = default_beta_;
+ continue;
+ }
+
+ if (command == "drop") {
+ std::string tmp;
+ line_s >> tmp >> default_drop_;
+ socket_->drop_factor_ = default_drop_;
+ continue;
+ }
+
+ if (command == "beta_wifi_") {
+ std::string tmp;
+ line_s >> tmp >> beta_wifi_;
+ continue;
+ }
+
+ if (command == "drop_wifi_") {
+ std::string tmp;
+ line_s >> tmp >> drop_wifi_;
+ continue;
+ }
+
+ if (command == "beta_lte_") {
+ std::string tmp;
+ line_s >> tmp >> beta_lte_;
+ continue;
+ }
+
+ if (command == "drop_lte_") {
+ std::string tmp;
+ line_s >> tmp >> drop_lte_;
+ continue;
+ }
+
+ if (command == "wifi_delay_") {
+ std::string tmp;
+ line_s >> tmp >> wifi_delay_;
+ continue;
+ }
+
+ if (command == "lte_delay_") {
+ std::string tmp;
+ line_s >> tmp >> lte_delay_;
+ continue;
+ }
+ if (command == "alpha") {
+ std::string tmp;
+ double rate_alpha = 0.0;
+ line_s >> tmp >> rate_alpha;
+ socket_->rate_estimation_alpha_ = rate_alpha;
+ continue;
+ }
+
+ if (command == "batching_parameter") {
+ std::string tmp;
+ uint32_t batching_param = 0;
+ line_s >> tmp >> batching_param;
+ socket_->rate_estimation_batching_parameter_ = batching_param;
+ continue;
+ }
+
+ if (command == "rate_estimator") {
+ std::string tmp;
+ uint32_t choice_param = 0;
+ line_s >> tmp >> choice_param;
+ socket_->rate_estimation_choice_ = choice_param;
+ continue;
+ }
+ }
+ is.close();
+}
+
+void RaaqmTransportProtocol::start(
+ utils::SharableVector<uint8_t> &content_buffer) {
+ if (this->rate_estimator_) {
+ this->rate_estimator_->onStart();
+ }
+
+ if (!cur_path_) {
+ double drop_factor;
+ double minimum_drop_probability;
+ uint32_t sample_number;
+ uint32_t interest_lifetime;
+ // double beta;
+
+ drop_factor = socket_->drop_factor_;
+ minimum_drop_probability = socket_->minimum_drop_probability_;
+ sample_number = socket_->sample_number_;
+ interest_lifetime = socket_->interest_lifetime_;
+ // beta = socket_->beta_;
+
+ double alpha = 0.0;
+ uint32_t batching_param = 0;
+ uint32_t choice_param = 0;
+ alpha = socket_->rate_estimation_alpha_;
+ batching_param = socket_->rate_estimation_batching_parameter_;
+ choice_param = socket_->rate_estimation_choice_;
+
+ if (choice_param == 1) {
+ this->rate_estimator_ = new ALaTcpEstimator();
+ } else {
+ this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ }
+
+ this->rate_estimator_->observer_ = socket_->rate_estimation_observer_;
+
+ cur_path_ = std::make_shared<RaaqmDataPath>(
+ drop_factor, minimum_drop_probability, interest_lifetime * 1000,
+ sample_number);
+ path_table_[default_values::path_id] = cur_path_;
+ }
+
+ VegasTransportProtocol::start(content_buffer);
+}
+
+void RaaqmTransportProtocol::copyContent(const ContentObject &content_object) {
+ if (TRANSPORT_EXPECT_FALSE(
+ (content_object.getName().getSuffix() == final_block_number_) ||
+ !(is_running_))) {
+ this->rate_estimator_->onDownloadFinished();
+ }
+ VegasTransportProtocol::copyContent(content_object);
+}
+
+void RaaqmTransportProtocol::updatePathTable(
+ const ContentObject &content_object) {
+ uint32_t path_id = content_object.getPathLabel();
+
+ if (path_table_.find(path_id) == path_table_.end()) {
+ if (cur_path_) {
+ // Create a new path with some default param
+ if (path_table_.empty()) {
+ throw errors::RuntimeException(
+ "No path initialized for path table, error could be in default "
+ "path initialization.");
+ } else {
+ // Initiate the new path default param
+ std::shared_ptr<RaaqmDataPath> new_path =
+ std::make_shared<RaaqmDataPath>(
+ *(path_table_.at(default_values::path_id)));
+ // Insert the new path into hash table
+ path_table_[path_id] = new_path;
+ }
+ } else {
+ throw errors::RuntimeException(
+ "UNEXPECTED ERROR: when running,current path not found.");
+ }
+ }
+
+ cur_path_ = path_table_[path_id];
+
+ size_t header_size = content_object.headerSize();
+ size_t data_size = content_object.payloadSize();
+
+ // Update measurements for path
+ cur_path_->updateReceivedStats(header_size + data_size, data_size);
+}
+
+void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
+ if (TRANSPORT_EXPECT_FALSE(!cur_path_)) {
+ throw std::runtime_error("ERROR: no current path found, exit");
+ } else {
+ std::chrono::microseconds rtt;
+
+ std::chrono::steady_clock::duration duration =
+ std::chrono::steady_clock::now() -
+ interest_timepoints_[segment & mask_];
+ rtt = std::chrono::duration_cast<std::chrono::microseconds>(duration);
+
+ if (this->rate_estimator_) {
+ this->rate_estimator_->onRttUpdate(rtt.count());
+ }
+ cur_path_->insertNewRtt(rtt.count());
+ cur_path_->smoothTimer();
+
+ if (cur_path_->newPropagationDelayAvailable()) {
+ check_drop_probability();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::changeInterestLifetime(uint64_t segment) {
+ return;
+}
+
+void RaaqmTransportProtocol::check_drop_probability() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ unsigned int max_pd = 0;
+ std::unordered_map<uint32_t, std::shared_ptr<RaaqmDataPath>>::iterator it;
+ for (auto it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->getPropagationDelay() > max_pd &&
+ it->second->getPropagationDelay() != UINT_MAX &&
+ !it->second->isStale()) {
+ max_pd = it->second->getPropagationDelay();
+ }
+ }
+
+ double drop_prob = 0;
+ double beta = 0;
+ if (max_pd < wifi_delay_) { // only ethernet paths
+ drop_prob = default_drop_;
+ beta = default_beta_;
+ } else if (max_pd < lte_delay_) { // at least one wifi path
+ drop_prob = drop_wifi_;
+ beta = beta_wifi_;
+ } else { // at least one lte path
+ drop_prob = drop_lte_;
+ beta = beta_lte_;
+ }
+
+ double old_drop_prob = 0;
+ double old_beta = 0;
+ old_beta = socket_->beta_;
+ old_drop_prob = socket_->drop_factor_;
+
+ if (drop_prob == old_drop_prob && beta == old_beta) {
+ return;
+ }
+
+ socket_->beta_ = beta;
+ socket_->drop_factor_ = drop_prob;
+
+ for (it = path_table_.begin(); it != path_table_.end(); it++) {
+ it->second->setDropProb(drop_prob);
+ }
+}
+
+void RaaqmTransportProtocol::check_for_stale_paths() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ bool stale = false;
+ std::unordered_map<uint32_t, std::shared_ptr<RaaqmDataPath>>::iterator it;
+ for (it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->isStale()) {
+ stale = true;
+ break;
+ }
+ }
+ if (stale) {
+ check_drop_probability();
+ }
+}
+
+void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ check_for_stale_paths();
+ VegasTransportProtocol::onTimeout(std::move(interest));
+}
+
+void RaaqmTransportProtocol::increaseWindow() {
+ double max_window_size = socket_->max_window_size_;
+ if (current_window_size_ < max_window_size) {
+ double gamma = socket_->gamma_;
+
+ current_window_size_ += gamma / current_window_size_;
+ socket_->current_window_size_ = current_window_size_;
+ }
+ this->rate_estimator_->onWindowIncrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::decreaseWindow() {
+ double min_window_size = socket_->min_window_size_;
+ if (current_window_size_ > min_window_size) {
+ double beta = socket_->beta_;
+
+ current_window_size_ = current_window_size_ * beta;
+ if (current_window_size_ < min_window_size) {
+ current_window_size_ = min_window_size;
+ }
+
+ socket_->current_window_size_ = current_window_size_;
+ }
+ this->rate_estimator_->onWindowDecrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::RAAQM() {
+ if (!cur_path_) {
+ throw errors::RuntimeException("ERROR: no current path found, exit");
+ exit(EXIT_FAILURE);
+ } else {
+ // Change drop probability according to RTT statistics
+ cur_path_->updateDropProb();
+
+ if (rand() % 10000 <= cur_path_->getDropProb() * 10000) {
+ decreaseWindow();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ // Decrease the window because the timeout happened
+ decreaseWindow();
+}
+
+void RaaqmTransportProtocol::afterContentReception(
+ const Interest &interest, const ContentObject &content_object) {
+ updatePathTable(content_object);
+ increaseWindow();
+ updateRtt(interest.getName().getSuffix());
+ this->rate_estimator_->onDataReceived((int)content_object.payloadSize() +
+ content_object.headerSize());
+ // Set drop probablility and window size accordingly
+ RAAQM();
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h
new file mode 100755
index 000000000..6ca410251
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/raaqm.h
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/protocols/raaqm_data_path.h>
+#include <hicn/transport/protocols/rate_estimation.h>
+#include <hicn/transport/protocols/vegas.h>
+#include <hicn/transport/protocols/vegas_rto_estimator.h>
+
+namespace transport {
+
+namespace protocol {
+
+class RaaqmTransportProtocol : public VegasTransportProtocol {
+ public:
+ RaaqmTransportProtocol(interface::BaseSocket *icnet_socket);
+
+ ~RaaqmTransportProtocol();
+
+ void start(utils::SharableVector<uint8_t> &content_buffer) override;
+
+ protected:
+ void copyContent(const ContentObject &content_object) override;
+
+ private:
+ void init();
+
+ void afterContentReception(const Interest &interest,
+ const ContentObject &content_object) override;
+
+ void afterDataUnsatisfied(uint64_t segment) override;
+
+ void increaseWindow() override;
+
+ void updateRtt(uint64_t segment);
+
+ void decreaseWindow() override;
+
+ void changeInterestLifetime(uint64_t segment) override;
+
+ void onTimeout(Interest::Ptr &&interest) override;
+
+ void RAAQM();
+
+ void updatePathTable(const ContentObject &content_object);
+
+ void check_drop_probability();
+
+ void check_for_stale_paths();
+
+ void printRtt();
+
+ /**
+ * Current download path
+ */
+ std::shared_ptr<RaaqmDataPath> cur_path_;
+
+ /**
+ * Hash table for path: each entry is a pair path ID(key) - path object
+ */
+ std::unordered_map<uint32_t, std::shared_ptr<RaaqmDataPath>> path_table_;
+
+ bool set_interest_filter_;
+ // for rate-estimation at packet level
+ IcnRateEstimator *rate_estimator_;
+
+ // params for autotuning
+ bool raaqm_autotune_;
+ double default_beta_;
+ double default_drop_;
+ double beta_wifi_;
+ double drop_wifi_;
+ double beta_lte_;
+ double drop_lte_;
+ unsigned int wifi_delay_;
+ unsigned int lte_delay_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc
new file mode 100755
index 000000000..f876cf4f8
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/raaqm_data_path.h>
+
+namespace transport {
+
+namespace protocol {
+
+RaaqmDataPath::RaaqmDataPath(double drop_factor,
+ double minimum_drop_probability,
+ unsigned new_timer, unsigned int samples,
+ uint64_t new_rtt, uint64_t new_rtt_min,
+ uint64_t new_rtt_max, unsigned new_pd)
+
+ : drop_factor_(drop_factor),
+ minimum_drop_probability_(minimum_drop_probability),
+ timer_(new_timer),
+ samples_(samples),
+ rtt_(new_rtt),
+ rtt_min_(new_rtt_min),
+ rtt_max_(new_rtt_max),
+ prop_delay_(new_pd),
+ new_prop_delay_(false),
+ drop_prob_(0),
+ packets_received_(0),
+ last_packets_received_(0),
+ m_packets_bytes_received_(0),
+ last_packets_bytes_received_(0),
+ raw_data_bytes_received_(0),
+ last_raw_data_bytes_received_(0),
+ rtt_samples_(samples_),
+ average_rtt_(0),
+ alpha_(ALPHA) {
+ gettimeofday(&m_last_received_pkt_, 0);
+}
+
+RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
+ rtt_ = new_rtt;
+ rtt_samples_.pushBack(new_rtt);
+
+ rtt_max_ = rtt_samples_.rBegin();
+ rtt_min_ = rtt_samples_.begin();
+
+ if (rtt_min_ < prop_delay_) {
+ new_prop_delay_ = true;
+ prop_delay_ = rtt_min_;
+ }
+
+ gettimeofday(&m_last_received_pkt_, 0);
+
+ return *this;
+}
+
+RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size,
+ std::size_t data_size) {
+ packets_received_++;
+ m_packets_bytes_received_ += packet_size;
+ raw_data_bytes_received_ += data_size;
+
+ return *this;
+}
+
+double RaaqmDataPath::getDropFactor() { return drop_factor_; }
+
+double RaaqmDataPath::getDropProb() { return drop_prob_; }
+
+RaaqmDataPath &RaaqmDataPath::setDropProb(double dropProb) {
+ drop_prob_ = dropProb;
+
+ return *this;
+}
+
+double RaaqmDataPath::getMinimumDropProbability() {
+ return minimum_drop_probability_;
+}
+
+double RaaqmDataPath::getTimer() { return timer_; }
+
+RaaqmDataPath &RaaqmDataPath::smoothTimer() {
+ timer_ = (1 - TIMEOUT_SMOOTHER) * timer_ +
+ (TIMEOUT_SMOOTHER)*rtt_ * (TIMEOUT_RATIO);
+
+ return *this;
+}
+
+double RaaqmDataPath::getRtt() { return rtt_; }
+
+double RaaqmDataPath::getAverageRtt() { return average_rtt_; }
+
+double RaaqmDataPath::getRttMax() { return rtt_max_; }
+
+double RaaqmDataPath::getRttMin() { return rtt_min_; }
+
+unsigned RaaqmDataPath::getSampleValue() { return samples_; }
+
+unsigned RaaqmDataPath::getRttQueueSize() {
+ return static_cast<unsigned>(rtt_samples_.size());
+}
+
+RaaqmDataPath &RaaqmDataPath::updateDropProb() {
+ drop_prob_ = 0.0;
+
+ if (getSampleValue() == getRttQueueSize()) {
+ if (rtt_max_ == rtt_min_) {
+ drop_prob_ = minimum_drop_probability_;
+ } else {
+ drop_prob_ = minimum_drop_probability_ +
+ drop_factor_ * (rtt_ - rtt_min_) / (rtt_max_ - rtt_min_);
+ }
+ }
+
+ return *this;
+}
+
+double RaaqmDataPath::getMicroSeconds(struct timeval &time) {
+ return (double)(time.tv_sec) * 1000000 + (double)(time.tv_usec);
+}
+
+void RaaqmDataPath::setAlpha(double alpha) {
+ if (alpha >= 0 && alpha <= 1) {
+ alpha_ = alpha;
+ }
+}
+
+bool RaaqmDataPath::newPropagationDelayAvailable() {
+ bool r = new_prop_delay_;
+ new_prop_delay_ = false;
+ return r;
+}
+
+unsigned int RaaqmDataPath::getPropagationDelay() { return prop_delay_; }
+
+bool RaaqmDataPath::isStale() {
+ struct timeval now;
+ gettimeofday(&now, 0);
+ double time = getMicroSeconds(now) - getMicroSeconds(m_last_received_pkt_);
+ if (time > 2000000) {
+ return true;
+ }
+ return false;
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h
new file mode 100755
index 000000000..6f63940c9
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/utils/min_filter.h>
+
+#include <sys/time.h>
+#include <climits>
+#include <iostream>
+
+#define TIMEOUT_SMOOTHER 0.1
+#define TIMEOUT_RATIO 10
+#define ALPHA 0.8
+
+namespace transport {
+
+namespace protocol {
+
+class RaaqmDataPath {
+ public:
+ RaaqmDataPath(double drop_factor, double minimum_drop_probability,
+ unsigned new_timer, unsigned int samples,
+ uint64_t new_rtt = 1000, uint64_t new_rtt_min = 1000,
+ uint64_t new_rtt_max = 1000, unsigned new_pd = UINT_MAX);
+
+ public:
+ /*
+ * @brief Add a new RTT to the RTT queue of the path, check if RTT queue is
+ * full, and thus need overwrite. Also it maintains the validity of min and
+ * max of RTT.
+ * @param new_rtt is the value of the new RTT
+ */
+ RaaqmDataPath &insertNewRtt(uint64_t new_rtt);
+
+ /**
+ * @brief Update the path statistics
+ * @param packet_size the size of the packet received, including the ICN
+ * header
+ * @param data_size the size of the data received, without the ICN header
+ */
+ RaaqmDataPath &updateReceivedStats(std::size_t packet_size,
+ std::size_t data_size);
+
+ /**
+ * @brief Get the value of the drop factor parameter
+ */
+ double getDropFactor();
+
+ /**
+ * @brief Get the value of the drop probability
+ */
+ double getDropProb();
+
+ /**
+ * @brief Set the value pf the drop probability
+ * @param drop_prob is the value of the drop probability
+ */
+ RaaqmDataPath &setDropProb(double drop_prob);
+
+ /**
+ * @brief Get the minimum drop probability
+ */
+ double getMinimumDropProbability();
+
+ /**
+ * @brief Get last RTT
+ */
+ double getRtt();
+
+ /**
+ * @brief Get average RTT
+ */
+ double getAverageRtt();
+
+ /**
+ * @brief Get the current m_timer value
+ */
+ double getTimer();
+
+ /**
+ * @brief Smooth he value of the m_timer accordingly with the last RTT
+ * measured
+ */
+ RaaqmDataPath &smoothTimer();
+
+ /**
+ * @brief Get the maximum RTT among the last samples
+ */
+ double getRttMax();
+
+ /**
+ * @brief Get the minimum RTT among the last samples
+ */
+ double getRttMin();
+
+ /**
+ * @brief Get the number of saved samples
+ */
+ unsigned getSampleValue();
+
+ /**
+ * @brief Get the size og the RTT queue
+ */
+ unsigned getRttQueueSize();
+
+ /*
+ * @brief Change drop probability according to RTT statistics
+ * Invoked in RAAQM(), before control window size update.
+ */
+ RaaqmDataPath &updateDropProb();
+
+ /**
+ * @brief This function convert the time from struct timeval to its value in
+ * microseconds
+ */
+ static double getMicroSeconds(struct timeval &time);
+
+ void setAlpha(double alpha);
+
+ /**
+ * @brief Returns the smallest RTT registered so far for this path
+ */
+
+ unsigned int getPropagationDelay();
+
+ bool newPropagationDelayAvailable();
+
+ bool isStale();
+
+ private:
+ /**
+ * The value of the drop factor
+ */
+ double drop_factor_;
+
+ /**
+ * The minumum drop probability
+ */
+ double minimum_drop_probability_;
+
+ /**
+ * The timer, expressed in milliseconds
+ */
+ double timer_;
+
+ /**
+ * The number of samples to store for computing the protocol measurements
+ */
+ const unsigned int samples_;
+
+ /**
+ * The last, the minimum and the maximum value of the RTT (among the last
+ * m_samples samples)
+ */
+ uint64_t rtt_, rtt_min_, rtt_max_, prop_delay_;
+
+ bool new_prop_delay_;
+
+ /**
+ * The current drop probability
+ */
+ double drop_prob_;
+
+ /**
+ * The number of packets received in this path
+ */
+ intmax_t packets_received_;
+
+ /**
+ * The first packet received after the statistics print
+ */
+ intmax_t last_packets_received_;
+
+ /**
+ * Total number of bytes received including the ICN header
+ */
+ intmax_t m_packets_bytes_received_;
+
+ /**
+ * The amount of packet bytes received at the last path summary computation
+ */
+ intmax_t last_packets_bytes_received_;
+
+ /**
+ * Total number of bytes received without including the ICN header
+ */
+ intmax_t raw_data_bytes_received_;
+
+ /**
+ * The amount of raw dat bytes received at the last path summary computation
+ */
+ intmax_t last_raw_data_bytes_received_;
+
+ class byArrival;
+
+ class byOrder;
+
+ /**
+ * Double ended queue for the RTTs
+ */
+
+ typedef utils::MinFilter<uint64_t> RTTQueue;
+
+ RTTQueue rtt_samples_;
+
+ /**
+ * Time of the last call to the path reporter method
+ */
+ struct timeval m_last_received_pkt_;
+
+ double average_rtt_;
+ double alpha_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.cc b/libtransport/src/hicn/transport/protocols/rate_estimation.cc
new file mode 100755
index 000000000..e313bf9f6
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/rate_estimation.cc
@@ -0,0 +1,353 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/rate_estimation.h>
+#include <hicn/transport/utils/log.h>
+
+namespace transport {
+
+namespace protocol {
+
+void *Timer(void *data) {
+ InterRttEstimator *estimator = (InterRttEstimator *)data;
+
+ double dat_rtt, my_avg_win, my_avg_rtt;
+ int my_win_change, number_of_packets, max_packet_size;
+
+ pthread_mutex_lock(&(estimator->mutex_));
+ dat_rtt = estimator->rtt_;
+ pthread_mutex_unlock(&(estimator->mutex_));
+
+ while (estimator->is_running_) {
+ usleep(KV * dat_rtt);
+
+ pthread_mutex_lock(&(estimator->mutex_));
+
+ dat_rtt = estimator->rtt_;
+ my_avg_win = estimator->avg_win_;
+ my_avg_rtt = estimator->avg_rtt_;
+ my_win_change = estimator->win_change_;
+ number_of_packets = estimator->number_of_packets_;
+ max_packet_size = estimator->max_packet_size_;
+ estimator->avg_rtt_ = estimator->rtt_;
+ estimator->avg_win_ = 0;
+ estimator->win_change_ = 0;
+ estimator->number_of_packets_ = 1;
+
+ pthread_mutex_unlock(&(estimator->mutex_));
+
+ if (number_of_packets == 0 || my_win_change == 0) {
+ continue;
+ }
+ if (estimator->estimation_ == 0) {
+ estimator->estimation_ = (my_avg_win * 8.0 * max_packet_size * 1000000.0 /
+ (1.0 * my_win_change)) /
+ (my_avg_rtt / (1.0 * number_of_packets));
+ }
+
+ estimator->estimation_ =
+ estimator->alpha_ * estimator->estimation_ +
+ (1 - estimator->alpha_) * ((my_avg_win * 8.0 * max_packet_size *
+ 1000000.0 / (1.0 * my_win_change)) /
+ (my_avg_rtt / (1.0 * number_of_packets)));
+
+ if (estimator->observer_) {
+ estimator->observer_->notifyStats(estimator->estimation_);
+ }
+ }
+
+ return nullptr;
+}
+
+InterRttEstimator::InterRttEstimator(double alpha_arg) {
+ this->estimated_ = false;
+ this->observer_ = NULL;
+ this->alpha_ = alpha_arg;
+ this->thread_is_running_ = false;
+ this->my_th_ = NULL;
+ this->is_running_ = true;
+ this->avg_rtt_ = 0.0;
+ this->estimation_ = 0.0;
+ this->avg_win_ = 0.0;
+ this->rtt_ = 0.0;
+ this->win_change_ = 0;
+ this->number_of_packets_ = 0;
+ this->max_packet_size_ = 0;
+ this->win_current_ = 1.0;
+
+ pthread_mutex_init(&(this->mutex_), NULL);
+ gettimeofday(&(this->start_time_), 0);
+ gettimeofday(&(this->begin_batch_), 0);
+}
+
+InterRttEstimator::~InterRttEstimator() {
+ this->is_running_ = false;
+ if (this->my_th_) {
+ pthread_join(*(this->my_th_), NULL);
+ }
+ this->my_th_ = NULL;
+ pthread_mutex_destroy(&(this->mutex_));
+}
+
+void InterRttEstimator::onRttUpdate(double rtt) {
+ pthread_mutex_lock(&(this->mutex_));
+ this->rtt_ = rtt;
+ this->number_of_packets_++;
+ this->avg_rtt_ += rtt;
+ pthread_mutex_unlock(&(this->mutex_));
+
+ if (!thread_is_running_) {
+ my_th_ = (pthread_t *)malloc(sizeof(pthread_t));
+ if (!my_th_) {
+ TRANSPORT_LOGE("Error allocating thread.");
+ my_th_ = NULL;
+ }
+ if (/*int err = */ pthread_create(my_th_, NULL, transport::protocol::Timer,
+ (void *)this)) {
+ TRANSPORT_LOGE("Error creating the thread");
+ my_th_ = NULL;
+ }
+ thread_is_running_ = true;
+ }
+}
+
+void InterRttEstimator::onWindowIncrease(double win_current) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) -
+ RaaqmDataPath::getMicroSeconds(this->begin_batch_);
+
+ pthread_mutex_lock(&(this->mutex_));
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ pthread_mutex_unlock(&(this->mutex_));
+
+ gettimeofday(&(this->begin_batch_), 0);
+}
+
+void InterRttEstimator::onWindowDecrease(double win_current) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) -
+ RaaqmDataPath::getMicroSeconds(this->begin_batch_);
+
+ pthread_mutex_lock(&(this->mutex_));
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ pthread_mutex_unlock(&(this->mutex_));
+
+ gettimeofday(&(this->begin_batch_), 0);
+}
+
+ALaTcpEstimator::ALaTcpEstimator() {
+ this->estimation_ = 0.0;
+ this->observer_ = NULL;
+ gettimeofday(&(this->start_time_), 0);
+ this->totalSize_ = 0.0;
+}
+
+void ALaTcpEstimator::onStart() {
+ this->totalSize_ = 0.0;
+ gettimeofday(&(this->start_time_), 0);
+}
+
+void ALaTcpEstimator::onDownloadFinished() {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) -
+ RaaqmDataPath::getMicroSeconds(this->start_time_);
+ this->estimation_ = this->totalSize_ * 8 * 1000000 / delay;
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+}
+
+void ALaTcpEstimator::onDataReceived(int packet_size) {
+ this->totalSize_ += packet_size;
+}
+
+SimpleEstimator::SimpleEstimator(double alphaArg, int batching_param) {
+ this->estimation_ = 0.0;
+ this->estimated_ = false;
+ this->observer_ = NULL;
+ this->batching_param_ = batching_param;
+ this->total_size_ = 0.0;
+ this->number_of_packets_ = 0;
+ this->base_alpha_ = alphaArg;
+ this->alpha_ = alphaArg;
+ gettimeofday(&(this->start_time_), 0);
+ gettimeofday(&(this->begin_batch_), 0);
+}
+
+void SimpleEstimator::onStart() {
+ this->estimated_ = false;
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+ gettimeofday(&(this->begin_batch_), 0);
+ gettimeofday(&(this->start_time_), 0);
+}
+
+void SimpleEstimator::onDownloadFinished() {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) -
+ RaaqmDataPath::getMicroSeconds(this->start_time_);
+ if (observer_) {
+ observer_->notifyDownloadTime(delay);
+ }
+ if (!this->estimated_) {
+ // Assuming all packets carry max_packet_size_ bytes of data
+ // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
+ if (this->estimation_) {
+ this->estimation_ =
+ alpha_ * this->estimation_ +
+ (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
+ } else {
+ this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
+ }
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+ this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) /
+ ((double)this->batching_param_));
+ } else {
+ if (this->number_of_packets_ >=
+ (int)(75.0 * (double)this->batching_param_ / 100.0)) {
+ delay = RaaqmDataPath::getMicroSeconds(end) -
+ RaaqmDataPath::getMicroSeconds(this->begin_batch_);
+ // Assuming all packets carry max_packet_size_ bytes of data
+ // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
+ if (this->estimation_) {
+ this->estimation_ =
+ alpha_ * this->estimation_ +
+ (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
+ } else {
+ this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
+ }
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+ this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) /
+ ((double)this->batching_param_));
+ }
+ }
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+ gettimeofday(&(this->begin_batch_), 0);
+ gettimeofday(&(this->start_time_), 0);
+}
+
+void SimpleEstimator::onDataReceived(int packet_size) {
+ this->total_size_ += packet_size;
+}
+
+void SimpleEstimator::onRttUpdate(double rtt) {
+ this->number_of_packets_++;
+
+ if (number_of_packets_ == this->batching_param_) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) -
+ RaaqmDataPath::getMicroSeconds(this->begin_batch_);
+ // Assuming all packets carry max_packet_size_ bytes of data
+ // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
+ if (this->estimation_) {
+ this->estimation_ =
+ alpha_ * this->estimation_ +
+ (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
+ } else {
+ this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
+ }
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+ this->alpha_ = this->base_alpha_;
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+ gettimeofday(&(this->begin_batch_), 0);
+ }
+}
+
+BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg,
+ int param) {
+ this->estimated_ = false;
+ this->observer_ = NULL;
+ this->alpha_ = alpha_arg;
+ this->batching_param_ = param;
+ this->number_of_packets_ = 0;
+ this->avg_win_ = 0.0;
+ this->avg_rtt_ = 0.0;
+ this->win_change_ = 0.0;
+ this->max_packet_size_ = 0;
+ this->estimation_ = 0.0;
+ this->win_current_ = 1.0;
+ gettimeofday(&(this->begin_batch_), 0);
+ gettimeofday(&(this->start_time_), 0);
+}
+
+void BatchingPacketsEstimator::onRttUpdate(double rtt) {
+ this->number_of_packets_++;
+ this->avg_rtt_ += rtt;
+
+ if (number_of_packets_ == this->batching_param_) {
+ if (estimation_ == 0) {
+ estimation_ = (avg_win_ * 8.0 * max_packet_size_ * 1000000.0 /
+ (1.0 * win_change_)) /
+ (avg_rtt_ / (1.0 * number_of_packets_));
+ } else {
+ estimation_ = alpha_ * estimation_ +
+ (1 - alpha_) * ((avg_win_ * 8.0 * max_packet_size_ *
+ 1000000.0 / (1.0 * win_change_)) /
+ (avg_rtt_ / (1.0 * number_of_packets_)));
+ }
+
+ if (observer_) {
+ observer_->notifyStats(estimation_);
+ }
+
+ this->number_of_packets_ = 0;
+ this->avg_win_ = 0.0;
+ this->avg_rtt_ = 0.0;
+ this->win_change_ = 0.0;
+ }
+}
+
+void BatchingPacketsEstimator::onWindowIncrease(double win_current) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) -
+ RaaqmDataPath::getMicroSeconds(this->begin_batch_);
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ gettimeofday(&(this->begin_batch_), 0);
+}
+
+void BatchingPacketsEstimator::onWindowDecrease(double win_current) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) -
+ RaaqmDataPath::getMicroSeconds(this->begin_batch_);
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ gettimeofday(&(this->begin_batch_), 0);
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.h b/libtransport/src/hicn/transport/protocols/rate_estimation.h
new file mode 100755
index 000000000..b889efe12
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/rate_estimation.h
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <unistd.h>
+
+#include <hicn/transport/protocols/download_observer.h>
+#include <hicn/transport/protocols/raaqm_data_path.h>
+
+#define BATCH 50
+#define KV 20
+#define ALPHA 0.8
+#define RATE_CHOICE 0
+
+namespace transport {
+
+namespace protocol {
+
+class IcnRateEstimator {
+ public:
+ IcnRateEstimator(){};
+
+ virtual ~IcnRateEstimator(){};
+
+ virtual void onRttUpdate(double rtt){};
+
+ virtual void onDataReceived(int packetSize){};
+
+ virtual void onWindowIncrease(double winCurrent){};
+
+ virtual void onWindowDecrease(double winCurrent){};
+
+ virtual void onStart(){};
+
+ virtual void onDownloadFinished(){};
+
+ virtual void setObserver(IcnObserver *observer) {
+ this->observer_ = observer;
+ };
+ IcnObserver *observer_;
+ struct timeval start_time_;
+ struct timeval begin_batch_;
+ double base_alpha_;
+ double alpha_;
+ double estimation_;
+ int number_of_packets_;
+ // this boolean is to make sure at least one estimation of the BW is done
+ bool estimated_;
+};
+
+// A rate estimator RTT-based. Computes EWMA(WinSize)/EWMA(RTT)
+
+class InterRttEstimator : public IcnRateEstimator {
+ public:
+ InterRttEstimator(double alpha_arg);
+
+ ~InterRttEstimator();
+
+ void onRttUpdate(double rtt);
+
+ void onDataReceived(int packet_size) {
+ if (packet_size > this->max_packet_size_) {
+ this->max_packet_size_ = packet_size;
+ }
+ };
+
+ void onWindowIncrease(double win_current);
+
+ void onWindowDecrease(double win_current);
+
+ void onStart(){};
+
+ void onDownloadFinished(){};
+
+ // private: should be done by using getters
+ pthread_t *my_th_;
+ bool thread_is_running_;
+ double rtt_;
+ bool is_running_;
+ pthread_mutex_t mutex_;
+ double avg_rtt_;
+ double avg_win_;
+ int max_packet_size_;
+ double win_change_;
+ double win_current_;
+};
+
+// A rate estimator, Batching Packets based. Computes EWMA(WinSize)/EWMA(RTT)
+
+class BatchingPacketsEstimator : public IcnRateEstimator {
+ public:
+ BatchingPacketsEstimator(double alpha_arg, int batchingParam);
+
+ void onRttUpdate(double rtt);
+
+ void onDataReceived(int packet_size) {
+ if (packet_size > this->max_packet_size_) {
+ this->max_packet_size_ = packet_size;
+ }
+ };
+
+ void onWindowIncrease(double win_current);
+
+ void onWindowDecrease(double win_current);
+
+ void onStart(){};
+
+ void onDownloadFinished(){};
+
+ private:
+ int batching_param_;
+ double avg_rtt_;
+ double avg_win_;
+ double win_change_;
+ int max_packet_size_;
+ double win_current_;
+};
+
+// Segment Estimator
+
+class ALaTcpEstimator : public IcnRateEstimator {
+ public:
+ ALaTcpEstimator();
+
+ void onDataReceived(int packet_size);
+ void onStart();
+ void onDownloadFinished();
+
+ private:
+ double totalSize_;
+};
+
+// A Rate estimator, this one is the simplest: counting batching_param_ packets
+// and then divide the sum of the size of these packets by the time taken to DL
+// them. Should be the one used
+
+class SimpleEstimator : public IcnRateEstimator {
+ public:
+ SimpleEstimator(double alpha, int batching_param);
+
+ void onRttUpdate(double rtt);
+
+ void onDataReceived(int packet_size);
+
+ void onWindowIncrease(double win_current){};
+
+ void onWindowDecrease(double win_current){};
+
+ void onStart();
+
+ void onDownloadFinished();
+
+ private:
+ int batching_param_;
+ double total_size_;
+};
+
+void *Timer(void *data);
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
new file mode 100755
index 000000000..1f42cf230
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -0,0 +1,813 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <math.h>
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/rtc.h>
+
+/*
+ * TODO
+ * 2) start/constructor/rest variable implementation
+ * 3) interest retransmission: now I always recover, we should recover only if
+ * we have enough time 4) returnContentToUser: rememeber to remove the first
+ * 32bits from the payload
+ */
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+RTCTransportProtocol::RTCTransportProtocol(BaseSocket *icnet_socket)
+ : TransportProtocol(icnet_socket),
+ inflightInterests_(1 << default_values::log_2_default_buffer_size),
+ modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
+ icnet_socket->getSocketOption(PORTAL, portal_);
+ reset();
+}
+
+RTCTransportProtocol::~RTCTransportProtocol() {
+ if (is_running_) {
+ stop();
+ }
+}
+
+void RTCTransportProtocol::start(
+ utils::SharableVector<uint8_t> &content_buffer) {
+
+ if(is_running_)
+ return;
+
+ is_running_ = true;
+ content_buffer_ = content_buffer.shared_from_this();
+
+ reset();
+ scheduleNextInterest();
+
+ portal_->runEventsLoop();
+ is_running_ = false;
+}
+
+void RTCTransportProtocol::stop() {
+ if(!is_running_)
+ return;
+
+ is_running_ = false;
+ portal_->stopEventsLoop();
+}
+
+void RTCTransportProtocol::resume(){
+ if(is_running_)
+ return;
+
+ is_running_ = true;
+
+ lastRoundBegin_ = std::chrono::steady_clock::now();
+ inflightInterestsCount_ = 0;
+ if(content_buffer_)
+ content_buffer_->clear();
+
+ scheduleNextInterest();
+
+ portal_->runEventsLoop();
+
+ is_running_ = false;
+}
+
+void RTCTransportProtocol::onRTCPPacket(uint8_t *packet, size_t len) {
+ //#define MASK_RTCP_VERSION 192
+ //#define MASK_TYPE_CODE 31
+ size_t read = 0;
+ uint8_t *offset = packet;
+ while (read < len) {
+ if ((((*offset) & MASK_RTCP_VERSION) >> 6) != RTCP_VERSION) {
+ TRANSPORT_LOGE("error while parsing RTCP packet, version unkwown");
+ return;
+ }
+ processRtcpHeader(offset);
+ uint16_t RTCPlen = (ntohs(*(((uint16_t *)offset) + 1)) + 1) * 4;
+ offset += RTCPlen;
+ read += RTCPlen;
+ }
+}
+
+// private
+void RTCTransportProtocol::reset() {
+ // controller var
+ lastRoundBegin_ = std::chrono::steady_clock::now();
+ currentState_ = RTC_SYNC_STATE;
+
+ // cwin var
+ currentCWin_ = INITIAL_CWIN;
+ maxCWin_ = INITIAL_CWIN_MAX;
+
+ // names/packets var
+ actualSegment_ = 0;
+ inflightInterestsCount_ = 0;
+ while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop();
+ nackedByProducer_.clear();
+ nackedByProducerMaxSize_ = 512;
+ if (content_buffer_) content_buffer_->clear();
+
+ holes_.clear();
+ lastReceived_ = 0;
+
+ // stats
+ receivedBytes_ = 0;
+ sentInterest_ = 0;
+ receivedData_ = 0;
+ packetLost_ = 0;
+ avgPacketSize_ = INIT_PACKET_SIZE;
+ gotNack_ = false;
+ gotFutureNack_ = 0;
+ roundsWithoutNacks_ = 0;
+ pathTable_.clear();
+ // roundCounter_ = 0;
+ // minRTTwin_.clear();
+ // for (int i = 0; i < MIN_RTT_WIN; i++)
+ // minRTTwin_.push_back(UINT_MAX);
+ minRtt_ = UINT_MAX;
+
+ // CC var
+ estimatedBw_ = 0.0;
+ lossRate_ = 0.0;
+ queuingDelay_ = 0.0;
+ protocolState_ = RTC_NORMAL_STATE;
+
+ producerPathLabel_ = 0;
+ socket_->setSocketOption(
+ GeneralTransportOptions::INTEREST_LIFETIME,
+ (uint32_t)
+ RTC_INTEREST_LIFETIME); // XXX this should bedone by the application
+}
+
+uint32_t max(uint32_t a, uint32_t b) {
+ if (a > b)
+ return a;
+ else
+ return b;
+}
+
+uint32_t min(uint32_t a, uint32_t b) {
+ if (a < b)
+ return a;
+ else
+ return b;
+}
+
+void RTCTransportProtocol::checkRound() {
+ uint32_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() - lastRoundBegin_)
+ .count();
+ if (duration >= ROUND_LEN) {
+ lastRoundBegin_ = std::chrono::steady_clock::now();
+ updateStats(duration); // update stats and window
+ }
+}
+
+void RTCTransportProtocol::updateDelayStats(
+ const ContentObject &content_object) {
+ uint32_t segmentNumber = content_object.getName().getSuffix();
+ uint32_t pkt = segmentNumber & modMask_;
+
+ if (inflightInterests_[pkt].transmissionTime ==
+ 0) // this is always the case if we have a retransmitted packet (timeout
+ // or RTCP)
+ return;
+
+ uint32_t pathLabel = content_object.getPathLabel();
+
+ if (pathTable_.find(pathLabel) == pathTable_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>();
+ pathTable_[pathLabel] = newPath;
+ }
+
+ // RTT measurements are useful both from NACKs and data packets
+ uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count() -
+ inflightInterests_[pkt].transmissionTime;
+
+ pathTable_[pathLabel]->insertRttSample(RTT);
+
+ // we collect OWD only for datapackets
+ if (content_object.getPayload().length() != NACK_HEADER_SIZE) {
+ uint64_t *senderTimeStamp = (uint64_t *)content_object.getPayload().data();
+
+ int64_t OWD = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count() -
+ *senderTimeStamp;
+
+ pathTable_[pathLabel]->insertOwdSample(OWD);
+ }
+}
+
+void RTCTransportProtocol::updateStats(uint32_t round_duration) {
+ if (receivedBytes_ != 0) {
+ double bytesPerSec = (double)(receivedBytes_ * ((double)MILLI_IN_A_SEC /
+ (double)round_duration));
+ estimatedBw_ = (estimatedBw_ * ESTIMATED_BW_ALPHA) +
+ ((1 - ESTIMATED_BW_ALPHA) * bytesPerSec);
+ }
+
+ auto it = pathTable_.find(producerPathLabel_);
+ if (it == pathTable_.end()) return;
+
+ // double maxAvgRTT = it->second->getAverageRtt();
+ // double minRTT = it->second->getMinRtt();
+ minRtt_ = it->second->getMinRtt();
+ queuingDelay_ = it->second->getQueuingDealy();
+
+ if (minRtt_ == 0) minRtt_ = 1;
+
+ for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) {
+ it->second->roundEnd();
+ }
+
+ // this is inefficient but the window is supposed to be small, so it
+ // probably makes sense to leave it like this
+ // if(minRTT == 0)
+ // minRTT = 1;
+
+ // minRTTwin_[roundCounter_ % MIN_RTT_WIN] = minRTT;
+ // minRtt_ = minRTT;
+ // for (int i = 0; i < MIN_RTT_WIN; i++)
+ // if(minRtt_ > minRTTwin_[i])
+ // minRtt_ = minRTTwin_[i];
+
+ // roundCounter_++;
+
+ // std::cout << "min RTT " << minRtt_ << " queuing " << queuingDelay_ <<
+ // std::endl;
+
+ if (sentInterest_ != 0 && currentState_ == RTC_NORMAL_STATE) {
+ double lossRate = (double)((double)packetLost_ / (double)sentInterest_);
+ lossRate_ = lossRate_ * ESTIMATED_LOSSES_ALPHA +
+ (lossRate * (1 - ESTIMATED_LOSSES_ALPHA));
+ }
+
+ if (avgPacketSize_ == 0) avgPacketSize_ = INIT_PACKET_SIZE;
+
+ uint32_t BDP =
+ ceil((estimatedBw_ * (double)((double)minRtt_ / (double)MILLI_IN_A_SEC) *
+ BANDWIDTH_SLACK_FACTOR) /
+ avgPacketSize_);
+ uint32_t BW = ceil(estimatedBw_);
+ computeMaxWindow(BW, BDP);
+
+ // bound also by interest lifitime* production rate
+ if (!gotNack_) {
+ roundsWithoutNacks_++;
+ if (currentState_ == RTC_SYNC_STATE &&
+ roundsWithoutNacks_ >= ROUNDS_IN_SYNC_BEFORE_SWITCH) {
+ currentState_ = RTC_NORMAL_STATE;
+ }
+ } else {
+ roundsWithoutNacks_ = 0;
+ }
+
+ updateCCState();
+ updateWindow();
+
+ // in any case we reset all the counters
+
+ gotNack_ = false;
+ gotFutureNack_ = 0;
+ receivedBytes_ = 0;
+ sentInterest_ = 0;
+ receivedData_ = 0;
+ packetLost_ = 0;
+}
+
+void RTCTransportProtocol::updateCCState() {
+ // TODO
+}
+
+void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate,
+ uint32_t BDPWin) {
+ if (productionRate ==
+ 0) // we have no info about the producer, keep the previous maxCWin
+ return;
+
+ uint32_t interestLifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interestLifetime);
+ uint32_t maxWaintingInterest = ceil(
+ (productionRate / avgPacketSize_) *
+ (double)((double)(interestLifetime * INTEREST_LIFETIME_REDUCTION_FACTOR) /
+ (double)MILLI_IN_A_SEC));
+
+ if (currentState_ == RTC_SYNC_STATE) {
+ // in this case we do not limit the window with the BDP, beacuse most likly
+ // it is wrong
+ maxCWin_ = maxWaintingInterest;
+ return;
+ }
+
+ // currentState = RTC_NORMAL_STATE
+ if (BDPWin != 0) {
+ maxCWin_ = ceil((double)BDPWin + ((double)BDPWin / 10.0)); // BDP + 10%
+ } else {
+ maxCWin_ = min(maxWaintingInterest, maxCWin_);
+ }
+}
+
+void RTCTransportProtocol::updateWindow() {
+ if (currentState_ == RTC_SYNC_STATE) return;
+
+ if (currentCWin_ < maxCWin_ * 0.7) {
+ currentCWin_ = min(maxCWin_, currentCWin_ * WIN_INCREASE_FACTOR);
+ } else if (currentCWin_ > maxCWin_) {
+ currentCWin_ = max(currentCWin_ * WIN_DECREASE_FACTOR, MIN_CWIN);
+ }
+}
+
+void RTCTransportProtocol::decreaseWindow() {
+ // this is used only in SYNC mode
+ if (currentState_ == RTC_NORMAL_STATE) return;
+
+ if (gotFutureNack_ == 1)
+ currentCWin_ =
+ min((currentCWin_ - 1), ceil((double)maxCWin_ * 0.66)); // 2/3
+ else
+ currentCWin_--;
+
+ currentCWin_ = max(currentCWin_, MIN_CWIN);
+}
+
+void RTCTransportProtocol::increaseWindow() {
+ // this is used only in SYNC mode
+ if (currentState_ == RTC_NORMAL_STATE) return;
+
+ // we need to be carefull to do not increase the window to much
+ if (currentCWin_ < ((double)maxCWin_ * 0.5)) {
+ currentCWin_ = currentCWin_ + 1; // exponential
+ } else {
+ currentCWin_ = min(
+ maxCWin_, ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear
+ }
+}
+
+void RTCTransportProtocol::sendInterest() {
+ Name interest_name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ interest_name);
+ bool isRTX = false;
+ // uint32_t sentInt = 0;
+
+ if (interestRetransmissions_.size() > 0) {
+ // handle retransmission
+ // here we have two possibile retransmissions: retransmissions due to
+ // timeouts and retransmissions due to RTCP NACKs. we will send the interest
+ // anyway, even if it is pending (this is possible only in the second case)
+ uint32_t rtxSeg = interestRetransmissions_.front();
+ interestRetransmissions_.pop();
+
+ std::unordered_map<uint32_t, uint64_t>::const_iterator res =
+ holes_.find(rtxSeg);
+ if (res != holes_.end()) {
+ // this packet is already managed by as an hole
+ // we don't need to send it again
+ return;
+ }
+
+ // a packet recovery means that there was a loss
+ packetLost_++;
+
+ uint32_t pkt = rtxSeg & modMask_;
+ interest_name.setSuffix(rtxSeg);
+
+ // if the interest is not pending anymore we encrease the retrasnmission
+ // counter in order to avoid to handle a recovered packt as a normal one
+ if (!portal_->interestIsPending(interest_name)) {
+ inflightInterests_[pkt].retransmissions++;
+ }
+
+ inflightInterests_[pkt].transmissionTime = 0;
+ isRTX = true;
+ } else {
+ // in this case we send the packet only if it is not pending yet
+ interest_name.setSuffix(actualSegment_);
+ if (portal_->interestIsPending(interest_name)) {
+ actualSegment_++;
+ return;
+ }
+
+ // sentInt = actualSegment_;
+ uint32_t pkt = actualSegment_ & modMask_;
+ inflightInterests_[pkt].transmissionTime =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ inflightInterests_[pkt].retransmissions = 0;
+ actualSegment_++;
+ }
+
+ auto interest = getInterest();
+ interest->setName(interest_name);
+
+ uint32_t interestLifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interestLifetime);
+ interest->setLifetime(uint32_t(interestLifetime));
+
+ ConsumerInterestCallback on_interest_output = VOID_HANDLER;
+
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ on_interest_output);
+
+ if (on_interest_output != VOID_HANDLER) {
+ on_interest_output(*dynamic_cast<ConsumerSocket *>(socket_), *interest);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ using namespace std::placeholders;
+ portal_->sendInterest(std::move(interest));
+
+ sentInterest_++;
+
+ if (!isRTX) {
+ inflightInterestsCount_++;
+ }
+}
+
+void RTCTransportProtocol::scheduleNextInterest() {
+ checkRound();
+ if(!is_running_)
+ return;
+
+ uint32_t MAX_RECOVER =
+ 40; // if the packet is more than MAX_RECOVER seq in the past we drop it
+ uint64_t TIME_BEFORE_RECOVERY = 10; // this should be proporsional to the RTT
+
+ // holes are important only in NORMAL state
+ if (currentState_ == RTC_NORMAL_STATE) {
+ for (std::unordered_map<uint32_t, uint64_t>::iterator it = holes_.begin();
+ it != holes_.end();) {
+ if (it->first < lastReceived_ - MAX_RECOVER) {
+ // the packet is to hold, remove it
+ it = holes_.erase(it);
+ } else {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t sinceLastTry = now - it->second;
+
+ if (sinceLastTry > TIME_BEFORE_RECOVERY || it->second == 0) {
+ // a recovery means a packet lost
+ packetLost_++;
+ // update last sent time
+ it->second = now;
+
+ Name interest_name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ interest_name);
+
+ uint32_t pkt = it->first & modMask_;
+ interest_name.setSuffix(it->first);
+
+ if (!portal_->interestIsPending(interest_name)) {
+ inflightInterests_[pkt].retransmissions++;
+ }
+
+ inflightInterests_[pkt].transmissionTime = 0;
+ // XXX
+ // code refactoring:
+ // from here on this is a copy and paste of the code inside
+ // sendInterest this should go inside an other method
+ auto interest = getInterest();
+ uint32_t interestLifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interestLifetime);
+ interest->setLifetime(uint32_t(interestLifetime));
+
+ ConsumerInterestCallback on_interest_output = VOID_HANDLER;
+
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ on_interest_output);
+ if (on_interest_output != VOID_HANDLER)
+ on_interest_output(*dynamic_cast<ConsumerSocket *>(socket_),
+ *interest);
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) return;
+
+ using namespace std::placeholders;
+ portal_->sendInterest(std::move(interest));
+
+ sentInterest_++;
+ }
+ ++it;
+ }
+ // as usual check the round at each packet
+ checkRound();
+ }
+ }
+
+ while (interestRetransmissions_.size() > 0) {
+ sendInterest();
+ checkRound();
+ }
+
+ while (inflightInterestsCount_ < currentCWin_) {
+ sendInterest();
+ checkRound();
+ }
+}
+
+void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) {
+ for (uint32_t i = 0; i < nacks.size(); i++) {
+ if (nackedByProducer_.find(nacks[i]) != nackedByProducer_.end()) {
+ continue;
+ }
+ // packetLost_++;
+ // XXX here I need to avoid the retrasmission for packet that were nacked by
+ // the network
+ interestRetransmissions_.push(nacks[i]);
+ }
+
+ scheduleNextInterest();
+}
+void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ // packetLost_++;
+
+ uint32_t segmentNumber = interest->getName().getSuffix();
+ uint32_t pkt = segmentNumber & modMask_;
+
+ if (inflightInterests_[pkt].retransmissions == 0) {
+ inflightInterestsCount_--;
+ }
+
+ if (inflightInterests_[pkt].retransmissions < MAX_RTX) {
+ interestRetransmissions_.push(segmentNumber);
+ }
+
+ scheduleNextInterest();
+}
+
+void RTCTransportProtocol::onNack(const ContentObject &content_object) {
+ uint32_t *payload = (uint32_t *)content_object.getPayload().data();
+ uint32_t productionSeg = *payload;
+ uint32_t productionRate = *(++payload);
+ uint32_t nackSegment = content_object.getName().getSuffix();
+
+ // we synch the estimated production rate with the actual one
+ estimatedBw_ = (double)productionRate;
+
+ // if(inflightInterests_[segmentNumber %
+ // default_values::default_buffer_size].retransmissions != 0){ ignore nacks for
+ // retransmissions
+ // return;
+ //}
+
+ gotNack_ = true;
+
+ if (productionSeg > nackSegment) {
+ // we are asking for stuff produced in the past
+ actualSegment_ = max(productionSeg + 1, actualSegment_);
+ if (currentState_ == RTC_NORMAL_STATE) {
+ currentState_ = RTC_SYNC_STATE;
+ // if we switch in SYNC mode we do not care about holes
+ // se we reset the data structure. going back to NORMAL
+ // mode will anable again the holes_ check.
+ holes_.clear();
+ lastReceived_ = 0;
+ }
+
+ computeMaxWindow(productionRate, 0);
+ increaseWindow();
+
+ if (nackedByProducer_.size() >= nackedByProducerMaxSize_)
+ nackedByProducer_.erase(nackedByProducer_.begin());
+ nackedByProducer_.insert(nackSegment);
+
+ } else if (productionSeg < nackSegment) {
+ gotFutureNack_++;
+ // we are asking stuff in the future
+ // example
+ // 10 12 13 14 15 16 17
+ // ^ ^ ^
+ // in prod nack actual
+ // in this example we sent up to segment 17 and we get a nack for segment 15
+ // this means that we will get nack also for 16 17
+ // and valid data for 13 14
+ // so the next segment to ask is 15, because 13 and 14 will can back anyway
+ // we go back only in the case that the actual segment is really bigger than
+ // nack segment, other we do nothing
+
+ actualSegment_ = min(actualSegment_, nackSegment);
+
+ computeMaxWindow(productionRate, 0);
+ decreaseWindow();
+
+ if (currentState_ == RTC_SYNC_STATE) {
+ currentState_ = RTC_NORMAL_STATE;
+ }
+ } // equal should not happen
+}
+
+void RTCTransportProtocol::onContentObject(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t payload_size = content_object->getPayload().length();
+ uint32_t segmentNumber = content_object->getName().getSuffix();
+ uint32_t pkt = segmentNumber & modMask_;
+
+ // try to recover holes
+ // we can recover haoles with valid data, nacks or retransmitted packets
+ bool recoveredHole = false;
+ std::unordered_map<uint32_t, uint64_t>::const_iterator res =
+ holes_.find(segmentNumber);
+ if (res != holes_.end()) {
+ holes_.erase(res);
+ recoveredHole = true;
+ }
+
+ if (payload_size == NACK_HEADER_SIZE) {
+ // Nacks always come form the producer, so we set the producerePathLabel_;
+ producerPathLabel_ = content_object->getPathLabel();
+ if (inflightInterests_[pkt].retransmissions == 0) {
+ inflightInterestsCount_--;
+ onNack(*content_object);
+ updateDelayStats(*content_object);
+ }
+
+ } else {
+ receivedData_++;
+
+ avgPacketSize_ =
+ (ESTIMATED_PACKET_SIZE * avgPacketSize_) +
+ ((1 - ESTIMATED_PACKET_SIZE) * content_object->getPayload().length());
+
+ if (inflightInterests_[pkt].retransmissions == 0) {
+ inflightInterestsCount_--;
+ // we count only non retransmitted data in order to take into accunt only
+ // the transmition rate of the producer
+ receivedBytes_ +=
+ content_object->headerSize() + content_object->payloadSize();
+ updateDelayStats(*content_object);
+
+ // handle holes
+ // the packet sequence make sense only in case of valid data (no nacks, no
+ // rtx) in RTC_NORMAL_STATE we should get all the packets in order, so if
+ // segmentNumber != lastReceived + 1 something happened
+ // if recoveredHole == true this is a packet recovered so we should do
+ // nothing
+ if (currentState_ == RTC_NORMAL_STATE && recoveredHole == false) {
+ if ((segmentNumber != lastReceived_ + 1) &&
+ segmentNumber > lastReceived_) {
+ // we have holes in the sequence
+ for (uint32_t seq = lastReceived_ + 1; seq < segmentNumber; seq++) {
+ // the hole exists we do not insert it again
+ std::unordered_map<uint32_t, uint64_t>::const_iterator res =
+ holes_.find(seq);
+ if (res == holes_.end())
+ holes_.insert(std::make_pair(seq, 0)); // 0 means never sent
+ }
+ }
+ }
+
+ // this if should be always true
+ if (segmentNumber > lastReceived_) {
+ lastReceived_ = segmentNumber;
+ }
+ }
+
+ returnContentToUser(*content_object);
+ increaseWindow();
+ }
+
+ scheduleNextInterest();
+}
+
+void RTCTransportProtocol::returnContentToUser(
+ const ContentObject &content_object) {
+ // return content to the user
+ Array a = content_object.getPayload();
+
+ uint8_t *start = ((uint8_t *)a.data()) + TIMESTAMP_SIZE;
+ unsigned size = a.length() - TIMESTAMP_SIZE;
+
+ // set offset between hICN and RTP packets
+ uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1));
+ RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq;
+
+ content_buffer_->insert(content_buffer_->end(), start, start + size);
+
+ ConsumerContentCallback on_payload = VOID_HANDLER;
+ socket_->getSocketOption(CONTENT_RETRIEVED, on_payload);
+ if (on_payload != VOID_HANDLER) {
+ on_payload(*dynamic_cast<ConsumerSocket *>(socket_), size,
+ std::make_error_code(std::errc(0)));
+ }
+}
+
+uint32_t RTCTransportProtocol::hICN2RTP(uint32_t hicn_seq) {
+ return RTPhICN_offset_ - hicn_seq;
+}
+
+uint32_t RTCTransportProtocol::RTP2hICN(uint32_t rtp_seq) {
+ return RTPhICN_offset_ + rtp_seq;
+}
+
+void RTCTransportProtocol::processRtcpHeader(uint8_t *offset) {
+ uint8_t pkt_type = (*(offset + 1));
+ switch (pkt_type) {
+ case RTCP_RR: // Receiver report
+ TRANSPORT_LOGI("got RR packet\n");
+ break;
+ case RTCP_SR: // Sender report
+ TRANSPORT_LOGI("got SR packet\n");
+ break;
+ case RTCP_SDES: // Description
+ processSDES(offset);
+ break;
+ case RTCP_RTPFB: // Transport layer FB message
+ processGenericNack(offset);
+ break;
+ case RTCP_PSFB:
+ processPli(offset);
+ break;
+ default:
+ errorParsingRtcpHeader(offset);
+ }
+}
+
+void RTCTransportProtocol::errorParsingRtcpHeader(uint8_t *offset) {
+ uint8_t pt = (*(offset + 1));
+ uint8_t code = ((*offset) & MASK_TYPE_CODE);
+ TRANSPORT_LOGE("Received unknwnon RTCP packet. Payload type = %u, code = %u",
+ pt, code);
+}
+
+void RTCTransportProtocol::processSDES(uint8_t *offset) {
+ uint8_t code = ((*offset) & MASK_TYPE_CODE);
+ switch (code) {
+ case RTCP_SDES_CNAME:
+ TRANSPORT_LOGI("got SDES packet: CNAME\n");
+ break;
+ default:
+ errorParsingRtcpHeader(offset);
+ }
+}
+
+void RTCTransportProtocol::processPli(uint8_t *offset) {
+ if (((*offset) & MASK_TYPE_CODE) != RTCP_PSFB_PLI) {
+ errorParsingRtcpHeader(offset);
+ return;
+ }
+
+ TRANSPORT_LOGI("got PLI packet\n");
+}
+
+void RTCTransportProtocol::processGenericNack(uint8_t *offset) {
+ if (((*offset) & MASK_TYPE_CODE) != RTCP_RTPFB_GENERIC_NACK) {
+ errorParsingRtcpHeader(offset);
+ return;
+ }
+
+ std::vector<uint32_t> nacks;
+
+ uint16_t header_lines =
+ ntohs(*(((uint16_t *)offset) + 1)) -
+ 2; // 2 is the number of header 32-bits words - 1 (RFC 4885)
+ uint8_t *payload = offset + RTPC_NACK_HEADER; // 12 bytes
+ for (uint16_t l = header_lines; l > 0; l--) {
+ nacks.push_back(RTP2hICN(ntohs(*((uint16_t *)payload))));
+
+ uint16_t BLP = ntohs(*(((uint16_t *)payload) + 1));
+
+ for (int bit = 0; bit < 15; bit++) { // 16 bits word to scan
+ if ((BLP >> bit) & 1) {
+ nacks.push_back(RTP2hICN((ntohs(*((uint16_t *)payload)) + bit + 1) %
+ MAX_RTCP_SEQ_NUMBER));
+ }
+ }
+
+ payload += 4; // go to the next line
+ }
+
+ portal_->getIoService().post(std::bind(
+ &RTCTransportProtocol::scheduleAppNackRtx, this, std::move(nacks)));
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
new file mode 100755
index 000000000..249af6b99
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiTC_SYNC_STATE
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <queue>
+#include <set>
+#include <unordered_map>
+
+#include <hicn/transport/protocols/protocol.h>
+#include <hicn/transport/protocols/rtc_data_path.h>
+
+// algorithm state
+#define RTC_SYNC_STATE 0
+#define RTC_NORMAL_STATE 1
+#define ROUNDS_IN_SYNC_BEFORE_SWITCH 3
+
+// packet constants
+#define INIT_PACKET_SIZE 1300 // bytes
+#define HICN_PACKET_HEADER_SIZE 60 // bytes ipv6+tcp
+#define NACK_HEADER_SIZE 8 // bytes
+#define TIMESTAMP_SIZE 8 // bytes
+#define RTC_INTEREST_LIFETIME 1000 // ms
+
+// controller constant
+#define ROUND_LEN \
+ 200 // ms interval of time on which we take decisions / measurements
+#define MAX_RTX 128
+#define MIN_RTT_WIN 30 // rounds
+
+// cwin
+#define INITIAL_CWIN 1 // packets
+#define INITIAL_CWIN_MAX 100000 // packets
+#define MIN_CWIN 5 // packets
+
+// statistics constants
+#define BANDWIDTH_SLACK_FACTOR 1.5
+#define ESTIMATED_BW_ALPHA 0.7
+#define ESTIMATED_PACKET_SIZE 0.7
+#define ESTIMATED_LOSSES_ALPHA 0.8
+#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
+
+//#define MAX_LOSS_RATE 0.05
+//#define MAX_QUEUING_DELAY 200 //ms
+
+// cwin
+#define INITIAL_CWIN 1
+#define MIN_CWIN 5
+#define WIN_DECREASE_FACTOR 0.8
+#define WIN_INCREASE_FACTOR 1.1
+
+// protocol state
+//#define RTC_CONGESTED_STATE 10
+//#define RTC_LOSSY_STATE 20
+//#define RTC_DELAY_STATE 30
+//#define RTC_NORMAL_STATE 40
+
+// other constants
+#define NANO_IN_A_SEC 1000000000
+#define MICRO_IN_A_SEC 1000000
+#define MILLI_IN_A_SEC 1000
+
+// RTCP
+#define MASK_RTCP_VERSION 192
+#define MASK_TYPE_CODE \
+ 31 // this is RC in the RR/SR packet or FMT int the early feedback packets
+#define RTPC_NACK_HEADER 12 // bytes
+#define MAX_RTCP_SEQ_NUMBER 0xffff
+#define RTCP_VERSION 2
+// RTCP TYPES
+#define RTCP_SR 200
+#define RTCP_RR 201
+#define RTCP_SDES 202
+#define RTCP_RTPFB 205
+#define RTCP_PSFB 206
+// RTCP RC/FMT
+#define RTCP_SDES_CNAME 1
+#define RTCP_RTPFB_GENERIC_NACK 1
+#define RTCP_PSFB_PLI 1
+
+namespace transport {
+
+namespace protocol {
+
+struct sentInterest {
+ uint64_t transmissionTime;
+ uint8_t retransmissions;
+};
+
+class RTCTransportProtocol : public TransportProtocol {
+ public:
+ RTCTransportProtocol(interface::BaseSocket *icnet_socket);
+
+ ~RTCTransportProtocol();
+
+ void start(utils::SharableVector<uint8_t> &content_buffer);
+
+ void stop();
+
+ void resume();
+
+ void onRTCPPacket(uint8_t *packet, size_t len);
+
+ private:
+ // algo functions
+ void reset();
+ void checkRound();
+
+ // CC functions
+ void updateDelayStats(const ContentObject &content_object);
+ void updateStats(uint32_t round_duration);
+ void updateCCState();
+ void computeMaxWindow(uint32_t productionRate, uint32_t BDPWin);
+ void updateWindow();
+ void decreaseWindow();
+ void increaseWindow();
+ void resetPreviousWindow();
+
+ // packet functions
+ void sendInterest();
+ void scheduleNextInterest();
+ void scheduleAppNackRtx(std::vector<uint32_t> &nacks);
+ void onTimeout(Interest::Ptr &&interest);
+ void onNack(const ContentObject &content_object);
+ void onContentObject(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object);
+ void returnContentToUser(const ContentObject &content_object);
+
+ // RTCP functions
+ uint32_t hICN2RTP(uint32_t hicn_seq);
+ uint32_t RTP2hICN(uint32_t rtp_seq);
+ void processRtcpHeader(uint8_t *offset);
+ void errorParsingRtcpHeader(uint8_t *offset);
+ void processSDES(uint8_t *offset);
+ void processGenericNack(uint8_t *offset);
+ void processPli(uint8_t *offset);
+
+ // controller var
+ std::chrono::steady_clock::time_point lastRoundBegin_;
+ // bool allPacketsInSync_;
+ // unsigned numberOfRoundsInSync_;
+ // unsigned numberOfCatchUpRounds_;
+ // bool catchUpPhase_;
+ unsigned currentState_;
+
+ // uint32_t inProduction_;
+
+ // cwin var
+ uint32_t currentCWin_;
+ uint32_t maxCWin_;
+ // uint32_t previousCWin_;
+
+ // names/packets var
+ uint32_t actualSegment_;
+ int32_t RTPhICN_offset_;
+ uint32_t inflightInterestsCount_;
+ std::queue<uint32_t> interestRetransmissions_;
+ std::vector<sentInterest> inflightInterests_;
+ uint32_t nackedByProducerMaxSize_;
+ std::set<uint32_t>
+ nackedByProducer_; // this is used to avoid retransmissions from the
+ // application for pakets for which we already got a
+ // past NACK by the producer these packet are too old,
+ // they will never be retrived
+ std::shared_ptr<utils::SharableVector<uint8_t>> content_buffer_;
+ uint32_t modMask_;
+
+ // stats
+ uint32_t receivedBytes_;
+ uint32_t sentInterest_;
+ uint32_t receivedData_;
+ uint32_t packetLost_;
+ double avgPacketSize_;
+ bool gotNack_;
+ uint32_t gotFutureNack_;
+ uint32_t roundsWithoutNacks_;
+ uint32_t producerPathLabel_; // XXX we pick only one path lable for the
+ // producer for now, assuming the usage of a
+ // single path this should be extended to a
+ // vector
+ std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_;
+ uint32_t roundCounter_;
+ // std::vector<uint64_t> minRTTwin_;
+ uint64_t minRtt_;
+
+ std::unordered_map<uint32_t, uint64_t> holes_;
+ uint32_t lastReceived_;
+
+ // CC var
+ double estimatedBw_;
+ double lossRate_;
+ double queuingDelay_;
+ unsigned protocolState_;
+};
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc
new file mode 100755
index 000000000..6c9605fb2
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/protocols/rtc_data_path.h>
+
+namespace transport {
+
+namespace protocol {
+
+RTCDataPath::RTCDataPath()
+ : min_rtt(UINT_MAX),
+ prev_min_rtt(UINT_MAX),
+ min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the
+ // real OWD, but the measured one, that depends on the
+ // clock of sender and receiver. the only meaningful
+ // value is is the queueing delay. for this reason we
+ // keep both RTT (for the windowd calculation) and OWD
+ // (for congestion/quality control)
+ prev_min_owd(INT_MAX),
+ avg_owd(0.0),
+ queuing_delay(0.0),
+ RTThistory_(HISTORY_LEN),
+ OWDhistory_(HISTORY_LEN){};
+
+void RTCDataPath::insertRttSample(uint64_t rtt) {
+ // for the rtt we only keep track of the min one
+ if (rtt < min_rtt) min_rtt = rtt;
+}
+
+void RTCDataPath::insertOwdSample(int64_t owd) {
+ // for owd we use both min and avg
+ if (owd < min_owd) min_owd = owd;
+
+ avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
+}
+
+void RTCDataPath::roundEnd() {
+ // compute queuing delay
+ queuing_delay = avg_owd - getMinOwd();
+
+ // reset min_rtt and add it to the history
+ if (min_rtt != UINT_MAX) {
+ prev_min_rtt = min_rtt;
+ } else {
+ // this may happen if we do not receive any packet
+ // from this path in the last round. in this case
+ // we use the measure from the previuos round
+ min_rtt = prev_min_rtt;
+ }
+
+ RTThistory_.pushBack(min_rtt);
+ min_rtt = UINT_MAX;
+
+ // do the same for min owd
+ if (min_owd != INT_MAX) {
+ prev_min_owd = min_owd;
+ } else {
+ min_owd = prev_min_owd;
+ }
+
+ OWDhistory_.pushBack(min_owd);
+ min_owd = INT_MAX;
+}
+
+double RTCDataPath::getQueuingDealy() { return queuing_delay; }
+
+uint64_t RTCDataPath::getMinRtt() { return RTThistory_.begin(); }
+
+int64_t RTCDataPath::getMinOwd() { return OWDhistory_.begin(); }
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.h b/libtransport/src/hicn/transport/protocols/rtc_data_path.h
new file mode 100755
index 000000000..ace16ff12
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/rtc_data_path.h
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <hicn/transport/utils/min_filter.h>
+#include <climits>
+
+#define ALPHA_RTC 0.125
+#define HISTORY_LEN 30
+
+namespace transport {
+
+namespace protocol {
+
+class RTCDataPath {
+ public:
+ RTCDataPath();
+
+ public:
+ void insertRttSample(uint64_t rtt);
+ void insertOwdSample(int64_t owd);
+
+ uint64_t getMinRtt();
+
+ double getQueuingDealy();
+
+ void roundEnd();
+
+ private:
+ int64_t getMinOwd();
+
+ uint64_t min_rtt;
+ uint64_t prev_min_rtt;
+
+ int64_t min_owd;
+ int64_t prev_min_owd;
+
+ double avg_owd;
+
+ double queuing_delay;
+
+ utils::MinFilter<uint64_t> RTThistory_;
+ utils::MinFilter<int64_t> OWDhistory_;
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt
new file mode 100755
index 000000000..6f9fdb9aa
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt
@@ -0,0 +1,10 @@
+# Enable gcov output for the tests
+add_definitions(--coverage)
+set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage")
+
+set(TestsExpectedToPass
+ test_transport_producer)
+
+foreach(test ${TestsExpectedToPass})
+ AddTest(${test})
+endforeach() \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc b/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc
new file mode 100755
index 000000000..204f2cbe2
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "../socket_producer.h"
+#include "literals.h"
+
+#include <test.h>
+#include <random>
+
+namespace transport {
+
+namespace protocol {
+
+namespace {
+// The fixture for testing class Foo.
+class ProducerTest : public ::testing::Test {
+ protected:
+ ProducerTest() : name_("b001::123|321"), producer_(io_service_) {
+ // You can do set-up work for each test here.
+ }
+
+ virtual ~ProducerTest() {
+ // You can do clean-up work that doesn't throw exceptions here.
+ }
+
+ // If the constructor and destructor are not enough for setting up
+ // and cleaning up each test, you can define the following methods:
+
+ virtual void SetUp() {
+ // Code here will be called immediately after the constructor (right
+ // before each test).
+ }
+
+ virtual void TearDown() {
+ // Code here will be called immediately after each test (right
+ // before the destructor).
+ }
+
+ Name name_;
+ asio::io_service io_service_;
+ ProducerSocket producer_;
+};
+
+} // namespace
+
+// Tests that the Foo::Bar() method does Abc.
+TEST_F(ProducerTest, ProduceContent) {
+ std::string content(250000, '?');
+
+ producer_.registerPrefix(Prefix("b001::/64"));
+ producer_.produce(name_, reinterpret_cast<const uint8_t *>(content.data()),
+ content.size(), true);
+ producer_.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ 500000000_U32);
+ producer_.attach();
+ producer_.serveForever();
+}
+
+} // namespace protocol
+
+} // namespace transport
+
+int main(int argc, char **argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+} \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/vegas.cc b/libtransport/src/hicn/transport/protocols/vegas.cc
new file mode 100755
index 000000000..b6d79bfcc
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/vegas.cc
@@ -0,0 +1,630 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/vegas.h>
+#include <hicn/transport/utils/literals.h>
+
+#include <cmath>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+VegasTransportProtocol::VegasTransportProtocol(BaseSocket *icnet_socket)
+ : TransportProtocol(icnet_socket),
+ is_final_block_number_discovered_(false),
+ final_block_number_(std::numeric_limits<uint32_t>::max()),
+ last_reassembled_segment_(0),
+ content_buffer_size_(0),
+ current_window_size_(default_values::min_window_size),
+ interests_in_flight_(0),
+ next_suffix_(0),
+ interest_retransmissions_(1 << default_values::log_2_default_buffer_size),
+ interest_timepoints_(1 << default_values::log_2_default_buffer_size),
+ retx_count_(0),
+ receive_buffer_(1 << default_values::log_2_default_buffer_size),
+ unverified_segments_(1 << default_values::log_2_default_buffer_size),
+ verified_manifests_(1 << default_values::log_2_default_buffer_size),
+ mask_((1 << default_values::log_2_default_buffer_size) - 1),
+ incremental_suffix_index_(0),
+ suffix_queue_completed_(false),
+ download_with_manifest_(false),
+ next_manifest_interval_(0_U16),
+ interest_tx_(0),
+ interest_count_(0),
+ byte_count_(0),
+ average_rtt_(0.0) {
+ portal_ = socket_->portal_;
+ incremental_suffix_index_++;
+}
+
+VegasTransportProtocol::~VegasTransportProtocol() { stop(); }
+
+void VegasTransportProtocol::reset() {
+ portal_->setConsumerCallback(this);
+
+ is_final_block_number_discovered_ = false;
+ interest_pool_index_ = 0;
+ final_block_number_ = std::numeric_limits<uint32_t>::max();
+ next_suffix_ = 0;
+ interests_in_flight_ = 0;
+ last_reassembled_segment_ = 0;
+ content_buffer_size_ = 0;
+ content_buffer_->clear();
+ interest_retransmissions_.clear();
+ interest_retransmissions_.resize(
+ 1 << default_values::log_2_default_buffer_size, 0);
+ interest_timepoints_.clear();
+ interest_timepoints_.resize(1 << default_values::log_2_default_buffer_size,
+ std::chrono::steady_clock::time_point());
+ receive_buffer_.clear();
+ unverified_segments_.clear();
+ verified_manifests_.clear();
+ next_manifest_interval_ = 0;
+ next_manifest_ = 0;
+ download_with_manifest_ = false;
+ incremental_suffix_index_ = 0;
+
+ interest_tx_ = 0;
+ interest_count_ = 0;
+ byte_count_ = 0;
+ average_rtt_ = 0;
+
+ // asio::io_service &io_service = portal_->getIoService();
+
+ // if (io_service.stopped()) {
+ // io_service.reset();
+ // }
+}
+
+void VegasTransportProtocol::start(
+ utils::SharableVector<uint8_t> &content_buffer) {
+
+ if(is_running_)
+ return;
+
+ socket_->t0_ = std::chrono::steady_clock::now();
+
+ is_running_ = true;
+ content_buffer_ = content_buffer.shared_from_this();
+
+ reset();
+
+ sendInterest(next_suffix_++);
+ portal_->runEventsLoop();
+ removeAllPendingInterests();
+ is_running_ = false;
+
+}
+
+void VegasTransportProtocol::resume(){
+ if(is_running_)
+ return;
+
+ is_running_ = true;
+ sendInterest(next_suffix_++);
+ portal_->runEventsLoop();
+ removeAllPendingInterests();
+ is_running_ = false;
+}
+
+void VegasTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+ auto interest = getInterest();
+ socket_->network_name_.setSuffix(next_suffix);
+ interest->setName(socket_->network_name_);
+
+ interest->setLifetime(uint32_t(socket_->interest_lifetime_));
+
+ if (socket_->on_interest_output_ != VOID_HANDLER) {
+ socket_->on_interest_output_(*socket_, *interest);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ interests_in_flight_++;
+ interest_retransmissions_[next_suffix & mask_] = 0;
+ interest_timepoints_[next_suffix & mask_] = std::chrono::steady_clock::now();
+
+ using namespace std::placeholders;
+ portal_->sendInterest(std::move(interest));
+}
+
+void VegasTransportProtocol::stop() {
+ is_running_ = false;
+ portal_->stopEventsLoop();
+}
+
+void VegasTransportProtocol::onContentSegment(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t incremental_suffix = content_object->getName().getSuffix();
+ bool virtual_download = socket_->virtual_download_;
+
+ if (verifyContentObject(*content_object)) {
+ byte_count_ += content_object->getPayload().length();
+
+ if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) {
+ is_final_block_number_discovered_ = true;
+ final_block_number_ = incremental_suffix;
+ }
+
+ if (!virtual_download) {
+ receive_buffer_.emplace(
+ std::make_pair(incremental_suffix, std::move(content_object)));
+ reassemble();
+ } else if (TRANSPORT_EXPECT_FALSE(is_final_block_number_discovered_ &&
+ incremental_suffix ==
+ final_block_number_)) {
+ returnContentToUser();
+ }
+ } else {
+ unverified_segments_.emplace(
+ std::make_pair(incremental_suffix, std::move(content_object)));
+ }
+}
+
+void VegasTransportProtocol::afterContentReception(
+ const Interest &interest, const ContentObject &content_object) {
+ increaseWindow();
+}
+
+void VegasTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ decreaseWindow();
+}
+
+void VegasTransportProtocol::scheduleNextInterests() {
+ if (is_running_) {
+ uint32_t next_suffix;
+ while (interests_in_flight_ < current_window_size_) {
+ if (download_with_manifest_) {
+ if (suffix_queue_.size() * 2 < current_window_size_ &&
+ next_manifest_ < final_block_number_ && next_manifest_interval_) {
+ next_manifest_ += next_manifest_interval_;
+ sendInterest(next_manifest_);
+ continue;
+ }
+
+ if (suffix_queue_.pop(next_suffix)) {
+ // next_suffix = suffix_queue_.front();
+ sendInterest(next_suffix);
+ // suffix_queue_.pop_front();
+ } else {
+ if (!suffix_queue_completed_) {
+ TRANSPORT_LOGE("Empty queue!!!!!!");
+ }
+ break;
+ }
+ } else {
+ if (is_final_block_number_discovered_) {
+ if (next_suffix_ > final_block_number_) {
+ return;
+ }
+ }
+
+ sendInterest(next_suffix_++);
+ }
+ }
+ }
+}
+
+void VegasTransportProtocol::decreaseWindow() {
+ if (current_window_size_ > socket_->min_window_size_) {
+ current_window_size_ = std::ceil(current_window_size_ / 2);
+ socket_->current_window_size_ = current_window_size_;
+ }
+}
+
+void VegasTransportProtocol::increaseWindow() {
+ if (current_window_size_ < socket_->max_window_size_) {
+ current_window_size_++;
+ socket_->max_window_size_ = current_window_size_;
+ }
+};
+
+void VegasTransportProtocol::changeInterestLifetime(uint64_t segment) {
+ std::chrono::steady_clock::duration duration =
+ std::chrono::steady_clock::now() - interest_timepoints_[segment];
+ rtt_estimator_.addMeasurement(
+ std::chrono::duration_cast<std::chrono::microseconds>(duration));
+
+ RtoEstimator::Duration rto = rtt_estimator_.computeRto();
+ std::chrono::milliseconds lifetime =
+ std::chrono::duration_cast<std::chrono::milliseconds>(rto);
+
+ socket_->interest_lifetime_ = lifetime.count();
+}
+
+void VegasTransportProtocol::returnContentToUser() {
+ if (socket_->on_payload_retrieved_ != VOID_HANDLER) {
+ socket_->on_payload_retrieved_(*socket_, byte_count_,
+ std::make_error_code(std::errc(0)));
+ }
+
+ stop();
+}
+
+void VegasTransportProtocol::onManifest(
+ std::unique_ptr<ContentObjectManifest> &&manifest) {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ download_with_manifest_ = true;
+
+ uint32_t segment = manifest->getName().getSuffix();
+
+ if (verifyManifest(*manifest)) {
+ manifest->decode();
+
+ if (TRANSPORT_EXPECT_TRUE(manifest->getVersion() ==
+ core::ManifestVersion::VERSION_1)) {
+ switch (manifest->getManifestType()) {
+ case core::ManifestType::INLINE_MANIFEST: {
+ auto _it = manifest->getSuffixList().begin();
+ auto _end = --manifest->getSuffixList().end();
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest())) {
+ _end++;
+ }
+
+ // Get final block number
+ is_final_block_number_discovered_ = true;
+ final_block_number_ = manifest->getFinalBlockNumber();
+
+ for (; _it != _end; _it++) {
+ suffix_hash_map_[_it->first] = std::make_pair(
+ std::vector<uint8_t>(_it->second, _it->second + 32),
+ manifest->getHashAlgorithm());
+ suffix_queue_.push(_it->first);
+ }
+
+ next_manifest_interval_ = manifest->getSuffixList().size();
+
+ if (manifest->isFinalManifest()) {
+ suffix_queue_completed_ = true;
+ // Give it a try
+ if (verifier_thread_) {
+ asio::io_service &io_service = portal_->getIoService();
+ io_service.post([this]() { scheduleNextInterests(); });
+ }
+ }
+
+ break;
+ }
+ case core::ManifestType::FLIC_MANIFEST: {
+ throw errors::NotImplementedException();
+ }
+ case core::ManifestType::FINAL_CHUNK_NUMBER: {
+ throw errors::NotImplementedException();
+ }
+ }
+ }
+
+ if (!socket_->virtual_download_) {
+ receive_buffer_.emplace(
+ std::make_pair(segment, std::move(manifest->getPacket())));
+ reassemble();
+ } else {
+ if (segment >= final_block_number_) {
+ stop();
+ }
+ }
+ }
+}
+
+bool VegasTransportProtocol::verifyManifest(
+ const ContentObjectManifest &manifest) {
+ if (!socket_->verify_signature_) {
+ return true;
+ }
+
+ bool is_data_secure = false;
+
+ if (socket_->on_content_object_verification_ == VOID_HANDLER) {
+ is_data_secure = static_cast<bool>(socket_->verifier_.verify(manifest));
+ } else if (socket_->on_content_object_verification_(*socket_, manifest)) {
+ is_data_secure = true;
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!is_data_secure)) {
+ TRANSPORT_LOGE("Verification failed for %s\n",
+ manifest.getName().toString().c_str());
+ }
+
+ return is_data_secure;
+}
+
+// TODO Add the name in the digest computation!
+void VegasTransportProtocol::onContentObject(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t incremental_suffix = content_object->getName().getSuffix();
+
+ std::chrono::microseconds rtt;
+ Time now = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::duration duration =
+ now - interest_timepoints_[incremental_suffix & mask_];
+ rtt = std::chrono::duration_cast<std::chrono::microseconds>(duration);
+
+ average_rtt_ = (0.7 * average_rtt_) + (0.3 * (double)rtt.count());
+
+ if (socket_->on_timer_expires_ != VOID_HANDLER) {
+ auto dt = std::chrono::duration_cast<TimeDuration>(now - socket_->t0_);
+ if (dt.count() > socket_->timer_interval_milliseconds_) {
+ socket_->on_timer_expires_(*socket_, byte_count_, dt,
+ current_window_size_, retx_count_,
+ std::round(average_rtt_));
+ socket_->t0_ = std::chrono::steady_clock::now();
+ }
+ }
+
+ interests_in_flight_--;
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ || incremental_suffix == ~0_U64 ||
+ receive_buffer_.find(incremental_suffix) !=
+ receive_buffer_.end())) {
+ return;
+ }
+
+ changeInterestLifetime(incremental_suffix);
+
+ if (socket_->on_content_object_input_ != VOID_HANDLER) {
+ socket_->on_content_object_input_(*socket_, *content_object);
+ }
+
+ if (socket_->on_interest_satisfied_ != VOID_HANDLER) {
+ socket_->on_interest_satisfied_(*socket_, *interest);
+ }
+
+ if (!interest_retransmissions_[incremental_suffix & mask_]) {
+ afterContentReception(*interest, *content_object);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() ==
+ PayloadType::MANIFEST)) {
+ // TODO Fix manifest!!
+ auto manifest =
+ std::make_unique<ContentObjectManifest>(std::move(content_object));
+
+ if (verifier_thread_ && incremental_suffix != 0) {
+ // verifier_thread_->add(std::bind(&VegasTransportProtocol::onManifest,
+ // this, std::move(manifest)));
+ } else {
+ onManifest(std::move(manifest));
+ }
+ } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ if (verifier_thread_) {
+ // verifier_thread_->add(std::bind(&VegasTransportProtocol::onContentSegment,
+ // this, std::move(content_object)));
+ } else {
+ onContentSegment(std::move(interest), std::move(content_object));
+ }
+ }
+
+ scheduleNextInterests();
+}
+
+bool VegasTransportProtocol::verifyContentObject(
+ const ContentObject &content_object) {
+ if (!dynamic_cast<ConsumerSocket *>(socket_)->verify_signature_) {
+ return true;
+ }
+
+ uint64_t segment = content_object.getName().getSuffix();
+
+ bool ret = false;
+
+ if (download_with_manifest_) {
+ auto it = suffix_hash_map_.find(segment);
+ if (it != suffix_hash_map_.end()) {
+ auto hash_type = static_cast<utils::CryptoHashType>(it->second.second);
+ auto data_packet_digest = content_object.computeDigest(it->second.second);
+ auto data_packet_digest_bytes =
+ data_packet_digest.getDigest<uint8_t>().data();
+ std::vector<uint8_t> &manifest_digest_bytes = it->second.first;
+
+ if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes,
+ manifest_digest_bytes.data(),
+ hash_type)) {
+ suffix_hash_map_.erase(it);
+ ret = true;
+ } else {
+ throw errors::RuntimeException(
+ "Verification failure policy has to be implemented.");
+ }
+ }
+ } else {
+ ret = static_cast<bool>(
+ dynamic_cast<ConsumerSocket *>(socket_)->verifier_.verify(
+ content_object));
+
+ if (!ret) {
+ throw errors::RuntimeException(
+ "Verification failure policy has to be implemented.");
+ }
+ }
+
+ return ret;
+ ;
+}
+
+void VegasTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ TRANSPORT_LOGW("Timeout on %s", interest->getName().toString().c_str());
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ interests_in_flight_--;
+
+ uint64_t segment = interest->getName().getSuffix();
+
+ // Do not retransmit interests asking contents that do not exist.
+ if (is_final_block_number_discovered_) {
+ if (segment > final_block_number_) {
+ return;
+ }
+ }
+
+ if (socket_->on_interest_timeout_ != VOID_HANDLER) {
+ socket_->on_interest_timeout_(*socket_, *interest);
+ }
+
+ afterDataUnsatisfied(segment);
+
+ if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask_] <
+ socket_->max_retransmissions_)) {
+ retx_count_++;
+
+ if (socket_->on_interest_retransmission_ != VOID_HANDLER) {
+ socket_->on_interest_retransmission_(*socket_, *interest);
+ }
+
+ if (socket_->on_interest_output_ != VOID_HANDLER) {
+ socket_->on_interest_output_(*socket_, *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ // retransmit
+ interests_in_flight_++;
+ interest_retransmissions_[segment & mask_]++;
+
+ using namespace std::placeholders;
+ portal_->sendInterest(std::move(interest));
+ } else {
+ TRANSPORT_LOGE("Stop: reached max retx limit.");
+ partialDownload();
+ stop();
+ }
+}
+
+void VegasTransportProtocol::copyContent(const ContentObject &content_object) {
+ Array a = content_object.getPayload();
+
+ content_buffer_->insert(content_buffer_->end(), (uint8_t *)a.data(),
+ (uint8_t *)a.data() + a.length());
+
+ bool download_completed =
+ is_final_block_number_discovered_ &&
+ content_object.getName().getSuffix() == final_block_number_;
+
+ if (TRANSPORT_EXPECT_FALSE(download_completed || !is_running_)) {
+ // asio::io_service& io_service = portal_->getIoService();
+ // io_service.post([this] () {
+ returnContentToUser();
+ // });
+ }
+}
+
+void VegasTransportProtocol::reassemble() {
+ uint64_t index = last_reassembled_segment_;
+ auto it = receive_buffer_.find(index);
+
+ do {
+ if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ copyContent(*it->second);
+ receive_buffer_.erase(it);
+ }
+
+ index = ++last_reassembled_segment_;
+ it = receive_buffer_.find(index);
+ } while (it != receive_buffer_.end());
+}
+
+void VegasTransportProtocol::partialDownload() {
+ if (!socket_->virtual_download_) {
+ reassemble();
+ }
+
+ if (socket_->on_payload_retrieved_ != VOID_HANDLER) {
+ socket_->on_payload_retrieved_(
+ *socket_, byte_count_,
+ std::make_error_code(std::errc(std::errc::io_error)));
+ }
+}
+
+// TODO Check vegas protocol
+// void VegasTransportProtocol::checkForFastRetransmission(const Interest
+// &interest) {
+// uint64_t segNumber = interest.getName().getSuffix();
+// received_segments_[segNumber] = true;
+// fast_retransmitted_segments.erase(segNumber);
+
+// uint64_t possibly_lost_segment = 0;
+// uint64_t highest_received_segment = received_segments_.rbegin()->first;
+
+// for (uint64_t i = 0; i <= highest_received_segment; i++) {
+// if (received_segments_.find(i) == received_segments_.end()) {
+// if (fast_retransmitted_segments.find(i) ==
+// fast_retransmitted_segments.end()) {
+// possibly_lost_segment = i;
+// uint8_t out_of_order_segments = 0;
+// for (uint64_t j = i; j <= highest_received_segment; j++) {
+// if (received_segments_.find(j) != received_segments_.end()) {
+// out_of_order_segments++;
+// if (out_of_order_segments >=
+// default_values::max_out_of_order_segments) {
+// fast_retransmitted_segments[possibly_lost_segment] = true;
+// fastRetransmit(interest, possibly_lost_segment);
+// }
+// }
+// }
+// }
+// }
+// }
+// }
+
+// void VegasTransportProtocol::fastRetransmit(const Interest &interest,
+// uint32_t chunk_number) {
+// if (interest_retransmissions_[chunk_number & mask_] <
+// socket_->max_retransmissions_) {
+// Name name = interest.getName();
+// name.setSuffix(chunk_number);
+
+// std::shared_ptr<Interest> retx_interest =
+// std::make_shared<Interest>(name);
+
+// if (socket_->on_interest_retransmission_ != VOID_HANDLER) {
+// socket_->on_interest_retransmission_(*socket_, *retx_interest);
+// }
+
+// if (socket_->on_interest_output_ != VOID_HANDLER) {
+// socket_->on_interest_output_(*socket_, *retx_interest);
+// }
+
+// if (!is_running_) {
+// return;
+// }
+
+// interests_in_flight_++;
+// interest_retransmissions_[chunk_number & mask_]++;
+
+// using namespace std::placeholders;
+// portal_->sendInterest(std::move(retx_interest));
+// }
+// }
+
+void VegasTransportProtocol::removeAllPendingInterests() { portal_->clear(); }
+
+} // end namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/vegas.h b/libtransport/src/hicn/transport/protocols/vegas.h
new file mode 100755
index 000000000..7791ffc94
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/vegas.h
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/protocols/protocol.h>
+#include <hicn/transport/protocols/vegas_rto_estimator.h>
+#include <hicn/transport/utils/event_thread.h>
+#include <hicn/transport/utils/ring_buffer.h>
+#include <hicn/transport/utils/sharable_vector.h>
+
+#include <map>
+
+namespace transport {
+
+namespace protocol {
+
+typedef utils::CircularFifo<uint32_t, 1024 * 128> SuffixQueue;
+typedef std::chrono::time_point<std::chrono::steady_clock> Time;
+typedef std::chrono::milliseconds TimeDuration;
+
+class VegasTransportProtocol : public TransportProtocol {
+ public:
+ VegasTransportProtocol(interface::BaseSocket *icnet_socket);
+
+ virtual ~VegasTransportProtocol();
+
+ virtual void start(utils::SharableVector<uint8_t> &content_buffer) override;
+
+ void stop() override;
+
+ void resume() override;
+
+ protected:
+ void reset();
+
+ void sendInterest(std::uint64_t next_suffix);
+
+ void onContentSegment(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object);
+
+ bool verifyContentObject(const ContentObject &content_object);
+
+ bool verifyManifest(const interface::ContentObjectManifest &manifest);
+
+ virtual void onTimeout(Interest::Ptr &&interest) override;
+
+ void onManifest(std::unique_ptr<interface::ContentObjectManifest> &&manifest);
+
+ void onContentObject(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) override;
+
+ virtual void changeInterestLifetime(uint64_t segment);
+
+ void scheduleNextInterests();
+
+ virtual void decreaseWindow();
+
+ virtual void increaseWindow();
+
+ virtual void afterContentReception(const Interest &interest,
+ const ContentObject &content_object);
+
+ virtual void afterDataUnsatisfied(uint64_t segment);
+
+ void reassemble();
+
+ void returnContentToUser();
+
+ void partialDownload();
+
+ virtual void copyContent(const ContentObject &content_object);
+
+ // virtual void checkForFastRetransmission(const Interest &interest);
+
+ // void fastRetransmit(const Interest &interest, uint32_t chunk_number);
+
+ void removeAllPendingInterests();
+
+ protected:
+ void handleTimeout(const std::error_code &ec);
+
+ // reassembly variables
+ volatile bool is_final_block_number_discovered_;
+ std::atomic<uint64_t> final_block_number_;
+ uint64_t last_reassembled_segment_;
+ std::shared_ptr<utils::SharableVector<uint8_t>> content_buffer_;
+ size_t content_buffer_size_;
+
+ // transmission variablesis_final_block_number_discovered_
+ double current_window_size_;
+ double pending_window_size_;
+ uint64_t interests_in_flight_;
+ uint64_t next_suffix_;
+ std::vector<std::uint32_t> interest_retransmissions_;
+ std::vector<std::chrono::steady_clock::time_point> interest_timepoints_;
+ RtoEstimator rtt_estimator_;
+
+ uint32_t retx_count_;
+
+ // buffers
+ std::unordered_map<std::uint32_t, ContentObject::Ptr>
+ receive_buffer_; // verified segments by segment number
+ std::unordered_map<std::uint32_t, ContentObject::Ptr>
+ unverified_segments_; // used with embedded manifests
+ std::unordered_map<std::uint32_t, ContentObject::Ptr>
+ verified_manifests_; // by segment number
+
+ std::uint16_t interest_pool_index_;
+ std::uint16_t mask_;
+
+ // suffix randomization: since the suffixes in the manifests could not be in a
+ // sequential order, we need to map those suffixes into an ordered sequence.
+ std::unordered_map<std::uint64_t, std::uint64_t>
+ incremental_suffix_to_real_suffix_map_;
+ std::unordered_map<std::uint64_t, std::uint64_t>
+ real_suffix_to_incremental_suffix_map_;
+ std::uint32_t incremental_suffix_index_;
+
+ // verification
+ std::unordered_map<uint32_t, std::pair<std::vector<uint8_t>, HashAlgorithm>>
+ suffix_hash_map_;
+
+ // Fast Retransmission
+ std::map<uint64_t, bool> received_segments_;
+ std::unordered_map<uint64_t, bool> fast_retransmitted_segments;
+
+ // Suffix queue
+ volatile bool suffix_queue_completed_;
+ SuffixQueue suffix_queue_;
+
+ volatile bool download_with_manifest_;
+ uint32_t next_manifest_;
+ std::atomic<uint16_t> next_manifest_interval_;
+
+ std::unique_ptr<utils::EventThread> verifier_thread_;
+
+ uint32_t interest_tx_;
+ uint32_t interest_count_;
+
+ uint64_t byte_count_;
+ double average_rtt_;
+
+ std::unordered_map<uint32_t, uint64_t> sign_time_;
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc
new file mode 100755
index 000000000..f5f797bbe
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_options_default_values.h>
+#include <hicn/transport/protocols/vegas_rto_estimator.h>
+
+#include <algorithm>
+#include <cmath>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+RtoEstimator::RtoEstimator(Duration min_rto)
+ : smoothed_rtt_(RtoEstimator::getInitialRtt().count()),
+ rtt_variation_(0),
+ first_measurement_(true),
+ last_rto_(min_rto.count()) {}
+
+void RtoEstimator::addMeasurement(Duration rtt) {
+ double duration = static_cast<double>(rtt.count());
+ if (first_measurement_) {
+ smoothed_rtt_ = duration;
+ rtt_variation_ = duration / 2;
+ first_measurement_ = false;
+ } else {
+ rtt_variation_ = (1 - default_values::beta) * rtt_variation_ +
+ default_values::beta * std::abs(smoothed_rtt_ - duration);
+ smoothed_rtt_ = (1 - default_values::alpha) * smoothed_rtt_ +
+ default_values::alpha * duration;
+ }
+}
+
+RtoEstimator::Duration RtoEstimator::computeRto() const {
+ double rto = smoothed_rtt_ +
+ std::max(double(default_values::clock_granularity.count()),
+ default_values::k* rtt_variation_);
+ return Duration(static_cast<Duration::rep>(rto));
+}
+
+} // end namespace protocol
+
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h
new file mode 100755
index 000000000..e84afc49c
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+
+// Implementation inspired from RFC6298
+// (https://tools.ietf.org/search/rfc6298#ref-JK88)
+
+namespace transport {
+
+namespace protocol {
+
+class RtoEstimator {
+ public:
+ typedef std::chrono::microseconds Duration;
+
+ static Duration getInitialRtt() { return std::chrono::seconds(1); }
+
+ RtoEstimator(Duration min_rto = std::chrono::seconds(1));
+
+ void addMeasurement(Duration measure);
+
+ Duration computeRto() const;
+
+ private:
+ double smoothed_rtt_;
+ double rtt_variation_;
+ bool first_measurement_;
+ double last_rto_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport \ No newline at end of file