diff options
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc | 34 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h | 2 |
2 files changed, 17 insertions, 19 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 8ca27d24c..e7dc98868 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -151,7 +151,7 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { producedPackets_++; auto content_object = std::make_shared<ContentObject>( - flowName_.setSuffix(currentSeg_)); + flowName_.setSuffix(currentSeg_.load())); auto payload = utils::MemBuf::create(TIMESTAMP_LEN); memcpy(payload->writableData(), &now, TIMESTAMP_LEN); @@ -176,25 +176,21 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { on_content_object_output_(*this, *content_object); } + uint32_t old_curr = currentSeg_.load(); + currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ; + //remove interests from the interest cache if it exists + //this generates nacks that will tell to the consumer + //that a new data packet was produced if(!seqs_map_.empty()){ - utils::SpinLock::Acquire locked(interests_cache_lock_); - - auto it_seqs = seqs_map_.find(currentSeg_); - if(it_seqs != seqs_map_.end()){ - auto range = timers_map_.equal_range(it_seqs->second); - for(auto it_timers = range.first; it_timers != range.second; it_timers++){ - if(it_timers->second == it_seqs->first){ - timers_map_.erase(it_timers); - break; - } - } - seqs_map_.erase(it_seqs); + for(auto it = seqs_map_.begin(); it != seqs_map_.end(); it++){ + if(it->first != old_curr) + sendNack(it->first); } + seqs_map_.clear(); + timers_map_.clear(); } - - currentSeg_ = (currentSeg_ + 1) % HICN_MAX_DATA_SEQ; } void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { @@ -236,7 +232,8 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { // if the production rate is less than MIN_PRODUCTION_RATE we put the // interest in a queue, otherwise we handle it in the usual way - if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){ + if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && + interestSeg >= currentSeg_.load()){ utils::SpinLock::Acquire locked(interests_cache_lock_); @@ -294,7 +291,8 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { 1000.0) * (double)packetsProductionRate_.load())); - if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { + if (interestSeg < currentSeg_.load() || + interestSeg > (max_gap + currentSeg_.load())) { sendNack(interestSeg); } // else drop packet @@ -346,7 +344,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence) { nack.setName(flowName_.setSuffix(sequence)); uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data(); - *payload_ptr = currentSeg_; + *payload_ptr = currentSeg_.load(); *(++payload_ptr) = bytesProductionRate_.load(); diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index a2540ceef..37ba88d8a 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -47,7 +47,7 @@ class RTCProducerSocket : public ProducerSocket { void scheduleRoundTimer(); void interestCacheTimer(); - uint32_t currentSeg_; + std::atomic<uint32_t> currentSeg_; uint32_t prodLabel_; uint16_t headerSize_; Name flowName_; |