aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/http
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-04-03 10:03:56 +0200
committerMauro Sardara <msardara@cisco.com>2019-04-15 11:37:30 +0200
commitc365689250216861fd7727203ee6ba1049ad5778 (patch)
tree97f1f3d1a6cb7314f1292d97be6d8e8e06cc998b /libtransport/src/hicn/transport/http
parentd8ce6d98a2a726393655bd71eb81b8ef5222d6ba (diff)
[HICN-155] Consumer socket allows now to read N bytes from the network, where N is defined by the application.
Change-Id: Ib20309b40e43e4c0db09b9b484e18cd2e3ebf581 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/http')
-rw-r--r--libtransport/src/hicn/transport/http/CMakeLists.txt1
-rw-r--r--libtransport/src/hicn/transport/http/callbacks.h46
-rw-r--r--libtransport/src/hicn/transport/http/client_connection.cc127
-rw-r--r--libtransport/src/hicn/transport/http/client_connection.h65
-rw-r--r--libtransport/src/hicn/transport/http/server_acceptor.h4
-rw-r--r--libtransport/src/hicn/transport/http/server_publisher.h2
6 files changed, 136 insertions, 109 deletions
diff --git a/libtransport/src/hicn/transport/http/CMakeLists.txt b/libtransport/src/hicn/transport/http/CMakeLists.txt
index ddcf1fdc3..b24c80195 100644
--- a/libtransport/src/hicn/transport/http/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/http/CMakeLists.txt
@@ -29,7 +29,6 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/facade.h
${CMAKE_CURRENT_SOURCE_DIR}/response.h
${CMAKE_CURRENT_SOURCE_DIR}/message.h
- ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h
)
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/hicn/transport/http/callbacks.h b/libtransport/src/hicn/transport/http/callbacks.h
deleted file mode 100644
index a0f3d5999..000000000
--- a/libtransport/src/hicn/transport/http/callbacks.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/http/server_publisher.h>
-
-#include <functional>
-#include <memory>
-
-namespace transport {
-
-namespace http {
-
-enum class RC : uint8_t {
- SUCCESS,
- CONTENT_PUBLISHED,
- ERR_UNDEFINED,
-};
-
-using OnHttpRequest =
- std::function<void(std::shared_ptr<HTTPServerPublisher>&, const uint8_t*,
- std::size_t, int request_id)>;
-using DeadlineTimerCallback = std::function<void(const std::error_code& e)>;
-using ReceiveCallback = std::function<void(const std::vector<uint8_t>&)>;
-using OnPayloadCallback =
- std::function<RC(const std::error_code& ec, const core::Name& name,
- const ContentBuffer& payload)>;
-using ContentSentCallback =
- std::function<void(const std::error_code&, const core::Name&)>;
-
-} // namespace http
-
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc
index b31d89b6b..fb7dbdfac 100644
--- a/libtransport/src/hicn/transport/http/client_connection.cc
+++ b/libtransport/src/hicn/transport/http/client_connection.cc
@@ -27,6 +27,8 @@ using namespace transport;
HTTPClientConnection::HTTPClientConnection()
: consumer_(TransportProtocolAlgorithms::RAAQM, io_service_),
+ read_bytes_callback_(nullptr),
+ read_buffer_(nullptr),
response_(std::make_shared<HTTPResponse>()),
timer_(nullptr) {
consumer_.setSocketOption(
@@ -35,11 +37,7 @@ HTTPClientConnection::HTTPClientConnection()
&HTTPClientConnection::verifyData, this, std::placeholders::_1,
std::placeholders::_2));
- consumer_.setSocketOption(
- ConsumerCallbacksOptions::CONTENT_RETRIEVED,
- (ConsumerContentCallback)std::bind(
- &HTTPClientConnection::processPayload, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this);
consumer_.connect();
std::shared_ptr<typename ConsumerSocket::Portal> portal;
@@ -47,36 +45,48 @@ HTTPClientConnection::HTTPClientConnection()
timer_ = std::make_unique<asio::steady_timer>(portal->getIoService());
}
-HTTPClientConnection &HTTPClientConnection::get(
+HTTPClientConnection::RC HTTPClientConnection::get(
const std::string &url, HTTPHeaders headers, HTTPPayload payload,
- std::shared_ptr<HTTPResponse> response) {
- return sendRequest(url, HTTPMethod::GET, headers, payload, response);
+ std::shared_ptr<HTTPResponse> response, ReadBytesCallback *callback) {
+ return sendRequest(url, HTTPMethod::GET, headers, payload, response,
+ callback);
}
-HTTPClientConnection &HTTPClientConnection::sendRequest(
+HTTPClientConnection::RC HTTPClientConnection::sendRequest(
const std::string &url, HTTPMethod method, HTTPHeaders headers,
- HTTPPayload payload, std::shared_ptr<HTTPResponse> response) {
+ HTTPPayload payload, std::shared_ptr<HTTPResponse> response,
+ ReadBytesCallback *callback) {
+ current_url_ = url;
+ read_bytes_callback_ = callback;
if (!response) {
response = response_;
}
auto start = std::chrono::steady_clock::now();
HTTPRequest request(method, url, headers, payload);
- std::string name = sendRequestGetReply(request, response);
- auto end = std::chrono::steady_clock::now();
-
- TRANSPORT_LOGI(
- "%s %s [%s] duration: %llu [usec] %zu [bytes]\n",
- method_map[method].c_str(), url.c_str(), name.c_str(),
- (unsigned long long)std::chrono::duration_cast<std::chrono::microseconds>(
- end - start)
- .count(),
- response->size());
+ response->clear();
- return *this;
+ success_callback_ = [this, method = std::move(method), url = std::move(url),
+ start = std::move(start),
+ response = std::move(response)](
+ std::size_t size) -> std::shared_ptr<HTTPResponse> {
+ auto end = std::chrono::steady_clock::now();
+ TRANSPORT_LOGI(
+ "%s %s [%s] duration: %llu [usec] %zu [bytes]\n",
+ method_map[method].c_str(), url.c_str(), name_.str().c_str(),
+ (unsigned long long)
+ std::chrono::duration_cast<std::chrono::microseconds>(end - start)
+ .count(),
+ size);
+
+ return response;
+ };
+
+ sendRequestGetReply(request, response);
+ return return_code_;
}
-std::string HTTPClientConnection::sendRequestGetReply(
+void HTTPClientConnection::sendRequestGetReply(
const HTTPRequest &request, std::shared_ptr<HTTPResponse> &response) {
const std::string &request_string = request.getRequestString();
const std::string &locator = request.getLocator();
@@ -94,38 +104,28 @@ std::string HTTPClientConnection::sendRequestGetReply(
&HTTPClientConnection::processLeavingInterest, this,
std::placeholders::_1, std::placeholders::_2, request_string));
- // Send content to producer piggybacking it through first interest (to fix)
-
- response->clear();
-
// Factor hicn name using hash
+ name_.str("");
- std::stringstream stream;
-
- stream << std::hex << http::default_values::ipv6_first_word << ":";
+ name_ << std::hex << http::default_values::ipv6_first_word << ":";
for (uint16_t *word = (uint16_t *)&locator_hash;
std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash));
word++) {
- stream << ":" << std::hex << *word;
+ name_ << ":" << std::hex << *word;
}
for (uint16_t *word = (uint16_t *)&request_hash;
std::size_t(word) < (std::size_t(&request_hash) + sizeof(request_hash));
word++) {
- stream << ":" << std::hex << *word;
+ name_ << ":" << std::hex << *word;
}
- stream << "|0";
-
- ContentBuffer response_ptr =
- std::static_pointer_cast<std::vector<uint8_t>>(response);
+ name_ << "|0";
- consumer_.consume(Name(stream.str()), response_ptr);
+ consumer_.consume(Name(name_.str()));
consumer_.stop();
-
- return stream.str();
}
HTTPResponse HTTPClientConnection::response() {
@@ -133,14 +133,6 @@ HTTPResponse HTTPClientConnection::response() {
return std::move(*response_);
}
-void HTTPClientConnection::processPayload(ConsumerSocket &c,
- std::size_t bytes_transferred,
- const std::error_code &ec) {
- if (ec) {
- TRANSPORT_LOGE("Download failed!!");
- }
-}
-
bool HTTPClientConnection::verifyData(
ConsumerSocket &c, const core::ContentObject &contentObject) {
if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) {
@@ -192,6 +184,49 @@ HTTPClientConnection &HTTPClientConnection::setCertificate(
return *this;
}
+// Read buffer management
+void HTTPClientConnection::readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
+ if (!read_bytes_callback_) {
+ if (!read_buffer_) {
+ read_buffer_ = std::move(buffer);
+ } else {
+ read_buffer_->prependChain(std::move(buffer));
+ }
+ } else {
+ read_bytes_callback_->onBytesReceived(std::move(buffer));
+ }
+}
+
+// Read buffer management
+void HTTPClientConnection::readError(const std::error_code ec) noexcept {
+ TRANSPORT_LOGE("Error %s during download of %s", ec.message().c_str(),
+ current_url_.c_str());
+ if (read_bytes_callback_) {
+ read_bytes_callback_->onError(ec);
+ }
+
+ return_code_ = HTTPClientConnection::RC::DOWNLOAD_FAILED;
+}
+
+void HTTPClientConnection::readSuccess(std::size_t total_size) noexcept {
+ auto response = success_callback_(total_size);
+ if (read_bytes_callback_) {
+ read_bytes_callback_->onSuccess(total_size);
+ } else {
+ response->reserve(total_size);
+ const utils::MemBuf *head = read_buffer_.get(), *current = head;
+ do {
+ response->insert(response->end(), current->data(), current->tail());
+ current = current->next();
+ } while (current != head);
+
+ read_buffer_.reset();
+ }
+
+ return_code_ = HTTPClientConnection::RC::DOWNLOAD_SUCCESS;
+}
+
} // namespace http
} // namespace transport
diff --git a/libtransport/src/hicn/transport/http/client_connection.h b/libtransport/src/hicn/transport/http/client_connection.h
index faad1864c..6c150f848 100644
--- a/libtransport/src/hicn/transport/http/client_connection.h
+++ b/libtransport/src/hicn/transport/http/client_connection.h
@@ -31,18 +31,30 @@ namespace http {
using namespace interface;
using namespace core;
-class HTTPClientConnection {
+class HTTPClientConnection : public ConsumerSocket::ReadCallback {
+ static constexpr uint32_t max_buffer_capacity = 64 * 1024;
+
public:
+ class ReadBytesCallback {
+ public:
+ virtual void onBytesReceived(std::unique_ptr<utils::MemBuf> &&buffer) = 0;
+ virtual void onSuccess(std::size_t bytes) = 0;
+ virtual void onError(const std::error_code ec) = 0;
+ };
+
+ enum class RC : uint32_t { DOWNLOAD_FAILED, DOWNLOAD_SUCCESS };
+
HTTPClientConnection();
- HTTPClientConnection &get(const std::string &url, HTTPHeaders headers = {},
- HTTPPayload payload = {},
- std::shared_ptr<HTTPResponse> response = nullptr);
+ RC get(const std::string &url, HTTPHeaders headers = {},
+ HTTPPayload payload = {},
+ std::shared_ptr<HTTPResponse> response = nullptr,
+ ReadBytesCallback *callback = nullptr);
- HTTPClientConnection &sendRequest(
- const std::string &url, HTTPMethod method, HTTPHeaders headers = {},
- HTTPPayload payload = {},
- std::shared_ptr<HTTPResponse> response = nullptr);
+ RC sendRequest(const std::string &url, HTTPMethod method,
+ HTTPHeaders headers = {}, HTTPPayload payload = {},
+ std::shared_ptr<HTTPResponse> response = nullptr,
+ ReadBytesCallback *callback = nullptr);
HTTPResponse response();
@@ -55,11 +67,8 @@ class HTTPClientConnection {
HTTPClientConnection &setCertificate(const std::string &cert_path);
private:
- void processPayload(interface::ConsumerSocket &c,
- std::size_t bytes_transferred, const std::error_code &ec);
-
- std::string sendRequestGetReply(const HTTPRequest &request,
- std::shared_ptr<HTTPResponse> &response);
+ void sendRequestGetReply(const HTTPRequest &request,
+ std::shared_ptr<HTTPResponse> &response);
bool verifyData(interface::ConsumerSocket &c,
const core::ContentObject &contentObject);
@@ -68,12 +77,42 @@ class HTTPClientConnection {
const core::Interest &interest,
std::string &payload);
+ // Read callback
+ bool isBufferMovable() noexcept override { return true; }
+ void getReadBuffer(uint8_t **application_buffer,
+ size_t *max_length) override {}
+ void readDataAvailable(size_t length) noexcept override {}
+ size_t maxBufferSize() const override { return max_buffer_capacity; }
+ void readBufferAvailable(
+ std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
+ void readError(const std::error_code ec) noexcept override;
+ void readSuccess(std::size_t total_size) noexcept override;
+
asio::io_service io_service_;
+ // The consumer socket
ConsumerSocket consumer_;
+ // The current url provided by the application
+ std::string current_url_;
+ // The current hICN name used for downloading
+ std::stringstream name_;
+ // Function to be called when the read is successful
+ std::function<std::shared_ptr<HTTPResponse>(std::size_t)> success_callback_;
+ // Return code for current download
+ RC return_code_;
+
+ // Application provided callback for saving the received content during
+ // the download. If this callback is used, the HTTPClient will NOT save
+ // any byte internally.
+ ReadBytesCallback *read_bytes_callback_;
+
+ // Internal read buffer and HTTP response, to be used if the application does
+ // not provide any read_bytes_callback
+ std::unique_ptr<utils::MemBuf> read_buffer_;
std::shared_ptr<HTTPResponse> response_;
+ // Timer
std::unique_ptr<asio::steady_timer> timer_;
};
diff --git a/libtransport/src/hicn/transport/http/server_acceptor.h b/libtransport/src/hicn/transport/http/server_acceptor.h
index 99480028a..4e7350b76 100644
--- a/libtransport/src/hicn/transport/http/server_acceptor.h
+++ b/libtransport/src/hicn/transport/http/server_acceptor.h
@@ -15,7 +15,6 @@
#pragma once
-#include <hicn/transport/http/callbacks.h>
#include <hicn/transport/http/default_values.h>
#include <hicn/transport/http/request.h>
#include <hicn/transport/http/server_publisher.h>
@@ -31,6 +30,9 @@ namespace http {
class HTTPServerAcceptor {
friend class HTTPServerPublisher;
+ using OnHttpRequest =
+ std::function<void(std::shared_ptr<HTTPServerPublisher> &,
+ const uint8_t *, std::size_t, int request_id)>;
public:
HTTPServerAcceptor(std::string &&server_locator, OnHttpRequest callback);
diff --git a/libtransport/src/hicn/transport/http/server_publisher.h b/libtransport/src/hicn/transport/http/server_publisher.h
index f3c39c3fe..1f12fd8f9 100644
--- a/libtransport/src/hicn/transport/http/server_publisher.h
+++ b/libtransport/src/hicn/transport/http/server_publisher.h
@@ -63,8 +63,6 @@ class HTTPServerPublisher {
std::unique_ptr<ProducerSocket> producer_;
ProducerInterestCallback interest_enter_callback_;
utils::UserCallback wait_callback_;
-
- ContentBuffer receive_buffer_;
};
} // end namespace http