aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols')
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc8
-rw-r--r--libtransport/src/protocols/protocol.cc40
-rw-r--r--libtransport/src/protocols/protocol.h18
-rw-r--r--libtransport/src/protocols/raaqm.cc77
-rw-r--r--libtransport/src/protocols/raaqm.h2
-rw-r--r--libtransport/src/protocols/raaqm_data_path.cc5
-rw-r--r--libtransport/src/protocols/raaqm_data_path.h2
-rw-r--r--libtransport/src/protocols/rtc.cc23
8 files changed, 91 insertions, 84 deletions
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index c2996ebc1..e15498bb1 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -80,19 +80,19 @@ void ByteStreamReassembly::assembleContent() {
}
void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
- auto a = content_object.getPayload();
- auto payload_length = a->length();
+ auto payload = content_object.getPayloadReference();
+ auto payload_length = payload.second;
auto write_size = std::min(payload_length, read_buffer_->tailroom());
auto additional_bytes = payload_length > read_buffer_->tailroom()
? payload_length - read_buffer_->tailroom()
: 0;
- std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
+ std::memcpy(read_buffer_->writableTail(), payload.first, write_size);
read_buffer_->append(write_size);
if (!read_buffer_->tailroom()) {
notifyApplication();
- std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
+ std::memcpy(read_buffer_->writableTail(), payload.first + write_size,
additional_bytes);
read_buffer_->append(additional_bytes);
}
diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc
index aa290bef8..8463f84f9 100644
--- a/libtransport/src/protocols/protocol.cc
+++ b/libtransport/src/protocols/protocol.cc
@@ -31,7 +31,16 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
index_manager_(
std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
is_running_(false),
- is_first_(false) {
+ is_first_(false),
+ on_interest_retransmission_(VOID_HANDLER),
+ on_interest_output_(VOID_HANDLER),
+ on_interest_timeout_(VOID_HANDLER),
+ on_interest_satisfied_(VOID_HANDLER),
+ on_content_object_input_(VOID_HANDLER),
+ on_content_object_verification_(VOID_HANDLER),
+ stats_summary_(VOID_HANDLER),
+ verification_failed_callback_(VOID_HANDLER),
+ on_payload_(VOID_HANDLER) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
}
@@ -46,6 +55,26 @@ int TransportProtocol::start() {
// Set it is the first time we schedule an interest
is_first_ = true;
+ // Get all callbacks references before starting
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &on_interest_retransmission_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &on_interest_output_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
+ &on_interest_timeout_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
+ &on_interest_satisfied_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &on_content_object_input_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY,
+ &on_content_object_verification_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ &on_payload_);
+
// Schedule next interests
scheduleNextInterests();
@@ -81,10 +110,7 @@ void TransportProtocol::resume() {
}
void TransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
-
- if (!on_payload) {
+ if (!on_payload_) {
throw errors::RuntimeException(
"The read callback must be installed in the transport before "
"starting "
@@ -92,9 +118,9 @@ void TransportProtocol::onContentReassembled(std::error_code ec) {
}
if (!ec) {
- on_payload->readSuccess(stats_->getBytesRecv());
+ on_payload_->readSuccess(stats_->getBytesRecv());
} else {
- on_payload->readError(ec);
+ on_payload_->readError(ec);
}
stop();
diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h
index 949380959..db4524133 100644
--- a/libtransport/src/protocols/protocol.h
+++ b/libtransport/src/protocols/protocol.h
@@ -17,6 +17,8 @@
#include <atomic>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/statistics.h>
#include <hicn/transport/utils/object_pool.h>
@@ -34,6 +36,8 @@ using namespace core;
class IndexVerificationManager;
+using ReadCallback = interface::ConsumerSocket::ReadCallback;
+
class TransportProtocolCallback {
virtual void onContentObject(const core::Interest &interest,
const core::ContentObject &content_object) = 0;
@@ -89,6 +93,20 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback,
// True if it si the first time we schedule an interest
std::atomic<bool> is_first_;
interface::TransportStatistics *stats_;
+
+ // Callbacks
+ interface::ConsumerInterestCallback *on_interest_retransmission_;
+ interface::ConsumerInterestCallback *on_interest_output_;
+ interface::ConsumerInterestCallback *on_interest_timeout_;
+ interface::ConsumerInterestCallback *on_interest_satisfied_;
+ interface::ConsumerContentObjectCallback *on_content_object_input_;
+ interface::ConsumerContentObjectVerificationCallback
+ *on_content_object_verification_;
+ interface::ConsumerContentObjectCallback *on_content_object_;
+ interface::ConsumerTimerCallback *stats_summary_;
+ interface::ConsumerContentObjectVerificationFailedCallback
+ *verification_failed_callback_;
+ ReadCallback *on_payload_;
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 8a38f8ccf..0a93dec44 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -325,18 +325,12 @@ void RaaqmTransportProtocol::onContentObject(
}
// Call application-defined callbacks
- ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_->getInterface(), *content_object);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), *content_object);
}
- ConsumerInterestCallback *callback_interest = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
- &callback_interest);
- if (*callback_interest) {
- (*callback_interest)(*socket_->getInterface(), *interest);
+ if (*on_interest_satisfied_) {
+ (*on_interest_satisfied_)(*socket_->getInterface(), *interest);
}
if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
@@ -369,23 +363,17 @@ void RaaqmTransportProtocol::onPacketDropped(
socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
uint64_t segment = interest->getName().getSuffix();
- ConsumerInterestCallback *callback = VOID_HANDLER;
+
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
stats_->updateRetxCount(1);
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_retransmission_) {
+ (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (!is_running_) {
@@ -393,7 +381,6 @@ void RaaqmTransportProtocol::onPacketDropped(
}
interest_retransmissions_[segment & mask]++;
-
interest_to_retransmit_.push(std::move(interest));
} else {
TRANSPORT_LOGE(
@@ -428,11 +415,8 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
return;
}
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_timeout_) {
+ (*on_interest_timeout_)(*socket_->getInterface(), *interest);
}
afterDataUnsatisfied(segment);
@@ -444,18 +428,12 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
max_rtx)) {
stats_->updateRetxCount(1);
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_retransmission_) {
+ (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (!is_running_) {
@@ -463,7 +441,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
}
interest_retransmissions_[segment & mask]++;
-
interest_to_retransmit_.push(std::move(interest));
scheduleNextInterests();
@@ -507,7 +484,7 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
}
}
-void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
auto interest = getPacket();
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
@@ -519,15 +496,12 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
interest_lifetime);
interest->setLifetime(interest_lifetime);
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- callback->operator()(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ on_interest_output_->operator()(*socket_->getInterface(), *interest);
}
if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return;
+ return false;
}
// This is set to ~0 so that the next interest_retransmissions_ + 1,
@@ -535,6 +509,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
+
+ return true;
}
void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
@@ -564,7 +540,7 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
rate_estimator_->onRttUpdate((double)rtt.count());
}
- cur_path_->insertNewRtt(rtt.count());
+ cur_path_->insertNewRtt(rtt.count(), now);
cur_path_->smoothTimer();
if (cur_path_->newPropagationDelayAvailable()) {
@@ -595,18 +571,15 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
stats_->updateAverageWindowSize(current_window_size_);
// Call statistics callback
- ConsumerTimerCallback *stats_callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
+ if (*stats_summary_) {
auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
uint32_t timer_interval_milliseconds = 0;
socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
timer_interval_milliseconds);
if (dt.count() > timer_interval_milliseconds) {
- (*stats_callback)(*socket_->getInterface(), *stats_);
- t0_ = utils::SteadyClock::now();
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ t0_ = now;
}
}
}
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
index 412967770..ecc466755 100644
--- a/libtransport/src/protocols/raaqm.h
+++ b/libtransport/src/protocols/raaqm.h
@@ -79,7 +79,7 @@ class RaaqmTransportProtocol : public TransportProtocol,
virtual void scheduleNextInterests() override;
- void sendInterest(std::uint64_t next_suffix);
+ bool sendInterest(std::uint64_t next_suffix);
void sendInterest(Interest::Ptr &&interest);
diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc
index 439549c85..8bbbadcf2 100644
--- a/libtransport/src/protocols/raaqm_data_path.cc
+++ b/libtransport/src/protocols/raaqm_data_path.cc
@@ -48,7 +48,8 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor,
average_rtt_(0),
alpha_(ALPHA) {}
-RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
+RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt,
+ const utils::TimePoint &now) {
rtt_ = new_rtt;
rtt_samples_.pushBack(new_rtt);
@@ -60,7 +61,7 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
prop_delay_ = rtt_min_;
}
- last_received_pkt_ = utils::SteadyClock::now();
+ last_received_pkt_ = now;
return *this;
}
diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h
index 6f2afde72..3f037bc76 100644
--- a/libtransport/src/protocols/raaqm_data_path.h
+++ b/libtransport/src/protocols/raaqm_data_path.h
@@ -45,7 +45,7 @@ class RaaqmDataPath {
* max of RTT.
* @param new_rtt is the value of the new RTT
*/
- RaaqmDataPath &insertNewRtt(uint64_t new_rtt);
+ RaaqmDataPath &insertNewRtt(uint64_t new_rtt, const utils::TimePoint &now);
/**
* @brief Update the path statistics
diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc
index 0ac3839dd..72abb599a 100644
--- a/libtransport/src/protocols/rtc.cc
+++ b/libtransport/src/protocols/rtc.cc
@@ -288,15 +288,12 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
uint32_t BW = (uint32_t)ceil(estimatedBw_);
computeMaxWindow(BW, BDP);
- ConsumerTimerCallback *stats_callback = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
+ if (*stats_summary_) {
// Send the stats to the app
stats_->updateQueuingDelay(queuingDelay_);
stats_->updateLossRatio(lossRate_);
stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
- (*stats_callback)(*socket_->getInterface(), *stats_);
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
}
// bound also by interest lifitime* production rate
if (!gotNack_) {
@@ -451,13 +448,8 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
interestLifetime);
interest->setLifetime(uint32_t(interestLifetime));
- ConsumerInterestCallback *on_interest_output = nullptr;
-
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &on_interest_output);
-
- if (*on_interest_output) {
- (*on_interest_output)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
@@ -890,11 +882,8 @@ void RTCTransportProtocol::onContentObject(
uint32_t segmentNumber = content_object->getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- ConsumerContentObjectCallback *callback_content_object = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_->getInterface(), *content_object);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), *content_object);
}
if (segmentNumber >= HICN_MIN_PROBE_SEQ) {