summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc329
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h31
2 files changed, 228 insertions, 132 deletions
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 882508b41..9402d3b02 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -18,14 +18,6 @@
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/protocols/rtc.h>
-/*
- * TODO
- * 2) start/constructor/rest variable implementation
- * 3) interest retransmission: now I always recover, we should recover only if
- * we have enough time 4) returnContentToApplication: rememeber to remove the
- * first 32bits from the payload
- */
-
namespace transport {
namespace protocol {
@@ -39,6 +31,7 @@ RTCTransportProtocol::RTCTransportProtocol(
modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
icnet_socket->getSocketOption(PORTAL, portal_);
nack_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
nack_timer_used_ = false;
reset();
}
@@ -49,7 +42,10 @@ RTCTransportProtocol::~RTCTransportProtocol() {
}
}
-int RTCTransportProtocol::start() { return TransportProtocol::start(); }
+int RTCTransportProtocol::start() {
+ checkRtx();
+ return TransportProtocol::start();
+}
void RTCTransportProtocol::stop() {
if (!is_running_) return;
@@ -67,6 +63,7 @@ void RTCTransportProtocol::resume() {
inflightInterestsCount_ = 0;
scheduleNextInterests();
+ checkRtx();
portal_->runEventsLoop();
@@ -104,7 +101,7 @@ void RTCTransportProtocol::reset() {
// names/packets var
actualSegment_ = 0;
inflightInterestsCount_ = 0;
- while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop();
+ interestRetransmissions_.clear();
lastSegNacked_ = 0;
lastReceived_ = 0;
nackedByProducer_.clear();
@@ -120,10 +117,6 @@ void RTCTransportProtocol::reset() {
gotFutureNack_ = 0;
roundsWithoutNacks_ = 0;
pathTable_.clear();
- // roundCounter_ = 0;
- // minRTTwin_.clear();
- // for (int i = 0; i < MIN_RTT_WIN; i++)
- // minRTTwin_.push_back(UINT_MAX);
minRtt_ = UINT_MAX;
// CC var
@@ -135,7 +128,7 @@ void RTCTransportProtocol::reset() {
producerPathLabel_ = 0;
socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
(uint32_t)HICN_RTC_INTEREST_LIFETIME);
- // XXX this should bedone by the application
+ // XXX this should be done by the application
}
uint32_t max(uint32_t a, uint32_t b) {
@@ -168,9 +161,12 @@ void RTCTransportProtocol::updateDelayStats(
uint32_t segmentNumber = content_object.getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- if (inflightInterests_[pkt].transmissionTime ==
- 0) // this is always the case if we have a retransmitted packet (timeout
- // or RTCP)
+ if (inflightInterests_[pkt].state != sent_)
+ return;
+
+ if(interestRetransmissions_.find(segmentNumber) !=
+ interestRetransmissions_.end())
+ //this packet was rtx at least once
return;
uint32_t pathLabel = content_object.getPathLabel();
@@ -283,8 +279,8 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate,
(double)HICN_MILLI_IN_A_SEC));
if (currentState_ == HICN_RTC_SYNC_STATE) {
- // in this case we do not limit the window with the BDP, beacuse most likly
- // it is wrong
+ // in this case we do not limit the window with the BDP, beacuse most
+ // likely it is wrong
maxCWin_ = maxWaintingInterest;
return;
}
@@ -333,68 +329,12 @@ void RTCTransportProtocol::increaseWindow() {
} else {
currentCWin_ = min(
maxCWin_,
- (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear
+ (uint32_t)ceil(currentCWin_ +
+ (1.0 / (double)currentCWin_))); // linear
}
}
-void RTCTransportProtocol::sendInterest() {
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- bool isRTX = false;
- // uint32_t sentInt = 0;
-
- if (interestRetransmissions_.size() > 0) {
- // handle retransmission
- // here we have two possibile retransmissions: retransmissions due to
- // timeouts and retransmissions due to RTCP NACKs. we will send the interest
- // anyway, even if it is pending (this is possible only in the second case)
- uint32_t rtxSeg = interestRetransmissions_.front();
- interestRetransmissions_.pop();
-
- // a packet recovery means that there was a loss
- packetLost_++;
-
- uint32_t pkt = rtxSeg & modMask_;
- interest_name->setSuffix(rtxSeg);
-
- // if the interest is not pending anymore we encrease the retrasnmission
- // counter in order to avoid to handle a recovered packt as a normal one
- if (!portal_->interestIsPending(*interest_name)) {
- inflightInterests_[pkt].retransmissions++;
- }
-
- inflightInterests_[pkt].transmissionTime = 0;
- inflightInterests_[pkt].received = 0;
- inflightInterests_[pkt].sequence = rtxSeg;
- isRTX = true;
- } else {
- // in this case we send the packet only if it is not pending yet
- interest_name->setSuffix(actualSegment_);
- if (portal_->interestIsPending(*interest_name)) {
- actualSegment_++;
- return;
- }
- uint32_t pkt = actualSegment_ & modMask_;
-
- //if we already reacevied the content we don't ask it again
- if(inflightInterests_[pkt].received == 1 &&
- inflightInterests_[pkt].sequence == actualSegment_) {
- actualSegment_++;
- return;
- }
-
- // sentInt = actualSegment_;
- inflightInterests_[pkt].transmissionTime =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- inflightInterests_[pkt].retransmissions = 0;
- inflightInterests_[pkt].received = 0;
- inflightInterests_[pkt].sequence = actualSegment_;
- actualSegment_++;
- }
-
+void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
auto interest = getPacket();
interest->setName(*interest_name);
@@ -420,27 +360,51 @@ void RTCTransportProtocol::sendInterest() {
sentInterest_++;
- if (!isRTX) {
+ if (!rtx) {
inflightInterestsCount_++;
}
+
}
void RTCTransportProtocol::scheduleNextInterests() {
checkRound();
if (!is_running_) return;
- while (interestRetransmissions_.size() > 0) {
- sendInterest();
- checkRound();
- }
-
while (inflightInterestsCount_ < currentCWin_) {
- sendInterest();
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+
+ //we send the packet only if it is not pending yet
+ interest_name->setSuffix(actualSegment_);
+ if (portal_->interestIsPending(*interest_name)) {
+ actualSegment_++;
+ continue;
+ }
+
+ uint32_t pkt = actualSegment_ & modMask_;
+ //if we already reacevied the content we don't ask it again
+ if(inflightInterests_[pkt].state == received_ &&
+ inflightInterests_[pkt].sequence == actualSegment_) {
+ actualSegment_++;
+ continue;
+ }
+
+ inflightInterests_[pkt].transmissionTime =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ inflightInterests_[pkt].state = sent_;
+ inflightInterests_[pkt].sequence = actualSegment_;
+ actualSegment_++;
+
+ sendInterest(interest_name, false);
checkRound();
}
}
void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) {
+#if 0
for (uint32_t i = 0; i < nacks.size(); i++) {
if (nackedByProducer_.find(nacks[i]) != nackedByProducer_.end()) {
continue;
@@ -452,22 +416,142 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) {
}
scheduleNextInterests();
+#endif
+}
+
+void RTCTransportProtocol::addRetransmissions(uint32_t val){
+ //add only val in the rtx list
+ addRetransmissions(val, val + 1);
}
+
+void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop){
+ for(uint32_t i = start; i < stop; i++){
+ auto it = interestRetransmissions_.find(i);
+ if(it == interestRetransmissions_.end()){
+ if (lastSegNacked_ <= i) {
+ //i must be larger than the last past nack received
+ interestRetransmissions_[i] = 0;
+ }
+ }//if the retransmission is already there the rtx timer will
+ //take care of it
+ }
+ retransmit(true);
+}
+
+void RTCTransportProtocol::retransmit(bool first_rtx){
+ auto it = interestRetransmissions_.begin();
+
+ //cut len to max HICN_MAX_RTX_SIZE
+ //since we use a map, the smaller (and so the older) sequence number are at
+ //the beginnin of the map
+ while(interestRetransmissions_.size() > HICN_MAX_RTX_SIZE){
+ it = interestRetransmissions_.erase(it);
+ }
+
+ it = interestRetransmissions_.begin();
+
+ while (it != interestRetransmissions_.end()){
+ uint32_t pkt = it->first & modMask_;
+
+ if(inflightInterests_[pkt].sequence != it->first){
+ //this packet is not anymore in the inflight buffer, erase it
+ it = interestRetransmissions_.erase(it);
+ continue;
+ }
+
+ //we retransmitted the packet too many times
+ if(it->second >= HICN_MAX_RTX){
+ it = interestRetransmissions_.erase(it);
+ continue;
+ }
+
+ //this packet is too old
+ if((lastReceived_ > it->first) &&
+ (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE){
+ it = interestRetransmissions_.erase(it);
+ continue;
+ }
+
+ if(first_rtx){
+ //TODO (optimization)
+ //the rtx that we never sent (it->second == 0) are all at the
+ //end, so we can go directly there
+ if(it->second == 0){
+ inflightInterests_[pkt].transmissionTime =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ it->second++;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ sendInterest(interest_name, true);
+ }
+ ++it;
+ }else{
+ //base on time
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if((now - inflightInterests_[pkt].transmissionTime) > 20){
+ //XXX replace 20 with rtt
+ inflightInterests_[pkt].transmissionTime =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ it->second++;
+
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ sendInterest(interest_name, true);
+ }
+ ++it;
+ }
+ }
+}
+
+void RTCTransportProtocol::checkRtx(){
+ retransmit(false);
+ rtx_timer_->expires_from_now(std::chrono::milliseconds(20));
+ rtx_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ checkRtx();
+ });
+}
+
void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
// packetLost_++;
uint32_t segmentNumber = interest->getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- if (inflightInterests_[pkt].retransmissions == 0) {
+ if (inflightInterests_[pkt].state == sent_) {
inflightInterestsCount_--;
}
- if (inflightInterests_[pkt].retransmissions < HICN_MAX_RTX &&
- lastSegNacked_ <= segmentNumber &&
- actualSegment_ < segmentNumber) {
- interestRetransmissions_.push(segmentNumber);
- }
+ //check how many times we sent this packet
+ auto it = interestRetransmissions_.find(segmentNumber);
+ if(it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX){
+ inflightInterests_[pkt].state = lost_;
+ }
+
+ if(inflightInterests_[pkt].state == sent_) {
+ inflightInterests_[pkt].state = timeout1_;
+ } else if (inflightInterests_[pkt].state == timeout1_) {
+ inflightInterests_[pkt].state = timeout2_;
+ } else if (inflightInterests_[pkt].state == timeout2_) {
+ inflightInterests_[pkt].state = lost_;
+ }
+
+ if(inflightInterests_[pkt].state == lost_) {
+ interestRetransmissions_.erase(segmentNumber);
+ }else{
+ addRetransmissions(segmentNumber);
+ }
scheduleNextInterests();
}
@@ -478,6 +562,7 @@ bool RTCTransportProtocol::checkIfProducerIsActive(
uint32_t productionSeg = *payload;
uint32_t productionRate = *(++payload);
+
if (productionRate == 0) {
// the producer socket is not active
// in this case we consider only the first nack
@@ -486,7 +571,7 @@ bool RTCTransportProtocol::checkIfProducerIsActive(
}
nack_timer_used_ = true;
- // actualSegment_ should be the one in the nack, which will the next in
+ // actualSegment_ should be the one in the nack, which will be the next in
// production
actualSegment_ = productionSeg;
// all the rest (win size should not change)
@@ -522,7 +607,7 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
computeMaxWindow(productionRate, 0);
increaseWindow();
- while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop();
+ interestRetransmissions_.clear();
lastSegNacked_ = productionSeg;
if (nackedByProducer_.size() >= nackedByProducerMaxSize_)
@@ -533,30 +618,13 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
// we are asking stuff in the future
gotFutureNack_++;
- if(lastReceived_ > productionSeg){
- //if this happens the producer socket was restarted
- //we erase all the inflight + timeout state (only for the first NACK)
- if(gotFutureNack_ == 1){
- while (interestRetransmissions_.size() != 0)
- interestRetransmissions_.pop();
- for (uint32_t i = 0; i < inflightInterests_.size(); i++){
- inflightInterests_[i].transmissionTime = 0;
- inflightInterests_[i].sequence = 0;
- inflightInterests_[i].retransmissions = 0;
- inflightInterests_[i].received = 0;
- }
- }
- actualSegment_ = productionSeg + 1;
- //we can't say much abouit the window, so keep it as it is
- }else{
- actualSegment_ = productionSeg + 1;
+ actualSegment_ = productionSeg + 1;
- computeMaxWindow(productionRate, 0);
- decreaseWindow();
+ computeMaxWindow(productionRate, 0);
+ decreaseWindow();
- if (currentState_ == HICN_RTC_SYNC_STATE) {
- currentState_ = HICN_RTC_NORMAL_STATE;
- }
+ if (currentState_ == HICN_RTC_SYNC_STATE) {
+ currentState_ = HICN_RTC_NORMAL_STATE;
}
} // equal should not happen
}
@@ -570,11 +638,10 @@ void RTCTransportProtocol::onNackForRtx(const ContentObject &content_object) {
// we are asking for stuff produced in the past
actualSegment_ = max(productionSeg + 1, actualSegment_);
- while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop();
+ interestRetransmissions_.clear();
lastSegNacked_ = productionSeg;
} else if (productionSeg < nackSegment) {
-
actualSegment_ = productionSeg + 1;
} // equal should not happen
}
@@ -598,8 +665,7 @@ void RTCTransportProtocol::onContentObject(
// Nacks always come form the producer, so we set the producerPathLabel_;
producerPathLabel_ = content_object->getPathLabel();
schedule_next_interest = checkIfProducerIsActive(*content_object);
- if (inflightInterests_[pkt].retransmissions == 0) {
- // discard nacks for rtx packets
+ if (inflightInterests_[pkt].state == sent_) {
inflightInterestsCount_--;
// if checkIfProducerIsActive returns false, we did all we need to do
// inside that function, no need to call onNack
@@ -610,26 +676,37 @@ void RTCTransportProtocol::onContentObject(
}
} else {
- receivedData_++;
- inflightInterests_[pkt].received = 1;
- lastReceived_ = segmentNumber;
-
avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) +
((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
- if (inflightInterests_[pkt].retransmissions == 0) {
- inflightInterestsCount_--;
+ if (inflightInterests_[pkt].state == sent_) {
+ inflightInterestsCount_--; //packet sent without timeouts
+ }
+
+ if (inflightInterests_[pkt].state == sent_ &&
+ interestRetransmissions_.find(segmentNumber) ==
+ interestRetransmissions_.end()){
// we count only non retransmitted data in order to take into accunt only
// the transmition rate of the producer
receivedBytes_ += (uint32_t)(content_object->headerSize() +
content_object->payloadSize());
updateDelayStats(*content_object);
+
+ addRetransmissions(lastReceived_ + 1, segmentNumber);
+ //lastReceived_ is updated only for data packets received without RTX
+ lastReceived_ = segmentNumber;
}
+ receivedData_++;
+ inflightInterests_[pkt].state = received_;
+
reassemble(std::move(content_object));
increaseWindow();
}
+ //in any case we remove the packet from the rtx list
+ interestRetransmissions_.erase(segmentNumber);
+
if (schedule_next_interest) {
scheduleNextInterests();
}
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 227f1d737..55deead6e 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -16,7 +16,7 @@
#pragma once
#include <queue>
-#include <set>
+#include <map>
#include <unordered_map>
#include <hicn/transport/protocols/protocol.h>
@@ -38,7 +38,9 @@
// controller constant
#define HICN_ROUND_LEN \
200 // ms interval of time on which we take decisions / measurements
-#define HICN_MAX_RTX 3
+#define HICN_MAX_RTX 10
+#define HICN_MAX_RTX_SIZE 1024
+#define HICN_MAX_RTX_MAX_AGE 10000
#define HICN_MIN_RTT_WIN 30 // rounds
// cwin
@@ -82,11 +84,21 @@ namespace transport {
namespace protocol {
+enum packetState {
+ sent_,
+ received_,
+ timeout1_,
+ timeout2_,
+ lost_
+};
+
+typedef enum packetState packetState_t;
+
struct sentInterest {
uint64_t transmissionTime;
uint32_t sequence; //sequence number of the interest sent
- uint8_t retransmissions;
- uint8_t received; //1 = received, 0 = not received
+ //to handle seq % buffer_size
+ packetState_t state; //see packet state
};
class RTCTransportProtocol : public TransportProtocol, public Reassembly {
@@ -119,9 +131,13 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
void resetPreviousWindow();
// packet functions
- void sendInterest();
+ void sendInterest(Name *interest_name, bool rtx);
void scheduleNextInterests() override;
void scheduleAppNackRtx(std::vector<uint32_t> &nacks);
+ void addRetransmissions(uint32_t val);
+ void addRetransmissions(uint32_t start, uint32_t stop);
+ void retransmit(bool first_rtx);
+ void checkRtx();
void onTimeout(Interest::Ptr &&interest) override;
// checkIfProducerIsActive: return true if we need to schedule an interest
// immediatly after, false otherwise (this happens when the producer socket
@@ -160,7 +176,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
uint32_t actualSegment_;
int32_t RTPhICN_offset_;
uint32_t inflightInterestsCount_;
- std::queue<uint32_t> interestRetransmissions_;
+ //map seq to rtx
+ std::map<uint32_t, uint8_t> interestRetransmissions_;
+ std::unique_ptr<asio::steady_timer> rtx_timer_;
+ //std::queue<uint32_t> interestRetransmissions_;
std::vector<sentInterest> inflightInterests_;
uint32_t lastSegNacked_; //indicates the segment id in the last received
// past Nack. we do not ask for retransmissions