summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc138
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h18
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc2
3 files changed, 147 insertions, 11 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index d1e89efdc..446b9ef8e 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -31,6 +31,11 @@
#define HICN_MAX_DATA_SEQ 0xefffffff
+//slow production rate param
+#define MIN_PRODUCTION_RATE 8000 // in bytes per sec. this value is computed
+ // through experiments
+#define LIFETIME_FRACTION 0.5
+
// NACK HEADER
// +-----------------------------------------+
// | 4 bytes: current segment in production |
@@ -58,12 +63,15 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
+ timer_on_(false),
active_(false) {
lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
+ interests_cache_timer_ = std::make_unique<asio::steady_timer>(
+ this->getIoService());
}
RTCProducerSocket::RTCProducerSocket()
@@ -80,6 +88,8 @@ RTCProducerSocket::RTCProducerSocket()
.count();
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
+ interests_cache_timer_ = std::make_unique<asio::steady_timer>(
+ this->getIoService());
}
RTCProducerSocket::~RTCProducerSocket() {}
@@ -159,6 +169,23 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
portal_->sendContentObject(content_object);
+ //remove interests from the interest cache if it exists
+ if(!seqs_map_.empty()){
+
+ utils::SpinLock::Acquire locked(interests_cache_lock_);
+
+ auto it_seqs = seqs_map_.find(currentSeg_);
+ if(it_seqs != seqs_map_.end()){
+ auto range = timers_map_.equal_range(it_seqs->second);
+ for(auto it_timers = range.first; it_timers != range.second; it_timers++){
+ if(it_timers->second == it_seqs->first){
+ timers_map_.erase(it_timers);
+ break;
+ }
+ }
+ }
+ }
+
currentSeg_ = (currentSeg_ + 1) % HICN_MAX_DATA_SEQ;
}
@@ -170,14 +197,15 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
on_interest_input_(*this, *interest);
}
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
bool isActive;
{
utils::SpinLock::Acquire locked(lock_);
isActive = active_;
if (isActive) {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
if ((now - lastProduced_) > INACTIVE_TIME) {
// socket is inactive
active_ = false;
@@ -186,13 +214,68 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
}
}
- if (TRANSPORT_EXPECT_FALSE(!isActive)) {
- sendNack(*interest, false);
+ // if the production rate is less than MIN_PRODUCTION_RATE we put the
+ // interest in a queue, otherwise we handle it in the usual way
+ if(bytesProductionRate_ < MIN_PRODUCTION_RATE){
+
+ utils::SpinLock::Acquire locked(interests_cache_lock_);
+
+ uint64_t next_timer = ~0;
+ if(!timers_map_.empty()){
+ next_timer = timers_map_.begin()->first;
+ }
+
+ uint64_t expiration = now + (lifetime * LIFETIME_FRACTION);
+ //check if the seq number exists already
+ auto it_seqs = seqs_map_.find(interestSeg);
+ if(it_seqs != seqs_map_.end()){
+ //the seq already exists
+ if(expiration < it_seqs->second){
+ // we need to update the timer becasue we got a smaller one
+ // 1) remove the entry from the multimap
+ // 2) update this entry
+ auto range = timers_map_.equal_range(it_seqs->second);
+ for(auto it_timers = range.first; it_timers != range.second; it_timers++){
+ if(it_timers->second == it_seqs->first){
+ timers_map_.erase(it_timers);
+ break;
+ }
+ }
+ timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg));
+ it_seqs->second = expiration;
+ }else{
+ //nothing to do here
+ return;
+ }
+ }else{
+ // add the new seq
+ timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg));
+ seqs_map_.insert(std::pair<uint32_t,uint64_t>(interestSeg, expiration));
+ }
+
+ //here we have at least one interest in the queue, we need to start or
+ //update the timer
+ if(!timer_on_){
+ //set timeout
+ timer_on_ = true;
+ scheduleTimer(timers_map_.begin()->first - now);
+ } else {
+ //re-schedule the timer because a new interest will expires sooner
+ if(next_timer > timers_map_.begin()->first){
+ interests_cache_timer_->cancel();
+ scheduleTimer(timers_map_.begin()->first - now);
+ }
+ }
return;
}
if(interestSeg > HICN_MAX_DATA_SEQ){
- sendNack(*interest, isActive);
+ sendNack(interestSeg, isActive);
+ return;
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!isActive)) {
+ sendNack(interestSeg, false);
return;
}
@@ -202,18 +285,55 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
(double)packetsProductionRate_.load()));
if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
- sendNack(*interest, true);
+ sendNack(interestSeg, true);
}
// else drop packet
}
-void RTCProducerSocket::sendNack(const Interest &interest, bool isActive) {
+void RTCProducerSocket::scheduleTimer(uint64_t wait){
+ interests_cache_timer_->expires_from_now(
+ std::chrono::milliseconds(wait));
+ interests_cache_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ interestCacheTimer();
+ });
+}
+
+void RTCProducerSocket::interestCacheTimer(){
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ utils::SpinLock::Acquire locked(interests_cache_lock_);
+
+ for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){
+ uint64_t expire = it_timers->first;
+ if(expire <= now){
+ uint32_t seq = it_timers->second;
+ sendNack(seq, active_);
+ //remove the interest from the other map
+ seqs_map_.erase(seq);
+ it_timers = timers_map_.erase(it_timers);
+ }else{
+ //stop, we are done!
+ break;
+ }
+ }
+ if(timers_map_.empty()){
+ timer_on_ = false;
+ }else{
+ timer_on_ = true;
+ scheduleTimer(timers_map_.begin()->first - now);
+ }
+}
+
+void RTCProducerSocket::sendNack(uint32_t sequence, bool isActive) {
auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
nack_payload->append(NACK_HEADER_SIZE);
ContentObject nack;
nack.appendPayload(std::move(nack_payload));
- nack.setName(interest.getName());
+ nack.setName(flowName_.setSuffix(sequence));
uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
*payload_ptr = currentSeg_;
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index 5b9a23dd7..aa67f1a29 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -41,8 +41,10 @@ class RTCProducerSocket : public ProducerSocket {
void onInterest(Interest::Ptr &&interest) override;
private:
- void sendNack(const Interest &interest, bool isActive);
+ void sendNack(uint32_t sequence, bool isActive);
void updateStats(uint32_t packet_size, uint64_t now);
+ void scheduleTimer(uint64_t wait);
+ void interestCacheTimer();
uint32_t currentSeg_;
uint32_t prodLabel_;
@@ -55,6 +57,20 @@ class RTCProducerSocket : public ProducerSocket {
uint32_t perSecondFactor_;
uint64_t lastStats_;
+ // cache for the received interests
+ // this map maps the expiration time of an interest to
+ // its sequence number. the map is sorted by timeouts
+ // the same timeout may be used for multiple sequence numbers
+ // but for each sequence number we store only the smallest
+ // expiry time. In this way the mapping from seqs_map_ to
+ // timers_map_ is unique
+ std::multimap<uint64_t,uint32_t> timers_map_;
+ // this map does the opposite, this map is not ordered
+ std::unordered_map<uint32_t,uint64_t> seqs_map_;
+ bool timer_on_;
+ std::unique_ptr<asio::steady_timer> interests_cache_timer_;
+ utils::SpinLock interests_cache_lock_;
+
uint64_t lastProduced_;
bool active_;
utils::SpinLock lock_;
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index b3a00c58d..4104d8883 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -744,7 +744,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
old_nack = true;
} else if (productionSeg < nackSegment) {
- actualSegment_ = (productionSeg + 1) % HICN_MIN_PROBE_SEQ;
+ actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ;
if(!rtx){
// we are asking stuff in the future