aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc607
1 files changed, 607 insertions, 0 deletions
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
new file mode 100644
index 000000000..bb95ab686
--- /dev/null
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -0,0 +1,607 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <implementation/socket_consumer.h>
+#include <math.h>
+#include <protocols/rtc/rtc.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <protocols/rtc/rtc_rc_queue.h>
+
+#include <algorithm>
+
+namespace transport {
+
+namespace protocol {
+
+namespace rtc {
+
+using namespace interface;
+
+RTCTransportProtocol::RTCTransportProtocol(
+ implementation::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, nullptr),
+ DatagramReassembly(icn_socket, this),
+ number_(0) {
+ icn_socket->getSocketOption(PORTAL, portal_);
+ round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ scheduler_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
+}
+
+RTCTransportProtocol::~RTCTransportProtocol() {}
+
+void RTCTransportProtocol::resume() {
+ if (is_running_) return;
+
+ is_running_ = true;
+
+ newRound();
+
+ portal_->runEventsLoop();
+ is_running_ = false;
+}
+
+// private
+void RTCTransportProtocol::initParams() {
+ portal_->setConsumerCallback(this);
+
+ rc_ = std::make_shared<RTCRateControlQueue>();
+ ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
+ std::bind(&RTCTransportProtocol::sendRtxInterest, this,
+ std::placeholders::_1),
+ portal_->getIoService());
+
+ state_ = std::make_shared<RTCState>(
+ std::bind(&RTCTransportProtocol::sendProbeInterest, this,
+ std::placeholders::_1),
+ std::bind(&RTCTransportProtocol::discoveredRtt, this),
+ portal_->getIoService());
+
+ rc_->setState(state_);
+ // TODO: for the moment we keep the congestion control disabled
+ // rc_->tunrOnRateControl();
+ ldr_->setState(state_);
+
+ // protocol state
+ start_send_interest_ = false;
+ current_state_ = SyncState::catch_up;
+
+ // Cancel timer
+ number_++;
+ round_timer_->cancel();
+ scheduler_timer_->cancel();
+ scheduler_timer_on_ = false;
+
+ // delete all timeouts and future nacks
+ timeouts_or_nacks_.clear();
+
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ // names/packets var
+ next_segment_ = 0;
+
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ RTC_INTEREST_LIFETIME);
+}
+
+// private
+void RTCTransportProtocol::reset() {
+ TRANSPORT_LOGD("reset called");
+ initParams();
+ newRound();
+}
+
+void RTCTransportProtocol::inactiveProducer() {
+ // when the producer is inactive we reset the consumer state
+ // cwin vars
+ current_sync_win_ = INITIAL_WIN;
+ max_sync_win_ = INITIAL_WIN_MAX;
+
+ TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_,
+ max_sync_win_);
+
+ // names/packets var
+ next_segment_ = 0;
+
+ ldr_->clear();
+}
+
+void RTCTransportProtocol::newRound() {
+ round_timer_->expires_from_now(std::chrono::milliseconds(ROUND_LEN));
+ // TODO pass weak_ptr here
+ round_timer_->async_wait([this, n{number_}](std::error_code ec) {
+ if (ec) return;
+
+ if (n != number_) {
+ return;
+ }
+
+ // saving counters that will be reset on new round
+ uint32_t sent_retx = state_->getSentRtxInRound();
+ uint32_t received_bytes = state_->getReceivedBytesInRound();
+ uint32_t sent_interest = state_->getSentInterestInRound();
+ uint32_t lost_data = state_->getLostData();
+ uint32_t recovered_losses = state_->getRecoveredLosses();
+ uint32_t received_nacks = state_->getReceivedNacksInRound();
+
+ bool in_sync = (current_state_ == SyncState::in_sync);
+ state_->onNewRound((double)ROUND_LEN, in_sync);
+ rc_->onNewRound((double)ROUND_LEN);
+
+ // update sync state if needed
+ if (current_state_ == SyncState::in_sync) {
+ double cache_rate = state_->getPacketFromCacheRatio();
+ if (cache_rate > MAX_DATA_FROM_CACHE) {
+ current_state_ = SyncState::catch_up;
+ }
+ } else {
+ double target_rate = state_->getProducerRate() * PRODUCTION_RATE_FRACTION;
+ double received_rate = state_->getReceivedRate();
+ uint32_t round_without_nacks = state_->getRoundsWithoutNacks();
+ double cache_ratio = state_->getPacketFromCacheRatio();
+ if (round_without_nacks >= ROUNDS_IN_SYNC_BEFORE_SWITCH &&
+ received_rate >= target_rate && cache_ratio < MAX_DATA_FROM_CACHE) {
+ current_state_ = SyncState::in_sync;
+ }
+ }
+
+ TRANSPORT_LOGD("Calling updateSyncWindow in newRound function");
+ updateSyncWindow();
+
+ sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data,
+ recovered_losses, received_nacks);
+ newRound();
+ });
+}
+
+void RTCTransportProtocol::discoveredRtt() {
+ start_send_interest_ = true;
+ ldr_->turnOnRTX();
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::computeMaxSyncWindow() {
+ double production_rate = state_->getProducerRate();
+ double packet_size = state_->getAveragePacketSize();
+ if (production_rate == 0.0 || packet_size == 0.0) {
+ // the consumer has no info about the producer,
+ // keep the previous maxCWin
+ TRANSPORT_LOGD(
+ "Returning in computeMaxSyncWindow because: prod_rate: %d || "
+ "packet_size: %d",
+ (int)(production_rate == 0.0), (int)(packet_size == 0.0));
+ return;
+ }
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC;
+
+
+ max_sync_win_ =
+ (uint32_t)ceil((production_rate * lifetime_ms *
+ INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size);
+
+ max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow());
+}
+
+void RTCTransportProtocol::updateSyncWindow() {
+ computeMaxSyncWindow();
+
+ if (max_sync_win_ == INITIAL_WIN_MAX) {
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) return;
+
+ current_sync_win_ = INITIAL_WIN;
+ scheduleNextInterests();
+ return;
+ }
+
+ double prod_rate = state_->getProducerRate();
+ double rtt = (double)state_->getRTT() / MILLI_IN_A_SEC;
+ double packet_size = state_->getAveragePacketSize();
+
+ // if some of the info are not available do not update the current win
+ if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
+ current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size);
+ current_sync_win_ +=
+ ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size);
+
+ if(current_state_ == SyncState::catch_up) {
+ current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT;
+ }
+
+ current_sync_win_ = std::min(current_sync_win_, max_sync_win_);
+ current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ }
+
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::decreaseSyncWindow() {
+ // called on future nack
+ // we have a new sample of the production rate, so update max win first
+ computeMaxSyncWindow();
+ current_sync_win_--;
+ current_sync_win_ = std::max(current_sync_win_, WIN_MIN);
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::sendInterest(Name *interest_name) {
+ TRANSPORT_LOGD("Sending interest for name %s",
+ interest_name->toString().c_str());
+
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
+ interest->setName(*interest_name);
+
+ uint32_t lifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
+ interest->setLifetime(uint32_t(lifetime));
+
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
+ return;
+ }
+
+ portal_->sendInterest(std::move(interest));
+}
+
+void RTCTransportProtocol::sendRtxInterest(uint32_t seq) {
+ if (!is_running_ && !is_first_) return;
+
+ if(!start_send_interest_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send rtx %u", seq);
+ interest_name->setSuffix(seq);
+ sendInterest(interest_name);
+}
+
+void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
+ if (!is_running_ && !is_first_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send probe %u", seq);
+ interest_name->setSuffix(seq);
+ sendInterest(interest_name);
+}
+
+void RTCTransportProtocol::scheduleNextInterests() {
+ TRANSPORT_LOGD("Schedule next interests");
+
+ if (!is_running_ && !is_first_) return;
+
+ if(!start_send_interest_) return; // RTT discovering phase is not finished so
+ // do not start to send interests
+
+ if (scheduler_timer_on_) return; // wait befor send other interests
+
+ if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) {
+ TRANSPORT_LOGD("Inactive producer.");
+ // here we keep seding the same interest until the producer
+ // does not start again
+ if (next_segment_ != 0) {
+ // the producer just become inactive, reset the state
+ inactiveProducer();
+ }
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ TRANSPORT_LOGD("send interest %u", next_segment_);
+ interest_name->setSuffix(next_segment_);
+
+ if (portal_->interestIsPending(*interest_name)) {
+ // if interest 0 is already pending we return
+ return;
+ }
+
+ sendInterest(interest_name);
+ state_->onSendNewInterest(interest_name);
+ return;
+ }
+
+ TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d",
+ state_->getPendingInterestNumber(), current_sync_win_);
+
+ // skip nacked pacekts
+ if (next_segment_ <= state_->getLastSeqNacked()) {
+ next_segment_ = state_->getLastSeqNacked() + 1;
+ }
+
+ // skipe received packets
+ if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) {
+ next_segment_ = state_->getHighestSeqReceivedInOrder() + 1;
+ }
+
+ uint32_t sent_interests = 0;
+ while ((state_->getPendingInterestNumber() < current_sync_win_) &&
+ (sent_interests < MAX_INTERESTS_IN_BATCH)) {
+ TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_);
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ interest_name->setSuffix(next_segment_);
+
+ // send the packet only if:
+ // 1) it is not pending yet (not true for rtx)
+ // 2) the packet is not received or lost
+ // 3) is not in the rtx list
+ if (portal_->interestIsPending(*interest_name) ||
+ state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN ||
+ ldr_->isRtx(next_segment_)) {
+ TRANSPORT_LOGD(
+ "skip interest %u because: pending %u, recv %u, rtx %u",
+ next_segment_, (portal_->interestIsPending(*interest_name)),
+ (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN),
+ (ldr_->isRtx(next_segment_)));
+ next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ continue;
+ }
+
+
+ sent_interests++;
+ TRANSPORT_LOGD("send interest %u", next_segment_);
+ sendInterest(interest_name);
+ state_->onSendNewInterest(interest_name);
+
+ next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ;
+ }
+
+ if (state_->getPendingInterestNumber() < current_sync_win_) {
+ // we still have space in the window but we already sent a batch of
+ // MAX_INTERESTS_IN_BATCH interest. for the following ones wait one
+ // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packets drop
+
+ scheduler_timer_on_ = true;
+ scheduler_timer_->expires_from_now(
+ std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES));
+ scheduler_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ if (!scheduler_timer_on_) return;
+
+ scheduler_timer_on_ = false;
+ scheduleNextInterests();
+ });
+ }
+}
+
+void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ uint32_t segment_number = interest->getName().getSuffix();
+
+ TRANSPORT_LOGD("timeout for packet %u", segment_number);
+
+ if (segment_number >= MIN_PROBE_SEQ) {
+ // this is a timeout on a probe, do nothing
+ return;
+ }
+
+ timeouts_or_nacks_.insert(segment_number);
+
+ if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
+ segment_number <= state_->getHighestSeqReceivedInOrder()) {
+ // we retransmit packets only if the producer is active, otherwise we
+ // use timeouts to avoid to send too much traffic
+ //
+ // a timeout is sent using RTX only if it is an old packet. if it is for a
+ // seq number that we didn't reach yet, we send the packet using the normal
+ // schedule next interest
+ TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number);
+ ldr_->onTimeout(segment_number);
+ state_->onTimeout(segment_number);
+ scheduleNextInterests();
+ return;
+ }
+
+ TRANSPORT_LOGD("handle timeout for packet %u using normal interests",
+ segment_number);
+
+ if (segment_number < next_segment_) {
+ // this is a timeout for a packet that will be generated in the future but
+ // we are asking for higher sequence numbers. we need to go back like in the
+ // case of future nacks
+ TRANSPORT_LOGD("on timeout next seg = %u, jump to %u",
+ next_segment_, segment_number);
+ next_segment_ = segment_number;
+ }
+
+ state_->onTimeout(segment_number);
+ scheduleNextInterests();
+}
+
+void RTCTransportProtocol::onNack(const ContentObject &content_object) {
+ struct nack_packet_t *nack =
+ (struct nack_packet_t *)content_object.getPayload()->data();
+ uint32_t production_seg = nack->getProductionSegement();
+ uint32_t nack_segment = content_object.getName().getSuffix();
+ bool is_rtx = ldr_->isRtx(nack_segment);
+
+ // check if the packet got a timeout
+
+ TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment,
+ production_seg);
+
+ bool compute_stats = true;
+ auto tn_it = timeouts_or_nacks_.find(nack_segment);
+ if (tn_it != timeouts_or_nacks_.end() || is_rtx) {
+ compute_stats = false;
+ // remove packets from timeouts_or_nacks only in case of a past nack
+ }
+
+ state_->onNackPacketReceived(content_object, compute_stats);
+ ldr_->onNackPacketReceived(content_object);
+
+ // both in case of past and future nack we set next_segment_ equal to the
+ // production segment in the nack. In case of past nack we will skip unneded
+ // interest (this is already done in the scheduleNextInterest in any case)
+ // while in case of future nacks we can go back in time and ask again for the
+ // content that generated the nack
+ TRANSPORT_LOGD("on nack next seg = %u, jump to %u",
+ next_segment_, production_seg);
+ next_segment_ = production_seg;
+
+ if (production_seg > nack_segment) {
+ // remove the nack is it exists
+ if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
+
+ // the client is asking for content in the past
+ // switch to catch up state and increase the window
+ // this is true only if the packet is not an RTX
+ if (!is_rtx) current_state_ = SyncState::catch_up;
+
+ updateSyncWindow();
+ } else {
+ // if production_seg == nack_segment we consider this a future nack, since
+ // production_seg is not yet created. this may happen in case of low
+ // production rate (e.g. ping at 1pps)
+
+ // if a future nack was also retransmitted add it to the timeout_or_nacks
+ // set
+ if (is_rtx) timeouts_or_nacks_.insert(nack_segment);
+
+ // the client is asking for content in the future
+ // switch to in sync state and decrease the window
+ current_state_ = SyncState::in_sync;
+ decreaseSyncWindow();
+ }
+}
+
+void RTCTransportProtocol::onProbe(const ContentObject &content_object) {
+ bool valid = state_->onProbePacketReceived(content_object);
+ if(!valid) return;
+
+ struct nack_packet_t *probe =
+ (struct nack_packet_t *)content_object.getPayload()->data();
+ uint32_t production_seg = probe->getProductionSegement();
+
+ // as for the nacks set next_segment_
+ TRANSPORT_LOGD("on probe next seg = %u, jump to %u",
+ next_segment_, production_seg);
+ next_segment_ = production_seg;
+
+ ldr_->onProbePacketReceived(content_object);
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::onContentObject(Interest &interest,
+ ContentObject &content_object) {
+ TRANSPORT_LOGD("Received content object of size: %zu",
+ content_object.payloadSize());
+ uint32_t payload_size = content_object.payloadSize();
+ uint32_t segment_number = content_object.getName().getSuffix();
+
+ if (segment_number >= MIN_PROBE_SEQ) {
+ TRANSPORT_LOGD("Received probe %u", segment_number);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onProbe(content_object);
+ return;
+ }
+
+ if (payload_size == NACK_HEADER_SIZE) {
+ TRANSPORT_LOGD("Received nack %u", segment_number);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ onNack(content_object);
+ return;
+ }
+
+ TRANSPORT_LOGD("Received content %u", segment_number);
+
+ rc_->onDataPacketReceived(content_object);
+ bool compute_stats = true;
+ auto tn_it = timeouts_or_nacks_.find(segment_number);
+ if (tn_it != timeouts_or_nacks_.end()) {
+ compute_stats = false;
+ timeouts_or_nacks_.erase(tn_it);
+ }
+ if (ldr_->isRtx(segment_number)) {
+ compute_stats = false;
+ }
+
+ // check if the packet was already received
+ PacketState state = state_->isReceivedOrLost(segment_number);
+ state_->onDataPacketReceived(content_object, compute_stats);
+ ldr_->onDataPacketReceived(content_object);
+
+ // if the stat for this seq number is received do not send the packet to app
+ if (state != PacketState::RECEIVED) {
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), content_object);
+ }
+ reassemble(content_object);
+ } else {
+ TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number);
+ }
+
+ updateSyncWindow();
+}
+
+void RTCTransportProtocol::sendStatsToApp(
+ uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests,
+ uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) {
+ if (*stats_summary_) {
+ // Send the stats to the app
+ stats_->updateQueuingDelay(state_->getQueuing());
+
+ // stats_->updateInterestFecTx(0); //todo must be implemented
+ // stats_->updateBytesFecRecv(0); //todo must be implemented
+
+ stats_->updateRetxCount(retx_count);
+ stats_->updateBytesRecv(received_bytes);
+ stats_->updateInterestTx(sent_interests);
+ stats_->updateReceivedNacks(received_nacks);
+
+ stats_->updateAverageWindowSize(current_sync_win_);
+ stats_->updateLossRatio(state_->getLossRate());
+ stats_->updateAverageRtt(state_->getRTT());
+ stats_->updateLostData(lost_data);
+ stats_->updateRecoveredData(recovered_losses);
+ stats_->updateCCState((unsigned int)current_state_ ? 1 : 0);
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ }
+}
+
+void RTCTransportProtocol::reassemble(ContentObject &content_object) {
+ auto read_buffer = content_object.getPayload();
+ TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length());
+ read_buffer->trimStart(DATA_HEADER_SIZE);
+ Reassembly::read_buffer_ = std::move(read_buffer);
+ Reassembly::notifyApplication();
+}
+
+} // end namespace rtc
+
+} // end namespace protocol
+
+} // end namespace transport