/* * Copyright (c) 2019 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "http_server.h" #include #include #include namespace icn_httpserver { HttpServer::HttpServer(unsigned short port, std::string icn_name, size_t num_threads, long timeout_request, long timeout_send_or_receive) : config_(port, num_threads), internal_io_service_(std::make_shared()), io_service_(*internal_io_service_), acceptor_(io_service_), icn_name_(icn_name), timeout_request_(timeout_request), timeout_content_(timeout_send_or_receive) {} HttpServer::HttpServer(unsigned short port, std::string icn_name, size_t num_threads, long timeout_request, long timeout_send_or_receive, asio::io_service &ioService) : config_(port, num_threads), io_service_(ioService), acceptor_(io_service_), icn_name_(icn_name), timeout_request_(timeout_request), timeout_content_(timeout_send_or_receive) {} void HttpServer::onIcnRequest( std::shared_ptr &publisher, const uint8_t *buffer, std::size_t size, int request_id) { std::shared_ptr request = std::make_shared(publisher); request->getContent().rdbuf()->sputn((char *)buffer, size); if (!parse_request(request, request->getContent())) { return; } std::map> &icn_publishers = icn_acceptor_->getPublishers(); std::unique_lock lock(thread_list_mtx_); if (icn_publishers.size() < config_.getNum_threads()) { std::cout << "Received request for: " << request->getPath() << std::endl; publisher->attachPublisher(); std::cout << "Starting new thread" << std::endl; io_service_.dispatch([this, request, request_id]() { std::map> &icn_publishers = icn_acceptor_->getPublishers(); find_resource(nullptr, request); icn_publishers[request_id]->serveClients(); std::unique_lock lock(thread_list_mtx_); icn_publishers.erase(request_id); }); } } void HttpServer::setIcnAcceptor() { icn_acceptor_ = std::make_shared( icn_name_, std::bind(&HttpServer::onIcnRequest, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); icn_acceptor_->listen(true); } void HttpServer::spawnThreads() { if (io_service_.stopped()) { io_service_.reset(); } asio::ip::tcp::endpoint endpoint; if (config_.getAddress().size() > 0) { endpoint = asio::ip::tcp::endpoint( asio::ip::address::from_string(config_.getAddress()), config_.getPort()); } else { endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), config_.getPort()); } acceptor_.open(endpoint.protocol()); acceptor_.set_option( asio::socket_base::reuse_address(config_.isReuse_address())); acceptor_.bind(endpoint); acceptor_.listen(); accept(); // If num_threads>1, start m_io_service.run() in (num_threads-1) threads for // thread-pooling socket_threads_.clear(); for (size_t c = 1; c < config_.getNum_threads(); c++) { socket_threads_.emplace_back([this]() { io_service_.run(); }); } } void HttpServer::start() { // Copy the resources to opt_resource for more efficient request processing opt_resource_.clear(); for (auto &res : resource) { for (auto &res_method : res.second) { auto it = opt_resource_.end(); for (auto opt_it = opt_resource_.begin(); opt_it != opt_resource_.end(); opt_it++) { if (res_method.first == opt_it->first) { it = opt_it; break; } } if (it == opt_resource_.end()) { opt_resource_.emplace_back(); it = opt_resource_.begin() + (opt_resource_.size() - 1); it->first = res_method.first; } it->second.emplace_back(std::regex(res.first), res_method.second); } } spawnThreads(); setIcnAcceptor(); // Wait for the rest of the threads, if any, to finish as well for (auto &t : socket_threads_) { t.join(); } // for (auto &t : icn_threads) { // t.second.get(); // } } void HttpServer::stop() { acceptor_.close(); io_service_.stop(); std::map> &icn_publishers = icn_acceptor_->getPublishers(); for (auto &p : icn_publishers) { p.second->stop(); } } void HttpServer::accept() { // Create new socket for this connection // Shared_ptr is used to pass temporary objects to the asynchronous functions std::shared_ptr socket = std::make_shared(io_service_); acceptor_.async_accept(*socket, [this, socket](const std::error_code &ec) { // Immediately start accepting a new connection accept(); if (!ec) { asio::ip::tcp::no_delay option(true); socket->set_option(option); read_request_and_content(socket); } }); } void HttpServer::send(std::shared_ptr response, SendCallback callback) const { response->send(callback); } std::shared_ptr HttpServer::set_timeout_on_socket( std::shared_ptr socket, long seconds) { std::shared_ptr timer = std::make_shared(io_service_); timer->expires_from_now(std::chrono::seconds(seconds)); timer->async_wait([socket](const std::error_code &ec) { if (!ec) { std::error_code ec; socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec); socket->lowest_layer().close(); } }); return timer; } void HttpServer::read_request_and_content(std::shared_ptr socket) { // Create new streambuf (Request::streambuf) for async_read_until() // shared_ptr is used to pass temporary objects to the asynchronous functions std::shared_ptr request = std::make_shared(); request->read_remote_endpoint_data(*socket); // Set timeout on the following asio::async-read or write function std::shared_ptr timer; if (timeout_request_ > 0) { timer = set_timeout_on_socket(socket, timeout_request_); } asio::async_read_until( *socket, request->getStreambuf(), "\r\n\r\n", [this, socket, request, timer](const std::error_code &ec, size_t bytes_transferred) { if (timeout_request_ > 0) { timer->cancel(); } if (!ec) { // request->streambuf.size() is not necessarily the same as // bytes_transferred, from Asio-docs: "After a successful // async_read_until operation, the streambuf may contain additional // data beyond the delimiter" The chosen solution is to extract lines // from the stream directly when parsing the header. What is left of // the streambuf (maybe some bytes of the content) is appended to in // the async_read-function below (for retrieving content). size_t num_additional_bytes = request->getStreambuf().in_avail() - bytes_transferred; if (!parse_request(request, request->getContent())) { return; } // If content, read that as well auto it = request->getHeader().find("Content-Length"); if (it != request->getHeader().end()) { // Set timeout on the following asio::async-read or write function std::shared_ptr timer; if (timeout_content_ > 0) { timer = set_timeout_on_socket(socket, timeout_content_); } unsigned long long content_length; try { content_length = atol(it->second.c_str()); } catch (const std::exception &) { return; } if (content_length > num_additional_bytes) { asio::async_read( *socket, request->getStreambuf(), asio::transfer_exactly(content_length - num_additional_bytes), [this, socket, request, timer](const std::error_code &ec, size_t /*bytes_transferred*/) { if (timeout_content_ > 0) { timer->cancel(); } if (!ec) { find_resource(socket, request); } }); } else { if (timeout_content_ > 0) { timer->cancel(); } find_resource(socket, request); } } else { find_resource(socket, request); } } }); } bool HttpServer::parse_request(std::shared_ptr request, std::istream &stream) const { std::string line; getline(stream, line); size_t method_end; if ((method_end = line.find(' ')) != std::string::npos) { size_t path_end; if ((path_end = line.find(' ', method_end + 1)) != std::string::npos) { request->setMethod(line.substr(0, method_end)); request->setPath(line.substr(method_end + 1, path_end - method_end - 1)); size_t protocol_end; if ((protocol_end = line.find('/', path_end + 1)) != std::string::npos) { if (line.substr(path_end + 1, protocol_end - path_end - 1) != "HTTP") { return false; } request->setHttp_version( line.substr(protocol_end + 1, line.size() - protocol_end - 2)); } else { return false; } getline(stream, line); size_t param_end; while ((param_end = line.find(':')) != std::string::npos) { size_t value_start = param_end + 1; if ((value_start) < line.size()) { if (line[value_start] == ' ') { value_start++; } if (value_start < line.size()) { request->getHeader().insert(std::make_pair( line.substr(0, param_end), line.substr(value_start, line.size() - value_start - 1))); } } getline(stream, line); } } else { return false; } } else { return false; } return true; } void HttpServer::find_resource(std::shared_ptr socket, std::shared_ptr request) { // Find path- and method-match, and call write_response for (auto &res : opt_resource_) { if (request->getMethod() == res.first) { for (auto &res_path : res.second) { std::smatch sm_res; if (std::regex_match(request->getPath(), sm_res, res_path.first)) { request->setPath_match(std::move(sm_res)); write_response(socket, request, res_path.second); return; } } } } auto it_method = default_resource.find(request->getMethod()); if (it_method != default_resource.end()) { write_response(socket, request, it_method->second); return; } std::cout << "resource not found" << std::endl; } void HttpServer::write_response(std::shared_ptr socket, std::shared_ptr request, ResourceCallback &resource_function) { // Set timeout on the following asio::async-read or write function std::shared_ptr timer; if (timeout_content_ > 0 && socket) { timer = set_timeout_on_socket(socket, timeout_content_); } Response *resp; if (socket) { resp = new SocketResponse(socket); } else { resp = new IcnResponse( std::static_pointer_cast(request)->getHttpPublisher(), std::static_pointer_cast(request)->getName(), std::static_pointer_cast(request)->getPath()); } auto response = std::shared_ptr(resp, [this, request, timer, socket]( Response *response_ptr) { auto response = std::shared_ptr(response_ptr); response->setIsLast(true); send(response, [this, response, request, timer, socket](const std::error_code &ec) { if (!ec) { if (socket && timeout_content_ > 0) { timer->cancel(); } float http_version; try { http_version = atof(request->getHttp_version().c_str()); } catch (const std::exception &) { return; } auto range = request->getHeader().equal_range("Connection"); for (auto it = range.first; it != range.second; it++) { if (caseInsCompare(it->second, "close")) { return; } } if (http_version > 1.05 && socket) { read_request_and_content( std::static_pointer_cast(response)->getSocket()); } } }); }); try { resource_function(response, request); } catch (const std::exception &) { return; } } } // end namespace icn_httpserver