From c99eeb5ff63ba5081087272c9c3f77e887f920dd Mon Sep 17 00:00:00 2001 From: michele papalini Date: Tue, 2 Apr 2019 17:34:38 +0200 Subject: [HICN-94] Handle nacks when the producer socket is not active Change-Id: Ibc8b9ef65feaf6fbe12dbaa285ddcd738e1cd197 Signed-off-by: michele papalini --- .../transport/interfaces/rtc_socket_producer.cc | 71 ++++++++++++++++------ 1 file changed, 51 insertions(+), 20 deletions(-) (limited to 'libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc') diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 00cc82543..67fcc83e3 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -24,6 +24,11 @@ #define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps) #define STATS_INTERVAL_DURATION 500 // ms #define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8 +#define INACTIVE_TIME 100 // ms opus generates ~50 packets per seocnd, one + // every +// 20ms. to be safe we use 20ms*5 as timer for an +// inactive socket +#define MILLI_IN_A_SEC 1000 // ms in a second // NACK HEADER // +-----------------------------------------+ @@ -46,11 +51,14 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) producedPackets_(0), bytesProductionRate_(0), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), - perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { + perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + active_(false) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); nack_->appendPayload(std::move(nack_payload)); - lastStats_ = std::chrono::steady_clock::now(); + lastStats_ = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); } @@ -63,11 +71,14 @@ RTCProducerSocket::RTCProducerSocket() producedPackets_(0), bytesProductionRate_(0), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), - perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { + perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + active_(false) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); nack_->appendPayload(std::move(nack_payload)); - lastStats_ = std::chrono::steady_clock::now(); + lastStats_ = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); } @@ -92,16 +103,15 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { } } -void RTCProducerSocket::updateStats(uint32_t packet_size) { +void RTCProducerSocket::updateStats(uint32_t packet_size, uint64_t now) { producedBytes_ += packet_size; producedPackets_++; - std::chrono::steady_clock::duration duration = - std::chrono::steady_clock::now() - lastStats_; - if (std::chrono::duration_cast(duration).count() >= - STATS_INTERVAL_DURATION) { - lastStats_ = std::chrono::steady_clock::now(); + uint64_t duration = now - lastStats_; + if (duration >= STATS_INTERVAL_DURATION) { + lastStats_ = now; bytesProductionRate_ = producedBytes_ * perSecondFactor_; packetsProductionRate_ = producedPackets_ * perSecondFactor_; + if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1; producedBytes_ = 0; producedPackets_ = 0; } @@ -117,17 +127,20 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) { return; } - updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN)); + active_ = true; + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); - ContentObject content_object(flowName_.setSuffix(currentSeg_)); + lastProduced_ = now; + + updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now); - uint64_t timestamp = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + ContentObject content_object(flowName_.setSuffix(currentSeg_)); auto payload = utils::MemBuf::create(buffer_size + TIMESTAMP_LEN); - memcpy(payload->writableData(), ×tamp, TIMESTAMP_LEN); + memcpy(payload->writableData(), &now, TIMESTAMP_LEN); memcpy(payload->writableData() + TIMESTAMP_LEN, buf, buffer_size); payload->append(buffer_size + TIMESTAMP_LEN); content_object.appendPayload(std::move(payload)); @@ -149,23 +162,41 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { on_interest_input_(*this, *interest); } - // packetsProductionRate_ is modified by another thread in updateStats - // this should be safe since I just read here. + if (active_.load()) { + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (now - lastProduced_.load() >= INACTIVE_TIME) { + active_ = false; + } + } + + if (TRANSPORT_EXPECT_FALSE(!active_.load())) { + sendNack(*interest); + return; + } + max_gap = (uint32_t)floor( (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR / 1000.0) * - (double)packetsProductionRate_)); + (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { sendNack(*interest); } + // else drop packet } void RTCProducerSocket::sendNack(const Interest &interest) { nack_->setName(interest.getName()); uint32_t *payload_ptr = (uint32_t *)nack_->getPayload()->data(); *payload_ptr = currentSeg_; - *(++payload_ptr) = bytesProductionRate_; + + if (active_.load()) { + *(++payload_ptr) = bytesProductionRate_; + } else { + *(++payload_ptr) = 0; + } nack_->setLifetime(0); nack_->setPathLabel(prodLabel_); -- cgit 1.2.3-korg