aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/rtc/rtc.cc
diff options
context:
space:
mode:
authorLuca Muscariello <lumuscar@cisco.com>2022-06-09 21:34:09 +0200
committerLuca Muscariello <muscariello@ieee.org>2022-06-30 10:47:50 +0200
commit6b94663b2455e212009a544ae23bb6a8c55407f8 (patch)
tree0af780ce5eeb1009fd24b8af8af08e8368eda3bd /libtransport/src/protocols/rtc/rtc.cc
parenta1ac96f497719b897793ac14b287cb8d840651c1 (diff)
refactor(lib, hicn-light, vpp, hiperf): HICN-723
- move infra data structure into the shared lib - new packet cache using double hashing and lookup on prefix suffix - testing updates - authenticated requests using interest manifests Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Enrico Loparco <eloparco@cisco.com> Change-Id: Iaddebfe6aa5279ea8553433b0f519578f6b9ccd9 Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc.cc')
-rw-r--r--libtransport/src/protocols/rtc/rtc.cc125
1 files changed, 71 insertions, 54 deletions
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc
index d2682edfa..9a56269f3 100644
--- a/libtransport/src/protocols/rtc/rtc.cc
+++ b/libtransport/src/protocols/rtc/rtc.cc
@@ -38,6 +38,7 @@ RTCTransportProtocol::RTCTransportProtocol(
implementation::ConsumerSocket *icn_socket)
: TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this),
new RtcReassembly(icn_socket, this)),
+ max_aggregated_interest_(1),
number_(0) {
icn_socket->getSocketOption(PORTAL, portal_);
round_timer_ =
@@ -55,9 +56,9 @@ void RTCTransportProtocol::resume() {
TransportProtocol::resume();
}
-std::size_t RTCTransportProtocol::transportHeaderLength() {
+std::size_t RTCTransportProtocol::transportHeaderLength(bool isFEC) {
return DATA_HEADER_SIZE +
- (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0);
+ (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize(isFEC) : 0);
}
// private
@@ -75,13 +76,13 @@ void RTCTransportProtocol::initParams() {
std::shared_ptr<auth::Verifier> verifier;
socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
- uint32_t unverified_interval;
- socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_INTERVAL,
- unverified_interval);
+ uint32_t factor_relevant;
+ socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT,
+ factor_relevant);
- double unverified_ratio;
- socket_->getSocketOption(GeneralTransportOptions::UNVERIFIED_RATIO,
- unverified_ratio);
+ uint32_t factor_alert;
+ socket_->getSocketOption(GeneralTransportOptions::MANIFEST_FACTOR_ALERT,
+ factor_alert);
rc_ = std::make_shared<RTCRateControlCongestionDetection>();
ldr_ = std::make_shared<RTCLossDetectionAndRecovery>(
@@ -100,8 +101,8 @@ void RTCTransportProtocol::initParams() {
}
});
- verifier_ = std::make_shared<RTCVerifier>(verifier, unverified_interval,
- unverified_ratio);
+ verifier_ =
+ std::make_shared<RTCVerifier>(verifier, factor_relevant, factor_alert);
state_ = std::make_shared<RTCState>(
indexer_verifier_.get(),
@@ -138,19 +139,20 @@ void RTCTransportProtocol::initParams() {
last_interest_sent_time_ = 0;
last_interest_sent_seq_ = 0;
-#if 0
- if(portal_->isConnectedToFwd()){
- max_aggregated_interest_ = 1;
- }else{
- max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH;
- }
-#else
- max_aggregated_interest_ = 1;
- if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS")) {
- LOG(INFO) << "Max Aggregated: " << max_aggr;
- max_aggregated_interest_ = std::stoul(std::string(max_aggr));
+ // Aggregated interests setup
+ bool aggregated_interests_on;
+ socket_->getSocketOption(RtcTransportOptions::AGGREGATED_INTERESTS,
+ aggregated_interests_on);
+ if (aggregated_interests_on) {
+ if (const char *max_aggr = std::getenv("MAX_AGGREGATED_INTERESTS"))
+ max_aggregated_interest_ = (uint32_t)std::stoul(std::string(max_aggr));
+ else
+ max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH;
+
+ max_aggregated_interest_ = std::min<uint32_t>(max_aggregated_interest_,
+ 1 + MAX_SUFFIXES_IN_MANIFEST);
}
-#endif
+ LOG(INFO) << "Max Aggregated: " << max_aggregated_interest_;
max_sent_int_ =
std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_);
@@ -263,6 +265,11 @@ void RTCTransportProtocol::discoveredRtt() {
socket_->getSocketOption(RtcTransportOptions::RECOVERY_STRATEGY, strategy);
ldr_->changeRecoveryStrategy(
(interface::RtcTransportRecoveryStrategies)strategy);
+
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode) ldr_->setContentSharingMode();
ldr_->turnOnRecovery();
ldr_->onNewRound(false);
@@ -270,22 +277,9 @@ void RTCTransportProtocol::discoveredRtt() {
Name *name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
Prefix prefix(*name, 128);
- if ((interface::RtcTransportRecoveryStrategies)strategy ==
- interface::RtcTransportRecoveryStrategies::LOW_RATE_AND_BESTPATH) {
- fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
- RTCForwardingStrategy::BEST_PATH);
- } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
- interface::RtcTransportRecoveryStrategies::
- LOW_RATE_AND_REPLICATION) {
- fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
- RTCForwardingStrategy::REPLICATION);
- } else if ((interface::RtcTransportRecoveryStrategies)strategy ==
- interface::RtcTransportRecoveryStrategies::
- LOW_RATE_AND_ALL_FWD_STRATEGIES) {
- fwd_strategy_.initFwdStrategy(portal_, prefix, state_.get(),
- RTCForwardingStrategy::BOTH);
- }
-
+ fwd_strategy_.initFwdStrategy(
+ portal_, prefix, state_.get(),
+ (interface::RtcTransportRecoveryStrategies)strategy);
updateSyncWindow();
}
@@ -302,6 +296,12 @@ void RTCTransportProtocol::computeMaxSyncWindow() {
return;
}
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode && (production_rate < MIN_PROD_RATE_SHARING_MODE))
+ production_rate = MIN_PROD_RATE_SHARING_MODE;
+
production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead());
uint32_t lifetime = default_values::interest_lifetime;
@@ -330,6 +330,11 @@ void RTCTransportProtocol::updateSyncWindow() {
double prod_rate = state_->getProducerRate();
double rtt = (double)state_->getMinRTT() / MILLI_IN_A_SEC;
double packet_size = state_->getAveragePacketSize();
+ bool content_sharing_mode;
+ socket_->getSocketOption(RtcTransportOptions::CONTENT_SHARING_MODE,
+ content_sharing_mode);
+ if (content_sharing_mode && (prod_rate < MIN_PROD_RATE_SHARING_MODE))
+ prod_rate = MIN_PROD_RATE_SHARING_MODE;
// if some of the info are not available do not update the current win
if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) {
@@ -385,6 +390,19 @@ void RTCTransportProtocol::sendProbeInterest(uint32_t seq) {
sendInterest(*interest_name);
}
+void RTCTransportProtocol::sendInterestForTimeout(uint32_t seq) {
+ if (!isRunning() && !is_first_) return;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ // we got a timeout for this packet so it is not pending anymore
+ interest_name->setSuffix(seq);
+ state_->onSendNewInterest(interest_name);
+ sendInterest(*interest_name);
+}
+
void RTCTransportProtocol::scheduleNextInterests() {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests";
@@ -475,9 +493,9 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
// skip received packets
- if (indexer_verifier_->checkNextSuffix() <=
- state_->getHighestSeqReceivedInOrder()) {
- indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1);
+ uint32_t max_received = state_->getHighestSeqReceivedInOrder();
+ if (indexer_verifier_->checkNextSuffix() <= max_received) {
+ indexer_verifier_->jumpToIndex(max_received + 1);
}
uint32_t sent_interests = 0;
@@ -495,7 +513,6 @@ void RTCTransportProtocol::scheduleNextInterests() {
<< "In while loop. Window size: " << current_sync_win_;
uint32_t next_seg = indexer_verifier_->getNextSuffix();
-
name->setSuffix(next_seg);
// send the packet only if:
@@ -586,7 +603,6 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
}
timeouts_or_nacks_.insert(segment_number);
-
if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) &&
segment_number <= state_->getHighestSeqReceived()) {
// we retransmit packets only if the producer is active, otherwise we
@@ -627,11 +643,11 @@ void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest,
<< "On timeout next seg = " << indexer_verifier_->checkNextSuffix()
<< ", jump to " << segment_number;
// add an extra space in the window
- current_sync_win_++;
indexer_verifier_->jumpToIndex(segment_number);
}
state_->onTimeout(segment_number, false);
+ sendInterestForTimeout(segment_number);
scheduleNextInterests();
}
@@ -672,7 +688,6 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
if (tn_it != timeouts_or_nacks_.end()) timeouts_or_nacks_.erase(tn_it);
state_->onJumpForward(production_seg);
- verifier_->onJumpForward(production_seg);
// the client is asking for content in the past
// switch to catch up state and increase the window
// this is true only if the packet is not an RTX
@@ -821,7 +836,8 @@ void RTCTransportProtocol::onContentObjectReceived(
// Check if the packet is a retransmission
if (ldr_->isRtx(segment_number) && state != PacketState::RECEIVED) {
if (is_data || is_manifest) {
- state_->onPacketRecoveredRtx(segment_number);
+ uint64_t rtt = ldr_->getRtxRtt(segment_number);
+ state_->onPacketRecoveredRtx(content_object, rtt);
if (*on_content_object_input_) {
(*on_content_object_input_)(*socket_->getInterface(), content_object);
@@ -842,7 +858,7 @@ void RTCTransportProtocol::onContentObjectReceived(
}
if (is_fec) {
- state_->onFecPacketRecoveredRtx(segment_number);
+ state_->onFecPacketRecoveredRtx(content_object);
}
}
@@ -920,7 +936,7 @@ void RTCTransportProtocol::sendStatsToApp(
stats_->updateAverageWindowSize(state_->getPendingInterestNumber());
stats_->updateLossRatio(state_->getPerSecondLossRate());
uint64_t rtt = state_->getAvgRTT();
- stats_->updateAverageRtt(utils::SteadyTime::Milliseconds(rtt));
+ stats_->updateAverageRtt(utils::SteadyTime::Microseconds(rtt * 1000));
stats_->updateQueuingDelay(state_->getQueuing());
stats_->updateLostData(lost_data);
@@ -960,9 +976,10 @@ void RTCTransportProtocol::decodePacket(ContentObject &content_object,
DLOG_IF(INFO, VLOG_IS_ON(4))
<< "Send packet " << content_object.getName() << " to FEC decoder";
- uint32_t offset = is_manifest
- ? content_object.headerSize()
- : content_object.headerSize() + rtc::DATA_HEADER_SIZE;
+ uint32_t offset =
+ is_manifest
+ ? (uint32_t)content_object.headerSize()
+ : (uint32_t)(content_object.headerSize() + rtc::DATA_HEADER_SIZE);
uint32_t metadata = static_cast<uint32_t>(content_object.getPayloadType());
fec_decoder_->onDataPacket(content_object, offset, metadata);
@@ -1016,7 +1033,7 @@ void RTCTransportProtocol::onFecPackets(fec::BufferArray &packets) {
processManifest(*interest, *content_object);
}
- state_->onPacketRecoveredFec(seq_number, buffer->length());
+ state_->onPacketRecoveredFec(seq_number, (uint32_t)buffer->length());
ldr_->onPacketRecoveredFec(seq_number);
if (payload_type == PayloadType::DATA) {
@@ -1038,11 +1055,11 @@ void RTCTransportProtocol::processManifest(Interest &interest,
ContentObject::Ptr RTCTransportProtocol::removeFecHeader(
const ContentObject &content_object) {
- if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize()) {
+ if (!fec_decoder_ || !fec_decoder_->getFecHeaderSize(false)) {
return nullptr;
}
- size_t fec_header_size = fec_decoder_->getFecHeaderSize();
+ size_t fec_header_size = fec_decoder_->getFecHeaderSize(false);
const uint8_t *payload =
content_object.data() + content_object.headerSize() + fec_header_size;
size_t payload_size = content_object.payloadSize() - fec_header_size;