aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces/socket_consumer.h
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-04-03 10:03:56 +0200
committerMauro Sardara <msardara@cisco.com>2019-04-15 11:37:30 +0200
commitc365689250216861fd7727203ee6ba1049ad5778 (patch)
tree97f1f3d1a6cb7314f1292d97be6d8e8e06cc998b /libtransport/src/hicn/transport/interfaces/socket_consumer.h
parentd8ce6d98a2a726393655bd71eb81b8ef5222d6ba (diff)
[HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application.
Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/socket_consumer.h')
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h263
1 files changed, 193 insertions, 70 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index ceed53954..40344af5d 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -19,6 +19,11 @@
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/protocols/protocol.h>
#include <hicn/transport/utils/event_thread.h>
+#include <hicn/transport/utils/verifier.h>
+
+extern "C" {
+#include <parc/security/parc_KeyId.h>
+}
#define CONSUMER_FINISHED 0
#define CONSUMER_BUSY 1
@@ -28,28 +33,210 @@ namespace transport {
namespace interface {
+using namespace core;
+using namespace protocol;
+
+/**
+ * @brief Main interface for consumer applications.
+ *
+ * The consumer socket is the main interface for a consumer application.
+ * It allows to retrieve an application data from one/many producers, by
+ * hiding all the complexity of the transport protocol used underneath.
+ */
class ConsumerSocket : public BaseSocket {
public:
+ /**
+ * The ReadCallback is a class which can be used by the transport for both
+ * querying the application needs and notifying events.
+ *
+ * Beware that the methods of this class will be called synchronously while
+ * the transport is working, so the operations the application is performing
+ * on the data retrieved should be executed in another thread in an
+ * asynchronous manner. Blocking one of these callbacks means blocking the
+ * transport.
+ */
+ class ReadCallback {
+ public:
+ virtual ~ReadCallback() = default;
+
+ /**
+ * This API will specify to the transport whether the buffer should be
+ * allocated by the application (and then the retrieved content will be
+ * copied there) or the transport should allocate the buffer and "move" it
+ * to the application. In other words, if isBufferMovable return true, the
+ * transport will transfer the ownership of the read buffer to the
+ * application, without performing an additional copy, while if it returns
+ * false the transport will use the getReadBuffer API.
+ *
+ * By default this method returns true.
+ *
+ */
+ virtual bool isBufferMovable() noexcept { return true; }
+
+ /**
+ * This method will be called by the transport when the content is
+ * available. The application can then allocate its own buffer and provide
+ * the address to the transport, which will use it for writing the data.
+ * Note that if the application won't allocate enough memory this method
+ * will be called several times, until the internal read buffer will be
+ * emptied. For ensuring this method will be called once, applications
+ * should allocate at least maxBufferSize() bytes.
+ *
+ * @param application_buffer - Pointer to the application's buffer.
+ * @param max_length - The length of the application buffer.
+ */
+ virtual void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) = 0;
+
+ /**
+ * This method will be called by the transport after calling getReadBuffer,
+ * in order to notify the application that length bytes are available in the
+ * buffer. The max_length size of the buffer could be larger than the actual
+ * amount of bytes written.
+ *
+ * @param length - The number of bytes placed in the buffer.
+ */
+ virtual void readDataAvailable(size_t length) noexcept = 0;
+
+ /**
+ * This method will be called by the transport for understanding how many
+ * bytes it should read (at most) before notifying the application.
+ *
+ * By default it reads 64 KB.
+ */
+ virtual size_t maxBufferSize() const { return 64 * 1024; }
+
+ /**
+ * This method will be called by the transport iff (isBufferMovable ==
+ * true). The unique_ptr underlines the fact that the ownership of the
+ * buffer is being transferred to the application.
+ *
+ * @param buffer - The buffer
+ */
+ virtual void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept {}
+
+ /**
+ * readError() will be invoked if an error occurs reading from the
+ * transport.
+ *
+ * @param ec - An error code describing the error.
+ */
+ virtual void readError(const std::error_code ec) noexcept = 0;
+
+ /**
+ * This callback will be invoked when the whole content is retrieved. The
+ * transport itself knows when a content is retrieved (since it is not an
+ * opaque bytestream like TCP), and the transport itself is able to tell
+ * the application when the transfer is done.
+ */
+ virtual void readSuccess(std::size_t total_size) noexcept = 0;
+ };
+
+ /**
+ * @brief Create a new consumer socket.
+ *
+ * @param protocol - The transport protocol to use. So far the following
+ * transport are supported:
+ * - CBR: Constant bitrate
+ * - Raaqm: Based on paper: Optimal multipath congestion control and request
+ * forwarding in information-centric networks: Protocol design and
+ * experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks
+ * 110, 104-117
+ * - RTC: Real time communication
+ */
explicit ConsumerSocket(int protocol);
explicit ConsumerSocket(int protocol, asio::io_service &io_service);
+ /**
+ * @brief Destroy the consumer socket.
+ */
~ConsumerSocket();
+ /**
+ * @brief Connect the consumer socket to the underlying hICN forwarder.
+ */
void connect() override;
- int consume(const Name &name, ContentBuffer &receive_buffer);
-
- int asyncConsume(const Name &name, ContentBuffer &receive_buffer);
-
+ /**
+ * Retrieve a content using the protocol specified in the constructor.
+ * This function blocks until the whole content is downloaded.
+ * For monitoring the status of the download, the application MUST set the
+ * ConsumerRead callback. This callback will be called periodically (depending
+ * on the needs of the application), allowing the application to save the
+ * retrieved data.
+ *
+ * @param name - The name of the content to retrieve.
+ *
+ * @return CONSUMER_BUSY if a pending download exists
+ * @return CONSUMER_FINISHED when the download finishes
+ *
+ * Notice that the fact consume() returns CONSUMER_FINISHED does not imply the
+ * content retrieval succeeded. This information can be obtained from the
+ * error code in CONTENT_RETRIEVED callback.
+ */
+ int consume(const Name &name);
+ int asyncConsume(const Name &name);
+
+ /**
+ * Send an interest asynchronously in another thread, which is the same used
+ * for asyncConsume.
+ *
+ * @param interest - An Interest::Ptr to the interest. Notice that the
+ * application looses the ownership of the interest, which is transferred to
+ * the library itself.
+ * @param callback - A ConsumerCallback containing the events to be trigger in
+ * case of timeout or content reception.
+ *
+ */
void asyncSendInterest(Interest::Ptr &&interest,
Portal::ConsumerCallback *callback);
+ /**
+ * Stops the consumer socket. If several downloads are queued (using
+ * asyncConsume), this call stops just the current one.
+ */
void stop();
+ /**
+ * Resume the download from the same exact point it stopped.
+ */
void resume();
+ /**
+ * Get the io_service which is running the transport protocol event loop.
+ *
+ * @return A reference to the internal io_service where the transport protocol
+ * is running.
+ */
asio::io_service &getIoService() override;
+ TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ int socket_option_key, ReadCallback *socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ read_callback_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ }
+
+ TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ int socket_option_key, ReadCallback **socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::READ_CALLBACK:
+ *socket_option_value = read_callback_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
double socket_option_value) {
switch (socket_option_key) {
@@ -150,12 +337,6 @@ class ConsumerSocket : public BaseSocket {
break;
}
- case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
- if (socket_option_value == VOID_HANDLER) {
- on_payload_retrieved_ = VOID_HANDLER;
- break;
- }
-
case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
if (socket_option_value > 0) {
rate_estimation_batching_parameter_ = socket_option_value;
@@ -275,20 +456,6 @@ class ConsumerSocket : public BaseSocket {
}
TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ConsumerContentCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
- on_payload_retrieved_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
int socket_option_key, ConsumerManifestCallback socket_option_value) {
switch (socket_option_key) {
case ConsumerCallbacksOptions::MANIFEST_INPUT:
@@ -332,21 +499,6 @@ class ConsumerSocket : public BaseSocket {
}
TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key,
- const std::shared_ptr<std::vector<uint8_t>> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::APPLICATION_BUFFER:
- content_buffer_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
int socket_option_key, const std::string &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::CERTIFICATE:
@@ -569,18 +721,6 @@ class ConsumerSocket : public BaseSocket {
}
TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ConsumerContentCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
- *socket_option_value = &on_payload_retrieved_;
- return SOCKET_OPTION_GET;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
int socket_option_key, ConsumerManifestCallback **socket_option_value) {
switch (socket_option_key) {
case ConsumerCallbacksOptions::MANIFEST_INPUT:
@@ -636,20 +776,6 @@ class ConsumerSocket : public BaseSocket {
}
TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key,
- std::shared_ptr<std::vector<uint8_t>> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::APPLICATION_BUFFER:
- socket_option_value = content_buffer_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
int socket_option_key, std::string &socket_option_value) {
switch (socket_option_key) {
case DataLinkOptions::OUTPUT_INTERFACE:
@@ -718,8 +844,6 @@ class ConsumerSocket : public BaseSocket {
PARCKeyId *key_id_;
bool verify_signature_;
- ContentBuffer content_buffer_;
-
ConsumerInterestCallback on_interest_retransmission_;
ConsumerInterestCallback on_interest_output_;
ConsumerInterestCallback on_interest_timeout_;
@@ -730,11 +854,10 @@ class ConsumerSocket : public BaseSocket {
ConsumerContentObjectCallback on_content_object_;
ConsumerManifestCallback on_manifest_;
-
- ConsumerContentCallback on_payload_retrieved_;
-
ConsumerTimerCallback stats_summary_;
+ ReadCallback *read_callback_;
+
// Virtual download for traffic generator
bool virtual_download_;