Diffstat (limited to 'libtransport')
6 files changed, 71 insertions, 137 deletions
diff --git a/libtransport/src/hicn/transport/core/forwarder_interface.h b/libtransport/src/hicn/transport/core/forwarder_interface.h
index 6bcdaafc1..a89ed8a3c 100644
--- a/libtransport/src/hicn/transport/core/forwarder_interface.h
+++ b/libtransport/src/hicn/transport/core/forwarder_interface.h
@@ -50,7 +50,9 @@ class ForwarderInterface {
         output_interface_(""),
         content_store_reserved_(standard_cs_reserved) {
     inet_address_.family = AF_INET;
+    inet_address_.len = IPV4_ADDR_LEN;
     inet6_address_.family = AF_INET6;
+    inet6_address_.len = IPV6_ADDR_LEN;
   }
 
  public:
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index 3ea37c938..17f35d819 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -417,8 +417,14 @@ class Portal {
     pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler(
         async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler,
                                           this, std::placeholders::_1, hash)));
-    pending_interest_hash_table_.emplace(
-        std::make_pair(hash, std::move(pending_interest)));
+
+    auto it = pending_interest_hash_table_.find(hash);
+    if(it != pending_interest_hash_table_.end()){
+      it->second->cancelTimer();
+      it->second = std::move(pending_interest);
+    }else{
+      pending_interest_hash_table_[hash] = std::move(pending_interest);
+    }
   }
 
   /**
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 5667b0640..740f9f77c 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -32,8 +32,8 @@
 #define HICN_MAX_DATA_SEQ 0xefffffff
 
 //slow production rate param
-#define MIN_PRODUCTION_RATE 8000  // in bytes per sec. this value is computed
-                                  // through experiments
+#define MIN_PRODUCTION_RATE 10    // in pacekts per sec. this value is computed
+                                  // through experiments
 #define LIFETIME_FRACTION 0.5
 
 // NACK HEADER
@@ -63,15 +63,14 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
       bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
       packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
       perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
-      timer_on_(false),
-      active_(false) {
-  lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
-                   std::chrono::steady_clock::now().time_since_epoch())
-                   .count();
+      timer_on_(false){
   srand((unsigned int)time(NULL));
   prodLabel_ = ((rand() % 255) << 24UL);
   interests_cache_timer_ = std::make_unique<asio::steady_timer>(
       this->getIoService());
+  round_timer_ = std::make_unique<asio::steady_timer>(
+      this->getIoService());
+  scheduleRoundTimer();
 }
 
 RTCProducerSocket::RTCProducerSocket()
@@ -82,15 +81,14 @@ RTCProducerSocket::RTCProducerSocket()
       bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
       packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
       perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
-      timer_on_(false),
-      active_(false) {
-  lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
-                   std::chrono::steady_clock::now().time_since_epoch())
-                   .count();
+      timer_on_(false){
   srand((unsigned int)time(NULL));
   prodLabel_ = ((rand() % 255) << 24UL);
   interests_cache_timer_ = std::make_unique<asio::steady_timer>(
       this->getIoService());
+  round_timer_ = std::make_unique<asio::steady_timer>(
+      this->getIoService());
+  scheduleRoundTimer();
 }
 
 RTCProducerSocket::~RTCProducerSocket() {}
@@ -113,18 +111,22 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
   }
 }
 
-void RTCProducerSocket::updateStats(uint32_t packet_size, uint64_t now) {
-  producedBytes_ += packet_size;
-  producedPackets_++;
-  uint64_t duration = now - lastStats_;
-  if (duration >= STATS_INTERVAL_DURATION) {
-    lastStats_ = now;
-    bytesProductionRate_ = producedBytes_ * perSecondFactor_;
-    packetsProductionRate_ = producedPackets_ * perSecondFactor_;
-    if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
-    producedBytes_ = 0;
-    producedPackets_ = 0;
-  }
+void RTCProducerSocket::scheduleRoundTimer(){
+  round_timer_->expires_from_now(
+      std::chrono::milliseconds(STATS_INTERVAL_DURATION));
+  round_timer_->async_wait([this](std::error_code ec) {
+    if (ec) return;
+    updateStats();
+  });
+}
+
+void RTCProducerSocket::updateStats() {
+  bytesProductionRate_ = producedBytes_.load() * perSecondFactor_;
+  packetsProductionRate_ = producedPackets_.load() * perSecondFactor_;
+  if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
+  producedBytes_ = 0;
+  producedPackets_ = 0;
+  scheduleRoundTimer();
 }
 
 void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
@@ -143,13 +145,8 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
           std::chrono::steady_clock::now().time_since_epoch())
           .count();
 
-  {
-    utils::SpinLock::Acquire locked(lock_);
-    active_ = true;
-    lastProduced_ = now;
-  }
-
-  updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now);
+  producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN);
+  producedPackets_++;
 
   ContentObject content_object(flowName_.setSuffix(currentSeg_));
 
@@ -203,27 +200,14 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
           std::chrono::steady_clock::now().time_since_epoch())
          .count();
 
-  bool isActive;
-  {
-    utils::SpinLock::Acquire locked(lock_);
-    isActive = active_;
-    if (isActive) {
-      if ((now - lastProduced_) > INACTIVE_TIME) {
-        // socket is inactive
-        active_ = false;
-        isActive = false;
-      }
-    }
-  }
-
   if(interestSeg > HICN_MAX_DATA_SEQ){
-    sendNack(interestSeg, isActive);
+    sendNack(interestSeg);
     return;
   }
 
   // if the production rate is less than MIN_PRODUCTION_RATE we put the
   // interest in a queue, otherwise we handle it in the usual way
-  if(bytesProductionRate_ < MIN_PRODUCTION_RATE && interestSeg > currentSeg_){
+  if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){
 
     utils::SpinLock::Acquire locked(interests_cache_lock_);
 
@@ -265,34 +249,29 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
     if(!timer_on_){
       //set timeout
       timer_on_ = true;
-      scheduleTimer(timers_map_.begin()->first - now);
+      scheduleCacheTimer(timers_map_.begin()->first - now);
     } else {
       //re-schedule the timer because a new interest will expires sooner
       if(next_timer > timers_map_.begin()->first){
         interests_cache_timer_->cancel();
-        scheduleTimer(timers_map_.begin()->first - now);
+        scheduleCacheTimer(timers_map_.begin()->first - now);
       }
     }
 
     return;
   }
 
-  if (TRANSPORT_EXPECT_FALSE(!isActive)) {
-    sendNack(interestSeg, false);
-    return;
-  }
-
   uint32_t max_gap = (uint32_t)floor(
       (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
                1000.0) * (double)packetsProductionRate_.load()));
 
   if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
-    sendNack(interestSeg, true);
+    sendNack(interestSeg);
   }
   // else drop packet
 }
 
-void RTCProducerSocket::scheduleTimer(uint64_t wait){
+void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){
   interests_cache_timer_->expires_from_now(
       std::chrono::milliseconds(wait));
   interests_cache_timer_->async_wait([this](std::error_code ec) {
@@ -312,7 +291,7 @@ void RTCProducerSocket::interestCacheTimer(){
     uint64_t expire = it_timers->first;
     if(expire <= now){
       uint32_t seq = it_timers->second;
-      sendNack(seq, active_);
+      sendNack(seq);
       //remove the interest from the other map
       seqs_map_.erase(seq);
       it_timers = timers_map_.erase(it_timers);
@@ -325,11 +304,11 @@
     timer_on_ = false;
   }else{
     timer_on_ = true;
-    scheduleTimer(timers_map_.begin()->first - now);
+    scheduleCacheTimer(timers_map_.begin()->first - now);
   }
 }
 
-void RTCProducerSocket::sendNack(uint32_t sequence, bool isActive) {
+void RTCProducerSocket::sendNack(uint32_t sequence) {
   auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
   nack_payload->append(NACK_HEADER_SIZE);
   ContentObject nack;
@@ -340,11 +319,7 @@
 
   uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data();
   *payload_ptr = currentSeg_;
-  if (isActive) {
-    *(++payload_ptr) = bytesProductionRate_;
-  } else {
-    *(++payload_ptr) = 0;
-  }
+  *(++payload_ptr) = bytesProductionRate_.load();
 
   nack.setLifetime(0);
   nack.setPathLabel(prodLabel_);
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index aa67f1a29..a2540ceef 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -41,21 +41,23 @@ class RTCProducerSocket : public ProducerSocket {
   void onInterest(Interest::Ptr &&interest) override;
 
  private:
-  void sendNack(uint32_t sequence, bool isActive);
-  void updateStats(uint32_t packet_size, uint64_t now);
-  void scheduleTimer(uint64_t wait);
+  void sendNack(uint32_t sequence);
+  void updateStats();
+  void scheduleCacheTimer(uint64_t wait);
+  void scheduleRoundTimer();
   void interestCacheTimer();
 
   uint32_t currentSeg_;
   uint32_t prodLabel_;
   uint16_t headerSize_;
   Name flowName_;
-  uint32_t producedBytes_;
-  uint32_t producedPackets_;
-  uint32_t bytesProductionRate_;
+  std::atomic<uint32_t> producedBytes_;
+  std::atomic<uint32_t> producedPackets_;
+  std::atomic<uint32_t> bytesProductionRate_;
   std::atomic<uint32_t> packetsProductionRate_;
   uint32_t perSecondFactor_;
-  uint64_t lastStats_;
+
+  std::unique_ptr<asio::steady_timer> round_timer_;
 
   // cache for the received interests
   // this map maps the expiration time of an interest to
@@ -70,10 +72,6 @@ class RTCProducerSocket : public ProducerSocket {
   bool timer_on_;
   std::unique_ptr<asio::steady_timer> interests_cache_timer_;
   utils::SpinLock interests_cache_lock_;
-
-  uint64_t lastProduced_;
-  bool active_;
-  utils::SpinLock lock_;
 };
 
 }  // namespace interface
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 4104d8883..accd98495 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -31,10 +31,8 @@ RTCTransportProtocol::RTCTransportProtocol(
       inflightInterests_(1 << default_values::log_2_default_buffer_size),
       modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
   icnet_socket->getSocketOption(PORTAL, portal_);
-  nack_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
   rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
   probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
-  nack_timer_used_ = false;
   reset();
 }
 
@@ -92,7 +90,6 @@ void RTCTransportProtocol::reset() {
 
   highestReceived_ = 0;
   firstSequenceInRound_ = 0;
-  nack_timer_used_ = false;
   rtx_timer_used_ = false;
   for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){
     inflightInterests_[i] = {0};
@@ -673,36 +670,6 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
   scheduleNextInterests();
 }
 
-bool RTCTransportProtocol::checkIfProducerIsActive(
-    const ContentObject &content_object) {
-  uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
-  uint32_t productionSeg = *payload;
-  uint32_t productionRate = *(++payload);
-
-  if (productionRate == 0) {
-    // the producer socket is not active
-    // in this case we consider only the first nack
-    if (nack_timer_used_) {
-      return false;
-    }
-
-    nack_timer_used_ = true;
-    // actualSegment_ should be the one in the nack, which will be the next in
-    // production
-    actualSegment_ = productionSeg;
-    // all the rest (win size should not change)
-    // we wait a bit before pull the socket again
-    nack_timer_->expires_from_now(std::chrono::milliseconds(500));
-    nack_timer_->async_wait([this](std::error_code ec) {
-      if (ec) return;
-      nack_timer_used_ = false;
-      scheduleNextInterests();
-    });
-    return false;
-  }
-  return true;
-}
-
 bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) {
   uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
   uint32_t productionSeg = *payload;
@@ -719,7 +686,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
 
   if (productionSeg > nackSegment) {
     // we are asking for stuff produced in the past
-    actualSegment_ = max(productionSeg + 1, actualSegment_) % HICN_MIN_PROBE_SEQ;
+    actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ;
 
     if(!rtx) {
       if (currentState_ == HICN_RTC_NORMAL_STATE) {
@@ -756,7 +723,11 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx)
         currentState_ = HICN_RTC_NORMAL_STATE;
       }
     }
-  } // equal should not happen
+  } else {
+    //we are asking the right thing, but the producer is slow
+    //keep doing the same until the packet is produced
+    actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ;
+  }
 
   return old_nack;
 }
@@ -771,7 +742,6 @@ void RTCTransportProtocol::onContentObject(
   uint32_t payload_size = (uint32_t)payload->length();
   uint32_t segmentNumber = content_object->getName().getSuffix();
   uint32_t pkt = segmentNumber & modMask_;
-  bool schedule_next_interest = true;
 
   ConsumerContentObjectCallback *callback_content_object = nullptr;
   socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
@@ -802,25 +772,19 @@
   }
 
   if (payload_size == HICN_NACK_HEADER_SIZE) {
-    schedule_next_interest = checkIfProducerIsActive(*content_object);
-
     if (inflightInterests_[pkt].state == sent_) {
       inflightInterestsCount_--;
    }
 
-    // if checkIfProducerIsActive returns false, we did all we need to do
-    // inside that function, no need to call onNack
     bool old_nack = false;
-    if (schedule_next_interest){
-      if (interestRetransmissions_.find(segmentNumber) == interestRetransmissions_.end()){
-        //this is not a retransmitted packet
-        old_nack = onNack(*content_object, false);
-        updateDelayStats(*content_object);
-      } else {
-        old_nack = onNack(*content_object, true);
-      }
+    if (interestRetransmissions_.find(segmentNumber) == interestRetransmissions_.end()){
+      //this is not a retransmitted packet
+      old_nack = onNack(*content_object, false);
+      updateDelayStats(*content_object);
+    } else {
+      old_nack = onNack(*content_object, true);
     }
 
     //the nacked_ state is used only to avoid to decrease inflightInterestsCount_
@@ -870,9 +834,7 @@
 
   interestRetransmissions_.erase(segmentNumber);
 
-  if (schedule_next_interest) {
-    scheduleNextInterests();
-  }
+  scheduleNextInterests();
 }
 
 void RTCTransportProtocol::returnContentToApplication(
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 4ebae2b90..509f11361 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -128,10 +128,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
   void checkRtx();
   void probeRtt();
   void onTimeout(Interest::Ptr &&interest) override;
-  // checkIfProducerIsActive: return true if we need to schedule an interest
-  // immediatly after, false otherwise (this happens when the producer socket
-  // is not active)
-  bool checkIfProducerIsActive(const ContentObject &content_object);
   bool onNack(const ContentObject &content_object, bool rtx);
   void onContentObject(Interest::Ptr &&interest,
                        ContentObject::Ptr &&content_object) override;
@@ -155,6 +151,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
   uint32_t inflightInterestsCount_;
   //map seq to rtx
   std::map<uint32_t, uint8_t> interestRetransmissions_;
+  bool rtx_timer_used_;
   std::unique_ptr<asio::steady_timer> rtx_timer_;
   std::vector<sentInterest> inflightInterests_;
   uint32_t lastSegNacked_; //indicates the segment id in the last received
@@ -163,12 +160,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
   uint32_t lastReceived_;  //segment of the last content object received
                            //indicates the base of the window on the client
 
-  bool nack_timer_used_;
-  bool rtx_timer_used_;
-  std::unique_ptr<asio::steady_timer> nack_timer_;  // timer used to schedule
-                                                    // a nack retransmission in case
-                                                    // of inactive prod socket
-
   //rtt probes
   //the RTC transport tends to overestimate the RTT
   //du to the production time on the server side
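
A note on the portal.h hunk: emplace() on a std::unordered_map-style table does nothing when the key already exists, so an interest that hashes to an entry already in the pending table would leave the old pending interest, and its running timeout, in place. The new code finds the entry, cancels its timer and overwrites it. A minimal sketch of that insert-or-replace pattern, with a hypothetical PendingEntry type standing in for the real pending-interest object:

#include <cstdint>
#include <memory>
#include <unordered_map>

// Hypothetical stand-in for the real pending-interest type: it only needs
// a cancelTimer() hook for this sketch.
struct PendingEntry {
  void cancelTimer() { /* stop this entry's timeout here */ }
};

using PendingTable = std::unordered_map<uint32_t, std::unique_ptr<PendingEntry>>;

// Insert-or-replace: if an entry with the same hash is already pending,
// cancel its timer and take its slot instead of silently keeping it
// (which is what emplace() would do, since emplace is a no-op on an
// existing key).
void addPending(PendingTable &table, uint32_t hash,
                std::unique_ptr<PendingEntry> entry) {
  auto it = table.find(hash);
  if (it != table.end()) {
    it->second->cancelTimer();
    it->second = std::move(entry);
  } else {
    table[hash] = std::move(entry);
  }
}

Cancelling the old timer before overwriting avoids a stale timeout callback firing against the entry that has just replaced it.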
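The rtc_socket_producer.cc changes move rate accounting off the data path: produce() now only bumps two atomic counters, and a self-rescheduling round timer converts them into per-second rates every STATS_INTERVAL_DURATION milliseconds, which is why lastStats_ and the per-packet updateStats(packet_size, now) signature disappear. A standalone sketch of that timer pattern, with illustrative names and an assumed 200 ms interval rather than the real constants:

#include <asio.hpp>
#include <atomic>
#include <chrono>
#include <cstdint>

// Illustrative numbers only; the real values live in rtc_socket_producer.cc.
constexpr int kStatsIntervalMs = 200;
constexpr int kPerSecondFactor = 1000 / kStatsIntervalMs;

class RateMeter {
 public:
  explicit RateMeter(asio::io_context &io) : timer_(io) { scheduleRound(); }

  // Called from the production path: only atomic increments, no locks.
  void onPacketProduced(uint32_t bytes) {
    produced_bytes_ += bytes;
    produced_packets_++;
  }

  uint32_t bytesPerSecond() const { return bytes_rate_.load(); }

 private:
  void scheduleRound() {
    timer_.expires_after(std::chrono::milliseconds(kStatsIntervalMs));
    timer_.async_wait([this](std::error_code ec) {
      if (ec) return;  // timer cancelled or io_context stopped
      updateStats();
    });
  }

  // Runs once per round: turn the counters into per-second rates and rearm.
  void updateStats() {
    bytes_rate_ = produced_bytes_.load() * kPerSecondFactor;
    packets_rate_ = produced_packets_.load() * kPerSecondFactor;
    if (packets_rate_.load() == 0) packets_rate_ = 1;  // never report zero
    produced_bytes_ = 0;
    produced_packets_ = 0;
    scheduleRound();
  }

  asio::steady_timer timer_;
  std::atomic<uint32_t> produced_bytes_{0};
  std::atomic<uint32_t> produced_packets_{0};
  std::atomic<uint32_t> bytes_rate_{0};
  std::atomic<uint32_t> packets_rate_{1};
};

Because the counters are atomics, the producer thread never takes a lock on the fast path, and the timer callback running on the io service is the only place the rate values are recomputed.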
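When packetsProductionRate_ falls below MIN_PRODUCTION_RATE and the interest asks for the current segment or beyond, onInterest() parks it in the timers_map_/seqs_map_ pair instead of answering immediately; scheduleCacheTimer() (renamed from scheduleTimer()) arms a timer for the earliest expiration and interestCacheTimer() NACKs whatever has expired. A simplified sketch of that bookkeeping, with illustrative container names rather than the real members:

#include <cstdint>
#include <map>
#include <unordered_map>

// Pending interests are keyed by expiration time so the earliest deadline
// is always at begin().
struct InterestCache {
  std::map<uint64_t, uint32_t> timers;          // expiration (ms) -> segment
  std::unordered_map<uint32_t, uint64_t> seqs;  // segment -> expiration (ms)

  // Store an interest and report whether the timer must be (re)armed
  // because this entry expires before every other cached one.
  bool add(uint32_t seg, uint64_t now_ms, uint64_t lifetime_ms,
           double lifetime_fraction) {
    uint64_t expiry =
        now_ms + static_cast<uint64_t>(lifetime_ms * lifetime_fraction);
    bool earliest = timers.empty() || expiry < timers.begin()->first;
    timers.emplace(expiry, seg);
    seqs[seg] = expiry;
    return earliest;
  }

  // On timer expiry: NACK everything whose deadline has passed and return
  // the delay (ms) until the next deadline, or 0 if the cache is empty.
  template <typename NackFn>
  uint64_t expire(uint64_t now_ms, NackFn &&send_nack) {
    auto it = timers.begin();
    while (it != timers.end() && it->first <= now_ms) {
      send_nack(it->second);
      seqs.erase(it->second);
      it = timers.erase(it);
    }
    return timers.empty() ? 0 : timers.begin()->first - now_ms;
  }
};

The boolean returned by add() mirrors the two branches in the diff: the cache timer is armed when it was off, and rearmed only when the new interest expires sooner than everything already cached.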
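For interests handled on the normal path, the producer NACKs anything outside a window that starts at currentSeg_ and extends max_gap segments into the future, where max_gap is the number of packets the producer expects to emit within a reduced interest lifetime. A sketch of the check; the 0.8 reduction factor is an assumption standing in for INTEREST_LIFETIME_REDUCTION_FACTOR:

#include <cmath>
#include <cstdint>

// Assumed value for illustration; the real constant is defined elsewhere
// in libtransport.
constexpr double kLifetimeReductionFactor = 0.8;

// An interest is NACKed when it asks for a segment already produced
// (seg < current) or for one further ahead than the producer can generate
// within the (reduced) interest lifetime at the current production rate.
bool shouldNack(uint32_t seg, uint32_t current_seg, uint32_t lifetime_ms,
                uint32_t packets_per_sec) {
  uint32_t max_gap = static_cast<uint32_t>(
      std::floor((lifetime_ms * kLifetimeReductionFactor / 1000.0) *
                 packets_per_sec));
  return seg < current_seg || seg > current_seg + max_gap;
}

With a 1000 ms lifetime and a measured rate of 400 packets/s, for example, the window covers roughly 0.8 * 400 = 320 segments ahead of the current production point; anything beyond that is dropped or NACKed.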
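On both ends the NACK payload is just two 32-bit words: the producer's current segment followed by its byte production rate. sendNack() writes them through a uint32_t pointer, and the consumer's onNack() reads them back the same way; with the producer-activity check gone, the rate field is always bytesProductionRate_.load() instead of sometimes 0. A small sketch of that layout using explicit memcpy rather than pointer casts (illustrative helpers, not the library API):

#include <cstdint>
#include <cstring>

struct NackInfo {
  uint32_t production_seg;  // producer's current segment
  uint32_t bytes_per_sec;   // producer's byte production rate
};

void writeNackPayload(uint8_t *buf, const NackInfo &info) {
  std::memcpy(buf, &info.production_seg, sizeof(uint32_t));
  std::memcpy(buf + sizeof(uint32_t), &info.bytes_per_sec, sizeof(uint32_t));
}

NackInfo readNackPayload(const uint8_t *buf) {
  NackInfo info;
  std::memcpy(&info.production_seg, buf, sizeof(uint32_t));
  std::memcpy(&info.bytes_per_sec, buf + sizeof(uint32_t), sizeof(uint32_t));
  return info;
}

memcpy sidesteps the aliasing and alignment caveats of casting the payload buffer to uint32_t*, while keeping the same host-byte-order layout the code in the diff relies on.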