diff options
38 files changed, 713 insertions, 388 deletions
diff --git a/docs/source/telemetry.md b/docs/source/telemetry.md index 84800c8b2..8887b8f1f 100644 --- a/docs/source/telemetry.md +++ b/docs/source/telemetry.md @@ -61,19 +61,31 @@ LoadPlugin vpp_hicn Before running collectd, a vpp forwarder must be started. If the vpp-hicn plugin is used, the hicn-plugin must be available in the vpp forwarder. +If you need the custom types that the two plugins define, they are present in +`telemetry/custom_types.db`. It is useful if you are using InfluxDB as it requires +the type database for multi-value metrics +(see [CollectD protocol support in InfluxDB](https://docs.influxdata.com/influxdb/v1.7/supported_protocols/collectd/)). + +## Plugin options +`vpp` and `vpp-hicn` have the same two options: +- `Verbose` enables additional statistics. You can check the sources to have an exact list of available metrics. +- `Tag` tags the data with the given string. Useful for identifying the context in which the data was retrieved in InfluxDB for instance. If the tag value is `None`, no tag is applied. + ### Example: storing statistics from vpp and vpp-hicn We'll use the rrdtool and csv plugins to store statistics from vpp and vpp-hicn. +Copy the configuration below in a file called `collectd.conf` and move +it to `/etc/collectd`: -Edit the configuration file as the following: - -```html +``` ###################################################################### # Global # ###################################################################### FQDNLookup true BaseDir "/var/lib/collectd" Interval 1 +# if you are using custom_types.db, you can specify it +TypesDB "/usr/share/collectd/types.db" "/etc/collectd/custom_types.db" ###################################################################### # Logging # @@ -106,6 +118,16 @@ LoadPlugin vpp_hicn <Plugin rrdtool> DataDir "/var/lib/collectd/rrd" # the folder where statistics are stored in rrd </Plugin> + +<Plugin vpp> + Verbose true + Tag "None" +</Plugin> + +<Plugin vpp_hicn> + Verbose true + Tag "None" +</Plugin> ``` Run vpp and collectd: diff --git a/hicn-light/src/hicn/core/forwarder.c b/hicn-light/src/hicn/core/forwarder.c index 0d7575ea0..f7b0af2c2 100644 --- a/hicn-light/src/hicn/core/forwarder.c +++ b/hicn-light/src/hicn/core/forwarder.c @@ -103,9 +103,10 @@ struct forwarder { PARCClock *clock; +#if !defined(__APPLE__) hicn_socket_helper_t *hicnSocketHelper; // state required to manage hicn connections - +#endif // used by seed48 and nrand48 unsigned short seed[3]; @@ -523,9 +524,11 @@ PARCClock *forwarder_GetClock(const Forwarder *forwarder) { return forwarder->clock; } +#if !defined(__APPLE__) hicn_socket_helper_t *forwarder_GetHicnSocketHelper(Forwarder *forwarder) { return forwarder->hicnSocketHelper; } +#endif // ======================================================= diff --git a/hicn-light/src/hicn/core/forwarder.h b/hicn-light/src/hicn/core/forwarder.h index a2401d625..d1815b7d4 100644 --- a/hicn-light/src/hicn/core/forwarder.h +++ b/hicn-light/src/hicn/core/forwarder.h @@ -46,7 +46,9 @@ #include <parc/algol/parc_Clock.h> +#if !defined(__APPLE__) #include <hicn/socket/api.h> +#endif #define PORT_NUMBER 9695 #define PORT_NUMBER_AS_STRING "9695" @@ -265,9 +267,9 @@ void forwarder_ClearCache(Forwarder *forwarder); void forwarder_SetStrategy(Forwarder *forwarder, Name *prefix, strategy_type strategy, unsigned related_prefixes_len, Name **related_prefixes); - +#if !defined(__APPLE__) hicn_socket_helper_t *forwarder_GetHicnSocketHelper(Forwarder *forwarder); - +#endif #ifdef WITH_MAPME /** diff --git a/hicn-light/src/hicn/socket/CMakeLists.txt b/hicn-light/src/hicn/socket/CMakeLists.txt index ce2a9caf4..775693bf0 100644 --- a/hicn-light/src/hicn/socket/CMakeLists.txt +++ b/hicn-light/src/hicn/socket/CMakeLists.txt @@ -34,4 +34,4 @@ set(TO_INSTALL_HEADER_FILES ) set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) -set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
\ No newline at end of file +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) diff --git a/hicn-plugin/src/faces/app/face_prod.c b/hicn-plugin/src/faces/app/face_prod.c index c183d6589..ae59719ce 100644 --- a/hicn-plugin/src/faces/app/face_prod.c +++ b/hicn-plugin/src/faces/app/face_prod.c @@ -271,6 +271,24 @@ hicn_face_prod_add (fib_prefix_t * prefix, u32 sw_if, u32 * cs_reserved, if (ret == HICN_ERROR_NONE && hicn_face_prod_set_lru_max (*faceid, cs_reserved) == HICN_ERROR_NONE) { + if (ip46_address_is_ip4(&(prefix->fp_addr))) + { + ip4_address_t mask; + ip4_preflen_to_mask (prefix->fp_len, &mask); + prefix->fp_addr.ip4.as_u32 = prefix->fp_addr.ip4.as_u32 & mask.as_u32; + prefix->fp_proto = FIB_PROTOCOL_IP4; + } + else + { + ip6_address_t mask; + ip6_preflen_to_mask (prefix->fp_len, &mask); + prefix->fp_addr.ip6.as_u64[0] = + prefix->fp_addr.ip6.as_u64[0] & mask.as_u64[0]; + prefix->fp_addr.ip6.as_u64[1] = + prefix->fp_addr.ip6.as_u64[1] & mask.as_u64[1]; + prefix->fp_proto = FIB_PROTOCOL_IP6; + } + hicn_app_state_create (sw_if, prefix); ret = hicn_route_add (faceid, 1, prefix); } diff --git a/libtransport/includes/hicn/transport/core/content_object.h b/libtransport/includes/hicn/transport/core/content_object.h index 5af548fe4..822790e56 100644 --- a/libtransport/includes/hicn/transport/core/content_object.h +++ b/libtransport/includes/hicn/transport/core/content_object.h @@ -46,8 +46,6 @@ class ContentObject : public Packet { ~ContentObject() override; - void replace(MemBufPtr &&buffer) override; - const Name &getName() const override; Name &getWritableName() override; diff --git a/libtransport/includes/hicn/transport/core/interest.h b/libtransport/includes/hicn/transport/core/interest.h index f0c546a8e..c572afbff 100644 --- a/libtransport/includes/hicn/transport/core/interest.h +++ b/libtransport/includes/hicn/transport/core/interest.h @@ -44,8 +44,6 @@ class Interest ~Interest() override; - void replace(MemBufPtr &&buffer) override; - const Name &getName() const override; Name &getWritableName() override; diff --git a/libtransport/includes/hicn/transport/core/packet.h b/libtransport/includes/hicn/transport/core/packet.h index e758aa13e..3ddc4a595 100644 --- a/libtransport/includes/hicn/transport/core/packet.h +++ b/libtransport/includes/hicn/transport/core/packet.h @@ -17,6 +17,7 @@ #include <hicn/transport/core/name.h> #include <hicn/transport/core/payload_type.h> +#include <hicn/transport/errors/malformed_packet_exception.h> #include <hicn/transport/portability/portability.h> #include <hicn/transport/security/crypto_hasher.h> #include <hicn/transport/security/crypto_suite.h> @@ -81,7 +82,12 @@ class Packet : public std::enable_shared_from_this<Packet> { virtual ~Packet(); static std::size_t getHeaderSizeFromFormat(Format format, - std::size_t signature_size = 0); + std::size_t signature_size = 0) { + std::size_t header_length; + hicn_packet_get_header_length_from_format(format, &header_length); + int is_ah = _is_ah(format); + return is_ah * (header_length + signature_size) + (!is_ah) * header_length; + } static std::size_t getHeaderSizeFromBuffer(Format format, const uint8_t *buffer); @@ -91,9 +97,26 @@ class Packet : public std::enable_shared_from_this<Packet> { static bool isInterest(const uint8_t *buffer); - static Format getFormatFromBuffer(const uint8_t *buffer); + static Format getFormatFromBuffer(const uint8_t *buffer) { + Format format = HF_UNSPEC; - virtual void replace(MemBufPtr &&buffer); + if (TRANSPORT_EXPECT_FALSE( + hicn_packet_get_format((const hicn_header_t *)buffer, &format) < + 0)) { + throw errors::MalformedPacketException(); + } + + return format; + } + + TRANSPORT_ALWAYS_INLINE void replace(MemBufPtr &&buffer) { + packet_ = std::move(buffer); + packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData()); + header_head_ = packet_.get(); + payload_head_ = nullptr; + format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_)); + name_.clear(); + } std::size_t payloadSize() const; @@ -123,6 +146,19 @@ class Packet : public std::enable_shared_from_this<Packet> { std::unique_ptr<utils::MemBuf> getPayload() const; + std::pair<const uint8_t *, std::size_t> getPayloadReference() const { + int signature_size = 0; + if (_is_ah(format_)) { + signature_size = (uint32_t)getSignatureSize(); + } + + auto header_size = getHeaderSizeFromFormat(format_, signature_size); + auto payload_length = packet_->length() - header_size; + + return std::make_pair(packet_->data() + header_size, + payload_length); + } + Packet &updateLength(std::size_t length = 0); PayloadType getPayloadType() const; @@ -152,7 +188,21 @@ class Packet : public std::enable_shared_from_this<Packet> { virtual utils::CryptoHash computeDigest( utils::CryptoHashType algorithm) const; - void setChecksum(); + void setChecksum() { + uint16_t partial_csum = 0; + + for (utils::MemBuf *current = header_head_->next(); + current && current != header_head_; current = current->next()) { + if (partial_csum != 0) { + partial_csum = ~partial_csum; + } + partial_csum = csum(current->data(), current->length(), partial_csum); + } + if (hicn_packet_compute_header_checksum(format_, packet_start_, + partial_csum) < 0) { + throw errors::MalformedPacketException(); + } + } bool checkIntegrity() const; @@ -184,7 +234,19 @@ class Packet : public std::enable_shared_from_this<Packet> { private: virtual void resetForHash() = 0; void setSignatureSize(std::size_t size_bytes); - std::size_t getSignatureSize() const; + + std::size_t getSignatureSize() const { + size_t size_bytes; + int ret = + hicn_packet_get_signature_size(format_, packet_start_, &size_bytes); + + if (ret < 0) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + + return size_bytes; + } + uint8_t *getSignature() const; void separateHeaderPayload(); diff --git a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h index 7910a4422..0b7a79c3d 100644 --- a/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/includes/hicn/transport/interfaces/socket_options_keys.h @@ -94,7 +94,6 @@ typedef enum { CACHE_HIT = 506, CACHE_MISS = 508, NEW_CONTENT_OBJECT = 509, - CONTENT_OBJECT_SIGN = 513, CONTENT_OBJECT_READY = 510, CONTENT_OBJECT_OUTPUT = 511, CONTENT_PRODUCED = 512, diff --git a/libtransport/includes/hicn/transport/utils/CMakeLists.txt b/libtransport/includes/hicn/transport/utils/CMakeLists.txt index 396bd06d6..38ecc3d37 100644 --- a/libtransport/includes/hicn/transport/utils/CMakeLists.txt +++ b/libtransport/includes/hicn/transport/utils/CMakeLists.txt @@ -28,6 +28,7 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/object_pool.h ${CMAKE_CURRENT_SOURCE_DIR}/membuf.h ${CMAKE_CURRENT_SOURCE_DIR}/spinlock.h + ${CMAKE_CURRENT_SOURCE_DIR}/fixed_block_allocator.h ) if(NOT WIN32) diff --git a/libtransport/includes/hicn/transport/utils/fixed_block_allocator.h b/libtransport/includes/hicn/transport/utils/fixed_block_allocator.h new file mode 100644 index 000000000..1ade1516e --- /dev/null +++ b/libtransport/includes/hicn/transport/utils/fixed_block_allocator.h @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + */ + +#pragma once + +#include <hicn/transport/portability/c_portability.h> +#include <hicn/transport/utils/spinlock.h> + +#include <stdint.h> +#include <cstdlib> +#include <memory> +#include <cassert> + +namespace utils { +template <std::size_t DEFAULT_SIZE = 512, std::size_t OBJECTS = 4096> +class FixedBlockAllocator { + FixedBlockAllocator(std::size_t size = DEFAULT_SIZE, + std::size_t objects = OBJECTS) + : block_size_(size < sizeof(void*) ? sizeof(long*) : size), + object_size_(size), + max_objects_(objects), + p_head_(NULL), + pool_index_(0), + block_count_(0), + blocks_in_use_(0), + allocations_(0), + deallocations_(0) { + p_pool_ = (uint8_t*)new uint8_t[block_size_ * max_objects_]; + } + + public: + static FixedBlockAllocator* getInstance() { + if (!instance_) { + instance_ = std::unique_ptr<FixedBlockAllocator>( + new FixedBlockAllocator(DEFAULT_SIZE, OBJECTS)); + } + + return instance_.get(); + } + + ~FixedBlockAllocator() { delete[] p_pool_; } + + TRANSPORT_ALWAYS_INLINE void* allocateBlock(size_t size = DEFAULT_SIZE) { + assert(size <= DEFAULT_SIZE); + uint32_t index; + + void* p_block = pop(); + if (!p_block) { + if (pool_index_ < max_objects_) { + { + SpinLock::Acquire locked(lock_); + index = pool_index_++; + } + p_block = (void*)(p_pool_ + (index * block_size_)); + } else { + // TODO Consider increasing pool here instead of throwing an exception + throw std::runtime_error("No more memory available from packet pool!"); + } + } + + blocks_in_use_++; + allocations_++; + + return p_block; + } + + TRANSPORT_ALWAYS_INLINE void deallocateBlock(void* pBlock) { + push(pBlock); + { + SpinLock::Acquire locked(lock_); + blocks_in_use_--; + deallocations_++; + } + } + + TRANSPORT_ALWAYS_INLINE std::size_t blockSize() { return block_size_; } + + TRANSPORT_ALWAYS_INLINE uint32_t blockCount() { return block_count_; } + + TRANSPORT_ALWAYS_INLINE uint32_t blocksInUse() { return blocks_in_use_; } + + TRANSPORT_ALWAYS_INLINE uint32_t allocations() { return allocations_; } + + TRANSPORT_ALWAYS_INLINE uint32_t deallocations() { return deallocations_; } + + private: + TRANSPORT_ALWAYS_INLINE void push(void* p_memory) { + Block* p_block = (Block*)p_memory; + { + SpinLock::Acquire locked(lock_); + p_block->p_next = p_head_; + p_head_ = p_block; + } + } + + TRANSPORT_ALWAYS_INLINE void* pop() { + Block* p_block = nullptr; + + { + SpinLock::Acquire locked(lock_); + if (p_head_) { + p_block = p_head_; + p_head_ = p_head_->p_next; + } + } + + return (void*)p_block; + } + + struct Block { + Block* p_next; + }; + + static std::unique_ptr<FixedBlockAllocator> instance_; + + const std::size_t block_size_; + const std::size_t object_size_; + const std::size_t max_objects_; + + Block* p_head_; + uint8_t* p_pool_; + uint32_t pool_index_; + uint32_t block_count_; + uint32_t blocks_in_use_; + uint32_t allocations_; + uint32_t deallocations_; + + SpinLock lock_; +}; + +template <std::size_t A, std::size_t B> +std::unique_ptr<FixedBlockAllocator<A, B>> + FixedBlockAllocator<A, B>::instance_ = nullptr; + +} // namespace utils
\ No newline at end of file diff --git a/libtransport/src/core/CMakeLists.txt b/libtransport/src/core/CMakeLists.txt index 12ef9cfe4..5c8ab9270 100644 --- a/libtransport/src/core/CMakeLists.txt +++ b/libtransport/src/core/CMakeLists.txt @@ -33,7 +33,6 @@ list(APPEND HEADER_FILES list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc ${CMAKE_CURRENT_SOURCE_DIR}/interest.cc - ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc ${CMAKE_CURRENT_SOURCE_DIR}/packet.cc ${CMAKE_CURRENT_SOURCE_DIR}/name.cc ${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc diff --git a/libtransport/src/core/content_object.cc b/libtransport/src/core/content_object.cc index 6cbcdb29e..f5cccf404 100644 --- a/libtransport/src/core/content_object.cc +++ b/libtransport/src/core/content_object.cc @@ -86,15 +86,6 @@ ContentObject::ContentObject(ContentObject &&other) : Packet(std::move(other)) { ContentObject::~ContentObject() {} -void ContentObject::replace(MemBufPtr &&buffer) { - Packet::replace(std::move(buffer)); - - if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < - 0) { - throw errors::RuntimeException("Error getting name from content object."); - } -} - const Name &ContentObject::getName() const { if (!name_) { if (hicn_data_get_name(format_, packet_start_, diff --git a/libtransport/src/core/forwarder_interface.h b/libtransport/src/core/forwarder_interface.h index 3e70e221d..3b016c4bb 100644 --- a/libtransport/src/core/forwarder_interface.h +++ b/libtransport/src/core/forwarder_interface.h @@ -95,15 +95,11 @@ class ForwarderInterface { packet.setLocator(inet6_address_); } - // TRANSPORT_LOGI("Sending packet %s at %lu", - // packet.getName().toString().c_str(), - // utils::SteadyClock::now().time_since_epoch().count()); packet.setChecksum(); connector_.send(packet.acquireMemBufReference()); } TRANSPORT_ALWAYS_INLINE void send(const uint8_t *packet, std::size_t len) { - // ASIO_COMPLETION_HANDLER_CHECK(Handler, packet_sent) type_check; counters_.tx_packets++; counters_.tx_bytes += len; diff --git a/libtransport/src/core/interest.cc b/libtransport/src/core/interest.cc index 166632f0a..9ee662615 100644 --- a/libtransport/src/core/interest.cc +++ b/libtransport/src/core/interest.cc @@ -72,15 +72,6 @@ Interest::Interest(Interest &&other_interest) Interest::~Interest() {} -void Interest::replace(MemBufPtr &&buffer) { - Packet::replace(std::move(buffer)); - - if (hicn_interest_get_name(format_, packet_start_, - name_.getStructReference()) < 0) { - throw errors::MalformedPacketException(); - } -} - const Name &Interest::getName() const { if (!name_) { if (hicn_interest_get_name(format_, packet_start_, diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc index 2292e9b41..49f262ec8 100644 --- a/libtransport/src/core/memif_connector.cc +++ b/libtransport/src/core/memif_connector.cc @@ -305,11 +305,13 @@ void MemifConnector::sendCallback(const std::error_code &ec) { } } -void MemifConnector::processInputBuffer() { +void MemifConnector::processInputBuffer(std::uint16_t total_packets) { Packet::MemBufPtr ptr; - while (input_buffer_.pop(ptr)) { - receive_callback_(std::move(ptr)); + for (; total_packets > 0; total_packets--) { + if (input_buffer_.pop(ptr)) { + receive_callback_(std::move(ptr)); + } } } @@ -339,6 +341,7 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, memif_connection_t *c = connector->memif_connection_.get(); int err = MEMIF_ERR_SUCCESS, ret_val; + uint16_t total_packets = 0; uint16_t rx; do { @@ -353,7 +356,7 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, c->rx_buf_num += rx; - if (TRANSPORT_EXPECT_TRUE(connector->io_service_.stopped())) { + if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) { TRANSPORT_LOGE("socket stopped: ignoring %u packets", rx); goto error; } @@ -386,11 +389,12 @@ int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, } c->rx_buf_num -= rx; + total_packets += rx; } while (ret_val == MEMIF_ERR_NOBUF); connector->io_service_.post( - std::bind(&MemifConnector::processInputBuffer, connector)); + std::bind(&MemifConnector::processInputBuffer, connector, total_packets)); return 0; diff --git a/libtransport/src/core/memif_connector.h b/libtransport/src/core/memif_connector.h index aafef1e56..693efd14c 100644 --- a/libtransport/src/core/memif_connector.h +++ b/libtransport/src/core/memif_connector.h @@ -96,7 +96,7 @@ class MemifConnector : public Connector { void sendCallback(const std::error_code &ec); - void processInputBuffer(); + void processInputBuffer(std::uint16_t total_packets); private: static utils::EpollEventReactor main_event_reactor_; diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc index 67e647fca..6815868f0 100644 --- a/libtransport/src/core/packet.cc +++ b/libtransport/src/core/packet.cc @@ -69,14 +69,6 @@ Packet::Packet(Packet &&other) Packet::~Packet() {} -std::size_t Packet::getHeaderSizeFromFormat(Format format, - size_t signature_size) { - std::size_t header_length; - hicn_packet_get_header_length_from_format(format, &header_length); - int is_ah = _is_ah(format); - return is_ah * (header_length + signature_size) + (!is_ah) * header_length; -} - std::size_t Packet::getHeaderSizeFromBuffer(Format format, const uint8_t *buffer) { size_t header_length; @@ -99,17 +91,6 @@ bool Packet::isInterest(const uint8_t *buffer) { return !is_interest; } -Packet::Format Packet::getFormatFromBuffer(const uint8_t *buffer) { - Format format = HF_UNSPEC; - - if (TRANSPORT_EXPECT_FALSE( - hicn_packet_get_format((const hicn_header_t *)buffer, &format) < 0)) { - throw errors::MalformedPacketException(); - } - - return format; -} - std::size_t Packet::getPayloadSizeFromBuffer(Format format, const uint8_t *buffer) { std::size_t payload_length; @@ -122,14 +103,6 @@ std::size_t Packet::getPayloadSizeFromBuffer(Format format, return payload_length; } -void Packet::replace(MemBufPtr &&buffer) { - packet_ = std::move(buffer); - packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData()); - header_head_ = packet_.get(); - payload_head_ = nullptr; - format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_)); -} - std::size_t Packet::payloadSize() const { return getPayloadSizeFromBuffer(format_, reinterpret_cast<uint8_t *>(packet_start_)); @@ -270,17 +243,6 @@ uint8_t *Packet::getSignature() const { return signature; } -std::size_t Packet::getSignatureSize() const { - size_t size_bytes; - int ret = hicn_packet_get_signature_size(format_, packet_start_, &size_bytes); - - if (ret < 0) { - throw errors::RuntimeException("Packet without Authentication Header."); - } - - return size_bytes; -} - void Packet::setSignatureTimestamp(const uint64_t ×tamp) { int ret = hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp); @@ -366,22 +328,6 @@ utils::CryptoHash Packet::computeDigest(utils::CryptoHashType algorithm) const { return hasher.finalize(); } -void Packet::setChecksum() { - uint16_t partial_csum = 0; - - for (utils::MemBuf *current = header_head_->next(); - current && current != header_head_; current = current->next()) { - if (partial_csum != 0) { - partial_csum = ~partial_csum; - } - partial_csum = csum(current->data(), current->length(), partial_csum); - } - if (hicn_packet_compute_header_checksum(format_, packet_start_, - partial_csum) < 0) { - throw errors::MalformedPacketException(); - } -} - bool Packet::checkIntegrity() const { if (hicn_packet_check_integrity(format_, packet_start_) < 0) { return false; diff --git a/libtransport/src/core/pending_interest.h b/libtransport/src/core/pending_interest.h index d9ec2ed40..aeff78ea2 100644 --- a/libtransport/src/core/pending_interest.h +++ b/libtransport/src/core/pending_interest.h @@ -47,17 +47,29 @@ class PendingInterest { public: using Ptr = utils::ObjectPool<PendingInterest>::Ptr; - PendingInterest(); + PendingInterest() + : interest_(nullptr, nullptr), + timer_(), + on_content_object_callback_(), + on_interest_timeout_callback_() {} PendingInterest(Interest::Ptr &&interest, - std::unique_ptr<asio::steady_timer> &&timer); + std::unique_ptr<asio::steady_timer> &&timer) + : interest_(std::move(interest)), + timer_(std::move(timer)), + on_content_object_callback_(), + on_interest_timeout_callback_() {} PendingInterest(Interest::Ptr &&interest, OnContentObjectCallback &&on_content_object, OnInterestTimeoutCallback &&on_interest_timeout, - std::unique_ptr<asio::steady_timer> &&timer); + std::unique_ptr<asio::steady_timer> &&timer) + : interest_(std::move(interest)), + timer_(std::move(timer)), + on_content_object_callback_(std::move(on_content_object)), + on_interest_timeout_callback_(std::move(on_interest_timeout)) {} - ~PendingInterest(); + ~PendingInterest() = default; template <typename Handler> TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) { @@ -66,19 +78,35 @@ class PendingInterest { timer_->async_wait(std::forward<Handler &&>(cb)); } - void cancelTimer(); + TRANSPORT_ALWAYS_INLINE void cancelTimer() { timer_->cancel(); } - Interest::Ptr &&getInterest(); + TRANSPORT_ALWAYS_INLINE Interest::Ptr &&getInterest() { + return std::move(interest_); + } - void setInterest(Interest::Ptr &&interest); + TRANSPORT_ALWAYS_INLINE void setInterest(Interest::Ptr &&interest) { + interest_ = std::move(interest); + } - const OnContentObjectCallback &getOnDataCallback() const; + TRANSPORT_ALWAYS_INLINE const OnContentObjectCallback &getOnDataCallback() + const { + return on_content_object_callback_; + } - void setOnContentObjectCallback(OnContentObjectCallback &&on_content_object); + TRANSPORT_ALWAYS_INLINE void setOnContentObjectCallback( + OnContentObjectCallback &&on_content_object) { + PendingInterest::on_content_object_callback_ = on_content_object; + } - const OnInterestTimeoutCallback &getOnTimeoutCallback() const; + TRANSPORT_ALWAYS_INLINE const OnInterestTimeoutCallback & + getOnTimeoutCallback() const { + return on_interest_timeout_callback_; + } - void setOnTimeoutCallback(OnInterestTimeoutCallback &&on_interest_timeout); + TRANSPORT_ALWAYS_INLINE void setOnTimeoutCallback( + OnInterestTimeoutCallback &&on_interest_timeout) { + PendingInterest::on_interest_timeout_callback_ = on_interest_timeout; + } private: Interest::Ptr interest_; diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index cf1010068..05715543a 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -24,6 +24,7 @@ #include <hicn/transport/interfaces/portal.h> #include <hicn/transport/portability/portability.h> #include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/fixed_block_allocator.h> #include <core/forwarder_interface.h> #include <core/pending_interest.h> @@ -52,21 +53,20 @@ class HandlerMemory { static constexpr std::size_t memory_size = 1024 * 1024; public: - HandlerMemory() : index_(0) {} + HandlerMemory() {} HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { - return &storage_[index_++ % memory_size]; + return utils::FixedBlockAllocator<128, 4096>::getInstance() + ->allocateBlock(); } - TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {} - - private: - // Storage space used for handler-based custom memory allocation. - typename std::aligned_storage<128>::type storage_[memory_size]; - uint32_t index_; + TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { + utils::FixedBlockAllocator<128, 4096>::getInstance()->deallocateBlock( + pointer); + } #else public: HandlerMemory() {} @@ -627,6 +627,8 @@ class Portal { } private: + portal_details::HandlerMemory async_callback_memory_; + asio::io_service &io_service_; asio::io_service internal_io_service_; portal_details::Pool packet_pool_; @@ -639,8 +641,6 @@ class Portal { ConsumerCallback *consumer_callback_; ProducerCallback *producer_callback_; - portal_details::HandlerMemory async_callback_memory_; - typename ForwarderInt::ConnectorType connector_; ForwarderInt forwarder_interface_; }; diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index 2fc8d2b48..488f238ba 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -35,7 +35,7 @@ class ConsumerSocket : public Socket<BasePortal> { public: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) : consumer_interface_(consumer), - portal_(std::make_shared<Portal>(io_service_)), + portal_(std::make_shared<Portal>()), async_downloader_(), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), @@ -62,10 +62,8 @@ class ConsumerSocket : public Socket<BasePortal> { on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), on_content_object_verification_(VOID_HANDLER), - on_content_object_(VOID_HANDLER), stats_summary_(VOID_HANDLER), read_callback_(nullptr), - virtual_download_(false), timer_interval_milliseconds_(0), guard_raaqm_params_() { switch (protocol) { @@ -323,11 +321,6 @@ class ConsumerSocket : public Socket<BasePortal> { int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { - case OtherOptions::VIRTUAL_DOWNLOAD: - virtual_download_ = socket_option_value; - result = SOCKET_OPTION_SET; - break; - case GeneralTransportOptions::VERIFY_SIGNATURE: verify_signature_ = socket_option_value; result = SOCKET_OPTION_SET; @@ -631,10 +624,6 @@ class ConsumerSocket : public Socket<BasePortal> { socket_option_value = transport_protocol_->isRunning(); break; - case OtherOptions::VIRTUAL_DOWNLOAD: - socket_option_value = virtual_download_; - break; - case GeneralTransportOptions::VERIFY_SIGNATURE: socket_option_value = verify_signature_; break; @@ -861,8 +850,9 @@ class ConsumerSocket : public Socket<BasePortal> { /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getIoService().dispatch([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -881,7 +871,6 @@ class ConsumerSocket : public Socket<BasePortal> { protected: interface::ConsumerSocket *consumer_interface_; - asio::io_service io_service_; std::shared_ptr<Portal> portal_; utils::EventThread async_downloader_; @@ -925,15 +914,11 @@ class ConsumerSocket : public Socket<BasePortal> { ConsumerInterestCallback on_interest_satisfied_; ConsumerContentObjectCallback on_content_object_input_; ConsumerContentObjectVerificationCallback on_content_object_verification_; - ConsumerContentObjectCallback on_content_object_; ConsumerTimerCallback stats_summary_; ConsumerContentObjectVerificationFailedCallback verification_failed_callback_; ReadCallback *read_callback_; - // Virtual download for traffic generator - bool virtual_download_; - uint32_t timer_interval_milliseconds_; // Transport protocol diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index d4a239cf6..cae0ad7c7 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -43,6 +43,8 @@ using namespace interface; class ProducerSocket : public Socket<BasePortal>, public BasePortal::ProducerCallback { + static constexpr uint32_t burst_size = 256; + public: explicit ProducerSocket(interface::ProducerSocket *producer_socket) : producer_interface_(producer_socket), @@ -293,6 +295,27 @@ class ProducerSocket : public Socket<BasePortal>, } } + io_service_.post([this]() { + std::shared_ptr<ContentObject> co; + while (object_queue_for_callbacks_.pop(co)) { + if (on_new_segment_) { + on_new_segment_(*producer_interface_, *co); + } + + if (on_content_object_to_sign_) { + on_content_object_to_sign_(*producer_interface_, *co); + } + + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, *co); + } + + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *co); + } + } + }); + io_service_.dispatch([this, buffer_size]() { if (on_content_produced_) { on_content_produced_(*producer_interface_, @@ -300,7 +323,6 @@ class ProducerSocket : public Socket<BasePortal>, } }); - TRANSPORT_LOGD("--------- END PRODUCE ------------"); return suffix_strategy->getTotalCount(); } @@ -498,12 +520,6 @@ class ProducerSocket : public Socket<BasePortal>, break; } - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - if (socket_option_value == VOID_HANDLER) { - on_content_object_to_sign_ = VOID_HANDLER; - break; - } - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: if (socket_option_value == VOID_HANDLER) { on_content_object_in_output_buffer_ = VOID_HANDLER; @@ -569,10 +585,6 @@ class ProducerSocket : public Socket<BasePortal>, on_new_segment_ = socket_option_value; break; - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - on_content_object_to_sign_ = socket_option_value; - break; - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: on_content_object_in_output_buffer_ = socket_option_value; break; @@ -755,10 +767,6 @@ class ProducerSocket : public Socket<BasePortal>, *socket_option_value = &on_new_segment_; break; - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - *socket_option_value = &on_content_object_to_sign_; - break; - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: *socket_option_value = &on_content_object_in_output_buffer_; break; @@ -972,6 +980,9 @@ class ProducerSocket : public Socket<BasePortal>, // by the application thread std::atomic<uint32_t> content_object_expiry_time_; + utils::CircularFifo<std::shared_ptr<ContentObject>, 2048> + object_queue_for_callbacks_; + // buffers // ContentStore is thread-safe utils::ContentStore output_buffer_; @@ -1027,36 +1038,45 @@ class ProducerSocket : public Socket<BasePortal>, portal_->runEventsLoop(); } - void passContentObjectToCallbacks( - const std::shared_ptr<ContentObject> &content_object) { - if (content_object) { - io_service_.dispatch([this, content_object]() { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *content_object); - } + void scheduleSendBurst() { + io_service_.post([this]() { + std::shared_ptr<ContentObject> co; - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *content_object); - } + for (uint32_t i = 0; i < burst_size; i++) { + if (object_queue_for_callbacks_.pop(co)) { + if (on_new_segment_) { + on_new_segment_(*producer_interface_, *co); + } - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, - *content_object); - } - }); + if (on_content_object_to_sign_) { + on_content_object_to_sign_(*producer_interface_, *co); + } - output_buffer_.insert(content_object); + if (on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_(*producer_interface_, *co); + } - io_service_.dispatch([this, content_object]() { - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *content_object); + if (on_content_object_output_) { + on_content_object_output_(*producer_interface_, *co); + } + } else { + break; } - }); + } + }); + } - portal_->sendContentObject(*content_object); + void passContentObjectToCallbacks( + const std::shared_ptr<ContentObject> &content_object) { + output_buffer_.insert(content_object); + portal_->sendContentObject(*content_object); + object_queue_for_callbacks_.push(std::move(content_object)); + + if (object_queue_for_callbacks_.size() >= burst_size) { + scheduleSendBurst(); } } -}; // namespace implementation +}; } // namespace implementation diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc index c2996ebc1..12631637e 100644 --- a/libtransport/src/protocols/byte_stream_reassembly.cc +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -39,7 +39,7 @@ ByteStreamReassembly::ByteStreamReassembly( void ByteStreamReassembly::reassemble( std::unique_ptr<ContentObjectManifest> &&manifest) { - if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) { + if (TRANSPORT_EXPECT_TRUE(manifest != nullptr) && read_buffer_->capacity()) { received_packets_.emplace( std::make_pair(manifest->getName().getSuffix(), nullptr)); assembleContent(); @@ -47,7 +47,8 @@ void ByteStreamReassembly::reassemble( } void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) { - if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) { + if (TRANSPORT_EXPECT_TRUE(content_object != nullptr) && + read_buffer_->capacity()) { received_packets_.emplace(std::make_pair( content_object->getName().getSuffix(), std::move(content_object))); assembleContent(); @@ -80,19 +81,19 @@ void ByteStreamReassembly::assembleContent() { } void ByteStreamReassembly::copyContent(const ContentObject &content_object) { - auto a = content_object.getPayload(); - auto payload_length = a->length(); + auto payload = content_object.getPayloadReference(); + auto payload_length = payload.second; auto write_size = std::min(payload_length, read_buffer_->tailroom()); auto additional_bytes = payload_length > read_buffer_->tailroom() ? payload_length - read_buffer_->tailroom() : 0; - std::memcpy(read_buffer_->writableTail(), a->data(), write_size); + std::memcpy(read_buffer_->writableTail(), payload.first, write_size); read_buffer_->append(write_size); if (!read_buffer_->tailroom()) { notifyApplication(); - std::memcpy(read_buffer_->writableTail(), a->data() + write_size, + std::memcpy(read_buffer_->writableTail(), payload.first + write_size, additional_bytes); read_buffer_->append(additional_bytes); } diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc index aa290bef8..8463f84f9 100644 --- a/libtransport/src/protocols/protocol.cc +++ b/libtransport/src/protocols/protocol.cc @@ -31,7 +31,16 @@ TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, index_manager_( std::make_unique<IndexManager>(socket_, this, reassembly_protocol)), is_running_(false), - is_first_(false) { + is_first_(false), + on_interest_retransmission_(VOID_HANDLER), + on_interest_output_(VOID_HANDLER), + on_interest_timeout_(VOID_HANDLER), + on_interest_satisfied_(VOID_HANDLER), + on_content_object_input_(VOID_HANDLER), + on_content_object_verification_(VOID_HANDLER), + stats_summary_(VOID_HANDLER), + verification_failed_callback_(VOID_HANDLER), + on_payload_(VOID_HANDLER) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); } @@ -46,6 +55,26 @@ int TransportProtocol::start() { // Set it is the first time we schedule an interest is_first_ = true; + // Get all callbacks references before starting + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &on_interest_retransmission_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &on_interest_output_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, + &on_interest_timeout_); + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, + &on_interest_satisfied_); + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &on_content_object_input_); + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, + &on_content_object_verification_); + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_); + socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, + &verification_failed_callback_); + socket_->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + &on_payload_); + // Schedule next interests scheduleNextInterests(); @@ -81,10 +110,7 @@ void TransportProtocol::resume() { } void TransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - socket_->getSocketOption(READ_CALLBACK, &on_payload); - - if (!on_payload) { + if (!on_payload_) { throw errors::RuntimeException( "The read callback must be installed in the transport before " "starting " @@ -92,9 +118,9 @@ void TransportProtocol::onContentReassembled(std::error_code ec) { } if (!ec) { - on_payload->readSuccess(stats_->getBytesRecv()); + on_payload_->readSuccess(stats_->getBytesRecv()); } else { - on_payload->readError(ec); + on_payload_->readError(ec); } stop(); diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h index 949380959..db4524133 100644 --- a/libtransport/src/protocols/protocol.h +++ b/libtransport/src/protocols/protocol.h @@ -17,6 +17,8 @@ #include <atomic> +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/statistics.h> #include <hicn/transport/utils/object_pool.h> @@ -34,6 +36,8 @@ using namespace core; class IndexVerificationManager; +using ReadCallback = interface::ConsumerSocket::ReadCallback; + class TransportProtocolCallback { virtual void onContentObject(const core::Interest &interest, const core::ContentObject &content_object) = 0; @@ -89,6 +93,20 @@ class TransportProtocol : public implementation::BasePortal::ConsumerCallback, // True if it si the first time we schedule an interest std::atomic<bool> is_first_; interface::TransportStatistics *stats_; + + // Callbacks + interface::ConsumerInterestCallback *on_interest_retransmission_; + interface::ConsumerInterestCallback *on_interest_output_; + interface::ConsumerInterestCallback *on_interest_timeout_; + interface::ConsumerInterestCallback *on_interest_satisfied_; + interface::ConsumerContentObjectCallback *on_content_object_input_; + interface::ConsumerContentObjectVerificationCallback + *on_content_object_verification_; + interface::ConsumerContentObjectCallback *on_content_object_; + interface::ConsumerTimerCallback *stats_summary_; + interface::ConsumerContentObjectVerificationFailedCallback + *verification_failed_callback_; + ReadCallback *on_payload_; }; } // end namespace protocol diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc index 8a38f8ccf..0a93dec44 100644 --- a/libtransport/src/protocols/raaqm.cc +++ b/libtransport/src/protocols/raaqm.cc @@ -325,18 +325,12 @@ void RaaqmTransportProtocol::onContentObject( } // Call application-defined callbacks - ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - &callback_content_object); - if (*callback_content_object) { - (*callback_content_object)(*socket_->getInterface(), *content_object); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), *content_object); } - ConsumerInterestCallback *callback_interest = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, - &callback_interest); - if (*callback_interest) { - (*callback_interest)(*socket_->getInterface(), *interest); + if (*on_interest_satisfied_) { + (*on_interest_satisfied_)(*socket_->getInterface(), *interest); } if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { @@ -369,23 +363,17 @@ void RaaqmTransportProtocol::onPacketDropped( socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); uint64_t segment = interest->getName().getSuffix(); - ConsumerInterestCallback *callback = VOID_HANDLER; + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < max_rtx)) { stats_->updateRetxCount(1); - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_retransmission_) { + (*on_interest_retransmission_)(*socket_->getInterface(), *interest); } - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); } if (!is_running_) { @@ -393,7 +381,6 @@ void RaaqmTransportProtocol::onPacketDropped( } interest_retransmissions_[segment & mask]++; - interest_to_retransmit_.push(std::move(interest)); } else { TRANSPORT_LOGE( @@ -428,11 +415,8 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { return; } - ConsumerInterestCallback *callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_timeout_) { + (*on_interest_timeout_)(*socket_->getInterface(), *interest); } afterDataUnsatisfied(segment); @@ -444,18 +428,12 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { max_rtx)) { stats_->updateRetxCount(1); - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_retransmission_) { + (*on_interest_retransmission_)(*socket_->getInterface(), *interest); } - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - (*callback)(*socket_->getInterface(), *interest); + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); } if (!is_running_) { @@ -463,7 +441,6 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { } interest_retransmissions_[segment & mask]++; - interest_to_retransmit_.push(std::move(interest)); scheduleNextInterests(); @@ -507,7 +484,7 @@ void RaaqmTransportProtocol::scheduleNextInterests() { } } -void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { +bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { auto interest = getPacket(); core::Name *name; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); @@ -519,15 +496,12 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { interest_lifetime); interest->setLifetime(interest_lifetime); - ConsumerInterestCallback *callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - callback->operator()(*socket_->getInterface(), *interest); + if (*on_interest_output_) { + on_interest_output_->operator()(*socket_->getInterface(), *interest); } if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return; + return false; } // This is set to ~0 so that the next interest_retransmissions_ + 1, @@ -535,6 +509,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); + + return true; } void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { @@ -564,7 +540,7 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) { rate_estimator_->onRttUpdate((double)rtt.count()); } - cur_path_->insertNewRtt(rtt.count()); + cur_path_->insertNewRtt(rtt.count(), now); cur_path_->smoothTimer(); if (cur_path_->newPropagationDelayAvailable()) { @@ -595,18 +571,15 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, stats_->updateAverageWindowSize(current_window_size_); // Call statistics callback - ConsumerTimerCallback *stats_callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_callback); - if (*stats_callback) { + if (*stats_summary_) { auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); uint32_t timer_interval_milliseconds = 0; socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, timer_interval_milliseconds); if (dt.count() > timer_interval_milliseconds) { - (*stats_callback)(*socket_->getInterface(), *stats_); - t0_ = utils::SteadyClock::now(); + (*stats_summary_)(*socket_->getInterface(), *stats_); + t0_ = now; } } } diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h index 412967770..ecc466755 100644 --- a/libtransport/src/protocols/raaqm.h +++ b/libtransport/src/protocols/raaqm.h @@ -79,7 +79,7 @@ class RaaqmTransportProtocol : public TransportProtocol, virtual void scheduleNextInterests() override; - void sendInterest(std::uint64_t next_suffix); + bool sendInterest(std::uint64_t next_suffix); void sendInterest(Interest::Ptr &&interest); diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc index 439549c85..8bbbadcf2 100644 --- a/libtransport/src/protocols/raaqm_data_path.cc +++ b/libtransport/src/protocols/raaqm_data_path.cc @@ -48,7 +48,8 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor, average_rtt_(0), alpha_(ALPHA) {} -RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) { +RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt, + const utils::TimePoint &now) { rtt_ = new_rtt; rtt_samples_.pushBack(new_rtt); @@ -60,7 +61,7 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) { prop_delay_ = rtt_min_; } - last_received_pkt_ = utils::SteadyClock::now(); + last_received_pkt_ = now; return *this; } diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h index 6f2afde72..3f037bc76 100644 --- a/libtransport/src/protocols/raaqm_data_path.h +++ b/libtransport/src/protocols/raaqm_data_path.h @@ -45,7 +45,7 @@ class RaaqmDataPath { * max of RTT. * @param new_rtt is the value of the new RTT */ - RaaqmDataPath &insertNewRtt(uint64_t new_rtt); + RaaqmDataPath &insertNewRtt(uint64_t new_rtt, const utils::TimePoint &now); /** * @brief Update the path statistics diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc index 0ac3839dd..72abb599a 100644 --- a/libtransport/src/protocols/rtc.cc +++ b/libtransport/src/protocols/rtc.cc @@ -288,15 +288,12 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { uint32_t BW = (uint32_t)ceil(estimatedBw_); computeMaxWindow(BW, BDP); - ConsumerTimerCallback *stats_callback = nullptr; - socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_callback); - if (*stats_callback) { + if (*stats_summary_) { // Send the stats to the app stats_->updateQueuingDelay(queuingDelay_); stats_->updateLossRatio(lossRate_); stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); - (*stats_callback)(*socket_->getInterface(), *stats_); + (*stats_summary_)(*socket_->getInterface(), *stats_); } // bound also by interest lifitime* production rate if (!gotNack_) { @@ -451,13 +448,8 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { interestLifetime); interest->setLifetime(uint32_t(interestLifetime)); - ConsumerInterestCallback *on_interest_output = nullptr; - - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &on_interest_output); - - if (*on_interest_output) { - (*on_interest_output)(*socket_->getInterface(), *interest); + if (*on_interest_output_) { + (*on_interest_output_)(*socket_->getInterface(), *interest); } if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { @@ -890,11 +882,8 @@ void RTCTransportProtocol::onContentObject( uint32_t segmentNumber = content_object->getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; - ConsumerContentObjectCallback *callback_content_object = nullptr; - socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - &callback_content_object); - if (*callback_content_object) { - (*callback_content_object)(*socket_->getInterface(), *content_object); + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), *content_object); } if (segmentNumber >= HICN_MIN_PROBE_SEQ) { diff --git a/libtransport/src/security/signer.cc b/libtransport/src/security/signer.cc index 314c3ea82..8a56cfa3d 100644 --- a/libtransport/src/security/signer.cc +++ b/libtransport/src/security/signer.cc @@ -162,12 +162,10 @@ void Signer::sign(Packet &packet) { } CryptoHash hash = hasher.finalize(); - signature_ = parcSigner_SignDigestNoAlloc(this->signer_, hash.hash_, packet.getSignature(), (uint32_t)signature_length_); PARCBuffer *buffer = parcSignature_GetSignature(signature_); - size_t bytes_len = parcBuffer_Remaining(buffer); if (bytes_len > signature_length_) { @@ -176,6 +174,8 @@ void Signer::sign(Packet &packet) { hicn_packet_copy_header(format, &header_copy, (hicn_header_t *)packet.packet_start_, false); + + parcSignature_Release(&signature_); } size_t Signer::getSignatureLength() { return signature_length_; } diff --git a/libtransport/src/security/verifier.cc b/libtransport/src/security/verifier.cc index 19796f718..0cfbdc6f9 100644 --- a/libtransport/src/security/verifier.cc +++ b/libtransport/src/security/verifier.cc @@ -116,17 +116,10 @@ PARCKeyId *Verifier::addKeyFromCertificate(const std::string &file_name) { } int Verifier::verify(const Packet &packet) { - // to initialize packet.payload_head_ + // Initialize packet.payload_head_ const_cast<Packet *>(&packet)->separateHeaderPayload(); - bool valid = false; - - // initialize packet.payload_head_ - const_cast<Packet *>(&packet)->separateHeaderPayload(); - // header chain points to the IP + TCP hicn header - // utils::MemBuf *header_chain = packet.header_head_; - // utils::MemBuf *payload_chain = packet.payload_head_; - // uint8_t *hicn_packet = header_chain->writableData(); Packet::Format format = packet.getFormat(); + bool valid = false; if (!(packet.format_ & HFO_AH)) { throw errors::MalformedAHPacketException(); @@ -149,11 +142,12 @@ int Verifier::verify(const Packet &packet) { int ah_payload_len = (int)packet.getSignatureSize(); uint8_t *_signature = packet.getSignature(); uint8_t *signature = new uint8_t[ah_payload_len]; + std::shared_ptr<CryptoHasher> hasher; + // TODO Remove signature copy at this point, by not setting to zero // the validation payload. std::memcpy(signature, _signature, ah_payload_len); - std::shared_ptr<CryptoHasher> hasher; switch (CryptoSuite(suite)) { case CryptoSuite::DSA_SHA256: case CryptoSuite::RSA_SHA256: @@ -178,7 +172,7 @@ int Verifier::verify(const Packet &packet) { parcBuffer_Wrap(signature, ah_payload_len, 0, ah_payload_len); parcBuffer_Rewind(bits); - /* IF the signature algo is ECDSA, the signature might be shorter than the + /* If the signature algo is ECDSA, the signature might be shorter than the * signature field */ PARCSigningAlgorithm algo = parcCryptoSuite_GetSigningAlgorithm(suite); while (algo == PARCSigningAlgorithm_ECDSA && parcBuffer_HasRemaining(bits) && diff --git a/libtransport/src/utils/membuf.cc b/libtransport/src/utils/membuf.cc index e75e85b35..94e5b13a1 100644 --- a/libtransport/src/utils/membuf.cc +++ b/libtransport/src/utils/membuf.cc @@ -102,8 +102,6 @@ struct MemBuf::HeapStorage { }; struct MemBuf::HeapFullStorage { - // Make sure jemalloc allocates from the 64-byte class. Putting this here - // because HeapStorage is private so it can't be at namespace level. static_assert(sizeof(HeapStorage) <= 64, "MemBuf may not grow over 56 bytes!"); diff --git a/telemetry/vpp-collectd/CMakeLists.txt b/telemetry/vpp-collectd/CMakeLists.txt index 18926b1c5..ef09fb980 100644 --- a/telemetry/vpp-collectd/CMakeLists.txt +++ b/telemetry/vpp-collectd/CMakeLists.txt @@ -16,11 +16,10 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR) set (COLLECTD_PLUGINS hicn-collectd-plugins) project(hicn-collectd-plugins) -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/" "${CMAKE_CURRENT_SOURCE_DIR}/../../../cmake/Modules/") +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/" "${CMAKE_CURRENT_SOURCE_DIR}/../../cmake/Modules/") include(BuildMacros) - add_subdirectory(vpp) add_subdirectory(vpp-hicn) diff --git a/telemetry/vpp-collectd/custom_types.db b/telemetry/vpp-collectd/custom_types.db new file mode 100644 index 000000000..98cd801f4 --- /dev/null +++ b/telemetry/vpp-collectd/custom_types.db @@ -0,0 +1,39 @@ +# vpp +if_drops packets:DERIVE:0:U +if_punt packets:DERIVE:0:U +if_ip4 packets:DERIVE:0:U +if_ip6 packets:DERIVE:0:U +if_rx_no_buf packets:DERIVE:0:U +if_rx_miss packets:DERIVE:0:U +if_rx_error packets:DERIVE:0:U +if_tx_error packets:DERIVE:0:U +if_mpls packets:DERIVE:0:U +if_rx packets:DERIVE:0:U, bytes:DERIVE:0:U +if_rx_unicast packets:DERIVE:0:U, bytes:DERIVE:0:U +if_rx_multicast packets:DERIVE:0:U, bytes:DERIVE:0:U +if_rx_broadcast packets:DERIVE:0:U, bytes:DERIVE:0:U +if_tx packets:DERIVE:0:U, bytes:DERIVE:0:U +if_tx_unicast packets:DERIVE:0:U, bytes:DERIVE:0:U +if_tx_multicast packets:DERIVE:0:U, bytes:DERIVE:0:U +if_tx_broadcast packets:DERIVE:0:U, bytes:DERIVE:0:U + +# vpp-hicn +pkts_processed packets:GAUGE:0:U +pkts_interest_count packets:GAUGE:0:U +pkts_data_count packets:GAUGE:0:U +pkts_from_cache_count packets:GAUGE:0:U +pkts_no_pit_count packets:GAUGE:0:U +pit_expired_count interests:GAUGE:0:U +cs_expired_count data:GAUGE:0:U +cs_lru_count data:GAUGE:0:U +pkts_drop_no_buf packets:GAUGE:0:U +interests_aggregated interests:GAUGE:0:U +interests_retx interests:GAUGE:0:U +interests_hash_collision interests:GAUGE:0:U +pit_entries_count interests:GAUGE:0:U +cs_entries_count data:GAUGE:0:U +cs_entries_ntw_count data:GAUGE:0:U +irx packets:DERIVE:0:U, bytes:DERIVE:0:U +itx packets:DERIVE:0:U, bytes:DERIVE:0:U +drx packets:DERIVE:0:U, bytes:DERIVE:0:U +dtx packets:DERIVE:0:U, bytes:DERIVE:0:U diff --git a/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c b/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c index b90646073..591b8f584 100644 --- a/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c +++ b/telemetry/vpp-collectd/vpp-hicn/vpp_hicn.c @@ -28,6 +28,7 @@ #endif /* DISABLE_ISOC99 */ #endif /* ! HAVE_CONFIG */ +/* Keep order as it is */ #include <config.h> #include <collectd.h> #include <common.h> @@ -41,6 +42,15 @@ DEFINE_VAPI_MSG_IDS_HICN_API_JSON vapi_ctx_t vapi_ctx; +/************** OPTIONS ***********************************/ +static const char *config_keys[2] = { + "Verbose", + "Tag", +}; +static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); +static bool verbose = false; +static char *tag = NULL; + /************** DATA SOURCES ******************************/ static data_source_t packets_dsrc[1] = { {"packets", DS_TYPE_GAUGE, 0, NAN}, @@ -184,8 +194,7 @@ static data_set_t dtx_ds = { * value_list_t and pass it to plugin_dispatch_values. */ static int submit(const char *plugin_instance, const char *type, - const char *type_instance, value_t *values, size_t values_len, - cdtime_t *timestamp) { + value_t *values, size_t values_len, cdtime_t *timestamp) { value_list_t vl = VALUE_LIST_INIT; vl.values = values; vl.values_len = values_len; @@ -198,8 +207,8 @@ static int submit(const char *plugin_instance, const char *type, sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance)); sstrncpy(vl.type, type, sizeof(vl.type)); - if (type_instance != NULL) - sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance)); + if (tag != NULL) + sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); return plugin_dispatch_values(&vl); } @@ -209,6 +218,28 @@ static int submit(const char *plugin_instance, const char *type, /**********************************************************/ /* + * This function is called for each configuration item. + */ +static int vpp_hicn_config(const char *key, const char *value) { + if (strcasecmp(key, "Verbose") == 0) { + verbose = IS_TRUE(value); + } else if (strcasecmp(key, "Tag") == 0) { + if (tag != NULL) { + free(tag); + tag = NULL; + } + + if (strcasecmp(value, "None")) { + tag = strdup(value); + } + } else { + return 1; + } + + return 0; +} + +/* * Callback called by the hICN plugin API when node stats are ready. */ static vapi_error_e @@ -226,36 +257,38 @@ parse_node_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, cdtime_t timestamp = cdtime(); values[0] = (value_t){.gauge = reply->pkts_processed}; - submit(node_name, pkts_processed_ds.type, NULL, values, 1, ×tamp); + submit(node_name, pkts_processed_ds.type, values, 1, ×tamp); values[0] = (value_t){.gauge = reply->pkts_interest_count}; - submit(node_name, pkts_interest_count_ds.type, NULL, values, 1, ×tamp); + submit(node_name, pkts_interest_count_ds.type, values, 1, ×tamp); values[0] = (value_t){.gauge = reply->pkts_data_count}; - submit(node_name, pkts_data_count_ds.type, NULL, values, 1, ×tamp); - values[0] = (value_t){.gauge = reply->pkts_from_cache_count}; - submit(node_name, pkts_from_cache_count_ds.type, NULL, values, 1, ×tamp); - values[0] = (value_t){.gauge = reply->pkts_no_pit_count}; - submit(node_name, pkts_no_pit_count_ds.type, NULL, values, 1, ×tamp); - values[0] = (value_t){.gauge = reply->pit_expired_count}; - submit(node_name, pit_expired_count_ds.type, NULL, values, 1, ×tamp); - values[0] = (value_t){.gauge = reply->cs_expired_count}; - submit(node_name, cs_expired_count_ds.type, NULL, values, 1, ×tamp); - values[0] = (value_t){.gauge = reply->cs_lru_count}; - submit(node_name, cs_lru_count_ds.type, NULL, values, 1, ×tamp); - values[0] = (value_t){.gauge = reply->pkts_drop_no_buf}; - submit(node_name, pkts_drop_no_buf_ds.type, NULL, values, 1, ×tamp); - values[0] = (value_t){.gauge = reply->interests_aggregated}; - submit(node_name, interests_aggregated_ds.type, NULL, values, 1, ×tamp); + submit(node_name, pkts_data_count_ds.type, values, 1, ×tamp); values[0] = (value_t){.gauge = reply->interests_retx}; - submit(node_name, interests_retx_ds.type, NULL, values, 1, ×tamp); - values[0] = (value_t){.gauge = reply->interests_hash_collision}; - submit(node_name, interests_hash_collision_ds.type, NULL, values, 1, - ×tamp); + submit(node_name, interests_retx_ds.type, values, 1, ×tamp); values[0] = (value_t){.gauge = reply->pit_entries_count}; - submit(node_name, pit_entries_count_ds.type, NULL, values, 1, ×tamp); + submit(node_name, pit_entries_count_ds.type, values, 1, ×tamp); values[0] = (value_t){.gauge = reply->cs_entries_count}; - submit(node_name, cs_entries_count_ds.type, NULL, values, 1, ×tamp); - values[0] = (value_t){.gauge = reply->cs_entries_ntw_count}; - submit(node_name, cs_entries_ntw_count_ds.type, NULL, values, 1, ×tamp); + submit(node_name, cs_entries_count_ds.type, values, 1, ×tamp); + + if (verbose) { + values[0] = (value_t){.gauge = reply->pkts_from_cache_count}; + submit(node_name, pkts_from_cache_count_ds.type, values, 1, ×tamp); + values[0] = (value_t){.gauge = reply->interests_aggregated}; + submit(node_name, interests_aggregated_ds.type, values, 1, ×tamp); + values[0] = (value_t){.gauge = reply->cs_expired_count}; + submit(node_name, cs_expired_count_ds.type, values, 1, ×tamp); + values[0] = (value_t){.gauge = reply->cs_lru_count}; + submit(node_name, cs_lru_count_ds.type, values, 1, ×tamp); + values[0] = (value_t){.gauge = reply->pit_expired_count}; + submit(node_name, pit_expired_count_ds.type, values, 1, ×tamp); + values[0] = (value_t){.gauge = reply->pkts_no_pit_count}; + submit(node_name, pkts_no_pit_count_ds.type, values, 1, ×tamp); + values[0] = (value_t){.gauge = reply->pkts_drop_no_buf}; + submit(node_name, pkts_drop_no_buf_ds.type, values, 1, ×tamp); + values[0] = (value_t){.gauge = reply->interests_hash_collision}; + submit(node_name, interests_hash_collision_ds.type, values, 1, ×tamp); + values[0] = (value_t){.gauge = reply->cs_entries_ntw_count}; + submit(node_name, cs_entries_ntw_count_ds.type, values, 1, ×tamp); + } return VAPI_OK; } @@ -280,16 +313,16 @@ parse_face_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, values[0] = (value_t){.derive = reply->irx_packets}; values[1] = (value_t){.derive = reply->irx_bytes}; - submit(face_name, irx_ds.type, NULL, values, 2, ×tamp); + submit(face_name, irx_ds.type, values, 2, ×tamp); values[0] = (value_t){.derive = reply->itx_packets}; values[1] = (value_t){.derive = reply->itx_bytes}; - submit(face_name, itx_ds.type, NULL, values, 2, ×tamp); + submit(face_name, itx_ds.type, values, 2, ×tamp); values[0] = (value_t){.derive = reply->drx_packets}; values[1] = (value_t){.derive = reply->drx_bytes}; - submit(face_name, drx_ds.type, NULL, values, 2, ×tamp); + submit(face_name, drx_ds.type, values, 2, ×tamp); values[0] = (value_t){.derive = reply->dtx_packets}; values[1] = (value_t){.derive = reply->dtx_bytes}; - submit(face_name, dtx_ds.type, NULL, values, 2, ×tamp); + submit(face_name, dtx_ds.type, values, 2, ×tamp); return VAPI_OK; } @@ -297,7 +330,7 @@ parse_face_stats(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, /* * This function is called once upon startup to initialize the plugin. */ -static int my_init(void) { +static int vpp_hicn_init(void) { int ret = vapi_connect_safe(&vapi_ctx, 0); if (ret) @@ -309,7 +342,7 @@ static int my_init(void) { /* * This function is called in regular intervalls to collect the data. */ -static int my_read(void) { +static int vpp_hicn_read(void) { int err = VAPI_OK; vapi_lock(); @@ -365,7 +398,7 @@ END: /* * This function is called when plugin_log () has been used. */ -static void my_log(int severity, const char *msg, user_data_t *ud) { +static void vpp_hicn_log(int severity, const char *msg, user_data_t *ud) { printf("[LOG %i] %s\n", severity, msg); return; } @@ -373,11 +406,18 @@ static void my_log(int severity, const char *msg, user_data_t *ud) { /* * This function is called before shutting down collectd. */ -static int my_shutdown(void) { +static int vpp_hicn_shutdown(void) { plugin_log(LOG_INFO, "vpp_hicn plugin: shutting down"); + int ret = vapi_disconnect_safe(); plugin_log(LOG_INFO, "vpp_hicn plugin: disconnect vapi %s", ret == 0 ? "ok" : "error"); + + if (tag != NULL) { + free(tag); + tag = NULL; + } + return ret; } @@ -408,9 +448,11 @@ void module_register(void) { plugin_register_data_set(&cs_entries_count_ds); plugin_register_data_set(&cs_entries_ntw_count_ds); // callbacks - plugin_register_log("vpp_hicn", my_log, /* user data */ NULL); - plugin_register_init("vpp_hicn", my_init); - plugin_register_read("vpp_hicn", my_read); - plugin_register_shutdown("vpp_hicn", my_shutdown); + plugin_register_log("vpp_hicn", vpp_hicn_log, /* user data */ NULL); + plugin_register_config("vpp_hicn", vpp_hicn_config, config_keys, + config_keys_num); + plugin_register_init("vpp_hicn", vpp_hicn_init); + plugin_register_read("vpp_hicn", vpp_hicn_read); + plugin_register_shutdown("vpp_hicn", vpp_hicn_shutdown); return; } diff --git a/telemetry/vpp-collectd/vpp/vpp.c b/telemetry/vpp-collectd/vpp/vpp.c index 72c052212..0fec85dc8 100644 --- a/telemetry/vpp-collectd/vpp/vpp.c +++ b/telemetry/vpp-collectd/vpp/vpp.c @@ -28,6 +28,7 @@ #endif /* DISABLE_ISOC99 */ #endif /* ! HAVE_CONFIG */ +/* Keep order as it is */ #include <config.h> #include <collectd.h> #include <common.h> @@ -38,6 +39,15 @@ #include <vppinfra/vec.h> #undef counter_t +/************** OPTIONS ***********************************/ +static const char *config_keys[2] = { + "Verbose", + "Tag", +}; +static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); +static bool verbose = false; +static char *tag = NULL; + /************** DATA SOURCES ******************************/ static data_source_t combined_dsrc[2] = { {"packets", DS_TYPE_DERIVE, 0, NAN}, @@ -160,8 +170,7 @@ static data_set_t if_tx_broadcast_ds = { * value_list_t and pass it to plugin_dispatch_values. */ static int submit(const char *plugin_instance, const char *type, - const char *type_instance, value_t *values, size_t values_len, - cdtime_t *timestamp) { + value_t *values, size_t values_len, cdtime_t *timestamp) { value_list_t vl = VALUE_LIST_INIT; vl.values = values; vl.values_len = values_len; @@ -174,8 +183,8 @@ static int submit(const char *plugin_instance, const char *type, sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance)); sstrncpy(vl.type, type, sizeof(vl.type)); - if (type_instance != NULL) - sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance)); + if (tag != NULL) + sstrncpy(vl.type_instance, tag, sizeof(vl.type_instance)); return plugin_dispatch_values(&vl); } @@ -188,44 +197,50 @@ static int get_data_set(const char *stat_name, data_set_t *data_set_ptr) { return 1; } - if (strcmp(stat_name, "/if/drops") == 0) { - *data_set_ptr = if_drops_ds; - } else if (strcmp(stat_name, "/if/punt") == 0) { - *data_set_ptr = if_punt_ds; + if (strcmp(stat_name, "/if/rx") == 0) { + *data_set_ptr = if_rx_ds; + } else if (strcmp(stat_name, "/if/tx") == 0) { + *data_set_ptr = if_tx_ds; } else if (strcmp(stat_name, "/if/ip4") == 0) { *data_set_ptr = if_ip4_ds; } else if (strcmp(stat_name, "/if/ip6") == 0) { *data_set_ptr = if_ip6_ds; - } else if (strcmp(stat_name, "/if/rx-no-buf") == 0) { - *data_set_ptr = if_rx_no_buf_ds; - } else if (strcmp(stat_name, "/if/rx-miss") == 0) { - *data_set_ptr = if_rx_miss_ds; - } else if (strcmp(stat_name, "/if/rx-error") == 0) { - *data_set_ptr = if_rx_error_ds; - } else if (strcmp(stat_name, "/if/tx-error") == 0) { - *data_set_ptr = if_tx_error_ds; - } else if (strcmp(stat_name, "/if/mpls") == 0) { - *data_set_ptr = if_mpls_ds; - } else if (strcmp(stat_name, "/if/rx") == 0) { - *data_set_ptr = if_rx_ds; - } else if (strcmp(stat_name, "/if/rx-unicast") == 0) { - *data_set_ptr = if_rx_unicast_ds; - } else if (strcmp(stat_name, "/if/rx-multicast") == 0) { - *data_set_ptr = if_rx_multicast_ds; - } else if (strcmp(stat_name, "/if/rx-broadcast") == 0) { - *data_set_ptr = if_rx_broadcast_ds; - } else if (strcmp(stat_name, "/if/tx") == 0) { - *data_set_ptr = if_tx_ds; - } else if (strcmp(stat_name, "/if/tx-unicast") == 0) { - *data_set_ptr = if_tx_unicast_ds; - } else if (strcmp(stat_name, "/if/tx-multicast") == 0) { - *data_set_ptr = if_tx_multicast_ds; - } else if (strcmp(stat_name, "/if/tx-broadcast") == 0) { - *data_set_ptr = if_tx_broadcast_ds; - } else { + } else if (strcmp(stat_name, "/if/drops") == 0) { + *data_set_ptr = if_drops_ds; + } else if (!verbose) { return 1; } + if (verbose) { + if (strcmp(stat_name, "/if/punt") == 0) { + *data_set_ptr = if_punt_ds; + } else if (strcmp(stat_name, "/if/mpls") == 0) { + *data_set_ptr = if_mpls_ds; + } else if (strcmp(stat_name, "/if/rx-no-buf") == 0) { + *data_set_ptr = if_rx_no_buf_ds; + } else if (strcmp(stat_name, "/if/rx-miss") == 0) { + *data_set_ptr = if_rx_miss_ds; + } else if (strcmp(stat_name, "/if/rx-error") == 0) { + *data_set_ptr = if_rx_error_ds; + } else if (strcmp(stat_name, "/if/rx-unicast") == 0) { + *data_set_ptr = if_rx_unicast_ds; + } else if (strcmp(stat_name, "/if/rx-multicast") == 0) { + *data_set_ptr = if_rx_multicast_ds; + } else if (strcmp(stat_name, "/if/rx-broadcast") == 0) { + *data_set_ptr = if_rx_broadcast_ds; + } else if (strcmp(stat_name, "/if/tx-error") == 0) { + *data_set_ptr = if_tx_error_ds; + } else if (strcmp(stat_name, "/if/tx-unicast") == 0) { + *data_set_ptr = if_tx_unicast_ds; + } else if (strcmp(stat_name, "/if/tx-multicast") == 0) { + *data_set_ptr = if_tx_multicast_ds; + } else if (strcmp(stat_name, "/if/tx-broadcast") == 0) { + *data_set_ptr = if_tx_broadcast_ds; + } else { + return 1; + } + } + return 0; } @@ -234,9 +249,31 @@ static int get_data_set(const char *stat_name, data_set_t *data_set_ptr) { /**********************************************************/ /* + * This function is called for each configuration item. + */ +static int vpp_config(const char *key, const char *value) { + if (strcasecmp(key, "Verbose") == 0) { + verbose = IS_TRUE(value); + } else if (strcasecmp(key, "Tag") == 0) { + if (tag != NULL) { + free(tag); + tag = NULL; + } + + if (strcasecmp(value, "None")) { + tag = strdup(value); + } + } else { + return 1; + } + + return 0; +} + +/* * This function is called once upon startup to initialize the plugin. */ -static int my_init(void) { +static int vpp_init(void) { u8 *stat_segment_name = (u8 *)STAT_SEGMENT_SOCKET_FILE; int ret = stat_segment_connect((char *)stat_segment_name); @@ -249,7 +286,7 @@ static int my_init(void) { /* * This function is called in regular intervalls to collect the data. */ -static int my_read(void) { +static int vpp_read(void) { uint8_t **patterns = {0}; char **interfaces = {0}; @@ -258,7 +295,6 @@ static int my_read(void) { uint32_t *dir = stat_segment_ls(patterns); stat_segment_data_t *res = stat_segment_dump(dir); - plugin_log(LOG_INFO, "vpp plugin: performed ls and dump"); /* Read all available interfaces */ for (int k = 0; k < vec_len(res); k++) { @@ -291,8 +327,7 @@ static int my_read(void) { continue; } - err = - submit(interfaces[j], data_set.type, NULL, values, 1, ×tamp); + err = submit(interfaces[j], data_set.type, values, 1, ×tamp); if (err) goto END; @@ -314,8 +349,7 @@ static int my_read(void) { continue; } - err = - submit(interfaces[j], data_set.type, NULL, values, 2, ×tamp); + err = submit(interfaces[j], data_set.type, values, 2, ×tamp); if (err) goto END; @@ -354,7 +388,7 @@ END: /* * This function is called when plugin_log () has been used. */ -static void my_log(int severity, const char *msg, user_data_t *ud) { +static void vpp_log(int severity, const char *msg, user_data_t *ud) { printf("[LOG %i] %s\n", severity, msg); return; } @@ -362,9 +396,16 @@ static void my_log(int severity, const char *msg, user_data_t *ud) { /* * This function is called before shutting down collectd. */ -static int my_shutdown(void) { +static int vpp_shutdown(void) { plugin_log(LOG_INFO, "vpp plugin: shutting down"); + + if (tag != NULL) { + free(tag); + tag = NULL; + } + stat_segment_disconnect(); + return 0; } @@ -390,9 +431,10 @@ void module_register(void) { plugin_register_data_set(&if_tx_unicast_ds); plugin_register_data_set(&if_tx_multicast_ds); plugin_register_data_set(&if_tx_broadcast_ds); - plugin_register_log("vpp", my_log, /* user data */ NULL); - plugin_register_init("vpp", my_init); - plugin_register_read("vpp", my_read); - plugin_register_shutdown("vpp", my_shutdown); + plugin_register_log("vpp", vpp_log, /* user data */ NULL); + plugin_register_config("vpp", vpp_config, config_keys, config_keys_num); + plugin_register_init("vpp", vpp_init); + plugin_register_read("vpp", vpp_read); + plugin_register_shutdown("vpp", vpp_shutdown); return; } diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index e037e1309..eabd12c86 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -69,10 +69,10 @@ struct ClientConfiguration { beta(-1.f), drop_factor(-1.f), window(-1), - virtual_download(true), producer_certificate(""), passphrase(""), receive_buffer(nullptr), + receive_buffer_size_(128 * 1024), download_size(0), report_interval_milliseconds_(1000), transport_protocol_(CBR), @@ -90,10 +90,10 @@ struct ClientConfiguration { double beta; double drop_factor; double window; - bool virtual_download; std::string producer_certificate; std::string passphrase; std::shared_ptr<utils::MemBuf> receive_buffer; + std::size_t receive_buffer_size_; std::size_t download_size; std::uint32_t report_interval_milliseconds_; TransportProtocolAlgorithms transport_protocol_; @@ -423,12 +423,6 @@ class HIperfClient { } } - if (consumer_socket_->setSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, - configuration_.virtual_download) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - if (configuration_.verify) { std::shared_ptr<utils::Verifier> verifier = std::make_shared<utils::Verifier>(); @@ -570,22 +564,29 @@ class HIperfClient { }; class Callback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t read_size = 16 * 1024; - public: - Callback(HIperfClient &hiperf_client) : client_(hiperf_client) {} + Callback(HIperfClient &hiperf_client) : client_(hiperf_client) { + client_.configuration_.receive_buffer = + utils::MemBuf::create(client_.configuration_.receive_buffer_size_); + } - bool isBufferMovable() noexcept override { return true; } + bool isBufferMovable() noexcept override { return false; } void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override {} + size_t *max_length) override { + *application_buffer = + client_.configuration_.receive_buffer->writableData(); + *max_length = client_.configuration_.receive_buffer_size_; + } void readDataAvailable(std::size_t length) noexcept override {} void readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {} - size_t maxBufferSize() const override { return read_size; } + size_t maxBufferSize() const override { + return client_.configuration_.receive_buffer_size_; + } void readError(const std::error_code ec) noexcept override { std::cerr << "Error " << ec.message() << " while reading from socket" @@ -742,6 +743,7 @@ class HIperfServer { } void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { + // std::cout << "Received interest " << interest.getName() << std::endl; content_objects_[content_objects_index_ & mask_]->setName( interest.getName()); producer_socket_->produce( @@ -1149,8 +1151,10 @@ void usage() { << std::endl; std::cerr << "-L\t<interest lifetime>\t\t" << "Set interest lifetime." << std::endl; - std::cerr << "-M\t<Download for real>\t\t" - << "Store the content downloaded." << std::endl; + std::cerr << "-M\t<input_buffer_size>\t\t" + << "Size of consumer input buffer. If 0, reassembly of packets " + "will be disabled." + << std::endl; std::cerr << "-W\t<window_size>\t\t\t" << "Use a fixed congestion window " "for retrieving the data." @@ -1204,7 +1208,7 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 while ((opt = getopt(argc, argv, - "DSCf:b:d:W:RMc:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) != + "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) != -1) { switch (opt) { // Common @@ -1218,7 +1222,7 @@ int main(int argc, char *argv[]) { } #else while ((opt = getopt(argc, argv, - "SCf:b:d:W:RMc:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) { + "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) { switch (opt) { #endif case 'f': { @@ -1265,7 +1269,7 @@ int main(int argc, char *argv[]) { break; } case 'M': { - client_configuration.virtual_download = false; + client_configuration.receive_buffer_size_ = std::stoull(optarg); options = 1; break; } |