summaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-03-04 16:21:54 +0100
committerMauro Sardara <msardara@cisco.com>2020-03-09 13:02:50 +0000
commitafe807c61372fe2481e73af63c8382af1e1d3011 (patch)
tree181269bc2548bba9eb667df4301ded197183a4cf /libtransport
parent248bfd5ad0ae3cc17bbd3ea3b9a47fa8d075ee58 (diff)
[HICN-540] Optimizations for libhicntransport
Change-Id: I8b46b4eb2ef5488c09041887cc8296a216440f33 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/includes/hicn/transport/core/content_object.h2
-rw-r--r--libtransport/includes/hicn/transport/core/interest.h2
-rw-r--r--libtransport/includes/hicn/transport/core/packet.h72
-rw-r--r--libtransport/includes/hicn/transport/utils/CMakeLists.txt1
-rw-r--r--libtransport/includes/hicn/transport/utils/fixed_block_allocator.h136
-rw-r--r--libtransport/src/core/CMakeLists.txt1
-rw-r--r--libtransport/src/core/content_object.cc9
-rw-r--r--libtransport/src/core/forwarder_interface.h4
-rw-r--r--libtransport/src/core/interest.cc9
-rw-r--r--libtransport/src/core/memif_connector.cc2
-rw-r--r--libtransport/src/core/packet.cc54
-rw-r--r--libtransport/src/core/pending_interest.h50
-rw-r--r--libtransport/src/core/portal.h4
-rw-r--r--libtransport/src/implementation/socket_consumer.h23
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc8
-rw-r--r--libtransport/src/protocols/protocol.cc40
-rw-r--r--libtransport/src/protocols/protocol.h18
-rw-r--r--libtransport/src/protocols/raaqm.cc77
-rw-r--r--libtransport/src/protocols/raaqm.h2
-rw-r--r--libtransport/src/protocols/raaqm_data_path.cc5
-rw-r--r--libtransport/src/protocols/raaqm_data_path.h2
-rw-r--r--libtransport/src/protocols/rtc.cc23
-rw-r--r--libtransport/src/utils/membuf.cc2
23 files changed, 341 insertions, 205 deletions
diff --git a/libtransport/includes/hicn/transport/core/content_object.h b/libtransport/includes/hicn/transport/core/content_object.h
index 5af548fe4..822790e56 100644
--- a/libtransport/includes/hicn/transport/core/content_object.h
+++ b/libtransport/includes/hicn/transport/core/content_object.h
@@ -46,8 +46,6 @@ class ContentObject : public Packet {
~ContentObject() override;
- void replace(MemBufPtr &&buffer) override;
-
const Name &getName() const override;
Name &getWritableName() override;
diff --git a/libtransport/includes/hicn/transport/core/interest.h b/libtransport/includes/hicn/transport/core/interest.h
index f0c546a8e..c572afbff 100644
--- a/libtransport/includes/hicn/transport/core/interest.h
+++ b/libtransport/includes/hicn/transport/core/interest.h
@@ -44,8 +44,6 @@ class Interest
~Interest() override;
- void replace(MemBufPtr &&buffer) override;
-
const Name &getName() const override;
Name &getWritableName() override;
diff --git a/libtransport/includes/hicn/transport/core/packet.h b/libtransport/includes/hicn/transport/core/packet.h
index e758aa13e..3ddc4a595 100644
--- a/libtransport/includes/hicn/transport/core/packet.h
+++ b/libtransport/includes/hicn/transport/core/packet.h
@@ -17,6 +17,7 @@
#include <hicn/transport/core/name.h>
#include <hicn/transport/core/payload_type.h>
+#include <hicn/transport/errors/malformed_packet_exception.h>
#include <hicn/transport/portability/portability.h>
#include <hicn/transport/security/crypto_hasher.h>
#include <hicn/transport/security/crypto_suite.h>
@@ -81,7 +82,12 @@ class Packet : public std::enable_shared_from_this<Packet> {
virtual ~Packet();
static std::size_t getHeaderSizeFromFormat(Format format,
- std::size_t signature_size = 0);
+ std::size_t signature_size = 0) {
+ std::size_t header_length;
+ hicn_packet_get_header_length_from_format(format, &header_length);
+ int is_ah = _is_ah(format);
+ return is_ah * (header_length + signature_size) + (!is_ah) * header_length;
+ }
static std::size_t getHeaderSizeFromBuffer(Format format,
const uint8_t *buffer);
@@ -91,9 +97,26 @@ class Packet : public std::enable_shared_from_this<Packet> {
static bool isInterest(const uint8_t *buffer);
- static Format getFormatFromBuffer(const uint8_t *buffer);
+ static Format getFormatFromBuffer(const uint8_t *buffer) {
+ Format format = HF_UNSPEC;
- virtual void replace(MemBufPtr &&buffer);
+ if (TRANSPORT_EXPECT_FALSE(
+ hicn_packet_get_format((const hicn_header_t *)buffer, &format) <
+ 0)) {
+ throw errors::MalformedPacketException();
+ }
+
+ return format;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void replace(MemBufPtr &&buffer) {
+ packet_ = std::move(buffer);
+ packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData());
+ header_head_ = packet_.get();
+ payload_head_ = nullptr;
+ format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_));
+ name_.clear();
+ }
std::size_t payloadSize() const;
@@ -123,6 +146,19 @@ class Packet : public std::enable_shared_from_this<Packet> {
std::unique_ptr<utils::MemBuf> getPayload() const;
+ std::pair<const uint8_t *, std::size_t> getPayloadReference() const {
+ int signature_size = 0;
+ if (_is_ah(format_)) {
+ signature_size = (uint32_t)getSignatureSize();
+ }
+
+ auto header_size = getHeaderSizeFromFormat(format_, signature_size);
+ auto payload_length = packet_->length() - header_size;
+
+ return std::make_pair(packet_->data() + header_size,
+ payload_length);
+ }
+
Packet &updateLength(std::size_t length = 0);
PayloadType getPayloadType() const;
@@ -152,7 +188,21 @@ class Packet : public std::enable_shared_from_this<Packet> {
virtual utils::CryptoHash computeDigest(
utils::CryptoHashType algorithm) const;
- void setChecksum();
+ void setChecksum() {
+ uint16_t partial_csum = 0;
+
+ for (utils::MemBuf *current = header_head_->next();
+ current && current != header_head_; current = current->next()) {
+ if (partial_csum != 0) {
+ partial_csum = ~partial_csum;
+ }
+ partial_csum = csum(current->data(), current->length(), partial_csum);
+ }
+ if (hicn_packet_compute_header_checksum(format_, packet_start_,
+ partial_csum) < 0) {
+ throw errors::MalformedPacketException();
+ }
+ }
bool checkIntegrity() const;
@@ -184,7 +234,19 @@ class Packet : public std::enable_shared_from_this<Packet> {
private:
virtual void resetForHash() = 0;
void setSignatureSize(std::size_t size_bytes);
- std::size_t getSignatureSize() const;
+
+ std::size_t getSignatureSize() const {
+ size_t size_bytes;
+ int ret =
+ hicn_packet_get_signature_size(format_, packet_start_, &size_bytes);
+
+ if (ret < 0) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ return size_bytes;
+ }
+
uint8_t *getSignature() const;
void separateHeaderPayload();
diff --git a/libtransport/includes/hicn/transport/utils/CMakeLists.txt b/libtransport/includes/hicn/transport/utils/CMakeLists.txt
index 396bd06d6..38ecc3d37 100644
--- a/libtransport/includes/hicn/transport/utils/CMakeLists.txt
+++ b/libtransport/includes/hicn/transport/utils/CMakeLists.txt
@@ -28,6 +28,7 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/object_pool.h
${CMAKE_CURRENT_SOURCE_DIR}/membuf.h
${CMAKE_CURRENT_SOURCE_DIR}/spinlock.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/fixed_block_allocator.h
)
if(NOT WIN32)
diff --git a/libtransport/includes/hicn/transport/utils/fixed_block_allocator.h b/libtransport/includes/hicn/transport/utils/fixed_block_allocator.h
new file mode 100644
index 000000000..1ade1516e
--- /dev/null
+++ b/libtransport/includes/hicn/transport/utils/fixed_block_allocator.h
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <hicn/transport/portability/c_portability.h>
+#include <hicn/transport/utils/spinlock.h>
+
+#include <stdint.h>
+#include <cstdlib>
+#include <memory>
+#include <cassert>
+
+namespace utils {
+template <std::size_t DEFAULT_SIZE = 512, std::size_t OBJECTS = 4096>
+class FixedBlockAllocator {
+ FixedBlockAllocator(std::size_t size = DEFAULT_SIZE,
+ std::size_t objects = OBJECTS)
+ : block_size_(size < sizeof(void*) ? sizeof(long*) : size),
+ object_size_(size),
+ max_objects_(objects),
+ p_head_(NULL),
+ pool_index_(0),
+ block_count_(0),
+ blocks_in_use_(0),
+ allocations_(0),
+ deallocations_(0) {
+ p_pool_ = (uint8_t*)new uint8_t[block_size_ * max_objects_];
+ }
+
+ public:
+ static FixedBlockAllocator* getInstance() {
+ if (!instance_) {
+ instance_ = std::unique_ptr<FixedBlockAllocator>(
+ new FixedBlockAllocator(DEFAULT_SIZE, OBJECTS));
+ }
+
+ return instance_.get();
+ }
+
+ ~FixedBlockAllocator() { delete[] p_pool_; }
+
+ TRANSPORT_ALWAYS_INLINE void* allocateBlock(size_t size = DEFAULT_SIZE) {
+ assert(size <= DEFAULT_SIZE);
+ uint32_t index;
+
+ void* p_block = pop();
+ if (!p_block) {
+ if (pool_index_ < max_objects_) {
+ {
+ SpinLock::Acquire locked(lock_);
+ index = pool_index_++;
+ }
+ p_block = (void*)(p_pool_ + (index * block_size_));
+ } else {
+ // TODO Consider increasing pool here instead of throwing an exception
+ throw std::runtime_error("No more memory available from packet pool!");
+ }
+ }
+
+ blocks_in_use_++;
+ allocations_++;
+
+ return p_block;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void deallocateBlock(void* pBlock) {
+ push(pBlock);
+ {
+ SpinLock::Acquire locked(lock_);
+ blocks_in_use_--;
+ deallocations_++;
+ }
+ }
+
+ TRANSPORT_ALWAYS_INLINE std::size_t blockSize() { return block_size_; }
+
+ TRANSPORT_ALWAYS_INLINE uint32_t blockCount() { return block_count_; }
+
+ TRANSPORT_ALWAYS_INLINE uint32_t blocksInUse() { return blocks_in_use_; }
+
+ TRANSPORT_ALWAYS_INLINE uint32_t allocations() { return allocations_; }
+
+ TRANSPORT_ALWAYS_INLINE uint32_t deallocations() { return deallocations_; }
+
+ private:
+ TRANSPORT_ALWAYS_INLINE void push(void* p_memory) {
+ Block* p_block = (Block*)p_memory;
+ {
+ SpinLock::Acquire locked(lock_);
+ p_block->p_next = p_head_;
+ p_head_ = p_block;
+ }
+ }
+
+ TRANSPORT_ALWAYS_INLINE void* pop() {
+ Block* p_block = nullptr;
+
+ {
+ SpinLock::Acquire locked(lock_);
+ if (p_head_) {
+ p_block = p_head_;
+ p_head_ = p_head_->p_next;
+ }
+ }
+
+ return (void*)p_block;
+ }
+
+ struct Block {
+ Block* p_next;
+ };
+
+ static std::unique_ptr<FixedBlockAllocator> instance_;
+
+ const std::size_t block_size_;
+ const std::size_t object_size_;
+ const std::size_t max_objects_;
+
+ Block* p_head_;
+ uint8_t* p_pool_;
+ uint32_t pool_index_;
+ uint32_t block_count_;
+ uint32_t blocks_in_use_;
+ uint32_t allocations_;
+ uint32_t deallocations_;
+
+ SpinLock lock_;
+};
+
+template <std::size_t A, std::size_t B>
+std::unique_ptr<FixedBlockAllocator<A, B>>
+ FixedBlockAllocator<A, B>::instance_ = nullptr;
+
+} // namespace utils \ No newline at end of file
diff --git a/libtransport/src/core/CMakeLists.txt b/libtransport/src/core/CMakeLists.txt
index 12ef9cfe4..5c8ab9270 100644
--- a/libtransport/src/core/CMakeLists.txt
+++ b/libtransport/src/core/CMakeLists.txt
@@ -33,7 +33,6 @@ list(APPEND HEADER_FILES
list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc
${CMAKE_CURRENT_SOURCE_DIR}/interest.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc
${CMAKE_CURRENT_SOURCE_DIR}/packet.cc
${CMAKE_CURRENT_SOURCE_DIR}/name.cc
${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc
diff --git a/libtransport/src/core/content_object.cc b/libtransport/src/core/content_object.cc
index 6cbcdb29e..f5cccf404 100644
--- a/libtransport/src/core/content_object.cc
+++ b/libtransport/src/core/content_object.cc
@@ -86,15 +86,6 @@ ContentObject::ContentObject(ContentObject &&other) : Packet(std::move(other)) {
ContentObject::~ContentObject() {}
-void ContentObject::replace(MemBufPtr &&buffer) {
- Packet::replace(std::move(buffer));
-
- if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) <
- 0) {
- throw errors::RuntimeException("Error getting name from content object.");
- }
-}
-
const Name &ContentObject::getName() const {
if (!name_) {
if (hicn_data_get_name(format_, packet_start_,
diff --git a/libtransport/src/core/forwarder_interface.h b/libtransport/src/core/forwarder_interface.h
index 3e70e221d..3b016c4bb 100644
--- a/libtransport/src/core/forwarder_interface.h
+++ b/libtransport/src/core/forwarder_interface.h
@@ -95,15 +95,11 @@ class ForwarderInterface {
packet.setLocator(inet6_address_);
}
- // TRANSPORT_LOGI("Sending packet %s at %lu",
- // packet.getName().toString().c_str(),
- // utils::SteadyClock::now().time_since_epoch().count());
packet.setChecksum();
connector_.send(packet.acquireMemBufReference());
}
TRANSPORT_ALWAYS_INLINE void send(const uint8_t *packet, std::size_t len) {
- // ASIO_COMPLETION_HANDLER_CHECK(Handler, packet_sent) type_check;
counters_.tx_packets++;
counters_.tx_bytes += len;
diff --git a/libtransport/src/core/interest.cc b/libtransport/src/core/interest.cc
index 166632f0a..9ee662615 100644
--- a/libtransport/src/core/interest.cc
+++ b/libtransport/src/core/interest.cc
@@ -72,15 +72,6 @@ Interest::Interest(Interest &&other_interest)
Interest::~Interest() {}
-void Interest::replace(MemBufPtr &&buffer) {
- Packet::replace(std::move(buffer));
-
- if (hicn_interest_get_name(format_, packet_start_,
- name_.getStructReference()) < 0) {
- throw errors::MalformedPacketException();
- }
-}
-
const Name &Interest::getName() const {
if (!name_) {
if (hicn_interest_get_name(format_, packet_start_,
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc
index 2292e9b41..179db63e4 100644
--- a/libtransport/src/core/memif_connector.cc
+++ b/libtransport/src/core/memif_connector.cc
@@ -353,7 +353,7 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
c->rx_buf_num += rx;
- if (TRANSPORT_EXPECT_TRUE(connector->io_service_.stopped())) {
+ if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) {
TRANSPORT_LOGE("socket stopped: ignoring %u packets", rx);
goto error;
}
diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc
index 67e647fca..6815868f0 100644
--- a/libtransport/src/core/packet.cc
+++ b/libtransport/src/core/packet.cc
@@ -69,14 +69,6 @@ Packet::Packet(Packet &&other)
Packet::~Packet() {}
-std::size_t Packet::getHeaderSizeFromFormat(Format format,
- size_t signature_size) {
- std::size_t header_length;
- hicn_packet_get_header_length_from_format(format, &header_length);
- int is_ah = _is_ah(format);
- return is_ah * (header_length + signature_size) + (!is_ah) * header_length;
-}
-
std::size_t Packet::getHeaderSizeFromBuffer(Format format,
const uint8_t *buffer) {
size_t header_length;
@@ -99,17 +91,6 @@ bool Packet::isInterest(const uint8_t *buffer) {
return !is_interest;
}
-Packet::Format Packet::getFormatFromBuffer(const uint8_t *buffer) {
- Format format = HF_UNSPEC;
-
- if (TRANSPORT_EXPECT_FALSE(
- hicn_packet_get_format((const hicn_header_t *)buffer, &format) < 0)) {
- throw errors::MalformedPacketException();
- }
-
- return format;
-}
-
std::size_t Packet::getPayloadSizeFromBuffer(Format format,
const uint8_t *buffer) {
std::size_t payload_length;
@@ -122,14 +103,6 @@ std::size_t Packet::getPayloadSizeFromBuffer(Format format,
return payload_length;
}
-void Packet::replace(MemBufPtr &&buffer) {
- packet_ = std::move(buffer);
- packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData());
- header_head_ = packet_.get();
- payload_head_ = nullptr;
- format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_));
-}
-
std::size_t Packet::payloadSize() const {
return getPayloadSizeFromBuffer(format_,
reinterpret_cast<uint8_t *>(packet_start_));
@@ -270,17 +243,6 @@ uint8_t *Packet::getSignature() const {
return signature;
}
-std::size_t Packet::getSignatureSize() const {
- size_t size_bytes;
- int ret = hicn_packet_get_signature_size(format_, packet_start_, &size_bytes);
-
- if (ret < 0) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- return size_bytes;
-}
-
void Packet::setSignatureTimestamp(const uint64_t &timestamp) {
int ret =
hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp);
@@ -366,22 +328,6 @@ utils::CryptoHash Packet::computeDigest(utils::CryptoHashType algorithm) const {
return hasher.finalize();
}
-void Packet::setChecksum() {
- uint16_t partial_csum = 0;
-
- for (utils::MemBuf *current = header_head_->next();
- current && current != header_head_; current = current->next()) {
- if (partial_csum != 0) {
- partial_csum = ~partial_csum;
- }
- partial_csum = csum(current->data(), current->length(), partial_csum);
- }
- if (hicn_packet_compute_header_checksum(format_, packet_start_,
- partial_csum) < 0) {
- throw errors::MalformedPacketException();
- }
-}
-
bool Packet::checkIntegrity() const {
if (hicn_packet_check_integrity(format_, packet_start_) < 0) {
return false;
diff --git a/libtransport/src/core/pending_interest.h b/libtransport/src/core/pending_interest.h
index d9ec2ed40..aeff78ea2 100644
--- a/libtransport/src/core/pending_interest.h
+++ b/libtransport/src/core/pending_interest.h
@@ -47,17 +47,29 @@ class PendingInterest {
public:
using Ptr = utils::ObjectPool<PendingInterest>::Ptr;
- PendingInterest();
+ PendingInterest()
+ : interest_(nullptr, nullptr),
+ timer_(),
+ on_content_object_callback_(),
+ on_interest_timeout_callback_() {}
PendingInterest(Interest::Ptr &&interest,
- std::unique_ptr<asio::steady_timer> &&timer);
+ std::unique_ptr<asio::steady_timer> &&timer)
+ : interest_(std::move(interest)),
+ timer_(std::move(timer)),
+ on_content_object_callback_(),
+ on_interest_timeout_callback_() {}
PendingInterest(Interest::Ptr &&interest,
OnContentObjectCallback &&on_content_object,
OnInterestTimeoutCallback &&on_interest_timeout,
- std::unique_ptr<asio::steady_timer> &&timer);
+ std::unique_ptr<asio::steady_timer> &&timer)
+ : interest_(std::move(interest)),
+ timer_(std::move(timer)),
+ on_content_object_callback_(std::move(on_content_object)),
+ on_interest_timeout_callback_(std::move(on_interest_timeout)) {}
- ~PendingInterest();
+ ~PendingInterest() = default;
template <typename Handler>
TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) {
@@ -66,19 +78,35 @@ class PendingInterest {
timer_->async_wait(std::forward<Handler &&>(cb));
}
- void cancelTimer();
+ TRANSPORT_ALWAYS_INLINE void cancelTimer() { timer_->cancel(); }
- Interest::Ptr &&getInterest();
+ TRANSPORT_ALWAYS_INLINE Interest::Ptr &&getInterest() {
+ return std::move(interest_);
+ }
- void setInterest(Interest::Ptr &&interest);
+ TRANSPORT_ALWAYS_INLINE void setInterest(Interest::Ptr &&interest) {
+ interest_ = std::move(interest);
+ }
- const OnContentObjectCallback &getOnDataCallback() const;
+ TRANSPORT_ALWAYS_INLINE const OnContentObjectCallback &getOnDataCallback()
+ const {
+ return on_content_object_callback_;
+ }
- void setOnContentObjectCallback(OnContentObjectCallback &&on_content_object);
+ TRANSPORT_ALWAYS_INLINE void setOnContentObjectCallback(
+ OnContentObjectCallback &&on_content_object) {
+ PendingInterest::on_content_object_callback_ = on_content_object;
+ }
- const OnInterestTimeoutCallback &getOnTimeoutCallback() const;
+ TRANSPORT_ALWAYS_INLINE const OnInterestTimeoutCallback &
+ getOnTimeoutCallback() const {
+ return on_interest_timeout_callback_;
+ }
- void setOnTimeoutCallback(OnInterestTimeoutCallback &&on_interest_timeout);
+ TRANSPORT_ALWAYS_INLINE void setOnTimeoutCallback(
+ OnInterestTimeoutCallback &&on_interest_timeout) {
+ PendingInterest::on_interest_timeout_callback_ = on_interest_timeout;
+ }
private:
Interest::Ptr interest_;
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index cf1010068..34dfbd826 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -627,6 +627,8 @@ class Portal {
}
private:
+ portal_details::HandlerMemory async_callback_memory_;
+
asio::io_service &io_service_;
asio::io_service internal_io_service_;
portal_details::Pool packet_pool_;
@@ -639,8 +641,6 @@ class Portal {
ConsumerCallback *consumer_callback_;
ProducerCallback *producer_callback_;
- portal_details::HandlerMemory async_callback_memory_;
-
typename ForwarderInt::ConnectorType connector_;
ForwarderInt forwarder_interface_;
};
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index 2fc8d2b48..488f238ba 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -35,7 +35,7 @@ class ConsumerSocket : public Socket<BasePortal> {
public:
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
: consumer_interface_(consumer),
- portal_(std::make_shared<Portal>(io_service_)),
+ portal_(std::make_shared<Portal>()),
async_downloader_(),
interest_lifetime_(default_values::interest_lifetime),
min_window_size_(default_values::min_window_size),
@@ -62,10 +62,8 @@ class ConsumerSocket : public Socket<BasePortal> {
on_interest_satisfied_(VOID_HANDLER),
on_content_object_input_(VOID_HANDLER),
on_content_object_verification_(VOID_HANDLER),
- on_content_object_(VOID_HANDLER),
stats_summary_(VOID_HANDLER),
read_callback_(nullptr),
- virtual_download_(false),
timer_interval_milliseconds_(0),
guard_raaqm_params_() {
switch (protocol) {
@@ -323,11 +321,6 @@ class ConsumerSocket : public Socket<BasePortal> {
int result = SOCKET_OPTION_NOT_SET;
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
- case OtherOptions::VIRTUAL_DOWNLOAD:
- virtual_download_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
case GeneralTransportOptions::VERIFY_SIGNATURE:
verify_signature_ = socket_option_value;
result = SOCKET_OPTION_SET;
@@ -631,10 +624,6 @@ class ConsumerSocket : public Socket<BasePortal> {
socket_option_value = transport_protocol_->isRunning();
break;
- case OtherOptions::VIRTUAL_DOWNLOAD:
- socket_option_value = virtual_download_;
- break;
-
case GeneralTransportOptions::VERIFY_SIGNATURE:
socket_option_value = verify_signature_;
break;
@@ -861,8 +850,9 @@ class ConsumerSocket : public Socket<BasePortal> {
/* Condition variable for the wait */
std::condition_variable cv;
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -881,7 +871,6 @@ class ConsumerSocket : public Socket<BasePortal> {
protected:
interface::ConsumerSocket *consumer_interface_;
- asio::io_service io_service_;
std::shared_ptr<Portal> portal_;
utils::EventThread async_downloader_;
@@ -925,15 +914,11 @@ class ConsumerSocket : public Socket<BasePortal> {
ConsumerInterestCallback on_interest_satisfied_;
ConsumerContentObjectCallback on_content_object_input_;
ConsumerContentObjectVerificationCallback on_content_object_verification_;
- ConsumerContentObjectCallback on_content_object_;
ConsumerTimerCallback stats_summary_;
ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
ReadCallback *read_callback_;
- // Virtual download for traffic generator
- bool virtual_download_;
-
uint32_t timer_interval_milliseconds_;
// Transport protocol
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index c2996ebc1..e15498bb1 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -80,19 +80,19 @@ void ByteStreamReassembly::assembleContent() {
}
void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
- auto a = content_object.getPayload();
- auto payload_length = a->length();
+ auto payload = content_object.getPayloadReference();
+ auto payload_length = payload.second;
auto write_size = std::min(payload_length, read_buffer_->tailroom());
auto additional_bytes = payload_length > read_buffer_->tailroom()
? payload_length - read_buffer_->tailroom()
: 0;
- std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
+ std::memcpy(read_buffer_->writableTail(), payload.first, write_size);
read_buffer_->append(write_size);
if (!read_buffer_->tailroom()) {
notifyApplication();
- std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
+ std::memcpy(read_buffer_->writableTail(), payload.first + write_size,
additional_bytes);
read_buffer_->append(additional_bytes);
}
diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc
index aa290bef8..8463f84f9 100644
--- a/libtransport/src/protocols/protocol.cc
+++ b/libtransport/src/protocols/protocol.cc
@@ -31,7 +31,16 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
index_manager_(
std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
is_running_(false),
- is_first_(false) {
+ is_first_(false),
+ on_interest_retransmission_(VOID_HANDLER),
+ on_interest_output_(VOID_HANDLER),
+ on_interest_timeout_(VOID_HANDLER),
+ on_interest_satisfied_(VOID_HANDLER),
+ on_content_object_input_(VOID_HANDLER),
+ on_content_object_verification_(VOID_HANDLER),
+ stats_summary_(VOID_HANDLER),
+ verification_failed_callback_(VOID_HANDLER),
+ on_payload_(VOID_HANDLER) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
}
@@ -46,6 +55,26 @@ int TransportProtocol::start() {
// Set it is the first time we schedule an interest
is_first_ = true;
+ // Get all callbacks references before starting
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &on_interest_retransmission_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &on_interest_output_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
+ &on_interest_timeout_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
+ &on_interest_satisfied_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &on_content_object_input_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY,
+ &on_content_object_verification_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ &on_payload_);
+
// Schedule next interests
scheduleNextInterests();
@@ -81,10 +110,7 @@ void TransportProtocol::resume() {
}
void TransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
-
- if (!on_payload) {
+ if (!on_payload_) {
throw errors::RuntimeException(
"The read callback must be installed in the transport before "
"starting "
@@ -92,9 +118,9 @@ void TransportProtocol::onContentReassembled(std::error_code ec) {
}
if (!ec) {
- on_payload->readSuccess(stats_->getBytesRecv());
+ on_payload_->readSuccess(stats_->getBytesRecv());
} else {
- on_payload->readError(ec);
+ on_payload_->readError(ec);
}
stop();
diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h
index 949380959..db4524133 100644
--- a/libtransport/src/protocols/protocol.h
+++ b/libtransport/src/protocols/protocol.h
@@ -17,6 +17,8 @@
#include <atomic>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/statistics.h>
#include <hicn/transport/utils/object_pool.h>
@@ -34,6 +36,8 @@ using namespace core;
class IndexVerificationManager;
+using ReadCallback = interface::ConsumerSocket::ReadCallback;
+
class TransportProtocolCallback {
virtual void onContentObject(const core::Interest &interest,
const core::ContentObject &content_object) = 0;
@@ -89,6 +93,20 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback,
// True if it si the first time we schedule an interest
std::atomic<bool> is_first_;
interface::TransportStatistics *stats_;
+
+ // Callbacks
+ interface::ConsumerInterestCallback *on_interest_retransmission_;
+ interface::ConsumerInterestCallback *on_interest_output_;
+ interface::ConsumerInterestCallback *on_interest_timeout_;
+ interface::ConsumerInterestCallback *on_interest_satisfied_;
+ interface::ConsumerContentObjectCallback *on_content_object_input_;
+ interface::ConsumerContentObjectVerificationCallback
+ *on_content_object_verification_;
+ interface::ConsumerContentObjectCallback *on_content_object_;
+ interface::ConsumerTimerCallback *stats_summary_;
+ interface::ConsumerContentObjectVerificationFailedCallback
+ *verification_failed_callback_;
+ ReadCallback *on_payload_;
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 8a38f8ccf..0a93dec44 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -325,18 +325,12 @@ void RaaqmTransportProtocol::onContentObject(
}
// Call application-defined callbacks
- ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_->getInterface(), *content_object);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), *content_object);
}
- ConsumerInterestCallback *callback_interest = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
- &callback_interest);
- if (*callback_interest) {
- (*callback_interest)(*socket_->getInterface(), *interest);
+ if (*on_interest_satisfied_) {
+ (*on_interest_satisfied_)(*socket_->getInterface(), *interest);
}
if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
@@ -369,23 +363,17 @@ void RaaqmTransportProtocol::onPacketDropped(
socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
uint64_t segment = interest->getName().getSuffix();
- ConsumerInterestCallback *callback = VOID_HANDLER;
+
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
stats_->updateRetxCount(1);
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_retransmission_) {
+ (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (!is_running_) {
@@ -393,7 +381,6 @@ void RaaqmTransportProtocol::onPacketDropped(
}
interest_retransmissions_[segment & mask]++;
-
interest_to_retransmit_.push(std::move(interest));
} else {
TRANSPORT_LOGE(
@@ -428,11 +415,8 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
return;
}
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_timeout_) {
+ (*on_interest_timeout_)(*socket_->getInterface(), *interest);
}
afterDataUnsatisfied(segment);
@@ -444,18 +428,12 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
max_rtx)) {
stats_->updateRetxCount(1);
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_retransmission_) {
+ (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (!is_running_) {
@@ -463,7 +441,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
}
interest_retransmissions_[segment & mask]++;
-
interest_to_retransmit_.push(std::move(interest));
scheduleNextInterests();
@@ -507,7 +484,7 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
}
}
-void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
auto interest = getPacket();
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
@@ -519,15 +496,12 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
interest_lifetime);
interest->setLifetime(interest_lifetime);
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- callback->operator()(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ on_interest_output_->operator()(*socket_->getInterface(), *interest);
}
if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return;
+ return false;
}
// This is set to ~0 so that the next interest_retransmissions_ + 1,
@@ -535,6 +509,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
+
+ return true;
}
void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
@@ -564,7 +540,7 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
rate_estimator_->onRttUpdate((double)rtt.count());
}
- cur_path_->insertNewRtt(rtt.count());
+ cur_path_->insertNewRtt(rtt.count(), now);
cur_path_->smoothTimer();
if (cur_path_->newPropagationDelayAvailable()) {
@@ -595,18 +571,15 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
stats_->updateAverageWindowSize(current_window_size_);
// Call statistics callback
- ConsumerTimerCallback *stats_callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
+ if (*stats_summary_) {
auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
uint32_t timer_interval_milliseconds = 0;
socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
timer_interval_milliseconds);
if (dt.count() > timer_interval_milliseconds) {
- (*stats_callback)(*socket_->getInterface(), *stats_);
- t0_ = utils::SteadyClock::now();
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ t0_ = now;
}
}
}
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
index 412967770..ecc466755 100644
--- a/libtransport/src/protocols/raaqm.h
+++ b/libtransport/src/protocols/raaqm.h
@@ -79,7 +79,7 @@ class RaaqmTransportProtocol : public TransportProtocol,
virtual void scheduleNextInterests() override;
- void sendInterest(std::uint64_t next_suffix);
+ bool sendInterest(std::uint64_t next_suffix);
void sendInterest(Interest::Ptr &&interest);
diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc
index 439549c85..8bbbadcf2 100644
--- a/libtransport/src/protocols/raaqm_data_path.cc
+++ b/libtransport/src/protocols/raaqm_data_path.cc
@@ -48,7 +48,8 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor,
average_rtt_(0),
alpha_(ALPHA) {}
-RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
+RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt,
+ const utils::TimePoint &now) {
rtt_ = new_rtt;
rtt_samples_.pushBack(new_rtt);
@@ -60,7 +61,7 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
prop_delay_ = rtt_min_;
}
- last_received_pkt_ = utils::SteadyClock::now();
+ last_received_pkt_ = now;
return *this;
}
diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h
index 6f2afde72..3f037bc76 100644
--- a/libtransport/src/protocols/raaqm_data_path.h
+++ b/libtransport/src/protocols/raaqm_data_path.h
@@ -45,7 +45,7 @@ class RaaqmDataPath {
* max of RTT.
* @param new_rtt is the value of the new RTT
*/
- RaaqmDataPath &insertNewRtt(uint64_t new_rtt);
+ RaaqmDataPath &insertNewRtt(uint64_t new_rtt, const utils::TimePoint &now);
/**
* @brief Update the path statistics
diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc
index 0ac3839dd..72abb599a 100644
--- a/libtransport/src/protocols/rtc.cc
+++ b/libtransport/src/protocols/rtc.cc
@@ -288,15 +288,12 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
uint32_t BW = (uint32_t)ceil(estimatedBw_);
computeMaxWindow(BW, BDP);
- ConsumerTimerCallback *stats_callback = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
+ if (*stats_summary_) {
// Send the stats to the app
stats_->updateQueuingDelay(queuingDelay_);
stats_->updateLossRatio(lossRate_);
stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
- (*stats_callback)(*socket_->getInterface(), *stats_);
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
}
// bound also by interest lifitime* production rate
if (!gotNack_) {
@@ -451,13 +448,8 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
interestLifetime);
interest->setLifetime(uint32_t(interestLifetime));
- ConsumerInterestCallback *on_interest_output = nullptr;
-
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &on_interest_output);
-
- if (*on_interest_output) {
- (*on_interest_output)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
@@ -890,11 +882,8 @@ void RTCTransportProtocol::onContentObject(
uint32_t segmentNumber = content_object->getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- ConsumerContentObjectCallback *callback_content_object = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_->getInterface(), *content_object);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), *content_object);
}
if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
diff --git a/libtransport/src/utils/membuf.cc b/libtransport/src/utils/membuf.cc
index e75e85b35..94e5b13a1 100644
--- a/libtransport/src/utils/membuf.cc
+++ b/libtransport/src/utils/membuf.cc
@@ -102,8 +102,6 @@ struct MemBuf::HeapStorage {
};
struct MemBuf::HeapFullStorage {
- // Make sure jemalloc allocates from the 64-byte class. Putting this here
- // because HeapStorage is private so it can't be at namespace level.
static_assert(sizeof(HeapStorage) <= 64,
"MemBuf may not grow over 56 bytes!");