summaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/core/forwarder_interface.h2
-rw-r--r--libtransport/src/hicn/transport/core/portal.h10
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc101
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h20
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc64
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h11
6 files changed, 71 insertions, 137 deletions
diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h
index 6bcdaafc1..a89ed8a3c 100644
--- a/libtransport/src/hicn/transport/core/forwarder_interface.h
+++ b/libtransport/src/hicn/transport/core/forwarder_interface.h
@@ -50,7 +50,9 @@ class ForwarderInterface {
output_interface_(""),
content_store_reserved_(standard_cs_reserved) {
inet_address_.family = AF_INET;
+ inet_address_.len = IPV4_ADDR_LEN;
inet6_address_.family = AF_INET6;
+ inet6_address_.len = IPV6_ADDR_LEN;
}
public:
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index 3ea37c938..17f35d819 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -417,8 +417,14 @@ class Portal {
pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler(
async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler,
this, std::placeholders::_1, hash)));
- pending_interest_hash_table_.emplace(
- std::make_pair(hash, std::move(pending_interest)));
+
+ auto it = pending_interest_hash_table_.find(hash);
+ if(it != pending_interest_hash_table_.end()){
+ it->second->cancelTimer();
+ it->second = std::move(pending_interest);
+ }else{
+ pending_interest_hash_table_[hash] = std::move(pending_interest);
+ }
}
/**
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 5667b0640..740f9f77c 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -32,8 +32,8 @@
#define HICN_MAX_DATA_SEQ 0xefffffff
//slow production rate param
-#define MIN_PRODUCTION_RATE 8000 // in bytes per sec. this value is computed
- // through experiments
+#define MIN_PRODUCTION_RATE 10 // in pacekts per sec. this value is computed
+ // through experiments
#define LIFETIME_FRACTION 0.5
// NACK HEADER
@@ -63,15 +63,14 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
- timer_on_(false),
- active_(false) {
- lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ timer_on_(false){
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
interests_cache_timer_ = std::make_unique<asio::steady_timer>(
this->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(
+ this->getIoService());
+ scheduleRoundTimer();
}
RTCProducerSocket::RTCProducerSocket()
@@ -82,15 +81,14 @@ RTCProducerSocket::RTCProducerSocket()
bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
- timer_on_(false),
- active_(false) {
- lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ timer_on_(false){
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
interests_cache_timer_ = std::make_unique<asio::steady_timer>(
this->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(
+ this->getIoService());
+ scheduleRoundTimer();
}
RTCProducerSocket::~RTCProducerSocket() {}
@@ -113,18 +111,22 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
}
}
-void RTCProducerSocket::updateStats(uint32_t packet_size, uint64_t now) {
- producedBytes_ += packet_size;
- producedPackets_++;
- uint64_t duration = now - lastStats_;
- if (duration >= STATS_INTERVAL_DURATION) {
- lastStats_ = now;
- bytesProductionRate_ = producedBytes_ * perSecondFactor_;
- packetsProductionRate_ = producedPackets_ * perSecondFactor_;
- if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
- producedBytes_ = 0;
- producedPackets_ = 0;
- }
+void RTCProducerSocket::scheduleRoundTimer(){
+ round_timer_->expires_from_now(
+ std::chrono::milliseconds(STATS_INTERVAL_DURATION));
+ round_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ updateStats();
+ });
+}
+
+void RTCProducerSocket::updateStats() {
+ bytesProductionRate_ = producedBytes_.load() * perSecondFactor_;
+ packetsProductionRate_ = producedPackets_.load() * perSecondFactor_;
+ if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
+ producedBytes_ = 0;
+ producedPackets_ = 0;
+ scheduleRoundTimer();
}
void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
@@ -143,13 +145,8 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
std::chrono::steady_clock::now().time_since_epoch())
.count();
- {
- utils::SpinLock::Acquire locked(lock_);
- active_ = true;
- lastProduced_ = now;
- }
-
- updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now);
+ producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN);
+ producedPackets_++;
ContentObject content_object(flowName_.setSuffix(currentSeg_));
@@ -203,27 +200,14 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
std::chrono::steady_clock::now().time_since_epoch())
.count();
- bool isActive;
- {
- utils::SpinLock::Acquire locked(lock_);
- isActive = active_;
- if (isActive) {
- if ((now - lastProduced_) > INACTIVE_TIME) {
- // socket is inactive
- active_ = false;
- isActive = false;
- }
- }
- }
-
if(interestSeg > HICN_MAX_DATA_SEQ){
- sendNack(interestSeg, isActive);
+ sendNack(interestSeg);
return;
}
// if the production rate is less than MIN_PRODUCTION_RATE we put the
// interest in a queue, otherwise we handle it in the usual way
- if(bytesProductionRate_ < MIN_PRODUCTION_RATE && interestSeg > currentSeg_){
+ if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){
utils::SpinLock::Acquire locked(interests_cache_lock_);
@@ -265,34 +249,29 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
if(!timer_on_){
//set timeout
timer_on_ = true;
- scheduleTimer(timers_map_.begin()->first - now);
+ scheduleCacheTimer(timers_map_.begin()->first - now);
} else {
//re-schedule the timer because a new interest will expires sooner
if(next_timer > timers_map_.begin()->first){
interests_cache_timer_->cancel();
- scheduleTimer(timers_map_.begin()->first - now);
+ scheduleCacheTimer(timers_map_.begin()->first - now);
}
}
return;
}
- if (TRANSPORT_EXPECT_FALSE(!isActive)) {
- sendNack(interestSeg, false);
- return;
- }
-
uint32_t max_gap = (uint32_t)floor(
(double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
1000.0) *
(double)packetsProductionRate_.load()));
if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
- sendNack(interestSeg, true);
+ sendNack(interestSeg);
}
// else drop packet
}
-void RTCProducerSocket::scheduleTimer(uint64_t wait){
+void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){
interests_cache_timer_->expires_from_now(
std::chrono::milliseconds(wait));
interests_cache_timer_->async_wait([this](std::error_code ec) {
@@ -312,7 +291,7 @@ void RTCProducerSocket::interestCacheTimer(){
uint64_t expire = it_timers->first;
if(expire <= now){
uint32_t seq = it_timers->second;
- sendNack(seq, active_);
+ sendNack(seq);
//remove the interest from the other map
seqs_map_.erase(seq);
it_timers = timers_map_.erase(it_timers);
@@ -325,11 +304,11 @@ void RTCProducerSocket::interestCacheTimer(){
timer_on_ = false;
}else{
timer_on_ = true;
- scheduleTimer(timers_map_.begin()->first - now);
+ scheduleCacheTimer(timers_map_.begin()->first - now);
}
}
-void RTCProducerSocket::sendNack(uint32_t sequence, bool isActive) {
+void RTCProducerSocket::sendNack(uint32_t sequence) {
auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
nack_payload->append(NACK_HEADER_SIZE);
ContentObject nack;
@@ -340,11 +319,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence, bool isActive) {
uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
*payload_ptr = currentSeg_;
- if (isActive) {
- *(++payload_ptr) = bytesProductionRate_;
- } else {
- *(++payload_ptr) = 0;
- }
+ *(++payload_ptr) = bytesProductionRate_.load();
nack.setLifetime(0);
nack.setPathLabel(prodLabel_);
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index aa67f1a29..a2540ceef 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -41,21 +41,23 @@ class RTCProducerSocket : public ProducerSocket {
void onInterest(Interest::Ptr &&interest) override;
private:
- void sendNack(uint32_t sequence, bool isActive);
- void updateStats(uint32_t packet_size, uint64_t now);
- void scheduleTimer(uint64_t wait);
+ void sendNack(uint32_t sequence);
+ void updateStats();
+ void scheduleCacheTimer(uint64_t wait);
+ void scheduleRoundTimer();
void interestCacheTimer();
uint32_t currentSeg_;
uint32_t prodLabel_;
uint16_t headerSize_;
Name flowName_;
- uint32_t producedBytes_;
- uint32_t producedPackets_;
- uint32_t bytesProductionRate_;
+ std::atomic<uint32_t> producedBytes_;
+ std::atomic<uint32_t> producedPackets_;
+ std::atomic<uint32_t> bytesProductionRate_;
std::atomic<uint32_t> packetsProductionRate_;
uint32_t perSecondFactor_;
- uint64_t lastStats_;
+
+ std::unique_ptr<asio::steady_timer> round_timer_;
// cache for the received interests
// this map maps the expiration time of an interest to
@@ -70,10 +72,6 @@ class RTCProducerSocket : public ProducerSocket {
bool timer_on_;
std::unique_ptr<asio::steady_timer> interests_cache_timer_;
utils::SpinLock interests_cache_lock_;
-
- uint64_t lastProduced_;
- bool active_;
- utils::SpinLock lock_;
};
} // namespace interface
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 4104d8883..accd98495 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -31,10 +31,8 @@ RTCTransportProtocol::RTCTransportProtocol(
inflightInterests_(1 << default_values::log_2_default_buffer_size),
modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
icnet_socket->getSocketOption(PORTAL, portal_);
- nack_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
- nack_timer_used_ = false;
reset();
}
@@ -92,7 +90,6 @@ void RTCTransportProtocol::reset() {
highestReceived_ = 0;
firstSequenceInRound_ = 0;
- nack_timer_used_ = false;
rtx_timer_used_ = false;
for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){
inflightInterests_[i] = {0};
@@ -673,36 +670,6 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
scheduleNextInterests();
}
-bool RTCTransportProtocol::checkIfProducerIsActive(
- const ContentObject &content_object) {
- uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
- uint32_t productionSeg = *payload;
- uint32_t productionRate = *(++payload);
-
- if (productionRate == 0) {
- // the producer socket is not active
- // in this case we consider only the first nack
- if (nack_timer_used_) {
- return false;
- }
-
- nack_timer_used_ = true;
- // actualSegment_ should be the one in the nack, which will be the next in
- // production
- actualSegment_ = productionSeg;
- // all the rest (win size should not change)
- // we wait a bit before pull the socket again
- nack_timer_->expires_from_now(std::chrono::milliseconds(500));
- nack_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- nack_timer_used_ = false;
- scheduleNextInterests();
- });
- return false;
- }
- return true;
-}
-
bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) {
uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
uint32_t productionSeg = *payload;
@@ -719,7 +686,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
if (productionSeg > nackSegment) {
// we are asking for stuff produced in the past
- actualSegment_ = max(productionSeg + 1, actualSegment_) % HICN_MIN_PROBE_SEQ;
+ actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ;
if(!rtx) {
if (currentState_ == HICN_RTC_NORMAL_STATE) {
@@ -756,7 +723,11 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
currentState_ = HICN_RTC_NORMAL_STATE;
}
}
- } // equal should not happen
+ } else {
+ //we are asking the right thing, but the producer is slow
+ //keep doing the same until the packet is produced
+ actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ;
+ }
return old_nack;
}
@@ -771,7 +742,6 @@ void RTCTransportProtocol::onContentObject(
uint32_t payload_size = (uint32_t)payload->length();
uint32_t segmentNumber = content_object->getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- bool schedule_next_interest = true;
ConsumerContentObjectCallback *callback_content_object = nullptr;
socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
@@ -802,25 +772,19 @@ void RTCTransportProtocol::onContentObject(
}
if (payload_size == HICN_NACK_HEADER_SIZE) {
- schedule_next_interest = checkIfProducerIsActive(*content_object);
-
if (inflightInterests_[pkt].state == sent_) {
inflightInterestsCount_--;
}
- // if checkIfProducerIsActive returns false, we did all we need to do
- // inside that function, no need to call onNack
bool old_nack = false;
- if (schedule_next_interest){
- if (interestRetransmissions_.find(segmentNumber) ==
+ if (interestRetransmissions_.find(segmentNumber) ==
interestRetransmissions_.end()){
- //this is not a retransmitted packet
- old_nack = onNack(*content_object, false);
- updateDelayStats(*content_object);
- } else {
- old_nack = onNack(*content_object, true);
- }
+ //this is not a retransmitted packet
+ old_nack = onNack(*content_object, false);
+ updateDelayStats(*content_object);
+ } else {
+ old_nack = onNack(*content_object, true);
}
//the nacked_ state is used only to avoid to decrease inflightInterestsCount_
@@ -870,9 +834,7 @@ void RTCTransportProtocol::onContentObject(
interestRetransmissions_.erase(segmentNumber);
- if (schedule_next_interest) {
- scheduleNextInterests();
- }
+ scheduleNextInterests();
}
void RTCTransportProtocol::returnContentToApplication(
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 4ebae2b90..509f11361 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -128,10 +128,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
void checkRtx();
void probeRtt();
void onTimeout(Interest::Ptr &&interest) override;
- // checkIfProducerIsActive: return true if we need to schedule an interest
- // immediatly after, false otherwise (this happens when the producer socket
- // is not active)
- bool checkIfProducerIsActive(const ContentObject &content_object);
bool onNack(const ContentObject &content_object, bool rtx);
void onContentObject(Interest::Ptr &&interest,
ContentObject::Ptr &&content_object) override;
@@ -155,6 +151,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
uint32_t inflightInterestsCount_;
//map seq to rtx
std::map<uint32_t, uint8_t> interestRetransmissions_;
+ bool rtx_timer_used_;
std::unique_ptr<asio::steady_timer> rtx_timer_;
std::vector<sentInterest> inflightInterests_;
uint32_t lastSegNacked_; //indicates the segment id in the last received
@@ -163,12 +160,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
uint32_t lastReceived_; //segment of the last content object received
//indicates the base of the window on the client
- bool nack_timer_used_;
- bool rtx_timer_used_;
- std::unique_ptr<asio::steady_timer> nack_timer_; // timer used to schedule
- // a nack retransmission in case
- // of inactive prod socket
-
//rtt probes
//the RTC transport tends to overestimate the RTT
//du to the production time on the server side