author     Luca Muscariello <muscariello@ieee.org>    2021-04-15 09:05:46 +0200
committer  Mauro Sardara <msardara@cisco.com>         2021-04-15 16:36:16 +0200
commit     e92e9e839ca2cf42b56322b2489ccc0d8bf767af (patch)
tree       9f1647c83a87fbf982ae329e800af25dbfb226b5 /libtransport/src/protocols/rtc
parent     3e541d7c947cc2f9db145f26c9274efd29a6fb56 (diff)
[HICN-690] Transport Library Major Refactoring

The current patch provides a major refactoring of the transport library. A summary of the components that underwent major modifications is reported below.

- Transport protocol updates
  The class hierarchy has been optimized so that common transport services are shared across different transport protocols. This makes it possible to customize a transport protocol with new features.

- A new real-time communication protocol
  The RTC protocol has been optimized in terms of algorithms to reduce consumer-producer synchronization latency.

- A novel socket API
  The API has been reworked to be easier to consume and to allow a more efficient integration in L4 proxies.

- Several performance improvements
  A large number of performance improvements have been included, in particular to make the entire stack zero-copy and to reduce cache misses.

- New memory buffer framework
  Memory management has been reworked entirely to provide a more efficient infrastructure with a richer API. Buffers are now allocated in blocks, and a single buffer holds the memory for (1) the shared_ptr control block, (2) the metadata of the packet (e.g. name, pointers to other buffers if the buffer is chained, and relevant offsets), and (3) the packet itself, as it is sent/received over the network.

- A new slab allocator
  Dynamic memory allocation is now managed by a novel slab allocator that is optimized for packet processing and connection management. Memory is organized in pools of blocks, all of the same size, which are used during the processing of outgoing/incoming packets. When a memory block is allocated it is always taken from a global pool, and when it is deallocated it is returned to the pool, thus avoiding the cost of any heap allocation in the data path (see the illustrative sketch after the commit trailers below).

- New transport connectors
  Consumer and producer end-points can communicate either through an hICN packet forwarder or through direct connectors based on shared memory or sockets. Transport connectors are typically used for unit and functional testing but may have additional uses.

- Support for FEC/ECC for transport services
  FEC/ECC via Reed-Solomon is supported by default and made available to transport services as a modular component. Reed-Solomon block codes are the default FEC scheme and can be replaced in a modular way by many other codes, including RLNC, which is not available in this distribution. The current FEC framework supports variable-size padding and efficiently reuses the infra memory buffers to avoid additional copies.

- Secure transport framework for signature computation and verification
  Crypto support is natively used in hICN for integrity and authenticity. New support, which includes RTC, has been implemented and made modular and reusable across different transport protocols.

- TLS - Transport layer security over hICN
  Point-to-point confidentiality is provided by integrating TLS on top of hICN reliable and non-reliable transport. The integration is common to both and makes a different use of the TLS record.

- MLS - Messaging layer security over hICN
  MLS integration on top of hICN is done by using the MLSPP implementation open sourced by Cisco. Instrumentation tools are included to run performance and functional tests on groups of end-points.

- Android support
  The overall code has been heavily tested in Android environments and has received heavy lifting to run better natively on recent Android OS versions.
Co-authored-by: Mauro Sardara <msardara@cisco.com>
Co-authored-by: Michele Papalini <micpapal@cisco.com>
Co-authored-by: Olivier Roques <oroques+fdio@cisco.com>
Co-authored-by: Giulio Grassi <gigrassi@cisco.com>
Change-Id: If477ba2fa686e6f47bdf96307ac60938766aef69
Signed-off-by: Luca Muscariello <muscariello@ieee.org>
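
As an aside, here is a minimal, illustrative sketch of the fixed-size block pool idea behind the slab allocator described above. The names (BlockPool, allocate, release) are hypothetical and do not correspond to the actual libtransport API; the sketch ignores thread safety and buffer chaining and only shows why the data path avoids heap allocations.

    #include <cstddef>
    #include <cstdlib>
    #include <vector>

    // Hypothetical fixed-size block pool: blocks are preallocated once and then
    // recycled, so the packet-processing path does not hit the heap allocator.
    class BlockPool {
     public:
      BlockPool(std::size_t block_size, std::size_t n_blocks)
          : block_size_(block_size) {
        for (std::size_t i = 0; i < n_blocks; i++)
          free_list_.push_back(std::malloc(block_size_));
      }

      ~BlockPool() {
        for (void *b : free_list_) std::free(b);
      }

      // Take a block from the pool; fall back to the heap only if the pool is empty.
      void *allocate() {
        if (free_list_.empty()) return std::malloc(block_size_);
        void *b = free_list_.back();
        free_list_.pop_back();
        return b;
      }

      // Deallocation simply returns the block to the pool for reuse.
      void release(void *block) { free_list_.push_back(block); }

     private:
      std::size_t block_size_;
      std::vector<void *> free_list_;
    };

In the scheme described by the commit message, a single such block would hold the shared_ptr control block, the packet metadata, and the packet payload, so obtaining a packet buffer amounts to a single pool operation.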
Diffstat (limited to 'libtransport/src/protocols/rtc')
-rw-r--r--  libtransport/src/protocols/rtc/CMakeLists.txt            |  38
-rw-r--r--  libtransport/src/protocols/rtc/congestion_detection.cc   | 101
-rw-r--r--  libtransport/src/protocols/rtc/congestion_detection.h    | 138
-rw-r--r--  libtransport/src/protocols/rtc/probe_handler.cc          | 107
-rw-r--r--  libtransport/src/protocols/rtc/probe_handler.h           |  75
-rw-r--r--  libtransport/src/protocols/rtc/rtc.cc                    | 607
-rw-r--r--  libtransport/src/protocols/rtc/rtc.h                     | 113
-rw-r--r--  libtransport/src/protocols/rtc/rtc_consts.h              | 121
-rw-r--r--  libtransport/src/protocols/rtc/rtc_data_path.cc          | 197
-rw-r--r--  libtransport/src/protocols/rtc/rtc_data_path.h           |  97
-rw-r--r--  libtransport/src/protocols/rtc/rtc_ldr.cc                | 427
-rw-r--r--  libtransport/src/protocols/rtc/rtc_ldr.h                 | 108
-rw-r--r--  libtransport/src/protocols/rtc/rtc_packet.h              |  89
-rw-r--r--  libtransport/src/protocols/rtc/rtc_rc.h                  |  58
-rw-r--r--  libtransport/src/protocols/rtc/rtc_rc_frame.cc           |  79
-rw-r--r--  libtransport/src/protocols/rtc/rtc_rc_frame.h            |  46
-rw-r--r--  libtransport/src/protocols/rtc/rtc_rc_queue.cc           | 106
-rw-r--r--  libtransport/src/protocols/rtc/rtc_rc_queue.h            |  47
-rw-r--r--  libtransport/src/protocols/rtc/rtc_state.cc              | 560
-rw-r--r--  libtransport/src/protocols/rtc/rtc_state.h               | 253
-rw-r--r--  libtransport/src/protocols/rtc/trendline_estimator.cc    | 334
-rw-r--r--  libtransport/src/protocols/rtc/trendline_estimator.h     | 147
22 files changed, 3848 insertions, 0 deletions
diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt
new file mode 100644
index 000000000..77f065d0e
--- /dev/null
+++ b/libtransport/src/protocols/rtc/CMakeLists.txt
@@ -0,0 +1,38 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h
+)
+
+list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc
+)
+
+set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/protocols/rtc/congestion_detection.cc b/libtransport/src/protocols/rtc/congestion_detection.cc
new file mode 100644
index 000000000..e2d44ae66
--- /dev/null
+++ b/libtransport/src/protocols/rtc/congestion_detection.cc
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/utils/log.h>
+#include <protocols/rtc/congestion_detection.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+CongestionDetection::CongestionDetection()
+ : cc_estimator_(), last_processed_chunk_() {}
+
+CongestionDetection::~CongestionDetection() {}
+
+void CongestionDetection::updateStats() {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (chunks_number_.empty()) return;
+
+ uint32_t chunk_number = chunks_number_.front();
+
+ while (chunks_[chunk_number].getReceivedTime() + HICN_CC_STATS_MAX_DELAY_MS <
+ now ||
+ chunks_[chunk_number].isComplete()) {
+ if (chunk_number == last_processed_chunk_.getFrameSeqNum() + 1) {
+ chunks_[chunk_number].setPreviousSentTime(
+ last_processed_chunk_.getSentTime());
+
+ chunks_[chunk_number].setPreviousReceivedTime(
+ last_processed_chunk_.getReceivedTime());
+ cc_estimator_.Update(chunks_[chunk_number].getReceivedDelta(),
+ chunks_[chunk_number].getSentDelta(),
+ chunks_[chunk_number].getSentTime(),
+ chunks_[chunk_number].getReceivedTime(),
+ chunks_[chunk_number].getFrameSize(), true);
+
+ } else {
+ TRANSPORT_LOGD(
+        "CongestionDetection::updateStats frame %u but not the "
+        "previous one, last one was %u currentFrame %u",
+ chunk_number, last_processed_chunk_.getFrameSeqNum(),
+ chunks_[chunk_number].getFrameSeqNum());
+ }
+
+ last_processed_chunk_ = chunks_[chunk_number];
+
+ chunks_.erase(chunk_number);
+
+ chunks_number_.pop();
+ if (chunks_number_.empty()) break;
+
+ chunk_number = chunks_number_.front();
+ }
+}
+
+void CongestionDetection::addPacket(const core::ContentObject &content_object) {
+ auto payload = content_object.getPayload();
+ uint32_t payload_size = (uint32_t)payload->length();
+ uint32_t segmentNumber = content_object.getName().getSuffix();
+ // uint32_t pkt = segmentNumber & modMask_;
+ uint64_t *sentTimePtr = (uint64_t *)payload->data();
+
+ // this is just for testing with hiperf, assuming a frame is 10 pkts
+ // in the final version, the split should be based on the timestamp in the pkt
+ uint32_t frameNum = (int)(segmentNumber / HICN_CC_STATS_CHUNK_SIZE);
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (chunks_.find(frameNum) == chunks_.end()) {
+ // new chunk of pkts or out of order
+ if (last_processed_chunk_.getFrameSeqNum() > frameNum)
+ return; // out of order and we already processed the chunk
+
+ chunks_[frameNum] = FrameStats(frameNum, HICN_CC_STATS_CHUNK_SIZE);
+ chunks_number_.push(frameNum);
+ }
+
+ chunks_[frameNum].addPacket(*sentTimePtr, now, payload_size);
+}
+
+} // namespace rtc
+} // namespace protocol
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/congestion_detection.h b/libtransport/src/protocols/rtc/congestion_detection.h
new file mode 100644
index 000000000..17f4aa54c
--- /dev/null
+++ b/libtransport/src/protocols/rtc/congestion_detection.h
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/content_object.h>
+#include <protocols/rtc/trendline_estimator.h>
+
+#include <map>
+#include <queue>
+
+#define HICN_CC_STATS_CHUNK_SIZE 10
+#define HICN_CC_STATS_MAX_DELAY_MS 100
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class FrameStats {
+ public:
+ FrameStats()
+ : frame_num_(0),
+ sent_time_(0),
+ received_time_(0),
+ previous_sent_time_(0),
+ previous_received_time_(0),
+ size_(0),
+ received_pkt_m(0),
+ burst_size_m(HICN_CC_STATS_CHUNK_SIZE){};
+
+ FrameStats(uint32_t burst_size)
+ : frame_num_(0),
+ sent_time_(0),
+ received_time_(0),
+ previous_sent_time_(0),
+ previous_received_time_(0),
+ size_(0),
+ received_pkt_m(0),
+ burst_size_m(burst_size){};
+
+ FrameStats(uint32_t frame_num, uint32_t burst_size)
+ : frame_num_(frame_num),
+ sent_time_(0),
+ received_time_(0),
+ previous_sent_time_(0),
+ previous_received_time_(0),
+ size_(0),
+ received_pkt_m(0),
+ burst_size_m(burst_size){};
+
+ FrameStats(uint32_t frame_num, uint64_t sent_time, uint64_t received_time,
+ uint32_t size, FrameStats previousFrame, uint32_t burst_size)
+ : frame_num_(frame_num),
+ sent_time_(sent_time),
+ received_time_(received_time),
+ previous_sent_time_(previousFrame.getSentTime()),
+ previous_received_time_(previousFrame.getReceivedTime()),
+ size_(size),
+ received_pkt_m(1),
+ burst_size_m(burst_size){};
+
+ void addPacket(uint64_t sent_time, uint64_t received_time, uint32_t size) {
+ size_ += size;
+ sent_time_ =
+ (sent_time_ == 0) ? sent_time : std::min(sent_time_, sent_time);
+ received_time_ = std::max(received_time, received_time_);
+ received_pkt_m++;
+ }
+
+ bool isComplete() { return received_pkt_m == burst_size_m; }
+
+ uint32_t getFrameSeqNum() const { return frame_num_; }
+ uint64_t getSentTime() const { return sent_time_; }
+ uint64_t getReceivedTime() const { return received_time_; }
+ uint32_t getFrameSize() const { return size_; }
+
+ void setPreviousReceivedTime(uint64_t time) {
+ previous_received_time_ = time;
+ }
+ void setPreviousSentTime(uint64_t time) { previous_sent_time_ = time; }
+
+ // todo manage first frame
+ double getReceivedDelta() {
+ return static_cast<double>(received_time_ - previous_received_time_);
+ }
+ double getSentDelta() {
+ return static_cast<double>(sent_time_ - previous_sent_time_);
+ }
+
+ private:
+ uint32_t frame_num_;
+ uint64_t sent_time_;
+ uint64_t received_time_;
+
+ uint64_t previous_sent_time_;
+ uint64_t previous_received_time_;
+ uint32_t size_;
+
+ uint32_t received_pkt_m;
+ uint32_t burst_size_m;
+};
+
+class CongestionDetection {
+ public:
+ CongestionDetection();
+ ~CongestionDetection();
+
+ void addPacket(const core::ContentObject &content_object);
+
+ BandwidthUsage getState() { return cc_estimator_.State(); }
+
+ void updateStats();
+
+ private:
+ TrendlineEstimator cc_estimator_;
+ std::map<uint32_t, FrameStats> chunks_;
+ std::queue<uint32_t> chunks_number_;
+
+ FrameStats last_processed_chunk_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc
new file mode 100644
index 000000000..efba362d4
--- /dev/null
+++ b/libtransport/src/protocols/rtc/probe_handler.cc
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/probe_handler.h>
+#include <protocols/rtc/rtc_consts.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+ProbeHandler::ProbeHandler(SendProbeCallback &&send_callback,
+ asio::io_service &io_service)
+ : probe_interval_(0),
+ max_probes_(0),
+ sent_probes_(0),
+ probe_timer_(std::make_unique<asio::steady_timer>(io_service)),
+ rand_eng_((std::random_device())()),
+ distr_(MIN_RTT_PROBE_SEQ, MAX_RTT_PROBE_SEQ),
+ send_probe_callback_(std::move(send_callback)) {}
+
+ProbeHandler::~ProbeHandler() {}
+
+uint64_t ProbeHandler::getRtt(uint32_t seq) {
+ auto it = pending_probes_.find(seq);
+
+ if (it == pending_probes_.end()) return 0;
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t rtt = now - it->second;
+  if (rtt < 1) rtt = 1;
+
+ pending_probes_.erase(it);
+
+ return rtt;
+}
+
+void ProbeHandler::setProbes(uint32_t probe_interval, uint32_t max_probes) {
+ stopProbes();
+ probe_interval_ = probe_interval;
+ max_probes_ = max_probes;
+}
+
+void ProbeHandler::stopProbes() {
+ probe_interval_ = 0;
+ max_probes_ = 0;
+ sent_probes_ = 0;
+ probe_timer_->cancel();
+}
+
+void ProbeHandler::sendProbes() {
+ if (probe_interval_ == 0) return;
+ if (max_probes_ != 0 && sent_probes_ >= max_probes_) return;
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ uint32_t seq = distr_(rand_eng_);
+ pending_probes_.insert(std::pair<uint32_t, uint64_t>(seq, now));
+ send_probe_callback_(seq);
+ sent_probes_++;
+
+ // clean up
+ // a probe may get lost. if the pending_probes_ size becomes bigger than
+  // MAX_PENDING_PROBES remove all the probes older than one second
+ if (pending_probes_.size() > MAX_PENDING_PROBES) {
+ for (auto it = pending_probes_.begin(); it != pending_probes_.end();) {
+ if ((now - it->second) > 1000)
+ it = pending_probes_.erase(it);
+ else
+ it++;
+ }
+ }
+
+ if (probe_interval_ == 0) return;
+
+ std::weak_ptr<ProbeHandler> self(shared_from_this());
+ probe_timer_->expires_from_now(std::chrono::microseconds(probe_interval_));
+ probe_timer_->async_wait([self](std::error_code ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->sendProbes();
+ }
+ });
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h
new file mode 100644
index 000000000..b8ed84445
--- /dev/null
+++ b/libtransport/src/protocols/rtc/probe_handler.h
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <hicn/transport/config.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <functional>
+#include <random>
+#include <unordered_map>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> {
+ public:
+ using SendProbeCallback = std::function<void(uint32_t)>;
+
+ public:
+ ProbeHandler(SendProbeCallback &&send_callback,
+ asio::io_service &io_service);
+
+ ~ProbeHandler();
+
+  // if the function returns 0 the probe is not valid
+ uint64_t getRtt(uint32_t seq);
+
+  // reset the probe parameters. this stops the current probing;
+  // to restart, call sendProbes.
+  // probe_interval = 0 means that no event will be scheduled
+  // max_probes = 0 means no limit on the number of probes to send
+ void setProbes(uint32_t probe_interval, uint32_t max_probes);
+
+  // stop scheduling probes
+ void stopProbes();
+
+ void sendProbes();
+
+ private:
+ uint32_t probe_interval_; // us
+ uint32_t max_probes_; // packets
+ uint32_t sent_probes_; // packets
+
+ std::unique_ptr<asio::steady_timer> probe_timer_;
+
+ // map from seqnumber to timestamp
+ std::unordered_map<uint32_t, uint64_t> pending_probes_;
+
+ // random generator
+ std::default_random_engine rand_eng_;
+ std::uniform_int_distribution<uint32_t> distr_;
+
+ SendProbeCallback send_probe_callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
new file mode 100644
index 000000000..bb95ab686
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -0,0 +1,607 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <implementation/socket_consumer.h>
+#include <math.h>
+#include <protocols/rtc/rtc.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_queue.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+using namespace interface;
+
+RTCTransportProtocol::RTCTransportProtocol(
+ implementation::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, nullptr),
+ DatagramReassembly(icn_socket, this),
+ number_(0) {
+ icn_socket->getSocketOption(PORTAL, portal_);
+ round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ scheduler_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
+}
+
+RTCTransportProtocol::~RTCTransportProtocol() {}
+
+void RTCTransportProtocol::resume() {
+ if (is_running_) return;
+
+ is_running_ = true;
+
+ newRound();
+
+ portal_->runEventsLoop();
+ is_running_ = false;
+}
+
+// private
+void RTCTransportProtocol::initParams() {
+ portal_->setConsumerCallback(this);
+
+ rc_ = std::make_shared<RTCRateControlQueue>();
+ ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
+ std::bind(&RTCTransportProtocol::sendRtxInterest, this,
+ std::placeholders::_1),
+ portal_->getIoService());
+
+ state_ = std::make_shared<RTCState>(
+ std::bind(&RTCTransportProtocol::sendProbeInterest, this,
+ std::placeholders::_1),
+ std::bind(&RTCTransportProtocol::discoveredRtt, this),
+ portal_->getIoService());
+
+ rc_->setState(state_);
+ // TODO: for the moment we keep the congestion control disabled
+ // rc_->tunrOnRateControl();
+ ldr_->setState(state_);
+
+ // protocol state
+ start_send_interest_ = false;
+ current_state_ = SyncState::catch_up;
+
+ // Cancel timer
+ number_++;
+ round_timer_->cancel();
+ scheduler_timer_->cancel();
+ scheduler_timer_on_ = false;
+
+ // delete all timeouts and future nacks
+ timeouts_or_nacks_.clear();
+
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ // names/packets var
+ next_segment_ = 0;
+
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ RTC_INTEREST_LIFETIME);
+}
+
+// private
+void RTCTransportProtocol::reset() {
+ TRANSPORT_LOGD("reset called");
+ initParams();
+ newRound();
+}
+
+void RTCTransportProtocol::inactiveProducer() {
+ // when the producer is inactive we reset the consumer state
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_,
+ max_sync_win_);
+
+ // names/packets var
+ next_segment_ = 0;
+
+ ldr_->clear();
+}
+
+void RTCTransportProtocol::newRound() {
+ round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN));
+ // TODO pass weak_ptr here
+ round_timer_->async_wait([this, n{number_}](std::error_code ec) {
+ if (ec) return;
+
+ if (n != number_) {
+ return;
+ }
+
+ // saving counters that will be reset on new round
+ uint32_t sent_retx = state_->getSentRtxInRound();
+ uint32_t received_bytes = state_->getReceivedBytesInRound();
+ uint32_t sent_interest = state_->getSentInterestInRound();
+ uint32_t lost_data = state_->getLostData();
+ uint32_t recovered_losses = state_->getRecoveredLosses();
+ uint32_t received_nacks = state_->getReceivedNacksInRound();
+
+ bool in_sync = (current_state_ == SyncState::in_sync);
+ state_->onNewRound((double)ROUND_LEN, in_sync);
+ rc_->onNewRound((double)ROUND_LEN);
+
+ // update sync state if needed
+ if (current_state_ == SyncState::in_sync) {
+ double cache_rate = state_->getPacketFromCacheRatio();
+ if (cache_rate > MAX_DATA_FROM_CACHE) {
+ current_state_ = SyncState::catch_up;
+ }
+ } else {
+ double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double received_rate = state_->getReceivedRate();
+ uint32_t round_without_nacks = state_->getRoundsWithoutNacks();
+ double cache_ratio = state_->getPacketFromCacheRatio();
+ if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
+ received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) {
+ current_state_ = SyncState::in_sync;
+ }
+ }
+
+ TRANSPORT_LOGD("Calling updateSyncWindow in newRound function");
+ updateSyncWindow();
+
+ sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
+ recovered_losses, received_nacks);
+ newRound();
+ });
+}
+
+void RTCTransportProtocol::discoveredRtt() {
+ start_send_interest_ = true;
+ ldr_->turnOnRTX();
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::computeMaxSyncWindow() {
+ double production_rate = state_->getProducerRate();
+ double packet_size = state_->getAveragePacketSize();
+ if (production_rate == 0.0 || packet_size == 0.0) {
+ // the consumer has no info about the producer,
+ // keep the previous maxCWin
+ TRANSPORT_LOGD(
+ "Returning in computeMaxSyncWindow because: prod_rate: %d || "
+ "packet_size: %d",
+ (int)(production_rate == 0.0), (int)(packet_size == 0.0));
+ return;
+ }
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC;
+
+
+ max_sync_win_ =
+ (uint32_t)ceil((production_rate * lifetime_ms *
+ INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size);
+
+ max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow());
+}
+
+void RTCTransportProtocol::updateSyncWindow() {
+ computeMaxSyncWindow();
+
+ if (max_sync_win_ == INITIAL_WIN_MAX) {
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) return;
+
+ current_sync_win_ = INITIAL_WIN;
+ scheduleNextInterests();
+ return;
+ }
+
+ double prod_rate = state_->getProducerRate();
+ double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = state_->getAveragePacketSize();
+
+ // if some of the info are not available do not update the current win
+ if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
+ current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ current_sync_win_ +=
+ ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size);
+
+    if (current_state_ == SyncState::catch_up) {
+ current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT;
+ }
+
+ current_sync_win_ = std::min(current_sync_win_, max_sync_win_);
+ current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ }
+
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::decreaseSyncWindow() {
+ // called on future nack
+ // we have a new sample of the production rate, so update max win first
+ computeMaxSyncWindow();
+ current_sync_win_--;
+ current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::sendInterest(Name *interest_name) {
+ TRANSPORT_LOGD("Sending interest for name %s",
+ interest_name->toString().c_str());
+
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
+ interest->setName(*interest_name);
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ interest->setLifetime(uint32_t(lifetime));
+
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ return;
+ }
+
+ portal_->sendInterest(std::move(interest));
+}
+
+void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
+ if (!is_running_ && !is_first_) return;
+
+  if (!start_send_interest_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send rtx %u", seq);
+ interest_name->setSuffix(seq);
+ sendInterest(interest_name);
+}
+
+void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
+ if (!is_running_ && !is_first_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send probe %u", seq);
+ interest_name->setSuffix(seq);
+ sendInterest(interest_name);
+}
+
+void RTCTransportProtocol::scheduleNextInterests() {
+ TRANSPORT_LOGD("Schedule next interests");
+
+ if (!is_running_ && !is_first_) return;
+
+  if (!start_send_interest_) return;  // RTT discovery phase is not finished,
+                                      // so do not start sending interests
+
+  if (scheduler_timer_on_) return;  // wait before sending other interests
+
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
+ TRANSPORT_LOGD("Inactive producer.");
+    // here we keep sending the same interest until the producer
+    // starts again
+    if (next_segment_ != 0) {
+      // the producer just became inactive, reset the state
+ inactiveProducer();
+ }
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send interest %u", next_segment_);
+ interest_name->setSuffix(next_segment_);
+
+ if (portal_->interestIsPending(*interest_name)) {
+ // if interest 0 is already pending we return
+ return;
+ }
+
+ sendInterest(interest_name);
+ state_->onSendNewInterest(interest_name);
+ return;
+ }
+
+ TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d",
+ state_->getPendingInterestNumber(), current_sync_win_);
+
+  // skip nacked packets
+ if (next_segment_ <= state_->getLastSeqNacked()) {
+ next_segment_ = state_->getLastSeqNacked() + 1;
+ }
+
+  // skip received packets
+ if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) {
+ next_segment_ = state_->getHighestSeqReceivedInOrder() + 1;
+ }
+
+ uint32_t sent_interests = 0;
+ while ((state_->getPendingInterestNumber() < current_sync_win_) &&
+ (sent_interests < MAX_INTERESTS_IN_BATCH)) {
+ TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_);
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ interest_name->setSuffix(next_segment_);
+
+ // send the packet only if:
+ // 1) it is not pending yet (not true for rtx)
+ // 2) the packet is not received or lost
+    // 3) it is not in the rtx list
+ if (portal_->interestIsPending(*interest_name) ||
+ state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN ||
+ ldr_->isRtx(next_segment_)) {
+ TRANSPORT_LOGD(
+ "skip interest %u because: pending %u, recv %u, rtx %u",
+ next_segment_, (portal_->interestIsPending(*interest_name)),
+ (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN),
+ (ldr_->isRtx(next_segment_)));
+ next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ continue;
+ }
+
+
+ sent_interests++;
+ TRANSPORT_LOGD("send interest %u", next_segment_);
+ sendInterest(interest_name);
+ state_->onSendNewInterest(interest_name);
+
+ next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ }
+
+ if (state_->getPendingInterestNumber() < current_sync_win_) {
+    // we still have space in the window but we already sent a batch of
+    // MAX_INTERESTS_IN_BATCH interests. for the following ones wait
+    // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packet drops
+
+ scheduler_timer_on_ = true;
+ scheduler_timer_->expires_from_now(
+ std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES));
+ scheduler_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ if (!scheduler_timer_on_) return;
+
+ scheduler_timer_on_ = false;
+ scheduleNextInterests();
+ });
+ }
+}
+
+void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ uint32_t segment_number = interest->getName().getSuffix();
+
+ TRANSPORT_LOGD("timeout for packet %u", segment_number);
+
+ if (segment_number >= MIN_PROBE_SEQ) {
+ // this is a timeout on a probe, do nothing
+ return;
+ }
+
+ timeouts_or_nacks_.insert(segment_number);
+
+ if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
+ segment_number <= state_->getHighestSeqReceivedInOrder()) {
+    // we retransmit packets only if the producer is active, otherwise we
+    // rely on timeouts to avoid sending too much traffic
+ //
+    // a timed-out packet is retransmitted using RTX only if it is an old
+    // packet. if it is for a seq number that we did not reach yet, we request
+    // the packet using the normal interest scheduling
+ TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number);
+ ldr_->onTimeout(segment_number);
+ state_->onTimeout(segment_number);
+ scheduleNextInterests();
+ return;
+ }
+
+ TRANSPORT_LOGD("handle timeout for packet %u using normal interests",
+ segment_number);
+
+ if (segment_number < next_segment_) {
+ // this is a timeout for a packet that will be generated in the future but
+ // we are asking for higher sequence numbers. we need to go back like in the
+ // case of future nacks
+ TRANSPORT_LOGD("on timeout next seg = %u, jump to %u",
+ next_segment_, segment_number);
+ next_segment_ = segment_number;
+ }
+
+ state_->onTimeout(segment_number);
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::onNack(const ContentObject &content_object) {
+ struct nack_packet_t *nack =
+ (struct nack_packet_t *)content_object.getPayload()->data();
+ uint32_t production_seg = nack->getProductionSegement();
+ uint32_t nack_segment = content_object.getName().getSuffix();
+ bool is_rtx = ldr_->isRtx(nack_segment);
+
+ // check if the packet got a timeout
+
+ TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment,
+ production_seg);
+
+ bool compute_stats = true;
+ auto tn_it = timeouts_or_nacks_.find(nack_segment);
+ if (tn_it != timeouts_or_nacks_.end() || is_rtx) {
+ compute_stats = false;
+ // remove packets from timeouts_or_nacks only in case of a past nack
+ }
+
+ state_->onNackPacketReceived(content_object, compute_stats);
+ ldr_->onNackPacketReceived(content_object);
+
+ // both in case of past and future nack we set next_segment_ equal to the
+  // production segment in the nack. In case of past nack we will skip unneeded
+  // interests (this is already done in scheduleNextInterests in any case)
+ // while in case of future nacks we can go back in time and ask again for the
+ // content that generated the nack
+ TRANSPORT_LOGD("on nack next seg = %u, jump to %u",
+ next_segment_, production_seg);
+ next_segment_ = production_seg;
+
+ if (production_seg > nack_segment) {
+    // remove the nack if it exists
+ if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
+
+ // the client is asking for content in the past
+ // switch to catch up state and increase the window
+ // this is true only if the packet is not an RTX
+ if (!is_rtx) current_state_ = SyncState::catch_up;
+
+ updateSyncWindow();
+ } else {
+ // if production_seg == nack_segment we consider this a future nack, since
+ // production_seg is not yet created. this may happen in case of low
+ // production rate (e.g. ping at 1pps)
+
+ // if a future nack was also retransmitted add it to the timeout_or_nacks
+ // set
+ if (is_rtx) timeouts_or_nacks_.insert(nack_segment);
+
+ // the client is asking for content in the future
+ // switch to in sync state and decrease the window
+ current_state_ = SyncState::in_sync;
+ decreaseSyncWindow();
+ }
+}
+
+void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
+ bool valid = state_->onProbePacketReceived(content_object);
+  if (!valid) return;
+
+ struct nack_packet_t *probe =
+ (struct nack_packet_t *)content_object.getPayload()->data();
+ uint32_t production_seg = probe->getProductionSegement();
+
+ // as for the nacks set next_segment_
+ TRANSPORT_LOGD("on probe next seg = %u, jump to %u",
+ next_segment_, production_seg);
+ next_segment_ = production_seg;
+
+ ldr_->onProbePacketReceived(content_object);
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::onContentObject(Interest &interest,
+ ContentObject &content_object) {
+ TRANSPORT_LOGD("Received content object of size: %zu",
+ content_object.payloadSize());
+ uint32_t payload_size = content_object.payloadSize();
+ uint32_t segment_number = content_object.getName().getSuffix();
+
+ if (segment_number >= MIN_PROBE_SEQ) {
+ TRANSPORT_LOGD("Received probe %u", segment_number);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onProbe(content_object);
+ return;
+ }
+
+ if (payload_size == NACK_HEADER_SIZE) {
+ TRANSPORT_LOGD("Received nack %u", segment_number);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onNack(content_object);
+ return;
+ }
+
+ TRANSPORT_LOGD("Received content %u", segment_number);
+
+ rc_->onDataPacketReceived(content_object);
+ bool compute_stats = true;
+ auto tn_it = timeouts_or_nacks_.find(segment_number);
+ if (tn_it != timeouts_or_nacks_.end()) {
+ compute_stats = false;
+ timeouts_or_nacks_.erase(tn_it);
+ }
+ if (ldr_->isRtx(segment_number)) {
+ compute_stats = false;
+ }
+
+ // check if the packet was already received
+ PacketState state = state_->isReceivedOrLost(segment_number);
+ state_->onDataPacketReceived(content_object, compute_stats);
+ ldr_->onDataPacketReceived(content_object);
+
+  // if the state for this seq number is RECEIVED do not send the packet to the app
+ if (state != PacketState::RECEIVED) {
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ reassemble(content_object);
+ } else {
+ TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number);
+ }
+
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::sendStatsToApp(
+ uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests,
+ uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) {
+ if (*stats_summary_) {
+ // Send the stats to the app
+ stats_->updateQueuingDelay(state_->getQueuing());
+
+ // stats_->updateInterestFecTx(0); //todo must be implemented
+ // stats_->updateBytesFecRecv(0); //todo must be implemented
+
+ stats_->updateRetxCount(retx_count);
+ stats_->updateBytesRecv(received_bytes);
+ stats_->updateInterestTx(sent_interests);
+ stats_->updateReceivedNacks(received_nacks);
+
+ stats_->updateAverageWindowSize(current_sync_win_);
+ stats_->updateLossRatio(state_->getLossRate());
+ stats_->updateAverageRtt(state_->getRTT());
+ stats_->updateLostData(lost_data);
+ stats_->updateRecoveredData(recovered_losses);
+ stats_->updateCCState((unsigned int)current_state_ ? 1 : 0);
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ }
+}
+
+void RTCTransportProtocol::reassemble(ContentObject &content_object) {
+ auto read_buffer = content_object.getPayload();
+ TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length());
+ read_buffer->trimStart(DATA_HEADER_SIZE);
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h
new file mode 100644
index 000000000..596887067
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc.h
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/datagram_reassembly.h>
+#include <protocols/rtc/rtc_ldr.h>
+#include <protocols/rtc/rtc_rc.h>
+#include <protocols/rtc/rtc_state.h>
+#include <protocols/transport_protocol.h>
+
+#include <unordered_set>
+#include <vector>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCTransportProtocol : public TransportProtocol,
+ public DatagramReassembly {
+ public:
+ RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket);
+
+ ~RTCTransportProtocol();
+
+ using TransportProtocol::start;
+
+ using TransportProtocol::stop;
+
+ void resume() override;
+
+ private:
+ enum class SyncState { catch_up = 0, in_sync = 1, last };
+
+ private:
+ // setup functions
+ void initParams();
+ void reset() override;
+
+ void inactiveProducer();
+
+ // protocol functions
+ void discoveredRtt();
+ void newRound();
+
+ // window functions
+ void computeMaxSyncWindow();
+ void updateSyncWindow();
+ void decreaseSyncWindow();
+
+ // packet functions
+ void sendInterest(Name *interest_name);
+ void sendRtxInterest(uint32_t seq);
+ void sendProbeInterest(uint32_t seq);
+ void scheduleNextInterests() override;
+ void onTimeout(Interest::Ptr &&interest) override;
+ void onNack(const ContentObject &content_object);
+ void onProbe(const ContentObject &content_object);
+ void reassemble(ContentObject &content_object) override;
+ void onContentObject(Interest &interest,
+ ContentObject &content_object) override;
+ void onPacketDropped(Interest &interest,
+ ContentObject &content_object) override {}
+ void onReassemblyFailed(std::uint32_t missing_segment) override {}
+
+ // interaction with app functions
+ void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes,
+ uint32_t sent_interests, uint32_t lost_data,
+ uint32_t recovered_losses, uint32_t received_nacks);
+ // protocol state
+ bool start_send_interest_;
+ SyncState current_state_;
+ // cwin vars
+ uint32_t current_sync_win_;
+ uint32_t max_sync_win_;
+
+ // controller var
+ std::unique_ptr<asio::steady_timer> round_timer_;
+ std::unique_ptr<asio::steady_timer> scheduler_timer_;
+ bool scheduler_timer_on_;
+
+ // timeouts
+ std::unordered_set<uint32_t> timeouts_or_nacks_;
+
+ // names/packets var
+ uint32_t next_segment_;
+
+ std::shared_ptr<RTCState> state_;
+ std::shared_ptr<RTCRateControl> rc_;
+ std::shared_ptr<RTCLossDetectionAndRecovery> ldr_;
+
+ uint32_t number_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h
new file mode 100644
index 000000000..0cf9516ab
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_consts.h
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <protocols/rtc/rtc_packet.h>
+#include <stdint.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+// used in rtc
+// protocol consts
+const uint32_t ROUND_LEN = 200;
+// ms interval of time on which
+// we take decisions / measurements
+const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8;
+// how big (in ms) should be the buffer at the producer.
+// increasing this number we increase the time that an
+// interest will wait for the data packet to be produced
+// at the producer socket
+const uint32_t PRODUCER_BUFFER_MS = 200; // ms
+
+// interest scheduler
+const uint32_t MAX_INTERESTS_IN_BATCH = 5;
+const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec
+
+// packet const
+const uint32_t HICN_HEADER_SIZE = 40 + 20; // IPv6 + TCP bytes
+const uint32_t RTC_INTEREST_LIFETIME = 1000;
+
+// probes sequence range
+const uint32_t MIN_PROBE_SEQ = 0xefffffff;
+const uint32_t MIN_RTT_PROBE_SEQ = MIN_PROBE_SEQ;
+const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1;
+// RTT_PROBE_INTERVAL will be used during the session while
+// INIT_RTT_PROBE_INTERVAL is used at the beginning to
+// quickly estimate the RTT
+const uint32_t RTT_PROBE_INTERVAL = 200000; // us
+const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us
+const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT
+// if the producer is not yet started we need to probe multiple times
+// to get an answer. we wait 100ms between each try
+const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms
+// once we get the first probe we wait at most INIT_RTT_PROBE_WAIT ms for the others
+const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms
+// we require at least 5 probes to be received
+const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5;  // probes
+const uint32_t MAX_PENDING_PROBES = 10;
+
+
+// congestion
+const double MAX_QUEUING_DELAY = 100.0; // ms
+
+// data from cache
+const double MAX_DATA_FROM_CACHE = 0.25; // 25%
+
+// window const
+const uint32_t INITIAL_WIN = 5; // pkts
+const uint32_t INITIAL_WIN_MAX = 1000000; // pkts
+const uint32_t WIN_MIN = 5; // pkts
+const double CATCH_UP_WIN_INCREMENT = 1.2;
+// used in rate control
+const double WIN_DECREASE_FACTOR = 0.5;
+const double WIN_INCREASE_FACTOR = 1.5;
+
+// round in congestion
+const double ROUNDS_BEFORE_TAKE_ACTION = 5;
+
+// used in state
+const uint8_t ROUNDS_IN_SYNC_BEFORE_SWITCH = 3;
+const double PRODUCTION_RATE_FRACTION = 0.8;
+
+const uint32_t INIT_PACKET_SIZE = 1200;
+
+const double MOVING_AVG_ALPHA = 0.8;
+
+const double MILLI_IN_A_SEC = 1000.0;
+const double MICRO_IN_A_SEC = 1000000.0;
+
+const double MAX_CACHED_PACKETS = 262144; // 2^18
+ // about 50 sec of traffic at 50Mbps
+ // with 1200 bytes packets
+
+const uint32_t MAX_ROUND_WHIOUT_PACKETS =
+ (20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds;
+
+// used in ldr
+const uint32_t RTC_MAX_RTX = 100;
+const uint32_t RTC_MAX_AGE = 60000; // in ms
+const uint64_t MAX_TIMER_RTX = ~0;
+const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms
+const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets
+const double CATCH_UP_RTT_INCREMENT = 1.2;
+
+// used by producer
+const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms
+const uint32_t MIN_PRODUCTION_RATE = 10; // pps
+ // min prod rate
+ // set running several test
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc
new file mode 100644
index 000000000..c098088a3
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_data_path.cc
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_data_path.h>
+#include <stdlib.h>
+
+#include <algorithm>
+#include <cfloat>
+#include <chrono>
+
+#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCDataPath::RTCDataPath(uint32_t path_id)
+ : path_id_(path_id),
+ min_rtt(UINT_MAX),
+ prev_min_rtt(UINT_MAX),
+ min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the
+ // real OWD, but the measured one, that depends on the
+ // clock of sender and receiver. the only meaningful
+                         // value is the queueing delay. for this reason we
+                         // keep both RTT (for the window calculation) and OWD
+ // (for congestion/quality control)
+ prev_min_owd(INT_MAX),
+ avg_owd(DBL_MAX),
+ queuing_delay(DBL_MAX),
+ jitter_(0.0),
+ last_owd_(0),
+ largest_recv_seq_(0),
+ largest_recv_seq_time_(0),
+ avg_inter_arrival_(DBL_MAX),
+ received_nacks_(false),
+ received_packets_(false),
+ rounds_without_packets_(0),
+ last_received_data_packet_(0),
+ RTT_history_(HISTORY_LEN),
+ OWD_history_(HISTORY_LEN){};
+
+void RTCDataPath::insertRttSample(uint64_t rtt) {
+ // for the rtt we only keep track of the min one
+ if (rtt < min_rtt) min_rtt = rtt;
+ last_received_data_packet_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+}
+
+void RTCDataPath::insertOwdSample(int64_t owd) {
+ // for owd we use both min and avg
+ if (owd < min_owd) min_owd = owd;
+
+ if (avg_owd != DBL_MAX)
+ avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
+ else {
+ avg_owd = owd;
+ }
+
+ int64_t queueVal = owd - std::min(getMinOwd(), min_owd);
+
+ if (queuing_delay != DBL_MAX)
+ queuing_delay = (queuing_delay * (1 - ALPHA_RTC)) + (queueVal * ALPHA_RTC);
+ else {
+ queuing_delay = queueVal;
+ }
+
+ // keep track of the jitter computed as for RTP (RFC 3550)
+ int64_t diff = std::abs(owd - last_owd_);
+ last_owd_ = owd;
+ jitter_ += (1.0 / 16.0) * ((double)diff - jitter_);
+
+  // owd is computed only for valid data packets so we use only
+  // this to decide whether we receive traffic or not
+ received_packets_ = true;
+}
+
+void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) {
+ // got packet in sequence, compute gap
+ if (largest_recv_seq_ == (segment_number - 1)) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t delta = now - largest_recv_seq_time_;
+ largest_recv_seq_ = segment_number;
+ largest_recv_seq_time_ = now;
+ if (avg_inter_arrival_ == DBL_MAX)
+ avg_inter_arrival_ = delta;
+ else
+ avg_inter_arrival_ =
+ (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC);
+ return;
+ }
+
+  // out-of-order packet, update the stats if needed
+ if (largest_recv_seq_ <= segment_number) {
+ largest_recv_seq_ = segment_number;
+ largest_recv_seq_time_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ }
+}
+
+void RTCDataPath::receivedNack() { received_nacks_ = true; }
+
+double RTCDataPath::getInterArrivalGap() {
+ if (avg_inter_arrival_ == DBL_MAX) return 0;
+ return avg_inter_arrival_;
+}
+
+bool RTCDataPath::isActive() {
+ if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS)
+ return true;
+ return false;
+}
+
+bool RTCDataPath::pathToProducer() {
+ if (received_nacks_) return true;
+ return false;
+}
+
+void RTCDataPath::roundEnd() {
+ // reset min_rtt and add it to the history
+ if (min_rtt != UINT_MAX) {
+ prev_min_rtt = min_rtt;
+ } else {
+ // this may happen if we do not receive any packet
+ // from this path in the last round. in this case
+    // we use the measure from the previous round
+ min_rtt = prev_min_rtt;
+ }
+
+ if (min_rtt == 0) min_rtt = 1;
+
+ RTT_history_.pushBack(min_rtt);
+ min_rtt = UINT_MAX;
+
+ // do the same for min owd
+ if (min_owd != INT_MAX) {
+ prev_min_owd = min_owd;
+ } else {
+ min_owd = prev_min_owd;
+ }
+
+ if (min_owd != INT_MAX) {
+ OWD_history_.pushBack(min_owd);
+ min_owd = INT_MAX;
+ }
+
+ if (!received_packets_)
+ rounds_without_packets_++;
+ else
+ rounds_without_packets_ = 0;
+ received_packets_ = false;
+}
+
+uint32_t RTCDataPath::getPathId() { return path_id_; }
+
+double RTCDataPath::getQueuingDealy() { return queuing_delay; }
+
+uint64_t RTCDataPath::getMinRtt() {
+ if (RTT_history_.size() != 0) return RTT_history_.begin();
+ return 0;
+}
+
+int64_t RTCDataPath::getMinOwd() {
+ if (OWD_history_.size() != 0) return OWD_history_.begin();
+ return 0;
+}
+
+double RTCDataPath::getJitter() { return jitter_; }
+
+uint64_t RTCDataPath::getLastPacketTS() { return last_received_data_packet_; }
+
+void RTCDataPath::clearRtt() { RTT_history_.clear(); }
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h
new file mode 100644
index 000000000..c5c37fc0d
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_data_path.h
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <utils/min_filter.h>
+
+#include <climits>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+const double ALPHA_RTC = 0.125;
+const uint32_t HISTORY_LEN = 20; // 4 sec
+
+class RTCDataPath {
+ public:
+ RTCDataPath(uint32_t path_id);
+
+ public:
+ void insertRttSample(uint64_t rtt);
+ void insertOwdSample(int64_t owd);
+ void computeInterArrivalGap(uint32_t segment_number);
+ void receivedNack();
+
+ uint32_t getPathId();
+ uint64_t getMinRtt();
+ double getQueuingDealy();
+ double getInterArrivalGap();
+ double getJitter();
+ bool isActive();
+ bool pathToProducer();
+ uint64_t getLastPacketTS();
+
+ void clearRtt();
+
+ void roundEnd();
+
+ private:
+ uint32_t path_id_;
+
+ int64_t getMinOwd();
+
+ uint64_t min_rtt;
+ uint64_t prev_min_rtt;
+
+ int64_t min_owd;
+ int64_t prev_min_owd;
+
+ double avg_owd;
+
+ double queuing_delay;
+
+ double jitter_;
+ int64_t last_owd_;
+
+ uint32_t largest_recv_seq_;
+ uint64_t largest_recv_seq_time_;
+ double avg_inter_arrival_;
+
+ // flags to check if a path is active
+  // we consider a path active if it reaches a producer
+  // (not a cache) --aka we got at least one nack on this path--
+  // and if we receive packets
+ bool received_nacks_;
+ bool received_packets_;
+ uint8_t rounds_without_packets_; // if we don't get any packet
+ // for MAX_ROUNDS_WITHOUT_PKTS
+ // we consider the path inactive
+ uint64_t last_received_data_packet_; // timestamp for the last data received
+ // on this path
+
+ utils::MinFilter<uint64_t> RTT_history_;
+ utils::MinFilter<int64_t> OWD_history_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc
new file mode 100644
index 000000000..e91b29c04
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_ldr.cc
@@ -0,0 +1,427 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_ldr.h>
+
+#include <algorithm>
+#include <unordered_set>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery(
+ SendRtxCallback &&callback, asio::io_service &io_service)
+ : rtx_on_(false),
+ next_rtx_timer_(MAX_TIMER_RTX),
+ last_event_(0),
+ sentinel_timer_interval_(MAX_TIMER_RTX),
+ send_rtx_callback_(std::move(callback)) {
+ timer_ = std::make_unique<asio::steady_timer>(io_service);
+ sentinel_timer_ = std::make_unique<asio::steady_timer>(io_service);
+}
+
+RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {}
+
+void RTCLossDetectionAndRecovery::turnOnRTX() {
+ rtx_on_ = true;
+ scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT);
+}
+
+void RTCLossDetectionAndRecovery::turnOffRTX() {
+ rtx_on_ = false;
+ clear();
+}
+
+void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) {
+  // always add timeouts to the RTX list to avoid sending the same packet as
+  // if it were not an rtx
+ addToRetransmissions(seq, seq + 1);
+ last_event_ = getNow();
+}
+
+void RTCLossDetectionAndRecovery::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ last_event_ = getNow();
+
+ uint32_t seq = content_object.getName().getSuffix();
+ if (deleteRtx(seq)) {
+ state_->onPacketRecovered(seq);
+ } else {
+ if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
+ TRANSPORT_LOGD("received data. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq);
+ }
+}
+
+void RTCLossDetectionAndRecovery::onNackPacketReceived(
+ const core::ContentObject &nack) {
+ last_event_ = getNow();
+
+ uint32_t seq = nack.getName().getSuffix();
+
+ if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
+
+ struct nack_packet_t *nack_pkt =
+ (struct nack_packet_t *)nack.getPayload()->data();
+ uint32_t production_seq = nack_pkt->getProductionSegement();
+
+ if (production_seq > seq) {
+    // this is a past nack: all data before productionSeq is lost. if
+    // productionSeq > state_->getHighestSeqReceivedInOrder() it is impossible
+    // to recover any packet. otherwise we can try to recover the packets
+    // between state_->getHighestSeqReceivedInOrder() and productionSeq.
+    // e.g.: the client receives packets 8 10 11 9 where 9 is a nack with
+    // productionSeq = 14. 9 is lost but we can try to recover packets 12, 13
+    // and 14, which have not arrived yet
+ deleteRtx(seq);
+ TRANSPORT_LOGD("received past nack. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
+ } else {
+    // future nack. here there should be a gap between the last data received
+    // and this packet, and it is possible to recover the packets between the
+    // last received data and the production seq. we should not use the seq
+    // number of the nack since we know that it is too early to ask for this
+    // seq number.
+    // e.g.: the client receives packets 10 11 12 20 where 20 is a nack
+    // with productionSeq = 18. this says that all the packets between 12 and
+    // 18 may have been lost and we should ask for them
+ deleteRtx(seq);
+ TRANSPORT_LOGD("received futrue nack. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
+ }
+}
+
+void RTCLossDetectionAndRecovery::onProbePacketReceived(
+ const core::ContentObject &probe) {
+  // we don't log the reception of a probe packet for the sentinel timer because
+  // probes are not taken into account in the sync window. we use them as
+  // future nacks to detect possibly lost packets
+ if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off
+ struct nack_packet_t *probe_pkt =
+ (struct nack_packet_t *)probe.getPayload()->data();
+ uint32_t production_seq = probe_pkt->getProductionSegement();
+ TRANSPORT_LOGD("received probe. add from %u to %u ",
+ state_->getHighestSeqReceivedInOrder() + 1, production_seq);
+ addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1,
+ production_seq);
+}
+
+void RTCLossDetectionAndRecovery::clear() {
+ rtx_state_.clear();
+ rtx_timers_.clear();
+ sentinel_timer_->cancel();
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ timer_->cancel();
+ }
+}
+
+void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start,
+ uint32_t stop) {
+ // skip nacked packets
+ if (start <= state_->getLastSeqNacked()) {
+ start = state_->getLastSeqNacked() + 1;
+ }
+
+ // skip received or lost packets
+ if (start <= state_->getHighestSeqReceivedInOrder()) {
+ start = state_->getHighestSeqReceivedInOrder() + 1;
+ }
+
+ for (uint32_t seq = start; seq < stop; seq++) {
+ if (!isRtx(seq) && // is not already an rtx
+ // is not received or lost
+ state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) {
+ // add rtx
+ rtxState state;
+ state.first_send_ = state_->getInterestSentTime(seq);
+ if (state.first_send_ == 0) // this interest was never sent before
+ state.first_send_ = getNow();
+ state.next_send_ = computeNextSend(seq, true);
+ state.rtx_count_ = 0;
+ TRANSPORT_LOGD("add %u to retransmissions. next rtx is %lu ", seq,
+ (state.next_send_ - getNow()));
+ rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state));
+ rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq));
+ }
+ }
+ scheduleNextRtx();
+}
+
+uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq,
+ bool new_rtx) {
+ uint64_t now = getNow();
+ if (new_rtx) {
+    // for a new rtx we wait one estimated IAT after the loss detection. this
+    // is because, assuming that packets arrive with a constant IAT, we should
+    // get a new packet every IAT
+ double prod_rate = state_->getProducerRate();
+ uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL;
+ uint32_t jitter = 0;
+
+ if (prod_rate != 0) {
+ double packet_size = state_->getAveragePacketSize();
+ estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ jitter = ceil(state_->getJitter());
+ }
+
+ uint32_t wait = estimated_iat + jitter;
+ TRANSPORT_LOGD("first rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u",
+ seq, wait, state_->getRTT(), estimated_iat, jitter);
+
+ return now + wait;
+ } else {
+ // wait one RTT
+ // however if the IAT is larger than the RTT, wait one IAT
+ uint32_t wait = SENTINEL_TIMER_INTERVAL;
+
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate == 0) {
+ return now + SENTINEL_TIMER_INTERVAL;
+ }
+
+ double packet_size = state_->getAveragePacketSize();
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+
+ uint64_t rtt = state_->getRTT();
+ if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL;
+ wait = rtt;
+
+ if (estimated_iat > rtt) wait = estimated_iat;
+
+ uint32_t jitter = ceil(state_->getJitter());
+ wait += jitter;
+
+ // it may happen that the channel is congested and we have some additional
+ // queuing delay to take into account
+ uint32_t queue = ceil(state_->getQueuing());
+ wait += queue;
+
+ TRANSPORT_LOGD(
+        "next rtx for %u in %u ms, rtt = %lu iat = %u jitter = %u queue = %u",
+ seq, wait, state_->getRTT(), estimated_iat, jitter, queue);
+
+ return now + wait;
+ }
+}
+
+void RTCLossDetectionAndRecovery::retransmit() {
+ if (rtx_timers_.size() == 0) return;
+
+ uint64_t now = getNow();
+
+ auto it = rtx_timers_.begin();
+ std::unordered_set<uint32_t> lost_pkt;
+ uint32_t sent_counter = 0;
+ while (it != rtx_timers_.end() && it->first <= now &&
+ sent_counter < MAX_INTERESTS_IN_BATCH) {
+ uint32_t seq = it->second;
+ auto rtx_it =
+ rtx_state_.find(seq); // this should always return a valid iter
+ if (rtx_it->second.rtx_count_ >= RTC_MAX_RTX ||
+ (now - rtx_it->second.first_send_) >= RTC_MAX_AGE ||
+ seq < state_->getLastSeqNacked()) {
+ // max rtx reached or packet too old or packet nacked, this packet is lost
+ TRANSPORT_LOGD(
+          "packet %u lost because 1) max rtx: %u 2) max age: %u 3) nacked: %u",
+ seq, (rtx_it->second.rtx_count_ >= RTC_MAX_RTX),
+ ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE),
+ (seq < state_->getLastSeqNacked()));
+ lost_pkt.insert(seq);
+ it++;
+ } else {
+ // resend the packet
+ state_->onRetransmission(seq);
+ double prod_rate = state_->getProducerRate();
+ if (prod_rate != 0) rtx_it->second.rtx_count_++;
+ rtx_it->second.next_send_ = computeNextSend(seq, false);
+ it = rtx_timers_.erase(it);
+ rtx_timers_.insert(
+ std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq));
+ TRANSPORT_LOGD("send rtx for sequence %u, next send in %lu", seq,
+ (rtx_it->second.next_send_ - now));
+ send_rtx_callback_(seq);
+ sent_counter++;
+ }
+ }
+
+ // remove packets if needed
+ for (auto lost_it = lost_pkt.begin(); lost_it != lost_pkt.end(); lost_it++) {
+ uint32_t seq = *lost_it;
+ state_->onPacketLost(seq);
+ deleteRtx(seq);
+ }
+}
+
+void RTCLossDetectionAndRecovery::scheduleNextRtx() {
+ if (rtx_timers_.size() == 0) {
+ // all the rtx were removed, reset timer
+ next_rtx_timer_ = MAX_TIMER_RTX;
+ return;
+ }
+
+  // check if the timer is already set
+ if (next_rtx_timer_ != MAX_TIMER_RTX) {
+ // a new check for rtx is already scheduled
+ if (next_rtx_timer_ > rtx_timers_.begin()->first) {
+ // we need to re-schedule it
+ timer_->cancel();
+ } else {
+ // wait for the next timer
+ return;
+ }
+ }
+
+ // set a new timer
+ next_rtx_timer_ = rtx_timers_.begin()->first;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t wait = 1;
+ if (next_rtx_timer_ != MAX_TIMER_RTX && next_rtx_timer_ > now)
+ wait = next_rtx_timer_ - now;
+
+ std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this());
+ timer_->expires_from_now(std::chrono::milliseconds(wait));
+ timer_->async_wait([self](std::error_code ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->retransmit();
+ s->next_rtx_timer_ = MAX_TIMER_RTX;
+ s->scheduleNextRtx();
+ }
+ });
+}
+
+bool RTCLossDetectionAndRecovery::deleteRtx(uint32_t seq) {
+ auto it_rtx = rtx_state_.find(seq);
+ if (it_rtx == rtx_state_.end()) return false; // rtx not found
+
+ uint64_t ts = it_rtx->second.next_send_;
+ auto it_timers = rtx_timers_.find(ts);
+ while (it_timers != rtx_timers_.end() && it_timers->first == ts) {
+ if (it_timers->second == seq) {
+ rtx_timers_.erase(it_timers);
+ break;
+ }
+ it_timers++;
+ }
+
+ bool lost = it_rtx->second.rtx_count_ > 0;
+ rtx_state_.erase(it_rtx);
+
+ return lost;
+}
+
+void RTCLossDetectionAndRecovery::scheduleSentinelTimer(
+ uint64_t expires_from_now) {
+ std::weak_ptr<RTCLossDetectionAndRecovery> self(shared_from_this());
+ sentinel_timer_->expires_from_now(
+ std::chrono::milliseconds(expires_from_now));
+ sentinel_timer_->async_wait([self](std::error_code ec) {
+ if (ec) return;
+ if (auto s = self.lock()) {
+ s->sentinelTimer();
+ }
+ });
+}
+
+void RTCLossDetectionAndRecovery::sentinelTimer() {
+ uint64_t now = getNow();
+
+ bool expired = false;
+ bool sent = false;
+ if ((now - last_event_) >= sentinel_timer_interval_) {
+ // at least a sentinel_timer_interval_ elapsed since last event
+ expired = true;
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
+      // this happens at the beginning (or if the producer stops for some
+      // reason); we need to keep sending interest 0 until we get an answer
+ TRANSPORT_LOGD(
+ "sentinel timer: the producer is not active, send packet 0");
+ state_->onRetransmission(0);
+ send_rtx_callback_(0);
+ } else {
+ TRANSPORT_LOGD(
+ "sentinel timer: the producer is active, send the 10 oldest packets");
+ sent = true;
+ uint32_t rtx = 0;
+ auto it = state_->getPendingInterestsMapBegin();
+ auto end = state_->getPendingInterestsMapEnd();
+ while (it != end && rtx < MAX_RTX_WITH_SENTINEL) {
+ uint32_t seq = it->first;
+ TRANSPORT_LOGD("sentinel timer, add %u to the rtx list", seq);
+ addToRetransmissions(seq, seq + 1);
+ rtx++;
+ it++;
+ }
+ }
+ } else {
+ // sentinel timer did not expire because we registered at least one event
+ }
+
+ uint32_t next_timer;
+ double prod_rate = state_->getProducerRate();
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) {
+ TRANSPORT_LOGD("next timer in %u", SENTINEL_TIMER_INTERVAL);
+ next_timer = SENTINEL_TIMER_INTERVAL;
+ } else {
+ double prod_rate = state_->getProducerRate();
+ double packet_size = state_->getAveragePacketSize();
+ uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size));
+ uint32_t jitter = ceil(state_->getJitter());
+
+ // try to reduce the number of timers if the estimated IAT is too small
+ next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1);
+ TRANSPORT_LOGD("next sentinel in %u ms, rate: %f, iat: %u, jitter: %u",
+ next_timer, ((prod_rate * 8.0) / 1000000.0), estimated_iat,
+ jitter);
+
+ if (!expired) {
+      // discount the amount of time that has already passed
+ uint32_t discount = now - last_event_;
+ if (next_timer > discount) {
+ next_timer = next_timer - discount;
+ } else {
+ // in this case we trigger the timer in 1 ms
+ next_timer = 1;
+ }
+      TRANSPORT_LOGD("timer after discount: %u", next_timer);
+ } else if (sent) {
+ // wait at least one producer stats interval + owd to check if the
+ // production rate is reducing.
+ uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing());
+ next_timer = std::max(next_timer, min_wait);
+ TRANSPORT_LOGD("wait for updates from prod, next timer: %u", next_timer);
+ }
+ }
+
+ scheduleSentinelTimer(next_timer);
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h
new file mode 100644
index 000000000..c0912303b
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_ldr.h
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/name.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_state.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <functional>
+#include <map>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCLossDetectionAndRecovery
+ : public std::enable_shared_from_this<RTCLossDetectionAndRecovery> {
+ struct rtx_state_ {
+ uint64_t first_send_;
+ uint64_t next_send_;
+ uint32_t rtx_count_;
+ };
+
+ using rtxState = struct rtx_state_;
+ using SendRtxCallback = std::function<void(uint32_t)>;
+
+ public:
+ RTCLossDetectionAndRecovery(SendRtxCallback &&callback,
+ asio::io_service &io_service);
+
+ ~RTCLossDetectionAndRecovery();
+
+ void setState(std::shared_ptr<RTCState> state) { state_ = state; }
+ void turnOnRTX();
+ void turnOffRTX();
+
+ void onTimeout(uint32_t seq);
+ void onDataPacketReceived(const core::ContentObject &content_object);
+ void onNackPacketReceived(const core::ContentObject &nack);
+ void onProbePacketReceived(const core::ContentObject &probe);
+
+ void clear();
+
+  bool isRtx(uint32_t seq) {
+    return rtx_state_.find(seq) != rtx_state_.end();
+  }
+
+ private:
+ void addToRetransmissions(uint32_t start, uint32_t stop);
+ uint64_t computeNextSend(uint32_t seq, bool new_rtx);
+ void retransmit();
+ void scheduleNextRtx();
+ bool deleteRtx(uint32_t seq);
+ void scheduleSentinelTimer(uint64_t expires_from_now);
+ void sentinelTimer();
+
+ uint64_t getNow() {
+ using namespace std::chrono;
+ uint64_t now =
+ duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
+ .count();
+ return now;
+ }
+
+  // this map keeps track of the retransmitted interests, ordered from the
+  // oldest to the newest one. the state contains the time of the first send of
+  // the interest (taken from the pending interests map), the time of the next
+  // send (key of the multimap below) and the number of rtx
+ std::map<uint32_t, rtxState> rtx_state_;
+  // this map stores the rtx by time. The key is the time at which the rtx
+  // should be sent, and the value is the interest seq number
+ std::multimap<uint64_t, uint32_t> rtx_timers_;
+
+ bool rtx_on_;
+ uint64_t next_rtx_timer_;
+ uint64_t last_event_;
+ uint64_t sentinel_timer_interval_;
+ std::unique_ptr<asio::steady_timer> timer_;
+ std::unique_ptr<asio::steady_timer> sentinel_timer_;
+ std::shared_ptr<RTCState> state_;
+
+ SendRtxCallback send_rtx_callback_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
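
A hypothetical wiring example for the class above; the retransmission callback body is a stub and the RTCState instance is assumed to be created elsewhere (its interface is in rtc_state.h, further down in this patch):

    #include <protocols/rtc/rtc_ldr.h>
    #include <protocols/rtc/rtc_state.h>

    #include <asio.hpp>
    #include <memory>

    using namespace transport::protocol::rtc;

    // The object must be owned by a shared_ptr because the timers capture
    // weak_ptrs obtained via shared_from_this().
    std::shared_ptr<RTCLossDetectionAndRecovery> setupLossRecovery(
        asio::io_service &io_service, std::shared_ptr<RTCState> state) {
      auto ldr = std::make_shared<RTCLossDetectionAndRecovery>(
          [](uint32_t seq) { (void)seq; /* re-express the interest for seq */ },
          io_service);
      ldr->setState(state);  // state must be set before enabling rtx
      ldr->turnOnRTX();      // starts the sentinel timer
      return ldr;
    }
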
diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h
new file mode 100644
index 000000000..abb1323a3
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_packet.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+/* data packet
+ * +-----------------------------------------+
+ * | uint64_t: timestamp |
+ * | |
+ * +-----------------------------------------+
+ * | uint32_t: prod rate (bytes per sec) |
+ * +-----------------------------------------+
+ * | payload |
+ * | ... |
+ */
+
+/* nack packet
+ * +-----------------------------------------+
+ * | uint64_t: timestamp |
+ * | |
+ * +-----------------------------------------+
+ * | uint32_t: prod rate (bytes per sec) |
+ * +-----------------------------------------+
+ * | uint32_t: current seg in production |
+ * +-----------------------------------------+
+ */
+
+#pragma once
+#include <arpa/inet.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+inline uint64_t _ntohll(const uint64_t *input) {
+ uint64_t return_val;
+ uint8_t *tmp = (uint8_t *)&return_val;
+
+ tmp[0] = *input >> 56;
+ tmp[1] = *input >> 48;
+ tmp[2] = *input >> 40;
+ tmp[3] = *input >> 32;
+ tmp[4] = *input >> 24;
+ tmp[5] = *input >> 16;
+ tmp[6] = *input >> 8;
+ tmp[7] = *input >> 0;
+
+ return return_val;
+}
+
+inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); }
+
+const uint32_t DATA_HEADER_SIZE = 12; // bytes
+                                       // XXX: sizeof(data_packet_t) is 16
+                                       // because of padding
+const uint32_t NACK_HEADER_SIZE = 16;
+
+struct data_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+
+ inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
+ inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+
+ inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
+ inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+};
+
+struct nack_packet_t {
+ uint64_t timestamp;
+ uint32_t prod_rate;
+ uint32_t prod_seg;
+
+ inline uint64_t getTimestamp() const { return _ntohll(&timestamp); }
+ inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); }
+
+ inline uint32_t getProductionRate() const { return ntohl(prod_rate); }
+ inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); }
+
+ inline uint32_t getProductionSegement() const { return ntohl(prod_seg); }
+ inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); }
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
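
An illustrative round trip through the nack/probe header defined above: all fields are stored in network byte order, so the same struct layout can be written by the producer and parsed back by the consumer. The helper names are hypothetical:

    #include <protocols/rtc/rtc_packet.h>

    #include <cstring>
    #include <vector>

    using namespace transport::protocol::rtc;

    std::vector<uint8_t> buildNackHeader(uint64_t ts_ms, uint32_t rate_bytes_sec,
                                         uint32_t prod_seg) {
      nack_packet_t hdr;
      hdr.setTimestamp(ts_ms);
      hdr.setProductionRate(rate_bytes_sec);
      hdr.setProductionSegement(prod_seg);  // (sic) as declared above

      std::vector<uint8_t> wire(NACK_HEADER_SIZE);
      std::memcpy(wire.data(), &hdr, NACK_HEADER_SIZE);
      return wire;
    }

    uint32_t parseProductionSegment(const uint8_t *payload) {
      const nack_packet_t *hdr = reinterpret_cast<const nack_packet_t *>(payload);
      return hdr->getProductionSegement();
    }
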
diff --git a/libtransport/src/protocols/rtc/rtc_rc.h b/libtransport/src/protocols/rtc/rtc_rc.h
new file mode 100644
index 000000000..34d090092
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControl : public std::enable_shared_from_this<RTCRateControl> {
+ public:
+ RTCRateControl()
+ : rc_on_(false),
+ congestion_win_(1000000), // init the win to a large number
+ congestion_state_(CongestionState::Normal),
+ protocol_state_(nullptr) {}
+
+ virtual ~RTCRateControl() = default;
+
+ void turnOnRateControl() { rc_on_ = true; }
+ void setState(std::shared_ptr<RTCState> state) { protocol_state_ = state; };
+ uint32_t getCongesionWindow() { return congestion_win_; };
+
+ virtual void onNewRound(double round_len) = 0;
+ virtual void onDataPacketReceived(
+ const core::ContentObject &content_object) = 0;
+
+ protected:
+ enum class CongestionState { Normal = 0, Underuse = 1, Congested = 2, Last };
+
+ protected:
+ bool rc_on_;
+ uint32_t congestion_win_;
+ CongestionState congestion_state_;
+
+ std::shared_ptr<RTCState> protocol_state_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
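
The base class above only fixes the interface and the shared state; a concrete policy plugs in by overriding the two virtual methods. A minimal no-op subclass, sketched purely to show that surface (compare the frame- and queue-based controllers that follow):

    #include <protocols/rtc/rtc_rc.h>

    namespace transport {
    namespace protocol {
    namespace rtc {

    class RTCRateControlNoop : public RTCRateControl {
     public:
      void onNewRound(double round_len) override {
        if (!rc_on_) return;
        // a real policy would update congestion_win_ / congestion_state_ here
        (void)round_len;
      }
      void onDataPacketReceived(const core::ContentObject &content_object) override {
        (void)content_object;  // per-packet accounting would go here
      }
    };

    }  // namespace rtc
    }  // namespace protocol
    }  // namespace transport
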
diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.cc b/libtransport/src/protocols/rtc/rtc_rc_frame.cc
new file mode 100644
index 000000000..b577b5bea
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_frame.cc
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_frame.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlFrame::RTCRateControlFrame() : cc_detector_() {}
+
+RTCRateControlFrame::~RTCRateControlFrame() {}
+
+void RTCRateControlFrame::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ CongestionState prev_congestion_state = congestion_state_;
+ cc_detector_.updateStats();
+ congestion_state_ = (CongestionState)cc_detector_.getState();
+
+ if (congestion_state_ == CongestionState::Congested) {
+ if (prev_congestion_state == CongestionState::Normal) {
+ // congestion detected, notify app and init congestion win
+ double prod_rate = protocol_state_->getReceivedRate();
+ double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = protocol_state_->getAveragePacketSize();
+
+ if (prod_rate == 0.0 || rtt == 0.0 || packet_size == 0.0) {
+ // TODO do something
+ return;
+ }
+
+ congestion_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ }
+ uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
+ congestion_win_ = std::max(win, WIN_MIN);
+ return;
+ }
+}
+
+void RTCRateControlFrame::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ if (!rc_on_) return;
+
+ uint32_t seq = content_object.getName().getSuffix();
+ if (!protocol_state_->isPending(seq)) return;
+
+ cc_detector_.addPacket(content_object);
+}
+
+void RTCRateControlFrame::receivedBwProbeTrain(uint64_t first_probe_ts,
+ uint64_t last_probe_ts,
+ uint32_t total_probes) {
+ // TODO
+ return;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.h b/libtransport/src/protocols/rtc/rtc_rc_frame.h
new file mode 100644
index 000000000..25d5ddbb6
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_frame.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <protocols/rtc/congestion_detection.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControlFrame : public RTCRateControl {
+ public:
+ RTCRateControlFrame();
+
+ ~RTCRateControlFrame();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object);
+
+  void receivedBwProbeTrain(uint64_t first_probe_ts, uint64_t last_probe_ts,
+ uint32_t total_probes);
+
+ private:
+ CongestionDetection cc_detector_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
new file mode 100644
index 000000000..a1c89e329
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_queue.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCRateControlQueue::RTCRateControlQueue()
+ : rounds_since_last_drop_(0),
+ rounds_without_congestion_(0),
+ last_queue_(0) {}
+
+RTCRateControlQueue::~RTCRateControlQueue() {}
+
+void RTCRateControlQueue::onNewRound(double round_len) {
+ if (!rc_on_) return;
+
+ double received_rate = protocol_state_->getReceivedRate();
+ double target_rate =
+ protocol_state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = protocol_state_->getAveragePacketSize();
+ double queue = protocol_state_->getQueuing();
+
+ if (rtt == 0.0) return; // no info from the producer
+
+ CongestionState prev_congestion_state = congestion_state_;
+
+ if (prev_congestion_state == CongestionState::Normal &&
+ received_rate >= target_rate) {
+    // if the queue is high, in this case we are most likely fighting with
+    // a TCP flow and there is enough bandwidth to match the producer rate
+ congestion_state_ = CongestionState::Normal;
+ } else if (queue > MAX_QUEUING_DELAY || last_queue_ == queue) {
+    // here we detect congestion. in the case that last_queue == queue
+    // the consumer didn't receive any packet from the producer, so we
+    // consider this case as congestion
+    // TODO: what happens in case of a high loss rate?
+ congestion_state_ = CongestionState::Congested;
+ } else {
+ // nothing bad is happening
+ congestion_state_ = CongestionState::Normal;
+ }
+
+ last_queue_ = queue;
+
+ if (congestion_state_ == CongestionState::Congested) {
+ if (prev_congestion_state == CongestionState::Normal) {
+      // init the congestion window using the received rate
+ congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size);
+ rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1;
+ }
+
+ if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) {
+ uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR;
+ congestion_win_ = std::max(win, WIN_MIN);
+ rounds_since_last_drop_ = 0;
+ return;
+ }
+
+ rounds_since_last_drop_++;
+ }
+
+ if (congestion_state_ == CongestionState::Normal) {
+ if (prev_congestion_state == CongestionState::Congested) {
+ rounds_without_congestion_ = 0;
+ }
+
+ rounds_without_congestion_++;
+ if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return;
+
+ congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR;
+ congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX);
+ }
+}
+
+void RTCRateControlQueue::onDataPacketReceived(
+ const core::ContentObject &content_object) {
+ // nothing to do
+ return;
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
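
A worked example of the window sizing used above when congestion is first detected: with a received rate of 2 MB/s, an RTT of 50 ms and an average packet size of 1200 bytes, the initial window is ceil(2e6 * 0.05 / 1200) = 84 interests; every back-off then multiplies the window by WIN_DECREASE_FACTOR without letting it drop below WIN_MIN (both constants live in rtc_consts.h). As a sketch:

    #include <algorithm>
    #include <cmath>
    #include <cstdint>

    // rate in bytes/s, rtt in seconds, packet size in bytes.
    uint32_t initialCongestionWin(double received_rate, double rtt,
                                  double avg_pkt_size) {
      return (uint32_t)std::ceil(received_rate * rtt / avg_pkt_size);
    }

    uint32_t backOff(uint32_t win, double decrease_factor, uint32_t win_min) {
      return std::max((uint32_t)(win * decrease_factor), win_min);
    }
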
diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.h b/libtransport/src/protocols/rtc/rtc_rc_queue.h
new file mode 100644
index 000000000..407354d43
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_rc_queue.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/utils/shared_ptr_utils.h>
+#include <protocols/rtc/rtc_rc.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+class RTCRateControlQueue : public RTCRateControl {
+ public:
+ RTCRateControlQueue();
+
+ ~RTCRateControlQueue();
+
+ void onNewRound(double round_len);
+ void onDataPacketReceived(const core::ContentObject &content_object);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ uint32_t rounds_since_last_drop_;
+ uint32_t rounds_without_congestion_;
+ double last_queue_;
+};
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
new file mode 100644
index 000000000..eabf8942c
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -0,0 +1,560 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_state.h>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+RTCState::RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ DiscoveredRttCallback &&discovered_rtt_callback,
+ asio::io_service &io_service)
+ : rtt_probes_(std::make_shared<ProbeHandler>(
+ std::move(rtt_probes_callback), io_service)),
+ discovered_rtt_callback_(std::move(discovered_rtt_callback)) {
+ init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service);
+ initParams();
+}
+
+RTCState::~RTCState() {}
+
+void RTCState::initParams() {
+ // packets counters (total)
+ sent_interests_ = 0;
+ sent_rtx_ = 0;
+ received_data_ = 0;
+ received_nacks_ = 0;
+ received_timeouts_ = 0;
+ received_probes_ = 0;
+
+ // loss counters
+ packets_lost_ = 0;
+ losses_recovered_ = 0;
+ first_seq_in_round_ = 0;
+ highest_seq_received_ = 0;
+ highest_seq_received_in_order_ = 0;
+ last_seq_nacked_ = 0;
+ loss_rate_ = 0.0;
+ residual_loss_rate_ = 0.0;
+
+ // bw counters
+ received_bytes_ = 0;
+ avg_packet_size_ = INIT_PACKET_SIZE;
+ production_rate_ = 0.0;
+ received_rate_ = 0.0;
+
+ // nack counter
+ nack_on_last_round_ = false;
+ received_nacks_last_round_ = 0;
+
+ // packets counter
+ received_packets_last_round_ = 0;
+ received_data_last_round_ = 0;
+ received_data_from_cache_ = 0;
+ data_from_cache_rate_ = 0;
+ sent_interests_last_round_ = 0;
+ sent_rtx_last_round_ = 0;
+
+  // round counters
+ rounds_ = 0;
+ rounds_without_nacks_ = 0;
+ rounds_without_packets_ = 0;
+
+ last_production_seq_ = 0;
+ producer_is_active_ = false;
+ last_prod_update_ = 0;
+
+ // paths stats
+ path_table_.clear();
+ main_path_ = nullptr;
+
+ // packet received
+ received_or_lost_packets_.clear();
+
+ // pending interests
+ pending_interests_.clear();
+
+ // init rtt
+ first_interest_sent_ = ~0;
+ init_rtt_ = false;
+ rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ rtt_probes_->sendProbes();
+ setInitRttTimer(INIT_RTT_PROBE_RESTART);
+}
+
+// packet events
+void RTCState::onSendNewInterest(const core::Name *interest_name) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint32_t seq = interest_name->getSuffix();
+ pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now));
+
+  if (sent_interests_ == 0) first_interest_sent_ = now;
+
+ sent_interests_++;
+ sent_interests_last_round_++;
+}
+
+void RTCState::onTimeout(uint32_t seq) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ }
+ received_timeouts_++;
+}
+
+void RTCState::onRetransmission(uint32_t seq) {
+  // remove the interest from the pending interest map only after the first
+  // rtx. in this way we can handle out-of-order packets that come in late as
+  // normal packets. we consider a packet lost only if we sent at least an RTX
+  // for it.
+ // XXX this may become problematic if we stop the RTX transmissions
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ pending_interests_.erase(it);
+ packets_lost_++;
+ }
+ sent_rtx_++;
+ sent_rtx_last_round_++;
+}
+
+void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats) {
+ uint32_t seq = content_object.getName().getSuffix();
+ if (compute_stats) {
+ updatePathStats(content_object, false);
+ received_data_last_round_++;
+ }
+ received_data_++;
+
+ struct data_packet_t *data_pkt =
+ (struct data_packet_t *)content_object.getPayload()->data();
+ uint64_t production_time = data_pkt->getTimestamp();
+ if (last_prod_update_ < production_time) {
+ last_prod_update_ = production_time;
+ uint32_t production_rate = data_pkt->getProductionRate();
+ production_rate_ = (double)production_rate;
+ }
+
+ updatePacketSize(content_object);
+ updateReceivedBytes(content_object);
+ addRecvOrLost(seq, PacketState::RECEIVED);
+
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+
+ // the producer is responding
+ // it is generating valid data packets so we consider it active
+ producer_is_active_ = true;
+
+ received_packets_last_round_++;
+}
+
+void RTCState::onNackPacketReceived(const core::ContentObject &nack,
+ bool compute_stats) {
+ uint32_t seq = nack.getName().getSuffix();
+ struct nack_packet_t *nack_pkt =
+ (struct nack_packet_t *)nack.getPayload()->data();
+ uint64_t production_time = nack_pkt->getTimestamp();
+ uint32_t production_seq = nack_pkt->getProductionSegement();
+ uint32_t production_rate = nack_pkt->getProductionRate();
+
+ if (TRANSPORT_EXPECT_FALSE(main_path_ == nullptr) ||
+ last_prod_update_ < production_time) {
+ // update production rate
+ last_prod_update_ = production_time;
+ last_production_seq_ = production_seq;
+ production_rate_ = (double)production_rate;
+ }
+
+ if (compute_stats) {
+ // this is not an RTX
+ updatePathStats(nack, true);
+ nack_on_last_round_ = true;
+ }
+
+  // for statistics purposes we log all nacks, including the ones received for
+  // retransmitted packets
+ received_nacks_++;
+ received_nacks_last_round_++;
+
+ if (production_seq > seq) {
+ // old nack, seq is lost
+ // update last nacked
+ if (last_seq_nacked_ < seq) last_seq_nacked_ = seq;
+ TRANSPORT_LOGD("lost packet %u beacuse of a past nack", seq);
+ onPacketLost(seq);
+ } else if (seq > production_seq) {
+ // future nack
+ // remove the nack from the pending interest map
+ // (the packet is not received/lost yet)
+ pending_interests_.erase(seq);
+ } else {
+    // this should be quite a rare event. simply remove the
+    // packet from the pending interest list
+ pending_interests_.erase(seq);
+ }
+
+ // the producer is responding
+ // we consider it active only if the production rate is not 0
+ // or the production sequence number is not 1
+ if (production_rate_ != 0 || production_seq != 1) {
+ producer_is_active_ = true;
+ }
+
+ received_packets_last_round_++;
+}
+
+void RTCState::onPacketLost(uint32_t seq) {
+ TRANSPORT_LOGD("packet %u is lost", seq);
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) {
+ // this packet was never retransmitted so it does
+ // not appear in the loss count
+ packets_lost_++;
+ }
+ addRecvOrLost(seq, PacketState::LOST);
+}
+
+void RTCState::onPacketRecovered(uint32_t seq) {
+ losses_recovered_++;
+ addRecvOrLost(seq, PacketState::RECEIVED);
+}
+
+bool RTCState::onProbePacketReceived(const core::ContentObject &probe) {
+ uint32_t seq = probe.getName().getSuffix();
+ uint64_t rtt;
+
+ rtt = rtt_probes_->getRtt(seq);
+
+ if (rtt == 0) return false; // this is not a valid probe
+
+  // like for data and nacks, update the path stats. Here the RTT is computed
+  // by the probe handler. Both rtt and bw probes are good to estimate
+  // info on the path
+ uint32_t path_label = probe.getPathLabel();
+
+ auto path_it = path_table_.find(path_label);
+
+ // update production rate and last_seq_nacked like in case of a nack
+ struct nack_packet_t *probe_pkt =
+ (struct nack_packet_t *)probe.getPayload()->data();
+ uint64_t sender_timestamp = probe_pkt->getTimestamp();
+ uint32_t production_seq = probe_pkt->getProductionSegement();
+ uint32_t production_rate = probe_pkt->getProductionRate();
+
+ if (path_it == path_table_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+
+ path->insertRttSample(rtt);
+ path->receivedNack();
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ int64_t OWD = now - sender_timestamp;
+ path->insertOwdSample(OWD);
+
+ if (last_prod_update_ < sender_timestamp) {
+ last_production_seq_ = production_seq;
+ last_prod_update_ = sender_timestamp;
+ production_rate_ = (double)production_rate;
+ }
+
+ // the producer is responding
+ // we consider it active only if the production rate is not 0
+  // or the production sequence number is not 1
+ if (production_rate_ != 0 || production_seq != 1) {
+ producer_is_active_ = true;
+ }
+
+  // check for init RTT. when the first probe is received, schedule a timer to
+  // wait for the remaining INIT_RTT_PROBES. in this way, if some probes get
+  // lost, we don't wait forever
+ received_probes_++;
+
+  if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) {
+    if (received_probes_ == 1) {
+      // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the
+      // others
+      main_path_ = path;
+      setInitRttTimer(INIT_RTT_PROBE_WAIT);
+    }
+    if (received_probes_ == INIT_RTT_PROBES) {
+      // we are done
+      init_rtt_timer_->cancel();
+      checkInitRttTimer();
+    }
+  }
+
+ received_packets_last_round_++;
+
+ // ignore probes sent before the first interest
+  if ((now - rtt) <= first_interest_sent_) return false;
+ return true;
+}
+
+void RTCState::onNewRound(double round_len, bool in_sync) {
+ // XXX
+  // here we take into account only the single path case, so we assume that we
+  // don't use two paths in parallel for this single flow
+
+ if (path_table_.empty()) return;
+
+ double bytes_per_sec =
+ ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len));
+  if (received_rate_ == 0)
+ received_rate_ = bytes_per_sec;
+ else
+ received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) +
+ ((1 - MOVING_AVG_ALPHA) * bytes_per_sec);
+
+  // search for an active path. There should be only one active path (meaning a
+  // path that leads to the producer socket -no cache- and from which we are
+  // currently getting data packets) at any time. However it may happen that
+  // there are multiple active paths in case of mobility (the old path will
+  // remain active for a short amount of time). The main path is selected as
+  // the active path from which the consumer received the latest data packet
+
+ uint64_t last_packet_ts = 0;
+ main_path_ = nullptr;
+
+ for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
+ it->second->roundEnd();
+ if (it->second->isActive()) {
+ uint64_t ts = it->second->getLastPacketTS();
+ if (ts > last_packet_ts) {
+ last_packet_ts = ts;
+ main_path_ = it->second;
+ }
+ }
+ }
+
+ if (in_sync) updateLossRate();
+
+ // handle nacks
+ if (!nack_on_last_round_ && received_bytes_ > 0) {
+ rounds_without_nacks_++;
+ } else {
+ rounds_without_nacks_ = 0;
+ }
+
+ // check if the producer is active
+ if (received_packets_last_round_ != 0) {
+ rounds_without_packets_ = 0;
+ } else {
+ rounds_without_packets_++;
+ if (rounds_without_packets_ >= MAX_ROUND_WHIOUT_PACKETS &&
+ producer_is_active_ != false) {
+ initParams();
+ }
+ }
+
+ // compute cache/producer ratio
+ if (received_data_last_round_ != 0) {
+ double new_rate =
+ (double)received_data_from_cache_ / (double)received_data_last_round_;
+ data_from_cache_rate_ = data_from_cache_rate_ * MOVING_AVG_ALPHA +
+ (new_rate * (1 - MOVING_AVG_ALPHA));
+ }
+
+ // reset counters
+ received_bytes_ = 0;
+ packets_lost_ = 0;
+ losses_recovered_ = 0;
+ first_seq_in_round_ = highest_seq_received_;
+
+ nack_on_last_round_ = false;
+ received_nacks_last_round_ = 0;
+
+ received_packets_last_round_ = 0;
+ received_data_last_round_ = 0;
+ received_data_from_cache_ = 0;
+ sent_interests_last_round_ = 0;
+ sent_rtx_last_round_ = 0;
+
+ rounds_++;
+}
+
+void RTCState::updateReceivedBytes(const core::ContentObject &content_object) {
+ received_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+}
+
+void RTCState::updatePacketSize(const core::ContentObject &content_object) {
+ uint32_t pkt_size =
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ avg_packet_size_ = (MOVING_AVG_ALPHA * avg_packet_size_) +
+ ((1 - MOVING_AVG_ALPHA) * pkt_size);
+}
+
+void RTCState::updatePathStats(const core::ContentObject &content_object,
+ bool is_nack) {
+ // get packet path
+ uint32_t path_label = content_object.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+
+ if (path_it == path_table_.end()) {
+ // found a new path
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+
+ // compute rtt
+ uint32_t seq = content_object.getName().getSuffix();
+ uint64_t interest_sent_time = getInterestSentTime(seq);
+ if (interest_sent_time == 0)
+ return; // this should not happen,
+ // it means that we are processing an interest
+ // that is not pending
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ uint64_t RTT = now - interest_sent_time;
+
+ path->insertRttSample(RTT);
+
+  // compute OWD (the first part of the nack and data packet headers is the
+  // same, so we cast to a data packet)
+ struct data_packet_t *packet =
+ (struct data_packet_t *)content_object.getPayload()->data();
+ uint64_t sender_timestamp = packet->getTimestamp();
+ int64_t OWD = now - sender_timestamp;
+ path->insertOwdSample(OWD);
+
+ // compute IAT or set path to producer
+ if (!is_nack) {
+ // compute the iat only for the content packets
+ uint32_t segment_number = content_object.getName().getSuffix();
+ path->computeInterArrivalGap(segment_number);
+ if (!path->pathToProducer()) received_data_from_cache_++;
+ } else {
+ path->receivedNack();
+ }
+}
+
+void RTCState::updateLossRate() {
+ loss_rate_ = 0.0;
+ residual_loss_rate_ = 0.0;
+
+ uint32_t number_theorically_received_packets_ =
+ highest_seq_received_ - first_seq_in_round_;
+
+  // in this case no new packet was received after the previous round, avoid
+  // division by 0
+ if (number_theorically_received_packets_ == 0) return;
+
+ loss_rate_ = (double)((double)(packets_lost_) /
+ (double)number_theorically_received_packets_);
+
+ residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) /
+ (double)number_theorically_received_packets_);
+
+ if (residual_loss_rate_ < 0) residual_loss_rate_ = 0;
+}
+
+void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
+ pending_interests_.erase(seq);
+ if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) {
+ received_or_lost_packets_.erase(received_or_lost_packets_.begin());
+ }
+ // notice that it may happen that a packet that we consider lost arrives after
+ // some time, in this case we simply overwrite the packet state.
+ received_or_lost_packets_[seq] = state;
+
+ // keep track of the last packet received/lost
+ // without holes.
+ if (highest_seq_received_in_order_ < last_seq_nacked_) {
+ highest_seq_received_in_order_ = last_seq_nacked_;
+ }
+
+ if ((highest_seq_received_in_order_ + 1) == seq) {
+ highest_seq_received_in_order_ = seq;
+ } else if (seq <= highest_seq_received_in_order_) {
+ // here we do nothing
+ } else if (seq > highest_seq_received_in_order_) {
+    // 1) there is a gap in the sequence, so we do not update
+    //    highest_seq_received_in_order_
+    // 2) all the packets from highest_seq_received_in_order_ to seq are in
+    //    received_or_lost_packets_ and we update highest_seq_received_in_order_
+
+ for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) {
+ if (received_or_lost_packets_.find(i) ==
+ received_or_lost_packets_.end()) {
+ break;
+ }
+ // this packet is in order so we can update the
+ // highest_seq_received_in_order_
+ highest_seq_received_in_order_ = i;
+ }
+ }
+}
+
+void RTCState::setInitRttTimer(uint32_t wait) {
+  init_rtt_timer_->cancel();
+  init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait));
+  init_rtt_timer_->async_wait([this](std::error_code ec) {
+    if (ec) return;
+    checkInitRttTimer();
+  });
+}
+
+void RTCState::checkInitRttTimer() {
+  if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) {
+    // we didn't receive enough probes, restart
+ received_probes_ = 0;
+ rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES);
+ rtt_probes_->sendProbes();
+ setInitRttTimer(INIT_RTT_PROBE_RESTART);
+ return;
+ }
+ init_rtt_ = true;
+ main_path_->roundEnd();
+ rtt_probes_->setProbes(RTT_PROBE_INTERVAL, 0);
+ rtt_probes_->sendProbes();
+
+ // init last_seq_nacked_. skip packets that may come from the cache
+ double prod_rate = getProducerRate();
+ double rtt = (double)getRTT() / MILLI_IN_A_SEC;
+ double packet_size = getAveragePacketSize();
+ uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8);
+ last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_;
+
+ discovered_rtt_callback_();
+}
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
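
A standalone sketch of the in-order tracking performed in addRecvOrLost(): the highest in-order sequence number advances only while every following sequence number is already present in the received-or-lost map, so the first hole stops it. The helper below is illustrative and assumes new_seq has already been inserted into the map:

    #include <cstdint>
    #include <map>

    uint32_t advanceInOrder(uint32_t highest_in_order, uint32_t new_seq,
                            const std::map<uint32_t, int> &recv_or_lost) {
      if (new_seq <= highest_in_order) return highest_in_order;
      for (uint32_t i = highest_in_order + 1; i <= new_seq; i++) {
        if (recv_or_lost.find(i) == recv_or_lost.end()) break;  // hole found
        highest_in_order = i;
      }
      return highest_in_order;
    }
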
diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h
new file mode 100644
index 000000000..943a0a113
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc_state.h
@@ -0,0 +1,253 @@
+/*
+ * Copyright (c) 2017-2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/name.h>
+#include <protocols/rtc/probe_handler.h>
+#include <protocols/rtc/rtc_data_path.h>
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <map>
+#include <set>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN };
+
+class RTCState : std::enable_shared_from_this<RTCState> {
+ public:
+  using DiscoveredRttCallback = std::function<void()>;
+
+ public:
+ RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback,
+ DiscoveredRttCallback &&discovered_rtt_callback,
+ asio::io_service &io_service);
+
+ ~RTCState();
+
+ // packet events
+ void onSendNewInterest(const core::Name *interest_name);
+ void onTimeout(uint32_t seq);
+ void onRetransmission(uint32_t seq);
+ void onDataPacketReceived(const core::ContentObject &content_object,
+ bool compute_stats);
+ void onNackPacketReceived(const core::ContentObject &nack,
+ bool compute_stats);
+ void onPacketLost(uint32_t seq);
+ void onPacketRecovered(uint32_t seq);
+ bool onProbePacketReceived(const core::ContentObject &probe);
+
+ // protocol state
+ void onNewRound(double round_len, bool in_sync);
+
+ // main path
+ uint32_t getProducerPath() const {
+ if (mainPathIsValid()) return main_path_->getPathId();
+ return 0;
+ }
+
+ // delay metrics
+ bool isRttDiscovered() const {
+ return init_rtt_;
+ }
+
+ uint64_t getRTT() const {
+ if (mainPathIsValid()) return main_path_->getMinRtt();
+ return 0;
+ }
+ void resetRttStats() {
+ if (mainPathIsValid()) main_path_->clearRtt();
+ }
+
+ double getQueuing() const {
+ if (mainPathIsValid()) return main_path_->getQueuingDealy();
+ return 0.0;
+ }
+ double getIAT() const {
+ if (mainPathIsValid()) return main_path_->getInterArrivalGap();
+ return 0.0;
+ }
+
+ double getJitter() const {
+ if (mainPathIsValid()) return main_path_->getJitter();
+ return 0.0;
+ }
+
+ // pending interests
+ uint64_t getInterestSentTime(uint32_t seq) {
+ auto it = pending_interests_.find(seq);
+ if (it != pending_interests_.end()) return it->second;
+ return 0;
+ }
+  bool isPending(uint32_t seq) {
+    return pending_interests_.find(seq) != pending_interests_.end();
+  }
+ uint32_t getPendingInterestNumber() const {
+ return pending_interests_.size();
+ }
+ PacketState isReceivedOrLost(uint32_t seq) {
+ auto it = received_or_lost_packets_.find(seq);
+ if (it != received_or_lost_packets_.end()) return it->second;
+ return PacketState::UNKNOWN;
+ }
+
+ // loss rate
+ double getLossRate() const { return loss_rate_; }
+ double getResidualLossRate() const { return residual_loss_rate_; }
+ uint32_t getHighestSeqReceivedInOrder() const {
+ return highest_seq_received_in_order_;
+ }
+ uint32_t getLostData() const { return packets_lost_; };
+ uint32_t getRecoveredLosses() const { return losses_recovered_; }
+
+ // generic stats
+ uint32_t getReceivedBytesInRound() const { return received_bytes_; }
+ uint32_t getReceivedNacksInRound() const {
+ return received_nacks_last_round_;
+ }
+ uint32_t getSentInterestInRound() const { return sent_interests_last_round_; }
+ uint32_t getSentRtxInRound() const { return sent_rtx_last_round_; }
+
+ // bandwidth/production metrics
+ double getAvailableBw() const { return 0.0; }; // TODO
+ double getProducerRate() const { return production_rate_; }
+ double getReceivedRate() const { return received_rate_; }
+ double getAveragePacketSize() const { return avg_packet_size_; }
+
+ // nacks
+ uint32_t getRoundsWithoutNacks() const { return rounds_without_nacks_; }
+ uint32_t getLastSeqNacked() const { return last_seq_nacked_; }
+
+ // producer state
+ bool isProducerActive() const { return producer_is_active_; }
+
+ // packets from cache
+ double getPacketFromCacheRatio() const { return data_from_cache_rate_; }
+
+ std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapBegin() {
+ return pending_interests_.begin();
+ }
+ std::map<uint32_t, uint64_t>::iterator getPendingInterestsMapEnd() {
+ return pending_interests_.end();
+ }
+
+ private:
+ void initParams();
+
+ // update stats
+ void updateState();
+ void updateReceivedBytes(const core::ContentObject &content_object);
+ void updatePacketSize(const core::ContentObject &content_object);
+ void updatePathStats(const core::ContentObject &content_object, bool is_nack);
+ void updateLossRate();
+
+ void addRecvOrLost(uint32_t seq, PacketState state);
+
+ void setInitRttTimer(uint32_t wait);
+ void checkInitRttTimer();
+
+  bool mainPathIsValid() const { return main_path_ != nullptr; }
+
+ // packets counters (total)
+ uint32_t sent_interests_;
+ uint32_t sent_rtx_;
+ uint32_t received_data_;
+ uint32_t received_nacks_;
+ uint32_t received_timeouts_;
+ uint32_t received_probes_;
+
+ // loss counters
+ int32_t packets_lost_;
+ int32_t losses_recovered_;
+ uint32_t first_seq_in_round_;
+ uint32_t highest_seq_received_;
+ uint32_t highest_seq_received_in_order_;
+ uint32_t last_seq_nacked_; // segment for which we got an oldNack
+ double loss_rate_;
+ double residual_loss_rate_;
+
+ // bw counters
+ uint32_t received_bytes_;
+ double avg_packet_size_;
+ double production_rate_; // rate communicated by the producer using nacks
+  double received_rate_;    // rate received by the consumer
+
+ // nack counter
+  // the bool keeps track only of the valid nacks (no rtx) and it is used to
+  // switch between the states. Instead, received_nacks_last_round_ logs all
+  // the nacks for statistics
+ bool nack_on_last_round_;
+ uint32_t received_nacks_last_round_;
+
+ // packets counter
+ uint32_t received_packets_last_round_;
+ uint32_t received_data_last_round_;
+ uint32_t received_data_from_cache_;
+ double data_from_cache_rate_;
+ uint32_t sent_interests_last_round_;
+ uint32_t sent_rtx_last_round_;
+
+  // round counters
+ uint32_t rounds_;
+ uint32_t rounds_without_nacks_;
+ uint32_t rounds_without_packets_;
+
+ // init rtt
+ uint64_t first_interest_sent_;
+
+ // producer state
+  bool producer_is_active_;  // the producer is active if we receive some
+                             // packets
+  uint32_t last_production_seq_;  // last production seq received from the
+                                  // producer
+  uint64_t last_prod_update_;  // timestamp of the last packet used to update
+                               // stats from the producer
+
+ // paths stats
+ std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_;
+ std::shared_ptr<RTCDataPath> main_path_;
+
+ // packet received
+ // cache where to store info about the last MAX_CACHED_PACKETS
+ std::map<uint32_t, PacketState> received_or_lost_packets_;
+
+ // pending interests
+ std::map<uint32_t, uint64_t> pending_interests_;
+
+ // probes
+ std::shared_ptr<ProbeHandler> rtt_probes_;
+ bool init_rtt_;
+ std::unique_ptr<asio::steady_timer> init_rtt_timer_;
+
+ // callbacks
+ DiscoveredRttCallback discovered_rtt_callback_;
+};
+
+} // namespace rtc
+
+} // namespace protocol
+
+} // namespace transport
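
A read-only sketch of how the per-round statistics exposed above could be consumed by the protocol loop; it assumes a fully constructed RTCState instance (construction depends on probe_handler.h, which is outside this excerpt) and the function name is hypothetical:

    #include <protocols/rtc/rtc_state.h>

    using transport::protocol::rtc::RTCState;

    void logRoundStats(RTCState &state) {
      if (!state.isRttDiscovered()) return;  // wait for the init RTT probes
      uint64_t rtt = state.getRTT();         // min RTT on the main path (ms)
      double loss = state.getLossRate();
      double residual = state.getResidualLossRate();
      double rate = state.getReceivedRate();  // bytes/s, moving average
      double queuing = state.getQueuing();    // queuing delay on the main path
      (void)rtt;
      (void)loss;
      (void)residual;
      (void)rate;
      (void)queuing;
    }
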
diff --git a/libtransport/src/protocols/rtc/trendline_estimator.cc b/libtransport/src/protocols/rtc/trendline_estimator.cc
new file mode 100644
index 000000000..7a0803857
--- /dev/null
+++ b/libtransport/src/protocols/rtc/trendline_estimator.cc
@@ -0,0 +1,334 @@
+/*
+ * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+// FROM
+// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.cc
+
+#include "trendline_estimator.h"
+
+#include <math.h>
+
+#include <algorithm>
+#include <string>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+// Parameters for linear least squares fit of regression line to noisy data.
+constexpr double kDefaultTrendlineSmoothingCoeff = 0.9;
+constexpr double kDefaultTrendlineThresholdGain = 4.0;
+// const char kBweWindowSizeInPacketsExperiment[] =
+// "WebRTC-BweWindowSizeInPackets";
+
+/*size_t ReadTrendlineFilterWindowSize(
+ const WebRtcKeyValueConfig* key_value_config) {
+ std::string experiment_string =
+ key_value_config->Lookup(kBweWindowSizeInPacketsExperiment);
+ size_t window_size;
+ int parsed_values =
+ sscanf(experiment_string.c_str(), "Enabled-%zu", &window_size);
+ if (parsed_values == 1) {
+ if (window_size > 1)
+ return window_size;
+ RTC_LOG(WARNING) << "Window size must be greater than 1.";
+ }
+ RTC_LOG(LS_WARNING) << "Failed to parse parameters for BweWindowSizeInPackets"
+ " experiment from field trial string. Using default.";
+ return TrendlineEstimatorSettings::kDefaultTrendlineWindowSize;
+}
+*/
+
+OptionalDouble LinearFitSlope(
+ const std::deque<TrendlineEstimator::PacketTiming>& packets) {
+ // RTC_DCHECK(packets.size() >= 2);
+ // Compute the "center of mass".
+ double sum_x = 0;
+ double sum_y = 0;
+ for (const auto& packet : packets) {
+ sum_x += packet.arrival_time_ms;
+ sum_y += packet.smoothed_delay_ms;
+ }
+ double x_avg = sum_x / packets.size();
+ double y_avg = sum_y / packets.size();
+ // Compute the slope k = \sum (x_i-x_avg)(y_i-y_avg) / \sum (x_i-x_avg)^2
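+ // Worked example: points (0 ms, 0 ms), (10 ms, 1 ms), (20 ms, 2 ms) give
+ // x_avg = 10, y_avg = 1, numerator = 20, denominator = 200, so k = 0.1,
+ // i.e. the smoothed delay grows by 0.1 ms per ms of arrival time.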
+ double numerator = 0;
+ double denominator = 0;
+ for (const auto& packet : packets) {
+ double x = packet.arrival_time_ms;
+ double y = packet.smoothed_delay_ms;
+ numerator += (x - x_avg) * (y - y_avg);
+ denominator += (x - x_avg) * (x - x_avg);
+ }
+ if (denominator == 0) return OptionalDouble();
+ return OptionalDouble(numerator / denominator);
+}
+
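+// Computes an upper bound for the fitted slope from the raw (unsmoothed)
+// delays: it takes the minimum-delay packet among the first
+// settings.beginning_packets and among the last settings.end_packets, returns
+// the slope between those two packets plus settings.cap_uncertainty, and
+// returns no value if their arrival times are less than 1 ms apart.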
+OptionalDouble ComputeSlopeCap(
+ const std::deque<TrendlineEstimator::PacketTiming>& packets,
+ const TrendlineEstimatorSettings& settings) {
+ /*RTC_DCHECK(1 <= settings.beginning_packets &&
+ settings.beginning_packets < packets.size());
+ RTC_DCHECK(1 <= settings.end_packets &&
+ settings.end_packets < packets.size());
+ RTC_DCHECK(settings.beginning_packets + settings.end_packets <=
+ packets.size());*/
+ TrendlineEstimator::PacketTiming early = packets[0];
+ for (size_t i = 1; i < settings.beginning_packets; ++i) {
+ if (packets[i].raw_delay_ms < early.raw_delay_ms) early = packets[i];
+ }
+ size_t late_start = packets.size() - settings.end_packets;
+ TrendlineEstimator::PacketTiming late = packets[late_start];
+ for (size_t i = late_start + 1; i < packets.size(); ++i) {
+ if (packets[i].raw_delay_ms < late.raw_delay_ms) late = packets[i];
+ }
+ if (late.arrival_time_ms - early.arrival_time_ms < 1) {
+ return OptionalDouble();
+ }
+ return OptionalDouble((late.raw_delay_ms - early.raw_delay_ms) /
+ (late.arrival_time_ms - early.arrival_time_ms) +
+ settings.cap_uncertainty);
+}
+
+constexpr double kMaxAdaptOffsetMs = 15.0;
+constexpr double kOverUsingTimeThreshold = 10;
+constexpr int kMinNumDeltas = 60;
+constexpr int kDeltaCounterMax = 1000;
+
+//} // namespace
+
+constexpr char TrendlineEstimatorSettings::kKey[];
+
+TrendlineEstimatorSettings::TrendlineEstimatorSettings(
+ /*const WebRtcKeyValueConfig* key_value_config*/) {
+ /*if (absl::StartsWith(
+ key_value_config->Lookup(kBweWindowSizeInPacketsExperiment),
+ "Enabled")) {
+ window_size = ReadTrendlineFilterWindowSize(key_value_config);
+ }
+ Parser()->Parse(key_value_config->Lookup(TrendlineEstimatorSettings::kKey));*/
+ window_size = kDefaultTrendlineWindowSize;
+ enable_cap = false;
+ beginning_packets = end_packets = 0;
+ cap_uncertainty = 0.0;
+
+ /*if (window_size < 10 || 200 < window_size) {
+ RTC_LOG(LS_WARNING) << "Window size must be between 10 and 200 packets";
+ window_size = kDefaultTrendlineWindowSize;
+ }
+ if (enable_cap) {
+ if (beginning_packets < 1 || end_packets < 1 ||
+ beginning_packets > window_size || end_packets > window_size) {
+ RTC_LOG(LS_WARNING) << "Size of beginning and end must be between 1 and "
+ << window_size;
+ enable_cap = false;
+ beginning_packets = end_packets = 0;
+ cap_uncertainty = 0.0;
+ }
+ if (beginning_packets + end_packets > window_size) {
+ RTC_LOG(LS_WARNING)
+ << "Size of beginning plus end can't exceed the window size";
+ enable_cap = false;
+ beginning_packets = end_packets = 0;
+ cap_uncertainty = 0.0;
+ }
+ if (cap_uncertainty < 0.0 || 0.025 < cap_uncertainty) {
+ RTC_LOG(LS_WARNING) << "Cap uncertainty must be between 0 and 0.025";
+ cap_uncertainty = 0.0;
+ }
+ }*/
+}
+
+/*std::unique_ptr<StructParametersParser> TrendlineEstimatorSettings::Parser() {
+ return StructParametersParser::Create("sort", &enable_sort, //
+ "cap", &enable_cap, //
+ "beginning_packets",
+ &beginning_packets, //
+ "end_packets", &end_packets, //
+ "cap_uncertainty", &cap_uncertainty, //
+ "window_size", &window_size);
+}*/
+
+TrendlineEstimator::TrendlineEstimator(
+ /*const WebRtcKeyValueConfig* key_value_config,
+ NetworkStatePredictor* network_state_predictor*/)
+ : settings_(),
+ smoothing_coef_(kDefaultTrendlineSmoothingCoeff),
+ threshold_gain_(kDefaultTrendlineThresholdGain),
+ num_of_deltas_(0),
+ first_arrival_time_ms_(-1),
+ accumulated_delay_(0),
+ smoothed_delay_(0),
+ delay_hist_(),
+ k_up_(0.0087),
+ k_down_(0.039),
+ overusing_time_threshold_(kOverUsingTimeThreshold),
+ threshold_(12.5),
+ prev_modified_trend_(NAN),
+ last_update_ms_(-1),
+ prev_trend_(0.0),
+ time_over_using_(-1),
+ overuse_counter_(0),
+ hypothesis_(BandwidthUsage::kBwNormal) {
+ // hypothesis_predicted_(BandwidthUsage::kBwNormal){//},
+ // network_state_predictor_(network_state_predictor) {
+ /* RTC_LOG(LS_INFO)
+ << "Using Trendline filter for delay change estimation with settings "
+ << settings_.Parser()->Encode() << " and "
+ // << (network_state_predictor_ ? "injected" : "no")
+ << " network state predictor";*/
+}
+
+TrendlineEstimator::~TrendlineEstimator() {}
+
+void TrendlineEstimator::UpdateTrendline(double recv_delta_ms,
+ double send_delta_ms,
+ int64_t send_time_ms,
+ int64_t arrival_time_ms,
+ size_t packet_size) {
+ const double delta_ms = recv_delta_ms - send_delta_ms;
+ ++num_of_deltas_;
+ num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax);
+ if (first_arrival_time_ms_ == -1) first_arrival_time_ms_ = arrival_time_ms;
+
+ // Exponential backoff filter.
+ accumulated_delay_ += delta_ms;
+ // BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms,
+ // accumulated_delay_);
+ smoothed_delay_ = smoothing_coef_ * smoothed_delay_ +
+ (1 - smoothing_coef_) * accumulated_delay_;
+ // BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms,
+ // smoothed_delay_);
+
+ // Maintain packet window
+ delay_hist_.emplace_back(
+ static_cast<double>(arrival_time_ms - first_arrival_time_ms_),
+ smoothed_delay_, accumulated_delay_);
+ if (settings_.enable_sort) {
+ for (size_t i = delay_hist_.size() - 1;
+ i > 0 &&
+ delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms;
+ --i) {
+ std::swap(delay_hist_[i], delay_hist_[i - 1]);
+ }
+ }
+ if (delay_hist_.size() > settings_.window_size) delay_hist_.pop_front();
+
+ // Simple linear regression.
+ double trend = prev_trend_;
+ if (delay_hist_.size() == settings_.window_size) {
+ // Update trend_ if it is possible to fit a line to the data. The delay
+ // trend can be seen as an estimate of (send_rate - capacity)/capacity.
+ // 0 < trend < 1 -> the delay increases, queues are filling up
+ // trend == 0 -> the delay does not change
+ // trend < 0 -> the delay decreases, queues are being emptied
+ OptionalDouble trendO = LinearFitSlope(delay_hist_);
+ if (trendO.has_value()) trend = trendO.value();
+ if (settings_.enable_cap) {
+ OptionalDouble cap = ComputeSlopeCap(delay_hist_, settings_);
+ // We only use the cap to filter out overuse detections, not
+ // to detect additional underuses.
+ if (trend >= 0 && cap.has_value() && trend > cap.value()) {
+ trend = cap.value();
+ }
+ }
+ }
+ // BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend);
+
+ Detect(trend, send_delta_ms, arrival_time_ms);
+}
+
+void TrendlineEstimator::Update(double recv_delta_ms, double send_delta_ms,
+ int64_t send_time_ms, int64_t arrival_time_ms,
+ size_t packet_size, bool calculated_deltas) {
+ if (calculated_deltas) {
+ UpdateTrendline(recv_delta_ms, send_delta_ms, send_time_ms, arrival_time_ms,
+ packet_size);
+ }
+ /*if (network_state_predictor_) {
+ hypothesis_predicted_ = network_state_predictor_->Update(
+ send_time_ms, arrival_time_ms, hypothesis_);
+ }*/
+}
+
+BandwidthUsage TrendlineEstimator::State() const {
+ return /*network_state_predictor_ ? hypothesis_predicted_ :*/ hypothesis_;
+}
+
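+// Overuse detection with hysteresis: the trend is scaled by the number of
+// deltas (capped at kMinNumDeltas) and by threshold_gain_, and overuse is
+// declared only after the scaled trend has stayed above the adaptive
+// threshold for more than overusing_time_threshold_ ms, over at least two
+// samples, while the trend is not decreasing.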
+void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) {
+ /*if (num_of_deltas_ < 2) {
+ hypothesis_ = BandwidthUsage::kBwNormal;
+ return;
+ }*/
+
+ const double modified_trend =
+ std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_;
+ prev_modified_trend_ = modified_trend;
+ // BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend);
+ // BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_);
+ if (modified_trend > threshold_) {
+ if (time_over_using_ == -1) {
+ // Initialize the timer. Assume that we've been
+ // over-using half of the time since the previous
+ // sample.
+ time_over_using_ = ts_delta / 2;
+ } else {
+ // Increment timer
+ time_over_using_ += ts_delta;
+ }
+ overuse_counter_++;
+ if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) {
+ if (trend >= prev_trend_) {
+ time_over_using_ = 0;
+ overuse_counter_ = 0;
+ hypothesis_ = BandwidthUsage::kBwOverusing;
+ }
+ }
+ } else if (modified_trend < -threshold_) {
+ time_over_using_ = -1;
+ overuse_counter_ = 0;
+ hypothesis_ = BandwidthUsage::kBwUnderusing;
+ } else {
+ time_over_using_ = -1;
+ overuse_counter_ = 0;
+ hypothesis_ = BandwidthUsage::kBwNormal;
+ }
+ prev_trend_ = trend;
+ UpdateThreshold(modified_trend, now_ms);
+}
+
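+// Adapts the detection threshold towards |modified_trend|: it moves down
+// quickly (k_down_) when the trend is inside the threshold and up slowly
+// (k_up_) when it is outside, is clamped to [6, 600], and adaptation is
+// skipped for samples that exceed the threshold by more than
+// kMaxAdaptOffsetMs to avoid chasing sudden latency spikes.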
+void TrendlineEstimator::UpdateThreshold(double modified_trend,
+ int64_t now_ms) {
+ if (last_update_ms_ == -1) last_update_ms_ = now_ms;
+
+ if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) {
+ // Avoid adapting the threshold to big latency spikes, caused e.g.,
+ // by a sudden capacity drop.
+ last_update_ms_ = now_ms;
+ return;
+ }
+
+ const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_;
+ const int64_t kMaxTimeDeltaMs = 100;
+ int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs);
+ threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms;
+ if (threshold_ < 6.f) threshold_ = 6.f;
+ if (threshold_ > 600.f) threshold_ = 600.f;
+ // threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f);
+ last_update_ms_ = now_ms;
+}
+
+} // namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/protocols/rtc/trendline_estimator.h b/libtransport/src/protocols/rtc/trendline_estimator.h
new file mode 100644
index 000000000..372acbc67
--- /dev/null
+++ b/libtransport/src/protocols/rtc/trendline_estimator.h
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+// FROM
+// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.h
+
+#ifndef MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_
+#define MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <algorithm>
+#include <deque>
+#include <memory>
+#include <utility>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
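+// Minimal stand-in for the absl::optional<double> used by the upstream WebRTC
+// implementation (assumption based on the original sources linked above).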
+class OptionalDouble {
+ public:
+ OptionalDouble() : val(0), has_val(false) {}
+ OptionalDouble(double val) : val(val), has_val(true) {}
+
+ double value() const { return val; }
+ bool has_value() const { return has_val; }
+
+ private:
+ double val;
+ bool has_val;
+};
+
+enum class BandwidthUsage {
+ kBwNormal = 0,
+ kBwUnderusing = 1,
+ kBwOverusing = 2,
+ kLast
+};
+
+struct TrendlineEstimatorSettings {
+ static constexpr char kKey[] = "WebRTC-Bwe-TrendlineEstimatorSettings";
+ static constexpr unsigned kDefaultTrendlineWindowSize = 20;
+
+ // TrendlineEstimatorSettings() = delete;
+ TrendlineEstimatorSettings(
+ /*const WebRtcKeyValueConfig* key_value_config*/);
+
+ // Sort the packets in the window. Should be redundant,
+ // but if it is, the cost is negligible.
+ bool enable_sort = false;
+
+ // Cap the trendline slope based on the minimum delay seen
+ // in the beginning_packets and end_packets respectively.
+ bool enable_cap = false;
+ unsigned beginning_packets = 7;
+ unsigned end_packets = 7;
+ double cap_uncertainty = 0.0;
+
+ // Size (in packets) of the window.
+ unsigned window_size = kDefaultTrendlineWindowSize;
+
+ // std::unique_ptr<StructParametersParser> Parser();
+};
+
+class TrendlineEstimator /*: public DelayIncreaseDetectorInterface */ {
+ public:
+ TrendlineEstimator(/*const WebRtcKeyValueConfig* key_value_config,
+ NetworkStatePredictor* network_state_predictor*/);
+
+ ~TrendlineEstimator();
+
+ // Update the estimator with a new sample. The deltas should represent deltas
+ // between timestamp groups as defined by the InterArrival class.
+ void Update(double recv_delta_ms, double send_delta_ms, int64_t send_time_ms,
+ int64_t arrival_time_ms, size_t packet_size,
+ bool calculated_deltas);
+
+ void UpdateTrendline(double recv_delta_ms, double send_delta_ms,
+ int64_t send_time_ms, int64_t arrival_time_ms,
+ size_t packet_size);
+
+ BandwidthUsage State() const;
+
+ struct PacketTiming {
+ PacketTiming(double arrival_time_ms, double smoothed_delay_ms,
+ double raw_delay_ms)
+ : arrival_time_ms(arrival_time_ms),
+ smoothed_delay_ms(smoothed_delay_ms),
+ raw_delay_ms(raw_delay_ms) {}
+ double arrival_time_ms;
+ double smoothed_delay_ms;
+ double raw_delay_ms;
+ };
+
+ private:
+ // friend class GoogCcStatePrinter;
+ void Detect(double trend, double ts_delta, int64_t now_ms);
+
+ void UpdateThreshold(double modified_offset, int64_t now_ms);
+
+ // Parameters.
+ TrendlineEstimatorSettings settings_;
+ const double smoothing_coef_;
+ const double threshold_gain_;
+ // Used by the existing threshold.
+ int num_of_deltas_;
+ // Keep the arrival times small by using the change from the first packet.
+ int64_t first_arrival_time_ms_;
+ // Exponential backoff filtering.
+ double accumulated_delay_;
+ double smoothed_delay_;
+ // Linear least squares regression.
+ std::deque<PacketTiming> delay_hist_;
+
+ const double k_up_;
+ const double k_down_;
+ double overusing_time_threshold_;
+ double threshold_;
+ double prev_modified_trend_;
+ int64_t last_update_ms_;
+ double prev_trend_;
+ double time_over_using_;
+ int overuse_counter_;
+ BandwidthUsage hypothesis_;
+ // BandwidthUsage hypothesis_predicted_;
+ // NetworkStatePredictor* network_state_predictor_;
+
+ // RTC_DISALLOW_COPY_AND_ASSIGN(TrendlineEstimator);
+};
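+
+// Usage sketch (illustrative only; variable names are not part of the API):
+//   TrendlineEstimator estimator;
+//   estimator.Update(recv_delta_ms, send_delta_ms, send_time_ms,
+//                    arrival_time_ms, packet_size,
+//                    /*calculated_deltas=*/true);
+//   if (estimator.State() == BandwidthUsage::kBwOverusing) {
+//     // the congestion controller should reduce its sending rate
+//   }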
+
+} // namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport
+#endif // MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_