summaryrefslogtreecommitdiffstats
path: root/libtransport/src
diff options
context:
space:
mode:
authormichele papalini <micpapal@cisco.com>2019-07-03 09:53:14 +0200
committermichele papalini <micpapal@cisco.com>2019-07-03 09:53:14 +0200
commit816964180f5fc15c756580fef0173dad55f59933 (patch)
treed4b955b6fceba5d616dcd4454b8b3a3e39f27b3f /libtransport/src
parent0415ae764f84398ccc14debd9afbac761da385ae (diff)
[HICN-232] fix concurrency problem on rtc producer socket
Change-Id: Ia873aa3c9b6ef4825df88fa05cc1d6dc40bb73a1 Signed-off-by: michele papalini <micpapal@cisco.com>
Diffstat (limited to 'libtransport/src')
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc53
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h13
2 files changed, 38 insertions, 28 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 495f8c8f3..c726dfda8 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -24,11 +24,8 @@
#define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps)
#define STATS_INTERVAL_DURATION 500 // ms
#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
-#define INACTIVE_TIME \
- 100 // ms opus generates ~50 packets per seocnd, one
- // every
-// 20ms. to be safe we use 20ms*5 as timer for an
-// inactive socket
+#define INACTIVE_TIME 500 //ms without producing before the socket
+ //is considered inactive
#define MILLI_IN_A_SEC 1000 // ms in a second
// NACK HEADER
@@ -37,9 +34,15 @@
// +-----------------------------------------+
// | 4 bytes: production rate (bytes x sec) |
// +-----------------------------------------+
-// may require additional field (Rate for multiple qualities, ...)
//
+// PACKET HEADER
+// +-----------------------------------------+
+// | 8 bytes: TIMESTAMP |
+// +-----------------------------------------+
+// | packet |
+// +-----------------------------------------+
+
namespace transport {
namespace interface {
@@ -120,12 +123,15 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
return;
}
- active_ = true;
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- lastProduced_ = now;
+ {
+ utils::SpinLock::Acquire locked(lock_);
+ active_ = true;
+ lastProduced_ = now;
+ }
updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now);
@@ -154,39 +160,44 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
uint32_t interestSeg = interest->getName().getSuffix();
uint32_t lifetime = interest->getLifetime();
- uint32_t max_gap;
if (on_interest_input_ != VOID_HANDLER) {
on_interest_input_(*this, *interest);
}
- if (active_.load()) {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ bool isActive;
+ {
+ utils::SpinLock::Acquire locked(lock_);
+ isActive = active_;
+ if(isActive){
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- uint64_t lastProduced = lastProduced_.load();
- if (now - lastProduced >= INACTIVE_TIME) {
- active_ = false;
+ if ((now - lastProduced_) > INACTIVE_TIME) {
+ //socket is inactive
+ active_ = false;
+ isActive = false;
+ }
}
}
- if (TRANSPORT_EXPECT_FALSE(!active_.load())) {
- sendNack(*interest);
+ if (TRANSPORT_EXPECT_FALSE(!isActive)) {
+ sendNack(*interest, false);
return;
}
- max_gap = (uint32_t)floor(
+ uint32_t max_gap = (uint32_t)floor(
(double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
1000.0) *
(double)packetsProductionRate_.load()));
if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
- sendNack(*interest);
+ sendNack(*interest, true);
}
// else drop packet
}
-void RTCProducerSocket::sendNack(const Interest &interest) {
+void RTCProducerSocket::sendNack(const Interest &interest, bool isActive) {
auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
nack_payload->append(NACK_HEADER_SIZE);
ContentObject nack;
@@ -197,7 +208,7 @@ void RTCProducerSocket::sendNack(const Interest &interest) {
uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
*payload_ptr = currentSeg_;
- if (active_.load()) {
+ if (isActive) {
*(++payload_ptr) = bytesProductionRate_;
} else {
*(++payload_ptr) = 0;
@@ -215,4 +226,4 @@ void RTCProducerSocket::sendNack(const Interest &interest) {
} // namespace interface
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index be39d2b32..408ce3ff7 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -41,26 +41,25 @@ class RTCProducerSocket : public ProducerSocket {
void onInterest(Interest::Ptr &&interest) override;
private:
- void sendNack(const Interest &interest);
+ void sendNack(const Interest &interest, bool isActvie);
void updateStats(uint32_t packet_size, uint64_t now);
- // std::map<uint32_t, uint64_t> pendingInterests_;
uint32_t currentSeg_;
uint32_t prodLabel_;
uint16_t headerSize_;
Name flowName_;
- // bool produceInSynch_;
uint32_t producedBytes_;
uint32_t producedPackets_;
uint32_t bytesProductionRate_;
std::atomic<uint32_t> packetsProductionRate_;
uint32_t perSecondFactor_;
uint64_t lastStats_;
- // std::chrono::steady_clock::time_point lastProduced_;
- std::atomic<uint64_t> lastProduced_;
- std::atomic<bool> active_;
+
+ uint64_t lastProduced_;
+ bool active_;
+ utils::SpinLock lock_;
};
} // namespace interface
-} // end namespace transport \ No newline at end of file
+} // end namespace transport