summaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols')
-rw-r--r--libtransport/src/protocols/CMakeLists.txt75
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc126
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.h54
-rw-r--r--libtransport/src/protocols/cbr.cc52
-rw-r--r--libtransport/src/protocols/cbr.h40
-rw-r--r--libtransport/src/protocols/congestion_window_protocol.h30
-rw-r--r--libtransport/src/protocols/consumer.conf21
-rw-r--r--libtransport/src/protocols/data_processing_events.h33
-rw-r--r--libtransport/src/protocols/datagram_reassembly.cc36
-rw-r--r--libtransport/src/protocols/datagram_reassembly.h39
-rw-r--r--libtransport/src/protocols/errors.cc60
-rw-r--r--libtransport/src/protocols/errors.h91
-rw-r--r--libtransport/src/protocols/incremental_indexer.cc53
-rw-r--r--libtransport/src/protocols/incremental_indexer.h143
-rw-r--r--libtransport/src/protocols/indexer.cc79
-rw-r--r--libtransport/src/protocols/indexer.h106
-rw-r--r--libtransport/src/protocols/manifest_incremental_indexer.cc234
-rw-r--r--libtransport/src/protocols/manifest_incremental_indexer.h92
-rw-r--r--libtransport/src/protocols/packet_manager.h65
-rw-r--r--libtransport/src/protocols/protocol.cc105
-rw-r--r--libtransport/src/protocols/protocol.h92
-rw-r--r--libtransport/src/protocols/raaqm.cc714
-rw-r--r--libtransport/src/protocols/raaqm.h142
-rw-r--r--libtransport/src/protocols/raaqm_data_path.cc158
-rw-r--r--libtransport/src/protocols/raaqm_data_path.h226
-rw-r--r--libtransport/src/protocols/rate_estimation.cc356
-rw-r--r--libtransport/src/protocols/rate_estimation.h174
-rw-r--r--libtransport/src/protocols/reassembly.cc72
-rw-r--r--libtransport/src/protocols/reassembly.h67
-rw-r--r--libtransport/src/protocols/rtc.cc1017
-rw-r--r--libtransport/src/protocols/rtc.h227
-rw-r--r--libtransport/src/protocols/rtc_data_path.cc156
-rw-r--r--libtransport/src/protocols/rtc_data_path.h80
-rw-r--r--libtransport/src/protocols/test/CMakeLists.txt10
-rw-r--r--libtransport/src/protocols/test/test_transport_producer.cc80
-rw-r--r--libtransport/src/protocols/verification_manager.cc101
-rw-r--r--libtransport/src/protocols/verification_manager.h71
37 files changed, 5277 insertions, 0 deletions
diff --git a/libtransport/src/protocols/CMakeLists.txt b/libtransport/src/protocols/CMakeLists.txt
new file mode 100644
index 000000000..3156d9ae9
--- /dev/null
+++ b/libtransport/src/protocols/CMakeLists.txt
@@ -0,0 +1,75 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/indexer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/protocol.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h
+)
+
+list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/indexer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc
+)
+
+set(RAAQM_CONFIG_INSTALL_PREFIX
+${CMAKE_INSTALL_PREFIX}/etc/hicn
+)
+
+set(raaqm_config_path
+ ${RAAQM_CONFIG_INSTALL_PREFIX}/consumer.conf
+ PARENT_SCOPE
+)
+
+set(TRANSPORT_CONFIG
+ ${CMAKE_CURRENT_SOURCE_DIR}/consumer.conf
+)
+
+install(
+ FILES ${TRANSPORT_CONFIG}
+ DESTINATION ${CMAKE_INSTALL_FULL_SYSCONFDIR}/hicn
+ COMPONENT lib${LIBTRANSPORT}
+)
+
+set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
new file mode 100644
index 000000000..c2996ebc1
--- /dev/null
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/utils/array.h>
+#include <hicn/transport/utils/membuf.h>
+
+#include <implementation/socket_consumer.h>
+#include <protocols/byte_stream_reassembly.h>
+#include <protocols/errors.h>
+#include <protocols/indexer.h>
+#include <protocols/protocol.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+using ReadCallback = interface::ConsumerSocket::ReadCallback;
+
+ByteStreamReassembly::ByteStreamReassembly(
+ implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol)
+ : Reassembly(icn_socket, transport_protocol),
+ index_(IndexManager::invalid_index),
+ download_complete_(false) {}
+
+void ByteStreamReassembly::reassemble(
+ std::unique_ptr<ContentObjectManifest> &&manifest) {
+ if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) {
+ received_packets_.emplace(
+ std::make_pair(manifest->getName().getSuffix(), nullptr));
+ assembleContent();
+ }
+}
+
+void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) {
+ if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
+ received_packets_.emplace(std::make_pair(
+ content_object->getName().getSuffix(), std::move(content_object)));
+ assembleContent();
+ }
+}
+
+void ByteStreamReassembly::assembleContent() {
+ if (TRANSPORT_EXPECT_FALSE(index_ == IndexManager::invalid_index)) {
+ index_ = index_manager_->getNextReassemblySegment();
+ if (index_ == IndexManager::invalid_index) {
+ return;
+ }
+ }
+
+ auto it = received_packets_.find((const unsigned int)index_);
+ while (it != received_packets_.end()) {
+ // Check if valid packet
+ if (it->second) {
+ copyContent(*it->second);
+ }
+
+ received_packets_.erase(it);
+ index_ = index_manager_->getNextReassemblySegment();
+ it = received_packets_.find((const unsigned int)index_);
+ }
+
+ if (!download_complete_ && index_ != IndexManager::invalid_index) {
+ transport_protocol_->onReassemblyFailed(index_);
+ }
+}
+
+void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
+ auto a = content_object.getPayload();
+ auto payload_length = a->length();
+ auto write_size = std::min(payload_length, read_buffer_->tailroom());
+ auto additional_bytes = payload_length > read_buffer_->tailroom()
+ ? payload_length - read_buffer_->tailroom()
+ : 0;
+
+ std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
+ read_buffer_->append(write_size);
+
+ if (!read_buffer_->tailroom()) {
+ notifyApplication();
+ std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
+ additional_bytes);
+ read_buffer_->append(additional_bytes);
+ }
+
+ download_complete_ =
+ index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
+
+ if (TRANSPORT_EXPECT_FALSE(download_complete_)) {
+ notifyApplication();
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::success));
+ }
+}
+
+void ByteStreamReassembly::reInitialize() {
+ index_ = IndexManager::invalid_index;
+ download_complete_ = false;
+
+ received_packets_.clear();
+
+ // reset read buffer
+ ReadCallback *read_callback;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
+
+ read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
+}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h
new file mode 100644
index 000000000..5e5c9ec6b
--- /dev/null
+++ b/libtransport/src/protocols/byte_stream_reassembly.h
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+class ByteStreamReassembly : public Reassembly {
+ public:
+ ByteStreamReassembly(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol);
+
+ protected:
+ virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
+
+ virtual void reassemble(
+ std::unique_ptr<core::ContentObjectManifest> &&manifest) override;
+
+ virtual void copyContent(const core::ContentObject &content_object);
+
+ virtual void reInitialize() override;
+
+ private:
+ void assembleContent();
+
+ protected:
+ // The consumer socket
+ // std::unique_ptr<IncrementalIndexManager> incremental_index_manager_;
+ // std::unique_ptr<ManifestIndexManager> manifest_index_manager_;
+ // IndexVerificationManager *index_manager_;
+ std::unordered_map<std::uint32_t, core::ContentObject::Ptr> received_packets_;
+ uint32_t index_;
+ bool download_complete_;
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc
new file mode 100644
index 000000000..5df55bd5c
--- /dev/null
+++ b/libtransport/src/protocols/cbr.cc
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <implementation/socket_consumer.h>
+
+#include <protocols/cbr.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+CbrTransportProtocol::CbrTransportProtocol(
+ implementation::ConsumerSocket *icnet_socket)
+ : RaaqmTransportProtocol(icnet_socket) {}
+
+int CbrTransportProtocol::start() { return RaaqmTransportProtocol::start(); }
+
+void CbrTransportProtocol::reset() {
+ RaaqmTransportProtocol::reset();
+ socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+}
+
+void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {}
+
+void CbrTransportProtocol::afterContentReception(
+ const Interest &interest, const ContentObject &content_object) {
+ auto segment = content_object.getName().getSuffix();
+ auto now = utils::SteadyClock::now();
+ auto rtt = std::chrono::duration_cast<utils::Microseconds>(
+ now - interest_timepoints_[segment & mask]);
+ // Update stats
+ updateStats(segment, rtt.count(), now);
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/cbr.h b/libtransport/src/protocols/cbr.h
new file mode 100644
index 000000000..20129f6a3
--- /dev/null
+++ b/libtransport/src/protocols/cbr.h
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/raaqm.h>
+
+namespace transport {
+
+namespace protocol {
+
+class CbrTransportProtocol : public RaaqmTransportProtocol {
+ public:
+ CbrTransportProtocol(implementation::ConsumerSocket *icnet_socket);
+
+ int start() override;
+
+ void reset() override;
+
+ private:
+ void afterContentReception(const Interest &interest,
+ const ContentObject &content_object) override;
+ void afterDataUnsatisfied(uint64_t segment) override;
+};
+
+} // end namespace protocol
+
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/protocols/congestion_window_protocol.h b/libtransport/src/protocols/congestion_window_protocol.h
new file mode 100644
index 000000000..36ac6eb17
--- /dev/null
+++ b/libtransport/src/protocols/congestion_window_protocol.h
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace transport {
+
+namespace protocol {
+
+class CWindowProtocol {
+ protected:
+ virtual void increaseWindow() = 0;
+ virtual void decreaseWindow() = 0;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/consumer.conf b/libtransport/src/protocols/consumer.conf
new file mode 100644
index 000000000..1a366f32f
--- /dev/null
+++ b/libtransport/src/protocols/consumer.conf
@@ -0,0 +1,21 @@
+; this file contais the parameters for RAAQM
+autotune = no
+lifetime = 500
+retransmissions = 128
+beta = 0.99
+drop = 0.003
+beta_wifi_ = 0.99
+drop_wifi_ = 0.6
+beta_lte_ = 0.99
+drop_lte_ = 0.003
+wifi_delay_ = 200
+lte_delay_ = 9000
+
+alpha = 0.95
+batching_parameter = 200
+
+;Choice of rate estimator:
+;0 --> an estimation each $(batching_parameter) packets
+;1 --> an estimation "a la TCP", estimation at the end of the download of the segment
+
+rate_estimator = 0
diff --git a/libtransport/src/protocols/data_processing_events.h b/libtransport/src/protocols/data_processing_events.h
new file mode 100644
index 000000000..8975c2b4a
--- /dev/null
+++ b/libtransport/src/protocols/data_processing_events.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+
+namespace transport {
+namespace protocol {
+
+class ContentObjectProcessingEventCallback {
+ public:
+ virtual ~ContentObjectProcessingEventCallback() = default;
+ virtual void onPacketDropped(core::Interest::Ptr &&i,
+ core::ContentObject::Ptr &&c) = 0;
+ virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0;
+};
+
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/protocols/datagram_reassembly.cc b/libtransport/src/protocols/datagram_reassembly.cc
new file mode 100644
index 000000000..abd7e984d
--- /dev/null
+++ b/libtransport/src/protocols/datagram_reassembly.cc
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/datagram_reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+DatagramReassembly::DatagramReassembly(
+ implementation::ConsumerSocket* icn_socket,
+ TransportProtocol* transport_protocol)
+ : Reassembly(icn_socket, transport_protocol) {}
+
+void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) {
+ read_buffer_ = content_object->getPayload();
+ Reassembly::notifyApplication();
+}
+
+void DatagramReassembly::reInitialize() {}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/datagram_reassembly.h b/libtransport/src/protocols/datagram_reassembly.h
new file mode 100644
index 000000000..2427ae62f
--- /dev/null
+++ b/libtransport/src/protocols/datagram_reassembly.h
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+class DatagramReassembly : public Reassembly {
+ public:
+ DatagramReassembly(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol);
+
+ virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
+ virtual void reInitialize() override;
+ virtual void reassemble(
+ std::unique_ptr<core::ContentObjectManifest> &&manifest) override {
+ return;
+ }
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/errors.cc b/libtransport/src/protocols/errors.cc
new file mode 100644
index 000000000..eefb6f957
--- /dev/null
+++ b/libtransport/src/protocols/errors.cc
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/errors.h>
+
+namespace transport {
+namespace protocol {
+
+const std::error_category& protocol_category() {
+ static protocol_category_impl instance;
+
+ return instance;
+}
+
+const char* protocol_category_impl::name() const throw() {
+ return "transport::protocol::error";
+}
+
+std::string protocol_category_impl::message(int ev) const {
+ switch (static_cast<protocol_error>(ev)) {
+ case protocol_error::success: {
+ return "Success";
+ }
+ case protocol_error::signature_verification_failed: {
+ return "Signature verification failed.";
+ }
+ case protocol_error::integrity_verification_failed: {
+ return "Integrity verification failed";
+ }
+ case protocol_error::no_verifier_provided: {
+ return "Transport cannot get any verifier for the given data.";
+ }
+ case protocol_error::io_error: {
+ return "Conectivity error between transport and local forwarder";
+ }
+ case protocol_error::max_retransmissions_error: {
+ return "Transport protocol reached max number of retransmissions allowed "
+ "for the same interest.";
+ }
+ case protocol_error::session_aborted: {
+ return "The session has been aborted by the application.";
+ }
+ default: { return "Unknown protocol error"; }
+ }
+}
+
+} // namespace protocol
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/protocols/errors.h b/libtransport/src/protocols/errors.h
new file mode 100644
index 000000000..cb3d3474e
--- /dev/null
+++ b/libtransport/src/protocols/errors.h
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <system_error>
+
+namespace transport {
+namespace protocol {
+
+/**
+ * @brief Get the default server error category.
+ * @return The default server error category instance.
+ *
+ * @warning The first call to this function is thread-safe only starting with
+ * C++11.
+ */
+const std::error_category& protocol_category();
+
+/**
+ * The list of errors.
+ */
+enum class protocol_error {
+ success = 0,
+ signature_verification_failed,
+ integrity_verification_failed,
+ no_verifier_provided,
+ io_error,
+ max_retransmissions_error,
+ session_aborted,
+};
+
+/**
+ * @brief Create an error_code instance for the given error.
+ * @param error The error.
+ * @return The error_code instance.
+ */
+inline std::error_code make_error_code(protocol_error error) {
+ return std::error_code(static_cast<int>(error), protocol_category());
+}
+
+/**
+ * @brief Create an error_condition instance for the given error.
+ * @param error The error.
+ * @return The error_condition instance.
+ */
+inline std::error_condition make_error_condition(protocol_error error) {
+ return std::error_condition(static_cast<int>(error), protocol_category());
+}
+
+/**
+ * @brief A server error category.
+ */
+class protocol_category_impl : public std::error_category {
+ public:
+ /**
+ * @brief Get the name of the category.
+ * @return The name of the category.
+ */
+ virtual const char* name() const throw();
+
+ /**
+ * @brief Get the error message for a given error.
+ * @param ev The error numeric value.
+ * @return The message associated to the error.
+ */
+ virtual std::string message(int ev) const;
+};
+} // namespace protocol
+} // namespace transport
+
+namespace std {
+// namespace system {
+template <>
+struct is_error_code_enum<::transport::protocol::protocol_error>
+ : public std::true_type {};
+// } // namespace system
+} // namespace std \ No newline at end of file
diff --git a/libtransport/src/protocols/incremental_indexer.cc b/libtransport/src/protocols/incremental_indexer.cc
new file mode 100644
index 000000000..e590b4fee
--- /dev/null
+++ b/libtransport/src/protocols/incremental_indexer.cc
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/incremental_indexer.h>
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <protocols/protocol.h>
+
+namespace transport {
+namespace protocol {
+
+void IncrementalIndexer::onContentObject(
+ core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+ using namespace interface;
+
+ if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) {
+ final_suffix_ = content_object->getName().getSuffix();
+ }
+
+ auto ret = verification_manager_->onPacketToVerify(*content_object);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ reassembly_->reassemble(std::move(content_object));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET: {
+ transport_protocol_->onPacketDropped(std::move(interest),
+ std::move(content_object));
+ break;
+ }
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+}
+
+} // namespace protocol
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/protocols/incremental_indexer.h b/libtransport/src/protocols/incremental_indexer.h
new file mode 100644
index 000000000..20c5e4759
--- /dev/null
+++ b/libtransport/src/protocols/incremental_indexer.h
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/errors/runtime_exception.h>
+#include <hicn/transport/errors/unexpected_manifest_exception.h>
+#include <hicn/transport/utils/literals.h>
+
+#include <protocols/indexer.h>
+#include <protocols/reassembly.h>
+#include <protocols/verification_manager.h>
+
+#include <deque>
+
+namespace transport {
+
+namespace interface {
+class ConsumerSocket;
+}
+
+namespace protocol {
+
+class Reassembly;
+class TransportProtocol;
+
+class IncrementalIndexer : public Indexer {
+ public:
+ IncrementalIndexer(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly)
+ : socket_(icn_socket),
+ reassembly_(reassembly),
+ transport_protocol_(transport),
+ final_suffix_(std::numeric_limits<uint32_t>::max()),
+ first_suffix_(0),
+ next_download_suffix_(0),
+ next_reassembly_suffix_(0),
+ verification_manager_(
+ std::make_unique<SignatureVerificationManager>(icn_socket)) {
+ if (reassembly_) {
+ reassembly_->setIndexer(this);
+ }
+ }
+
+ IncrementalIndexer(const IncrementalIndexer &) = delete;
+
+ IncrementalIndexer(IncrementalIndexer &&other)
+ : socket_(other.socket_),
+ reassembly_(other.reassembly_),
+ transport_protocol_(other.transport_protocol_),
+ final_suffix_(other.final_suffix_),
+ first_suffix_(other.first_suffix_),
+ next_download_suffix_(other.next_download_suffix_),
+ next_reassembly_suffix_(other.next_reassembly_suffix_),
+ verification_manager_(std::move(other.verification_manager_)) {
+ if (reassembly_) {
+ reassembly_->setIndexer(this);
+ }
+ }
+
+ /**
+ *
+ */
+ virtual ~IncrementalIndexer() {}
+
+ TRANSPORT_ALWAYS_INLINE virtual void reset(
+ std::uint32_t offset = 0) override {
+ final_suffix_ = std::numeric_limits<uint32_t>::max();
+ next_download_suffix_ = offset;
+ next_reassembly_suffix_ = offset;
+ }
+
+ /**
+ * Retrieve from the manifest the next suffix to retrieve.
+ */
+ TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override {
+ return next_download_suffix_ <= final_suffix_ ? next_download_suffix_++
+ : IndexManager::invalid_index;
+ }
+
+ TRANSPORT_ALWAYS_INLINE virtual void setFirstSuffix(
+ uint32_t suffix) override {
+ first_suffix_ = suffix;
+ }
+
+ /**
+ * Retrive the next segment to be reassembled.
+ */
+ TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override {
+ return next_reassembly_suffix_ <= final_suffix_
+ ? next_reassembly_suffix_++
+ : IndexManager::invalid_index;
+ }
+
+ TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override {
+ return final_suffix_ != std::numeric_limits<uint32_t>::max();
+ }
+
+ TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override {
+ return final_suffix_;
+ }
+
+ void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) override;
+
+ TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) {
+ reassembly_ = reassembly;
+
+ if (reassembly_) {
+ reassembly_->setIndexer(this);
+ }
+ }
+
+ TRANSPORT_ALWAYS_INLINE bool onKeyToVerify() override {
+ return verification_manager_->onKeyToVerify();
+ }
+
+ protected:
+ implementation::ConsumerSocket *socket_;
+ Reassembly *reassembly_;
+ TransportProtocol *transport_protocol_;
+ uint32_t final_suffix_;
+ uint32_t first_suffix_;
+ uint32_t next_download_suffix_;
+ uint32_t next_reassembly_suffix_;
+ std::unique_ptr<VerificationManager> verification_manager_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/indexer.cc b/libtransport/src/protocols/indexer.cc
new file mode 100644
index 000000000..ca12330a6
--- /dev/null
+++ b/libtransport/src/protocols/indexer.cc
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/utils/branch_prediction.h>
+
+#include <protocols/incremental_indexer.h>
+#include <protocols/indexer.h>
+#include <protocols/manifest_incremental_indexer.h>
+#include <protocols/protocol.h>
+
+namespace transport {
+namespace protocol {
+
+IndexManager::IndexManager(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly)
+ : indexer_(std::make_unique<IncrementalIndexer>(icn_socket, transport,
+ reassembly)),
+ first_segment_received_(false),
+ icn_socket_(icn_socket),
+ transport_(transport),
+ reassembly_(reassembly) {}
+
+void IndexManager::onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) {
+ if (first_segment_received_) {
+ indexer_->onContentObject(std::move(interest), std::move(content_object));
+ } else {
+ std::uint32_t segment_number = interest->getName().getSuffix();
+
+ if (segment_number == 0) {
+ // Check if manifest
+ if (content_object->getPayloadType() == PayloadType::MANIFEST) {
+ IncrementalIndexer *indexer =
+ static_cast<IncrementalIndexer *>(indexer_.release());
+ indexer_ =
+ std::make_unique<ManifestIncrementalIndexer>(std::move(*indexer));
+ delete indexer;
+ }
+
+ indexer_->onContentObject(std::move(interest), std::move(content_object));
+ auto it = interest_data_set_.begin();
+ while (it != interest_data_set_.end()) {
+ indexer_->onContentObject(
+ std::move(const_cast<core::Interest::Ptr &&>(it->first)),
+ std::move(const_cast<core::ContentObject::Ptr &&>(it->second)));
+ it = interest_data_set_.erase(it);
+ }
+
+ first_segment_received_ = true;
+ } else {
+ interest_data_set_.emplace(std::move(interest),
+ std::move(content_object));
+ }
+ }
+}
+
+bool IndexManager::onKeyToVerify() { return indexer_->onKeyToVerify(); }
+
+void IndexManager::reset(std::uint32_t offset) {
+ indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_,
+ reassembly_);
+ first_segment_received_ = false;
+ interest_data_set_.clear();
+}
+
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/protocols/indexer.h b/libtransport/src/protocols/indexer.h
new file mode 100644
index 000000000..8213a1503
--- /dev/null
+++ b/libtransport/src/protocols/indexer.h
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/interest.h>
+
+#include <set>
+
+namespace transport {
+
+namespace implementation {
+class ConsumerSocket;
+}
+
+namespace protocol {
+
+class Reassembly;
+class TransportProtocol;
+
+class Indexer {
+ public:
+ /**
+ *
+ */
+ virtual ~Indexer() = default;
+ /**
+ * Retrieve from the manifest the next suffix to retrieve.
+ */
+ virtual uint32_t getNextSuffix() = 0;
+
+ virtual void setFirstSuffix(uint32_t suffix) = 0;
+
+ /**
+ * Retrive the next segment to be reassembled.
+ */
+ virtual uint32_t getNextReassemblySegment() = 0;
+
+ virtual bool isFinalSuffixDiscovered() = 0;
+
+ virtual uint32_t getFinalSuffix() = 0;
+
+ virtual void reset(std::uint32_t offset = 0) = 0;
+
+ virtual void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) = 0;
+
+ virtual bool onKeyToVerify() = 0;
+};
+
+class IndexManager : Indexer {
+ public:
+ static constexpr uint32_t invalid_index = ~0;
+
+ IndexManager(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport, Reassembly *reassembly);
+
+ uint32_t getNextSuffix() override { return indexer_->getNextSuffix(); }
+
+ void setFirstSuffix(uint32_t suffix) override {
+ indexer_->setFirstSuffix(suffix);
+ }
+
+ uint32_t getNextReassemblySegment() override {
+ return indexer_->getNextReassemblySegment();
+ }
+
+ bool isFinalSuffixDiscovered() override {
+ return indexer_->isFinalSuffixDiscovered();
+ }
+
+ uint32_t getFinalSuffix() override { return indexer_->getFinalSuffix(); }
+
+ void reset(std::uint32_t offset = 0) override;
+
+ void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) override;
+
+ bool onKeyToVerify() override;
+
+ private:
+ std::unique_ptr<Indexer> indexer_;
+ bool first_segment_received_;
+ std::set<std::pair<core::Interest::Ptr, core::ContentObject::Ptr>>
+ interest_data_set_;
+ implementation::ConsumerSocket *icn_socket_;
+ TransportProtocol *transport_;
+ Reassembly *reassembly_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/manifest_incremental_indexer.cc b/libtransport/src/protocols/manifest_incremental_indexer.cc
new file mode 100644
index 000000000..1a2f9dec3
--- /dev/null
+++ b/libtransport/src/protocols/manifest_incremental_indexer.cc
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <implementation/socket_consumer.h>
+
+#include <protocols/manifest_incremental_indexer.h>
+#include <protocols/protocol.h>
+
+#include <cmath>
+#include <deque>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+ManifestIncrementalIndexer::ManifestIncrementalIndexer(
+ implementation::ConsumerSocket *icn_socket, TransportProtocol *transport,
+ Reassembly *reassembly)
+ : IncrementalIndexer(icn_socket, transport, reassembly),
+ suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy(
+ NextSegmentCalculationStrategy::INCREMENTAL, next_download_suffix_,
+ 0)) {}
+
+void ManifestIncrementalIndexer::onContentObject(
+ core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+ // Check if mainfiest or not
+ if (content_object->getPayloadType() == PayloadType::MANIFEST) {
+ onUntrustedManifest(std::move(interest), std::move(content_object));
+ } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ onUntrustedContentObject(std::move(interest), std::move(content_object));
+ }
+}
+
+void ManifestIncrementalIndexer::onUntrustedManifest(
+ core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+ auto ret = verification_manager_->onPacketToVerify(*content_object);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ processTrustedManifest(std::move(content_object));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET:
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+}
+
+void ManifestIncrementalIndexer::processTrustedManifest(
+ ContentObject::Ptr &&content_object) {
+ auto manifest =
+ std::make_unique<ContentObjectManifest>(std::move(*content_object));
+ manifest->decode();
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
+ core::ManifestVersion::VERSION_1)) {
+ throw errors::RuntimeException("Received manifest with unknown version.");
+ }
+
+ switch (manifest->getManifestType()) {
+ case core::ManifestType::INLINE_MANIFEST: {
+ auto _it = manifest->getSuffixList().begin();
+ auto _end = manifest->getSuffixList().end();
+
+ suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber());
+
+ for (; _it != _end; _it++) {
+ auto hash =
+ std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
+ manifest->getHashAlgorithm());
+
+ if (!checkUnverifiedSegments(_it->first, hash)) {
+ suffix_hash_map_[_it->first] = std::move(hash);
+ }
+ }
+
+ reassembly_->reassemble(std::move(manifest));
+
+ break;
+ }
+ case core::ManifestType::FLIC_MANIFEST: {
+ throw errors::NotImplementedException();
+ }
+ case core::ManifestType::FINAL_CHUNK_NUMBER: {
+ throw errors::NotImplementedException();
+ }
+ }
+}
+
+bool ManifestIncrementalIndexer::checkUnverifiedSegments(
+ std::uint32_t suffix, const HashEntry &hash) {
+ auto it = unverified_segments_.find(suffix);
+
+ if (it != unverified_segments_.end()) {
+ auto ret = verifyContentObject(hash, *it->second.second);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ reassembly_->reassemble(std::move(it->second.second));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET: {
+ transport_protocol_->onPacketDropped(std::move(it->second.first),
+ std::move(it->second.second));
+ break;
+ }
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+
+ unverified_segments_.erase(it);
+ return true;
+ }
+
+ return false;
+}
+
+VerificationPolicy ManifestIncrementalIndexer::verifyContentObject(
+ const HashEntry &manifest_hash, const ContentObject &content_object) {
+ VerificationPolicy ret;
+
+ auto hash_type = static_cast<utils::CryptoHashType>(manifest_hash.second);
+ auto data_packet_digest = content_object.computeDigest(manifest_hash.second);
+ auto data_packet_digest_bytes =
+ data_packet_digest.getDigest<uint8_t>().data();
+ const std::vector<uint8_t> &manifest_digest_bytes = manifest_hash.first;
+
+ if (utils::CryptoHash::compareBinaryDigest(
+ data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) {
+ ret = VerificationPolicy::ACCEPT_PACKET;
+ } else {
+ ConsumerContentObjectVerificationFailedCallback
+ *verification_failed_callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback);
+ ret = (*verification_failed_callback)(
+ *socket_->getInterface(), content_object,
+ make_error_code(protocol_error::integrity_verification_failed));
+ }
+
+ return ret;
+}
+
+void ManifestIncrementalIndexer::onUntrustedContentObject(
+ Interest::Ptr &&i, ContentObject::Ptr &&c) {
+ auto suffix = c->getName().getSuffix();
+ auto it = suffix_hash_map_.find(suffix);
+
+ if (it != suffix_hash_map_.end()) {
+ auto ret = verifyContentObject(it->second, *c);
+
+ switch (ret) {
+ case VerificationPolicy::ACCEPT_PACKET: {
+ suffix_hash_map_.erase(it);
+ reassembly_->reassemble(std::move(c));
+ break;
+ }
+ case VerificationPolicy::DROP_PACKET: {
+ transport_protocol_->onPacketDropped(std::move(i), std::move(c));
+ break;
+ }
+ case VerificationPolicy::ABORT_SESSION: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ }
+ } else {
+ unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c));
+ }
+}
+
+uint32_t ManifestIncrementalIndexer::getNextSuffix() {
+ auto ret = suffix_strategy_->getNextSuffix();
+
+ if (ret <= suffix_strategy_->getFinalSuffix() &&
+ ret != utils::SuffixStrategy::INVALID_SUFFIX) {
+ suffix_queue_.push(ret);
+ return ret;
+ }
+
+ return IndexManager::invalid_index;
+}
+
+uint32_t ManifestIncrementalIndexer::getFinalSuffix() {
+ return suffix_strategy_->getFinalSuffix();
+}
+
+bool ManifestIncrementalIndexer::isFinalSuffixDiscovered() {
+ return IncrementalIndexer::isFinalSuffixDiscovered();
+}
+
+uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() {
+ if (suffix_queue_.empty()) {
+ return IndexManager::invalid_index;
+ }
+
+ auto ret = suffix_queue_.front();
+ suffix_queue_.pop();
+ return ret;
+}
+
+void ManifestIncrementalIndexer::reset(std::uint32_t offset) {
+ IncrementalIndexer::reset(offset);
+ suffix_hash_map_.clear();
+ unverified_segments_.clear();
+ SuffixQueue empty;
+ std::swap(suffix_queue_, empty);
+ suffix_strategy_->reset(offset);
+}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/manifest_incremental_indexer.h b/libtransport/src/protocols/manifest_incremental_indexer.h
new file mode 100644
index 000000000..88ae1720b
--- /dev/null
+++ b/libtransport/src/protocols/manifest_incremental_indexer.h
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <implementation/socket.h>
+#include <protocols/incremental_indexer.h>
+#include <utils/suffix_strategy.h>
+
+#include <list>
+
+namespace transport {
+
+namespace protocol {
+
+class ManifestIncrementalIndexer : public IncrementalIndexer {
+ static constexpr double alpha = 0.3;
+
+ public:
+ using SuffixQueue = std::queue<uint32_t>;
+ using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>;
+
+ ManifestIncrementalIndexer(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport,
+ Reassembly *reassembly);
+
+ ManifestIncrementalIndexer(IncrementalIndexer &&indexer)
+ : IncrementalIndexer(std::move(indexer)),
+ suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy(
+ core::NextSegmentCalculationStrategy::INCREMENTAL,
+ next_download_suffix_, 0)) {
+ for (uint32_t i = first_suffix_; i < next_download_suffix_; i++) {
+ suffix_queue_.push(i);
+ }
+ }
+
+ virtual ~ManifestIncrementalIndexer() = default;
+
+ void reset(std::uint32_t offset = 0) override;
+
+ void onContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object) override;
+
+ uint32_t getNextSuffix() override;
+
+ uint32_t getNextReassemblySegment() override;
+
+ bool isFinalSuffixDiscovered() override;
+
+ uint32_t getFinalSuffix() override;
+
+ private:
+ void onUntrustedManifest(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object);
+ void onUntrustedContentObject(core::Interest::Ptr &&interest,
+ core::ContentObject::Ptr &&content_object);
+ void processTrustedManifest(core::ContentObject::Ptr &&content_object);
+ void onManifestReceived(core::Interest::Ptr &&i,
+ core::ContentObject::Ptr &&c);
+ void onManifestTimeout(core::Interest::Ptr &&i);
+ VerificationPolicy verifyContentObject(
+ const HashEntry &manifest_hash,
+ const core::ContentObject &content_object);
+ bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash);
+
+ protected:
+ std::unique_ptr<utils::SuffixStrategy> suffix_strategy_;
+ SuffixQueue suffix_queue_;
+
+ // Hash verification
+ std::unordered_map<uint32_t, HashEntry> suffix_hash_map_;
+
+ std::unordered_map<uint32_t,
+ std::pair<core::Interest::Ptr, core::ContentObject::Ptr>>
+ unverified_segments_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/packet_manager.h b/libtransport/src/protocols/packet_manager.h
new file mode 100644
index 000000000..a552607ea
--- /dev/null
+++ b/libtransport/src/protocols/packet_manager.h
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/utils/object_pool.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+
+template <typename PacketType, std::size_t packet_pool_size = 4096>
+class PacketManager {
+ static_assert(std::is_base_of<Packet, PacketType>::value,
+ "The packet manager support just Interest and Data.");
+
+ public:
+ PacketManager(std::size_t size = packet_pool_size) : size_(0) {
+ // Create pool of interests
+ increasePoolSize(size);
+ }
+
+ TRANSPORT_ALWAYS_INLINE void increasePoolSize(std::size_t size) {
+ for (std::size_t i = 0; i < size; i++) {
+ interest_pool_.add(new PacketType());
+ }
+
+ size_ += size;
+ }
+
+ TRANSPORT_ALWAYS_INLINE typename PacketType::Ptr getPacket() {
+ auto result = interest_pool_.get();
+
+ while (TRANSPORT_EXPECT_FALSE(!result.first)) {
+ // Add packets to the pool
+ increasePoolSize(size_);
+ result = interest_pool_.get();
+ }
+
+ result.second->resetPayload();
+ return std::move(result.second);
+ }
+
+ private:
+ utils::ObjectPool<PacketType> interest_pool_;
+ std::size_t size_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc
new file mode 100644
index 000000000..aa290bef8
--- /dev/null
+++ b/libtransport/src/protocols/protocol.cc
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+#include <implementation/socket_consumer.h>
+#include <protocols/protocol.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
+ Reassembly *reassembly_protocol)
+ : socket_(icn_socket),
+ reassembly_protocol_(reassembly_protocol),
+ index_manager_(
+ std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
+ is_running_(false),
+ is_first_(false) {
+ socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
+ socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
+}
+
+int TransportProtocol::start() {
+ // If the protocol is already running, return otherwise set as running
+ if (is_running_) return -1;
+
+ // Reset the protocol state machine
+ reset();
+
+ // Set it is the first time we schedule an interest
+ is_first_ = true;
+
+ // Schedule next interests
+ scheduleNextInterests();
+
+ is_first_ = false;
+
+ // Set the protocol as running
+ is_running_ = true;
+
+ // Start Event loop
+ portal_->runEventsLoop();
+
+ // Not running anymore
+ is_running_ = false;
+
+ return 0;
+}
+
+void TransportProtocol::stop() {
+ is_running_ = false;
+ portal_->stopEventsLoop();
+}
+
+void TransportProtocol::resume() {
+ if (is_running_) return;
+
+ is_running_ = true;
+
+ scheduleNextInterests();
+
+ portal_->runEventsLoop();
+
+ is_running_ = false;
+}
+
+void TransportProtocol::onContentReassembled(std::error_code ec) {
+ interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
+ socket_->getSocketOption(READ_CALLBACK, &on_payload);
+
+ if (!on_payload) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before "
+ "starting "
+ "the content retrieval.");
+ }
+
+ if (!ec) {
+ on_payload->readSuccess(stats_->getBytesRecv());
+ } else {
+ on_payload->readError(ec);
+ }
+
+ stop();
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h
new file mode 100644
index 000000000..c094adaae
--- /dev/null
+++ b/libtransport/src/protocols/protocol.h
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+
+#include <hicn/transport/interfaces/statistics.h>
+#include <hicn/transport/utils/object_pool.h>
+
+#include <implementation/socket.h>
+#include <protocols/data_processing_events.h>
+#include <protocols/indexer.h>
+#include <protocols/packet_manager.h>
+#include <protocols/reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+
+class IndexVerificationManager;
+
+class TransportProtocolCallback {
+ virtual void onContentObject(const core::Interest &interest,
+ const core::ContentObject &content_object) = 0;
+ virtual void onTimeout(const core::Interest &interest) = 0;
+};
+
+class TransportProtocol : public implementation::BasePortal::ConsumerCallback,
+ public PacketManager<Interest>,
+ public ContentObjectProcessingEventCallback {
+ static constexpr std::size_t interest_pool_size = 4096;
+
+ friend class ManifestIndexManager;
+
+ public:
+ TransportProtocol(implementation::ConsumerSocket *icn_socket,
+ Reassembly *reassembly_protocol);
+
+ virtual ~TransportProtocol() = default;
+
+ TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; }
+
+ virtual int start();
+
+ virtual void stop();
+
+ virtual void resume();
+
+ virtual bool verifyKeyPackets() = 0;
+
+ virtual void scheduleNextInterests() = 0;
+
+ // Events generated by the indexing
+ virtual void onContentReassembled(std::error_code ec);
+ virtual void onPacketDropped(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) = 0;
+ virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0;
+
+ protected:
+ // Consumer Callback
+ virtual void reset() = 0;
+ virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0;
+ virtual void onTimeout(Interest::Ptr &&i) = 0;
+
+ protected:
+ implementation::ConsumerSocket *socket_;
+ std::unique_ptr<Reassembly> reassembly_protocol_;
+ std::unique_ptr<IndexManager> index_manager_;
+ std::shared_ptr<implementation::BasePortal> portal_;
+ std::atomic<bool> is_running_;
+ // True if it si the first time we schedule an interest
+ std::atomic<bool> is_first_;
+ interface::TransportStatistics *stats_;
+};
+
+} // end namespace protocol
+} // end namespace transport
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
new file mode 100644
index 000000000..8a38f8ccf
--- /dev/null
+++ b/libtransport/src/protocols/raaqm.cc
@@ -0,0 +1,714 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+#include <implementation/socket_consumer.h>
+#include <protocols/errors.h>
+#include <protocols/indexer.h>
+#include <protocols/raaqm.h>
+
+#include <cstdlib>
+#include <fstream>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+RaaqmTransportProtocol::RaaqmTransportProtocol(
+ implementation::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)),
+ current_window_size_(1),
+ interests_in_flight_(0),
+ cur_path_(nullptr),
+ t0_(utils::SteadyClock::now()),
+ rate_estimator_(nullptr) {
+ init();
+}
+
+RaaqmTransportProtocol::~RaaqmTransportProtocol() {
+ if (rate_estimator_) {
+ delete rate_estimator_;
+ }
+}
+
+int RaaqmTransportProtocol::start() {
+ if (rate_estimator_) {
+ rate_estimator_->onStart();
+ }
+
+ if (!cur_path_) {
+ // RAAQM
+ double drop_factor;
+ double minimum_drop_probability;
+ uint32_t sample_number;
+ uint32_t interest_lifetime;
+
+ socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor);
+ socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY,
+ minimum_drop_probability);
+ socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER,
+ sample_number);
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interest_lifetime);
+
+ // Rate Estimation
+ double alpha = 0.0;
+ uint32_t batching_param = 0;
+ uint32_t choice_param = 0;
+ socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA,
+ alpha);
+ socket_->getSocketOption(
+ RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param);
+ socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE,
+ choice_param);
+
+ if (choice_param == 1) {
+ rate_estimator_ = new ALaTcpEstimator();
+ } else {
+ rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ }
+
+ socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER,
+ &rate_estimator_->observer_);
+
+ // Current path
+ auto cur_path = std::make_unique<RaaqmDataPath>(
+ drop_factor, minimum_drop_probability, interest_lifetime * 1000,
+ sample_number);
+ cur_path_ = cur_path.get();
+ path_table_[default_values::path_id] = std::move(cur_path);
+ }
+
+ portal_->setConsumerCallback(this);
+ return TransportProtocol::start();
+}
+
+void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); }
+
+void RaaqmTransportProtocol::reset() {
+ // Set first segment to retrieve
+ core::Name *name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ index_manager_->reset();
+ index_manager_->setFirstSuffix(name->getSuffix());
+ std::queue<Interest::Ptr> empty;
+ std::swap(interest_to_retransmit_, empty);
+ stats_->reset();
+
+ // Reset reassembly component
+ reassembly_protocol_->reInitialize();
+
+ // Reset protocol variables
+ interests_in_flight_ = 0;
+ t0_ = utils::SteadyClock::now();
+}
+
+bool RaaqmTransportProtocol::verifyKeyPackets() {
+ return index_manager_->onKeyToVerify();
+}
+
+void RaaqmTransportProtocol::increaseWindow() {
+ // return;
+ double max_window_size = 0.;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE,
+ max_window_size);
+ if (current_window_size_ < max_window_size) {
+ double gamma = 0.;
+ socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma);
+
+ current_window_size_ += gamma / current_window_size_;
+ socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+ }
+ rate_estimator_->onWindowIncrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::decreaseWindow() {
+ // return;
+ double min_window_size = 0.;
+ socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE,
+ min_window_size);
+ if (current_window_size_ > min_window_size) {
+ double beta = 0.;
+ socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
+
+ current_window_size_ = current_window_size_ * beta;
+ if (current_window_size_ < min_window_size) {
+ current_window_size_ = min_window_size;
+ }
+
+ socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+ }
+ rate_estimator_->onWindowDecrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ // Decrease the window because the timeout happened
+ decreaseWindow();
+}
+
+void RaaqmTransportProtocol::afterContentReception(
+ const Interest &interest, const ContentObject &content_object) {
+ updatePathTable(content_object);
+ increaseWindow();
+ updateRtt(interest.getName().getSuffix());
+ rate_estimator_->onDataReceived((int)content_object.payloadSize() +
+ (int)content_object.headerSize());
+ // Set drop probablility and window size accordingly
+ RAAQM();
+}
+
+void RaaqmTransportProtocol::init() {
+ std::ifstream is(RAAQM_CONFIG_PATH);
+
+ std::string line;
+ raaqm_autotune_ = false;
+ default_beta_ = default_values::beta_value;
+ default_drop_ = default_values::drop_factor;
+ beta_wifi_ = default_values::beta_value;
+ drop_wifi_ = default_values::drop_factor;
+ beta_lte_ = default_values::beta_value;
+ drop_lte_ = default_values::drop_factor;
+ wifi_delay_ = 1000;
+ lte_delay_ = 15000;
+
+ if (!is) {
+ TRANSPORT_LOGW(
+ "WARNING: RAAQM parameters not found at %s, set default values",
+ RAAQM_CONFIG_PATH);
+ return;
+ }
+
+ while (getline(is, line)) {
+ std::string command;
+ std::istringstream line_s(line);
+
+ line_s >> command;
+
+ if (command == ";") {
+ continue;
+ }
+
+ if (command == "autotune") {
+ std::string tmp;
+ std::string val;
+ line_s >> tmp >> val;
+ if (val == "yes") {
+ raaqm_autotune_ = true;
+ } else {
+ raaqm_autotune_ = false;
+ }
+ continue;
+ }
+
+ if (command == "lifetime") {
+ std::string tmp;
+ uint32_t lifetime;
+ line_s >> tmp >> lifetime;
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ continue;
+ }
+
+ if (command == "retransmissions") {
+ std::string tmp;
+ uint32_t rtx;
+ line_s >> tmp >> rtx;
+ socket_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, rtx);
+ continue;
+ }
+
+ if (command == "beta") {
+ std::string tmp;
+ line_s >> tmp >> default_beta_;
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
+ default_beta_);
+ continue;
+ }
+
+ if (command == "drop") {
+ std::string tmp;
+ line_s >> tmp >> default_drop_;
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
+ default_drop_);
+ continue;
+ }
+
+ if (command == "beta_wifi_") {
+ std::string tmp;
+ line_s >> tmp >> beta_wifi_;
+ continue;
+ }
+
+ if (command == "drop_wifi_") {
+ std::string tmp;
+ line_s >> tmp >> drop_wifi_;
+ continue;
+ }
+
+ if (command == "beta_lte_") {
+ std::string tmp;
+ line_s >> tmp >> beta_lte_;
+ continue;
+ }
+
+ if (command == "drop_lte_") {
+ std::string tmp;
+ line_s >> tmp >> drop_lte_;
+ continue;
+ }
+
+ if (command == "wifi_delay_") {
+ std::string tmp;
+ line_s >> tmp >> wifi_delay_;
+ continue;
+ }
+
+ if (command == "lte_delay_") {
+ std::string tmp;
+ line_s >> tmp >> lte_delay_;
+ continue;
+ }
+ if (command == "alpha") {
+ std::string tmp;
+ double rate_alpha = 0.0;
+ line_s >> tmp >> rate_alpha;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA,
+ rate_alpha);
+ continue;
+ }
+
+ if (command == "batching_parameter") {
+ std::string tmp;
+ uint32_t batching_param = 0;
+ line_s >> tmp >> batching_param;
+ socket_->setSocketOption(
+ RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER,
+ batching_param);
+ continue;
+ }
+
+ if (command == "rate_estimator") {
+ std::string tmp;
+ uint32_t choice_param = 0;
+ line_s >> tmp >> choice_param;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE,
+ choice_param);
+ continue;
+ }
+ }
+
+ is.close();
+}
+
+void RaaqmTransportProtocol::onContentObject(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ // Check whether makes sense to continue
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ // Call application-defined callbacks
+ ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &callback_content_object);
+ if (*callback_content_object) {
+ (*callback_content_object)(*socket_->getInterface(), *content_object);
+ }
+
+ ConsumerInterestCallback *callback_interest = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
+ &callback_interest);
+ if (*callback_interest) {
+ (*callback_interest)(*socket_->getInterface(), *interest);
+ }
+
+ if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ stats_->updateBytesRecv(content_object->payloadSize());
+ }
+
+ onContentSegment(std::move(interest), std::move(content_object));
+ scheduleNextInterests();
+}
+
+void RaaqmTransportProtocol::onContentSegment(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t incremental_suffix = content_object->getName().getSuffix();
+
+ // Decrease in-flight interests
+ interests_in_flight_--;
+
+ // Update stats
+ if (!interest_retransmissions_[incremental_suffix & mask]) {
+ afterContentReception(*interest, *content_object);
+ }
+
+ index_manager_->onContentObject(std::move(interest),
+ std::move(content_object));
+}
+
+void RaaqmTransportProtocol::onPacketDropped(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t max_rtx = 0;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
+
+ uint64_t segment = interest->getName().getSuffix();
+ ConsumerInterestCallback *callback = VOID_HANDLER;
+ if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
+ max_rtx)) {
+ stats_->updateRetxCount(1);
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_->getInterface(), *interest);
+ }
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_->getInterface(), *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interest_retransmissions_[segment & mask]++;
+
+ interest_to_retransmit_.push(std::move(interest));
+ } else {
+ TRANSPORT_LOGE(
+ "Stop: received not trusted packet %llu times",
+ (unsigned long long)interest_retransmissions_[segment & mask]);
+ onContentReassembled(
+ make_error_code(protocol_error::max_retransmissions_error));
+ }
+}
+
+void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) {
+
+}
+
+void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ checkForStalePaths();
+
+ const Name &n = interest->getName();
+
+ TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str());
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ interests_in_flight_--;
+
+ uint64_t segment = n.getSuffix();
+
+ // Do not retransmit interests asking contents that do not exist.
+ if (segment > index_manager_->getFinalSuffix()) {
+ return;
+ }
+
+ ConsumerInterestCallback *callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_->getInterface(), *interest);
+ }
+
+ afterDataUnsatisfied(segment);
+
+ uint32_t max_rtx = 0;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
+
+ if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
+ max_rtx)) {
+ stats_->updateRetxCount(1);
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_->getInterface(), *interest);
+ }
+
+ callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if (*callback) {
+ (*callback)(*socket_->getInterface(), *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interest_retransmissions_[segment & mask]++;
+
+ interest_to_retransmit_.push(std::move(interest));
+
+ scheduleNextInterests();
+ } else {
+ TRANSPORT_LOGE("Stop: reached max retx limit.");
+ onContentReassembled(std::make_error_code(std::errc(std::errc::io_error)));
+ }
+}
+
+void RaaqmTransportProtocol::scheduleNextInterests() {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ return;
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(interests_in_flight_ >= current_window_size_ &&
+ interest_to_retransmit_.size() > 0)) {
+ // send at least one interest if there are retransmissions to perform and
+ // there is no space left in the window
+ sendInterest(std::move(interest_to_retransmit_.front()));
+ TRANSPORT_LOGD("Window full, retransmit one content interest");
+ interest_to_retransmit_.pop();
+ }
+
+ uint32_t index = IndexManager::invalid_index;
+
+ // Send the interest needed for filling the window
+ while (interests_in_flight_ < current_window_size_) {
+ if (interest_to_retransmit_.size() > 0) {
+ sendInterest(std::move(interest_to_retransmit_.front()));
+ TRANSPORT_LOGD("Retransmit content interest");
+ interest_to_retransmit_.pop();
+ } else {
+ index = index_manager_->getNextSuffix();
+ if (index == IndexManager::invalid_index) {
+ break;
+ }
+
+ sendInterest(index);
+ TRANSPORT_LOGD("Send content interest %u", index);
+ }
+ }
+}
+
+void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+ auto interest = getPacket();
+ core::Name *name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ name->setSuffix((uint32_t)next_suffix);
+ interest->setName(*name);
+
+ uint32_t interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interest_lifetime);
+ interest->setLifetime(interest_lifetime);
+
+ ConsumerInterestCallback *callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if (*callback) {
+ callback->operator()(*socket_->getInterface(), *interest);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ return;
+ }
+
+ // This is set to ~0 so that the next interest_retransmissions_ + 1,
+ // performed by sendInterest, will result in 0
+ interest_retransmissions_[next_suffix & mask] = ~0;
+ interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
+ sendInterest(std::move(interest));
+}
+
+void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
+ interests_in_flight_++;
+ interest_retransmissions_[interest->getName().getSuffix() & mask]++;
+
+ portal_->sendInterest(std::move(interest));
+}
+
+void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
+ rate_estimator_->onDownloadFinished();
+ TransportProtocol::onContentReassembled(ec);
+}
+
+void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
+ if (TRANSPORT_EXPECT_FALSE(!cur_path_)) {
+ throw std::runtime_error("RAAQM ERROR: no current path found, exit");
+ } else {
+ auto now = utils::SteadyClock::now();
+ utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>(
+ now - interest_timepoints_[segment & mask]);
+
+ // Update stats
+ updateStats((uint32_t)segment, rtt.count(), now);
+
+ if (rate_estimator_) {
+ rate_estimator_->onRttUpdate((double)rtt.count());
+ }
+
+ cur_path_->insertNewRtt(rtt.count());
+ cur_path_->smoothTimer();
+
+ if (cur_path_->newPropagationDelayAvailable()) {
+ checkDropProbability();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::RAAQM() {
+ if (!cur_path_) {
+ throw errors::RuntimeException("ERROR: no current path found, exit");
+ exit(EXIT_FAILURE);
+ } else {
+ // Change drop probability according to RTT statistics
+ cur_path_->updateDropProb();
+
+ double coin = ((double)rand() / (RAND_MAX));
+ if (coin <= cur_path_->getDropProb()) {
+ decreaseWindow();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
+ utils::TimePoint &now) {
+ // Update RTT statistics
+ stats_->updateAverageRtt(rtt);
+ stats_->updateAverageWindowSize(current_window_size_);
+
+ // Call statistics callback
+ ConsumerTimerCallback *stats_callback = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_callback);
+ if (*stats_callback) {
+ auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
+
+ uint32_t timer_interval_milliseconds = 0;
+ socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
+ timer_interval_milliseconds);
+ if (dt.count() > timer_interval_milliseconds) {
+ (*stats_callback)(*socket_->getInterface(), *stats_);
+ t0_ = utils::SteadyClock::now();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::updatePathTable(
+ const ContentObject &content_object) {
+ uint32_t path_id = content_object.getPathLabel();
+
+ if (path_table_.find(path_id) == path_table_.end()) {
+ if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) {
+ // Create a new path with some default param
+
+ if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) {
+ throw errors::RuntimeException(
+ "[RAAQM] No path initialized for path table, error could be in "
+ "default path initialization.");
+ }
+
+ // Initiate the new path default param
+ auto new_path = std::make_unique<RaaqmDataPath>(
+ *(path_table_.at(default_values::path_id)));
+
+ // Insert the new path into hash table
+ path_table_[path_id] = std::move(new_path);
+ } else {
+ throw errors::RuntimeException(
+ "UNEXPECTED ERROR: when running,current path not found.");
+ }
+ }
+
+ cur_path_ = path_table_[path_id].get();
+
+ size_t header_size = content_object.headerSize();
+ size_t data_size = content_object.payloadSize();
+
+ // Update measurements for path
+ cur_path_->updateReceivedStats(header_size + data_size, data_size);
+}
+
+void RaaqmTransportProtocol::checkDropProbability() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ unsigned int max_pd = 0;
+ PathTable::iterator it;
+ for (auto it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->getPropagationDelay() > max_pd &&
+ it->second->getPropagationDelay() != UINT_MAX &&
+ !it->second->isStale()) {
+ max_pd = it->second->getPropagationDelay();
+ }
+ }
+
+ double drop_prob = 0;
+ double beta = 0;
+ if (max_pd < wifi_delay_) { // only ethernet paths
+ drop_prob = default_drop_;
+ beta = default_beta_;
+ } else if (max_pd < lte_delay_) { // at least one wifi path
+ drop_prob = drop_wifi_;
+ beta = beta_wifi_;
+ } else { // at least one lte path
+ drop_prob = drop_lte_;
+ beta = beta_lte_;
+ }
+
+ double old_drop_prob = 0;
+ double old_beta = 0;
+ socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta);
+ socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob);
+
+ if (drop_prob == old_drop_prob && beta == old_beta) {
+ return;
+ }
+
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob);
+
+ for (it = path_table_.begin(); it != path_table_.end(); it++) {
+ it->second->setDropProb(drop_prob);
+ }
+}
+
+void RaaqmTransportProtocol::checkForStalePaths() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ bool stale = false;
+ PathTable::iterator it;
+ for (it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->isStale()) {
+ stale = true;
+ break;
+ }
+ }
+ if (stale) {
+ checkDropProbability();
+ }
+}
+
+} // end namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
new file mode 100644
index 000000000..412967770
--- /dev/null
+++ b/libtransport/src/protocols/raaqm.h
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/utils/chrono_typedefs.h>
+
+#include <protocols/byte_stream_reassembly.h>
+#include <protocols/congestion_window_protocol.h>
+#include <protocols/protocol.h>
+#include <protocols/raaqm_data_path.h>
+#include <protocols/rate_estimation.h>
+
+#include <queue>
+#include <vector>
+
+namespace transport {
+
+namespace protocol {
+
+class RaaqmTransportProtocol : public TransportProtocol,
+ public CWindowProtocol {
+ public:
+ RaaqmTransportProtocol(implementation::ConsumerSocket *icnet_socket);
+
+ ~RaaqmTransportProtocol();
+
+ int start() override;
+
+ void resume() override;
+
+ void reset() override;
+
+ virtual bool verifyKeyPackets() override;
+
+ protected:
+ static constexpr uint32_t buffer_size =
+ 1 << interface::default_values::log_2_default_buffer_size;
+ static constexpr uint16_t mask = buffer_size - 1;
+ using PathTable =
+ std::unordered_map<uint32_t, std::unique_ptr<RaaqmDataPath>>;
+
+ void increaseWindow() override;
+ void decreaseWindow() override;
+
+ virtual void afterContentReception(const Interest &interest,
+ const ContentObject &content_object);
+ virtual void afterDataUnsatisfied(uint64_t segment);
+
+ virtual void updateStats(uint32_t suffix, uint64_t rtt,
+ utils::TimePoint &now);
+
+ private:
+ void init();
+
+ void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) override;
+
+ void onContentSegment(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object);
+
+ void onPacketDropped(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) override;
+
+ void onReassemblyFailed(std::uint32_t missing_segment) override;
+
+ void onTimeout(Interest::Ptr &&i) override;
+
+ virtual void scheduleNextInterests() override;
+
+ void sendInterest(std::uint64_t next_suffix);
+
+ void sendInterest(Interest::Ptr &&interest);
+
+ void onContentReassembled(std::error_code ec) override;
+
+ void updateRtt(uint64_t segment);
+
+ void RAAQM();
+
+ void updatePathTable(const ContentObject &content_object);
+
+ void checkDropProbability();
+
+ void checkForStalePaths();
+
+ void printRtt();
+
+ protected:
+ // Congestion window management
+ double current_window_size_;
+ // Protocol management
+ uint64_t interests_in_flight_;
+ std::array<std::uint32_t, buffer_size> interest_retransmissions_;
+ std::array<utils::TimePoint, buffer_size> interest_timepoints_;
+ std::queue<Interest::Ptr> interest_to_retransmit_;
+
+ private:
+ /**
+ * Current download path
+ */
+ RaaqmDataPath *cur_path_;
+
+ /**
+ * Hash table for path: each entry is a pair path ID(key) - path object
+ */
+ PathTable path_table_;
+
+ // TimePoints for statistic
+ utils::TimePoint t0_;
+
+ bool set_interest_filter_;
+
+ // for rate-estimation at packet level
+ IcnRateEstimator *rate_estimator_;
+
+ // params for autotuning
+ bool raaqm_autotune_;
+ double default_beta_;
+ double default_drop_;
+ double beta_wifi_;
+ double drop_wifi_;
+ double beta_lte_;
+ double drop_lte_;
+ unsigned int wifi_delay_;
+ unsigned int lte_delay_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc
new file mode 100644
index 000000000..439549c85
--- /dev/null
+++ b/libtransport/src/protocols/raaqm_data_path.cc
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/utils/chrono_typedefs.h>
+
+#include <protocols/raaqm_data_path.h>
+
+namespace transport {
+
+namespace protocol {
+
+RaaqmDataPath::RaaqmDataPath(double drop_factor,
+ double minimum_drop_probability,
+ unsigned new_timer, unsigned int samples,
+ uint64_t new_rtt, uint64_t new_rtt_min,
+ uint64_t new_rtt_max, unsigned new_pd)
+
+ : drop_factor_(drop_factor),
+ minimum_drop_probability_(minimum_drop_probability),
+ timer_(new_timer),
+ samples_(samples),
+ rtt_(new_rtt),
+ rtt_min_(new_rtt_min),
+ rtt_max_(new_rtt_max),
+ prop_delay_(new_pd),
+ new_prop_delay_(false),
+ drop_prob_(0),
+ packets_received_(0),
+ last_packets_received_(0),
+ m_packets_bytes_received_(0),
+ last_packets_bytes_received_(0),
+ raw_data_bytes_received_(0),
+ last_raw_data_bytes_received_(0),
+ rtt_samples_(samples_),
+ last_received_pkt_(utils::SteadyClock::now()),
+ average_rtt_(0),
+ alpha_(ALPHA) {}
+
+RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
+ rtt_ = new_rtt;
+ rtt_samples_.pushBack(new_rtt);
+
+ rtt_max_ = rtt_samples_.rBegin();
+ rtt_min_ = rtt_samples_.begin();
+
+ if (rtt_min_ < prop_delay_) {
+ new_prop_delay_ = true;
+ prop_delay_ = rtt_min_;
+ }
+
+ last_received_pkt_ = utils::SteadyClock::now();
+
+ return *this;
+}
+
+RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size,
+ std::size_t data_size) {
+ packets_received_++;
+ m_packets_bytes_received_ += packet_size;
+ raw_data_bytes_received_ += data_size;
+
+ return *this;
+}
+
+double RaaqmDataPath::getDropFactor() { return drop_factor_; }
+
+double RaaqmDataPath::getDropProb() { return drop_prob_; }
+
+RaaqmDataPath &RaaqmDataPath::setDropProb(double dropProb) {
+ drop_prob_ = dropProb;
+
+ return *this;
+}
+
+double RaaqmDataPath::getMinimumDropProbability() {
+ return minimum_drop_probability_;
+}
+
+double RaaqmDataPath::getTimer() { return timer_; }
+
+RaaqmDataPath &RaaqmDataPath::smoothTimer() {
+ timer_ = (1 - TIMEOUT_SMOOTHER) * timer_ +
+ (TIMEOUT_SMOOTHER)*rtt_ * (TIMEOUT_RATIO);
+
+ return *this;
+}
+
+double RaaqmDataPath::getRtt() { return (double)rtt_; }
+
+double RaaqmDataPath::getAverageRtt() { return average_rtt_; }
+
+double RaaqmDataPath::getRttMax() { return (double)rtt_max_; }
+
+double RaaqmDataPath::getRttMin() { return (double)rtt_min_; }
+
+unsigned RaaqmDataPath::getSampleValue() { return samples_; }
+
+unsigned RaaqmDataPath::getRttQueueSize() {
+ return static_cast<unsigned>(rtt_samples_.size());
+}
+
+RaaqmDataPath &RaaqmDataPath::updateDropProb() {
+ drop_prob_ = 0.0;
+
+ if (getSampleValue() == getRttQueueSize()) {
+ if (rtt_max_ == rtt_min_) {
+ drop_prob_ = minimum_drop_probability_;
+ } else {
+ drop_prob_ = minimum_drop_probability_ +
+ drop_factor_ * (rtt_ - rtt_min_) / (rtt_max_ - rtt_min_);
+ }
+ }
+
+ return *this;
+}
+
+void RaaqmDataPath::setAlpha(double alpha) {
+ if (alpha >= 0 && alpha <= 1) {
+ alpha_ = alpha;
+ }
+}
+
+bool RaaqmDataPath::newPropagationDelayAvailable() {
+ bool r = new_prop_delay_;
+ new_prop_delay_ = false;
+ return r;
+}
+
+unsigned int RaaqmDataPath::getPropagationDelay() {
+ return (unsigned int)prop_delay_;
+}
+
+bool RaaqmDataPath::isStale() {
+ utils::TimePoint now = utils::SteadyClock::now();
+ auto time =
+ std::chrono::duration_cast<utils::Microseconds>(now - last_received_pkt_)
+ .count();
+ if (time > 2000000) {
+ return true;
+ }
+ return false;
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h
new file mode 100644
index 000000000..6f2afde72
--- /dev/null
+++ b/libtransport/src/protocols/raaqm_data_path.h
@@ -0,0 +1,226 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/utils/chrono_typedefs.h>
+
+#include <utils/min_filter.h>
+
+#include <chrono>
+#include <climits>
+#include <iostream>
+
+#define TIMEOUT_SMOOTHER 0.1
+#define TIMEOUT_RATIO 10
+#define ALPHA 0.8
+
+namespace transport {
+
+namespace protocol {
+
+class RaaqmDataPath {
+ public:
+ RaaqmDataPath(double drop_factor, double minimum_drop_probability,
+ unsigned new_timer, unsigned int samples,
+ uint64_t new_rtt = 1000, uint64_t new_rtt_min = 1000,
+ uint64_t new_rtt_max = 1000, unsigned new_pd = UINT_MAX);
+
+ public:
+ /*
+ * @brief Add a new RTT to the RTT queue of the path, check if RTT queue is
+ * full, and thus need overwrite. Also it maintains the validity of min and
+ * max of RTT.
+ * @param new_rtt is the value of the new RTT
+ */
+ RaaqmDataPath &insertNewRtt(uint64_t new_rtt);
+
+ /**
+ * @brief Update the path statistics
+ * @param packet_size the size of the packet received, including the ICN
+ * header
+ * @param data_size the size of the data received, without the ICN header
+ */
+ RaaqmDataPath &updateReceivedStats(std::size_t packet_size,
+ std::size_t data_size);
+
+ /**
+ * @brief Get the value of the drop factor parameter
+ */
+ double getDropFactor();
+
+ /**
+ * @brief Get the value of the drop probability
+ */
+ double getDropProb();
+
+ /**
+ * @brief Set the value pf the drop probability
+ * @param drop_prob is the value of the drop probability
+ */
+ RaaqmDataPath &setDropProb(double drop_prob);
+
+ /**
+ * @brief Get the minimum drop probability
+ */
+ double getMinimumDropProbability();
+
+ /**
+ * @brief Get last RTT
+ */
+ double getRtt();
+
+ /**
+ * @brief Get average RTT
+ */
+ double getAverageRtt();
+
+ /**
+ * @brief Get the current m_timer value
+ */
+ double getTimer();
+
+ /**
+ * @brief Smooth he value of the m_timer accordingly with the last RTT
+ * measured
+ */
+ RaaqmDataPath &smoothTimer();
+
+ /**
+ * @brief Get the maximum RTT among the last samples
+ */
+ double getRttMax();
+
+ /**
+ * @brief Get the minimum RTT among the last samples
+ */
+ double getRttMin();
+
+ /**
+ * @brief Get the number of saved samples
+ */
+ unsigned getSampleValue();
+
+ /**
+ * @brief Get the size og the RTT queue
+ */
+ unsigned getRttQueueSize();
+
+ /*
+ * @brief Change drop probability according to RTT statistics
+ * Invoked in RAAQM(), before control window size update.
+ */
+ RaaqmDataPath &updateDropProb();
+
+ void setAlpha(double alpha);
+
+ /**
+ * @brief Returns the smallest RTT registered so far for this path
+ */
+
+ unsigned int getPropagationDelay();
+
+ bool newPropagationDelayAvailable();
+
+ bool isStale();
+
+ private:
+ /**
+ * The value of the drop factor
+ */
+ double drop_factor_;
+
+ /**
+ * The minumum drop probability
+ */
+ double minimum_drop_probability_;
+
+ /**
+ * The timer, expressed in milliseconds
+ */
+ double timer_;
+
+ /**
+ * The number of samples to store for computing the protocol measurements
+ */
+ const unsigned int samples_;
+
+ /**
+ * The last, the minimum and the maximum value of the RTT (among the last
+ * m_samples samples)
+ */
+ uint64_t rtt_, rtt_min_, rtt_max_, prop_delay_;
+
+ bool new_prop_delay_;
+
+ /**
+ * The current drop probability
+ */
+ double drop_prob_;
+
+ /**
+ * The number of packets received in this path
+ */
+ intmax_t packets_received_;
+
+ /**
+ * The first packet received after the statistics print
+ */
+ intmax_t last_packets_received_;
+
+ /**
+ * Total number of bytes received including the ICN header
+ */
+ intmax_t m_packets_bytes_received_;
+
+ /**
+ * The amount of packet bytes received at the last path summary computation
+ */
+ intmax_t last_packets_bytes_received_;
+
+ /**
+ * Total number of bytes received without including the ICN header
+ */
+ intmax_t raw_data_bytes_received_;
+
+ /**
+ * The amount of raw dat bytes received at the last path summary computation
+ */
+ intmax_t last_raw_data_bytes_received_;
+
+ class byArrival;
+
+ class byOrder;
+
+ /**
+ * Double ended queue for the RTTs
+ */
+
+ typedef utils::MinFilter<uint64_t> RTTQueue;
+
+ RTTQueue rtt_samples_;
+
+ /**
+ * Time of the last call to the path reporter method
+ */
+ utils::TimePoint last_received_pkt_;
+
+ double average_rtt_;
+ double alpha_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rate_estimation.cc b/libtransport/src/protocols/rate_estimation.cc
new file mode 100644
index 000000000..a2cf1aefe
--- /dev/null
+++ b/libtransport/src/protocols/rate_estimation.cc
@@ -0,0 +1,356 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_options_default_values.h>
+#include <hicn/transport/utils/log.h>
+
+#include <protocols/rate_estimation.h>
+
+#include <thread>
+
+namespace transport {
+
+namespace protocol {
+
+void *Timer(void *data) {
+ InterRttEstimator *estimator = (InterRttEstimator *)data;
+
+ double dat_rtt, my_avg_win, my_avg_rtt;
+ int my_win_change, number_of_packets, max_packet_size;
+
+ pthread_mutex_lock(&(estimator->mutex_));
+ dat_rtt = estimator->rtt_;
+ pthread_mutex_unlock(&(estimator->mutex_));
+
+ while (estimator->is_running_) {
+ std::this_thread::sleep_for(std::chrono::microseconds(
+ (uint64_t)(interface::default_values::kv * dat_rtt)));
+
+ pthread_mutex_lock(&(estimator->mutex_));
+
+ dat_rtt = estimator->rtt_;
+ my_avg_win = estimator->avg_win_;
+ my_avg_rtt = estimator->avg_rtt_;
+ my_win_change = (int)(estimator->win_change_);
+ number_of_packets = estimator->number_of_packets_;
+ max_packet_size = estimator->max_packet_size_;
+ estimator->avg_rtt_ = estimator->rtt_;
+ estimator->avg_win_ = 0;
+ estimator->win_change_ = 0;
+ estimator->number_of_packets_ = 1;
+
+ pthread_mutex_unlock(&(estimator->mutex_));
+
+ if (number_of_packets == 0 || my_win_change == 0) {
+ continue;
+ }
+ if (estimator->estimation_ == 0) {
+ estimator->estimation_ = (my_avg_win * 8.0 * max_packet_size * 1000000.0 /
+ (1.0 * my_win_change)) /
+ (my_avg_rtt / (1.0 * number_of_packets));
+ }
+
+ estimator->estimation_ =
+ estimator->alpha_ * estimator->estimation_ +
+ (1 - estimator->alpha_) * ((my_avg_win * 8.0 * max_packet_size *
+ 1000000.0 / (1.0 * my_win_change)) /
+ (my_avg_rtt / (1.0 * number_of_packets)));
+
+ if (estimator->observer_) {
+ estimator->observer_->notifyStats(estimator->estimation_);
+ }
+ }
+
+ return nullptr;
+}
+
+InterRttEstimator::InterRttEstimator(double alpha_arg) {
+ this->estimated_ = false;
+ this->observer_ = NULL;
+ this->alpha_ = alpha_arg;
+ this->thread_is_running_ = false;
+ this->my_th_ = NULL;
+ this->is_running_ = true;
+ this->avg_rtt_ = 0.0;
+ this->estimation_ = 0.0;
+ this->avg_win_ = 0.0;
+ this->rtt_ = 0.0;
+ this->win_change_ = 0;
+ this->number_of_packets_ = 0;
+ this->max_packet_size_ = 0;
+ this->win_current_ = 1.0;
+
+ pthread_mutex_init(&(this->mutex_), NULL);
+ this->start_time_ = std::chrono::steady_clock::now();
+ this->begin_batch_ = std::chrono::steady_clock::now();
+}
+
+InterRttEstimator::~InterRttEstimator() {
+ this->is_running_ = false;
+ if (this->my_th_) {
+ pthread_join(*(this->my_th_), NULL);
+ }
+ this->my_th_ = NULL;
+ pthread_mutex_destroy(&(this->mutex_));
+}
+
+void InterRttEstimator::onRttUpdate(double rtt) {
+ pthread_mutex_lock(&(this->mutex_));
+ this->rtt_ = rtt;
+ this->number_of_packets_++;
+ this->avg_rtt_ += rtt;
+ pthread_mutex_unlock(&(this->mutex_));
+
+ if (!thread_is_running_) {
+ my_th_ = (pthread_t *)malloc(sizeof(pthread_t));
+ if (!my_th_) {
+ TRANSPORT_LOGE("Error allocating thread.");
+ my_th_ = NULL;
+ }
+ if (/*int err = */ pthread_create(my_th_, NULL, transport::protocol::Timer,
+ (void *)this)) {
+ TRANSPORT_LOGE("Error creating the thread");
+ my_th_ = NULL;
+ }
+ thread_is_running_ = true;
+ }
+}
+
+void InterRttEstimator::onWindowIncrease(double win_current) {
+ TimePoint end = std::chrono::steady_clock::now();
+ auto delay =
+ std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
+ .count();
+
+ pthread_mutex_lock(&(this->mutex_));
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ pthread_mutex_unlock(&(this->mutex_));
+
+ this->begin_batch_ = std::chrono::steady_clock::now();
+}
+
+void InterRttEstimator::onWindowDecrease(double win_current) {
+ TimePoint end = std::chrono::steady_clock::now();
+ auto delay =
+ std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
+ .count();
+
+ pthread_mutex_lock(&(this->mutex_));
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ pthread_mutex_unlock(&(this->mutex_));
+
+ this->begin_batch_ = std::chrono::steady_clock::now();
+}
+
+ALaTcpEstimator::ALaTcpEstimator() {
+ this->estimation_ = 0.0;
+ this->observer_ = NULL;
+ this->start_time_ = std::chrono::steady_clock::now();
+ this->totalSize_ = 0.0;
+}
+
+void ALaTcpEstimator::onStart() {
+ this->totalSize_ = 0.0;
+ this->start_time_ = std::chrono::steady_clock::now();
+}
+
+void ALaTcpEstimator::onDownloadFinished() {
+ TimePoint end = std::chrono::steady_clock::now();
+ auto delay =
+ std::chrono::duration_cast<Microseconds>(end - this->start_time_).count();
+ this->estimation_ = this->totalSize_ * 8 * 1000000 / delay;
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+}
+
+void ALaTcpEstimator::onDataReceived(int packet_size) {
+ this->totalSize_ += packet_size;
+}
+
+SimpleEstimator::SimpleEstimator(double alphaArg, int batching_param) {
+ this->estimation_ = 0.0;
+ this->estimated_ = false;
+ this->observer_ = nullptr;
+ this->batching_param_ = batching_param;
+ this->total_size_ = 0.0;
+ this->number_of_packets_ = 0;
+ this->base_alpha_ = alphaArg;
+ this->alpha_ = alphaArg;
+ this->start_time_ = std::chrono::steady_clock::now();
+ this->begin_batch_ = std::chrono::steady_clock::now();
+}
+
+void SimpleEstimator::onStart() {
+ this->estimated_ = false;
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+ this->start_time_ = std::chrono::steady_clock::now();
+ this->begin_batch_ = std::chrono::steady_clock::now();
+}
+
+void SimpleEstimator::onDownloadFinished() {
+ TimePoint end = std::chrono::steady_clock::now();
+ auto delay =
+ std::chrono::duration_cast<Microseconds>(end - this->start_time_).count();
+ if (observer_) {
+ observer_->notifyDownloadTime((double)delay);
+ }
+ if (!this->estimated_) {
+ // Assuming all packets carry max_packet_size_ bytes of data
+ // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
+ if (this->estimation_) {
+ this->estimation_ =
+ alpha_ * this->estimation_ +
+ (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
+ } else {
+ this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
+ }
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+ this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) /
+ ((double)this->batching_param_));
+ } else {
+ if (this->number_of_packets_ >=
+ (int)(75.0 * (double)this->batching_param_ / 100.0)) {
+ delay = std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
+ .count();
+ // Assuming all packets carry max_packet_size_ bytes of data
+ // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
+ if (this->estimation_) {
+ this->estimation_ =
+ alpha_ * this->estimation_ +
+ (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
+ } else {
+ this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
+ }
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+ this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) /
+ ((double)this->batching_param_));
+ }
+ }
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+ this->start_time_ = std::chrono::steady_clock::now();
+ this->begin_batch_ = std::chrono::steady_clock::now();
+}
+
+void SimpleEstimator::onDataReceived(int packet_size) {
+ this->total_size_ += packet_size;
+}
+
+void SimpleEstimator::onRttUpdate(double rtt) {
+ this->number_of_packets_++;
+
+ if (this->number_of_packets_ == this->batching_param_) {
+ TimePoint end = std::chrono::steady_clock::now();
+ auto delay =
+ std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
+ .count();
+ // Assuming all packets carry max_packet_size_ bytes of data
+ // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
+ if (this->estimation_) {
+ this->estimation_ =
+ alpha_ * this->estimation_ +
+ (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
+ } else {
+ this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
+ }
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+ this->alpha_ = this->base_alpha_;
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+ this->begin_batch_ = std::chrono::steady_clock::now();
+ }
+}
+
+BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg,
+ int param) {
+ this->estimated_ = false;
+ this->observer_ = NULL;
+ this->alpha_ = alpha_arg;
+ this->batching_param_ = param;
+ this->number_of_packets_ = 0;
+ this->avg_win_ = 0.0;
+ this->avg_rtt_ = 0.0;
+ this->win_change_ = 0.0;
+ this->max_packet_size_ = 0;
+ this->estimation_ = 0.0;
+ this->win_current_ = 1.0;
+ this->begin_batch_ = std::chrono::steady_clock::now();
+ this->start_time_ = std::chrono::steady_clock::now();
+}
+
+void BatchingPacketsEstimator::onRttUpdate(double rtt) {
+ this->number_of_packets_++;
+ this->avg_rtt_ += rtt;
+
+ if (number_of_packets_ == this->batching_param_) {
+ if (estimation_ == 0) {
+ estimation_ = (avg_win_ * 8.0 * max_packet_size_ * 1000000.0 /
+ (1.0 * win_change_)) /
+ (avg_rtt_ / (1.0 * number_of_packets_));
+ } else {
+ estimation_ = alpha_ * estimation_ +
+ (1 - alpha_) * ((avg_win_ * 8.0 * max_packet_size_ *
+ 1000000.0 / (1.0 * win_change_)) /
+ (avg_rtt_ / (1.0 * number_of_packets_)));
+ }
+
+ if (observer_) {
+ observer_->notifyStats(estimation_);
+ }
+
+ this->number_of_packets_ = 0;
+ this->avg_win_ = 0.0;
+ this->avg_rtt_ = 0.0;
+ this->win_change_ = 0.0;
+ }
+}
+
+void BatchingPacketsEstimator::onWindowIncrease(double win_current) {
+ TimePoint end = std::chrono::steady_clock::now();
+ auto delay =
+ std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
+ .count();
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ this->begin_batch_ = std::chrono::steady_clock::now();
+}
+
+void BatchingPacketsEstimator::onWindowDecrease(double win_current) {
+ TimePoint end = std::chrono::steady_clock::now();
+ auto delay =
+ std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
+ .count();
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ this->begin_batch_ = std::chrono::steady_clock::now();
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rate_estimation.h b/libtransport/src/protocols/rate_estimation.h
new file mode 100644
index 000000000..17f39e0b9
--- /dev/null
+++ b/libtransport/src/protocols/rate_estimation.h
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/statistics.h>
+
+#include <protocols/raaqm_data_path.h>
+
+#include <chrono>
+
+namespace transport {
+
+namespace protocol {
+
+class IcnRateEstimator {
+ public:
+ using TimePoint = std::chrono::steady_clock::time_point;
+ using Microseconds = std::chrono::microseconds;
+
+ IcnRateEstimator(){};
+
+ virtual ~IcnRateEstimator(){};
+
+ virtual void onRttUpdate(double rtt){};
+
+ virtual void onDataReceived(int packetSize){};
+
+ virtual void onWindowIncrease(double winCurrent){};
+
+ virtual void onWindowDecrease(double winCurrent){};
+
+ virtual void onStart(){};
+
+ virtual void onDownloadFinished(){};
+
+ virtual void setObserver(interface::IcnObserver *observer) {
+ this->observer_ = observer;
+ };
+ interface::IcnObserver *observer_;
+ TimePoint start_time_;
+ TimePoint begin_batch_;
+ double base_alpha_;
+ double alpha_;
+ double estimation_;
+ int number_of_packets_;
+ // this boolean is to make sure at least one estimation of the BW is done
+ bool estimated_;
+};
+
+// A rate estimator RTT-based. Computes EWMA(WinSize)/EWMA(RTT)
+
+class InterRttEstimator : public IcnRateEstimator {
+ public:
+ InterRttEstimator(double alpha_arg);
+
+ ~InterRttEstimator();
+
+ void onRttUpdate(double rtt);
+
+ void onDataReceived(int packet_size) {
+ if (packet_size > this->max_packet_size_) {
+ this->max_packet_size_ = packet_size;
+ }
+ };
+
+ void onWindowIncrease(double win_current);
+
+ void onWindowDecrease(double win_current);
+
+ void onStart(){};
+
+ void onDownloadFinished(){};
+
+ // private: should be done by using getters
+ pthread_t *my_th_;
+ bool thread_is_running_;
+ double rtt_;
+ bool is_running_;
+ pthread_mutex_t mutex_;
+ double avg_rtt_;
+ double avg_win_;
+ int max_packet_size_;
+ double win_change_;
+ double win_current_;
+};
+
+// A rate estimator, Batching Packets based. Computes EWMA(WinSize)/EWMA(RTT)
+
+class BatchingPacketsEstimator : public IcnRateEstimator {
+ public:
+ BatchingPacketsEstimator(double alpha_arg, int batchingParam);
+
+ void onRttUpdate(double rtt);
+
+ void onDataReceived(int packet_size) {
+ if (packet_size > this->max_packet_size_) {
+ this->max_packet_size_ = packet_size;
+ }
+ };
+
+ void onWindowIncrease(double win_current);
+
+ void onWindowDecrease(double win_current);
+
+ void onStart(){};
+
+ void onDownloadFinished(){};
+
+ private:
+ int batching_param_;
+ double avg_rtt_;
+ double avg_win_;
+ double win_change_;
+ int max_packet_size_;
+ double win_current_;
+};
+
+// Segment Estimator
+
+class ALaTcpEstimator : public IcnRateEstimator {
+ public:
+ ALaTcpEstimator();
+
+ void onDataReceived(int packet_size);
+ void onStart();
+ void onDownloadFinished();
+
+ private:
+ double totalSize_;
+};
+
+// A Rate estimator, this one is the simplest: counting batching_param_ packets
+// and then divide the sum of the size of these packets by the time taken to DL
+// them. Should be the one used
+
+class SimpleEstimator : public IcnRateEstimator {
+ public:
+ SimpleEstimator(double alpha, int batching_param);
+
+ void onRttUpdate(double rtt);
+
+ void onDataReceived(int packet_size);
+
+ void onWindowIncrease(double win_current){};
+
+ void onWindowDecrease(double win_current){};
+
+ void onStart();
+
+ void onDownloadFinished();
+
+ private:
+ int batching_param_;
+ double total_size_;
+};
+
+void *Timer(void *data);
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/reassembly.cc b/libtransport/src/protocols/reassembly.cc
new file mode 100644
index 000000000..c6602153c
--- /dev/null
+++ b/libtransport/src/protocols/reassembly.cc
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/utils/array.h>
+#include <hicn/transport/utils/membuf.h>
+
+#include <implementation/socket_consumer.h>
+#include <protocols/errors.h>
+#include <protocols/indexer.h>
+#include <protocols/reassembly.h>
+
+namespace transport {
+
+namespace protocol {
+
+void Reassembly::notifyApplication() {
+ interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
+
+ if (TRANSPORT_EXPECT_FALSE(!read_callback)) {
+ TRANSPORT_LOGE("Read callback not installed!");
+ return;
+ }
+
+ if (read_callback->isBufferMovable()) {
+ // No need to perform an additional copy. The whole buffer will be
+ // tranferred to the application.
+
+ read_callback->readBufferAvailable(std::move(read_buffer_));
+ read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
+ } else {
+ // The buffer will be copied into the application-provided buffer
+ uint8_t *buffer;
+ std::size_t length;
+ std::size_t total_length = read_buffer_->length();
+
+ while (read_buffer_->length()) {
+ buffer = nullptr;
+ length = 0;
+ read_callback->getReadBuffer(&buffer, &length);
+
+ if (!buffer || !length) {
+ throw errors::RuntimeException(
+ "Invalid buffer provided by the application.");
+ }
+
+ auto to_copy = std::min(read_buffer_->length(), length);
+ std::memcpy(buffer, read_buffer_->data(), to_copy);
+ read_buffer_->trimStart(to_copy);
+ }
+
+ read_callback->readDataAvailable(total_length);
+ read_buffer_->clear();
+ }
+}
+
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/protocols/reassembly.h b/libtransport/src/protocols/reassembly.h
new file mode 100644
index 000000000..fdc9f2a05
--- /dev/null
+++ b/libtransport/src/protocols/reassembly.h
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <core/facade.h>
+
+namespace transport {
+
+namespace implementation {
+class ConsumerReadCallback;
+class ConsumerSocket;
+} // namespace implementation
+
+namespace protocol {
+
+class TransportProtocol;
+class Indexer;
+
+// Forward Declaration
+class ManifestManager;
+
+class Reassembly {
+ public:
+ class ContentReassembledCallback {
+ public:
+ virtual void onContentReassembled(std::error_code ec) = 0;
+ };
+
+ Reassembly(implementation::ConsumerSocket *icn_socket,
+ TransportProtocol *transport_protocol)
+ : reassembly_consumer_socket_(icn_socket),
+ transport_protocol_(transport_protocol) {}
+
+ virtual ~Reassembly() = default;
+
+ virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0;
+ virtual void reassemble(
+ std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0;
+ virtual void reInitialize() = 0;
+ virtual void setIndexer(Indexer *indexer) { index_manager_ = indexer; }
+
+ protected:
+ virtual void notifyApplication();
+
+ protected:
+ implementation::ConsumerSocket *reassembly_consumer_socket_;
+ TransportProtocol *transport_protocol_;
+ Indexer *index_manager_;
+ std::unique_ptr<utils::MemBuf> read_buffer_;
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc
new file mode 100644
index 000000000..0ac3839dd
--- /dev/null
+++ b/libtransport/src/protocols/rtc.cc
@@ -0,0 +1,1017 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc.h>
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <implementation/socket_consumer.h>
+
+#include <math.h>
+#include <random>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+RTCTransportProtocol::RTCTransportProtocol(
+ implementation::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, nullptr),
+ DatagramReassembly(icn_socket, this),
+ inflightInterests_(1 << default_values::log_2_default_buffer_size),
+ modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
+ icn_socket->getSocketOption(PORTAL, portal_);
+ rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ sentinel_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ reset();
+}
+
+RTCTransportProtocol::~RTCTransportProtocol() {
+ if (is_running_) {
+ stop();
+ }
+}
+
+int RTCTransportProtocol::start() {
+ if (is_running_) return -1;
+
+ reset();
+ is_first_ = true;
+
+ probeRtt();
+ sentinelTimer();
+ newRound();
+ scheduleNextInterests();
+
+ is_first_ = false;
+ is_running_ = true;
+ portal_->runEventsLoop();
+ is_running_ = false;
+
+ return 0;
+}
+
+void RTCTransportProtocol::stop() {
+ if (!is_running_) return;
+
+ is_running_ = false;
+ portal_->stopEventsLoop();
+}
+
+void RTCTransportProtocol::resume() {
+ if (is_running_) return;
+
+ is_running_ = true;
+ inflightInterestsCount_ = 0;
+
+ probeRtt();
+ sentinelTimer();
+ newRound();
+ scheduleNextInterests();
+
+ portal_->runEventsLoop();
+ is_running_ = false;
+}
+
+// private
+void RTCTransportProtocol::reset() {
+ portal_->setConsumerCallback(this);
+ // controller var
+ currentState_ = HICN_RTC_SYNC_STATE;
+
+ // cwin var
+ currentCWin_ = HICN_INITIAL_CWIN;
+ maxCWin_ = HICN_INITIAL_CWIN_MAX;
+
+ // names/packets var
+ actualSegment_ = 0;
+ inflightInterestsCount_ = 0;
+ interestRetransmissions_.clear();
+ lastSegNacked_ = 0;
+ lastReceived_ = 0;
+ lastReceivedTime_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ lastEvent_ = lastReceivedTime_;
+ highestReceived_ = 0;
+ firstSequenceInRound_ = 0;
+
+ rtx_timer_used_ = false;
+ for (int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++) {
+ inflightInterests_[i] = {0};
+ }
+
+ // stats
+ firstPckReceived_ = false;
+ receivedBytes_ = 0;
+ sentInterest_ = 0;
+ receivedData_ = 0;
+ packetLost_ = 0;
+ lossRecovered_ = 0;
+ avgPacketSize_ = HICN_INIT_PACKET_SIZE;
+ gotNack_ = false;
+ gotFutureNack_ = 0;
+ rounds_ = 0;
+ roundsWithoutNacks_ = 0;
+ pathTable_.clear();
+
+ // CC var
+ estimatedBw_ = 0.0;
+ lossRate_ = 0.0;
+ queuingDelay_ = 0.0;
+ protocolState_ = HICN_RTC_NORMAL_STATE;
+
+ producerPathLabels_[0] = 0;
+ producerPathLabels_[1] = 0;
+ initied = false;
+
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ (uint32_t)HICN_RTC_INTEREST_LIFETIME);
+ // XXX this should be done by the application
+}
+
+uint32_t max(uint32_t a, uint32_t b) {
+ if (a > b)
+ return a;
+ else
+ return b;
+}
+
+uint32_t min(uint32_t a, uint32_t b) {
+ if (a < b)
+ return a;
+ else
+ return b;
+}
+
+void RTCTransportProtocol::newRound() {
+ round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN));
+ round_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ updateStats(HICN_ROUND_LEN);
+ newRound();
+ });
+}
+
+void RTCTransportProtocol::updateDelayStats(
+ const ContentObject &content_object) {
+ uint32_t segmentNumber = content_object.getName().getSuffix();
+ uint32_t pkt = segmentNumber & modMask_;
+
+ if (inflightInterests_[pkt].state != sent_) return;
+
+ if (interestRetransmissions_.find(segmentNumber) !=
+ interestRetransmissions_.end())
+ // this packet was rtx at least once
+ return;
+
+ uint32_t pathLabel = content_object.getPathLabel();
+
+ if (pathTable_.find(pathLabel) == pathTable_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>();
+ pathTable_[pathLabel] = newPath;
+ }
+
+ // RTT measurements are useful both from NACKs and data packets
+ uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count() -
+ inflightInterests_[pkt].transmissionTime;
+
+ pathTable_[pathLabel]->insertRttSample(RTT);
+ auto payload = content_object.getPayload();
+
+ // we collect OWD only for datapackets
+ if (payload->length() != HICN_NACK_HEADER_SIZE) {
+ uint64_t *senderTimeStamp = (uint64_t *)payload->data();
+ int64_t OWD = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count() -
+ *senderTimeStamp;
+
+ pathTable_[pathLabel]->insertOwdSample(OWD);
+ pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber);
+ } else {
+ pathTable_[pathLabel]->receivedNack();
+ }
+}
+
+void RTCTransportProtocol::updateStats(uint32_t round_duration) {
+ if (pathTable_.empty()) return;
+
+ if (receivedBytes_ != 0) {
+ double bytesPerSec =
+ (double)(receivedBytes_ *
+ ((double)HICN_MILLI_IN_A_SEC / (double)round_duration));
+ estimatedBw_ = (estimatedBw_ * HICN_ESTIMATED_BW_ALPHA) +
+ ((1 - HICN_ESTIMATED_BW_ALPHA) * bytesPerSec);
+ }
+
+ uint64_t minRtt = UINT_MAX;
+ uint64_t maxRtt = 0;
+
+ for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) {
+ it->second->roundEnd();
+ if (it->second->isActive()) {
+ if (it->second->getMinRtt() < minRtt) {
+ minRtt = it->second->getMinRtt();
+ producerPathLabels_[0] = it->first;
+ }
+ if (it->second->getMinRtt() > maxRtt) {
+ maxRtt = it->second->getMinRtt();
+ producerPathLabels_[1] = it->first;
+ }
+ }
+ }
+
+ if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
+ pathTable_.find(producerPathLabels_[1]) == pathTable_.end())
+ return; // this should not happen
+
+ // as a queuing delay we keep the lowest one among the two paths
+ // if one path is congested the forwarder should decide to do not
+ // use it so it does not make sense to inform the application
+ // that maybe we have a problem
+ if (pathTable_[producerPathLabels_[0]]->getQueuingDealy() <
+ pathTable_[producerPathLabels_[1]]->getQueuingDealy())
+ queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy();
+ else
+ queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy();
+
+ if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) {
+ uint32_t numberTheoricallyReceivedPackets_ =
+ highestReceived_ - firstSequenceInRound_;
+ double lossRate = 0;
+ if (numberTheoricallyReceivedPackets_ != 0)
+ lossRate = (double)((double)(packetLost_ - lossRecovered_) /
+ (double)numberTheoricallyReceivedPackets_);
+
+ if (lossRate < 0) lossRate = 0;
+
+ if (initied) {
+ lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA +
+ (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA));
+ } else {
+ lossRate_ = lossRate;
+ initied = true;
+ }
+ }
+
+ if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE;
+
+ // for the BDP we use the max rtt, so that we calibrate the window on the
+ // RTT of the slowest path. In this way we are sure that the window will
+ // never be too small
+ uint32_t BDP = (uint32_t)ceil(
+ (estimatedBw_ *
+ (double)((double)pathTable_[producerPathLabels_[1]]->getMinRtt() /
+ (double)HICN_MILLI_IN_A_SEC) *
+ HICN_BANDWIDTH_SLACK_FACTOR) /
+ avgPacketSize_);
+ uint32_t BW = (uint32_t)ceil(estimatedBw_);
+ computeMaxWindow(BW, BDP);
+
+ ConsumerTimerCallback *stats_callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_callback);
+ if (*stats_callback) {
+ // Send the stats to the app
+ stats_->updateQueuingDelay(queuingDelay_);
+ stats_->updateLossRatio(lossRate_);
+ stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
+ (*stats_callback)(*socket_->getInterface(), *stats_);
+ }
+ // bound also by interest lifitime* production rate
+ if (!gotNack_) {
+ roundsWithoutNacks_++;
+ if (currentState_ == HICN_RTC_SYNC_STATE &&
+ roundsWithoutNacks_ >= HICN_ROUNDS_IN_SYNC_BEFORE_SWITCH) {
+ currentState_ = HICN_RTC_NORMAL_STATE;
+ }
+ } else {
+ roundsWithoutNacks_ = 0;
+ }
+
+ updateCCState();
+ updateWindow();
+
+ if (queuingDelay_ > 25.0) {
+ // this indicates that the client will go soon out of synch,
+ // switch to synch mode
+ if (currentState_ == HICN_RTC_NORMAL_STATE) {
+ currentState_ = HICN_RTC_SYNC_STATE;
+ }
+ computeMaxWindow(BW, 0);
+ increaseWindow();
+ }
+
+ // in any case we reset all the counters
+
+ gotNack_ = false;
+ gotFutureNack_ = 0;
+ receivedBytes_ = 0;
+ sentInterest_ = 0;
+ receivedData_ = 0;
+ packetLost_ = 0;
+ lossRecovered_ = 0;
+ rounds_++;
+ firstSequenceInRound_ = highestReceived_;
+}
+
+void RTCTransportProtocol::updateCCState() {
+ // TODO
+}
+
+void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate,
+ uint32_t BDPWin) {
+ if (productionRate ==
+ 0) // we have no info about the producer, keep the previous maxCWin
+ return;
+
+ uint32_t interestLifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interestLifetime);
+ uint32_t maxWaintingInterest = (uint32_t)ceil(
+ (productionRate / avgPacketSize_) *
+ (double)((double)(interestLifetime *
+ HICN_INTEREST_LIFETIME_REDUCTION_FACTOR) /
+ (double)HICN_MILLI_IN_A_SEC));
+
+ if (currentState_ == HICN_RTC_SYNC_STATE) {
+ // in this case we do not limit the window with the BDP, beacuse most
+ // likely it is wrong
+ maxCWin_ = maxWaintingInterest;
+ return;
+ }
+
+ // currentState = RTC_NORMAL_STATE
+ if (BDPWin != 0) {
+ maxCWin_ = (uint32_t)ceil((double)BDPWin +
+ (((double)BDPWin * 30.0) / 100.0)); // BDP + 30%
+ } else {
+ maxCWin_ = min(maxWaintingInterest, maxCWin_);
+ }
+
+ if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN;
+}
+
+void RTCTransportProtocol::updateWindow() {
+ if (currentState_ == HICN_RTC_SYNC_STATE) return;
+
+ if (currentCWin_ < maxCWin_ * 0.9) {
+ currentCWin_ =
+ min(maxCWin_, (uint32_t)(currentCWin_ * HICN_WIN_INCREASE_FACTOR));
+ } else if (currentCWin_ > maxCWin_) {
+ currentCWin_ =
+ max((uint32_t)(currentCWin_ * HICN_WIN_DECREASE_FACTOR), HICN_MIN_CWIN);
+ }
+}
+
+void RTCTransportProtocol::decreaseWindow() {
+ // this is used only in SYNC mode
+ if (currentState_ == HICN_RTC_NORMAL_STATE) return;
+
+ if (gotFutureNack_ == 1)
+ currentCWin_ = min((currentCWin_ - 1),
+ (uint32_t)ceil((double)maxCWin_ * 0.66)); // 2/3
+ else
+ currentCWin_--;
+
+ currentCWin_ = max(currentCWin_, HICN_MIN_CWIN);
+}
+
+void RTCTransportProtocol::increaseWindow() {
+ // this is used only in SYNC mode
+ if (currentState_ == HICN_RTC_NORMAL_STATE) return;
+
+ // we need to be carefull to do not increase the window to much
+ if (currentCWin_ < ((double)maxCWin_ * 0.7)) {
+ currentCWin_ = currentCWin_ + 1; // exponential
+ } else {
+ currentCWin_ = min(
+ maxCWin_,
+ (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear
+ }
+}
+
+void RTCTransportProtocol::probeRtt() {
+ probe_timer_->expires_from_now(std::chrono::milliseconds(1000));
+ probe_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ probeRtt();
+ });
+
+ // To avoid sending the first probe, because the transport is not running yet
+ if (is_first_ && !is_running_) return;
+
+ time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ // get a random numbe in the probe seq range
+ std::default_random_engine eng((std::random_device())());
+ std::uniform_int_distribution<uint32_t> idis(HICN_MIN_PROBE_SEQ,
+ HICN_MAX_PROBE_SEQ);
+ probe_seq_number_ = idis(eng);
+ interest_name->setSuffix(probe_seq_number_);
+
+ // we considere the probe as a rtx so that we do not incresea inFlightInt
+ received_probe_ = false;
+ TRANSPORT_LOGD("Send content interest %u (probeRtt)",
+ interest_name->getSuffix());
+ sendInterest(interest_name, true);
+}
+
+void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
+ auto interest = getPacket();
+ interest->setName(*interest_name);
+
+ uint32_t interestLifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interestLifetime);
+ interest->setLifetime(uint32_t(interestLifetime));
+
+ ConsumerInterestCallback *on_interest_output = nullptr;
+
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &on_interest_output);
+
+ if (*on_interest_output) {
+ (*on_interest_output)(*socket_->getInterface(), *interest);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ return;
+ }
+
+ portal_->sendInterest(std::move(interest));
+
+ sentInterest_++;
+
+ if (!rtx) {
+ packets_in_window_[interest_name->getSuffix()] = 0;
+ inflightInterestsCount_++;
+ }
+}
+
+void RTCTransportProtocol::scheduleNextInterests() {
+ if (!is_running_ && !is_first_) return;
+
+ TRANSPORT_LOGD("----- [window %u - inflight_interests %u = %d] -----",
+ currentCWin_, inflightInterestsCount_,
+ currentCWin_ - inflightInterestsCount_);
+
+ while (inflightInterestsCount_ < currentCWin_) {
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ interest_name->setSuffix(actualSegment_);
+
+ // if the producer socket is not stated (does not reply even with nacks)
+ // we keep asking for something without marking anything as lost (see
+ // timeout). In this way when the producer socket will start the
+ // consumer socket will not miss any packet
+ if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) {
+ uint32_t pkt = actualSegment_ & modMask_;
+ inflightInterests_[pkt].state = sent_;
+ inflightInterests_[pkt].sequence = actualSegment_;
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
+ TRANSPORT_LOGD(
+ "Send content interest %u (scheduleNextInterests no replies)",
+ interest_name->getSuffix());
+ sendInterest(interest_name, false);
+ return;
+ }
+
+ // we send the packet only if it is not pending yet
+ // notice that this is not true for rtx packets
+ if (portal_->interestIsPending(*interest_name)) {
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
+ continue;
+ }
+
+ uint32_t pkt = actualSegment_ & modMask_;
+ // if we already reacevied the content we don't ask it again
+ if (inflightInterests_[pkt].state == received_ &&
+ inflightInterests_[pkt].sequence == actualSegment_) {
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
+ continue;
+ }
+
+ // same if the packet is lost
+ if (inflightInterests_[pkt].state == lost_ &&
+ inflightInterests_[pkt].sequence == actualSegment_) {
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
+ continue;
+ }
+
+ inflightInterests_[pkt].transmissionTime =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ // here the packet can be in any state except for lost or recevied
+ inflightInterests_[pkt].state = sent_;
+ inflightInterests_[pkt].sequence = actualSegment_;
+ actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
+
+ TRANSPORT_LOGD("Send content interest %u (scheduleNextInterests)",
+ interest_name->getSuffix());
+ sendInterest(interest_name, false);
+ }
+
+ TRANSPORT_LOGD("----- end of scheduleNextInterest -----");
+}
+
+bool RTCTransportProtocol::verifyKeyPackets() {
+ // Not yet implemented
+ return false;
+}
+
+void RTCTransportProtocol::sentinelTimer() {
+ uint32_t wait = 50;
+
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) {
+ // we have all the info to set the timers
+ wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap());
+ if (wait == 0) wait = 1;
+ }
+
+ sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait));
+ sentinel_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
+ pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) {
+ // we have no info, so we send again
+
+ for (auto it = packets_in_window_.begin(); it != packets_in_window_.end();
+ it++) {
+ uint32_t pkt = it->first & modMask_;
+ if (inflightInterests_[pkt].sequence == it->first) {
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
+ }
+ }
+ } else {
+ uint64_t max_waiting_time = // wait at least 50ms
+ (pathTable_[producerPathLabels_[1]]->getMinRtt() -
+ pathTable_[producerPathLabels_[0]]->getMinRtt()) +
+ (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50);
+
+ if ((currentState_ == HICN_RTC_NORMAL_STATE) &&
+ (inflightInterestsCount_ >= currentCWin_) &&
+ ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) {
+ uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt();
+
+ for (auto it = packets_in_window_.begin();
+ it != packets_in_window_.end(); it++) {
+ uint32_t pkt = it->first & modMask_;
+ if (inflightInterests_[pkt].sequence == it->first &&
+ ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) {
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
+ }
+ }
+ }
+ }
+
+ sentinelTimer();
+ });
+}
+void RTCTransportProtocol::addRetransmissions(uint32_t val) {
+ // add only val in the rtx list
+ addRetransmissions(val, val + 1);
+}
+
+void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ bool new_rtx = false;
+ for (uint32_t i = start; i < stop; i++) {
+ auto it = interestRetransmissions_.find(i);
+ if (it == interestRetransmissions_.end()) {
+ uint32_t pkt = i & modMask_;
+ if (lastSegNacked_ <= i && inflightInterests_[pkt].state != received_) {
+ // it must be larger than the last past nack received
+ packetLost_++;
+ interestRetransmissions_[i] = 0;
+ uint32_t pkt = i & modMask_;
+ // we reset the transmission time setting to now, so that rtx will
+ // happne in one RTT on waint one inter arrival gap
+ inflightInterests_[pkt].transmissionTime = now;
+ new_rtx = true;
+ }
+ } // if the retransmission is already there the rtx timer will
+ // take care of it
+ }
+
+ // in case a new rtx is added to the map we need to run checkRtx()
+ if (new_rtx) {
+ if (rtx_timer_used_) {
+ // if a timer is pending we need to delete it
+ rtx_timer_->cancel();
+ rtx_timer_used_ = false;
+ }
+ checkRtx();
+ }
+}
+
+uint64_t RTCTransportProtocol::retransmit() {
+ auto it = interestRetransmissions_.begin();
+
+ // cut len to max HICN_MAX_RTX_SIZE
+ // since we use a map, the smaller (and so the older) sequence number are at
+ // the beginnin of the map
+ while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) {
+ it = interestRetransmissions_.erase(it);
+ }
+
+ it = interestRetransmissions_.begin();
+ uint64_t smallest_timeout = ULONG_MAX;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ while (it != interestRetransmissions_.end()) {
+ uint32_t pkt = it->first & modMask_;
+
+ if (inflightInterests_[pkt].sequence != it->first) {
+ // this packet is not anymore in the inflight buffer, erase it
+ it = interestRetransmissions_.erase(it);
+ continue;
+ }
+
+ // we retransmitted the packet too many times
+ if (it->second >= HICN_MAX_RTX) {
+ it = interestRetransmissions_.erase(it);
+ continue;
+ }
+
+ // this packet is too old
+ if ((lastReceived_ > it->first) &&
+ (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) {
+ it = interestRetransmissions_.erase(it);
+ continue;
+ }
+
+ uint64_t rtx_time = now;
+
+ if (it->second == 0) {
+ // first rtx
+ if (producerPathLabels_[0] != producerPathLabels_[1]) {
+ // multipath
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end() &&
+ (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() <
+ HICN_MIN_INTER_ARRIVAL_GAP)) {
+ rtx_time = lastReceivedTime_ +
+ (pathTable_[producerPathLabels_[1]]->getMinRtt() -
+ pathTable_[producerPathLabels_[0]]->getMinRtt()) +
+ pathTable_[producerPathLabels_[0]]->getInterArrivalGap();
+ } // else low rate producer, send it immediatly
+ } else {
+ // single path
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() <
+ HICN_MIN_INTER_ARRIVAL_GAP)) {
+ rtx_time = lastReceivedTime_ +
+ pathTable_[producerPathLabels_[0]]->getInterArrivalGap();
+ } // else low rate producer send immediatly
+ }
+ } else {
+ // second or plus rtx, wait for the min rtt
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end()) {
+ uint64_t sent_time = inflightInterests_[pkt].transmissionTime;
+ rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt();
+ } // if we don't have info we send it immediatly
+ }
+
+ if (now >= rtx_time) {
+ inflightInterests_[pkt].transmissionTime = now;
+ it->second++;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ TRANSPORT_LOGD("Send content interest %u (retransmit)",
+ interest_name->getSuffix());
+ sendInterest(interest_name, true);
+ } else if (rtx_time < smallest_timeout) {
+ smallest_timeout = rtx_time;
+ }
+
+ ++it;
+ }
+ return smallest_timeout;
+}
+
+void RTCTransportProtocol::checkRtx() {
+ if (interestRetransmissions_.empty()) {
+ rtx_timer_used_ = false;
+ return;
+ }
+
+ uint64_t next_timeout = retransmit();
+ uint64_t wait = 1;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if (next_timeout != ULONG_MAX && now < next_timeout) {
+ wait = next_timeout - now;
+ }
+ rtx_timer_used_ = true;
+ rtx_timer_->expires_from_now(std::chrono::milliseconds(wait));
+ rtx_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ rtx_timer_used_ = false;
+ checkRtx();
+ });
+}
+
+void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ uint32_t segmentNumber = interest->getName().getSuffix();
+
+ if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
+ // this is a timeout on a probe, do nothing
+ return;
+ }
+
+ uint32_t pkt = segmentNumber & modMask_;
+
+ if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) {
+ // we do nothing, and we keep asking the same stuff over
+ // and over until we get at least a packet
+ inflightInterestsCount_--;
+ lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ packets_in_window_.erase(segmentNumber);
+ scheduleNextInterests();
+ return;
+ }
+
+ if (inflightInterests_[pkt].state == sent_) {
+ lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ packets_in_window_.erase(segmentNumber);
+ inflightInterestsCount_--;
+ }
+
+ // check how many times we sent this packet
+ auto it = interestRetransmissions_.find(segmentNumber);
+ if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) {
+ inflightInterests_[pkt].state = lost_;
+ }
+
+ if (inflightInterests_[pkt].state == sent_) {
+ inflightInterests_[pkt].state = timeout1_;
+ } else if (inflightInterests_[pkt].state == timeout1_) {
+ inflightInterests_[pkt].state = timeout2_;
+ } else if (inflightInterests_[pkt].state == timeout2_) {
+ inflightInterests_[pkt].state = lost_;
+ }
+
+ if (inflightInterests_[pkt].state == lost_) {
+ interestRetransmissions_.erase(segmentNumber);
+ } else {
+ addRetransmissions(segmentNumber);
+ }
+
+ scheduleNextInterests();
+}
+
+bool RTCTransportProtocol::onNack(const ContentObject &content_object,
+ bool rtx) {
+ uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
+ uint32_t productionSeg = *payload;
+ uint32_t productionRate = *(++payload);
+ uint32_t nackSegment = content_object.getName().getSuffix();
+
+ bool old_nack = false;
+
+ // if we did not received anything between lastReceived_ + 1 and productionSeg
+ // most likelly some packets got lost
+ if (lastReceived_ != 0) {
+ addRetransmissions(lastReceived_ + 1, productionSeg);
+ }
+
+ if (!rtx) {
+ gotNack_ = true;
+ // we synch the estimated production rate with the actual one
+ estimatedBw_ = (double)productionRate;
+ }
+
+ if (productionSeg > nackSegment) {
+ // we are asking for stuff produced in the past
+ actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ;
+
+ if (!rtx) {
+ if (currentState_ == HICN_RTC_NORMAL_STATE) {
+ currentState_ = HICN_RTC_SYNC_STATE;
+ }
+
+ computeMaxWindow(productionRate, 0);
+ increaseWindow();
+ }
+
+ lastSegNacked_ = productionSeg;
+ old_nack = true;
+
+ } else if (productionSeg < nackSegment) {
+ actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ;
+
+ if (!rtx) {
+ // we are asking stuff in the future
+ gotFutureNack_++;
+ computeMaxWindow(productionRate, 0);
+ decreaseWindow();
+
+ if (currentState_ == HICN_RTC_SYNC_STATE) {
+ currentState_ = HICN_RTC_NORMAL_STATE;
+ }
+ }
+ } else {
+ // we are asking the right thing, but the producer is slow
+ // keep doing the same until the packet is produced
+ actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ;
+ }
+
+ return old_nack;
+}
+
+void RTCTransportProtocol::onContentObject(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ // as soon as we get a packet firstPckReceived_ will never be false
+ firstPckReceived_ = true;
+
+ auto payload = content_object->getPayload();
+ uint32_t payload_size = (uint32_t)payload->length();
+ uint32_t segmentNumber = content_object->getName().getSuffix();
+ uint32_t pkt = segmentNumber & modMask_;
+
+ ConsumerContentObjectCallback *callback_content_object = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &callback_content_object);
+ if (*callback_content_object) {
+ (*callback_content_object)(*socket_->getInterface(), *content_object);
+ }
+
+ if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
+ TRANSPORT_LOGD("Received probe %u", segmentNumber);
+ if (segmentNumber == probe_seq_number_ && !received_probe_) {
+ received_probe_ = true;
+
+ uint32_t pathLabel = content_object->getPathLabel();
+ if (pathTable_.find(pathLabel) == pathTable_.end()) {
+ std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>();
+ pathTable_[pathLabel] = newPath;
+ }
+
+ // this is the expected probe, update the RTT and drop the packet
+ uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count() -
+ time_sent_probe_;
+
+ pathTable_[pathLabel]->insertRttSample(RTT);
+ pathTable_[pathLabel]->receivedNack();
+ }
+ return;
+ }
+
+ // check if the packet is a rtx
+ bool is_rtx = false;
+ if (interestRetransmissions_.find(segmentNumber) !=
+ interestRetransmissions_.end()) {
+ is_rtx = true;
+ } else {
+ auto it_win = packets_in_window_.find(segmentNumber);
+ if (it_win != packets_in_window_.end() && it_win->second != 0)
+ is_rtx = true;
+ }
+
+ if (payload_size == HICN_NACK_HEADER_SIZE) {
+ TRANSPORT_LOGD("Received nack %u", segmentNumber);
+
+ if (inflightInterests_[pkt].state == sent_) {
+ lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ packets_in_window_.erase(segmentNumber);
+ inflightInterestsCount_--;
+ }
+
+ bool old_nack = false;
+
+ if (!is_rtx) {
+ // this is not a retransmitted packet
+ old_nack = onNack(*content_object, false);
+ updateDelayStats(*content_object);
+ } else {
+ old_nack = onNack(*content_object, true);
+ }
+
+ // the nacked_ state is used only to avoid to decrease
+ // inflightInterestsCount_ multiple times. In fact, every time that we
+ // receive an event related to an interest (timeout, nacked, content) we
+ // cange the state. In this way we are sure that we do not decrease twice
+ // the counter
+ if (old_nack) {
+ inflightInterests_[pkt].state = lost_;
+ interestRetransmissions_.erase(segmentNumber);
+ } else {
+ inflightInterests_[pkt].state = nacked_;
+ }
+
+ } else {
+ TRANSPORT_LOGD("Received content %u", segmentNumber);
+
+ avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) +
+ ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
+
+ receivedBytes_ += (uint32_t)(content_object->headerSize() +
+ content_object->payloadSize());
+
+ if (inflightInterests_[pkt].state == sent_) {
+ lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ packets_in_window_.erase(segmentNumber);
+ inflightInterestsCount_--; // packet sent without timeouts
+ }
+
+ if (inflightInterests_[pkt].state == sent_ && !is_rtx) {
+ // delay stats are computed only for non retransmitted data
+ updateDelayStats(*content_object);
+ }
+
+ addRetransmissions(lastReceived_ + 1, segmentNumber);
+ if (segmentNumber > highestReceived_) {
+ highestReceived_ = segmentNumber;
+ }
+ if (segmentNumber > lastReceived_) {
+ lastReceived_ = segmentNumber;
+ lastReceivedTime_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ }
+ receivedData_++;
+ inflightInterests_[pkt].state = received_;
+
+ auto it = interestRetransmissions_.find(segmentNumber);
+ if (it != interestRetransmissions_.end()) lossRecovered_++;
+
+ interestRetransmissions_.erase(segmentNumber);
+
+ reassemble(std::move(content_object));
+ increaseWindow();
+ }
+
+ scheduleNextInterests();
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc.h b/libtransport/src/protocols/rtc.h
new file mode 100644
index 000000000..f15cdd1eb
--- /dev/null
+++ b/libtransport/src/protocols/rtc.h
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiTC_SYNC_STATE
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/datagram_reassembly.h>
+#include <protocols/protocol.h>
+#include <protocols/rtc_data_path.h>
+
+#include <map>
+#include <queue>
+#include <unordered_map>
+
+// algorithm state
+#define HICN_RTC_SYNC_STATE 0
+#define HICN_RTC_NORMAL_STATE 1
+#define HICN_ROUNDS_IN_SYNC_BEFORE_SWITCH 3
+
+// packet constants
+#define HICN_INIT_PACKET_SIZE 1300 // bytes
+#define HICN_PACKET_HEADER_SIZE 60 // bytes ipv6+tcp
+#define HICN_NACK_HEADER_SIZE 8 // bytes
+#define HICN_TIMESTAMP_SIZE 8 // bytes
+#define HICN_RTC_INTEREST_LIFETIME 1000 // ms
+
+// rtt measurement
+// normal interests for data goes from 0 to
+// HICN_MIN_PROBE_SEQ, the rest is reserverd for
+// probes
+#define HICN_MIN_PROBE_SEQ 0xefffffff
+#define HICN_MAX_PROBE_SEQ 0xffffffff
+
+// controller constant
+#define HICN_ROUND_LEN \
+ 200 // ms interval of time on which
+ // we take decisions / measurements
+#define HICN_MAX_RTX 10
+#define HICN_MAX_RTX_SIZE 1024
+#define HICN_MAX_RTX_MAX_AGE 10000
+#define HICN_MIN_RTT_WIN 30 // rounds
+#define HICN_MIN_INTER_ARRIVAL_GAP 100 // ms
+
+// cwin
+#define HICN_INITIAL_CWIN 1 // packets
+#define HICN_INITIAL_CWIN_MAX 100000 // packets
+#define HICN_MIN_CWIN 10 // packets
+#define HICN_WIN_INCREASE_FACTOR 1.5
+#define HICN_WIN_DECREASE_FACTOR 0.9
+
+// statistics constants
+#define HICN_BANDWIDTH_SLACK_FACTOR 1.8
+#define HICN_ESTIMATED_BW_ALPHA 0.7
+#define HICN_ESTIMATED_PACKET_SIZE 0.7
+#define HICN_ESTIMATED_LOSSES_ALPHA 0.8
+#define HICN_INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
+
+// other constants
+#define HICN_NANO_IN_A_SEC 1000000000
+#define HICN_MICRO_IN_A_SEC 1000000
+#define HICN_MILLI_IN_A_SEC 1000
+
+namespace transport {
+
+namespace protocol {
+
+enum packetState { sent_, nacked_, received_, timeout1_, timeout2_, lost_ };
+
+typedef enum packetState packetState_t;
+
+struct sentInterest {
+ uint64_t transmissionTime;
+ uint32_t sequence; // sequence number of the interest sent
+ // to handle seq % buffer_size
+ packetState_t state; // see packet state
+};
+
+class RTCTransportProtocol : public TransportProtocol,
+ public DatagramReassembly {
+ public:
+ RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket);
+
+ ~RTCTransportProtocol();
+
+ int start() override;
+
+ void stop() override;
+
+ void resume() override;
+
+ bool verifyKeyPackets() override;
+
+ private:
+ // algo functions
+ void reset() override;
+
+ // CC functions
+ void updateDelayStats(const ContentObject &content_object);
+ void updateStats(uint32_t round_duration);
+ void updateCCState();
+ void computeMaxWindow(uint32_t productionRate, uint32_t BDPWin);
+ void updateWindow();
+ void decreaseWindow();
+ void increaseWindow();
+ void resetPreviousWindow();
+
+ // packet functions
+ void sendInterest(Name *interest_name, bool rtx);
+ void scheduleNextInterests() override;
+ void sentinelTimer();
+ void addRetransmissions(uint32_t val);
+ void addRetransmissions(uint32_t start, uint32_t stop);
+ uint64_t retransmit();
+ void checkRtx();
+ void probeRtt();
+ void newRound();
+ void onTimeout(Interest::Ptr &&interest) override;
+ bool onNack(const ContentObject &content_object, bool rtx);
+ void onContentObject(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) override;
+ void onPacketDropped(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object) override {}
+ void onReassemblyFailed(std::uint32_t missing_segment) override {}
+
+ TRANSPORT_ALWAYS_INLINE virtual void reassemble(
+ ContentObject::Ptr &&content_object) override {
+ auto read_buffer = content_object->getPayload();
+ read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
+ }
+
+ // controller var
+ std::unique_ptr<asio::steady_timer> round_timer_;
+ unsigned currentState_;
+
+ // cwin var
+ uint32_t currentCWin_;
+ uint32_t maxCWin_;
+
+ // names/packets var
+ uint32_t actualSegment_;
+ uint32_t inflightInterestsCount_;
+ // map seq to rtx
+ std::map<uint32_t, uint8_t> interestRetransmissions_;
+ bool rtx_timer_used_;
+ std::unique_ptr<asio::steady_timer> rtx_timer_;
+ std::vector<sentInterest> inflightInterests_;
+ uint32_t lastSegNacked_; // indicates the segment id in the last received
+ // past Nack. we do not ask for retransmissions
+ // for samething that is older than this value.
+ uint32_t lastReceived_; // segment of the last content object received
+ // indicates the base of the window on the client
+ uint64_t lastReceivedTime_; // time at which we recevied the
+ // lastReceived_ packet
+
+ // sentinel
+ // if all packets in the window get lost we need something that
+ // wakes up our consumer socket. Interest timeouts set to 1 sec
+ // expire too late. This timers expire much sooner and if it
+ // detects that all the interest in the window may be lost
+ // it sends all of them again
+ std::unique_ptr<asio::steady_timer> sentinel_timer_;
+ uint64_t lastEvent_; // time at which we removed a pending
+ // interest from the window
+ std::unordered_map<uint32_t, uint8_t> packets_in_window_;
+
+ // rtt probes
+ // the RTC transport tends to overestimate the RTT
+ // du to the production time on the server side
+ // once per second we send an interest for wich we know
+ // we will get a nack. This nack will keep our estimation
+ // close to the reality
+ std::unique_ptr<asio::steady_timer> probe_timer_;
+ uint64_t time_sent_probe_;
+ uint32_t probe_seq_number_;
+ bool received_probe_;
+
+ uint32_t modMask_;
+
+ // stats
+ bool firstPckReceived_;
+ uint32_t receivedBytes_;
+ uint32_t sentInterest_;
+ uint32_t receivedData_;
+ int32_t packetLost_;
+ int32_t lossRecovered_;
+ uint32_t firstSequenceInRound_;
+ uint32_t highestReceived_;
+ double avgPacketSize_;
+ bool gotNack_;
+ uint32_t gotFutureNack_;
+ uint32_t rounds_;
+ uint32_t roundsWithoutNacks_;
+
+ // we keep track of up two paths (if only one path is in use
+ // the two values in the vector will be the same)
+ // position 0 stores the path with minRTT
+ // position 1 stores the path with maxRTT
+ uint32_t producerPathLabels_[2];
+
+ std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_;
+ uint32_t roundCounter_;
+
+ // CC var
+ double estimatedBw_;
+ double lossRate_;
+ double queuingDelay_;
+ unsigned protocolState_;
+
+ bool initied;
+};
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc_data_path.cc b/libtransport/src/protocols/rtc_data_path.cc
new file mode 100644
index 000000000..30644e939
--- /dev/null
+++ b/libtransport/src/protocols/rtc_data_path.cc
@@ -0,0 +1,156 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc_data_path.h>
+
+#include <cfloat>
+#include <chrono>
+
+#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec
+
+namespace transport {
+
+namespace protocol {
+
+RTCDataPath::RTCDataPath()
+ : min_rtt(UINT_MAX),
+ prev_min_rtt(UINT_MAX),
+ min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the
+ // real OWD, but the measured one, that depends on the
+ // clock of sender and receiver. the only meaningful
+ // value is is the queueing delay. for this reason we
+ // keep both RTT (for the windowd calculation) and OWD
+ // (for congestion/quality control)
+ prev_min_owd(INT_MAX),
+ avg_owd(0.0),
+ queuing_delay(DBL_MAX),
+ lastRecvSeq_(0),
+ lastRecvTime_(0),
+ avg_inter_arrival_(DBL_MAX),
+ received_nacks_(false),
+ received_packets_(false),
+ rounds_without_packets_(0),
+ RTThistory_(HISTORY_LEN),
+ OWDhistory_(HISTORY_LEN){};
+
+void RTCDataPath::insertRttSample(uint64_t rtt) {
+ // for the rtt we only keep track of the min one
+ if (rtt < min_rtt) min_rtt = rtt;
+}
+
+void RTCDataPath::insertOwdSample(int64_t owd) {
+ // for owd we use both min and avg
+ if (owd < min_owd) min_owd = owd;
+
+ if (avg_owd != DBL_MAX)
+ avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
+ else {
+ avg_owd = owd;
+ }
+
+ // owd is computed only for valid data packets so we count only
+ // this for decide if we recevie traffic or not
+ received_packets_ = true;
+}
+
+void RTCDataPath::computeInterArrivalGap(uint32_t segmentNumber) {
+ // got packet in sequence, compute gap
+ if (lastRecvSeq_ == (segmentNumber - 1)) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t delta = now - lastRecvTime_;
+ lastRecvSeq_ = segmentNumber;
+ lastRecvTime_ = now;
+ if (avg_inter_arrival_ == DBL_MAX)
+ avg_inter_arrival_ = delta;
+ else
+ avg_inter_arrival_ =
+ (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC);
+ return;
+ }
+
+ // ooo packet, update the stasts if needed
+ if (lastRecvSeq_ <= segmentNumber) {
+ lastRecvSeq_ = segmentNumber;
+ lastRecvTime_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ }
+}
+
+void RTCDataPath::receivedNack() { received_nacks_ = true; }
+
+double RTCDataPath::getInterArrivalGap() {
+ if (avg_inter_arrival_ == DBL_MAX) return 0;
+ return avg_inter_arrival_;
+}
+
+bool RTCDataPath::isActive() {
+ if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS)
+ return true;
+ return false;
+}
+
+void RTCDataPath::roundEnd() {
+ // reset min_rtt and add it to the history
+ if (min_rtt != UINT_MAX) {
+ prev_min_rtt = min_rtt;
+ } else {
+ // this may happen if we do not receive any packet
+ // from this path in the last round. in this case
+ // we use the measure from the previuos round
+ min_rtt = prev_min_rtt;
+ }
+
+ if (min_rtt == 0) min_rtt = 1;
+
+ RTThistory_.pushBack(min_rtt);
+ min_rtt = UINT_MAX;
+
+ // do the same for min owd
+ if (min_owd != INT_MAX) {
+ prev_min_owd = min_owd;
+ } else {
+ min_owd = prev_min_owd;
+ }
+
+ if (min_owd != INT_MAX) {
+ OWDhistory_.pushBack(min_owd);
+ min_owd = INT_MAX;
+
+ // compute queuing delay
+ queuing_delay = avg_owd - getMinOwd();
+
+ } else {
+ queuing_delay = 0.0;
+ }
+
+ if (!received_packets_)
+ rounds_without_packets_++;
+ else
+ rounds_without_packets_ = 0;
+ received_packets_ = false;
+}
+
+double RTCDataPath::getQueuingDealy() { return queuing_delay; }
+
+uint64_t RTCDataPath::getMinRtt() { return RTThistory_.begin(); }
+
+int64_t RTCDataPath::getMinOwd() { return OWDhistory_.begin(); }
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc_data_path.h b/libtransport/src/protocols/rtc_data_path.h
new file mode 100644
index 000000000..9076b355f
--- /dev/null
+++ b/libtransport/src/protocols/rtc_data_path.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <climits>
+
+#include <utils/min_filter.h>
+
+#define ALPHA_RTC 0.125
+#define HISTORY_LEN 20 // 4 sec
+
+namespace transport {
+
+namespace protocol {
+
+class RTCDataPath {
+ public:
+ RTCDataPath();
+
+ public:
+ void insertRttSample(uint64_t rtt);
+ void insertOwdSample(int64_t owd);
+ void computeInterArrivalGap(uint32_t segmentNumber);
+ void receivedNack();
+
+ uint64_t getMinRtt();
+ double getQueuingDealy();
+ double getInterArrivalGap();
+ bool isActive();
+
+ void roundEnd();
+
+ private:
+ int64_t getMinOwd();
+
+ uint64_t min_rtt;
+ uint64_t prev_min_rtt;
+
+ int64_t min_owd;
+ int64_t prev_min_owd;
+
+ double avg_owd;
+
+ double queuing_delay;
+
+ uint32_t lastRecvSeq_;
+ uint64_t lastRecvTime_;
+ double avg_inter_arrival_;
+
+ // flags to check if a path is active
+ // we considere a path active if it reaches a producer
+ //(not a cache) --aka we got at least one nack on this path--
+ // and if we receives packets
+ bool received_nacks_;
+ bool received_packets_;
+ uint8_t rounds_without_packets_; // if we don't get any packet
+ // for MAX_ROUNDS_WITHOUT_PKTS
+ // we consider the path inactive
+
+ utils::MinFilter<uint64_t> RTThistory_;
+ utils::MinFilter<int64_t> OWDhistory_;
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/test/CMakeLists.txt b/libtransport/src/protocols/test/CMakeLists.txt
new file mode 100644
index 000000000..6f9fdb9aa
--- /dev/null
+++ b/libtransport/src/protocols/test/CMakeLists.txt
@@ -0,0 +1,10 @@
+# Enable gcov output for the tests
+add_definitions(--coverage)
+set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage")
+
+set(TestsExpectedToPass
+ test_transport_producer)
+
+foreach(test ${TestsExpectedToPass})
+ AddTest(${test})
+endforeach() \ No newline at end of file
diff --git a/libtransport/src/protocols/test/test_transport_producer.cc b/libtransport/src/protocols/test/test_transport_producer.cc
new file mode 100644
index 000000000..204f2cbe2
--- /dev/null
+++ b/libtransport/src/protocols/test/test_transport_producer.cc
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "../socket_producer.h"
+#include "literals.h"
+
+#include <test.h>
+#include <random>
+
+namespace transport {
+
+namespace protocol {
+
+namespace {
+// The fixture for testing class Foo.
+class ProducerTest : public ::testing::Test {
+ protected:
+ ProducerTest() : name_("b001::123|321"), producer_(io_service_) {
+ // You can do set-up work for each test here.
+ }
+
+ virtual ~ProducerTest() {
+ // You can do clean-up work that doesn't throw exceptions here.
+ }
+
+ // If the constructor and destructor are not enough for setting up
+ // and cleaning up each test, you can define the following methods:
+
+ virtual void SetUp() {
+ // Code here will be called immediately after the constructor (right
+ // before each test).
+ }
+
+ virtual void TearDown() {
+ // Code here will be called immediately after each test (right
+ // before the destructor).
+ }
+
+ Name name_;
+ asio::io_service io_service_;
+ ProducerSocket producer_;
+};
+
+} // namespace
+
+// Tests that the Foo::Bar() method does Abc.
+TEST_F(ProducerTest, ProduceContent) {
+ std::string content(250000, '?');
+
+ producer_.registerPrefix(Prefix("b001::/64"));
+ producer_.produce(name_, reinterpret_cast<const uint8_t *>(content.data()),
+ content.size(), true);
+ producer_.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ 500000000_U32);
+ producer_.attach();
+ producer_.serveForever();
+}
+
+} // namespace protocol
+
+} // namespace transport
+
+int main(int argc, char **argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+} \ No newline at end of file
diff --git a/libtransport/src/protocols/verification_manager.cc b/libtransport/src/protocols/verification_manager.cc
new file mode 100644
index 000000000..8eedd6106
--- /dev/null
+++ b/libtransport/src/protocols/verification_manager.cc
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/security/verifier.h>
+
+#include <implementation/socket_consumer.h>
+#include <protocols/verification_manager.h>
+
+namespace transport {
+
+namespace protocol {
+
+interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify(
+ const Packet& packet) {
+ using namespace interface;
+
+ bool verify_signature = false, key_content = false;
+ VerificationPolicy ret = VerificationPolicy::DROP_PACKET;
+
+ icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
+ verify_signature);
+ icn_socket_->getSocketOption(GeneralTransportOptions::KEY_CONTENT,
+ key_content);
+
+ if (!verify_signature) {
+ return VerificationPolicy::ACCEPT_PACKET;
+ }
+
+ if (key_content) {
+ key_packets_.push(copyPacket(packet));
+ return VerificationPolicy::ACCEPT_PACKET;
+ } else if (!key_packets_.empty()) {
+ std::queue<ContentObjectPtr>().swap(key_packets_);
+ }
+
+ ConsumerContentObjectVerificationFailedCallback*
+ verification_failed_callback = VOID_HANDLER;
+ icn_socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback);
+
+ if (!verification_failed_callback) {
+ throw errors::RuntimeException(
+ "No verification failed callback provided by application. "
+ "Aborting.");
+ }
+
+ std::shared_ptr<utils::Verifier> verifier;
+ icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
+
+ if (TRANSPORT_EXPECT_FALSE(!verifier)) {
+ ret = (*verification_failed_callback)(
+ *icn_socket_->getInterface(),
+ dynamic_cast<const ContentObject&>(packet),
+ make_error_code(protocol_error::no_verifier_provided));
+ return ret;
+ }
+
+ if (!verifier->verify(packet)) {
+ ret = (*verification_failed_callback)(
+ *icn_socket_->getInterface(),
+ dynamic_cast<const ContentObject&>(packet),
+ make_error_code(protocol_error::signature_verification_failed));
+ } else {
+ ret = VerificationPolicy::ACCEPT_PACKET;
+ }
+
+ return ret;
+}
+
+bool SignatureVerificationManager::onKeyToVerify() {
+ if (TRANSPORT_EXPECT_FALSE(key_packets_.empty())) {
+ throw errors::RuntimeException("No key to verify.");
+ }
+
+ while (!key_packets_.empty()) {
+ ContentObjectPtr packet_to_verify = key_packets_.front();
+ key_packets_.pop();
+ if (onPacketToVerify(*packet_to_verify) !=
+ VerificationPolicy::ACCEPT_PACKET)
+ return false;
+ }
+
+ return true;
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/verification_manager.h b/libtransport/src/protocols/verification_manager.h
new file mode 100644
index 000000000..7d8a00a65
--- /dev/null
+++ b/libtransport/src/protocols/verification_manager.h
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/verification_policy.h>
+
+#include <hicn/transport/core/content_object.h>
+
+#include <protocols/errors.h>
+
+#include <queue>
+
+namespace transport {
+
+namespace interface {
+class ConsumerSocket;
+}
+
+namespace protocol {
+
+using Packet = core::Packet;
+using interface::VerificationPolicy;
+using ContentObjectPtr = std::shared_ptr<core::ContentObject>;
+
+class VerificationManager {
+ public:
+ virtual ~VerificationManager() = default;
+ virtual VerificationPolicy onPacketToVerify(const Packet& packet) = 0;
+ virtual bool onKeyToVerify() { return false; }
+};
+
+class SignatureVerificationManager : public VerificationManager {
+ public:
+ SignatureVerificationManager(implementation::ConsumerSocket* icn_socket)
+ : icn_socket_(icn_socket), key_packets_() {}
+
+ interface::VerificationPolicy onPacketToVerify(const Packet& packet) override;
+ bool onKeyToVerify() override;
+
+ private:
+ implementation::ConsumerSocket* icn_socket_;
+ std::queue<ContentObjectPtr> key_packets_;
+
+ ContentObjectPtr copyPacket(const Packet& packet) {
+ std::shared_ptr<utils::MemBuf> packet_copy =
+ packet.acquireMemBufReference();
+ ContentObjectPtr content_object_copy =
+ std::make_shared<core::ContentObject>(std::move(packet_copy));
+ std::unique_ptr<utils::MemBuf> payload_copy = packet.getPayload();
+ content_object_copy->appendPayload(std::move(payload_copy));
+ return content_object_copy;
+ }
+};
+
+} // end namespace protocol
+
+} // end namespace transport