diff options
Diffstat (limited to 'libtransport/src/protocols/rtc')
28 files changed, 602 insertions, 487 deletions
diff --git a/libtransport/src/protocols/rtc/probe_handler.cc b/libtransport/src/protocols/rtc/probe_handler.cc index 6a84914ab..60eceeb19 100644 --- a/libtransport/src/protocols/rtc/probe_handler.cc +++ b/libtransport/src/protocols/rtc/probe_handler.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <glog/logging.h> #include <hicn/transport/utils/chrono_typedefs.h> #include <protocols/rtc/probe_handler.h> #include <protocols/rtc/rtc_consts.h> @@ -64,7 +65,7 @@ double ProbeHandler::getProbeLossRate() { } void ProbeHandler::setSuffixRange(uint32_t min, uint32_t max) { - assert(min <= max && min >= MIN_PROBE_SEQ); + DCHECK(min <= max && min >= MIN_PROBE_SEQ); distr_ = std::uniform_int_distribution<uint32_t>(min, max); } diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc index d2682edfa..9a56269f3 100644 --- a/libtransport/src/protocols/rtc/rtc.cc +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -38,6 +38,7 @@ RTCTransportProtocol::RTCTransportProtocol( implementation::ConsumerSocket *icn_socket) : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this), new RtcReassembly(icn_socket, this)), + max_aggregated_interest_(1), number_(0) { icn_socket->getSocketOption(PORTAL, portal_); round_timer_ = @@ -55,9 +56,9 @@ void RTCTransportProtocol::resume() { TransportProtocol::resume(); } -std::size_t RTCTransportProtocol::transportHeaderLength() { +std::size_t RTCTransportProtocol::transportHeaderLength(bool isFEC) { return DATA_HEADER_SIZE + - (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0); + (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize(isFEC) : 0); } // private @@ -75,13 +76,13 @@ void RTCTransportProtocol::initParams() { std::shared_ptr<auth::Verifier> verifier; socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); - uint32_t unverified_interval; - socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL, - unverified_interval); + uint32_t factor_relevant; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT, + factor_relevant); - double unverified_ratio; - socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO, - unverified_ratio); + uint32_t factor_alert; + socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_ALERT, + factor_alert); rc_ = std::make_shared<RTCRateControlCongestionDetection>(); ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( @@ -100,8 +101,8 @@ void RTCTransportProtocol::initParams() { } }); - verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval, - unverified_ratio); + verifier_ = + std::make_shared<RTCVerifier>(verifier, factor_relevant, factor_alert); state_ = std::make_shared<RTCState>( indexer_verifier_.get(), @@ -138,19 +139,20 @@ void RTCTransportProtocol::initParams() { last_interest_sent_time_ = 0; last_interest_sent_seq_ = 0; -#if 0 - if(portal_->isConnectedToFwd()){ - max_aggregated_interest_ = 1; - }else{ - max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; - } -#else - max_aggregated_interest_ = 1; - if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) { - LOG(INFO) << "Max Aggregated: " << max_aggr; - max_aggregated_interest_ = std::stoul(std::string(max_aggr)); + // Aggregated interests setup + bool aggregated_interests_on; + socket_->getSocketOption(RtcTransportOptions::AGGREGATED_INTERESTS, + aggregated_interests_on); + if (aggregated_interests_on) { + if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) + max_aggregated_interest_ = (uint32_t)std::stoul(std::string(max_aggr)); + else + max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; + + max_aggregated_interest_ = std::min<uint32_t>(max_aggregated_interest_, + 1 + MAX_SUFFIXES_IN_MANIFEST); } -#endif + LOG(INFO) << "Max Aggregated: " << max_aggregated_interest_; max_sent_int_ = std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_); @@ -263,6 +265,11 @@ void RTCTransportProtocol::discoveredRtt() { socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy); ldr_->changeRecoveryStrategy( (interface::RtcTransportRecoveryStrategies)strategy); + + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode) ldr_->setContentSharingMode(); ldr_->turnOnRecovery(); ldr_->onNewRound(false); @@ -270,22 +277,9 @@ void RTCTransportProtocol::discoveredRtt() { Name *name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); Prefix prefix(*name, 128); - if ((interface::RtcTransportRecoveryStrategies)strategy == - interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) { - fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), - RTCForwardingStrategy::BEST_PATH); - } else if ((interface::RtcTransportRecoveryStrategies)strategy == - interface::RtcTransportRecoveryStrategies:: - LOW_RATE_AND_REPLICATION) { - fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), - RTCForwardingStrategy::REPLICATION); - } else if ((interface::RtcTransportRecoveryStrategies)strategy == - interface::RtcTransportRecoveryStrategies:: - LOW_RATE_AND_ALL_FWD_STRATEGIES) { - fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(), - RTCForwardingStrategy::BOTH); - } - + fwd_strategy_.initFwdStrategy( + portal_, prefix, state_.get(), + (interface::RtcTransportRecoveryStrategies)strategy); updateSyncWindow(); } @@ -302,6 +296,12 @@ void RTCTransportProtocol::computeMaxSyncWindow() { return; } + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (production_rate < MIN_PROD_RATE_SHARING_MODE)) + production_rate = MIN_PROD_RATE_SHARING_MODE; + production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead()); uint32_t lifetime = default_values::interest_lifetime; @@ -330,6 +330,11 @@ void RTCTransportProtocol::updateSyncWindow() { double prod_rate = state_->getProducerRate(); double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC; double packet_size = state_->getAveragePacketSize(); + bool content_sharing_mode; + socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE, + content_sharing_mode); + if (content_sharing_mode && (prod_rate < MIN_PROD_RATE_SHARING_MODE)) + prod_rate = MIN_PROD_RATE_SHARING_MODE; // if some of the info are not available do not update the current win if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { @@ -385,6 +390,19 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { sendInterest(*interest_name); } +void RTCTransportProtocol::sendInterestForTimeout(uint32_t seq) { + if (!isRunning() && !is_first_) return; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + // we got a timeout for this packet so it is not pending anymore + interest_name->setSuffix(seq); + state_->onSendNewInterest(interest_name); + sendInterest(*interest_name); +} + void RTCTransportProtocol::scheduleNextInterests() { DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests"; @@ -475,9 +493,9 @@ void RTCTransportProtocol::scheduleNextInterests() { } // skip received packets - if (indexer_verifier_->checkNextSuffix() <= - state_->getHighestSeqReceivedInOrder()) { - indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1); + uint32_t max_received = state_->getHighestSeqReceivedInOrder(); + if (indexer_verifier_->checkNextSuffix() <= max_received) { + indexer_verifier_->jumpToIndex(max_received + 1); } uint32_t sent_interests = 0; @@ -495,7 +513,6 @@ void RTCTransportProtocol::scheduleNextInterests() { << "In while loop. Window size: " << current_sync_win_; uint32_t next_seg = indexer_verifier_->getNextSuffix(); - name->setSuffix(next_seg); // send the packet only if: @@ -586,7 +603,6 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, } timeouts_or_nacks_.insert(segment_number); - if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) && segment_number <= state_->getHighestSeqReceived()) { // we retransmit packets only if the producer is active, otherwise we @@ -627,11 +643,11 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, << "On timeout next seg = " << indexer_verifier_->checkNextSuffix() << ", jump to " << segment_number; // add an extra space in the window - current_sync_win_++; indexer_verifier_->jumpToIndex(segment_number); } state_->onTimeout(segment_number, false); + sendInterestForTimeout(segment_number); scheduleNextInterests(); } @@ -672,7 +688,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it); state_->onJumpForward(production_seg); - verifier_->onJumpForward(production_seg); // the client is asking for content in the past // switch to catch up state and increase the window // this is true only if the packet is not an RTX @@ -821,7 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived( // Check if the packet is a retransmission if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) { if (is_data || is_manifest) { - state_->onPacketRecoveredRtx(segment_number); + uint64_t rtt = ldr_->getRtxRtt(segment_number); + state_->onPacketRecoveredRtx(content_object, rtt); if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); @@ -842,7 +858,7 @@ void RTCTransportProtocol::onContentObjectReceived( } if (is_fec) { - state_->onFecPacketRecoveredRtx(segment_number); + state_->onFecPacketRecoveredRtx(content_object); } } @@ -920,7 +936,7 @@ void RTCTransportProtocol::sendStatsToApp( stats_->updateAverageWindowSize(state_->getPendingInterestNumber()); stats_->updateLossRatio(state_->getPerSecondLossRate()); uint64_t rtt = state_->getAvgRTT(); - stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt)); + stats_->updateAverageRtt(utils::SteadyTime::Microseconds(rtt * 1000)); stats_->updateQueuingDelay(state_->getQueuing()); stats_->updateLostData(lost_data); @@ -960,9 +976,10 @@ void RTCTransportProtocol::decodePacket(ContentObject &content_object, DLOG_IF(INFO, VLOG_IS_ON(4)) << "Send packet " << content_object.getName() << " to FEC decoder"; - uint32_t offset = is_manifest - ? content_object.headerSize() - : content_object.headerSize() + rtc::DATA_HEADER_SIZE; + uint32_t offset = + is_manifest + ? (uint32_t)content_object.headerSize() + : (uint32_t)(content_object.headerSize() + rtc::DATA_HEADER_SIZE); uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType()); fec_decoder_->onDataPacket(content_object, offset, metadata); @@ -1016,7 +1033,7 @@ void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) { processManifest(*interest, *content_object); } - state_->onPacketRecoveredFec(seq_number, buffer->length()); + state_->onPacketRecoveredFec(seq_number, (uint32_t)buffer->length()); ldr_->onPacketRecoveredFec(seq_number); if (payload_type == PayloadType::DATA) { @@ -1038,11 +1055,11 @@ void RTCTransportProtocol::processManifest(Interest &interest, ContentObject::Ptr RTCTransportProtocol::removeFecHeader( const ContentObject &content_object) { - if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) { + if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize(false)) { return nullptr; } - size_t fec_header_size = fec_decoder_->getFecHeaderSize(); + size_t fec_header_size = fec_decoder_->getFecHeaderSize(false); const uint8_t *payload = content_object.data() + content_object.headerSize() + fec_header_size; size_t payload_size = content_object.payloadSize() - fec_header_size; diff --git a/libtransport/src/protocols/rtc/rtc.h b/libtransport/src/protocols/rtc/rtc.h index 3763f33c7..a8a474216 100644 --- a/libtransport/src/protocols/rtc/rtc.h +++ b/libtransport/src/protocols/rtc/rtc.h @@ -44,7 +44,7 @@ class RTCTransportProtocol : public TransportProtocol { void resume() override; - std::size_t transportHeaderLength() override; + std::size_t transportHeaderLength(bool isFEC) override; auto shared_from_this() { return utils::shared_from(this); } @@ -69,6 +69,7 @@ class RTCTransportProtocol : public TransportProtocol { // packet functions void sendRtxInterest(uint32_t seq); void sendProbeInterest(uint32_t seq); + void sendInterestForTimeout(uint32_t seq); void scheduleNextInterests() override; void onInterestTimeout(Interest::Ptr &interest, const Name &name) override; void onNack(const ContentObject &content_object); diff --git a/libtransport/src/protocols/rtc/rtc_consts.h b/libtransport/src/protocols/rtc/rtc_consts.h index 96e39d07e..29b5a3a12 100644 --- a/libtransport/src/protocols/rtc/rtc_consts.h +++ b/libtransport/src/protocols/rtc/rtc_consts.h @@ -54,7 +54,7 @@ const uint32_t PACING_WAIT = 1000; // usec to wait betwing two pacing batch. As const uint32_t MAX_RTX_IN_BATCH = 10; // max rtx to send in loop // packet const -const uint32_t RTC_INTEREST_LIFETIME = 2000; +const uint32_t RTC_INTEREST_LIFETIME = 4000; // probes sequence range const uint32_t MIN_PROBE_SEQ = 0xefffffff; @@ -93,6 +93,7 @@ const double CATCH_UP_WIN_INCREMENT = 1.2; // used in rate control const double WIN_DECREASE_FACTOR = 0.5; const double WIN_INCREASE_FACTOR = 1.5; +const uint32_t MIN_PROD_RATE_SHARING_MODE = 125000; // 1Mbps in bytes // round in congestion const double ROUNDS_BEFORE_TAKE_ACTION = 5; @@ -120,14 +121,14 @@ const uint64_t MAX_TIMER_RTX = ~0; const uint32_t SENTINEL_TIMER_INTERVAL = 100; // ms const uint32_t MAX_RTX_WITH_SENTINEL = 10; // packets const double CATCH_UP_RTT_INCREMENT = 1.2; -const double MAX_RESIDUAL_LOSS_RATE = 2.0; // % -const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC * 5; +const double MAX_RESIDUAL_LOSS_RATE = 1.0; // % +const uint32_t WAIT_BEFORE_FEC_UPDATE = ROUNDS_PER_SEC; +const uint32_t MAX_RTT_BEFORE_FEC = 60; // ms // used by producer const uint32_t PRODUCER_STATS_INTERVAL = 200; // ms const uint32_t MIN_PRODUCTION_RATE = 25; // pps, equal to min window * // rounds in a second -const uint32_t NACK_DELAY = 1500; // ms const uint32_t FEC_PACING_TIME = 5; // ms // aggregated data consts @@ -139,6 +140,73 @@ const uint32_t AGGREGATED_PACKETS_TIMER = 2; // ms const uint32_t MAX_RTT = 200; // ms const double MAX_RESIDUAL_LOSSES = 0.05; // % +const uint8_t FEC_MATRIX[64][10] = { + {1, 2, 2, 2, 3, 3, 4, 5, 5, 6}, // k = 1 + {1, 2, 3, 3, 4, 5, 5, 6, 7, 9}, + {2, 2, 3, 4, 5, 6, 7, 8, 9, 11}, + {2, 3, 4, 5, 5, 7, 8, 9, 11, 13}, + {2, 3, 4, 5, 6, 7, 9, 10, 12, 14}, // k = 5 + {2, 3, 4, 6, 7, 8, 10, 12, 14, 16}, + {2, 4, 5, 6, 8, 9, 11, 13, 15, 18}, + {3, 4, 5, 7, 8, 10, 12, 14, 16, 19}, + {3, 4, 6, 7, 9, 11, 13, 15, 18, 21}, + {3, 4, 6, 8, 9, 11, 14, 16, 19, 23}, // k = 10 + {3, 5, 6, 8, 10, 12, 14, 17, 20, 24}, + {3, 5, 7, 8, 10, 13, 15, 18, 21, 26}, + {3, 5, 7, 9, 11, 13, 16, 19, 23, 27}, + {3, 5, 7, 9, 12, 14, 17, 20, 24, 28}, + {4, 6, 8, 10, 12, 15, 18, 21, 25, 30}, // k = 15 + {4, 6, 8, 10, 13, 15, 19, 22, 26, 31}, + {4, 6, 8, 11, 13, 16, 19, 23, 27, 33}, + {4, 6, 9, 11, 14, 17, 20, 24, 29, 34}, + {4, 6, 9, 11, 14, 17, 21, 25, 30, 35}, + {4, 7, 9, 12, 15, 18, 22, 26, 31, 37}, // k = 20 + {4, 7, 9, 12, 15, 19, 22, 27, 32, 38}, + {4, 7, 10, 13, 16, 19, 23, 28, 33, 40}, + {5, 7, 10, 13, 16, 20, 24, 29, 34, 41}, + {5, 7, 10, 13, 17, 20, 25, 30, 35, 42}, + {5, 8, 11, 14, 17, 21, 26, 31, 37, 44}, // k = 25 + {5, 8, 11, 14, 18, 22, 26, 31, 38, 45}, + {5, 8, 11, 15, 18, 22, 27, 32, 39, 46}, + {5, 8, 11, 15, 19, 23, 28, 33, 40, 48}, + {5, 8, 12, 15, 19, 24, 28, 34, 41, 49}, + {5, 9, 12, 16, 20, 24, 29, 35, 42, 50}, // k = 30 + {5, 9, 12, 16, 20, 25, 30, 36, 43, 51}, + {5, 9, 13, 16, 21, 25, 31, 37, 44, 53}, + {6, 9, 13, 17, 21, 26, 31, 38, 45, 54}, + {6, 9, 13, 17, 22, 26, 32, 39, 46, 55}, + {6, 10, 13, 17, 22, 27, 33, 40, 47, 57}, // k = 35 + {6, 10, 14, 18, 22, 28, 34, 40, 48, 58}, + {6, 10, 14, 18, 23, 28, 34, 41, 49, 59}, + {6, 10, 14, 19, 23, 29, 35, 42, 50, 60}, + {6, 10, 14, 19, 24, 29, 36, 43, 52, 62}, + {6, 10, 15, 19, 24, 30, 36, 44, 53, 63}, // k = 40 + {6, 11, 15, 20, 25, 31, 37, 45, 54, 64}, + {6, 11, 15, 20, 25, 31, 38, 46, 55, 65}, + {7, 11, 15, 20, 26, 32, 39, 46, 56, 67}, + {7, 11, 16, 21, 26, 32, 39, 47, 57, 68}, + {7, 11, 16, 21, 27, 33, 40, 48, 58, 69}, // k = 45 + {7, 11, 16, 21, 27, 33, 41, 49, 59, 70}, + {7, 12, 16, 22, 27, 34, 41, 50, 60, 72}, + {7, 12, 17, 22, 28, 34, 42, 51, 61, 73}, + {7, 12, 17, 22, 28, 35, 43, 52, 62, 74}, + {7, 12, 17, 23, 29, 36, 43, 52, 63, 75}, // k = 50 + {7, 12, 17, 23, 29, 36, 44, 53, 64, 77}, + {7, 12, 18, 23, 30, 37, 45, 54, 65, 78}, + {7, 13, 18, 24, 30, 37, 45, 55, 66, 79}, + {8, 13, 18, 24, 31, 38, 46, 56, 67, 80}, + {8, 13, 18, 24, 31, 38, 47, 57, 68, 82}, // k = 55 + {8, 13, 19, 25, 31, 39, 47, 57, 69, 83}, + {8, 13, 19, 25, 32, 39, 48, 58, 70, 84}, + {8, 13, 19, 25, 32, 40, 49, 59, 71, 85}, + {8, 14, 19, 26, 33, 41, 50, 60, 72, 86}, + {8, 14, 20, 26, 33, 41, 50, 61, 73, 88}, // k = 60 + {8, 14, 20, 26, 34, 42, 51, 61, 74, 89}, + {8, 14, 20, 27, 34, 42, 52, 62, 75, 90}, + {8, 14, 20, 27, 34, 43, 52, 63, 76, 91}, + {8, 14, 21, 27, 35, 43, 53, 64, 77, 92}, // k = 64 +}; + } // namespace rtc } // namespace protocol diff --git a/libtransport/src/protocols/rtc/rtc_data_path.cc b/libtransport/src/protocols/rtc/rtc_data_path.cc index b3abf5ea8..a421396b1 100644 --- a/libtransport/src/protocols/rtc/rtc_data_path.cc +++ b/libtransport/src/protocols/rtc/rtc_data_path.cc @@ -91,6 +91,8 @@ void RTCDataPath::insertRttSample( rtt_samples_ = 0; last_avg_rtt_compute_ = now; } + + received_packets_++; } void RTCDataPath::insertOwdSample(int64_t owd) { @@ -115,10 +117,6 @@ void RTCDataPath::insertOwdSample(int64_t owd) { int64_t diff = std::abs(owd - last_owd_); last_owd_ = owd; jitter_ += (1.0 / 16.0) * ((double)diff - jitter_); - - // owd is computed only for valid data packets so we count only - // this for decide if we recevie traffic or not - received_packets_++; } void RTCDataPath::computeInterArrivalGap(uint32_t segment_number) { @@ -150,12 +148,17 @@ double RTCDataPath::getInterArrivalGap() { return avg_inter_arrival_; } -bool RTCDataPath::isActive() { +bool RTCDataPath::isValidProducer() { if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) return true; return false; } +bool RTCDataPath::isActive() { + if (rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) return true; + return false; +} + bool RTCDataPath::pathToProducer() { if (received_nacks_) return true; return false; diff --git a/libtransport/src/protocols/rtc/rtc_data_path.h b/libtransport/src/protocols/rtc/rtc_data_path.h index 5afbbb87f..ba5201fe8 100644 --- a/libtransport/src/protocols/rtc/rtc_data_path.h +++ b/libtransport/src/protocols/rtc/rtc_data_path.h @@ -49,8 +49,9 @@ class RTCDataPath { double getQueuingDealy(); double getInterArrivalGap(); double getJitter(); - bool isActive(); - bool pathToProducer(); + bool isActive(); // pakets recevied from this path in the last rounds + bool pathToProducer(); // path from a producer + bool isValidProducer(); // path from a producer that is also active uint64_t getLastPacketTS(); uint32_t getPacketsLastRound(); diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc index c6bc751e6..4bbd7eac0 100644 --- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.cc @@ -14,6 +14,7 @@ */ #include <hicn/transport/interfaces/notification.h> +#include <protocols/rtc/rtc_consts.h> #include <protocols/rtc/rtc_forwarding_strategy.h> namespace transport { @@ -24,8 +25,13 @@ namespace rtc { using namespace transport::interface; +const double FWD_MAX_QUEUE = 30.0; // ms +const double FWD_MAX_RTT = MAX_RTT_BEFORE_FEC; // ms +const double FWD_MAX_LOSS_RATE = 0.1; + RTCForwardingStrategy::RTCForwardingStrategy() - : init_(false), + : low_rate_app_(false), + init_(false), forwarder_set_(false), selected_strategy_(NONE), current_strategy_(NONE), @@ -42,17 +48,56 @@ void RTCForwardingStrategy::setCallback( void RTCForwardingStrategy::initFwdStrategy( std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state, - strategy_t strategy) { - init_ = true; - selected_strategy_ = strategy; - if (strategy == BOTH) - current_strategy_ = BEST_PATH; - else - current_strategy_ = strategy; - rounds_since_last_set_ = 0; - prefix_ = prefix; - portal_ = portal; - state_ = state; + interface::RtcTransportRecoveryStrategies strategy) { + switch (strategy) { + case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH: + init_ = true; + low_rate_app_ = true; + selected_strategy_ = BEST_PATH; + current_strategy_ = BEST_PATH; + break; + case interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_REPLICATION: + init_ = true; + low_rate_app_ = true; + selected_strategy_ = REPLICATION; + current_strategy_ = REPLICATION; + break; + case interface::RtcTransportRecoveryStrategies:: + LOW_RATE_AND_ALL_FWD_STRATEGIES: + init_ = true; + low_rate_app_ = true; + selected_strategy_ = BEST_PATH; + current_strategy_ = BEST_PATH; + break; + case interface::RtcTransportRecoveryStrategies::DELAY_AND_BESTPATH: + init_ = true; + low_rate_app_ = false; + selected_strategy_ = BEST_PATH; + current_strategy_ = BEST_PATH; + break; + case interface::RtcTransportRecoveryStrategies::DELAY_AND_REPLICATION: + init_ = true; + low_rate_app_ = false; + selected_strategy_ = REPLICATION; + current_strategy_ = REPLICATION; + break; + case interface::RtcTransportRecoveryStrategies::RECOVERY_OFF: + case interface::RtcTransportRecoveryStrategies::RTX_ONLY: + case interface::RtcTransportRecoveryStrategies::FEC_ONLY: + case interface::RtcTransportRecoveryStrategies::DELAY_BASED: + case interface::RtcTransportRecoveryStrategies::LOW_RATE: + case interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES: + default: + // fwd strategies are not used + init_ = false; + } + + if (init_) { + rounds_since_last_set_ = 0; + prefix_ = prefix; + portal_ = portal; + state_ = state; + } } void RTCForwardingStrategy::checkStrategy() { @@ -99,16 +144,35 @@ void RTCForwardingStrategy::checkStrategyBestPath() { return; } - uint8_t qs = state_->getQualityScore(); + if (low_rate_app_) { + // this is used for gaming + uint8_t qs = state_->getQualityScore(); - if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec - // between each switch - rounds_since_last_set_++; - return; - } + if (qs >= 4 || rounds_since_last_set_ < 25) { // wait a least 5 sec + // between each switch + rounds_since_last_set_++; + return; + } - // try to switch path - setStrategy(BEST_PATH); + // try to switch path + setStrategy(BEST_PATH); + } else { + if (rounds_since_last_set_ < 25) { // wait a least 5 sec + // between each switch + rounds_since_last_set_++; + return; + } + + double queue = state_->getQueuing(); + double rtt = state_->getAvgRTT(); + double loss_rate = state_->getPerSecondLossRate(); + + if (queue >= FWD_MAX_QUEUE || rtt >= FWD_MAX_RTT || + loss_rate > FWD_MAX_LOSS_RATE) { + // try to switch path + setStrategy(BEST_PATH); + } + } } void RTCForwardingStrategy::checkStrategyReplication() { @@ -133,7 +197,7 @@ void RTCForwardingStrategy::checkStrategyBoth() { // TODO // for the moment we use only best path. - // but later: + // for later: // 1. if both paths are bad use replication // 2. while using replication compute the effectiveness. if the majority of // the packets are coming from a single path, try to use bestpath diff --git a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h index 9825877fd..c2227e09f 100644 --- a/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h +++ b/libtransport/src/protocols/rtc/rtc_forwarding_strategy.h @@ -41,7 +41,7 @@ class RTCForwardingStrategy { void initFwdStrategy(std::shared_ptr<core::Portal> portal, core::Prefix& prefix, RTCState* state, - strategy_t strategy); + interface::RtcTransportRecoveryStrategies strategy); void checkStrategy(); void setCallback(interface::StrategyCallback&& callback); @@ -56,6 +56,10 @@ class RTCForwardingStrategy { std::array<std::string, 4> string_strategies_ = {"bestpath", "replication", "both", "none"}; + bool low_rate_app_; // if set to true the best path strategy will + // trigger a path switch based on the quality + // score, otherwise it will use the RTT, + // queuing delay and loss rate bool init_; // true if all val are initializes bool forwarder_set_; // true if the strategy is been set at least // once diff --git a/libtransport/src/protocols/rtc/rtc_ldr.cc b/libtransport/src/protocols/rtc/rtc_ldr.cc index abf6cda2c..6e88a8636 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.cc +++ b/libtransport/src/protocols/rtc/rtc_ldr.cc @@ -37,16 +37,24 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( interface::RtcTransportRecoveryStrategies type, RecoveryStrategy::SendRtxCallback &&callback, interface::StrategyCallback &&external_callback) { - rs_type_ = type; if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) { rs_ = std::make_shared<RecoveryStrategyRecoveryOff>( - indexer, std::move(callback), io_service, std::move(external_callback)); - } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) { + indexer, std::move(callback), io_service, type, + std::move(external_callback)); + } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_REPLICATION) { rs_ = std::make_shared<RecoveryStrategyDelayBased>( - indexer, std::move(callback), io_service, std::move(external_callback)); - } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) { + indexer, std::move(callback), io_service, type, + std::move(external_callback)); + } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY || + type == interface::RtcTransportRecoveryStrategies:: + FEC_ONLY_LOW_RES_LOSSES) { rs_ = std::make_shared<RecoveryStrategyFecOnly>( - indexer, std::move(callback), io_service, std::move(external_callback)); + indexer, std::move(callback), io_service, type, + std::move(external_callback)); } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE || type == interface::RtcTransportRecoveryStrategies:: LOW_RATE_AND_BESTPATH || @@ -55,12 +63,14 @@ RTCLossDetectionAndRecovery::RTCLossDetectionAndRecovery( type == interface::RtcTransportRecoveryStrategies:: LOW_RATE_AND_ALL_FWD_STRATEGIES) { rs_ = std::make_shared<RecoveryStrategyLowRate>( - indexer, std::move(callback), io_service, std::move(external_callback)); + indexer, std::move(callback), io_service, type, + std::move(external_callback)); } else { // default - rs_type_ = interface::RtcTransportRecoveryStrategies::RTX_ONLY; + type = interface::RtcTransportRecoveryStrategies::RTX_ONLY; rs_ = std::make_shared<RecoveryStrategyRtxOnly>( - indexer, std::move(callback), io_service, std::move(external_callback)); + indexer, std::move(callback), io_service, type, + std::move(external_callback)); } } @@ -68,15 +78,21 @@ RTCLossDetectionAndRecovery::~RTCLossDetectionAndRecovery() {} void RTCLossDetectionAndRecovery::changeRecoveryStrategy( interface::RtcTransportRecoveryStrategies type) { - if (type == rs_type_) return; + if (type == rs_->getType()) return; - rs_type_ = type; + rs_->updateType(type); if (type == interface::RtcTransportRecoveryStrategies::RECOVERY_OFF) { rs_ = std::make_shared<RecoveryStrategyRecoveryOff>(std::move(*(rs_.get()))); - } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED) { + } else if (type == interface::RtcTransportRecoveryStrategies::DELAY_BASED || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_BESTPATH || + type == interface::RtcTransportRecoveryStrategies:: + DELAY_AND_REPLICATION) { rs_ = std::make_shared<RecoveryStrategyDelayBased>(std::move(*(rs_.get()))); - } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY) { + } else if (type == interface::RtcTransportRecoveryStrategies::FEC_ONLY || + type == interface::RtcTransportRecoveryStrategies:: + FEC_ONLY_LOW_RES_LOSSES) { rs_ = std::make_shared<RecoveryStrategyFecOnly>(std::move(*(rs_.get()))); } else if (type == interface::RtcTransportRecoveryStrategies::LOW_RATE || type == interface::RtcTransportRecoveryStrategies:: @@ -116,14 +132,15 @@ bool RTCLossDetectionAndRecovery::onDataPacketReceived( uint32_t seq = content_object.getName().getSuffix(); bool is_rtx = rs_->isRtx(seq); rs_->receivedPacket(seq); + bool ret = false; DLOG_IF(INFO, VLOG_IS_ON(3)) << "received data. add from " - << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " << seq; + << rs_->getState()->getHighestSeqReceived() + 1 << " to " << seq; if (!is_rtx) - return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, seq, - false); + ret = detectLoss(rs_->getState()->getHighestSeqReceived() + 1, seq, false); - return false; + rs_->getState()->updateHighestSeqReceived(seq); + return ret; } bool RTCLossDetectionAndRecovery::onNackPacketReceived( @@ -141,10 +158,9 @@ bool RTCLossDetectionAndRecovery::onNackPacketReceived( // may got lost and we should ask them rs_->receivedPacket(seq); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "received nack. add from " - << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " - << production_seq; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received nack. add from " + << rs_->getState()->getHighestSeqReceived() + 1 + << " to " << production_seq; // if it is a future nack store it in the list set of nacked seq if (production_seq <= seq) rs_->receivedFutureNack(seq); @@ -152,7 +168,7 @@ bool RTCLossDetectionAndRecovery::onNackPacketReceived( // call the detectLoss function using the probe flag = true. in fact the // losses detected using nacks are the same as the one detected using probes, // we should not increase the loss counter - return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, + return detectLoss(rs_->getState()->getHighestSeqReceived() + 1, production_seq, true); } @@ -164,12 +180,11 @@ bool RTCLossDetectionAndRecovery::onProbePacketReceived( uint32_t production_seq = RTCState::getProbeParams(probe).prod_seg; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "received probe. add from " - << rs_->getState()->getHighestSeqReceivedInOrder() + 1 << " to " - << production_seq; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "received probe. add from " + << rs_->getState()->getHighestSeqReceived() + 1 + << " to " << production_seq; - return detectLoss(rs_->getState()->getHighestSeqReceivedInOrder() + 1, + return detectLoss(rs_->getState()->getHighestSeqReceived() + 1, production_seq, true); } @@ -183,8 +198,8 @@ bool RTCLossDetectionAndRecovery::detectLoss(uint32_t start, uint32_t stop, } // skip received or lost packets - if (start <= rs_->getState()->getHighestSeqReceivedInOrder()) { - start = rs_->getState()->getHighestSeqReceivedInOrder() + 1; + if (start <= rs_->getState()->getHighestSeqReceived()) { + start = rs_->getState()->getHighestSeqReceived() + 1; } bool loss_detected = false; diff --git a/libtransport/src/protocols/rtc/rtc_ldr.h b/libtransport/src/protocols/rtc/rtc_ldr.h index 7f683eaa6..24f22ffed 100644 --- a/libtransport/src/protocols/rtc/rtc_ldr.h +++ b/libtransport/src/protocols/rtc/rtc_ldr.h @@ -47,6 +47,7 @@ class RTCLossDetectionAndRecovery void setFecParams(uint32_t n, uint32_t k) { rs_->setFecParams(n, k); } + void setContentSharingMode() { rs_->setContentSharingMode(); } void turnOnRecovery() { rs_->turnOnRecovery(); } bool isRtxOn() { return rs_->isRtxOn(); } @@ -68,11 +69,12 @@ class RTCLossDetectionAndRecovery return rs_->isPossibleLossWithNoRtx(seq); } + uint64_t getRtxRtt(uint32_t seq) { return rs_->getRtxRtt(seq); } + private: // returns true if a loss is detected, false otherwise bool detectLoss(uint32_t start, uint32_t stop, bool recv_probe); - interface::RtcTransportRecoveryStrategies rs_type_; std::shared_ptr<RecoveryStrategy> rs_; }; diff --git a/libtransport/src/protocols/rtc/rtc_packet.h b/libtransport/src/protocols/rtc/rtc_packet.h index 391aedfc6..ffbbd78fd 100644 --- a/libtransport/src/protocols/rtc/rtc_packet.h +++ b/libtransport/src/protocols/rtc/rtc_packet.h @@ -52,6 +52,8 @@ #include <hicn/transport/portability/win_portability.h> #endif +#include <hicn/transport/portability/endianess.h> + #include <cstring> namespace transport { @@ -60,24 +62,6 @@ namespace protocol { namespace rtc { -inline uint64_t _ntohll(const uint64_t *input) { - uint64_t return_val; - uint8_t *tmp = (uint8_t *)&return_val; - - tmp[0] = (uint8_t)(*input >> 56); - tmp[1] = (uint8_t)(*input >> 48); - tmp[2] = (uint8_t)(*input >> 40); - tmp[3] = (uint8_t)(*input >> 32); - tmp[4] = (uint8_t)(*input >> 24); - tmp[5] = (uint8_t)(*input >> 16); - tmp[6] = (uint8_t)(*input >> 8); - tmp[7] = (uint8_t)(*input >> 0); - - return return_val; -} - -inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); } - const uint32_t DATA_HEADER_SIZE = 12; // bytes // XXX: sizeof(data_packet_t) is 16 // beacuse of padding @@ -87,11 +71,19 @@ struct data_packet_t { uint64_t timestamp; uint32_t prod_rate; - inline uint64_t getTimestamp() const { return _ntohll(×tamp); } - inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + inline uint64_t getTimestamp() const { + return portability::net_to_host(timestamp); + } + inline void setTimestamp(uint64_t time) { + timestamp = portability::host_to_net(time); + } - inline uint32_t getProductionRate() const { return ntohl(prod_rate); } - inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } + inline uint32_t getProductionRate() const { + return portability::net_to_host(prod_rate); + } + inline void setProductionRate(uint32_t rate) { + prod_rate = portability::host_to_net(rate); + } }; struct nack_packet_t { @@ -99,14 +91,26 @@ struct nack_packet_t { uint32_t prod_rate; uint32_t prod_seg; - inline uint64_t getTimestamp() const { return _ntohll(×tamp); } - inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + inline uint64_t getTimestamp() const { + return portability::net_to_host(timestamp); + } + inline void setTimestamp(uint64_t time) { + timestamp = portability::host_to_net(time); + } - inline uint32_t getProductionRate() const { return ntohl(prod_rate); } - inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } + inline uint32_t getProductionRate() const { + return portability::net_to_host(prod_rate); + } + inline void setProductionRate(uint32_t rate) { + prod_rate = portability::host_to_net(rate); + } - inline uint32_t getProductionSegment() const { return ntohl(prod_seg); } - inline void setProductionSegment(uint32_t seg) { prod_seg = htonl(seg); } + inline uint32_t getProductionSegment() const { + return portability::net_to_host(prod_seg); + } + inline void setProductionSegment(uint32_t seg) { + prod_seg = portability::host_to_net(seg); + } }; class AggrPktHeader { @@ -225,7 +229,7 @@ class AggrPktHeader { return (uint16_t) * (buf_ + pkt_index); } else { // 16 bits uint16_t *buf_16 = (uint16_t *)buf_; - return ntohs(*(buf_16 + pkt_index)); + return portability::net_to_host(*(buf_16 + pkt_index)); } } @@ -235,7 +239,7 @@ class AggrPktHeader { *(buf_ + pkt_index) = (uint8_t)len; } else { // 16 bits uint16_t *buf_16 = (uint16_t *)buf_; - *(buf_16 + pkt_index) = htons(len); + *(buf_16 + pkt_index) = portability::host_to_net(len); } } diff --git a/libtransport/src/protocols/rtc/rtc_reassembly.cc b/libtransport/src/protocols/rtc/rtc_reassembly.cc index 992bab50e..b1b0fcaba 100644 --- a/libtransport/src/protocols/rtc/rtc_reassembly.cc +++ b/libtransport/src/protocols/rtc/rtc_reassembly.cc @@ -40,7 +40,7 @@ void RtcReassembly::reassemble(core::ContentObject& content_object) { auto read_buffer = content_object.getPayload(); DLOG_IF(INFO, VLOG_IS_ON(3)) << "Size of payload: " << read_buffer->length(); - read_buffer->trimStart(transport_protocol_->transportHeaderLength()); + read_buffer->trimStart(transport_protocol_->transportHeaderLength(false)); if (data_aggregation_) { rtc::AggrPktHeader hdr((uint8_t*)read_buffer->data()); diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc index 66ae5086c..257fdd09b 100644 --- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.cc @@ -29,8 +29,12 @@ using namespace transport::interface; RecoveryStrategy::RecoveryStrategy( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, - bool use_rtx, bool use_fec, interface::StrategyCallback &&external_callback) - : recovery_on_(false), + bool use_rtx, bool use_fec, + interface::RtcTransportRecoveryStrategies rs_type, + interface::StrategyCallback &&external_callback) + : rs_type_(rs_type), + recovery_on_(false), + content_sharing_mode_(false), rtx_during_fec_(0), next_rtx_timer_(MAX_TIMER_RTX), send_rtx_callback_(std::move(callback)), @@ -43,7 +47,9 @@ RecoveryStrategy::RecoveryStrategy( } RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) - : rtx_during_fec_(0), + : rs_type_(rs.rs_type_), + content_sharing_mode_(rs.content_sharing_mode_), + rtx_during_fec_(0), rtx_state_(std::move(rs.rtx_state_)), rtx_timers_(std::move(rs.rtx_timers_)), recover_with_fec_(std::move(rs.recover_with_fec_)), @@ -64,25 +70,52 @@ RecoveryStrategy::RecoveryStrategy(RecoveryStrategy &&rs) RecoveryStrategy::~RecoveryStrategy() {} void RecoveryStrategy::setFecParams(uint32_t n, uint32_t k) { + // if rs_type == FEC_ONLY_LOW_RES_LOSSES max k == 64 n_ = n; k_ = k; // XXX for the moment we go in steps of 5% loss rate. - // max loss rate = 95% + uint32_t i = 0; for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { - double dec_loss_rate = (double)(loss_rate + 5) / 100.0; - double exp_losses = (double)k_ * dec_loss_rate; - uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate)); - - fec_state_ f; - f.fec_to_ask = std::min(fec_to_ask, (n_ - k_)); - f.last_update = round_id_; - f.avg_residual_losses = 0.0; - f.consecutive_use = 0; - fec_per_loss_rate_.push_back(f); + uint32_t fec_to_ask = 0; + if (n_ != 0 && k_ != 0) { + if (rs_type_ == + interface::RtcTransportRecoveryStrategies::FEC_ONLY_LOW_RES_LOSSES) { + // the max loss rate in the matrix is 50% + uint32_t index = i; + if (i > 9) index = 9; + fec_to_ask = FEC_MATRIX[k_ - 1][index]; + } else { + double dec_loss_rate = (double)(loss_rate + 5); + if (dec_loss_rate == 100.0) dec_loss_rate = 95.0; + dec_loss_rate = dec_loss_rate / 100.0; + double exp_losses = ceil((double)k_ * dec_loss_rate); + fec_to_ask = ceil((exp_losses / (1 - dec_loss_rate)) * 1.25); + } + } + fec_to_ask = std::min(fec_to_ask, (n_ - k_)); + fec_per_loss_rate_.push_back(fec_to_ask); + + i++; } } +uint64_t RecoveryStrategy::getRtxRtt(uint32_t seq) { + auto it = rtx_state_.find(seq); + + if (it == rtx_state_.end()) return 0; + + // we can compute the RTT of an RTX only if it was send once. Infact if the + // RTX was sent twice or more the data may be alredy in flight and the RTT + // will be underestimated. This may happen also for packets that we + // retransmitted too soon. in that case the RTT will be filtered out by + // checking the path label + if (it->second.rtx_count_ != 1) return 0; + + // this a potentialy valid packet, compute the RTT + return (utils::SteadyTime::nowMs().count() - it->second.last_send_); +} + bool RecoveryStrategy::lossDetected(uint32_t seq) { if (isRtx(seq)) { // this packet is already in the list of rtx @@ -141,8 +174,10 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) { state.first_send_ = state_->getInterestSentTime(seq); if (state.first_send_ == 0) // this interest was never sent before state.first_send_ = getNow(); - state.next_send_ = computeNextSend(seq, true); + state.last_send_ = state.first_send_; // we didn't send an RTX for this + // packet yet state.rtx_count_ = 0; + state.next_send_ = computeNextSend(seq, state.rtx_count_); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Add " << seq << " to retransmissions. next rtx is in " << state.next_send_ - getNow() << " ms"; @@ -158,66 +193,50 @@ void RecoveryStrategy::addNewRtx(uint32_t seq, bool force) { } } -uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, bool new_rtx) { +uint64_t RecoveryStrategy::computeNextSend(uint32_t seq, uint32_t rtx_counter) { uint64_t now = getNow(); - if (new_rtx) { - // for the new rtx we wait one estimated IAT after the loss detection. this - // is bacause, assuming that packets arrive with a constant IAT, we should - // get a new packet every IAT - double prod_rate = state_->getProducerRate(); - uint32_t estimated_iat = SENTINEL_TIMER_INTERVAL; - uint32_t jitter = 0; + if (rtx_counter == 0) { + uint32_t wait = 1; + if (content_sharing_mode_) return now + wait; - if (prod_rate != 0) { - double packet_size = state_->getAveragePacketSize(); - estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); - jitter = ceil(state_->getJitter()); - } + uint32_t jitter = SENTINEL_TIMER_INTERVAL; + double prod_rate = state_->getProducerRate(); + if (prod_rate != 0) jitter = ceil(state_->getJitter()); - uint32_t wait = 1; - if (estimated_iat < 18) { - // for low rate app we do not wait to send a RTX - // we consider low rate stream with less than 50pps (iat >= 20ms) - // (e.g. audio in videoconf, mobile games). - // in the check we use 18ms to accomodate for measurements errors - // for flows with higher rate wait 1 ait + jitter - wait = estimated_iat + jitter; - } + wait += jitter; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "first rtx for " << seq << " in " << wait - << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat - << " jttr = " << jitter; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "first rtx for " << seq << " in " << wait + << " ms, jitter = " << jitter; return now + wait; } else { - // wait one RTT - uint32_t wait = SENTINEL_TIMER_INTERVAL; - + // wait one RTT. if an edge is known use the edge RTT for the first 5 rtx double prod_rate = state_->getProducerRate(); if (prod_rate == 0) { return now + SENTINEL_TIMER_INTERVAL; } - double packet_size = state_->getAveragePacketSize(); - uint32_t estimated_iat = ceil(1000.0 / (prod_rate / packet_size)); + uint64_t rtt = 0; + // if the transport detects an edge we try first to get the RTX from the + // edge. if no interest get a reply we move to the full RTT + if (rtx_counter < 5 && (state_->getEdgeRtt() != 0)) { + rtt = state_->getEdgeRtt(); + } else { + rtt = state_->getAvgRTT(); + } - uint64_t rtt = state_->getMinRTT(); if (rtt == 0) rtt = SENTINEL_TIMER_INTERVAL; - wait = rtt; + + if (content_sharing_mode_) return now + rtt; + + uint32_t wait = (uint32_t)rtt; uint32_t jitter = ceil(state_->getJitter()); wait += jitter; - // it may happen that the channel is congested and we have some additional - // queuing delay to take into account - uint32_t queue = ceil(state_->getQueuing()); - wait += queue; - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "next rtx for " << seq << " in " << wait - << " ms, rtt = " << state_->getMinRTT() << " ait = " << estimated_iat - << " jttr = " << jitter << " queue = " << queue; + << "next rtx for " << seq << " in " << wait << " ms, rtt = " << rtt + << " jtter = " << jitter; return now + wait; } @@ -252,7 +271,9 @@ void RecoveryStrategy::retransmit() { state_->onRetransmission(seq); double prod_rate = state_->getProducerRate(); if (prod_rate != 0) rtx_it->second.rtx_count_++; - rtx_it->second.next_send_ = computeNextSend(seq, false); + rtx_it->second.last_send_ = now; + rtx_it->second.next_send_ = + computeNextSend(seq, rtx_it->second.rtx_count_); it = rtx_timers_.erase(it); rtx_timers_.insert( std::pair<uint64_t, uint32_t>(rtx_it->second.next_send_, seq)); @@ -327,6 +348,7 @@ void RecoveryStrategy::deleteRtx(uint32_t seq) { } it_timers++; } + // remove rtx rtx_state_.erase(it_rtx); } @@ -339,53 +361,13 @@ uint32_t RecoveryStrategy::computeFecPacketsToAsk() { if (loss_rate == 0) return 0; - // once per minute try to reduce the fec rate. it may happen that for some bin - // we ask too many fec packet. here we try to reduce this values gently - if (round_id_ % ROUNDS_PER_MIN == 0) { - reduceFec(); - } - // keep track of the last used fec. if we use a new bin on this round reset // consecutive use and avg loss in the prev bin uint32_t bin = ceil(loss_rate / 5.0) - 1; - if (bin > fec_per_loss_rate_.size() - 1) bin = fec_per_loss_rate_.size() - 1; + if (bin > fec_per_loss_rate_.size() - 1) + bin = (uint32_t)fec_per_loss_rate_.size() - 1; - if (bin != last_fec_used_) { - fec_per_loss_rate_[last_fec_used_].consecutive_use = 0; - fec_per_loss_rate_[last_fec_used_].avg_residual_losses = 0.0; - } - last_fec_used_ = bin; - fec_per_loss_rate_[last_fec_used_].consecutive_use++; - - // we update the stats only once very 5 rounds (1sec) that is the rate at - // which we compute residual losses - if (round_id_ % ROUNDS_PER_SEC == 0) { - double residual_losses = state_->getResidualLossRate() * 100; - // update residual loss rate - fec_per_loss_rate_[bin].avg_residual_losses = - (fec_per_loss_rate_[bin].avg_residual_losses * MOVING_AVG_ALPHA) + - (1 - MOVING_AVG_ALPHA) * residual_losses; - - if ((fec_per_loss_rate_[bin].last_update - round_id_) < - WAIT_BEFORE_FEC_UPDATE) { - // this bin is been updated recently so don't modify it and - // return the current state - return fec_per_loss_rate_[bin].fec_to_ask; - } - - // if the residual loss rate is too high and we can ask more fec packets and - // we are using this configuration since at least 5 sec update fec - if (fec_per_loss_rate_[bin].avg_residual_losses > MAX_RESIDUAL_LOSS_RATE && - fec_per_loss_rate_[bin].fec_to_ask < (n_ - k_) && - fec_per_loss_rate_[bin].consecutive_use > WAIT_BEFORE_FEC_UPDATE) { - // so increase the number of fec packets to ask - fec_per_loss_rate_[bin].fec_to_ask++; - fec_per_loss_rate_[bin].last_update = round_id_; - fec_per_loss_rate_[bin].avg_residual_losses = 0.0; - } - } - - return fec_per_loss_rate_[bin].fec_to_ask; + return fec_per_loss_rate_[bin]; } void RecoveryStrategy::setRtxFec(std::optional<bool> rtx_on, @@ -431,21 +413,6 @@ void RecoveryStrategy::removePacketState(uint32_t seq) { deleteRtx(seq); } -// private methods - -void RecoveryStrategy::reduceFec() { - for (uint32_t loss_rate = 5; loss_rate < 100; loss_rate += 5) { - double dec_loss_rate = (double)loss_rate / 100.0; - double exp_losses = (double)k_ * dec_loss_rate; - uint32_t fec_to_ask = ceil(exp_losses / (1 - dec_loss_rate)); - - uint32_t bin = ceil(loss_rate / 5.0) - 1; - if (fec_per_loss_rate_[bin].fec_to_ask > fec_to_ask) { - fec_per_loss_rate_[bin].fec_to_ask--; - } - } -} - } // end namespace rtc } // end namespace protocol diff --git a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h index 482aedc9d..aceb85888 100644 --- a/libtransport/src/protocols/rtc/rtc_recovery_strategy.h +++ b/libtransport/src/protocols/rtc/rtc_recovery_strategy.h @@ -32,9 +32,10 @@ namespace rtc { class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { protected: struct rtx_state_ { - uint64_t first_send_; - uint64_t next_send_; - uint32_t rtx_count_; + uint64_t first_send_; // first time this interest was sent + uint64_t last_send_; // last time this rtx was sent + uint64_t next_send_; // next retransmission time + uint32_t rtx_count_; // number or rtx }; using rtxState = struct rtx_state_; @@ -44,6 +45,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { RecoveryStrategy(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, bool use_rtx, bool use_fec, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategy(RecoveryStrategy &&rs); @@ -55,6 +57,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { void setState(RTCState *state) { state_ = state; } void setRateControl(RTCRateControl *rateControl) { rc_ = rateControl; } void setFecParams(uint32_t n, uint32_t k); + void setContentSharingMode() { content_sharing_mode_ = true; } bool isRtx(uint32_t seq) { if (rtx_state_.find(seq) != rtx_state_.end()) return true; @@ -71,10 +74,20 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { return false; } + interface::RtcTransportRecoveryStrategies getType() { + return rs_type_; + } + void updateType(interface::RtcTransportRecoveryStrategies type) { + rs_type_ = type; + } bool isRtxOn() { return rtx_on_; } bool isFecOn() { return fec_on_; } RTCState *getState() { return state_; } + + // if the function returns 0 it means that the packet is not an RTX or it is + // not a valid packet to safely compute the RTT + uint64_t getRtxRtt(uint32_t seq); bool lossDetected(uint32_t seq); void notifyNewLossDetedcted(uint32_t seq); void requestPossibleLostPacket(uint32_t seq); @@ -98,7 +111,7 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { protected: // rtx functions void addNewRtx(uint32_t seq, bool force); - uint64_t computeNextSend(uint32_t seq, bool new_rtx); + uint64_t computeNextSend(uint32_t seq, uint32_t rtx_counter); void retransmit(); void scheduleNextRtx(); void deleteRtx(uint32_t seq); @@ -109,9 +122,11 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { // common functons void removePacketState(uint32_t seq); + interface::RtcTransportRecoveryStrategies rs_type_; bool recovery_on_; bool rtx_on_; bool fec_on_; + bool content_sharing_mode_; // number of RTX sent after fec turned on // this is used to take into account jitter and out of order packets @@ -152,19 +167,9 @@ class RecoveryStrategy : public std::enable_shared_from_this<RecoveryStrategy> { RTCRateControl *rc_; private: - struct fec_state_ { - uint32_t fec_to_ask; - uint32_t last_update; // round id of the last update - // (wait 10 ruonds (2sec) between updates) - uint32_t consecutive_use; // consecutive ruonds where this fec was used - double avg_residual_losses; - }; - - void reduceFec(); - uint32_t round_id_; // number of rounds uint32_t last_fec_used_; - std::vector<fec_state_> fec_per_loss_rate_; + std::vector<uint32_t> fec_per_loss_rate_; interface::StrategyCallback callback_; }; diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.cc b/libtransport/src/protocols/rtc/rtc_rs_delay.cc index 4be751ec9..7d7a01133 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_delay.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.cc @@ -25,8 +25,10 @@ namespace rtc { RecoveryStrategyDelayBased::RecoveryStrategyDelayBased( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, + rs_type, std::move(external_callback)), // start with rtx congestion_state_(false), probing_state_(false), @@ -48,7 +50,7 @@ void RecoveryStrategyDelayBased::turnOnRecovery() { recovery_on_ = true; uint64_t rtt = state_->getMinRTT(); uint32_t fec_to_ask = computeFecPacketsToAsk(); - if (rtt > 80 && fec_to_ask != 0) { + if (rtt > MAX_RTT_BEFORE_FEC && fec_to_ask > 0) { // we need to start FEC (see fec only strategy for more details) setRtxFec(true, true); rtx_during_fec_ = 1; // avoid to stop fec @@ -84,16 +86,16 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) { return; } - uint64_t rtt = state_->getMinRTT(); + uint64_t rtt = state_->getAvgRTT(); - bool congestion = false; // XXX at the moment we are not looking at congestion events - // congestion = rc_->inCongestionState(); + // bool congestion = rc_->inCongestionState(); - if ((!fec_on_ && rtt >= 100) || (fec_on_ && rtt > 80) || congestion) { + if ((!fec_on_ && rtt >= MAX_RTT_BEFORE_FEC) || + (fec_on_ && rtt > (MAX_RTT_BEFORE_FEC - 10))) { // switch from rtx to fec or keep use fec. Notice that if some rtx are // waiting to be scheduled, they will be sent normally, but no new rtx will - // be created If the loss rate is 0 keep to use RTX. + // be created if the loss rate is 0 keep to use RTX. uint32_t fec_to_ask = computeFecPacketsToAsk(); softSwitchToFec(fec_to_ask); if (rtx_during_fec_ == 0) // if we do not send any RTX the losses @@ -104,7 +106,8 @@ void RecoveryStrategyDelayBased::onNewRound(bool in_sync) { return; } - if ((fec_on_ && rtt <= 80) || (!rtx_on_ && rtt <= 100)) { + if ((fec_on_ && rtt <= (MAX_RTT_BEFORE_FEC - 10)) || + (!rtx_on_ && rtt <= MAX_RTT_BEFORE_FEC)) { // turn on rtx softSwitchToFec(0); indexer_->setNFec(0); diff --git a/libtransport/src/protocols/rtc/rtc_rs_delay.h b/libtransport/src/protocols/rtc/rtc_rs_delay.h index 5ca90f4cb..9e1c41388 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_delay.h +++ b/libtransport/src/protocols/rtc/rtc_rs_delay.h @@ -26,6 +26,7 @@ class RecoveryStrategyDelayBased : public RecoveryStrategy { public: RecoveryStrategyDelayBased(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyDelayBased(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc index c44212bda..5b10823ec 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.cc @@ -25,9 +25,10 @@ namespace rtc { RecoveryStrategyFecOnly::RecoveryStrategyFecOnly( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, - std::move(external_callback)), + rs_type, std::move(external_callback)), congestion_state_(false), probing_state_(false), switch_rounds_(0) {} diff --git a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h index 1ab78b842..42df25bd9 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_fec_only.h +++ b/libtransport/src/protocols/rtc/rtc_rs_fec_only.h @@ -26,6 +26,7 @@ class RecoveryStrategyFecOnly : public RecoveryStrategy { public: RecoveryStrategyFecOnly(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyFecOnly(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc index 48dd3e34f..dbad563cd 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.cc @@ -25,8 +25,10 @@ namespace rtc { RecoveryStrategyLowRate::RecoveryStrategyLowRate( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, false, true, + rs_type, std::move(external_callback)), // start with fec fec_consecutive_rounds_((MILLI_IN_A_SEC / ROUND_LEN) * 5), // 5 sec rtx_allowed_consecutive_rounds_(0) { @@ -75,7 +77,7 @@ void RecoveryStrategyLowRate::selectRecoveryStrategy(bool in_sync) { } uint32_t loss_rate = std::round(state_->getPerSecondLossRate() * 100); - uint32_t rtt = state_->getAvgRTT(); + uint32_t rtt = (uint32_t)state_->getAvgRTT(); bool use_rtx = false; for (size_t i = 0; i < switch_vector.size(); i++) { diff --git a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h index d66b197e2..0e76efaca 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_low_rate.h +++ b/libtransport/src/protocols/rtc/rtc_rs_low_rate.h @@ -34,6 +34,7 @@ class RecoveryStrategyLowRate : public RecoveryStrategy { public: RecoveryStrategyLowRate(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyLowRate(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc index 16b14eff6..00c6a0504 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.cc @@ -25,9 +25,10 @@ namespace rtc { RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, false, false, - std::move(external_callback)) {} + rs_type, std::move(external_callback)) {} RecoveryStrategyRecoveryOff::RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs) : RecoveryStrategy(std::move(rs)) { diff --git a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h index 3a9e71e7d..3d59cc473 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h +++ b/libtransport/src/protocols/rtc/rtc_rs_recovery_off.h @@ -26,6 +26,7 @@ class RecoveryStrategyRecoveryOff : public RecoveryStrategy { public: RecoveryStrategyRecoveryOff(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyRecoveryOff(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc index 8e5db5439..4d7cf7a82 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.cc @@ -25,9 +25,10 @@ namespace rtc { RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly( Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback) : RecoveryStrategy(indexer, std::move(callback), io_service, true, false, - std::move(external_callback)) {} + rs_type, std::move(external_callback)) {} RecoveryStrategyRtxOnly::RecoveryStrategyRtxOnly(RecoveryStrategy &&rs) : RecoveryStrategy(std::move(rs)) { diff --git a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h index e90e5ba13..03dbed1c7 100644 --- a/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h +++ b/libtransport/src/protocols/rtc/rtc_rs_rtx_only.h @@ -26,6 +26,7 @@ class RecoveryStrategyRtxOnly : public RecoveryStrategy { public: RecoveryStrategyRtxOnly(Indexer *indexer, SendRtxCallback &&callback, asio::io_service &io_service, + interface::RtcTransportRecoveryStrategies rs_type, interface::StrategyCallback &&external_callback); RecoveryStrategyRtxOnly(RecoveryStrategy &&rs); diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc index 5b3b5e4c3..82ac0b9c1 100644 --- a/libtransport/src/protocols/rtc/rtc_state.cc +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -106,6 +106,7 @@ void RTCState::initParams() { // paths stats path_table_.clear(); main_path_ = nullptr; + edge_path_ = nullptr; // packet cache (not pending anymore) packet_cache_.clear(); @@ -231,11 +232,9 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, } updatePacketSize(content_object); - updateReceivedBytes(content_object); + updateReceivedBytes(content_object, false); addRecvOrLost(seq, PacketState::RECEIVED); - if (seq > highest_seq_received_) highest_seq_received_ = seq; - // the producer is responding // it is generating valid data packets so we consider it active producer_is_active_ = true; @@ -245,11 +244,7 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, void RTCState::onFecPacketReceived(const core::ContentObject &content_object) { uint32_t seq = content_object.getName().getSuffix(); - // updateReceivedBytes(content_object); - received_fec_bytes_ += - (uint32_t)(content_object.headerSize() + content_object.payloadSize()); - - if (seq > highest_seq_received_) highest_seq_received_ = seq; + updateReceivedBytes(content_object, true); PacketState state = getPacketState(seq); if (state != PacketState::LOST) { @@ -328,12 +323,14 @@ void RTCState::onPacketLost(uint32_t seq) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; } } + addRecvOrLost(seq, PacketState::DEFINITELY_LOST); } -void RTCState::onPacketRecoveredRtx(uint32_t seq) { +void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object, + uint64_t rtt) { + uint32_t seq = content_object.getName().getSuffix(); packets_sent_to_app_++; - if (seq > highest_seq_received_) highest_seq_received_ = seq; // increase the recovered packet counter only if the packet was marked as LOST // before. @@ -341,13 +338,37 @@ void RTCState::onPacketRecoveredRtx(uint32_t seq) { if (state == PacketState::LOST) losses_recovered_++; addRecvOrLost(seq, PacketState::RECEIVED); + updateReceivedBytes(content_object, false); + + if (rtt == 0) return; // nothing to do + + uint32_t path_label = content_object.getPathLabel(); + auto path_it = path_table_.find(path_label); + if (path_it == path_table_.end()) { + // this is a new path and it must be a cache + std::shared_ptr<RTCDataPath> newPath = + std::make_shared<RTCDataPath>(path_label); + auto ret = path_table_.insert( + std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + if (path->pathToProducer()) + return; // this packet is coming from a producer + // even if we sent an RTX. this may happen + // for RTX that are sent too fast or in + // case of multipath + + path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true); } -void RTCState::onFecPacketRecoveredRtx(uint32_t seq) { +void RTCState::onFecPacketRecoveredRtx( + const core::ContentObject &content_object) { // This is the same as onPacketRecoveredRtx, but in this is case the // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards - if (seq > highest_seq_received_) highest_seq_received_ = seq; losses_recovered_++; + updateReceivedBytes(content_object, true); } void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { @@ -355,8 +376,6 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { packets_sent_to_app_++; recovered_bytes_with_fec_ += size; - if (seq > highest_seq_received_) highest_seq_received_ = seq; - // adding header to the count recovered_bytes_with_fec_ += 60; // XXX get header size some where @@ -487,21 +506,32 @@ void RTCState::onNewRound(double round_len, bool in_sync) { // channel losses uint32_t last_round_packets = 0; + uint64_t min_edge_rtt = UINT_MAX; std::shared_ptr<RTCDataPath> old_main_path = main_path_; main_path_ = nullptr; + edge_path_ = nullptr; for (auto it = path_table_.begin(); it != path_table_.end(); it++) { - if (it->second->isActive()) { + if (it->second->isValidProducer()) { uint32_t pkt = it->second->getPacketsLastRound(); if (pkt > last_round_packets) { last_round_packets = pkt; main_path_ = it->second; } + } else if (it->second->isActive() && !it->second->pathToProducer()) { + // this is a path to a cache from where we are receiving content + if (it->second->getMinRtt() < min_edge_rtt) { + min_edge_rtt = it->second->getMinRtt(); + edge_path_ = it->second; + } } it->second->roundEnd(); } if (main_path_ == nullptr) main_path_ = old_main_path; + if (edge_path_ == nullptr) edge_path_ = main_path_; + if (edge_path_->getMinRtt() >= main_path_->getMinRtt()) + edge_path_ = main_path_; // in case we get a new main path we reset the stats of the old one. this is // beacuse, in case we need to switch back we don't what to take decisions on @@ -551,9 +581,15 @@ void RTCState::onNewRound(double round_len, bool in_sync) { rounds_++; } -void RTCState::updateReceivedBytes(const core::ContentObject &content_object) { - received_bytes_ += - (uint32_t)(content_object.headerSize() + content_object.payloadSize()); +void RTCState::updateReceivedBytes(const core::ContentObject &content_object, + bool isFec) { + if (isFec) { + received_fec_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + } else { + received_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + } } void RTCState::updatePacketSize(const core::ContentObject &content_object) { @@ -703,6 +739,10 @@ void RTCState::dataToBeReceived(uint32_t seq) { addToPacketCache(seq, PacketState::TO_BE_RECEIVED); } +void RTCState::updateHighestSeqReceived(uint32_t seq) { + if (seq > highest_seq_received_) highest_seq_received_ = seq; +} + void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { @@ -803,7 +843,7 @@ core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) { switch (ProbeHandler::getProbeType(seq)) { case ProbeType::INIT: { core::ContentObjectManifest manifest( - const_cast<core::ContentObject &>(probe)); + const_cast<core::ContentObject &>(probe).shared_from_this()); manifest.decode(); params = manifest.getParamsRTC(); break; @@ -841,7 +881,7 @@ core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) { } case core::PayloadType::MANIFEST: { core::ContentObjectManifest manifest( - const_cast<core::ContentObject &>(data)); + const_cast<core::ContentObject &>(data).shared_from_this()); manifest.decode(); params = manifest.getParamsRTC(); break; diff --git a/libtransport/src/protocols/rtc/rtc_state.h b/libtransport/src/protocols/rtc/rtc_state.h index 4bd2f76a0..ac3cc621f 100644 --- a/libtransport/src/protocols/rtc/rtc_state.h +++ b/libtransport/src/protocols/rtc/rtc_state.h @@ -84,8 +84,9 @@ class RTCState : public std::enable_shared_from_this<RTCState> { void onNackPacketReceived(const core::ContentObject &nack, bool compute_stats); void onPacketLost(uint32_t seq); - void onPacketRecoveredRtx(uint32_t seq); - void onFecPacketRecoveredRtx(uint32_t seq); + void onPacketRecoveredRtx(const core::ContentObject &content_object, + uint64_t rtt); + void onFecPacketRecoveredRtx(const core::ContentObject &content_object); void onPacketRecoveredFec(uint32_t seq, uint32_t size); bool onProbePacketReceived(const core::ContentObject &probe); void onJumpForward(uint32_t next_seq); @@ -117,6 +118,11 @@ class RTCState : public std::enable_shared_from_this<RTCState> { return 0; } + uint64_t getEdgeRtt() const { + if (edge_path_ != nullptr) return edge_path_->getMinRtt(); + return 0; + } + void resetRttStats() { if (mainPathIsValid()) main_path_->clearRtt(); } @@ -149,7 +155,7 @@ class RTCState : public std::enable_shared_from_this<RTCState> { } uint32_t getPendingInterestNumber() const { - return pending_interests_.size(); + return (uint32_t)pending_interests_.size(); } PacketState getPacketState(uint32_t seq) { @@ -242,6 +248,8 @@ class RTCState : public std::enable_shared_from_this<RTCState> { // set it as TO_BE_RECEIVED. void dataToBeReceived(uint32_t seq); + void updateHighestSeqReceived(uint32_t seq); + // Extract RTC parameters from probes (init or RTT probes) and data packets. static core::ParamsRTC getProbeParams(const core::ContentObject &probe); static core::ParamsRTC getDataParams(const core::ContentObject &data); @@ -259,7 +267,8 @@ class RTCState : public std::enable_shared_from_this<RTCState> { // update stats void updateState(); - void updateReceivedBytes(const core::ContentObject &content_object); + void updateReceivedBytes(const core::ContentObject &content_object, + bool isFec); void updatePacketSize(const core::ContentObject &content_object); void updatePathStats(const core::ContentObject &content_object, bool is_nack); void updateLossRate(bool in_sycn); @@ -360,7 +369,12 @@ class RTCState : public std::enable_shared_from_this<RTCState> { // paths stats std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> path_table_; - std::shared_ptr<RTCDataPath> main_path_; + std::shared_ptr<RTCDataPath> main_path_; // this is the path that connects + // the consumer to the producer. in + // case of multipath the trasnport + // uses the most active path + std::shared_ptr<RTCDataPath> edge_path_; // path to the closest cache if it + // exists // packet received // cache where to store info about the last MAX_CACHED_PACKETS diff --git a/libtransport/src/protocols/rtc/rtc_verifier.cc b/libtransport/src/protocols/rtc/rtc_verifier.cc index 7b6330a1f..861ceee89 100644 --- a/libtransport/src/protocols/rtc/rtc_verifier.cc +++ b/libtransport/src/protocols/rtc/rtc_verifier.cc @@ -22,11 +22,11 @@ namespace protocol { namespace rtc { RTCVerifier::RTCVerifier(std::shared_ptr<auth::Verifier> verifier, - uint32_t max_unverified_interval, - double max_unverified_ratio) + uint32_t factor_relevant, uint32_t factor_alert) : verifier_(verifier), - max_unverified_interval_(max_unverified_interval), - max_unverified_ratio_(max_unverified_ratio) {} + factor_relevant_(factor_relevant), + factor_alert_(factor_alert), + manifest_max_capacity_(std::numeric_limits<uint8_t>::max()) {} void RTCVerifier::setState(std::shared_ptr<RTCState> rtc_state) { rtc_state_ = rtc_state; @@ -36,12 +36,16 @@ void RTCVerifier::setVerifier(std::shared_ptr<auth::Verifier> verifier) { verifier_ = verifier; } -void RTCVerifier::setMaxUnverifiedInterval(uint32_t max_unverified_interval) { - max_unverified_interval_ = max_unverified_interval; +void RTCVerifier::setFactorRelevant(uint32_t factor_relevant) { + factor_relevant_ = factor_relevant; } -void RTCVerifier::setMaxUnverifiedRatio(double max_unverified_ratio) { - max_unverified_ratio_ = max_unverified_ratio; +void RTCVerifier::setFactorAlert(uint32_t factor_alert) { + factor_alert_ = factor_alert; +} + +auth::VerificationPolicy RTCVerifier::verify(core::Interest &interest) { + return verifier_->verifyPackets(&interest); } auth::VerificationPolicy RTCVerifier::verify( @@ -108,19 +112,27 @@ auth::VerificationPolicy RTCVerifier::verifyData( auth::Suffix suffix = content_object.getName().getSuffix(); auth::VerificationPolicy policy = auth::VerificationPolicy::ABORT; - Timestamp now = utils::SteadyTime::nowMs().count(); - // Flush old packets - Timestamp oldest = flush_packets(now); + uint32_t threshold_relevant = factor_relevant_ * manifest_max_capacity_; + uint32_t threshold_alert = factor_alert_ * manifest_max_capacity_; - // Add packet to map of unverified packets - packets_unverif_.add( - {.suffix = suffix, .timestamp = now, .size = content_object.length()}, - content_object.computeDigest(manifest_hash_algo_)); + // Flush packets outside relevance window + for (auto it = packets_unverif_.set().begin(); + it != packets_unverif_.set().end();) { + if (it->first > current_index_ - threshold_relevant) { + break; + } + packets_unverif_erased_.insert((unsigned int)it->first); + it = packets_unverif_.remove(it); + } + + // Add packet to set of unverified packets + packets_unverif_.add({current_index_, suffix}, + content_object.computeDigest(manifest_hash_algo_)); + current_index_++; - // Check that the ratio of unverified packets stays below the limit - if (now - oldest < max_unverified_interval_ || - getBufferRatio() < max_unverified_ratio_) { + // Check that the number of unverified packets is below the alert threshold + if (packets_unverif_.set().size() <= threshold_alert) { policy = auth::VerificationPolicy::ACCEPT; } @@ -139,18 +151,13 @@ auth::VerificationPolicy RTCVerifier::processManifest( auth::VerificationPolicy accept_policy = auth::VerificationPolicy::ACCEPT; // Decode manifest - core::ContentObjectManifest manifest(content_object); + core::ContentObjectManifest manifest(content_object.shared_from_this()); manifest.decode(); - // Update last manifest - if (suffix > last_manifest_) { - last_manifest_ = suffix; - } - - // Extract hash algorithm and hashes + // Extract manifest data + manifest_max_capacity_ = manifest.getMaxCapacity(); manifest_hash_algo_ = manifest.getHashAlgorithm(); - auth::Verifier::SuffixMap suffix_map = - core::ContentObjectManifest::getSuffixMap(&manifest); + auth::Verifier::SuffixMap suffix_map = manifest.getSuffixMap(); // Return early if the manifest is empty if (suffix_map.empty()) { @@ -186,10 +193,7 @@ auth::VerificationPolicy RTCVerifier::processManifest( for (const auto &p : policies) { switch (p.second) { case auth::VerificationPolicy::ACCEPT: { - auto packet_unverif_it = packets_unverif_.packetIt(p.first); - Packet packet_verif = *packet_unverif_it; - packets_unverif_.remove(packet_unverif_it); - packets_verif_.add(packet_verif); + packets_unverif_.remove(packets_unverif_.packet(p.first)); manifest_digests_.erase(p.first); break; } @@ -209,69 +213,20 @@ void RTCVerifier::onDataRecoveredFec(uint32_t suffix) { manifest_digests_.erase(suffix); } -void RTCVerifier::onJumpForward(uint32_t next_suffix) { - if (next_suffix <= last_manifest_ + 1) { - return; - } - - // When we jump forward in the suffix sequence, we remove packets that won't - // be verified. Those packets have a suffix in the range [last_manifest_ + 1, - // next_suffix[. - for (auth::Suffix suffix = last_manifest_ + 1; suffix < next_suffix; - ++suffix) { - auto packet_it = packets_unverif_.packetIt(suffix); - if (packet_it != packets_unverif_.set().end()) { - packets_unverif_.remove(packet_it); - } - } -} - -double RTCVerifier::getBufferRatio() const { - size_t total = packets_verif_.size() + packets_unverif_.size(); - double total_unverified = static_cast<double>(packets_unverif_.size()); - return total ? total_unverified / total : 0.0; -} - -RTCVerifier::Timestamp RTCVerifier::flush_packets(Timestamp now) { - Timestamp oldest_verified = packets_verif_.set().empty() - ? now - : packets_verif_.set().begin()->timestamp; - Timestamp oldest_unverified = packets_unverif_.set().empty() - ? now - : packets_unverif_.set().begin()->timestamp; - - // Prune verified packets older than the unverified interval - for (auto it = packets_verif_.set().begin(); - it != packets_verif_.set().end();) { - if (now - it->timestamp < max_unverified_interval_) { - break; - } - it = packets_verif_.remove(it); - } - - // Prune unverified packets older than the unverified interval - for (auto it = packets_unverif_.set().begin(); - it != packets_unverif_.set().end();) { - if (now - it->timestamp < max_unverified_interval_) { - break; - } - packets_unverif_erased_.insert(it->suffix); - it = packets_unverif_.remove(it); - } - - return std::min(oldest_verified, oldest_unverified); -} - std::pair<RTCVerifier::PacketSet::iterator, bool> RTCVerifier::Packets::add( - const Packet &packet) { + const Packet &packet, const auth::CryptoHash &digest) { auto inserted = packets_.insert(packet); - size_ += inserted.second ? packet.size : 0; + if (inserted.second) { + packets_map_[packet.second] = inserted.first; + suffix_map_[packet.second] = digest; + } return inserted; } RTCVerifier::PacketSet::iterator RTCVerifier::Packets::remove( PacketSet::iterator packet_it) { - size_ -= packet_it->size; + packets_map_.erase(packet_it->second); + suffix_map_.erase(packet_it->second); return packets_.erase(packet_it); } @@ -279,35 +234,13 @@ const std::set<RTCVerifier::Packet> &RTCVerifier::Packets::set() const { return packets_; }; -size_t RTCVerifier::Packets::size() const { return size_; }; - -std::pair<RTCVerifier::PacketSet::iterator, bool> -RTCVerifier::PacketsUnverif::add(const Packet &packet, - const auth::CryptoHash &digest) { - auto inserted = add(packet); - if (inserted.second) { - packets_map_[packet.suffix] = inserted.first; - digests_map_[packet.suffix] = digest; - } - return inserted; -} - -RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::remove( - PacketSet::iterator packet_it) { - size_ -= packet_it->size; - packets_map_.erase(packet_it->suffix); - digests_map_.erase(packet_it->suffix); - return packets_.erase(packet_it); -} - -RTCVerifier::PacketSet::iterator RTCVerifier::PacketsUnverif::packetIt( +RTCVerifier::PacketSet::iterator RTCVerifier::Packets::packet( auth::Suffix suffix) { return packets_map_.at(suffix); }; -const auth::Verifier::SuffixMap &RTCVerifier::PacketsUnverif::suffixMap() - const { - return digests_map_; +const auth::Verifier::SuffixMap &RTCVerifier::Packets::suffixMap() const { + return suffix_map_; } } // end namespace rtc diff --git a/libtransport/src/protocols/rtc/rtc_verifier.h b/libtransport/src/protocols/rtc/rtc_verifier.h index 098984057..c83faf08a 100644 --- a/libtransport/src/protocols/rtc/rtc_verifier.h +++ b/libtransport/src/protocols/rtc/rtc_verifier.h @@ -27,19 +27,16 @@ namespace rtc { class RTCVerifier { public: explicit RTCVerifier(std::shared_ptr<auth::Verifier> verifier, - uint32_t max_unverified_interval, - double max_unverified_ratio); + uint32_t factor_relevant, uint32_t factor_alert); virtual ~RTCVerifier() = default; void setState(std::shared_ptr<RTCState> rtc_state); - void setVerifier(std::shared_ptr<auth::Verifier> verifier); + void setFactorRelevant(uint32_t factor_relevant); + void setFactorAlert(uint32_t factor_alert); - void setMaxUnverifiedInterval(uint32_t max_unverified_interval); - - void setMaxUnverifiedRatio(double max_unverified_ratio); - + auth::VerificationPolicy verify(core::Interest &interest); auth::VerificationPolicy verify(core::ContentObject &content_object, bool is_fec = false); auth::VerificationPolicy verifyProbe(core::ContentObject &content_object); @@ -51,81 +48,47 @@ class RTCVerifier { auth::VerificationPolicy processManifest(core::ContentObject &content_object); void onDataRecoveredFec(uint32_t suffix); - void onJumpForward(uint32_t next_suffix); - - double getBufferRatio() const; protected: - struct Packet; - using Timestamp = uint64_t; + using Index = uint64_t; + using Packet = std::pair<Index, auth::Suffix>; using PacketSet = std::set<Packet>; - struct Packet { - auth::Suffix suffix; - Timestamp timestamp; - size_t size; - - bool operator==(const Packet &b) const { - return timestamp == b.timestamp && suffix == b.suffix; - } - bool operator<(const Packet &b) const { - return timestamp == b.timestamp ? suffix < b.suffix - : timestamp < b.timestamp; - } - }; - class Packets { public: - virtual std::pair<PacketSet::iterator, bool> add(const Packet &packet); - virtual PacketSet::iterator remove(PacketSet::iterator packet_it); - const PacketSet &set() const; - size_t size() const; - - protected: - PacketSet packets_; - size_t size_; - }; - - class PacketsVerif : public Packets {}; - - class PacketsUnverif : public Packets { - public: - using Packets::add; std::pair<PacketSet::iterator, bool> add(const Packet &packet, const auth::CryptoHash &digest); - PacketSet::iterator remove(PacketSet::iterator packet_it) override; - PacketSet::iterator packetIt(auth::Suffix suffix); + PacketSet::iterator remove(PacketSet::iterator packet_it); + const PacketSet &set() const; + PacketSet::iterator packet(auth::Suffix suffix); const auth::Verifier::SuffixMap &suffixMap() const; private: + PacketSet packets_; std::unordered_map<auth::Suffix, PacketSet::iterator> packets_map_; - auth::Verifier::SuffixMap digests_map_; + auth::Verifier::SuffixMap suffix_map_; }; // The RTC state. std::shared_ptr<RTCState> rtc_state_; // The verifier instance. std::shared_ptr<auth::Verifier> verifier_; - // Window to consider when verifying packets. - uint32_t max_unverified_interval_; - // Ratio of unverified packets over which an alert is triggered. - double max_unverified_ratio_; - // The suffix of the last processed manifest. - auth::Suffix last_manifest_; + // Used to compute the relevance windows size (in packets). + uint32_t factor_relevant_; + // Used to compute the alert threshold (in packets). + uint32_t factor_alert_; + // The maximum number of entries a manifest can contain. + uint8_t manifest_max_capacity_; // Hash algorithm used by manifests. auth::CryptoHashType manifest_hash_algo_; // Digests extracted from all manifests received. auth::Verifier::SuffixMap manifest_digests_; - // Verified packets with timestamp >= now - max_unverified_interval_. - PacketsVerif packets_verif_; - // Unverified packets with timestamp >= now - max_unverified_interval_. - PacketsUnverif packets_unverif_; - // Unverified erased packets with timestamp < now - max_unverified_interval_. + // The number of data packets processed. + Index current_index_; + // Unverified packets with index in relevance window. + Packets packets_unverif_; + // Unverified erased packets with index outside relevance window. std::unordered_set<auth::Suffix> packets_unverif_erased_; - - // Flushes all packets with timestamp < now - max_unverified_interval_. - // Returns the timestamp of the oldest packet, verified or not. - Timestamp flush_packets(Timestamp now); }; } // namespace rtc |