aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/prod_protocol_rtc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.cc96
1 files changed, 60 insertions, 36 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
index 242abd30d..e49f58167 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.cc
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -33,13 +33,13 @@ RTCProductionProtocol::RTCProductionProtocol(
implementation::ProducerSocket *icn_socket)
: ProductionProtocol(icn_socket),
current_seg_(1),
+ prev_produced_bytes_(0),
+ prev_produced_packets_(0),
produced_bytes_(0),
produced_packets_(0),
- produced_fec_packets_(0),
- max_packet_production_(1),
- bytes_production_rate_(0),
+ max_packet_production_(UINT32_MAX),
+ bytes_production_rate_(UINT32_MAX),
packets_production_rate_(0),
- fec_packets_production_rate_(0),
last_produced_data_ts_(0),
last_round_(utils::SteadyTime::nowMs().count()),
allow_delayed_nacks_(false),
@@ -116,32 +116,47 @@ void RTCProductionProtocol::scheduleRoundTimer() {
auto sp = self.lock();
if (sp && sp->isRunning()) {
- sp->updateStats();
+ sp->updateStats(true);
}
});
}
-void RTCProductionProtocol::updateStats() {
+void RTCProductionProtocol::updateStats(bool new_round) {
uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t duration = now - last_round_;
- if (duration == 0) duration = 1;
+ if (!new_round) {
+ duration += rtc::PRODUCER_STATS_INTERVAL;
+ } else {
+ prev_produced_bytes_ = 0;
+ prev_produced_packets_ = 0;
+ }
+
double per_second = rtc::MILLI_IN_A_SEC / duration;
uint32_t prev_packets_production_rate = packets_production_rate_;
- bytes_production_rate_ = ceil((double)produced_bytes_ * per_second);
- packets_production_rate_ = ceil((double)produced_packets_ * per_second);
- fec_packets_production_rate_ =
- ceil((double)produced_fec_packets_ * per_second);
+ // bytes_production_rate_ does not take into account FEC!!! this is because
+ // each client requests a differen amount of FEC packet so the client itself
+ // increase the production rate in the right way
+ bytes_production_rate_ =
+ ceil((double)(produced_bytes_ + prev_produced_bytes_) * per_second);
+ packets_production_rate_ =
+ ceil((double)(produced_packets_ + prev_produced_packets_) * per_second);
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Updating production rate: produced_bytes_ = " << produced_bytes_
- << " bps = " << bytes_production_rate_;
+ // add fec packets looking at the fec code. we don't use directly the number
+ // of fec packets produced in 1 round because it may happen that different
+ // numbers of blocks are generated during the rounds and this creates
+ // inconsistencies in the estimation of the production rate
+ uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_);
+ uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_);
+
+ packets_production_rate_ +=
+ ceil((double)packets_production_rate_ / (double)k) * (n - k);
// update the production rate as soon as it increases by 10% with respect to
// the last round
max_packet_production_ =
- produced_packets_ + ceil((double)produced_packets_ * 0.1);
+ produced_packets_ + ceil((double)produced_packets_ * 0.10);
if (max_packet_production_ < rtc::WIN_MIN)
max_packet_production_ = rtc::WIN_MIN;
@@ -158,11 +173,14 @@ void RTCProductionProtocol::updateStats() {
sendNacksForPendingInterests();
}
- produced_bytes_ = 0;
- produced_packets_ = 0;
- produced_fec_packets_ = 0;
- last_round_ = now;
- scheduleRoundTimer();
+ if (new_round) {
+ prev_produced_bytes_ = produced_bytes_;
+ prev_produced_packets_ = produced_packets_;
+ produced_bytes_ = 0;
+ produced_packets_ = 0;
+ last_round_ = now;
+ scheduleRoundTimer();
+ }
}
uint32_t RTCProductionProtocol::produceStream(
@@ -387,7 +405,7 @@ RTCProductionProtocol::createManifest(const Name &content_name) const {
.timestamp = now,
.prod_rate = bytes_production_rate_,
.prod_seg = current_seg_,
- .support_fec = false,
+ .fec_type = fec_type_,
});
return manifest;
@@ -434,16 +452,13 @@ void RTCProductionProtocol::producePktInternal(
produced_bytes_ +=
content_object->headerSize() + content_object->payloadSize();
produced_packets_++;
- } else {
- produced_fec_packets_++;
}
if (!data_aggregation_ && produced_packets_ >= max_packet_production_) {
// in this case all the pending interests may be used to accomodate the
// sudden increase in the production rate. calling the updateStats we will
// notify all the clients
- round_timer_->cancel();
- updateStats();
+ updateStats(false);
}
DLOG_IF(INFO, VLOG_IS_ON(3))
@@ -616,7 +631,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg,
// if the production rate 0 use delayed nacks
if (allow_delayed_nacks_ && interest_seg >= current_seg_) {
- uint64_t next_timer = ~0;
+ uint64_t next_timer = UINT64_MAX;
if (!timers_map_.empty()) {
next_timer = timers_map_.begin()->first;
}
@@ -652,8 +667,7 @@ void RTCProductionProtocol::processInterest(uint32_t interest_seg,
(double)((double)((double)lifetime *
rtc::INTEREST_LIFETIME_REDUCTION_FACTOR /
rtc::MILLI_IN_A_SEC) *
- (double)(packets_production_rate_ +
- fec_packets_production_rate_)));
+ (double)(packets_production_rate_)));
if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) {
sendNack(interest_seg);
@@ -723,20 +737,30 @@ void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg,
void RTCProductionProtocol::sendNacksForPendingInterests() {
std::unordered_set<uint32_t> to_remove;
- uint32_t packet_gap = 100000; // set it to a high value (100sec)
- if (packets_production_rate_ != 0)
- packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_);
+ uint32_t pps = ceil((double)(packets_production_rate_)*rtc::
+ INTEREST_LIFETIME_REDUCTION_FACTOR);
uint64_t now = utils::SteadyTime::nowMs().count();
-
for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
- if (it->first > current_seg_) {
- uint64_t production_time =
- ((it->first - current_seg_) * packet_gap) + now;
- if (production_time >= it->second) {
+ if (it->first > current_seg_ && it->second > now) {
+ double exp_time_in_sec =
+ (double)(it->second - now) / (double)rtc::MILLI_IN_A_SEC;
+ uint32_t packets_prod_before_expire = ceil((double)pps * exp_time_in_sec);
+
+ if (it->first > (current_seg_ + packets_prod_before_expire)) {
sendNack(it->first);
to_remove.insert(it->first);
}
+ } else if (TRANSPORT_EXPECT_FALSE(it->first < current_seg_ ||
+ it->second <= now)) {
+ // this branch should never be execcuted
+ // first condition: the packet was already prdocued and we have and old
+ // interest pending. send a nack to notify the consumer if needed. the
+ // case it->first = current_seg_ is not handled because
+ // the interest will be satified by the next data packet.
+ // second condition: the interest is expired.
+ sendNack(it->first);
+ to_remove.insert(it->first);
}
}