aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormichele papalini <micpapal@cisco.com>2019-10-18 16:04:21 +0200
committermichele papalini <micpapal@cisco.com>2019-10-18 16:04:21 +0200
commit7204bac00804448a797d4e76ced04a3b84d0d741 (patch)
treed6cd7289ee18128287a00f66177cc6fbc1bad7d3
parent306b098c350771c9497219694f82c090634eb974 (diff)
[HICN-339] improve rtc producer socket for low rate traffic
Signed-off-by: michele papalini <micpapal@cisco.com> Change-Id: I1e6fdada9a55e0a93b8d5db768124f2e47daf05b
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc34
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h2
2 files changed, 17 insertions, 19 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 8ca27d24c..e7dc98868 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -151,7 +151,7 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
producedPackets_++;
auto content_object = std::make_shared<ContentObject>(
- flowName_.setSuffix(currentSeg_));
+ flowName_.setSuffix(currentSeg_.load()));
auto payload = utils::MemBuf::create(TIMESTAMP_LEN);
memcpy(payload->writableData(), &now, TIMESTAMP_LEN);
@@ -176,25 +176,21 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
on_content_object_output_(*this, *content_object);
}
+ uint32_t old_curr = currentSeg_.load();
+ currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ;
+
//remove interests from the interest cache if it exists
+ //this generates nacks that will tell to the consumer
+ //that a new data packet was produced
if(!seqs_map_.empty()){
-
utils::SpinLock::Acquire locked(interests_cache_lock_);
-
- auto it_seqs = seqs_map_.find(currentSeg_);
- if(it_seqs != seqs_map_.end()){
- auto range = timers_map_.equal_range(it_seqs->second);
- for(auto it_timers = range.first; it_timers != range.second; it_timers++){
- if(it_timers->second == it_seqs->first){
- timers_map_.erase(it_timers);
- break;
- }
- }
- seqs_map_.erase(it_seqs);
+ for(auto it = seqs_map_.begin(); it != seqs_map_.end(); it++){
+ if(it->first != old_curr)
+ sendNack(it->first);
}
+ seqs_map_.clear();
+ timers_map_.clear();
}
-
- currentSeg_ = (currentSeg_ + 1) % HICN_MAX_DATA_SEQ;
}
void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
@@ -236,7 +232,8 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
// if the production rate is less than MIN_PRODUCTION_RATE we put the
// interest in a queue, otherwise we handle it in the usual way
- if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){
+ if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE &&
+ interestSeg >= currentSeg_.load()){
utils::SpinLock::Acquire locked(interests_cache_lock_);
@@ -294,7 +291,8 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
1000.0) *
(double)packetsProductionRate_.load()));
- if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
+ if (interestSeg < currentSeg_.load() ||
+ interestSeg > (max_gap + currentSeg_.load())) {
sendNack(interestSeg);
}
// else drop packet
@@ -346,7 +344,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence) {
nack.setName(flowName_.setSuffix(sequence));
uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
- *payload_ptr = currentSeg_;
+ *payload_ptr = currentSeg_.load();
*(++payload_ptr) = bytesProductionRate_.load();
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index a2540ceef..37ba88d8a 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -47,7 +47,7 @@ class RTCProducerSocket : public ProducerSocket {
void scheduleRoundTimer();
void interestCacheTimer();
- uint32_t currentSeg_;
+ std::atomic<uint32_t> currentSeg_;
uint32_t prodLabel_;
uint16_t headerSize_;
Name flowName_;