From c365689250216861fd7727203ee6ba1049ad5778 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 3 Apr 2019 10:03:56 +0200 Subject: [HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application. Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara --- .../src/hicn/transport/http/client_connection.cc | 127 +++++++++++++-------- 1 file changed, 81 insertions(+), 46 deletions(-) (limited to 'libtransport/src/hicn/transport/http/client_connection.cc') diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc index b31d89b6b..fb7dbdfac 100644 --- a/libtransport/src/hicn/transport/http/client_connection.cc +++ b/libtransport/src/hicn/transport/http/client_connection.cc @@ -27,6 +27,8 @@ using namespace transport; HTTPClientConnection::HTTPClientConnection() : consumer_(TransportProtocolAlgorithms::RAAQM, io_service_), + read_bytes_callback_(nullptr), + read_buffer_(nullptr), response_(std::make_shared()), timer_(nullptr) { consumer_.setSocketOption( @@ -35,11 +37,7 @@ HTTPClientConnection::HTTPClientConnection() &HTTPClientConnection::verifyData, this, std::placeholders::_1, std::placeholders::_2)); - consumer_.setSocketOption( - ConsumerCallbacksOptions::CONTENT_RETRIEVED, - (ConsumerContentCallback)std::bind( - &HTTPClientConnection::processPayload, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); consumer_.connect(); std::shared_ptr portal; @@ -47,36 +45,48 @@ HTTPClientConnection::HTTPClientConnection() timer_ = std::make_unique(portal->getIoService()); } -HTTPClientConnection &HTTPClientConnection::get( +HTTPClientConnection::RC HTTPClientConnection::get( const std::string &url, HTTPHeaders headers, HTTPPayload payload, - std::shared_ptr response) { - return sendRequest(url, HTTPMethod::GET, headers, payload, response); + std::shared_ptr response, ReadBytesCallback *callback) { + return sendRequest(url, HTTPMethod::GET, headers, payload, response, + callback); } -HTTPClientConnection &HTTPClientConnection::sendRequest( +HTTPClientConnection::RC HTTPClientConnection::sendRequest( const std::string &url, HTTPMethod method, HTTPHeaders headers, - HTTPPayload payload, std::shared_ptr response) { + HTTPPayload payload, std::shared_ptr response, + ReadBytesCallback *callback) { + current_url_ = url; + read_bytes_callback_ = callback; if (!response) { response = response_; } auto start = std::chrono::steady_clock::now(); HTTPRequest request(method, url, headers, payload); - std::string name = sendRequestGetReply(request, response); - auto end = std::chrono::steady_clock::now(); - - TRANSPORT_LOGI( - "%s %s [%s] duration: %llu [usec] %zu [bytes]\n", - method_map[method].c_str(), url.c_str(), name.c_str(), - (unsigned long long)std::chrono::duration_cast( - end - start) - .count(), - response->size()); + response->clear(); - return *this; + success_callback_ = [this, method = std::move(method), url = std::move(url), + start = std::move(start), + response = std::move(response)]( + std::size_t size) -> std::shared_ptr { + auto end = std::chrono::steady_clock::now(); + TRANSPORT_LOGI( + "%s %s [%s] duration: %llu [usec] %zu [bytes]\n", + method_map[method].c_str(), url.c_str(), name_.str().c_str(), + (unsigned long long) + std::chrono::duration_cast(end - start) + .count(), + size); + + return response; + }; + + sendRequestGetReply(request, response); + return return_code_; } -std::string HTTPClientConnection::sendRequestGetReply( +void HTTPClientConnection::sendRequestGetReply( const HTTPRequest &request, std::shared_ptr &response) { const std::string &request_string = request.getRequestString(); const std::string &locator = request.getLocator(); @@ -94,38 +104,28 @@ std::string HTTPClientConnection::sendRequestGetReply( &HTTPClientConnection::processLeavingInterest, this, std::placeholders::_1, std::placeholders::_2, request_string)); - // Send content to producer piggybacking it through first interest (to fix) - - response->clear(); - // Factor hicn name using hash + name_.str(""); - std::stringstream stream; - - stream << std::hex << http::default_values::ipv6_first_word << ":"; + name_ << std::hex << http::default_values::ipv6_first_word << ":"; for (uint16_t *word = (uint16_t *)&locator_hash; std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); word++) { - stream << ":" << std::hex << *word; + name_ << ":" << std::hex << *word; } for (uint16_t *word = (uint16_t *)&request_hash; std::size_t(word) < (std::size_t(&request_hash) + sizeof(request_hash)); word++) { - stream << ":" << std::hex << *word; + name_ << ":" << std::hex << *word; } - stream << "|0"; - - ContentBuffer response_ptr = - std::static_pointer_cast>(response); + name_ << "|0"; - consumer_.consume(Name(stream.str()), response_ptr); + consumer_.consume(Name(name_.str())); consumer_.stop(); - - return stream.str(); } HTTPResponse HTTPClientConnection::response() { @@ -133,14 +133,6 @@ HTTPResponse HTTPClientConnection::response() { return std::move(*response_); } -void HTTPClientConnection::processPayload(ConsumerSocket &c, - std::size_t bytes_transferred, - const std::error_code &ec) { - if (ec) { - TRANSPORT_LOGE("Download failed!!"); - } -} - bool HTTPClientConnection::verifyData( ConsumerSocket &c, const core::ContentObject &contentObject) { if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) { @@ -192,6 +184,49 @@ HTTPClientConnection &HTTPClientConnection::setCertificate( return *this; } +// Read buffer management +void HTTPClientConnection::readBufferAvailable( + std::unique_ptr &&buffer) noexcept { + if (!read_bytes_callback_) { + if (!read_buffer_) { + read_buffer_ = std::move(buffer); + } else { + read_buffer_->prependChain(std::move(buffer)); + } + } else { + read_bytes_callback_->onBytesReceived(std::move(buffer)); + } +} + +// Read buffer management +void HTTPClientConnection::readError(const std::error_code ec) noexcept { + TRANSPORT_LOGE("Error %s during download of %s", ec.message().c_str(), + current_url_.c_str()); + if (read_bytes_callback_) { + read_bytes_callback_->onError(ec); + } + + return_code_ = HTTPClientConnection::RC::DOWNLOAD_FAILED; +} + +void HTTPClientConnection::readSuccess(std::size_t total_size) noexcept { + auto response = success_callback_(total_size); + if (read_bytes_callback_) { + read_bytes_callback_->onSuccess(total_size); + } else { + response->reserve(total_size); + const utils::MemBuf *head = read_buffer_.get(), *current = head; + do { + response->insert(response->end(), current->data(), current->tail()); + current = current->next(); + } while (current != head); + + read_buffer_.reset(); + } + + return_code_ = HTTPClientConnection::RC::DOWNLOAD_SUCCESS; +} + } // namespace http } // namespace transport -- cgit 1.2.3-korg