diff options
Diffstat (limited to 'libtransport/src/hicn/transport/http')
6 files changed, 136 insertions, 109 deletions
diff --git a/libtransport/src/hicn/transport/http/CMakeLists.txt b/libtransport/src/hicn/transport/http/CMakeLists.txt index ddcf1fdc3..b24c80195 100644 --- a/libtransport/src/hicn/transport/http/CMakeLists.txt +++ b/libtransport/src/hicn/transport/http/CMakeLists.txt @@ -29,7 +29,6 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/facade.h ${CMAKE_CURRENT_SOURCE_DIR}/response.h ${CMAKE_CURRENT_SOURCE_DIR}/message.h - ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h ) set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) diff --git a/libtransport/src/hicn/transport/http/callbacks.h b/libtransport/src/hicn/transport/http/callbacks.h deleted file mode 100644 index a0f3d5999..000000000 --- a/libtransport/src/hicn/transport/http/callbacks.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/http/server_publisher.h> - -#include <functional> -#include <memory> - -namespace transport { - -namespace http { - -enum class RC : uint8_t { - SUCCESS, - CONTENT_PUBLISHED, - ERR_UNDEFINED, -}; - -using OnHttpRequest = - std::function<void(std::shared_ptr<HTTPServerPublisher>&, const uint8_t*, - std::size_t, int request_id)>; -using DeadlineTimerCallback = std::function<void(const std::error_code& e)>; -using ReceiveCallback = std::function<void(const std::vector<uint8_t>&)>; -using OnPayloadCallback = - std::function<RC(const std::error_code& ec, const core::Name& name, - const ContentBuffer& payload)>; -using ContentSentCallback = - std::function<void(const std::error_code&, const core::Name&)>; - -} // namespace http - -} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc index b31d89b6b..fb7dbdfac 100644 --- a/libtransport/src/hicn/transport/http/client_connection.cc +++ b/libtransport/src/hicn/transport/http/client_connection.cc @@ -27,6 +27,8 @@ using namespace transport; HTTPClientConnection::HTTPClientConnection() : consumer_(TransportProtocolAlgorithms::RAAQM, io_service_), + read_bytes_callback_(nullptr), + read_buffer_(nullptr), response_(std::make_shared<HTTPResponse>()), timer_(nullptr) { consumer_.setSocketOption( @@ -35,11 +37,7 @@ HTTPClientConnection::HTTPClientConnection() &HTTPClientConnection::verifyData, this, std::placeholders::_1, std::placeholders::_2)); - consumer_.setSocketOption( - ConsumerCallbacksOptions::CONTENT_RETRIEVED, - (ConsumerContentCallback)std::bind( - &HTTPClientConnection::processPayload, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); consumer_.connect(); std::shared_ptr<typename ConsumerSocket::Portal> portal; @@ -47,36 +45,48 @@ HTTPClientConnection::HTTPClientConnection() timer_ = std::make_unique<asio::steady_timer>(portal->getIoService()); } -HTTPClientConnection &HTTPClientConnection::get( +HTTPClientConnection::RC HTTPClientConnection::get( const std::string &url, HTTPHeaders headers, HTTPPayload payload, - std::shared_ptr<HTTPResponse> response) { - return sendRequest(url, HTTPMethod::GET, headers, payload, response); + std::shared_ptr<HTTPResponse> response, ReadBytesCallback *callback) { + return sendRequest(url, HTTPMethod::GET, headers, payload, response, + callback); } -HTTPClientConnection &HTTPClientConnection::sendRequest( +HTTPClientConnection::RC HTTPClientConnection::sendRequest( const std::string &url, HTTPMethod method, HTTPHeaders headers, - HTTPPayload payload, std::shared_ptr<HTTPResponse> response) { + HTTPPayload payload, std::shared_ptr<HTTPResponse> response, + ReadBytesCallback *callback) { + current_url_ = url; + read_bytes_callback_ = callback; if (!response) { response = response_; } auto start = std::chrono::steady_clock::now(); HTTPRequest request(method, url, headers, payload); - std::string name = sendRequestGetReply(request, response); - auto end = std::chrono::steady_clock::now(); - - TRANSPORT_LOGI( - "%s %s [%s] duration: %llu [usec] %zu [bytes]\n", - method_map[method].c_str(), url.c_str(), name.c_str(), - (unsigned long long)std::chrono::duration_cast<std::chrono::microseconds>( - end - start) - .count(), - response->size()); + response->clear(); - return *this; + success_callback_ = [this, method = std::move(method), url = std::move(url), + start = std::move(start), + response = std::move(response)]( + std::size_t size) -> std::shared_ptr<HTTPResponse> { + auto end = std::chrono::steady_clock::now(); + TRANSPORT_LOGI( + "%s %s [%s] duration: %llu [usec] %zu [bytes]\n", + method_map[method].c_str(), url.c_str(), name_.str().c_str(), + (unsigned long long) + std::chrono::duration_cast<std::chrono::microseconds>(end - start) + .count(), + size); + + return response; + }; + + sendRequestGetReply(request, response); + return return_code_; } -std::string HTTPClientConnection::sendRequestGetReply( +void HTTPClientConnection::sendRequestGetReply( const HTTPRequest &request, std::shared_ptr<HTTPResponse> &response) { const std::string &request_string = request.getRequestString(); const std::string &locator = request.getLocator(); @@ -94,38 +104,28 @@ std::string HTTPClientConnection::sendRequestGetReply( &HTTPClientConnection::processLeavingInterest, this, std::placeholders::_1, std::placeholders::_2, request_string)); - // Send content to producer piggybacking it through first interest (to fix) - - response->clear(); - // Factor hicn name using hash + name_.str(""); - std::stringstream stream; - - stream << std::hex << http::default_values::ipv6_first_word << ":"; + name_ << std::hex << http::default_values::ipv6_first_word << ":"; for (uint16_t *word = (uint16_t *)&locator_hash; std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); word++) { - stream << ":" << std::hex << *word; + name_ << ":" << std::hex << *word; } for (uint16_t *word = (uint16_t *)&request_hash; std::size_t(word) < (std::size_t(&request_hash) + sizeof(request_hash)); word++) { - stream << ":" << std::hex << *word; + name_ << ":" << std::hex << *word; } - stream << "|0"; - - ContentBuffer response_ptr = - std::static_pointer_cast<std::vector<uint8_t>>(response); + name_ << "|0"; - consumer_.consume(Name(stream.str()), response_ptr); + consumer_.consume(Name(name_.str())); consumer_.stop(); - - return stream.str(); } HTTPResponse HTTPClientConnection::response() { @@ -133,14 +133,6 @@ HTTPResponse HTTPClientConnection::response() { return std::move(*response_); } -void HTTPClientConnection::processPayload(ConsumerSocket &c, - std::size_t bytes_transferred, - const std::error_code &ec) { - if (ec) { - TRANSPORT_LOGE("Download failed!!"); - } -} - bool HTTPClientConnection::verifyData( ConsumerSocket &c, const core::ContentObject &contentObject) { if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) { @@ -192,6 +184,49 @@ HTTPClientConnection &HTTPClientConnection::setCertificate( return *this; } +// Read buffer management +void HTTPClientConnection::readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept { + if (!read_bytes_callback_) { + if (!read_buffer_) { + read_buffer_ = std::move(buffer); + } else { + read_buffer_->prependChain(std::move(buffer)); + } + } else { + read_bytes_callback_->onBytesReceived(std::move(buffer)); + } +} + +// Read buffer management +void HTTPClientConnection::readError(const std::error_code ec) noexcept { + TRANSPORT_LOGE("Error %s during download of %s", ec.message().c_str(), + current_url_.c_str()); + if (read_bytes_callback_) { + read_bytes_callback_->onError(ec); + } + + return_code_ = HTTPClientConnection::RC::DOWNLOAD_FAILED; +} + +void HTTPClientConnection::readSuccess(std::size_t total_size) noexcept { + auto response = success_callback_(total_size); + if (read_bytes_callback_) { + read_bytes_callback_->onSuccess(total_size); + } else { + response->reserve(total_size); + const utils::MemBuf *head = read_buffer_.get(), *current = head; + do { + response->insert(response->end(), current->data(), current->tail()); + current = current->next(); + } while (current != head); + + read_buffer_.reset(); + } + + return_code_ = HTTPClientConnection::RC::DOWNLOAD_SUCCESS; +} + } // namespace http } // namespace transport diff --git a/libtransport/src/hicn/transport/http/client_connection.h b/libtransport/src/hicn/transport/http/client_connection.h index faad1864c..6c150f848 100644 --- a/libtransport/src/hicn/transport/http/client_connection.h +++ b/libtransport/src/hicn/transport/http/client_connection.h @@ -31,18 +31,30 @@ namespace http { using namespace interface; using namespace core; -class HTTPClientConnection { +class HTTPClientConnection : public ConsumerSocket::ReadCallback { + static constexpr uint32_t max_buffer_capacity = 64 * 1024; + public: + class ReadBytesCallback { + public: + virtual void onBytesReceived(std::unique_ptr<utils::MemBuf> &&buffer) = 0; + virtual void onSuccess(std::size_t bytes) = 0; + virtual void onError(const std::error_code ec) = 0; + }; + + enum class RC : uint32_t { DOWNLOAD_FAILED, DOWNLOAD_SUCCESS }; + HTTPClientConnection(); - HTTPClientConnection &get(const std::string &url, HTTPHeaders headers = {}, - HTTPPayload payload = {}, - std::shared_ptr<HTTPResponse> response = nullptr); + RC get(const std::string &url, HTTPHeaders headers = {}, + HTTPPayload payload = {}, + std::shared_ptr<HTTPResponse> response = nullptr, + ReadBytesCallback *callback = nullptr); - HTTPClientConnection &sendRequest( - const std::string &url, HTTPMethod method, HTTPHeaders headers = {}, - HTTPPayload payload = {}, - std::shared_ptr<HTTPResponse> response = nullptr); + RC sendRequest(const std::string &url, HTTPMethod method, + HTTPHeaders headers = {}, HTTPPayload payload = {}, + std::shared_ptr<HTTPResponse> response = nullptr, + ReadBytesCallback *callback = nullptr); HTTPResponse response(); @@ -55,11 +67,8 @@ class HTTPClientConnection { HTTPClientConnection &setCertificate(const std::string &cert_path); private: - void processPayload(interface::ConsumerSocket &c, - std::size_t bytes_transferred, const std::error_code &ec); - - std::string sendRequestGetReply(const HTTPRequest &request, - std::shared_ptr<HTTPResponse> &response); + void sendRequestGetReply(const HTTPRequest &request, + std::shared_ptr<HTTPResponse> &response); bool verifyData(interface::ConsumerSocket &c, const core::ContentObject &contentObject); @@ -68,12 +77,42 @@ class HTTPClientConnection { const core::Interest &interest, std::string &payload); + // Read callback + bool isBufferMovable() noexcept override { return true; } + void getReadBuffer(uint8_t **application_buffer, + size_t *max_length) override {} + void readDataAvailable(size_t length) noexcept override {} + size_t maxBufferSize() const override { return max_buffer_capacity; } + void readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept override; + void readError(const std::error_code ec) noexcept override; + void readSuccess(std::size_t total_size) noexcept override; + asio::io_service io_service_; + // The consumer socket ConsumerSocket consumer_; + // The current url provided by the application + std::string current_url_; + // The current hICN name used for downloading + std::stringstream name_; + // Function to be called when the read is successful + std::function<std::shared_ptr<HTTPResponse>(std::size_t)> success_callback_; + // Return code for current download + RC return_code_; + + // Application provided callback for saving the received content during + // the download. If this callback is used, the HTTPClient will NOT save + // any byte internally. + ReadBytesCallback *read_bytes_callback_; + + // Internal read buffer and HTTP response, to be used if the application does + // not provide any read_bytes_callback + std::unique_ptr<utils::MemBuf> read_buffer_; std::shared_ptr<HTTPResponse> response_; + // Timer std::unique_ptr<asio::steady_timer> timer_; }; diff --git a/libtransport/src/hicn/transport/http/server_acceptor.h b/libtransport/src/hicn/transport/http/server_acceptor.h index 99480028a..4e7350b76 100644 --- a/libtransport/src/hicn/transport/http/server_acceptor.h +++ b/libtransport/src/hicn/transport/http/server_acceptor.h @@ -15,7 +15,6 @@ #pragma once -#include <hicn/transport/http/callbacks.h> #include <hicn/transport/http/default_values.h> #include <hicn/transport/http/request.h> #include <hicn/transport/http/server_publisher.h> @@ -31,6 +30,9 @@ namespace http { class HTTPServerAcceptor { friend class HTTPServerPublisher; + using OnHttpRequest = + std::function<void(std::shared_ptr<HTTPServerPublisher> &, + const uint8_t *, std::size_t, int request_id)>; public: HTTPServerAcceptor(std::string &&server_locator, OnHttpRequest callback); diff --git a/libtransport/src/hicn/transport/http/server_publisher.h b/libtransport/src/hicn/transport/http/server_publisher.h index f3c39c3fe..1f12fd8f9 100644 --- a/libtransport/src/hicn/transport/http/server_publisher.h +++ b/libtransport/src/hicn/transport/http/server_publisher.h @@ -63,8 +63,6 @@ class HTTPServerPublisher { std::unique_ptr<ProducerSocket> producer_; ProducerInterestCallback interest_enter_callback_; utils::UserCallback wait_callback_; - - ContentBuffer receive_buffer_; }; } // end namespace http |