diff options
Diffstat (limited to 'libtransport/src/protocols/rtc')
21 files changed, 981 insertions, 1150 deletions
diff --git a/libtransport/src/protocols/rtc/CMakeLists.txt b/libtransport/src/protocols/rtc/CMakeLists.txt index 77f065d0e..873b345d0 100644 --- a/libtransport/src/protocols/rtc/CMakeLists.txt +++ b/libtransport/src/protocols/rtc/CMakeLists.txt @@ -11,27 +11,27 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) - list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_consts.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_indexer.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.h - ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_packet.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.h ) list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_ldr.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_rc_queue.cc - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc - ${CMAKE_CURRENT_SOURCE_DIR}/probe_handler.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_state.cc ) set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) diff --git a/libtransport/src/protocols/rtc/congestion_detection.cc b/libtransport/src/protocols/rtc/congestion_detection.cc deleted file mode 100644 index e2d44ae66..000000000 --- a/libtransport/src/protocols/rtc/congestion_detection.cc +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/utils/log.h> -#include <protocols/rtc/congestion_detection.h> - -namespace transport { - -namespace protocol { - -namespace rtc { - -CongestionDetection::CongestionDetection() - : cc_estimator_(), last_processed_chunk_() {} - -CongestionDetection::~CongestionDetection() {} - -void CongestionDetection::updateStats() { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - if (chunks_number_.empty()) return; - - uint32_t chunk_number = chunks_number_.front(); - - while (chunks_[chunk_number].getReceivedTime() + HICN_CC_STATS_MAX_DELAY_MS < - now || - chunks_[chunk_number].isComplete()) { - if (chunk_number == last_processed_chunk_.getFrameSeqNum() + 1) { - chunks_[chunk_number].setPreviousSentTime( - last_processed_chunk_.getSentTime()); - - chunks_[chunk_number].setPreviousReceivedTime( - last_processed_chunk_.getReceivedTime()); - cc_estimator_.Update(chunks_[chunk_number].getReceivedDelta(), - chunks_[chunk_number].getSentDelta(), - chunks_[chunk_number].getSentTime(), - chunks_[chunk_number].getReceivedTime(), - chunks_[chunk_number].getFrameSize(), true); - - } else { - TRANSPORT_LOGD( - "CongestionDetection::updateStats frame %u but not the \ - previous one, last one was %u currentFrame %u", - chunk_number, last_processed_chunk_.getFrameSeqNum(), - chunks_[chunk_number].getFrameSeqNum()); - } - - last_processed_chunk_ = chunks_[chunk_number]; - - chunks_.erase(chunk_number); - - chunks_number_.pop(); - if (chunks_number_.empty()) break; - - chunk_number = chunks_number_.front(); - } -} - -void CongestionDetection::addPacket(const core::ContentObject &content_object) { - auto payload = content_object.getPayload(); - uint32_t payload_size = (uint32_t)payload->length(); - uint32_t segmentNumber = content_object.getName().getSuffix(); - // uint32_t pkt = segmentNumber & modMask_; - uint64_t *sentTimePtr = (uint64_t *)payload->data(); - - // this is just for testing with hiperf, assuming a frame is 10 pkts - // in the final version, the split should be based on the timestamp in the pkt - uint32_t frameNum = (int)(segmentNumber / HICN_CC_STATS_CHUNK_SIZE); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - if (chunks_.find(frameNum) == chunks_.end()) { - // new chunk of pkts or out of order - if (last_processed_chunk_.getFrameSeqNum() > frameNum) - return; // out of order and we already processed the chunk - - chunks_[frameNum] = FrameStats(frameNum, HICN_CC_STATS_CHUNK_SIZE); - chunks_number_.push(frameNum); - } - - chunks_[frameNum].addPacket(*sentTimePtr, now, payload_size); -} - -} // namespace rtc -} // namespace protocol -} // namespace transport diff --git a/libtransport/src/protocols/rtc/congestion_detection.h b/libtransport/src/protocols/rtc/congestion_detection.h deleted file mode 100644 index 17f4aa54c..000000000 --- a/libtransport/src/protocols/rtc/congestion_detection.h +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/core/content_object.h> -#include <protocols/rtc/trendline_estimator.h> - -#include <map> -#include <queue> - -#define HICN_CC_STATS_CHUNK_SIZE 10 -#define HICN_CC_STATS_MAX_DELAY_MS 100 - -namespace transport { - -namespace protocol { - -namespace rtc { - -class FrameStats { - public: - FrameStats() - : frame_num_(0), - sent_time_(0), - received_time_(0), - previous_sent_time_(0), - previous_received_time_(0), - size_(0), - received_pkt_m(0), - burst_size_m(HICN_CC_STATS_CHUNK_SIZE){}; - - FrameStats(uint32_t burst_size) - : frame_num_(0), - sent_time_(0), - received_time_(0), - previous_sent_time_(0), - previous_received_time_(0), - size_(0), - received_pkt_m(0), - burst_size_m(burst_size){}; - - FrameStats(uint32_t frame_num, uint32_t burst_size) - : frame_num_(frame_num), - sent_time_(0), - received_time_(0), - previous_sent_time_(0), - previous_received_time_(0), - size_(0), - received_pkt_m(0), - burst_size_m(burst_size){}; - - FrameStats(uint32_t frame_num, uint64_t sent_time, uint64_t received_time, - uint32_t size, FrameStats previousFrame, uint32_t burst_size) - : frame_num_(frame_num), - sent_time_(sent_time), - received_time_(received_time), - previous_sent_time_(previousFrame.getSentTime()), - previous_received_time_(previousFrame.getReceivedTime()), - size_(size), - received_pkt_m(1), - burst_size_m(burst_size){}; - - void addPacket(uint64_t sent_time, uint64_t received_time, uint32_t size) { - size_ += size; - sent_time_ = - (sent_time_ == 0) ? sent_time : std::min(sent_time_, sent_time); - received_time_ = std::max(received_time, received_time_); - received_pkt_m++; - } - - bool isComplete() { return received_pkt_m == burst_size_m; } - - uint32_t getFrameSeqNum() const { return frame_num_; } - uint64_t getSentTime() const { return sent_time_; } - uint64_t getReceivedTime() const { return received_time_; } - uint32_t getFrameSize() const { return size_; } - - void setPreviousReceivedTime(uint64_t time) { - previous_received_time_ = time; - } - void setPreviousSentTime(uint64_t time) { previous_sent_time_ = time; } - - // todo manage first frame - double getReceivedDelta() { - return static_cast<double>(received_time_ - previous_received_time_); - } - double getSentDelta() { - return static_cast<double>(sent_time_ - previous_sent_time_); - } - - private: - uint32_t frame_num_; - uint64_t sent_time_; - uint64_t received_time_; - - uint64_t previous_sent_time_; - uint64_t previous_received_time_; - uint32_t size_; - - uint32_t received_pkt_m; - uint32_t burst_size_m; -}; - -class CongestionDetection { - public: - CongestionDetection(); - ~CongestionDetection(); - - void addPacket(const core::ContentObject &content_object); - - BandwidthUsage getState() { return cc_estimator_.State(); } - - void updateStats(); - - private: - TrendlineEstimator cc_estimator_; - std::map<uint32_t, FrameStats> chunks_; - std::queue<uint32_t> chunks_number_; - - FrameStats last_processed_chunk_; -}; - -} // end namespace rtc - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc index efba362d4..abaca6ad9 100644 --- a/libtransport/src/protocols/rtc/probe_handler.cc +++ b/libtransport/src/protocols/rtc/probe_handler.cc @@ -43,7 +43,7 @@ uint64_t ProbeHandler::getRtt(uint32_t seq) { std::chrono::steady_clock::now().time_since_epoch()) .count(); uint64_t rtt = now - it->second; - if(rtt < 1) rtt = 1; + if (rtt < 1) rtt = 1; pending_probes_.erase(it); diff --git a/libtransport/src/protocols/rtc/probe_handler.h b/libtransport/src/protocols/rtc/probe_handler.h index b8ed84445..e34b23df0 100644 --- a/libtransport/src/protocols/rtc/probe_handler.h +++ b/libtransport/src/protocols/rtc/probe_handler.h @@ -14,9 +14,8 @@ */ #pragma once #include <hicn/transport/config.h> +#include <hicn/transport/core/asio_wrapper.h> -#include <asio.hpp> -#include <asio/steady_timer.hpp> #include <functional> #include <random> #include <unordered_map> @@ -32,8 +31,7 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> { using SendProbeCallback = std::function<void(uint32_t)>; public: - ProbeHandler(SendProbeCallback &&send_callback, - asio::io_service &io_service); + ProbeHandler(SendProbeCallback &&send_callback, asio::io_service &io_service); ~ProbeHandler(); @@ -53,8 +51,8 @@ class ProbeHandler : public std::enable_shared_from_this<ProbeHandler> { private: uint32_t probe_interval_; // us - uint32_t max_probes_; // packets - uint32_t sent_probes_; // packets + uint32_t max_probes_; // packets + uint32_t sent_probes_; // packets std::unique_ptr<asio::steady_timer> probe_timer_; diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc index 46659ac74..0cb4cda1d 100644 --- a/libtransport/src/protocols/rtc/rtc.cc +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -17,8 +17,11 @@ #include <hicn/transport/interfaces/socket_consumer.h> #include <implementation/socket_consumer.h> #include <math.h> +#include <protocols/errors.h> +#include <protocols/incremental_indexer_bytestream.h> #include <protocols/rtc/rtc.h> #include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_indexer.h> #include <protocols/rtc/rtc_rc_queue.h> #include <algorithm> @@ -33,39 +36,41 @@ using namespace interface; RTCTransportProtocol::RTCTransportProtocol( implementation::ConsumerSocket *icn_socket) - : TransportProtocol(icn_socket, nullptr), - DatagramReassembly(icn_socket, this), + : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this), + new DatagramReassembly(icn_socket, this)), number_(0) { icn_socket->getSocketOption(PORTAL, portal_); round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); scheduler_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + pacing_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); } RTCTransportProtocol::~RTCTransportProtocol() {} void RTCTransportProtocol::resume() { - if (is_running_) return; - - is_running_ = true; - newRound(); + TransportProtocol::resume(); +} - portal_->runEventsLoop(); - is_running_ = false; +std::size_t RTCTransportProtocol::transportHeaderLength() { + return DATA_HEADER_SIZE + + (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0); } // private void RTCTransportProtocol::initParams() { - portal_->setConsumerCallback(this); + TransportProtocol::reset(); rc_ = std::make_shared<RTCRateControlQueue>(); ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( + indexer_verifier_.get(), std::bind(&RTCTransportProtocol::sendRtxInterest, this, std::placeholders::_1), portal_->getIoService()); state_ = std::make_shared<RTCState>( + indexer_verifier_.get(), std::bind(&RTCTransportProtocol::sendProbeInterest, this, std::placeholders::_1), std::bind(&RTCTransportProtocol::discoveredRtt, this), @@ -83,8 +88,27 @@ void RTCTransportProtocol::initParams() { // Cancel timer number_++; round_timer_->cancel(); + scheduler_timer_->cancel(); scheduler_timer_on_ = false; + last_interest_sent_time_ = 0; + last_interest_sent_seq_ = 0; + +#if 0 + if(portal_->isConnectedToFwd()){ + max_aggregated_interest_ = 1; + }else{ + max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; + } +#else + max_aggregated_interest_ = 1; +#endif + + max_sent_int_ = + std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_); + + pacing_timer_->cancel(); + pacing_timer_on_ = false; // delete all timeouts and future nacks timeouts_or_nacks_.clear(); @@ -93,16 +117,28 @@ void RTCTransportProtocol::initParams() { current_sync_win_ = INITIAL_WIN; max_sync_win_ = INITIAL_WIN_MAX; - // names/packets var - next_segment_ = 0; - socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, RTC_INTEREST_LIFETIME); + + // FEC + using namespace std::placeholders; + enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, _1), + /* We leave the buffer allocation to the fec decoder */ + fec::FECBase::BufferRequested(0)); + + if (fec_decoder_) { + indexer_verifier_->enableFec(fec_type_); + indexer_verifier_->setNFec(0); + ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_), + fec::FECUtils::getSourceSymbols(fec_type_)); + } else { + indexer_verifier_->disableFec(); + } } // private void RTCTransportProtocol::reset() { - TRANSPORT_LOGD("reset called"); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "reset called"; initParams(); newRound(); } @@ -113,11 +149,13 @@ void RTCTransportProtocol::inactiveProducer() { current_sync_win_ = INITIAL_WIN; max_sync_win_ = INITIAL_WIN_MAX; - TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_, - max_sync_win_); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Current window: " << current_sync_win_ + << ", max_sync_win_: " << max_sync_win_; // names/packets var - next_segment_ = 0; + indexer_verifier_->reset(); + indexer_verifier_->enableFec(fec_type_); + indexer_verifier_->setNFec(0); ldr_->clear(); } @@ -137,10 +175,13 @@ void RTCTransportProtocol::newRound() { uint32_t received_bytes = state_->getReceivedBytesInRound(); uint32_t sent_interest = state_->getSentInterestInRound(); uint32_t lost_data = state_->getLostData(); + uint32_t definitely_lost = state_->getDefinitelyLostPackets(); uint32_t recovered_losses = state_->getRecoveredLosses(); uint32_t received_nacks = state_->getReceivedNacksInRound(); + uint32_t received_fec = state_->getReceivedFecPackets(); bool in_sync = (current_state_ == SyncState::in_sync); + ldr_->onNewRound(in_sync); state_->onNewRound((double)ROUND_LEN, in_sync); rc_->onNewRound((double)ROUND_LEN); @@ -161,11 +202,13 @@ void RTCTransportProtocol::newRound() { } } - TRANSPORT_LOGD("Calling updateSyncWindow in newRound function"); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Calling updateSyncWindow in newRound function"; updateSyncWindow(); sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, - recovered_losses, received_nacks); + definitely_lost, recovered_losses, received_nacks, + received_fec); newRound(); }); } @@ -173,6 +216,7 @@ void RTCTransportProtocol::newRound() { void RTCTransportProtocol::discoveredRtt() { start_send_interest_ = true; ldr_->turnOnRTX(); + ldr_->onNewRound(false); updateSyncWindow(); } @@ -182,22 +226,23 @@ void RTCTransportProtocol::computeMaxSyncWindow() { if (production_rate == 0.0 || packet_size == 0.0) { // the consumer has no info about the producer, // keep the previous maxCWin - TRANSPORT_LOGD( - "Returning in computeMaxSyncWindow because: prod_rate: %d || " - "packet_size: %d", - (int)(production_rate == 0.0), (int)(packet_size == 0.0)); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Returning in computeMaxSyncWindow because: prod_rate: " + << (production_rate == 0.0) + << " || packet_size: " << (packet_size == 0.0); return; } + production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead()); + uint32_t lifetime = default_values::interest_lifetime; socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC; - - max_sync_win_ = - (uint32_t)ceil((production_rate * lifetime_ms * - INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size); + max_sync_win_ = (uint32_t)ceil( + (production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) / + packet_size); max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow()); } @@ -219,12 +264,25 @@ void RTCTransportProtocol::updateSyncWindow() { // if some of the info are not available do not update the current win if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { - current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); - current_sync_win_ += (uint32_t) - ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size); + double fec_interest_overhead = (double)state_->getPendingFecPackets() / + (double)(state_->getPendingInterestNumber() - + state_->getPendingFecPackets()); + + double fec_overhead = + std::max(indexer_verifier_->getFecOverhead(), fec_interest_overhead); + + prod_rate += (prod_rate * fec_overhead); - if(current_state_ == SyncState::catch_up) { - current_sync_win_ = (uint32_t) (current_sync_win_ * CATCH_UP_WIN_INCREMENT); + current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); + uint32_t buffer = PRODUCER_BUFFER_MS; + if (rtt > 150) + buffer = buffer * 2; // if the RTT is too large we increase the + // the size of the buffer + current_sync_win_ += + ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size); + + if (current_state_ == SyncState::catch_up) { + current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT; } current_sync_win_ = std::min(current_sync_win_, max_sync_win_); @@ -243,70 +301,48 @@ void RTCTransportProtocol::decreaseSyncWindow() { scheduleNextInterests(); } -void RTCTransportProtocol::sendInterest(Name *interest_name) { - TRANSPORT_LOGD("Sending interest for name %s", - interest_name->toString().c_str()); - - auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); - interest->setName(*interest_name); - - uint32_t lifetime = default_values::interest_lifetime; - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - interest->setLifetime(uint32_t(lifetime)); - - if (*on_interest_output_) { - (*on_interest_output_)(*socket_->getInterface(), *interest); - } - - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return; - } - - portal_->sendInterest(std::move(interest)); -} - void RTCTransportProtocol::sendRtxInterest(uint32_t seq) { - if (!is_running_ && !is_first_) return; + if (!isRunning() && !is_first_) return; - if(!start_send_interest_) return; + if (!start_send_interest_) return; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - TRANSPORT_LOGD("send rtx %u", seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "send rtx " << seq; interest_name->setSuffix(seq); - sendInterest(interest_name); + sendInterest(*interest_name); } void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { - if (!is_running_ && !is_first_) return; + if (!isRunning() && !is_first_) return; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - TRANSPORT_LOGD("send probe %u", seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "send probe " << seq; interest_name->setSuffix(seq); - sendInterest(interest_name); + sendInterest(*interest_name); } void RTCTransportProtocol::scheduleNextInterests() { - TRANSPORT_LOGD("Schedule next interests"); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests"; - if (!is_running_ && !is_first_) return; + if (!isRunning() && !is_first_) return; - if(!start_send_interest_) return; // RTT discovering phase is not finished so - // do not start to send interests + if (pacing_timer_on_) return; // wait pacing timer for the next send - if (scheduler_timer_on_) return; // wait befor send other interests + if (!start_send_interest_) + return; // RTT discovering phase is not finished so + // do not start to send interests if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { - TRANSPORT_LOGD("Inactive producer."); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer."; // here we keep seding the same interest until the producer // does not start again - if (next_segment_ != 0) { + if (indexer_verifier_->checkNextSuffix() != 0) { // the producer just become inactive, reset the state inactiveProducer(); } @@ -315,125 +351,208 @@ void RTCTransportProtocol::scheduleNextInterests() { socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - TRANSPORT_LOGD("send interest %u", next_segment_); - interest_name->setSuffix(next_segment_); + uint32_t next_seg = 0; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "send interest " << next_seg; + interest_name->setSuffix(next_seg); if (portal_->interestIsPending(*interest_name)) { // if interest 0 is already pending we return return; } - sendInterest(interest_name); + sendInterest(*interest_name); state_->onSendNewInterest(interest_name); return; } - TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d", - state_->getPendingInterestNumber(), current_sync_win_); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Pending interest number: " << state_->getPendingInterestNumber() + << " -- current_sync_win_: " << current_sync_win_; + + uint32_t pending = state_->getPendingInterestNumber(); + if (pending >= current_sync_win_) return; // no space in the window + + if ((current_sync_win_ - pending) < max_aggregated_interest_) { + if (scheduler_timer_on_) return; // timer already scheduled + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + uint64_t time = now - last_interest_sent_time_; + if (time < WAIT_FOR_INTEREST_BATCH) { + uint64_t next = WAIT_FOR_INTEREST_BATCH - time; + scheduler_timer_on_ = true; + scheduler_timer_->expires_from_now(std::chrono::milliseconds(next)); + scheduler_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + if (!scheduler_timer_on_) return; + + scheduler_timer_on_ = false; + scheduleNextInterests(); + }); + return; // whait for the timer + } + } + + scheduler_timer_on_ = false; + scheduler_timer_->cancel(); // skip nacked pacekts - if (next_segment_ <= state_->getLastSeqNacked()) { - next_segment_ = state_->getLastSeqNacked() + 1; + if (indexer_verifier_->checkNextSuffix() <= state_->getLastSeqNacked()) { + indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1); } // skipe received packets - if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) { - next_segment_ = state_->getHighestSeqReceivedInOrder() + 1; + if (indexer_verifier_->checkNextSuffix() <= + state_->getHighestSeqReceivedInOrder()) { + indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1); } uint32_t sent_interests = 0; + uint32_t sent_packets = 0; + uint32_t aggregated_counter = 0; + Name *name = nullptr; + Name interest_name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes; + while ((state_->getPendingInterestNumber() < current_sync_win_) && - (sent_interests < MAX_INTERESTS_IN_BATCH)) { - TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_); - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + (sent_interests < max_sent_int_)) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "In while loop. Window size: " << current_sync_win_; + + uint32_t next_seg = indexer_verifier_->getNextSuffix(); - interest_name->setSuffix(next_segment_); + name->setSuffix(next_seg); // send the packet only if: // 1) it is not pending yet (not true for rtx) // 2) the packet is not received or lost // 3) is not in the rtx list - if (portal_->interestIsPending(*interest_name) || - state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN || - ldr_->isRtx(next_segment_)) { - TRANSPORT_LOGD( - "skip interest %u because: pending %u, recv %u, rtx %u", - next_segment_, (portal_->interestIsPending(*interest_name)), - (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN), - (ldr_->isRtx(next_segment_))); - next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ; + // 4) is fec and is not in order (!= last sent + 1) + if (portal_->interestIsPending(*name) || + state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN || + ldr_->isRtx(next_seg) || + (indexer_verifier_->isFec(next_seg) && + next_seg != last_interest_sent_seq_ + 1)) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "skip interest " << next_seg << " because: pending " + << portal_->interestIsPending(*name) << ", recv " + << (state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN) + << ", rtx " << (ldr_->isRtx(next_seg)) << ", is old fec " + << ((indexer_verifier_->isFec(next_seg) && + next_seg != last_interest_sent_seq_ + 1)); continue; } + if (aggregated_counter == 0) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "(name) send interest " << next_seg; + interest_name = *name; + } else { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "(append) send interest " << next_seg; + additional_suffixes[aggregated_counter - 1] = next_seg; + } - sent_interests++; - TRANSPORT_LOGD("send interest %u", next_segment_); - sendInterest(interest_name); - state_->onSendNewInterest(interest_name); + last_interest_sent_seq_ = next_seg; + state_->onSendNewInterest(name); + aggregated_counter++; + + if (aggregated_counter >= max_aggregated_interest_) { + sent_packets++; + sent_interests++; + sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); + last_interest_sent_time_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + aggregated_counter = 0; + } + } - next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ; + // exiting the while we may have some pending interest to send + if (aggregated_counter != 0) { + sent_packets++; + last_interest_sent_time_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); } if (state_->getPendingInterestNumber() < current_sync_win_) { - // we still have space in the window but we already sent a batch of - // MAX_INTERESTS_IN_BATCH interest. for the following ones wait one - // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packets drop + // we still have space in the window but we already sent too many packets + // wait PACING_WAIT to avoid drops in the kernel - scheduler_timer_on_ = true; - scheduler_timer_->expires_from_now( - std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES)); + pacing_timer_on_ = true; + pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT)); scheduler_timer_->async_wait([this](std::error_code ec) { if (ec) return; - if (!scheduler_timer_on_) return; + if (!pacing_timer_on_) return; - scheduler_timer_on_ = false; + pacing_timer_on_ = false; scheduleNextInterests(); }); } } -void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { - uint32_t segment_number = interest->getName().getSuffix(); - - TRANSPORT_LOGD("timeout for packet %u", segment_number); +void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, + const Name &name) { + uint32_t segment_number = name.getSuffix(); if (segment_number >= MIN_PROBE_SEQ) { // this is a timeout on a probe, do nothing return; } + PacketState state = state_->isReceivedOrLost(segment_number); + if (state != PacketState::UNKNOWN) { + // we may recover a packets using fec, ignore this timer + return; + } + timeouts_or_nacks_.insert(segment_number); if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) && - segment_number <= state_->getHighestSeqReceivedInOrder()) { + segment_number <= state_->getHighestSeqReceived()) { // we retransmit packets only if the producer is active, otherwise we // use timeouts to avoid to send too much traffic // // a timeout is sent using RTX only if it is an old packet. if it is for a // seq number that we didn't reach yet, we send the packet using the normal // schedule next interest - TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number); - ldr_->onTimeout(segment_number); - state_->onTimeout(segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "handle timeout for packet " << segment_number << " using rtx"; + if (ldr_->isRtxOn()) { + ldr_->onTimeout(segment_number); + if (indexer_verifier_->isFec(segment_number)) + state_->onTimeout(segment_number, true); + else + state_->onTimeout(segment_number, false); + } else { + // in this case we wil never recover the timeout + state_->onTimeout(segment_number, true); + } scheduleNextInterests(); return; } - TRANSPORT_LOGD("handle timeout for packet %u using normal interests", - segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "handle timeout for packet " << segment_number + << " using normal interests"; - if (segment_number < next_segment_) { + if (segment_number < indexer_verifier_->checkNextSuffix()) { // this is a timeout for a packet that will be generated in the future but // we are asking for higher sequence numbers. we need to go back like in the // case of future nacks - TRANSPORT_LOGD("on timeout next seg = %u, jump to %u", - next_segment_, segment_number); - next_segment_ = segment_number; + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "On timeout next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << segment_number; + // add an extra space in the window + current_sync_win_++; + indexer_verifier_->jumpToIndex(segment_number); } - state_->onTimeout(segment_number); + state_->onTimeout(segment_number, false); scheduleNextInterests(); } @@ -446,8 +565,8 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { // check if the packet got a timeout - TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment, - production_seg); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Nack received " << nack_segment + << ". Production segment: " << production_seg; bool compute_stats = true; auto tn_it = timeouts_or_nacks_.find(nack_segment); @@ -459,14 +578,15 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { state_->onNackPacketReceived(content_object, compute_stats); ldr_->onNackPacketReceived(content_object); - // both in case of past and future nack we set next_segment_ equal to the + // both in case of past and future nack we jump to the // production segment in the nack. In case of past nack we will skip unneded // interest (this is already done in the scheduleNextInterest in any case) // while in case of future nacks we can go back in time and ask again for the // content that generated the nack - TRANSPORT_LOGD("on nack next seg = %u, jump to %u", - next_segment_, production_seg); - next_segment_ = production_seg; + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "On nack next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << production_seg; + indexer_verifier_->jumpToIndex(production_seg); if (production_seg > nack_segment) { // remove the nack is it exists @@ -496,30 +616,33 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { void RTCTransportProtocol::onProbe(const ContentObject &content_object) { bool valid = state_->onProbePacketReceived(content_object); - if(!valid) return; + if (!valid) return; struct nack_packet_t *probe = (struct nack_packet_t *)content_object.getPayload()->data(); uint32_t production_seg = probe->getProductionSegement(); - // as for the nacks set next_segment_ - TRANSPORT_LOGD("on probe next seg = %u, jump to %u", - next_segment_, production_seg); - next_segment_ = production_seg; + // as for the nacks set next_segment + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "on probe next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << production_seg; + indexer_verifier_->jumpToIndex(production_seg); ldr_->onProbePacketReceived(content_object); updateSyncWindow(); } -void RTCTransportProtocol::onContentObject(Interest &interest, - ContentObject &content_object) { - TRANSPORT_LOGD("Received content object of size: %zu", - content_object.payloadSize()); - uint32_t payload_size = (uint32_t) content_object.payloadSize(); +void RTCTransportProtocol::onContentObjectReceived( + Interest &interest, ContentObject &content_object, std::error_code &ec) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received content object of size: " << content_object.payloadSize(); + uint32_t payload_size = content_object.payloadSize(); uint32_t segment_number = content_object.getName().getSuffix(); + ec = make_error_code(protocol_error::not_reassemblable); + if (segment_number >= MIN_PROBE_SEQ) { - TRANSPORT_LOGD("Received probe %u", segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number; if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); } @@ -528,7 +651,7 @@ void RTCTransportProtocol::onContentObject(Interest &interest, } if (payload_size == NACK_HEADER_SIZE) { - TRANSPORT_LOGD("Received nack %u", segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number; if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); } @@ -536,9 +659,8 @@ void RTCTransportProtocol::onContentObject(Interest &interest, return; } - TRANSPORT_LOGD("Received content %u", segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number; - rc_->onDataPacketReceived(content_object); bool compute_stats = true; auto tn_it = timeouts_or_nacks_.find(segment_number); if (tn_it != timeouts_or_nacks_.end()) { @@ -551,25 +673,49 @@ void RTCTransportProtocol::onContentObject(Interest &interest, // check if the packet was already received PacketState state = state_->isReceivedOrLost(segment_number); - state_->onDataPacketReceived(content_object, compute_stats); - ldr_->onDataPacketReceived(content_object); - // if the stat for this seq number is received do not send the packet to app if (state != PacketState::RECEIVED) { - if (*on_content_object_input_) { - (*on_content_object_input_)(*socket_->getInterface(), content_object); + // send packet to decoder + if (fec_decoder_) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "send packet " << segment_number << " to FEC decoder"; + fec_decoder_->onDataPacket( + content_object, content_object.headerSize() + rtc::DATA_HEADER_SIZE); + } + if (!indexer_verifier_->isFec(segment_number)) { + // the packet may be alredy sent to the ap by the decoder, check again if + // it is already received + state = state_->isReceivedOrLost(segment_number); + if (state != PacketState::RECEIVED) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received content " << segment_number; + + state_->onDataPacketReceived(content_object, compute_stats); + + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + ec = make_error_code(protocol_error::success); + } + } else { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received fec " << segment_number; + state_->onFecPacketReceived(content_object); } - reassemble(content_object); } else { - TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received duplicated content " << segment_number << ", drop it"; + ec = make_error_code(protocol_error::duplicated_content); } + ldr_->onDataPacketReceived(content_object); + rc_->onDataPacketReceived(content_object); + updateSyncWindow(); } void RTCTransportProtocol::sendStatsToApp( uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests, - uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) { + uint32_t lost_data, uint32_t definitely_lost, uint32_t recovered_losses, + uint32_t received_nacks, uint32_t received_fec) { if (*stats_summary_) { // Send the stats to the app stats_->updateQueuingDelay(state_->getQueuing()); @@ -581,23 +727,35 @@ void RTCTransportProtocol::sendStatsToApp( stats_->updateBytesRecv(received_bytes); stats_->updateInterestTx(sent_interests); stats_->updateReceivedNacks(received_nacks); + stats_->updateReceivedFEC(received_fec); stats_->updateAverageWindowSize(current_sync_win_); stats_->updateLossRatio(state_->getLossRate()); stats_->updateAverageRtt(state_->getRTT()); + stats_->updateQueuingDelay(state_->getQueuing()); stats_->updateLostData(lost_data); + stats_->updateDefinitelyLostData(definitely_lost); stats_->updateRecoveredData(recovered_losses); stats_->updateCCState((unsigned int)current_state_ ? 1 : 0); (*stats_summary_)(*socket_->getInterface(), *stats_); } } -void RTCTransportProtocol::reassemble(ContentObject &content_object) { - auto read_buffer = content_object.getPayload(); - TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length()); - read_buffer->trimStart(DATA_HEADER_SIZE); - Reassembly::read_buffer_ = std::move(read_buffer); - Reassembly::notifyApplication(); +void RTCTransportProtocol::onFecPackets( + std::vector<std::pair<uint32_t, fec::buffer>> &packets) { + for (auto &packet : packets) { + PacketState state = state_->isReceivedOrLost(packet.first); + if (state != PacketState::RECEIVED) { + state_->onPacketRecoveredFec(packet.first); + ldr_->onPacketRecoveredFec(packet.first); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Recovered packet " << packet.first << " through FEC."; + reassembly_->reassemble(*packet.second, packet.first); + } else { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Packet" << packet.first << "already received."; + } + } } } // end namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h index 596887067..e6431264d 100644 --- a/libtransport/src/protocols/rtc/rtc.h +++ b/libtransport/src/protocols/rtc/rtc.h @@ -30,8 +30,7 @@ namespace protocol { namespace rtc { -class RTCTransportProtocol : public TransportProtocol, - public DatagramReassembly { +class RTCTransportProtocol : public TransportProtocol { public: RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket); @@ -43,6 +42,8 @@ class RTCTransportProtocol : public TransportProtocol, void resume() override; + std::size_t transportHeaderLength() override; + private: enum class SyncState { catch_up = 0, in_sync = 1, last }; @@ -63,24 +64,28 @@ class RTCTransportProtocol : public TransportProtocol, void decreaseSyncWindow(); // packet functions - void sendInterest(Name *interest_name); void sendRtxInterest(uint32_t seq); void sendProbeInterest(uint32_t seq); void scheduleNextInterests() override; - void onTimeout(Interest::Ptr &&interest) override; + void onInterestTimeout(Interest::Ptr &interest, const Name &name) override; void onNack(const ContentObject &content_object); void onProbe(const ContentObject &content_object); - void reassemble(ContentObject &content_object) override; - void onContentObject(Interest &interest, - ContentObject &content_object) override; - void onPacketDropped(Interest &interest, - ContentObject &content_object) override {} + void onContentObjectReceived(Interest &interest, + ContentObject &content_object, + std::error_code &ec) override; + void onPacketDropped(Interest &interest, ContentObject &content_object, + const std::error_code &reason) override {} void onReassemblyFailed(std::uint32_t missing_segment) override {} // interaction with app functions void sendStatsToApp(uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests, uint32_t lost_data, - uint32_t recovered_losses, uint32_t received_nacks); + uint32_t definitely_lost, uint32_t recovered_losses, + uint32_t received_nacks, uint32_t received_fec); + + // FEC functions + void onFecPackets(std::vector<std::pair<uint32_t, fec::buffer>> &packets); + // protocol state bool start_send_interest_; SyncState current_state_; @@ -88,17 +93,30 @@ class RTCTransportProtocol : public TransportProtocol, uint32_t current_sync_win_; uint32_t max_sync_win_; - // controller var + // round timer std::unique_ptr<asio::steady_timer> round_timer_; + + // scheduler timer (postpone interest sending to explot aggregated interests) std::unique_ptr<asio::steady_timer> scheduler_timer_; bool scheduler_timer_on_; + uint64_t last_interest_sent_time_; + uint64_t last_interest_sent_seq_; + + // maximum aggregated interest. if the transport is connected to the forwarder + // we cannot use aggregated interests + uint32_t max_aggregated_interest_; + // maximum number of intereset that can be sent in a loop to avoid packets + // dropped by the kernel + uint32_t max_sent_int_; + + // pacing timer (do not send too many interests in a short time to avoid + // packet drops in the kernel) + std::unique_ptr<asio::steady_timer> pacing_timer_; + bool pacing_timer_on_; // timeouts std::unordered_set<uint32_t> timeouts_or_nacks_; - // names/packets var - uint32_t next_segment_; - std::shared_ptr<RTCState> state_; std::shared_ptr<RTCRateControl> rc_; std::shared_ptr<RTCLossDetectionAndRecovery> ldr_; diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h index e172fc7a1..d04bc1b1f 100644 --- a/libtransport/src/protocols/rtc/rtc_consts.h +++ b/libtransport/src/protocols/rtc/rtc_consts.h @@ -37,12 +37,26 @@ const double INTEREST_LIFETIME_REDUCTION_FACTOR = 0.8; const uint32_t PRODUCER_BUFFER_MS = 200; // ms // interest scheduler -const uint32_t MAX_INTERESTS_IN_BATCH = 5; -const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec +// const uint32_t MAX_INTERESTS_IN_BATCH = 5; +// const uint32_t WAIT_BETWEEN_INTEREST_BATCHES = 1000; // usec +const uint32_t MAX_INTERESTS_IN_BATCH = 5; // number of seq numbers per + // aggregated interest packet + // considering the name itself +const uint32_t WAIT_FOR_INTEREST_BATCH = 20; // msec. timer that we wait to try + // to aggregate interest in the + // same packet +const uint32_t MAX_PACING_BATCH = 5; +// number of interest that we can send inside +// the loop before they get dropped by the +// kernel. +const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As + // for MAX_PACING_BATCH this value was + // computed during tests +const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop // packet const const uint32_t HICN_HEADER_SIZE = 40 + 20; // IPv6 + TCP bytes -const uint32_t RTC_INTEREST_LIFETIME = 1000; +const uint32_t RTC_INTEREST_LIFETIME = 2000; // probes sequence range const uint32_t MIN_PROBE_SEQ = 0xefffffff; @@ -51,19 +65,18 @@ const uint32_t MAX_RTT_PROBE_SEQ = 0xffffffff - 1; // RTT_PROBE_INTERVAL will be used during the section while // INIT_RTT_PROBE_INTERVAL is used at the beginning to // quickily estimate the RTT -const uint32_t RTT_PROBE_INTERVAL = 200000; // us -const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us -const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT +const uint32_t RTT_PROBE_INTERVAL = 200000; // us +const uint32_t INIT_RTT_PROBE_INTERVAL = 500; // us +const uint32_t INIT_RTT_PROBES = 40; // number of probes to init RTT // if the produdcer is not yet started we need to probe multple times // to get an answer. we wait 100ms between each try -const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms +const uint32_t INIT_RTT_PROBE_RESTART = 100; // ms // once we get the first probe we wait at most 60ms for the others -const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms +const uint32_t INIT_RTT_PROBE_WAIT = 30; // ms // we reuires at least 5 probes to be recevied -const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; //ms +const uint32_t INIT_RTT_MIN_PROBES_TO_RECV = 5; // ms const uint32_t MAX_PENDING_PROBES = 10; - // congestion const double MAX_QUEUING_DELAY = 100.0; // ms @@ -97,7 +110,7 @@ const double MAX_CACHED_PACKETS = 262144; // 2^18 // about 50 sec of traffic at 50Mbps // with 1200 bytes packets -const uint32_t MAX_ROUND_WHIOUT_PACKETS = (const uint32_t) +const uint32_t MAX_ROUND_WHIOUT_PACKETS = (20 * MILLI_IN_A_SEC) / ROUND_LEN; // 20 sec in rounds; // used in ldr diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc index a545225cb..c098088a3 100644 --- a/libtransport/src/protocols/rtc/rtc_data_path.cc +++ b/libtransport/src/protocols/rtc/rtc_data_path.cc @@ -69,7 +69,7 @@ void RTCDataPath::insertOwdSample(int64_t owd) { if (avg_owd != DBL_MAX) avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC); else { - avg_owd = (double)owd; + avg_owd = owd; } int64_t queueVal = owd - std::min(getMinOwd(), min_owd); @@ -77,7 +77,7 @@ void RTCDataPath::insertOwdSample(int64_t owd) { if (queuing_delay != DBL_MAX) queuing_delay = (queuing_delay * (1 - ALPHA_RTC)) + (queueVal * ALPHA_RTC); else { - queuing_delay = (double)queueVal; + queuing_delay = queueVal; } // keep track of the jitter computed as for RTP (RFC 3550) @@ -100,7 +100,7 @@ void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) { largest_recv_seq_ = segment_number; largest_recv_seq_time_ = now; if (avg_inter_arrival_ == DBL_MAX) - avg_inter_arrival_ = (double)delta; + avg_inter_arrival_ = delta; else avg_inter_arrival_ = (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC); diff --git a/libtransport/src/protocols/rtc/rtc_indexer.h b/libtransport/src/protocols/rtc/rtc_indexer.h new file mode 100644 index 000000000..4aee242bb --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_indexer.h @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/errors.h> +#include <protocols/fec_utils.h> +#include <protocols/indexer.h> +#include <protocols/rtc/rtc_consts.h> +#include <protocols/transport_protocol.h> + +#include <deque> + +namespace transport { + +namespace interface { +class ConsumerSocket; +} + +namespace protocol { + +namespace rtc { + +template <uint32_t LIMIT = MIN_PROBE_SEQ> +class RtcIndexer : public Indexer { + public: + RtcIndexer(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport) + : Indexer(icn_socket, transport), + first_suffix_(1), + next_suffix_(first_suffix_), + fec_type_(fec::FECType::UNKNOWN), + n_fec_(0), + n_current_fec_(n_fec_) {} + + RtcIndexer(RtcIndexer &&other) : Indexer(std::forward<Indexer>(other)) {} + + ~RtcIndexer() {} + + void reset() override { + next_suffix_ = first_suffix_; + n_fec_ = 0; + } + + uint32_t checkNextSuffix() override { return next_suffix_; } + + uint32_t getNextSuffix() override { + if (isFec(next_suffix_)) { + if (n_current_fec_) { + auto ret = next_suffix_++; + n_current_fec_--; + return ret; + } else { + n_current_fec_ = n_fec_; + next_suffix_ = nextSource(next_suffix_); + } + } else if (!n_current_fec_) { + n_current_fec_ = n_fec_; + } + + return (next_suffix_++ % LIMIT); + } + + void setFirstSuffix(uint32_t suffix) override { + first_suffix_ = suffix % LIMIT; + } + + uint32_t getFirstSuffix() override { return first_suffix_; } + + uint32_t jumpToIndex(uint32_t index) override { + next_suffix_ = index % LIMIT; + return next_suffix_; + } + + void onContentObject(core::Interest &interest, + core::ContentObject &content_object, + bool reassembly) override { + setVerifier(); + auto ret = verifier_->verifyPackets(&content_object); + + switch (ret) { + case auth::VerificationPolicy::ACCEPT: { + if (reassembly) { + reassembly_->reassemble(content_object); + } + break; + } + + case auth::VerificationPolicy::UNKNOWN: + case auth::VerificationPolicy::DROP: { + transport_->onPacketDropped( + interest, content_object, + make_error_code(protocol_error::verification_failed)); + break; + } + + case auth::VerificationPolicy::ABORT: { + transport_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } + } + + /** + * Retrieve the next segment to be reassembled. + */ + uint32_t getNextReassemblySegment() override { + throw errors::RuntimeException( + "Get reassembly segment called on rtc indexer. RTC indexer does not " + "provide " + "reassembly."); + } + + bool isFinalSuffixDiscovered() override { return true; } + + uint32_t getFinalSuffix() override { return LIMIT; } + + void enableFec(fec::FECType fec_type) override { fec_type_ = fec_type; } + + void disableFec() override { fec_type_ = fec::FECType::UNKNOWN; } + + void setNFec(uint32_t n_fec) override { + n_fec_ = n_fec; + n_current_fec_ = n_fec_; + } + + uint32_t getNFec() override { return n_fec_; } + + bool isFec(uint32_t index) override { + return isFec(fec_type_, index, first_suffix_); + } + + double getFecOverhead() override { + if (fec_type_ == fec::FECType::UNKNOWN) { + return 0; + } + + double k = (double)fec::FECUtils::getSourceSymbols(fec_type_); + return (double)n_fec_ / k; + } + + double getMaxFecOverhead() override { + if (fec_type_ == fec::FECType::UNKNOWN) { + return 0; + } + + double k = (double)fec::FECUtils::getSourceSymbols(fec_type_); + double n = (double)fec::FECUtils::getBlockSymbols(fec_type_); + return (double)(n - k) / k; + } + + static bool isFec(fec::FECType fec_type, uint32_t index, + uint32_t first_suffix) { + if (index < LIMIT) { + return fec::FECUtils::isFec(fec_type, index, first_suffix); + } + + return false; + } + + static uint32_t nextSource(fec::FECType fec_type, uint32_t index, + uint32_t first_suffix) { + return fec::FECUtils::nextSource(fec_type, index, first_suffix) % LIMIT; + } + + private: + uint32_t nextSource(uint32_t index) { + return nextSource(fec_type_, index, first_suffix_); + } + + private: + uint32_t first_suffix_; + uint32_t next_suffix_; + fec::FECType fec_type_; + bool fec_enabled_; + uint32_t n_fec_; + uint32_t n_current_fec_; +}; + +} // namespace rtc +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc index 0ef381fe1..f0de48871 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.cc +++ b/libtransport/src/protocols/rtc/rtc_ldr.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <glog/logging.h> #include <protocols/rtc/rtc_consts.h> #include <protocols/rtc/rtc_ldr.h> @@ -26,11 +27,13 @@ namespace protocol { namespace rtc { RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( - SendRtxCallback &&callback, asio::io_service &io_service) + Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service) : rtx_on_(false), + fec_on_(false), next_rtx_timer_(MAX_TIMER_RTX), last_event_(0), sentinel_timer_interval_(MAX_TIMER_RTX), + indexer_(indexer), send_rtx_callback_(std::move(callback)) { timer_ = std::make_unique<asio::steady_timer>(io_service); sentinel_timer_ = std::make_unique<asio::steady_timer>(io_service); @@ -40,7 +43,7 @@ RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {} void RTCLossDetectionAndRecovery::turnOnRTX() { rtx_on_ = true; - scheduleSentinelTimer((uint32_t)(state_->getRTT() * CATCH_UP_RTT_INCREMENT)); + scheduleSentinelTimer(state_->getRTT() * CATCH_UP_RTT_INCREMENT); } void RTCLossDetectionAndRecovery::turnOffRTX() { @@ -48,6 +51,54 @@ void RTCLossDetectionAndRecovery::turnOffRTX() { clear(); } +uint32_t RTCLossDetectionAndRecovery::computeFecPacketsToAsk(bool in_sync) { + uint32_t current_fec = indexer_->getNFec(); + double current_loss_rate = state_->getLossRate(); + double last_loss_rate = state_->getLastRoundLossRate(); + + // when in sync ask for fec only if there are losses for 2 rounds + if (in_sync && current_fec == 0 && + (current_loss_rate == 0 || last_loss_rate == 0)) + return 0; + + double loss_rate = state_->getMaxLossRate() * 1.5; + + if (!in_sync && loss_rate == 0) loss_rate = 0.05; + if (loss_rate > 0.5) loss_rate = 0.5; + + double exp_losses = (double)k_ * loss_rate; + uint32_t fec_to_ask = ceil(exp_losses / (1 - loss_rate)); + + if (fec_to_ask > (n_ - k_)) fec_to_ask = n_ - k_; + + return fec_to_ask; +} + +void RTCLossDetectionAndRecovery::onNewRound(bool in_sync) { + uint64_t rtt = state_->getRTT(); + if (!fec_on_ && rtt >= 100) { + // turn on fec, here we may have no info so ask for all packets + fec_on_ = true; + turnOffRTX(); + indexer_->setNFec(computeFecPacketsToAsk(in_sync)); + return; + } + + if (fec_on_ && rtt > 80) { + // keep using fec, maybe update it + indexer_->setNFec(computeFecPacketsToAsk(in_sync)); + return; + } + + if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) { + // turn on rtx + fec_on_ = false; + indexer_->setNFec(0); + turnOnRTX(); + return; + } +} + void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) { // always add timeouts to the RTX list to avoid to send the same packet as if // it was not a rtx @@ -55,17 +106,23 @@ void RTCLossDetectionAndRecovery::onTimeout(uint32_t seq) { last_event_ = getNow(); } +void RTCLossDetectionAndRecovery::onPacketRecoveredFec(uint32_t seq) { + // if an RTX is scheduled for a packet recovered using FEC delete it + deleteRtx(seq); + recover_with_fec_.erase(seq); +} + void RTCLossDetectionAndRecovery::onDataPacketReceived( const core::ContentObject &content_object) { last_event_ = getNow(); uint32_t seq = content_object.getName().getSuffix(); if (deleteRtx(seq)) { - state_->onPacketRecovered(seq); + state_->onPacketRecoveredRtx(seq); } else { - if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off - TRANSPORT_LOGD("received data. add from %u to %u ", - state_->getHighestSeqReceivedInOrder() + 1, seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "received data. add from " + << state_->getHighestSeqReceivedInOrder() + 1 << " to " << seq; addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, seq); } } @@ -76,8 +133,6 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived( uint32_t seq = nack.getName().getSuffix(); - if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off - struct nack_packet_t *nack_pkt = (struct nack_packet_t *)nack.getPayload()->data(); uint32_t production_seq = nack_pkt->getProductionSegement(); @@ -91,8 +146,9 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived( // productionSeq = 14. 9 is lost but we can try to recover packets 12 13 and // 14 that are not arrived yet deleteRtx(seq); - TRANSPORT_LOGD("received past nack. add from %u to %u ", - state_->getHighestSeqReceivedInOrder() + 1, production_seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received past nack. add from " + << state_->getHighestSeqReceivedInOrder() + 1 + << " to " << production_seq; addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, production_seq); } else { @@ -105,8 +161,9 @@ void RTCLossDetectionAndRecovery::onNackPacketReceived( // with productionSeq = 18. this says that all the packets between 12 and 18 // may got lost and we should ask them deleteRtx(seq); - TRANSPORT_LOGD("received futrue nack. add from %u to %u ", - state_->getHighestSeqReceivedInOrder() + 1, production_seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received futrue nack. add from " + << state_->getHighestSeqReceivedInOrder() + 1 + << " to " << production_seq; addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, production_seq); } @@ -117,12 +174,13 @@ void RTCLossDetectionAndRecovery::onProbePacketReceived( // we don't log the reception of a probe packet for the sentinel timer because // probes are not taken into account into the sync window. we use them as // future nacks to detect possible packets lost - if (TRANSPORT_EXPECT_FALSE(!rtx_on_)) return; // do not add if RTX is off struct nack_packet_t *probe_pkt = (struct nack_packet_t *)probe.getPayload()->data(); uint32_t production_seq = probe_pkt->getProductionSegement(); - TRANSPORT_LOGD("received probe. add from %u to %u ", - state_->getHighestSeqReceivedInOrder() + 1, production_seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "received probe. add from " + << state_->getHighestSeqReceivedInOrder() + 1 << " to " << production_seq; + addToRetransmissions(state_->getHighestSeqReceivedInOrder() + 1, production_seq); } @@ -150,20 +208,41 @@ void RTCLossDetectionAndRecovery::addToRetransmissions(uint32_t start, } for (uint32_t seq = start; seq < stop; seq++) { - if (!isRtx(seq) && // is not already an rtx - // is not received or lost - state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) { - // add rtx - rtxState state; - state.first_send_ = state_->getInterestSentTime(seq); - if (state.first_send_ == 0) // this interest was never sent before - state.first_send_ = getNow(); - state.next_send_ = computeNextSend(seq, true); - state.rtx_count_ = 0; - TRANSPORT_LOGD("add %u to retransmissions. next rtx is %lu ", seq, - (state.next_send_ - getNow())); - rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state)); - rtx_timers_.insert(std::pair<uint64_t, uint32_t>(state.next_send_, seq)); + if (state_->isReceivedOrLost(seq) == PacketState::UNKNOWN) { + if (rtx_on_) { + if (!indexer_->isFec(seq)) { + // handle it with rtx + if (!isRtx(seq)) { + state_->onLossDetected(seq); + rtxState state; + state.first_send_ = state_->getInterestSentTime(seq); + if (state.first_send_ == 0) // this interest was never sent before + state.first_send_ = getNow(); + state.next_send_ = computeNextSend(seq, true); + state.rtx_count_ = 0; + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "Add " << seq << " to retransmissions. next rtx is %lu " + << state.next_send_ - getNow(); + rtx_state_.insert(std::pair<uint32_t, rtxState>(seq, state)); + rtx_timers_.insert( + std::pair<uint64_t, uint32_t>(state.next_send_, seq)); + } + } else { + // is fec, do not send it + auto it = recover_with_fec_.find(seq); + if (it == recover_with_fec_.end()) { + state_->onLossDetected(seq); + recover_with_fec_.insert(seq); + } + } + } else { + // keep track of losses but recover with FEC + auto it = recover_with_fec_.find(seq); + if (it == recover_with_fec_.end()) { + state_->onLossDetected(seq); + recover_with_fec_.insert(seq); + } + } } } scheduleNextRtx(); @@ -182,13 +261,15 @@ uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq, if (prod_rate != 0) { double packet_size = state_->getAveragePacketSize(); - estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size)); - jitter = (uint32_t)ceil(state_->getJitter()); + estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + jitter = ceil(state_->getJitter()); } uint32_t wait = estimated_iat + jitter; - TRANSPORT_LOGD("first rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u", - seq, wait, state_->getRTT(), estimated_iat, jitter); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "first rtx for " << seq << " in " << wait + << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat + << " jttr = " << jitter; return now + wait; } else { @@ -202,25 +283,26 @@ uint64_t RTCLossDetectionAndRecovery::computeNextSend(uint32_t seq, } double packet_size = state_->getAveragePacketSize(); - uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size)); + uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); uint64_t rtt = state_->getRTT(); if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; - wait = (uint32_t)rtt; + wait = rtt; if (estimated_iat > rtt) wait = estimated_iat; - uint32_t jitter = (uint32_t)ceil(state_->getJitter()); + uint32_t jitter = ceil(state_->getJitter()); wait += jitter; // it may happen that the channel is congested and we have some additional // queuing delay to take into account - uint32_t queue = (uint32_t)ceil(state_->getQueuing()); + uint32_t queue = ceil(state_->getQueuing()); wait += queue; - TRANSPORT_LOGD( - "next rtx for %u in %u ms, rtt = %lu ait = %u jttr = %u queue = %u", - seq, wait, state_->getRTT(), estimated_iat, jitter, queue); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "next rtx for " << seq << " in " << wait + << " ms, rtt = " << state_->getRTT() << " ait = " << estimated_iat + << " jttr = " << jitter << " queue = " << queue; return now + wait; } @@ -235,7 +317,7 @@ void RTCLossDetectionAndRecovery::retransmit() { std::unordered_set<uint32_t> lost_pkt; uint32_t sent_counter = 0; while (it != rtx_timers_.end() && it->first <= now && - sent_counter < MAX_INTERESTS_IN_BATCH) { + sent_counter < MAX_RTX_IN_BATCH) { uint32_t seq = it->second; auto rtx_it = rtx_state_.find(seq); // this should always return a valid iter @@ -243,11 +325,11 @@ void RTCLossDetectionAndRecovery::retransmit() { (now - rtx_it->second.first_send_) >= RTC_MAX_AGE || seq < state_->getLastSeqNacked()) { // max rtx reached or packet too old or packet nacked, this packet is lost - TRANSPORT_LOGD( - "packet %u lost because 1) max rtx: %u 2) max age: %u 3) naked: %u", - seq, (rtx_it->second.rtx_count_ >= RTC_MAX_RTX), - ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE), - (seq < state_->getLastSeqNacked())); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "packet " << seq << " lost because 1) max rtx: " + << (rtx_it->second.rtx_count_ >= RTC_MAX_RTX) << " 2) max age: " + << ((now - rtx_it->second.first_send_) >= RTC_MAX_AGE) + << " 3) nacked: " << (seq < state_->getLastSeqNacked()); lost_pkt.insert(seq); it++; } else { @@ -259,8 +341,9 @@ void RTCLossDetectionAndRecovery::retransmit() { it = rtx_timers_.erase(it); rtx_timers_.insert( std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq)); - TRANSPORT_LOGD("send rtx for sequence %u, next send in %lu", seq, - (rtx_it->second.next_send_ - now)); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "send rtx for sequence " << seq << ", next send in " + << (rtx_it->second.next_send_ - now); send_rtx_callback_(seq); sent_counter++; } @@ -358,20 +441,21 @@ void RTCLossDetectionAndRecovery::sentinelTimer() { if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { // this happens at the beginning (or if the producer stops for some // reason) we need to keep sending interest 0 until we get an answer - TRANSPORT_LOGD( - "sentinel timer: the producer is not active, send packet 0"); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "sentinel timer: the producer is not active, send packet 0"; state_->onRetransmission(0); send_rtx_callback_(0); } else { - TRANSPORT_LOGD( - "sentinel timer: the producer is active, send the 10 oldest packets"); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "sentinel timer: the producer is active, " + "send the 10 oldest packets"; sent = true; uint32_t rtx = 0; auto it = state_->getPendingInterestsMapBegin(); auto end = state_->getPendingInterestsMapEnd(); while (it != end && rtx < MAX_RTX_WITH_SENTINEL) { uint32_t seq = it->first; - TRANSPORT_LOGD("sentinel timer, add %u to the rtx list", seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "sentinel timer, add " << seq << " to the rtx list"; addToRetransmissions(seq, seq + 1); rtx++; it++; @@ -384,36 +468,38 @@ void RTCLossDetectionAndRecovery::sentinelTimer() { uint32_t next_timer; double prod_rate = state_->getProducerRate(); if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive()) || prod_rate == 0) { - TRANSPORT_LOGD("next timer in %u", SENTINEL_TIMER_INTERVAL); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "next timer in " << SENTINEL_TIMER_INTERVAL; next_timer = SENTINEL_TIMER_INTERVAL; } else { double prod_rate = state_->getProducerRate(); double packet_size = state_->getAveragePacketSize(); - uint32_t estimated_iat = (uint32_t)ceil(1000.0 / (prod_rate / packet_size)); - uint32_t jitter = (uint32_t)ceil(state_->getJitter()); + uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + uint32_t jitter = ceil(state_->getJitter()); // try to reduce the number of timers if the estimated IAT is too small next_timer = std::max((estimated_iat + jitter) * 20, (uint32_t)1); - TRANSPORT_LOGD("next sentinel in %u ms, rate: %f, iat: %u, jitter: %u", - next_timer, ((prod_rate * 8.0) / 1000000.0), estimated_iat, - jitter); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "next sentinel in " << next_timer + << " ms, rate: " << ((prod_rate * 8.0) / 1000000.0) + << ", iat: " << estimated_iat << ", jitter: " << jitter; if (!expired) { // discount the amout of time that is already passed - uint32_t discount = (uint32_t)(now - last_event_); + uint32_t discount = now - last_event_; if (next_timer > discount) { next_timer = next_timer - discount; } else { // in this case we trigger the timer in 1 ms next_timer = 1; } - TRANSPORT_LOGD("timer after discout: %u", next_timer); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "timer after discout: " << next_timer; } else if (sent) { // wait at least one producer stats interval + owd to check if the // production rate is reducing. - uint32_t min_wait = PRODUCER_STATS_INTERVAL + (uint32_t)ceil(state_->getQueuing()); + uint32_t min_wait = PRODUCER_STATS_INTERVAL + ceil(state_->getQueuing()); next_timer = std::max(next_timer, min_wait); - TRANSPORT_LOGD("wait for updates from prod, next timer: %u", next_timer); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "wait for updates from prod, next timer: " << next_timer; } } diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h index c0912303b..1b9f9afd6 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.h +++ b/libtransport/src/protocols/rtc/rtc_ldr.h @@ -15,15 +15,16 @@ #pragma once #include <hicn/transport/config.h> +#include <hicn/transport/core/asio_wrapper.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/name.h> +#include <protocols/indexer.h> #include <protocols/rtc/rtc_consts.h> #include <protocols/rtc/rtc_state.h> -#include <asio.hpp> -#include <asio/steady_timer.hpp> #include <functional> #include <map> +#include <unordered_map> namespace transport { @@ -43,16 +44,23 @@ class RTCLossDetectionAndRecovery using SendRtxCallback = std::function<void(uint32_t)>; public: - RTCLossDetectionAndRecovery(SendRtxCallback &&callback, + RTCLossDetectionAndRecovery(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service); ~RTCLossDetectionAndRecovery(); void setState(std::shared_ptr<RTCState> state) { state_ = state; } + void setFecParams(uint32_t n, uint32_t k) { + n_ = n; + k_ = k; + } void turnOnRTX(); void turnOffRTX(); + bool isRtxOn() { return rtx_on_; } + void onNewRound(bool in_sync); void onTimeout(uint32_t seq); + void onPacketRecoveredFec(uint32_t seq); void onDataPacketReceived(const core::ContentObject &content_object); void onNackPacketReceived(const core::ContentObject &nack); void onProbePacketReceived(const core::ContentObject &probe); @@ -72,6 +80,7 @@ class RTCLossDetectionAndRecovery bool deleteRtx(uint32_t seq); void scheduleSentinelTimer(uint64_t expires_from_now); void sentinelTimer(); + uint32_t computeFecPacketsToAsk(bool in_sync); uint64_t getNow() { using namespace std::chrono; @@ -90,14 +99,25 @@ class RTCLossDetectionAndRecovery // should be sent, and the val is the interest seq number std::multimap<uint64_t, uint32_t> rtx_timers_; + // lost packets that will be recovered with fec + std::unordered_set<uint32_t> recover_with_fec_; + bool rtx_on_; + bool fec_on_; uint64_t next_rtx_timer_; uint64_t last_event_; uint64_t sentinel_timer_interval_; + + // fec params + uint32_t n_; + uint32_t k_; + std::unique_ptr<asio::steady_timer> timer_; std::unique_ptr<asio::steady_timer> sentinel_timer_; std::shared_ptr<RTCState> state_; + Indexer *indexer_; + SendRtxCallback send_rtx_callback_; }; diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h index 2f2b19fb9..7dc2f82c3 100644 --- a/libtransport/src/protocols/rtc/rtc_packet.h +++ b/libtransport/src/protocols/rtc/rtc_packet.h @@ -90,4 +90,4 @@ struct nack_packet_t { } // end namespace protocol -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.cc b/libtransport/src/protocols/rtc/rtc_rc_frame.cc deleted file mode 100644 index b577b5bea..000000000 --- a/libtransport/src/protocols/rtc/rtc_rc_frame.cc +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <protocols/rtc/rtc_consts.h> -#include <protocols/rtc/rtc_rc_frame.h> - -#include <algorithm> - -namespace transport { - -namespace protocol { - -namespace rtc { - -RTCRateControlFrame::RTCRateControlFrame() : cc_detector_() {} - -RTCRateControlFrame::~RTCRateControlFrame() {} - -void RTCRateControlFrame::onNewRound(double round_len) { - if (!rc_on_) return; - - CongestionState prev_congestion_state = congestion_state_; - cc_detector_.updateStats(); - congestion_state_ = (CongestionState)cc_detector_.getState(); - - if (congestion_state_ == CongestionState::Congested) { - if (prev_congestion_state == CongestionState::Normal) { - // congestion detected, notify app and init congestion win - double prod_rate = protocol_state_->getReceivedRate(); - double rtt = (double)protocol_state_->getRTT() / MILLI_IN_A_SEC; - double packet_size = protocol_state_->getAveragePacketSize(); - - if (prod_rate == 0.0 || rtt == 0.0 || packet_size == 0.0) { - // TODO do something - return; - } - - congestion_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); - } - uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR; - congestion_win_ = std::max(win, WIN_MIN); - return; - } -} - -void RTCRateControlFrame::onDataPacketReceived( - const core::ContentObject &content_object) { - if (!rc_on_) return; - - uint32_t seq = content_object.getName().getSuffix(); - if (!protocol_state_->isPending(seq)) return; - - cc_detector_.addPacket(content_object); -} - -void RTCRateControlFrame::receivedBwProbeTrain(uint64_t firts_probe_ts, - uint64_t last_probe_ts, - uint32_t total_probes) { - // TODO - return; -} - -} // end namespace rtc - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_frame.h b/libtransport/src/protocols/rtc/rtc_rc_frame.h deleted file mode 100644 index 25d5ddbb6..000000000 --- a/libtransport/src/protocols/rtc/rtc_rc_frame.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2017-2021 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once -#include <protocols/rtc/congestion_detection.h> -#include <protocols/rtc/rtc_rc.h> - -namespace transport { - -namespace protocol { - -namespace rtc { - -class RTCRateControlFrame : public RTCRateControl { - public: - RTCRateControlFrame(); - - ~RTCRateControlFrame(); - - void onNewRound(double round_len); - void onDataPacketReceived(const core::ContentObject &content_object); - - void receivedBwProbeTrain(uint64_t firts_probe_ts, uint64_t last_probe_ts, - uint32_t total_probes); - - private: - CongestionDetection cc_detector_; -}; - -} // end namespace rtc - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_rc_queue.cc b/libtransport/src/protocols/rtc/rtc_rc_queue.cc index 3c7318dae..a1c89e329 100644 --- a/libtransport/src/protocols/rtc/rtc_rc_queue.cc +++ b/libtransport/src/protocols/rtc/rtc_rc_queue.cc @@ -67,11 +67,11 @@ void RTCRateControlQueue::onNewRound(double round_len) { if (prev_congestion_state == CongestionState::Normal) { // init the congetion window using the received rate congestion_win_ = (uint32_t)ceil(received_rate * rtt / packet_size); - rounds_since_last_drop_ = (uint32_t)ROUNDS_BEFORE_TAKE_ACTION + 1; + rounds_since_last_drop_ = ROUNDS_BEFORE_TAKE_ACTION + 1; } if (rounds_since_last_drop_ >= ROUNDS_BEFORE_TAKE_ACTION) { - uint32_t win = congestion_win_ * (uint32_t)WIN_DECREASE_FACTOR; + uint32_t win = congestion_win_ * WIN_DECREASE_FACTOR; congestion_win_ = std::max(win, WIN_MIN); rounds_since_last_drop_ = 0; return; @@ -88,7 +88,7 @@ void RTCRateControlQueue::onNewRound(double round_len) { rounds_without_congestion_++; if (rounds_without_congestion_ < ROUNDS_BEFORE_TAKE_ACTION) return; - congestion_win_ = congestion_win_ * (uint32_t)WIN_INCREASE_FACTOR; + congestion_win_ = congestion_win_ * WIN_INCREASE_FACTOR; congestion_win_ = std::min(congestion_win_, INITIAL_WIN_MAX); } } diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.h b/libtransport/src/protocols/rtc/rtc_reassembly.h new file mode 100644 index 000000000..15722a6d5 --- /dev/null +++ b/libtransport/src/protocols/rtc/rtc_reassembly.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <glog/logging.h> +#include <protocols/datagram_reassembly.h> +#include <protocols/rtc/rtc_consts.h> + +namespace transport { + +namespace protocol { + +namespace rtc { + +class RtcReassembly : public DatagramReassembly { + public: + RtcReassembly(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol) + : DatagramReassembly(icn_socket, transport_protocol) {} + + void reassemble(core::ContentObject &content_object) override { + auto read_buffer = content_object.getPayload(); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Size of payload: " << read_buffer->length(); + read_buffer->trimStart(transport_protocol_->transportHeaderLength()); + Reassembly::read_buffer_ = std::move(read_buffer); + Reassembly::notifyApplication(); + } +}; + +} // namespace rtc +} // namespace protocol +} // end namespace transport diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc index 9c965bfed..c99205a26 100644 --- a/libtransport/src/protocols/rtc/rtc_state.cc +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <glog/logging.h> #include <protocols/rtc/rtc_consts.h> #include <protocols/rtc/rtc_state.h> @@ -22,11 +23,13 @@ namespace protocol { namespace rtc { -RTCState::RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback, +RTCState::RTCState(Indexer *indexer, + ProbeHandler::SendProbeCallback &&rtt_probes_callback, DiscoveredRttCallback &&discovered_rtt_callback, asio::io_service &io_service) - : rtt_probes_(std::make_shared<ProbeHandler>( - std::move(rtt_probes_callback), io_service)), + : indexer_(indexer), + rtt_probes_(std::make_shared<ProbeHandler>(std::move(rtt_probes_callback), + io_service)), discovered_rtt_callback_(std::move(discovered_rtt_callback)) { init_rtt_timer_ = std::make_unique<asio::steady_timer>(io_service); initParams(); @@ -45,14 +48,22 @@ void RTCState::initParams() { // loss counters packets_lost_ = 0; + definitely_lost_pkt_ = 0; losses_recovered_ = 0; first_seq_in_round_ = 0; highest_seq_received_ = 0; highest_seq_received_in_order_ = 0; last_seq_nacked_ = 0; loss_rate_ = 0.0; + avg_loss_rate_ = 0.0; + max_loss_rate_ = 0.0; + last_round_loss_rate_ = 0.0; residual_loss_rate_ = 0.0; + // fec counters + pending_fec_pkt_ = 0; + received_fec_pkt_ = 0; + // bw counters received_bytes_ = 0; avg_packet_size_ = INIT_PACKET_SIZE; @@ -90,8 +101,14 @@ void RTCState::initParams() { // pending interests pending_interests_.clear(); + // skipped interest + last_interest_sent_ = 0; + skipped_interests_.clear(); + // init rtt - first_interest_sent_ = ~0; + first_interest_sent_time_ = ~0; + first_interest_sent_seq_ = 0; + init_rtt_ = false; rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); rtt_probes_->sendProbes(); @@ -106,18 +123,51 @@ void RTCState::onSendNewInterest(const core::Name *interest_name) { uint32_t seq = interest_name->getSuffix(); pending_interests_.insert(std::pair<uint32_t, uint64_t>(seq, now)); - if(sent_interests_ == 0) first_interest_sent_ = now; + if (sent_interests_ == 0) { + first_interest_sent_time_ = now; + first_interest_sent_seq_ = seq; + } + + if (indexer_->isFec(seq)) { + pending_fec_pkt_++; + } + + if (last_interest_sent_ == 0 && seq != 0) { + last_interest_sent_ = seq; // init last interest sent + } + + // TODO what happen in case of jumps? + // look for skipped interests + skipped_interests_.erase(seq); // remove seq if it is there + for (uint32_t i = last_interest_sent_ + 1; i < seq; i++) { + if (indexer_->isFec(i)) { + skipped_interests_.insert(i); + } + } + + last_interest_sent_ = seq; sent_interests_++; sent_interests_last_round_++; } -void RTCState::onTimeout(uint32_t seq) { +void RTCState::onTimeout(uint32_t seq, bool lost) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); } received_timeouts_++; + + if (lost) onPacketLost(seq); +} + +void RTCState::onLossDetected(uint32_t seq) { + if (!indexer_->isFec(seq)) { + packets_lost_++; + } else if (skipped_interests_.find(seq) == skipped_interests_.end() && + seq >= first_interest_sent_seq_) { + packets_lost_++; + } } void RTCState::onRetransmission(uint32_t seq) { @@ -128,7 +178,9 @@ void RTCState::onRetransmission(uint32_t seq) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { pending_interests_.erase(it); +#if 0 packets_lost_++; +#endif } sent_rtx_++; sent_rtx_last_round_++; @@ -165,6 +217,16 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, received_packets_last_round_++; } +void RTCState::onFecPacketReceived(const core::ContentObject &content_object) { + uint32_t seq = content_object.getName().getSuffix(); + updateReceivedBytes(content_object); + addRecvOrLost(seq, PacketState::RECEIVED); + received_fec_pkt_++; + // the producer is responding + // it is generating valid data packets so we consider it active + producer_is_active_ = true; +} + void RTCState::onNackPacketReceived(const core::ContentObject &nack, bool compute_stats) { uint32_t seq = nack.getName().getSuffix(); @@ -197,12 +259,14 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, // old nack, seq is lost // update last nacked if (last_seq_nacked_ < seq) last_seq_nacked_ = seq; - TRANSPORT_LOGD("lost packet %u beacuse of a past nack", seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "lost packet " << seq << " beacuse of a past nack"; onPacketLost(seq); } else if (seq > production_seq) { // future nack // remove the nack from the pending interest map // (the packet is not received/lost yet) + if (indexer_->isFec(seq)) pending_fec_pkt_--; pending_interests_.erase(seq); } else { // this should be a quite rear event. simply remove the @@ -221,17 +285,28 @@ void RTCState::onNackPacketReceived(const core::ContentObject &nack, } void RTCState::onPacketLost(uint32_t seq) { - TRANSPORT_LOGD("packet %u is lost", seq); +#if 0 + DLOG_IF(INFO, VLOG_IS_ON(3)) << "packet " << seq << " is lost"; auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { // this packet was never retransmitted so it does // not appear in the loss count packets_lost_++; } +#endif + if (!indexer_->isFec(seq)) { + definitely_lost_pkt_++; + DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; + } addRecvOrLost(seq, PacketState::LOST); } -void RTCState::onPacketRecovered(uint32_t seq) { +void RTCState::onPacketRecoveredRtx(uint32_t seq) { + losses_recovered_++; + addRecvOrLost(seq, PacketState::RECEIVED); +} + +void RTCState::onPacketRecoveredFec(uint32_t seq) { losses_recovered_++; addRecvOrLost(seq, PacketState::RECEIVED); } @@ -258,7 +333,6 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { uint32_t production_seq = probe_pkt->getProductionSegement(); uint32_t production_rate = probe_pkt->getProductionRate(); - if (path_it == path_table_.end()) { // found a new path std::shared_ptr<RTCDataPath> newPath = @@ -298,13 +372,14 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { // wait forever received_probes_++; - if(!init_rtt_ && received_probes_ <= INIT_RTT_PROBES){ - if(received_probes_ == 1){ - // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the others + if (!init_rtt_ && received_probes_ <= INIT_RTT_PROBES) { + if (received_probes_ == 1) { + // we got the first probe, wait at most INIT_RTT_PROBE_WAIT sec for the + // others main_path_ = path; setInitRttTimer(INIT_RTT_PROBE_WAIT); } - if(received_probes_ == INIT_RTT_PROBES) { + if (received_probes_ == INIT_RTT_PROBES) { // we are done init_rtt_timer_->cancel(); checkInitRttTimer(); @@ -314,7 +389,7 @@ bool RTCState::onProbePacketReceived(const core::ContentObject &probe) { received_packets_last_round_++; // ignore probes sent before the first interest - if((now - rtt) <= first_interest_sent_) return false; + if ((now - rtt) <= first_interest_sent_time_) return false; return true; } @@ -327,11 +402,11 @@ void RTCState::onNewRound(double round_len, bool in_sync) { double bytes_per_sec = ((double)received_bytes_ * (MILLI_IN_A_SEC / round_len)); - if(received_rate_ == 0) + if (received_rate_ == 0) received_rate_ = bytes_per_sec; else received_rate_ = (received_rate_ * MOVING_AVG_ALPHA) + - ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); + ((1 - MOVING_AVG_ALPHA) * bytes_per_sec); // search for an active path. There should be only one active path (meaning a // path that leads to the producer socket -no cache- and from which we are @@ -354,7 +429,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) { } } - if (in_sync) updateLossRate(); + // if (in_sync) updateLossRate(); + updateLossRate(); // handle nacks if (!nack_on_last_round_ && received_bytes_ > 0) { @@ -385,6 +461,7 @@ void RTCState::onNewRound(double round_len, bool in_sync) { // reset counters received_bytes_ = 0; packets_lost_ = 0; + definitely_lost_pkt_ = 0; losses_recovered_ = 0; first_seq_in_round_ = highest_seq_received_; @@ -397,6 +474,8 @@ void RTCState::onNewRound(double round_len, bool in_sync) { sent_interests_last_round_ = 0; sent_rtx_last_round_ = 0; + received_fec_pkt_ = 0; + rounds_++; } @@ -465,6 +544,7 @@ void RTCState::updatePathStats(const core::ContentObject &content_object, } void RTCState::updateLossRate() { + last_round_loss_rate_ = loss_rate_; loss_rate_ = 0.0; residual_loss_rate_ = 0.0; @@ -475,9 +555,29 @@ void RTCState::updateLossRate() { // division by 0 if (number_theorically_received_packets_ == 0) return; + // XXX this may be quite inefficient if the rate is high + // maybe is better to iterate over the set? + for (uint32_t i = first_seq_in_round_; i < highest_seq_received_; i++) { + auto it = skipped_interests_.find(i); + if (it != skipped_interests_.end()) { + if (number_theorically_received_packets_ > 0) + number_theorically_received_packets_--; + skipped_interests_.erase(it); + } + } + loss_rate_ = (double)((double)(packets_lost_) / (double)number_theorically_received_packets_); + if (rounds_ % 15 == 0) max_loss_rate_ = 0; // reset every 3 sec + if (loss_rate_ > max_loss_rate_) max_loss_rate_ = loss_rate_; + + if (avg_loss_rate_ == 0) + avg_loss_rate_ = loss_rate_; + else + avg_loss_rate_ = + avg_loss_rate_ * MOVING_AVG_ALPHA + loss_rate_ * (1 - MOVING_AVG_ALPHA); + residual_loss_rate_ = (double)((double)(packets_lost_ - losses_recovered_) / (double)number_theorically_received_packets_); @@ -485,6 +585,10 @@ void RTCState::updateLossRate() { } void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { + if (indexer_->isFec(seq)) { + pending_fec_pkt_--; + } + pending_interests_.erase(seq); if (received_or_lost_packets_.size() >= MAX_CACHED_PACKETS) { received_or_lost_packets_.erase(received_or_lost_packets_.begin()); @@ -507,10 +611,12 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { // 1) there is a gap in the sequence so we do not update largest_in_seq_ // 2) all the packets from largest_in_seq_ to seq are in // received_or_lost_packets_ an we upate largest_in_seq_ + // or are FEC packets for (uint32_t i = highest_seq_received_in_order_ + 1; i <= seq; i++) { if (received_or_lost_packets_.find(i) == - received_or_lost_packets_.end()) { + received_or_lost_packets_.end() && + !indexer_->isFec(i)) { break; } // this packet is in order so we can update the @@ -520,17 +626,17 @@ void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { } } -void RTCState::setInitRttTimer(uint32_t wait){ +void RTCState::setInitRttTimer(uint32_t wait) { init_rtt_timer_->cancel(); init_rtt_timer_->expires_from_now(std::chrono::milliseconds(wait)); init_rtt_timer_->async_wait([this](std::error_code ec) { - if(ec) return; + if (ec) return; checkInitRttTimer(); }); } void RTCState::checkInitRttTimer() { - if(received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV){ + if (received_probes_ < INIT_RTT_MIN_PROBES_TO_RECV) { // we didn't received enough probes, restart received_probes_ = 0; rtt_probes_->setProbes(INIT_RTT_PROBE_INTERVAL, INIT_RTT_PROBES); @@ -547,7 +653,7 @@ void RTCState::checkInitRttTimer() { double prod_rate = getProducerRate(); double rtt = (double)getRTT() / MILLI_IN_A_SEC; double packet_size = getAveragePacketSize(); - uint32_t pkt_in_rtt_ = (uint32_t)std::floor(((prod_rate / packet_size) * rtt) * 0.8); + uint32_t pkt_in_rtt_ = std::floor(((prod_rate / packet_size) * rtt) * 0.8); last_seq_nacked_ = last_production_seq_ + pkt_in_rtt_; discovered_rtt_callback_(); diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h index e4fefaffe..729ba7a1b 100644 --- a/libtransport/src/protocols/rtc/rtc_state.h +++ b/libtransport/src/protocols/rtc/rtc_state.h @@ -15,13 +15,13 @@ #pragma once #include <hicn/transport/config.h> +#include <hicn/transport/core/asio_wrapper.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/name.h> +#include <protocols/indexer.h> #include <protocols/rtc/probe_handler.h> #include <protocols/rtc/rtc_data_path.h> -#include <asio.hpp> -#include <asio/steady_timer.hpp> #include <map> #include <set> @@ -36,8 +36,10 @@ enum class PacketState : uint8_t { RECEIVED, LOST, UNKNOWN }; class RTCState : std::enable_shared_from_this<RTCState> { public: using DiscoveredRttCallback = std::function<void()>; + public: - RTCState(ProbeHandler::SendProbeCallback &&rtt_probes_callback, + RTCState(Indexer *indexer, + ProbeHandler::SendProbeCallback &&rtt_probes_callback, DiscoveredRttCallback &&discovered_rtt_callback, asio::io_service &io_service); @@ -45,14 +47,17 @@ class RTCState : std::enable_shared_from_this<RTCState> { // packet events void onSendNewInterest(const core::Name *interest_name); - void onTimeout(uint32_t seq); + void onTimeout(uint32_t seq, bool lost); + void onLossDetected(uint32_t seq); void onRetransmission(uint32_t seq); void onDataPacketReceived(const core::ContentObject &content_object, bool compute_stats); + void onFecPacketReceived(const core::ContentObject &content_object); void onNackPacketReceived(const core::ContentObject &nack, bool compute_stats); void onPacketLost(uint32_t seq); - void onPacketRecovered(uint32_t seq); + void onPacketRecoveredRtx(uint32_t seq); + void onPacketRecoveredFec(uint32_t seq); bool onProbePacketReceived(const core::ContentObject &probe); // protocol state @@ -65,9 +70,7 @@ class RTCState : std::enable_shared_from_this<RTCState> { } // delay metrics - bool isRttDiscovered() const { - return init_rtt_; - } + bool isRttDiscovered() const { return init_rtt_; } uint64_t getRTT() const { if (mainPathIsValid()) return main_path_->getMinRtt(); @@ -97,13 +100,16 @@ class RTCState : std::enable_shared_from_this<RTCState> { if (it != pending_interests_.end()) return it->second; return 0; } + bool isPending(uint32_t seq) { if (pending_interests_.find(seq) != pending_interests_.end()) return true; return false; } + uint32_t getPendingInterestNumber() const { - return (uint32_t)pending_interests_.size(); + return pending_interests_.size(); } + PacketState isReceivedOrLost(uint32_t seq) { auto it = received_or_lost_packets_.find(seq); if (it != received_or_lost_packets_.end()) return it->second; @@ -112,12 +118,25 @@ class RTCState : std::enable_shared_from_this<RTCState> { // loss rate double getLossRate() const { return loss_rate_; } + double getAvgLossRate() const { return avg_loss_rate_; } + double getMaxLossRate() const { return max_loss_rate_; } + double getLastRoundLossRate() const { return last_round_loss_rate_; } double getResidualLossRate() const { return residual_loss_rate_; } + + uint32_t getLostData() const { return packets_lost_; }; + uint32_t getRecoveredLosses() const { return losses_recovered_; } + + uint32_t getDefinitelyLostPackets() const { return definitely_lost_pkt_; } + + uint32_t getHighestSeqReceived() const { return highest_seq_received_; } + uint32_t getHighestSeqReceivedInOrder() const { return highest_seq_received_in_order_; } - uint32_t getLostData() const { return packets_lost_; }; - uint32_t getRecoveredLosses() const { return losses_recovered_; } + + // fec packets + uint32_t getReceivedFecPackets() const { return received_fec_pkt_; } + uint32_t getPendingFecPackets() const { return pending_fec_pkt_; } // generic stats uint32_t getReceivedBytesInRound() const { return received_bytes_; } @@ -183,11 +202,15 @@ class RTCState : std::enable_shared_from_this<RTCState> { // loss counters int32_t packets_lost_; int32_t losses_recovered_; + uint32_t definitely_lost_pkt_; uint32_t first_seq_in_round_; uint32_t highest_seq_received_; uint32_t highest_seq_received_in_order_; uint32_t last_seq_nacked_; // segment for which we got an oldNack double loss_rate_; + double avg_loss_rate_; + double max_loss_rate_; + double last_round_loss_rate_; double residual_loss_rate_; // bw counters @@ -211,18 +234,24 @@ class RTCState : std::enable_shared_from_this<RTCState> { uint32_t sent_interests_last_round_; uint32_t sent_rtx_last_round_; + // fec counter + uint32_t received_fec_pkt_; + uint32_t pending_fec_pkt_; + // round conunters uint32_t rounds_; uint32_t rounds_without_nacks_; uint32_t rounds_without_packets_; // init rtt - uint64_t first_interest_sent_; + uint64_t first_interest_sent_time_; + uint32_t first_interest_sent_seq_; // producer state bool producer_is_active_; // the prodcuer is active if we receive some packets - uint32_t last_production_seq_; // last production seq received by the producer + uint32_t + last_production_seq_; // last production seq received by the producer uint64_t last_prod_update_; // timestamp of the last packets used to update // stats from the producer @@ -237,6 +266,13 @@ class RTCState : std::enable_shared_from_this<RTCState> { // pending interests std::map<uint32_t, uint64_t> pending_interests_; + // indexer + Indexer *indexer_; + + // skipped interests + uint32_t last_interest_sent_; + std::unordered_set<uint32_t> skipped_interests_; + // probes std::shared_ptr<ProbeHandler> rtt_probes_; bool init_rtt_; diff --git a/libtransport/src/protocols/rtc/trendline_estimator.cc b/libtransport/src/protocols/rtc/trendline_estimator.cc deleted file mode 100644 index 7a0803857..000000000 --- a/libtransport/src/protocols/rtc/trendline_estimator.cc +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -// FROM -// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.cc - -#include "trendline_estimator.h" - -#include <math.h> - -#include <algorithm> -#include <string> - -namespace transport { - -namespace protocol { - -namespace rtc { - -// Parameters for linear least squares fit of regression line to noisy data. -constexpr double kDefaultTrendlineSmoothingCoeff = 0.9; -constexpr double kDefaultTrendlineThresholdGain = 4.0; -// const char kBweWindowSizeInPacketsExperiment[] = -// "WebRTC-BweWindowSizeInPackets"; - -/*size_t ReadTrendlineFilterWindowSize( - const WebRtcKeyValueConfig* key_value_config) { - std::string experiment_string = - key_value_config->Lookup(kBweWindowSizeInPacketsExperiment); - size_t window_size; - int parsed_values = - sscanf(experiment_string.c_str(), "Enabled-%zu", &window_size); - if (parsed_values == 1) { - if (window_size > 1) - return window_size; - RTC_LOG(WARNING) << "Window size must be greater than 1."; - } - RTC_LOG(LS_WARNING) << "Failed to parse parameters for BweWindowSizeInPackets" - " experiment from field trial string. Using default."; - return TrendlineEstimatorSettings::kDefaultTrendlineWindowSize; -} -*/ - -OptionalDouble LinearFitSlope( - const std::deque<TrendlineEstimator::PacketTiming>& packets) { - // RTC_DCHECK(packets.size() >= 2); - // Compute the "center of mass". - double sum_x = 0; - double sum_y = 0; - for (const auto& packet : packets) { - sum_x += packet.arrival_time_ms; - sum_y += packet.smoothed_delay_ms; - } - double x_avg = sum_x / packets.size(); - double y_avg = sum_y / packets.size(); - // Compute the slope k = \sum (x_i-x_avg)(y_i-y_avg) / \sum (x_i-x_avg)^2 - double numerator = 0; - double denominator = 0; - for (const auto& packet : packets) { - double x = packet.arrival_time_ms; - double y = packet.smoothed_delay_ms; - numerator += (x - x_avg) * (y - y_avg); - denominator += (x - x_avg) * (x - x_avg); - } - if (denominator == 0) return OptionalDouble(); - return OptionalDouble(numerator / denominator); -} - -OptionalDouble ComputeSlopeCap( - const std::deque<TrendlineEstimator::PacketTiming>& packets, - const TrendlineEstimatorSettings& settings) { - /*RTC_DCHECK(1 <= settings.beginning_packets && - settings.beginning_packets < packets.size()); - RTC_DCHECK(1 <= settings.end_packets && - settings.end_packets < packets.size()); - RTC_DCHECK(settings.beginning_packets + settings.end_packets <= - packets.size());*/ - TrendlineEstimator::PacketTiming early = packets[0]; - for (size_t i = 1; i < settings.beginning_packets; ++i) { - if (packets[i].raw_delay_ms < early.raw_delay_ms) early = packets[i]; - } - size_t late_start = packets.size() - settings.end_packets; - TrendlineEstimator::PacketTiming late = packets[late_start]; - for (size_t i = late_start + 1; i < packets.size(); ++i) { - if (packets[i].raw_delay_ms < late.raw_delay_ms) late = packets[i]; - } - if (late.arrival_time_ms - early.arrival_time_ms < 1) { - return OptionalDouble(); - } - return OptionalDouble((late.raw_delay_ms - early.raw_delay_ms) / - (late.arrival_time_ms - early.arrival_time_ms) + - settings.cap_uncertainty); -} - -constexpr double kMaxAdaptOffsetMs = 15.0; -constexpr double kOverUsingTimeThreshold = 10; -constexpr int kMinNumDeltas = 60; -constexpr int kDeltaCounterMax = 1000; - -//} // namespace - -constexpr char TrendlineEstimatorSettings::kKey[]; - -TrendlineEstimatorSettings::TrendlineEstimatorSettings( - /*const WebRtcKeyValueConfig* key_value_config*/) { - /*if (absl::StartsWith( - key_value_config->Lookup(kBweWindowSizeInPacketsExperiment), - "Enabled")) { - window_size = ReadTrendlineFilterWindowSize(key_value_config); - } - Parser()->Parse(key_value_config->Lookup(TrendlineEstimatorSettings::kKey));*/ - window_size = kDefaultTrendlineWindowSize; - enable_cap = false; - beginning_packets = end_packets = 0; - cap_uncertainty = 0.0; - - /*if (window_size < 10 || 200 < window_size) { - RTC_LOG(LS_WARNING) << "Window size must be between 10 and 200 packets"; - window_size = kDefaultTrendlineWindowSize; - } - if (enable_cap) { - if (beginning_packets < 1 || end_packets < 1 || - beginning_packets > window_size || end_packets > window_size) { - RTC_LOG(LS_WARNING) << "Size of beginning and end must be between 1 and " - << window_size; - enable_cap = false; - beginning_packets = end_packets = 0; - cap_uncertainty = 0.0; - } - if (beginning_packets + end_packets > window_size) { - RTC_LOG(LS_WARNING) - << "Size of beginning plus end can't exceed the window size"; - enable_cap = false; - beginning_packets = end_packets = 0; - cap_uncertainty = 0.0; - } - if (cap_uncertainty < 0.0 || 0.025 < cap_uncertainty) { - RTC_LOG(LS_WARNING) << "Cap uncertainty must be between 0 and 0.025"; - cap_uncertainty = 0.0; - } - }*/ -} - -/*std::unique_ptr<StructParametersParser> TrendlineEstimatorSettings::Parser() { - return StructParametersParser::Create("sort", &enable_sort, // - "cap", &enable_cap, // - "beginning_packets", - &beginning_packets, // - "end_packets", &end_packets, // - "cap_uncertainty", &cap_uncertainty, // - "window_size", &window_size); -}*/ - -TrendlineEstimator::TrendlineEstimator( - /*const WebRtcKeyValueConfig* key_value_config, - NetworkStatePredictor* network_state_predictor*/) - : settings_(), - smoothing_coef_(kDefaultTrendlineSmoothingCoeff), - threshold_gain_(kDefaultTrendlineThresholdGain), - num_of_deltas_(0), - first_arrival_time_ms_(-1), - accumulated_delay_(0), - smoothed_delay_(0), - delay_hist_(), - k_up_(0.0087), - k_down_(0.039), - overusing_time_threshold_(kOverUsingTimeThreshold), - threshold_(12.5), - prev_modified_trend_(NAN), - last_update_ms_(-1), - prev_trend_(0.0), - time_over_using_(-1), - overuse_counter_(0), - hypothesis_(BandwidthUsage::kBwNormal){ - // hypothesis_predicted_(BandwidthUsage::kBwNormal){//}, - // network_state_predictor_(network_state_predictor) { - /* RTC_LOG(LS_INFO) - << "Using Trendline filter for delay change estimation with settings " - << settings_.Parser()->Encode() << " and " - // << (network_state_predictor_ ? "injected" : "no") - << " network state predictor";*/ -} - -TrendlineEstimator::~TrendlineEstimator() {} - -void TrendlineEstimator::UpdateTrendline(double recv_delta_ms, - double send_delta_ms, - int64_t send_time_ms, - int64_t arrival_time_ms, - size_t packet_size) { - const double delta_ms = recv_delta_ms - send_delta_ms; - ++num_of_deltas_; - num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax); - if (first_arrival_time_ms_ == -1) first_arrival_time_ms_ = arrival_time_ms; - - // Exponential backoff filter. - accumulated_delay_ += delta_ms; - // BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms, - // accumulated_delay_); - smoothed_delay_ = smoothing_coef_ * smoothed_delay_ + - (1 - smoothing_coef_) * accumulated_delay_; - // BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms, - // smoothed_delay_); - - // Maintain packet window - delay_hist_.emplace_back( - static_cast<double>(arrival_time_ms - first_arrival_time_ms_), - smoothed_delay_, accumulated_delay_); - if (settings_.enable_sort) { - for (size_t i = delay_hist_.size() - 1; - i > 0 && - delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms; - --i) { - std::swap(delay_hist_[i], delay_hist_[i - 1]); - } - } - if (delay_hist_.size() > settings_.window_size) delay_hist_.pop_front(); - - // Simple linear regression. - double trend = prev_trend_; - if (delay_hist_.size() == settings_.window_size) { - // Update trend_ if it is possible to fit a line to the data. The delay - // trend can be seen as an estimate of (send_rate - capacity)/capacity. - // 0 < trend < 1 -> the delay increases, queues are filling up - // trend == 0 -> the delay does not change - // trend < 0 -> the delay decreases, queues are being emptied - OptionalDouble trendO = LinearFitSlope(delay_hist_); - if (trendO.has_value()) trend = trendO.value(); - if (settings_.enable_cap) { - OptionalDouble cap = ComputeSlopeCap(delay_hist_, settings_); - // We only use the cap to filter out overuse detections, not - // to detect additional underuses. - if (trend >= 0 && cap.has_value() && trend > cap.value()) { - trend = cap.value(); - } - } - } - // BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend); - - Detect(trend, send_delta_ms, arrival_time_ms); -} - -void TrendlineEstimator::Update(double recv_delta_ms, double send_delta_ms, - int64_t send_time_ms, int64_t arrival_time_ms, - size_t packet_size, bool calculated_deltas) { - if (calculated_deltas) { - UpdateTrendline(recv_delta_ms, send_delta_ms, send_time_ms, arrival_time_ms, - packet_size); - } - /*if (network_state_predictor_) { - hypothesis_predicted_ = network_state_predictor_->Update( - send_time_ms, arrival_time_ms, hypothesis_); - }*/ -} - -BandwidthUsage TrendlineEstimator::State() const { - return /*network_state_predictor_ ? hypothesis_predicted_ :*/ hypothesis_; -} - -void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) { - /*if (num_of_deltas_ < 2) { - hypothesis_ = BandwidthUsage::kBwNormal; - return; - }*/ - - const double modified_trend = - std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_; - prev_modified_trend_ = modified_trend; - // BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend); - // BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_); - if (modified_trend > threshold_) { - if (time_over_using_ == -1) { - // Initialize the timer. Assume that we've been - // over-using half of the time since the previous - // sample. - time_over_using_ = ts_delta / 2; - } else { - // Increment timer - time_over_using_ += ts_delta; - } - overuse_counter_++; - if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) { - if (trend >= prev_trend_) { - time_over_using_ = 0; - overuse_counter_ = 0; - hypothesis_ = BandwidthUsage::kBwOverusing; - } - } - } else if (modified_trend < -threshold_) { - time_over_using_ = -1; - overuse_counter_ = 0; - hypothesis_ = BandwidthUsage::kBwUnderusing; - } else { - time_over_using_ = -1; - overuse_counter_ = 0; - hypothesis_ = BandwidthUsage::kBwNormal; - } - prev_trend_ = trend; - UpdateThreshold(modified_trend, now_ms); -} - -void TrendlineEstimator::UpdateThreshold(double modified_trend, - int64_t now_ms) { - if (last_update_ms_ == -1) last_update_ms_ = now_ms; - - if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) { - // Avoid adapting the threshold to big latency spikes, caused e.g., - // by a sudden capacity drop. - last_update_ms_ = now_ms; - return; - } - - const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_; - const int64_t kMaxTimeDeltaMs = 100; - int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs); - threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms; - if (threshold_ < 6.f) threshold_ = 6.f; - if (threshold_ > 600.f) threshold_ = 600.f; - // threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f); - last_update_ms_ = now_ms; -} - -} // namespace rtc - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/protocols/rtc/trendline_estimator.h b/libtransport/src/protocols/rtc/trendline_estimator.h deleted file mode 100644 index 372acbc67..000000000 --- a/libtransport/src/protocols/rtc/trendline_estimator.h +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -// FROM -// https://source.chromium.org/chromium/chromium/src/+/master:third_party/webrtc/modules/congestion_controller/goog_cc/trendline_estimator.h - -#ifndef MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_ -#define MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_ - -#include <stddef.h> -#include <stdint.h> - -#include <algorithm> -#include <deque> -#include <memory> -#include <utility> - -namespace transport { - -namespace protocol { - -namespace rtc { - -class OptionalDouble { - public: - OptionalDouble() : val(0), has_val(false){}; - OptionalDouble(double val) : val(val), has_val(true){}; - - double value() { return val; } - bool has_value() { return has_val; } - - private: - double val; - bool has_val; -}; - -enum class BandwidthUsage { - kBwNormal = 0, - kBwUnderusing = 1, - kBwOverusing = 2, - kLast -}; - -struct TrendlineEstimatorSettings { - static constexpr char kKey[] = "WebRTC-Bwe-TrendlineEstimatorSettings"; - static constexpr unsigned kDefaultTrendlineWindowSize = 20; - - // TrendlineEstimatorSettings() = delete; - TrendlineEstimatorSettings( - /*const WebRtcKeyValueConfig* key_value_config*/); - - // Sort the packets in the window. Should be redundant, - // but then almost no cost. - bool enable_sort = false; - - // Cap the trendline slope based on the minimum delay seen - // in the beginning_packets and end_packets respectively. - bool enable_cap = false; - unsigned beginning_packets = 7; - unsigned end_packets = 7; - double cap_uncertainty = 0.0; - - // Size (in packets) of the window. - unsigned window_size = kDefaultTrendlineWindowSize; - - // std::unique_ptr<StructParametersParser> Parser(); -}; - -class TrendlineEstimator /*: public DelayIncreaseDetectorInterface */ { - public: - TrendlineEstimator(/*const WebRtcKeyValueConfig* key_value_config, - NetworkStatePredictor* network_state_predictor*/); - - ~TrendlineEstimator(); - - // Update the estimator with a new sample. The deltas should represent deltas - // between timestamp groups as defined by the InterArrival class. - void Update(double recv_delta_ms, double send_delta_ms, int64_t send_time_ms, - int64_t arrival_time_ms, size_t packet_size, - bool calculated_deltas); - - void UpdateTrendline(double recv_delta_ms, double send_delta_ms, - int64_t send_time_ms, int64_t arrival_time_ms, - size_t packet_size); - - BandwidthUsage State() const; - - struct PacketTiming { - PacketTiming(double arrival_time_ms, double smoothed_delay_ms, - double raw_delay_ms) - : arrival_time_ms(arrival_time_ms), - smoothed_delay_ms(smoothed_delay_ms), - raw_delay_ms(raw_delay_ms) {} - double arrival_time_ms; - double smoothed_delay_ms; - double raw_delay_ms; - }; - - private: - // friend class GoogCcStatePrinter; - void Detect(double trend, double ts_delta, int64_t now_ms); - - void UpdateThreshold(double modified_offset, int64_t now_ms); - - // Parameters. - TrendlineEstimatorSettings settings_; - const double smoothing_coef_; - const double threshold_gain_; - // Used by the existing threshold. - int num_of_deltas_; - // Keep the arrival times small by using the change from the first packet. - int64_t first_arrival_time_ms_; - // Exponential backoff filtering. - double accumulated_delay_; - double smoothed_delay_; - // Linear least squares regression. - std::deque<PacketTiming> delay_hist_; - - const double k_up_; - const double k_down_; - double overusing_time_threshold_; - double threshold_; - double prev_modified_trend_; - int64_t last_update_ms_; - double prev_trend_; - double time_over_using_; - int overuse_counter_; - BandwidthUsage hypothesis_; - // BandwidthUsage hypothesis_predicted_; - // NetworkStatePredictor* network_state_predictor_; - - // RTC_DISALLOW_COPY_AND_ASSIGN(TrendlineEstimator); -}; - -} // namespace rtc - -} // end namespace protocol - -} // end namespace transport -#endif // MODULES_CONGESTION_CONTROLLER_GOOG_CC_TRENDLINE_ESTIMATOR_H_ |