From c365689250216861fd7727203ee6ba1049ad5778 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 3 Apr 2019 10:03:56 +0200 Subject: [HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application. Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara --- apps/http-proxy/src/ATSConnector.cc | 2 +- .../src/hicn/transport/http/CMakeLists.txt | 1 - libtransport/src/hicn/transport/http/callbacks.h | 46 -- .../src/hicn/transport/http/client_connection.cc | 127 ++-- .../src/hicn/transport/http/client_connection.h | 65 ++- .../src/hicn/transport/http/server_acceptor.h | 4 +- .../src/hicn/transport/http/server_publisher.h | 2 - .../src/hicn/transport/interfaces/CMakeLists.txt | 4 +- .../hicn/transport/interfaces/async_transport.h | 641 --------------------- .../src/hicn/transport/interfaces/callbacks.h | 110 ++++ .../transport/interfaces/full_duplex_socket.cc | 490 ---------------- .../hicn/transport/interfaces/full_duplex_socket.h | 243 -------- .../src/hicn/transport/interfaces/socket.h | 62 +- .../hicn/transport/interfaces/socket_consumer.cc | 14 +- .../hicn/transport/interfaces/socket_consumer.h | 263 ++++++--- .../transport/interfaces/socket_options_keys.h | 2 +- .../hicn/transport/interfaces/socket_producer.cc | 11 +- .../hicn/transport/interfaces/socket_producer.h | 6 +- libtransport/src/hicn/transport/protocols/raaqm.cc | 62 +- .../src/hicn/transport/protocols/reassembly.cc | 78 ++- .../src/hicn/transport/protocols/reassembly.h | 8 + libtransport/src/hicn/transport/protocols/rtc.cc | 204 ++++--- utils/src/hiperf.cc | 211 +++++-- utils/src/ping_server.cc | 115 ++-- 24 files changed, 895 insertions(+), 1876 deletions(-) delete mode 100644 libtransport/src/hicn/transport/http/callbacks.h delete mode 100644 libtransport/src/hicn/transport/interfaces/async_transport.h create mode 100644 libtransport/src/hicn/transport/interfaces/callbacks.h delete mode 100644 libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc delete mode 100644 libtransport/src/hicn/transport/interfaces/full_duplex_socket.h diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc index cc41ca219..287907347 100644 --- a/apps/http-proxy/src/ATSConnector.cc +++ b/apps/http-proxy/src/ATSConnector.cc @@ -61,7 +61,7 @@ void ATSConnector::send(utils::MemBuf *buffer, doWrite(); } } else { - TRANSPORT_LOGD(" Tell the handle connect it has data to write"); + TRANSPORT_LOGD("Tell the handle connect it has data to write"); data_available_ = true; } }); diff --git a/libtransport/src/hicn/transport/http/CMakeLists.txt b/libtransport/src/hicn/transport/http/CMakeLists.txt index ddcf1fdc3..b24c80195 100644 --- a/libtransport/src/hicn/transport/http/CMakeLists.txt +++ b/libtransport/src/hicn/transport/http/CMakeLists.txt @@ -29,7 +29,6 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/facade.h ${CMAKE_CURRENT_SOURCE_DIR}/response.h ${CMAKE_CURRENT_SOURCE_DIR}/message.h - ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h ) set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) diff --git a/libtransport/src/hicn/transport/http/callbacks.h b/libtransport/src/hicn/transport/http/callbacks.h deleted file mode 100644 index a0f3d5999..000000000 --- a/libtransport/src/hicn/transport/http/callbacks.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include -#include - -namespace transport { - -namespace http { - -enum class RC : uint8_t { - SUCCESS, - CONTENT_PUBLISHED, - ERR_UNDEFINED, -}; - -using OnHttpRequest = - std::function&, const uint8_t*, - std::size_t, int request_id)>; -using DeadlineTimerCallback = std::function; -using ReceiveCallback = std::function&)>; -using OnPayloadCallback = - std::function; -using ContentSentCallback = - std::function; - -} // namespace http - -} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc index b31d89b6b..fb7dbdfac 100644 --- a/libtransport/src/hicn/transport/http/client_connection.cc +++ b/libtransport/src/hicn/transport/http/client_connection.cc @@ -27,6 +27,8 @@ using namespace transport; HTTPClientConnection::HTTPClientConnection() : consumer_(TransportProtocolAlgorithms::RAAQM, io_service_), + read_bytes_callback_(nullptr), + read_buffer_(nullptr), response_(std::make_shared()), timer_(nullptr) { consumer_.setSocketOption( @@ -35,11 +37,7 @@ HTTPClientConnection::HTTPClientConnection() &HTTPClientConnection::verifyData, this, std::placeholders::_1, std::placeholders::_2)); - consumer_.setSocketOption( - ConsumerCallbacksOptions::CONTENT_RETRIEVED, - (ConsumerContentCallback)std::bind( - &HTTPClientConnection::processPayload, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); consumer_.connect(); std::shared_ptr portal; @@ -47,36 +45,48 @@ HTTPClientConnection::HTTPClientConnection() timer_ = std::make_unique(portal->getIoService()); } -HTTPClientConnection &HTTPClientConnection::get( +HTTPClientConnection::RC HTTPClientConnection::get( const std::string &url, HTTPHeaders headers, HTTPPayload payload, - std::shared_ptr response) { - return sendRequest(url, HTTPMethod::GET, headers, payload, response); + std::shared_ptr response, ReadBytesCallback *callback) { + return sendRequest(url, HTTPMethod::GET, headers, payload, response, + callback); } -HTTPClientConnection &HTTPClientConnection::sendRequest( +HTTPClientConnection::RC HTTPClientConnection::sendRequest( const std::string &url, HTTPMethod method, HTTPHeaders headers, - HTTPPayload payload, std::shared_ptr response) { + HTTPPayload payload, std::shared_ptr response, + ReadBytesCallback *callback) { + current_url_ = url; + read_bytes_callback_ = callback; if (!response) { response = response_; } auto start = std::chrono::steady_clock::now(); HTTPRequest request(method, url, headers, payload); - std::string name = sendRequestGetReply(request, response); - auto end = std::chrono::steady_clock::now(); - - TRANSPORT_LOGI( - "%s %s [%s] duration: %llu [usec] %zu [bytes]\n", - method_map[method].c_str(), url.c_str(), name.c_str(), - (unsigned long long)std::chrono::duration_cast( - end - start) - .count(), - response->size()); + response->clear(); - return *this; + success_callback_ = [this, method = std::move(method), url = std::move(url), + start = std::move(start), + response = std::move(response)]( + std::size_t size) -> std::shared_ptr { + auto end = std::chrono::steady_clock::now(); + TRANSPORT_LOGI( + "%s %s [%s] duration: %llu [usec] %zu [bytes]\n", + method_map[method].c_str(), url.c_str(), name_.str().c_str(), + (unsigned long long) + std::chrono::duration_cast(end - start) + .count(), + size); + + return response; + }; + + sendRequestGetReply(request, response); + return return_code_; } -std::string HTTPClientConnection::sendRequestGetReply( +void HTTPClientConnection::sendRequestGetReply( const HTTPRequest &request, std::shared_ptr &response) { const std::string &request_string = request.getRequestString(); const std::string &locator = request.getLocator(); @@ -94,38 +104,28 @@ std::string HTTPClientConnection::sendRequestGetReply( &HTTPClientConnection::processLeavingInterest, this, std::placeholders::_1, std::placeholders::_2, request_string)); - // Send content to producer piggybacking it through first interest (to fix) - - response->clear(); - // Factor hicn name using hash + name_.str(""); - std::stringstream stream; - - stream << std::hex << http::default_values::ipv6_first_word << ":"; + name_ << std::hex << http::default_values::ipv6_first_word << ":"; for (uint16_t *word = (uint16_t *)&locator_hash; std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); word++) { - stream << ":" << std::hex << *word; + name_ << ":" << std::hex << *word; } for (uint16_t *word = (uint16_t *)&request_hash; std::size_t(word) < (std::size_t(&request_hash) + sizeof(request_hash)); word++) { - stream << ":" << std::hex << *word; + name_ << ":" << std::hex << *word; } - stream << "|0"; - - ContentBuffer response_ptr = - std::static_pointer_cast>(response); + name_ << "|0"; - consumer_.consume(Name(stream.str()), response_ptr); + consumer_.consume(Name(name_.str())); consumer_.stop(); - - return stream.str(); } HTTPResponse HTTPClientConnection::response() { @@ -133,14 +133,6 @@ HTTPResponse HTTPClientConnection::response() { return std::move(*response_); } -void HTTPClientConnection::processPayload(ConsumerSocket &c, - std::size_t bytes_transferred, - const std::error_code &ec) { - if (ec) { - TRANSPORT_LOGE("Download failed!!"); - } -} - bool HTTPClientConnection::verifyData( ConsumerSocket &c, const core::ContentObject &contentObject) { if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) { @@ -192,6 +184,49 @@ HTTPClientConnection &HTTPClientConnection::setCertificate( return *this; } +// Read buffer management +void HTTPClientConnection::readBufferAvailable( + std::unique_ptr &&buffer) noexcept { + if (!read_bytes_callback_) { + if (!read_buffer_) { + read_buffer_ = std::move(buffer); + } else { + read_buffer_->prependChain(std::move(buffer)); + } + } else { + read_bytes_callback_->onBytesReceived(std::move(buffer)); + } +} + +// Read buffer management +void HTTPClientConnection::readError(const std::error_code ec) noexcept { + TRANSPORT_LOGE("Error %s during download of %s", ec.message().c_str(), + current_url_.c_str()); + if (read_bytes_callback_) { + read_bytes_callback_->onError(ec); + } + + return_code_ = HTTPClientConnection::RC::DOWNLOAD_FAILED; +} + +void HTTPClientConnection::readSuccess(std::size_t total_size) noexcept { + auto response = success_callback_(total_size); + if (read_bytes_callback_) { + read_bytes_callback_->onSuccess(total_size); + } else { + response->reserve(total_size); + const utils::MemBuf *head = read_buffer_.get(), *current = head; + do { + response->insert(response->end(), current->data(), current->tail()); + current = current->next(); + } while (current != head); + + read_buffer_.reset(); + } + + return_code_ = HTTPClientConnection::RC::DOWNLOAD_SUCCESS; +} + } // namespace http } // namespace transport diff --git a/libtransport/src/hicn/transport/http/client_connection.h b/libtransport/src/hicn/transport/http/client_connection.h index faad1864c..6c150f848 100644 --- a/libtransport/src/hicn/transport/http/client_connection.h +++ b/libtransport/src/hicn/transport/http/client_connection.h @@ -31,18 +31,30 @@ namespace http { using namespace interface; using namespace core; -class HTTPClientConnection { +class HTTPClientConnection : public ConsumerSocket::ReadCallback { + static constexpr uint32_t max_buffer_capacity = 64 * 1024; + public: + class ReadBytesCallback { + public: + virtual void onBytesReceived(std::unique_ptr &&buffer) = 0; + virtual void onSuccess(std::size_t bytes) = 0; + virtual void onError(const std::error_code ec) = 0; + }; + + enum class RC : uint32_t { DOWNLOAD_FAILED, DOWNLOAD_SUCCESS }; + HTTPClientConnection(); - HTTPClientConnection &get(const std::string &url, HTTPHeaders headers = {}, - HTTPPayload payload = {}, - std::shared_ptr response = nullptr); + RC get(const std::string &url, HTTPHeaders headers = {}, + HTTPPayload payload = {}, + std::shared_ptr response = nullptr, + ReadBytesCallback *callback = nullptr); - HTTPClientConnection &sendRequest( - const std::string &url, HTTPMethod method, HTTPHeaders headers = {}, - HTTPPayload payload = {}, - std::shared_ptr response = nullptr); + RC sendRequest(const std::string &url, HTTPMethod method, + HTTPHeaders headers = {}, HTTPPayload payload = {}, + std::shared_ptr response = nullptr, + ReadBytesCallback *callback = nullptr); HTTPResponse response(); @@ -55,11 +67,8 @@ class HTTPClientConnection { HTTPClientConnection &setCertificate(const std::string &cert_path); private: - void processPayload(interface::ConsumerSocket &c, - std::size_t bytes_transferred, const std::error_code &ec); - - std::string sendRequestGetReply(const HTTPRequest &request, - std::shared_ptr &response); + void sendRequestGetReply(const HTTPRequest &request, + std::shared_ptr &response); bool verifyData(interface::ConsumerSocket &c, const core::ContentObject &contentObject); @@ -68,12 +77,42 @@ class HTTPClientConnection { const core::Interest &interest, std::string &payload); + // Read callback + bool isBufferMovable() noexcept override { return true; } + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override {} + void readDataAvailable(size_t length) noexcept override {} + size_t maxBufferSize() const override { return max_buffer_capacity; } + void readBufferAvailable( + std::unique_ptr &&buffer) noexcept override; + void readError(const std::error_code ec) noexcept override; + void readSuccess(std::size_t total_size) noexcept override; + asio::io_service io_service_; + // The consumer socket ConsumerSocket consumer_; + // The current url provided by the application + std::string current_url_; + // The current hICN name used for downloading + std::stringstream name_; + // Function to be called when the read is successful + std::function(std::size_t)> success_callback_; + // Return code for current download + RC return_code_; + + // Application provided callback for saving the received content during + // the download. If this callback is used, the HTTPClient will NOT save + // any byte internally. + ReadBytesCallback *read_bytes_callback_; + + // Internal read buffer and HTTP response, to be used if the application does + // not provide any read_bytes_callback + std::unique_ptr read_buffer_; std::shared_ptr response_; + // Timer std::unique_ptr timer_; }; diff --git a/libtransport/src/hicn/transport/http/server_acceptor.h b/libtransport/src/hicn/transport/http/server_acceptor.h index 99480028a..4e7350b76 100644 --- a/libtransport/src/hicn/transport/http/server_acceptor.h +++ b/libtransport/src/hicn/transport/http/server_acceptor.h @@ -15,7 +15,6 @@ #pragma once -#include #include #include #include @@ -31,6 +30,9 @@ namespace http { class HTTPServerAcceptor { friend class HTTPServerPublisher; + using OnHttpRequest = + std::function &, + const uint8_t *, std::size_t, int request_id)>; public: HTTPServerAcceptor(std::string &&server_locator, OnHttpRequest callback); diff --git a/libtransport/src/hicn/transport/http/server_publisher.h b/libtransport/src/hicn/transport/http/server_publisher.h index f3c39c3fe..1f12fd8f9 100644 --- a/libtransport/src/hicn/transport/http/server_publisher.h +++ b/libtransport/src/hicn/transport/http/server_publisher.h @@ -63,8 +63,6 @@ class HTTPServerPublisher { std::unique_ptr producer_; ProducerInterestCallback interest_enter_callback_; utils::UserCallback wait_callback_; - - ContentBuffer receive_buffer_; }; } // end namespace http diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt index cbf371bac..2ff4fda56 100644 --- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -19,15 +19,13 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h - ${CMAKE_CURRENT_SOURCE_DIR}/async_transport.h - ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.h ${CMAKE_CURRENT_SOURCE_DIR}/publication_options.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h + ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h ) list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc diff --git a/libtransport/src/hicn/transport/interfaces/async_transport.h b/libtransport/src/hicn/transport/interfaces/async_transport.h deleted file mode 100644 index 692dd318c..000000000 --- a/libtransport/src/hicn/transport/interfaces/async_transport.h +++ /dev/null @@ -1,641 +0,0 @@ - -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#ifndef _WIN32 -#include -#endif - -#include - -namespace transport { - -namespace interface { - -/* - * flags given by the application for write* calls - */ -enum class WriteFlags : uint32_t { - NONE = 0x00, - /* - * Whether to delay the output until a subsequent non-corked write. - * (Note: may not be supported in all subclasses or on all platforms.) - */ - CORK = 0x01, - /* - * for a socket that has ACK latency enabled, it will cause the kernel - * to fire a TCP ESTATS event when the last byte of the given write call - * will be acknowledged. - */ - EOR = 0x02, - /* - * this indicates that only the write side of socket should be shutdown - */ - WRITE_SHUTDOWN = 0x04, - /* - * use msg zerocopy if allowed - */ - WRITE_MSG_ZEROCOPY = 0x08, -}; - -/* - * union operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags operator|(WriteFlags a, WriteFlags b) { - return static_cast(static_cast(a) | - static_cast(b)); -} - -/* - * compound assignment union operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags &operator|=(WriteFlags &a, WriteFlags b) { - a = a | b; - return a; -} - -/* - * intersection operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags operator&(WriteFlags a, WriteFlags b) { - return static_cast(static_cast(a) & - static_cast(b)); -} - -/* - * compound assignment intersection operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags &operator&=(WriteFlags &a, WriteFlags b) { - a = a & b; - return a; -} - -/* - * exclusion parameter - */ -TRANSPORT_ALWAYS_INLINE WriteFlags operator~(WriteFlags a) { - return static_cast(~static_cast(a)); -} - -/* - * unset operator - */ -TRANSPORT_ALWAYS_INLINE WriteFlags unSet(WriteFlags a, WriteFlags b) { - return a & ~b; -} - -/* - * inclusion operator - */ -TRANSPORT_ALWAYS_INLINE bool isSet(WriteFlags a, WriteFlags b) { - return (a & b) == b; -} - -class ConnectCallback { - public: - virtual ~ConnectCallback() = default; - - /** - * connectSuccess() will be invoked when the connection has been - * successfully established. - */ - virtual void connectSuccess() noexcept = 0; - - /** - * connectErr() will be invoked if the connection attempt fails. - * - * @param ex An exception describing the error that occurred. - */ - virtual void connectErr(const std::error_code ec) noexcept = 0; -}; - -/** - * AsyncSocket defines an asynchronous API for streaming I/O. - * - * This class provides an API to for asynchronously waiting for data - * on a streaming transport, and for asynchronously sending data. - * - * The APIs for reading and writing are intentionally asymmetric. Waiting for - * data to read is a persistent API: a callback is installed, and is notified - * whenever new data is available. It continues to be notified of new events - * until it is uninstalled. - * - * AsyncSocket does not provide read timeout functionality, because it - * typically cannot determine when the timeout should be active. Generally, a - * timeout should only be enabled when processing is blocked waiting on data - * from the remote endpoint. For server-side applications, the timeout should - * not be active if the server is currently processing one or more outstanding - * requests on this transport. For client-side applications, the timeout - * should not be active if there are no requests pending on the transport. - * Additionally, if a client has multiple pending requests, it will ususally - * want a separate timeout for each request, rather than a single read timeout. - * - * The write API is fairly intuitive: a user can request to send a block of - * data, and a callback will be informed once the entire block has been - * transferred to the kernel, or on error. AsyncSocket does provide a send - * timeout, since most callers want to give up if the remote end stops - * responding and no further progress can be made sending the data. - */ -class AsyncSocket { - public: - /** - * Close the transport. - * - * This gracefully closes the transport, waiting for all pending write - * requests to complete before actually closing the underlying transport. - * - * If a read callback is set, readEOF() will be called immediately. If there - * are outstanding write requests, the close will be delayed until all - * remaining writes have completed. No new writes may be started after - * close() has been called. - */ - virtual void close() = 0; - - /** - * Close the transport immediately. - * - * This closes the transport immediately, dropping any outstanding data - * waiting to be written. - * - * If a read callback is set, readEOF() will be called immediately. - * If there are outstanding write requests, these requests will be aborted - * and writeError() will be invoked immediately on all outstanding write - * callbacks. - */ - virtual void closeNow() = 0; - - /** - * Perform a half-shutdown of the write side of the transport. - * - * The caller should not make any more calls to write() or writev() after - * shutdownWrite() is called. Any future write attempts will fail - * immediately. - * - * Not all transport types support half-shutdown. If the underlying - * transport does not support half-shutdown, it will fully shutdown both the - * read and write sides of the transport. (Fully shutting down the socket is - * better than doing nothing at all, since the caller may rely on the - * shutdownWrite() call to notify the other end of the connection that no - * more data can be read.) - * - * If there is pending data still waiting to be written on the transport, - * the actual shutdown will be delayed until the pending data has been - * written. - * - * Note: There is no corresponding shutdownRead() equivalent. Simply - * uninstall the read callback if you wish to stop reading. (On TCP sockets - * at least, shutting down the read side of the socket is a no-op anyway.) - */ - virtual void shutdownWrite() = 0; - - /** - * Perform a half-shutdown of the write side of the transport. - * - * shutdownWriteNow() is identical to shutdownWrite(), except that it - * immediately performs the shutdown, rather than waiting for pending writes - * to complete. Any pending write requests will be immediately failed when - * shutdownWriteNow() is called. - */ - virtual void shutdownWriteNow() = 0; - - /** - * Determine if transport is open and ready to read or write. - * - * Note that this function returns false on EOF; you must also call error() - * to distinguish between an EOF and an error. - * - * @return true iff the transport is open and ready, false otherwise. - */ - virtual bool good() const = 0; - - /** - * Determine if the transport is readable or not. - * - * @return true iff the transport is readable, false otherwise. - */ - virtual bool readable() const = 0; - - /** - * Determine if the transport is writable or not. - * - * @return true iff the transport is writable, false otherwise. - */ - virtual bool writable() const { - // By default return good() - leave it to implementers to override. - return good(); - } - - /** - * Determine if the there is pending data on the transport. - * - * @return true iff the if the there is pending data, false otherwise. - */ - virtual bool isPending() const { return readable(); } - - /** - * Determine if transport is connected to the endpoint - * - * @return false iff the transport is connected, otherwise true - */ - virtual bool connected() const = 0; - - /** - * Determine if an error has occurred with this transport. - * - * @return true iff an error has occurred (not EOF). - */ - virtual bool error() const = 0; - - // /** - // * Attach the transport to a EventBase. - // * - // * This may only be called if the transport is not currently attached to a - // * EventBase (by an earlier call to detachEventBase()). - // * - // * This method must be invoked in the EventBase's thread. - // */ - // virtual void attachEventBase(EventBase* eventBase) = 0; - - // /** - // * Detach the transport from its EventBase. - // * - // * This may only be called when the transport is idle and has no reads or - // * writes pending. Once detached, the transport may not be used again - // until - // * it is re-attached to a EventBase by calling attachEventBase(). - // * - // * This method must be called from the current EventBase's thread. - // */ - // virtual void detachEventBase() = 0; - - // /** - // * Determine if the transport can be detached. - // * - // * This method must be called from the current EventBase's thread. - // */ - // virtual bool isDetachable() const = 0; - - /** - * Set the send timeout. - * - * If write requests do not make any progress for more than the specified - * number of milliseconds, fail all pending writes and close the transport. - * - * If write requests are currently pending when setSendTimeout() is called, - * the timeout interval is immediately restarted using the new value. - * - * @param milliseconds The timeout duration, in milliseconds. If 0, no - * timeout will be used. - */ - virtual void setSendTimeout(uint32_t milliseconds) = 0; - - /** - * Get the send timeout. - * - * @return Returns the current send timeout, in milliseconds. A return value - * of 0 indicates that no timeout is set. - */ - virtual uint32_t getSendTimeout() const = 0; - - virtual void connect(ConnectCallback *callback, - const core::Prefix &prefix_) = 0; - - // /** - // * Get the address of the local endpoint of this transport. - // * - // * This function may throw AsyncSocketException on error. - // * - // * @param address The local address will be stored in the specified - // * SocketAddress. - // */ - // virtual void getLocalAddress(* address) const = 0; - - virtual size_t getAppBytesWritten() const = 0; - virtual size_t getRawBytesWritten() const = 0; - virtual size_t getAppBytesReceived() const = 0; - virtual size_t getRawBytesReceived() const = 0; - - class BufferCallback { - public: - virtual ~BufferCallback() {} - virtual void onEgressBuffered() = 0; - virtual void onEgressBufferCleared() = 0; - }; - - ~AsyncSocket() = default; -}; - -class AsyncAcceptor { - public: - class AcceptCallback { - public: - virtual ~AcceptCallback() = default; - - /** - * connectionAccepted() is called whenever a new client connection is - * received. - * - * The AcceptCallback will remain installed after connectionAccepted() - * returns. - * - * @param fd The newly accepted client socket. The AcceptCallback - * assumes ownership of this socket, and is responsible - * for closing it when done. The newly accepted file - * descriptor will have already been put into - * non-blocking mode. - * @param clientAddr A reference to a SocketAddress struct containing the - * client's address. This struct is only guaranteed to - * remain valid until connectionAccepted() returns. - */ - virtual void connectionAccepted( - const core::Name &subscriber_name) noexcept = 0; - - /** - * acceptError() is called if an error occurs while accepting. - * - * The AcceptCallback will remain installed even after an accept error, - * as the errors are typically somewhat transient, such as being out of - * file descriptors. The server socket must be explicitly stopped if you - * wish to stop accepting after an error. - * - * @param ex An exception representing the error. - */ - virtual void acceptError(const std::exception &ex) noexcept = 0; - - /** - * acceptStarted() will be called in the callback's EventBase thread - * after this callback has been added to the AsyncServerSocket. - * - * acceptStarted() will be called before any calls to connectionAccepted() - * or acceptError() are made on this callback. - * - * acceptStarted() makes it easier for callbacks to perform initialization - * inside the callback thread. (The call to addAcceptCallback() must - * always be made from the AsyncServerSocket's primary EventBase thread. - * acceptStarted() provides a hook that will always be invoked in the - * callback's thread.) - * - * Note that the call to acceptStarted() is made once the callback is - * added, regardless of whether or not the AsyncServerSocket is actually - * accepting at the moment. acceptStarted() will be called even if the - * AsyncServerSocket is paused when the callback is added (including if - * the initial call to startAccepting() on the AsyncServerSocket has not - * been made yet). - */ - virtual void acceptStarted() noexcept {} - - /** - * acceptStopped() will be called when this AcceptCallback is removed from - * the AsyncServerSocket, or when the AsyncServerSocket is destroyed, - * whichever occurs first. - * - * No more calls to connectionAccepted() or acceptError() will be made - * after acceptStopped() is invoked. - */ - virtual void acceptStopped() noexcept {} - }; - - /** - * Wait for subscribers - * - */ - virtual void waitForSubscribers(AcceptCallback *cb) = 0; -}; - -class AsyncReader { - public: - class ReadCallback { - public: - virtual ~ReadCallback() = default; - - /** - * When data becomes available, getReadBuffer() will be invoked to get the - * buffer into which data should be read. - * - * This method allows the ReadCallback to delay buffer allocation until - * data becomes available. This allows applications to manage large - * numbers of idle connections, without having to maintain a separate read - * buffer for each idle connection. - * - * It is possible that in some cases, getReadBuffer() may be called - * multiple times before readDataAvailable() is invoked. In this case, the - * data will be written to the buffer returned from the most recent call to - * readDataAvailable(). If the previous calls to readDataAvailable() - * returned different buffers, the ReadCallback is responsible for ensuring - * that they are not leaked. - * - * If getReadBuffer() throws an exception, returns a nullptr buffer, or - * returns a 0 length, the ReadCallback will be uninstalled and its - * readError() method will be invoked. - * - * getReadBuffer() is not allowed to change the transport state before it - * returns. (For example, it should never uninstall the read callback, or - * set a different read callback.) - * - * @param bufReturn getReadBuffer() should update *bufReturn to contain the - * address of the read buffer. This parameter will never - * be nullptr. - * @param lenReturn getReadBuffer() should update *lenReturn to contain the - * maximum number of bytes that may be written to the read - * buffer. This parameter will never be nullptr. - * - * - * XXX TODO this does not seems to be completely true Checlk i/. - */ - virtual void getReadBuffer(void **bufReturn, size_t *lenReturn) = 0; - - /** - * readDataAvailable() will be invoked when data has been successfully read - * into the buffer returned by the last call to getReadBuffer(). - * - * The read callback remains installed after readDataAvailable() returns. - * It must be explicitly uninstalled to stop receiving read events. - * getReadBuffer() will be called at least once before each call to - * readDataAvailable(). getReadBuffer() will also be called before any - * call to readEOF(). - * - * @param len The number of bytes placed in the buffer. - */ - - virtual void readDataAvailable(size_t len) noexcept = 0; - - /** - * When data becomes available, isBufferMovable() will be invoked to figure - * out which API will be used, readBufferAvailable() or - * readDataAvailable(). If isBufferMovable() returns true, that means - * ReadCallback supports the IOBuf ownership transfer and - * readBufferAvailable() will be used. Otherwise, not. - - * By default, isBufferMovable() always return false. If - * readBufferAvailable() is implemented and to be invoked, You should - * overwrite isBufferMovable() and return true in the inherited class. - * - * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by - * itself until data becomes available. Compared with the pre/post buffer - * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable() - * has two advantages. First, this can avoid memcpy. E.g., in - * AsyncSSLSocket, the decrypted data was copied from the openssl internal - * buffer to the readbuf buffer. With the buffer ownership transfer, the - * internal buffer can be directly "moved" to ReadCallback. Second, the - * memory allocation can be more precise. The reason is - * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size - * because they have more context about the available data than - * ReadCallback. Think about the getReadBuffer() pre-allocate 4072 bytes - * buffer, but the available data is always 16KB (max OpenSSL record size). - */ - - virtual bool isBufferMovable() noexcept { return false; } - - /** - * Suggested buffer size, allocated for read operations, - * if callback is movable and supports folly::IOBuf - */ - - virtual size_t maxBufferSize() const { - return 64 * 1024; // 64K - } - - /** - * readBufferAvailable() will be invoked when data has been successfully - * read. - * - * Note that only either readBufferAvailable() or readDataAvailable() will - * be invoked according to the return value of isBufferMovable(). The timing - * and aftereffect of readBufferAvailable() are the same as - * readDataAvailable() - * - * @param readBuf The unique pointer of read buffer. - */ - - // virtual void readBufferAvailable(uint8_t** buffer, std::size_t - // *buf_length) noexcept {} - - virtual void readBufferAvailable(ContentBuffer &&buffer) noexcept {} - - // virtual void readBufferAvailable(utils::SharableBuffer&& buffer) - // noexcept {} - - /** - * readEOF() will be invoked when the transport is closed. - * - * The read callback will be automatically uninstalled immediately before - * readEOF() is invoked. - */ - virtual void readEOF() noexcept = 0; - - /** - * readError() will be invoked if an error occurs reading from the - * transport. - * - * The read callback will be automatically uninstalled immediately before - * readError() is invoked. - * - * @param ex An exception describing the error that occurred. - */ - virtual void readErr(const std::error_code ec) noexcept = 0; - }; - - // Read methods that aren't part of AsyncTransport. - virtual void setReadCB(ReadCallback *callback) = 0; - virtual ReadCallback *getReadCallback() const = 0; - - protected: - virtual ~AsyncReader() = default; -}; - -class AsyncWriter { - public: - class WriteCallback { - public: - virtual ~WriteCallback() = default; - - /** - * writeSuccess() will be invoked when all of the data has been - * successfully written. - * - * Note that this mainly signals that the buffer containing the data to - * write is no longer needed and may be freed or re-used. It does not - * guarantee that the data has been fully transmitted to the remote - * endpoint. For example, on socket-based transports, writeSuccess() only - * indicates that the data has been given to the kernel for eventual - * transmission. - */ - virtual void writeSuccess() noexcept = 0; - - /** - * writeError() will be invoked if an error occurs writing the data. - * - * @param bytesWritten The number of bytes that were successfull - * @param ex An exception describing the error that occurred. - */ - virtual void writeErr(size_t bytesWritten) noexcept = 0; - }; - - /** - * If you supply a non-null WriteCallback, exactly one of writeSuccess() - * or writeErr() will be invoked when the write completes. If you supply - * the same WriteCallback object for multiple write() calls, it will be - * invoked exactly once per call. The only way to cancel outstanding - * write requests is to close the socket (e.g., with closeNow() or - * shutdownWriteNow()). When closing the socket this way, writeErr() will - * still be invoked once for each outstanding write operation. - */ - virtual void write(WriteCallback *callback, const void *buf, size_t bytes, - const PublicationOptions &options, - WriteFlags flags = WriteFlags::NONE) = 0; - - /** - * If you supply a non-null WriteCallback, exactly one of writeSuccess() - * or writeErr() will be invoked when the write completes. If you supply - * the same WriteCallback object for multiple write() calls, it will be - * invoked exactly once per call. The only way to cancel outstanding - * write requests is to close the socket (e.g., with closeNow() or - * shutdownWriteNow()). When closing the socket this way, writeErr() will - * still be invoked once for each outstanding write operation. - */ - virtual void write(WriteCallback *callback, ContentBuffer &&output_buffer, - const PublicationOptions &options, - WriteFlags flags = WriteFlags::NONE) = 0; - - // /** - // * If you supply a non-null WriteCallback, exactly one of writeSuccess() - // * or writeErr() will be invoked when the write completes. If you supply - // * the same WriteCallback object for multiple write() calls, it will be - // * invoked exactly once per call. The only way to cancel outstanding - // * write requests is to close the socket (e.g., with closeNow() or - // * shutdownWriteNow()). When closing the socket this way, writeErr() will - // * still be invoked once for each outstanding write operation. - // */ - // virtual void writeChain( - // WriteCallback* callback, - // std::unique_ptr&& buf, - // WriteFlags flags = WriteFlags::NONE) = 0; - - virtual void setWriteCB(WriteCallback *callback) = 0; - virtual WriteCallback *getWriteCallback() const = 0; - - protected: - virtual ~AsyncWriter() = default; -}; - -} // namespace interface - -} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h new file mode 100644 index 000000000..24f47eb75 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/callbacks.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace utils { +class MemBuf; +} + +namespace transport { + +namespace protocol { + +class IcnObserver; +class TransportStatistics; + +} // namespace protocol + +namespace core { + +class ContentObject; +class Interest; +} // namespace core + +namespace interface { + +// Forward declarations +class ConsumerSocket; +class ProducerSocket; + +/** + * The ConsumerInterestCallback will be called in different parts of the + * consumer socket processing pipeline, with a ConsumerSocket and an Interest as + * parameters. + */ +using ConsumerInterestCallback = + std::function; + +/** + * The ConsumerTimerCallback is called periodically for exposing to applications + * a summary of the statistics of the transport protocol in use. + */ +using ConsumerTimerCallback = std::function; + +/** + * The ProducerContentCallback will be called by the producer socket right after + * a content has been segmented and published. + */ +using ProducerContentCallback = std::function; + +/** + * The ConsumerContentObjectCallback will be called in different parts of the + * consumer socket processing pipeline, with a ConsumerSocket and an + * ContentObject as parameters. + */ +using ConsumerContentObjectCallback = + std::function; + +/** + * The ConsumerContentObjectVerificationCallback will be called by the transport + * if an application is willing to verify each content object. Note that a + * better alternative is to instrument the transport to perform the verification + * autonomously, without requiring the intervention of the application. + */ +using ConsumerContentObjectVerificationCallback = + std::function; + +/** + * The ConsumerManifestCallback will be called by the consumer socket when a + * manifest is received. + */ +using ConsumerManifestCallback = + std::function; + +/** + * The ProducerContentObjectCallback will be called in different parts of the + * consumer socket processing pipeline, with a ProducerSocket and an + * ContentObject as parameters. + */ +using ProducerContentObjectCallback = + std::function; + +/** + * The ProducerContentObjectCallback will be called in different parts of the + * consumer socket processing pipeline, with a ProducerSocket and an + * Interest as parameters. + */ +using ProducerInterestCallback = + std::function; + +} // namespace interface + +} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc deleted file mode 100644 index fdd422dee..000000000 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include - -namespace transport { - -namespace interface { - -static const std::string producer_identity = "producer_socket"; - -AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator) - : AsyncFullDuplexSocket(locator, internal_io_service_) {} - -AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator, - asio::io_service &io_service) - : locator_(locator), - incremental_suffix_(0), - io_service_(io_service), - work_(io_service), - producer_(std::make_unique(io_service_)), - consumer_(std::make_unique( - TransportProtocolAlgorithms::RAAQM /* , io_service_ */)), - read_callback_(nullptr), - write_callback_(nullptr), - connect_callback_(nullptr), - accept_callback_(nullptr), - internal_connect_callback_(new OnConnectCallback(*this)), - internal_signal_callback_(new OnSignalCallback(*this)), - send_timeout_milliseconds_(~0), - counters_({0}), - receive_buffer_(ContentBuffer()) { - using namespace transport; - using namespace std::placeholders; - producer_->registerPrefix(locator); - - producer_->setSocketOption( - ProducerCallbacksOptions::CACHE_MISS, - std::bind(&AsyncFullDuplexSocket::onControlInterest, this, _1, _2)); - - producer_->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, - uint32_t{150000}); - - ProducerContentCallback producer_callback = - std::bind(&AsyncFullDuplexSocket::onContentProduced, this, _1, _2, _3); - producer_->setSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, - producer_callback); - - producer_->connect(); - - consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, - (ConsumerContentObjectVerificationCallback)[]( - ConsumerSocket & s, const ContentObject &c) - ->bool { return true; }); - - ConsumerContentCallback consumer_callback = - std::bind(&AsyncFullDuplexSocket::onContentRetrieved, this, _1, _2, _3); - consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_RETRIEVED, - consumer_callback); - - consumer_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, - uint32_t{4}); - - consumer_->connect(); -} - -void AsyncFullDuplexSocket::close() { - this->consumer_->stop(); - this->producer_->stop(); -} - -void AsyncFullDuplexSocket::closeNow() { close(); } - -void AsyncFullDuplexSocket::shutdownWrite() { producer_->stop(); } - -void AsyncFullDuplexSocket::shutdownWriteNow() { shutdownWrite(); } - -bool AsyncFullDuplexSocket::good() const { return true; } - -bool AsyncFullDuplexSocket::readable() const { - // TODO return status of consumer socket - return true; -} - -bool AsyncFullDuplexSocket::writable() const { - // TODO return status of producer socket - return true; -} - -bool AsyncFullDuplexSocket::isPending() const { - // TODO save if there are production operation in the ops queue - // in producer socket - return true; -} - -bool AsyncFullDuplexSocket::connected() const { - // No real connection here (ICN world). Return good - return good(); -} - -bool AsyncFullDuplexSocket::error() const { return !good(); } - -void AsyncFullDuplexSocket::setSendTimeout(uint32_t milliseconds) { - // TODO if production takes too much to complete - // let's abort the operation. - - // Normally with hicn this should be done for content - // pull, not for production. - - send_timeout_milliseconds_ = milliseconds; -} - -uint32_t AsyncFullDuplexSocket::getSendTimeout() const { - return send_timeout_milliseconds_; -} - -size_t AsyncFullDuplexSocket::getAppBytesWritten() const { - return counters_.app_bytes_written_; -} - -size_t AsyncFullDuplexSocket::getRawBytesWritten() const { return 0; } - -size_t AsyncFullDuplexSocket::getAppBytesReceived() const { - return counters_.app_bytes_read_; -} - -size_t AsyncFullDuplexSocket::getRawBytesReceived() const { return 0; } - -void AsyncFullDuplexSocket::connect(ConnectCallback *callback, - const core::Prefix &prefix) { - connect_callback_ = callback; - - // Create an interest for a subscription - auto interest = - core::Interest::Ptr(new core::Interest(prefix.makeRandomName())); - auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); - _payload->append(sizeof(ActionMessage)); - auto payload = _payload->writableData(); - ActionMessage *subscription_message = - reinterpret_cast(payload); - subscription_message->header.msg_type = MessageType::ACTION; - subscription_message->action = Action::SUBSCRIBE; - subscription_message->header.reserved[0] = 0; - subscription_message->header.reserved[1] = 0; - - // Set the name the other part should use for notifying a content production - sync_notification_ = std::move(locator_.makeRandomName()); - sync_notification_.copyToDestination( - reinterpret_cast(subscription_message->name)); - - TRANSPORT_LOGI( - "Trying to connect. Sending interest: %s, name for notifications: %s", - prefix.getName().toString().c_str(), - sync_notification_.toString().c_str()); - - interest->setLifetime(1000); - interest->appendPayload(std::move(_payload)); - consumer_->asyncSendInterest(std::move(interest), - internal_connect_callback_.get()); -} - -void AsyncFullDuplexSocket::write(WriteCallback *callback, const void *buf, - size_t bytes, - const PublicationOptions &options, - WriteFlags flags) { - using namespace transport; - - // 1 asynchronously write the content. I assume here the - // buffer contains the whole application frame. FIXME: check - // if this is true and fix it accordingly - std::cout << "Size of the PAYLOAD: " << bytes << std::endl; - - if (bytes > core::Packet::default_mtu - sizeof(PayloadMessage)) { - TRANSPORT_LOGI("Producing content with name %s", - options.getName().toString().c_str()); - producer_->asyncProduce(options.getName(), - reinterpret_cast(buf), bytes); - signalProductionToSubscribers(options.getName()); - } else { - TRANSPORT_LOGI("Sending payload through interest"); - piggybackPayloadToSubscribers( - options.getName(), reinterpret_cast(buf), bytes); - } -} - -void AsyncFullDuplexSocket::write(WriteCallback *callback, - ContentBuffer &&output_buffer, - const PublicationOptions &options, - WriteFlags flags) { - using namespace transport; - - // 1 asynchronously write the content. I assume here the - // buffer contains the whole application frame. FIXME: check - // if this is true and fix it accordingly - std::cout << "Size of the PAYLOAD: " << output_buffer->size() << std::endl; - - if (output_buffer->size() > - core::Packet::default_mtu - sizeof(PayloadMessage)) { - TRANSPORT_LOGI("Producing content with name %s", - options.getName().toString().c_str()); - producer_->asyncProduce(options.getName(), std::move(output_buffer)); - signalProductionToSubscribers(options.getName()); - } else { - TRANSPORT_LOGI("Sending payload through interest"); - piggybackPayloadToSubscribers(options.getName(), &(*output_buffer)[0], - output_buffer->size()); - } -} - -void AsyncFullDuplexSocket::piggybackPayloadToSubscribers( - const core::Name &name, const uint8_t *buffer, std::size_t bytes) { - for (auto &sub : subscribers_) { - auto interest = core::Interest::Ptr(new core::Interest(name)); - auto _payload = utils::MemBuf::create(bytes + sizeof(PayloadMessage)); - _payload->append(bytes + sizeof(PayloadMessage)); - auto payload = _payload->writableData(); - - PayloadMessage *interest_payload = - reinterpret_cast(payload); - interest_payload->header.msg_type = MessageType::PAYLOAD; - interest_payload->header.reserved[0] = 0; - interest_payload->header.reserved[1] = 0; - interest_payload->reserved[0] = 0; - std::memcpy(payload + sizeof(PayloadMessage), buffer, bytes); - interest->appendPayload(std::move(_payload)); - - // Set the timeout of 0.2 second - interest->setLifetime(1000); - interest->setName(sub); - interest->getWritableName().setSuffix(incremental_suffix_++); - // TRANSPORT_LOGI("Sending signalization to %s", - // interest->getName().toString().c_str()); - - consumer_->asyncSendInterest(std::move(interest), - internal_signal_callback_.get()); - } -} - -void AsyncFullDuplexSocket::signalProductionToSubscribers( - const core::Name &name) { - // Signal the other part we are producing a content - // Create an interest for a subscription - - for (auto &sub : subscribers_) { - auto interest = core::Interest::Ptr(new core::Interest(name)); - // Todo consider using preallocated pool of membufs - auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); - _payload->append(sizeof(ActionMessage)); - auto payload = interest->getPayload()->writableData(); - - ActionMessage *produce_notification = - reinterpret_cast(payload); - produce_notification->header.msg_type = MessageType::ACTION; - produce_notification->action = Action::SIGNAL_PRODUCTION; - produce_notification->header.reserved[0] = 0; - produce_notification->header.reserved[1] = 0; - name.copyToDestination( - reinterpret_cast(produce_notification->name)); - interest->appendPayload(std::move(_payload)); - - // Set the timeout of 0.2 second - interest->setLifetime(1000); - interest->setName(sub); - interest->getWritableName().setSuffix(incremental_suffix_++); - // TRANSPORT_LOGI("Sending signalization to %s", - // interest->getName().toString().c_str()); - - consumer_->asyncSendInterest(std::move(interest), - internal_signal_callback_.get()); - } -} - -void AsyncFullDuplexSocket::waitForSubscribers(AcceptCallback *cb) { - accept_callback_ = cb; -} - -std::shared_ptr -AsyncFullDuplexSocket::decodeSynchronizationMessage( - const core::Interest &interest) { - auto mesg = interest.getPayload(); - const MessageHeader *header = - reinterpret_cast(mesg->data()); - - switch (header->msg_type) { - case MessageType::ACTION: { - // Check what is the action to perform - const ActionMessage *message = - reinterpret_cast(header); - - if (message->action == Action::SUBSCRIBE) { - // Add consumer to list on consumers to be notified - auto ret = - subscribers_.emplace(AF_INET6, (const uint8_t *)message->name, 0); - TRANSPORT_LOGI("Added subscriber %s :)", ret.first->toString().c_str()); - if (ret.second) { - accept_callback_->connectionAccepted(*ret.first); - } - - TRANSPORT_LOGI("Connection success!"); - - sync_notification_ = std::move(locator_.makeRandomName()); - return createSubscriptionResponse(sync_notification_); - - } else if (message->action == Action::CANCEL_SUBSCRIPTION) { - // XXX Modify name!!! Each allocated name allocates a 128 bit array. - subscribers_.erase( - core::Name(AF_INET6, (const uint8_t *)message->name, 0)); - return createAck(); - } else if (message->action == Action::SIGNAL_PRODUCTION) { - // trigger a reverse pull for the name contained in the message - core::Name n(AF_INET6, (const uint8_t *)message->name, 0); - std::cout << "PROD NOTIFICATION: Content to retrieve: " << n - << std::endl; - std::cout << "PROD NOTIFICATION: Interest name: " << interest.getName() - << std::endl; // << " compared to " << sync_notification_ << - // std::endl; - - if (sync_notification_.equals(interest.getName(), false)) { - std::cout << "Starting reverse pull for " << n << std::endl; - consumer_->asyncConsume(n, receive_buffer_); - return createAck(); - } - } else { - TRANSPORT_LOGE("Received unknown message. Dropping it."); - } - - break; - } - case MessageType::RESPONSE: { - throw errors::RuntimeException( - "The response should be a content object!!"); - } - case MessageType::PAYLOAD: { - // The interest contains the payload directly. - // We saved one round trip :) - - auto buffer = ContentBuffer(); - const uint8_t *data = mesg->data() + sizeof(PayloadMessage); - buffer->assign(data, data + mesg->length() - sizeof(PayloadMessage)); - read_callback_->readBufferAvailable(std::move(buffer)); - return createAck(); - } - default: { return std::shared_ptr(nullptr); } - } - - return std::shared_ptr(nullptr); -} - -void AsyncFullDuplexSocket::onControlInterest(ProducerSocket &s, - const core::Interest &i) { - auto payload = i.getPayload(); - if (payload->length()) { - // Try to decode payload and see if starting an async pull operation - auto response = decodeSynchronizationMessage(i); - if (response) { - response->setName(i.getName()); - s.produce(*response); - } - } -} - -void AsyncFullDuplexSocket::onContentProduced(ProducerSocket &producer, - const std::error_code &ec, - uint64_t bytes_written) { - if (write_callback_) { - if (!ec) { - write_callback_->writeSuccess(); - } else { - write_callback_->writeErr(bytes_written); - } - } -} - -void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s, - std::size_t size, - const std::error_code &ec) { - // Sanity check - if (size != receive_buffer_->size()) { - TRANSPORT_LOGE( - "Received content size differs from size retrieved from the buffer."); - return; - } - - TRANSPORT_LOGI("Received content with size %zu", size); - if (!ec) { - read_callback_->readBufferAvailable(std::move(receive_buffer_)); - } else { - TRANSPORT_LOGE("Error retrieving content."); - } - // consumer_->stop(); -} - -void AsyncFullDuplexSocket::OnConnectCallback::onContentObject( - core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { - // The ack message should contain the name to be used for notifying - // the production of the content to the other part - - if (content_object->getPayload()->length() == 0) { - TRANSPORT_LOGW("Connection response message empty...."); - return; - } - - SubscriptionResponseMessage *response = - reinterpret_cast( - content_object->getPayload()->writableData()); - - if (response->response.header.msg_type == MessageType::RESPONSE) { - if (response->response.return_code == ReturnCode::OK) { - auto ret = - socket_.subscribers_.emplace(AF_INET6, (uint8_t *)response->name, 0); - TRANSPORT_LOGI("Successfully connected!!!! Subscriber added: %s", - ret.first->toString().c_str()); - socket_.connect_callback_->connectSuccess(); - } - } -} - -void AsyncFullDuplexSocket::OnSignalCallback::onContentObject( - core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { - return; -} - -void AsyncFullDuplexSocket::OnSignalCallback::onTimeout( - core::Interest::Ptr &&interest) { - TRANSPORT_LOGE("Retransmitting signalization interest to %s!!", - interest->getName().toString().c_str()); - socket_.consumer_->asyncSendInterest(std::move(interest), - socket_.internal_signal_callback_.get()); -} - -void AsyncFullDuplexSocket::OnConnectCallback::onTimeout( - core::Interest::Ptr &&interest) { - socket_.connect_callback_->connectErr( - std::make_error_code(std::errc::not_connected)); -} - -std::shared_ptr AsyncFullDuplexSocket::createAck() { - // Send the response back - core::Name name("b001::abcd"); - auto response = std::make_shared(name); - auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); - _payload->append(sizeof(ResponseMessage)); - auto payload = response->getPayload()->data(); - ResponseMessage *response_message = (ResponseMessage *)payload; - response_message->header.msg_type = MessageType::RESPONSE; - response_message->header.reserved[0] = 0; - response_message->header.reserved[1] = 0; - response_message->return_code = ReturnCode::OK; - response->appendPayload(std::move(_payload)); - response->setLifetime(0); - return response; -} - -std::shared_ptr -AsyncFullDuplexSocket::createSubscriptionResponse(const core::Name &name) { - // Send the response back - core::Name tmp_name("b001::abcd"); - auto response = std::make_shared(tmp_name); - auto _payload = utils::MemBuf::create(sizeof(SubscriptionResponseMessage)); - _payload->append(sizeof(SubscriptionResponseMessage)); - auto payload = _payload->data(); - SubscriptionResponseMessage *response_message = - (SubscriptionResponseMessage *)payload; - response_message->response.header.msg_type = MessageType::RESPONSE; - response_message->response.header.reserved[0] = 0; - response_message->response.header.reserved[1] = 0; - response_message->response.return_code = ReturnCode::OK; - name.copyToDestination(reinterpret_cast(response_message->name)); - response->appendPayload(std::move(_payload)); - response->setLifetime(0); - return response; -} - -} // namespace interface -} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h deleted file mode 100644 index 438325fdb..000000000 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * This class is created for sending/receiving data over an ICN network. - */ - -#pragma once - -#include -#include -#include -#include -#include - -#include -#include - -namespace transport { - -namespace interface { - -enum class MessageType : uint8_t { ACTION, RESPONSE, PAYLOAD }; - -enum class Action : uint8_t { - SUBSCRIBE, - CANCEL_SUBSCRIPTION, - SIGNAL_PRODUCTION, -}; - -enum class ReturnCode : uint8_t { - OK, - FAILED, -}; - -struct MessageHeader { - MessageType msg_type; - uint8_t reserved[2]; -}; - -struct ActionMessage { - MessageHeader header; - Action action; - uint64_t name[2]; -}; - -struct ResponseMessage { - MessageHeader header; - ReturnCode return_code; -}; - -struct SubscriptionResponseMessage { - ResponseMessage response; - uint64_t name[2]; -}; - -struct PayloadMessage { - MessageHeader header; - uint8_t reserved[1]; -}; - -// struct NotificationMessage { -// Action action; -// uint8_t reserved[3]; -// uint64_t -// } - -using core::Prefix; - -class AsyncFullDuplexSocket : public AsyncSocket, - public AsyncReader, - public AsyncWriter, - public AsyncAcceptor { - private: - struct Counters { - uint64_t app_bytes_written_; - uint64_t app_bytes_read_; - - TRANSPORT_ALWAYS_INLINE void updateBytesWritten(uint64_t bytes) { - app_bytes_written_ += bytes; - } - - TRANSPORT_ALWAYS_INLINE void updateBytesRead(uint64_t bytes) { - app_bytes_read_ += bytes; - } - }; - - public: - using UniquePtr = std::unique_ptr; - using SharedPtr = std::unique_ptr; - - AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service); - AsyncFullDuplexSocket(const core::Prefix &locator); - - ~AsyncFullDuplexSocket(){}; - - using ReadCallback = AsyncReader::ReadCallback; - using WriteCallback = AsyncWriter::WriteCallback; - - TRANSPORT_ALWAYS_INLINE void setReadCB(ReadCallback *callback) override { - read_callback_ = callback; - } - - TRANSPORT_ALWAYS_INLINE ReadCallback *getReadCallback() const override { - return read_callback_; - } - - TRANSPORT_ALWAYS_INLINE void setWriteCB(WriteCallback *callback) override { - write_callback_ = callback; - } - - TRANSPORT_ALWAYS_INLINE WriteCallback *getWriteCallback() const override { - return write_callback_; - } - - TRANSPORT_ALWAYS_INLINE const core::Prefix &getLocator() { return locator_; } - - void connect(ConnectCallback *callback, const core::Prefix &prefix) override; - - void write(WriteCallback *callback, const void *buf, size_t bytes, - const PublicationOptions &options, - WriteFlags flags = WriteFlags::NONE) override; - - virtual void write(WriteCallback *callback, ContentBuffer &&output_buffer, - const PublicationOptions &options, - WriteFlags flags = WriteFlags::NONE) override; - - void waitForSubscribers(AcceptCallback *cb) override; - - void close() override; - - void closeNow() override; - - void shutdownWrite() override; - - void shutdownWriteNow() override; - - bool good() const override; - - bool readable() const override; - - bool writable() const override; - - bool isPending() const override; - - bool connected() const override; - - bool error() const override; - - void setSendTimeout(uint32_t milliseconds) override; - - size_t getAppBytesWritten() const override; - size_t getRawBytesWritten() const override; - size_t getAppBytesReceived() const override; - size_t getRawBytesReceived() const override; - - uint32_t getSendTimeout() const override; - - private: - std::shared_ptr decodeSynchronizationMessage( - const core::Interest &interest); - - class OnConnectCallback : public BasePortal::ConsumerCallback { - public: - OnConnectCallback(AsyncFullDuplexSocket &socket) : socket_(socket){}; - virtual ~OnConnectCallback() = default; - void onContentObject(core::Interest::Ptr &&, - core::ContentObject::Ptr &&content_object) override; - void onTimeout(core::Interest::Ptr &&interest) override; - - private: - AsyncFullDuplexSocket &socket_; - }; - - class OnSignalCallback : public BasePortal::ConsumerCallback { - public: - OnSignalCallback(AsyncFullDuplexSocket &socket) : socket_(socket){}; - virtual ~OnSignalCallback() = default; - void onContentObject(core::Interest::Ptr &&, - core::ContentObject::Ptr &&content_object); - void onTimeout(core::Interest::Ptr &&interest); - - private: - AsyncFullDuplexSocket &socket_; - }; - - void onControlInterest(ProducerSocket &s, const core::Interest &i); - void onContentProduced(ProducerSocket &producer, const std::error_code &ec, - uint64_t bytes_written); - void onContentRetrieved(ConsumerSocket &s, std::size_t size, - const std::error_code &ec); - - void signalProductionToSubscribers(const core::Name &name); - void piggybackPayloadToSubscribers(const core::Name &name, - const uint8_t *buffer, std::size_t bytes); - - std::shared_ptr createAck(); - std::shared_ptr createSubscriptionResponse( - const core::Name &name); - - core::Prefix locator_; - uint32_t incremental_suffix_; - core::Name sync_notification_; - // std::unique_ptr portal_; - asio::io_service internal_io_service_; - asio::io_service &io_service_; - asio::io_service::work work_; - - // These names represent the "locator" of a certain - // peer that subscribed to this. - std::unordered_set subscribers_; - - // Useful for publishing / Retrieving data - std::unique_ptr producer_; - std::unique_ptr consumer_; - - ReadCallback *read_callback_; - WriteCallback *write_callback_; - ConnectCallback *connect_callback_; - AcceptCallback *accept_callback_; - - std::unique_ptr internal_connect_callback_; - std::unique_ptr internal_signal_callback_; - - uint32_t send_timeout_milliseconds_; - struct Counters counters_; - ContentBuffer receive_buffer_; -}; - -} // namespace interface -} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h index 7d50d0fbd..90f6a3ef6 100644 --- a/libtransport/src/hicn/transport/interfaces/socket.h +++ b/libtransport/src/hicn/transport/interfaces/socket.h @@ -16,18 +16,10 @@ #pragma once #include -#include #include -#include -#include -#include -#include +#include #include #include -#include -#include -#include -#include #define SOCKET_OPTION_GET 0 #define SOCKET_OPTION_NOT_GET 1 @@ -39,26 +31,14 @@ namespace transport { -namespace protocol { -class IcnObserver; -class TransportStatistics; -} // namespace protocol - namespace interface { +// Forward Declarations template class Socket; -class ConsumerSocket; -class ProducerSocket; - -// using Interest = core::Interest; -// using ContentObject = core::ContentObject; -// using Name = core::Name; -// using HashAlgorithm = core::HashAlgorithm; -// using CryptoSuite = utils::CryptoSuite; -// using Identity = utils::Identity; -// using Verifier = utils::Verifier; +// Define the portal and its connector, depending on the compilation options +// passed by the build tool. using HicnForwarderPortal = core::HicnForwarderPortal; #ifdef __linux__ @@ -76,40 +56,6 @@ using BaseSocket = Socket; using BasePortal = HicnForwarderPortal; #endif -using PayloadType = core::PayloadType; -using Prefix = core::Prefix; -using Array = utils::Array; -using ContentBuffer = std::shared_ptr>; - -using ConsumerInterestCallback = - std::function; - -using ConsumerContentCallback = - std::function; - -using ConsumerTimerCallback = std::function; - -using ProducerContentCallback = std::function; - -using ConsumerContentObjectCallback = - std::function; - -using ConsumerContentObjectVerificationCallback = - std::function; - -using ConsumerManifestCallback = - std::function; - -using ProducerContentObjectCallback = - std::function; - -using ProducerInterestCallback = - std::function; - -using namespace protocol; - template class Socket { static_assert(std::is_same::value diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index ca9722849..37d545779 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -48,7 +48,6 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) is_async_(false), verifier_(std::make_shared()), verify_signature_(false), - content_buffer_(nullptr), on_interest_output_(VOID_HANDLER), on_interest_timeout_(VOID_HANDLER), on_interest_satisfied_(VOID_HANDLER), @@ -56,7 +55,8 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) on_content_object_verification_(VOID_HANDLER), on_content_object_(VOID_HANDLER), on_manifest_(VOID_HANDLER), - on_payload_retrieved_(VOID_HANDLER), + stats_summary_(VOID_HANDLER), + read_callback_(nullptr), virtual_download_(false), rtt_stats_(false), timer_interval_milliseconds_(0) { @@ -81,13 +81,11 @@ ConsumerSocket::~ConsumerSocket() { void ConsumerSocket::connect() { portal_->connect(); } -int ConsumerSocket::consume(const Name &name, ContentBuffer &receive_buffer) { +int ConsumerSocket::consume(const Name &name) { if (transport_protocol_->isRunning()) { return CONSUMER_BUSY; } - content_buffer_ = receive_buffer; - network_name_ = name; network_name_.setSuffix(0); is_async_ = false; @@ -97,14 +95,12 @@ int ConsumerSocket::consume(const Name &name, ContentBuffer &receive_buffer) { return CONSUMER_FINISHED; } -int ConsumerSocket::asyncConsume(const Name &name, - ContentBuffer &receive_buffer) { +int ConsumerSocket::asyncConsume(const Name &name) { if (!async_downloader_.stopped()) { - async_downloader_.add([this, receive_buffer, name]() { + async_downloader_.add([this, name]() { network_name_ = std::move(name); network_name_.setSuffix(0); is_async_ = true; - content_buffer_ = receive_buffer; transport_protocol_->start(); }); } diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index ceed53954..40344af5d 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -19,6 +19,11 @@ #include #include #include +#include + +extern "C" { +#include +} #define CONSUMER_FINISHED 0 #define CONSUMER_BUSY 1 @@ -28,28 +33,210 @@ namespace transport { namespace interface { +using namespace core; +using namespace protocol; + +/** + * @brief Main interface for consumer applications. + * + * The consumer socket is the main interface for a consumer application. + * It allows to retrieve an application data from one/many producers, by + * hiding all the complexity of the transport protocol used underneath. + */ class ConsumerSocket : public BaseSocket { public: + /** + * The ReadCallback is a class which can be used by the transport for both + * querying the application needs and notifying events. + * + * Beware that the methods of this class will be called synchronously while + * the transport is working, so the operations the application is performing + * on the data retrieved should be executed in another thread in an + * asynchronous manner. Blocking one of these callbacks means blocking the + * transport. + */ + class ReadCallback { + public: + virtual ~ReadCallback() = default; + + /** + * This API will specify to the transport whether the buffer should be + * allocated by the application (and then the retrieved content will be + * copied there) or the transport should allocate the buffer and "move" it + * to the application. In other words, if isBufferMovable return true, the + * transport will transfer the ownership of the read buffer to the + * application, without performing an additional copy, while if it returns + * false the transport will use the getReadBuffer API. + * + * By default this method returns true. + * + */ + virtual bool isBufferMovable() noexcept { return true; } + + /** + * This method will be called by the transport when the content is + * available. The application can then allocate its own buffer and provide + * the address to the transport, which will use it for writing the data. + * Note that if the application won't allocate enough memory this method + * will be called several times, until the internal read buffer will be + * emptied. For ensuring this method will be called once, applications + * should allocate at least maxBufferSize() bytes. + * + * @param application_buffer - Pointer to the application's buffer. + * @param max_length - The length of the application buffer. + */ + virtual void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) = 0; + + /** + * This method will be called by the transport after calling getReadBuffer, + * in order to notify the application that length bytes are available in the + * buffer. The max_length size of the buffer could be larger than the actual + * amount of bytes written. + * + * @param length - The number of bytes placed in the buffer. + */ + virtual void readDataAvailable(size_t length) noexcept = 0; + + /** + * This method will be called by the transport for understanding how many + * bytes it should read (at most) before notifying the application. + * + * By default it reads 64 KB. + */ + virtual size_t maxBufferSize() const { return 64 * 1024; } + + /** + * This method will be called by the transport iff (isBufferMovable == + * true). The unique_ptr underlines the fact that the ownership of the + * buffer is being transferred to the application. + * + * @param buffer - The buffer + */ + virtual void readBufferAvailable( + std::unique_ptr &&buffer) noexcept {} + + /** + * readError() will be invoked if an error occurs reading from the + * transport. + * + * @param ec - An error code describing the error. + */ + virtual void readError(const std::error_code ec) noexcept = 0; + + /** + * This callback will be invoked when the whole content is retrieved. The + * transport itself knows when a content is retrieved (since it is not an + * opaque bytestream like TCP), and the transport itself is able to tell + * the application when the transfer is done. + */ + virtual void readSuccess(std::size_t total_size) noexcept = 0; + }; + + /** + * @brief Create a new consumer socket. + * + * @param protocol - The transport protocol to use. So far the following + * transport are supported: + * - CBR: Constant bitrate + * - Raaqm: Based on paper: Optimal multipath congestion control and request + * forwarding in information-centric networks: Protocol design and + * experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks + * 110, 104-117 + * - RTC: Real time communication + */ explicit ConsumerSocket(int protocol); explicit ConsumerSocket(int protocol, asio::io_service &io_service); + /** + * @brief Destroy the consumer socket. + */ ~ConsumerSocket(); + /** + * @brief Connect the consumer socket to the underlying hICN forwarder. + */ void connect() override; - int consume(const Name &name, ContentBuffer &receive_buffer); - - int asyncConsume(const Name &name, ContentBuffer &receive_buffer); - + /** + * Retrieve a content using the protocol specified in the constructor. + * This function blocks until the whole content is downloaded. + * For monitoring the status of the download, the application MUST set the + * ConsumerRead callback. This callback will be called periodically (depending + * on the needs of the application), allowing the application to save the + * retrieved data. + * + * @param name - The name of the content to retrieve. + * + * @return CONSUMER_BUSY if a pending download exists + * @return CONSUMER_FINISHED when the download finishes + * + * Notice that the fact consume() returns CONSUMER_FINISHED does not imply the + * content retrieval succeeded. This information can be obtained from the + * error code in CONTENT_RETRIEVED callback. + */ + int consume(const Name &name); + int asyncConsume(const Name &name); + + /** + * Send an interest asynchronously in another thread, which is the same used + * for asyncConsume. + * + * @param interest - An Interest::Ptr to the interest. Notice that the + * application looses the ownership of the interest, which is transferred to + * the library itself. + * @param callback - A ConsumerCallback containing the events to be trigger in + * case of timeout or content reception. + * + */ void asyncSendInterest(Interest::Ptr &&interest, Portal::ConsumerCallback *callback); + /** + * Stops the consumer socket. If several downloads are queued (using + * asyncConsume), this call stops just the current one. + */ void stop(); + /** + * Resume the download from the same exact point it stopped. + */ void resume(); + /** + * Get the io_service which is running the transport protocol event loop. + * + * @return A reference to the internal io_service where the transport protocol + * is running. + */ asio::io_service &getIoService() override; + TRANSPORT_ALWAYS_INLINE int setSocketOption( + int socket_option_key, ReadCallback *socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + TRANSPORT_ALWAYS_INLINE int getSocketOption( + int socket_option_key, ReadCallback **socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + *socket_option_value = read_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, double socket_option_value) { switch (socket_option_key) { @@ -150,12 +337,6 @@ class ConsumerSocket : public BaseSocket { break; } - case ConsumerCallbacksOptions::CONTENT_RETRIEVED: - if (socket_option_value == VOID_HANDLER) { - on_payload_retrieved_ = VOID_HANDLER; - break; - } - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: if (socket_option_value > 0) { rate_estimation_batching_parameter_ = socket_option_value; @@ -274,20 +455,6 @@ class ConsumerSocket : public BaseSocket { return SOCKET_OPTION_SET; } - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerContentCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_RETRIEVED: - on_payload_retrieved_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - TRANSPORT_ALWAYS_INLINE int setSocketOption( int socket_option_key, ConsumerManifestCallback socket_option_value) { switch (socket_option_key) { @@ -331,21 +498,6 @@ class ConsumerSocket : public BaseSocket { return SOCKET_OPTION_SET; } - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - const std::shared_ptr> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::APPLICATION_BUFFER: - content_buffer_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - TRANSPORT_ALWAYS_INLINE int setSocketOption( int socket_option_key, const std::string &socket_option_value) { switch (socket_option_key) { @@ -568,18 +720,6 @@ class ConsumerSocket : public BaseSocket { return SOCKET_OPTION_GET; } - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerContentCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_RETRIEVED: - *socket_option_value = &on_payload_retrieved_; - return SOCKET_OPTION_GET; - - default: - return SOCKET_OPTION_NOT_GET; - } - } - TRANSPORT_ALWAYS_INLINE int getSocketOption( int socket_option_key, ConsumerManifestCallback **socket_option_value) { switch (socket_option_key) { @@ -635,20 +775,6 @@ class ConsumerSocket : public BaseSocket { return SOCKET_OPTION_GET; } - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - std::shared_ptr> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::APPLICATION_BUFFER: - socket_option_value = content_buffer_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - TRANSPORT_ALWAYS_INLINE int getSocketOption( int socket_option_key, std::string &socket_option_value) { switch (socket_option_key) { @@ -718,8 +844,6 @@ class ConsumerSocket : public BaseSocket { PARCKeyId *key_id_; bool verify_signature_; - ContentBuffer content_buffer_; - ConsumerInterestCallback on_interest_retransmission_; ConsumerInterestCallback on_interest_output_; ConsumerInterestCallback on_interest_timeout_; @@ -730,11 +854,10 @@ class ConsumerSocket : public BaseSocket { ConsumerContentObjectCallback on_content_object_; ConsumerManifestCallback on_manifest_; - - ConsumerContentCallback on_payload_retrieved_; - ConsumerTimerCallback stats_summary_; + ReadCallback *read_callback_; + // Virtual download for traffic generator bool virtual_download_; diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h index bcf049310..c21108186 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -78,7 +78,7 @@ typedef enum { CONTENT_OBJECT_INPUT = 411, MANIFEST_INPUT = 412, CONTENT_OBJECT_TO_VERIFY = 413, - CONTENT_RETRIEVED = 414, + READ_CALLBACK = 414, STATS_SUMMARY = 415 } ConsumerCallbacksOptions; diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 3f9d4959d..c4cf95895 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -14,6 +14,7 @@ */ #include +#include #include @@ -336,16 +337,6 @@ void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, } } -void ProducerSocket::asyncProduce(const Name &suffix, - ContentBuffer &&output_buffer) { - if (!async_thread_.stopped()) { - async_thread_.add( - [this, suff = suffix, buffer = std::move(output_buffer)]() { - produce(suff, &(*buffer)[0], buffer->size(), true); - }); - } -} - void ProducerSocket::onInterest(Interest &interest) { if (on_interest_input_ != VOID_HANDLER) { on_interest_input_(*this, interest); diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 6ba5671cc..200c32a95 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -30,6 +30,10 @@ #define REGISTRATION_FAILURE 2 #define REGISTRATION_IN_PROGRESS 3 +namespace utils { +class Identity; +} + namespace transport { namespace interface { @@ -59,7 +63,7 @@ class ProducerSocket : public Socket, void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size); - void asyncProduce(const Name &suffix, ContentBuffer &&output_buffer); + void asyncProduce(const Name &suffix); void asyncProduce(ContentObject &content_object); diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index b8a7c9610..7f0310e7c 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -38,14 +38,14 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket) } RaaqmTransportProtocol::~RaaqmTransportProtocol() { - if (this->rate_estimator_) { - delete this->rate_estimator_; + if (rate_estimator_) { + delete rate_estimator_; } } int RaaqmTransportProtocol::start() { - if (this->rate_estimator_) { - this->rate_estimator_->onStart(); + if (rate_estimator_) { + rate_estimator_->onStart(); } if (!cur_path_) { @@ -75,13 +75,13 @@ int RaaqmTransportProtocol::start() { choice_param); if (choice_param == 1) { - this->rate_estimator_ = new ALaTcpEstimator(); + rate_estimator_ = new ALaTcpEstimator(); } else { - this->rate_estimator_ = new SimpleEstimator(alpha, batching_param); + rate_estimator_ = new SimpleEstimator(alpha, batching_param); } socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, - &this->rate_estimator_->observer_); + &rate_estimator_->observer_); // Current path auto cur_path = std::make_unique( @@ -126,7 +126,7 @@ void RaaqmTransportProtocol::increaseWindow() { socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, current_window_size_); } - this->rate_estimator_->onWindowIncrease(current_window_size_); + rate_estimator_->onWindowIncrease(current_window_size_); } void RaaqmTransportProtocol::decreaseWindow() { @@ -145,7 +145,7 @@ void RaaqmTransportProtocol::decreaseWindow() { socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, current_window_size_); } - this->rate_estimator_->onWindowDecrease(current_window_size_); + rate_estimator_->onWindowDecrease(current_window_size_); } void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { @@ -158,8 +158,8 @@ void RaaqmTransportProtocol::afterContentReception( updatePathTable(content_object); increaseWindow(); updateRtt(interest.getName().getSuffix()); - this->rate_estimator_->onDataReceived((int)content_object.payloadSize() + - (int)content_object.headerSize()); + rate_estimator_->onDataReceived((int)content_object.payloadSize() + + (int)content_object.headerSize()); // Set drop probablility and window size accordingly RAAQM(); } @@ -368,7 +368,12 @@ void RaaqmTransportProtocol::onContentSegment( reassemble(std::move(content_object)); } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix == index_manager_->getFinalSuffix())) { - onContentReassembled(std::make_error_code(std::errc(0))); + interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + socket_->getSocketOption(READ_CALLBACK, &on_payload); + + if (on_payload != nullptr) { + on_payload->readSuccess(stats_.getBytesRecv()); + } } } else { // TODO Application policy check @@ -487,8 +492,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { return; } - // This is set to ~0 so that the next interest_retransmissions_ + 1, performed - // by sendInterest, will result in 0 + // This is set to ~0 so that the next interest_retransmissions_ + 1, + // performed by sendInterest, will result in 0 interest_retransmissions_[next_suffix & mask] = ~0; interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); sendInterest(std::move(interest)); @@ -502,16 +507,23 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerContentCallback *on_payload = nullptr; - socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload); - if (*on_payload != VOID_HANDLER) { - std::shared_ptr> content_buffer; - socket_->getSocketOption( - interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); - (*on_payload)(*socket_, content_buffer->size(), ec); + interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + socket_->getSocketOption(READ_CALLBACK, &on_payload); + + if (on_payload == nullptr) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before " + "starting " + "the content retrieval."); + } + + if (!ec) { + on_payload->readSuccess(stats_.getBytesRecv()); + } else { + on_payload->readError(ec); } - this->rate_estimator_->onDownloadFinished(); + rate_estimator_->onDownloadFinished(); stop(); } @@ -526,8 +538,8 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) { // Update stats updateStats((uint32_t)segment, rtt.count(), now); - if (this->rate_estimator_) { - this->rate_estimator_->onRttUpdate((double)rtt.count()); + if (rate_estimator_) { + rate_estimator_->onRttUpdate((double)rtt.count()); } cur_path_->insertNewRtt(rtt.count()); @@ -676,4 +688,4 @@ void RaaqmTransportProtocol::checkForStalePaths() { } // end namespace protocol -} // end namespace transport +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc index 899f701c7..a2062df93 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -17,6 +17,7 @@ #include #include #include +#include namespace transport { @@ -30,7 +31,8 @@ BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, manifest_index_manager_( std::make_unique(icn_socket)), index_manager_(incremental_index_manager_.get()), - index_(0) { + index_(0), + read_buffer_(nullptr) { setContentCallback(content_callback); } @@ -54,30 +56,88 @@ void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) { void BaseReassembly::copyContent(const ContentObject &content_object) { auto a = content_object.getPayload(); - - std::shared_ptr> content_buffer; - reassembly_consumer_socket_->getSocketOption( - interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer); - - content_buffer->insert(content_buffer->end(), (uint8_t *)a->data(), - (uint8_t *)a->data() + a->length()); + auto payload_length = a->length(); + auto write_size = std::min(payload_length, read_buffer_->tailroom()); + auto additional_bytes = payload_length > read_buffer_->tailroom() + ? payload_length - read_buffer_->tailroom() + : 0; + + std::memcpy(read_buffer_->writableTail(), a->data(), write_size); + read_buffer_->append(write_size); + + if (!read_buffer_->tailroom()) { + notifyApplication(); + std::memcpy(read_buffer_->writableTail(), a->data() + write_size, + additional_bytes); + read_buffer_->append(additional_bytes); + } bool download_completed = index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); if (TRANSPORT_EXPECT_FALSE(download_completed)) { + notifyApplication(); content_callback_->onContentReassembled(std::make_error_code(std::errc(0))); } } +void BaseReassembly::notifyApplication() { + interface::ConsumerSocket::ReadCallback *read_callback = nullptr; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + if (TRANSPORT_EXPECT_FALSE(!read_callback)) { + TRANSPORT_LOGE("Read callback not installed!"); + return; + } + + if (read_callback->isBufferMovable()) { + // No need to perform an additional copy. The whole buffer will be + // tranferred to the application. + + read_callback->readBufferAvailable(std::move(read_buffer_)); + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); + } else { + // The buffer will be copied into the application-provided buffer + uint8_t *buffer; + std::size_t length; + std::size_t total_length = read_buffer_->length(); + + while (read_buffer_->length()) { + buffer = nullptr; + length = 0; + read_callback->getReadBuffer(&buffer, &length); + + if (!buffer || !length) { + throw errors::RuntimeException( + "Invalid buffer provided by the application."); + } + + auto to_copy = std::min(read_buffer_->length(), length); + std::memcpy(buffer, read_buffer_->data(), to_copy); + read_buffer_->trimStart(to_copy); + } + + read_callback->readDataAvailable(total_length); + read_buffer_->clear(); + } +} + void BaseReassembly::reset() { manifest_index_manager_->reset(); incremental_index_manager_->reset(); index_ = index_manager_->getNextReassemblySegment(); received_packets_.clear(); + + // reset read buffer + interface::ConsumerSocket::ReadCallback *read_callback; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); } } // namespace protocol -} // end namespace transport +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h index 9efddb773..79f0ea4d2 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -20,6 +20,10 @@ namespace transport { +namespace interface { +class ConsumerReadCallback; +} + namespace protocol { // Forward Declaration @@ -54,6 +58,9 @@ class BaseReassembly : public Reassembly { virtual void reset() override; + private: + void notifyApplication(); + protected: // The consumer socket interface::ConsumerSocket *reassembly_consumer_socket_; @@ -63,6 +70,7 @@ class BaseReassembly : public Reassembly { std::unordered_map received_packets_; uint64_t index_; + std::unique_ptr read_buffer_; }; } // namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 9402d3b02..4205ade4e 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -161,12 +161,11 @@ void RTCTransportProtocol::updateDelayStats( uint32_t segmentNumber = content_object.getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; - if (inflightInterests_[pkt].state != sent_) - return; + if (inflightInterests_[pkt].state != sent_) return; - if(interestRetransmissions_.find(segmentNumber) != + if (interestRetransmissions_.find(segmentNumber) != interestRetransmissions_.end()) - //this packet was rtx at least once + // this packet was rtx at least once return; uint32_t pathLabel = content_object.getPathLabel(); @@ -329,8 +328,7 @@ void RTCTransportProtocol::increaseWindow() { } else { currentCWin_ = min( maxCWin_, - (uint32_t)ceil(currentCWin_ + - (1.0 / (double)currentCWin_))); // linear + (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear } } @@ -363,7 +361,6 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { if (!rtx) { inflightInterestsCount_++; } - } void RTCTransportProtocol::scheduleNextInterests() { @@ -373,9 +370,9 @@ void RTCTransportProtocol::scheduleNextInterests() { while (inflightInterestsCount_ < currentCWin_) { Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); - //we send the packet only if it is not pending yet + // we send the packet only if it is not pending yet interest_name->setSuffix(actualSegment_); if (portal_->interestIsPending(*interest_name)) { actualSegment_++; @@ -383,11 +380,11 @@ void RTCTransportProtocol::scheduleNextInterests() { } uint32_t pkt = actualSegment_ & modMask_; - //if we already reacevied the content we don't ask it again - if(inflightInterests_[pkt].state == received_ && - inflightInterests_[pkt].sequence == actualSegment_) { - actualSegment_++; - continue; + // if we already reacevied the content we don't ask it again + if (inflightInterests_[pkt].state == received_ && + inflightInterests_[pkt].sequence == actualSegment_) { + actualSegment_++; + continue; } inflightInterests_[pkt].transmissionTime = @@ -419,93 +416,93 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector &nacks) { #endif } -void RTCTransportProtocol::addRetransmissions(uint32_t val){ - //add only val in the rtx list +void RTCTransportProtocol::addRetransmissions(uint32_t val) { + // add only val in the rtx list addRetransmissions(val, val + 1); } -void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop){ - for(uint32_t i = start; i < stop; i++){ +void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { + for (uint32_t i = start; i < stop; i++) { auto it = interestRetransmissions_.find(i); - if(it == interestRetransmissions_.end()){ + if (it == interestRetransmissions_.end()) { if (lastSegNacked_ <= i) { - //i must be larger than the last past nack received + // i must be larger than the last past nack received interestRetransmissions_[i] = 0; } - }//if the retransmission is already there the rtx timer will - //take care of it + } // if the retransmission is already there the rtx timer will + // take care of it } retransmit(true); } -void RTCTransportProtocol::retransmit(bool first_rtx){ +void RTCTransportProtocol::retransmit(bool first_rtx) { auto it = interestRetransmissions_.begin(); - //cut len to max HICN_MAX_RTX_SIZE - //since we use a map, the smaller (and so the older) sequence number are at - //the beginnin of the map - while(interestRetransmissions_.size() > HICN_MAX_RTX_SIZE){ + // cut len to max HICN_MAX_RTX_SIZE + // since we use a map, the smaller (and so the older) sequence number are at + // the beginnin of the map + while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) { it = interestRetransmissions_.erase(it); } it = interestRetransmissions_.begin(); - while (it != interestRetransmissions_.end()){ + while (it != interestRetransmissions_.end()) { uint32_t pkt = it->first & modMask_; - if(inflightInterests_[pkt].sequence != it->first){ - //this packet is not anymore in the inflight buffer, erase it + if (inflightInterests_[pkt].sequence != it->first) { + // this packet is not anymore in the inflight buffer, erase it it = interestRetransmissions_.erase(it); continue; } - //we retransmitted the packet too many times - if(it->second >= HICN_MAX_RTX){ + // we retransmitted the packet too many times + if (it->second >= HICN_MAX_RTX) { it = interestRetransmissions_.erase(it); continue; } - //this packet is too old - if((lastReceived_ > it->first) && - (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE){ + // this packet is too old + if ((lastReceived_ > it->first) && + (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) { it = interestRetransmissions_.erase(it); continue; } - if(first_rtx){ - //TODO (optimization) - //the rtx that we never sent (it->second == 0) are all at the - //end, so we can go directly there - if(it->second == 0){ + if (first_rtx) { + // TODO (optimization) + // the rtx that we never sent (it->second == 0) are all at the + // end, so we can go directly there + if (it->second == 0) { inflightInterests_[pkt].transmissionTime = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); it->second++; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); interest_name->setSuffix(it->first); sendInterest(interest_name, true); } ++it; - }else{ - //base on time + } else { + // base on time uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - if((now - inflightInterests_[pkt].transmissionTime) > 20){ - //XXX replace 20 with rtt + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if ((now - inflightInterests_[pkt].transmissionTime) > 20) { + // XXX replace 20 with rtt inflightInterests_[pkt].transmissionTime = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); it->second++; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); interest_name->setSuffix(it->first); sendInterest(interest_name, true); } @@ -514,13 +511,13 @@ void RTCTransportProtocol::retransmit(bool first_rtx){ } } -void RTCTransportProtocol::checkRtx(){ +void RTCTransportProtocol::checkRtx() { retransmit(false); rtx_timer_->expires_from_now(std::chrono::milliseconds(20)); rtx_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - checkRtx(); - }); + if (ec) return; + checkRtx(); + }); } void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { @@ -533,25 +530,25 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { inflightInterestsCount_--; } - //check how many times we sent this packet - auto it = interestRetransmissions_.find(segmentNumber); - if(it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX){ - inflightInterests_[pkt].state = lost_; - } + // check how many times we sent this packet + auto it = interestRetransmissions_.find(segmentNumber); + if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) { + inflightInterests_[pkt].state = lost_; + } - if(inflightInterests_[pkt].state == sent_) { - inflightInterests_[pkt].state = timeout1_; - } else if (inflightInterests_[pkt].state == timeout1_) { - inflightInterests_[pkt].state = timeout2_; - } else if (inflightInterests_[pkt].state == timeout2_) { - inflightInterests_[pkt].state = lost_; - } + if (inflightInterests_[pkt].state == sent_) { + inflightInterests_[pkt].state = timeout1_; + } else if (inflightInterests_[pkt].state == timeout1_) { + inflightInterests_[pkt].state = timeout2_; + } else if (inflightInterests_[pkt].state == timeout2_) { + inflightInterests_[pkt].state = lost_; + } - if(inflightInterests_[pkt].state == lost_) { - interestRetransmissions_.erase(segmentNumber); - }else{ - addRetransmissions(segmentNumber); - } + if (inflightInterests_[pkt].state == lost_) { + interestRetransmissions_.erase(segmentNumber); + } else { + addRetransmissions(segmentNumber); + } scheduleNextInterests(); } @@ -562,12 +559,11 @@ bool RTCTransportProtocol::checkIfProducerIsActive( uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); - if (productionRate == 0) { // the producer socket is not active // in this case we consider only the first nack if (nack_timer_used_) { - return false; + return false; } nack_timer_used_ = true; @@ -680,12 +676,12 @@ void RTCTransportProtocol::onContentObject( ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); if (inflightInterests_[pkt].state == sent_) { - inflightInterestsCount_--; //packet sent without timeouts + inflightInterestsCount_--; // packet sent without timeouts } if (inflightInterests_[pkt].state == sent_ && interestRetransmissions_.find(segmentNumber) == - interestRetransmissions_.end()){ + interestRetransmissions_.end()) { // we count only non retransmitted data in order to take into accunt only // the transmition rate of the producer receivedBytes_ += (uint32_t)(content_object->headerSize() + @@ -693,7 +689,7 @@ void RTCTransportProtocol::onContentObject( updateDelayStats(*content_object); addRetransmissions(lastReceived_ + 1, segmentNumber); - //lastReceived_ is updated only for data packets received without RTX + // lastReceived_ is updated only for data packets received without RTX lastReceived_ = segmentNumber; } @@ -704,7 +700,7 @@ void RTCTransportProtocol::onContentObject( increaseWindow(); } - //in any case we remove the packet from the rtx list + // in any case we remove the packet from the rtx list interestRetransmissions_.erase(segmentNumber); if (schedule_next_interest) { @@ -715,24 +711,50 @@ void RTCTransportProtocol::onContentObject( void RTCTransportProtocol::returnContentToApplication( const ContentObject &content_object) { // return content to the user - auto a = content_object.getPayload(); + auto read_buffer = content_object.getPayload(); - a->trimStart(HICN_TIMESTAMP_SIZE); - uint8_t *start = a->writableData(); - unsigned size = (unsigned)a->length(); + read_buffer->trimStart(HICN_TIMESTAMP_SIZE); // set offset between hICN and RTP packets - uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1)); + uint16_t rtp_seq = ntohs(*(((uint16_t *)read_buffer->writableData()) + 1)); RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq; - std::shared_ptr> content_buffer; - socket_->getSocketOption(APPLICATION_BUFFER, content_buffer); - content_buffer->insert(content_buffer->end(), start, start + size); + interface::ConsumerSocket::ReadCallback *read_callback = nullptr; + socket_->getSocketOption(READ_CALLBACK, &read_callback); + + if (read_callback == nullptr) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before starting " + "the content retrieval."); + } + + if (read_callback->isBufferMovable()) { + read_callback->readBufferAvailable( + utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length())); + } else { + // The buffer will be copied into the application-provided buffer + uint8_t *buffer; + std::size_t length; + std::size_t total_length = read_buffer->length(); + + while (read_buffer->length()) { + buffer = nullptr; + length = 0; + read_callback->getReadBuffer(&buffer, &length); + + if (!buffer || !length) { + throw errors::RuntimeException( + "Invalid buffer provided by the application."); + } + + auto to_copy = std::min(read_buffer->length(), length); + + std::memcpy(buffer, read_buffer->data(), to_copy); + read_buffer->trimStart(to_copy); + } - ConsumerContentCallback *on_payload = nullptr; - socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload); - if ((*on_payload) != VOID_HANDLER) { - (*on_payload)(*socket_, size, std::make_error_code(std::errc(0))); + read_callback->readDataAvailable(total_length); + read_buffer->clear(); } } diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 7594d1f94..ddc79d520 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -16,14 +16,15 @@ #include #include #include +#include #ifndef _WIN32 #include #endif #include -#include #include #include +#include #ifdef __linux__ #include @@ -45,6 +46,9 @@ namespace interface { using CryptoSuite = utils::CryptoSuite; using Identity = utils::Identity; +/** + * Container for command line configuration for hiperf client. + */ struct ClientConfiguration { ClientConfiguration() : name("b001::abcd", 0), @@ -54,7 +58,7 @@ struct ClientConfiguration { window(-1), virtual_download(true), producer_certificate("/tmp/rsa_certificate.pem"), - receive_buffer(std::make_shared>()), + receive_buffer(nullptr), download_size(0), report_interval_milliseconds_(1000), rtc_(false), @@ -67,7 +71,7 @@ struct ClientConfiguration { double window; bool virtual_download; std::string producer_certificate; - std::shared_ptr> receive_buffer; + std::shared_ptr receive_buffer; std::size_t download_size; std::uint32_t report_interval_milliseconds_; TransportProtocolAlgorithms transport_protocol_; @@ -75,6 +79,9 @@ struct ClientConfiguration { bool test_mode_; }; +/** + * Class for handling the production rate for the RTC producer. + */ class Rate { public: Rate() : rate_kbps_(0) {} @@ -110,6 +117,9 @@ class Rate { float rate_kbps_; }; +/** + * Container for command line configuration for hiperf server. + */ struct ServerConfiguration { ServerConfiguration() : name("b001::abcd/64"), @@ -147,10 +157,23 @@ struct ServerConfiguration { std::size_t payload_size_; }; +/** + * Forward declaration of client Read callbacks. + */ +class RTCCallback; +class Callback; + +/** + * Hiperf client class: configure and setup an hicn consumer following the + * ClientConfiguration. + */ class HIperfClient { typedef std::chrono::time_point Time; typedef std::chrono::microseconds TimeDuration; + friend class RTCCallback; + friend class Callback; + public: HIperfClient(const ClientConfiguration &conf) : configuration_(conf), @@ -158,75 +181,55 @@ class HIperfClient { old_bytes_value_(0), signals_(io_service_, SIGINT), expected_seg_(0), - lost_packets_(std::unordered_set()) {} - - void processPayload(ConsumerSocket &c, std::size_t bytes_transferred, - const std::error_code &ec) { - Time t2 = std::chrono::steady_clock::now(); - TimeDuration dt = - std::chrono::duration_cast(t2 - t_download_); - long usec = (long)dt.count(); - - std::cout << "Content retrieved. Size: " << bytes_transferred << " [Bytes]" - << std::endl; - - std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " - << (bytes_transferred * 8) * 1.0 / usec * 1.0 << " [Mbps]" - << std::endl; - - io_service_.stop(); - } - - void processPayloadRtc(ConsumerSocket &c, std::size_t bytes_transferred, - const std::error_code &ec) { - configuration_.receive_buffer->clear(); - } + lost_packets_(std::unordered_set()), + rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr), + callback_(configuration_.rtc_ ? nullptr : new Callback(*this)) {} void checkReceivedRtcContent(ConsumerSocket &c, - const ContentObject &contentObject) { - if(!configuration_.test_mode_) - return; + const ContentObject &contentObject) { + if (!configuration_.test_mode_) return; uint32_t receivedSeg = contentObject.getName().getSuffix(); auto payload = contentObject.getPayload(); - if((uint32_t)payload->length() == 8){ //8 is the size of the NACK payload + if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK + // payload uint32_t *payloadPtr = (uint32_t *)payload->data(); uint32_t productionSeg = *(payloadPtr); uint32_t productionRate = *(++payloadPtr); - if(productionRate == 0){ - std::cout << "[STOP] producer is not producing content" - << std::endl; + if (productionRate == 0) { + std::cout << "[STOP] producer is not producing content" << std::endl; return; } - if(receivedSeg < productionSeg){ - std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg << - ". Next expected packet " << productionSeg + 1 << std::endl; + if (receivedSeg < productionSeg) { + std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg + << ". Next expected packet " << productionSeg + 1 + << std::endl; expected_seg_ = productionSeg; - } else if(receivedSeg > productionSeg){ - std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg << - ". Next expected packet " << productionSeg << std::endl; + } else if (receivedSeg > productionSeg) { + std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg + << ". Next expected packet " << productionSeg << std::endl; } return; } - if(receivedSeg > expected_seg_){ - for(uint32_t i = expected_seg_; i < receivedSeg; i++){ + if (receivedSeg > expected_seg_) { + for (uint32_t i = expected_seg_; i < receivedSeg; i++) { std::cout << "[LOSS] lost packet " << i << std::endl; lost_packets_.insert(i); } expected_seg_ = receivedSeg + 1; return; - }else if (receivedSeg < expected_seg_){ + } else if (receivedSeg < expected_seg_) { auto it = lost_packets_.find(receivedSeg); - if(it != lost_packets_.end()){ + if (it != lost_packets_.end()) { std::cout << "[RECOVER] recovered packet " << receivedSeg << std::endl; lost_packets_.erase(it); - }else{ - std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted " << - expected_seg_ << std::endl; + } else { + std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted " + << expected_seg_ << std::endl; } return; } @@ -379,28 +382,22 @@ class HIperfClient { if (!configuration_.rtc_) { ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_RETRIEVED, - (ConsumerContentCallback)std::bind( - &HIperfClient::processPayload, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + ConsumerCallbacksOptions::READ_CALLBACK, callback_); } else { ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_RETRIEVED, - (ConsumerContentCallback)std::bind( - &HIperfClient::processPayloadRtc, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + ConsumerCallbacksOptions::READ_CALLBACK, rtc_callback_); } if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } - if(configuration_.rtc_){ + if (configuration_.rtc_) { ret = consumer_socket_->setSocketOption( ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, (ConsumerContentObjectCallback)std::bind( &HIperfClient::checkReceivedRtcContent, this, - std::placeholders::_1, std::placeholders::_2)); + std::placeholders::_1, std::placeholders::_2)); if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } @@ -437,14 +434,103 @@ class HIperfClient { }); t_download_ = t_stats_ = std::chrono::steady_clock::now(); - consumer_socket_->asyncConsume(configuration_.name, - configuration_.receive_buffer); + consumer_socket_->asyncConsume(configuration_.name); io_service_.run(); return ERROR_SUCCESS; } private: + class RTCCallback : public ConsumerSocket::ReadCallback { + static constexpr std::size_t mtu = 1500; + + public: + RTCCallback(HIperfClient &hiperf_client) : client_(hiperf_client) { + client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); + } + + bool isBufferMovable() noexcept override { return false; } + + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + *application_buffer = + client_.configuration_.receive_buffer->writableData(); + *max_length = mtu; + } + + void readDataAvailable(std::size_t length) noexcept override { + // Do nothing + return; + } + + size_t maxBufferSize() const override { return mtu; } + + void readError(const std::error_code ec) noexcept override { + std::cerr << "Error while reading from RTC socket" << std::endl; + } + + void readSuccess(std::size_t total_size) noexcept override { + std::cout << "Data successfully read" << std::endl; + } + + private: + HIperfClient &client_; + }; + + class Callback : public ConsumerSocket::ReadCallback { + static constexpr std::size_t read_size = 16 * 1024; + + public: + Callback(HIperfClient &hiperf_client) : client_(hiperf_client) {} + + bool isBufferMovable() noexcept override { return true; } + + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override { + // Not used + } + + void readDataAvailable(std::size_t length) noexcept override { + // Do nothing + return; + } + + void readBufferAvailable( + std::unique_ptr &&buffer) noexcept override { + if (client_.configuration_.receive_buffer) { + client_.configuration_.receive_buffer->prependChain(std::move(buffer)); + } else { + client_.configuration_.receive_buffer = std::move(buffer); + } + } + + size_t maxBufferSize() const override { return read_size; } + + void readError(const std::error_code ec) noexcept override { + std::cerr << "Error " << ec.message() << " while reading from socket" + << std::endl; + } + + void readSuccess(std::size_t total_size) noexcept override { + Time t2 = std::chrono::steady_clock::now(); + TimeDuration dt = + std::chrono::duration_cast(t2 - client_.t_download_); + long usec = (long)dt.count(); + + std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" + << std::endl; + + std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " + << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]" + << std::endl; + + client_.io_service_.stop(); + } + + private: + HIperfClient &client_; + }; + ClientConfiguration configuration_; Time t_stats_; Time t_download_; @@ -455,8 +541,14 @@ class HIperfClient { std::unique_ptr consumer_socket_; uint32_t expected_seg_; std::unordered_set lost_packets_; + RTCCallback *rtc_callback_; + Callback *callback_; }; +/** + * Hiperf server class: configure and setup an hicn producer following the + * ServerConfiguration. + */ class HIperfServer { const std::size_t log2_content_object_buffer_size = 8; @@ -800,7 +892,6 @@ void usage() { "receiving the correct data. This is an RTC specific option, to be " "used with the -R (default false)" << std::endl; - } int main(int argc, char *argv[]) { @@ -899,7 +990,7 @@ int main(int argc, char *argv[]) { options = 1; break; } - case 't':{ + case 't': { client_configuration.test_mode_ = true; options = 1; break; diff --git a/utils/src/ping_server.cc b/utils/src/ping_server.cc index d6614303a..049ab3ac5 100644 --- a/utils/src/ping_server.cc +++ b/utils/src/ping_server.cc @@ -19,6 +19,7 @@ #else #include #endif +#include #include #include @@ -43,16 +44,22 @@ utils::Identity setProducerIdentity(std::string keystore_name, class CallbackContainer { const std::size_t log2_content_object_buffer_size = 12; -public: + public: CallbackContainer(const Name &prefix, uint32_t object_size, bool verbose, bool dump, bool quite, bool flags, bool reset, uint8_t ttl, utils::Identity *identity, bool sign, uint32_t lifetime) : buffer_(object_size, 'X'), content_objects_((std::uint32_t)(1 << log2_content_object_buffer_size)), mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), - content_objects_index_(0), verbose_(verbose), dump_(dump), - quite_(quite), flags_(flags), reset_(reset), ttl_(ttl), - identity_(identity), sign_(sign) { + content_objects_index_(0), + verbose_(verbose), + dump_(dump), + quite_(quite), + flags_(flags), + reset_(reset), + ttl_(ttl), + identity_(identity), + sign_(sign) { core::Packet::Format format; if (prefix.getAddressFamily() == AF_INET) { @@ -114,7 +121,7 @@ public: content_object->setAck(); } else if (interest.testAck()) { content_object->setAck(); - } // here I may need to handle the FIN flag; + } // here I may need to handle the FIN flag; } else if (reset_) { content_object->setRst(); } @@ -136,8 +143,7 @@ public: std::cout << "-----------------------" << std::endl; } - if (!quite_) - std::cout << std::endl; + if (!quite_) std::cout << std::endl; if (sign_) { identity_->getSigner().sign(*content_object); @@ -147,7 +153,7 @@ public: } } -private: + private: std::string buffer_; std::vector> content_objects_; std::uint16_t mask_; @@ -222,51 +228,51 @@ int main(int argc, char **argv) { while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDHk:p:")) != -1) { #endif switch (opt) { - case 's': - object_size = std::stoi(optarg); - break; - case 'n': - name_prefix = optarg; - break; - case 't': - ttl = (uint8_t)std::stoi(optarg); - break; - case 'l': - data_lifetime = std::stoi(optarg); - break; - case 'V': - verbose = true; - break; - case 'D': - dump = true; - break; - case 'q': - verbose = false; - dump = false; - quite = true; - break; + case 's': + object_size = std::stoi(optarg); + break; + case 'n': + name_prefix = optarg; + break; + case 't': + ttl = (uint8_t)std::stoi(optarg); + break; + case 'l': + data_lifetime = std::stoi(optarg); + break; + case 'V': + verbose = true; + break; + case 'D': + dump = true; + break; + case 'q': + verbose = false; + dump = false; + quite = true; + break; #ifndef _WIN32 - case 'd': - daemon = true; - break; + case 'd': + daemon = true; + break; #endif - case 'f': - flags = true; - break; - case 'r': - reset = true; - break; - case 'k': - keystore_path = optarg; - sign = true; - break; - case 'p': - keystore_password = optarg; - break; - case 'H': - default: - help(); - exit(EXIT_FAILURE); + case 'f': + flags = true; + break; + case 'r': + reset = true; + break; + case 'k': + keystore_path = optarg; + sign = true; + break; + case 'p': + keystore_password = optarg; + break; + case 'H': + default: + help(); + exit(EXIT_FAILURE); } } @@ -282,8 +288,7 @@ int main(int argc, char **argv) { std::string ip_address = tokenizer.nextToken(); Name n(ip_address); - if (object_size > 1350) - object_size = 1350; + if (object_size > 1350) object_size = 1350; CallbackContainer *stubs; utils::Identity identity = setProducerIdentity( @@ -327,9 +332,9 @@ int main(int argc, char **argv) { return 0; } -} // namespace interface +} // namespace interface -} // end namespace transport +} // end namespace transport int main(int argc, char **argv) { return transport::interface::main(argc, argv); -- cgit 1.2.3-korg