diff options
Diffstat (limited to 'libtransport')
4 files changed, 110 insertions, 60 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 00cc82543..67fcc83e3 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -24,6 +24,11 @@ #define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps) #define STATS_INTERVAL_DURATION 500 // ms #define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8 +#define INACTIVE_TIME 100 // ms opus generates ~50 packets per seocnd, one + // every +// 20ms. to be safe we use 20ms*5 as timer for an +// inactive socket +#define MILLI_IN_A_SEC 1000 // ms in a second // NACK HEADER // +-----------------------------------------+ @@ -46,11 +51,14 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) producedPackets_(0), bytesProductionRate_(0), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), - perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { + perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + active_(false) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); nack_->appendPayload(std::move(nack_payload)); - lastStats_ = std::chrono::steady_clock::now(); + lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); } @@ -63,11 +71,14 @@ RTCProducerSocket::RTCProducerSocket() producedPackets_(0), bytesProductionRate_(0), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), - perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { + perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + active_(false) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); nack_->appendPayload(std::move(nack_payload)); - lastStats_ = std::chrono::steady_clock::now(); + lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); } @@ -92,16 +103,15 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { } } -void RTCProducerSocket::updateStats(uint32_t packet_size) { +void RTCProducerSocket::updateStats(uint32_t packet_size, uint64_t now) { producedBytes_ += packet_size; producedPackets_++; - std::chrono::steady_clock::duration duration = - std::chrono::steady_clock::now() - lastStats_; - if (std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() >= - STATS_INTERVAL_DURATION) { - lastStats_ = std::chrono::steady_clock::now(); + uint64_t duration = now - lastStats_; + if (duration >= STATS_INTERVAL_DURATION) { + lastStats_ = now; bytesProductionRate_ = producedBytes_ * perSecondFactor_; packetsProductionRate_ = producedPackets_ * perSecondFactor_; + if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1; producedBytes_ = 0; producedPackets_ = 0; } @@ -117,17 +127,20 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) { return; } - updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN)); + active_ = true; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); - ContentObject content_object(flowName_.setSuffix(currentSeg_)); + lastProduced_ = now; + + updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now); - uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + ContentObject content_object(flowName_.setSuffix(currentSeg_)); auto payload = utils::MemBuf::create(buffer_size + TIMESTAMP_LEN); - memcpy(payload->writableData(), ×tamp, TIMESTAMP_LEN); + memcpy(payload->writableData(), &now, TIMESTAMP_LEN); memcpy(payload->writableData() + TIMESTAMP_LEN, buf, buffer_size); payload->append(buffer_size + TIMESTAMP_LEN); content_object.appendPayload(std::move(payload)); @@ -149,23 +162,41 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { on_interest_input_(*this, *interest); } - // packetsProductionRate_ is modified by another thread in updateStats - // this should be safe since I just read here. + if (active_.load()) { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (now - lastProduced_.load() >= INACTIVE_TIME) { + active_ = false; + } + } + + if (TRANSPORT_EXPECT_FALSE(!active_.load())) { + sendNack(*interest); + return; + } + max_gap = (uint32_t)floor( (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR / 1000.0) * - (double)packetsProductionRate_)); + (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { sendNack(*interest); } + // else drop packet } void RTCProducerSocket::sendNack(const Interest &interest) { nack_->setName(interest.getName()); uint32_t *payload_ptr = (uint32_t *)nack_->getPayload()->data(); *payload_ptr = currentSeg_; - *(++payload_ptr) = bytesProductionRate_; + + if (active_.load()) { + *(++payload_ptr) = bytesProductionRate_; + } else { + *(++payload_ptr) = 0; + } nack_->setLifetime(0); nack_->setPathLabel(prodLabel_); diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index bc54be4bb..cb09ef991 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -18,6 +18,7 @@ #include <hicn/transport/interfaces/socket_producer.h> #include <hicn/transport/utils/content_store.h> +#include <atomic> #include <map> #include <mutex> @@ -41,7 +42,7 @@ class RTCProducerSocket : public ProducerSocket { private: void sendNack(const Interest &interest); - void updateStats(uint32_t packet_size); + void updateStats(uint32_t packet_size, uint64_t now); // std::map<uint32_t, uint64_t> pendingInterests_; uint32_t currentSeg_; @@ -53,9 +54,12 @@ class RTCProducerSocket : public ProducerSocket { uint32_t producedBytes_; uint32_t producedPackets_; uint32_t bytesProductionRate_; - uint32_t packetsProductionRate_; + std::atomic<uint32_t> packetsProductionRate_; uint32_t perSecondFactor_; - std::chrono::steady_clock::time_point lastStats_; + uint64_t lastStats_; + // std::chrono::steady_clock::time_point lastProduced_; + std::atomic<uint64_t> lastProduced_; + std::atomic<bool> active_; }; } // namespace interface diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index a993d596b..98abbe35b 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -38,6 +38,8 @@ RTCTransportProtocol::RTCTransportProtocol( inflightInterests_(1 << default_values::log_2_default_buffer_size), modMask_((1 << default_values::log_2_default_buffer_size) - 1) { icnet_socket->getSocketOption(PORTAL, portal_); + nack_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + nack_timer_used_ = false; reset(); } @@ -211,8 +213,6 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { auto it = pathTable_.find(producerPathLabel_); if (it == pathTable_.end()) return; - // double maxAvgRTT = it->second->getAverageRtt(); - // double minRTT = it->second->getMinRtt(); minRtt_ = it->second->getMinRtt(); queuingDelay_ = it->second->getQueuingDealy(); @@ -222,22 +222,6 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { it->second->roundEnd(); } - // this is inefficient but the window is supposed to be small, so it - // probably makes sense to leave it like this - // if(minRTT == 0) - // minRTT = 1; - - // minRTTwin_[roundCounter_ % MIN_RTT_WIN] = minRTT; - // minRtt_ = minRTT; - // for (int i = 0; i < MIN_RTT_WIN; i++) - // if(minRtt_ > minRTTwin_[i]) - // minRtt_ = minRTTwin_[i]; - - // roundCounter_++; - - // std::cout << "min RTT " << minRtt_ << " queuing " << queuingDelay_ << - // std::endl; - if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { double lossRate = (double)((double)packetLost_ / (double)sentInterest_); lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + @@ -473,23 +457,45 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { scheduleNextInterests(); } +bool RTCTransportProtocol::checkIfProducerIsActive( + const ContentObject &content_object) { + uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); + uint32_t productionSeg = *payload; + uint32_t productionRate = *(++payload); + + if (productionRate == 0) { + // the producer socket is not active + // in this case we consider only the first nack + if (nack_timer_used_) return false; + + nack_timer_used_ = true; + // actualSegment_ should be the one in the nack, which will the next in + // production + actualSegment_ = productionSeg; + // all the rest (win size should not change) + // we wait a bit before pull the socket again + nack_timer_->expires_from_now(std::chrono::milliseconds(500)); + nack_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + nack_timer_used_ = false; + scheduleNextInterests(); + }); + return false; + } + + return true; +} + void RTCTransportProtocol::onNack(const ContentObject &content_object) { uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); uint32_t nackSegment = content_object.getName().getSuffix(); + gotNack_ = true; // we synch the estimated production rate with the actual one estimatedBw_ = (double)productionRate; - // if(inflightInterests_[segmentNumber % - // default_values::default_buffer_size].retransmissions != 0){ ignore nacks - // for retransmissions - // return; - //} - - gotNack_ = true; - if (productionSeg > nackSegment) { // we are asking for stuff produced in the past actualSegment_ = max(productionSeg + 1, actualSegment_); @@ -535,14 +541,21 @@ void RTCTransportProtocol::onContentObject( uint32_t payload_size = (uint32_t)payload->length(); uint32_t segmentNumber = content_object->getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; + bool schedule_next_interest = true; if (payload_size == HICN_NACK_HEADER_SIZE) { - // Nacks always come form the producer, so we set the producerePathLabel_; + // Nacks always come form the producer, so we set the producerPathLabel_; producerPathLabel_ = content_object->getPathLabel(); if (inflightInterests_[pkt].retransmissions == 0) { + // discard nacks for rtx packets inflightInterestsCount_--; - onNack(*content_object); + schedule_next_interest = checkIfProducerIsActive(*content_object); + // if checkIfProducerIsActive returns true, we did all we need to do + // inside that function, no need to call onNack + if (!schedule_next_interest) onNack(*content_object); updateDelayStats(*content_object); + } else { + schedule_next_interest = checkIfProducerIsActive(*content_object); } } else { @@ -564,7 +577,9 @@ void RTCTransportProtocol::onContentObject( increaseWindow(); } - scheduleNextInterests(); + if (schedule_next_interest) { + scheduleNextInterests(); + } } void RTCTransportProtocol::returnContentToApplication( diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 58c143988..0bb9d9b2e 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -121,6 +121,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { void scheduleNextInterests() override; void scheduleAppNackRtx(std::vector<uint32_t> &nacks); void onTimeout(Interest::Ptr &&interest) override; + // checkIfProducerIsActive: return true if we need to schedule an interest + // immediatly after, false otherwise (this happens when the producer socket + // is not active) + bool checkIfProducerIsActive(const ContentObject &content_object); void onNack(const ContentObject &content_object); void onContentObject(Interest::Ptr &&interest, ContentObject::Ptr &&content_object) override; @@ -142,18 +146,11 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // controller var std::chrono::steady_clock::time_point lastRoundBegin_; - // bool allPacketsInSync_; - // unsigned numberOfRoundsInSync_; - // unsigned numberOfCatchUpRounds_; - // bool catchUpPhase_; unsigned currentState_; - // uint32_t inProduction_; - // cwin var uint32_t currentCWin_; uint32_t maxCWin_; - // uint32_t previousCWin_; // names/packets var uint32_t actualSegment_; @@ -167,6 +164,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // application for pakets for which we already got a // past NACK by the producer these packet are too old, // they will never be retrived + bool nack_timer_used_; + std::unique_ptr<asio::steady_timer> nack_timer_; // timer used to schedule + // a nack retransmission in + // of inactive prod socket uint32_t modMask_; @@ -185,7 +186,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // vector std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_; uint32_t roundCounter_; - // std::vector<uint64_t> minRTTwin_; uint64_t minRtt_; // CC var |