summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docs/source/telemetry.md28
-rw-r--r--hicn-light/src/hicn/core/forwarder.c5
-rw-r--r--hicn-light/src/hicn/core/forwarder.h6
-rw-r--r--hicn-light/src/hicn/socket/CMakeLists.txt2
-rw-r--r--hicn-plugin/src/faces/app/face_prod.c18
-rw-r--r--libtransport/includes/hicn/transport/core/content_object.h2
-rw-r--r--libtransport/includes/hicn/transport/core/interest.h2
-rw-r--r--libtransport/includes/hicn/transport/core/packet.h72
-rw-r--r--libtransport/includes/hicn/transport/interfaces/socket_options_keys.h1
-rw-r--r--libtransport/includes/hicn/transport/utils/CMakeLists.txt1
-rw-r--r--libtransport/includes/hicn/transport/utils/fixed_block_allocator.h136
-rw-r--r--libtransport/src/core/CMakeLists.txt1
-rw-r--r--libtransport/src/core/content_object.cc9
-rw-r--r--libtransport/src/core/forwarder_interface.h4
-rw-r--r--libtransport/src/core/interest.cc9
-rw-r--r--libtransport/src/core/memif_connector.cc14
-rw-r--r--libtransport/src/core/memif_connector.h2
-rw-r--r--libtransport/src/core/packet.cc54
-rw-r--r--libtransport/src/core/pending_interest.h50
-rw-r--r--libtransport/src/core/portal.h20
-rw-r--r--libtransport/src/implementation/socket_consumer.h23
-rw-r--r--libtransport/src/implementation/socket_producer.h94
-rw-r--r--libtransport/src/protocols/byte_stream_reassembly.cc13
-rw-r--r--libtransport/src/protocols/protocol.cc40
-rw-r--r--libtransport/src/protocols/protocol.h18
-rw-r--r--libtransport/src/protocols/raaqm.cc77
-rw-r--r--libtransport/src/protocols/raaqm.h2
-rw-r--r--libtransport/src/protocols/raaqm_data_path.cc5
-rw-r--r--libtransport/src/protocols/raaqm_data_path.h2
-rw-r--r--libtransport/src/protocols/rtc.cc23
-rw-r--r--libtransport/src/security/signer.cc4
-rw-r--r--libtransport/src/security/verifier.cc16
-rw-r--r--libtransport/src/utils/membuf.cc2
-rw-r--r--telemetry/vpp-collectd/CMakeLists.txt3
-rw-r--r--telemetry/vpp-collectd/custom_types.db39
-rw-r--r--telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c124
-rw-r--r--telemetry/vpp-collectd/vpp/vpp.c138
-rw-r--r--utils/src/hiperf.cc42
38 files changed, 713 insertions, 388 deletions
diff --git a/docs/source/telemetry.md b/docs/source/telemetry.md
index 84800c8b2..8887b8f1f 100644
--- a/docs/source/telemetry.md
+++ b/docs/source/telemetry.md
@@ -61,19 +61,31 @@ LoadPlugin vpp_hicn
Before running collectd, a vpp forwarder must be started. If the vpp-hicn plugin
is used, the hicn-plugin must be available in the vpp forwarder.
+If you need the custom types that the two plugins define, they are present in
+`telemetry/custom_types.db`. It is useful if you are using InfluxDB as it requires
+the type database for multi-value metrics
+(see [CollectD protocol support in InfluxDB](https://docs.influxdata.com/influxdb/v1.7/supported_protocols/collectd/)).
+
+## Plugin options
+`vpp` and `vpp-hicn` have the same two options:
+- `Verbose` enables additional statistics. You can check the sources to have an exact list of available metrics.
+- `Tag` tags the data with the given string. Useful for identifying the context in which the data was retrieved in InfluxDB for instance. If the tag value is `None`, no tag is applied.
+
### Example: storing statistics from vpp and vpp-hicn
We'll use the rrdtool and csv plugins to store statistics from vpp and vpp-hicn.
+Copy the configuration below in a file called `collectd.conf` and move
+it to `/etc/collectd`:
-Edit the configuration file as the following:
-
-```html
+```
######################################################################
# Global #
######################################################################
FQDNLookup true
BaseDir "/var/lib/collectd"
Interval 1
+# if you are using custom_types.db, you can specify it
+TypesDB "/usr/share/collectd/types.db" "/etc/collectd/custom_types.db"
######################################################################
# Logging #
@@ -106,6 +118,16 @@ LoadPlugin vpp_hicn
<Plugin rrdtool>
DataDir "/var/lib/collectd/rrd" # the folder where statistics are stored in rrd
</Plugin>
+
+<Plugin vpp>
+ Verbose true
+ Tag "None"
+</Plugin>
+
+<Plugin vpp_hicn>
+ Verbose true
+ Tag "None"
+</Plugin>
```
Run vpp and collectd:
diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c
index 0d7575ea0..f7b0af2c2 100644
--- a/hicn-light/src/hicn/core/forwarder.c
+++ b/hicn-light/src/hicn/core/forwarder.c
@@ -103,9 +103,10 @@ struct forwarder {
PARCClock *clock;
+#if !defined(__APPLE__)
hicn_socket_helper_t
*hicnSocketHelper; // state required to manage hicn connections
-
+#endif
// used by seed48 and nrand48
unsigned short seed[3];
@@ -523,9 +524,11 @@ PARCClock *forwarder_GetClock(const Forwarder *forwarder) {
return forwarder->clock;
}
+#if !defined(__APPLE__)
hicn_socket_helper_t *forwarder_GetHicnSocketHelper(Forwarder *forwarder) {
return forwarder->hicnSocketHelper;
}
+#endif
// =======================================================
diff --git a/hicn-light/src/hicn/core/forwarder.h b/hicn-light/src/hicn/core/forwarder.h
index a2401d625..d1815b7d4 100644
--- a/hicn-light/src/hicn/core/forwarder.h
+++ b/hicn-light/src/hicn/core/forwarder.h
@@ -46,7 +46,9 @@
#include <parc/algol/parc_Clock.h>
+#if !defined(__APPLE__)
#include <hicn/socket/api.h>
+#endif
#define PORT_NUMBER 9695
#define PORT_NUMBER_AS_STRING "9695"
@@ -265,9 +267,9 @@ void forwarder_ClearCache(Forwarder *forwarder);
void forwarder_SetStrategy(Forwarder *forwarder, Name *prefix,
strategy_type strategy, unsigned related_prefixes_len,
Name **related_prefixes);
-
+#if !defined(__APPLE__)
hicn_socket_helper_t *forwarder_GetHicnSocketHelper(Forwarder *forwarder);
-
+#endif
#ifdef WITH_MAPME
/**
diff --git a/hicn-light/src/hicn/socket/CMakeLists.txt b/hicn-light/src/hicn/socket/CMakeLists.txt
index ce2a9caf4..775693bf0 100644
--- a/hicn-light/src/hicn/socket/CMakeLists.txt
+++ b/hicn-light/src/hicn/socket/CMakeLists.txt
@@ -34,4 +34,4 @@ set(TO_INSTALL_HEADER_FILES
)
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
-set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
diff --git a/hicn-plugin/src/faces/app/face_prod.c b/hicn-plugin/src/faces/app/face_prod.c
index c183d6589..ae59719ce 100644
--- a/hicn-plugin/src/faces/app/face_prod.c
+++ b/hicn-plugin/src/faces/app/face_prod.c
@@ -271,6 +271,24 @@ hicn_face_prod_add (fib_prefix_t * prefix, u32 sw_if, u32 * cs_reserved,
if (ret == HICN_ERROR_NONE
&& hicn_face_prod_set_lru_max (*faceid, cs_reserved) == HICN_ERROR_NONE)
{
+ if (ip46_address_is_ip4(&(prefix->fp_addr)))
+ {
+ ip4_address_t mask;
+ ip4_preflen_to_mask (prefix->fp_len, &mask);
+ prefix->fp_addr.ip4.as_u32 = prefix->fp_addr.ip4.as_u32 & mask.as_u32;
+ prefix->fp_proto = FIB_PROTOCOL_IP4;
+ }
+ else
+ {
+ ip6_address_t mask;
+ ip6_preflen_to_mask (prefix->fp_len, &mask);
+ prefix->fp_addr.ip6.as_u64[0] =
+ prefix->fp_addr.ip6.as_u64[0] & mask.as_u64[0];
+ prefix->fp_addr.ip6.as_u64[1] =
+ prefix->fp_addr.ip6.as_u64[1] & mask.as_u64[1];
+ prefix->fp_proto = FIB_PROTOCOL_IP6;
+ }
+
hicn_app_state_create (sw_if, prefix);
ret = hicn_route_add (faceid, 1, prefix);
}
diff --git a/libtransport/includes/hicn/transport/core/content_object.h b/libtransport/includes/hicn/transport/core/content_object.h
index 5af548fe4..822790e56 100644
--- a/libtransport/includes/hicn/transport/core/content_object.h
+++ b/libtransport/includes/hicn/transport/core/content_object.h
@@ -46,8 +46,6 @@ class ContentObject : public Packet {
~ContentObject() override;
- void replace(MemBufPtr &&buffer) override;
-
const Name &getName() const override;
Name &getWritableName() override;
diff --git a/libtransport/includes/hicn/transport/core/interest.h b/libtransport/includes/hicn/transport/core/interest.h
index f0c546a8e..c572afbff 100644
--- a/libtransport/includes/hicn/transport/core/interest.h
+++ b/libtransport/includes/hicn/transport/core/interest.h
@@ -44,8 +44,6 @@ class Interest
~Interest() override;
- void replace(MemBufPtr &&buffer) override;
-
const Name &getName() const override;
Name &getWritableName() override;
diff --git a/libtransport/includes/hicn/transport/core/packet.h b/libtransport/includes/hicn/transport/core/packet.h
index e758aa13e..3ddc4a595 100644
--- a/libtransport/includes/hicn/transport/core/packet.h
+++ b/libtransport/includes/hicn/transport/core/packet.h
@@ -17,6 +17,7 @@
#include <hicn/transport/core/name.h>
#include <hicn/transport/core/payload_type.h>
+#include <hicn/transport/errors/malformed_packet_exception.h>
#include <hicn/transport/portability/portability.h>
#include <hicn/transport/security/crypto_hasher.h>
#include <hicn/transport/security/crypto_suite.h>
@@ -81,7 +82,12 @@ class Packet : public std::enable_shared_from_this<Packet> {
virtual ~Packet();
static std::size_t getHeaderSizeFromFormat(Format format,
- std::size_t signature_size = 0);
+ std::size_t signature_size = 0) {
+ std::size_t header_length;
+ hicn_packet_get_header_length_from_format(format, &header_length);
+ int is_ah = _is_ah(format);
+ return is_ah * (header_length + signature_size) + (!is_ah) * header_length;
+ }
static std::size_t getHeaderSizeFromBuffer(Format format,
const uint8_t *buffer);
@@ -91,9 +97,26 @@ class Packet : public std::enable_shared_from_this<Packet> {
static bool isInterest(const uint8_t *buffer);
- static Format getFormatFromBuffer(const uint8_t *buffer);
+ static Format getFormatFromBuffer(const uint8_t *buffer) {
+ Format format = HF_UNSPEC;
- virtual void replace(MemBufPtr &&buffer);
+ if (TRANSPORT_EXPECT_FALSE(
+ hicn_packet_get_format((const hicn_header_t *)buffer, &format) <
+ 0)) {
+ throw errors::MalformedPacketException();
+ }
+
+ return format;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void replace(MemBufPtr &&buffer) {
+ packet_ = std::move(buffer);
+ packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData());
+ header_head_ = packet_.get();
+ payload_head_ = nullptr;
+ format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_));
+ name_.clear();
+ }
std::size_t payloadSize() const;
@@ -123,6 +146,19 @@ class Packet : public std::enable_shared_from_this<Packet> {
std::unique_ptr<utils::MemBuf> getPayload() const;
+ std::pair<const uint8_t *, std::size_t> getPayloadReference() const {
+ int signature_size = 0;
+ if (_is_ah(format_)) {
+ signature_size = (uint32_t)getSignatureSize();
+ }
+
+ auto header_size = getHeaderSizeFromFormat(format_, signature_size);
+ auto payload_length = packet_->length() - header_size;
+
+ return std::make_pair(packet_->data() + header_size,
+ payload_length);
+ }
+
Packet &updateLength(std::size_t length = 0);
PayloadType getPayloadType() const;
@@ -152,7 +188,21 @@ class Packet : public std::enable_shared_from_this<Packet> {
virtual utils::CryptoHash computeDigest(
utils::CryptoHashType algorithm) const;
- void setChecksum();
+ void setChecksum() {
+ uint16_t partial_csum = 0;
+
+ for (utils::MemBuf *current = header_head_->next();
+ current && current != header_head_; current = current->next()) {
+ if (partial_csum != 0) {
+ partial_csum = ~partial_csum;
+ }
+ partial_csum = csum(current->data(), current->length(), partial_csum);
+ }
+ if (hicn_packet_compute_header_checksum(format_, packet_start_,
+ partial_csum) < 0) {
+ throw errors::MalformedPacketException();
+ }
+ }
bool checkIntegrity() const;
@@ -184,7 +234,19 @@ class Packet : public std::enable_shared_from_this<Packet> {
private:
virtual void resetForHash() = 0;
void setSignatureSize(std::size_t size_bytes);
- std::size_t getSignatureSize() const;
+
+ std::size_t getSignatureSize() const {
+ size_t size_bytes;
+ int ret =
+ hicn_packet_get_signature_size(format_, packet_start_, &size_bytes);
+
+ if (ret < 0) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ return size_bytes;
+ }
+
uint8_t *getSignature() const;
void separateHeaderPayload();
diff --git a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h
index 7910a4422..0b7a79c3d 100644
--- a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h
+++ b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h
@@ -94,7 +94,6 @@ typedef enum {
CACHE_HIT = 506,
CACHE_MISS = 508,
NEW_CONTENT_OBJECT = 509,
- CONTENT_OBJECT_SIGN = 513,
CONTENT_OBJECT_READY = 510,
CONTENT_OBJECT_OUTPUT = 511,
CONTENT_PRODUCED = 512,
diff --git a/libtransport/includes/hicn/transport/utils/CMakeLists.txt b/libtransport/includes/hicn/transport/utils/CMakeLists.txt
index 396bd06d6..38ecc3d37 100644
--- a/libtransport/includes/hicn/transport/utils/CMakeLists.txt
+++ b/libtransport/includes/hicn/transport/utils/CMakeLists.txt
@@ -28,6 +28,7 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/object_pool.h
${CMAKE_CURRENT_SOURCE_DIR}/membuf.h
${CMAKE_CURRENT_SOURCE_DIR}/spinlock.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/fixed_block_allocator.h
)
if(NOT WIN32)
diff --git a/libtransport/includes/hicn/transport/utils/fixed_block_allocator.h b/libtransport/includes/hicn/transport/utils/fixed_block_allocator.h
new file mode 100644
index 000000000..1ade1516e
--- /dev/null
+++ b/libtransport/includes/hicn/transport/utils/fixed_block_allocator.h
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2019 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <hicn/transport/portability/c_portability.h>
+#include <hicn/transport/utils/spinlock.h>
+
+#include <stdint.h>
+#include <cstdlib>
+#include <memory>
+#include <cassert>
+
+namespace utils {
+template <std::size_t DEFAULT_SIZE = 512, std::size_t OBJECTS = 4096>
+class FixedBlockAllocator {
+ FixedBlockAllocator(std::size_t size = DEFAULT_SIZE,
+ std::size_t objects = OBJECTS)
+ : block_size_(size < sizeof(void*) ? sizeof(long*) : size),
+ object_size_(size),
+ max_objects_(objects),
+ p_head_(NULL),
+ pool_index_(0),
+ block_count_(0),
+ blocks_in_use_(0),
+ allocations_(0),
+ deallocations_(0) {
+ p_pool_ = (uint8_t*)new uint8_t[block_size_ * max_objects_];
+ }
+
+ public:
+ static FixedBlockAllocator* getInstance() {
+ if (!instance_) {
+ instance_ = std::unique_ptr<FixedBlockAllocator>(
+ new FixedBlockAllocator(DEFAULT_SIZE, OBJECTS));
+ }
+
+ return instance_.get();
+ }
+
+ ~FixedBlockAllocator() { delete[] p_pool_; }
+
+ TRANSPORT_ALWAYS_INLINE void* allocateBlock(size_t size = DEFAULT_SIZE) {
+ assert(size <= DEFAULT_SIZE);
+ uint32_t index;
+
+ void* p_block = pop();
+ if (!p_block) {
+ if (pool_index_ < max_objects_) {
+ {
+ SpinLock::Acquire locked(lock_);
+ index = pool_index_++;
+ }
+ p_block = (void*)(p_pool_ + (index * block_size_));
+ } else {
+ // TODO Consider increasing pool here instead of throwing an exception
+ throw std::runtime_error("No more memory available from packet pool!");
+ }
+ }
+
+ blocks_in_use_++;
+ allocations_++;
+
+ return p_block;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void deallocateBlock(void* pBlock) {
+ push(pBlock);
+ {
+ SpinLock::Acquire locked(lock_);
+ blocks_in_use_--;
+ deallocations_++;
+ }
+ }
+
+ TRANSPORT_ALWAYS_INLINE std::size_t blockSize() { return block_size_; }
+
+ TRANSPORT_ALWAYS_INLINE uint32_t blockCount() { return block_count_; }
+
+ TRANSPORT_ALWAYS_INLINE uint32_t blocksInUse() { return blocks_in_use_; }
+
+ TRANSPORT_ALWAYS_INLINE uint32_t allocations() { return allocations_; }
+
+ TRANSPORT_ALWAYS_INLINE uint32_t deallocations() { return deallocations_; }
+
+ private:
+ TRANSPORT_ALWAYS_INLINE void push(void* p_memory) {
+ Block* p_block = (Block*)p_memory;
+ {
+ SpinLock::Acquire locked(lock_);
+ p_block->p_next = p_head_;
+ p_head_ = p_block;
+ }
+ }
+
+ TRANSPORT_ALWAYS_INLINE void* pop() {
+ Block* p_block = nullptr;
+
+ {
+ SpinLock::Acquire locked(lock_);
+ if (p_head_) {
+ p_block = p_head_;
+ p_head_ = p_head_->p_next;
+ }
+ }
+
+ return (void*)p_block;
+ }
+
+ struct Block {
+ Block* p_next;
+ };
+
+ static std::unique_ptr<FixedBlockAllocator> instance_;
+
+ const std::size_t block_size_;
+ const std::size_t object_size_;
+ const std::size_t max_objects_;
+
+ Block* p_head_;
+ uint8_t* p_pool_;
+ uint32_t pool_index_;
+ uint32_t block_count_;
+ uint32_t blocks_in_use_;
+ uint32_t allocations_;
+ uint32_t deallocations_;
+
+ SpinLock lock_;
+};
+
+template <std::size_t A, std::size_t B>
+std::unique_ptr<FixedBlockAllocator<A, B>>
+ FixedBlockAllocator<A, B>::instance_ = nullptr;
+
+} // namespace utils \ No newline at end of file
diff --git a/libtransport/src/core/CMakeLists.txt b/libtransport/src/core/CMakeLists.txt
index 12ef9cfe4..5c8ab9270 100644
--- a/libtransport/src/core/CMakeLists.txt
+++ b/libtransport/src/core/CMakeLists.txt
@@ -33,7 +33,6 @@ list(APPEND HEADER_FILES
list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc
${CMAKE_CURRENT_SOURCE_DIR}/interest.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc
${CMAKE_CURRENT_SOURCE_DIR}/packet.cc
${CMAKE_CURRENT_SOURCE_DIR}/name.cc
${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc
diff --git a/libtransport/src/core/content_object.cc b/libtransport/src/core/content_object.cc
index 6cbcdb29e..f5cccf404 100644
--- a/libtransport/src/core/content_object.cc
+++ b/libtransport/src/core/content_object.cc
@@ -86,15 +86,6 @@ ContentObject::ContentObject(ContentObject &&other) : Packet(std::move(other)) {
ContentObject::~ContentObject() {}
-void ContentObject::replace(MemBufPtr &&buffer) {
- Packet::replace(std::move(buffer));
-
- if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) <
- 0) {
- throw errors::RuntimeException("Error getting name from content object.");
- }
-}
-
const Name &ContentObject::getName() const {
if (!name_) {
if (hicn_data_get_name(format_, packet_start_,
diff --git a/libtransport/src/core/forwarder_interface.h b/libtransport/src/core/forwarder_interface.h
index 3e70e221d..3b016c4bb 100644
--- a/libtransport/src/core/forwarder_interface.h
+++ b/libtransport/src/core/forwarder_interface.h
@@ -95,15 +95,11 @@ class ForwarderInterface {
packet.setLocator(inet6_address_);
}
- // TRANSPORT_LOGI("Sending packet %s at %lu",
- // packet.getName().toString().c_str(),
- // utils::SteadyClock::now().time_since_epoch().count());
packet.setChecksum();
connector_.send(packet.acquireMemBufReference());
}
TRANSPORT_ALWAYS_INLINE void send(const uint8_t *packet, std::size_t len) {
- // ASIO_COMPLETION_HANDLER_CHECK(Handler, packet_sent) type_check;
counters_.tx_packets++;
counters_.tx_bytes += len;
diff --git a/libtransport/src/core/interest.cc b/libtransport/src/core/interest.cc
index 166632f0a..9ee662615 100644
--- a/libtransport/src/core/interest.cc
+++ b/libtransport/src/core/interest.cc
@@ -72,15 +72,6 @@ Interest::Interest(Interest &&other_interest)
Interest::~Interest() {}
-void Interest::replace(MemBufPtr &&buffer) {
- Packet::replace(std::move(buffer));
-
- if (hicn_interest_get_name(format_, packet_start_,
- name_.getStructReference()) < 0) {
- throw errors::MalformedPacketException();
- }
-}
-
const Name &Interest::getName() const {
if (!name_) {
if (hicn_interest_get_name(format_, packet_start_,
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc
index 2292e9b41..49f262ec8 100644
--- a/libtransport/src/core/memif_connector.cc
+++ b/libtransport/src/core/memif_connector.cc
@@ -305,11 +305,13 @@ void MemifConnector::sendCallback(const std::error_code &ec) {
}
}
-void MemifConnector::processInputBuffer() {
+void MemifConnector::processInputBuffer(std::uint16_t total_packets) {
Packet::MemBufPtr ptr;
- while (input_buffer_.pop(ptr)) {
- receive_callback_(std::move(ptr));
+ for (; total_packets > 0; total_packets--) {
+ if (input_buffer_.pop(ptr)) {
+ receive_callback_(std::move(ptr));
+ }
}
}
@@ -339,6 +341,7 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
memif_connection_t *c = connector->memif_connection_.get();
int err = MEMIF_ERR_SUCCESS, ret_val;
+ uint16_t total_packets = 0;
uint16_t rx;
do {
@@ -353,7 +356,7 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
c->rx_buf_num += rx;
- if (TRANSPORT_EXPECT_TRUE(connector->io_service_.stopped())) {
+ if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) {
TRANSPORT_LOGE("socket stopped: ignoring %u packets", rx);
goto error;
}
@@ -386,11 +389,12 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
}
c->rx_buf_num -= rx;
+ total_packets += rx;
} while (ret_val == MEMIF_ERR_NOBUF);
connector->io_service_.post(
- std::bind(&MemifConnector::processInputBuffer, connector));
+ std::bind(&MemifConnector::processInputBuffer, connector, total_packets));
return 0;
diff --git a/libtransport/src/core/memif_connector.h b/libtransport/src/core/memif_connector.h
index aafef1e56..693efd14c 100644
--- a/libtransport/src/core/memif_connector.h
+++ b/libtransport/src/core/memif_connector.h
@@ -96,7 +96,7 @@ class MemifConnector : public Connector {
void sendCallback(const std::error_code &ec);
- void processInputBuffer();
+ void processInputBuffer(std::uint16_t total_packets);
private:
static utils::EpollEventReactor main_event_reactor_;
diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc
index 67e647fca..6815868f0 100644
--- a/libtransport/src/core/packet.cc
+++ b/libtransport/src/core/packet.cc
@@ -69,14 +69,6 @@ Packet::Packet(Packet &&other)
Packet::~Packet() {}
-std::size_t Packet::getHeaderSizeFromFormat(Format format,
- size_t signature_size) {
- std::size_t header_length;
- hicn_packet_get_header_length_from_format(format, &header_length);
- int is_ah = _is_ah(format);
- return is_ah * (header_length + signature_size) + (!is_ah) * header_length;
-}
-
std::size_t Packet::getHeaderSizeFromBuffer(Format format,
const uint8_t *buffer) {
size_t header_length;
@@ -99,17 +91,6 @@ bool Packet::isInterest(const uint8_t *buffer) {
return !is_interest;
}
-Packet::Format Packet::getFormatFromBuffer(const uint8_t *buffer) {
- Format format = HF_UNSPEC;
-
- if (TRANSPORT_EXPECT_FALSE(
- hicn_packet_get_format((const hicn_header_t *)buffer, &format) < 0)) {
- throw errors::MalformedPacketException();
- }
-
- return format;
-}
-
std::size_t Packet::getPayloadSizeFromBuffer(Format format,
const uint8_t *buffer) {
std::size_t payload_length;
@@ -122,14 +103,6 @@ std::size_t Packet::getPayloadSizeFromBuffer(Format format,
return payload_length;
}
-void Packet::replace(MemBufPtr &&buffer) {
- packet_ = std::move(buffer);
- packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData());
- header_head_ = packet_.get();
- payload_head_ = nullptr;
- format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_));
-}
-
std::size_t Packet::payloadSize() const {
return getPayloadSizeFromBuffer(format_,
reinterpret_cast<uint8_t *>(packet_start_));
@@ -270,17 +243,6 @@ uint8_t *Packet::getSignature() const {
return signature;
}
-std::size_t Packet::getSignatureSize() const {
- size_t size_bytes;
- int ret = hicn_packet_get_signature_size(format_, packet_start_, &size_bytes);
-
- if (ret < 0) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- return size_bytes;
-}
-
void Packet::setSignatureTimestamp(const uint64_t &timestamp) {
int ret =
hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp);
@@ -366,22 +328,6 @@ utils::CryptoHash Packet::computeDigest(utils::CryptoHashType algorithm) const {
return hasher.finalize();
}
-void Packet::setChecksum() {
- uint16_t partial_csum = 0;
-
- for (utils::MemBuf *current = header_head_->next();
- current && current != header_head_; current = current->next()) {
- if (partial_csum != 0) {
- partial_csum = ~partial_csum;
- }
- partial_csum = csum(current->data(), current->length(), partial_csum);
- }
- if (hicn_packet_compute_header_checksum(format_, packet_start_,
- partial_csum) < 0) {
- throw errors::MalformedPacketException();
- }
-}
-
bool Packet::checkIntegrity() const {
if (hicn_packet_check_integrity(format_, packet_start_) < 0) {
return false;
diff --git a/libtransport/src/core/pending_interest.h b/libtransport/src/core/pending_interest.h
index d9ec2ed40..aeff78ea2 100644
--- a/libtransport/src/core/pending_interest.h
+++ b/libtransport/src/core/pending_interest.h
@@ -47,17 +47,29 @@ class PendingInterest {
public:
using Ptr = utils::ObjectPool<PendingInterest>::Ptr;
- PendingInterest();
+ PendingInterest()
+ : interest_(nullptr, nullptr),
+ timer_(),
+ on_content_object_callback_(),
+ on_interest_timeout_callback_() {}
PendingInterest(Interest::Ptr &&interest,
- std::unique_ptr<asio::steady_timer> &&timer);
+ std::unique_ptr<asio::steady_timer> &&timer)
+ : interest_(std::move(interest)),
+ timer_(std::move(timer)),
+ on_content_object_callback_(),
+ on_interest_timeout_callback_() {}
PendingInterest(Interest::Ptr &&interest,
OnContentObjectCallback &&on_content_object,
OnInterestTimeoutCallback &&on_interest_timeout,
- std::unique_ptr<asio::steady_timer> &&timer);
+ std::unique_ptr<asio::steady_timer> &&timer)
+ : interest_(std::move(interest)),
+ timer_(std::move(timer)),
+ on_content_object_callback_(std::move(on_content_object)),
+ on_interest_timeout_callback_(std::move(on_interest_timeout)) {}
- ~PendingInterest();
+ ~PendingInterest() = default;
template <typename Handler>
TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) {
@@ -66,19 +78,35 @@ class PendingInterest {
timer_->async_wait(std::forward<Handler &&>(cb));
}
- void cancelTimer();
+ TRANSPORT_ALWAYS_INLINE void cancelTimer() { timer_->cancel(); }
- Interest::Ptr &&getInterest();
+ TRANSPORT_ALWAYS_INLINE Interest::Ptr &&getInterest() {
+ return std::move(interest_);
+ }
- void setInterest(Interest::Ptr &&interest);
+ TRANSPORT_ALWAYS_INLINE void setInterest(Interest::Ptr &&interest) {
+ interest_ = std::move(interest);
+ }
- const OnContentObjectCallback &getOnDataCallback() const;
+ TRANSPORT_ALWAYS_INLINE const OnContentObjectCallback &getOnDataCallback()
+ const {
+ return on_content_object_callback_;
+ }
- void setOnContentObjectCallback(OnContentObjectCallback &&on_content_object);
+ TRANSPORT_ALWAYS_INLINE void setOnContentObjectCallback(
+ OnContentObjectCallback &&on_content_object) {
+ PendingInterest::on_content_object_callback_ = on_content_object;
+ }
- const OnInterestTimeoutCallback &getOnTimeoutCallback() const;
+ TRANSPORT_ALWAYS_INLINE const OnInterestTimeoutCallback &
+ getOnTimeoutCallback() const {
+ return on_interest_timeout_callback_;
+ }
- void setOnTimeoutCallback(OnInterestTimeoutCallback &&on_interest_timeout);
+ TRANSPORT_ALWAYS_INLINE void setOnTimeoutCallback(
+ OnInterestTimeoutCallback &&on_interest_timeout) {
+ PendingInterest::on_interest_timeout_callback_ = on_interest_timeout;
+ }
private:
Interest::Ptr interest_;
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index cf1010068..05715543a 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -24,6 +24,7 @@
#include <hicn/transport/interfaces/portal.h>
#include <hicn/transport/portability/portability.h>
#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/fixed_block_allocator.h>
#include <core/forwarder_interface.h>
#include <core/pending_interest.h>
@@ -52,21 +53,20 @@ class HandlerMemory {
static constexpr std::size_t memory_size = 1024 * 1024;
public:
- HandlerMemory() : index_(0) {}
+ HandlerMemory() {}
HandlerMemory(const HandlerMemory &) = delete;
HandlerMemory &operator=(const HandlerMemory &) = delete;
TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
- return &storage_[index_++ % memory_size];
+ return utils::FixedBlockAllocator<128, 4096>::getInstance()
+ ->allocateBlock();
}
- TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {}
-
- private:
- // Storage space used for handler-based custom memory allocation.
- typename std::aligned_storage<128>::type storage_[memory_size];
- uint32_t index_;
+ TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
+ utils::FixedBlockAllocator<128, 4096>::getInstance()->deallocateBlock(
+ pointer);
+ }
#else
public:
HandlerMemory() {}
@@ -627,6 +627,8 @@ class Portal {
}
private:
+ portal_details::HandlerMemory async_callback_memory_;
+
asio::io_service &io_service_;
asio::io_service internal_io_service_;
portal_details::Pool packet_pool_;
@@ -639,8 +641,6 @@ class Portal {
ConsumerCallback *consumer_callback_;
ProducerCallback *producer_callback_;
- portal_details::HandlerMemory async_callback_memory_;
-
typename ForwarderInt::ConnectorType connector_;
ForwarderInt forwarder_interface_;
};
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index 2fc8d2b48..488f238ba 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -35,7 +35,7 @@ class ConsumerSocket : public Socket<BasePortal> {
public:
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
: consumer_interface_(consumer),
- portal_(std::make_shared<Portal>(io_service_)),
+ portal_(std::make_shared<Portal>()),
async_downloader_(),
interest_lifetime_(default_values::interest_lifetime),
min_window_size_(default_values::min_window_size),
@@ -62,10 +62,8 @@ class ConsumerSocket : public Socket<BasePortal> {
on_interest_satisfied_(VOID_HANDLER),
on_content_object_input_(VOID_HANDLER),
on_content_object_verification_(VOID_HANDLER),
- on_content_object_(VOID_HANDLER),
stats_summary_(VOID_HANDLER),
read_callback_(nullptr),
- virtual_download_(false),
timer_interval_milliseconds_(0),
guard_raaqm_params_() {
switch (protocol) {
@@ -323,11 +321,6 @@ class ConsumerSocket : public Socket<BasePortal> {
int result = SOCKET_OPTION_NOT_SET;
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
- case OtherOptions::VIRTUAL_DOWNLOAD:
- virtual_download_ = socket_option_value;
- result = SOCKET_OPTION_SET;
- break;
-
case GeneralTransportOptions::VERIFY_SIGNATURE:
verify_signature_ = socket_option_value;
result = SOCKET_OPTION_SET;
@@ -631,10 +624,6 @@ class ConsumerSocket : public Socket<BasePortal> {
socket_option_value = transport_protocol_->isRunning();
break;
- case OtherOptions::VIRTUAL_DOWNLOAD:
- socket_option_value = virtual_download_;
- break;
-
case GeneralTransportOptions::VERIFY_SIGNATURE:
socket_option_value = verify_signature_;
break;
@@ -861,8 +850,9 @@ class ConsumerSocket : public Socket<BasePortal> {
/* Condition variable for the wait */
std::condition_variable cv;
bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getIoService().dispatch([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -881,7 +871,6 @@ class ConsumerSocket : public Socket<BasePortal> {
protected:
interface::ConsumerSocket *consumer_interface_;
- asio::io_service io_service_;
std::shared_ptr<Portal> portal_;
utils::EventThread async_downloader_;
@@ -925,15 +914,11 @@ class ConsumerSocket : public Socket<BasePortal> {
ConsumerInterestCallback on_interest_satisfied_;
ConsumerContentObjectCallback on_content_object_input_;
ConsumerContentObjectVerificationCallback on_content_object_verification_;
- ConsumerContentObjectCallback on_content_object_;
ConsumerTimerCallback stats_summary_;
ConsumerContentObjectVerificationFailedCallback verification_failed_callback_;
ReadCallback *read_callback_;
- // Virtual download for traffic generator
- bool virtual_download_;
-
uint32_t timer_interval_milliseconds_;
// Transport protocol
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index d4a239cf6..cae0ad7c7 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -43,6 +43,8 @@ using namespace interface;
class ProducerSocket : public Socket<BasePortal>,
public BasePortal::ProducerCallback {
+ static constexpr uint32_t burst_size = 256;
+
public:
explicit ProducerSocket(interface::ProducerSocket *producer_socket)
: producer_interface_(producer_socket),
@@ -293,6 +295,27 @@ class ProducerSocket : public Socket<BasePortal>,
}
}
+ io_service_.post([this]() {
+ std::shared_ptr<ContentObject> co;
+ while (object_queue_for_callbacks_.pop(co)) {
+ if (on_new_segment_) {
+ on_new_segment_(*producer_interface_, *co);
+ }
+
+ if (on_content_object_to_sign_) {
+ on_content_object_to_sign_(*producer_interface_, *co);
+ }
+
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*producer_interface_, *co);
+ }
+
+ if (on_content_object_output_) {
+ on_content_object_output_(*producer_interface_, *co);
+ }
+ }
+ });
+
io_service_.dispatch([this, buffer_size]() {
if (on_content_produced_) {
on_content_produced_(*producer_interface_,
@@ -300,7 +323,6 @@ class ProducerSocket : public Socket<BasePortal>,
}
});
- TRANSPORT_LOGD("--------- END PRODUCE ------------");
return suffix_strategy->getTotalCount();
}
@@ -498,12 +520,6 @@ class ProducerSocket : public Socket<BasePortal>,
break;
}
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_to_sign_ = VOID_HANDLER;
- break;
- }
-
case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
if (socket_option_value == VOID_HANDLER) {
on_content_object_in_output_buffer_ = VOID_HANDLER;
@@ -569,10 +585,6 @@ class ProducerSocket : public Socket<BasePortal>,
on_new_segment_ = socket_option_value;
break;
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- on_content_object_to_sign_ = socket_option_value;
- break;
-
case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
on_content_object_in_output_buffer_ = socket_option_value;
break;
@@ -755,10 +767,6 @@ class ProducerSocket : public Socket<BasePortal>,
*socket_option_value = &on_new_segment_;
break;
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- *socket_option_value = &on_content_object_to_sign_;
- break;
-
case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
*socket_option_value = &on_content_object_in_output_buffer_;
break;
@@ -972,6 +980,9 @@ class ProducerSocket : public Socket<BasePortal>,
// by the application thread
std::atomic<uint32_t> content_object_expiry_time_;
+ utils::CircularFifo<std::shared_ptr<ContentObject>, 2048>
+ object_queue_for_callbacks_;
+
// buffers
// ContentStore is thread-safe
utils::ContentStore output_buffer_;
@@ -1027,36 +1038,45 @@ class ProducerSocket : public Socket<BasePortal>,
portal_->runEventsLoop();
}
- void passContentObjectToCallbacks(
- const std::shared_ptr<ContentObject> &content_object) {
- if (content_object) {
- io_service_.dispatch([this, content_object]() {
- if (on_new_segment_) {
- on_new_segment_(*producer_interface_, *content_object);
- }
+ void scheduleSendBurst() {
+ io_service_.post([this]() {
+ std::shared_ptr<ContentObject> co;
- if (on_content_object_to_sign_) {
- on_content_object_to_sign_(*producer_interface_, *content_object);
- }
+ for (uint32_t i = 0; i < burst_size; i++) {
+ if (object_queue_for_callbacks_.pop(co)) {
+ if (on_new_segment_) {
+ on_new_segment_(*producer_interface_, *co);
+ }
- if (on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_(*producer_interface_,
- *content_object);
- }
- });
+ if (on_content_object_to_sign_) {
+ on_content_object_to_sign_(*producer_interface_, *co);
+ }
- output_buffer_.insert(content_object);
+ if (on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_(*producer_interface_, *co);
+ }
- io_service_.dispatch([this, content_object]() {
- if (on_content_object_output_) {
- on_content_object_output_(*producer_interface_, *content_object);
+ if (on_content_object_output_) {
+ on_content_object_output_(*producer_interface_, *co);
+ }
+ } else {
+ break;
}
- });
+ }
+ });
+ }
- portal_->sendContentObject(*content_object);
+ void passContentObjectToCallbacks(
+ const std::shared_ptr<ContentObject> &content_object) {
+ output_buffer_.insert(content_object);
+ portal_->sendContentObject(*content_object);
+ object_queue_for_callbacks_.push(std::move(content_object));
+
+ if (object_queue_for_callbacks_.size() >= burst_size) {
+ scheduleSendBurst();
}
}
-}; // namespace implementation
+};
} // namespace implementation
diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc
index c2996ebc1..12631637e 100644
--- a/libtransport/src/protocols/byte_stream_reassembly.cc
+++ b/libtransport/src/protocols/byte_stream_reassembly.cc
@@ -39,7 +39,7 @@ ByteStreamReassembly::ByteStreamReassembly(
void ByteStreamReassembly::reassemble(
std::unique_ptr<ContentObjectManifest> &&manifest) {
- if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) {
+ if (TRANSPORT_EXPECT_TRUE(manifest != nullptr) && read_buffer_->capacity()) {
received_packets_.emplace(
std::make_pair(manifest->getName().getSuffix(), nullptr));
assembleContent();
@@ -47,7 +47,8 @@ void ByteStreamReassembly::reassemble(
}
void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) {
- if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
+ if (TRANSPORT_EXPECT_TRUE(content_object != nullptr) &&
+ read_buffer_->capacity()) {
received_packets_.emplace(std::make_pair(
content_object->getName().getSuffix(), std::move(content_object)));
assembleContent();
@@ -80,19 +81,19 @@ void ByteStreamReassembly::assembleContent() {
}
void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
- auto a = content_object.getPayload();
- auto payload_length = a->length();
+ auto payload = content_object.getPayloadReference();
+ auto payload_length = payload.second;
auto write_size = std::min(payload_length, read_buffer_->tailroom());
auto additional_bytes = payload_length > read_buffer_->tailroom()
? payload_length - read_buffer_->tailroom()
: 0;
- std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
+ std::memcpy(read_buffer_->writableTail(), payload.first, write_size);
read_buffer_->append(write_size);
if (!read_buffer_->tailroom()) {
notifyApplication();
- std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
+ std::memcpy(read_buffer_->writableTail(), payload.first + write_size,
additional_bytes);
read_buffer_->append(additional_bytes);
}
diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc
index aa290bef8..8463f84f9 100644
--- a/libtransport/src/protocols/protocol.cc
+++ b/libtransport/src/protocols/protocol.cc
@@ -31,7 +31,16 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket,
index_manager_(
std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
is_running_(false),
- is_first_(false) {
+ is_first_(false),
+ on_interest_retransmission_(VOID_HANDLER),
+ on_interest_output_(VOID_HANDLER),
+ on_interest_timeout_(VOID_HANDLER),
+ on_interest_satisfied_(VOID_HANDLER),
+ on_content_object_input_(VOID_HANDLER),
+ on_content_object_verification_(VOID_HANDLER),
+ stats_summary_(VOID_HANDLER),
+ verification_failed_callback_(VOID_HANDLER),
+ on_payload_(VOID_HANDLER) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
}
@@ -46,6 +55,26 @@ int TransportProtocol::start() {
// Set it is the first time we schedule an interest
is_first_ = true;
+ // Get all callbacks references before starting
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &on_interest_retransmission_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &on_interest_output_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
+ &on_interest_timeout_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
+ &on_interest_satisfied_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &on_content_object_input_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY,
+ &on_content_object_verification_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_summary_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
+ &verification_failed_callback_);
+ socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
+ &on_payload_);
+
// Schedule next interests
scheduleNextInterests();
@@ -81,10 +110,7 @@ void TransportProtocol::resume() {
}
void TransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
-
- if (!on_payload) {
+ if (!on_payload_) {
throw errors::RuntimeException(
"The read callback must be installed in the transport before "
"starting "
@@ -92,9 +118,9 @@ void TransportProtocol::onContentReassembled(std::error_code ec) {
}
if (!ec) {
- on_payload->readSuccess(stats_->getBytesRecv());
+ on_payload_->readSuccess(stats_->getBytesRecv());
} else {
- on_payload->readError(ec);
+ on_payload_->readError(ec);
}
stop();
diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h
index 949380959..db4524133 100644
--- a/libtransport/src/protocols/protocol.h
+++ b/libtransport/src/protocols/protocol.h
@@ -17,6 +17,8 @@
#include <atomic>
+#include <hicn/transport/interfaces/callbacks.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/interfaces/statistics.h>
#include <hicn/transport/utils/object_pool.h>
@@ -34,6 +36,8 @@ using namespace core;
class IndexVerificationManager;
+using ReadCallback = interface::ConsumerSocket::ReadCallback;
+
class TransportProtocolCallback {
virtual void onContentObject(const core::Interest &interest,
const core::ContentObject &content_object) = 0;
@@ -89,6 +93,20 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback,
// True if it si the first time we schedule an interest
std::atomic<bool> is_first_;
interface::TransportStatistics *stats_;
+
+ // Callbacks
+ interface::ConsumerInterestCallback *on_interest_retransmission_;
+ interface::ConsumerInterestCallback *on_interest_output_;
+ interface::ConsumerInterestCallback *on_interest_timeout_;
+ interface::ConsumerInterestCallback *on_interest_satisfied_;
+ interface::ConsumerContentObjectCallback *on_content_object_input_;
+ interface::ConsumerContentObjectVerificationCallback
+ *on_content_object_verification_;
+ interface::ConsumerContentObjectCallback *on_content_object_;
+ interface::ConsumerTimerCallback *stats_summary_;
+ interface::ConsumerContentObjectVerificationFailedCallback
+ *verification_failed_callback_;
+ ReadCallback *on_payload_;
};
} // end namespace protocol
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 8a38f8ccf..0a93dec44 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -325,18 +325,12 @@ void RaaqmTransportProtocol::onContentObject(
}
// Call application-defined callbacks
- ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_->getInterface(), *content_object);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), *content_object);
}
- ConsumerInterestCallback *callback_interest = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
- &callback_interest);
- if (*callback_interest) {
- (*callback_interest)(*socket_->getInterface(), *interest);
+ if (*on_interest_satisfied_) {
+ (*on_interest_satisfied_)(*socket_->getInterface(), *interest);
}
if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
@@ -369,23 +363,17 @@ void RaaqmTransportProtocol::onPacketDropped(
socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
uint64_t segment = interest->getName().getSuffix();
- ConsumerInterestCallback *callback = VOID_HANDLER;
+
if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
max_rtx)) {
stats_->updateRetxCount(1);
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_retransmission_) {
+ (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (!is_running_) {
@@ -393,7 +381,6 @@ void RaaqmTransportProtocol::onPacketDropped(
}
interest_retransmissions_[segment & mask]++;
-
interest_to_retransmit_.push(std::move(interest));
} else {
TRANSPORT_LOGE(
@@ -428,11 +415,8 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
return;
}
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_timeout_) {
+ (*on_interest_timeout_)(*socket_->getInterface(), *interest);
}
afterDataUnsatisfied(segment);
@@ -444,18 +428,12 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
max_rtx)) {
stats_->updateRetxCount(1);
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_retransmission_) {
+ (*on_interest_retransmission_)(*socket_->getInterface(), *interest);
}
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (!is_running_) {
@@ -463,7 +441,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
}
interest_retransmissions_[segment & mask]++;
-
interest_to_retransmit_.push(std::move(interest));
scheduleNextInterests();
@@ -507,7 +484,7 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
}
}
-void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
auto interest = getPacket();
core::Name *name;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
@@ -519,15 +496,12 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
interest_lifetime);
interest->setLifetime(interest_lifetime);
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- callback->operator()(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ on_interest_output_->operator()(*socket_->getInterface(), *interest);
}
if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return;
+ return false;
}
// This is set to ~0 so that the next interest_retransmissions_ + 1,
@@ -535,6 +509,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
+
+ return true;
}
void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
@@ -564,7 +540,7 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
rate_estimator_->onRttUpdate((double)rtt.count());
}
- cur_path_->insertNewRtt(rtt.count());
+ cur_path_->insertNewRtt(rtt.count(), now);
cur_path_->smoothTimer();
if (cur_path_->newPropagationDelayAvailable()) {
@@ -595,18 +571,15 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
stats_->updateAverageWindowSize(current_window_size_);
// Call statistics callback
- ConsumerTimerCallback *stats_callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
+ if (*stats_summary_) {
auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
uint32_t timer_interval_milliseconds = 0;
socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
timer_interval_milliseconds);
if (dt.count() > timer_interval_milliseconds) {
- (*stats_callback)(*socket_->getInterface(), *stats_);
- t0_ = utils::SteadyClock::now();
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
+ t0_ = now;
}
}
}
diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h
index 412967770..ecc466755 100644
--- a/libtransport/src/protocols/raaqm.h
+++ b/libtransport/src/protocols/raaqm.h
@@ -79,7 +79,7 @@ class RaaqmTransportProtocol : public TransportProtocol,
virtual void scheduleNextInterests() override;
- void sendInterest(std::uint64_t next_suffix);
+ bool sendInterest(std::uint64_t next_suffix);
void sendInterest(Interest::Ptr &&interest);
diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc
index 439549c85..8bbbadcf2 100644
--- a/libtransport/src/protocols/raaqm_data_path.cc
+++ b/libtransport/src/protocols/raaqm_data_path.cc
@@ -48,7 +48,8 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor,
average_rtt_(0),
alpha_(ALPHA) {}
-RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
+RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt,
+ const utils::TimePoint &now) {
rtt_ = new_rtt;
rtt_samples_.pushBack(new_rtt);
@@ -60,7 +61,7 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
prop_delay_ = rtt_min_;
}
- last_received_pkt_ = utils::SteadyClock::now();
+ last_received_pkt_ = now;
return *this;
}
diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h
index 6f2afde72..3f037bc76 100644
--- a/libtransport/src/protocols/raaqm_data_path.h
+++ b/libtransport/src/protocols/raaqm_data_path.h
@@ -45,7 +45,7 @@ class RaaqmDataPath {
* max of RTT.
* @param new_rtt is the value of the new RTT
*/
- RaaqmDataPath &insertNewRtt(uint64_t new_rtt);
+ RaaqmDataPath &insertNewRtt(uint64_t new_rtt, const utils::TimePoint &now);
/**
* @brief Update the path statistics
diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc
index 0ac3839dd..72abb599a 100644
--- a/libtransport/src/protocols/rtc.cc
+++ b/libtransport/src/protocols/rtc.cc
@@ -288,15 +288,12 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
uint32_t BW = (uint32_t)ceil(estimatedBw_);
computeMaxWindow(BW, BDP);
- ConsumerTimerCallback *stats_callback = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
+ if (*stats_summary_) {
// Send the stats to the app
stats_->updateQueuingDelay(queuingDelay_);
stats_->updateLossRatio(lossRate_);
stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
- (*stats_callback)(*socket_->getInterface(), *stats_);
+ (*stats_summary_)(*socket_->getInterface(), *stats_);
}
// bound also by interest lifitime* production rate
if (!gotNack_) {
@@ -451,13 +448,8 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
interestLifetime);
interest->setLifetime(uint32_t(interestLifetime));
- ConsumerInterestCallback *on_interest_output = nullptr;
-
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &on_interest_output);
-
- if (*on_interest_output) {
- (*on_interest_output)(*socket_->getInterface(), *interest);
+ if (*on_interest_output_) {
+ (*on_interest_output_)(*socket_->getInterface(), *interest);
}
if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
@@ -890,11 +882,8 @@ void RTCTransportProtocol::onContentObject(
uint32_t segmentNumber = content_object->getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- ConsumerContentObjectCallback *callback_content_object = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_->getInterface(), *content_object);
+ if (*on_content_object_input_) {
+ (*on_content_object_input_)(*socket_->getInterface(), *content_object);
}
if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
diff --git a/libtransport/src/security/signer.cc b/libtransport/src/security/signer.cc
index 314c3ea82..8a56cfa3d 100644
--- a/libtransport/src/security/signer.cc
+++ b/libtransport/src/security/signer.cc
@@ -162,12 +162,10 @@ void Signer::sign(Packet &packet) {
}
CryptoHash hash = hasher.finalize();
-
signature_ = parcSigner_SignDigestNoAlloc(this->signer_, hash.hash_,
packet.getSignature(),
(uint32_t)signature_length_);
PARCBuffer *buffer = parcSignature_GetSignature(signature_);
-
size_t bytes_len = parcBuffer_Remaining(buffer);
if (bytes_len > signature_length_) {
@@ -176,6 +174,8 @@ void Signer::sign(Packet &packet) {
hicn_packet_copy_header(format, &header_copy,
(hicn_header_t *)packet.packet_start_, false);
+
+ parcSignature_Release(&signature_);
}
size_t Signer::getSignatureLength() { return signature_length_; }
diff --git a/libtransport/src/security/verifier.cc b/libtransport/src/security/verifier.cc
index 19796f718..0cfbdc6f9 100644
--- a/libtransport/src/security/verifier.cc
+++ b/libtransport/src/security/verifier.cc
@@ -116,17 +116,10 @@ PARCKeyId *Verifier::addKeyFromCertificate(const std::string &file_name) {
}
int Verifier::verify(const Packet &packet) {
- // to initialize packet.payload_head_
+ // Initialize packet.payload_head_
const_cast<Packet *>(&packet)->separateHeaderPayload();
- bool valid = false;
-
- // initialize packet.payload_head_
- const_cast<Packet *>(&packet)->separateHeaderPayload();
- // header chain points to the IP + TCP hicn header
- // utils::MemBuf *header_chain = packet.header_head_;
- // utils::MemBuf *payload_chain = packet.payload_head_;
- // uint8_t *hicn_packet = header_chain->writableData();
Packet::Format format = packet.getFormat();
+ bool valid = false;
if (!(packet.format_ & HFO_AH)) {
throw errors::MalformedAHPacketException();
@@ -149,11 +142,12 @@ int Verifier::verify(const Packet &packet) {
int ah_payload_len = (int)packet.getSignatureSize();
uint8_t *_signature = packet.getSignature();
uint8_t *signature = new uint8_t[ah_payload_len];
+ std::shared_ptr<CryptoHasher> hasher;
+
// TODO Remove signature copy at this point, by not setting to zero
// the validation payload.
std::memcpy(signature, _signature, ah_payload_len);
- std::shared_ptr<CryptoHasher> hasher;
switch (CryptoSuite(suite)) {
case CryptoSuite::DSA_SHA256:
case CryptoSuite::RSA_SHA256:
@@ -178,7 +172,7 @@ int Verifier::verify(const Packet &packet) {
parcBuffer_Wrap(signature, ah_payload_len, 0, ah_payload_len);
parcBuffer_Rewind(bits);
- /* IF the signature algo is ECDSA, the signature might be shorter than the
+ /* If the signature algo is ECDSA, the signature might be shorter than the
* signature field */
PARCSigningAlgorithm algo = parcCryptoSuite_GetSigningAlgorithm(suite);
while (algo == PARCSigningAlgorithm_ECDSA && parcBuffer_HasRemaining(bits) &&
diff --git a/libtransport/src/utils/membuf.cc b/libtransport/src/utils/membuf.cc
index e75e85b35..94e5b13a1 100644
--- a/libtransport/src/utils/membuf.cc
+++ b/libtransport/src/utils/membuf.cc
@@ -102,8 +102,6 @@ struct MemBuf::HeapStorage {
};
struct MemBuf::HeapFullStorage {
- // Make sure jemalloc allocates from the 64-byte class. Putting this here
- // because HeapStorage is private so it can't be at namespace level.
static_assert(sizeof(HeapStorage) <= 64,
"MemBuf may not grow over 56 bytes!");
diff --git a/telemetry/vpp-collectd/CMakeLists.txt b/telemetry/vpp-collectd/CMakeLists.txt
index 18926b1c5..ef09fb980 100644
--- a/telemetry/vpp-collectd/CMakeLists.txt
+++ b/telemetry/vpp-collectd/CMakeLists.txt
@@ -16,11 +16,10 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
set (COLLECTD_PLUGINS hicn-collectd-plugins)
project(hicn-collectd-plugins)
-set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/" "${CMAKE_CURRENT_SOURCE_DIR}/../../../cmake/Modules/")
+set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/" "${CMAKE_CURRENT_SOURCE_DIR}/../../cmake/Modules/")
include(BuildMacros)
-
add_subdirectory(vpp)
add_subdirectory(vpp-hicn)
diff --git a/telemetry/vpp-collectd/custom_types.db b/telemetry/vpp-collectd/custom_types.db
new file mode 100644
index 000000000..98cd801f4
--- /dev/null
+++ b/telemetry/vpp-collectd/custom_types.db
@@ -0,0 +1,39 @@
+# vpp
+if_drops packets:DERIVE:0:U
+if_punt packets:DERIVE:0:U
+if_ip4 packets:DERIVE:0:U
+if_ip6 packets:DERIVE:0:U
+if_rx_no_buf packets:DERIVE:0:U
+if_rx_miss packets:DERIVE:0:U
+if_rx_error packets:DERIVE:0:U
+if_tx_error packets:DERIVE:0:U
+if_mpls packets:DERIVE:0:U
+if_rx packets:DERIVE:0:U, bytes:DERIVE:0:U
+if_rx_unicast packets:DERIVE:0:U, bytes:DERIVE:0:U
+if_rx_multicast packets:DERIVE:0:U, bytes:DERIVE:0:U
+if_rx_broadcast packets:DERIVE:0:U, bytes:DERIVE:0:U
+if_tx packets:DERIVE:0:U, bytes:DERIVE:0:U
+if_tx_unicast packets:DERIVE:0:U, bytes:DERIVE:0:U
+if_tx_multicast packets:DERIVE:0:U, bytes:DERIVE:0:U
+if_tx_broadcast packets:DERIVE:0:U, bytes:DERIVE:0:U
+
+# vpp-hicn
+pkts_processed packets:GAUGE:0:U
+pkts_interest_count packets:GAUGE:0:U
+pkts_data_count packets:GAUGE:0:U
+pkts_from_cache_count packets:GAUGE:0:U
+pkts_no_pit_count packets:GAUGE:0:U
+pit_expired_count interests:GAUGE:0:U
+cs_expired_count data:GAUGE:0:U
+cs_lru_count data:GAUGE:0:U
+pkts_drop_no_buf packets:GAUGE:0:U
+interests_aggregated interests:GAUGE:0:U
+interests_retx interests:GAUGE:0:U
+interests_hash_collision interests:GAUGE:0:U
+pit_entries_count interests:GAUGE:0:U
+cs_entries_count data:GAUGE:0:U
+cs_entries_ntw_count data:GAUGE:0:U
+irx packets:DERIVE:0:U, bytes:DERIVE:0:U
+itx packets:DERIVE:0:U, bytes:DERIVE:0:U
+drx packets:DERIVE:0:U, bytes:DERIVE:0:U
+dtx packets:DERIVE:0:U, bytes:DERIVE:0:U
diff --git a/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c b/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c
index b90646073..591b8f584 100644
--- a/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c
+++ b/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c
@@ -28,6 +28,7 @@
#endif /* DISABLE_ISOC99 */
#endif /* ! HAVE_CONFIG */
+/* Keep order as it is */
#include <config.h>
#include <collectd.h>
#include <common.h>
@@ -41,6 +42,15 @@
DEFINE_VAPI_MSG_IDS_HICN_API_JSON
vapi_ctx_t vapi_ctx;
+/************** OPTIONS ***********************************/
+static const char *config_keys[2] = {
+ "Verbose",
+ "Tag",
+};
+static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
+static bool verbose = false;
+static char *tag = NULL;
+
/************** DATA SOURCES ******************************/
static data_source_t packets_dsrc[1] = {
{"packets", DS_TYPE_GAUGE, 0, NAN},
@@ -184,8 +194,7 @@ static data_set_t dtx_ds = {
* value_list_t and pass it to plugin_dispatch_values.
*/
static int submit(const char *plugin_instance, const char *type,
- const char *type_instance, value_t *values, size_t values_len,
- cdtime_t *timestamp) {
+ value_t *values, size_t values_len, cdtime_t *timestamp) {
value_list_t vl = VALUE_LIST_INIT;
vl.values = values;
vl.values_len = values_len;
@@ -198,8 +207,8 @@ static int submit(const char *plugin_instance, const char *type,
sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
sstrncpy(vl.type, type, sizeof(vl.type));
- if (type_instance != NULL)
- sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
+ if (tag != NULL)
+ sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance));
return plugin_dispatch_values(&vl);
}
@@ -209,6 +218,28 @@ static int submit(const char *plugin_instance, const char *type,
/**********************************************************/
/*
+ * This function is called for each configuration item.
+ */
+static int vpp_hicn_config(const char *key, const char *value) {
+ if (strcasecmp(key, "Verbose") == 0) {
+ verbose = IS_TRUE(value);
+ } else if (strcasecmp(key, "Tag") == 0) {
+ if (tag != NULL) {
+ free(tag);
+ tag = NULL;
+ }
+
+ if (strcasecmp(value, "None")) {
+ tag = strdup(value);
+ }
+ } else {
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
* Callback called by the hICN plugin API when node stats are ready.
*/
static vapi_error_e
@@ -226,36 +257,38 @@ parse_node_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv,
cdtime_t timestamp = cdtime();
values[0] = (value_t){.gauge = reply->pkts_processed};
- submit(node_name, pkts_processed_ds.type, NULL, values, 1, &timestamp);
+ submit(node_name, pkts_processed_ds.type, values, 1, &timestamp);
values[0] = (value_t){.gauge = reply->pkts_interest_count};
- submit(node_name, pkts_interest_count_ds.type, NULL, values, 1, &timestamp);
+ submit(node_name, pkts_interest_count_ds.type, values, 1, &timestamp);
values[0] = (value_t){.gauge = reply->pkts_data_count};
- submit(node_name, pkts_data_count_ds.type, NULL, values, 1, &timestamp);
- values[0] = (value_t){.gauge = reply->pkts_from_cache_count};
- submit(node_name, pkts_from_cache_count_ds.type, NULL, values, 1, &timestamp);
- values[0] = (value_t){.gauge = reply->pkts_no_pit_count};
- submit(node_name, pkts_no_pit_count_ds.type, NULL, values, 1, &timestamp);
- values[0] = (value_t){.gauge = reply->pit_expired_count};
- submit(node_name, pit_expired_count_ds.type, NULL, values, 1, &timestamp);
- values[0] = (value_t){.gauge = reply->cs_expired_count};
- submit(node_name, cs_expired_count_ds.type, NULL, values, 1, &timestamp);
- values[0] = (value_t){.gauge = reply->cs_lru_count};
- submit(node_name, cs_lru_count_ds.type, NULL, values, 1, &timestamp);
- values[0] = (value_t){.gauge = reply->pkts_drop_no_buf};
- submit(node_name, pkts_drop_no_buf_ds.type, NULL, values, 1, &timestamp);
- values[0] = (value_t){.gauge = reply->interests_aggregated};
- submit(node_name, interests_aggregated_ds.type, NULL, values, 1, &timestamp);
+ submit(node_name, pkts_data_count_ds.type, values, 1, &timestamp);
values[0] = (value_t){.gauge = reply->interests_retx};
- submit(node_name, interests_retx_ds.type, NULL, values, 1, &timestamp);
- values[0] = (value_t){.gauge = reply->interests_hash_collision};
- submit(node_name, interests_hash_collision_ds.type, NULL, values, 1,
- &timestamp);
+ submit(node_name, interests_retx_ds.type, values, 1, &timestamp);
values[0] = (value_t){.gauge = reply->pit_entries_count};
- submit(node_name, pit_entries_count_ds.type, NULL, values, 1, &timestamp);
+ submit(node_name, pit_entries_count_ds.type, values, 1, &timestamp);
values[0] = (value_t){.gauge = reply->cs_entries_count};
- submit(node_name, cs_entries_count_ds.type, NULL, values, 1, &timestamp);
- values[0] = (value_t){.gauge = reply->cs_entries_ntw_count};
- submit(node_name, cs_entries_ntw_count_ds.type, NULL, values, 1, &timestamp);
+ submit(node_name, cs_entries_count_ds.type, values, 1, &timestamp);
+
+ if (verbose) {
+ values[0] = (value_t){.gauge = reply->pkts_from_cache_count};
+ submit(node_name, pkts_from_cache_count_ds.type, values, 1, &timestamp);
+ values[0] = (value_t){.gauge = reply->interests_aggregated};
+ submit(node_name, interests_aggregated_ds.type, values, 1, &timestamp);
+ values[0] = (value_t){.gauge = reply->cs_expired_count};
+ submit(node_name, cs_expired_count_ds.type, values, 1, &timestamp);
+ values[0] = (value_t){.gauge = reply->cs_lru_count};
+ submit(node_name, cs_lru_count_ds.type, values, 1, &timestamp);
+ values[0] = (value_t){.gauge = reply->pit_expired_count};
+ submit(node_name, pit_expired_count_ds.type, values, 1, &timestamp);
+ values[0] = (value_t){.gauge = reply->pkts_no_pit_count};
+ submit(node_name, pkts_no_pit_count_ds.type, values, 1, &timestamp);
+ values[0] = (value_t){.gauge = reply->pkts_drop_no_buf};
+ submit(node_name, pkts_drop_no_buf_ds.type, values, 1, &timestamp);
+ values[0] = (value_t){.gauge = reply->interests_hash_collision};
+ submit(node_name, interests_hash_collision_ds.type, values, 1, &timestamp);
+ values[0] = (value_t){.gauge = reply->cs_entries_ntw_count};
+ submit(node_name, cs_entries_ntw_count_ds.type, values, 1, &timestamp);
+ }
return VAPI_OK;
}
@@ -280,16 +313,16 @@ parse_face_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv,
values[0] = (value_t){.derive = reply->irx_packets};
values[1] = (value_t){.derive = reply->irx_bytes};
- submit(face_name, irx_ds.type, NULL, values, 2, &timestamp);
+ submit(face_name, irx_ds.type, values, 2, &timestamp);
values[0] = (value_t){.derive = reply->itx_packets};
values[1] = (value_t){.derive = reply->itx_bytes};
- submit(face_name, itx_ds.type, NULL, values, 2, &timestamp);
+ submit(face_name, itx_ds.type, values, 2, &timestamp);
values[0] = (value_t){.derive = reply->drx_packets};
values[1] = (value_t){.derive = reply->drx_bytes};
- submit(face_name, drx_ds.type, NULL, values, 2, &timestamp);
+ submit(face_name, drx_ds.type, values, 2, &timestamp);
values[0] = (value_t){.derive = reply->dtx_packets};
values[1] = (value_t){.derive = reply->dtx_bytes};
- submit(face_name, dtx_ds.type, NULL, values, 2, &timestamp);
+ submit(face_name, dtx_ds.type, values, 2, &timestamp);
return VAPI_OK;
}
@@ -297,7 +330,7 @@ parse_face_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv,
/*
* This function is called once upon startup to initialize the plugin.
*/
-static int my_init(void) {
+static int vpp_hicn_init(void) {
int ret = vapi_connect_safe(&vapi_ctx, 0);
if (ret)
@@ -309,7 +342,7 @@ static int my_init(void) {
/*
* This function is called in regular intervalls to collect the data.
*/
-static int my_read(void) {
+static int vpp_hicn_read(void) {
int err = VAPI_OK;
vapi_lock();
@@ -365,7 +398,7 @@ END:
/*
* This function is called when plugin_log () has been used.
*/
-static void my_log(int severity, const char *msg, user_data_t *ud) {
+static void vpp_hicn_log(int severity, const char *msg, user_data_t *ud) {
printf("[LOG %i] %s\n", severity, msg);
return;
}
@@ -373,11 +406,18 @@ static void my_log(int severity, const char *msg, user_data_t *ud) {
/*
* This function is called before shutting down collectd.
*/
-static int my_shutdown(void) {
+static int vpp_hicn_shutdown(void) {
plugin_log(LOG_INFO, "vpp_hicn plugin: shutting down");
+
int ret = vapi_disconnect_safe();
plugin_log(LOG_INFO, "vpp_hicn plugin: disconnect vapi %s",
ret == 0 ? "ok" : "error");
+
+ if (tag != NULL) {
+ free(tag);
+ tag = NULL;
+ }
+
return ret;
}
@@ -408,9 +448,11 @@ void module_register(void) {
plugin_register_data_set(&cs_entries_count_ds);
plugin_register_data_set(&cs_entries_ntw_count_ds);
// callbacks
- plugin_register_log("vpp_hicn", my_log, /* user data */ NULL);
- plugin_register_init("vpp_hicn", my_init);
- plugin_register_read("vpp_hicn", my_read);
- plugin_register_shutdown("vpp_hicn", my_shutdown);
+ plugin_register_log("vpp_hicn", vpp_hicn_log, /* user data */ NULL);
+ plugin_register_config("vpp_hicn", vpp_hicn_config, config_keys,
+ config_keys_num);
+ plugin_register_init("vpp_hicn", vpp_hicn_init);
+ plugin_register_read("vpp_hicn", vpp_hicn_read);
+ plugin_register_shutdown("vpp_hicn", vpp_hicn_shutdown);
return;
}
diff --git a/telemetry/vpp-collectd/vpp/vpp.c b/telemetry/vpp-collectd/vpp/vpp.c
index 72c052212..0fec85dc8 100644
--- a/telemetry/vpp-collectd/vpp/vpp.c
+++ b/telemetry/vpp-collectd/vpp/vpp.c
@@ -28,6 +28,7 @@
#endif /* DISABLE_ISOC99 */
#endif /* ! HAVE_CONFIG */
+/* Keep order as it is */
#include <config.h>
#include <collectd.h>
#include <common.h>
@@ -38,6 +39,15 @@
#include <vppinfra/vec.h>
#undef counter_t
+/************** OPTIONS ***********************************/
+static const char *config_keys[2] = {
+ "Verbose",
+ "Tag",
+};
+static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
+static bool verbose = false;
+static char *tag = NULL;
+
/************** DATA SOURCES ******************************/
static data_source_t combined_dsrc[2] = {
{"packets", DS_TYPE_DERIVE, 0, NAN},
@@ -160,8 +170,7 @@ static data_set_t if_tx_broadcast_ds = {
* value_list_t and pass it to plugin_dispatch_values.
*/
static int submit(const char *plugin_instance, const char *type,
- const char *type_instance, value_t *values, size_t values_len,
- cdtime_t *timestamp) {
+ value_t *values, size_t values_len, cdtime_t *timestamp) {
value_list_t vl = VALUE_LIST_INIT;
vl.values = values;
vl.values_len = values_len;
@@ -174,8 +183,8 @@ static int submit(const char *plugin_instance, const char *type,
sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
sstrncpy(vl.type, type, sizeof(vl.type));
- if (type_instance != NULL)
- sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
+ if (tag != NULL)
+ sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance));
return plugin_dispatch_values(&vl);
}
@@ -188,44 +197,50 @@ static int get_data_set(const char *stat_name, data_set_t *data_set_ptr) {
return 1;
}
- if (strcmp(stat_name, "/if/drops") == 0) {
- *data_set_ptr = if_drops_ds;
- } else if (strcmp(stat_name, "/if/punt") == 0) {
- *data_set_ptr = if_punt_ds;
+ if (strcmp(stat_name, "/if/rx") == 0) {
+ *data_set_ptr = if_rx_ds;
+ } else if (strcmp(stat_name, "/if/tx") == 0) {
+ *data_set_ptr = if_tx_ds;
} else if (strcmp(stat_name, "/if/ip4") == 0) {
*data_set_ptr = if_ip4_ds;
} else if (strcmp(stat_name, "/if/ip6") == 0) {
*data_set_ptr = if_ip6_ds;
- } else if (strcmp(stat_name, "/if/rx-no-buf") == 0) {
- *data_set_ptr = if_rx_no_buf_ds;
- } else if (strcmp(stat_name, "/if/rx-miss") == 0) {
- *data_set_ptr = if_rx_miss_ds;
- } else if (strcmp(stat_name, "/if/rx-error") == 0) {
- *data_set_ptr = if_rx_error_ds;
- } else if (strcmp(stat_name, "/if/tx-error") == 0) {
- *data_set_ptr = if_tx_error_ds;
- } else if (strcmp(stat_name, "/if/mpls") == 0) {
- *data_set_ptr = if_mpls_ds;
- } else if (strcmp(stat_name, "/if/rx") == 0) {
- *data_set_ptr = if_rx_ds;
- } else if (strcmp(stat_name, "/if/rx-unicast") == 0) {
- *data_set_ptr = if_rx_unicast_ds;
- } else if (strcmp(stat_name, "/if/rx-multicast") == 0) {
- *data_set_ptr = if_rx_multicast_ds;
- } else if (strcmp(stat_name, "/if/rx-broadcast") == 0) {
- *data_set_ptr = if_rx_broadcast_ds;
- } else if (strcmp(stat_name, "/if/tx") == 0) {
- *data_set_ptr = if_tx_ds;
- } else if (strcmp(stat_name, "/if/tx-unicast") == 0) {
- *data_set_ptr = if_tx_unicast_ds;
- } else if (strcmp(stat_name, "/if/tx-multicast") == 0) {
- *data_set_ptr = if_tx_multicast_ds;
- } else if (strcmp(stat_name, "/if/tx-broadcast") == 0) {
- *data_set_ptr = if_tx_broadcast_ds;
- } else {
+ } else if (strcmp(stat_name, "/if/drops") == 0) {
+ *data_set_ptr = if_drops_ds;
+ } else if (!verbose) {
return 1;
}
+ if (verbose) {
+ if (strcmp(stat_name, "/if/punt") == 0) {
+ *data_set_ptr = if_punt_ds;
+ } else if (strcmp(stat_name, "/if/mpls") == 0) {
+ *data_set_ptr = if_mpls_ds;
+ } else if (strcmp(stat_name, "/if/rx-no-buf") == 0) {
+ *data_set_ptr = if_rx_no_buf_ds;
+ } else if (strcmp(stat_name, "/if/rx-miss") == 0) {
+ *data_set_ptr = if_rx_miss_ds;
+ } else if (strcmp(stat_name, "/if/rx-error") == 0) {
+ *data_set_ptr = if_rx_error_ds;
+ } else if (strcmp(stat_name, "/if/rx-unicast") == 0) {
+ *data_set_ptr = if_rx_unicast_ds;
+ } else if (strcmp(stat_name, "/if/rx-multicast") == 0) {
+ *data_set_ptr = if_rx_multicast_ds;
+ } else if (strcmp(stat_name, "/if/rx-broadcast") == 0) {
+ *data_set_ptr = if_rx_broadcast_ds;
+ } else if (strcmp(stat_name, "/if/tx-error") == 0) {
+ *data_set_ptr = if_tx_error_ds;
+ } else if (strcmp(stat_name, "/if/tx-unicast") == 0) {
+ *data_set_ptr = if_tx_unicast_ds;
+ } else if (strcmp(stat_name, "/if/tx-multicast") == 0) {
+ *data_set_ptr = if_tx_multicast_ds;
+ } else if (strcmp(stat_name, "/if/tx-broadcast") == 0) {
+ *data_set_ptr = if_tx_broadcast_ds;
+ } else {
+ return 1;
+ }
+ }
+
return 0;
}
@@ -234,9 +249,31 @@ static int get_data_set(const char *stat_name, data_set_t *data_set_ptr) {
/**********************************************************/
/*
+ * This function is called for each configuration item.
+ */
+static int vpp_config(const char *key, const char *value) {
+ if (strcasecmp(key, "Verbose") == 0) {
+ verbose = IS_TRUE(value);
+ } else if (strcasecmp(key, "Tag") == 0) {
+ if (tag != NULL) {
+ free(tag);
+ tag = NULL;
+ }
+
+ if (strcasecmp(value, "None")) {
+ tag = strdup(value);
+ }
+ } else {
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
* This function is called once upon startup to initialize the plugin.
*/
-static int my_init(void) {
+static int vpp_init(void) {
u8 *stat_segment_name = (u8 *)STAT_SEGMENT_SOCKET_FILE;
int ret = stat_segment_connect((char *)stat_segment_name);
@@ -249,7 +286,7 @@ static int my_init(void) {
/*
* This function is called in regular intervalls to collect the data.
*/
-static int my_read(void) {
+static int vpp_read(void) {
uint8_t **patterns = {0};
char **interfaces = {0};
@@ -258,7 +295,6 @@ static int my_read(void) {
uint32_t *dir = stat_segment_ls(patterns);
stat_segment_data_t *res = stat_segment_dump(dir);
- plugin_log(LOG_INFO, "vpp plugin: performed ls and dump");
/* Read all available interfaces */
for (int k = 0; k < vec_len(res); k++) {
@@ -291,8 +327,7 @@ static int my_read(void) {
continue;
}
- err =
- submit(interfaces[j], data_set.type, NULL, values, 1, &timestamp);
+ err = submit(interfaces[j], data_set.type, values, 1, &timestamp);
if (err)
goto END;
@@ -314,8 +349,7 @@ static int my_read(void) {
continue;
}
- err =
- submit(interfaces[j], data_set.type, NULL, values, 2, &timestamp);
+ err = submit(interfaces[j], data_set.type, values, 2, &timestamp);
if (err)
goto END;
@@ -354,7 +388,7 @@ END:
/*
* This function is called when plugin_log () has been used.
*/
-static void my_log(int severity, const char *msg, user_data_t *ud) {
+static void vpp_log(int severity, const char *msg, user_data_t *ud) {
printf("[LOG %i] %s\n", severity, msg);
return;
}
@@ -362,9 +396,16 @@ static void my_log(int severity, const char *msg, user_data_t *ud) {
/*
* This function is called before shutting down collectd.
*/
-static int my_shutdown(void) {
+static int vpp_shutdown(void) {
plugin_log(LOG_INFO, "vpp plugin: shutting down");
+
+ if (tag != NULL) {
+ free(tag);
+ tag = NULL;
+ }
+
stat_segment_disconnect();
+
return 0;
}
@@ -390,9 +431,10 @@ void module_register(void) {
plugin_register_data_set(&if_tx_unicast_ds);
plugin_register_data_set(&if_tx_multicast_ds);
plugin_register_data_set(&if_tx_broadcast_ds);
- plugin_register_log("vpp", my_log, /* user data */ NULL);
- plugin_register_init("vpp", my_init);
- plugin_register_read("vpp", my_read);
- plugin_register_shutdown("vpp", my_shutdown);
+ plugin_register_log("vpp", vpp_log, /* user data */ NULL);
+ plugin_register_config("vpp", vpp_config, config_keys, config_keys_num);
+ plugin_register_init("vpp", vpp_init);
+ plugin_register_read("vpp", vpp_read);
+ plugin_register_shutdown("vpp", vpp_shutdown);
return;
}
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index e037e1309..eabd12c86 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -69,10 +69,10 @@ struct ClientConfiguration {
beta(-1.f),
drop_factor(-1.f),
window(-1),
- virtual_download(true),
producer_certificate(""),
passphrase(""),
receive_buffer(nullptr),
+ receive_buffer_size_(128 * 1024),
download_size(0),
report_interval_milliseconds_(1000),
transport_protocol_(CBR),
@@ -90,10 +90,10 @@ struct ClientConfiguration {
double beta;
double drop_factor;
double window;
- bool virtual_download;
std::string producer_certificate;
std::string passphrase;
std::shared_ptr<utils::MemBuf> receive_buffer;
+ std::size_t receive_buffer_size_;
std::size_t download_size;
std::uint32_t report_interval_milliseconds_;
TransportProtocolAlgorithms transport_protocol_;
@@ -423,12 +423,6 @@ class HIperfClient {
}
}
- if (consumer_socket_->setSocketOption(OtherOptions::VIRTUAL_DOWNLOAD,
- configuration_.virtual_download) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
-
if (configuration_.verify) {
std::shared_ptr<utils::Verifier> verifier =
std::make_shared<utils::Verifier>();
@@ -570,22 +564,29 @@ class HIperfClient {
};
class Callback : public ConsumerSocket::ReadCallback {
- static constexpr std::size_t read_size = 16 * 1024;
-
public:
- Callback(HIperfClient &hiperf_client) : client_(hiperf_client) {}
+ Callback(HIperfClient &hiperf_client) : client_(hiperf_client) {
+ client_.configuration_.receive_buffer =
+ utils::MemBuf::create(client_.configuration_.receive_buffer_size_);
+ }
- bool isBufferMovable() noexcept override { return true; }
+ bool isBufferMovable() noexcept override { return false; }
void getReadBuffer(uint8_t **application_buffer,
- size_t *max_length) override {}
+ size_t *max_length) override {
+ *application_buffer =
+ client_.configuration_.receive_buffer->writableData();
+ *max_length = client_.configuration_.receive_buffer_size_;
+ }
void readDataAvailable(std::size_t length) noexcept override {}
void readBufferAvailable(
std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {}
- size_t maxBufferSize() const override { return read_size; }
+ size_t maxBufferSize() const override {
+ return client_.configuration_.receive_buffer_size_;
+ }
void readError(const std::error_code ec) noexcept override {
std::cerr << "Error " << ec.message() << " while reading from socket"
@@ -742,6 +743,7 @@ class HIperfServer {
}
void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
+ // std::cout << "Received interest " << interest.getName() << std::endl;
content_objects_[content_objects_index_ & mask_]->setName(
interest.getName());
producer_socket_->produce(
@@ -1149,8 +1151,10 @@ void usage() {
<< std::endl;
std::cerr << "-L\t<interest lifetime>\t\t"
<< "Set interest lifetime." << std::endl;
- std::cerr << "-M\t<Download for real>\t\t"
- << "Store the content downloaded." << std::endl;
+ std::cerr << "-M\t<input_buffer_size>\t\t"
+ << "Size of consumer input buffer. If 0, reassembly of packets "
+ "will be disabled."
+ << std::endl;
std::cerr << "-W\t<window_size>\t\t\t"
<< "Use a fixed congestion window "
"for retrieving the data."
@@ -1204,7 +1208,7 @@ int main(int argc, char *argv[]) {
int opt;
#ifndef _WIN32
while ((opt = getopt(argc, argv,
- "DSCf:b:d:W:RMc:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) !=
+ "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) !=
-1) {
switch (opt) {
// Common
@@ -1218,7 +1222,7 @@ int main(int argc, char *argv[]) {
}
#else
while ((opt = getopt(argc, argv,
- "SCf:b:d:W:RMc:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) {
+ "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) {
switch (opt) {
#endif
case 'f': {
@@ -1265,7 +1269,7 @@ int main(int argc, char *argv[]) {
break;
}
case 'M': {
- client_configuration.virtual_download = false;
+ client_configuration.receive_buffer_size_ = std::stoull(optarg);
options = 1;
break;
}