aboutsummaryrefslogtreecommitdiffstats
path: root/apps/http-proxy/src/ATSConnector.cc
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-05-14 20:27:11 +0200
committerMauro Sardara <msardara@cisco.com>2020-05-20 11:07:30 +0200
commita27536f3d1ce6c2f46aef61a29dd1f516644e663 (patch)
treeabcf4c6285dbd824f21095d33293a48d58d74889 /apps/http-proxy/src/ATSConnector.cc
parent81fb39606b069fbece973995572fa7f90ea1950a (diff)
[HICN-614] Add client HTTP proxy (TCP->hICN)
Change-Id: Ieaa875edff98404676083bab91233b98ce50c8d0 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'apps/http-proxy/src/ATSConnector.cc')
-rw-r--r--apps/http-proxy/src/ATSConnector.cc286
1 files changed, 0 insertions, 286 deletions
diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc
deleted file mode 100644
index a9b889941..000000000
--- a/apps/http-proxy/src/ATSConnector.cc
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Copyright (c) 2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "ATSConnector.h"
-#include "HTTP1.xMessageFastParser.h"
-
-#include <hicn/transport/utils/branch_prediction.h>
-#include <hicn/transport/utils/log.h>
-#include <iostream>
-
-namespace transport {
-
-ATSConnector::ATSConnector(asio::io_service &io_service,
- std::string &ip_address, std::string &port,
- ContentReceivedCallback receive_callback,
- OnReconnect on_reconnect_callback)
- : io_service_(io_service),
- socket_(io_service_),
- resolver_(io_service_),
- endpoint_iterator_(resolver_.resolve({ip_address, port})),
- timer_(io_service),
- is_reconnection_(false),
- data_available_(false),
- content_length_(0),
- is_last_chunk_(false),
- chunked_(false),
- receive_callback_(receive_callback),
- on_reconnect_callback_(on_reconnect_callback) {
- input_buffer_.prepare(buffer_size + 2048);
- state_ = ConnectorState::CONNECTING;
- doConnect();
-}
-
-ATSConnector::~ATSConnector() {}
-
-void ATSConnector::send(const uint8_t *packet, std::size_t len,
- ContentSentCallback &&content_sent) {
- asio::async_write(
- socket_, asio::buffer(packet, len),
- [content_sent = std::move(content_sent)](
- std::error_code ec, std::size_t /*length*/) { content_sent(); });
-}
-
-void ATSConnector::send(utils::MemBuf *buffer,
- ContentSentCallback &&content_sent) {
- io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() {
- bool write_in_progress = !write_msgs_.empty();
- write_msgs_.emplace_back(std::unique_ptr<utils::MemBuf>(buffer),
- std::move(callback));
- if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) {
- if (!write_in_progress) {
- doWrite();
- }
- } else {
- TRANSPORT_LOGD("Tell the handle connect it has data to write");
- data_available_ = true;
- }
- });
-}
-
-void ATSConnector::close() {
- if (state_ != ConnectorState::CLOSED) {
- state_ = ConnectorState::CLOSED;
- if (socket_.is_open()) {
- socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
- // on_disconnect_callback_();
- }
- }
-}
-
-void ATSConnector::doWrite() {
- auto &buffer = write_msgs_.front().first;
-
- asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()),
- [this](std::error_code ec, std::size_t length) {
- if (TRANSPORT_EXPECT_FALSE(!ec)) {
- TRANSPORT_LOGD("Content successfully sent!");
- write_msgs_.front().second();
- write_msgs_.pop_front();
- if (!write_msgs_.empty()) {
- doWrite();
- }
- } else {
- TRANSPORT_LOGD("Content NOT sent!");
- }
- });
-} // namespace transport
-
-void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- content_length_ -= length;
- const uint8_t *buffer =
- asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, input_buffer_.size(), !content_length_, false);
- input_buffer_.consume(input_buffer_.size());
-
- if (!content_length_) {
- if (!chunked_ || is_last_chunk_) {
- doReadHeader();
- } else {
- doReadChunkedHeader();
- }
- } else {
- auto to_read =
- content_length_ >= buffer_size ? buffer_size : content_length_;
- asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this,
- std::placeholders::_1, std::placeholders::_2));
- }
- } else if (ec == asio::error::eof) {
- input_buffer_.consume(input_buffer_.size());
- tryReconnection();
- }
-}
-
-void ATSConnector::doReadBody(std::size_t body_size,
- std::size_t additional_bytes) {
- auto bytes_to_read =
- body_size > additional_bytes ? (body_size - additional_bytes) : 0;
-
- auto to_read = bytes_to_read >= buffer_size
- ? (buffer_size - input_buffer_.size())
- : bytes_to_read;
-
- is_last_chunk_ = chunked_ && body_size == 5;
-
- if (to_read > 0) {
- content_length_ = bytes_to_read;
- asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this,
- std::placeholders::_1, std::placeholders::_2));
- } else {
- const uint8_t *buffer =
- asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
- false);
- input_buffer_.consume(body_size);
-
- if (!chunked_ || is_last_chunk_) {
- doReadHeader();
- } else {
- doReadChunkedHeader();
- }
- }
-}
-
-void ATSConnector::doReadChunkedHeader() {
- asio::async_read_until(
- socket_, input_buffer_, "\r\n",
- [this](std::error_code ec, std::size_t length) {
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- const uint8_t *buffer =
- asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- std::size_t chunk_size =
- std::stoul(reinterpret_cast<const char *>(buffer), 0, 16) + 2 +
- length;
- auto additional_bytes = input_buffer_.size();
- doReadBody(chunk_size, additional_bytes);
- } else {
- input_buffer_.consume(input_buffer_.size());
- tryReconnection();
- }
- });
-}
-
-void ATSConnector::doReadHeader() {
- asio::async_read_until(
- socket_, input_buffer_, "\r\n\r\n",
- [this](std::error_code ec, std::size_t length) {
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- const uint8_t *buffer =
- asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- auto headers = HTTPMessageFastParser::getHeaders(buffer, length);
-
- // Try to get content length, if available
- auto it = headers.find(HTTPMessageFastParser::content_length);
- std::size_t size = 0;
- if (it != headers.end()) {
- size = std::stoull(it->second);
- chunked_ = false;
- } else {
- it = headers.find(HTTPMessageFastParser::transfer_encoding);
- if (it != headers.end() &&
- it->second.compare(HTTPMessageFastParser::chunked) == 0) {
- chunked_ = true;
- }
- }
-
- receive_callback_(buffer, length, !size && !chunked_, true);
- auto additional_bytes = input_buffer_.size() - length;
- input_buffer_.consume(length);
-
- if (!chunked_) {
- doReadBody(size, additional_bytes);
- } else {
- doReadChunkedHeader();
- }
- } else {
- input_buffer_.consume(input_buffer_.size());
- tryReconnection();
- }
- });
-}
-
-void ATSConnector::tryReconnection() {
- TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n");
- if (state_ == ConnectorState::CONNECTED) {
- state_ = ConnectorState::CONNECTING;
- is_reconnection_ = true;
- io_service_.post([this]() {
- if (socket_.is_open()) {
- // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
- socket_.close();
- }
- startConnectionTimer();
- doConnect();
- });
- }
-}
-
-void ATSConnector::doConnect() {
- asio::async_connect(socket_, endpoint_iterator_,
- [this](std::error_code ec, tcp::resolver::iterator) {
- if (!ec) {
- timer_.cancel();
- state_ = ConnectorState::CONNECTED;
-
- asio::ip::tcp::no_delay noDelayOption(true);
- socket_.set_option(noDelayOption);
-
- // on_reconnect_callback_();
-
- doReadHeader();
-
- if (data_available_ && !write_msgs_.empty()) {
- data_available_ = false;
- doWrite();
- }
-
- if (is_reconnection_) {
- is_reconnection_ = false;
- TRANSPORT_LOGD("Connection recovered!");
- }
-
- } else {
- TRANSPORT_LOGE("Impossible to reconnect: %s",
- ec.message().c_str());
- close();
- }
- });
-}
-
-bool ATSConnector::checkConnected() {
- return state_ == ConnectorState::CONNECTED;
-}
-
-void ATSConnector::startConnectionTimer() {
- timer_.expires_from_now(std::chrono::seconds(10));
- timer_.async_wait(
- std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1));
-}
-
-void ATSConnector::handleDeadline(const std::error_code &ec) {
- if (!ec) {
- io_service_.post([this]() {
- socket_.close();
- TRANSPORT_LOGE("Error connecting. Is the server running?\n");
- io_service_.stop();
- });
- }
-}
-
-} // namespace transport