diff options
Diffstat (limited to 'libtransport/includes/hicn/transport/core/connector.h')
-rw-r--r-- | libtransport/includes/hicn/transport/core/connector.h | 88 |
1 files changed, 55 insertions, 33 deletions
diff --git a/libtransport/includes/hicn/transport/core/connector.h b/libtransport/includes/hicn/transport/core/connector.h index dcf38cdc8..7882b285d 100644 --- a/libtransport/includes/hicn/transport/core/connector.h +++ b/libtransport/includes/hicn/transport/core/connector.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,6 +15,7 @@ #pragma once +#include <glog/logging.h> #include <hicn/transport/core/connector_stats.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/endpoint.h> @@ -29,6 +30,7 @@ #include <deque> #include <functional> +#include <system_error> namespace transport { @@ -50,54 +52,51 @@ class Connector : public std::enable_shared_from_this<Connector> { enum class Role : std::uint8_t { CONSUMER, PRODUCER }; - public: static constexpr std::size_t queue_size = 4096; static constexpr std::uint32_t invalid_connector = ~0; - -#ifdef LINUX + static constexpr std::uint32_t max_reconnection_reattempts = 5; static constexpr std::uint16_t max_burst = 256; -#endif using Ptr = std::shared_ptr<Connector>; - using PacketQueue = std::deque<Packet::Ptr>; + using ReceptionBuffer = std::vector<utils::MemBuf::Ptr>; + using PacketQueue = std::deque<utils::MemBuf::Ptr>; using PacketReceivedCallback = std::function<void( - Connector *, utils::MemBuf &, const std::error_code &)>; + Connector *, const ReceptionBuffer &, const std::error_code &)>; using PacketSentCallback = std::function<void(Connector *, const std::error_code &)>; using OnCloseCallback = std::function<void(Connector *)>; - using OnReconnectCallback = std::function<void(Connector *)>; + using OnReconnectCallback = + std::function<void(Connector *, const std::error_code &)>; using Id = std::uint64_t; template <typename ReceiveCallback, typename SentCallback, typename OnClose, typename OnReconnect> Connector(ReceiveCallback &&receive_callback, SentCallback &&packet_sent, OnClose &&close_callback, OnReconnect &&on_reconnect) - : receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)), - sent_callback_(std::forward<SentCallback &&>(packet_sent)), - on_close_callback_(std::forward<OnClose &&>(close_callback)), - on_reconnect_callback_(std::forward<OnReconnect &&>(on_reconnect)), - state_(State::CLOSED), - connector_id_(invalid_connector) {} + : receive_callback_(std::forward<ReceiveCallback>(receive_callback)), + sent_callback_(std::forward<SentCallback>(packet_sent)), + on_close_callback_(std::forward<OnClose>(close_callback)), + on_reconnect_callback_(std::forward<OnReconnect>(on_reconnect)) {} virtual ~Connector(){}; template <typename ReceiveCallback> void setReceiveCallback(ReceiveCallback &&callback) { - receive_callback_ = std::forward<ReceiveCallback &&>(callback); + receive_callback_ = std::forward<ReceiveCallback>(callback); } template <typename SentCallback> void setSentCallback(SentCallback &&callback) { - sent_callback_ = std::forward<SentCallback &&>(callback); + sent_callback_ = std::forward<SentCallback>(callback); } template <typename OnClose> void setOnCloseCallback(OnClose &&callback) { - on_close_callback_ = std::forward<OnClose &&>(callback); + on_close_callback_ = std::forward<OnClose>(callback); } template <typename OnReconnect> - void setReconnectCallback(const OnReconnect &&callback) { + void setReconnectCallback(OnReconnect &&callback) { on_reconnect_callback_ = std::forward<OnReconnect>(callback); } @@ -115,7 +114,15 @@ class Connector : public std::enable_shared_from_this<Connector> { virtual void send(Packet &packet) = 0; - virtual void send(const uint8_t *packet, std::size_t len) = 0; + virtual void send(const utils::MemBuf::Ptr &buffer) = 0; + + virtual void receive(const std::vector<utils::MemBuf::Ptr> &buffers) { + receive_callback_(this, buffers, std::make_error_code(std::errc())); + } + + virtual void reconnect() { + on_reconnect_callback_(this, std::make_error_code(std::errc())); + } virtual void close() = 0; @@ -127,36 +134,48 @@ class Connector : public std::enable_shared_from_this<Connector> { Id getConnectorId() { return connector_id_; } - void setConnectorName(std::string connector_name) { + void setConnectorName(const std::string &connector_name) { connector_name_ = connector_name; } - std::string getConnectorName() { return connector_name_; } + std::string getConnectorName() const { return connector_name_; } + + template <typename EP> + void setLocalEndpoint(EP &&endpoint) { + local_endpoint_ = std::forward<EP>(endpoint); + } - Endpoint getLocalEndpoint() { return local_endpoint_; } + Endpoint getLocalEndpoint() const { return local_endpoint_; } - Endpoint getRemoteEndpoint() { return remote_endpoint_; } + Endpoint getRemoteEndpoint() const { return remote_endpoint_; } void setRole(Role r) { role_ = r; } - Role getRole() { return role_; } + Role getRole() const { return role_; } static utils::MemBuf::Ptr getPacketFromBuffer(uint8_t *buffer, std::size_t size) { utils::MemBuf::Ptr ret; - auto format = Packet::getFormatFromBuffer(buffer, size); + hicn_packet_buffer_t pkbuf; + hicn_packet_set_buffer(&pkbuf, buffer, size, size); + hicn_packet_analyze(&pkbuf); + hicn_packet_type_t type = hicn_packet_get_type(&pkbuf); - if (TRANSPORT_EXPECT_TRUE(format != HF_UNSPEC && !_is_icmp(format))) { - if (Packet::isInterest(buffer)) { + // XXX reuse pkbuf when creating the packet, to avoid reanalyzing it + + switch (type) { + case HICN_PACKET_TYPE_INTEREST: ret = core::PacketManager<>::getInstance() .getPacketFromExistingBuffer<Interest>(buffer, size); - } else { + break; + case HICN_PACKET_TYPE_DATA: ret = core::PacketManager<>::getInstance() .getPacketFromExistingBuffer<ContentObject>(buffer, size); - } - } else { - ret = core::PacketManager<>::getInstance().getMemBuf(buffer, size); + break; + default: + ret = core::PacketManager<>::getInstance().getMemBuf(buffer, size); + break; } return ret; @@ -191,8 +210,8 @@ class Connector : public std::enable_shared_from_this<Connector> { OnReconnectCallback on_reconnect_callback_; // Connector state - std::atomic<State> state_; - Id connector_id_; + std::atomic<State> state_ = State::CLOSED; + Id connector_id_ = invalid_connector; // Endpoints Endpoint local_endpoint_; @@ -206,6 +225,9 @@ class Connector : public std::enable_shared_from_this<Connector> { // Stats AtomicConnectorStats stats_; + + // Connection attempts + std::uint32_t connection_reattempts_ = 0; }; } // namespace core |