diff options
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
-rw-r--r-- | libtransport/src/protocols/prod_protocol_rtc.cc | 96 |
1 files changed, 60 insertions, 36 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc index 242abd30d..e49f58167 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.cc +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -33,13 +33,13 @@ RTCProductionProtocol::RTCProductionProtocol( implementation::ProducerSocket *icn_socket) : ProductionProtocol(icn_socket), current_seg_(1), + prev_produced_bytes_(0), + prev_produced_packets_(0), produced_bytes_(0), produced_packets_(0), - produced_fec_packets_(0), - max_packet_production_(1), - bytes_production_rate_(0), + max_packet_production_(UINT32_MAX), + bytes_production_rate_(UINT32_MAX), packets_production_rate_(0), - fec_packets_production_rate_(0), last_produced_data_ts_(0), last_round_(utils::SteadyTime::nowMs().count()), allow_delayed_nacks_(false), @@ -116,32 +116,47 @@ void RTCProductionProtocol::scheduleRoundTimer() { auto sp = self.lock(); if (sp && sp->isRunning()) { - sp->updateStats(); + sp->updateStats(true); } }); } -void RTCProductionProtocol::updateStats() { +void RTCProductionProtocol::updateStats(bool new_round) { uint64_t now = utils::SteadyTime::nowMs().count(); uint64_t duration = now - last_round_; - if (duration == 0) duration = 1; + if (!new_round) { + duration += rtc::PRODUCER_STATS_INTERVAL; + } else { + prev_produced_bytes_ = 0; + prev_produced_packets_ = 0; + } + double per_second = rtc::MILLI_IN_A_SEC / duration; uint32_t prev_packets_production_rate = packets_production_rate_; - bytes_production_rate_ = ceil((double)produced_bytes_ * per_second); - packets_production_rate_ = ceil((double)produced_packets_ * per_second); - fec_packets_production_rate_ = - ceil((double)produced_fec_packets_ * per_second); + // bytes_production_rate_ does not take into account FEC!!! this is because + // each client requests a differen amount of FEC packet so the client itself + // increase the production rate in the right way + bytes_production_rate_ = + ceil((double)(produced_bytes_ + prev_produced_bytes_) * per_second); + packets_production_rate_ = + ceil((double)(produced_packets_ + prev_produced_packets_) * per_second); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Updating production rate: produced_bytes_ = " << produced_bytes_ - << " bps = " << bytes_production_rate_; + // add fec packets looking at the fec code. we don't use directly the number + // of fec packets produced in 1 round because it may happen that different + // numbers of blocks are generated during the rounds and this creates + // inconsistencies in the estimation of the production rate + uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_); + uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_); + + packets_production_rate_ += + ceil((double)packets_production_rate_ / (double)k) * (n - k); // update the production rate as soon as it increases by 10% with respect to // the last round max_packet_production_ = - produced_packets_ + ceil((double)produced_packets_ * 0.1); + produced_packets_ + ceil((double)produced_packets_ * 0.10); if (max_packet_production_ < rtc::WIN_MIN) max_packet_production_ = rtc::WIN_MIN; @@ -158,11 +173,14 @@ void RTCProductionProtocol::updateStats() { sendNacksForPendingInterests(); } - produced_bytes_ = 0; - produced_packets_ = 0; - produced_fec_packets_ = 0; - last_round_ = now; - scheduleRoundTimer(); + if (new_round) { + prev_produced_bytes_ = produced_bytes_; + prev_produced_packets_ = produced_packets_; + produced_bytes_ = 0; + produced_packets_ = 0; + last_round_ = now; + scheduleRoundTimer(); + } } uint32_t RTCProductionProtocol::produceStream( @@ -387,7 +405,7 @@ RTCProductionProtocol::createManifest(const Name &content_name) const { .timestamp = now, .prod_rate = bytes_production_rate_, .prod_seg = current_seg_, - .support_fec = false, + .fec_type = fec_type_, }); return manifest; @@ -434,16 +452,13 @@ void RTCProductionProtocol::producePktInternal( produced_bytes_ += content_object->headerSize() + content_object->payloadSize(); produced_packets_++; - } else { - produced_fec_packets_++; } if (!data_aggregation_ && produced_packets_ >= max_packet_production_) { // in this case all the pending interests may be used to accomodate the // sudden increase in the production rate. calling the updateStats we will // notify all the clients - round_timer_->cancel(); - updateStats(); + updateStats(false); } DLOG_IF(INFO, VLOG_IS_ON(3)) @@ -616,7 +631,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, // if the production rate 0 use delayed nacks if (allow_delayed_nacks_ && interest_seg >= current_seg_) { - uint64_t next_timer = ~0; + uint64_t next_timer = UINT64_MAX; if (!timers_map_.empty()) { next_timer = timers_map_.begin()->first; } @@ -652,8 +667,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg, (double)((double)((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR / rtc::MILLI_IN_A_SEC) * - (double)(packets_production_rate_ + - fec_packets_production_rate_))); + (double)(packets_production_rate_))); if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) { sendNack(interest_seg); @@ -723,20 +737,30 @@ void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg, void RTCProductionProtocol::sendNacksForPendingInterests() { std::unordered_set<uint32_t> to_remove; - uint32_t packet_gap = 100000; // set it to a high value (100sec) - if (packets_production_rate_ != 0) - packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_); + uint32_t pps = ceil((double)(packets_production_rate_)*rtc:: + INTEREST_LIFETIME_REDUCTION_FACTOR); uint64_t now = utils::SteadyTime::nowMs().count(); - for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { - if (it->first > current_seg_) { - uint64_t production_time = - ((it->first - current_seg_) * packet_gap) + now; - if (production_time >= it->second) { + if (it->first > current_seg_ && it->second > now) { + double exp_time_in_sec = + (double)(it->second - now) / (double)rtc::MILLI_IN_A_SEC; + uint32_t packets_prod_before_expire = ceil((double)pps * exp_time_in_sec); + + if (it->first > (current_seg_ + packets_prod_before_expire)) { sendNack(it->first); to_remove.insert(it->first); } + } else if (TRANSPORT_EXPECT_FALSE(it->first < current_seg_ || + it->second <= now)) { + // this branch should never be execcuted + // first condition: the packet was already prdocued and we have and old + // interest pending. send a nack to notify the consumer if needed. the + // case it->first = current_seg_ is not handled because + // the interest will be satified by the next data packet. + // second condition: the interest is expired. + sendNack(it->first); + to_remove.insert(it->first); } } |