/* * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include namespace transport { namespace core { class Connector : public std::enable_shared_from_this { public: enum class Type : uint8_t { SOCKET_CONNECTOR, MEMIF_CONNECTOR, LOOPBACK_CONNECTOR, }; enum class State : std::uint8_t { CLOSED, CONNECTING, CONNECTED, }; enum class Role : std::uint8_t { CONSUMER, PRODUCER }; public: static constexpr std::size_t queue_size = 4096; static constexpr std::uint32_t invalid_connector = ~0; #ifdef LINUX static constexpr std::uint16_t max_burst = 256; #endif using Ptr = std::shared_ptr; using PacketQueue = std::deque; using PacketReceivedCallback = std::function; using PacketSentCallback = std::function; using OnCloseCallback = std::function; using OnReconnectCallback = std::function; using Id = std::uint64_t; template Connector(ReceiveCallback &&receive_callback, SentCallback &&packet_sent, OnClose &&close_callback, OnReconnect &&on_reconnect) : receive_callback_(std::forward(receive_callback)), sent_callback_(std::forward(packet_sent)), on_close_callback_(std::forward(close_callback)), on_reconnect_callback_(std::forward(on_reconnect)), state_(State::CLOSED), connector_id_(invalid_connector) {} virtual ~Connector(){}; template void setReceiveCallback(ReceiveCallback &&callback) { receive_callback_ = std::forward(callback); } template void setSentCallback(SentCallback &&callback) { sent_callback_ = std::forward(callback); } template void setOnCloseCallback(OnClose &&callback) { on_close_callback_ = std::forward(callback); } template void setReconnectCallback(const OnReconnect &&callback) { on_reconnect_callback_ = std::forward(callback); } const PacketReceivedCallback &getReceiveCallback() const { return receive_callback_; } const PacketSentCallback &getSentCallback() { return sent_callback_; } const OnCloseCallback &getOnCloseCallback() { return on_close_callback_; } const OnReconnectCallback &getOnReconnectCallback() { return on_reconnect_callback_; } virtual void send(Packet &packet) = 0; virtual void send(const uint8_t *packet, std::size_t len) = 0; virtual void close() = 0; virtual State state() { return state_; }; virtual bool isConnected() { return state_ == State::CONNECTED; } void setConnectorId(Id connector_id) { connector_id_ = connector_id; } Id getConnectorId() { return connector_id_; } void setConnectorName(std::string connector_name) { connector_name_ = connector_name; } std::string getConnectorName() { return connector_name_; } Endpoint getLocalEndpoint() { return local_endpoint_; } Endpoint getRemoteEndpoint() { return remote_endpoint_; } void setRole(Role r) { role_ = r; } Role getRole() { return role_; } static utils::MemBuf::Ptr getPacketFromBuffer(uint8_t *buffer, std::size_t size) { utils::MemBuf::Ptr ret; auto format = Packet::getFormatFromBuffer(buffer, size); if (TRANSPORT_EXPECT_TRUE(format != HF_UNSPEC && !_is_icmp(format))) { if (Packet::isInterest(buffer)) { ret = core::PacketManager<>::getInstance() .getPacketFromExistingBuffer(buffer, size); } else { ret = core::PacketManager<>::getInstance() .getPacketFromExistingBuffer(buffer, size); } } else { ret = core::PacketManager<>::getInstance().getMemBuf(buffer, size); } return ret; } static std::pair getRawBuffer() { return core::PacketManager<>::getInstance().getRawBuffer(); } protected: inline void sendSuccess(const utils::MemBuf &packet) { stats_.tx_packets_.fetch_add(1, std::memory_order_relaxed); stats_.tx_bytes_.fetch_add(packet.length(), std::memory_order_relaxed); } inline void receiveSuccess(const utils::MemBuf &packet) { stats_.rx_packets_.fetch_add(1, std::memory_order_relaxed); stats_.rx_bytes_.fetch_add(packet.length(), std::memory_order_relaxed); } inline void sendFailed() { stats_.drops_.fetch_add(1, std::memory_order_relaxed); } protected: PacketQueue output_buffer_; // Connector events PacketReceivedCallback receive_callback_; PacketSentCallback sent_callback_; OnCloseCallback on_close_callback_; OnReconnectCallback on_reconnect_callback_; // Connector state std::atomic state_; Id connector_id_; // Endpoints Endpoint local_endpoint_; Endpoint remote_endpoint_; // Connector name std::string connector_name_; // Connector role Role role_; // Stats AtomicConnectorStats stats_; }; } // namespace core } // namespace transport