diff options
Diffstat (limited to 'libtransport')
7 files changed, 105 insertions, 137 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index 14cd27b6b..64b60101d 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -121,37 +121,6 @@ asio::io_service &ConsumerSocket::getIoService() { return portal_->getIoService(); } -// If the thread calling lambda_func is not the same of io_service, this -// function reschedule the function on it -template <typename Lambda, typename arg2> -int ConsumerSocket::rescheduleOnIOService(int socket_option_key, - arg2 socket_option_value, - Lambda lambda_func) { - // To enforce type check - std::function<int(int, arg2)> func = lambda_func; - int result = SOCKET_OPTION_SET; - if (transport_protocol_->isRunning()) { - std::mutex mtx; - /* Condition variable for the wait */ - std::condition_variable cv; - bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, - &result, &done, &func]() { - std::unique_lock<std::mutex> lck(mtx); - done = true; - result = func(socket_option_key, socket_option_value); - }); - std::unique_lock<std::mutex> lck(mtx); - if (!done) { - cv.wait(lck); - } - } else { - result = func(socket_option_key, socket_option_value); - } - - return result; -} - int ConsumerSocket::setSocketOption(int socket_option_key, ReadCallback *socket_option_value) { // Reschedule the function on the io_service to avoid race condition in case diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index e3620b269..eceee2d34 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -282,9 +282,36 @@ class ConsumerSocket : public BaseSocket { ConsumerTimerCallback **socket_option_value); protected: + // If the thread calling lambda_func is not the same of io_service, this + // function reschedule the function on it template <typename Lambda, typename arg2> int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, - Lambda lambda_func); + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (transport_protocol_->isRunning()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, + &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + cv.notify_all(); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; + } private: asio::io_service internal_io_service_; diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index f90197490..8f8fc1a79 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -391,70 +391,6 @@ void ProducerSocket::onInterest(Interest &interest) { } } -// If the thread calling lambda_func is not the same of io_service, this -// function reschedule the function on it -template <typename Lambda, typename arg2> -int ProducerSocket::rescheduleOnIOService(int socket_option_key, - arg2 socket_option_value, - Lambda lambda_func) { - // To enforce type check - std::function<int(int, arg2)> func = lambda_func; - int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != listening_thread_.get_id()) { - std::mutex mtx; - /* Condition variable for the wait */ - std::condition_variable cv; - bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, - &result, &done, &func]() { - std::unique_lock<std::mutex> lck(mtx); - done = true; - result = func(socket_option_key, socket_option_value); - }); - std::unique_lock<std::mutex> lck(mtx); - if (!done) { - cv.wait(lck); - } - } else { - result = func(socket_option_key, socket_option_value); - } - - return result; -} - -// If the thread calling lambda_func is not the same of io_service, this -// function reschedule the function on it -template <typename Lambda, typename arg2> -int ProducerSocket::rescheduleOnIOServiceWithReference( - int socket_option_key, arg2 &socket_option_value, Lambda lambda_func) { - // To enforce type check - std::function<int(int, arg2 &)> func = lambda_func; - int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != this->listening_thread_.get_id()) { - std::mutex mtx; - /* Condition variable for the wait */ - std::condition_variable cv; - std::unique_lock<std::mutex> lck(mtx); - bool done = false; - io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, - &cv, &result, &done, &func]() { - std::unique_lock<std::mutex> lck(mtx); - done = true; - result = func(socket_option_key, socket_option_value); - - if (!done) { - cv.wait(lck); - } - }); - } else { - result = func(socket_option_key, socket_option_value); - } - - return result; -} - int ProducerSocket::setSocketOption(int socket_option_key, uint32_t socket_option_value) { switch (socket_option_key) { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 5c617d761..709a2582b 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -160,15 +160,6 @@ class ProducerSocket : public Socket<BasePortal>, virtual int getSocketOption(int socket_option_key, std::string &socket_option_value); - template <typename Lambda, typename arg2> - int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, - Lambda lambda_func); - - template <typename Lambda, typename arg2> - int rescheduleOnIOServiceWithReference(int socket_option_key, - arg2 &socket_option_value, - Lambda lambda_func); - protected: // Threads std::thread listening_thread_; @@ -215,6 +206,38 @@ class ProducerSocket : public Socket<BasePortal>, ProducerContentCallback on_content_produced_; + // If the thread calling lambda_func is not the same of io_service, this + // function reschedule the function on it + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([&socket_option_key, &socket_option_value, + &mtx, &cv, &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + cv.notify_all(); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; + } + private: void listen(); diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index c816158f9..779f9a9a1 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -519,7 +519,7 @@ void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; socket_->getSocketOption(READ_CALLBACK, &on_payload); - if (on_payload) { + if (!on_payload) { throw errors::RuntimeException( "The read callback must be installed in the transport before " "starting " diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index f52494aba..1a3511003 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -34,6 +34,7 @@ RTCTransportProtocol::RTCTransportProtocol( rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); sentinel_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); reset(); } @@ -45,6 +46,8 @@ RTCTransportProtocol::~RTCTransportProtocol() { int RTCTransportProtocol::start() { probeRtt(); + sentinelTimer(); + newRound(); return TransportProtocol::start(); } @@ -60,10 +63,11 @@ void RTCTransportProtocol::resume() { is_running_ = true; - lastRoundBegin_ = std::chrono::steady_clock::now(); inflightInterestsCount_ = 0; probeRtt(); + sentinelTimer(); + newRound(); scheduleNextInterests(); portal_->runEventsLoop(); @@ -75,7 +79,6 @@ void RTCTransportProtocol::resume() { void RTCTransportProtocol::reset() { portal_->setConsumerCallback(this); // controller var - lastRoundBegin_ = std::chrono::steady_clock::now(); currentState_ = HICN_RTC_SYNC_STATE; // cwin var @@ -143,15 +146,14 @@ uint32_t min(uint32_t a, uint32_t b) { return b; } -void RTCTransportProtocol::checkRound() { - uint32_t duration = - (uint32_t)std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now() - lastRoundBegin_) - .count(); - if (duration >= HICN_ROUND_LEN) { - lastRoundBegin_ = std::chrono::steady_clock::now(); - updateStats(duration); // update stats and window - } +void RTCTransportProtocol::newRound() { + round_timer_->expires_from_now(std::chrono::milliseconds( + HICN_ROUND_LEN)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(HICN_ROUND_LEN); + newRound(); + }); } void RTCTransportProtocol::updateDelayStats( @@ -231,11 +233,6 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) return; // this should not happen - //set sentinel timer if needed - if(rounds_ == 0){ - sentinelTimer(); - } - // as a queuing delay we keep the lowest one among the two paths // if one path is congested the forwarder should decide to do not // use it so it does not make sense to inform the application @@ -449,7 +446,6 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { } void RTCTransportProtocol::scheduleNextInterests() { - checkRound(); if (!is_running_ && !is_first_) return; while (inflightInterestsCount_ < currentCWin_) { @@ -505,29 +501,47 @@ void RTCTransportProtocol::scheduleNextInterests() { actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; sendInterest(interest_name, false); - checkRound(); } } void RTCTransportProtocol::sentinelTimer(){ - uint32_t wait = 1; - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){ - wait = round( - pathTable_[producerPathLabels_[0]]->getInterArrivalGap()); + uint32_t wait = 10; + + if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){ + //we have all the info to set the timers + wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()); + if(wait == 0) + wait = 1; } - if(wait == 0) - wait = 1; sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait)); sentinel_timer_->async_wait([this](std::error_code ec) { + if (ec) return; - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::steady_clock::now().time_since_epoch()) .count(); - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() || - pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){ + if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()){ + //we have no info, so we send again + for(auto it = packets_in_window_.begin(); + it != packets_in_window_.end(); it++){ + uint32_t pkt = it->first & modMask_; + if (inflightInterests_[pkt].sequence == it->first) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); + } + } + }else{ uint64_t max_waiting_time = round((pathTable_[producerPathLabels_[1]]->getMinRtt() - pathTable_[producerPathLabels_[0]]->getMinRtt()) + @@ -554,12 +568,11 @@ void RTCTransportProtocol::sentinelTimer(){ } } } - }//esle not enough info to resend the packet, schedule the timer agian + } sentinelTimer(); }); } - void RTCTransportProtocol::addRetransmissions(uint32_t val) { // add only val in the rtx list addRetransmissions(val, val + 1); diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 908be017a..46063d041 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -108,7 +108,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { private: // algo functions void reset() override; - void checkRound(); // CC functions void updateDelayStats(const ContentObject &content_object); @@ -129,6 +128,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { uint64_t retransmit(); void checkRtx(); void probeRtt(); + void newRound(); void onTimeout(Interest::Ptr &&interest) override; bool onNack(const ContentObject &content_object, bool rtx); void onContentObject(Interest::Ptr &&interest, @@ -141,7 +141,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { } // controller var - std::chrono::steady_clock::time_point lastRoundBegin_; + std::unique_ptr<asio::steady_timer> round_timer_; unsigned currentState_; // cwin var |