aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/includes/hicn/transport/core/connector.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/includes/hicn/transport/core/connector.h')
-rw-r--r--libtransport/includes/hicn/transport/core/connector.h212
1 files changed, 212 insertions, 0 deletions
diff --git a/libtransport/includes/hicn/transport/core/connector.h b/libtransport/includes/hicn/transport/core/connector.h
new file mode 100644
index 000000000..dcf38cdc8
--- /dev/null
+++ b/libtransport/includes/hicn/transport/core/connector.h
@@ -0,0 +1,212 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/connector_stats.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/endpoint.h>
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/core/packet.h>
+#include <hicn/transport/portability/platform.h>
+#include <hicn/transport/utils/membuf.h>
+#include <hicn/transport/utils/object_pool.h>
+#include <hicn/transport/utils/ring_buffer.h>
+#include <hicn/transport/utils/shared_ptr_utils.h>
+
+#include <deque>
+#include <functional>
+
+namespace transport {
+
+namespace core {
+
+class Connector : public std::enable_shared_from_this<Connector> {
+ public:
+ enum class Type : uint8_t {
+ SOCKET_CONNECTOR,
+ MEMIF_CONNECTOR,
+ LOOPBACK_CONNECTOR,
+ };
+
+ enum class State : std::uint8_t {
+ CLOSED,
+ CONNECTING,
+ CONNECTED,
+ };
+
+ enum class Role : std::uint8_t { CONSUMER, PRODUCER };
+
+ public:
+ static constexpr std::size_t queue_size = 4096;
+ static constexpr std::uint32_t invalid_connector = ~0;
+
+#ifdef LINUX
+ static constexpr std::uint16_t max_burst = 256;
+#endif
+
+ using Ptr = std::shared_ptr<Connector>;
+ using PacketQueue = std::deque<Packet::Ptr>;
+ using PacketReceivedCallback = std::function<void(
+ Connector *, utils::MemBuf &, const std::error_code &)>;
+ using PacketSentCallback =
+ std::function<void(Connector *, const std::error_code &)>;
+ using OnCloseCallback = std::function<void(Connector *)>;
+ using OnReconnectCallback = std::function<void(Connector *)>;
+ using Id = std::uint64_t;
+
+ template <typename ReceiveCallback, typename SentCallback, typename OnClose,
+ typename OnReconnect>
+ Connector(ReceiveCallback &&receive_callback, SentCallback &&packet_sent,
+ OnClose &&close_callback, OnReconnect &&on_reconnect)
+ : receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)),
+ sent_callback_(std::forward<SentCallback &&>(packet_sent)),
+ on_close_callback_(std::forward<OnClose &&>(close_callback)),
+ on_reconnect_callback_(std::forward<OnReconnect &&>(on_reconnect)),
+ state_(State::CLOSED),
+ connector_id_(invalid_connector) {}
+
+ virtual ~Connector(){};
+
+ template <typename ReceiveCallback>
+ void setReceiveCallback(ReceiveCallback &&callback) {
+ receive_callback_ = std::forward<ReceiveCallback &&>(callback);
+ }
+
+ template <typename SentCallback>
+ void setSentCallback(SentCallback &&callback) {
+ sent_callback_ = std::forward<SentCallback &&>(callback);
+ }
+
+ template <typename OnClose>
+ void setOnCloseCallback(OnClose &&callback) {
+ on_close_callback_ = std::forward<OnClose &&>(callback);
+ }
+
+ template <typename OnReconnect>
+ void setReconnectCallback(const OnReconnect &&callback) {
+ on_reconnect_callback_ = std::forward<OnReconnect>(callback);
+ }
+
+ const PacketReceivedCallback &getReceiveCallback() const {
+ return receive_callback_;
+ }
+
+ const PacketSentCallback &getSentCallback() { return sent_callback_; }
+
+ const OnCloseCallback &getOnCloseCallback() { return on_close_callback_; }
+
+ const OnReconnectCallback &getOnReconnectCallback() {
+ return on_reconnect_callback_;
+ }
+
+ virtual void send(Packet &packet) = 0;
+
+ virtual void send(const uint8_t *packet, std::size_t len) = 0;
+
+ virtual void close() = 0;
+
+ virtual State state() { return state_; };
+
+ virtual bool isConnected() { return state_ == State::CONNECTED; }
+
+ void setConnectorId(Id connector_id) { connector_id_ = connector_id; }
+
+ Id getConnectorId() { return connector_id_; }
+
+ void setConnectorName(std::string connector_name) {
+ connector_name_ = connector_name;
+ }
+
+ std::string getConnectorName() { return connector_name_; }
+
+ Endpoint getLocalEndpoint() { return local_endpoint_; }
+
+ Endpoint getRemoteEndpoint() { return remote_endpoint_; }
+
+ void setRole(Role r) { role_ = r; }
+
+ Role getRole() { return role_; }
+
+ static utils::MemBuf::Ptr getPacketFromBuffer(uint8_t *buffer,
+ std::size_t size) {
+ utils::MemBuf::Ptr ret;
+
+ auto format = Packet::getFormatFromBuffer(buffer, size);
+
+ if (TRANSPORT_EXPECT_TRUE(format != HF_UNSPEC && !_is_icmp(format))) {
+ if (Packet::isInterest(buffer)) {
+ ret = core::PacketManager<>::getInstance()
+ .getPacketFromExistingBuffer<Interest>(buffer, size);
+ } else {
+ ret = core::PacketManager<>::getInstance()
+ .getPacketFromExistingBuffer<ContentObject>(buffer, size);
+ }
+ } else {
+ ret = core::PacketManager<>::getInstance().getMemBuf(buffer, size);
+ }
+
+ return ret;
+ }
+
+ static std::pair<uint8_t *, std::size_t> getRawBuffer() {
+ return core::PacketManager<>::getInstance().getRawBuffer();
+ }
+
+ protected:
+ inline void sendSuccess(const utils::MemBuf &packet) {
+ stats_.tx_packets_.fetch_add(1, std::memory_order_relaxed);
+ stats_.tx_bytes_.fetch_add(packet.length(), std::memory_order_relaxed);
+ }
+
+ inline void receiveSuccess(const utils::MemBuf &packet) {
+ stats_.rx_packets_.fetch_add(1, std::memory_order_relaxed);
+ stats_.rx_bytes_.fetch_add(packet.length(), std::memory_order_relaxed);
+ }
+
+ inline void sendFailed() {
+ stats_.drops_.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ protected:
+ PacketQueue output_buffer_;
+
+ // Connector events
+ PacketReceivedCallback receive_callback_;
+ PacketSentCallback sent_callback_;
+ OnCloseCallback on_close_callback_;
+ OnReconnectCallback on_reconnect_callback_;
+
+ // Connector state
+ std::atomic<State> state_;
+ Id connector_id_;
+
+ // Endpoints
+ Endpoint local_endpoint_;
+ Endpoint remote_endpoint_;
+
+ // Connector name
+ std::string connector_name_;
+
+ // Connector role
+ Role role_;
+
+ // Stats
+ AtomicConnectorStats stats_;
+};
+
+} // namespace core
+} // namespace transport