aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc_state.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc_state.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc_state.cc80
1 files changed, 60 insertions, 20 deletions
diff --git a/libtransport/src/protocols/rtc/rtc_state.cc b/libtransport/src/protocols/rtc/rtc_state.cc
index 5b3b5e4c3..82ac0b9c1 100644
--- a/libtransport/src/protocols/rtc/rtc_state.cc
+++ b/libtransport/src/protocols/rtc/rtc_state.cc
@@ -106,6 +106,7 @@ void RTCState::initParams() {
// paths stats
path_table_.clear();
main_path_ = nullptr;
+ edge_path_ = nullptr;
// packet cache (not pending anymore)
packet_cache_.clear();
@@ -231,11 +232,9 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
}
updatePacketSize(content_object);
- updateReceivedBytes(content_object);
+ updateReceivedBytes(content_object, false);
addRecvOrLost(seq, PacketState::RECEIVED);
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
-
// the producer is responding
// it is generating valid data packets so we consider it active
producer_is_active_ = true;
@@ -245,11 +244,7 @@ void RTCState::onDataPacketReceived(const core::ContentObject &content_object,
void RTCState::onFecPacketReceived(const core::ContentObject &content_object) {
uint32_t seq = content_object.getName().getSuffix();
- // updateReceivedBytes(content_object);
- received_fec_bytes_ +=
- (uint32_t)(content_object.headerSize() + content_object.payloadSize());
-
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
+ updateReceivedBytes(content_object, true);
PacketState state = getPacketState(seq);
if (state != PacketState::LOST) {
@@ -328,12 +323,14 @@ void RTCState::onPacketLost(uint32_t seq) {
DLOG_IF(INFO, VLOG_IS_ON(4)) << "packet " << seq << " is lost";
}
}
+
addRecvOrLost(seq, PacketState::DEFINITELY_LOST);
}
-void RTCState::onPacketRecoveredRtx(uint32_t seq) {
+void RTCState::onPacketRecoveredRtx(const core::ContentObject &content_object,
+ uint64_t rtt) {
+ uint32_t seq = content_object.getName().getSuffix();
packets_sent_to_app_++;
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
// increase the recovered packet counter only if the packet was marked as LOST
// before.
@@ -341,13 +338,37 @@ void RTCState::onPacketRecoveredRtx(uint32_t seq) {
if (state == PacketState::LOST) losses_recovered_++;
addRecvOrLost(seq, PacketState::RECEIVED);
+ updateReceivedBytes(content_object, false);
+
+ if (rtt == 0) return; // nothing to do
+
+ uint32_t path_label = content_object.getPathLabel();
+ auto path_it = path_table_.find(path_label);
+ if (path_it == path_table_.end()) {
+ // this is a new path and it must be a cache
+ std::shared_ptr<RTCDataPath> newPath =
+ std::make_shared<RTCDataPath>(path_label);
+ auto ret = path_table_.insert(
+ std::pair<uint32_t, std::shared_ptr<RTCDataPath>>(path_label, newPath));
+ path_it = ret.first;
+ }
+
+ auto path = path_it->second;
+ if (path->pathToProducer())
+ return; // this packet is coming from a producer
+ // even if we sent an RTX. this may happen
+ // for RTX that are sent too fast or in
+ // case of multipath
+
+ path->insertRttSample(utils::SteadyTime::Milliseconds(rtt), true);
}
-void RTCState::onFecPacketRecoveredRtx(uint32_t seq) {
+void RTCState::onFecPacketRecoveredRtx(
+ const core::ContentObject &content_object) {
// This is the same as onPacketRecoveredRtx, but in this is case the
// pkt is also a FEC pkt, the addRecvOrLost will be called afterwards
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
losses_recovered_++;
+ updateReceivedBytes(content_object, true);
}
void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
@@ -355,8 +376,6 @@ void RTCState::onPacketRecoveredFec(uint32_t seq, uint32_t size) {
packets_sent_to_app_++;
recovered_bytes_with_fec_ += size;
- if (seq > highest_seq_received_) highest_seq_received_ = seq;
-
// adding header to the count
recovered_bytes_with_fec_ += 60; // XXX get header size some where
@@ -487,21 +506,32 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
// channel losses
uint32_t last_round_packets = 0;
+ uint64_t min_edge_rtt = UINT_MAX;
std::shared_ptr<RTCDataPath> old_main_path = main_path_;
main_path_ = nullptr;
+ edge_path_ = nullptr;
for (auto it = path_table_.begin(); it != path_table_.end(); it++) {
- if (it->second->isActive()) {
+ if (it->second->isValidProducer()) {
uint32_t pkt = it->second->getPacketsLastRound();
if (pkt > last_round_packets) {
last_round_packets = pkt;
main_path_ = it->second;
}
+ } else if (it->second->isActive() && !it->second->pathToProducer()) {
+ // this is a path to a cache from where we are receiving content
+ if (it->second->getMinRtt() < min_edge_rtt) {
+ min_edge_rtt = it->second->getMinRtt();
+ edge_path_ = it->second;
+ }
}
it->second->roundEnd();
}
if (main_path_ == nullptr) main_path_ = old_main_path;
+ if (edge_path_ == nullptr) edge_path_ = main_path_;
+ if (edge_path_->getMinRtt() >= main_path_->getMinRtt())
+ edge_path_ = main_path_;
// in case we get a new main path we reset the stats of the old one. this is
// beacuse, in case we need to switch back we don't what to take decisions on
@@ -551,9 +581,15 @@ void RTCState::onNewRound(double round_len, bool in_sync) {
rounds_++;
}
-void RTCState::updateReceivedBytes(const core::ContentObject &content_object) {
- received_bytes_ +=
- (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+void RTCState::updateReceivedBytes(const core::ContentObject &content_object,
+ bool isFec) {
+ if (isFec) {
+ received_fec_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ } else {
+ received_bytes_ +=
+ (uint32_t)(content_object.headerSize() + content_object.payloadSize());
+ }
}
void RTCState::updatePacketSize(const core::ContentObject &content_object) {
@@ -703,6 +739,10 @@ void RTCState::dataToBeReceived(uint32_t seq) {
addToPacketCache(seq, PacketState::TO_BE_RECEIVED);
}
+void RTCState::updateHighestSeqReceived(uint32_t seq) {
+ if (seq > highest_seq_received_) highest_seq_received_ = seq;
+}
+
void RTCState::addRecvOrLost(uint32_t seq, PacketState state) {
auto it = pending_interests_.find(seq);
if (it != pending_interests_.end()) {
@@ -803,7 +843,7 @@ core::ParamsRTC RTCState::getProbeParams(const core::ContentObject &probe) {
switch (ProbeHandler::getProbeType(seq)) {
case ProbeType::INIT: {
core::ContentObjectManifest manifest(
- const_cast<core::ContentObject &>(probe));
+ const_cast<core::ContentObject &>(probe).shared_from_this());
manifest.decode();
params = manifest.getParamsRTC();
break;
@@ -841,7 +881,7 @@ core::ParamsRTC RTCState::getDataParams(const core::ContentObject &data) {
}
case core::PayloadType::MANIFEST: {
core::ContentObjectManifest manifest(
- const_cast<core::ContentObject &>(data));
+ const_cast<core::ContentObject &>(data).shared_from_this());
manifest.decode();
params = manifest.getParamsRTC();
break;