From c365689250216861fd7727203ee6ba1049ad5778 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 3 Apr 2019 10:03:56 +0200 Subject: [HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application. Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara --- .../src/hicn/transport/interfaces/CMakeLists.txt | 4 +- .../hicn/transport/interfaces/async_transport.h | 641 --------------------- .../src/hicn/transport/interfaces/callbacks.h | 110 ++++ .../transport/interfaces/full_duplex_socket.cc | 490 ---------------- .../hicn/transport/interfaces/full_duplex_socket.h | 243 -------- .../src/hicn/transport/interfaces/socket.h | 62 +- .../hicn/transport/interfaces/socket_consumer.cc | 14 +- .../hicn/transport/interfaces/socket_consumer.h | 263 ++++++--- .../transport/interfaces/socket_options_keys.h | 2 +- .../hicn/transport/interfaces/socket_producer.cc | 11 +- .../hicn/transport/interfaces/socket_producer.h | 6 +- 11 files changed, 320 insertions(+), 1526 deletions(-) delete mode 100644 libtransport/src/hicn/transport/interfaces/async_transport.h create mode 100644 libtransport/src/hicn/transport/interfaces/callbacks.h delete mode 100644 libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc delete mode 100644 libtransport/src/hicn/transport/interfaces/full_duplex_socket.h (limited to 'libtransport/src/hicn/transport/interfaces') diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt index cbf371bac..2ff4fda56 100644 --- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -19,15 +19,13 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h - ${CMAKE_CURRENT_SOURCE_DIR}/async_transport.h - ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.h ${CMAKE_CURRENT_SOURCE_DIR}/publication_options.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h + ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h ) list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc diff --git a/libtransport/src/hicn/transport/interfaces/async_transport.h b/libtransport/src/hicn/transport/interfaces/async_transport.h deleted file mode 100644 index 692dd318c..000000000 --- a/libtransport/src/hicn/transport/interfaces/async_transport.h +++ /dev/null @@ -1,641 +0,0 @@ - -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#ifndef _WIN32 -#include -#endif - -#include - -namespace transport { - -namespace interface { - -/* - * flags given by the application for write* calls - */ -enum class WriteFlags : uint32_t { - NONE = 0x00, - /* - * Whether to delay the output until a subsequent non-corked write. - * (Note: may not be supported in all subclasses or on all platforms.) - */ - CORK = 0x01, - /* - * for a socket that has ACK latency enabled, it will cause the kernel - * to fire a TCP ESTATS event when the last byte of the given write call - * will be acknowledged. - */ - EOR = 0x02, - /* - * this indicates that only the write side of socket should be shutdown - */ - WRITE_SHUTDOWN = 0x04, - /* - * use msg zerocopy if allowed - */ - WRITE_MSG_ZEROCOPY = 0x08, -}; - -/* - * union operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags operator|(WriteFlags a, WriteFlags b) { - return static_cast(static_cast(a) | - static_cast(b)); -} - -/* - * compound assignment union operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags &operator|=(WriteFlags &a, WriteFlags b) { - a = a | b; - return a; -} - -/* - * intersection operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags operator&(WriteFlags a, WriteFlags b) { - return static_cast(static_cast(a) & - static_cast(b)); -} - -/* - * compound assignment intersection operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags &operator&=(WriteFlags &a, WriteFlags b) { - a = a & b; - return a; -} - -/* - * exclusion parameter - */ -TRANSPORT_ALWAYS_INLINE WriteFlags operator~(WriteFlags a) { - return static_cast(~static_cast(a)); -} - -/* - * unset operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags unSet(WriteFlags a, WriteFlags b) { - return a & ~b; -} - -/* - * inclusion operator - */ -TRANSPORT_ALWAYS_INLINE bool isSet(WriteFlags a, WriteFlags b) { - return (a & b) == b; -} - -class ConnectCallback { - public: - virtual ~ConnectCallback() = default; - - /** - * connectSuccess() will be invoked when the connection has been - * successfully established. - */ - virtual void connectSuccess() noexcept = 0; - - /** - * connectErr() will be invoked if the connection attempt fails. - * - * @param ex An exception describing the error that occurred. - */ - virtual void connectErr(const std::error_code ec) noexcept = 0; -}; - -/** - * AsyncSocket defines an asynchronous API for streaming I/O. - * - * This class provides an API to for asynchronously waiting for data - * on a streaming transport, and for asynchronously sending data. - * - * The APIs for reading and writing are intentionally asymmetric. Waiting for - * data to read is a persistent API: a callback is installed, and is notified - * whenever new data is available. It continues to be notified of new events - * until it is uninstalled. - * - * AsyncSocket does not provide read timeout functionality, because it - * typically cannot determine when the timeout should be active. Generally, a - * timeout should only be enabled when processing is blocked waiting on data - * from the remote endpoint. For server-side applications, the timeout should - * not be active if the server is currently processing one or more outstanding - * requests on this transport. For client-side applications, the timeout - * should not be active if there are no requests pending on the transport. - * Additionally, if a client has multiple pending requests, it will ususally - * want a separate timeout for each request, rather than a single read timeout. - * - * The write API is fairly intuitive: a user can request to send a block of - * data, and a callback will be informed once the entire block has been - * transferred to the kernel, or on error. AsyncSocket does provide a send - * timeout, since most callers want to give up if the remote end stops - * responding and no further progress can be made sending the data. - */ -class AsyncSocket { - public: - /** - * Close the transport. - * - * This gracefully closes the transport, waiting for all pending write - * requests to complete before actually closing the underlying transport. - * - * If a read callback is set, readEOF() will be called immediately. If there - * are outstanding write requests, the close will be delayed until all - * remaining writes have completed. No new writes may be started after - * close() has been called. - */ - virtual void close() = 0; - - /** - * Close the transport immediately. - * - * This closes the transport immediately, dropping any outstanding data - * waiting to be written. - * - * If a read callback is set, readEOF() will be called immediately. - * If there are outstanding write requests, these requests will be aborted - * and writeError() will be invoked immediately on all outstanding write - * callbacks. - */ - virtual void closeNow() = 0; - - /** - * Perform a half-shutdown of the write side of the transport. - * - * The caller should not make any more calls to write() or writev() after - * shutdownWrite() is called. Any future write attempts will fail - * immediately. - * - * Not all transport types support half-shutdown. If the underlying - * transport does not support half-shutdown, it will fully shutdown both the - * read and write sides of the transport. (Fully shutting down the socket is - * better than doing nothing at all, since the caller may rely on the - * shutdownWrite() call to notify the other end of the connection that no - * more data can be read.) - * - * If there is pending data still waiting to be written on the transport, - * the actual shutdown will be delayed until the pending data has been - * written. - * - * Note: There is no corresponding shutdownRead() equivalent. Simply - * uninstall the read callback if you wish to stop reading. (On TCP sockets - * at least, shutting down the read side of the socket is a no-op anyway.) - */ - virtual void shutdownWrite() = 0; - - /** - * Perform a half-shutdown of the write side of the transport. - * - * shutdownWriteNow() is identical to shutdownWrite(), except that it - * immediately performs the shutdown, rather than waiting for pending writes - * to complete. Any pending write requests will be immediately failed when - * shutdownWriteNow() is called. - */ - virtual void shutdownWriteNow() = 0; - - /** - * Determine if transport is open and ready to read or write. - * - * Note that this function returns false on EOF; you must also call error() - * to distinguish between an EOF and an error. - * - * @return true iff the transport is open and ready, false otherwise. - */ - virtual bool good() const = 0; - - /** - * Determine if the transport is readable or not. - * - * @return true iff the transport is readable, false otherwise. - */ - virtual bool readable() const = 0; - - /** - * Determine if the transport is writable or not. - * - * @return true iff the transport is writable, false otherwise. - */ - virtual bool writable() const { - // By default return good() - leave it to implementers to override. - return good(); - } - - /** - * Determine if the there is pending data on the transport. - * - * @return true iff the if the there is pending data, false otherwise. - */ - virtual bool isPending() const { return readable(); } - - /** - * Determine if transport is connected to the endpoint - * - * @return false iff the transport is connected, otherwise true - */ - virtual bool connected() const = 0; - - /** - * Determine if an error has occurred with this transport. - * - * @return true iff an error has occurred (not EOF). - */ - virtual bool error() const = 0; - - // /** - // * Attach the transport to a EventBase. - // * - // * This may only be called if the transport is not currently attached to a - // * EventBase (by an earlier call to detachEventBase()). - // * - // * This method must be invoked in the EventBase's thread. - // */ - // virtual void attachEventBase(EventBase* eventBase) = 0; - - // /** - // * Detach the transport from its EventBase. - // * - // * This may only be called when the transport is idle and has no reads or - // * writes pending. Once detached, the transport may not be used again - // until - // * it is re-attached to a EventBase by calling attachEventBase(). - // * - // * This method must be called from the current EventBase's thread. - // */ - // virtual void detachEventBase() = 0; - - // /** - // * Determine if the transport can be detached. - // * - // * This method must be called from the current EventBase's thread. - // */ - // virtual bool isDetachable() const = 0; - - /** - * Set the send timeout. - * - * If write requests do not make any progress for more than the specified - * number of milliseconds, fail all pending writes and close the transport. - * - * If write requests are currently pending when setSendTimeout() is called, - * the timeout interval is immediately restarted using the new value. - * - * @param milliseconds The timeout duration, in milliseconds. If 0, no - * timeout will be used. - */ - virtual void setSendTimeout(uint32_t milliseconds) = 0; - - /** - * Get the send timeout. - * - * @return Returns the current send timeout, in milliseconds. A return value - * of 0 indicates that no timeout is set. - */ - virtual uint32_t getSendTimeout() const = 0; - - virtual void connect(ConnectCallback *callback, - const core::Prefix &prefix_) = 0; - - // /** - // * Get the address of the local endpoint of this transport. - // * - // * This function may throw AsyncSocketException on error. - // * - // * @param address The local address will be stored in the specified - // * SocketAddress. - // */ - // virtual void getLocalAddress(* address) const = 0; - - virtual size_t getAppBytesWritten() const = 0; - virtual size_t getRawBytesWritten() const = 0; - virtual size_t getAppBytesReceived() const = 0; - virtual size_t getRawBytesReceived() const = 0; - - class BufferCallback { - public: - virtual ~BufferCallback() {} - virtual void onEgressBuffered() = 0; - virtual void onEgressBufferCleared() = 0; - }; - - ~AsyncSocket() = default; -}; - -class AsyncAcceptor { - public: - class AcceptCallback { - public: - virtual ~AcceptCallback() = default; - - /** - * connectionAccepted() is called whenever a new client connection is - * received. - * - * The AcceptCallback will remain installed after connectionAccepted() - * returns. - * - * @param fd The newly accepted client socket. The AcceptCallback - * assumes ownership of this socket, and is responsible - * for closing it when done. The newly accepted file - * descriptor will have already been put into - * non-blocking mode. - * @param clientAddr A reference to a SocketAddress struct containing the - * client's address. This struct is only guaranteed to - * remain valid until connectionAccepted() returns. - */ - virtual void connectionAccepted( - const core::Name &subscriber_name) noexcept = 0; - - /** - * acceptError() is called if an error occurs while accepting. - * - * The AcceptCallback will remain installed even after an accept error, - * as the errors are typically somewhat transient, such as being out of - * file descriptors. The server socket must be explicitly stopped if you - * wish to stop accepting after an error. - * - * @param ex An exception representing the error. - */ - virtual void acceptError(const std::exception &ex) noexcept = 0; - - /** - * acceptStarted() will be called in the callback's EventBase thread - * after this callback has been added to the AsyncServerSocket. - * - * acceptStarted() will be called before any calls to connectionAccepted() - * or acceptError() are made on this callback. - * - * acceptStarted() makes it easier for callbacks to perform initialization - * inside the callback thread. (The call to addAcceptCallback() must - * always be made from the AsyncServerSocket's primary EventBase thread. - * acceptStarted() provides a hook that will always be invoked in the - * callback's thread.) - * - * Note that the call to acceptStarted() is made once the callback is - * added, regardless of whether or not the AsyncServerSocket is actually - * accepting at the moment. acceptStarted() will be called even if the - * AsyncServerSocket is paused when the callback is added (including if - * the initial call to startAccepting() on the AsyncServerSocket has not - * been made yet). - */ - virtual void acceptStarted() noexcept {} - - /** - * acceptStopped() will be called when this AcceptCallback is removed from - * the AsyncServerSocket, or when the AsyncServerSocket is destroyed, - * whichever occurs first. - * - * No more calls to connectionAccepted() or acceptError() will be made - * after acceptStopped() is invoked. - */ - virtual void acceptStopped() noexcept {} - }; - - /** - * Wait for subscribers - * - */ - virtual void waitForSubscribers(AcceptCallback *cb) = 0; -}; - -class AsyncReader { - public: - class ReadCallback { - public: - virtual ~ReadCallback() = default; - - /** - * When data becomes available, getReadBuffer() will be invoked to get the - * buffer into which data should be read. - * - * This method allows the ReadCallback to delay buffer allocation until - * data becomes available. This allows applications to manage large - * numbers of idle connections, without having to maintain a separate read - * buffer for each idle connection. - * - * It is possible that in some cases, getReadBuffer() may be called - * multiple times before readDataAvailable() is invoked. In this case, the - * data will be written to the buffer returned from the most recent call to - * readDataAvailable(). If the previous calls to readDataAvailable() - * returned different buffers, the ReadCallback is responsible for ensuring - * that they are not leaked. - * - * If getReadBuffer() throws an exception, returns a nullptr buffer, or - * returns a 0 length, the ReadCallback will be uninstalled and its - * readError() method will be invoked. - * - * getReadBuffer() is not allowed to change the transport state before it - * returns. (For example, it should never uninstall the read callback, or - * set a different read callback.) - * - * @param bufReturn getReadBuffer() should update *bufReturn to contain the - * address of the read buffer. This parameter will never - * be nullptr. - * @param lenReturn getReadBuffer() should update *lenReturn to contain the - * maximum number of bytes that may be written to the read - * buffer. This parameter will never be nullptr. - * - * - * XXX TODO this does not seems to be completely true Checlk i/. - */ - virtual void getReadBuffer(void **bufReturn, size_t *lenReturn) = 0; - - /** - * readDataAvailable() will be invoked when data has been successfully read - * into the buffer returned by the last call to getReadBuffer(). - * - * The read callback remains installed after readDataAvailable() returns. - * It must be explicitly uninstalled to stop receiving read events. - * getReadBuffer() will be called at least once before each call to - * readDataAvailable(). getReadBuffer() will also be called before any - * call to readEOF(). - * - * @param len The number of bytes placed in the buffer. - */ - - virtual void readDataAvailable(size_t len) noexcept = 0; - - /** - * When data becomes available, isBufferMovable() will be invoked to figure - * out which API will be used, readBufferAvailable() or - * readDataAvailable(). If isBufferMovable() returns true, that means - * ReadCallback supports the IOBuf ownership transfer and - * readBufferAvailable() will be used. Otherwise, not. - - * By default, isBufferMovable() always return false. If - * readBufferAvailable() is implemented and to be invoked, You should - * overwrite isBufferMovable() and return true in the inherited class. - * - * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by - * itself until data becomes available. Compared with the pre/post buffer - * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable() - * has two advantages. First, this can avoid memcpy. E.g., in - * AsyncSSLSocket, the decrypted data was copied from the openssl internal - * buffer to the readbuf buffer. With the buffer ownership transfer, the - * internal buffer can be directly "moved" to ReadCallback. Second, the - * memory allocation can be more precise. The reason is - * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size - * because they have more context about the available data than - * ReadCallback. Think about the getReadBuffer() pre-allocate 4072 bytes - * buffer, but the available data is always 16KB (max OpenSSL record size). - */ - - virtual bool isBufferMovable() noexcept { return false; } - - /** - * Suggested buffer size, allocated for read operations, - * if callback is movable and supports folly::IOBuf - */ - - virtual size_t maxBufferSize() const { - return 64 * 1024; // 64K - } - - /** - * readBufferAvailable() will be invoked when data has been successfully - * read. - * - * Note that only either readBufferAvailable() or readDataAvailable() will - * be invoked according to the return value of isBufferMovable(). The timing - * and aftereffect of readBufferAvailable() are the same as - * readDataAvailable() - * - * @param readBuf The unique pointer of read buffer. - */ - - // virtual void readBufferAvailable(uint8_t** buffer, std::size_t - // *buf_length) noexcept {} - - virtual void readBufferAvailable(ContentBuffer &&buffer) noexcept {} - - // virtual void readBufferAvailable(utils::SharableBuffer&& buffer) - // noexcept {} - - /** - * readEOF() will be invoked when the transport is closed. - * - * The read callback will be automatically uninstalled immediately before - * readEOF() is invoked. - */ - virtual void readEOF() noexcept = 0; - - /** - * readError() will be invoked if an error occurs reading from the - * transport. - * - * The read callback will be automatically uninstalled immediately before - * readError() is invoked. - * - * @param ex An exception describing the error that occurred. - */ - virtual void readErr(const std::error_code ec) noexcept = 0; - }; - - // Read methods that aren't part of AsyncTransport. - virtual void setReadCB(ReadCallback *callback) = 0; - virtual ReadCallback *getReadCallback() const = 0; - - protected: - virtual ~AsyncReader() = default; -}; - -class AsyncWriter { - public: - class WriteCallback { - public: - virtual ~WriteCallback() = default; - - /** - * writeSuccess() will be invoked when all of the data has been - * successfully written. - * - * Note that this mainly signals that the buffer containing the data to - * write is no longer needed and may be freed or re-used. It does not - * guarantee that the data has been fully transmitted to the remote - * endpoint. For example, on socket-based transports, writeSuccess() only - * indicates that the data has been given to the kernel for eventual - * transmission. - */ - virtual void writeSuccess() noexcept = 0; - - /** - * writeError() will be invoked if an error occurs writing the data. - * - * @param bytesWritten The number of bytes that were successfull - * @param ex An exception describing the error that occurred. - */ - virtual void writeErr(size_t bytesWritten) noexcept = 0; - }; - - /** - * If you supply a non-null WriteCallback, exactly one of writeSuccess() - * or writeErr() will be invoked when the write completes. If you supply - * the same WriteCallback object for multiple write() calls, it will be - * invoked exactly once per call. The only way to cancel outstanding - * write requests is to close the socket (e.g., with closeNow() or - * shutdownWriteNow()). When closing the socket this way, writeErr() will - * still be invoked once for each outstanding write operation. - */ - virtual void write(WriteCallback *callback, const void *buf, size_t bytes, - const PublicationOptions &options, - WriteFlags flags = WriteFlags::NONE) = 0; - - /** - * If you supply a non-null WriteCallback, exactly one of writeSuccess() - * or writeErr() will be invoked when the write completes. If you supply - * the same WriteCallback object for multiple write() calls, it will be - * invoked exactly once per call. The only way to cancel outstanding - * write requests is to close the socket (e.g., with closeNow() or - * shutdownWriteNow()). When closing the socket this way, writeErr() will - * still be invoked once for each outstanding write operation. - */ - virtual void write(WriteCallback *callback, ContentBuffer &&output_buffer, - const PublicationOptions &options, - WriteFlags flags = WriteFlags::NONE) = 0; - - // /** - // * If you supply a non-null WriteCallback, exactly one of writeSuccess() - // * or writeErr() will be invoked when the write completes. If you supply - // * the same WriteCallback object for multiple write() calls, it will be - // * invoked exactly once per call. The only way to cancel outstanding - // * write requests is to close the socket (e.g., with closeNow() or - // * shutdownWriteNow()). When closing the socket this way, writeErr() will - // * still be invoked once for each outstanding write operation. - // */ - // virtual void writeChain( - // WriteCallback* callback, - // std::unique_ptr&& buf, - // WriteFlags flags = WriteFlags::NONE) = 0; - - virtual void setWriteCB(WriteCallback *callback) = 0; - virtual WriteCallback *getWriteCallback() const = 0; - - protected: - virtual ~AsyncWriter() = default; -}; - -} // namespace interface - -} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h new file mode 100644 index 000000000..24f47eb75 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/callbacks.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace utils { +class MemBuf; +} + +namespace transport { + +namespace protocol { + +class IcnObserver; +class TransportStatistics; + +} // namespace protocol + +namespace core { + +class ContentObject; +class Interest; +} // namespace core + +namespace interface { + +// Forward declarations +class ConsumerSocket; +class ProducerSocket; + +/** + * The ConsumerInterestCallback will be called in different parts of the + * consumer socket processing pipeline, with a ConsumerSocket and an Interest as + * parameters. + */ +using ConsumerInterestCallback = + std::function; + +/** + * The ConsumerTimerCallback is called periodically for exposing to applications + * a summary of the statistics of the transport protocol in use. + */ +using ConsumerTimerCallback = std::function; + +/** + * The ProducerContentCallback will be called by the producer socket right after + * a content has been segmented and published. + */ +using ProducerContentCallback = std::function; + +/** + * The ConsumerContentObjectCallback will be called in different parts of the + * consumer socket processing pipeline, with a ConsumerSocket and an + * ContentObject as parameters. + */ +using ConsumerContentObjectCallback = + std::function; + +/** + * The ConsumerContentObjectVerificationCallback will be called by the transport + * if an application is willing to verify each content object. Note that a + * better alternative is to instrument the transport to perform the verification + * autonomously, without requiring the intervention of the application. + */ +using ConsumerContentObjectVerificationCallback = + std::function; + +/** + * The ConsumerManifestCallback will be called by the consumer socket when a + * manifest is received. + */ +using ConsumerManifestCallback = + std::function; + +/** + * The ProducerContentObjectCallback will be called in different parts of the + * consumer socket processing pipeline, with a ProducerSocket and an + * ContentObject as parameters. + */ +using ProducerContentObjectCallback = + std::function; + +/** + * The ProducerContentObjectCallback will be called in different parts of the + * consumer socket processing pipeline, with a ProducerSocket and an + * Interest as parameters. + */ +using ProducerInterestCallback = + std::function; + +} // namespace interface + +} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc deleted file mode 100644 index fdd422dee..000000000 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include - -namespace transport { - -namespace interface { - -static const std::string producer_identity = "producer_socket"; - -AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator) - : AsyncFullDuplexSocket(locator, internal_io_service_) {} - -AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator, - asio::io_service &io_service) - : locator_(locator), - incremental_suffix_(0), - io_service_(io_service), - work_(io_service), - producer_(std::make_unique(io_service_)), - consumer_(std::make_unique( - TransportProtocolAlgorithms::RAAQM /* , io_service_ */)), - read_callback_(nullptr), - write_callback_(nullptr), - connect_callback_(nullptr), - accept_callback_(nullptr), - internal_connect_callback_(new OnConnectCallback(*this)), - internal_signal_callback_(new OnSignalCallback(*this)), - send_timeout_milliseconds_(~0), - counters_({0}), - receive_buffer_(ContentBuffer()) { - using namespace transport; - using namespace std::placeholders; - producer_->registerPrefix(locator); - - producer_->setSocketOption( - ProducerCallbacksOptions::CACHE_MISS, - std::bind(&AsyncFullDuplexSocket::onControlInterest, this, _1, _2)); - - producer_->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, - uint32_t{150000}); - - ProducerContentCallback producer_callback = - std::bind(&AsyncFullDuplexSocket::onContentProduced, this, _1, _2, _3); - producer_->setSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, - producer_callback); - - producer_->connect(); - - consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, - (ConsumerContentObjectVerificationCallback)[]( - ConsumerSocket & s, const ContentObject &c) - ->bool { return true; }); - - ConsumerContentCallback consumer_callback = - std::bind(&AsyncFullDuplexSocket::onContentRetrieved, this, _1, _2, _3); - consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_RETRIEVED, - consumer_callback); - - consumer_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, - uint32_t{4}); - - consumer_->connect(); -} - -void AsyncFullDuplexSocket::close() { - this->consumer_->stop(); - this->producer_->stop(); -} - -void AsyncFullDuplexSocket::closeNow() { close(); } - -void AsyncFullDuplexSocket::shutdownWrite() { producer_->stop(); } - -void AsyncFullDuplexSocket::shutdownWriteNow() { shutdownWrite(); } - -bool AsyncFullDuplexSocket::good() const { return true; } - -bool AsyncFullDuplexSocket::readable() const { - // TODO return status of consumer socket - return true; -} - -bool AsyncFullDuplexSocket::writable() const { - // TODO return status of producer socket - return true; -} - -bool AsyncFullDuplexSocket::isPending() const { - // TODO save if there are production operation in the ops queue - // in producer socket - return true; -} - -bool AsyncFullDuplexSocket::connected() const { - // No real connection here (ICN world). Return good - return good(); -} - -bool AsyncFullDuplexSocket::error() const { return !good(); } - -void AsyncFullDuplexSocket::setSendTimeout(uint32_t milliseconds) { - // TODO if production takes too much to complete - // let's abort the operation. - - // Normally with hicn this should be done for content - // pull, not for production. - - send_timeout_milliseconds_ = milliseconds; -} - -uint32_t AsyncFullDuplexSocket::getSendTimeout() const { - return send_timeout_milliseconds_; -} - -size_t AsyncFullDuplexSocket::getAppBytesWritten() const { - return counters_.app_bytes_written_; -} - -size_t AsyncFullDuplexSocket::getRawBytesWritten() const { return 0; } - -size_t AsyncFullDuplexSocket::getAppBytesReceived() const { - return counters_.app_bytes_read_; -} - -size_t AsyncFullDuplexSocket::getRawBytesReceived() const { return 0; } - -void AsyncFullDuplexSocket::connect(ConnectCallback *callback, - const core::Prefix &prefix) { - connect_callback_ = callback; - - // Create an interest for a subscription - auto interest = - core::Interest::Ptr(new core::Interest(prefix.makeRandomName())); - auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); - _payload->append(sizeof(ActionMessage)); - auto payload = _payload->writableData(); - ActionMessage *subscription_message = - reinterpret_cast(payload); - subscription_message->header.msg_type = MessageType::ACTION; - subscription_message->action = Action::SUBSCRIBE; - subscription_message->header.reserved[0] = 0; - subscription_message->header.reserved[1] = 0; - - // Set the name the other part should use for notifying a content production - sync_notification_ = std::move(locator_.makeRandomName()); - sync_notification_.copyToDestination( - reinterpret_cast(subscription_message->name)); - - TRANSPORT_LOGI( - "Trying to connect. Sending interest: %s, name for notifications: %s", - prefix.getName().toString().c_str(), - sync_notification_.toString().c_str()); - - interest->setLifetime(1000); - interest->appendPayload(std::move(_payload)); - consumer_->asyncSendInterest(std::move(interest), - internal_connect_callback_.get()); -} - -void AsyncFullDuplexSocket::write(WriteCallback *callback, const void *buf, - size_t bytes, - const PublicationOptions &options, - WriteFlags flags) { - using namespace transport; - - // 1 asynchronously write the content. I assume here the - // buffer contains the whole application frame. FIXME: check - // if this is true and fix it accordingly - std::cout << "Size of the PAYLOAD: " << bytes << std::endl; - - if (bytes > core::Packet::default_mtu - sizeof(PayloadMessage)) { - TRANSPORT_LOGI("Producing content with name %s", - options.getName().toString().c_str()); - producer_->asyncProduce(options.getName(), - reinterpret_cast(buf), bytes); - signalProductionToSubscribers(options.getName()); - } else { - TRANSPORT_LOGI("Sending payload through interest"); - piggybackPayloadToSubscribers( - options.getName(), reinterpret_cast(buf), bytes); - } -} - -void AsyncFullDuplexSocket::write(WriteCallback *callback, - ContentBuffer &&output_buffer, - const PublicationOptions &options, - WriteFlags flags) { - using namespace transport; - - // 1 asynchronously write the content. I assume here the - // buffer contains the whole application frame. FIXME: check - // if this is true and fix it accordingly - std::cout << "Size of the PAYLOAD: " << output_buffer->size() << std::endl; - - if (output_buffer->size() > - core::Packet::default_mtu - sizeof(PayloadMessage)) { - TRANSPORT_LOGI("Producing content with name %s", - options.getName().toString().c_str()); - producer_->asyncProduce(options.getName(), std::move(output_buffer)); - signalProductionToSubscribers(options.getName()); - } else { - TRANSPORT_LOGI("Sending payload through interest"); - piggybackPayloadToSubscribers(options.getName(), &(*output_buffer)[0], - output_buffer->size()); - } -} - -void AsyncFullDuplexSocket::piggybackPayloadToSubscribers( - const core::Name &name, const uint8_t *buffer, std::size_t bytes) { - for (auto &sub : subscribers_) { - auto interest = core::Interest::Ptr(new core::Interest(name)); - auto _payload = utils::MemBuf::create(bytes + sizeof(PayloadMessage)); - _payload->append(bytes + sizeof(PayloadMessage)); - auto payload = _payload->writableData(); - - PayloadMessage *interest_payload = - reinterpret_cast(payload); - interest_payload->header.msg_type = MessageType::PAYLOAD; - interest_payload->header.reserved[0] = 0; - interest_payload->header.reserved[1] = 0; - interest_payload->reserved[0] = 0; - std::memcpy(payload + sizeof(PayloadMessage), buffer, bytes); - interest->appendPayload(std::move(_payload)); - - // Set the timeout of 0.2 second - interest->setLifetime(1000); - interest->setName(sub); - interest->getWritableName().setSuffix(incremental_suffix_++); - // TRANSPORT_LOGI("Sending signalization to %s", - // interest->getName().toString().c_str()); - - consumer_->asyncSendInterest(std::move(interest), - internal_signal_callback_.get()); - } -} - -void AsyncFullDuplexSocket::signalProductionToSubscribers( - const core::Name &name) { - // Signal the other part we are producing a content - // Create an interest for a subscription - - for (auto &sub : subscribers_) { - auto interest = core::Interest::Ptr(new core::Interest(name)); - // Todo consider using preallocated pool of membufs - auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); - _payload->append(sizeof(ActionMessage)); - auto payload = interest->getPayload()->writableData(); - - ActionMessage *produce_notification = - reinterpret_cast(payload); - produce_notification->header.msg_type = MessageType::ACTION; - produce_notification->action = Action::SIGNAL_PRODUCTION; - produce_notification->header.reserved[0] = 0; - produce_notification->header.reserved[1] = 0; - name.copyToDestination( - reinterpret_cast(produce_notification->name)); - interest->appendPayload(std::move(_payload)); - - // Set the timeout of 0.2 second - interest->setLifetime(1000); - interest->setName(sub); - interest->getWritableName().setSuffix(incremental_suffix_++); - // TRANSPORT_LOGI("Sending signalization to %s", - // interest->getName().toString().c_str()); - - consumer_->asyncSendInterest(std::move(interest), - internal_signal_callback_.get()); - } -} - -void AsyncFullDuplexSocket::waitForSubscribers(AcceptCallback *cb) { - accept_callback_ = cb; -} - -std::shared_ptr -AsyncFullDuplexSocket::decodeSynchronizationMessage( - const core::Interest &interest) { - auto mesg = interest.getPayload(); - const MessageHeader *header = - reinterpret_cast(mesg->data()); - - switch (header->msg_type) { - case MessageType::ACTION: { - // Check what is the action to perform - const ActionMessage *message = - reinterpret_cast(header); - - if (message->action == Action::SUBSCRIBE) { - // Add consumer to list on consumers to be notified - auto ret = - subscribers_.emplace(AF_INET6, (const uint8_t *)message->name, 0); - TRANSPORT_LOGI("Added subscriber %s :)", ret.first->toString().c_str()); - if (ret.second) { - accept_callback_->connectionAccepted(*ret.first); - } - - TRANSPORT_LOGI("Connection success!"); - - sync_notification_ = std::move(locator_.makeRandomName()); - return createSubscriptionResponse(sync_notification_); - - } else if (message->action == Action::CANCEL_SUBSCRIPTION) { - // XXX Modify name!!! Each allocated name allocates a 128 bit array. - subscribers_.erase( - core::Name(AF_INET6, (const uint8_t *)message->name, 0)); - return createAck(); - } else if (message->action == Action::SIGNAL_PRODUCTION) { - // trigger a reverse pull for the name contained in the message - core::Name n(AF_INET6, (const uint8_t *)message->name, 0); - std::cout << "PROD NOTIFICATION: Content to retrieve: " << n - << std::endl; - std::cout << "PROD NOTIFICATION: Interest name: " << interest.getName() - << std::endl; // << " compared to " << sync_notification_ << - // std::endl; - - if (sync_notification_.equals(interest.getName(), false)) { - std::cout << "Starting reverse pull for " << n << std::endl; - consumer_->asyncConsume(n, receive_buffer_); - return createAck(); - } - } else { - TRANSPORT_LOGE("Received unknown message. Dropping it."); - } - - break; - } - case MessageType::RESPONSE: { - throw errors::RuntimeException( - "The response should be a content object!!"); - } - case MessageType::PAYLOAD: { - // The interest contains the payload directly. - // We saved one round trip :) - - auto buffer = ContentBuffer(); - const uint8_t *data = mesg->data() + sizeof(PayloadMessage); - buffer->assign(data, data + mesg->length() - sizeof(PayloadMessage)); - read_callback_->readBufferAvailable(std::move(buffer)); - return createAck(); - } - default: { return std::shared_ptr(nullptr); } - } - - return std::shared_ptr(nullptr); -} - -void AsyncFullDuplexSocket::onControlInterest(ProducerSocket &s, - const core::Interest &i) { - auto payload = i.getPayload(); - if (payload->length()) { - // Try to decode payload and see if starting an async pull operation - auto response = decodeSynchronizationMessage(i); - if (response) { - response->setName(i.getName()); - s.produce(*response); - } - } -} - -void AsyncFullDuplexSocket::onContentProduced(ProducerSocket &producer, - const std::error_code &ec, - uint64_t bytes_written) { - if (write_callback_) { - if (!ec) { - write_callback_->writeSuccess(); - } else { - write_callback_->writeErr(bytes_written); - } - } -} - -void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s, - std::size_t size, - const std::error_code &ec) { - // Sanity check - if (size != receive_buffer_->size()) { - TRANSPORT_LOGE( - "Received content size differs from size retrieved from the buffer."); - return; - } - - TRANSPORT_LOGI("Received content with size %zu", size); - if (!ec) { - read_callback_->readBufferAvailable(std::move(receive_buffer_)); - } else { - TRANSPORT_LOGE("Error retrieving content."); - } - // consumer_->stop(); -} - -void AsyncFullDuplexSocket::OnConnectCallback::onContentObject( - core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { - // The ack message should contain the name to be used for notifying - // the production of the content to the other part - - if (content_object->getPayload()->length() == 0) { - TRANSPORT_LOGW("Connection response message empty...."); - return; - } - - SubscriptionResponseMessage *response = - reinterpret_cast( - content_object->getPayload()->writableData()); - - if (response->response.header.msg_type == MessageType::RESPONSE) { - if (response->response.return_code == ReturnCode::OK) { - auto ret = - socket_.subscribers_.emplace(AF_INET6, (uint8_t *)response->name, 0); - TRANSPORT_LOGI("Successfully connected!!!! Subscriber added: %s", - ret.first->toString().c_str()); - socket_.connect_callback_->connectSuccess(); - } - } -} - -void AsyncFullDuplexSocket::OnSignalCallback::onContentObject( - core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { - return; -} - -void AsyncFullDuplexSocket::OnSignalCallback::onTimeout( - core::Interest::Ptr &&interest) { - TRANSPORT_LOGE("Retransmitting signalization interest to %s!!", - interest->getName().toString().c_str()); - socket_.consumer_->asyncSendInterest(std::move(interest), - socket_.internal_signal_callback_.get()); -} - -void AsyncFullDuplexSocket::OnConnectCallback::onTimeout( - core::Interest::Ptr &&interest) { - socket_.connect_callback_->connectErr( - std::make_error_code(std::errc::not_connected)); -} - -std::shared_ptr AsyncFullDuplexSocket::createAck() { - // Send the response back - core::Name name("b001::abcd"); - auto response = std::make_shared(name); - auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); - _payload->append(sizeof(ResponseMessage)); - auto payload = response->getPayload()->data(); - ResponseMessage *response_message = (ResponseMessage *)payload; - response_message->header.msg_type = MessageType::RESPONSE; - response_message->header.reserved[0] = 0; - response_message->header.reserved[1] = 0; - response_message->return_code = ReturnCode::OK; - response->appendPayload(std::move(_payload)); - response->setLifetime(0); - return response; -} - -std::shared_ptr -AsyncFullDuplexSocket::createSubscriptionResponse(const core::Name &name) { - // Send the response back - core::Name tmp_name("b001::abcd"); - auto response = std::make_shared(tmp_name); - auto _payload = utils::MemBuf::create(sizeof(SubscriptionResponseMessage)); - _payload->append(sizeof(SubscriptionResponseMessage)); - auto payload = _payload->data(); - SubscriptionResponseMessage *response_message = - (SubscriptionResponseMessage *)payload; - response_message->response.header.msg_type = MessageType::RESPONSE; - response_message->response.header.reserved[0] = 0; - response_message->response.header.reserved[1] = 0; - response_message->response.return_code = ReturnCode::OK; - name.copyToDestination(reinterpret_cast(response_message->name)); - response->appendPayload(std::move(_payload)); - response->setLifetime(0); - return response; -} - -} // namespace interface -} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h deleted file mode 100644 index 438325fdb..000000000 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * This class is created for sending/receiving data over an ICN network. - */ - -#pragma once - -#include -#include -#include -#include -#include - -#include -#include - -namespace transport { - -namespace interface { - -enum class MessageType : uint8_t { ACTION, RESPONSE, PAYLOAD }; - -enum class Action : uint8_t { - SUBSCRIBE, - CANCEL_SUBSCRIPTION, - SIGNAL_PRODUCTION, -}; - -enum class ReturnCode : uint8_t { - OK, - FAILED, -}; - -struct MessageHeader { - MessageType msg_type; - uint8_t reserved[2]; -}; - -struct ActionMessage { - MessageHeader header; - Action action; - uint64_t name[2]; -}; - -struct ResponseMessage { - MessageHeader header; - ReturnCode return_code; -}; - -struct SubscriptionResponseMessage { - ResponseMessage response; - uint64_t name[2]; -}; - -struct PayloadMessage { - MessageHeader header; - uint8_t reserved[1]; -}; - -// struct NotificationMessage { -// Action action; -// uint8_t reserved[3]; -// uint64_t -// } - -using core::Prefix; - -class AsyncFullDuplexSocket : public AsyncSocket, - public AsyncReader, - public AsyncWriter, - public AsyncAcceptor { - private: - struct Counters { - uint64_t app_bytes_written_; - uint64_t app_bytes_read_; - - TRANSPORT_ALWAYS_INLINE void updateBytesWritten(uint64_t bytes) { - app_bytes_written_ += bytes; - } - - TRANSPORT_ALWAYS_INLINE void updateBytesRead(uint64_t bytes) { - app_bytes_read_ += bytes; - } - }; - - public: - using UniquePtr = std::unique_ptr; - using SharedPtr = std::unique_ptr; - - AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service); - AsyncFullDuplexSocket(const core::Prefix &locator); - - ~AsyncFullDuplexSocket(){}; - - using ReadCallback = AsyncReader::ReadCallback; - using WriteCallback = AsyncWriter::WriteCallback; - - TRANSPORT_ALWAYS_INLINE void setReadCB(ReadCallback *callback) override { - read_callback_ = callback; - } - - TRANSPORT_ALWAYS_INLINE ReadCallback *getReadCallback() const override { - return read_callback_; - } - - TRANSPORT_ALWAYS_INLINE void setWriteCB(WriteCallback *callback) override { - write_callback_ = callback; - } - - TRANSPORT_ALWAYS_INLINE WriteCallback *getWriteCallback() const override { - return write_callback_; - } - - TRANSPORT_ALWAYS_INLINE const core::Prefix &getLocator() { return locator_; } - - void connect(ConnectCallback *callback, const core::Prefix &prefix) override; - - void write(WriteCallback *callback, const void *buf, size_t bytes, - const PublicationOptions &options, - WriteFlags flags = WriteFlags::NONE) override; - - virtual void write(WriteCallback *callback, ContentBuffer &&output_buffer, - const PublicationOptions &options, - WriteFlags flags = WriteFlags::NONE) override; - - void waitForSubscribers(AcceptCallback *cb) override; - - void close() override; - - void closeNow() override; - - void shutdownWrite() override; - - void shutdownWriteNow() override; - - bool good() const override; - - bool readable() const override; - - bool writable() const override; - - bool isPending() const override; - - bool connected() const override; - - bool error() const override; - - void setSendTimeout(uint32_t milliseconds) override; - - size_t getAppBytesWritten() const override; - size_t getRawBytesWritten() const override; - size_t getAppBytesReceived() const override; - size_t getRawBytesReceived() const override; - - uint32_t getSendTimeout() const override; - - private: - std::shared_ptr decodeSynchronizationMessage( - const core::Interest &interest); - - class OnConnectCallback : public BasePortal::ConsumerCallback { - public: - OnConnectCallback(AsyncFullDuplexSocket &socket) : socket_(socket){}; - virtual ~OnConnectCallback() = default; - void onContentObject(core::Interest::Ptr &&, - core::ContentObject::Ptr &&content_object) override; - void onTimeout(core::Interest::Ptr &&interest) override; - - private: - AsyncFullDuplexSocket &socket_; - }; - - class OnSignalCallback : public BasePortal::ConsumerCallback { - public: - OnSignalCallback(AsyncFullDuplexSocket &socket) : socket_(socket){}; - virtual ~OnSignalCallback() = default; - void onContentObject(core::Interest::Ptr &&, - core::ContentObject::Ptr &&content_object); - void onTimeout(core::Interest::Ptr &&interest); - - private: - AsyncFullDuplexSocket &socket_; - }; - - void onControlInterest(ProducerSocket &s, const core::Interest &i); - void onContentProduced(ProducerSocket &producer, const std::error_code &ec, - uint64_t bytes_written); - void onContentRetrieved(ConsumerSocket &s, std::size_t size, - const std::error_code &ec); - - void signalProductionToSubscribers(const core::Name &name); - void piggybackPayloadToSubscribers(const core::Name &name, - const uint8_t *buffer, std::size_t bytes); - - std::shared_ptr createAck(); - std::shared_ptr createSubscriptionResponse( - const core::Name &name); - - core::Prefix locator_; - uint32_t incremental_suffix_; - core::Name sync_notification_; - // std::unique_ptr portal_; - asio::io_service internal_io_service_; - asio::io_service &io_service_; - asio::io_service::work work_; - - // These names represent the "locator" of a certain - // peer that subscribed to this. - std::unordered_set subscribers_; - - // Useful for publishing / Retrieving data - std::unique_ptr producer_; - std::unique_ptr consumer_; - - ReadCallback *read_callback_; - WriteCallback *write_callback_; - ConnectCallback *connect_callback_; - AcceptCallback *accept_callback_; - - std::unique_ptr internal_connect_callback_; - std::unique_ptr internal_signal_callback_; - - uint32_t send_timeout_milliseconds_; - struct Counters counters_; - ContentBuffer receive_buffer_; -}; - -} // namespace interface -} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h index 7d50d0fbd..90f6a3ef6 100644 --- a/libtransport/src/hicn/transport/interfaces/socket.h +++ b/libtransport/src/hicn/transport/interfaces/socket.h @@ -16,18 +16,10 @@ #pragma once #include -#include #include -#include -#include -#include -#include +#include #include #include -#include -#include -#include -#include #define SOCKET_OPTION_GET 0 #define SOCKET_OPTION_NOT_GET 1 @@ -39,26 +31,14 @@ namespace transport { -namespace protocol { -class IcnObserver; -class TransportStatistics; -} // namespace protocol - namespace interface { +// Forward Declarations template class Socket; -class ConsumerSocket; -class ProducerSocket; - -// using Interest = core::Interest; -// using ContentObject = core::ContentObject; -// using Name = core::Name; -// using HashAlgorithm = core::HashAlgorithm; -// using CryptoSuite = utils::CryptoSuite; -// using Identity = utils::Identity; -// using Verifier = utils::Verifier; +// Define the portal and its connector, depending on the compilation options +// passed by the build tool. using HicnForwarderPortal = core::HicnForwarderPortal; #ifdef __linux__ @@ -76,40 +56,6 @@ using BaseSocket = Socket; using BasePortal = HicnForwarderPortal; #endif -using PayloadType = core::PayloadType; -using Prefix = core::Prefix; -using Array = utils::Array; -using ContentBuffer = std::shared_ptr>; - -using ConsumerInterestCallback = - std::function; - -using ConsumerContentCallback = - std::function; - -using ConsumerTimerCallback = std::function; - -using ProducerContentCallback = std::function; - -using ConsumerContentObjectCallback = - std::function; - -using ConsumerContentObjectVerificationCallback = - std::function; - -using ConsumerManifestCallback = - std::function; - -using ProducerContentObjectCallback = - std::function; - -using ProducerInterestCallback = - std::function; - -using namespace protocol; - template class Socket { static_assert(std::is_same::value diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index ca9722849..37d545779 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -48,7 +48,6 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) is_async_(false), verifier_(std::make_shared()), verify_signature_(false), - content_buffer_(nullptr), on_interest_output_(VOID_HANDLER), on_interest_timeout_(VOID_HANDLER), on_interest_satisfied_(VOID_HANDLER), @@ -56,7 +55,8 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) on_content_object_verification_(VOID_HANDLER), on_content_object_(VOID_HANDLER), on_manifest_(VOID_HANDLER), - on_payload_retrieved_(VOID_HANDLER), + stats_summary_(VOID_HANDLER), + read_callback_(nullptr), virtual_download_(false), rtt_stats_(false), timer_interval_milliseconds_(0) { @@ -81,13 +81,11 @@ ConsumerSocket::~ConsumerSocket() { void ConsumerSocket::connect() { portal_->connect(); } -int ConsumerSocket::consume(const Name &name, ContentBuffer &receive_buffer) { +int ConsumerSocket::consume(const Name &name) { if (transport_protocol_->isRunning()) { return CONSUMER_BUSY; } - content_buffer_ = receive_buffer; - network_name_ = name; network_name_.setSuffix(0); is_async_ = false; @@ -97,14 +95,12 @@ int ConsumerSocket::consume(const Name &name, ContentBuffer &receive_buffer) { return CONSUMER_FINISHED; } -int ConsumerSocket::asyncConsume(const Name &name, - ContentBuffer &receive_buffer) { +int ConsumerSocket::asyncConsume(const Name &name) { if (!async_downloader_.stopped()) { - async_downloader_.add([this, receive_buffer, name]() { + async_downloader_.add([this, name]() { network_name_ = std::move(name); network_name_.setSuffix(0); is_async_ = true; - content_buffer_ = receive_buffer; transport_protocol_->start(); }); } diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index ceed53954..40344af5d 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -19,6 +19,11 @@ #include #include #include +#include + +extern "C" { +#include +} #define CONSUMER_FINISHED 0 #define CONSUMER_BUSY 1 @@ -28,28 +33,210 @@ namespace transport { namespace interface { +using namespace core; +using namespace protocol; + +/** + * @brief Main interface for consumer applications. + * + * The consumer socket is the main interface for a consumer application. + * It allows to retrieve an application data from one/many producers, by + * hiding all the complexity of the transport protocol used underneath. + */ class ConsumerSocket : public BaseSocket { public: + /** + * The ReadCallback is a class which can be used by the transport for both + * querying the application needs and notifying events. + * + * Beware that the methods of this class will be called synchronously while + * the transport is working, so the operations the application is performing + * on the data retrieved should be executed in another thread in an + * asynchronous manner. Blocking one of these callbacks means blocking the + * transport. + */ + class ReadCallback { + public: + virtual ~ReadCallback() = default; + + /** + * This API will specify to the transport whether the buffer should be + * allocated by the application (and then the retrieved content will be + * copied there) or the transport should allocate the buffer and "move" it + * to the application. In other words, if isBufferMovable return true, the + * transport will transfer the ownership of the read buffer to the + * application, without performing an additional copy, while if it returns + * false the transport will use the getReadBuffer API. + * + * By default this method returns true. + * + */ + virtual bool isBufferMovable() noexcept { return true; } + + /** + * This method will be called by the transport when the content is + * available. The application can then allocate its own buffer and provide + * the address to the transport, which will use it for writing the data. + * Note that if the application won't allocate enough memory this method + * will be called several times, until the internal read buffer will be + * emptied. For ensuring this method will be called once, applications + * should allocate at least maxBufferSize() bytes. + * + * @param application_buffer - Pointer to the application's buffer. + * @param max_length - The length of the application buffer. + */ + virtual void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) = 0; + + /** + * This method will be called by the transport after calling getReadBuffer, + * in order to notify the application that length bytes are available in the + * buffer. The max_length size of the buffer could be larger than the actual + * amount of bytes written. + * + * @param length - The number of bytes placed in the buffer. + */ + virtual void readDataAvailable(size_t length) noexcept = 0; + + /** + * This method will be called by the transport for understanding how many + * bytes it should read (at most) before notifying the application. + * + * By default it reads 64 KB. + */ + virtual size_t maxBufferSize() const { return 64 * 1024; } + + /** + * This method will be called by the transport iff (isBufferMovable == + * true). The unique_ptr underlines the fact that the ownership of the + * buffer is being transferred to the application. + * + * @param buffer - The buffer + */ + virtual void readBufferAvailable( + std::unique_ptr &&buffer) noexcept {} + + /** + * readError() will be invoked if an error occurs reading from the + * transport. + * + * @param ec - An error code describing the error. + */ + virtual void readError(const std::error_code ec) noexcept = 0; + + /** + * This callback will be invoked when the whole content is retrieved. The + * transport itself knows when a content is retrieved (since it is not an + * opaque bytestream like TCP), and the transport itself is able to tell + * the application when the transfer is done. + */ + virtual void readSuccess(std::size_t total_size) noexcept = 0; + }; + + /** + * @brief Create a new consumer socket. + * + * @param protocol - The transport protocol to use. So far the following + * transport are supported: + * - CBR: Constant bitrate + * - Raaqm: Based on paper: Optimal multipath congestion control and request + * forwarding in information-centric networks: Protocol design and + * experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks + * 110, 104-117 + * - RTC: Real time communication + */ explicit ConsumerSocket(int protocol); explicit ConsumerSocket(int protocol, asio::io_service &io_service); + /** + * @brief Destroy the consumer socket. + */ ~ConsumerSocket(); + /** + * @brief Connect the consumer socket to the underlying hICN forwarder. + */ void connect() override; - int consume(const Name &name, ContentBuffer &receive_buffer); - - int asyncConsume(const Name &name, ContentBuffer &receive_buffer); - + /** + * Retrieve a content using the protocol specified in the constructor. + * This function blocks until the whole content is downloaded. + * For monitoring the status of the download, the application MUST set the + * ConsumerRead callback. This callback will be called periodically (depending + * on the needs of the application), allowing the application to save the + * retrieved data. + * + * @param name - The name of the content to retrieve. + * + * @return CONSUMER_BUSY if a pending download exists + * @return CONSUMER_FINISHED when the download finishes + * + * Notice that the fact consume() returns CONSUMER_FINISHED does not imply the + * content retrieval succeeded. This information can be obtained from the + * error code in CONTENT_RETRIEVED callback. + */ + int consume(const Name &name); + int asyncConsume(const Name &name); + + /** + * Send an interest asynchronously in another thread, which is the same used + * for asyncConsume. + * + * @param interest - An Interest::Ptr to the interest. Notice that the + * application looses the ownership of the interest, which is transferred to + * the library itself. + * @param callback - A ConsumerCallback containing the events to be trigger in + * case of timeout or content reception. + * + */ void asyncSendInterest(Interest::Ptr &&interest, Portal::ConsumerCallback *callback); + /** + * Stops the consumer socket. If several downloads are queued (using + * asyncConsume), this call stops just the current one. + */ void stop(); + /** + * Resume the download from the same exact point it stopped. + */ void resume(); + /** + * Get the io_service which is running the transport protocol event loop. + * + * @return A reference to the internal io_service where the transport protocol + * is running. + */ asio::io_service &getIoService() override; + TRANSPORT_ALWAYS_INLINE int setSocketOption( + int socket_option_key, ReadCallback *socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + TRANSPORT_ALWAYS_INLINE int getSocketOption( + int socket_option_key, ReadCallback **socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + *socket_option_value = read_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, double socket_option_value) { switch (socket_option_key) { @@ -150,12 +337,6 @@ class ConsumerSocket : public BaseSocket { break; } - case ConsumerCallbacksOptions::CONTENT_RETRIEVED: - if (socket_option_value == VOID_HANDLER) { - on_payload_retrieved_ = VOID_HANDLER; - break; - } - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: if (socket_option_value > 0) { rate_estimation_batching_parameter_ = socket_option_value; @@ -274,20 +455,6 @@ class ConsumerSocket : public BaseSocket { return SOCKET_OPTION_SET; } - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerContentCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_RETRIEVED: - on_payload_retrieved_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - TRANSPORT_ALWAYS_INLINE int setSocketOption( int socket_option_key, ConsumerManifestCallback socket_option_value) { switch (socket_option_key) { @@ -331,21 +498,6 @@ class ConsumerSocket : public BaseSocket { return SOCKET_OPTION_SET; } - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - const std::shared_ptr> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::APPLICATION_BUFFER: - content_buffer_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - TRANSPORT_ALWAYS_INLINE int setSocketOption( int socket_option_key, const std::string &socket_option_value) { switch (socket_option_key) { @@ -568,18 +720,6 @@ class ConsumerSocket : public BaseSocket { return SOCKET_OPTION_GET; } - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerContentCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_RETRIEVED: - *socket_option_value = &on_payload_retrieved_; - return SOCKET_OPTION_GET; - - default: - return SOCKET_OPTION_NOT_GET; - } - } - TRANSPORT_ALWAYS_INLINE int getSocketOption( int socket_option_key, ConsumerManifestCallback **socket_option_value) { switch (socket_option_key) { @@ -635,20 +775,6 @@ class ConsumerSocket : public BaseSocket { return SOCKET_OPTION_GET; } - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - std::shared_ptr> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::APPLICATION_BUFFER: - socket_option_value = content_buffer_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - TRANSPORT_ALWAYS_INLINE int getSocketOption( int socket_option_key, std::string &socket_option_value) { switch (socket_option_key) { @@ -718,8 +844,6 @@ class ConsumerSocket : public BaseSocket { PARCKeyId *key_id_; bool verify_signature_; - ContentBuffer content_buffer_; - ConsumerInterestCallback on_interest_retransmission_; ConsumerInterestCallback on_interest_output_; ConsumerInterestCallback on_interest_timeout_; @@ -730,11 +854,10 @@ class ConsumerSocket : public BaseSocket { ConsumerContentObjectCallback on_content_object_; ConsumerManifestCallback on_manifest_; - - ConsumerContentCallback on_payload_retrieved_; - ConsumerTimerCallback stats_summary_; + ReadCallback *read_callback_; + // Virtual download for traffic generator bool virtual_download_; diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h index bcf049310..c21108186 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -78,7 +78,7 @@ typedef enum { CONTENT_OBJECT_INPUT = 411, MANIFEST_INPUT = 412, CONTENT_OBJECT_TO_VERIFY = 413, - CONTENT_RETRIEVED = 414, + READ_CALLBACK = 414, STATS_SUMMARY = 415 } ConsumerCallbacksOptions; diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 3f9d4959d..c4cf95895 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -14,6 +14,7 @@ */ #include +#include #include @@ -336,16 +337,6 @@ void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, } } -void ProducerSocket::asyncProduce(const Name &suffix, - ContentBuffer &&output_buffer) { - if (!async_thread_.stopped()) { - async_thread_.add( - [this, suff = suffix, buffer = std::move(output_buffer)]() { - produce(suff, &(*buffer)[0], buffer->size(), true); - }); - } -} - void ProducerSocket::onInterest(Interest &interest) { if (on_interest_input_ != VOID_HANDLER) { on_interest_input_(*this, interest); diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 6ba5671cc..200c32a95 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -30,6 +30,10 @@ #define REGISTRATION_FAILURE 2 #define REGISTRATION_IN_PROGRESS 3 +namespace utils { +class Identity; +} + namespace transport { namespace interface { @@ -59,7 +63,7 @@ class ProducerSocket : public Socket, void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size); - void asyncProduce(const Name &suffix, ContentBuffer &&output_buffer); + void asyncProduce(const Name &suffix); void asyncProduce(ContentObject &content_object); -- cgit 1.2.3-korg