From afe807c61372fe2481e73af63c8382af1e1d3011 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 4 Mar 2020 16:21:54 +0100 Subject: [HICN-540] Optimizations for libhicntransport Change-Id: I8b46b4eb2ef5488c09041887cc8296a216440f33 Signed-off-by: Mauro Sardara --- .../src/protocols/byte_stream_reassembly.cc | 8 +-- libtransport/src/protocols/protocol.cc | 40 +++++++++-- libtransport/src/protocols/protocol.h | 18 +++++ libtransport/src/protocols/raaqm.cc | 77 +++++++--------------- libtransport/src/protocols/raaqm.h | 2 +- libtransport/src/protocols/raaqm_data_path.cc | 5 +- libtransport/src/protocols/raaqm_data_path.h | 2 +- libtransport/src/protocols/rtc.cc | 23 ++----- 8 files changed, 91 insertions(+), 84 deletions(-) (limited to 'libtransport/src/protocols') diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc index c2996ebc1..e15498bb1 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.cc +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -80,19 +80,19 @@ void ByteStreamReassembly::assembleContent() { } void ByteStreamReassembly::copyContent(const ContentObject &content_object) { - auto a = content_object.getPayload(); - auto payload_length = a->length(); + auto payload = content_object.getPayloadReference(); + auto payload_length = payload.second; auto write_size = std::min(payload_length, read_buffer_->tailroom()); auto additional_bytes = payload_length > read_buffer_->tailroom() ? payload_length - read_buffer_->tailroom() : 0; - std::memcpy(read_buffer_->writableTail(), a->data(), write_size); + std::memcpy(read_buffer_->writableTail(), payload.first, write_size); read_buffer_->append(write_size); if (!read_buffer_->tailroom()) { notifyApplication(); - std::memcpy(read_buffer_->writableTail(), a->data() + write_size, + std::memcpy(read_buffer_->writableTail(), payload.first + write_size, additional_bytes); read_buffer_->append(additional_bytes); } diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc index aa290bef8..8463f84f9 100644 --- a/libtransport/src/protocols/protocol.cc +++ b/libtransport/src/protocols/protocol.cc @@ -31,7 +31,16 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, index_manager_( std::make_unique(socket_, this, reassembly_protocol)), is_running_(false), - is_first_(false) { + is_first_(false), + on_interest_retransmission_(VOID_HANDLER), + on_interest_output_(VOID_HANDLER), + on_interest_timeout_(VOID_HANDLER), + on_interest_satisfied_(VOID_HANDLER), + on_content_object_input_(VOID_HANDLER), + on_content_object_verification_(VOID_HANDLER), + stats_summary_(VOID_HANDLER), + verification_failed_callback_(VOID_HANDLER), + on_payload_(VOID_HANDLER) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); } @@ -46,6 +55,26 @@ int TransportProtocol::start() { // Set it is the first time we schedule an interest is_first_ = true; + // Get all callbacks references before starting + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &on_interest_retransmission_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &on_interest_output_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, + &on_interest_timeout_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, + &on_interest_satisfied_); + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &on_content_object_input_); + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, + &on_content_object_verification_); + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_); + socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, + &verification_failed_callback_); + socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + &on_payload_); + // Schedule next interests scheduleNextInterests(); @@ -81,10 +110,7 @@ void TransportProtocol::resume() { } void TransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - socket_->getSocketOption(READ_CALLBACK, &on_payload); - - if (!on_payload) { + if (!on_payload_) { throw errors::RuntimeException( "The read callback must be installed in the transport before " "starting " @@ -92,9 +118,9 @@ void TransportProtocol::onContentReassembled(std::error_code ec) { } if (!ec) { - on_payload->readSuccess(stats_->getBytesRecv()); + on_payload_->readSuccess(stats_->getBytesRecv()); } else { - on_payload->readError(ec); + on_payload_->readError(ec); } stop(); diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h index 949380959..db4524133 100644 --- a/libtransport/src/protocols/protocol.h +++ b/libtransport/src/protocols/protocol.h @@ -17,6 +17,8 @@ #include +#include +#include #include #include @@ -34,6 +36,8 @@ using namespace core; class IndexVerificationManager; +using ReadCallback = interface::ConsumerSocket::ReadCallback; + class TransportProtocolCallback { virtual void onContentObject(const core::Interest &interest, const core::ContentObject &content_object) = 0; @@ -89,6 +93,20 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback, // True if it si the first time we schedule an interest std::atomic is_first_; interface::TransportStatistics *stats_; + + // Callbacks + interface::ConsumerInterestCallback *on_interest_retransmission_; + interface::ConsumerInterestCallback *on_interest_output_; + interface::ConsumerInterestCallback *on_interest_timeout_; + interface::ConsumerInterestCallback *on_interest_satisfied_; + interface::ConsumerContentObjectCallback *on_content_object_input_; + interface::ConsumerContentObjectVerificationCallback + *on_content_object_verification_; + interface::ConsumerContentObjectCallback *on_content_object_; + interface::ConsumerTimerCallback *stats_summary_; + interface::ConsumerContentObjectVerificationFailedCallback + *verification_failed_callback_; + ReadCallback *on_payload_; }; } // end namespace protocol diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 8a38f8ccf..0a93dec44 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -325,18 +325,12 @@ void RaaqmTransportProtocol::onContentObject( } // Call application-defined callbacks - ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - &callback_content_object); - if (*callback_content_object) { - (*callback_content_object)(*socket_->getInterface(), *content_object); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), *content_object); } - ConsumerInterestCallback *callback_interest = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, - &callback_interest); - if (*callback_interest) { - (*callback_interest)(*socket_->getInterface(), *interest); + if (*on_interest_satisfied_) { + (*on_interest_satisfied_)(*socket_->getInterface(), *interest); } if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { @@ -369,23 +363,17 @@ void RaaqmTransportProtocol::onPacketDropped( socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); uint64_t segment = interest->getName().getSuffix(); - ConsumerInterestCallback *callback = VOID_HANDLER; + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < max_rtx)) { stats_->updateRetxCount(1); - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_retransmission_) { + (*on_interest_retransmission_)(*socket_->getInterface(), *interest); } - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); } if (!is_running_) { @@ -393,7 +381,6 @@ void RaaqmTransportProtocol::onPacketDropped( } interest_retransmissions_[segment & mask]++; - interest_to_retransmit_.push(std::move(interest)); } else { TRANSPORT_LOGE( @@ -428,11 +415,8 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { return; } - ConsumerInterestCallback *callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_timeout_) { + (*on_interest_timeout_)(*socket_->getInterface(), *interest); } afterDataUnsatisfied(segment); @@ -444,18 +428,12 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { max_rtx)) { stats_->updateRetxCount(1); - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_retransmission_) { + (*on_interest_retransmission_)(*socket_->getInterface(), *interest); } - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); } if (!is_running_) { @@ -463,7 +441,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { } interest_retransmissions_[segment & mask]++; - interest_to_retransmit_.push(std::move(interest)); scheduleNextInterests(); @@ -507,7 +484,7 @@ void RaaqmTransportProtocol::scheduleNextInterests() { } } -void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { +bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { auto interest = getPacket(); core::Name *name; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); @@ -519,15 +496,12 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { interest_lifetime); interest->setLifetime(interest_lifetime); - ConsumerInterestCallback *callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - callback->operator()(*socket_->getInterface(), *interest); + if (*on_interest_output_) { + on_interest_output_->operator()(*socket_->getInterface(), *interest); } if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return; + return false; } // This is set to ~0 so that the next interest_retransmissions_ + 1, @@ -535,6 +509,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); + + return true; } void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { @@ -564,7 +540,7 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) { rate_estimator_->onRttUpdate((double)rtt.count()); } - cur_path_->insertNewRtt(rtt.count()); + cur_path_->insertNewRtt(rtt.count(), now); cur_path_->smoothTimer(); if (cur_path_->newPropagationDelayAvailable()) { @@ -595,18 +571,15 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, stats_->updateAverageWindowSize(current_window_size_); // Call statistics callback - ConsumerTimerCallback *stats_callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_callback); - if (*stats_callback) { + if (*stats_summary_) { auto dt = std::chrono::duration_cast(now - t0_); uint32_t timer_interval_milliseconds = 0; socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, timer_interval_milliseconds); if (dt.count() > timer_interval_milliseconds) { - (*stats_callback)(*socket_->getInterface(), *stats_); - t0_ = utils::SteadyClock::now(); + (*stats_summary_)(*socket_->getInterface(), *stats_); + t0_ = now; } } } diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h index 412967770..ecc466755 100644 --- a/libtransport/src/protocols/raaqm.h +++ b/libtransport/src/protocols/raaqm.h @@ -79,7 +79,7 @@ class RaaqmTransportProtocol : public TransportProtocol, virtual void scheduleNextInterests() override; - void sendInterest(std::uint64_t next_suffix); + bool sendInterest(std::uint64_t next_suffix); void sendInterest(Interest::Ptr &&interest); diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc index 439549c85..8bbbadcf2 100644 --- a/libtransport/src/protocols/raaqm_data_path.cc +++ b/libtransport/src/protocols/raaqm_data_path.cc @@ -48,7 +48,8 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor, average_rtt_(0), alpha_(ALPHA) {} -RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) { +RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt, + const utils::TimePoint &now) { rtt_ = new_rtt; rtt_samples_.pushBack(new_rtt); @@ -60,7 +61,7 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) { prop_delay_ = rtt_min_; } - last_received_pkt_ = utils::SteadyClock::now(); + last_received_pkt_ = now; return *this; } diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h index 6f2afde72..3f037bc76 100644 --- a/libtransport/src/protocols/raaqm_data_path.h +++ b/libtransport/src/protocols/raaqm_data_path.h @@ -45,7 +45,7 @@ class RaaqmDataPath { * max of RTT. * @param new_rtt is the value of the new RTT */ - RaaqmDataPath &insertNewRtt(uint64_t new_rtt); + RaaqmDataPath &insertNewRtt(uint64_t new_rtt, const utils::TimePoint &now); /** * @brief Update the path statistics diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc index 0ac3839dd..72abb599a 100644 --- a/libtransport/src/protocols/rtc.cc +++ b/libtransport/src/protocols/rtc.cc @@ -288,15 +288,12 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { uint32_t BW = (uint32_t)ceil(estimatedBw_); computeMaxWindow(BW, BDP); - ConsumerTimerCallback *stats_callback = nullptr; - socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_callback); - if (*stats_callback) { + if (*stats_summary_) { // Send the stats to the app stats_->updateQueuingDelay(queuingDelay_); stats_->updateLossRatio(lossRate_); stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); - (*stats_callback)(*socket_->getInterface(), *stats_); + (*stats_summary_)(*socket_->getInterface(), *stats_); } // bound also by interest lifitime* production rate if (!gotNack_) { @@ -451,13 +448,8 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { interestLifetime); interest->setLifetime(uint32_t(interestLifetime)); - ConsumerInterestCallback *on_interest_output = nullptr; - - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &on_interest_output); - - if (*on_interest_output) { - (*on_interest_output)(*socket_->getInterface(), *interest); + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); } if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { @@ -890,11 +882,8 @@ void RTCTransportProtocol::onContentObject( uint32_t segmentNumber = content_object->getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; - ConsumerContentObjectCallback *callback_content_object = nullptr; - socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - &callback_content_object); - if (*callback_content_object) { - (*callback_content_object)(*socket_->getInterface(), *content_object); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), *content_object); } if (segmentNumber >= HICN_MIN_PROBE_SEQ) { -- cgit 1.2.3-korg