diff options
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/full_duplex_socket.h')
-rwxr-xr-x | libtransport/src/hicn/transport/interfaces/full_duplex_socket.h | 254 |
1 files changed, 254 insertions, 0 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h new file mode 100755 index 000000000..f881bea54 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This class is created for sending/receiving data over an ICN network. + */ + +#pragma once + +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/interfaces/async_transport.h> +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/interfaces/socket_producer.h> +#include <hicn/transport/portability/portability.h> +#include <hicn/transport/utils/sharable_vector.h> + +#include <unordered_set> +#include <vector> + +namespace transport { + +namespace interface { + +enum class MessageType : uint8_t { ACTION, RESPONSE, PAYLOAD }; + +enum class Action : uint8_t { + SUBSCRIBE, + CANCEL_SUBSCRIPTION, + SIGNAL_PRODUCTION, +}; + +enum class ReturnCode : uint8_t { + OK, + FAILED, +}; + +struct MessageHeader { + MessageType msg_type; + uint8_t reserved[2]; +}; + +struct ActionMessage { + MessageHeader header; + Action action; + uint64_t name[2]; +}; + +struct ResponseMessage { + MessageHeader header; + ReturnCode return_code; +}; + +struct SubscriptionResponseMessage { + ResponseMessage response; + uint64_t name[2]; +}; + +struct PayloadMessage { + MessageHeader header; + uint8_t reserved[1]; +}; + +// struct NotificationMessage { +// Action action; +// uint8_t reserved[3]; +// uint64_t +// } + +using core::Prefix; + +class AsyncFullDuplexSocket : public AsyncSocket, + public AsyncReader, + public AsyncWriter, + public AsyncAcceptor { + private: + struct Counters { + uint64_t app_bytes_written_; + uint64_t app_bytes_read_; + + TRANSPORT_ALWAYS_INLINE void updateBytesWritten(uint64_t bytes) { + app_bytes_written_ += bytes; + } + + TRANSPORT_ALWAYS_INLINE void updateBytesRead(uint64_t bytes) { + app_bytes_read_ += bytes; + } + }; + + public: + using UniquePtr = std::unique_ptr<AsyncFullDuplexSocket>; + using SharedPtr = std::unique_ptr<AsyncFullDuplexSocket>; + + AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service); + AsyncFullDuplexSocket(const core::Prefix &locator); + + ~AsyncFullDuplexSocket() { + TRANSPORT_LOGI("Adios AsyncFullDuplexSocket!!!"); + }; + + using ReadCallback = AsyncReader::ReadCallback; + using WriteCallback = AsyncWriter::WriteCallback; + + TRANSPORT_ALWAYS_INLINE void setReadCB(ReadCallback *callback) override { + read_callback_ = callback; + } + + TRANSPORT_ALWAYS_INLINE ReadCallback *getReadCallback() const override { + return read_callback_; + } + + TRANSPORT_ALWAYS_INLINE void setWriteCB(WriteCallback *callback) override { + write_callback_ = callback; + } + + TRANSPORT_ALWAYS_INLINE WriteCallback *getWriteCallback() const override { + return write_callback_; + } + + TRANSPORT_ALWAYS_INLINE const core::Prefix &getLocator() { return locator_; } + + void connect(ConnectCallback *callback, const core::Prefix &prefix) override; + + void write(WriteCallback *callback, const void *buf, size_t bytes, + const PublicationOptions &options, + WriteFlags flags = WriteFlags::NONE) override; + + virtual void write(WriteCallback *callback, + utils::SharableVector<uint8_t> &&output_buffer, + const PublicationOptions &options, + WriteFlags flags = WriteFlags::NONE) override; + + void waitForSubscribers(AcceptCallback *cb) override; + + // void writev( + // WriteCallback* callback, + // const iovec* vec, + // size_t count, + // Name &&content_to_publish_name, + // WriteFlags flags = WriteFlags::NONE) override; + + void close() override; + + void closeNow() override; + + void shutdownWrite() override; + + void shutdownWriteNow() override; + + bool good() const override; + + bool readable() const override; + + bool writable() const override; + + bool isPending() const override; + + bool connected() const override; + + bool error() const override; + + void setSendTimeout(uint32_t milliseconds) override; + + size_t getAppBytesWritten() const override; + size_t getRawBytesWritten() const override; + size_t getAppBytesReceived() const override; + size_t getRawBytesReceived() const override; + + uint32_t getSendTimeout() const override; + + private: + std::shared_ptr<core::ContentObject> decodeSynchronizationMessage( + const core::Interest &interest); + + class OnConnectCallback : public BasePortal::ConsumerCallback { + public: + OnConnectCallback(AsyncFullDuplexSocket &socket) : socket_(socket){}; + virtual ~OnConnectCallback() = default; + void onContentObject(core::Interest::Ptr &&, + core::ContentObject::Ptr &&content_object) override; + void onTimeout(core::Interest::Ptr &&interest) override; + + private: + AsyncFullDuplexSocket &socket_; + }; + + class OnSignalCallback : public BasePortal::ConsumerCallback { + public: + OnSignalCallback(AsyncFullDuplexSocket &socket) : socket_(socket){}; + virtual ~OnSignalCallback() = default; + void onContentObject(core::Interest::Ptr &&, + core::ContentObject::Ptr &&content_object); + void onTimeout(core::Interest::Ptr &&interest); + + private: + AsyncFullDuplexSocket &socket_; + }; + + void onControlInterest(ProducerSocket &s, const core::Interest &i); + void onContentProduced(ProducerSocket &producer, const std::error_code &ec, + uint64_t bytes_written); + void onContentRetrieved(ConsumerSocket &s, std::size_t size, + const std::error_code &ec); + + void signalProductionToSubscribers(const core::Name &name); + void piggybackPayloadToSubscribers(const core::Name &name, + const uint8_t *buffer, std::size_t bytes); + + std::shared_ptr<core::ContentObject> createAck(); + std::shared_ptr<core::ContentObject> createSubscriptionResponse( + const core::Name &name); + + core::Prefix locator_; + uint32_t incremental_suffix_; + core::Name sync_notification_; + // std::unique_ptr<BasePortal> portal_; + asio::io_service internal_io_service_; + asio::io_service &io_service_; + asio::io_service::work work_; + + // These names represent the "locator" of a certain + // peer that subscribed to this. + std::unordered_set<core::Name> subscribers_; + + // Useful for publishing / Retrieving data + std::unique_ptr<ProducerSocket> producer_; + std::unique_ptr<ConsumerSocket> consumer_; + + ReadCallback *read_callback_; + WriteCallback *write_callback_; + ConnectCallback *connect_callback_; + AcceptCallback *accept_callback_; + + std::unique_ptr<OnConnectCallback> internal_connect_callback_; + std::unique_ptr<OnSignalCallback> internal_signal_callback_; + + uint32_t send_timeout_milliseconds_; + struct Counters counters_; + std::shared_ptr<utils::SharableVector<uint8_t>> receive_buffer_; +}; + +} // namespace interface +} // namespace transport |