aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-03-04 16:21:54 +0100
committerMauro Sardara <msardara@cisco.com>2020-03-09 13:02:50 +0000
commitafe807c61372fe2481e73af63c8382af1e1d3011 (patch)
tree181269bc2548bba9eb667df4301ded197183a4cf /libtransport/src
parent248bfd5ad0ae3cc17bbd3ea3b9a47fa8d075ee58 (diff)
[HICN-540] Optimizations for libhicntransport
Change-Id: I8b46b4eb2ef5488c09041887cc8296a216440f33 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src')
-rw-r--r--libtransport/src/core/CMakeLists.txt1
-rw-r--r--libtransport/src/core/content_object.cc9
-rw-r--r--libtransport/src/core/forwarder_interface.h4
-rw-r--r--libtransport/src/core/interest.cc9
-rw-r--r--libtransport/src/core/memif_connector.cc2
-rw-r--r--libtransport/src/core/packet.cc54
-rw-r--r--libtransport/src/core/pending_interest.h50
-rw-r--r--libtransport/src/core/portal.h4
-rw-r--r--libtransport/src/implementation/socket_consumer.h23
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc8
-rw-r--r--libtransport/src/protocols/protocol.cc40
-rw-r--r--libtransport/src/protocols/protocol.h18
-rw-r--r--libtransport/src/protocols/raaqm.cc77
-rw-r--r--libtransport/src/protocols/raaqm.h2
-rw-r--r--libtransport/src/protocols/raaqm_data_path.cc5
-rw-r--r--libtransport/src/protocols/raaqm_data_path.h2
-rw-r--r--libtransport/src/protocols/rtc.cc23
-rw-r--r--libtransport/src/utils/membuf.cc2
18 files changed, 137 insertions, 196 deletions
diff --git a/libtransport/src/core/CMakeLists.txt b/libtransport/src/core/CMakeLists.txt
index 12ef9cfe4..5c8ab9270 100644
--- a/libtransport/src/core/CMakeLists.txt
+++ b/libtransport/src/core/CMakeLists.txt
@@ -33,7 +33,6 @@ list(APPEND HEADER_FILES
list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc
${CMAKE_CURRENT_SOURCE_DIR}/interest.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc
${CMAKE_CURRENT_SOURCE_DIR}/packet.cc
${CMAKE_CURRENT_SOURCE_DIR}/name.cc
${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc
diff --git a/libtransport/src/core/content_object.cc b/libtransport/src/core/content_object.cc
index 6cbcdb29e..f5cccf404 100644
--- a/libtransport/src/core/content_object.cc
+++ b/libtransport/src/core/content_object.cc
@@ -86,15 +86,6 @@ ContentObject::ContentObject(ContentObject &&other) : Packet(std::move(other)) {
ContentObject::~ContentObject() {}
-void ContentObject::replace(MemBufPtr &&buffer) {
- Packet::replace(std::move(buffer));
-
- if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) <
- 0) {
- throw errors::RuntimeException("Error getting name from content object.");
- }
-}
-
const Name &ContentObject::getName() const {
if (!name_) {
if (hicn_data_get_name(format_, packet_start_,
diff --git a/libtransport/src/core/forwarder_interface.h b/libtransport/src/core/forwarder_interface.h
index 3e70e221d..3b016c4bb 100644
--- a/libtransport/src/core/forwarder_interface.h
+++ b/libtransport/src/core/forwarder_interface.h
@@ -95,15 +95,11 @@ class ForwarderInterface {
packet.setLocator(inet6_address_);
}
- // TRANSPORT_LOGI("Sending packet %s at %lu",
- // packet.getName().toString().c_str(),
- // utils::SteadyClock::now().time_since_epoch().count());
packet.setChecksum();
connector_.send(packet.acquireMemBufReference());
}
TRANSPORT_ALWAYS_INLINE void send(const uint8_t *packet, std::size_t len) {
- // ASIO_COMPLETION_HANDLER_CHECK(Handler, packet_sent) type_check;
counters_.tx_packets++;
counters_.tx_bytes += len;
diff --git a/libtransport/src/core/interest.cc b/libtransport/src/core/interest.cc
index 166632f0a..9ee662615 100644
--- a/libtransport/src/core/interest.cc
+++ b/libtransport/src/core/interest.cc
@@ -72,15 +72,6 @@ Interest::Interest(Interest &&other_interest)
Interest::~Interest() {}
-void Interest::replace(MemBufPtr &&buffer) {
- Packet::replace(std::move(buffer));
-
- if (hicn_interest_get_name(format_, packet_start_,
- name_.getStructReference()) < 0) {
- throw errors::MalformedPacketException();
- }
-}
-
const Name &Interest::getName() const {
if (!name_) {
if (hicn_interest_get_name(format_, packet_start_,
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc
index 2292e9b41..179db63e4 100644
--- a/libtransport/src/core/memif_connector.cc
+++ b/libtransport/src/core/memif_connector.cc
@@ -353,7 +353,7 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
c->rx_buf_num += rx;
- if (TRANSPORT_EXPECT_TRUE(connector->io_service_.stopped())) {
+ if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) {
TRANSPORT_LOGE("socket stopped: ignoring %u packets", rx);
goto error;
}
diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc
index 67e647fca..6815868f0 100644
--- a/libtransport/src/core/packet.cc
+++ b/libtransport/src/core/packet.cc
@@ -69,14 +69,6 @@ Packet::Packet(Packet &&other)
Packet::~Packet() {}
-std::size_t Packet::getHeaderSizeFromFormat(Format format,
- size_t signature_size) {
- std::size_t header_length;
- hicn_packet_get_header_length_from_format(format, &header_length);
- int is_ah = _is_ah(format);
- return is_ah * (header_length + signature_size) + (!is_ah) * header_length;
-}
-
std::size_t Packet::getHeaderSizeFromBuffer(Format format,
const uint8_t *buffer) {
size_t header_length;
@@ -99,17 +91,6 @@ bool Packet::isInterest(const uint8_t *buffer) {
return !is_interest;
}
-Packet::Format Packet::getFormatFromBuffer(const uint8_t *buffer) {
- Format format = HF_UNSPEC;
-
- if (TRANSPORT_EXPECT_FALSE(
- hicn_packet_get_format((const hicn_header_t *)buffer, &format) < 0)) {
- throw errors::MalformedPacketException();
- }
-
- return format;
-}
-
std::size_t Packet::getPayloadSizeFromBuffer(Format format,
const uint8_t *buffer) {
std::size_t payload_length;
@@ -122,14 +103,6 @@ std::size_t Packet::getPayloadSizeFromBuffer(Format format,
return payload_length;
}
-void Packet::replace(MemBufPtr &&buffer) {
- packet_ = std::move(buffer);
- packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData());
- header_head_ = packet_.get();
- payload_head_ = nullptr;
- format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_));
-}
-
std::size_t Packet::payloadSize() const {
return getPayloadSizeFromBuffer(format_,
reinterpret_cast<uint8_t *>(packet_start_));
@@ -270,17 +243,6 @@ uint8_t *Packet::getSignature() const {
return signature;
}
-std::size_t Packet::getSignatureSize() const {
- size_t size_bytes;
- int ret = hicn_packet_get_signature_size(format_, packet_start_, &size_bytes);
-
- if (ret < 0) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- return size_bytes;
-}
-
void Packet::setSignatureTimestamp(const uint64_t &timestamp) {
int ret =
hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp);
@@ -366,22 +328,6 @@ utils::CryptoHash Packet::computeDigest(utils::CryptoHashType algorithm) const {
return hasher.finalize();
}
-void Packet::setChecksum() {
- uint16_t partial_csum = 0;
-
- for (utils::MemBuf *current = header_head_->next();
- current && current != header_head_; current = current->next()) {
- if (partial_csum != 0) {
- partial_csum = ~partial_csum;
- }
- partial_csum = csum(current->data(), current->length(), partial_csum);
- }
- if (hicn_packet_compute_header_checksum(format_, packet_start_,
- partial_csum) < 0) {
- throw errors::MalformedPacketException();
- }
-}
-
bool Packet::checkIntegrity() const {
if (hicn_packet_check_integrity(format_, packet_start_) < 0) {
return false;
diff --git a/libtransport/src/core/pending_interest.h b/libtransport/src/core/pending_interest.h
index d9ec2ed40..aeff78ea2 100644
--- a/libtransport/src/core/pending_interest.h
+++ b/libtransport/src/core/pending_interest.h
@@ -47,17 +47,29 @@ class PendingInterest {
public:
using Ptr = utils::ObjectPool<PendingInterest>::Ptr;
- PendingInterest();
+ PendingInterest()
+ : interest_(nullptr, nullptr),
+ timer_(),
+ on_content_object_callback_(),
+ on_interest_timeout_callback_() {}
PendingInterest(Interest::Ptr &&interest,
- std::unique_ptr<asio::steady_timer> &&timer);
+ std::unique_ptr<asio::steady_timer> &&timer)
+ : interest_(std::move(interest)),
+ timer_(std::move(timer)),
+ on_content_object_callback_(),
+ on_interest_timeout_callback_() {}
PendingInterest(Interest::Ptr &&interest,
OnContentObjectCallback &&on_content_object,
OnInterestTimeoutCallback &&on_interest_timeout,
- std::unique_ptr<asio::steady_timer> &&timer);
+ std::unique_ptr<asio::steady_timer> &&timer)
+ : interest_(std::move(interest)),
+ timer_(std::move(timer)),
+ on_content_object_callback_(std::move(on_content_object)),
+ on_interest_timeout_callback_(std::move(on_interest_timeout)) {}
- ~PendingInterest();
+ ~PendingInterest() = default;
template <typename Handler>
TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) {
@@ -66,19 +78,35 @@ class PendingInterest {
timer_->async_wait(std::forward<Handler &&>(cb));
}
- void cancelTimer();
+ TRANSPORT_ALWAYS_INLINE void cancelTimer() { timer_->cancel(); }
- Interest::Ptr &&getInterest();
+ TRANSPORT_ALWAYS_INLINE Interest::Ptr &&getInterest() {
+ return std::move(interest_);
+ }
- void setInterest(Interest::Ptr &&interest);
+ TRANSPORT_ALWAYS_INLINE void setInterest(Interest::Ptr &&interest) {
+ interest_ = std::move(interest);
+ }
- const OnContentObjectCallback &getOnDataCallback() const;
+ TRANSPORT_ALWAYS_INLINE const OnContentObjectCallback &getOnDataCallback()
+ const {
+ return on_content_object_callback_;
+ }
- void setOnContentObjectCallback(OnContentObjectCallback &&on_content_object);
+ TRANSPORT_ALWAYS_INLINE void setOnContentObjectCallback(
+ OnContentObjectCallback &&on_content_object) {
+ PendingInterest::on_content_object_callback_ = on_content_object;
+ }
- const OnInterestTimeoutCallback &getOnTimeoutCallback() const;
+ TRANSPORT_ALWAYS_INLINE const OnInterestTimeoutCallback &
+ getOnTimeoutCallback() const {
+ return on_interest_timeout_callback_;
+ }
- void setOnTimeoutCallback(OnInterestTimeoutCallback &&on_interest_timeout);
+ TRANSPORT_ALWAYS_INLINE void setOnTimeoutCallback(
+ OnInterestTimeoutCallback &&on_interest_timeout) {
+ PendingInterest::on_interest_timeout_callback_ = on_interest_timeout;
+ }
private:
Interest::Ptr interest_;
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index cf1010068..34dfbd826 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -627,6 +627,8 @@ class Portal {
}
private:
+ portal_details::HandlerMemory async_callback_memory_;
+
asio::io_service &io_service_;
asio::io_service internal_io_service_;
portal_details::Pool packet_pool_;
@@ -639,8 +641,6 @@ class Portal {
ConsumerCallback *consumer_callback_;
ProducerCallback *producer_callback_;
- portal_details::HandlerMemory async_callback_memory_;
-
typename ForwarderInt::ConnectorType connector_;
ForwarderInt forwarder_interface_;
};
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index 2fc8d2b48..488f238ba 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -35,7 +35,7 @@ class ConsumerSocket : public Socket<BasePortal> {
public:
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
: consumer_interface_(consumer),
- portal_(std::make_shared<Portal>(io_service_)),
+ portal_(std::make_shared<Portal>()),
async_downloader_(),
interest_lifetime_(default_values::interest_lifetime),
min_window_size_(default_values::min_window_size),
@@ -62,10 +62,8 @@ class ConsumerSocket : public Socket<BasePortal> {
on_interest_satisfied_(VOID_HANDLER),
on_content_object_input_(VOID_HANDLER),
on_content_object_verification_(VOID_HANDLER),
- on_content_object_(VOID_HANDLER),
stats_summary_(VOID_HANDLER),
read_callback_(nullptr),
- virtual_download_(false),
timer_interval_milliseconds_(0),
guard_raaqm_params_() {
switch (protocol) {
@@ -323,11 +321,6 @@ class ConsumerSocket : public Socket<BasePortal> {
int result = SOCKET_OPTION_NOT_SET;
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
- case OtherOptions::VIRTUAL_DOWNLOAD:
- virtual_download_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
case GeneralTransportOptions::VERIFY_SIGNATURE:
verify_signature_ = socket_option_value;
result = SOCKET_OPTION_SET;
@@ -631,10 +624,6 @@ class ConsumerSocket : public Socket<BasePortal> {
socket_option_value = transport_protocol_->isRunning();
break;
- case OtherOptions::VIRTUAL_DOWNLOAD:
- socket_option_value = virtual_download_;
- break;
-
case GeneralTransportOptions::VERIFY_SIGNATURE:
socket_option_value = verify_signature_;
break;
@@ -861,8 +850,9 @@ class ConsumerSocket : public Socket<BasePortal> {
/* Condition variable for the wait */
std::condition_variable cv;
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -881,7 +871,6 @@ class ConsumerSocket : public Socket<BasePortal> {
protected:
interface::ConsumerSocket *consumer_interface_;
- asio::io_service io_service_;
std::shared_ptr<Portal> portal_;
utils::EventThread async_downloader_;
@@ -925,15 +914,11 @@ class ConsumerSocket : public Socket<BasePortal> {
ConsumerInterestCallback on_interest_satisfied_;
ConsumerContentObjectCallback on_content_object_input_;
ConsumerContentObjectVerificationCallback on_content_object_verification_;
- ConsumerContentObjectCallback on_content_object_;
ConsumerTimerCallback stats_summary_;
ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
ReadCallback *read_callback_;
- // Virtual download for traffic generator
- bool virtual_download_;
-
uint32_t timer_interval_milliseconds_;
// Transport protocol
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index c2996ebc1..e15498bb1 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -80,19 +80,19 @@ void ByteStreamReassembly::assembleContent() {
}
void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
- auto a = content_object.getPayload();
- auto payload_length = a->length();
+ auto payload = content_object.getPayloadReference();
+ auto payload_length = payload.second;
auto write_size = std::min(payload_length, read_buffer_->tailroom());
auto additional_bytes = payload_length > read_buffer_->tailroom()
? payload_length - read_buffer_->tailroom()
: 0;
- std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
+ std::memcpy(read_buffer_->writableTail(), payload.first, write_size);
read_buffer_->append(write_size);
if (!read_buffer_->tailroom()) {
notifyApplication();
- std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
+ std::memcpy(read_buffer_->writableTail(), payload.first + write_size,
additional_bytes);
read_buffer_->append(additional_bytes);
}
diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc
index aa290bef8..8463f84f9 100644
--- a/libtransport/src/protocols/protocol.cc
+++ b/libtransport/src/protocols/protocol.cc
@@ -31,7 +31,16 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
index_manager_(
std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
is_running_(false),
- is_first_(false) {
+ is_first_(false),
+ on_interest_retransmission_(VOID_HANDLER),
+ on_interest_output_(VOID_HANDLER),
+ on_interest_timeout_(VOID_HANDLER),
+ on_interest_satisfied_(VOID_HANDLER),
+ on_content_object_input_(VOID_HANDLER),
+ on_content_object_verification_(VOID_HANDLER),
+ stats_summary_(VOID_HANDLER),
+ verification_failed_callback_(VOID_HANDLER),
+ on_payload_(VOID_HANDLER) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
}
@@ -46,6 +55,26 @@ int TransportProtocol::start() {
// Set it is the first time we schedule an interest
is_first_ = true;
+ // Get all callbacks references before starting
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &on_interest_retransmission_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &on_interest_output_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
+ &on_interest_timeout_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
+ &on_interest_satisfied_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &on_content_object_input_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY,
+ &on_content_object_verification_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ &on_payload_);
+
// Schedule next interests
scheduleNextInterests();
@@ -81,10 +110,7 @@ void TransportProtocol::resume() {
}
void TransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
-
- if (!on_payload) {
+ if (!on_payload_) {
throw errors::RuntimeException(
"The read callback must be installed in the transport before "
"starting "
@@ -92,9 +118,9 @@ void TransportProtocol::onContentReassembled(std::error_code ec) {
}
if (!ec) {
- on_payload->readSuccess(stats_->getBytesRecv());
+ on_payload_->readSuccess(stats_->getBytesRecv());
} else {
- on_payload->readError(ec);
+ on_payload_->readError(ec);
}
stop();
diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h
index 949380959..db4524133 100644
--- a/libtransport/src/protocols/protocol.h
+++ b/libtransport/src/protocols/protocol.h
@@ -17,6 +17,8 @@
#include <atomic>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/statistics.h>
#include <hicn/transport/utils/object_pool.h>
@@ -34,6 +36,8 @@ using namespace core;
class IndexVerificationManager;
+using ReadCallback = interface::ConsumerSocket::ReadCallback;
+
class TransportProtocolCallback {
virtual void onContentObject(const core::Interest &interest,
const core::ContentObject &content_object) = 0;
@@ -89,6 +93,20 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback,
// True if it si the first time we schedule an interest
std::atomic<bool> is_first_;
interface::TransportStatistics *stats_;
+
+ // Callbacks
+ interface::ConsumerInterestCallback *on_interest_retransmission_;
+ interface::ConsumerInterestCallback *on_interest_output_;
+ interface::ConsumerInterestCallback *on_interest_timeout_;
+ interface::ConsumerInterestCallback *on_interest_satisfied_;
+ interface::ConsumerContentObjectCallback *on_content_object_input_;
+ interface::ConsumerContentObjectVerificationCallback
+ *on_content_object_verification_;
+ interface::ConsumerContentObjectCallback *on_content_object_;
+ interface::ConsumerTimerCallback *stats_summary_;
+ interface::ConsumerContentObjectVerificationFailedCallback
+ *verification_failed_callback_;
+ ReadCallback *on_payload_;
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 8a38f8ccf..0a93dec44 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -325,18 +325,12 @@ void RaaqmTransportProtocol::onContentObject(
}
// Call application-defined callbacks
- ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_->getInterface(), *content_object);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), *content_object);
}
- ConsumerInterestCallback *callback_interest = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
- &callback_interest);
- if (*callback_interest) {
- (*callback_interest)(*socket_->getInterface(), *interest);
+ if (*on_interest_satisfied_) {
+ (*on_interest_satisfied_)(*socket_->getInterface(), *interest);
}
if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
@@ -369,23 +363,17 @@ void RaaqmTransportProtocol::onPacketDropped(
socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
uint64_t segment = interest->getName().getSuffix();
- ConsumerInterestCallback *callback = VOID_HANDLER;
+
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
stats_->updateRetxCount(1);
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_retransmission_) {
+ (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (!is_running_) {
@@ -393,7 +381,6 @@ void RaaqmTransportProtocol::onPacketDropped(
}
interest_retransmissions_[segment & mask]++;
-
interest_to_retransmit_.push(std::move(interest));
} else {
TRANSPORT_LOGE(
@@ -428,11 +415,8 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
return;
}
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_timeout_) {
+ (*on_interest_timeout_)(*socket_->getInterface(), *interest);
}
afterDataUnsatisfied(segment);
@@ -444,18 +428,12 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
max_rtx)) {
stats_->updateRetxCount(1);
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_retransmission_) {
+ (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (!is_running_) {
@@ -463,7 +441,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
}
interest_retransmissions_[segment & mask]++;
-
interest_to_retransmit_.push(std::move(interest));
scheduleNextInterests();
@@ -507,7 +484,7 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
}
}
-void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
auto interest = getPacket();
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
@@ -519,15 +496,12 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
interest_lifetime);
interest->setLifetime(interest_lifetime);
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- callback->operator()(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ on_interest_output_->operator()(*socket_->getInterface(), *interest);
}
if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return;
+ return false;
}
// This is set to ~0 so that the next interest_retransmissions_ + 1,
@@ -535,6 +509,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
+
+ return true;
}
void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
@@ -564,7 +540,7 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
rate_estimator_->onRttUpdate((double)rtt.count());
}
- cur_path_->insertNewRtt(rtt.count());
+ cur_path_->insertNewRtt(rtt.count(), now);
cur_path_->smoothTimer();
if (cur_path_->newPropagationDelayAvailable()) {
@@ -595,18 +571,15 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
stats_->updateAverageWindowSize(current_window_size_);
// Call statistics callback
- ConsumerTimerCallback *stats_callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
+ if (*stats_summary_) {
auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
uint32_t timer_interval_milliseconds = 0;
socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
timer_interval_milliseconds);
if (dt.count() > timer_interval_milliseconds) {
- (*stats_callback)(*socket_->getInterface(), *stats_);
- t0_ = utils::SteadyClock::now();
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ t0_ = now;
}
}
}
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
index 412967770..ecc466755 100644
--- a/libtransport/src/protocols/raaqm.h
+++ b/libtransport/src/protocols/raaqm.h
@@ -79,7 +79,7 @@ class RaaqmTransportProtocol : public TransportProtocol,
virtual void scheduleNextInterests() override;
- void sendInterest(std::uint64_t next_suffix);
+ bool sendInterest(std::uint64_t next_suffix);
void sendInterest(Interest::Ptr &&interest);
diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc
index 439549c85..8bbbadcf2 100644
--- a/libtransport/src/protocols/raaqm_data_path.cc
+++ b/libtransport/src/protocols/raaqm_data_path.cc
@@ -48,7 +48,8 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor,
average_rtt_(0),
alpha_(ALPHA) {}
-RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
+RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt,
+ const utils::TimePoint &now) {
rtt_ = new_rtt;
rtt_samples_.pushBack(new_rtt);
@@ -60,7 +61,7 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
prop_delay_ = rtt_min_;
}
- last_received_pkt_ = utils::SteadyClock::now();
+ last_received_pkt_ = now;
return *this;
}
diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h
index 6f2afde72..3f037bc76 100644
--- a/libtransport/src/protocols/raaqm_data_path.h
+++ b/libtransport/src/protocols/raaqm_data_path.h
@@ -45,7 +45,7 @@ class RaaqmDataPath {
* max of RTT.
* @param new_rtt is the value of the new RTT
*/
- RaaqmDataPath &insertNewRtt(uint64_t new_rtt);
+ RaaqmDataPath &insertNewRtt(uint64_t new_rtt, const utils::TimePoint &now);
/**
* @brief Update the path statistics
diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc
index 0ac3839dd..72abb599a 100644
--- a/libtransport/src/protocols/rtc.cc
+++ b/libtransport/src/protocols/rtc.cc
@@ -288,15 +288,12 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
uint32_t BW = (uint32_t)ceil(estimatedBw_);
computeMaxWindow(BW, BDP);
- ConsumerTimerCallback *stats_callback = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
+ if (*stats_summary_) {
// Send the stats to the app
stats_->updateQueuingDelay(queuingDelay_);
stats_->updateLossRatio(lossRate_);
stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
- (*stats_callback)(*socket_->getInterface(), *stats_);
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
}
// bound also by interest lifitime* production rate
if (!gotNack_) {
@@ -451,13 +448,8 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
interestLifetime);
interest->setLifetime(uint32_t(interestLifetime));
- ConsumerInterestCallback *on_interest_output = nullptr;
-
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &on_interest_output);
-
- if (*on_interest_output) {
- (*on_interest_output)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
@@ -890,11 +882,8 @@ void RTCTransportProtocol::onContentObject(
uint32_t segmentNumber = content_object->getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- ConsumerContentObjectCallback *callback_content_object = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_->getInterface(), *content_object);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), *content_object);
}
if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
diff --git a/libtransport/src/utils/membuf.cc b/libtransport/src/utils/membuf.cc
index e75e85b35..94e5b13a1 100644
--- a/libtransport/src/utils/membuf.cc
+++ b/libtransport/src/utils/membuf.cc
@@ -102,8 +102,6 @@ struct MemBuf::HeapStorage {
};
struct MemBuf::HeapFullStorage {
- // Make sure jemalloc allocates from the 64-byte class. Putting this here
- // because HeapStorage is private so it can't be at namespace level.
static_assert(sizeof(HeapStorage) <= 64,
"MemBuf may not grow over 56 bytes!");