summaryrefslogtreecommitdiffstats
path: root/libtransport/src
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src')
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc53
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h13
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h11
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc25
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h1
5 files changed, 48 insertions, 55 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 495f8c8f3..c726dfda8 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -24,11 +24,8 @@
#define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps)
#define STATS_INTERVAL_DURATION 500 // ms
#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
-#define INACTIVE_TIME \
- 100 // ms opus generates ~50 packets per seocnd, one
- // every
-// 20ms. to be safe we use 20ms*5 as timer for an
-// inactive socket
+#define INACTIVE_TIME 500 //ms without producing before the socket
+ //is considered inactive
#define MILLI_IN_A_SEC 1000 // ms in a second
// NACK HEADER
@@ -37,9 +34,15 @@
// +-----------------------------------------+
// | 4 bytes: production rate (bytes x sec) |
// +-----------------------------------------+
-// may require additional field (Rate for multiple qualities, ...)
//
+// PACKET HEADER
+// +-----------------------------------------+
+// | 8 bytes: TIMESTAMP |
+// +-----------------------------------------+
+// | packet |
+// +-----------------------------------------+
+
namespace transport {
namespace interface {
@@ -120,12 +123,15 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
return;
}
- active_ = true;
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- lastProduced_ = now;
+ {
+ utils::SpinLock::Acquire locked(lock_);
+ active_ = true;
+ lastProduced_ = now;
+ }
updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now);
@@ -154,39 +160,44 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
uint32_t interestSeg = interest->getName().getSuffix();
uint32_t lifetime = interest->getLifetime();
- uint32_t max_gap;
if (on_interest_input_ != VOID_HANDLER) {
on_interest_input_(*this, *interest);
}
- if (active_.load()) {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ bool isActive;
+ {
+ utils::SpinLock::Acquire locked(lock_);
+ isActive = active_;
+ if(isActive){
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- uint64_t lastProduced = lastProduced_.load();
- if (now - lastProduced >= INACTIVE_TIME) {
- active_ = false;
+ if ((now - lastProduced_) > INACTIVE_TIME) {
+ //socket is inactive
+ active_ = false;
+ isActive = false;
+ }
}
}
- if (TRANSPORT_EXPECT_FALSE(!active_.load())) {
- sendNack(*interest);
+ if (TRANSPORT_EXPECT_FALSE(!isActive)) {
+ sendNack(*interest, false);
return;
}
- max_gap = (uint32_t)floor(
+ uint32_t max_gap = (uint32_t)floor(
(double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
1000.0) *
(double)packetsProductionRate_.load()));
if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
- sendNack(*interest);
+ sendNack(*interest, true);
}
// else drop packet
}
-void RTCProducerSocket::sendNack(const Interest &interest) {
+void RTCProducerSocket::sendNack(const Interest &interest, bool isActive) {
auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
nack_payload->append(NACK_HEADER_SIZE);
ContentObject nack;
@@ -197,7 +208,7 @@ void RTCProducerSocket::sendNack(const Interest &interest) {
uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
*payload_ptr = currentSeg_;
- if (active_.load()) {
+ if (isActive) {
*(++payload_ptr) = bytesProductionRate_;
} else {
*(++payload_ptr) = 0;
@@ -215,4 +226,4 @@ void RTCProducerSocket::sendNack(const Interest &interest) {
} // namespace interface
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index be39d2b32..29fd15a4e 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -41,26 +41,25 @@ class RTCProducerSocket : public ProducerSocket {
void onInterest(Interest::Ptr &&interest) override;
private:
- void sendNack(const Interest &interest);
+ void sendNack(const Interest &interest, bool isActive);
void updateStats(uint32_t packet_size, uint64_t now);
- // std::map<uint32_t, uint64_t> pendingInterests_;
uint32_t currentSeg_;
uint32_t prodLabel_;
uint16_t headerSize_;
Name flowName_;
- // bool produceInSynch_;
uint32_t producedBytes_;
uint32_t producedPackets_;
uint32_t bytesProductionRate_;
std::atomic<uint32_t> packetsProductionRate_;
uint32_t perSecondFactor_;
uint64_t lastStats_;
- // std::chrono::steady_clock::time_point lastProduced_;
- std::atomic<uint64_t> lastProduced_;
- std::atomic<bool> active_;
+
+ uint64_t lastProduced_;
+ bool active_;
+ utils::SpinLock lock_;
};
} // namespace interface
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 41646c940..8f7a9718c 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -801,16 +801,11 @@ class ConsumerSocket : public BaseSocket {
return SOCKET_OPTION_GET;
}
- protected:
- std::unique_ptr<TransportProtocol> transport_protocol_;
-
private:
- // context inner state variables
asio::io_service internal_io_service_;
asio::io_service &io_service_;
std::shared_ptr<Portal> portal_;
-
utils::EventThread async_downloader_;
Name network_name_;
@@ -848,10 +843,8 @@ class ConsumerSocket : public BaseSocket {
ConsumerInterestCallback on_interest_output_;
ConsumerInterestCallback on_interest_timeout_;
ConsumerInterestCallback on_interest_satisfied_;
-
ConsumerContentObjectCallback on_content_object_input_;
ConsumerContentObjectVerificationCallback on_content_object_verification_;
-
ConsumerContentObjectCallback on_content_object_;
ConsumerManifestCallback on_manifest_;
ConsumerTimerCallback stats_summary_;
@@ -859,11 +852,13 @@ class ConsumerSocket : public BaseSocket {
ReadCallback *read_callback_;
// Virtual download for traffic generator
-
bool virtual_download_;
bool rtt_stats_;
uint32_t timer_interval_milliseconds_;
+
+ // Transport protocol
+ std::unique_ptr<TransportProtocol> transport_protocol_;
};
} // namespace interface
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index b514d0587..5ff0126b0 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -90,6 +90,11 @@ void RTCTransportProtocol::reset() {
nackedByProducer_.clear();
nackedByProducerMaxSize_ = 512;
+ nack_timer_used_ = false;
+ for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){
+ inflightInterests_[i] = {0};
+ }
+
// stats
receivedBytes_ = 0;
sentInterest_ = 0;
@@ -173,7 +178,7 @@ void RTCTransportProtocol::updateDelayStats(
uint64_t *senderTimeStamp = (uint64_t *)payload->data();
int64_t OWD = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
+ std::chrono::steady_clock::now().time_since_epoch())
.count() -
*senderTimeStamp;
@@ -383,22 +388,6 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
}
-void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) {
-#if 0
- for (uint32_t i = 0; i < nacks.size(); i++) {
- if (nackedByProducer_.find(nacks[i]) != nackedByProducer_.end()) {
- continue;
- }
- // packetLost_++;
- // XXX here I need to avoid the retrasmission for packet that were
- // nacked by the network
- interestRetransmissions_.push(nacks[i]);
- }
-
- scheduleNextInterests();
-#endif
-}
-
void RTCTransportProtocol::addRetransmissions(uint32_t val) {
// add only val in the rtx list
addRetransmissions(val, val + 1);
@@ -739,4 +728,4 @@ void RTCTransportProtocol::returnContentToApplication(
} // end namespace protocol
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 3e0ffe6e5..66ad05a88 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -113,7 +113,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
// packet functions
void sendInterest(Name *interest_name, bool rtx);
void scheduleNextInterests() override;
- void scheduleAppNackRtx(std::vector<uint32_t> &nacks);
void addRetransmissions(uint32_t val);
void addRetransmissions(uint32_t start, uint32_t stop);
void retransmit(bool first_rtx);