aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
authormichele papalini <micpapal@cisco.com>2019-10-25 15:42:20 +0200
committerMauro Sardara <msardara@cisco.com>2019-10-30 14:46:41 +0000
commit0d60d36eaf9d456ad61b453c0660b88d0ccd3b09 (patch)
treea61d37b7e6645064637d1314fae35929489e72a0 /libtransport
parent3d674d504306489c4d845260f058ce44aa083f33 (diff)
[HICN-360] retransmit pending interests when all of them get lost without wainting for the timeout
Signed-off-by: michele papalini <micpapal@cisco.com> Change-Id: I84074d106bf2cfd3f7a3fb02947198179b0b5b74
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc100
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h13
2 files changed, 105 insertions, 8 deletions
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index e6134f767..f52494aba 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -33,6 +33,7 @@ RTCTransportProtocol::RTCTransportProtocol(
icnet_socket->getSocketOption(PORTAL, portal_);
rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ sentinel_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
reset();
}
@@ -90,6 +91,7 @@ void RTCTransportProtocol::reset() {
lastReceivedTime_ = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
+ lastEvent_ = lastReceivedTime_;
highestReceived_ = 0;
firstSequenceInRound_ = 0;
@@ -108,6 +110,7 @@ void RTCTransportProtocol::reset() {
avgPacketSize_ = HICN_INIT_PACKET_SIZE;
gotNack_ = false;
gotFutureNack_ = 0;
+ rounds_ = 0;
roundsWithoutNacks_ = 0;
pathTable_.clear();
@@ -228,6 +231,11 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
pathTable_.find(producerPathLabels_[1]) == pathTable_.end())
return; // this should not happen
+ //set sentinel timer if needed
+ if(rounds_ == 0){
+ sentinelTimer();
+ }
+
// as a queuing delay we keep the lowest one among the two paths
// if one path is congested the forwarder should decide to do not
// use it so it does not make sense to inform the application
@@ -304,6 +312,7 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
receivedData_ = 0;
packetLost_ = 0;
lossRecovered_ = 0;
+ rounds_++;
firstSequenceInRound_ = highestReceived_;
}
@@ -434,6 +443,7 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
sentInterest_++;
if (!rtx) {
+ packets_in_window_[interest_name->getSuffix()] = 0;
inflightInterestsCount_++;
}
}
@@ -499,6 +509,57 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
}
+void RTCTransportProtocol::sentinelTimer(){
+ uint32_t wait = 1;
+ if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){
+ wait = round(
+ pathTable_[producerPathLabels_[0]]->getInterArrivalGap());
+ }
+ if(wait == 0)
+ wait = 1;
+
+ sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait));
+ sentinel_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() ||
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){
+
+ uint64_t max_waiting_time =
+ round((pathTable_[producerPathLabels_[1]]->getMinRtt() -
+ pathTable_[producerPathLabels_[0]]->getMinRtt()) +
+ pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 2;
+
+ if((currentState_ == HICN_RTC_NORMAL_STATE) &&
+ (inflightInterestsCount_ >= currentCWin_) &&
+ ((now - lastEvent_) > max_waiting_time)){
+
+ uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt();
+
+ for(auto it = packets_in_window_.begin();
+ it != packets_in_window_.end(); it++){
+ uint32_t pkt = it->first & modMask_;
+ if (inflightInterests_[pkt].sequence == it->first &&
+ ((now - inflightInterests_[pkt].transmissionTime) >= RTT)){
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
+ }
+ }
+ }
+ }//esle not enough info to resend the packet, schedule the timer agian
+
+ sentinelTimer();
+ });
+}
+
void RTCTransportProtocol::addRetransmissions(uint32_t val) {
// add only val in the rtx list
addRetransmissions(val, val + 1);
@@ -651,8 +712,6 @@ void RTCTransportProtocol::checkRtx() {
}
void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
- // packetLost_++;
-
uint32_t segmentNumber = interest->getName().getSuffix();
if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
@@ -663,14 +722,22 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
uint32_t pkt = segmentNumber & modMask_;
if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) {
- inflightInterestsCount_--;
// we do nothing, and we keep asking the same stuff over
// and over until we get at least a packet
+ inflightInterestsCount_--;
+ lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ packets_in_window_.erase(segmentNumber);
scheduleNextInterests();
return;
}
if (inflightInterests_[pkt].state == sent_) {
+ lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--;
}
@@ -795,15 +862,30 @@ void RTCTransportProtocol::onContentObject(
return;
}
+ //check if the packet is a rtx
+ bool is_rtx = false;
+ if(interestRetransmissions_.find(segmentNumber) !=
+ interestRetransmissions_.end()){
+ is_rtx = true;
+ }else{
+ auto it_win = packets_in_window_.find(segmentNumber);
+ if(it_win != packets_in_window_.end() &&
+ it_win->second != 0)
+ is_rtx = true;
+ }
+
if (payload_size == HICN_NACK_HEADER_SIZE) {
if (inflightInterests_[pkt].state == sent_) {
+ lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--;
}
bool old_nack = false;
- if (interestRetransmissions_.find(segmentNumber) ==
- interestRetransmissions_.end()) {
+ if (!is_rtx){
// this is not a retransmitted packet
old_nack = onNack(*content_object, false);
updateDelayStats(*content_object);
@@ -831,12 +913,14 @@ void RTCTransportProtocol::onContentObject(
content_object->payloadSize());
if (inflightInterests_[pkt].state == sent_) {
+ lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--; // packet sent without timeouts
}
- if (inflightInterests_[pkt].state == sent_ &&
- interestRetransmissions_.find(segmentNumber) ==
- interestRetransmissions_.end()) {
+ if (inflightInterests_[pkt].state == sent_ && !is_rtx){
// delay stats are computed only for non retransmitted data
updateDelayStats(*content_object);
}
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 7927e3969..908be017a 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -123,6 +123,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
// packet functions
void sendInterest(Name *interest_name, bool rtx);
void scheduleNextInterests() override;
+ void sentinelTimer();
void addRetransmissions(uint32_t val);
void addRetransmissions(uint32_t start, uint32_t stop);
uint64_t retransmit();
@@ -163,6 +164,17 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
uint64_t lastReceivedTime_; //time at which we recevied the
//lastReceived_ packet
+ //sentinel
+ //if all packets in the window get lost we need something that
+ //wakes up our consumer socket. Interest timeouts set to 1 sec
+ //expire too late. This timers expire much sooner and if it
+ //detects that all the interest in the window may be lost
+ //it sends all of them again
+ std::unique_ptr<asio::steady_timer> sentinel_timer_;
+ uint64_t lastEvent_; //time at which we removed a pending
+ //interest from the window
+ std::unordered_map<uint32_t, uint8_t> packets_in_window_;
+
//rtt probes
//the RTC transport tends to overestimate the RTT
//du to the production time on the server side
@@ -188,6 +200,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
double avgPacketSize_;
bool gotNack_;
uint32_t gotFutureNack_;
+ uint32_t rounds_;
uint32_t roundsWithoutNacks_;
//we keep track of up two paths (if only one path is in use