aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols
diff options
context:
space:
mode:
authorLuca Muscariello <muscariello@ieee.org>2021-04-15 09:05:46 +0200
committerMauro Sardara <msardara@cisco.com>2021-04-15 16:36:16 +0200
commite92e9e839ca2cf42b56322b2489ccc0d8bf767af (patch)
tree9f1647c83a87fbf982ae329e800af25dbfb226b5 /libtransport/src/protocols
parent3e541d7c947cc2f9db145f26c9274efd29a6fb56 (diff)
[HICN-690] Transport Library Major Refactory
The current patch provides a major refactory of the transportlibrary. A summary of the different components that underwent major modifications is reported below. - Transport protocol updates The hierarchy of classes has been optimized to have common transport services across different transport protocols. This can allow to customize a transport protocol with new features. - A new real-time communication protocol The RTC protocol has been optimized in terms of algorithms to reduce consumer-producer synchronization latency. - A novel socket API The API has been reworked to be easier to consumer but also to have a more efficient integration in L4 proxies. - Several performance improvements A large number of performance improvements have been included in particular to make the entire stack zero-copy and optimize cache miss. - New memory buffer framework Memory management has been reworked entirely to provide a more efficient infra with a richer API. Buffers are now allocated in blocks and a single buffer holds the memory for (1) the shared_ptr control block, (2) the metadata of the packet (e.g. name, pointer to other buffers if buffer is chained and relevant offsets), and (3) the packet itself, as it is sent/received over the network. - A new slab allocator Dynamic memory allocation is now managed by a novel slab allocator that is optimised for packet processing and connection management. Memory is organized in pools of blocks all of the same size which are used during the processing of outgoing/incoming packets. When a memory block Is allocated is always taken from a global pool and when it is deallocated is returned to the pool, thus avoiding the cost of any heap allocation in the data path. - New transport connectors Consumer and producer end-points can communication either using an hicn packet forwarder or with direct connector based on shared memories or sockets. The usage of transport connectors typically for unit and funcitonal testing but may have additional usage. - Support for FEC/ECC for transport services FEC/ECC via reed solomon is supported by default and made available to transport services as a modular component. Reed solomon block codes is a default FEC model that can be replaced in a modular way by many other codes including RLNC not avaiable in this distribution. The current FEC framework support variable size padding and efficiently makes use of the infra memory buffers to avoid additiona copies. - Secure transport framework for signature computation and verification Crypto support is nativelty used in hICN for integrity and authenticity. Novel support that includes RTC has been implemented and made modular and reusable acrosso different transport protocols. - TLS - Transport layer security over hicn Point to point confidentiality is provided by integrating TLS on top of hICN reliable and non-reliable transport. The integration is common and makes a different use of the TLS record. - MLS - Messaging layer security over hicn MLS integration on top of hICN is made by using the MLSPP implemetation open sourced by Cisco. We have included instrumentation tools to deploy performance and functional tests of groups of end-points. - Android support The overall code has been heavily tested in Android environments and has received heavy lifting to better run natively in recent Android OS. Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Change-Id: If477ba2fa686e6f47bdf96307ac60938766aef69 Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/protocols')
-rw-r--r--libtransport/src/protocols/CMakeLists.txt21
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc49
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.h4
-rw-r--r--libtransport/src/protocols/data_processing_events.h3
-rw-r--r--libtransport/src/protocols/datagram_reassembly.cc4
-rw-r--r--libtransport/src/protocols/datagram_reassembly.h2
-rw-r--r--libtransport/src/protocols/errors.cc4
-rw-r--r--libtransport/src/protocols/fec_base.h86
-rw-r--r--libtransport/src/protocols/incremental_indexer.cc31
-rw-r--r--libtransport/src/protocols/incremental_indexer.h35
-rw-r--r--libtransport/src/protocols/indexer.cc24
-rw-r--r--libtransport/src/protocols/indexer.h16
-rw-r--r--libtransport/src/protocols/manifest_incremental_indexer.cc215
-rw-r--r--libtransport/src/protocols/manifest_incremental_indexer.h39
-rw-r--r--libtransport/src/protocols/prod_protocol_bytestream.cc390
-rw-r--r--libtransport/src/protocols/prod_protocol_bytestream.h72
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.cc481
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.h127
-rw-r--r--libtransport/src/protocols/production_protocol.cc135
-rw-r--r--libtransport/src/protocols/production_protocol.h108
-rw-r--r--libtransport/src/protocols/raaqm.cc58
-rw-r--r--libtransport/src/protocols/raaqm.h15
-rw-r--r--libtransport/src/protocols/raaqm_data_path.cc1
-rw-r--r--libtransport/src/protocols/raaqm_data_path.h1
-rw-r--r--libtransport/src/protocols/rate_estimation.cc1
-rw-r--r--libtransport/src/protocols/rate_estimation.h1
-rw-r--r--libtransport/src/protocols/reassembly.cc1
-rw-r--r--libtransport/src/protocols/reassembly.h2
-rw-r--r--libtransport/src/protocols/rtc/CMakeLists.txt38
-rw-r--r--libtransport/src/protocols/rtc/congestion_detection.cc101
-rw-r--r--libtransport/src/protocols/rtc/congestion_detection.h138
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.cc107
-rw-r--r--libtransport/src/protocols/rtc/probe_handler.h75
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc607
-rw-r--r--libtransport/src/protocols/rtc/rtc.h113
-rw-r--r--libtransport/src/protocols/rtc/rtc_consts.h121
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.cc197
-rw-r--r--libtransport/src/protocols/rtc/rtc_data_path.h97
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.cc427
-rw-r--r--libtransport/src/protocols/rtc/rtc_ldr.h108
-rw-r--r--libtransport/src/protocols/rtc/rtc_packet.h89
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc.h58
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_frame.cc79
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_frame.h46
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_queue.cc106
-rw-r--r--libtransport/src/protocols/rtc/rtc_rc_queue.h47
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc560
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.h253
-rw-r--r--libtransport/src/protocols/rtc/trendline_estimator.cc334
-rw-r--r--libtransport/src/protocols/rtc/trendline_estimator.h147
-rw-r--r--libtransport/src/protocols/transport_protocol.cc132
-rw-r--r--libtransport/src/protocols/transport_protocol.h104
52 files changed, 5726 insertions, 284 deletions
diff --git a/libtransport/src/protocols/CMakeLists.txt b/libtransport/src/protocols/CMakeLists.txt
index 8bfbdd6ad..eba8d1aab 100644
--- a/libtransport/src/protocols/CMakeLists.txt
+++ b/libtransport/src/protocols/CMakeLists.txt
@@ -21,16 +21,15 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h
${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h
${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h
- ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h
${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h
- ${CMAKE_CURRENT_SOURCE_DIR}/protocol.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/transport_protocol.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/production_protocol.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/prod_protocol_bytestream.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/prod_protocol_rtc.h
${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h
${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h
${CMAKE_CURRENT_SOURCE_DIR}/cbr.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
${CMAKE_CURRENT_SOURCE_DIR}/errors.h
- ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h
${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h
)
@@ -41,15 +40,15 @@ list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc
${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc
${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/transport_protocol.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/production_protocol.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/prod_protocol_bytestream.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/prod_protocol_rtc.cc
${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc
${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc
${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc
${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
${CMAKE_CURRENT_SOURCE_DIR}/errors.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc
)
set(RAAQM_CONFIG_INSTALL_PREFIX
@@ -71,5 +70,7 @@ install(
COMPONENT lib${LIBTRANSPORT}
)
+add_subdirectory(rtc)
+
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
-set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index 6662bec3f..d2bc961c4 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -20,7 +20,7 @@
#include <protocols/byte_stream_reassembly.h>
#include <protocols/errors.h>
#include <protocols/indexer.h>
-#include <protocols/protocol.h>
+#include <protocols/transport_protocol.h>
namespace transport {
@@ -45,11 +45,11 @@ void ByteStreamReassembly::reassemble(
}
}
-void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) {
- if (TRANSPORT_EXPECT_TRUE(content_object != nullptr) &&
- read_buffer_->capacity()) {
- received_packets_.emplace(std::make_pair(
- content_object->getName().getSuffix(), std::move(content_object)));
+void ByteStreamReassembly::reassemble(ContentObject &content_object) {
+ if (TRANSPORT_EXPECT_TRUE(read_buffer_->capacity())) {
+ received_packets_.emplace(
+ std::make_pair(content_object.getName().getSuffix(),
+ content_object.shared_from_this()));
assembleContent();
}
}
@@ -81,25 +81,32 @@ void ByteStreamReassembly::assembleContent() {
}
}
-bool ByteStreamReassembly::copyContent(const ContentObject &content_object) {
+bool ByteStreamReassembly::copyContent(ContentObject &content_object) {
bool ret = false;
- auto payload = content_object.getPayloadReference();
- auto payload_length = payload.second;
- auto write_size = std::min(payload_length, read_buffer_->tailroom());
- auto additional_bytes = payload_length > read_buffer_->tailroom()
- ? payload_length - read_buffer_->tailroom()
- : 0;
+ content_object.trimStart(content_object.headerSize());
- std::memcpy(read_buffer_->writableTail(), payload.first, write_size);
- read_buffer_->append(write_size);
+ utils::MemBuf *current = &content_object;
- if (!read_buffer_->tailroom()) {
- notifyApplication();
- std::memcpy(read_buffer_->writableTail(), payload.first + write_size,
- additional_bytes);
- read_buffer_->append(additional_bytes);
- }
+ do {
+ auto payload_length = current->length();
+ auto write_size = std::min(payload_length, read_buffer_->tailroom());
+ auto additional_bytes = payload_length > read_buffer_->tailroom()
+ ? payload_length - read_buffer_->tailroom()
+ : 0;
+
+ std::memcpy(read_buffer_->writableTail(), current->data(), write_size);
+ read_buffer_->append(write_size);
+
+ if (!read_buffer_->tailroom()) {
+ notifyApplication();
+ std::memcpy(read_buffer_->writableTail(), current->data() + write_size,
+ additional_bytes);
+ read_buffer_->append(additional_bytes);
+ }
+
+ current = current->next();
+ } while (current != &content_object);
download_complete_ =
index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h
index e4f62b3a8..c682d58cb 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.h
+++ b/libtransport/src/protocols/byte_stream_reassembly.h
@@ -27,12 +27,12 @@ class ByteStreamReassembly : public Reassembly {
TransportProtocol *transport_protocol);
protected:
- virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
+ virtual void reassemble(core::ContentObject &content_object) override;
virtual void reassemble(
std::unique_ptr<core::ContentObjectManifest> &&manifest) override;
- bool copyContent(const core::ContentObject &content_object);
+ bool copyContent(core::ContentObject &content_object);
virtual void reInitialize() override;
diff --git a/libtransport/src/protocols/data_processing_events.h b/libtransport/src/protocols/data_processing_events.h
index 8975c2b4a..5c8c16157 100644
--- a/libtransport/src/protocols/data_processing_events.h
+++ b/libtransport/src/protocols/data_processing_events.h
@@ -24,8 +24,7 @@ namespace protocol {
class ContentObjectProcessingEventCallback {
public:
virtual ~ContentObjectProcessingEventCallback() = default;
- virtual void onPacketDropped(core::Interest::Ptr &&i,
- core::ContentObject::Ptr &&c) = 0;
+ virtual void onPacketDropped(core::Interest &i, core::ContentObject &c) = 0;
virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0;
};
diff --git a/libtransport/src/protocols/datagram_reassembly.cc b/libtransport/src/protocols/datagram_reassembly.cc
index abd7e984d..962c1e020 100644
--- a/libtransport/src/protocols/datagram_reassembly.cc
+++ b/libtransport/src/protocols/datagram_reassembly.cc
@@ -24,8 +24,8 @@ DatagramReassembly::DatagramReassembly(
TransportProtocol* transport_protocol)
: Reassembly(icn_socket, transport_protocol) {}
-void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) {
- read_buffer_ = content_object->getPayload();
+void DatagramReassembly::reassemble(core::ContentObject& content_object) {
+ read_buffer_ = content_object.getPayload();
Reassembly::notifyApplication();
}
diff --git a/libtransport/src/protocols/datagram_reassembly.h b/libtransport/src/protocols/datagram_reassembly.h
index 2427ae62f..3462212d3 100644
--- a/libtransport/src/protocols/datagram_reassembly.h
+++ b/libtransport/src/protocols/datagram_reassembly.h
@@ -26,7 +26,7 @@ class DatagramReassembly : public Reassembly {
DatagramReassembly(implementation::ConsumerSocket *icn_socket,
TransportProtocol *transport_protocol);
- virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
+ virtual void reassemble(core::ContentObject &content_object) override;
virtual void reInitialize() override;
virtual void reassemble(
std::unique_ptr<core::ContentObjectManifest> &&manifest) override {
diff --git a/libtransport/src/protocols/errors.cc b/libtransport/src/protocols/errors.cc
index eefb6f957..ae7b6e634 100644
--- a/libtransport/src/protocols/errors.cc
+++ b/libtransport/src/protocols/errors.cc
@@ -52,7 +52,9 @@ std::string protocol_category_impl::message(int ev) const {
case protocol_error::session_aborted: {
return "The session has been aborted by the application.";
}
- default: { return "Unknown protocol error"; }
+ default: {
+ return "Unknown protocol error";
+ }
}
}
diff --git a/libtransport/src/protocols/fec_base.h b/libtransport/src/protocols/fec_base.h
new file mode 100644
index 000000000..a135c474f
--- /dev/null
+++ b/libtransport/src/protocols/fec_base.h
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/content_object.h>
+
+#include <functional>
+
+namespace transport {
+namespace protocol {
+
+/**
+ * Interface classes to integrate FEC inside any producer transport protocol
+ */
+class ProducerFECBase {
+ public:
+ /**
+ * Callback, to be called by implementations as soon as a repair packet is
+ * ready.
+ */
+ using RepairPacketsReady =
+ std::function<void(std::vector<core::ContentObject::Ptr> &)>;
+
+ /**
+ * Producers will call this function upon production of a new packet.
+ */
+ virtual void onPacketProduced(const core::ContentObject &content_object) = 0;
+
+ /**
+ * Set callback to signal production protocol the repair packet is ready.
+ */
+ void setFECCallback(const RepairPacketsReady &on_repair_packet) {
+ rep_packet_ready_callback_ = on_repair_packet;
+ }
+
+ protected:
+ RepairPacketsReady rep_packet_ready_callback_;
+};
+
+/**
+ * Interface classes to integrate FEC inside any consumer transport protocol
+ */
+class ConsumerFECBase {
+ public:
+ /**
+ * Callback, to be called by implemrntations as soon as a packet is recovered.
+ */
+ using OnPacketsRecovered =
+ std::function<void(std::vector<core::ContentObject::Ptr> &)>;
+
+ /**
+ * Consumers will call this function when they receive a FEC packet.
+ */
+ virtual void onFECPacket(const core::ContentObject &content_object) = 0;
+
+ /**
+ * Consumers will call this function when they receive a data packet
+ */
+ virtual void onDataPacket(const core::ContentObject &content_object) = 0;
+
+ /**
+ * Set callback to signal consumer protocol the repair packet is ready.
+ */
+ void setFECCallback(const OnPacketsRecovered &on_repair_packet) {
+ packet_recovered_callback_ = on_repair_packet;
+ }
+
+ protected:
+ OnPacketsRecovered packet_recovered_callback_;
+};
+
+} // namespace protocol
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/protocols/incremental_indexer.cc b/libtransport/src/protocols/incremental_indexer.cc
index 0872c4554..95daa0a3e 100644
--- a/libtransport/src/protocols/incremental_indexer.cc
+++ b/libtransport/src/protocols/incremental_indexer.cc
@@ -13,37 +13,38 @@
* limitations under the License.
*/
-#include <protocols/incremental_indexer.h>
-
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <protocols/protocol.h>
+#include <protocols/errors.h>
+#include <protocols/incremental_indexer.h>
+#include <protocols/transport_protocol.h>
namespace transport {
namespace protocol {
-void IncrementalIndexer::onContentObject(
- core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
+void IncrementalIndexer::onContentObject(core::Interest &interest,
+ core::ContentObject &content_object) {
using namespace interface;
- TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str());
+ TRANSPORT_LOGD("Received content %s",
+ content_object.getName().toString().c_str());
- if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) {
- final_suffix_ = content_object->getName().getSuffix();
+ if (TRANSPORT_EXPECT_FALSE(content_object.testRst())) {
+ final_suffix_ = content_object.getName().getSuffix();
}
- auto ret = verification_manager_->onPacketToVerify(*content_object);
+ auto ret = verifier_->verifyPackets(&content_object);
switch (ret) {
- case VerificationPolicy::ACCEPT_PACKET: {
- reassembly_->reassemble(std::move(content_object));
+ case auth::VerificationPolicy::ACCEPT: {
+ reassembly_->reassemble(content_object);
break;
}
- case VerificationPolicy::DROP_PACKET: {
- transport_protocol_->onPacketDropped(std::move(interest),
- std::move(content_object));
+ case auth::VerificationPolicy::UNKNOWN:
+ case auth::VerificationPolicy::DROP: {
+ transport_protocol_->onPacketDropped(interest, content_object);
break;
}
- case VerificationPolicy::ABORT_SESSION: {
+ case auth::VerificationPolicy::ABORT: {
transport_protocol_->onContentReassembled(
make_error_code(protocol_error::session_aborted));
break;
diff --git a/libtransport/src/protocols/incremental_indexer.h b/libtransport/src/protocols/incremental_indexer.h
index 20c5e4759..d7760f8e6 100644
--- a/libtransport/src/protocols/incremental_indexer.h
+++ b/libtransport/src/protocols/incremental_indexer.h
@@ -15,13 +15,13 @@
#pragma once
-#include <hicn/transport/errors/runtime_exception.h>
-#include <hicn/transport/errors/unexpected_manifest_exception.h>
+#include <hicn/transport/errors/errors.h>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/auth/verifier.h>
#include <hicn/transport/utils/literals.h>
-
+#include <implementation/socket_consumer.h>
#include <protocols/indexer.h>
#include <protocols/reassembly.h>
-#include <protocols/verification_manager.h>
#include <deque>
@@ -47,11 +47,12 @@ class IncrementalIndexer : public Indexer {
first_suffix_(0),
next_download_suffix_(0),
next_reassembly_suffix_(0),
- verification_manager_(
- std::make_unique<SignatureVerificationManager>(icn_socket)) {
+ verifier_(nullptr) {
if (reassembly_) {
reassembly_->setIndexer(this);
}
+ socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER,
+ verifier_);
}
IncrementalIndexer(const IncrementalIndexer &) = delete;
@@ -64,15 +65,14 @@ class IncrementalIndexer : public Indexer {
first_suffix_(other.first_suffix_),
next_download_suffix_(other.next_download_suffix_),
next_reassembly_suffix_(other.next_reassembly_suffix_),
- verification_manager_(std::move(other.verification_manager_)) {
+ verifier_(nullptr) {
if (reassembly_) {
reassembly_->setIndexer(this);
}
+ socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER,
+ verifier_);
}
- /**
- *
- */
virtual ~IncrementalIndexer() {}
TRANSPORT_ALWAYS_INLINE virtual void reset(
@@ -112,8 +112,8 @@ class IncrementalIndexer : public Indexer {
return final_suffix_;
}
- void onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) override;
+ void onContentObject(core::Interest &interest,
+ core::ContentObject &content_object) override;
TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) {
reassembly_ = reassembly;
@@ -123,10 +123,6 @@ class IncrementalIndexer : public Indexer {
}
}
- TRANSPORT_ALWAYS_INLINE bool onKeyToVerify() override {
- return verification_manager_->onKeyToVerify();
- }
-
protected:
implementation::ConsumerSocket *socket_;
Reassembly *reassembly_;
@@ -135,9 +131,8 @@ class IncrementalIndexer : public Indexer {
uint32_t first_suffix_;
uint32_t next_download_suffix_;
uint32_t next_reassembly_suffix_;
- std::unique_ptr<VerificationManager> verification_manager_;
+ std::shared_ptr<auth::Verifier> verifier_;
};
-} // end namespace protocol
-
-} // end namespace transport
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/protocols/indexer.cc b/libtransport/src/protocols/indexer.cc
index ca12330a6..1379a609c 100644
--- a/libtransport/src/protocols/indexer.cc
+++ b/libtransport/src/protocols/indexer.cc
@@ -14,11 +14,9 @@
*/
#include <hicn/transport/utils/branch_prediction.h>
-
#include <protocols/incremental_indexer.h>
#include <protocols/indexer.h>
#include <protocols/manifest_incremental_indexer.h>
-#include <protocols/protocol.h>
namespace transport {
namespace protocol {
@@ -32,16 +30,16 @@ IndexManager::IndexManager(implementation::ConsumerSocket *icn_socket,
transport_(transport),
reassembly_(reassembly) {}
-void IndexManager::onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) {
+void IndexManager::onContentObject(core::Interest &interest,
+ core::ContentObject &content_object) {
if (first_segment_received_) {
- indexer_->onContentObject(std::move(interest), std::move(content_object));
+ indexer_->onContentObject(interest, content_object);
} else {
- std::uint32_t segment_number = interest->getName().getSuffix();
+ std::uint32_t segment_number = interest.getName().getSuffix();
if (segment_number == 0) {
// Check if manifest
- if (content_object->getPayloadType() == PayloadType::MANIFEST) {
+ if (content_object.getPayloadType() == core::PayloadType::MANIFEST) {
IncrementalIndexer *indexer =
static_cast<IncrementalIndexer *>(indexer_.release());
indexer_ =
@@ -49,25 +47,21 @@ void IndexManager::onContentObject(core::Interest::Ptr &&interest,
delete indexer;
}
- indexer_->onContentObject(std::move(interest), std::move(content_object));
+ indexer_->onContentObject(interest, content_object);
auto it = interest_data_set_.begin();
while (it != interest_data_set_.end()) {
- indexer_->onContentObject(
- std::move(const_cast<core::Interest::Ptr &&>(it->first)),
- std::move(const_cast<core::ContentObject::Ptr &&>(it->second)));
+ indexer_->onContentObject(*it->first, *it->second);
it = interest_data_set_.erase(it);
}
first_segment_received_ = true;
} else {
- interest_data_set_.emplace(std::move(interest),
- std::move(content_object));
+ interest_data_set_.emplace(interest.shared_from_this(),
+ content_object.shared_from_this());
}
}
}
-bool IndexManager::onKeyToVerify() { return indexer_->onKeyToVerify(); }
-
void IndexManager::reset(std::uint32_t offset) {
indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_,
reassembly_);
diff --git a/libtransport/src/protocols/indexer.h b/libtransport/src/protocols/indexer.h
index 8213a1503..49e22a4cf 100644
--- a/libtransport/src/protocols/indexer.h
+++ b/libtransport/src/protocols/indexer.h
@@ -33,10 +33,8 @@ class TransportProtocol;
class Indexer {
public:
- /**
- *
- */
virtual ~Indexer() = default;
+
/**
* Retrieve from the manifest the next suffix to retrieve.
*/
@@ -55,10 +53,8 @@ class Indexer {
virtual void reset(std::uint32_t offset = 0) = 0;
- virtual void onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) = 0;
-
- virtual bool onKeyToVerify() = 0;
+ virtual void onContentObject(core::Interest &interest,
+ core::ContentObject &content_object) = 0;
};
class IndexManager : Indexer {
@@ -86,10 +82,8 @@ class IndexManager : Indexer {
void reset(std::uint32_t offset = 0) override;
- void onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) override;
-
- bool onKeyToVerify() override;
+ void onContentObject(core::Interest &interest,
+ core::ContentObject &content_object) override;
private:
std::unique_ptr<Indexer> indexer_;
diff --git a/libtransport/src/protocols/manifest_incremental_indexer.cc b/libtransport/src/protocols/manifest_incremental_indexer.cc
index da835b577..a6312ca90 100644
--- a/libtransport/src/protocols/manifest_incremental_indexer.cc
+++ b/libtransport/src/protocols/manifest_incremental_indexer.cc
@@ -14,9 +14,9 @@
*/
#include <implementation/socket_consumer.h>
-
+#include <protocols/errors.h>
#include <protocols/manifest_incremental_indexer.h>
-#include <protocols/protocol.h>
+#include <protocols/transport_protocol.h>
#include <cmath>
#include <deque>
@@ -36,41 +36,46 @@ ManifestIncrementalIndexer::ManifestIncrementalIndexer(
0)) {}
void ManifestIncrementalIndexer::onContentObject(
- core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
- // Check if manifest or not
- if (content_object->getPayloadType() == PayloadType::MANIFEST) {
- TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str());
- onUntrustedManifest(std::move(interest), std::move(content_object));
- } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- TRANSPORT_LOGD("Receive manifest %s", content_object->getName().toString().c_str());
- onUntrustedContentObject(std::move(interest), std::move(content_object));
- }
-}
-
-void ManifestIncrementalIndexer::onUntrustedManifest(
- core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
- auto ret = verification_manager_->onPacketToVerify(*content_object);
-
- switch (ret) {
- case VerificationPolicy::ACCEPT_PACKET: {
- processTrustedManifest(std::move(content_object));
+ core::Interest &interest, core::ContentObject &content_object) {
+ switch (content_object.getPayloadType()) {
+ case PayloadType::DATA: {
+ TRANSPORT_LOGD("Received content %s",
+ content_object.getName().toString().c_str());
+ onUntrustedContentObject(interest, content_object);
break;
}
- case VerificationPolicy::DROP_PACKET:
- case VerificationPolicy::ABORT_SESSION: {
- transport_protocol_->onContentReassembled(
- make_error_code(protocol_error::session_aborted));
+ case PayloadType::MANIFEST: {
+ TRANSPORT_LOGD("Received manifest %s",
+ content_object.getName().toString().c_str());
+ onUntrustedManifest(interest, content_object);
break;
}
+ default: {
+ return;
+ }
}
}
-void ManifestIncrementalIndexer::processTrustedManifest(
- ContentObject::Ptr &&content_object) {
+void ManifestIncrementalIndexer::onUntrustedManifest(
+ core::Interest &interest, core::ContentObject &content_object) {
auto manifest =
- std::make_unique<ContentObjectManifest>(std::move(*content_object));
+ std::make_unique<ContentObjectManifest>(std::move(content_object));
+
+ auth::VerificationPolicy policy = verifier_->verifyPackets(manifest.get());
+
manifest->decode();
+ if (policy != auth::VerificationPolicy::ACCEPT) {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ return;
+ }
+
+ processTrustedManifest(interest, std::move(manifest));
+}
+
+void ManifestIncrementalIndexer::processTrustedManifest(
+ core::Interest &interest, std::unique_ptr<ContentObjectManifest> manifest) {
if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
core::ManifestVersion::VERSION_1)) {
throw errors::RuntimeException("Received manifest with unknown version.");
@@ -78,23 +83,45 @@ void ManifestIncrementalIndexer::processTrustedManifest(
switch (manifest->getManifestType()) {
case core::ManifestType::INLINE_MANIFEST: {
- auto _it = manifest->getSuffixList().begin();
- auto _end = manifest->getSuffixList().end();
-
suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber());
- for (; _it != _end; _it++) {
- auto hash =
- std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
- manifest->getHashAlgorithm());
+ // The packets to verify with the received manifest
+ std::vector<auth::PacketPtr> packets;
+
+ // Convert the received manifest to a map of packet suffixes to hashes
+ std::unordered_map<auth::Suffix, auth::HashEntry> current_manifest =
+ core::ContentObjectManifest::getSuffixMap(manifest.get());
+
+ // Update 'suffix_map_' with new hashes from the received manifest and
+ // build 'packets'
+ for (auto it = current_manifest.begin(); it != current_manifest.end();) {
+ if (unverified_segments_.find(it->first) ==
+ unverified_segments_.end()) {
+ suffix_map_[it->first] = std::move(it->second);
+ current_manifest.erase(it++);
+ continue;
+ }
- if (!checkUnverifiedSegments(_it->first, hash)) {
- suffix_hash_map_[_it->first] = std::move(hash);
+ packets.push_back(unverified_segments_[it->first].second.get());
+ it++;
+ }
+
+ // Verify unverified segments using the received manifest
+ std::vector<auth::VerificationPolicy> policies =
+ verifier_->verifyPackets(packets, current_manifest);
+
+ for (unsigned int i = 0; i < packets.size(); ++i) {
+ auth::Suffix suffix = packets[i]->getName().getSuffix();
+
+ if (policies[i] != auth::VerificationPolicy::UNKNOWN) {
+ unverified_segments_.erase(suffix);
}
+
+ applyPolicy(*unverified_segments_[suffix].first,
+ *unverified_segments_[suffix].second, policies[i]);
}
reassembly_->reassemble(std::move(manifest));
-
break;
}
case core::ManifestType::FLIC_MANIFEST: {
@@ -106,89 +133,47 @@ void ManifestIncrementalIndexer::processTrustedManifest(
}
}
-bool ManifestIncrementalIndexer::checkUnverifiedSegments(
- std::uint32_t suffix, const HashEntry &hash) {
- auto it = unverified_segments_.find(suffix);
-
- if (it != unverified_segments_.end()) {
- auto ret = verifyContentObject(hash, *it->second.second);
-
- switch (ret) {
- case VerificationPolicy::ACCEPT_PACKET: {
- reassembly_->reassemble(std::move(it->second.second));
- break;
- }
- case VerificationPolicy::DROP_PACKET: {
- transport_protocol_->onPacketDropped(std::move(it->second.first),
- std::move(it->second.second));
- break;
- }
- case VerificationPolicy::ABORT_SESSION: {
- transport_protocol_->onContentReassembled(
- make_error_code(protocol_error::session_aborted));
- break;
- }
+void ManifestIncrementalIndexer::onUntrustedContentObject(
+ Interest &interest, ContentObject &content_object) {
+ auth::Suffix suffix = content_object.getName().getSuffix();
+ auth::VerificationPolicy policy =
+ verifier_->verifyPackets(&content_object, suffix_map_);
+
+ switch (policy) {
+ case auth::VerificationPolicy::UNKNOWN: {
+ unverified_segments_[suffix] = std::make_pair(
+ interest.shared_from_this(), content_object.shared_from_this());
+ break;
+ }
+ default: {
+ suffix_map_.erase(suffix);
+ break;
}
-
- unverified_segments_.erase(it);
- return true;
- }
-
- return false;
-}
-
-VerificationPolicy ManifestIncrementalIndexer::verifyContentObject(
- const HashEntry &manifest_hash, const ContentObject &content_object) {
- VerificationPolicy ret;
-
- auto hash_type = static_cast<utils::CryptoHashType>(manifest_hash.second);
- auto data_packet_digest = content_object.computeDigest(manifest_hash.second);
- auto data_packet_digest_bytes =
- data_packet_digest.getDigest<uint8_t>().data();
- const std::vector<uint8_t> &manifest_digest_bytes = manifest_hash.first;
-
- if (utils::CryptoHash::compareBinaryDigest(
- data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) {
- ret = VerificationPolicy::ACCEPT_PACKET;
- } else {
- ConsumerContentObjectVerificationFailedCallback
- *verification_failed_callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
- &verification_failed_callback);
- ret = (*verification_failed_callback)(
- *socket_->getInterface(), content_object,
- make_error_code(protocol_error::integrity_verification_failed));
}
- return ret;
+ applyPolicy(interest, content_object, policy);
}
-void ManifestIncrementalIndexer::onUntrustedContentObject(
- Interest::Ptr &&i, ContentObject::Ptr &&c) {
- auto suffix = c->getName().getSuffix();
- auto it = suffix_hash_map_.find(suffix);
-
- if (it != suffix_hash_map_.end()) {
- auto ret = verifyContentObject(it->second, *c);
-
- switch (ret) {
- case VerificationPolicy::ACCEPT_PACKET: {
- suffix_hash_map_.erase(it);
- reassembly_->reassemble(std::move(c));
- break;
- }
- case VerificationPolicy::DROP_PACKET: {
- transport_protocol_->onPacketDropped(std::move(i), std::move(c));
- break;
- }
- case VerificationPolicy::ABORT_SESSION: {
- transport_protocol_->onContentReassembled(
- make_error_code(protocol_error::session_aborted));
- break;
- }
+void ManifestIncrementalIndexer::applyPolicy(
+ core::Interest &interest, core::ContentObject &content_object,
+ auth::VerificationPolicy policy) {
+ switch (policy) {
+ case auth::VerificationPolicy::ACCEPT: {
+ reassembly_->reassemble(content_object);
+ break;
+ }
+ case auth::VerificationPolicy::DROP: {
+ transport_protocol_->onPacketDropped(interest, content_object);
+ break;
+ }
+ case auth::VerificationPolicy::ABORT: {
+ transport_protocol_->onContentReassembled(
+ make_error_code(protocol_error::session_aborted));
+ break;
+ }
+ default: {
+ break;
}
- } else {
- unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c));
}
}
@@ -224,7 +209,7 @@ uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() {
void ManifestIncrementalIndexer::reset(std::uint32_t offset) {
IncrementalIndexer::reset(offset);
- suffix_hash_map_.clear();
+ suffix_map_.clear();
unverified_segments_.clear();
SuffixQueue empty;
std::swap(suffix_queue_, empty);
diff --git a/libtransport/src/protocols/manifest_incremental_indexer.h b/libtransport/src/protocols/manifest_incremental_indexer.h
index 38b01533e..1bb76eb87 100644
--- a/libtransport/src/protocols/manifest_incremental_indexer.h
+++ b/libtransport/src/protocols/manifest_incremental_indexer.h
@@ -15,6 +15,7 @@
#pragma once
+#include <hicn/transport/auth/common.h>
#include <implementation/socket.h>
#include <protocols/incremental_indexer.h>
#include <utils/suffix_strategy.h>
@@ -22,7 +23,6 @@
#include <list>
namespace transport {
-
namespace protocol {
class ManifestIncrementalIndexer : public IncrementalIndexer {
@@ -30,7 +30,8 @@ class ManifestIncrementalIndexer : public IncrementalIndexer {
public:
using SuffixQueue = std::queue<uint32_t>;
- using HashEntry = std::pair<std::vector<uint8_t>, utils::CryptoHashType>;
+ using InterestContentPair =
+ std::pair<core::Interest::Ptr, core::ContentObject::Ptr>;
ManifestIncrementalIndexer(implementation::ConsumerSocket *icn_socket,
TransportProtocol *transport,
@@ -50,8 +51,8 @@ class ManifestIncrementalIndexer : public IncrementalIndexer {
void reset(std::uint32_t offset = 0) override;
- void onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) override;
+ void onContentObject(core::Interest &interest,
+ core::ContentObject &content_object) override;
uint32_t getNextSuffix() override;
@@ -61,30 +62,24 @@ class ManifestIncrementalIndexer : public IncrementalIndexer {
uint32_t getFinalSuffix() override;
- private:
- void onUntrustedManifest(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object);
- void onUntrustedContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object);
- void processTrustedManifest(core::ContentObject::Ptr &&content_object);
- void onManifestReceived(core::Interest::Ptr &&i,
- core::ContentObject::Ptr &&c);
- void onManifestTimeout(core::Interest::Ptr &&i);
- VerificationPolicy verifyContentObject(
- const HashEntry &manifest_hash,
- const core::ContentObject &content_object);
- bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash);
-
protected:
std::unique_ptr<utils::SuffixStrategy> suffix_strategy_;
SuffixQueue suffix_queue_;
// Hash verification
- std::unordered_map<uint32_t, HashEntry> suffix_hash_map_;
+ std::unordered_map<auth::Suffix, auth::HashEntry> suffix_map_;
+ std::unordered_map<auth::Suffix, InterestContentPair> unverified_segments_;
- std::unordered_map<uint32_t,
- std::pair<core::Interest::Ptr, core::ContentObject::Ptr>>
- unverified_segments_;
+ private:
+ void onUntrustedManifest(core::Interest &interest,
+ core::ContentObject &content_object);
+ void processTrustedManifest(core::Interest &interest,
+ std::unique_ptr<ContentObjectManifest> manifest);
+ void onUntrustedContentObject(core::Interest &interest,
+ core::ContentObject &content_object);
+ void applyPolicy(core::Interest &interest,
+ core::ContentObject &content_object,
+ auth::VerificationPolicy policy);
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/prod_protocol_bytestream.cc b/libtransport/src/protocols/prod_protocol_bytestream.cc
new file mode 100644
index 000000000..6bd989fe4
--- /dev/null
+++ b/libtransport/src/protocols/prod_protocol_bytestream.cc
@@ -0,0 +1,390 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <implementation/socket_producer.h>
+#include <protocols/prod_protocol_bytestream.h>
+
+#include <atomic>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+using namespace implementation;
+
+ByteStreamProductionProtocol::ByteStreamProductionProtocol(
+ implementation::ProducerSocket *icn_socket)
+ : ProductionProtocol(icn_socket) {}
+
+ByteStreamProductionProtocol::~ByteStreamProductionProtocol() {
+ stop();
+ if (listening_thread_.joinable()) {
+ listening_thread_.join();
+ }
+}
+
+uint32_t ByteStreamProductionProtocol::produceDatagram(
+ const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) {
+ throw errors::NotImplementedException();
+}
+
+uint32_t ByteStreamProductionProtocol::produceDatagram(const Name &content_name,
+ const uint8_t *buffer,
+ size_t buffer_size) {
+ throw errors::NotImplementedException();
+}
+
+uint32_t ByteStreamProductionProtocol::produceStream(const Name &content_name,
+ const uint8_t *buffer,
+ size_t buffer_size,
+ bool is_last,
+ uint32_t start_offset) {
+ if (!buffer_size) {
+ return 0;
+ }
+
+ return produceStream(content_name,
+ utils::MemBuf::copyBuffer(buffer, buffer_size), is_last,
+ start_offset);
+}
+
+uint32_t ByteStreamProductionProtocol::produceStream(
+ const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t start_offset) {
+ if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) {
+ return 0;
+ }
+
+ Name name(content_name);
+
+ // Get the atomic variables to ensure they keep the same value
+ // during the production
+
+ // Total size of the data packet
+ uint32_t data_packet_size;
+ socket_->getSocketOption(GeneralTransportOptions::DATA_PACKET_SIZE,
+ data_packet_size);
+
+ // Expiry time
+ uint32_t content_object_expiry_time;
+ socket_->getSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ content_object_expiry_time);
+
+ // Hash algorithm
+ auth::CryptoHashType hash_algo;
+ socket_->getSocketOption(GeneralTransportOptions::HASH_ALGORITHM, hash_algo);
+
+ // Use manifest
+ bool making_manifest;
+ socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ making_manifest);
+
+ // Suffix calculation strategy
+ core::NextSegmentCalculationStrategy _suffix_strategy;
+ socket_->getSocketOption(GeneralTransportOptions::SUFFIX_STRATEGY,
+ _suffix_strategy);
+ auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy(
+ _suffix_strategy, start_offset);
+
+ std::shared_ptr<auth::Signer> signer;
+ socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer);
+
+ auto buffer_size = buffer->length();
+ int bytes_segmented = 0;
+ std::size_t header_size;
+ std::size_t manifest_header_size = 0;
+ std::size_t signature_length = 0;
+ std::uint32_t final_block_number = start_offset;
+ uint64_t free_space_for_content = 0;
+
+ core::Packet::Format format;
+ std::shared_ptr<core::ContentObjectManifest> manifest;
+ bool is_last_manifest = false;
+
+ // TODO Manifest may still be used for indexing
+ if (making_manifest && !signer) {
+ TRANSPORT_LOGE("Making manifests without setting producer identity.");
+ }
+
+ core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
+ core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC;
+
+ if (name.getType() == HNT_CONTIGUOUS_V4 || name.getType() == HNT_IOV_V4) {
+ hf_format = core::Packet::Format::HF_INET_TCP;
+ hf_format_ah = core::Packet::Format::HF_INET_TCP_AH;
+ } else if (name.getType() == HNT_CONTIGUOUS_V6 ||
+ name.getType() == HNT_IOV_V6) {
+ hf_format = core::Packet::Format::HF_INET6_TCP;
+ hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH;
+ } else {
+ throw errors::RuntimeException("Unknown name format.");
+ }
+
+ format = hf_format;
+ if (making_manifest) {
+ manifest_header_size = core::Packet::getHeaderSizeFromFormat(
+ signer ? hf_format_ah : hf_format,
+ signer ? signer->getSignatureSize() : 0);
+ } else if (signer) {
+ format = hf_format_ah;
+ signature_length = signer->getSignatureSize();
+ }
+
+ header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length);
+ free_space_for_content = data_packet_size - header_size;
+ uint32_t number_of_segments =
+ uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content)));
+ if (free_space_for_content * number_of_segments < buffer_size) {
+ number_of_segments++;
+ }
+
+ // TODO allocate space for all the headers
+ if (making_manifest) {
+ uint32_t segment_in_manifest = static_cast<uint32_t>(
+ std::floor(double(data_packet_size - manifest_header_size -
+ ContentObjectManifest::getManifestHeaderSize()) /
+ ContentObjectManifest::getManifestEntrySize()) -
+ 1.0);
+ uint32_t number_of_manifests = static_cast<uint32_t>(
+ std::ceil(float(number_of_segments) / segment_in_manifest));
+ final_block_number += number_of_segments + number_of_manifests - 1;
+
+ manifest.reset(ContentObjectManifest::createManifest(
+ name.setSuffix(suffix_strategy->getNextManifestSuffix()),
+ core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
+ hash_algo, is_last_manifest, name, _suffix_strategy,
+ signer ? signer->getSignatureSize() : 0));
+ manifest->setLifetime(content_object_expiry_time);
+
+ if (is_last) {
+ manifest->setFinalBlockNumber(final_block_number);
+ } else {
+ manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX);
+ }
+ }
+
+ for (unsigned int packaged_segments = 0;
+ packaged_segments < number_of_segments; packaged_segments++) {
+ if (making_manifest) {
+ if (manifest->estimateManifestSize(2) >
+ data_packet_size - manifest_header_size) {
+ manifest->encode();
+
+ // If identity set, sign manifest
+ if (signer) {
+ signer->signPacket(manifest.get());
+ }
+
+ // Send the current manifest
+ passContentObjectToCallbacks(manifest);
+
+ TRANSPORT_LOGD("Send manifest %s",
+ manifest->getName().toString().c_str());
+
+ // Send content objects stored in the queue
+ while (!content_queue_.empty()) {
+ passContentObjectToCallbacks(content_queue_.front());
+ TRANSPORT_LOGD("Send content %s",
+ content_queue_.front()->getName().toString().c_str());
+ content_queue_.pop();
+ }
+
+ // Create new manifest. The reference to the last manifest has been
+ // acquired in the passContentObjectToCallbacks function, so we can
+ // safely release this reference
+ manifest.reset(ContentObjectManifest::createManifest(
+ name.setSuffix(suffix_strategy->getNextManifestSuffix()),
+ core::ManifestVersion::VERSION_1,
+ core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest,
+ name, _suffix_strategy, signer ? signer->getSignatureSize() : 0));
+
+ manifest->setLifetime(content_object_expiry_time);
+ manifest->setFinalBlockNumber(
+ is_last ? final_block_number
+ : utils::SuffixStrategy::INVALID_SUFFIX);
+ }
+ }
+
+ auto content_suffix = suffix_strategy->getNextContentSuffix();
+ auto content_object = std::make_shared<ContentObject>(
+ name.setSuffix(content_suffix), format,
+ signer && !making_manifest ? signer->getSignatureSize() : 0);
+ content_object->setLifetime(content_object_expiry_time);
+
+ auto b = buffer->cloneOne();
+ b->trimStart(free_space_for_content * packaged_segments);
+ b->trimEnd(b->length());
+
+ if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) {
+ b->append(buffer_size - bytes_segmented);
+ bytes_segmented += (int)(buffer_size - bytes_segmented);
+
+ if (is_last && making_manifest) {
+ is_last_manifest = true;
+ } else if (is_last) {
+ content_object->setRst();
+ }
+
+ } else {
+ b->append(free_space_for_content);
+ bytes_segmented += (int)(free_space_for_content);
+ }
+
+ content_object->appendPayload(std::move(b));
+
+ if (making_manifest) {
+ using namespace std::chrono_literals;
+ auth::CryptoHash hash = content_object->computeDigest(hash_algo);
+ manifest->addSuffixHash(content_suffix, hash);
+ content_queue_.push(content_object);
+ } else {
+ if (signer) {
+ signer->signPacket(content_object.get());
+ }
+ passContentObjectToCallbacks(content_object);
+ TRANSPORT_LOGD("Send content %s",
+ content_object->getName().toString().c_str());
+ }
+ }
+
+ if (making_manifest) {
+ if (is_last_manifest) {
+ manifest->setFinalManifest(is_last_manifest);
+ }
+
+ manifest->encode();
+
+ if (signer) {
+ signer->signPacket(manifest.get());
+ }
+
+ passContentObjectToCallbacks(manifest);
+ TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
+
+ while (!content_queue_.empty()) {
+ passContentObjectToCallbacks(content_queue_.front());
+ TRANSPORT_LOGD("Send content %s",
+ content_queue_.front()->getName().toString().c_str());
+ content_queue_.pop();
+ }
+ }
+
+ portal_->getIoService().post([this]() {
+ std::shared_ptr<ContentObject> co;
+ while (object_queue_for_callbacks_.pop(co)) {
+ if (*on_new_segment_) {
+ on_new_segment_->operator()(*socket_->getInterface(), *co);
+ }
+
+ if (*on_content_object_to_sign_) {
+ on_content_object_to_sign_->operator()(*socket_->getInterface(), *co);
+ }
+
+ if (*on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_->operator()(
+ *socket_->getInterface(), *co);
+ }
+
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(), *co);
+ }
+ }
+ });
+
+ portal_->getIoService().dispatch([this, buffer_size]() {
+ if (*on_content_produced_) {
+ on_content_produced_->operator()(*socket_->getInterface(),
+ std::make_error_code(std::errc(0)),
+ buffer_size);
+ }
+ });
+
+ return suffix_strategy->getTotalCount();
+}
+
+void ByteStreamProductionProtocol::scheduleSendBurst() {
+ portal_->getIoService().post([this]() {
+ std::shared_ptr<ContentObject> co;
+
+ for (uint32_t i = 0; i < burst_size; i++) {
+ if (object_queue_for_callbacks_.pop(co)) {
+ if (*on_new_segment_) {
+ on_new_segment_->operator()(*socket_->getInterface(), *co);
+ }
+
+ if (*on_content_object_to_sign_) {
+ on_content_object_to_sign_->operator()(*socket_->getInterface(), *co);
+ }
+
+ if (*on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_->operator()(
+ *socket_->getInterface(), *co);
+ }
+
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(), *co);
+ }
+ } else {
+ break;
+ }
+ }
+ });
+}
+
+void ByteStreamProductionProtocol::passContentObjectToCallbacks(
+ const std::shared_ptr<ContentObject> &content_object) {
+ output_buffer_.insert(content_object);
+ portal_->sendContentObject(*content_object);
+ object_queue_for_callbacks_.push(std::move(content_object));
+
+ if (object_queue_for_callbacks_.size() >= burst_size) {
+ scheduleSendBurst();
+ }
+}
+
+void ByteStreamProductionProtocol::onInterest(Interest &interest) {
+ TRANSPORT_LOGD("Received interest for %s",
+ interest.getName().toString().c_str());
+ if (*on_interest_input_) {
+ on_interest_input_->operator()(*socket_->getInterface(), interest);
+ }
+
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(interest);
+
+ if (content_object) {
+ if (*on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(),
+ interest);
+ }
+
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
+
+ portal_->sendContentObject(*content_object);
+ } else {
+ if (*on_interest_process_) {
+ on_interest_process_->operator()(*socket_->getInterface(), interest);
+ }
+ }
+}
+
+void ByteStreamProductionProtocol::onError(std::error_code ec) {}
+
+} // namespace protocol
+} // end namespace transport
diff --git a/libtransport/src/protocols/prod_protocol_bytestream.h b/libtransport/src/protocols/prod_protocol_bytestream.h
new file mode 100644
index 000000000..cf36b90a5
--- /dev/null
+++ b/libtransport/src/protocols/prod_protocol_bytestream.h
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/utils/ring_buffer.h>
+#include <protocols/production_protocol.h>
+
+#include <atomic>
+#include <queue>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+
+class ByteStreamProductionProtocol : public ProductionProtocol {
+ static constexpr uint32_t burst_size = 256;
+
+ public:
+ ByteStreamProductionProtocol(implementation::ProducerSocket *icn_socket);
+
+ ~ByteStreamProductionProtocol() override;
+
+ using ProductionProtocol::start;
+ using ProductionProtocol::stop;
+
+ uint32_t produceStream(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true,
+ uint32_t start_offset = 0) override;
+ uint32_t produceStream(const Name &content_name, const uint8_t *buffer,
+ size_t buffer_size, bool is_last = true,
+ uint32_t start_offset = 0) override;
+ uint32_t produceDatagram(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer) override;
+ uint32_t produceDatagram(const Name &content_name, const uint8_t *buffer,
+ size_t buffer_size) override;
+
+ protected:
+ // Consumer Callback
+ // void reset() override;
+ void onInterest(core::Interest &i) override;
+ void onError(std::error_code ec) override;
+
+ private:
+ void passContentObjectToCallbacks(
+ const std::shared_ptr<ContentObject> &content_object);
+ void scheduleSendBurst();
+
+ private:
+ // While manifests are being built, contents are stored in a queue
+ std::queue<std::shared_ptr<ContentObject>> content_queue_;
+ utils::CircularFifo<std::shared_ptr<ContentObject>, 2048>
+ object_queue_for_callbacks_;
+};
+
+} // end namespace protocol
+} // end namespace transport
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
new file mode 100644
index 000000000..8081923e3
--- /dev/null
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -0,0 +1,481 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/global_object_pool.h>
+#include <implementation/socket_producer.h>
+#include <protocols/prod_protocol_rtc.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include <unordered_set>
+
+namespace transport {
+namespace protocol {
+
+RTCProductionProtocol::RTCProductionProtocol(
+ implementation::ProducerSocket *icn_socket)
+ : ProductionProtocol(icn_socket),
+ current_seg_(1),
+ produced_bytes_(0),
+ produced_packets_(0),
+ max_packet_production_(1),
+ bytes_production_rate_(0),
+ packets_production_rate_(0),
+ last_round_(std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count()),
+ allow_delayed_nacks_(false),
+ queue_timer_on_(false),
+ consumer_in_sync_(false),
+ on_consumer_in_sync_(nullptr) {
+ srand((unsigned int)time(NULL));
+ prod_label_ = rand() % 256;
+ interests_queue_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ setOutputBufferSize(10000);
+ scheduleRoundTimer();
+}
+
+RTCProductionProtocol::~RTCProductionProtocol() {}
+
+void RTCProductionProtocol::registerNamespaceWithNetwork(
+ const Prefix &producer_namespace) {
+ ProductionProtocol::registerNamespaceWithNetwork(producer_namespace);
+
+ flow_name_ = producer_namespace.getName();
+ auto family = flow_name_.getAddressFamily();
+
+ switch (family) {
+ case AF_INET6:
+ header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP);
+ break;
+ case AF_INET:
+ header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP);
+ break;
+ default:
+ throw errors::RuntimeException("Unknown name format.");
+ }
+}
+
+void RTCProductionProtocol::scheduleRoundTimer() {
+ round_timer_->expires_from_now(
+ std::chrono::milliseconds(rtc::PRODUCER_STATS_INTERVAL));
+ round_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ updateStats();
+ });
+}
+
+void RTCProductionProtocol::updateStats() {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t duration = now - last_round_;
+ if (duration == 0) duration = 1;
+ double per_second = rtc::MILLI_IN_A_SEC / duration;
+
+ uint32_t prev_packets_production_rate = packets_production_rate_;
+
+ bytes_production_rate_ = ceil((double)produced_bytes_ * per_second);
+ packets_production_rate_ = ceil((double)produced_packets_ * per_second);
+
+ TRANSPORT_LOGD("Updating production rate: produced_bytes_ = %u bps = %u",
+ produced_bytes_, bytes_production_rate_);
+
+ // update the production rate as soon as it increases by 10% with respect to
+ // the last round
+ max_packet_production_ =
+ produced_packets_ + ceil((double)produced_packets_ * 0.1);
+ if (max_packet_production_ < rtc::WIN_MIN)
+ max_packet_production_ = rtc::WIN_MIN;
+
+ if (packets_production_rate_ != 0) {
+ allow_delayed_nacks_ = false;
+ } else if (prev_packets_production_rate == 0) {
+ // at least 2 rounds with production rate = 0
+ allow_delayed_nacks_ = true;
+ }
+
+ // check if the production rate is decreased. if yes send nacks if needed
+ if (prev_packets_production_rate < packets_production_rate_) {
+ sendNacksForPendingInterests();
+ }
+
+ produced_bytes_ = 0;
+ produced_packets_ = 0;
+ last_round_ = now;
+ scheduleRoundTimer();
+}
+
+uint32_t RTCProductionProtocol::produceStream(
+ const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t start_offset) {
+ throw errors::NotImplementedException();
+}
+
+uint32_t RTCProductionProtocol::produceStream(const Name &content_name,
+ const uint8_t *buffer,
+ size_t buffer_size, bool is_last,
+ uint32_t start_offset) {
+ throw errors::NotImplementedException();
+}
+
+void RTCProductionProtocol::produce(ContentObject &content_object) {
+ throw errors::NotImplementedException();
+}
+
+uint32_t RTCProductionProtocol::produceDatagram(
+ const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) {
+ std::size_t buffer_size = buffer->length();
+ if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0;
+
+ uint32_t data_packet_size;
+ socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE,
+ data_packet_size);
+
+ if (TRANSPORT_EXPECT_FALSE((buffer_size + header_size_ +
+ rtc::DATA_HEADER_SIZE) > data_packet_size)) {
+ return 0;
+ }
+
+ auto content_object =
+ core::PacketManager<>::getInstance().getPacket<ContentObject>();
+ // add rtc header to the payload
+ struct rtc::data_packet_t header;
+ content_object->appendPayload((const uint8_t *)&header,
+ rtc::DATA_HEADER_SIZE);
+ content_object->appendPayload(buffer->data(), buffer->length());
+
+ std::shared_ptr<ContentObject> co = std::move(content_object);
+
+ // schedule actual sending on internal thread
+ portal_->getIoService().dispatch(
+ [this, content_object{std::move(co)}, content_name]() mutable {
+ produceInternal(std::move(content_object), content_name);
+ });
+
+ return 1;
+}
+
+void RTCProductionProtocol::produceInternal(
+ std::shared_ptr<ContentObject> &&content_object, const Name &content_name) {
+ // set rtc header
+ struct rtc::data_packet_t *data_pkt =
+ (struct rtc::data_packet_t *)content_object->getPayload()->data();
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ data_pkt->setTimestamp(now);
+ data_pkt->setProductionRate(bytes_production_rate_);
+
+ // set hicn stuff
+ Name n(content_name);
+ content_object->setName(n.setSuffix(current_seg_));
+ content_object->setLifetime(500); // XXX this should be set by the APP
+ content_object->setPathLabel(prod_label_);
+
+ // update stats
+ produced_bytes_ +=
+ content_object->headerSize() + content_object->payloadSize();
+ produced_packets_++;
+
+ if (produced_packets_ >= max_packet_production_) {
+ // in this case all the pending interests may be used to accomodate the
+ // sudden increase in the production rate. calling the updateStats we will
+ // notify all the clients
+ round_timer_->cancel();
+ updateStats();
+ }
+
+ TRANSPORT_LOGD("Sending content object: %s", n.toString().c_str());
+
+ output_buffer_.insert(content_object);
+
+ if (*on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
+
+ portal_->sendContentObject(*content_object);
+
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
+
+ // remove interests from the interest cache if it exists
+ removeFromInterestQueue(current_seg_);
+
+ current_seg_ = (current_seg_ + 1) % rtc::MIN_PROBE_SEQ;
+}
+
+void RTCProductionProtocol::onInterest(Interest &interest) {
+ uint32_t interest_seg = interest.getName().getSuffix();
+ uint32_t lifetime = interest.getLifetime();
+
+ if (interest_seg == 0) {
+ // first packet from the consumer, reset sync state
+ consumer_in_sync_ = false;
+ }
+
+ if (*on_interest_input_) {
+ on_interest_input_->operator()(*socket_->getInterface(), interest);
+ }
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (interest_seg > rtc::MIN_PROBE_SEQ) {
+ TRANSPORT_LOGD("received probe %u", interest_seg);
+ sendNack(interest_seg);
+ return;
+ }
+
+ TRANSPORT_LOGD("received interest %u", interest_seg);
+
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(interest);
+
+ if (content_object) {
+ if (*on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(),
+ interest);
+ }
+
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
+
+ TRANSPORT_LOGD("Send content %u (onInterest)",
+ content_object->getName().getSuffix());
+ portal_->sendContentObject(*content_object);
+ return;
+ } else {
+ if (*on_interest_process_) {
+ on_interest_process_->operator()(*socket_->getInterface(), interest);
+ }
+ }
+
+ // if the production rate 0 use delayed nacks
+ if (allow_delayed_nacks_ && interest_seg >= current_seg_) {
+ uint64_t next_timer = ~0;
+ if (!timers_map_.empty()) {
+ next_timer = timers_map_.begin()->first;
+ }
+
+ uint64_t expiration = now + rtc::SENTINEL_TIMER_INTERVAL;
+ addToInterestQueue(interest_seg, expiration);
+
+ // here we have at least one interest in the queue, we need to start or
+ // update the timer
+ if (!queue_timer_on_) {
+ // set timeout
+ queue_timer_on_ = true;
+ scheduleQueueTimer(timers_map_.begin()->first - now);
+ } else {
+ // re-schedule the timer because a new interest will expires sooner
+ if (next_timer > timers_map_.begin()->first) {
+ interests_queue_timer_->cancel();
+ scheduleQueueTimer(timers_map_.begin()->first - now);
+ }
+ }
+ return;
+ }
+
+ if (queue_timer_on_) {
+ // the producer is producing. Send nacks to packets that will expire before
+ // the data production and remove the timer
+ queue_timer_on_ = false;
+ interests_queue_timer_->cancel();
+ sendNacksForPendingInterests();
+ }
+
+ uint32_t max_gap = (uint32_t)floor(
+ (double)((double)((double)lifetime *
+ rtc::INTEREST_LIFETIME_REDUCTION_FACTOR /
+ rtc::MILLI_IN_A_SEC) *
+ (double)packets_production_rate_));
+
+ if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) {
+ sendNack(interest_seg);
+ } else {
+ if (!consumer_in_sync_ && on_consumer_in_sync_) {
+ // we consider the remote consumer to be in sync as soon as it covers 70%
+ // of the production window with interests
+ uint32_t perc = ceil((double)max_gap * 0.7);
+ if (interest_seg > (perc + current_seg_)) {
+ consumer_in_sync_ = true;
+ on_consumer_in_sync_(*socket_->getInterface(), interest);
+ }
+ }
+ uint64_t expiration =
+ now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR);
+ addToInterestQueue(interest_seg, expiration);
+ }
+}
+
+void RTCProductionProtocol::onError(std::error_code ec) {}
+
+void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) {
+ interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait));
+ interests_queue_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ interestQueueTimer();
+ });
+}
+
+void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg,
+ uint64_t expiration) {
+ // check if the seq number exists already
+ auto it_seqs = seqs_map_.find(interest_seg);
+ if (it_seqs != seqs_map_.end()) {
+ // the seq already exists
+ if (expiration < it_seqs->second) {
+ // we need to update the timer becasue we got a smaller one
+ // 1) remove the entry from the multimap
+ // 2) update this entry
+ auto range = timers_map_.equal_range(it_seqs->second);
+ for (auto it_timers = range.first; it_timers != range.second;
+ it_timers++) {
+ if (it_timers->second == it_seqs->first) {
+ timers_map_.erase(it_timers);
+ break;
+ }
+ }
+ timers_map_.insert(
+ std::pair<uint64_t, uint32_t>(expiration, interest_seg));
+ it_seqs->second = expiration;
+ } else {
+ // nothing to do here
+ return;
+ }
+ } else {
+ // add the new seq
+ timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg));
+ seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration));
+ }
+}
+
+void RTCProductionProtocol::sendNacksForPendingInterests() {
+ std::unordered_set<uint32_t> to_remove;
+
+ uint32_t packet_gap = 100000; // set it to a high value (100sec)
+ if (packets_production_rate_ != 0)
+ packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_);
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
+ if (it->first > current_seg_) {
+ uint64_t production_time =
+ ((it->first - current_seg_) * packet_gap) + now;
+ if (production_time >= it->second) {
+ sendNack(it->first);
+ to_remove.insert(it->first);
+ }
+ }
+ }
+
+ // delete nacked interests
+ for (auto it = to_remove.begin(); it != to_remove.end(); it++) {
+ removeFromInterestQueue(*it);
+ }
+}
+
+void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) {
+ auto seq_it = seqs_map_.find(interest_seg);
+ if (seq_it != seqs_map_.end()) {
+ auto range = timers_map_.equal_range(seq_it->second);
+ for (auto it_timers = range.first; it_timers != range.second; it_timers++) {
+ if (it_timers->second == seq_it->first) {
+ timers_map_.erase(it_timers);
+ break;
+ }
+ }
+ seqs_map_.erase(seq_it);
+ }
+}
+
+void RTCProductionProtocol::interestQueueTimer() {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) {
+ uint64_t expire = it_timers->first;
+ if (expire <= now) {
+ uint32_t seq = it_timers->second;
+ sendNack(seq);
+ // remove the interest from the other map
+ seqs_map_.erase(seq);
+ it_timers = timers_map_.erase(it_timers);
+ } else {
+ // stop, we are done!
+ break;
+ }
+ }
+ if (timers_map_.empty()) {
+ queue_timer_on_ = false;
+ } else {
+ queue_timer_on_ = true;
+ scheduleQueueTimer(timers_map_.begin()->first - now);
+ }
+}
+
+void RTCProductionProtocol::sendNack(uint32_t sequence) {
+ auto nack = core::PacketManager<>::getInstance().getPacket<ContentObject>();
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint32_t next_packet = current_seg_;
+ uint32_t prod_rate = bytes_production_rate_;
+
+ struct rtc::nack_packet_t header;
+ header.setTimestamp(now);
+ header.setProductionRate(prod_rate);
+ header.setProductionSegement(next_packet);
+ nack->appendPayload((const uint8_t *)&header, rtc::NACK_HEADER_SIZE);
+
+ Name n(flow_name_);
+ n.setSuffix(sequence);
+ nack->setName(n);
+ nack->setLifetime(0);
+ nack->setPathLabel(prod_label_);
+
+ if (!consumer_in_sync_ && on_consumer_in_sync_ &&
+ sequence < rtc::MIN_PROBE_SEQ && sequence > next_packet) {
+ consumer_in_sync_ = true;
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
+ interest->setName(n);
+ on_consumer_in_sync_(*socket_->getInterface(), *interest);
+ }
+
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(), *nack);
+ }
+
+ TRANSPORT_LOGD("Send nack %u", sequence);
+ portal_->sendContentObject(*nack);
+}
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/prod_protocol_rtc.h b/libtransport/src/protocols/prod_protocol_rtc.h
new file mode 100644
index 000000000..f3584f74a
--- /dev/null
+++ b/libtransport/src/protocols/prod_protocol_rtc.h
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/name.h>
+#include <protocols/production_protocol.h>
+
+#include <atomic>
+#include <map>
+#include <mutex>
+
+namespace transport {
+namespace protocol {
+
+class RTCProductionProtocol : public ProductionProtocol {
+ public:
+ RTCProductionProtocol(implementation::ProducerSocket *icn_socket);
+ ~RTCProductionProtocol() override;
+
+ using ProductionProtocol::start;
+ using ProductionProtocol::stop;
+
+ void produce(ContentObject &content_object) override;
+ uint32_t produceStream(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true,
+ uint32_t start_offset = 0) override;
+ uint32_t produceStream(const Name &content_name, const uint8_t *buffer,
+ size_t buffer_size, bool is_last = true,
+ uint32_t start_offset = 0) override;
+ uint32_t produceDatagram(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer) override;
+ uint32_t produceDatagram(const Name &content_name, const uint8_t *buffer,
+ size_t buffer_size) override {
+ return produceDatagram(content_name, utils::MemBuf::wrapBuffer(
+ buffer, buffer_size, buffer_size));
+ }
+
+ void registerNamespaceWithNetwork(const Prefix &producer_namespace) override;
+
+ void setConsumerInSyncCallback(
+ interface::ProducerInterestCallback &&callback) {
+ on_consumer_in_sync_ = std::move(callback);
+ }
+
+ private:
+ // packet handlers
+ void onInterest(Interest &interest) override;
+ void onError(std::error_code ec) override;
+ void produceInternal(std::shared_ptr<ContentObject> &&content_object,
+ const Name &content_name);
+ void sendNack(uint32_t sequence);
+
+ // stats
+ void updateStats();
+ void scheduleRoundTimer();
+
+ // pending intersts functions
+ void addToInterestQueue(uint32_t interest_seg, uint64_t expiration);
+ void sendNacksForPendingInterests();
+ void removeFromInterestQueue(uint32_t interest_seg);
+ void scheduleQueueTimer(uint64_t wait);
+ void interestQueueTimer();
+
+ core::Name flow_name_;
+
+ uint32_t current_seg_; // seq id of the next packet produced
+ uint32_t prod_label_; // path lable of the producer
+ uint16_t header_size_; // hicn header size
+
+ uint32_t produced_bytes_; // bytes produced in the last round
+ uint32_t produced_packets_; // packet produed in the last round
+
+ uint32_t max_packet_production_; // never exceed this number of packets
+ // without update stats
+
+ uint32_t bytes_production_rate_; // bytes per sec
+ uint32_t packets_production_rate_; // pps
+
+ std::unique_ptr<asio::steady_timer> round_timer_;
+ uint64_t last_round_;
+
+ // delayed nacks are used by the producer to avoid to send too
+ // many nacks we the producer rate is 0. however, if the producer moves
+ // from a production rate higher than 0 to 0 the first round the dealyed
+ // should be avoided in order to notify the consumer as fast as possible
+ // of the new rate.
+ bool allow_delayed_nacks_;
+
+ // queue for the received interests
+ // this map maps the expiration time of an interest to
+ // its sequence number. the map is sorted by timeouts
+ // the same timeout may be used for multiple sequence numbers
+ // but for each sequence number we store only the smallest
+ // expiry time. In this way the mapping from seqs_map_ to
+ // timers_map_ is unique
+ std::multimap<uint64_t, uint32_t> timers_map_;
+
+ // this map does the opposite, this map is not ordered
+ std::unordered_map<uint32_t, uint64_t> seqs_map_;
+ bool queue_timer_on_;
+ std::unique_ptr<asio::steady_timer> interests_queue_timer_;
+
+ // this callback is called when the remote consumer is in sync with high
+ // probability. it is called only the first time that the switch happen.
+ // XXX this makes sense only in P2P mode, while in standard mode is
+ // impossible to know the state of the consumers so it should not be used.
+ bool consumer_in_sync_;
+ interface::ProducerInterestCallback on_consumer_in_sync_;
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/production_protocol.cc b/libtransport/src/protocols/production_protocol.cc
new file mode 100644
index 000000000..8addf52d1
--- /dev/null
+++ b/libtransport/src/protocols/production_protocol.cc
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <implementation/socket_producer.h>
+#include <protocols/production_protocol.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+ProductionProtocol::ProductionProtocol(
+ implementation::ProducerSocket *icn_socket)
+ : socket_(icn_socket),
+ is_running_(false),
+ on_interest_input_(VOID_HANDLER),
+ on_interest_dropped_input_buffer_(VOID_HANDLER),
+ on_interest_inserted_input_buffer_(VOID_HANDLER),
+ on_interest_satisfied_output_buffer_(VOID_HANDLER),
+ on_interest_process_(VOID_HANDLER),
+ on_new_segment_(VOID_HANDLER),
+ on_content_object_to_sign_(VOID_HANDLER),
+ on_content_object_in_output_buffer_(VOID_HANDLER),
+ on_content_object_output_(VOID_HANDLER),
+ on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
+ on_content_produced_(VOID_HANDLER) {
+ socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
+ // TODO add statistics for producer
+ // socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
+}
+
+ProductionProtocol::~ProductionProtocol() {
+ if (!is_async_ && is_running_) {
+ stop();
+ }
+
+ if (listening_thread_.joinable()) {
+ listening_thread_.join();
+ }
+}
+
+int ProductionProtocol::start() {
+ socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
+ &on_interest_input_);
+ socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_DROP,
+ &on_interest_dropped_input_buffer_);
+ socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_PASS,
+ &on_interest_inserted_input_buffer_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CACHE_HIT,
+ &on_interest_satisfied_output_buffer_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ &on_interest_process_);
+ socket_->getSocketOption(ProducerCallbacksOptions::NEW_CONTENT_OBJECT,
+ &on_new_segment_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_READY,
+ &on_content_object_in_output_buffer_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT,
+ &on_content_object_output_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN,
+ &on_content_object_to_sign_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
+ &on_content_produced_);
+
+ socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
+
+ bool first = true;
+
+ for (core::Prefix &producer_namespace : served_namespaces_) {
+ if (first) {
+ core::BindConfig bind_config(producer_namespace, 1000);
+ portal_->bind(bind_config);
+ portal_->setProducerCallback(this);
+ first = !first;
+ } else {
+ portal_->registerRoute(producer_namespace);
+ }
+ }
+
+ is_running_ = true;
+
+ if (!is_async_) {
+ listening_thread_ = std::thread([this]() { portal_->runEventsLoop(); });
+ }
+
+ return 0;
+}
+
+void ProductionProtocol::stop() {
+ is_running_ = false;
+
+ if (!is_async_) {
+ portal_->stopEventsLoop();
+ } else {
+ portal_->clear();
+ }
+}
+
+void ProductionProtocol::produce(ContentObject &content_object) {
+ if (*on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_->operator()(*socket_->getInterface(),
+ content_object);
+ }
+
+ output_buffer_.insert(std::static_pointer_cast<ContentObject>(
+ content_object.shared_from_this()));
+
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ content_object);
+ }
+
+ portal_->sendContentObject(content_object);
+}
+
+void ProductionProtocol::registerNamespaceWithNetwork(
+ const Prefix &producer_namespace) {
+ served_namespaces_.push_back(producer_namespace);
+}
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/production_protocol.h b/libtransport/src/protocols/production_protocol.h
new file mode 100644
index 000000000..780972321
--- /dev/null
+++ b/libtransport/src/protocols/production_protocol.h
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/interfaces/statistics.h>
+#include <hicn/transport/utils/object_pool.h>
+#include <implementation/socket.h>
+#include <utils/content_store.h>
+
+#include <atomic>
+#include <thread>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+
+class ProductionProtocol : public Portal::ProducerCallback {
+ public:
+ ProductionProtocol(implementation::ProducerSocket *icn_socket);
+ virtual ~ProductionProtocol();
+
+ bool isRunning() { return is_running_; }
+
+ virtual int start();
+ virtual void stop();
+
+ virtual void produce(ContentObject &content_object);
+ virtual uint32_t produceStream(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last = true,
+ uint32_t start_offset = 0) = 0;
+ virtual uint32_t produceStream(const Name &content_name,
+ const uint8_t *buffer, size_t buffer_size,
+ bool is_last = true,
+ uint32_t start_offset = 0) = 0;
+ virtual uint32_t produceDatagram(const Name &content_name,
+ std::unique_ptr<utils::MemBuf> &&buffer) = 0;
+ virtual uint32_t produceDatagram(const Name &content_name,
+ const uint8_t *buffer,
+ size_t buffer_size) = 0;
+
+ void setOutputBufferSize(std::size_t size) { output_buffer_.setLimit(size); }
+ std::size_t getOutputBufferSize() { return output_buffer_.getLimit(); }
+
+ virtual void registerNamespaceWithNetwork(const Prefix &producer_namespace);
+ const std::list<Prefix> &getNamespaces() const { return served_namespaces_; }
+
+ protected:
+ // Producer callback
+ virtual void onInterest(core::Interest &i) override = 0;
+ virtual void onError(std::error_code ec) override{};
+
+ protected:
+ implementation::ProducerSocket *socket_;
+
+ // Thread pool responsible for IO operations (send data / receive interests)
+ std::vector<utils::EventThread> io_threads_;
+
+ // TODO remove this thread
+ std::thread listening_thread_;
+ std::shared_ptr<Portal> portal_;
+ std::atomic<bool> is_running_;
+ interface::ProductionStatistics *stats_;
+
+ // Callbacks
+ interface::ProducerInterestCallback *on_interest_input_;
+ interface::ProducerInterestCallback *on_interest_dropped_input_buffer_;
+ interface::ProducerInterestCallback *on_interest_inserted_input_buffer_;
+ interface::ProducerInterestCallback *on_interest_satisfied_output_buffer_;
+ interface::ProducerInterestCallback *on_interest_process_;
+
+ interface::ProducerContentObjectCallback *on_new_segment_;
+ interface::ProducerContentObjectCallback *on_content_object_to_sign_;
+ interface::ProducerContentObjectCallback *on_content_object_in_output_buffer_;
+ interface::ProducerContentObjectCallback *on_content_object_output_;
+ interface::ProducerContentObjectCallback
+ *on_content_object_evicted_from_output_buffer_;
+
+ interface::ProducerContentCallback *on_content_produced_;
+
+ // Output buffer
+ utils::ContentStore output_buffer_;
+
+ // List ot routes served by current producer protocol
+ std::list<Prefix> served_namespaces_;
+
+ bool is_async_;
+};
+
+} // end namespace protocol
+} // end namespace transport
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 5023adf2e..bc8500227 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <hicn/transport/core/global_object_pool.h>
#include <hicn/transport/interfaces/socket_consumer.h>
#include <implementation/socket_consumer.h>
#include <protocols/errors.h>
@@ -126,10 +127,6 @@ void RaaqmTransportProtocol::reset() {
}
}
-bool RaaqmTransportProtocol::verifyKeyPackets() {
- return index_manager_->onKeyToVerify();
-}
-
void RaaqmTransportProtocol::increaseWindow() {
// return;
double max_window_size = 0.;
@@ -325,8 +322,8 @@ void RaaqmTransportProtocol::init() {
is.close();
}
-void RaaqmTransportProtocol::onContentObject(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+void RaaqmTransportProtocol::onContentObject(Interest &interest,
+ ContentObject &content_object) {
// Check whether makes sense to continue
if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
return;
@@ -334,54 +331,53 @@ void RaaqmTransportProtocol::onContentObject(
// Call application-defined callbacks
if (*on_content_object_input_) {
- (*on_content_object_input_)(*socket_->getInterface(), *content_object);
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
}
if (*on_interest_satisfied_) {
- (*on_interest_satisfied_)(*socket_->getInterface(), *interest);
+ (*on_interest_satisfied_)(*socket_->getInterface(), interest);
}
- if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- stats_->updateBytesRecv(content_object->payloadSize());
+ if (content_object.getPayloadType() == PayloadType::DATA) {
+ stats_->updateBytesRecv(content_object.payloadSize());
}
- onContentSegment(std::move(interest), std::move(content_object));
+ onContentSegment(interest, content_object);
scheduleNextInterests();
}
-void RaaqmTransportProtocol::onContentSegment(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
+void RaaqmTransportProtocol::onContentSegment(Interest &interest,
+ ContentObject &content_object) {
+ uint32_t incremental_suffix = content_object.getName().getSuffix();
// Decrease in-flight interests
interests_in_flight_--;
// Update stats
if (!interest_retransmissions_[incremental_suffix & mask]) {
- afterContentReception(*interest, *content_object);
+ afterContentReception(interest, content_object);
}
- index_manager_->onContentObject(std::move(interest),
- std::move(content_object));
+ index_manager_->onContentObject(interest, content_object);
}
-void RaaqmTransportProtocol::onPacketDropped(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+void RaaqmTransportProtocol::onPacketDropped(Interest &interest,
+ ContentObject &content_object) {
uint32_t max_rtx = 0;
socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
- uint64_t segment = interest->getName().getSuffix();
+ uint64_t segment = interest.getName().getSuffix();
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
stats_->updateRetxCount(1);
if (*on_interest_retransmission_) {
- (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
+ (*on_interest_retransmission_)(*socket_->getInterface(), interest);
}
if (*on_interest_output_) {
- (*on_interest_output_)(*socket_->getInterface(), *interest);
+ (*on_interest_output_)(*socket_->getInterface(), interest);
}
if (!is_running_) {
@@ -389,7 +385,7 @@ void RaaqmTransportProtocol::onPacketDropped(
}
interest_retransmissions_[segment & mask]++;
- interest_to_retransmit_.push(std::move(interest));
+ interest_to_retransmit_.push(interest.shared_from_this());
} else {
TRANSPORT_LOGE(
"Stop: received not trusted packet %llu times",
@@ -477,6 +473,11 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
sendInterest(std::move(interest_to_retransmit_.front()));
interest_to_retransmit_.pop();
} else {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ TRANSPORT_LOGI("Adios");
+ break;
+ }
+
index = index_manager_->getNextSuffix();
if (index == IndexManager::invalid_index) {
break;
@@ -487,8 +488,8 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
}
}
-bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
- auto interest = getPacket();
+void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
name->setSuffix((uint32_t)next_suffix);
@@ -502,19 +503,12 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
if (*on_interest_output_) {
on_interest_output_->operator()(*socket_->getInterface(), *interest);
}
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return false;
- }
-
// This is set to ~0 so that the next interest_retransmissions_ + 1,
// performed by sendInterest, will result in 0
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
-
- return true;
}
void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
index fce4194d4..be477d39f 100644
--- a/libtransport/src/protocols/raaqm.h
+++ b/libtransport/src/protocols/raaqm.h
@@ -18,9 +18,9 @@
#include <hicn/transport/utils/chrono_typedefs.h>
#include <protocols/byte_stream_reassembly.h>
#include <protocols/congestion_window_protocol.h>
-#include <protocols/protocol.h>
#include <protocols/raaqm_data_path.h>
#include <protocols/rate_estimation.h>
+#include <protocols/transport_protocol.h>
#include <queue>
#include <vector>
@@ -42,8 +42,6 @@ class RaaqmTransportProtocol : public TransportProtocol,
void reset() override;
- virtual bool verifyKeyPackets() override;
-
protected:
static constexpr uint32_t buffer_size =
1 << interface::default_values::log_2_default_buffer_size;
@@ -64,13 +62,12 @@ class RaaqmTransportProtocol : public TransportProtocol,
private:
void init();
- void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) override;
+ void onContentObject(Interest &i, ContentObject &c) override;
- void onContentSegment(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object);
+ void onContentSegment(Interest &interest, ContentObject &content_object);
- void onPacketDropped(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object) override;
+ void onPacketDropped(Interest &interest,
+ ContentObject &content_object) override;
void onReassemblyFailed(std::uint32_t missing_segment) override;
@@ -78,7 +75,7 @@ class RaaqmTransportProtocol : public TransportProtocol,
virtual void scheduleNextInterests() override;
- bool sendInterest(std::uint64_t next_suffix);
+ void sendInterest(std::uint64_t next_suffix);
void sendInterest(Interest::Ptr &&interest);
diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc
index 8bbbadcf2..f2c21b9ef 100644
--- a/libtransport/src/protocols/raaqm_data_path.cc
+++ b/libtransport/src/protocols/raaqm_data_path.cc
@@ -14,7 +14,6 @@
*/
#include <hicn/transport/utils/chrono_typedefs.h>
-
#include <protocols/raaqm_data_path.h>
namespace transport {
diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h
index 3f037bc76..c0b53a690 100644
--- a/libtransport/src/protocols/raaqm_data_path.h
+++ b/libtransport/src/protocols/raaqm_data_path.h
@@ -16,7 +16,6 @@
#pragma once
#include <hicn/transport/utils/chrono_typedefs.h>
-
#include <utils/min_filter.h>
#include <chrono>
diff --git a/libtransport/src/protocols/rate_estimation.cc b/libtransport/src/protocols/rate_estimation.cc
index a2cf1aefe..5ca925760 100644
--- a/libtransport/src/protocols/rate_estimation.cc
+++ b/libtransport/src/protocols/rate_estimation.cc
@@ -15,7 +15,6 @@
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/utils/log.h>
-
#include <protocols/rate_estimation.h>
#include <thread>
diff --git a/libtransport/src/protocols/rate_estimation.h b/libtransport/src/protocols/rate_estimation.h
index 17f39e0b9..42ae74194 100644
--- a/libtransport/src/protocols/rate_estimation.h
+++ b/libtransport/src/protocols/rate_estimation.h
@@ -16,7 +16,6 @@
#pragma once
#include <hicn/transport/interfaces/statistics.h>
-
#include <protocols/raaqm_data_path.h>
#include <chrono>
diff --git a/libtransport/src/protocols/reassembly.cc b/libtransport/src/protocols/reassembly.cc
index c6602153c..0e59832dc 100644
--- a/libtransport/src/protocols/reassembly.cc
+++ b/libtransport/src/protocols/reassembly.cc
@@ -16,7 +16,6 @@
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/utils/array.h>
#include <hicn/transport/utils/membuf.h>
-
#include <implementation/socket_consumer.h>
#include <protocols/errors.h>
#include <protocols/indexer.h>
diff --git a/libtransport/src/protocols/reassembly.h b/libtransport/src/protocols/reassembly.h
index fdc9f2a05..385122c53 100644
--- a/libtransport/src/protocols/reassembly.h
+++ b/libtransport/src/protocols/reassembly.h
@@ -46,7 +46,7 @@ class Reassembly {
virtual ~Reassembly() = default;
- virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0;
+ virtual void reassemble(core::ContentObject &content_object) = 0;
virtual void reassemble(
std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0;
virtual void reInitialize() = 0;
diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt
new file mode 100644
index 000000000..77f065d0e
--- /dev/null
+++ b/libtransport/src/protocols/rtc/CMakeLists.txt
@@ -0,0 +1,38 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h
+)
+
+list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc
+)
+
+set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/protocols/rtc/congestion_detection.cc b/libtransport/src/protocols/rtc/congestion_detection.cc
new file mode 100644
index 000000000..e2d44ae66
--- /dev/null
+++ b/libtransport/src/protocols/rtc/congestion_detection.cc
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/utils/log.h>
+#include <protocols/rtc/congestion_detection.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+CongestionDetection::CongestionDetection()
+ : cc_estimator_(), last_processed_chunk_() {}
+
+CongestionDetection::~CongestionDetection() {}
+
+void CongestionDetection::updateStats() {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (chunks_number_.empty()) return;
+
+ uint32_t chunk_number = chunks_number_.front();
+
+ while (chunks_[chunk_number].getReceivedTime() + HICN_CC_STATS_MAX_DELAY_MS <
+ now ||
+ chunks_[chunk_number].isComplete()) {
+ if (chunk_number == last_processed_chunk_.getFrameSeqNum() + 1) {
+ chunks_[chunk_number].setPreviousSentTime(
+ last_processed_chunk_.getSentTime());
+
+ chunks_[chunk_number].setPreviousReceivedTime(
+ last_processed_chunk_.getReceivedTime());
+ cc_estimator_.Update(chunks_[chunk_number].getReceivedDelta(),
+ chunks_[chunk_number].getSentDelta(),
+ chunks_[chunk_number].getSentTime(),
+ chunks_[chunk_number].getReceivedTime(),
+ chunks_[chunk_number].getFrameSize(), true);
+
+ } else {
+ TRANSPORT_LOGD(
+ "CongestionDetection::updateStats frame %u but not the \
+ previous one, last one was %u currentFrame %u",
+ chunk_number, last_processed_chunk_.getFrameSeqNum(),
+ chunks_[chunk_number].getFrameSeqNum());
+ }
+
+ last_processed_chunk_ = chunks_[chunk_number];
+
+ chunks_.erase(chunk_number);
+
+ chunks_number_.pop();
+ if (chunks_number_.empty()) break;
+
+ chunk_number = chunks_number_.front();
+ }
+}
+
+void CongestionDetection::addPacket(const core::ContentObject &content_object) {
+ auto payload = content_object.getPayload();
+ uint32_t payload_size = (uint32_t)payload->length();
+ uint32_t segmentNumber = content_object.getName().getSuffix();
+ // uint32_t pkt = segmentNumber & modMask_;
+ uint64_t *sentTimePtr = (uint64_t *)payload->data();
+
+ // this is just for testing with hiperf, assuming a frame is 10 pkts
+ // in the final version, the split should be based on the timestamp in the pkt
+ uint32_t frameNum = (int)(segmentNumber / HICN_CC_STATS_CHUNK_SIZE);
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (chunks_.find(frameNum) == chunks_.end()) {
+ // new chunk of pkts or out of order
+ if (last_processed_chunk_.getFrameSeqNum() > frameNum)
+ return; // out of order and we already processed the chunk
+
+ chunks_[frameNum] = FrameStats(frameNum, HICN_CC_STATS_CHUNK_SIZE);
+ chunks_number_.push(frameNum);
+ }
+
+ chunks_[frameNum].addPacket(*sentTimePtr, now, payload_size);
+}
+
+} // namespace rtc
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/congestion_detection.h b/libtransport/src/protocols/rtc/congestion_detection.h
new file mode 100644
index 000000000..17f4aa54c
--- /dev/null
+++ b/libtransport/src/protocols/rtc/congestion_detection.h
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/content_object.h>
+#include <protocols/rtc/trendline_estimator.h>
+
+#include <map>
+#include <queue>
+
+#define HICN_CC_STATS_CHUNK_SIZE 10
+#define HICN_CC_STATS_MAX_DELAY_MS 100
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class FrameStats {
+ public:
+ FrameStats()
+ : frame_num_(0),
+ sent_time_(0),
+ received_time_(0),
+ previous_sent_time_(0),
+ previous_received_time_(0),
+ size_(0),
+ received_pkt_m(0),
+ burst_size_m(HICN_CC_STATS_CHUNK_SIZE){};
+
+ FrameStats(uint32_t burst_size)
+ : frame_num_(0),
+ sent_time_(0),
+ received_time_(0),
+ previous_sent_time_(0),
+ previous_received_time_(0),
+ size_(0),
+ received_pkt_m(0),
+ burst_size_m(burst_size){};
+
+ FrameStats(uint32_t frame_num, uint32_t burst_size)
+ : frame_num_(frame_num),
+ sent_time_(0),
+ received_time_(0),
+ previous_sent_time_(0),
+ previous_received_time_(0),
+ size_(0),
+ received_pkt_m(0),
+ burst_size_m(burst_size){};
+
+ FrameStats(uint32_t frame_num, uint64_t sent_time, uint64_t received_time,
+ uint32_t size, FrameStats previousFrame, uint32_t burst_size)
+ : frame_num_(frame_num),
+ sent_time_(sent_time),
+ received_time_(received_time),
+ previous_sent_time_(previousFrame.getSentTime()),
+ previous_received_time_(previousFrame.getReceivedTime()),
+ size_(size),
+ received_pkt_m(1),
+ burst_size_m(burst_size){};
+
+ void addPacket(uint64_t sent_time, uint64_t received_time, uint32_t size) {
+ size_ += size;
+ sent_time_ =
+ (sent_time_ == 0) ? sent_time : std::min(sent_time_, sent_time);
+ received_time_ = std::max(received_time, received_time_);
+ received_pkt_m++;
+ }
+
+ bool isComplete() { return received_pkt_m == burst_size_m; }
+
+ uint32_t getFrameSeqNum() const { return frame_num_; }
+ uint64_t getSentTime() const { return sent_time_; }
+ uint64_t getReceivedTime() const { return received_time_; }
+ uint32_t getFrameSize() const { return size_; }
+
+ void setPreviousReceivedTime(uint64_t time) {
+ previous_received_time_ = time;
+ }
+ void setPreviousSentTime(uint64_t time) { previous_sent_time_ = time; }
+
+ // todo manage first frame
+ double getReceivedDelta() {
+ return static_cast<double>(received_time_ - previous_received_time_);
+ }
+ double getSentDelta() {
+ return static_cast<double>(sent_time_ - previous_sent_time_);
+ }
+
+ private:
+ uint32_t frame_num_;
+ uint64_t sent_time_;
+ uint64_t received_time_;
+
+ uint64_t previous_sent_time_;
+ uint64_t previous_received_time_;
+ uint32_t size_;
+
+ uint32_t received_pkt_m;
+ uint32_t burst_size_m;
+};
+
+class CongestionDetection {
+ public:
+ CongestionDetection();
+ ~CongestionDetection();
+
+ void addPacket(const core::ContentObject &content_object);
+
+ BandwidthUsage getState() { return cc_estimator_.State(); }
+
+ void updateStats();
+
+ private:
+ TrendlineEstimator cc_estimator_;
+ std::map<uint32_t, FrameStats> chunks_;
+ std::queue<uint32_t> chunks_number_;
+
+ FrameStats last_processed_chunk_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
new file mode 100644
index 000000000..efba362d4
--- /dev/null
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/probe_handler.h>
+#include <protocols/rtc/rtc_consts.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback,
+ asio::io_service &io_service)
+ : probe_interval_(0),
+ max_probes_(0),
+ sent_probes_(0),
+ probe_timer_(std::make_unique<asio::steady_timer>(io_service)),
+ rand_eng_((std::random_device())()),
+ distr_(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ),
+ send_probe_callback_(std::move(send_callback)) {}
+
+ProbeHandler::~ProbeHandler() {}
+
+uint64_t ProbeHandler::getRtt(uint32_t seq) {
+ auto it = pending_probes_.find(seq);
+
+ if (it == pending_probes_.end()) return 0;
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t rtt = now - it->second;
+ if(rtt < 1) rtt = 1;
+
+ pending_probes_.erase(it);
+
+ return rtt;
+}
+
+void ProbeHandler::setProbes(uint32_t probe_interval, uint32_t max_probes) {
+ stopProbes();
+ probe_interval_ = probe_interval;
+ max_probes_ = max_probes;
+}
+
+void ProbeHandler::stopProbes() {
+ probe_interval_ = 0;
+ max_probes_ = 0;
+ sent_probes_ = 0;
+ probe_timer_->cancel();
+}
+
+void ProbeHandler::sendProbes() {
+ if (probe_interval_ == 0) return;
+ if (max_probes_ != 0 && sent_probes_ >= max_probes_) return;
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ uint32_t seq = distr_(rand_eng_);
+ pending_probes_.insert(std::pair<uint32_t, uint64_t>(seq, now));
+ send_probe_callback_(seq);
+ sent_probes_++;
+
+ // clean up
+ // a probe may get lost. if the pending_probes_ size becomes bigger than
+ // MAX_PENDING_PROBES remove all the probes older than a seconds
+ if (pending_probes_.size() > MAX_PENDING_PROBES) {
+ for (auto it = pending_probes_.begin(); it != pending_probes_.end();) {
+ if ((now - it->second) > 1000)
+ it = pending_probes_.erase(it);
+ else
+ it++;
+ }
+ }
+
+ if (probe_interval_ == 0) return;
+
+ std::weak_ptr<ProbeHandler> self(shared_from_this());
+ probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_));
+ probe_timer_->async_wait([self](std::error_code ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->sendProbes();
+ }
+ });
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h
new file mode 100644
index 000000000..b8ed84445
--- /dev/null
+++ b/libtransport/src/protocols/rtc/probe_handler.h
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <hicn/transport/config.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <functional>
+#include <random>
+#include <unordered_map>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
+ public:
+ using SendProbeCallback = std::function<void(uint32_t)>;
+
+ public:
+ ProbeHandler(SendProbeCallback &&send_callback,
+ asio::io_service &io_service);
+
+ ~ProbeHandler();
+
+ // if the function returns 0 the probe is not valaid
+ uint64_t getRtt(uint32_t seq);
+
+ // reset the probes parameters. it stop the current probing.
+ // to restar call sendProbes.
+ // probe_interval = 0 means that no event will be scheduled
+ // max_probe = 0 means no limit to the number of probe to send
+ void setProbes(uint32_t probe_interval, uint32_t max_probes);
+
+ // stop to schedule probes
+ void stopProbes();
+
+ void sendProbes();
+
+ private:
+ uint32_t probe_interval_; // us
+ uint32_t max_probes_; // packets
+ uint32_t sent_probes_; // packets
+
+ std::unique_ptr<asio::steady_timer> probe_timer_;
+
+ // map from seqnumber to timestamp
+ std::unordered_map<uint32_t, uint64_t> pending_probes_;
+
+ // random generator
+ std::default_random_engine rand_eng_;
+ std::uniform_int_distribution<uint32_t> distr_;
+
+ SendProbeCallback send_probe_callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
new file mode 100644
index 000000000..bb95ab686
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -0,0 +1,607 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <implementation/socket_consumer.h>
+#include <math.h>
+#include <protocols/rtc/rtc.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_queue.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+using namespace interface;
+
+RTCTransportProtocol::RTCTransportProtocol(
+ implementation::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, nullptr),
+ DatagramReassembly(icn_socket, this),
+ number_(0) {
+ icn_socket->getSocketOption(PORTAL, portal_);
+ round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ scheduler_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
+}
+
+RTCTransportProtocol::~RTCTransportProtocol() {}
+
+void RTCTransportProtocol::resume() {
+ if (is_running_) return;
+
+ is_running_ = true;
+
+ newRound();
+
+ portal_->runEventsLoop();
+ is_running_ = false;
+}
+
+// private
+void RTCTransportProtocol::initParams() {
+ portal_->setConsumerCallback(this);
+
+ rc_ = std::make_shared<RTCRateControlQueue>();
+ ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
+ std::bind(&RTCTransportProtocol::sendRtxInterest, this,
+ std::placeholders::_1),
+ portal_->getIoService());
+
+ state_ = std::make_shared<RTCState>(
+ std::bind(&RTCTransportProtocol::sendProbeInterest, this,
+ std::placeholders::_1),
+ std::bind(&RTCTransportProtocol::discoveredRtt, this),
+ portal_->getIoService());
+
+ rc_->setState(state_);
+ // TODO: for the moment we keep the congestion control disabled
+ // rc_->tunrOnRateControl();
+ ldr_->setState(state_);
+
+ // protocol state
+ start_send_interest_ = false;
+ current_state_ = SyncState::catch_up;
+
+ // Cancel timer
+ number_++;
+ round_timer_->cancel();
+ scheduler_timer_->cancel();
+ scheduler_timer_on_ = false;
+
+ // delete all timeouts and future nacks
+ timeouts_or_nacks_.clear();
+
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ // names/packets var
+ next_segment_ = 0;
+
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ RTC_INTEREST_LIFETIME);
+}
+
+// private
+void RTCTransportProtocol::reset() {
+ TRANSPORT_LOGD("reset called");
+ initParams();
+ newRound();
+}
+
+void RTCTransportProtocol::inactiveProducer() {
+ // when the producer is inactive we reset the consumer state
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_,
+ max_sync_win_);
+
+ // names/packets var
+ next_segment_ = 0;
+
+ ldr_->clear();
+}
+
+void RTCTransportProtocol::newRound() {
+ round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN));
+ // TODO pass weak_ptr here
+ round_timer_->async_wait([this, n{number_}](std::error_code ec) {
+ if (ec) return;
+
+ if (n != number_) {
+ return;
+ }
+
+ // saving counters that will be reset on new round
+ uint32_t sent_retx = state_->getSentRtxInRound();
+ uint32_t received_bytes = state_->getReceivedBytesInRound();
+ uint32_t sent_interest = state_->getSentInterestInRound();
+ uint32_t lost_data = state_->getLostData();
+ uint32_t recovered_losses = state_->getRecoveredLosses();
+ uint32_t received_nacks = state_->getReceivedNacksInRound();
+
+ bool in_sync = (current_state_ == SyncState::in_sync);
+ state_->onNewRound((double)ROUND_LEN, in_sync);
+ rc_->onNewRound((double)ROUND_LEN);
+
+ // update sync state if needed
+ if (current_state_ == SyncState::in_sync) {
+ double cache_rate = state_->getPacketFromCacheRatio();
+ if (cache_rate > MAX_DATA_FROM_CACHE) {
+ current_state_ = SyncState::catch_up;
+ }
+ } else {
+ double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double received_rate = state_->getReceivedRate();
+ uint32_t round_without_nacks = state_->getRoundsWithoutNacks();
+ double cache_ratio = state_->getPacketFromCacheRatio();
+ if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
+ received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) {
+ current_state_ = SyncState::in_sync;
+ }
+ }
+
+ TRANSPORT_LOGD("Calling updateSyncWindow in newRound function");
+ updateSyncWindow();
+
+ sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
+ recovered_losses, received_nacks);
+ newRound();
+ });
+}
+
+void RTCTransportProtocol::discoveredRtt() {
+ start_send_interest_ = true;
+ ldr_->turnOnRTX();
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::computeMaxSyncWindow() {
+ double production_rate = state_->getProducerRate();
+ double packet_size = state_->getAveragePacketSize();
+ if (production_rate == 0.0 || packet_size == 0.0) {
+ // the consumer has no info about the producer,
+ // keep the previous maxCWin
+ TRANSPORT_LOGD(
+ "Returning in computeMaxSyncWindow because: prod_rate: %d || "
+ "packet_size: %d",
+ (int)(production_rate == 0.0), (int)(packet_size == 0.0));
+ return;
+ }
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC;
+
+
+ max_sync_win_ =
+ (uint32_t)ceil((production_rate * lifetime_ms *
+ INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size);
+
+ max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow());
+}
+
+void RTCTransportProtocol::updateSyncWindow() {
+ computeMaxSyncWindow();
+
+ if (max_sync_win_ == INITIAL_WIN_MAX) {
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) return;
+
+ current_sync_win_ = INITIAL_WIN;
+ scheduleNextInterests();
+ return;
+ }
+
+ double prod_rate = state_->getProducerRate();
+ double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = state_->getAveragePacketSize();
+
+ // if some of the info are not available do not update the current win
+ if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
+ current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ current_sync_win_ +=
+ ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size);
+
+ if(current_state_ == SyncState::catch_up) {
+ current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT;
+ }
+
+ current_sync_win_ = std::min(current_sync_win_, max_sync_win_);
+ current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ }
+
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::decreaseSyncWindow() {
+ // called on future nack
+ // we have a new sample of the production rate, so update max win first
+ computeMaxSyncWindow();
+ current_sync_win_--;
+ current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::sendInterest(Name *interest_name) {
+ TRANSPORT_LOGD("Sending interest for name %s",
+ interest_name->toString().c_str());
+
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
+ interest->setName(*interest_name);
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ interest->setLifetime(uint32_t(lifetime));
+
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ return;
+ }
+
+ portal_->sendInterest(std::move(interest));
+}
+
+void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
+ if (!is_running_ && !is_first_) return;
+
+ if(!start_send_interest_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send rtx %u", seq);
+ interest_name->setSuffix(seq);
+ sendInterest(interest_name);
+}
+
+void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
+ if (!is_running_ && !is_first_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send probe %u", seq);
+ interest_name->setSuffix(seq);
+ sendInterest(interest_name);
+}
+
+void RTCTransportProtocol::scheduleNextInterests() {
+ TRANSPORT_LOGD("Schedule next interests");
+
+ if (!is_running_ && !is_first_) return;
+
+ if(!start_send_interest_) return; // RTT discovering phase is not finished so
+ // do not start to send interests
+
+ if (scheduler_timer_on_) return; // wait befor send other interests
+
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
+ TRANSPORT_LOGD("Inactive producer.");
+ // here we keep seding the same interest until the producer
+ // does not start again
+ if (next_segment_ != 0) {
+ // the producer just become inactive, reset the state
+ inactiveProducer();
+ }
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send interest %u", next_segment_);
+ interest_name->setSuffix(next_segment_);
+
+ if (portal_->interestIsPending(*interest_name)) {
+ // if interest 0 is already pending we return
+ return;
+ }
+
+ sendInterest(interest_name);
+ state_->onSendNewInterest(interest_name);
+ return;
+ }
+
+ TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d",
+ state_->getPendingInterestNumber(), current_sync_win_);
+
+ // skip nacked pacekts
+ if (next_segment_ <= state_->getLastSeqNacked()) {
+ next_segment_ = state_->getLastSeqNacked() + 1;
+ }
+
+ // skipe received packets
+ if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) {
+ next_segment_ = state_->getHighestSeqReceivedInOrder() + 1;
+ }
+
+ uint32_t sent_interests = 0;
+ while ((state_->getPendingInterestNumber() < current_sync_win_) &&
+ (sent_interests < MAX_INTERESTS_IN_BATCH)) {
+ TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_);
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ interest_name->setSuffix(next_segment_);
+
+ // send the packet only if:
+ // 1) it is not pending yet (not true for rtx)
+ // 2) the packet is not received or lost
+ // 3) is not in the rtx list
+ if (portal_->interestIsPending(*interest_name) ||
+ state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN ||
+ ldr_->isRtx(next_segment_)) {
+ TRANSPORT_LOGD(
+ "skip interest %u because: pending %u, recv %u, rtx %u",
+ next_segment_, (portal_->interestIsPending(*interest_name)),
+ (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN),
+ (ldr_->isRtx(next_segment_)));
+ next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ continue;
+ }
+
+
+ sent_interests++;
+ TRANSPORT_LOGD("send interest %u", next_segment_);
+ sendInterest(interest_name);
+ state_->onSendNewInterest(interest_name);
+
+ next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ }
+
+ if (state_->getPendingInterestNumber() < current_sync_win_) {
+ // we still have space in the window but we already sent a batch of
+ // MAX_INTERESTS_IN_BATCH interest. for the following ones wait one
+ // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packets drop
+
+ scheduler_timer_on_ = true;
+ scheduler_timer_->expires_from_now(
+ std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES));
+ scheduler_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ if (!scheduler_timer_on_) return;
+
+ scheduler_timer_on_ = false;
+ scheduleNextInterests();
+ });
+ }
+}
+
+void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ uint32_t segment_number = interest->getName().getSuffix();
+
+ TRANSPORT_LOGD("timeout for packet %u", segment_number);
+
+ if (segment_number >= MIN_PROBE_SEQ) {
+ // this is a timeout on a probe, do nothing
+ return;
+ }
+
+ timeouts_or_nacks_.insert(segment_number);
+
+ if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
+ segment_number <= state_->getHighestSeqReceivedInOrder()) {
+ // we retransmit packets only if the producer is active, otherwise we
+ // use timeouts to avoid to send too much traffic
+ //
+ // a timeout is sent using RTX only if it is an old packet. if it is for a
+ // seq number that we didn't reach yet, we send the packet using the normal
+ // schedule next interest
+ TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number);
+ ldr_->onTimeout(segment_number);
+ state_->onTimeout(segment_number);
+ scheduleNextInterests();
+ return;
+ }
+
+ TRANSPORT_LOGD("handle timeout for packet %u using normal interests",
+ segment_number);
+
+ if (segment_number < next_segment_) {
+ // this is a timeout for a packet that will be generated in the future but
+ // we are asking for higher sequence numbers. we need to go back like in the
+ // case of future nacks
+ TRANSPORT_LOGD("on timeout next seg = %u, jump to %u",
+ next_segment_, segment_number);
+ next_segment_ = segment_number;
+ }
+
+ state_->onTimeout(segment_number);
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::onNack(const ContentObject &content_object) {
+ struct nack_packet_t *nack =
+ (struct nack_packet_t *)content_object.getPayload()->data();
+ uint32_t production_seg = nack->getProductionSegement();
+ uint32_t nack_segment = content_object.getName().getSuffix();
+ bool is_rtx = ldr_->isRtx(nack_segment);
+
+ // check if the packet got a timeout
+
+ TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment,
+ production_seg);
+
+ bool compute_stats = true;
+ auto tn_it = timeouts_or_nacks_.find(nack_segment);
+ if (tn_it != timeouts_or_nacks_.end() || is_rtx) {
+ compute_stats = false;
+ // remove packets from timeouts_or_nacks only in case of a past nack
+ }
+
+ state_->onNackPacketReceived(content_object, compute_stats);
+ ldr_->onNackPacketReceived(content_object);
+
+ // both in case of past and future nack we set next_segment_ equal to the
+ // production segment in the nack. In case of past nack we will skip unneded
+ // interest (this is already done in the scheduleNextInterest in any case)
+ // while in case of future nacks we can go back in time and ask again for the
+ // content that generated the nack
+ TRANSPORT_LOGD("on nack next seg = %u, jump to %u",
+ next_segment_, production_seg);
+ next_segment_ = production_seg;
+
+ if (production_seg > nack_segment) {
+ // remove the nack is it exists
+ if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
+
+ // the client is asking for content in the past
+ // switch to catch up state and increase the window
+ // this is true only if the packet is not an RTX
+ if (!is_rtx) current_state_ = SyncState::catch_up;
+
+ updateSyncWindow();
+ } else {
+ // if production_seg == nack_segment we consider this a future nack, since
+ // production_seg is not yet created. this may happen in case of low
+ // production rate (e.g. ping at 1pps)
+
+ // if a future nack was also retransmitted add it to the timeout_or_nacks
+ // set
+ if (is_rtx) timeouts_or_nacks_.insert(nack_segment);
+
+ // the client is asking for content in the future
+ // switch to in sync state and decrease the window
+ current_state_ = SyncState::in_sync;
+ decreaseSyncWindow();
+ }
+}
+
+void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
+ bool valid = state_->onProbePacketReceived(content_object);
+ if(!valid) return;
+
+ struct nack_packet_t *probe =
+ (struct nack_packet_t *)content_object.getPayload()->data();
+ uint32_t production_seg = probe->getProductionSegement();
+
+ // as for the nacks set next_segment_
+ TRANSPORT_LOGD("on probe next seg = %u, jump to %u",
+ next_segment_, production_seg);
+ next_segment_ = production_seg;
+
+ ldr_->onProbePacketReceived(content_object);
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::onContentObject(Interest &interest,
+ ContentObject &content_object) {
+ TRANSPORT_LOGD("Received content object of size: %zu",
+ content_object.payloadSize());
+ uint32_t payload_size = content_object.payloadSize();
+ uint32_t segment_number = content_object.getName().getSuffix();
+
+ if (segment_number >= MIN_PROBE_SEQ) {
+ TRANSPORT_LOGD("Received probe %u", segment_number);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onProbe(content_object);
+ return;
+ }
+
+ if (payload_size == NACK_HEADER_SIZE) {
+ TRANSPORT_LOGD("Received nack %u", segment_number);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onNack(content_object);
+ return;
+ }
+
+ TRANSPORT_LOGD("Received content %u", segment_number);
+
+ rc_->onDataPacketReceived(content_object);
+ bool compute_stats = true;
+ auto tn_it = timeouts_or_nacks_.find(segment_number);
+ if (tn_it != timeouts_or_nacks_.end()) {
+ compute_stats = false;
+ timeouts_or_nacks_.erase(tn_it);
+ }
+ if (ldr_->isRtx(segment_number)) {
+ compute_stats = false;
+ }
+
+ // check if the packet was already received
+ PacketState state = state_->isReceivedOrLost(segment_number);
+ state_->onDataPacketReceived(content_object, compute_stats);
+ ldr_->onDataPacketReceived(content_object);
+
+ // if the stat for this seq number is received do not send the packet to app
+ if (state != PacketState::RECEIVED) {
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ reassemble(content_object);
+ } else {
+ TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number);
+ }
+
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::sendStatsToApp(
+ uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests,
+ uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) {
+ if (*stats_summary_) {
+ // Send the stats to the app
+ stats_->updateQueuingDelay(state_->getQueuing());
+
+ // stats_->updateInterestFecTx(0); //todo must be implemented
+ // stats_->updateBytesFecRecv(0); //todo must be implemented
+
+ stats_->updateRetxCount(retx_count);
+ stats_->updateBytesRecv(received_bytes);
+ stats_->updateInterestTx(sent_interests);
+ stats_->updateReceivedNacks(received_nacks);
+
+ stats_->updateAverageWindowSize(current_sync_win_);
+ stats_->updateLossRatio(state_->getLossRate());
+ stats_->updateAverageRtt(state_->getRTT());
+ stats_->updateLostData(lost_data);
+ stats_->updateRecoveredData(recovered_losses);
+ stats_->updateCCState((unsigned int)current_state_ ? 1 : 0);
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ }
+}
+
+void RTCTransportProtocol::reassemble(ContentObject &content_object) {
+ auto read_buffer = content_object.getPayload();
+ TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length());
+ read_buffer->trimStart(DATA_HEADER_SIZE);
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
new file mode 100644
index 000000000..596887067
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/datagram_reassembly.h>
+#include <protocols/rtc/rtc_ldr.h>
+#include <protocols/rtc/rtc_rc.h>
+#include <protocols/rtc/rtc_state.h>
+#include <protocols/transport_protocol.h>
+
+#include <unordered_set>
+#include <vector>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCTransportProtocol : public TransportProtocol,
+ public DatagramReassembly {
+ public:
+ RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket);
+
+ ~RTCTransportProtocol();
+
+ using TransportProtocol::start;
+
+ using TransportProtocol::stop;
+
+ void resume() override;
+
+ private:
+ enum class SyncState { catch_up = 0, in_sync = 1, last };
+
+ private:
+ // setup functions
+ void initParams();
+ void reset() override;
+
+ void inactiveProducer();
+
+ // protocol functions
+ void discoveredRtt();
+ void newRound();
+
+ // window functions
+ void computeMaxSyncWindow();
+ void updateSyncWindow();
+ void decreaseSyncWindow();
+
+ // packet functions
+ void sendInterest(Name *interest_name);
+ void sendRtxInterest(uint32_t seq);
+ void sendProbeInterest(uint32_t seq);
+ void scheduleNextInterests() override;
+ void onTimeout(Interest::Ptr &&interest) override;
+ void onNack(const ContentObject &content_object);
+ void onProbe(const ContentObject &content_object);
+ void reassemble(ContentObject &content_object) override;
+ void onContentObject(Interest &interest,
+ ContentObject &content_object) override;
+ void onPacketDropped(Interest &interest,
+ ContentObject &content_object) override {}
+ void onReassemblyFailed(std::uint32_t missing_segment) override {}
+
+ // interaction with app functions
+ void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes,
+ uint32_t sent_interests, uint32_t lost_data,
+ uint32_t recovered_losses, uint32_t received_nacks);
+ // protocol state
+ bool start_send_interest_;
+ SyncState current_state_;
+ // cwin vars
+ uint32_t current_sync_win_;
+ uint32_t max_sync_win_;
+
+ // controller var
+ std::unique_ptr<asio::steady_timer> round_timer_;
+ std::unique_ptr<asio::steady_timer> scheduler_timer_;
+ bool scheduler_timer_on_;
+
+ // timeouts
+ std::unordered_set<uint32_t> timeouts_or_nacks_;
+
+ // names/packets var
+ uint32_t next_segment_;
+
+ std::shared_ptr<RTCState> state_;
+ std::shared_ptr<RTCRateControl> rc_;
+ std::shared_ptr<RTCLossDetectionAndRecovery> ldr_;
+
+ uint32_t number_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
new file mode 100644
index 000000000..0cf9516ab
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/rtc/rtc_packet.h>
+#include <stdint.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+// used in rtc
+// protocol consts
+const uint32_t ROUND_LEN = 200;
+// ms interval of time on which
+// we take decisions / measurements
+const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8;
+// how big (in ms) should be the buffer at the producer.
+// increasing this number we increase the time that an
+// interest will wait for the data packet to be produced
+// at the producer socket
+const uint32_t PRODUCER_BUFFER_MS = 200; // ms
+
+// interest scheduler
+const uint32_t MAX_INTERESTS_IN_BATCH = 5;
+const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec
+
+// packet const
+const uint32_t HICN_HEADER_SIZE = 40 + 20; // IPv6 + TCP bytes
+const uint32_t RTC_INTEREST_LIFETIME = 1000;
+
+// probes sequence range
+const uint32_t MIN_PROBE_SEQ = 0xefffffff;
+const uint32_t MIN_RTT_PROBE_SEQ = MIN_PROBE_SEQ;
+const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1;
+// RTT_PROBE_INTERVAL will be used during the section while
+// INIT_RTT_PROBE_INTERVAL is used at the beginning to
+// quickily estimate the RTT
+const uint32_t RTT_PROBE_INTERVAL = 200000; // us
+const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us
+const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT
+// if the produdcer is not yet started we need to probe multple times
+// to get an answer. we wait 100ms between each try
+const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms
+// once we get the first probe we wait at most 60ms for the others
+const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms
+// we reuires at least 5 probes to be recevied
+const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; //ms
+const uint32_t MAX_PENDING_PROBES = 10;
+
+
+// congestion
+const double MAX_QUEUING_DELAY = 100.0; // ms
+
+// data from cache
+const double MAX_DATA_FROM_CACHE = 0.25; // 25%
+
+// window const
+const uint32_t INITIAL_WIN = 5; // pkts
+const uint32_t INITIAL_WIN_MAX = 1000000; // pkts
+const uint32_t WIN_MIN = 5; // pkts
+const double CATCH_UP_WIN_INCREMENT = 1.2;
+// used in rate control
+const double WIN_DECREASE_FACTOR = 0.5;
+const double WIN_INCREASE_FACTOR = 1.5;
+
+// round in congestion
+const double ROUNDS_BEFORE_TAKE_ACTION = 5;
+
+// used in state
+const uint8_t ROUNDS_IN_SYNC_BEFORE_SWITCH = 3;
+const double PRODUCTION_RATE_FRACTION = 0.8;
+
+const uint32_t INIT_PACKET_SIZE = 1200;
+
+const double MOVING_AVG_ALPHA = 0.8;
+
+const double MILLI_IN_A_SEC = 1000.0;
+const double MICRO_IN_A_SEC = 1000000.0;
+
+const double MAX_CACHED_PACKETS = 262144; // 2^18
+ // about 50 sec of traffic at 50Mbps
+ // with 1200 bytes packets
+
+const uint32_t MAX_ROUND_WHIOUT_PACKETS =
+ (20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds;
+
+// used in ldr
+const uint32_t RTC_MAX_RTX = 100;
+const uint32_t RTC_MAX_AGE = 60000; // in ms
+const uint64_t MAX_TIMER_RTX = ~0;
+const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms
+const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets
+const double CATCH_UP_RTT_INCREMENT = 1.2;
+
+// used by producer
+const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms
+const uint32_t MIN_PRODUCTION_RATE = 10; // pps
+ // min prod rate
+ // set running several test
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc
new file mode 100644
index 000000000..c098088a3
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_data_path.cc
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_data_path.h>
+#include <stdlib.h>
+
+#include <algorithm>
+#include <cfloat>
+#include <chrono>
+
+#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCDataPath::RTCDataPath(uint32_t path_id)
+ : path_id_(path_id),
+ min_rtt(UINT_MAX),
+ prev_min_rtt(UINT_MAX),
+ min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the
+ // real OWD, but the measured one, that depends on the
+ // clock of sender and receiver. the only meaningful
+ // value is is the queueing delay. for this reason we
+ // keep both RTT (for the windowd calculation) and OWD
+ // (for congestion/quality control)
+ prev_min_owd(INT_MAX),
+ avg_owd(DBL_MAX),
+ queuing_delay(DBL_MAX),
+ jitter_(0.0),
+ last_owd_(0),
+ largest_recv_seq_(0),
+ largest_recv_seq_time_(0),
+ avg_inter_arrival_(DBL_MAX),
+ received_nacks_(false),
+ received_packets_(false),
+ rounds_without_packets_(0),
+ last_received_data_packet_(0),
+ RTT_history_(HISTORY_LEN),
+ OWD_history_(HISTORY_LEN){};
+
+void RTCDataPath::insertRttSample(uint64_t rtt) {
+ // for the rtt we only keep track of the min one
+ if (rtt < min_rtt) min_rtt = rtt;
+ last_received_data_packet_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+}
+
+void RTCDataPath::insertOwdSample(int64_t owd) {
+ // for owd we use both min and avg
+ if (owd < min_owd) min_owd = owd;
+
+ if (avg_owd != DBL_MAX)
+ avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
+ else {
+ avg_owd = owd;
+ }
+
+ int64_t queueVal = owd - std::min(getMinOwd(), min_owd);
+
+ if (queuing_delay != DBL_MAX)
+ queuing_delay = (queuing_delay * (1 - ALPHA_RTC)) + (queueVal * ALPHA_RTC);
+ else {
+ queuing_delay = queueVal;
+ }
+
+ // keep track of the jitter computed as for RTP (RFC 3550)
+ int64_t diff = std::abs(owd - last_owd_);
+ last_owd_ = owd;
+ jitter_ += (1.0 / 16.0) * ((double)diff - jitter_);
+
+ // owd is computed only for valid data packets so we count only
+ // this for decide if we recevie traffic or not
+ received_packets_ = true;
+}
+
+void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) {
+ // got packet in sequence, compute gap
+ if (largest_recv_seq_ == (segment_number - 1)) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t delta = now - largest_recv_seq_time_;
+ largest_recv_seq_ = segment_number;
+ largest_recv_seq_time_ = now;
+ if (avg_inter_arrival_ == DBL_MAX)
+ avg_inter_arrival_ = delta;
+ else
+ avg_inter_arrival_ =
+ (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC);
+ return;
+ }
+
+ // ooo packet, update the stasts if needed
+ if (largest_recv_seq_ <= segment_number) {
+ largest_recv_seq_ = segment_number;
+ largest_recv_seq_time_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ }
+}
+
+void RTCDataPath::receivedNack() { received_nacks_ = true; }
+
+double RTCDataPath::getInterArrivalGap() {
+ if (avg_inter_arrival_ == DBL_MAX) return 0;
+ return avg_inter_arrival_;
+}
+
+bool RTCDataPath::isActive() {
+ if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS)
+ return true;
+ return false;
+}
+
+bool RTCDataPath::pathToProducer() {
+ if (received_nacks_) return true;
+ return false;
+}
+
+void RTCDataPath::roundEnd() {
+ // reset min_rtt and add it to the history
+ if (min_rtt != UINT_MAX) {
+ prev_min_rtt = min_rtt;
+ } else {
+ // this may happen if we do not receive any packet
+ // from this path in the last round. in this case
+ // we use the measure from the previuos round
+ min_rtt = prev_min_rtt;
+ }
+
+ if (min_rtt == 0) min_rtt = 1;
+
+ RTT_history_.pushBack(min_rtt);
+ min_rtt = UINT_MAX;
+
+ // do the same for min owd
+ if (min_owd != INT_MAX) {
+ prev_min_owd = min_owd;
+ } else {
+ min_owd = prev_min_owd;
+ }
+
+ if (min_owd != INT_MAX) {
+ OWD_history_.pushBack(min_owd);
+ min_owd = INT_MAX;
+ }
+
+ if (!received_packets_)
+ rounds_without_packets_++;
+ else
+ rounds_without_packets_ = 0;
+ received_packets_ = false;
+}
+
+uint32_t RTCDataPath::getPathId() { return path_id_; }
+
+double RTCDataPath::getQueuingDealy() { return queuing_delay; }
+
+uint64_t RTCDataPath::getMinRtt() {
+ if (RTT_history_.size() != 0) return RTT_history_.begin();
+ return 0;
+}
+
+int64_t RTCDataPath::getMinOwd() {
+ if (OWD_history_.size() != 0) return OWD_history_.begin();
+ return 0;
+}
+
+double RTCDataPath::getJitter() { return jitter_; }
+
+uint64_t RTCDataPath::getLastPacketTS() { return last_received_data_packet_; }
+
+void RTCDataPath::clearRtt() { RTT_history_.clear(); }
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h
new file mode 100644
index 000000000..c5c37fc0d
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_data_path.h
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <utils/min_filter.h>
+
+#include <climits>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+const double ALPHA_RTC = 0.125;
+const uint32_t HISTORY_LEN = 20; // 4 sec
+
+class RTCDataPath {
+ public:
+ RTCDataPath(uint32_t path_id);
+
+ public:
+ void insertRttSample(uint64_t rtt);
+ void insertOwdSample(int64_t owd);
+ void computeInterArrivalGap(uint32_t segment_number);
+ void receivedNack();
+
+ uint32_t getPathId();
+ uint64_t getMinRtt();
+ double getQueuingDealy();
+ double getInterArrivalGap();
+ double getJitter();
+ bool isActive();
+ bool pathToProducer();
+ uint64_t getLastPacketTS();
+
+ void clearRtt();
+
+ void roundEnd();
+
+ private:
+ uint32_t path_id_;
+
+ int64_t getMinOwd();
+
+ uint64_t min_rtt;
+ uint64_t prev_min_rtt;
+
+ int64_t min_owd;
+ int64_t prev_min_owd;
+
+ double avg_owd;
+
+ double queuing_delay;
+
+ double jitter_;
+ int64_t last_owd_;
+
+ uint32_t largest_recv_seq_;
+ uint64_t largest_recv_seq_time_;
+ double avg_inter_arrival_;
+
+ // flags to check if a path is active
+ // we considere a path active if it reaches a producer
+ //(not a cache) --aka we got at least one nack on this path--
+ // and if we receives packets
+ bool received_nacks_;
+ bool received_packets_;
+ uint8_t rounds_without_packets_; // if we don't get any packet
+ // for MAX_ROUNDS_WITHOUT_PKTS
+ // we consider the path inactive
+ uint64_t last_received_data_packet_; // timestamp for the last data received
+ // on this path
+
+ utils::MinFilter<uint64_t> RTT_history_;
+ utils::MinFilter<int64_t> OWD_history_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
new file mode 100644
index 000000000..e91b29c04
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -0,0 +1,427 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_ldr.h>
+
+#include <algorithm>
+#include <unordered_set>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
+ SendRtxCallback &&callback, asio::io_service &io_service)
+ : rtx_on_(false),
+ next_rtx_timer_(MAX_TIMER_RTX),
+ last_event_(0),
+ sentinel_timer_interval_(MAX_TIMER_RTX),
+ send_rtx_callback_(std::move(callback)) {
+ timer_ = std::make_unique<asio::steady_timer>(io_service);
+ sentinel_timer_ = std::make_unique<asio::steady_timer>(io_service);
+}
+
+RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {}
+
+void RTCLossDetectionAndRecovery::turnOnRTX() {
+ rtx_on_ = true;
+ scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT);
+}
+
+void RTCLossDetectionAndRecovery::turnOffRTX() {
+ rtx_on_ = false;
+ clear();
+}
+
+void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) {
+ // always add timeouts to the RTX list to avoid to send the same packet as if
+ // it was not a rtx
+ addToRetransmissions(seq, seq + 1);
+ last_event_ = getNow();
+}
+
+void RTCLossDetectionAndRecovery::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ last_event_ = getNow();
+
+ uint32_t seq = content_object.getName().getSuffix();
+ if (deleteRtx(seq)) {
+ state_->onPacketRecovered(seq);
+ } else {
+ if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
+ TRANSPORT_LOGD("received data. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq);
+ }
+}
+
+void RTCLossDetectionAndRecovery::onNackPacketReceived(
+ const core::ContentObject &nack) {
+ last_event_ = getNow();
+
+ uint32_t seq = nack.getName().getSuffix();
+
+ if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
+
+ struct nack_packet_t *nack_pkt =
+ (struct nack_packet_t *)nack.getPayload()->data();
+ uint32_t production_seq = nack_pkt->getProductionSegement();
+
+ if (production_seq > seq) {
+ // this is a past nack, all data before productionSeq are lost. if
+ // productionSeq > state_->getHighestSeqReceivedInOrder() is impossible to
+ // recover any packet. If this is not the case we can try to recover the
+ // packets between state_->getHighestSeqReceivedInOrder() and productionSeq.
+ // e.g.: the client receives packets 8 10 11 9 where 9 is a nack with
+ // productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and
+ // 14 that are not arrived yet
+ deleteRtx(seq);
+ TRANSPORT_LOGD("received past nack. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
+ } else {
+ // future nack. here there should be a gap between the last data received
+ // and this packet and is it possible to recover the packets between the
+ // last received data and the production seq. we should not use the seq
+ // number of the nack since we know that is too early to ask for this seq
+ // number
+ // e.g.: // e.g.: the client receives packets 10 11 12 20 where 20 is a nack
+ // with productionSeq = 18. this says that all the packets between 12 and 18
+ // may got lost and we should ask them
+ deleteRtx(seq);
+ TRANSPORT_LOGD("received futrue nack. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
+ }
+}
+
+void RTCLossDetectionAndRecovery::onProbePacketReceived(
+ const core::ContentObject &probe) {
+ // we don't log the reception of a probe packet for the sentinel timer because
+ // probes are not taken into account into the sync window. we use them as
+ // future nacks to detect possible packets lost
+ if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
+ struct nack_packet_t *probe_pkt =
+ (struct nack_packet_t *)probe.getPayload()->data();
+ uint32_t production_seq = probe_pkt->getProductionSegement();
+ TRANSPORT_LOGD("received probe. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
+}
+
+void RTCLossDetectionAndRecovery::clear() {
+ rtx_state_.clear();
+ rtx_timers_.clear();
+ sentinel_timer_->cancel();
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ timer_->cancel();
+ }
+}
+
+void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start,
+ uint32_t stop) {
+ // skip nacked packets
+ if (start <= state_->getLastSeqNacked()) {
+ start = state_->getLastSeqNacked() + 1;
+ }
+
+ // skip received or lost packets
+ if (start <= state_->getHighestSeqReceivedInOrder()) {
+ start = state_->getHighestSeqReceivedInOrder() + 1;
+ }
+
+ for (uint32_t seq = start; seq < stop; seq++) {
+ if (!isRtx(seq) && // is not already an rtx
+ // is not received or lost
+ state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) {
+ // add rtx
+ rtxState state;
+ state.first_send_ = state_->getInterestSentTime(seq);
+ if (state.first_send_ == 0) // this interest was never sent before
+ state.first_send_ = getNow();
+ state.next_send_ = computeNextSend(seq, true);
+ state.rtx_count_ = 0;
+ TRANSPORT_LOGD("add %u to retransmissions. next rtx is %lu ", seq,
+ (state.next_send_ - getNow()));
+ rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
+ rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq));
+ }
+ }
+ scheduleNextRtx();
+}
+
+uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq,
+ bool new_rtx) {
+ uint64_t now = getNow();
+ if (new_rtx) {
+ // for the new rtx we wait one estimated IAT after the loss detection. this
+ // is bacause, assuming that packets arrive with a constant IAT, we should
+ // get a new packet every IAT
+ double prod_rate = state_->getProducerRate();
+ uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL;
+ uint32_t jitter = 0;
+
+ if (prod_rate != 0) {
+ double packet_size = state_->getAveragePacketSize();
+ estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ jitter = ceil(state_->getJitter());
+ }
+
+ uint32_t wait = estimated_iat + jitter;
+ TRANSPORT_LOGD("first rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u",
+ seq, wait, state_->getRTT(), estimated_iat, jitter);
+
+ return now + wait;
+ } else {
+ // wait one RTT
+ // however if the IAT is larger than the RTT, wait one IAT
+ uint32_t wait = SENTINEL_TIMER_INTERVAL;
+
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate == 0) {
+ return now + SENTINEL_TIMER_INTERVAL;
+ }
+
+ double packet_size = state_->getAveragePacketSize();
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+
+ uint64_t rtt = state_->getRTT();
+ if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
+ wait = rtt;
+
+ if (estimated_iat > rtt) wait = estimated_iat;
+
+ uint32_t jitter = ceil(state_->getJitter());
+ wait += jitter;
+
+ // it may happen that the channel is congested and we have some additional
+ // queuing delay to take into account
+ uint32_t queue = ceil(state_->getQueuing());
+ wait += queue;
+
+ TRANSPORT_LOGD(
+ "next rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u queue = %u",
+ seq, wait, state_->getRTT(), estimated_iat, jitter, queue);
+
+ return now + wait;
+ }
+}
+
+void RTCLossDetectionAndRecovery::retransmit() {
+ if (rtx_timers_.size() == 0) return;
+
+ uint64_t now = getNow();
+
+ auto it = rtx_timers_.begin();
+ std::unordered_set<uint32_t> lost_pkt;
+ uint32_t sent_counter = 0;
+ while (it != rtx_timers_.end() && it->first <= now &&
+ sent_counter < MAX_INTERESTS_IN_BATCH) {
+ uint32_t seq = it->second;
+ auto rtx_it =
+ rtx_state_.find(seq); // this should always return a valid iter
+ if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX ||
+ (now - rtx_it->second.first_send_) >= RTC_MAX_AGE ||
+ seq < state_->getLastSeqNacked()) {
+ // max rtx reached or packet too old or packet nacked, this packet is lost
+ TRANSPORT_LOGD(
+ "packet %u lost because 1) max rtx: %u 2) max age: %u 3) naked: %u",
+ seq, (rtx_it->second.rtx_count_ >= RTC_MAX_RTX),
+ ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE),
+ (seq < state_->getLastSeqNacked()));
+ lost_pkt.insert(seq);
+ it++;
+ } else {
+ // resend the packet
+ state_->onRetransmission(seq);
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate != 0) rtx_it->second.rtx_count_++;
+ rtx_it->second.next_send_ = computeNextSend(seq, false);
+ it = rtx_timers_.erase(it);
+ rtx_timers_.insert(
+ std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
+ TRANSPORT_LOGD("send rtx for sequence %u, next send in %lu", seq,
+ (rtx_it->second.next_send_ - now));
+ send_rtx_callback_(seq);
+ sent_counter++;
+ }
+ }
+
+ // remove packets if needed
+ for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) {
+ uint32_t seq = *lost_it;
+ state_->onPacketLost(seq);
+ deleteRtx(seq);
+ }
+}
+
+void RTCLossDetectionAndRecovery::scheduleNextRtx() {
+ if (rtx_timers_.size() == 0) {
+ // all the rtx were removed, reset timer
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ return;
+ }
+
+ // check if timer is alreay set
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ // a new check for rtx is already scheduled
+ if (next_rtx_timer_ > rtx_timers_.begin()->first) {
+ // we need to re-schedule it
+ timer_->cancel();
+ } else {
+ // wait for the next timer
+ return;
+ }
+ }
+
+ // set a new timer
+ next_rtx_timer_ = rtx_timers_.begin()->first;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t wait = 1;
+ if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now)
+ wait = next_rtx_timer_ - now;
+
+ std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this());
+ timer_->expires_from_now(std::chrono::milliseconds(wait));
+ timer_->async_wait([self](std::error_code ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->retransmit();
+ s->next_rtx_timer_ = MAX_TIMER_RTX;
+ s->scheduleNextRtx();
+ }
+ });
+}
+
+bool RTCLossDetectionAndRecovery::deleteRtx(uint32_t seq) {
+ auto it_rtx = rtx_state_.find(seq);
+ if (it_rtx == rtx_state_.end()) return false; // rtx not found
+
+ uint64_t ts = it_rtx->second.next_send_;
+ auto it_timers = rtx_timers_.find(ts);
+ while (it_timers != rtx_timers_.end() && it_timers->first == ts) {
+ if (it_timers->second == seq) {
+ rtx_timers_.erase(it_timers);
+ break;
+ }
+ it_timers++;
+ }
+
+ bool lost = it_rtx->second.rtx_count_ > 0;
+ rtx_state_.erase(it_rtx);
+
+ return lost;
+}
+
+void RTCLossDetectionAndRecovery::scheduleSentinelTimer(
+ uint64_t expires_from_now) {
+ std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this());
+ sentinel_timer_->expires_from_now(
+ std::chrono::milliseconds(expires_from_now));
+ sentinel_timer_->async_wait([self](std::error_code ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->sentinelTimer();
+ }
+ });
+}
+
+void RTCLossDetectionAndRecovery::sentinelTimer() {
+ uint64_t now = getNow();
+
+ bool expired = false;
+ bool sent = false;
+ if ((now - last_event_) >= sentinel_timer_interval_) {
+ // at least a sentinel_timer_interval_ elapsed since last event
+ expired = true;
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
+ // this happens at the beginning (or if the producer stops for some
+ // reason) we need to keep sending interest 0 until we get an answer
+ TRANSPORT_LOGD(
+ "sentinel timer: the producer is not active, send packet 0");
+ state_->onRetransmission(0);
+ send_rtx_callback_(0);
+ } else {
+ TRANSPORT_LOGD(
+ "sentinel timer: the producer is active, send the 10 oldest packets");
+ sent = true;
+ uint32_t rtx = 0;
+ auto it = state_->getPendingInterestsMapBegin();
+ auto end = state_->getPendingInterestsMapEnd();
+ while (it != end && rtx < MAX_RTX_WITH_SENTINEL) {
+ uint32_t seq = it->first;
+ TRANSPORT_LOGD("sentinel timer, add %u to the rtx list", seq);
+ addToRetransmissions(seq, seq + 1);
+ rtx++;
+ it++;
+ }
+ }
+ } else {
+ // sentinel timer did not expire because we registered at least one event
+ }
+
+ uint32_t next_timer;
+ double prod_rate = state_->getProducerRate();
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) {
+ TRANSPORT_LOGD("next timer in %u", SENTINEL_TIMER_INTERVAL);
+ next_timer = SENTINEL_TIMER_INTERVAL;
+ } else {
+ double prod_rate = state_->getProducerRate();
+ double packet_size = state_->getAveragePacketSize();
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ uint32_t jitter = ceil(state_->getJitter());
+
+ // try to reduce the number of timers if the estimated IAT is too small
+ next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1);
+ TRANSPORT_LOGD("next sentinel in %u ms, rate: %f, iat: %u, jitter: %u",
+ next_timer, ((prod_rate * 8.0) / 1000000.0), estimated_iat,
+ jitter);
+
+ if (!expired) {
+ // discount the amout of time that is already passed
+ uint32_t discount = now - last_event_;
+ if (next_timer > discount) {
+ next_timer = next_timer - discount;
+ } else {
+ // in this case we trigger the timer in 1 ms
+ next_timer = 1;
+ }
+ TRANSPORT_LOGD("timer after discout: %u", next_timer);
+ } else if (sent) {
+ // wait at least one producer stats interval + owd to check if the
+ // production rate is reducing.
+ uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing());
+ next_timer = std::max(next_timer, min_wait);
+ TRANSPORT_LOGD("wait for updates from prod, next timer: %u", next_timer);
+ }
+ }
+
+ scheduleSentinelTimer(next_timer);
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
new file mode 100644
index 000000000..c0912303b
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/name.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_state.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <functional>
+#include <map>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCLossDetectionAndRecovery
+ : public std::enable_shared_from_this<RTCLossDetectionAndRecovery> {
+ struct rtx_state_ {
+ uint64_t first_send_;
+ uint64_t next_send_;
+ uint32_t rtx_count_;
+ };
+
+ using rtxState = struct rtx_state_;
+ using SendRtxCallback = std::function<void(uint32_t)>;
+
+ public:
+ RTCLossDetectionAndRecovery(SendRtxCallback &&callback,
+ asio::io_service &io_service);
+
+ ~RTCLossDetectionAndRecovery();
+
+ void setState(std::shared_ptr<RTCState> state) { state_ = state; }
+ void turnOnRTX();
+ void turnOffRTX();
+
+ void onTimeout(uint32_t seq);
+ void onDataPacketReceived(const core::ContentObject &content_object);
+ void onNackPacketReceived(const core::ContentObject &nack);
+ void onProbePacketReceived(const core::ContentObject &probe);
+
+ void clear();
+
+ bool isRtx(uint32_t seq) {
+ if (rtx_state_.find(seq) != rtx_state_.end()) return true;
+ return false;
+ }
+
+ private:
+ void addToRetransmissions(uint32_t start, uint32_t stop);
+ uint64_t computeNextSend(uint32_t seq, bool new_rtx);
+ void retransmit();
+ void scheduleNextRtx();
+ bool deleteRtx(uint32_t seq);
+ void scheduleSentinelTimer(uint64_t expires_from_now);
+ void sentinelTimer();
+
+ uint64_t getNow() {
+ using namespace std::chrono;
+ uint64_t now =
+ duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
+ .count();
+ return now;
+ }
+
+ // this map keeps track of the retransmitted interest, ordered from the oldest
+ // to the newest one. the state contains the timer of the first send of the
+ // interest (from pendingIntetests_), the timer of the next send (key of the
+ // multimap) and the number of rtx
+ std::map<uint32_t, rtxState> rtx_state_;
+ // this map stored the rtx by timer. The key is the time at which the rtx
+ // should be sent, and the val is the interest seq number
+ std::multimap<uint64_t, uint32_t> rtx_timers_;
+
+ bool rtx_on_;
+ uint64_t next_rtx_timer_;
+ uint64_t last_event_;
+ uint64_t sentinel_timer_interval_;
+ std::unique_ptr<asio::steady_timer> timer_;
+ std::unique_ptr<asio::steady_timer> sentinel_timer_;
+ std::shared_ptr<RTCState> state_;
+
+ SendRtxCallback send_rtx_callback_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h
new file mode 100644
index 000000000..abb1323a3
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_packet.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+/* data packet
+ * +-----------------------------------------+
+ * | uint64_t: timestamp |
+ * | |
+ * +-----------------------------------------+
+ * | uint32_t: prod rate (bytes per sec) |
+ * +-----------------------------------------+
+ * | payload |
+ * | ... |
+ */
+
+/* nack packet
+ * +-----------------------------------------+
+ * | uint64_t: timestamp |
+ * | |
+ * +-----------------------------------------+
+ * | uint32_t: prod rate (bytes per sec) |
+ * +-----------------------------------------+
+ * | uint32_t: current seg in production |
+ * +-----------------------------------------+
+ */
+
+#pragma once
+#include <arpa/inet.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+inline uint64_t _ntohll(const uint64_t *input) {
+ uint64_t return_val;
+ uint8_t *tmp = (uint8_t *)&return_val;
+
+ tmp[0] = *input >> 56;
+ tmp[1] = *input >> 48;
+ tmp[2] = *input >> 40;
+ tmp[3] = *input >> 32;
+ tmp[4] = *input >> 24;
+ tmp[5] = *input >> 16;
+ tmp[6] = *input >> 8;
+ tmp[7] = *input >> 0;
+
+ return return_val;
+}
+
+inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); }
+
+const uint32_t DATA_HEADER_SIZE = 12; // bytes
+ // XXX: sizeof(data_packet_t) is 16
+ // beacuse of padding
+const uint32_t NACK_HEADER_SIZE = 16;
+
+struct data_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+
+ inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
+ inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+
+ inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
+ inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+};
+
+struct nack_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+ uint32_t prod_seg;
+
+ inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
+ inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+
+ inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
+ inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+
+ inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
+ inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc.h b/libtransport/src/protocols/rtc/rtc_rc.h
new file mode 100644
index 000000000..34d090092
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControl : public std::enable_shared_from_this<RTCRateControl> {
+ public:
+ RTCRateControl()
+ : rc_on_(false),
+ congestion_win_(1000000), // init the win to a large number
+ congestion_state_(CongestionState::Normal),
+ protocol_state_(nullptr) {}
+
+ virtual ~RTCRateControl() = default;
+
+ void turnOnRateControl() { rc_on_ = true; }
+ void setState(std::shared_ptr<RTCState> state) { protocol_state_ = state; };
+ uint32_t getCongesionWindow() { return congestion_win_; };
+
+ virtual void onNewRound(double round_len) = 0;
+ virtual void onDataPacketReceived(
+ const core::ContentObject &content_object) = 0;
+
+ protected:
+ enum class CongestionState { Normal = 0, Underuse = 1, Congested = 2, Last };
+
+ protected:
+ bool rc_on_;
+ uint32_t congestion_win_;
+ CongestionState congestion_state_;
+
+ std::shared_ptr<RTCState> protocol_state_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.cc b/libtransport/src/protocols/rtc/rtc_rc_frame.cc
new file mode 100644
index 000000000..b577b5bea
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_frame.cc
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_frame.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlFrame::RTCRateControlFrame() : cc_detector_() {}
+
+RTCRateControlFrame::~RTCRateControlFrame() {}
+
+void RTCRateControlFrame::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ CongestionState prev_congestion_state = congestion_state_;
+ cc_detector_.updateStats();
+ congestion_state_ = (CongestionState)cc_detector_.getState();
+
+ if (congestion_state_ == CongestionState::Congested) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ // congestion detected, notify app and init congestion win
+ double prod_rate = protocol_state_->getReceivedRate();
+ double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = protocol_state_->getAveragePacketSize();
+
+ if (prod_rate == 0.0 || rtt == 0.0 || packet_size == 0.0) {
+ // TODO do something
+ return;
+ }
+
+ congestion_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ }
+ uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
+ congestion_win_ = std::max(win, WIN_MIN);
+ return;
+ }
+}
+
+void RTCRateControlFrame::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ if (!rc_on_) return;
+
+ uint32_t seq = content_object.getName().getSuffix();
+ if (!protocol_state_->isPending(seq)) return;
+
+ cc_detector_.addPacket(content_object);
+}
+
+void RTCRateControlFrame::receivedBwProbeTrain(uint64_t firts_probe_ts,
+ uint64_t last_probe_ts,
+ uint32_t total_probes) {
+ // TODO
+ return;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.h b/libtransport/src/protocols/rtc/rtc_rc_frame.h
new file mode 100644
index 000000000..25d5ddbb6
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_frame.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/congestion_detection.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControlFrame : public RTCRateControl {
+ public:
+ RTCRateControlFrame();
+
+ ~RTCRateControlFrame();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object);
+
+ void receivedBwProbeTrain(uint64_t firts_probe_ts, uint64_t last_probe_ts,
+ uint32_t total_probes);
+
+ private:
+ CongestionDetection cc_detector_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
new file mode 100644
index 000000000..a1c89e329
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_queue.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlQueue::RTCRateControlQueue()
+ : rounds_since_last_drop_(0),
+ rounds_without_congestion_(0),
+ last_queue_(0) {}
+
+RTCRateControlQueue::~RTCRateControlQueue() {}
+
+void RTCRateControlQueue::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ double received_rate = protocol_state_->getReceivedRate();
+ double target_rate =
+ protocol_state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = protocol_state_->getAveragePacketSize();
+ double queue = protocol_state_->getQueuing();
+
+ if (rtt == 0.0) return; // no info from the producer
+
+ CongestionState prev_congestion_state = congestion_state_;
+
+ if (prev_congestion_state == CongestionState::Normal &&
+ received_rate >= target_rate) {
+ // if the queue is high in this case we are most likelly fighting with
+ // a TCP flow and there is enough bandwidth to match the producer rate
+ congestion_state_ = CongestionState::Normal;
+ } else if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) {
+ // here we detect congestion. in the case that last_queue == queue
+ // the consumer didn't receive any packet from the producer so we
+ // consider this case as congestion
+ // TODO: wath happen in case of high loss rate?
+ congestion_state_ = CongestionState::Congested;
+ } else {
+ // nothing bad is happening
+ congestion_state_ = CongestionState::Normal;
+ }
+
+ last_queue_ = queue;
+
+ if (congestion_state_ == CongestionState::Congested) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ // init the congetion window using the received rate
+ congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size);
+ rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1;
+ }
+
+ if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) {
+ uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
+ congestion_win_ = std::max(win, WIN_MIN);
+ rounds_since_last_drop_ = 0;
+ return;
+ }
+
+ rounds_since_last_drop_++;
+ }
+
+ if (congestion_state_ == CongestionState::Normal) {
+ if (prev_congestion_state == CongestionState::Congested) {
+ rounds_without_congestion_ = 0;
+ }
+
+ rounds_without_congestion_++;
+ if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return;
+
+ congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR;
+ congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX);
+ }
+}
+
+void RTCRateControlQueue::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ // nothing to do
+ return;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.h b/libtransport/src/protocols/rtc/rtc_rc_queue.h
new file mode 100644
index 000000000..407354d43
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/utils/shared_ptr_utils.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControlQueue : public RTCRateControl {
+ public:
+ RTCRateControlQueue();
+
+ ~RTCRateControlQueue();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ uint32_t rounds_since_last_drop_;
+ uint32_t rounds_without_congestion_;
+ double last_queue_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
new file mode 100644
index 000000000..eabf8942c
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -0,0 +1,560 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCState::RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ DiscoveredRttCallback &&discovered_rtt_callback,
+ asio::io_service &io_service)
+ : rtt_probes_(std::make_shared<ProbeHandler>(
+ std::move(rtt_probes_callback), io_service)),
+ discovered_rtt_callback_(std::move(discovered_rtt_callback)) {
+ init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service);
+ initParams();
+}
+
+RTCState::~RTCState() {}
+
+void RTCState::initParams() {
+ // packets counters (total)
+ sent_interests_ = 0;
+ sent_rtx_ = 0;
+ received_data_ = 0;
+ received_nacks_ = 0;
+ received_timeouts_ = 0;
+ received_probes_ = 0;
+
+ // loss counters
+ packets_lost_ = 0;
+ losses_recovered_ = 0;
+ first_seq_in_round_ = 0;
+ highest_seq_received_ = 0;
+ highest_seq_received_in_order_ = 0;
+ last_seq_nacked_ = 0;
+ loss_rate_ = 0.0;
+ residual_loss_rate_ = 0.0;
+
+ // bw counters
+ received_bytes_ = 0;
+ avg_packet_size_ = INIT_PACKET_SIZE;
+ production_rate_ = 0.0;
+ received_rate_ = 0.0;
+
+ // nack counter
+ nack_on_last_round_ = false;
+ received_nacks_last_round_ = 0;
+
+ // packets counter
+ received_packets_last_round_ = 0;
+ received_data_last_round_ = 0;
+ received_data_from_cache_ = 0;
+ data_from_cache_rate_ = 0;
+ sent_interests_last_round_ = 0;
+ sent_rtx_last_round_ = 0;
+
+ // round conunters
+ rounds_ = 0;
+ rounds_without_nacks_ = 0;
+ rounds_without_packets_ = 0;
+
+ last_production_seq_ = 0;
+ producer_is_active_ = false;
+ last_prod_update_ = 0;
+
+ // paths stats
+ path_table_.clear();
+ main_path_ = nullptr;
+
+ // packet received
+ received_or_lost_packets_.clear();
+
+ // pending interests
+ pending_interests_.clear();
+
+ // init rtt
+ first_interest_sent_ = ~0;
+ init_rtt_ = false;
+ rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ rtt_probes_->sendProbes();
+ setInitRttTimer(INIT_RTT_PROBE_RESTART);
+}
+
+// packet events
+void RTCState::onSendNewInterest(const core::Name *interest_name) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint32_t seq = interest_name->getSuffix();
+ pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now));
+
+ if(sent_interests_ == 0) first_interest_sent_ = now;
+
+ sent_interests_++;
+ sent_interests_last_round_++;
+}
+
+void RTCState::onTimeout(uint32_t seq) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ }
+ received_timeouts_++;
+}
+
+void RTCState::onRetransmission(uint32_t seq) {
+ // remove the interest for the pendingInterest map only after the first rtx.
+ // in this way we can handle the ooo packets that come in late as normla
+ // packet. we consider a packet lost only if we sent at least an RTX for it.
+ // XXX this may become problematic if we stop the RTX transmissions
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ packets_lost_++;
+ }
+ sent_rtx_++;
+ sent_rtx_last_round_++;
+}
+
+void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats) {
+ uint32_t seq = content_object.getName().getSuffix();
+ if (compute_stats) {
+ updatePathStats(content_object, false);
+ received_data_last_round_++;
+ }
+ received_data_++;
+
+ struct data_packet_t *data_pkt =
+ (struct data_packet_t *)content_object.getPayload()->data();
+ uint64_t production_time = data_pkt->getTimestamp();
+ if (last_prod_update_ < production_time) {
+ last_prod_update_ = production_time;
+ uint32_t production_rate = data_pkt->getProductionRate();
+ production_rate_ = (double)production_rate;
+ }
+
+ updatePacketSize(content_object);
+ updateReceivedBytes(content_object);
+ addRecvOrLost(seq, PacketState::RECEIVED);
+
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+
+ // the producer is responding
+ // it is generating valid data packets so we consider it active
+ producer_is_active_ = true;
+
+ received_packets_last_round_++;
+}
+
+void RTCState::onNackPacketReceived(const core::ContentObject &nack,
+ bool compute_stats) {
+ uint32_t seq = nack.getName().getSuffix();
+ struct nack_packet_t *nack_pkt =
+ (struct nack_packet_t *)nack.getPayload()->data();
+ uint64_t production_time = nack_pkt->getTimestamp();
+ uint32_t production_seq = nack_pkt->getProductionSegement();
+ uint32_t production_rate = nack_pkt->getProductionRate();
+
+ if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) ||
+ last_prod_update_ < production_time) {
+ // update production rate
+ last_prod_update_ = production_time;
+ last_production_seq_ = production_seq;
+ production_rate_ = (double)production_rate;
+ }
+
+ if (compute_stats) {
+ // this is not an RTX
+ updatePathStats(nack, true);
+ nack_on_last_round_ = true;
+ }
+
+ // for statistics pourpose we log all nacks, also the one received for
+ // retransmitted packets
+ received_nacks_++;
+ received_nacks_last_round_++;
+
+ if (production_seq > seq) {
+ // old nack, seq is lost
+ // update last nacked
+ if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
+ TRANSPORT_LOGD("lost packet %u beacuse of a past nack", seq);
+ onPacketLost(seq);
+ } else if (seq > production_seq) {
+ // future nack
+ // remove the nack from the pending interest map
+ // (the packet is not received/lost yet)
+ pending_interests_.erase(seq);
+ } else {
+ // this should be a quite rear event. simply remove the
+ // packet from the pending interest list
+ pending_interests_.erase(seq);
+ }
+
+ // the producer is responding
+ // we consider it active only if the production rate is not 0
+ // or the production sequence number is not 1
+ if (production_rate_ != 0 || production_seq != 1) {
+ producer_is_active_ = true;
+ }
+
+ received_packets_last_round_++;
+}
+
+void RTCState::onPacketLost(uint32_t seq) {
+ TRANSPORT_LOGD("packet %u is lost", seq);
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ // this packet was never retransmitted so it does
+ // not appear in the loss count
+ packets_lost_++;
+ }
+ addRecvOrLost(seq, PacketState::LOST);
+}
+
+void RTCState::onPacketRecovered(uint32_t seq) {
+ losses_recovered_++;
+ addRecvOrLost(seq, PacketState::RECEIVED);
+}
+
+bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
+ uint32_t seq = probe.getName().getSuffix();
+ uint64_t rtt;
+
+ rtt = rtt_probes_->getRtt(seq);
+
+ if (rtt == 0) return false; // this is not a valid probe
+
+ // like for data and nacks update the path stats. Here the RTT is computed
+ // by the probe handler. Both probes for rtt and bw are good to esimate
+ // info on the path
+ uint32_t path_label = probe.getPathLabel();
+
+ auto path_it = path_table_.find(path_label);
+
+ // update production rate and last_seq_nacked like in case of a nack
+ struct nack_packet_t *probe_pkt =
+ (struct nack_packet_t *)probe.getPayload()->data();
+ uint64_t sender_timestamp = probe_pkt->getTimestamp();
+ uint32_t production_seq = probe_pkt->getProductionSegement();
+ uint32_t production_rate = probe_pkt->getProductionRate();
+
+
+ if (path_it == path_table_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+
+ path->insertRttSample(rtt);
+ path->receivedNack();
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ int64_t OWD = now - sender_timestamp;
+ path->insertOwdSample(OWD);
+
+ if (last_prod_update_ < sender_timestamp) {
+ last_production_seq_ = production_seq;
+ last_prod_update_ = sender_timestamp;
+ production_rate_ = (double)production_rate;
+ }
+
+ // the producer is responding
+ // we consider it active only if the production rate is not 0
+ // or the production sequence numner is not 1
+ if (production_rate_ != 0 || production_seq != 1) {
+ producer_is_active_ = true;
+ }
+
+ // check for init RTT. if received_probes_ is equal to 0 schedule a timer to
+ // wait for the INIT_RTT_PROBES. in this way if some probes get lost we don't
+ // wait forever
+ received_probes_++;
+
+ if(!init_rtt_ && received_probes_ <= INIT_RTT_PROBES){
+ if(received_probes_ == 1){
+ // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the others
+ main_path_ = path;
+ setInitRttTimer(INIT_RTT_PROBE_WAIT);
+ }
+ if(received_probes_ == INIT_RTT_PROBES) {
+ // we are done
+ init_rtt_timer_->cancel();
+ checkInitRttTimer();
+ }
+ }
+
+ received_packets_last_round_++;
+
+ // ignore probes sent before the first interest
+ if((now - rtt) <= first_interest_sent_) return false;
+ return true;
+}
+
+void RTCState::onNewRound(double round_len, bool in_sync) {
+ // XXX
+ // here we take into account only the single path case so we assume that we
+ // don't use two paths in parellel for this single flow
+
+ if (path_table_.empty()) return;
+
+ double bytes_per_sec =
+ ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len));
+ if(received_rate_ == 0)
+ received_rate_ = bytes_per_sec;
+ else
+ received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) +
+ ((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
+
+ // search for an active path. There should be only one active path (meaning a
+ // path that leads to the producer socket -no cache- and from which we are
+ // currently getting data packets) at any time. However it may happen that
+ // there are mulitple active paths in case of mobility (the old path will
+ // remain active for a short ammount of time). The main path is selected as
+ // the active path from where the consumer received the latest data packet
+
+ uint64_t last_packet_ts = 0;
+ main_path_ = nullptr;
+
+ for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
+ it->second->roundEnd();
+ if (it->second->isActive()) {
+ uint64_t ts = it->second->getLastPacketTS();
+ if (ts > last_packet_ts) {
+ last_packet_ts = ts;
+ main_path_ = it->second;
+ }
+ }
+ }
+
+ if (in_sync) updateLossRate();
+
+ // handle nacks
+ if (!nack_on_last_round_ && received_bytes_ > 0) {
+ rounds_without_nacks_++;
+ } else {
+ rounds_without_nacks_ = 0;
+ }
+
+ // check if the producer is active
+ if (received_packets_last_round_ != 0) {
+ rounds_without_packets_ = 0;
+ } else {
+ rounds_without_packets_++;
+ if (rounds_without_packets_ >= MAX_ROUND_WHIOUT_PACKETS &&
+ producer_is_active_ != false) {
+ initParams();
+ }
+ }
+
+ // compute cache/producer ratio
+ if (received_data_last_round_ != 0) {
+ double new_rate =
+ (double)received_data_from_cache_ / (double)received_data_last_round_;
+ data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA +
+ (new_rate * (1 - MOVING_AVG_ALPHA));
+ }
+
+ // reset counters
+ received_bytes_ = 0;
+ packets_lost_ = 0;
+ losses_recovered_ = 0;
+ first_seq_in_round_ = highest_seq_received_;
+
+ nack_on_last_round_ = false;
+ received_nacks_last_round_ = 0;
+
+ received_packets_last_round_ = 0;
+ received_data_last_round_ = 0;
+ received_data_from_cache_ = 0;
+ sent_interests_last_round_ = 0;
+ sent_rtx_last_round_ = 0;
+
+ rounds_++;
+}
+
+void RTCState::updateReceivedBytes(const core::ContentObject &content_object) {
+ received_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+}
+
+void RTCState::updatePacketSize(const core::ContentObject &content_object) {
+ uint32_t pkt_size =
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ avg_packet_size_ = (MOVING_AVG_ALPHA * avg_packet_size_) +
+ ((1 - MOVING_AVG_ALPHA) * pkt_size);
+}
+
+void RTCState::updatePathStats(const core::ContentObject &content_object,
+ bool is_nack) {
+ // get packet path
+ uint32_t path_label = content_object.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+
+ if (path_it == path_table_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+
+ // compute rtt
+ uint32_t seq = content_object.getName().getSuffix();
+ uint64_t interest_sent_time = getInterestSentTime(seq);
+ if (interest_sent_time == 0)
+ return; // this should not happen,
+ // it means that we are processing an interest
+ // that is not pending
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ uint64_t RTT = now - interest_sent_time;
+
+ path->insertRttSample(RTT);
+
+ // compute OWD (the first part of the nack and data packet header are the
+ // same, so we cast to data data packet)
+ struct data_packet_t *packet =
+ (struct data_packet_t *)content_object.getPayload()->data();
+ uint64_t sender_timestamp = packet->getTimestamp();
+ int64_t OWD = now - sender_timestamp;
+ path->insertOwdSample(OWD);
+
+ // compute IAT or set path to producer
+ if (!is_nack) {
+ // compute the iat only for the content packets
+ uint32_t segment_number = content_object.getName().getSuffix();
+ path->computeInterArrivalGap(segment_number);
+ if (!path->pathToProducer()) received_data_from_cache_++;
+ } else {
+ path->receivedNack();
+ }
+}
+
+void RTCState::updateLossRate() {
+ loss_rate_ = 0.0;
+ residual_loss_rate_ = 0.0;
+
+ uint32_t number_theorically_received_packets_ =
+ highest_seq_received_ - first_seq_in_round_;
+
+ // in this case no new packet was recevied after the previuos round, avoid
+ // division by 0
+ if (number_theorically_received_packets_ == 0) return;
+
+ loss_rate_ = (double)((double)(packets_lost_) /
+ (double)number_theorically_received_packets_);
+
+ residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) /
+ (double)number_theorically_received_packets_);
+
+ if (residual_loss_rate_ < 0) residual_loss_rate_ = 0;
+}
+
+void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
+ pending_interests_.erase(seq);
+ if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) {
+ received_or_lost_packets_.erase(received_or_lost_packets_.begin());
+ }
+ // notice that it may happen that a packet that we consider lost arrives after
+ // some time, in this case we simply overwrite the packet state.
+ received_or_lost_packets_[seq] = state;
+
+ // keep track of the last packet received/lost
+ // without holes.
+ if (highest_seq_received_in_order_ < last_seq_nacked_) {
+ highest_seq_received_in_order_ = last_seq_nacked_;
+ }
+
+ if ((highest_seq_received_in_order_ + 1) == seq) {
+ highest_seq_received_in_order_ = seq;
+ } else if (seq <= highest_seq_received_in_order_) {
+ // here we do nothing
+ } else if (seq > highest_seq_received_in_order_) {
+ // 1) there is a gap in the sequence so we do not update largest_in_seq_
+ // 2) all the packets from largest_in_seq_ to seq are in
+ // received_or_lost_packets_ an we upate largest_in_seq_
+
+ for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) {
+ if (received_or_lost_packets_.find(i) ==
+ received_or_lost_packets_.end()) {
+ break;
+ }
+ // this packet is in order so we can update the
+ // highest_seq_received_in_order_
+ highest_seq_received_in_order_ = i;
+ }
+ }
+}
+
+void RTCState::setInitRttTimer(uint32_t wait){
+ init_rtt_timer_->cancel();
+ init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait));
+ init_rtt_timer_->async_wait([this](std::error_code ec) {
+ if(ec) return;
+ checkInitRttTimer();
+ });
+}
+
+void RTCState::checkInitRttTimer() {
+ if(received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV){
+ // we didn't received enough probes, restart
+ received_probes_ = 0;
+ rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ rtt_probes_->sendProbes();
+ setInitRttTimer(INIT_RTT_PROBE_RESTART);
+ return;
+ }
+ init_rtt_ = true;
+ main_path_->roundEnd();
+ rtt_probes_->setProbes(RTT_PROBE_INTERVAL, 0);
+ rtt_probes_->sendProbes();
+
+ // init last_seq_nacked_. skip packets that may come from the cache
+ double prod_rate = getProducerRate();
+ double rtt = (double)getRTT() / MILLI_IN_A_SEC;
+ double packet_size = getAveragePacketSize();
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
+ last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
+
+ discovered_rtt_callback_();
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
new file mode 100644
index 000000000..943a0a113
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -0,0 +1,253 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/name.h>
+#include <protocols/rtc/probe_handler.h>
+#include <protocols/rtc/rtc_data_path.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <map>
+#include <set>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN };
+
+class RTCState : std::enable_shared_from_this<RTCState> {
+ public:
+ using DiscoveredRttCallback = std::function<void()>;
+ public:
+ RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ DiscoveredRttCallback &&discovered_rtt_callback,
+ asio::io_service &io_service);
+
+ ~RTCState();
+
+ // packet events
+ void onSendNewInterest(const core::Name *interest_name);
+ void onTimeout(uint32_t seq);
+ void onRetransmission(uint32_t seq);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
+ void onNackPacketReceived(const core::ContentObject &nack,
+ bool compute_stats);
+ void onPacketLost(uint32_t seq);
+ void onPacketRecovered(uint32_t seq);
+ bool onProbePacketReceived(const core::ContentObject &probe);
+
+ // protocol state
+ void onNewRound(double round_len, bool in_sync);
+
+ // main path
+ uint32_t getProducerPath() const {
+ if (mainPathIsValid()) return main_path_->getPathId();
+ return 0;
+ }
+
+ // delay metrics
+ bool isRttDiscovered() const {
+ return init_rtt_;
+ }
+
+ uint64_t getRTT() const {
+ if (mainPathIsValid()) return main_path_->getMinRtt();
+ return 0;
+ }
+ void resetRttStats() {
+ if (mainPathIsValid()) main_path_->clearRtt();
+ }
+
+ double getQueuing() const {
+ if (mainPathIsValid()) return main_path_->getQueuingDealy();
+ return 0.0;
+ }
+ double getIAT() const {
+ if (mainPathIsValid()) return main_path_->getInterArrivalGap();
+ return 0.0;
+ }
+
+ double getJitter() const {
+ if (mainPathIsValid()) return main_path_->getJitter();
+ return 0.0;
+ }
+
+ // pending interests
+ uint64_t getInterestSentTime(uint32_t seq) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) return it->second;
+ return 0;
+ }
+ bool isPending(uint32_t seq) {
+ if (pending_interests_.find(seq) != pending_interests_.end()) return true;
+ return false;
+ }
+ uint32_t getPendingInterestNumber() const {
+ return pending_interests_.size();
+ }
+ PacketState isReceivedOrLost(uint32_t seq) {
+ auto it = received_or_lost_packets_.find(seq);
+ if (it != received_or_lost_packets_.end()) return it->second;
+ return PacketState::UNKNOWN;
+ }
+
+ // loss rate
+ double getLossRate() const { return loss_rate_; }
+ double getResidualLossRate() const { return residual_loss_rate_; }
+ uint32_t getHighestSeqReceivedInOrder() const {
+ return highest_seq_received_in_order_;
+ }
+ uint32_t getLostData() const { return packets_lost_; };
+ uint32_t getRecoveredLosses() const { return losses_recovered_; }
+
+ // generic stats
+ uint32_t getReceivedBytesInRound() const { return received_bytes_; }
+ uint32_t getReceivedNacksInRound() const {
+ return received_nacks_last_round_;
+ }
+ uint32_t getSentInterestInRound() const { return sent_interests_last_round_; }
+ uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; }
+
+ // bandwidth/production metrics
+ double getAvailableBw() const { return 0.0; }; // TODO
+ double getProducerRate() const { return production_rate_; }
+ double getReceivedRate() const { return received_rate_; }
+ double getAveragePacketSize() const { return avg_packet_size_; }
+
+ // nacks
+ uint32_t getRoundsWithoutNacks() const { return rounds_without_nacks_; }
+ uint32_t getLastSeqNacked() const { return last_seq_nacked_; }
+
+ // producer state
+ bool isProducerActive() const { return producer_is_active_; }
+
+ // packets from cache
+ double getPacketFromCacheRatio() const { return data_from_cache_rate_; }
+
+ std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapBegin() {
+ return pending_interests_.begin();
+ }
+ std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapEnd() {
+ return pending_interests_.end();
+ }
+
+ private:
+ void initParams();
+
+ // update stats
+ void updateState();
+ void updateReceivedBytes(const core::ContentObject &content_object);
+ void updatePacketSize(const core::ContentObject &content_object);
+ void updatePathStats(const core::ContentObject &content_object, bool is_nack);
+ void updateLossRate();
+
+ void addRecvOrLost(uint32_t seq, PacketState state);
+
+ void setInitRttTimer(uint32_t wait);
+ void checkInitRttTimer();
+
+ bool mainPathIsValid() const {
+ if (main_path_ != nullptr)
+ return true;
+ else
+ return false;
+ }
+
+ // packets counters (total)
+ uint32_t sent_interests_;
+ uint32_t sent_rtx_;
+ uint32_t received_data_;
+ uint32_t received_nacks_;
+ uint32_t received_timeouts_;
+ uint32_t received_probes_;
+
+ // loss counters
+ int32_t packets_lost_;
+ int32_t losses_recovered_;
+ uint32_t first_seq_in_round_;
+ uint32_t highest_seq_received_;
+ uint32_t highest_seq_received_in_order_;
+ uint32_t last_seq_nacked_; // segment for which we got an oldNack
+ double loss_rate_;
+ double residual_loss_rate_;
+
+ // bw counters
+ uint32_t received_bytes_;
+ double avg_packet_size_;
+ double production_rate_; // rate communicated by the producer using nacks
+ double received_rate_; // rate recevied by the consumer
+
+ // nack counter
+ // the bool takes tracks only about the valid nacks (no rtx) and it is used to
+ // switch between the states. Instead received_nacks_last_round_ logs all the
+ // nacks for statistics
+ bool nack_on_last_round_;
+ uint32_t received_nacks_last_round_;
+
+ // packets counter
+ uint32_t received_packets_last_round_;
+ uint32_t received_data_last_round_;
+ uint32_t received_data_from_cache_;
+ double data_from_cache_rate_;
+ uint32_t sent_interests_last_round_;
+ uint32_t sent_rtx_last_round_;
+
+ // round conunters
+ uint32_t rounds_;
+ uint32_t rounds_without_nacks_;
+ uint32_t rounds_without_packets_;
+
+ // init rtt
+ uint64_t first_interest_sent_;
+
+ // producer state
+ bool
+ producer_is_active_; // the prodcuer is active if we receive some packets
+ uint32_t last_production_seq_; // last production seq received by the producer
+ uint64_t last_prod_update_; // timestamp of the last packets used to update
+ // stats from the producer
+
+ // paths stats
+ std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_;
+ std::shared_ptr<RTCDataPath> main_path_;
+
+ // packet received
+ // cache where to store info about the last MAX_CACHED_PACKETS
+ std::map<uint32_t, PacketState> received_or_lost_packets_;
+
+ // pending interests
+ std::map<uint32_t, uint64_t> pending_interests_;
+
+ // probes
+ std::shared_ptr<ProbeHandler> rtt_probes_;
+ bool init_rtt_;
+ std::unique_ptr<asio::steady_timer> init_rtt_timer_;
+
+ // callbacks
+ DiscoveredRttCallback discovered_rtt_callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/trendline_estimator.cc b/libtransport/src/protocols/rtc/trendline_estimator.cc
new file mode 100644
index 000000000..7a0803857
--- /dev/null
+++ b/libtransport/src/protocols/rtc/trendline_estimator.cc
@@ -0,0 +1,334 @@
+/*
+ * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+// FROM
+// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.cc
+
+#include "trendline_estimator.h"
+
+#include <math.h>
+
+#include <algorithm>
+#include <string>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+// Parameters for linear least squares fit of regression line to noisy data.
+constexpr double kDefaultTrendlineSmoothingCoeff = 0.9;
+constexpr double kDefaultTrendlineThresholdGain = 4.0;
+// const char kBweWindowSizeInPacketsExperiment[] =
+// "WebRTC-BweWindowSizeInPackets";
+
+/*size_t ReadTrendlineFilterWindowSize(
+ const WebRtcKeyValueConfig* key_value_config) {
+ std::string experiment_string =
+ key_value_config->Lookup(kBweWindowSizeInPacketsExperiment);
+ size_t window_size;
+ int parsed_values =
+ sscanf(experiment_string.c_str(), "Enabled-%zu", &window_size);
+ if (parsed_values == 1) {
+ if (window_size > 1)
+ return window_size;
+ RTC_LOG(WARNING) << "Window size must be greater than 1.";
+ }
+ RTC_LOG(LS_WARNING) << "Failed to parse parameters for BweWindowSizeInPackets"
+ " experiment from field trial string. Using default.";
+ return TrendlineEstimatorSettings::kDefaultTrendlineWindowSize;
+}
+*/
+
+OptionalDouble LinearFitSlope(
+ const std::deque<TrendlineEstimator::PacketTiming>& packets) {
+ // RTC_DCHECK(packets.size() >= 2);
+ // Compute the "center of mass".
+ double sum_x = 0;
+ double sum_y = 0;
+ for (const auto& packet : packets) {
+ sum_x += packet.arrival_time_ms;
+ sum_y += packet.smoothed_delay_ms;
+ }
+ double x_avg = sum_x / packets.size();
+ double y_avg = sum_y / packets.size();
+ // Compute the slope k = \sum (x_i-x_avg)(y_i-y_avg) / \sum (x_i-x_avg)^2
+ double numerator = 0;
+ double denominator = 0;
+ for (const auto& packet : packets) {
+ double x = packet.arrival_time_ms;
+ double y = packet.smoothed_delay_ms;
+ numerator += (x - x_avg) * (y - y_avg);
+ denominator += (x - x_avg) * (x - x_avg);
+ }
+ if (denominator == 0) return OptionalDouble();
+ return OptionalDouble(numerator / denominator);
+}
+
+OptionalDouble ComputeSlopeCap(
+ const std::deque<TrendlineEstimator::PacketTiming>& packets,
+ const TrendlineEstimatorSettings& settings) {
+ /*RTC_DCHECK(1 <= settings.beginning_packets &&
+ settings.beginning_packets < packets.size());
+ RTC_DCHECK(1 <= settings.end_packets &&
+ settings.end_packets < packets.size());
+ RTC_DCHECK(settings.beginning_packets + settings.end_packets <=
+ packets.size());*/
+ TrendlineEstimator::PacketTiming early = packets[0];
+ for (size_t i = 1; i < settings.beginning_packets; ++i) {
+ if (packets[i].raw_delay_ms < early.raw_delay_ms) early = packets[i];
+ }
+ size_t late_start = packets.size() - settings.end_packets;
+ TrendlineEstimator::PacketTiming late = packets[late_start];
+ for (size_t i = late_start + 1; i < packets.size(); ++i) {
+ if (packets[i].raw_delay_ms < late.raw_delay_ms) late = packets[i];
+ }
+ if (late.arrival_time_ms - early.arrival_time_ms < 1) {
+ return OptionalDouble();
+ }
+ return OptionalDouble((late.raw_delay_ms - early.raw_delay_ms) /
+ (late.arrival_time_ms - early.arrival_time_ms) +
+ settings.cap_uncertainty);
+}
+
+constexpr double kMaxAdaptOffsetMs = 15.0;
+constexpr double kOverUsingTimeThreshold = 10;
+constexpr int kMinNumDeltas = 60;
+constexpr int kDeltaCounterMax = 1000;
+
+//} // namespace
+
+constexpr char TrendlineEstimatorSettings::kKey[];
+
+TrendlineEstimatorSettings::TrendlineEstimatorSettings(
+ /*const WebRtcKeyValueConfig* key_value_config*/) {
+ /*if (absl::StartsWith(
+ key_value_config->Lookup(kBweWindowSizeInPacketsExperiment),
+ "Enabled")) {
+ window_size = ReadTrendlineFilterWindowSize(key_value_config);
+ }
+ Parser()->Parse(key_value_config->Lookup(TrendlineEstimatorSettings::kKey));*/
+ window_size = kDefaultTrendlineWindowSize;
+ enable_cap = false;
+ beginning_packets = end_packets = 0;
+ cap_uncertainty = 0.0;
+
+ /*if (window_size < 10 || 200 < window_size) {
+ RTC_LOG(LS_WARNING) << "Window size must be between 10 and 200 packets";
+ window_size = kDefaultTrendlineWindowSize;
+ }
+ if (enable_cap) {
+ if (beginning_packets < 1 || end_packets < 1 ||
+ beginning_packets > window_size || end_packets > window_size) {
+ RTC_LOG(LS_WARNING) << "Size of beginning and end must be between 1 and "
+ << window_size;
+ enable_cap = false;
+ beginning_packets = end_packets = 0;
+ cap_uncertainty = 0.0;
+ }
+ if (beginning_packets + end_packets > window_size) {
+ RTC_LOG(LS_WARNING)
+ << "Size of beginning plus end can't exceed the window size";
+ enable_cap = false;
+ beginning_packets = end_packets = 0;
+ cap_uncertainty = 0.0;
+ }
+ if (cap_uncertainty < 0.0 || 0.025 < cap_uncertainty) {
+ RTC_LOG(LS_WARNING) << "Cap uncertainty must be between 0 and 0.025";
+ cap_uncertainty = 0.0;
+ }
+ }*/
+}
+
+/*std::unique_ptr<StructParametersParser> TrendlineEstimatorSettings::Parser() {
+ return StructParametersParser::Create("sort", &enable_sort, //
+ "cap", &enable_cap, //
+ "beginning_packets",
+ &beginning_packets, //
+ "end_packets", &end_packets, //
+ "cap_uncertainty", &cap_uncertainty, //
+ "window_size", &window_size);
+}*/
+
+TrendlineEstimator::TrendlineEstimator(
+ /*const WebRtcKeyValueConfig* key_value_config,
+ NetworkStatePredictor* network_state_predictor*/)
+ : settings_(),
+ smoothing_coef_(kDefaultTrendlineSmoothingCoeff),
+ threshold_gain_(kDefaultTrendlineThresholdGain),
+ num_of_deltas_(0),
+ first_arrival_time_ms_(-1),
+ accumulated_delay_(0),
+ smoothed_delay_(0),
+ delay_hist_(),
+ k_up_(0.0087),
+ k_down_(0.039),
+ overusing_time_threshold_(kOverUsingTimeThreshold),
+ threshold_(12.5),
+ prev_modified_trend_(NAN),
+ last_update_ms_(-1),
+ prev_trend_(0.0),
+ time_over_using_(-1),
+ overuse_counter_(0),
+ hypothesis_(BandwidthUsage::kBwNormal){
+ // hypothesis_predicted_(BandwidthUsage::kBwNormal){//},
+ // network_state_predictor_(network_state_predictor) {
+ /* RTC_LOG(LS_INFO)
+ << "Using Trendline filter for delay change estimation with settings "
+ << settings_.Parser()->Encode() << " and "
+ // << (network_state_predictor_ ? "injected" : "no")
+ << " network state predictor";*/
+}
+
+TrendlineEstimator::~TrendlineEstimator() {}
+
+void TrendlineEstimator::UpdateTrendline(double recv_delta_ms,
+ double send_delta_ms,
+ int64_t send_time_ms,
+ int64_t arrival_time_ms,
+ size_t packet_size) {
+ const double delta_ms = recv_delta_ms - send_delta_ms;
+ ++num_of_deltas_;
+ num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax);
+ if (first_arrival_time_ms_ == -1) first_arrival_time_ms_ = arrival_time_ms;
+
+ // Exponential backoff filter.
+ accumulated_delay_ += delta_ms;
+ // BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms,
+ // accumulated_delay_);
+ smoothed_delay_ = smoothing_coef_ * smoothed_delay_ +
+ (1 - smoothing_coef_) * accumulated_delay_;
+ // BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms,
+ // smoothed_delay_);
+
+ // Maintain packet window
+ delay_hist_.emplace_back(
+ static_cast<double>(arrival_time_ms - first_arrival_time_ms_),
+ smoothed_delay_, accumulated_delay_);
+ if (settings_.enable_sort) {
+ for (size_t i = delay_hist_.size() - 1;
+ i > 0 &&
+ delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms;
+ --i) {
+ std::swap(delay_hist_[i], delay_hist_[i - 1]);
+ }
+ }
+ if (delay_hist_.size() > settings_.window_size) delay_hist_.pop_front();
+
+ // Simple linear regression.
+ double trend = prev_trend_;
+ if (delay_hist_.size() == settings_.window_size) {
+ // Update trend_ if it is possible to fit a line to the data. The delay
+ // trend can be seen as an estimate of (send_rate - capacity)/capacity.
+ // 0 < trend < 1 -> the delay increases, queues are filling up
+ // trend == 0 -> the delay does not change
+ // trend < 0 -> the delay decreases, queues are being emptied
+ OptionalDouble trendO = LinearFitSlope(delay_hist_);
+ if (trendO.has_value()) trend = trendO.value();
+ if (settings_.enable_cap) {
+ OptionalDouble cap = ComputeSlopeCap(delay_hist_, settings_);
+ // We only use the cap to filter out overuse detections, not
+ // to detect additional underuses.
+ if (trend >= 0 && cap.has_value() && trend > cap.value()) {
+ trend = cap.value();
+ }
+ }
+ }
+ // BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend);
+
+ Detect(trend, send_delta_ms, arrival_time_ms);
+}
+
+void TrendlineEstimator::Update(double recv_delta_ms, double send_delta_ms,
+ int64_t send_time_ms, int64_t arrival_time_ms,
+ size_t packet_size, bool calculated_deltas) {
+ if (calculated_deltas) {
+ UpdateTrendline(recv_delta_ms, send_delta_ms, send_time_ms, arrival_time_ms,
+ packet_size);
+ }
+ /*if (network_state_predictor_) {
+ hypothesis_predicted_ = network_state_predictor_->Update(
+ send_time_ms, arrival_time_ms, hypothesis_);
+ }*/
+}
+
+BandwidthUsage TrendlineEstimator::State() const {
+ return /*network_state_predictor_ ? hypothesis_predicted_ :*/ hypothesis_;
+}
+
+void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) {
+ /*if (num_of_deltas_ < 2) {
+ hypothesis_ = BandwidthUsage::kBwNormal;
+ return;
+ }*/
+
+ const double modified_trend =
+ std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_;
+ prev_modified_trend_ = modified_trend;
+ // BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend);
+ // BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_);
+ if (modified_trend > threshold_) {
+ if (time_over_using_ == -1) {
+ // Initialize the timer. Assume that we've been
+ // over-using half of the time since the previous
+ // sample.
+ time_over_using_ = ts_delta / 2;
+ } else {
+ // Increment timer
+ time_over_using_ += ts_delta;
+ }
+ overuse_counter_++;
+ if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) {
+ if (trend >= prev_trend_) {
+ time_over_using_ = 0;
+ overuse_counter_ = 0;
+ hypothesis_ = BandwidthUsage::kBwOverusing;
+ }
+ }
+ } else if (modified_trend < -threshold_) {
+ time_over_using_ = -1;
+ overuse_counter_ = 0;
+ hypothesis_ = BandwidthUsage::kBwUnderusing;
+ } else {
+ time_over_using_ = -1;
+ overuse_counter_ = 0;
+ hypothesis_ = BandwidthUsage::kBwNormal;
+ }
+ prev_trend_ = trend;
+ UpdateThreshold(modified_trend, now_ms);
+}
+
+void TrendlineEstimator::UpdateThreshold(double modified_trend,
+ int64_t now_ms) {
+ if (last_update_ms_ == -1) last_update_ms_ = now_ms;
+
+ if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) {
+ // Avoid adapting the threshold to big latency spikes, caused e.g.,
+ // by a sudden capacity drop.
+ last_update_ms_ = now_ms;
+ return;
+ }
+
+ const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_;
+ const int64_t kMaxTimeDeltaMs = 100;
+ int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs);
+ threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms;
+ if (threshold_ < 6.f) threshold_ = 6.f;
+ if (threshold_ > 600.f) threshold_ = 600.f;
+ // threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f);
+ last_update_ms_ = now_ms;
+}
+
+} // namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/trendline_estimator.h b/libtransport/src/protocols/rtc/trendline_estimator.h
new file mode 100644
index 000000000..372acbc67
--- /dev/null
+++ b/libtransport/src/protocols/rtc/trendline_estimator.h
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+// FROM
+// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.h
+
+#ifndef MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_
+#define MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <algorithm>
+#include <deque>
+#include <memory>
+#include <utility>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class OptionalDouble {
+ public:
+ OptionalDouble() : val(0), has_val(false){};
+ OptionalDouble(double val) : val(val), has_val(true){};
+
+ double value() { return val; }
+ bool has_value() { return has_val; }
+
+ private:
+ double val;
+ bool has_val;
+};
+
+enum class BandwidthUsage {
+ kBwNormal = 0,
+ kBwUnderusing = 1,
+ kBwOverusing = 2,
+ kLast
+};
+
+struct TrendlineEstimatorSettings {
+ static constexpr char kKey[] = "WebRTC-Bwe-TrendlineEstimatorSettings";
+ static constexpr unsigned kDefaultTrendlineWindowSize = 20;
+
+ // TrendlineEstimatorSettings() = delete;
+ TrendlineEstimatorSettings(
+ /*const WebRtcKeyValueConfig* key_value_config*/);
+
+ // Sort the packets in the window. Should be redundant,
+ // but then almost no cost.
+ bool enable_sort = false;
+
+ // Cap the trendline slope based on the minimum delay seen
+ // in the beginning_packets and end_packets respectively.
+ bool enable_cap = false;
+ unsigned beginning_packets = 7;
+ unsigned end_packets = 7;
+ double cap_uncertainty = 0.0;
+
+ // Size (in packets) of the window.
+ unsigned window_size = kDefaultTrendlineWindowSize;
+
+ // std::unique_ptr<StructParametersParser> Parser();
+};
+
+class TrendlineEstimator /*: public DelayIncreaseDetectorInterface */ {
+ public:
+ TrendlineEstimator(/*const WebRtcKeyValueConfig* key_value_config,
+ NetworkStatePredictor* network_state_predictor*/);
+
+ ~TrendlineEstimator();
+
+ // Update the estimator with a new sample. The deltas should represent deltas
+ // between timestamp groups as defined by the InterArrival class.
+ void Update(double recv_delta_ms, double send_delta_ms, int64_t send_time_ms,
+ int64_t arrival_time_ms, size_t packet_size,
+ bool calculated_deltas);
+
+ void UpdateTrendline(double recv_delta_ms, double send_delta_ms,
+ int64_t send_time_ms, int64_t arrival_time_ms,
+ size_t packet_size);
+
+ BandwidthUsage State() const;
+
+ struct PacketTiming {
+ PacketTiming(double arrival_time_ms, double smoothed_delay_ms,
+ double raw_delay_ms)
+ : arrival_time_ms(arrival_time_ms),
+ smoothed_delay_ms(smoothed_delay_ms),
+ raw_delay_ms(raw_delay_ms) {}
+ double arrival_time_ms;
+ double smoothed_delay_ms;
+ double raw_delay_ms;
+ };
+
+ private:
+ // friend class GoogCcStatePrinter;
+ void Detect(double trend, double ts_delta, int64_t now_ms);
+
+ void UpdateThreshold(double modified_offset, int64_t now_ms);
+
+ // Parameters.
+ TrendlineEstimatorSettings settings_;
+ const double smoothing_coef_;
+ const double threshold_gain_;
+ // Used by the existing threshold.
+ int num_of_deltas_;
+ // Keep the arrival times small by using the change from the first packet.
+ int64_t first_arrival_time_ms_;
+ // Exponential backoff filtering.
+ double accumulated_delay_;
+ double smoothed_delay_;
+ // Linear least squares regression.
+ std::deque<PacketTiming> delay_hist_;
+
+ const double k_up_;
+ const double k_down_;
+ double overusing_time_threshold_;
+ double threshold_;
+ double prev_modified_trend_;
+ int64_t last_update_ms_;
+ double prev_trend_;
+ double time_over_using_;
+ int overuse_counter_;
+ BandwidthUsage hypothesis_;
+ // BandwidthUsage hypothesis_predicted_;
+ // NetworkStatePredictor* network_state_predictor_;
+
+ // RTC_DISALLOW_COPY_AND_ASSIGN(TrendlineEstimator);
+};
+
+} // namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
+#endif // MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_
diff --git a/libtransport/src/protocols/transport_protocol.cc b/libtransport/src/protocols/transport_protocol.cc
new file mode 100644
index 000000000..611c39212
--- /dev/null
+++ b/libtransport/src/protocols/transport_protocol.cc
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <implementation/socket_consumer.h>
+#include <protocols/transport_protocol.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
+ Reassembly *reassembly_protocol)
+ : socket_(icn_socket),
+ reassembly_protocol_(reassembly_protocol),
+ index_manager_(
+ std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
+ is_running_(false),
+ is_first_(false),
+ on_interest_retransmission_(VOID_HANDLER),
+ on_interest_output_(VOID_HANDLER),
+ on_interest_timeout_(VOID_HANDLER),
+ on_interest_satisfied_(VOID_HANDLER),
+ on_content_object_input_(VOID_HANDLER),
+ stats_summary_(VOID_HANDLER),
+ on_payload_(VOID_HANDLER) {
+ socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
+ socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
+}
+
+int TransportProtocol::start() {
+ // If the protocol is already running, return otherwise set as running
+ if (is_running_) return -1;
+
+ // Get all callbacks references
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &on_interest_retransmission_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &on_interest_output_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
+ &on_interest_timeout_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
+ &on_interest_satisfied_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &on_content_object_input_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ &on_payload_);
+
+ socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
+
+ // Set it is the first time we schedule an interest
+ is_first_ = true;
+
+ // Reset the protocol state machine
+ reset();
+ // Schedule next interests
+ scheduleNextInterests();
+
+ is_first_ = false;
+
+ // Set the protocol as running
+ is_running_ = true;
+
+ if (!is_async_) {
+ // Start Event loop
+ portal_->runEventsLoop();
+
+ // Not running anymore
+ is_running_ = false;
+ }
+
+ return 0;
+}
+
+void TransportProtocol::stop() {
+ is_running_ = false;
+
+ if (!is_async_) {
+ portal_->stopEventsLoop();
+ } else {
+ portal_->clear();
+ }
+}
+
+void TransportProtocol::resume() {
+ if (is_running_) return;
+
+ is_running_ = true;
+
+ scheduleNextInterests();
+
+ portal_->runEventsLoop();
+
+ is_running_ = false;
+}
+
+void TransportProtocol::onContentReassembled(std::error_code ec) {
+ stop();
+
+ if (!on_payload_) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before "
+ "starting "
+ "the content retrieval.");
+ }
+
+ if (!ec) {
+ on_payload_->readSuccess(stats_->getBytesRecv());
+ } else {
+ on_payload_->readError(ec);
+ }
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/transport_protocol.h b/libtransport/src/protocols/transport_protocol.h
new file mode 100644
index 000000000..124c57122
--- /dev/null
+++ b/libtransport/src/protocols/transport_protocol.h
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/interfaces/statistics.h>
+#include <hicn/transport/utils/object_pool.h>
+#include <implementation/socket.h>
+#include <protocols/data_processing_events.h>
+#include <protocols/indexer.h>
+#include <protocols/reassembly.h>
+
+#include <atomic>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+
+class IndexVerificationManager;
+
+using ReadCallback = interface::ConsumerSocket::ReadCallback;
+
+class TransportProtocolCallback {
+ virtual void onContentObject(const core::Interest &interest,
+ const core::ContentObject &content_object) = 0;
+ virtual void onTimeout(const core::Interest &interest) = 0;
+};
+
+class TransportProtocol : public core::Portal::ConsumerCallback,
+ public ContentObjectProcessingEventCallback {
+ static constexpr std::size_t interest_pool_size = 4096;
+
+ friend class ManifestIndexManager;
+
+ public:
+ TransportProtocol(implementation::ConsumerSocket *icn_socket,
+ Reassembly *reassembly_protocol);
+
+ virtual ~TransportProtocol() = default;
+
+ TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; }
+
+ virtual int start();
+
+ virtual void stop();
+
+ virtual void resume();
+
+ virtual void scheduleNextInterests() = 0;
+
+ // Events generated by the indexing
+ virtual void onContentReassembled(std::error_code ec);
+ virtual void onPacketDropped(Interest &interest,
+ ContentObject &content_object) override = 0;
+ virtual void onReassemblyFailed(std::uint32_t missing_segment) override = 0;
+
+ protected:
+ // Consumer Callback
+ virtual void reset() = 0;
+ virtual void onContentObject(Interest &i, ContentObject &c) override = 0;
+ virtual void onTimeout(Interest::Ptr &&i) override = 0;
+ virtual void onError(std::error_code ec) override {}
+
+ protected:
+ implementation::ConsumerSocket *socket_;
+ std::unique_ptr<Reassembly> reassembly_protocol_;
+ std::unique_ptr<IndexManager> index_manager_;
+ std::shared_ptr<core::Portal> portal_;
+ std::atomic<bool> is_running_;
+ // True if it si the first time we schedule an interest
+ std::atomic<bool> is_first_;
+ interface::TransportStatistics *stats_;
+
+ // Callbacks
+ interface::ConsumerInterestCallback *on_interest_retransmission_;
+ interface::ConsumerInterestCallback *on_interest_output_;
+ interface::ConsumerInterestCallback *on_interest_timeout_;
+ interface::ConsumerInterestCallback *on_interest_satisfied_;
+ interface::ConsumerContentObjectCallback *on_content_object_input_;
+ interface::ConsumerContentObjectCallback *on_content_object_;
+ interface::ConsumerTimerCallback *stats_summary_;
+ ReadCallback *on_payload_;
+
+ bool is_async_;
+};
+
+} // end namespace protocol
+} // end namespace transport