diff options
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_state.cc')
-rw-r--r-- | libtransport/src/protocols/rtc/rtc_state.cc | 80 |
1 files changed, 60 insertions, 20 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc index 5b3b5e4c3..82ac0b9c1 100644 --- a/libtransport/src/protocols/rtc/rtc_state.cc +++ b/libtransport/src/protocols/rtc/rtc_state.cc @@ -106,6 +106,7 @@ void RTCState::initParams() { // paths stats path_table_.clear(); main_path_ = nullptr; + edge_path_ = nullptr; // packet cache (not pending anymore) packet_cache_.clear(); @@ -231,11 +232,9 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, } updatePacketSize(content_object); - updateReceivedBytes(content_object); + updateReceivedBytes(content_object, false); addRecvOrLost(seq, PacketState::RECEIVED); - if (seq > highest_seq_received_) highest_seq_received_ = seq; - // the producer is responding // it is generating valid data packets so we consider it active producer_is_active_ = true; @@ -245,11 +244,7 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object, void RTCState::onFecPacketReceived(const core::ContentObject &content_object) { uint32_t seq = content_object.getName().getSuffix(); - // updateReceivedBytes(content_object); - received_fec_bytes_ += - (uint32_t)(content_object.headerSize() + content_object.payloadSize()); - - if (seq > highest_seq_received_) highest_seq_received_ = seq; + updateReceivedBytes(content_object, true); PacketState state = getPacketState(seq); if (state != PacketState::LOST) { @@ -328,12 +323,14 @@ void RTCState::onPacketLost(uint32_t seq) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost"; } } + addRecvOrLost(seq, PacketState::DEFINITELY_LOST); } -void RTCState::onPacketRecoveredRtx(uint32_t seq) { +void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object, + uint64_t rtt) { + uint32_t seq = content_object.getName().getSuffix(); packets_sent_to_app_++; - if (seq > highest_seq_received_) highest_seq_received_ = seq; // increase the recovered packet counter only if the packet was marked as LOST // before. @@ -341,13 +338,37 @@ void RTCState::onPacketRecoveredRtx(uint32_t seq) { if (state == PacketState::LOST) losses_recovered_++; addRecvOrLost(seq, PacketState::RECEIVED); + updateReceivedBytes(content_object, false); + + if (rtt == 0) return; // nothing to do + + uint32_t path_label = content_object.getPathLabel(); + auto path_it = path_table_.find(path_label); + if (path_it == path_table_.end()) { + // this is a new path and it must be a cache + std::shared_ptr<RTCDataPath> newPath = + std::make_shared<RTCDataPath>(path_label); + auto ret = path_table_.insert( + std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath)); + path_it = ret.first; + } + + auto path = path_it->second; + if (path->pathToProducer()) + return; // this packet is coming from a producer + // even if we sent an RTX. this may happen + // for RTX that are sent too fast or in + // case of multipath + + path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true); } -void RTCState::onFecPacketRecoveredRtx(uint32_t seq) { +void RTCState::onFecPacketRecoveredRtx( + const core::ContentObject &content_object) { // This is the same as onPacketRecoveredRtx, but in this is case the // pkt is also a FEC pkt, the addRecvOrLost will be called afterwards - if (seq > highest_seq_received_) highest_seq_received_ = seq; losses_recovered_++; + updateReceivedBytes(content_object, true); } void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { @@ -355,8 +376,6 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) { packets_sent_to_app_++; recovered_bytes_with_fec_ += size; - if (seq > highest_seq_received_) highest_seq_received_ = seq; - // adding header to the count recovered_bytes_with_fec_ += 60; // XXX get header size some where @@ -487,21 +506,32 @@ void RTCState::onNewRound(double round_len, bool in_sync) { // channel losses uint32_t last_round_packets = 0; + uint64_t min_edge_rtt = UINT_MAX; std::shared_ptr<RTCDataPath> old_main_path = main_path_; main_path_ = nullptr; + edge_path_ = nullptr; for (auto it = path_table_.begin(); it != path_table_.end(); it++) { - if (it->second->isActive()) { + if (it->second->isValidProducer()) { uint32_t pkt = it->second->getPacketsLastRound(); if (pkt > last_round_packets) { last_round_packets = pkt; main_path_ = it->second; } + } else if (it->second->isActive() && !it->second->pathToProducer()) { + // this is a path to a cache from where we are receiving content + if (it->second->getMinRtt() < min_edge_rtt) { + min_edge_rtt = it->second->getMinRtt(); + edge_path_ = it->second; + } } it->second->roundEnd(); } if (main_path_ == nullptr) main_path_ = old_main_path; + if (edge_path_ == nullptr) edge_path_ = main_path_; + if (edge_path_->getMinRtt() >= main_path_->getMinRtt()) + edge_path_ = main_path_; // in case we get a new main path we reset the stats of the old one. this is // beacuse, in case we need to switch back we don't what to take decisions on @@ -551,9 +581,15 @@ void RTCState::onNewRound(double round_len, bool in_sync) { rounds_++; } -void RTCState::updateReceivedBytes(const core::ContentObject &content_object) { - received_bytes_ += - (uint32_t)(content_object.headerSize() + content_object.payloadSize()); +void RTCState::updateReceivedBytes(const core::ContentObject &content_object, + bool isFec) { + if (isFec) { + received_fec_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + } else { + received_bytes_ += + (uint32_t)(content_object.headerSize() + content_object.payloadSize()); + } } void RTCState::updatePacketSize(const core::ContentObject &content_object) { @@ -703,6 +739,10 @@ void RTCState::dataToBeReceived(uint32_t seq) { addToPacketCache(seq, PacketState::TO_BE_RECEIVED); } +void RTCState::updateHighestSeqReceived(uint32_t seq) { + if (seq > highest_seq_received_) highest_seq_received_ = seq; +} + void RTCState::addRecvOrLost(uint32_t seq, PacketState state) { auto it = pending_interests_.find(seq); if (it != pending_interests_.end()) { @@ -803,7 +843,7 @@ core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) { switch (ProbeHandler::getProbeType(seq)) { case ProbeType::INIT: { core::ContentObjectManifest manifest( - const_cast<core::ContentObject &>(probe)); + const_cast<core::ContentObject &>(probe).shared_from_this()); manifest.decode(); params = manifest.getParamsRTC(); break; @@ -841,7 +881,7 @@ core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) { } case core::PayloadType::MANIFEST: { core::ContentObjectManifest manifest( - const_cast<core::ContentObject &>(data)); + const_cast<core::ContentObject &>(data).shared_from_this()); manifest.decode(); params = manifest.getParamsRTC(); break; |