aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
diff options
context:
space:
mode:
authormichele papalini <micpapal@cisco.com>2019-04-02 17:34:38 +0200
committermichele papalini <micpapal@cisco.com>2019-04-02 18:44:08 +0200
commitc99eeb5ff63ba5081087272c9c3f77e887f920dd (patch)
tree8dc974fbe564655a8c62f761093c06e8f88f6517 /libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
parent3cc0a6bbae4fd3dadb65a0e9789b48e2cea9d303 (diff)
[HICN-94] Handle nacks when the producer socket is not active
Change-Id: Ibc8b9ef65feaf6fbe12dbaa285ddcd738e1cd197 Signed-off-by: michele papalini <micpapal@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc')
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc71
1 files changed, 51 insertions, 20 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 00cc82543..67fcc83e3 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -24,6 +24,11 @@
#define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps)
#define STATS_INTERVAL_DURATION 500 // ms
#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
+#define INACTIVE_TIME 100 // ms opus generates ~50 packets per seocnd, one
+ // every
+// 20ms. to be safe we use 20ms*5 as timer for an
+// inactive socket
+#define MILLI_IN_A_SEC 1000 // ms in a second
// NACK HEADER
// +-----------------------------------------+
@@ -46,11 +51,14 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
producedPackets_(0),
bytesProductionRate_(0),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
- perSecondFactor_(1000 / STATS_INTERVAL_DURATION) {
+ perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
+ active_(false) {
auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
nack_payload->append(NACK_HEADER_SIZE);
nack_->appendPayload(std::move(nack_payload));
- lastStats_ = std::chrono::steady_clock::now();
+ lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
}
@@ -63,11 +71,14 @@ RTCProducerSocket::RTCProducerSocket()
producedPackets_(0),
bytesProductionRate_(0),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
- perSecondFactor_(1000 / STATS_INTERVAL_DURATION) {
+ perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
+ active_(false) {
auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
nack_payload->append(NACK_HEADER_SIZE);
nack_->appendPayload(std::move(nack_payload));
- lastStats_ = std::chrono::steady_clock::now();
+ lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
}
@@ -92,16 +103,15 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
}
}
-void RTCProducerSocket::updateStats(uint32_t packet_size) {
+void RTCProducerSocket::updateStats(uint32_t packet_size, uint64_t now) {
producedBytes_ += packet_size;
producedPackets_++;
- std::chrono::steady_clock::duration duration =
- std::chrono::steady_clock::now() - lastStats_;
- if (std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() >=
- STATS_INTERVAL_DURATION) {
- lastStats_ = std::chrono::steady_clock::now();
+ uint64_t duration = now - lastStats_;
+ if (duration >= STATS_INTERVAL_DURATION) {
+ lastStats_ = now;
bytesProductionRate_ = producedBytes_ * perSecondFactor_;
packetsProductionRate_ = producedPackets_ * perSecondFactor_;
+ if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
producedBytes_ = 0;
producedPackets_ = 0;
}
@@ -117,17 +127,20 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
return;
}
- updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN));
+ active_ = true;
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
- ContentObject content_object(flowName_.setSuffix(currentSeg_));
+ lastProduced_ = now;
+
+ updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now);
- uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
+ ContentObject content_object(flowName_.setSuffix(currentSeg_));
auto payload = utils::MemBuf::create(buffer_size + TIMESTAMP_LEN);
- memcpy(payload->writableData(), &timestamp, TIMESTAMP_LEN);
+ memcpy(payload->writableData(), &now, TIMESTAMP_LEN);
memcpy(payload->writableData() + TIMESTAMP_LEN, buf, buffer_size);
payload->append(buffer_size + TIMESTAMP_LEN);
content_object.appendPayload(std::move(payload));
@@ -149,23 +162,41 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
on_interest_input_(*this, *interest);
}
- // packetsProductionRate_ is modified by another thread in updateStats
- // this should be safe since I just read here.
+ if (active_.load()) {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if (now - lastProduced_.load() >= INACTIVE_TIME) {
+ active_ = false;
+ }
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(!active_.load())) {
+ sendNack(*interest);
+ return;
+ }
+
max_gap = (uint32_t)floor(
(double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
1000.0) *
- (double)packetsProductionRate_));
+ (double)packetsProductionRate_.load()));
if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
sendNack(*interest);
}
+ // else drop packet
}
void RTCProducerSocket::sendNack(const Interest &interest) {
nack_->setName(interest.getName());
uint32_t *payload_ptr = (uint32_t *)nack_->getPayload()->data();
*payload_ptr = currentSeg_;
- *(++payload_ptr) = bytesProductionRate_;
+
+ if (active_.load()) {
+ *(++payload_ptr) = bytesProductionRate_;
+ } else {
+ *(++payload_ptr) = 0;
+ }
nack_->setLifetime(0);
nack_->setPathLabel(prodLabel_);