summaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-04-03 10:03:56 +0200
committerMauro Sardara <msardara@cisco.com>2019-04-15 11:37:30 +0200
commitc365689250216861fd7727203ee6ba1049ad5778 (patch)
tree97f1f3d1a6cb7314f1292d97be6d8e8e06cc998b /libtransport
parentd8ce6d98a2a726393655bd71eb81b8ef5222d6ba (diff)
[HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application.
Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/http/CMakeLists.txt1
-rw-r--r--libtransport/src/hicn/transport/http/callbacks.h46
-rw-r--r--libtransport/src/hicn/transport/http/client_connection.cc127
-rw-r--r--libtransport/src/hicn/transport/http/client_connection.h65
-rw-r--r--libtransport/src/hicn/transport/http/server_acceptor.h4
-rw-r--r--libtransport/src/hicn/transport/http/server_publisher.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/CMakeLists.txt4
-rw-r--r--libtransport/src/hicn/transport/interfaces/async_transport.h641
-rw-r--r--libtransport/src/hicn/transport/interfaces/callbacks.h110
-rw-r--r--libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc490
-rw-r--r--libtransport/src/hicn/transport/interfaces/full_duplex_socket.h243
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket.h62
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc14
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h263
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_options_keys.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc11
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h6
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc62
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc78
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.h8
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc204
21 files changed, 683 insertions, 1760 deletions
diff --git a/libtransport/src/hicn/transport/http/CMakeLists.txt b/libtransport/src/hicn/transport/http/CMakeLists.txt
index ddcf1fdc3..b24c80195 100644
--- a/libtransport/src/hicn/transport/http/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/http/CMakeLists.txt
@@ -29,7 +29,6 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/facade.h
${CMAKE_CURRENT_SOURCE_DIR}/response.h
${CMAKE_CURRENT_SOURCE_DIR}/message.h
- ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h
)
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/hicn/transport/http/callbacks.h b/libtransport/src/hicn/transport/http/callbacks.h
deleted file mode 100644
index a0f3d5999..000000000
--- a/libtransport/src/hicn/transport/http/callbacks.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/http/server_publisher.h>
-
-#include <functional>
-#include <memory>
-
-namespace transport {
-
-namespace http {
-
-enum class RC : uint8_t {
- SUCCESS,
- CONTENT_PUBLISHED,
- ERR_UNDEFINED,
-};
-
-using OnHttpRequest =
- std::function<void(std::shared_ptr<HTTPServerPublisher>&, const uint8_t*,
- std::size_t, int request_id)>;
-using DeadlineTimerCallback = std::function<void(const std::error_code& e)>;
-using ReceiveCallback = std::function<void(const std::vector<uint8_t>&)>;
-using OnPayloadCallback =
- std::function<RC(const std::error_code& ec, const core::Name& name,
- const ContentBuffer& payload)>;
-using ContentSentCallback =
- std::function<void(const std::error_code&, const core::Name&)>;
-
-} // namespace http
-
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc
index b31d89b6b..fb7dbdfac 100644
--- a/libtransport/src/hicn/transport/http/client_connection.cc
+++ b/libtransport/src/hicn/transport/http/client_connection.cc
@@ -27,6 +27,8 @@ using namespace transport;
HTTPClientConnection::HTTPClientConnection()
: consumer_(TransportProtocolAlgorithms::RAAQM, io_service_),
+ read_bytes_callback_(nullptr),
+ read_buffer_(nullptr),
response_(std::make_shared<HTTPResponse>()),
timer_(nullptr) {
consumer_.setSocketOption(
@@ -35,11 +37,7 @@ HTTPClientConnection::HTTPClientConnection()
&HTTPClientConnection::verifyData, this, std::placeholders::_1,
std::placeholders::_2));
- consumer_.setSocketOption(
- ConsumerCallbacksOptions::CONTENT_RETRIEVED,
- (ConsumerContentCallback)std::bind(
- &HTTPClientConnection::processPayload, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this);
consumer_.connect();
std::shared_ptr<typename ConsumerSocket::Portal> portal;
@@ -47,36 +45,48 @@ HTTPClientConnection::HTTPClientConnection()
timer_ = std::make_unique<asio::steady_timer>(portal->getIoService());
}
-HTTPClientConnection &HTTPClientConnection::get(
+HTTPClientConnection::RC HTTPClientConnection::get(
const std::string &url, HTTPHeaders headers, HTTPPayload payload,
- std::shared_ptr<HTTPResponse> response) {
- return sendRequest(url, HTTPMethod::GET, headers, payload, response);
+ std::shared_ptr<HTTPResponse> response, ReadBytesCallback *callback) {
+ return sendRequest(url, HTTPMethod::GET, headers, payload, response,
+ callback);
}
-HTTPClientConnection &HTTPClientConnection::sendRequest(
+HTTPClientConnection::RC HTTPClientConnection::sendRequest(
const std::string &url, HTTPMethod method, HTTPHeaders headers,
- HTTPPayload payload, std::shared_ptr<HTTPResponse> response) {
+ HTTPPayload payload, std::shared_ptr<HTTPResponse> response,
+ ReadBytesCallback *callback) {
+ current_url_ = url;
+ read_bytes_callback_ = callback;
if (!response) {
response = response_;
}
auto start = std::chrono::steady_clock::now();
HTTPRequest request(method, url, headers, payload);
- std::string name = sendRequestGetReply(request, response);
- auto end = std::chrono::steady_clock::now();
-
- TRANSPORT_LOGI(
- "%s %s [%s] duration: %llu [usec] %zu [bytes]\n",
- method_map[method].c_str(), url.c_str(), name.c_str(),
- (unsigned long long)std::chrono::duration_cast<std::chrono::microseconds>(
- end - start)
- .count(),
- response->size());
+ response->clear();
- return *this;
+ success_callback_ = [this, method = std::move(method), url = std::move(url),
+ start = std::move(start),
+ response = std::move(response)](
+ std::size_t size) -> std::shared_ptr<HTTPResponse> {
+ auto end = std::chrono::steady_clock::now();
+ TRANSPORT_LOGI(
+ "%s %s [%s] duration: %llu [usec] %zu [bytes]\n",
+ method_map[method].c_str(), url.c_str(), name_.str().c_str(),
+ (unsigned long long)
+ std::chrono::duration_cast<std::chrono::microseconds>(end - start)
+ .count(),
+ size);
+
+ return response;
+ };
+
+ sendRequestGetReply(request, response);
+ return return_code_;
}
-std::string HTTPClientConnection::sendRequestGetReply(
+void HTTPClientConnection::sendRequestGetReply(
const HTTPRequest &request, std::shared_ptr<HTTPResponse> &response) {
const std::string &request_string = request.getRequestString();
const std::string &locator = request.getLocator();
@@ -94,38 +104,28 @@ std::string HTTPClientConnection::sendRequestGetReply(
&HTTPClientConnection::processLeavingInterest, this,
std::placeholders::_1, std::placeholders::_2, request_string));
- // Send content to producer piggybacking it through first interest (to fix)
-
- response->clear();
-
// Factor hicn name using hash
+ name_.str("");
- std::stringstream stream;
-
- stream << std::hex << http::default_values::ipv6_first_word << ":";
+ name_ << std::hex << http::default_values::ipv6_first_word << ":";
for (uint16_t *word = (uint16_t *)&locator_hash;
std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash));
word++) {
- stream << ":" << std::hex << *word;
+ name_ << ":" << std::hex << *word;
}
for (uint16_t *word = (uint16_t *)&request_hash;
std::size_t(word) < (std::size_t(&request_hash) + sizeof(request_hash));
word++) {
- stream << ":" << std::hex << *word;
+ name_ << ":" << std::hex << *word;
}
- stream << "|0";
-
- ContentBuffer response_ptr =
- std::static_pointer_cast<std::vector<uint8_t>>(response);
+ name_ << "|0";
- consumer_.consume(Name(stream.str()), response_ptr);
+ consumer_.consume(Name(name_.str()));
consumer_.stop();
-
- return stream.str();
}
HTTPResponse HTTPClientConnection::response() {
@@ -133,14 +133,6 @@ HTTPResponse HTTPClientConnection::response() {
return std::move(*response_);
}
-void HTTPClientConnection::processPayload(ConsumerSocket &c,
- std::size_t bytes_transferred,
- const std::error_code &ec) {
- if (ec) {
- TRANSPORT_LOGE("Download failed!!");
- }
-}
-
bool HTTPClientConnection::verifyData(
ConsumerSocket &c, const core::ContentObject &contentObject) {
if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) {
@@ -192,6 +184,49 @@ HTTPClientConnection &HTTPClientConnection::setCertificate(
return *this;
}
+// Read buffer management
+void HTTPClientConnection::readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
+ if (!read_bytes_callback_) {
+ if (!read_buffer_) {
+ read_buffer_ = std::move(buffer);
+ } else {
+ read_buffer_->prependChain(std::move(buffer));
+ }
+ } else {
+ read_bytes_callback_->onBytesReceived(std::move(buffer));
+ }
+}
+
+// Read buffer management
+void HTTPClientConnection::readError(const std::error_code ec) noexcept {
+ TRANSPORT_LOGE("Error %s during download of %s", ec.message().c_str(),
+ current_url_.c_str());
+ if (read_bytes_callback_) {
+ read_bytes_callback_->onError(ec);
+ }
+
+ return_code_ = HTTPClientConnection::RC::DOWNLOAD_FAILED;
+}
+
+void HTTPClientConnection::readSuccess(std::size_t total_size) noexcept {
+ auto response = success_callback_(total_size);
+ if (read_bytes_callback_) {
+ read_bytes_callback_->onSuccess(total_size);
+ } else {
+ response->reserve(total_size);
+ const utils::MemBuf *head = read_buffer_.get(), *current = head;
+ do {
+ response->insert(response->end(), current->data(), current->tail());
+ current = current->next();
+ } while (current != head);
+
+ read_buffer_.reset();
+ }
+
+ return_code_ = HTTPClientConnection::RC::DOWNLOAD_SUCCESS;
+}
+
} // namespace http
} // namespace transport
diff --git a/libtransport/src/hicn/transport/http/client_connection.h b/libtransport/src/hicn/transport/http/client_connection.h
index faad1864c..6c150f848 100644
--- a/libtransport/src/hicn/transport/http/client_connection.h
+++ b/libtransport/src/hicn/transport/http/client_connection.h
@@ -31,18 +31,30 @@ namespace http {
using namespace interface;
using namespace core;
-class HTTPClientConnection {
+class HTTPClientConnection : public ConsumerSocket::ReadCallback {
+ static constexpr uint32_t max_buffer_capacity = 64 * 1024;
+
public:
+ class ReadBytesCallback {
+ public:
+ virtual void onBytesReceived(std::unique_ptr<utils::MemBuf> &&buffer) = 0;
+ virtual void onSuccess(std::size_t bytes) = 0;
+ virtual void onError(const std::error_code ec) = 0;
+ };
+
+ enum class RC : uint32_t { DOWNLOAD_FAILED, DOWNLOAD_SUCCESS };
+
HTTPClientConnection();
- HTTPClientConnection &get(const std::string &url, HTTPHeaders headers = {},
- HTTPPayload payload = {},
- std::shared_ptr<HTTPResponse> response = nullptr);
+ RC get(const std::string &url, HTTPHeaders headers = {},
+ HTTPPayload payload = {},
+ std::shared_ptr<HTTPResponse> response = nullptr,
+ ReadBytesCallback *callback = nullptr);
- HTTPClientConnection &sendRequest(
- const std::string &url, HTTPMethod method, HTTPHeaders headers = {},
- HTTPPayload payload = {},
- std::shared_ptr<HTTPResponse> response = nullptr);
+ RC sendRequest(const std::string &url, HTTPMethod method,
+ HTTPHeaders headers = {}, HTTPPayload payload = {},
+ std::shared_ptr<HTTPResponse> response = nullptr,
+ ReadBytesCallback *callback = nullptr);
HTTPResponse response();
@@ -55,11 +67,8 @@ class HTTPClientConnection {
HTTPClientConnection &setCertificate(const std::string &cert_path);
private:
- void processPayload(interface::ConsumerSocket &c,
- std::size_t bytes_transferred, const std::error_code &ec);
-
- std::string sendRequestGetReply(const HTTPRequest &request,
- std::shared_ptr<HTTPResponse> &response);
+ void sendRequestGetReply(const HTTPRequest &request,
+ std::shared_ptr<HTTPResponse> &response);
bool verifyData(interface::ConsumerSocket &c,
const core::ContentObject &contentObject);
@@ -68,12 +77,42 @@ class HTTPClientConnection {
const core::Interest &interest,
std::string &payload);
+ // Read callback
+ bool isBufferMovable() noexcept override { return true; }
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {}
+ void readDataAvailable(size_t length) noexcept override {}
+ size_t maxBufferSize() const override { return max_buffer_capacity; }
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
+ void readError(const std::error_code ec) noexcept override;
+ void readSuccess(std::size_t total_size) noexcept override;
+
asio::io_service io_service_;
+ // The consumer socket
ConsumerSocket consumer_;
+ // The current url provided by the application
+ std::string current_url_;
+ // The current hICN name used for downloading
+ std::stringstream name_;
+ // Function to be called when the read is successful
+ std::function<std::shared_ptr<HTTPResponse>(std::size_t)> success_callback_;
+ // Return code for current download
+ RC return_code_;
+
+ // Application provided callback for saving the received content during
+ // the download. If this callback is used, the HTTPClient will NOT save
+ // any byte internally.
+ ReadBytesCallback *read_bytes_callback_;
+
+ // Internal read buffer and HTTP response, to be used if the application does
+ // not provide any read_bytes_callback
+ std::unique_ptr<utils::MemBuf> read_buffer_;
std::shared_ptr<HTTPResponse> response_;
+ // Timer
std::unique_ptr<asio::steady_timer> timer_;
};
diff --git a/libtransport/src/hicn/transport/http/server_acceptor.h b/libtransport/src/hicn/transport/http/server_acceptor.h
index 99480028a..4e7350b76 100644
--- a/libtransport/src/hicn/transport/http/server_acceptor.h
+++ b/libtransport/src/hicn/transport/http/server_acceptor.h
@@ -15,7 +15,6 @@
#pragma once
-#include <hicn/transport/http/callbacks.h>
#include <hicn/transport/http/default_values.h>
#include <hicn/transport/http/request.h>
#include <hicn/transport/http/server_publisher.h>
@@ -31,6 +30,9 @@ namespace http {
class HTTPServerAcceptor {
friend class HTTPServerPublisher;
+ using OnHttpRequest =
+ std::function<void(std::shared_ptr<HTTPServerPublisher> &,
+ const uint8_t *, std::size_t, int request_id)>;
public:
HTTPServerAcceptor(std::string &&server_locator, OnHttpRequest callback);
diff --git a/libtransport/src/hicn/transport/http/server_publisher.h b/libtransport/src/hicn/transport/http/server_publisher.h
index f3c39c3fe..1f12fd8f9 100644
--- a/libtransport/src/hicn/transport/http/server_publisher.h
+++ b/libtransport/src/hicn/transport/http/server_publisher.h
@@ -63,8 +63,6 @@ class HTTPServerPublisher {
std::unique_ptr<ProducerSocket> producer_;
ProducerInterestCallback interest_enter_callback_;
utils::UserCallback wait_callback_;
-
- ContentBuffer receive_buffer_;
};
} // end namespace http
diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
index cbf371bac..2ff4fda56 100644
--- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
@@ -19,15 +19,13 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/async_transport.h
- ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.h
${CMAKE_CURRENT_SOURCE_DIR}/publication_options.h
${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h
${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h
)
list(APPEND SOURCE_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc
${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc
diff --git a/libtransport/src/hicn/transport/interfaces/async_transport.h b/libtransport/src/hicn/transport/interfaces/async_transport.h
deleted file mode 100644
index 692dd318c..000000000
--- a/libtransport/src/hicn/transport/interfaces/async_transport.h
+++ /dev/null
@@ -1,641 +0,0 @@
-
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/publication_options.h>
-#include <hicn/transport/interfaces/socket.h>
-#include <hicn/transport/portability/portability.h>
-
-#ifndef _WIN32
-#include <sys/uio.h>
-#endif
-
-#include <memory>
-
-namespace transport {
-
-namespace interface {
-
-/*
- * flags given by the application for write* calls
- */
-enum class WriteFlags : uint32_t {
- NONE = 0x00,
- /*
- * Whether to delay the output until a subsequent non-corked write.
- * (Note: may not be supported in all subclasses or on all platforms.)
- */
- CORK = 0x01,
- /*
- * for a socket that has ACK latency enabled, it will cause the kernel
- * to fire a TCP ESTATS event when the last byte of the given write call
- * will be acknowledged.
- */
- EOR = 0x02,
- /*
- * this indicates that only the write side of socket should be shutdown
- */
- WRITE_SHUTDOWN = 0x04,
- /*
- * use msg zerocopy if allowed
- */
- WRITE_MSG_ZEROCOPY = 0x08,
-};
-
-/*
- * union operator
- */
-TRANSPORT_ALWAYS_INLINE WriteFlags operator|(WriteFlags a, WriteFlags b) {
- return static_cast<WriteFlags>(static_cast<uint32_t>(a) |
- static_cast<uint32_t>(b));
-}
-
-/*
- * compound assignment union operator
- */
-TRANSPORT_ALWAYS_INLINE WriteFlags &operator|=(WriteFlags &a, WriteFlags b) {
- a = a | b;
- return a;
-}
-
-/*
- * intersection operator
- */
-TRANSPORT_ALWAYS_INLINE WriteFlags operator&(WriteFlags a, WriteFlags b) {
- return static_cast<WriteFlags>(static_cast<uint32_t>(a) &
- static_cast<uint32_t>(b));
-}
-
-/*
- * compound assignment intersection operator
- */
-TRANSPORT_ALWAYS_INLINE WriteFlags &operator&=(WriteFlags &a, WriteFlags b) {
- a = a & b;
- return a;
-}
-
-/*
- * exclusion parameter
- */
-TRANSPORT_ALWAYS_INLINE WriteFlags operator~(WriteFlags a) {
- return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
-}
-
-/*
- * unset operator
- */
-TRANSPORT_ALWAYS_INLINE WriteFlags unSet(WriteFlags a, WriteFlags b) {
- return a & ~b;
-}
-
-/*
- * inclusion operator
- */
-TRANSPORT_ALWAYS_INLINE bool isSet(WriteFlags a, WriteFlags b) {
- return (a & b) == b;
-}
-
-class ConnectCallback {
- public:
- virtual ~ConnectCallback() = default;
-
- /**
- * connectSuccess() will be invoked when the connection has been
- * successfully established.
- */
- virtual void connectSuccess() noexcept = 0;
-
- /**
- * connectErr() will be invoked if the connection attempt fails.
- *
- * @param ex An exception describing the error that occurred.
- */
- virtual void connectErr(const std::error_code ec) noexcept = 0;
-};
-
-/**
- * AsyncSocket defines an asynchronous API for streaming I/O.
- *
- * This class provides an API to for asynchronously waiting for data
- * on a streaming transport, and for asynchronously sending data.
- *
- * The APIs for reading and writing are intentionally asymmetric. Waiting for
- * data to read is a persistent API: a callback is installed, and is notified
- * whenever new data is available. It continues to be notified of new events
- * until it is uninstalled.
- *
- * AsyncSocket does not provide read timeout functionality, because it
- * typically cannot determine when the timeout should be active. Generally, a
- * timeout should only be enabled when processing is blocked waiting on data
- * from the remote endpoint. For server-side applications, the timeout should
- * not be active if the server is currently processing one or more outstanding
- * requests on this transport. For client-side applications, the timeout
- * should not be active if there are no requests pending on the transport.
- * Additionally, if a client has multiple pending requests, it will ususally
- * want a separate timeout for each request, rather than a single read timeout.
- *
- * The write API is fairly intuitive: a user can request to send a block of
- * data, and a callback will be informed once the entire block has been
- * transferred to the kernel, or on error. AsyncSocket does provide a send
- * timeout, since most callers want to give up if the remote end stops
- * responding and no further progress can be made sending the data.
- */
-class AsyncSocket {
- public:
- /**
- * Close the transport.
- *
- * This gracefully closes the transport, waiting for all pending write
- * requests to complete before actually closing the underlying transport.
- *
- * If a read callback is set, readEOF() will be called immediately. If there
- * are outstanding write requests, the close will be delayed until all
- * remaining writes have completed. No new writes may be started after
- * close() has been called.
- */
- virtual void close() = 0;
-
- /**
- * Close the transport immediately.
- *
- * This closes the transport immediately, dropping any outstanding data
- * waiting to be written.
- *
- * If a read callback is set, readEOF() will be called immediately.
- * If there are outstanding write requests, these requests will be aborted
- * and writeError() will be invoked immediately on all outstanding write
- * callbacks.
- */
- virtual void closeNow() = 0;
-
- /**
- * Perform a half-shutdown of the write side of the transport.
- *
- * The caller should not make any more calls to write() or writev() after
- * shutdownWrite() is called. Any future write attempts will fail
- * immediately.
- *
- * Not all transport types support half-shutdown. If the underlying
- * transport does not support half-shutdown, it will fully shutdown both the
- * read and write sides of the transport. (Fully shutting down the socket is
- * better than doing nothing at all, since the caller may rely on the
- * shutdownWrite() call to notify the other end of the connection that no
- * more data can be read.)
- *
- * If there is pending data still waiting to be written on the transport,
- * the actual shutdown will be delayed until the pending data has been
- * written.
- *
- * Note: There is no corresponding shutdownRead() equivalent. Simply
- * uninstall the read callback if you wish to stop reading. (On TCP sockets
- * at least, shutting down the read side of the socket is a no-op anyway.)
- */
- virtual void shutdownWrite() = 0;
-
- /**
- * Perform a half-shutdown of the write side of the transport.
- *
- * shutdownWriteNow() is identical to shutdownWrite(), except that it
- * immediately performs the shutdown, rather than waiting for pending writes
- * to complete. Any pending write requests will be immediately failed when
- * shutdownWriteNow() is called.
- */
- virtual void shutdownWriteNow() = 0;
-
- /**
- * Determine if transport is open and ready to read or write.
- *
- * Note that this function returns false on EOF; you must also call error()
- * to distinguish between an EOF and an error.
- *
- * @return true iff the transport is open and ready, false otherwise.
- */
- virtual bool good() const = 0;
-
- /**
- * Determine if the transport is readable or not.
- *
- * @return true iff the transport is readable, false otherwise.
- */
- virtual bool readable() const = 0;
-
- /**
- * Determine if the transport is writable or not.
- *
- * @return true iff the transport is writable, false otherwise.
- */
- virtual bool writable() const {
- // By default return good() - leave it to implementers to override.
- return good();
- }
-
- /**
- * Determine if the there is pending data on the transport.
- *
- * @return true iff the if the there is pending data, false otherwise.
- */
- virtual bool isPending() const { return readable(); }
-
- /**
- * Determine if transport is connected to the endpoint
- *
- * @return false iff the transport is connected, otherwise true
- */
- virtual bool connected() const = 0;
-
- /**
- * Determine if an error has occurred with this transport.
- *
- * @return true iff an error has occurred (not EOF).
- */
- virtual bool error() const = 0;
-
- // /**
- // * Attach the transport to a EventBase.
- // *
- // * This may only be called if the transport is not currently attached to a
- // * EventBase (by an earlier call to detachEventBase()).
- // *
- // * This method must be invoked in the EventBase's thread.
- // */
- // virtual void attachEventBase(EventBase* eventBase) = 0;
-
- // /**
- // * Detach the transport from its EventBase.
- // *
- // * This may only be called when the transport is idle and has no reads or
- // * writes pending. Once detached, the transport may not be used again
- // until
- // * it is re-attached to a EventBase by calling attachEventBase().
- // *
- // * This method must be called from the current EventBase's thread.
- // */
- // virtual void detachEventBase() = 0;
-
- // /**
- // * Determine if the transport can be detached.
- // *
- // * This method must be called from the current EventBase's thread.
- // */
- // virtual bool isDetachable() const = 0;
-
- /**
- * Set the send timeout.
- *
- * If write requests do not make any progress for more than the specified
- * number of milliseconds, fail all pending writes and close the transport.
- *
- * If write requests are currently pending when setSendTimeout() is called,
- * the timeout interval is immediately restarted using the new value.
- *
- * @param milliseconds The timeout duration, in milliseconds. If 0, no
- * timeout will be used.
- */
- virtual void setSendTimeout(uint32_t milliseconds) = 0;
-
- /**
- * Get the send timeout.
- *
- * @return Returns the current send timeout, in milliseconds. A return value
- * of 0 indicates that no timeout is set.
- */
- virtual uint32_t getSendTimeout() const = 0;
-
- virtual void connect(ConnectCallback *callback,
- const core::Prefix &prefix_) = 0;
-
- // /**
- // * Get the address of the local endpoint of this transport.
- // *
- // * This function may throw AsyncSocketException on error.
- // *
- // * @param address The local address will be stored in the specified
- // * SocketAddress.
- // */
- // virtual void getLocalAddress(* address) const = 0;
-
- virtual size_t getAppBytesWritten() const = 0;
- virtual size_t getRawBytesWritten() const = 0;
- virtual size_t getAppBytesReceived() const = 0;
- virtual size_t getRawBytesReceived() const = 0;
-
- class BufferCallback {
- public:
- virtual ~BufferCallback() {}
- virtual void onEgressBuffered() = 0;
- virtual void onEgressBufferCleared() = 0;
- };
-
- ~AsyncSocket() = default;
-};
-
-class AsyncAcceptor {
- public:
- class AcceptCallback {
- public:
- virtual ~AcceptCallback() = default;
-
- /**
- * connectionAccepted() is called whenever a new client connection is
- * received.
- *
- * The AcceptCallback will remain installed after connectionAccepted()
- * returns.
- *
- * @param fd The newly accepted client socket. The AcceptCallback
- * assumes ownership of this socket, and is responsible
- * for closing it when done. The newly accepted file
- * descriptor will have already been put into
- * non-blocking mode.
- * @param clientAddr A reference to a SocketAddress struct containing the
- * client's address. This struct is only guaranteed to
- * remain valid until connectionAccepted() returns.
- */
- virtual void connectionAccepted(
- const core::Name &subscriber_name) noexcept = 0;
-
- /**
- * acceptError() is called if an error occurs while accepting.
- *
- * The AcceptCallback will remain installed even after an accept error,
- * as the errors are typically somewhat transient, such as being out of
- * file descriptors. The server socket must be explicitly stopped if you
- * wish to stop accepting after an error.
- *
- * @param ex An exception representing the error.
- */
- virtual void acceptError(const std::exception &ex) noexcept = 0;
-
- /**
- * acceptStarted() will be called in the callback's EventBase thread
- * after this callback has been added to the AsyncServerSocket.
- *
- * acceptStarted() will be called before any calls to connectionAccepted()
- * or acceptError() are made on this callback.
- *
- * acceptStarted() makes it easier for callbacks to perform initialization
- * inside the callback thread. (The call to addAcceptCallback() must
- * always be made from the AsyncServerSocket's primary EventBase thread.
- * acceptStarted() provides a hook that will always be invoked in the
- * callback's thread.)
- *
- * Note that the call to acceptStarted() is made once the callback is
- * added, regardless of whether or not the AsyncServerSocket is actually
- * accepting at the moment. acceptStarted() will be called even if the
- * AsyncServerSocket is paused when the callback is added (including if
- * the initial call to startAccepting() on the AsyncServerSocket has not
- * been made yet).
- */
- virtual void acceptStarted() noexcept {}
-
- /**
- * acceptStopped() will be called when this AcceptCallback is removed from
- * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
- * whichever occurs first.
- *
- * No more calls to connectionAccepted() or acceptError() will be made
- * after acceptStopped() is invoked.
- */
- virtual void acceptStopped() noexcept {}
- };
-
- /**
- * Wait for subscribers
- *
- */
- virtual void waitForSubscribers(AcceptCallback *cb) = 0;
-};
-
-class AsyncReader {
- public:
- class ReadCallback {
- public:
- virtual ~ReadCallback() = default;
-
- /**
- * When data becomes available, getReadBuffer() will be invoked to get the
- * buffer into which data should be read.
- *
- * This method allows the ReadCallback to delay buffer allocation until
- * data becomes available. This allows applications to manage large
- * numbers of idle connections, without having to maintain a separate read
- * buffer for each idle connection.
- *
- * It is possible that in some cases, getReadBuffer() may be called
- * multiple times before readDataAvailable() is invoked. In this case, the
- * data will be written to the buffer returned from the most recent call to
- * readDataAvailable(). If the previous calls to readDataAvailable()
- * returned different buffers, the ReadCallback is responsible for ensuring
- * that they are not leaked.
- *
- * If getReadBuffer() throws an exception, returns a nullptr buffer, or
- * returns a 0 length, the ReadCallback will be uninstalled and its
- * readError() method will be invoked.
- *
- * getReadBuffer() is not allowed to change the transport state before it
- * returns. (For example, it should never uninstall the read callback, or
- * set a different read callback.)
- *
- * @param bufReturn getReadBuffer() should update *bufReturn to contain the
- * address of the read buffer. This parameter will never
- * be nullptr.
- * @param lenReturn getReadBuffer() should update *lenReturn to contain the
- * maximum number of bytes that may be written to the read
- * buffer. This parameter will never be nullptr.
- *
- *
- * XXX TODO this does not seems to be completely true Checlk i/.
- */
- virtual void getReadBuffer(void **bufReturn, size_t *lenReturn) = 0;
-
- /**
- * readDataAvailable() will be invoked when data has been successfully read
- * into the buffer returned by the last call to getReadBuffer().
- *
- * The read callback remains installed after readDataAvailable() returns.
- * It must be explicitly uninstalled to stop receiving read events.
- * getReadBuffer() will be called at least once before each call to
- * readDataAvailable(). getReadBuffer() will also be called before any
- * call to readEOF().
- *
- * @param len The number of bytes placed in the buffer.
- */
-
- virtual void readDataAvailable(size_t len) noexcept = 0;
-
- /**
- * When data becomes available, isBufferMovable() will be invoked to figure
- * out which API will be used, readBufferAvailable() or
- * readDataAvailable(). If isBufferMovable() returns true, that means
- * ReadCallback supports the IOBuf ownership transfer and
- * readBufferAvailable() will be used. Otherwise, not.
-
- * By default, isBufferMovable() always return false. If
- * readBufferAvailable() is implemented and to be invoked, You should
- * overwrite isBufferMovable() and return true in the inherited class.
- *
- * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by
- * itself until data becomes available. Compared with the pre/post buffer
- * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable()
- * has two advantages. First, this can avoid memcpy. E.g., in
- * AsyncSSLSocket, the decrypted data was copied from the openssl internal
- * buffer to the readbuf buffer. With the buffer ownership transfer, the
- * internal buffer can be directly "moved" to ReadCallback. Second, the
- * memory allocation can be more precise. The reason is
- * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size
- * because they have more context about the available data than
- * ReadCallback. Think about the getReadBuffer() pre-allocate 4072 bytes
- * buffer, but the available data is always 16KB (max OpenSSL record size).
- */
-
- virtual bool isBufferMovable() noexcept { return false; }
-
- /**
- * Suggested buffer size, allocated for read operations,
- * if callback is movable and supports folly::IOBuf
- */
-
- virtual size_t maxBufferSize() const {
- return 64 * 1024; // 64K
- }
-
- /**
- * readBufferAvailable() will be invoked when data has been successfully
- * read.
- *
- * Note that only either readBufferAvailable() or readDataAvailable() will
- * be invoked according to the return value of isBufferMovable(). The timing
- * and aftereffect of readBufferAvailable() are the same as
- * readDataAvailable()
- *
- * @param readBuf The unique pointer of read buffer.
- */
-
- // virtual void readBufferAvailable(uint8_t** buffer, std::size_t
- // *buf_length) noexcept {}
-
- virtual void readBufferAvailable(ContentBuffer &&buffer) noexcept {}
-
- // virtual void readBufferAvailable(utils::SharableBuffer<uint8_t>&& buffer)
- // noexcept {}
-
- /**
- * readEOF() will be invoked when the transport is closed.
- *
- * The read callback will be automatically uninstalled immediately before
- * readEOF() is invoked.
- */
- virtual void readEOF() noexcept = 0;
-
- /**
- * readError() will be invoked if an error occurs reading from the
- * transport.
- *
- * The read callback will be automatically uninstalled immediately before
- * readError() is invoked.
- *
- * @param ex An exception describing the error that occurred.
- */
- virtual void readErr(const std::error_code ec) noexcept = 0;
- };
-
- // Read methods that aren't part of AsyncTransport.
- virtual void setReadCB(ReadCallback *callback) = 0;
- virtual ReadCallback *getReadCallback() const = 0;
-
- protected:
- virtual ~AsyncReader() = default;
-};
-
-class AsyncWriter {
- public:
- class WriteCallback {
- public:
- virtual ~WriteCallback() = default;
-
- /**
- * writeSuccess() will be invoked when all of the data has been
- * successfully written.
- *
- * Note that this mainly signals that the buffer containing the data to
- * write is no longer needed and may be freed or re-used. It does not
- * guarantee that the data has been fully transmitted to the remote
- * endpoint. For example, on socket-based transports, writeSuccess() only
- * indicates that the data has been given to the kernel for eventual
- * transmission.
- */
- virtual void writeSuccess() noexcept = 0;
-
- /**
- * writeError() will be invoked if an error occurs writing the data.
- *
- * @param bytesWritten The number of bytes that were successfull
- * @param ex An exception describing the error that occurred.
- */
- virtual void writeErr(size_t bytesWritten) noexcept = 0;
- };
-
- /**
- * If you supply a non-null WriteCallback, exactly one of writeSuccess()
- * or writeErr() will be invoked when the write completes. If you supply
- * the same WriteCallback object for multiple write() calls, it will be
- * invoked exactly once per call. The only way to cancel outstanding
- * write requests is to close the socket (e.g., with closeNow() or
- * shutdownWriteNow()). When closing the socket this way, writeErr() will
- * still be invoked once for each outstanding write operation.
- */
- virtual void write(WriteCallback *callback, const void *buf, size_t bytes,
- const PublicationOptions &options,
- WriteFlags flags = WriteFlags::NONE) = 0;
-
- /**
- * If you supply a non-null WriteCallback, exactly one of writeSuccess()
- * or writeErr() will be invoked when the write completes. If you supply
- * the same WriteCallback object for multiple write() calls, it will be
- * invoked exactly once per call. The only way to cancel outstanding
- * write requests is to close the socket (e.g., with closeNow() or
- * shutdownWriteNow()). When closing the socket this way, writeErr() will
- * still be invoked once for each outstanding write operation.
- */
- virtual void write(WriteCallback *callback, ContentBuffer &&output_buffer,
- const PublicationOptions &options,
- WriteFlags flags = WriteFlags::NONE) = 0;
-
- // /**
- // * If you supply a non-null WriteCallback, exactly one of writeSuccess()
- // * or writeErr() will be invoked when the write completes. If you supply
- // * the same WriteCallback object for multiple write() calls, it will be
- // * invoked exactly once per call. The only way to cancel outstanding
- // * write requests is to close the socket (e.g., with closeNow() or
- // * shutdownWriteNow()). When closing the socket this way, writeErr() will
- // * still be invoked once for each outstanding write operation.
- // */
- // virtual void writeChain(
- // WriteCallback* callback,
- // std::unique_ptr<IOBuf>&& buf,
- // WriteFlags flags = WriteFlags::NONE) = 0;
-
- virtual void setWriteCB(WriteCallback *callback) = 0;
- virtual WriteCallback *getWriteCallback() const = 0;
-
- protected:
- virtual ~AsyncWriter() = default;
-};
-
-} // namespace interface
-
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h
new file mode 100644
index 000000000..24f47eb75
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/callbacks.h
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <system_error>
+
+namespace utils {
+class MemBuf;
+}
+
+namespace transport {
+
+namespace protocol {
+
+class IcnObserver;
+class TransportStatistics;
+
+} // namespace protocol
+
+namespace core {
+
+class ContentObject;
+class Interest;
+} // namespace core
+
+namespace interface {
+
+// Forward declarations
+class ConsumerSocket;
+class ProducerSocket;
+
+/**
+ * The ConsumerInterestCallback will be called in different parts of the
+ * consumer socket processing pipeline, with a ConsumerSocket and an Interest as
+ * parameters.
+ */
+using ConsumerInterestCallback =
+ std::function<void(ConsumerSocket &, const core::Interest &)>;
+
+/**
+ * The ConsumerTimerCallback is called periodically for exposing to applications
+ * a summary of the statistics of the transport protocol in use.
+ */
+using ConsumerTimerCallback = std::function<void(
+ ConsumerSocket &, const protocol::TransportStatistics &stats)>;
+
+/**
+ * The ProducerContentCallback will be called by the producer socket right after
+ * a content has been segmented and published.
+ */
+using ProducerContentCallback = std::function<void(
+ ProducerSocket &, const std::error_code &, uint64_t bytes_written)>;
+
+/**
+ * The ConsumerContentObjectCallback will be called in different parts of the
+ * consumer socket processing pipeline, with a ConsumerSocket and an
+ * ContentObject as parameters.
+ */
+using ConsumerContentObjectCallback =
+ std::function<void(ConsumerSocket &, const core::ContentObject &)>;
+
+/**
+ * The ConsumerContentObjectVerificationCallback will be called by the transport
+ * if an application is willing to verify each content object. Note that a
+ * better alternative is to instrument the transport to perform the verification
+ * autonomously, without requiring the intervention of the application.
+ */
+using ConsumerContentObjectVerificationCallback =
+ std::function<bool(ConsumerSocket &, const core::ContentObject &)>;
+
+/**
+ * The ConsumerManifestCallback will be called by the consumer socket when a
+ * manifest is received.
+ */
+using ConsumerManifestCallback =
+ std::function<void(ConsumerSocket &, const core::ContentObjectManifest &)>;
+
+/**
+ * The ProducerContentObjectCallback will be called in different parts of the
+ * consumer socket processing pipeline, with a ProducerSocket and an
+ * ContentObject as parameters.
+ */
+using ProducerContentObjectCallback =
+ std::function<void(ProducerSocket &, core::ContentObject &)>;
+
+/**
+ * The ProducerContentObjectCallback will be called in different parts of the
+ * consumer socket processing pipeline, with a ProducerSocket and an
+ * Interest as parameters.
+ */
+using ProducerInterestCallback =
+ std::function<void(ProducerSocket &, core::Interest &)>;
+
+} // namespace interface
+
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
deleted file mode 100644
index fdd422dee..000000000
--- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/full_duplex_socket.h>
-#include <hicn/transport/interfaces/socket_options_default_values.h>
-
-#include <memory>
-
-namespace transport {
-
-namespace interface {
-
-static const std::string producer_identity = "producer_socket";
-
-AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator)
- : AsyncFullDuplexSocket(locator, internal_io_service_) {}
-
-AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator,
- asio::io_service &io_service)
- : locator_(locator),
- incremental_suffix_(0),
- io_service_(io_service),
- work_(io_service),
- producer_(std::make_unique<ProducerSocket>(io_service_)),
- consumer_(std::make_unique<ConsumerSocket>(
- TransportProtocolAlgorithms::RAAQM /* , io_service_ */)),
- read_callback_(nullptr),
- write_callback_(nullptr),
- connect_callback_(nullptr),
- accept_callback_(nullptr),
- internal_connect_callback_(new OnConnectCallback(*this)),
- internal_signal_callback_(new OnSignalCallback(*this)),
- send_timeout_milliseconds_(~0),
- counters_({0}),
- receive_buffer_(ContentBuffer()) {
- using namespace transport;
- using namespace std::placeholders;
- producer_->registerPrefix(locator);
-
- producer_->setSocketOption(
- ProducerCallbacksOptions::CACHE_MISS,
- std::bind(&AsyncFullDuplexSocket::onControlInterest, this, _1, _2));
-
- producer_->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE,
- uint32_t{150000});
-
- ProducerContentCallback producer_callback =
- std::bind(&AsyncFullDuplexSocket::onContentProduced, this, _1, _2, _3);
- producer_->setSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
- producer_callback);
-
- producer_->connect();
-
- consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY,
- (ConsumerContentObjectVerificationCallback)[](
- ConsumerSocket & s, const ContentObject &c)
- ->bool { return true; });
-
- ConsumerContentCallback consumer_callback =
- std::bind(&AsyncFullDuplexSocket::onContentRetrieved, this, _1, _2, _3);
- consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_RETRIEVED,
- consumer_callback);
-
- consumer_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX,
- uint32_t{4});
-
- consumer_->connect();
-}
-
-void AsyncFullDuplexSocket::close() {
- this->consumer_->stop();
- this->producer_->stop();
-}
-
-void AsyncFullDuplexSocket::closeNow() { close(); }
-
-void AsyncFullDuplexSocket::shutdownWrite() { producer_->stop(); }
-
-void AsyncFullDuplexSocket::shutdownWriteNow() { shutdownWrite(); }
-
-bool AsyncFullDuplexSocket::good() const { return true; }
-
-bool AsyncFullDuplexSocket::readable() const {
- // TODO return status of consumer socket
- return true;
-}
-
-bool AsyncFullDuplexSocket::writable() const {
- // TODO return status of producer socket
- return true;
-}
-
-bool AsyncFullDuplexSocket::isPending() const {
- // TODO save if there are production operation in the ops queue
- // in producer socket
- return true;
-}
-
-bool AsyncFullDuplexSocket::connected() const {
- // No real connection here (ICN world). Return good
- return good();
-}
-
-bool AsyncFullDuplexSocket::error() const { return !good(); }
-
-void AsyncFullDuplexSocket::setSendTimeout(uint32_t milliseconds) {
- // TODO if production takes too much to complete
- // let's abort the operation.
-
- // Normally with hicn this should be done for content
- // pull, not for production.
-
- send_timeout_milliseconds_ = milliseconds;
-}
-
-uint32_t AsyncFullDuplexSocket::getSendTimeout() const {
- return send_timeout_milliseconds_;
-}
-
-size_t AsyncFullDuplexSocket::getAppBytesWritten() const {
- return counters_.app_bytes_written_;
-}
-
-size_t AsyncFullDuplexSocket::getRawBytesWritten() const { return 0; }
-
-size_t AsyncFullDuplexSocket::getAppBytesReceived() const {
- return counters_.app_bytes_read_;
-}
-
-size_t AsyncFullDuplexSocket::getRawBytesReceived() const { return 0; }
-
-void AsyncFullDuplexSocket::connect(ConnectCallback *callback,
- const core::Prefix &prefix) {
- connect_callback_ = callback;
-
- // Create an interest for a subscription
- auto interest =
- core::Interest::Ptr(new core::Interest(prefix.makeRandomName()));
- auto _payload = utils::MemBuf::create(sizeof(ActionMessage));
- _payload->append(sizeof(ActionMessage));
- auto payload = _payload->writableData();
- ActionMessage *subscription_message =
- reinterpret_cast<ActionMessage *>(payload);
- subscription_message->header.msg_type = MessageType::ACTION;
- subscription_message->action = Action::SUBSCRIBE;
- subscription_message->header.reserved[0] = 0;
- subscription_message->header.reserved[1] = 0;
-
- // Set the name the other part should use for notifying a content production
- sync_notification_ = std::move(locator_.makeRandomName());
- sync_notification_.copyToDestination(
- reinterpret_cast<uint8_t *>(subscription_message->name));
-
- TRANSPORT_LOGI(
- "Trying to connect. Sending interest: %s, name for notifications: %s",
- prefix.getName().toString().c_str(),
- sync_notification_.toString().c_str());
-
- interest->setLifetime(1000);
- interest->appendPayload(std::move(_payload));
- consumer_->asyncSendInterest(std::move(interest),
- internal_connect_callback_.get());
-}
-
-void AsyncFullDuplexSocket::write(WriteCallback *callback, const void *buf,
- size_t bytes,
- const PublicationOptions &options,
- WriteFlags flags) {
- using namespace transport;
-
- // 1 asynchronously write the content. I assume here the
- // buffer contains the whole application frame. FIXME: check
- // if this is true and fix it accordingly
- std::cout << "Size of the PAYLOAD: " << bytes << std::endl;
-
- if (bytes > core::Packet::default_mtu - sizeof(PayloadMessage)) {
- TRANSPORT_LOGI("Producing content with name %s",
- options.getName().toString().c_str());
- producer_->asyncProduce(options.getName(),
- reinterpret_cast<const uint8_t *>(buf), bytes);
- signalProductionToSubscribers(options.getName());
- } else {
- TRANSPORT_LOGI("Sending payload through interest");
- piggybackPayloadToSubscribers(
- options.getName(), reinterpret_cast<const uint8_t *>(buf), bytes);
- }
-}
-
-void AsyncFullDuplexSocket::write(WriteCallback *callback,
- ContentBuffer &&output_buffer,
- const PublicationOptions &options,
- WriteFlags flags) {
- using namespace transport;
-
- // 1 asynchronously write the content. I assume here the
- // buffer contains the whole application frame. FIXME: check
- // if this is true and fix it accordingly
- std::cout << "Size of the PAYLOAD: " << output_buffer->size() << std::endl;
-
- if (output_buffer->size() >
- core::Packet::default_mtu - sizeof(PayloadMessage)) {
- TRANSPORT_LOGI("Producing content with name %s",
- options.getName().toString().c_str());
- producer_->asyncProduce(options.getName(), std::move(output_buffer));
- signalProductionToSubscribers(options.getName());
- } else {
- TRANSPORT_LOGI("Sending payload through interest");
- piggybackPayloadToSubscribers(options.getName(), &(*output_buffer)[0],
- output_buffer->size());
- }
-}
-
-void AsyncFullDuplexSocket::piggybackPayloadToSubscribers(
- const core::Name &name, const uint8_t *buffer, std::size_t bytes) {
- for (auto &sub : subscribers_) {
- auto interest = core::Interest::Ptr(new core::Interest(name));
- auto _payload = utils::MemBuf::create(bytes + sizeof(PayloadMessage));
- _payload->append(bytes + sizeof(PayloadMessage));
- auto payload = _payload->writableData();
-
- PayloadMessage *interest_payload =
- reinterpret_cast<PayloadMessage *>(payload);
- interest_payload->header.msg_type = MessageType::PAYLOAD;
- interest_payload->header.reserved[0] = 0;
- interest_payload->header.reserved[1] = 0;
- interest_payload->reserved[0] = 0;
- std::memcpy(payload + sizeof(PayloadMessage), buffer, bytes);
- interest->appendPayload(std::move(_payload));
-
- // Set the timeout of 0.2 second
- interest->setLifetime(1000);
- interest->setName(sub);
- interest->getWritableName().setSuffix(incremental_suffix_++);
- // TRANSPORT_LOGI("Sending signalization to %s",
- // interest->getName().toString().c_str());
-
- consumer_->asyncSendInterest(std::move(interest),
- internal_signal_callback_.get());
- }
-}
-
-void AsyncFullDuplexSocket::signalProductionToSubscribers(
- const core::Name &name) {
- // Signal the other part we are producing a content
- // Create an interest for a subscription
-
- for (auto &sub : subscribers_) {
- auto interest = core::Interest::Ptr(new core::Interest(name));
- // Todo consider using preallocated pool of membufs
- auto _payload = utils::MemBuf::create(sizeof(ActionMessage));
- _payload->append(sizeof(ActionMessage));
- auto payload = interest->getPayload()->writableData();
-
- ActionMessage *produce_notification =
- reinterpret_cast<ActionMessage *>(payload);
- produce_notification->header.msg_type = MessageType::ACTION;
- produce_notification->action = Action::SIGNAL_PRODUCTION;
- produce_notification->header.reserved[0] = 0;
- produce_notification->header.reserved[1] = 0;
- name.copyToDestination(
- reinterpret_cast<uint8_t *>(produce_notification->name));
- interest->appendPayload(std::move(_payload));
-
- // Set the timeout of 0.2 second
- interest->setLifetime(1000);
- interest->setName(sub);
- interest->getWritableName().setSuffix(incremental_suffix_++);
- // TRANSPORT_LOGI("Sending signalization to %s",
- // interest->getName().toString().c_str());
-
- consumer_->asyncSendInterest(std::move(interest),
- internal_signal_callback_.get());
- }
-}
-
-void AsyncFullDuplexSocket::waitForSubscribers(AcceptCallback *cb) {
- accept_callback_ = cb;
-}
-
-std::shared_ptr<core::ContentObject>
-AsyncFullDuplexSocket::decodeSynchronizationMessage(
- const core::Interest &interest) {
- auto mesg = interest.getPayload();
- const MessageHeader *header =
- reinterpret_cast<const MessageHeader *>(mesg->data());
-
- switch (header->msg_type) {
- case MessageType::ACTION: {
- // Check what is the action to perform
- const ActionMessage *message =
- reinterpret_cast<const ActionMessage *>(header);
-
- if (message->action == Action::SUBSCRIBE) {
- // Add consumer to list on consumers to be notified
- auto ret =
- subscribers_.emplace(AF_INET6, (const uint8_t *)message->name, 0);
- TRANSPORT_LOGI("Added subscriber %s :)", ret.first->toString().c_str());
- if (ret.second) {
- accept_callback_->connectionAccepted(*ret.first);
- }
-
- TRANSPORT_LOGI("Connection success!");
-
- sync_notification_ = std::move(locator_.makeRandomName());
- return createSubscriptionResponse(sync_notification_);
-
- } else if (message->action == Action::CANCEL_SUBSCRIPTION) {
- // XXX Modify name!!! Each allocated name allocates a 128 bit array.
- subscribers_.erase(
- core::Name(AF_INET6, (const uint8_t *)message->name, 0));
- return createAck();
- } else if (message->action == Action::SIGNAL_PRODUCTION) {
- // trigger a reverse pull for the name contained in the message
- core::Name n(AF_INET6, (const uint8_t *)message->name, 0);
- std::cout << "PROD NOTIFICATION: Content to retrieve: " << n
- << std::endl;
- std::cout << "PROD NOTIFICATION: Interest name: " << interest.getName()
- << std::endl; // << " compared to " << sync_notification_ <<
- // std::endl;
-
- if (sync_notification_.equals(interest.getName(), false)) {
- std::cout << "Starting reverse pull for " << n << std::endl;
- consumer_->asyncConsume(n, receive_buffer_);
- return createAck();
- }
- } else {
- TRANSPORT_LOGE("Received unknown message. Dropping it.");
- }
-
- break;
- }
- case MessageType::RESPONSE: {
- throw errors::RuntimeException(
- "The response should be a content object!!");
- }
- case MessageType::PAYLOAD: {
- // The interest contains the payload directly.
- // We saved one round trip :)
-
- auto buffer = ContentBuffer();
- const uint8_t *data = mesg->data() + sizeof(PayloadMessage);
- buffer->assign(data, data + mesg->length() - sizeof(PayloadMessage));
- read_callback_->readBufferAvailable(std::move(buffer));
- return createAck();
- }
- default: { return std::shared_ptr<core::ContentObject>(nullptr); }
- }
-
- return std::shared_ptr<core::ContentObject>(nullptr);
-}
-
-void AsyncFullDuplexSocket::onControlInterest(ProducerSocket &s,
- const core::Interest &i) {
- auto payload = i.getPayload();
- if (payload->length()) {
- // Try to decode payload and see if starting an async pull operation
- auto response = decodeSynchronizationMessage(i);
- if (response) {
- response->setName(i.getName());
- s.produce(*response);
- }
- }
-}
-
-void AsyncFullDuplexSocket::onContentProduced(ProducerSocket &producer,
- const std::error_code &ec,
- uint64_t bytes_written) {
- if (write_callback_) {
- if (!ec) {
- write_callback_->writeSuccess();
- } else {
- write_callback_->writeErr(bytes_written);
- }
- }
-}
-
-void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s,
- std::size_t size,
- const std::error_code &ec) {
- // Sanity check
- if (size != receive_buffer_->size()) {
- TRANSPORT_LOGE(
- "Received content size differs from size retrieved from the buffer.");
- return;
- }
-
- TRANSPORT_LOGI("Received content with size %zu", size);
- if (!ec) {
- read_callback_->readBufferAvailable(std::move(receive_buffer_));
- } else {
- TRANSPORT_LOGE("Error retrieving content.");
- }
- // consumer_->stop();
-}
-
-void AsyncFullDuplexSocket::OnConnectCallback::onContentObject(
- core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) {
- // The ack message should contain the name to be used for notifying
- // the production of the content to the other part
-
- if (content_object->getPayload()->length() == 0) {
- TRANSPORT_LOGW("Connection response message empty....");
- return;
- }
-
- SubscriptionResponseMessage *response =
- reinterpret_cast<SubscriptionResponseMessage *>(
- content_object->getPayload()->writableData());
-
- if (response->response.header.msg_type == MessageType::RESPONSE) {
- if (response->response.return_code == ReturnCode::OK) {
- auto ret =
- socket_.subscribers_.emplace(AF_INET6, (uint8_t *)response->name, 0);
- TRANSPORT_LOGI("Successfully connected!!!! Subscriber added: %s",
- ret.first->toString().c_str());
- socket_.connect_callback_->connectSuccess();
- }
- }
-}
-
-void AsyncFullDuplexSocket::OnSignalCallback::onContentObject(
- core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) {
- return;
-}
-
-void AsyncFullDuplexSocket::OnSignalCallback::onTimeout(
- core::Interest::Ptr &&interest) {
- TRANSPORT_LOGE("Retransmitting signalization interest to %s!!",
- interest->getName().toString().c_str());
- socket_.consumer_->asyncSendInterest(std::move(interest),
- socket_.internal_signal_callback_.get());
-}
-
-void AsyncFullDuplexSocket::OnConnectCallback::onTimeout(
- core::Interest::Ptr &&interest) {
- socket_.connect_callback_->connectErr(
- std::make_error_code(std::errc::not_connected));
-}
-
-std::shared_ptr<core::ContentObject> AsyncFullDuplexSocket::createAck() {
- // Send the response back
- core::Name name("b001::abcd");
- auto response = std::make_shared<core::ContentObject>(name);
- auto _payload = utils::MemBuf::create(sizeof(ActionMessage));
- _payload->append(sizeof(ResponseMessage));
- auto payload = response->getPayload()->data();
- ResponseMessage *response_message = (ResponseMessage *)payload;
- response_message->header.msg_type = MessageType::RESPONSE;
- response_message->header.reserved[0] = 0;
- response_message->header.reserved[1] = 0;
- response_message->return_code = ReturnCode::OK;
- response->appendPayload(std::move(_payload));
- response->setLifetime(0);
- return response;
-}
-
-std::shared_ptr<core::ContentObject>
-AsyncFullDuplexSocket::createSubscriptionResponse(const core::Name &name) {
- // Send the response back
- core::Name tmp_name("b001::abcd");
- auto response = std::make_shared<core::ContentObject>(tmp_name);
- auto _payload = utils::MemBuf::create(sizeof(SubscriptionResponseMessage));
- _payload->append(sizeof(SubscriptionResponseMessage));
- auto payload = _payload->data();
- SubscriptionResponseMessage *response_message =
- (SubscriptionResponseMessage *)payload;
- response_message->response.header.msg_type = MessageType::RESPONSE;
- response_message->response.header.reserved[0] = 0;
- response_message->response.header.reserved[1] = 0;
- response_message->response.return_code = ReturnCode::OK;
- name.copyToDestination(reinterpret_cast<uint8_t *>(response_message->name));
- response->appendPayload(std::move(_payload));
- response->setLifetime(0);
- return response;
-}
-
-} // namespace interface
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
deleted file mode 100644
index 438325fdb..000000000
--- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * This class is created for sending/receiving data over an ICN network.
- */
-
-#pragma once
-
-#include <hicn/transport/core/prefix.h>
-#include <hicn/transport/interfaces/async_transport.h>
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/interfaces/socket_producer.h>
-#include <hicn/transport/portability/portability.h>
-
-#include <unordered_set>
-#include <vector>
-
-namespace transport {
-
-namespace interface {
-
-enum class MessageType : uint8_t { ACTION, RESPONSE, PAYLOAD };
-
-enum class Action : uint8_t {
- SUBSCRIBE,
- CANCEL_SUBSCRIPTION,
- SIGNAL_PRODUCTION,
-};
-
-enum class ReturnCode : uint8_t {
- OK,
- FAILED,
-};
-
-struct MessageHeader {
- MessageType msg_type;
- uint8_t reserved[2];
-};
-
-struct ActionMessage {
- MessageHeader header;
- Action action;
- uint64_t name[2];
-};
-
-struct ResponseMessage {
- MessageHeader header;
- ReturnCode return_code;
-};
-
-struct SubscriptionResponseMessage {
- ResponseMessage response;
- uint64_t name[2];
-};
-
-struct PayloadMessage {
- MessageHeader header;
- uint8_t reserved[1];
-};
-
-// struct NotificationMessage {
-// Action action;
-// uint8_t reserved[3];
-// uint64_t
-// }
-
-using core::Prefix;
-
-class AsyncFullDuplexSocket : public AsyncSocket,
- public AsyncReader,
- public AsyncWriter,
- public AsyncAcceptor {
- private:
- struct Counters {
- uint64_t app_bytes_written_;
- uint64_t app_bytes_read_;
-
- TRANSPORT_ALWAYS_INLINE void updateBytesWritten(uint64_t bytes) {
- app_bytes_written_ += bytes;
- }
-
- TRANSPORT_ALWAYS_INLINE void updateBytesRead(uint64_t bytes) {
- app_bytes_read_ += bytes;
- }
- };
-
- public:
- using UniquePtr = std::unique_ptr<AsyncFullDuplexSocket>;
- using SharedPtr = std::unique_ptr<AsyncFullDuplexSocket>;
-
- AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service);
- AsyncFullDuplexSocket(const core::Prefix &locator);
-
- ~AsyncFullDuplexSocket(){};
-
- using ReadCallback = AsyncReader::ReadCallback;
- using WriteCallback = AsyncWriter::WriteCallback;
-
- TRANSPORT_ALWAYS_INLINE void setReadCB(ReadCallback *callback) override {
- read_callback_ = callback;
- }
-
- TRANSPORT_ALWAYS_INLINE ReadCallback *getReadCallback() const override {
- return read_callback_;
- }
-
- TRANSPORT_ALWAYS_INLINE void setWriteCB(WriteCallback *callback) override {
- write_callback_ = callback;
- }
-
- TRANSPORT_ALWAYS_INLINE WriteCallback *getWriteCallback() const override {
- return write_callback_;
- }
-
- TRANSPORT_ALWAYS_INLINE const core::Prefix &getLocator() { return locator_; }
-
- void connect(ConnectCallback *callback, const core::Prefix &prefix) override;
-
- void write(WriteCallback *callback, const void *buf, size_t bytes,
- const PublicationOptions &options,
- WriteFlags flags = WriteFlags::NONE) override;
-
- virtual void write(WriteCallback *callback, ContentBuffer &&output_buffer,
- const PublicationOptions &options,
- WriteFlags flags = WriteFlags::NONE) override;
-
- void waitForSubscribers(AcceptCallback *cb) override;
-
- void close() override;
-
- void closeNow() override;
-
- void shutdownWrite() override;
-
- void shutdownWriteNow() override;
-
- bool good() const override;
-
- bool readable() const override;
-
- bool writable() const override;
-
- bool isPending() const override;
-
- bool connected() const override;
-
- bool error() const override;
-
- void setSendTimeout(uint32_t milliseconds) override;
-
- size_t getAppBytesWritten() const override;
- size_t getRawBytesWritten() const override;
- size_t getAppBytesReceived() const override;
- size_t getRawBytesReceived() const override;
-
- uint32_t getSendTimeout() const override;
-
- private:
- std::shared_ptr<core::ContentObject> decodeSynchronizationMessage(
- const core::Interest &interest);
-
- class OnConnectCallback : public BasePortal::ConsumerCallback {
- public:
- OnConnectCallback(AsyncFullDuplexSocket &socket) : socket_(socket){};
- virtual ~OnConnectCallback() = default;
- void onContentObject(core::Interest::Ptr &&,
- core::ContentObject::Ptr &&content_object) override;
- void onTimeout(core::Interest::Ptr &&interest) override;
-
- private:
- AsyncFullDuplexSocket &socket_;
- };
-
- class OnSignalCallback : public BasePortal::ConsumerCallback {
- public:
- OnSignalCallback(AsyncFullDuplexSocket &socket) : socket_(socket){};
- virtual ~OnSignalCallback() = default;
- void onContentObject(core::Interest::Ptr &&,
- core::ContentObject::Ptr &&content_object);
- void onTimeout(core::Interest::Ptr &&interest);
-
- private:
- AsyncFullDuplexSocket &socket_;
- };
-
- void onControlInterest(ProducerSocket &s, const core::Interest &i);
- void onContentProduced(ProducerSocket &producer, const std::error_code &ec,
- uint64_t bytes_written);
- void onContentRetrieved(ConsumerSocket &s, std::size_t size,
- const std::error_code &ec);
-
- void signalProductionToSubscribers(const core::Name &name);
- void piggybackPayloadToSubscribers(const core::Name &name,
- const uint8_t *buffer, std::size_t bytes);
-
- std::shared_ptr<core::ContentObject> createAck();
- std::shared_ptr<core::ContentObject> createSubscriptionResponse(
- const core::Name &name);
-
- core::Prefix locator_;
- uint32_t incremental_suffix_;
- core::Name sync_notification_;
- // std::unique_ptr<BasePortal> portal_;
- asio::io_service internal_io_service_;
- asio::io_service &io_service_;
- asio::io_service::work work_;
-
- // These names represent the "locator" of a certain
- // peer that subscribed to this.
- std::unordered_set<core::Name> subscribers_;
-
- // Useful for publishing / Retrieving data
- std::unique_ptr<ProducerSocket> producer_;
- std::unique_ptr<ConsumerSocket> consumer_;
-
- ReadCallback *read_callback_;
- WriteCallback *write_callback_;
- ConnectCallback *connect_callback_;
- AcceptCallback *accept_callback_;
-
- std::unique_ptr<OnConnectCallback> internal_connect_callback_;
- std::unique_ptr<OnSignalCallback> internal_signal_callback_;
-
- uint32_t send_timeout_milliseconds_;
- struct Counters counters_;
- ContentBuffer receive_buffer_;
-};
-
-} // namespace interface
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h
index 7d50d0fbd..90f6a3ef6 100644
--- a/libtransport/src/hicn/transport/interfaces/socket.h
+++ b/libtransport/src/hicn/transport/interfaces/socket.h
@@ -16,18 +16,10 @@
#pragma once
#include <hicn/transport/config.h>
-#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/facade.h>
-#include <hicn/transport/core/interest.h>
-#include <hicn/transport/core/manifest_format_fixed.h>
-#include <hicn/transport/core/manifest_inline.h>
-#include <hicn/transport/core/name.h>
+#include <hicn/transport/interfaces/callbacks.h>
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/interfaces/socket_options_keys.h>
-#include <hicn/transport/protocols/statistics.h>
-#include <hicn/transport/utils/crypto_suite.h>
-#include <hicn/transport/utils/identity.h>
-#include <hicn/transport/utils/verifier.h>
#define SOCKET_OPTION_GET 0
#define SOCKET_OPTION_NOT_GET 1
@@ -39,26 +31,14 @@
namespace transport {
-namespace protocol {
-class IcnObserver;
-class TransportStatistics;
-} // namespace protocol
-
namespace interface {
+// Forward Declarations
template <typename PortalType>
class Socket;
-class ConsumerSocket;
-class ProducerSocket;
-
-// using Interest = core::Interest;
-// using ContentObject = core::ContentObject;
-// using Name = core::Name;
-// using HashAlgorithm = core::HashAlgorithm;
-// using CryptoSuite = utils::CryptoSuite;
-// using Identity = utils::Identity;
-// using Verifier = utils::Verifier;
+// Define the portal and its connector, depending on the compilation options
+// passed by the build tool.
using HicnForwarderPortal = core::HicnForwarderPortal;
#ifdef __linux__
@@ -76,40 +56,6 @@ using BaseSocket = Socket<HicnForwarderPortal>;
using BasePortal = HicnForwarderPortal;
#endif
-using PayloadType = core::PayloadType;
-using Prefix = core::Prefix;
-using Array = utils::Array<uint8_t>;
-using ContentBuffer = std::shared_ptr<std::vector<uint8_t>>;
-
-using ConsumerInterestCallback =
- std::function<void(ConsumerSocket &, const core::Interest &)>;
-
-using ConsumerContentCallback =
- std::function<void(ConsumerSocket &, std::size_t, const std::error_code &)>;
-
-using ConsumerTimerCallback = std::function<void(
- ConsumerSocket &, const protocol::TransportStatistics &stats)>;
-
-using ProducerContentCallback = std::function<void(
- ProducerSocket &, const std::error_code &, uint64_t bytes_written)>;
-
-using ConsumerContentObjectCallback =
- std::function<void(ConsumerSocket &, const core::ContentObject &)>;
-
-using ConsumerContentObjectVerificationCallback =
- std::function<bool(ConsumerSocket &, const core::ContentObject &)>;
-
-using ConsumerManifestCallback =
- std::function<void(ConsumerSocket &, const core::ContentObjectManifest &)>;
-
-using ProducerContentObjectCallback =
- std::function<void(ProducerSocket &, core::ContentObject &)>;
-
-using ProducerInterestCallback =
- std::function<void(ProducerSocket &, core::Interest &)>;
-
-using namespace protocol;
-
template <typename PortalType>
class Socket {
static_assert(std::is_same<PortalType, HicnForwarderPortal>::value
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index ca9722849..37d545779 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -48,7 +48,6 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service)
is_async_(false),
verifier_(std::make_shared<utils::Verifier>()),
verify_signature_(false),
- content_buffer_(nullptr),
on_interest_output_(VOID_HANDLER),
on_interest_timeout_(VOID_HANDLER),
on_interest_satisfied_(VOID_HANDLER),
@@ -56,7 +55,8 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service)
on_content_object_verification_(VOID_HANDLER),
on_content_object_(VOID_HANDLER),
on_manifest_(VOID_HANDLER),
- on_payload_retrieved_(VOID_HANDLER),
+ stats_summary_(VOID_HANDLER),
+ read_callback_(nullptr),
virtual_download_(false),
rtt_stats_(false),
timer_interval_milliseconds_(0) {
@@ -81,13 +81,11 @@ ConsumerSocket::~ConsumerSocket() {
void ConsumerSocket::connect() { portal_->connect(); }
-int ConsumerSocket::consume(const Name &name, ContentBuffer &receive_buffer) {
+int ConsumerSocket::consume(const Name &name) {
if (transport_protocol_->isRunning()) {
return CONSUMER_BUSY;
}
- content_buffer_ = receive_buffer;
-
network_name_ = name;
network_name_.setSuffix(0);
is_async_ = false;
@@ -97,14 +95,12 @@ int ConsumerSocket::consume(const Name &name, ContentBuffer &receive_buffer) {
return CONSUMER_FINISHED;
}
-int ConsumerSocket::asyncConsume(const Name &name,
- ContentBuffer &receive_buffer) {
+int ConsumerSocket::asyncConsume(const Name &name) {
if (!async_downloader_.stopped()) {
- async_downloader_.add([this, receive_buffer, name]() {
+ async_downloader_.add([this, name]() {
network_name_ = std::move(name);
network_name_.setSuffix(0);
is_async_ = true;
- content_buffer_ = receive_buffer;
transport_protocol_->start();
});
}
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index ceed53954..40344af5d 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -19,6 +19,11 @@
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/protocols/protocol.h>
#include <hicn/transport/utils/event_thread.h>
+#include <hicn/transport/utils/verifier.h>
+
+extern "C" {
+#include <parc/security/parc_KeyId.h>
+}
#define CONSUMER_FINISHED 0
#define CONSUMER_BUSY 1
@@ -28,28 +33,210 @@ namespace transport {
namespace interface {
+using namespace core;
+using namespace protocol;
+
+/**
+ * @brief Main interface for consumer applications.
+ *
+ * The consumer socket is the main interface for a consumer application.
+ * It allows to retrieve an application data from one/many producers, by
+ * hiding all the complexity of the transport protocol used underneath.
+ */
class ConsumerSocket : public BaseSocket {
public:
+ /**
+ * The ReadCallback is a class which can be used by the transport for both
+ * querying the application needs and notifying events.
+ *
+ * Beware that the methods of this class will be called synchronously while
+ * the transport is working, so the operations the application is performing
+ * on the data retrieved should be executed in another thread in an
+ * asynchronous manner. Blocking one of these callbacks means blocking the
+ * transport.
+ */
+ class ReadCallback {
+ public:
+ virtual ~ReadCallback() = default;
+
+ /**
+ * This API will specify to the transport whether the buffer should be
+ * allocated by the application (and then the retrieved content will be
+ * copied there) or the transport should allocate the buffer and "move" it
+ * to the application. In other words, if isBufferMovable return true, the
+ * transport will transfer the ownership of the read buffer to the
+ * application, without performing an additional copy, while if it returns
+ * false the transport will use the getReadBuffer API.
+ *
+ * By default this method returns true.
+ *
+ */
+ virtual bool isBufferMovable() noexcept { return true; }
+
+ /**
+ * This method will be called by the transport when the content is
+ * available. The application can then allocate its own buffer and provide
+ * the address to the transport, which will use it for writing the data.
+ * Note that if the application won't allocate enough memory this method
+ * will be called several times, until the internal read buffer will be
+ * emptied. For ensuring this method will be called once, applications
+ * should allocate at least maxBufferSize() bytes.
+ *
+ * @param application_buffer - Pointer to the application's buffer.
+ * @param max_length - The length of the application buffer.
+ */
+ virtual void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) = 0;
+
+ /**
+ * This method will be called by the transport after calling getReadBuffer,
+ * in order to notify the application that length bytes are available in the
+ * buffer. The max_length size of the buffer could be larger than the actual
+ * amount of bytes written.
+ *
+ * @param length - The number of bytes placed in the buffer.
+ */
+ virtual void readDataAvailable(size_t length) noexcept = 0;
+
+ /**
+ * This method will be called by the transport for understanding how many
+ * bytes it should read (at most) before notifying the application.
+ *
+ * By default it reads 64 KB.
+ */
+ virtual size_t maxBufferSize() const { return 64 * 1024; }
+
+ /**
+ * This method will be called by the transport iff (isBufferMovable ==
+ * true). The unique_ptr underlines the fact that the ownership of the
+ * buffer is being transferred to the application.
+ *
+ * @param buffer - The buffer
+ */
+ virtual void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept {}
+
+ /**
+ * readError() will be invoked if an error occurs reading from the
+ * transport.
+ *
+ * @param ec - An error code describing the error.
+ */
+ virtual void readError(const std::error_code ec) noexcept = 0;
+
+ /**
+ * This callback will be invoked when the whole content is retrieved. The
+ * transport itself knows when a content is retrieved (since it is not an
+ * opaque bytestream like TCP), and the transport itself is able to tell
+ * the application when the transfer is done.
+ */
+ virtual void readSuccess(std::size_t total_size) noexcept = 0;
+ };
+
+ /**
+ * @brief Create a new consumer socket.
+ *
+ * @param protocol - The transport protocol to use. So far the following
+ * transport are supported:
+ * - CBR: Constant bitrate
+ * - Raaqm: Based on paper: Optimal multipath congestion control and request
+ * forwarding in information-centric networks: Protocol design and
+ * experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks
+ * 110, 104-117
+ * - RTC: Real time communication
+ */
explicit ConsumerSocket(int protocol);
explicit ConsumerSocket(int protocol, asio::io_service &io_service);
+ /**
+ * @brief Destroy the consumer socket.
+ */
~ConsumerSocket();
+ /**
+ * @brief Connect the consumer socket to the underlying hICN forwarder.
+ */
void connect() override;
- int consume(const Name &name, ContentBuffer &receive_buffer);
-
- int asyncConsume(const Name &name, ContentBuffer &receive_buffer);
-
+ /**
+ * Retrieve a content using the protocol specified in the constructor.
+ * This function blocks until the whole content is downloaded.
+ * For monitoring the status of the download, the application MUST set the
+ * ConsumerRead callback. This callback will be called periodically (depending
+ * on the needs of the application), allowing the application to save the
+ * retrieved data.
+ *
+ * @param name - The name of the content to retrieve.
+ *
+ * @return CONSUMER_BUSY if a pending download exists
+ * @return CONSUMER_FINISHED when the download finishes
+ *
+ * Notice that the fact consume() returns CONSUMER_FINISHED does not imply the
+ * content retrieval succeeded. This information can be obtained from the
+ * error code in CONTENT_RETRIEVED callback.
+ */
+ int consume(const Name &name);
+ int asyncConsume(const Name &name);
+
+ /**
+ * Send an interest asynchronously in another thread, which is the same used
+ * for asyncConsume.
+ *
+ * @param interest - An Interest::Ptr to the interest. Notice that the
+ * application looses the ownership of the interest, which is transferred to
+ * the library itself.
+ * @param callback - A ConsumerCallback containing the events to be trigger in
+ * case of timeout or content reception.
+ *
+ */
void asyncSendInterest(Interest::Ptr &&interest,
Portal::ConsumerCallback *callback);
+ /**
+ * Stops the consumer socket. If several downloads are queued (using
+ * asyncConsume), this call stops just the current one.
+ */
void stop();
+ /**
+ * Resume the download from the same exact point it stopped.
+ */
void resume();
+ /**
+ * Get the io_service which is running the transport protocol event loop.
+ *
+ * @return A reference to the internal io_service where the transport protocol
+ * is running.
+ */
asio::io_service &getIoService() override;
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ int socket_option_key, ReadCallback *socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ read_callback_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ int socket_option_key, ReadCallback **socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ *socket_option_value = read_callback_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
double socket_option_value) {
switch (socket_option_key) {
@@ -150,12 +337,6 @@ class ConsumerSocket : public BaseSocket {
break;
}
- case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
- if (socket_option_value == VOID_HANDLER) {
- on_payload_retrieved_ = VOID_HANDLER;
- break;
- }
-
case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
if (socket_option_value > 0) {
rate_estimation_batching_parameter_ = socket_option_value;
@@ -275,20 +456,6 @@ class ConsumerSocket : public BaseSocket {
}
TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ConsumerContentCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
- on_payload_retrieved_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
int socket_option_key, ConsumerManifestCallback socket_option_value) {
switch (socket_option_key) {
case ConsumerCallbacksOptions::MANIFEST_INPUT:
@@ -332,21 +499,6 @@ class ConsumerSocket : public BaseSocket {
}
TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key,
- const std::shared_ptr<std::vector<uint8_t>> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::APPLICATION_BUFFER:
- content_buffer_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
int socket_option_key, const std::string &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::CERTIFICATE:
@@ -569,18 +721,6 @@ class ConsumerSocket : public BaseSocket {
}
TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ConsumerContentCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
- *socket_option_value = &on_payload_retrieved_;
- return SOCKET_OPTION_GET;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
int socket_option_key, ConsumerManifestCallback **socket_option_value) {
switch (socket_option_key) {
case ConsumerCallbacksOptions::MANIFEST_INPUT:
@@ -636,20 +776,6 @@ class ConsumerSocket : public BaseSocket {
}
TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key,
- std::shared_ptr<std::vector<uint8_t>> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::APPLICATION_BUFFER:
- socket_option_value = content_buffer_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
int socket_option_key, std::string &socket_option_value) {
switch (socket_option_key) {
case DataLinkOptions::OUTPUT_INTERFACE:
@@ -718,8 +844,6 @@ class ConsumerSocket : public BaseSocket {
PARCKeyId *key_id_;
bool verify_signature_;
- ContentBuffer content_buffer_;
-
ConsumerInterestCallback on_interest_retransmission_;
ConsumerInterestCallback on_interest_output_;
ConsumerInterestCallback on_interest_timeout_;
@@ -730,11 +854,10 @@ class ConsumerSocket : public BaseSocket {
ConsumerContentObjectCallback on_content_object_;
ConsumerManifestCallback on_manifest_;
-
- ConsumerContentCallback on_payload_retrieved_;
-
ConsumerTimerCallback stats_summary_;
+ ReadCallback *read_callback_;
+
// Virtual download for traffic generator
bool virtual_download_;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
index bcf049310..c21108186 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
@@ -78,7 +78,7 @@ typedef enum {
CONTENT_OBJECT_INPUT = 411,
MANIFEST_INPUT = 412,
CONTENT_OBJECT_TO_VERIFY = 413,
- CONTENT_RETRIEVED = 414,
+ READ_CALLBACK = 414,
STATS_SUMMARY = 415
} ConsumerCallbacksOptions;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 3f9d4959d..c4cf95895 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -14,6 +14,7 @@
*/
#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/utils/identity.h>
#include <functional>
@@ -336,16 +337,6 @@ void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf,
}
}
-void ProducerSocket::asyncProduce(const Name &suffix,
- ContentBuffer &&output_buffer) {
- if (!async_thread_.stopped()) {
- async_thread_.add(
- [this, suff = suffix, buffer = std::move(output_buffer)]() {
- produce(suff, &(*buffer)[0], buffer->size(), true);
- });
- }
-}
-
void ProducerSocket::onInterest(Interest &interest) {
if (on_interest_input_ != VOID_HANDLER) {
on_interest_input_(*this, interest);
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 6ba5671cc..200c32a95 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -30,6 +30,10 @@
#define REGISTRATION_FAILURE 2
#define REGISTRATION_IN_PROGRESS 3
+namespace utils {
+class Identity;
+}
+
namespace transport {
namespace interface {
@@ -59,7 +63,7 @@ class ProducerSocket : public Socket<BasePortal>,
void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size);
- void asyncProduce(const Name &suffix, ContentBuffer &&output_buffer);
+ void asyncProduce(const Name &suffix);
void asyncProduce(ContentObject &content_object);
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index b8a7c9610..7f0310e7c 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -38,14 +38,14 @@ RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
}
RaaqmTransportProtocol::~RaaqmTransportProtocol() {
- if (this->rate_estimator_) {
- delete this->rate_estimator_;
+ if (rate_estimator_) {
+ delete rate_estimator_;
}
}
int RaaqmTransportProtocol::start() {
- if (this->rate_estimator_) {
- this->rate_estimator_->onStart();
+ if (rate_estimator_) {
+ rate_estimator_->onStart();
}
if (!cur_path_) {
@@ -75,13 +75,13 @@ int RaaqmTransportProtocol::start() {
choice_param);
if (choice_param == 1) {
- this->rate_estimator_ = new ALaTcpEstimator();
+ rate_estimator_ = new ALaTcpEstimator();
} else {
- this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ rate_estimator_ = new SimpleEstimator(alpha, batching_param);
}
socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER,
- &this->rate_estimator_->observer_);
+ &rate_estimator_->observer_);
// Current path
auto cur_path = std::make_unique<RaaqmDataPath>(
@@ -126,7 +126,7 @@ void RaaqmTransportProtocol::increaseWindow() {
socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
current_window_size_);
}
- this->rate_estimator_->onWindowIncrease(current_window_size_);
+ rate_estimator_->onWindowIncrease(current_window_size_);
}
void RaaqmTransportProtocol::decreaseWindow() {
@@ -145,7 +145,7 @@ void RaaqmTransportProtocol::decreaseWindow() {
socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
current_window_size_);
}
- this->rate_estimator_->onWindowDecrease(current_window_size_);
+ rate_estimator_->onWindowDecrease(current_window_size_);
}
void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
@@ -158,8 +158,8 @@ void RaaqmTransportProtocol::afterContentReception(
updatePathTable(content_object);
increaseWindow();
updateRtt(interest.getName().getSuffix());
- this->rate_estimator_->onDataReceived((int)content_object.payloadSize() +
- (int)content_object.headerSize());
+ rate_estimator_->onDataReceived((int)content_object.payloadSize() +
+ (int)content_object.headerSize());
// Set drop probablility and window size accordingly
RAAQM();
}
@@ -368,7 +368,12 @@ void RaaqmTransportProtocol::onContentSegment(
reassemble(std::move(content_object));
} else if (TRANSPORT_EXPECT_FALSE(incremental_suffix ==
index_manager_->getFinalSuffix())) {
- onContentReassembled(std::make_error_code(std::errc(0)));
+ interface::ConsumerSocket::ReadCallback *on_payload = nullptr;
+ socket_->getSocketOption(READ_CALLBACK, &on_payload);
+
+ if (on_payload != nullptr) {
+ on_payload->readSuccess(stats_.getBytesRecv());
+ }
}
} else {
// TODO Application policy check
@@ -487,8 +492,8 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
return;
}
- // This is set to ~0 so that the next interest_retransmissions_ + 1, performed
- // by sendInterest, will result in 0
+ // This is set to ~0 so that the next interest_retransmissions_ + 1,
+ // performed by sendInterest, will result in 0
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
sendInterest(std::move(interest));
@@ -502,16 +507,23 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
}
void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerContentCallback *on_payload = nullptr;
- socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload);
- if (*on_payload != VOID_HANDLER) {
- std::shared_ptr<std::vector<uint8_t>> content_buffer;
- socket_->getSocketOption(
- interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer);
- (*on_payload)(*socket_, content_buffer->size(), ec);
+ interface::ConsumerSocket::ReadCallback *on_payload = nullptr;
+ socket_->getSocketOption(READ_CALLBACK, &on_payload);
+
+ if (on_payload == nullptr) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before "
+ "starting "
+ "the content retrieval.");
+ }
+
+ if (!ec) {
+ on_payload->readSuccess(stats_.getBytesRecv());
+ } else {
+ on_payload->readError(ec);
}
- this->rate_estimator_->onDownloadFinished();
+ rate_estimator_->onDownloadFinished();
stop();
}
@@ -526,8 +538,8 @@ void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
// Update stats
updateStats((uint32_t)segment, rtt.count(), now);
- if (this->rate_estimator_) {
- this->rate_estimator_->onRttUpdate((double)rtt.count());
+ if (rate_estimator_) {
+ rate_estimator_->onRttUpdate((double)rtt.count());
}
cur_path_->insertNewRtt(rtt.count());
@@ -676,4 +688,4 @@ void RaaqmTransportProtocol::checkForStalePaths() {
} // end namespace protocol
-} // end namespace transport
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
index 899f701c7..a2062df93 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.cc
+++ b/libtransport/src/hicn/transport/protocols/reassembly.cc
@@ -17,6 +17,7 @@
#include <hicn/transport/protocols/indexing_manager.h>
#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/utils/array.h>
+#include <hicn/transport/utils/membuf.h>
namespace transport {
@@ -30,7 +31,8 @@ BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket,
manifest_index_manager_(
std::make_unique<ManifestIndexManager>(icn_socket)),
index_manager_(incremental_index_manager_.get()),
- index_(0) {
+ index_(0),
+ read_buffer_(nullptr) {
setContentCallback(content_callback);
}
@@ -54,30 +56,88 @@ void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) {
void BaseReassembly::copyContent(const ContentObject &content_object) {
auto a = content_object.getPayload();
-
- std::shared_ptr<std::vector<uint8_t>> content_buffer;
- reassembly_consumer_socket_->getSocketOption(
- interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer);
-
- content_buffer->insert(content_buffer->end(), (uint8_t *)a->data(),
- (uint8_t *)a->data() + a->length());
+ auto payload_length = a->length();
+ auto write_size = std::min(payload_length, read_buffer_->tailroom());
+ auto additional_bytes = payload_length > read_buffer_->tailroom()
+ ? payload_length - read_buffer_->tailroom()
+ : 0;
+
+ std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
+ read_buffer_->append(write_size);
+
+ if (!read_buffer_->tailroom()) {
+ notifyApplication();
+ std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
+ additional_bytes);
+ read_buffer_->append(additional_bytes);
+ }
bool download_completed =
index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
if (TRANSPORT_EXPECT_FALSE(download_completed)) {
+ notifyApplication();
content_callback_->onContentReassembled(std::make_error_code(std::errc(0)));
}
}
+void BaseReassembly::notifyApplication() {
+ interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
+
+ if (TRANSPORT_EXPECT_FALSE(!read_callback)) {
+ TRANSPORT_LOGE("Read callback not installed!");
+ return;
+ }
+
+ if (read_callback->isBufferMovable()) {
+ // No need to perform an additional copy. The whole buffer will be
+ // tranferred to the application.
+
+ read_callback->readBufferAvailable(std::move(read_buffer_));
+ read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
+ } else {
+ // The buffer will be copied into the application-provided buffer
+ uint8_t *buffer;
+ std::size_t length;
+ std::size_t total_length = read_buffer_->length();
+
+ while (read_buffer_->length()) {
+ buffer = nullptr;
+ length = 0;
+ read_callback->getReadBuffer(&buffer, &length);
+
+ if (!buffer || !length) {
+ throw errors::RuntimeException(
+ "Invalid buffer provided by the application.");
+ }
+
+ auto to_copy = std::min(read_buffer_->length(), length);
+ std::memcpy(buffer, read_buffer_->data(), to_copy);
+ read_buffer_->trimStart(to_copy);
+ }
+
+ read_callback->readDataAvailable(total_length);
+ read_buffer_->clear();
+ }
+}
+
void BaseReassembly::reset() {
manifest_index_manager_->reset();
incremental_index_manager_->reset();
index_ = index_manager_->getNextReassemblySegment();
received_packets_.clear();
+
+ // reset read buffer
+ interface::ConsumerSocket::ReadCallback *read_callback;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
+
+ read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
}
} // namespace protocol
-} // end namespace transport
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h
index 9efddb773..79f0ea4d2 100644
--- a/libtransport/src/hicn/transport/protocols/reassembly.h
+++ b/libtransport/src/hicn/transport/protocols/reassembly.h
@@ -20,6 +20,10 @@
namespace transport {
+namespace interface {
+class ConsumerReadCallback;
+}
+
namespace protocol {
// Forward Declaration
@@ -54,6 +58,9 @@ class BaseReassembly : public Reassembly {
virtual void reset() override;
+ private:
+ void notifyApplication();
+
protected:
// The consumer socket
interface::ConsumerSocket *reassembly_consumer_socket_;
@@ -63,6 +70,7 @@ class BaseReassembly : public Reassembly {
std::unordered_map<std::uint32_t, ContentObject::Ptr> received_packets_;
uint64_t index_;
+ std::unique_ptr<utils::MemBuf> read_buffer_;
};
} // namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 9402d3b02..4205ade4e 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -161,12 +161,11 @@ void RTCTransportProtocol::updateDelayStats(
uint32_t segmentNumber = content_object.getName().getSuffix();
uint32_t pkt = segmentNumber & modMask_;
- if (inflightInterests_[pkt].state != sent_)
- return;
+ if (inflightInterests_[pkt].state != sent_) return;
- if(interestRetransmissions_.find(segmentNumber) !=
+ if (interestRetransmissions_.find(segmentNumber) !=
interestRetransmissions_.end())
- //this packet was rtx at least once
+ // this packet was rtx at least once
return;
uint32_t pathLabel = content_object.getPathLabel();
@@ -329,8 +328,7 @@ void RTCTransportProtocol::increaseWindow() {
} else {
currentCWin_ = min(
maxCWin_,
- (uint32_t)ceil(currentCWin_ +
- (1.0 / (double)currentCWin_))); // linear
+ (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear
}
}
@@ -363,7 +361,6 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
if (!rtx) {
inflightInterestsCount_++;
}
-
}
void RTCTransportProtocol::scheduleNextInterests() {
@@ -373,9 +370,9 @@ void RTCTransportProtocol::scheduleNextInterests() {
while (inflightInterestsCount_ < currentCWin_) {
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
- //we send the packet only if it is not pending yet
+ // we send the packet only if it is not pending yet
interest_name->setSuffix(actualSegment_);
if (portal_->interestIsPending(*interest_name)) {
actualSegment_++;
@@ -383,11 +380,11 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
uint32_t pkt = actualSegment_ & modMask_;
- //if we already reacevied the content we don't ask it again
- if(inflightInterests_[pkt].state == received_ &&
- inflightInterests_[pkt].sequence == actualSegment_) {
- actualSegment_++;
- continue;
+ // if we already reacevied the content we don't ask it again
+ if (inflightInterests_[pkt].state == received_ &&
+ inflightInterests_[pkt].sequence == actualSegment_) {
+ actualSegment_++;
+ continue;
}
inflightInterests_[pkt].transmissionTime =
@@ -419,93 +416,93 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) {
#endif
}
-void RTCTransportProtocol::addRetransmissions(uint32_t val){
- //add only val in the rtx list
+void RTCTransportProtocol::addRetransmissions(uint32_t val) {
+ // add only val in the rtx list
addRetransmissions(val, val + 1);
}
-void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop){
- for(uint32_t i = start; i < stop; i++){
+void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) {
+ for (uint32_t i = start; i < stop; i++) {
auto it = interestRetransmissions_.find(i);
- if(it == interestRetransmissions_.end()){
+ if (it == interestRetransmissions_.end()) {
if (lastSegNacked_ <= i) {
- //i must be larger than the last past nack received
+ // i must be larger than the last past nack received
interestRetransmissions_[i] = 0;
}
- }//if the retransmission is already there the rtx timer will
- //take care of it
+ } // if the retransmission is already there the rtx timer will
+ // take care of it
}
retransmit(true);
}
-void RTCTransportProtocol::retransmit(bool first_rtx){
+void RTCTransportProtocol::retransmit(bool first_rtx) {
auto it = interestRetransmissions_.begin();
- //cut len to max HICN_MAX_RTX_SIZE
- //since we use a map, the smaller (and so the older) sequence number are at
- //the beginnin of the map
- while(interestRetransmissions_.size() > HICN_MAX_RTX_SIZE){
+ // cut len to max HICN_MAX_RTX_SIZE
+ // since we use a map, the smaller (and so the older) sequence number are at
+ // the beginnin of the map
+ while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) {
it = interestRetransmissions_.erase(it);
}
it = interestRetransmissions_.begin();
- while (it != interestRetransmissions_.end()){
+ while (it != interestRetransmissions_.end()) {
uint32_t pkt = it->first & modMask_;
- if(inflightInterests_[pkt].sequence != it->first){
- //this packet is not anymore in the inflight buffer, erase it
+ if (inflightInterests_[pkt].sequence != it->first) {
+ // this packet is not anymore in the inflight buffer, erase it
it = interestRetransmissions_.erase(it);
continue;
}
- //we retransmitted the packet too many times
- if(it->second >= HICN_MAX_RTX){
+ // we retransmitted the packet too many times
+ if (it->second >= HICN_MAX_RTX) {
it = interestRetransmissions_.erase(it);
continue;
}
- //this packet is too old
- if((lastReceived_ > it->first) &&
- (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE){
+ // this packet is too old
+ if ((lastReceived_ > it->first) &&
+ (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) {
it = interestRetransmissions_.erase(it);
continue;
}
- if(first_rtx){
- //TODO (optimization)
- //the rtx that we never sent (it->second == 0) are all at the
- //end, so we can go directly there
- if(it->second == 0){
+ if (first_rtx) {
+ // TODO (optimization)
+ // the rtx that we never sent (it->second == 0) are all at the
+ // end, so we can go directly there
+ if (it->second == 0) {
inflightInterests_[pkt].transmissionTime =
std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
it->second++;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
interest_name->setSuffix(it->first);
sendInterest(interest_name, true);
}
++it;
- }else{
- //base on time
+ } else {
+ // base on time
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- if((now - inflightInterests_[pkt].transmissionTime) > 20){
- //XXX replace 20 with rtt
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if ((now - inflightInterests_[pkt].transmissionTime) > 20) {
+ // XXX replace 20 with rtt
inflightInterests_[pkt].transmissionTime =
std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
it->second++;
Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
+ &interest_name);
interest_name->setSuffix(it->first);
sendInterest(interest_name, true);
}
@@ -514,13 +511,13 @@ void RTCTransportProtocol::retransmit(bool first_rtx){
}
}
-void RTCTransportProtocol::checkRtx(){
+void RTCTransportProtocol::checkRtx() {
retransmit(false);
rtx_timer_->expires_from_now(std::chrono::milliseconds(20));
rtx_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- checkRtx();
- });
+ if (ec) return;
+ checkRtx();
+ });
}
void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
@@ -533,25 +530,25 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
inflightInterestsCount_--;
}
- //check how many times we sent this packet
- auto it = interestRetransmissions_.find(segmentNumber);
- if(it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX){
- inflightInterests_[pkt].state = lost_;
- }
+ // check how many times we sent this packet
+ auto it = interestRetransmissions_.find(segmentNumber);
+ if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) {
+ inflightInterests_[pkt].state = lost_;
+ }
- if(inflightInterests_[pkt].state == sent_) {
- inflightInterests_[pkt].state = timeout1_;
- } else if (inflightInterests_[pkt].state == timeout1_) {
- inflightInterests_[pkt].state = timeout2_;
- } else if (inflightInterests_[pkt].state == timeout2_) {
- inflightInterests_[pkt].state = lost_;
- }
+ if (inflightInterests_[pkt].state == sent_) {
+ inflightInterests_[pkt].state = timeout1_;
+ } else if (inflightInterests_[pkt].state == timeout1_) {
+ inflightInterests_[pkt].state = timeout2_;
+ } else if (inflightInterests_[pkt].state == timeout2_) {
+ inflightInterests_[pkt].state = lost_;
+ }
- if(inflightInterests_[pkt].state == lost_) {
- interestRetransmissions_.erase(segmentNumber);
- }else{
- addRetransmissions(segmentNumber);
- }
+ if (inflightInterests_[pkt].state == lost_) {
+ interestRetransmissions_.erase(segmentNumber);
+ } else {
+ addRetransmissions(segmentNumber);
+ }
scheduleNextInterests();
}
@@ -562,12 +559,11 @@ bool RTCTransportProtocol::checkIfProducerIsActive(
uint32_t productionSeg = *payload;
uint32_t productionRate = *(++payload);
-
if (productionRate == 0) {
// the producer socket is not active
// in this case we consider only the first nack
if (nack_timer_used_) {
- return false;
+ return false;
}
nack_timer_used_ = true;
@@ -680,12 +676,12 @@ void RTCTransportProtocol::onContentObject(
((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
if (inflightInterests_[pkt].state == sent_) {
- inflightInterestsCount_--; //packet sent without timeouts
+ inflightInterestsCount_--; // packet sent without timeouts
}
if (inflightInterests_[pkt].state == sent_ &&
interestRetransmissions_.find(segmentNumber) ==
- interestRetransmissions_.end()){
+ interestRetransmissions_.end()) {
// we count only non retransmitted data in order to take into accunt only
// the transmition rate of the producer
receivedBytes_ += (uint32_t)(content_object->headerSize() +
@@ -693,7 +689,7 @@ void RTCTransportProtocol::onContentObject(
updateDelayStats(*content_object);
addRetransmissions(lastReceived_ + 1, segmentNumber);
- //lastReceived_ is updated only for data packets received without RTX
+ // lastReceived_ is updated only for data packets received without RTX
lastReceived_ = segmentNumber;
}
@@ -704,7 +700,7 @@ void RTCTransportProtocol::onContentObject(
increaseWindow();
}
- //in any case we remove the packet from the rtx list
+ // in any case we remove the packet from the rtx list
interestRetransmissions_.erase(segmentNumber);
if (schedule_next_interest) {
@@ -715,24 +711,50 @@ void RTCTransportProtocol::onContentObject(
void RTCTransportProtocol::returnContentToApplication(
const ContentObject &content_object) {
// return content to the user
- auto a = content_object.getPayload();
+ auto read_buffer = content_object.getPayload();
- a->trimStart(HICN_TIMESTAMP_SIZE);
- uint8_t *start = a->writableData();
- unsigned size = (unsigned)a->length();
+ read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
// set offset between hICN and RTP packets
- uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1));
+ uint16_t rtp_seq = ntohs(*(((uint16_t *)read_buffer->writableData()) + 1));
RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq;
- std::shared_ptr<std::vector<uint8_t>> content_buffer;
- socket_->getSocketOption(APPLICATION_BUFFER, content_buffer);
- content_buffer->insert(content_buffer->end(), start, start + size);
+ interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
+ socket_->getSocketOption(READ_CALLBACK, &read_callback);
+
+ if (read_callback == nullptr) {
+ throw errors::RuntimeException(
+ "The read callback must be installed in the transport before starting "
+ "the content retrieval.");
+ }
+
+ if (read_callback->isBufferMovable()) {
+ read_callback->readBufferAvailable(
+ utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length()));
+ } else {
+ // The buffer will be copied into the application-provided buffer
+ uint8_t *buffer;
+ std::size_t length;
+ std::size_t total_length = read_buffer->length();
+
+ while (read_buffer->length()) {
+ buffer = nullptr;
+ length = 0;
+ read_callback->getReadBuffer(&buffer, &length);
+
+ if (!buffer || !length) {
+ throw errors::RuntimeException(
+ "Invalid buffer provided by the application.");
+ }
+
+ auto to_copy = std::min(read_buffer->length(), length);
+
+ std::memcpy(buffer, read_buffer->data(), to_copy);
+ read_buffer->trimStart(to_copy);
+ }
- ConsumerContentCallback *on_payload = nullptr;
- socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload);
- if ((*on_payload) != VOID_HANDLER) {
- (*on_payload)(*socket_, size, std::make_error_code(std::errc(0)));
+ read_callback->readDataAvailable(total_length);
+ read_buffer->clear();
}
}