aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc466
1 files changed, 312 insertions, 154 deletions
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index 46659ac74..0cb4cda1d 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -17,8 +17,11 @@
#include <hicn/transport/interfaces/socket_consumer.h>
#include <implementation/socket_consumer.h>
#include <math.h>
+#include <protocols/errors.h>
+#include <protocols/incremental_indexer_bytestream.h>
#include <protocols/rtc/rtc.h>
#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_indexer.h>
#include <protocols/rtc/rtc_rc_queue.h>
#include <algorithm>
@@ -33,39 +36,41 @@ using namespace interface;
RTCTransportProtocol::RTCTransportProtocol(
implementation::ConsumerSocket *icn_socket)
- : TransportProtocol(icn_socket, nullptr),
- DatagramReassembly(icn_socket, this),
+ : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this),
+ new DatagramReassembly(icn_socket, this)),
number_(0) {
icn_socket->getSocketOption(PORTAL, portal_);
round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
scheduler_timer_ =
std::make_unique<asio::steady_timer>(portal_->getIoService());
+ pacing_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
}
RTCTransportProtocol::~RTCTransportProtocol() {}
void RTCTransportProtocol::resume() {
- if (is_running_) return;
-
- is_running_ = true;
-
newRound();
+ TransportProtocol::resume();
+}
- portal_->runEventsLoop();
- is_running_ = false;
+std::size_t RTCTransportProtocol::transportHeaderLength() {
+ return DATA_HEADER_SIZE +
+ (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0);
}
// private
void RTCTransportProtocol::initParams() {
- portal_->setConsumerCallback(this);
+ TransportProtocol::reset();
rc_ = std::make_shared<RTCRateControlQueue>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
+ indexer_verifier_.get(),
std::bind(&RTCTransportProtocol::sendRtxInterest, this,
std::placeholders::_1),
portal_->getIoService());
state_ = std::make_shared<RTCState>(
+ indexer_verifier_.get(),
std::bind(&RTCTransportProtocol::sendProbeInterest, this,
std::placeholders::_1),
std::bind(&RTCTransportProtocol::discoveredRtt, this),
@@ -83,8 +88,27 @@ void RTCTransportProtocol::initParams() {
// Cancel timer
number_++;
round_timer_->cancel();
+
scheduler_timer_->cancel();
scheduler_timer_on_ = false;
+ last_interest_sent_time_ = 0;
+ last_interest_sent_seq_ = 0;
+
+#if 0
+ if(portal_->isConnectedToFwd()){
+ max_aggregated_interest_ = 1;
+ }else{
+ max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH;
+ }
+#else
+ max_aggregated_interest_ = 1;
+#endif
+
+ max_sent_int_ =
+ std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_);
+
+ pacing_timer_->cancel();
+ pacing_timer_on_ = false;
// delete all timeouts and future nacks
timeouts_or_nacks_.clear();
@@ -93,16 +117,28 @@ void RTCTransportProtocol::initParams() {
current_sync_win_ = INITIAL_WIN;
max_sync_win_ = INITIAL_WIN_MAX;
- // names/packets var
- next_segment_ = 0;
-
socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
RTC_INTEREST_LIFETIME);
+
+ // FEC
+ using namespace std::placeholders;
+ enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, _1),
+ /* We leave the buffer allocation to the fec decoder */
+ fec::FECBase::BufferRequested(0));
+
+ if (fec_decoder_) {
+ indexer_verifier_->enableFec(fec_type_);
+ indexer_verifier_->setNFec(0);
+ ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_),
+ fec::FECUtils::getSourceSymbols(fec_type_));
+ } else {
+ indexer_verifier_->disableFec();
+ }
}
// private
void RTCTransportProtocol::reset() {
- TRANSPORT_LOGD("reset called");
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "reset called";
initParams();
newRound();
}
@@ -113,11 +149,13 @@ void RTCTransportProtocol::inactiveProducer() {
current_sync_win_ = INITIAL_WIN;
max_sync_win_ = INITIAL_WIN_MAX;
- TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_,
- max_sync_win_);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Current window: " << current_sync_win_
+ << ", max_sync_win_: " << max_sync_win_;
// names/packets var
- next_segment_ = 0;
+ indexer_verifier_->reset();
+ indexer_verifier_->enableFec(fec_type_);
+ indexer_verifier_->setNFec(0);
ldr_->clear();
}
@@ -137,10 +175,13 @@ void RTCTransportProtocol::newRound() {
uint32_t received_bytes = state_->getReceivedBytesInRound();
uint32_t sent_interest = state_->getSentInterestInRound();
uint32_t lost_data = state_->getLostData();
+ uint32_t definitely_lost = state_->getDefinitelyLostPackets();
uint32_t recovered_losses = state_->getRecoveredLosses();
uint32_t received_nacks = state_->getReceivedNacksInRound();
+ uint32_t received_fec = state_->getReceivedFecPackets();
bool in_sync = (current_state_ == SyncState::in_sync);
+ ldr_->onNewRound(in_sync);
state_->onNewRound((double)ROUND_LEN, in_sync);
rc_->onNewRound((double)ROUND_LEN);
@@ -161,11 +202,13 @@ void RTCTransportProtocol::newRound() {
}
}
- TRANSPORT_LOGD("Calling updateSyncWindow in newRound function");
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Calling updateSyncWindow in newRound function";
updateSyncWindow();
sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
- recovered_losses, received_nacks);
+ definitely_lost, recovered_losses, received_nacks,
+ received_fec);
newRound();
});
}
@@ -173,6 +216,7 @@ void RTCTransportProtocol::newRound() {
void RTCTransportProtocol::discoveredRtt() {
start_send_interest_ = true;
ldr_->turnOnRTX();
+ ldr_->onNewRound(false);
updateSyncWindow();
}
@@ -182,22 +226,23 @@ void RTCTransportProtocol::computeMaxSyncWindow() {
if (production_rate == 0.0 || packet_size == 0.0) {
// the consumer has no info about the producer,
// keep the previous maxCWin
- TRANSPORT_LOGD(
- "Returning in computeMaxSyncWindow because: prod_rate: %d || "
- "packet_size: %d",
- (int)(production_rate == 0.0), (int)(packet_size == 0.0));
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Returning in computeMaxSyncWindow because: prod_rate: "
+ << (production_rate == 0.0)
+ << " || packet_size: " << (packet_size == 0.0);
return;
}
+ production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead());
+
uint32_t lifetime = default_values::interest_lifetime;
socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
lifetime);
double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC;
-
- max_sync_win_ =
- (uint32_t)ceil((production_rate * lifetime_ms *
- INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size);
+ max_sync_win_ = (uint32_t)ceil(
+ (production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) /
+ packet_size);
max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow());
}
@@ -219,12 +264,25 @@ void RTCTransportProtocol::updateSyncWindow() {
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
- current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
- current_sync_win_ += (uint32_t)
- ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size);
+ double fec_interest_overhead = (double)state_->getPendingFecPackets() /
+ (double)(state_->getPendingInterestNumber() -
+ state_->getPendingFecPackets());
+
+ double fec_overhead =
+ std::max(indexer_verifier_->getFecOverhead(), fec_interest_overhead);
+
+ prod_rate += (prod_rate * fec_overhead);
- if(current_state_ == SyncState::catch_up) {
- current_sync_win_ = (uint32_t) (current_sync_win_ * CATCH_UP_WIN_INCREMENT);
+ current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ uint32_t buffer = PRODUCER_BUFFER_MS;
+ if (rtt > 150)
+ buffer = buffer * 2; // if the RTT is too large we increase the
+ // the size of the buffer
+ current_sync_win_ +=
+ ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size);
+
+ if (current_state_ == SyncState::catch_up) {
+ current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT;
}
current_sync_win_ = std::min(current_sync_win_, max_sync_win_);
@@ -243,70 +301,48 @@ void RTCTransportProtocol::decreaseSyncWindow() {
scheduleNextInterests();
}
-void RTCTransportProtocol::sendInterest(Name *interest_name) {
- TRANSPORT_LOGD("Sending interest for name %s",
- interest_name->toString().c_str());
-
- auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
- interest->setName(*interest_name);
-
- uint32_t lifetime = default_values::interest_lifetime;
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- lifetime);
- interest->setLifetime(uint32_t(lifetime));
-
- if (*on_interest_output_) {
- (*on_interest_output_)(*socket_->getInterface(), *interest);
- }
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return;
- }
-
- portal_->sendInterest(std::move(interest));
-}
-
void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
- if (!is_running_ && !is_first_) return;
+ if (!isRunning() && !is_first_) return;
- if(!start_send_interest_) return;
+ if (!start_send_interest_) return;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- TRANSPORT_LOGD("send rtx %u", seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "send rtx " << seq;
interest_name->setSuffix(seq);
- sendInterest(interest_name);
+ sendInterest(*interest_name);
}
void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
- if (!is_running_ && !is_first_) return;
+ if (!isRunning() && !is_first_) return;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- TRANSPORT_LOGD("send probe %u", seq);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "send probe " << seq;
interest_name->setSuffix(seq);
- sendInterest(interest_name);
+ sendInterest(*interest_name);
}
void RTCTransportProtocol::scheduleNextInterests() {
- TRANSPORT_LOGD("Schedule next interests");
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests";
- if (!is_running_ && !is_first_) return;
+ if (!isRunning() && !is_first_) return;
- if(!start_send_interest_) return; // RTT discovering phase is not finished so
- // do not start to send interests
+ if (pacing_timer_on_) return; // wait pacing timer for the next send
- if (scheduler_timer_on_) return; // wait befor send other interests
+ if (!start_send_interest_)
+ return; // RTT discovering phase is not finished so
+ // do not start to send interests
if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
- TRANSPORT_LOGD("Inactive producer.");
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer.";
// here we keep seding the same interest until the producer
// does not start again
- if (next_segment_ != 0) {
+ if (indexer_verifier_->checkNextSuffix() != 0) {
// the producer just become inactive, reset the state
inactiveProducer();
}
@@ -315,125 +351,208 @@ void RTCTransportProtocol::scheduleNextInterests() {
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
&interest_name);
- TRANSPORT_LOGD("send interest %u", next_segment_);
- interest_name->setSuffix(next_segment_);
+ uint32_t next_seg = 0;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "send interest " << next_seg;
+ interest_name->setSuffix(next_seg);
if (portal_->interestIsPending(*interest_name)) {
// if interest 0 is already pending we return
return;
}
- sendInterest(interest_name);
+ sendInterest(*interest_name);
state_->onSendNewInterest(interest_name);
return;
}
- TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d",
- state_->getPendingInterestNumber(), current_sync_win_);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Pending interest number: " << state_->getPendingInterestNumber()
+ << " -- current_sync_win_: " << current_sync_win_;
+
+ uint32_t pending = state_->getPendingInterestNumber();
+ if (pending >= current_sync_win_) return; // no space in the window
+
+ if ((current_sync_win_ - pending) < max_aggregated_interest_) {
+ if (scheduler_timer_on_) return; // timer already scheduled
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ uint64_t time = now - last_interest_sent_time_;
+ if (time < WAIT_FOR_INTEREST_BATCH) {
+ uint64_t next = WAIT_FOR_INTEREST_BATCH - time;
+ scheduler_timer_on_ = true;
+ scheduler_timer_->expires_from_now(std::chrono::milliseconds(next));
+ scheduler_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ if (!scheduler_timer_on_) return;
+
+ scheduler_timer_on_ = false;
+ scheduleNextInterests();
+ });
+ return; // whait for the timer
+ }
+ }
+
+ scheduler_timer_on_ = false;
+ scheduler_timer_->cancel();
// skip nacked pacekts
- if (next_segment_ <= state_->getLastSeqNacked()) {
- next_segment_ = state_->getLastSeqNacked() + 1;
+ if (indexer_verifier_->checkNextSuffix() <= state_->getLastSeqNacked()) {
+ indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1);
}
// skipe received packets
- if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) {
- next_segment_ = state_->getHighestSeqReceivedInOrder() + 1;
+ if (indexer_verifier_->checkNextSuffix() <=
+ state_->getHighestSeqReceivedInOrder()) {
+ indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1);
}
uint32_t sent_interests = 0;
+ uint32_t sent_packets = 0;
+ uint32_t aggregated_counter = 0;
+ Name *name = nullptr;
+ Name interest_name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes;
+
while ((state_->getPendingInterestNumber() < current_sync_win_) &&
- (sent_interests < MAX_INTERESTS_IN_BATCH)) {
- TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_);
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ (sent_interests < max_sent_int_)) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "In while loop. Window size: " << current_sync_win_;
+
+ uint32_t next_seg = indexer_verifier_->getNextSuffix();
- interest_name->setSuffix(next_segment_);
+ name->setSuffix(next_seg);
// send the packet only if:
// 1) it is not pending yet (not true for rtx)
// 2) the packet is not received or lost
// 3) is not in the rtx list
- if (portal_->interestIsPending(*interest_name) ||
- state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN ||
- ldr_->isRtx(next_segment_)) {
- TRANSPORT_LOGD(
- "skip interest %u because: pending %u, recv %u, rtx %u",
- next_segment_, (portal_->interestIsPending(*interest_name)),
- (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN),
- (ldr_->isRtx(next_segment_)));
- next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ // 4) is fec and is not in order (!= last sent + 1)
+ if (portal_->interestIsPending(*name) ||
+ state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN ||
+ ldr_->isRtx(next_seg) ||
+ (indexer_verifier_->isFec(next_seg) &&
+ next_seg != last_interest_sent_seq_ + 1)) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "skip interest " << next_seg << " because: pending "
+ << portal_->interestIsPending(*name) << ", recv "
+ << (state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN)
+ << ", rtx " << (ldr_->isRtx(next_seg)) << ", is old fec "
+ << ((indexer_verifier_->isFec(next_seg) &&
+ next_seg != last_interest_sent_seq_ + 1));
continue;
}
+ if (aggregated_counter == 0) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "(name) send interest " << next_seg;
+ interest_name = *name;
+ } else {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "(append) send interest " << next_seg;
+ additional_suffixes[aggregated_counter - 1] = next_seg;
+ }
- sent_interests++;
- TRANSPORT_LOGD("send interest %u", next_segment_);
- sendInterest(interest_name);
- state_->onSendNewInterest(interest_name);
+ last_interest_sent_seq_ = next_seg;
+ state_->onSendNewInterest(name);
+ aggregated_counter++;
+
+ if (aggregated_counter >= max_aggregated_interest_) {
+ sent_packets++;
+ sent_interests++;
+ sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
+ last_interest_sent_time_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ aggregated_counter = 0;
+ }
+ }
- next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ // exiting the while we may have some pending interest to send
+ if (aggregated_counter != 0) {
+ sent_packets++;
+ last_interest_sent_time_ =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1);
}
if (state_->getPendingInterestNumber() < current_sync_win_) {
- // we still have space in the window but we already sent a batch of
- // MAX_INTERESTS_IN_BATCH interest. for the following ones wait one
- // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packets drop
+ // we still have space in the window but we already sent too many packets
+ // wait PACING_WAIT to avoid drops in the kernel
- scheduler_timer_on_ = true;
- scheduler_timer_->expires_from_now(
- std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES));
+ pacing_timer_on_ = true;
+ pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT));
scheduler_timer_->async_wait([this](std::error_code ec) {
if (ec) return;
- if (!scheduler_timer_on_) return;
+ if (!pacing_timer_on_) return;
- scheduler_timer_on_ = false;
+ pacing_timer_on_ = false;
scheduleNextInterests();
});
}
}
-void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
- uint32_t segment_number = interest->getName().getSuffix();
-
- TRANSPORT_LOGD("timeout for packet %u", segment_number);
+void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
+ const Name &name) {
+ uint32_t segment_number = name.getSuffix();
if (segment_number >= MIN_PROBE_SEQ) {
// this is a timeout on a probe, do nothing
return;
}
+ PacketState state = state_->isReceivedOrLost(segment_number);
+ if (state != PacketState::UNKNOWN) {
+ // we may recover a packets using fec, ignore this timer
+ return;
+ }
+
timeouts_or_nacks_.insert(segment_number);
if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
- segment_number <= state_->getHighestSeqReceivedInOrder()) {
+ segment_number <= state_->getHighestSeqReceived()) {
// we retransmit packets only if the producer is active, otherwise we
// use timeouts to avoid to send too much traffic
//
// a timeout is sent using RTX only if it is an old packet. if it is for a
// seq number that we didn't reach yet, we send the packet using the normal
// schedule next interest
- TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number);
- ldr_->onTimeout(segment_number);
- state_->onTimeout(segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "handle timeout for packet " << segment_number << " using rtx";
+ if (ldr_->isRtxOn()) {
+ ldr_->onTimeout(segment_number);
+ if (indexer_verifier_->isFec(segment_number))
+ state_->onTimeout(segment_number, true);
+ else
+ state_->onTimeout(segment_number, false);
+ } else {
+ // in this case we wil never recover the timeout
+ state_->onTimeout(segment_number, true);
+ }
scheduleNextInterests();
return;
}
- TRANSPORT_LOGD("handle timeout for packet %u using normal interests",
- segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "handle timeout for packet " << segment_number
+ << " using normal interests";
- if (segment_number < next_segment_) {
+ if (segment_number < indexer_verifier_->checkNextSuffix()) {
// this is a timeout for a packet that will be generated in the future but
// we are asking for higher sequence numbers. we need to go back like in the
// case of future nacks
- TRANSPORT_LOGD("on timeout next seg = %u, jump to %u",
- next_segment_, segment_number);
- next_segment_ = segment_number;
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "On timeout next seg = " << indexer_verifier_->checkNextSuffix()
+ << ", jump to " << segment_number;
+ // add an extra space in the window
+ current_sync_win_++;
+ indexer_verifier_->jumpToIndex(segment_number);
}
- state_->onTimeout(segment_number);
+ state_->onTimeout(segment_number, false);
scheduleNextInterests();
}
@@ -446,8 +565,8 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// check if the packet got a timeout
- TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment,
- production_seg);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Nack received " << nack_segment
+ << ". Production segment: " << production_seg;
bool compute_stats = true;
auto tn_it = timeouts_or_nacks_.find(nack_segment);
@@ -459,14 +578,15 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
state_->onNackPacketReceived(content_object, compute_stats);
ldr_->onNackPacketReceived(content_object);
- // both in case of past and future nack we set next_segment_ equal to the
+ // both in case of past and future nack we jump to the
// production segment in the nack. In case of past nack we will skip unneded
// interest (this is already done in the scheduleNextInterest in any case)
// while in case of future nacks we can go back in time and ask again for the
// content that generated the nack
- TRANSPORT_LOGD("on nack next seg = %u, jump to %u",
- next_segment_, production_seg);
- next_segment_ = production_seg;
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "On nack next seg = " << indexer_verifier_->checkNextSuffix()
+ << ", jump to " << production_seg;
+ indexer_verifier_->jumpToIndex(production_seg);
if (production_seg > nack_segment) {
// remove the nack is it exists
@@ -496,30 +616,33 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
bool valid = state_->onProbePacketReceived(content_object);
- if(!valid) return;
+ if (!valid) return;
struct nack_packet_t *probe =
(struct nack_packet_t *)content_object.getPayload()->data();
uint32_t production_seg = probe->getProductionSegement();
- // as for the nacks set next_segment_
- TRANSPORT_LOGD("on probe next seg = %u, jump to %u",
- next_segment_, production_seg);
- next_segment_ = production_seg;
+ // as for the nacks set next_segment
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "on probe next seg = " << indexer_verifier_->checkNextSuffix()
+ << ", jump to " << production_seg;
+ indexer_verifier_->jumpToIndex(production_seg);
ldr_->onProbePacketReceived(content_object);
updateSyncWindow();
}
-void RTCTransportProtocol::onContentObject(Interest &interest,
- ContentObject &content_object) {
- TRANSPORT_LOGD("Received content object of size: %zu",
- content_object.payloadSize());
- uint32_t payload_size = (uint32_t) content_object.payloadSize();
+void RTCTransportProtocol::onContentObjectReceived(
+ Interest &interest, ContentObject &content_object, std::error_code &ec) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received content object of size: " << content_object.payloadSize();
+ uint32_t payload_size = content_object.payloadSize();
uint32_t segment_number = content_object.getName().getSuffix();
+ ec = make_error_code(protocol_error::not_reassemblable);
+
if (segment_number >= MIN_PROBE_SEQ) {
- TRANSPORT_LOGD("Received probe %u", segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number;
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
}
@@ -528,7 +651,7 @@ void RTCTransportProtocol::onContentObject(Interest &interest,
}
if (payload_size == NACK_HEADER_SIZE) {
- TRANSPORT_LOGD("Received nack %u", segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number;
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
}
@@ -536,9 +659,8 @@ void RTCTransportProtocol::onContentObject(Interest &interest,
return;
}
- TRANSPORT_LOGD("Received content %u", segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number;
- rc_->onDataPacketReceived(content_object);
bool compute_stats = true;
auto tn_it = timeouts_or_nacks_.find(segment_number);
if (tn_it != timeouts_or_nacks_.end()) {
@@ -551,25 +673,49 @@ void RTCTransportProtocol::onContentObject(Interest &interest,
// check if the packet was already received
PacketState state = state_->isReceivedOrLost(segment_number);
- state_->onDataPacketReceived(content_object, compute_stats);
- ldr_->onDataPacketReceived(content_object);
- // if the stat for this seq number is received do not send the packet to app
if (state != PacketState::RECEIVED) {
- if (*on_content_object_input_) {
- (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ // send packet to decoder
+ if (fec_decoder_) {
+ DLOG_IF(INFO, VLOG_IS_ON(4))
+ << "send packet " << segment_number << " to FEC decoder";
+ fec_decoder_->onDataPacket(
+ content_object, content_object.headerSize() + rtc::DATA_HEADER_SIZE);
+ }
+ if (!indexer_verifier_->isFec(segment_number)) {
+ // the packet may be alredy sent to the ap by the decoder, check again if
+ // it is already received
+ state = state_->isReceivedOrLost(segment_number);
+ if (state != PacketState::RECEIVED) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received content " << segment_number;
+
+ state_->onDataPacketReceived(content_object, compute_stats);
+
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ ec = make_error_code(protocol_error::success);
+ }
+ } else {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received fec " << segment_number;
+ state_->onFecPacketReceived(content_object);
}
- reassemble(content_object);
} else {
- TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received duplicated content " << segment_number << ", drop it";
+ ec = make_error_code(protocol_error::duplicated_content);
}
+ ldr_->onDataPacketReceived(content_object);
+ rc_->onDataPacketReceived(content_object);
+
updateSyncWindow();
}
void RTCTransportProtocol::sendStatsToApp(
uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests,
- uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) {
+ uint32_t lost_data, uint32_t definitely_lost, uint32_t recovered_losses,
+ uint32_t received_nacks, uint32_t received_fec) {
if (*stats_summary_) {
// Send the stats to the app
stats_->updateQueuingDelay(state_->getQueuing());
@@ -581,23 +727,35 @@ void RTCTransportProtocol::sendStatsToApp(
stats_->updateBytesRecv(received_bytes);
stats_->updateInterestTx(sent_interests);
stats_->updateReceivedNacks(received_nacks);
+ stats_->updateReceivedFEC(received_fec);
stats_->updateAverageWindowSize(current_sync_win_);
stats_->updateLossRatio(state_->getLossRate());
stats_->updateAverageRtt(state_->getRTT());
+ stats_->updateQueuingDelay(state_->getQueuing());
stats_->updateLostData(lost_data);
+ stats_->updateDefinitelyLostData(definitely_lost);
stats_->updateRecoveredData(recovered_losses);
stats_->updateCCState((unsigned int)current_state_ ? 1 : 0);
(*stats_summary_)(*socket_->getInterface(), *stats_);
}
}
-void RTCTransportProtocol::reassemble(ContentObject &content_object) {
- auto read_buffer = content_object.getPayload();
- TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length());
- read_buffer->trimStart(DATA_HEADER_SIZE);
- Reassembly::read_buffer_ = std::move(read_buffer);
- Reassembly::notifyApplication();
+void RTCTransportProtocol::onFecPackets(
+ std::vector<std::pair<uint32_t, fec::buffer>> &packets) {
+ for (auto &packet : packets) {
+ PacketState state = state_->isReceivedOrLost(packet.first);
+ if (state != PacketState::RECEIVED) {
+ state_->onPacketRecoveredFec(packet.first);
+ ldr_->onPacketRecoveredFec(packet.first);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Recovered packet " << packet.first << " through FEC.";
+ reassembly_->reassemble(*packet.second, packet.first);
+ } else {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Packet" << packet.first << "already received.";
+ }
+ }
}
} // end namespace rtc