diff options
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/socket_consumer.h')
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/socket_consumer.h | 263 |
1 files changed, 193 insertions, 70 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index ceed53954..40344af5d 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -19,6 +19,11 @@ #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/protocols/protocol.h> #include <hicn/transport/utils/event_thread.h> +#include <hicn/transport/utils/verifier.h> + +extern "C" { +#include <parc/security/parc_KeyId.h> +} #define CONSUMER_FINISHED 0 #define CONSUMER_BUSY 1 @@ -28,28 +33,210 @@ namespace transport { namespace interface { +using namespace core; +using namespace protocol; + +/** + * @brief Main interface for consumer applications. + * + * The consumer socket is the main interface for a consumer application. + * It allows to retrieve an application data from one/many producers, by + * hiding all the complexity of the transport protocol used underneath. + */ class ConsumerSocket : public BaseSocket { public: + /** + * The ReadCallback is a class which can be used by the transport for both + * querying the application needs and notifying events. + * + * Beware that the methods of this class will be called synchronously while + * the transport is working, so the operations the application is performing + * on the data retrieved should be executed in another thread in an + * asynchronous manner. Blocking one of these callbacks means blocking the + * transport. + */ + class ReadCallback { + public: + virtual ~ReadCallback() = default; + + /** + * This API will specify to the transport whether the buffer should be + * allocated by the application (and then the retrieved content will be + * copied there) or the transport should allocate the buffer and "move" it + * to the application. In other words, if isBufferMovable return true, the + * transport will transfer the ownership of the read buffer to the + * application, without performing an additional copy, while if it returns + * false the transport will use the getReadBuffer API. + * + * By default this method returns true. + * + */ + virtual bool isBufferMovable() noexcept { return true; } + + /** + * This method will be called by the transport when the content is + * available. The application can then allocate its own buffer and provide + * the address to the transport, which will use it for writing the data. + * Note that if the application won't allocate enough memory this method + * will be called several times, until the internal read buffer will be + * emptied. For ensuring this method will be called once, applications + * should allocate at least maxBufferSize() bytes. + * + * @param application_buffer - Pointer to the application's buffer. + * @param max_length - The length of the application buffer. + */ + virtual void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) = 0; + + /** + * This method will be called by the transport after calling getReadBuffer, + * in order to notify the application that length bytes are available in the + * buffer. The max_length size of the buffer could be larger than the actual + * amount of bytes written. + * + * @param length - The number of bytes placed in the buffer. + */ + virtual void readDataAvailable(size_t length) noexcept = 0; + + /** + * This method will be called by the transport for understanding how many + * bytes it should read (at most) before notifying the application. + * + * By default it reads 64 KB. + */ + virtual size_t maxBufferSize() const { return 64 * 1024; } + + /** + * This method will be called by the transport iff (isBufferMovable == + * true). The unique_ptr underlines the fact that the ownership of the + * buffer is being transferred to the application. + * + * @param buffer - The buffer + */ + virtual void readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept {} + + /** + * readError() will be invoked if an error occurs reading from the + * transport. + * + * @param ec - An error code describing the error. + */ + virtual void readError(const std::error_code ec) noexcept = 0; + + /** + * This callback will be invoked when the whole content is retrieved. The + * transport itself knows when a content is retrieved (since it is not an + * opaque bytestream like TCP), and the transport itself is able to tell + * the application when the transfer is done. + */ + virtual void readSuccess(std::size_t total_size) noexcept = 0; + }; + + /** + * @brief Create a new consumer socket. + * + * @param protocol - The transport protocol to use. So far the following + * transport are supported: + * - CBR: Constant bitrate + * - Raaqm: Based on paper: Optimal multipath congestion control and request + * forwarding in information-centric networks: Protocol design and + * experimentation. G Carofiglio, M Gallo, L Muscariello. Computer Networks + * 110, 104-117 + * - RTC: Real time communication + */ explicit ConsumerSocket(int protocol); explicit ConsumerSocket(int protocol, asio::io_service &io_service); + /** + * @brief Destroy the consumer socket. + */ ~ConsumerSocket(); + /** + * @brief Connect the consumer socket to the underlying hICN forwarder. + */ void connect() override; - int consume(const Name &name, ContentBuffer &receive_buffer); - - int asyncConsume(const Name &name, ContentBuffer &receive_buffer); - + /** + * Retrieve a content using the protocol specified in the constructor. + * This function blocks until the whole content is downloaded. + * For monitoring the status of the download, the application MUST set the + * ConsumerRead callback. This callback will be called periodically (depending + * on the needs of the application), allowing the application to save the + * retrieved data. + * + * @param name - The name of the content to retrieve. + * + * @return CONSUMER_BUSY if a pending download exists + * @return CONSUMER_FINISHED when the download finishes + * + * Notice that the fact consume() returns CONSUMER_FINISHED does not imply the + * content retrieval succeeded. This information can be obtained from the + * error code in CONTENT_RETRIEVED callback. + */ + int consume(const Name &name); + int asyncConsume(const Name &name); + + /** + * Send an interest asynchronously in another thread, which is the same used + * for asyncConsume. + * + * @param interest - An Interest::Ptr to the interest. Notice that the + * application looses the ownership of the interest, which is transferred to + * the library itself. + * @param callback - A ConsumerCallback containing the events to be trigger in + * case of timeout or content reception. + * + */ void asyncSendInterest(Interest::Ptr &&interest, Portal::ConsumerCallback *callback); + /** + * Stops the consumer socket. If several downloads are queued (using + * asyncConsume), this call stops just the current one. + */ void stop(); + /** + * Resume the download from the same exact point it stopped. + */ void resume(); + /** + * Get the io_service which is running the transport protocol event loop. + * + * @return A reference to the internal io_service where the transport protocol + * is running. + */ asio::io_service &getIoService() override; + TRANSPORT_ALWAYS_INLINE int setSocketOption( + int socket_option_key, ReadCallback *socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + } + + TRANSPORT_ALWAYS_INLINE int getSocketOption( + int socket_option_key, ReadCallback **socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + *socket_option_value = read_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, double socket_option_value) { switch (socket_option_key) { @@ -150,12 +337,6 @@ class ConsumerSocket : public BaseSocket { break; } - case ConsumerCallbacksOptions::CONTENT_RETRIEVED: - if (socket_option_value == VOID_HANDLER) { - on_payload_retrieved_ = VOID_HANDLER; - break; - } - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: if (socket_option_value > 0) { rate_estimation_batching_parameter_ = socket_option_value; @@ -275,20 +456,6 @@ class ConsumerSocket : public BaseSocket { } TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerContentCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_RETRIEVED: - on_payload_retrieved_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( int socket_option_key, ConsumerManifestCallback socket_option_value) { switch (socket_option_key) { case ConsumerCallbacksOptions::MANIFEST_INPUT: @@ -332,21 +499,6 @@ class ConsumerSocket : public BaseSocket { } TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - const std::shared_ptr<std::vector<uint8_t>> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::APPLICATION_BUFFER: - content_buffer_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( int socket_option_key, const std::string &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::CERTIFICATE: @@ -569,18 +721,6 @@ class ConsumerSocket : public BaseSocket { } TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerContentCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_RETRIEVED: - *socket_option_value = &on_payload_retrieved_; - return SOCKET_OPTION_GET; - - default: - return SOCKET_OPTION_NOT_GET; - } - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( int socket_option_key, ConsumerManifestCallback **socket_option_value) { switch (socket_option_key) { case ConsumerCallbacksOptions::MANIFEST_INPUT: @@ -636,20 +776,6 @@ class ConsumerSocket : public BaseSocket { } TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - std::shared_ptr<std::vector<uint8_t>> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::APPLICATION_BUFFER: - socket_option_value = content_buffer_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( int socket_option_key, std::string &socket_option_value) { switch (socket_option_key) { case DataLinkOptions::OUTPUT_INTERFACE: @@ -718,8 +844,6 @@ class ConsumerSocket : public BaseSocket { PARCKeyId *key_id_; bool verify_signature_; - ContentBuffer content_buffer_; - ConsumerInterestCallback on_interest_retransmission_; ConsumerInterestCallback on_interest_output_; ConsumerInterestCallback on_interest_timeout_; @@ -730,11 +854,10 @@ class ConsumerSocket : public BaseSocket { ConsumerContentObjectCallback on_content_object_; ConsumerManifestCallback on_manifest_; - - ConsumerContentCallback on_payload_retrieved_; - ConsumerTimerCallback stats_summary_; + ReadCallback *read_callback_; + // Virtual download for traffic generator bool virtual_download_; |