diff options
Diffstat (limited to 'websocketpp/transport/asio/connection.hpp')
-rw-r--r-- | websocketpp/transport/asio/connection.hpp | 1204 |
1 files changed, 1204 insertions, 0 deletions
diff --git a/websocketpp/transport/asio/connection.hpp b/websocketpp/transport/asio/connection.hpp new file mode 100644 index 00000000..8eb8c759 --- /dev/null +++ b/websocketpp/transport/asio/connection.hpp @@ -0,0 +1,1204 @@ +/* + * Copyright (c) 2015, Peter Thorson. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the WebSocket++ Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP +#define WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP + +#include <websocketpp/transport/asio/base.hpp> + +#include <websocketpp/transport/base/connection.hpp> + +#include <websocketpp/logger/levels.hpp> +#include <websocketpp/http/constants.hpp> + +#include <websocketpp/base64/base64.hpp> +#include <websocketpp/error.hpp> +#include <websocketpp/uri.hpp> + +#include <websocketpp/common/asio.hpp> +#include <websocketpp/common/chrono.hpp> +#include <websocketpp/common/cpp11.hpp> +#include <websocketpp/common/memory.hpp> +#include <websocketpp/common/functional.hpp> +#include <websocketpp/common/connection_hdl.hpp> + +#include <istream> +#include <sstream> +#include <string> +#include <vector> + +namespace websocketpp { +namespace transport { +namespace asio { + +typedef lib::function<void(connection_hdl)> tcp_init_handler; + +/// Asio based connection transport component +/** + * transport::asio::connection implements a connection transport component using + * Asio that works with the transport::asio::endpoint endpoint transport + * component. + */ +template <typename config> +class connection : public config::socket_type::socket_con_type { +public: + /// Type of this connection transport component + typedef connection<config> type; + /// Type of a shared pointer to this connection transport component + typedef lib::shared_ptr<type> ptr; + + /// Type of the socket connection component + typedef typename config::socket_type::socket_con_type socket_con_type; + /// Type of a shared pointer to the socket connection component + typedef typename socket_con_type::ptr socket_con_ptr; + /// Type of this transport's access logging policy + typedef typename config::alog_type alog_type; + /// Type of this transport's error logging policy + typedef typename config::elog_type elog_type; + + typedef typename config::request_type request_type; + typedef typename request_type::ptr request_ptr; + typedef typename config::response_type response_type; + typedef typename response_type::ptr response_ptr; + + /// Type of a pointer to the Asio io_service being used + typedef lib::asio::io_service * io_service_ptr; + /// Type of a pointer to the Asio io_service::strand being used + typedef lib::shared_ptr<lib::asio::io_service::strand> strand_ptr; + /// Type of a pointer to the Asio timer class + typedef lib::shared_ptr<lib::asio::steady_timer> timer_ptr; + + // connection is friends with its associated endpoint to allow the endpoint + // to call private/protected utility methods that we don't want to expose + // to the public api. + friend class endpoint<config>; + + // generate and manage our own io_service + explicit connection(bool is_server, alog_type & alog, elog_type & elog) + : m_is_server(is_server) + , m_alog(alog) + , m_elog(elog) + { + m_alog.write(log::alevel::devel,"asio con transport constructor"); + } + + /// Get a shared pointer to this component + ptr get_shared() { + return lib::static_pointer_cast<type>(socket_con_type::get_shared()); + } + + bool is_secure() const { + return socket_con_type::is_secure(); + } + + /// Set uri hook + /** + * Called by the endpoint as a connection is being established to provide + * the uri being connected to to the transport layer. + * + * This transport policy doesn't use the uri except to forward it to the + * socket layer. + * + * @since 0.6.0 + * + * @param u The uri to set + */ + void set_uri(uri_ptr u) { + socket_con_type::set_uri(u); + } + + /// Sets the tcp pre init handler + /** + * The tcp pre init handler is called after the raw tcp connection has been + * established but before any additional wrappers (proxy connects, TLS + * handshakes, etc) have been performed. + * + * @since 0.3.0 + * + * @param h The handler to call on tcp pre init. + */ + void set_tcp_pre_init_handler(tcp_init_handler h) { + m_tcp_pre_init_handler = h; + } + + /// Sets the tcp pre init handler (deprecated) + /** + * The tcp pre init handler is called after the raw tcp connection has been + * established but before any additional wrappers (proxy connects, TLS + * handshakes, etc) have been performed. + * + * @deprecated Use set_tcp_pre_init_handler instead + * + * @param h The handler to call on tcp pre init. + */ + void set_tcp_init_handler(tcp_init_handler h) { + set_tcp_pre_init_handler(h); + } + + /// Sets the tcp post init handler + /** + * The tcp post init handler is called after the tcp connection has been + * established and all additional wrappers (proxy connects, TLS handshakes, + * etc have been performed. This is fired before any bytes are read or any + * WebSocket specific handshake logic has been performed. + * + * @since 0.3.0 + * + * @param h The handler to call on tcp post init. + */ + void set_tcp_post_init_handler(tcp_init_handler h) { + m_tcp_post_init_handler = h; + } + + /// Set the proxy to connect through (exception free) + /** + * The URI passed should be a complete URI including scheme. For example: + * http://proxy.example.com:8080/ + * + * The proxy must be set up as an explicit (CONNECT) proxy allowed to + * connect to the port you specify. Traffic to the proxy is not encrypted. + * + * @param uri The full URI of the proxy to connect to. + * + * @param ec A status value + */ + void set_proxy(std::string const & uri, lib::error_code & ec) { + // TODO: return errors for illegal URIs here? + // TODO: should https urls be illegal for the moment? + m_proxy = uri; + m_proxy_data = lib::make_shared<proxy_data>(); + ec = lib::error_code(); + } + + /// Set the proxy to connect through (exception) + void set_proxy(std::string const & uri) { + lib::error_code ec; + set_proxy(uri,ec); + if (ec) { throw exception(ec); } + } + + /// Set the basic auth credentials to use (exception free) + /** + * The URI passed should be a complete URI including scheme. For example: + * http://proxy.example.com:8080/ + * + * The proxy must be set up as an explicit proxy + * + * @param username The username to send + * + * @param password The password to send + * + * @param ec A status value + */ + void set_proxy_basic_auth(std::string const & username, std::string const & + password, lib::error_code & ec) + { + if (!m_proxy_data) { + ec = make_error_code(websocketpp::error::invalid_state); + return; + } + + // TODO: username can't contain ':' + std::string val = "Basic "+base64_encode(username + ":" + password); + m_proxy_data->req.replace_header("Proxy-Authorization",val); + ec = lib::error_code(); + } + + /// Set the basic auth credentials to use (exception) + void set_proxy_basic_auth(std::string const & username, std::string const & + password) + { + lib::error_code ec; + set_proxy_basic_auth(username,password,ec); + if (ec) { throw exception(ec); } + } + + /// Set the proxy timeout duration (exception free) + /** + * Duration is in milliseconds. Default value is based on the transport + * config + * + * @param duration The number of milliseconds to wait before aborting the + * proxy connection. + * + * @param ec A status value + */ + void set_proxy_timeout(long duration, lib::error_code & ec) { + if (!m_proxy_data) { + ec = make_error_code(websocketpp::error::invalid_state); + return; + } + + m_proxy_data->timeout_proxy = duration; + ec = lib::error_code(); + } + + /// Set the proxy timeout duration (exception) + void set_proxy_timeout(long duration) { + lib::error_code ec; + set_proxy_timeout(duration,ec); + if (ec) { throw exception(ec); } + } + + std::string const & get_proxy() const { + return m_proxy; + } + + /// Get the remote endpoint address + /** + * The iostream transport has no information about the ultimate remote + * endpoint. It will return the string "iostream transport". To indicate + * this. + * + * TODO: allow user settable remote endpoint addresses if this seems useful + * + * @return A string identifying the address of the remote endpoint + */ + std::string get_remote_endpoint() const { + lib::error_code ec; + + std::string ret = socket_con_type::get_remote_endpoint(ec); + + if (ec) { + m_elog.write(log::elevel::info,ret); + return "Unknown"; + } else { + return ret; + } + } + + /// Get the connection handle + connection_hdl get_handle() const { + return m_connection_hdl; + } + + /// Call back a function after a period of time. + /** + * Sets a timer that calls back a function after the specified period of + * milliseconds. Returns a handle that can be used to cancel the timer. + * A cancelled timer will return the error code error::operation_aborted + * A timer that expired will return no error. + * + * @param duration Length of time to wait in milliseconds + * + * @param callback The function to call back when the timer has expired + * + * @return A handle that can be used to cancel the timer if it is no longer + * needed. + */ + timer_ptr set_timer(long duration, timer_handler callback) { + timer_ptr new_timer = lib::make_shared<lib::asio::steady_timer>( + lib::ref(*m_io_service), + lib::asio::milliseconds(duration) + ); + + if (config::enable_multithreading) { + new_timer->async_wait(m_strand->wrap(lib::bind( + &type::handle_timer, get_shared(), + new_timer, + callback, + lib::placeholders::_1 + ))); + } else { + new_timer->async_wait(lib::bind( + &type::handle_timer, get_shared(), + new_timer, + callback, + lib::placeholders::_1 + )); + } + + return new_timer; + } + + /// Timer callback + /** + * The timer pointer is included to ensure the timer isn't destroyed until + * after it has expired. + * + * TODO: candidate for protected status + * + * @param post_timer Pointer to the timer in question + * @param callback The function to call back + * @param ec The status code + */ + void handle_timer(timer_ptr, timer_handler callback, + lib::asio::error_code const & ec) + { + if (ec) { + if (ec == lib::asio::error::operation_aborted) { + callback(make_error_code(transport::error::operation_aborted)); + } else { + log_err(log::elevel::info,"asio handle_timer",ec); + callback(make_error_code(error::pass_through)); + } + } else { + callback(lib::error_code()); + } + } + + /// Get a pointer to this connection's strand + strand_ptr get_strand() { + return m_strand; + } + + /// Get the internal transport error code for a closed/failed connection + /** + * Retrieves a machine readable detailed error code indicating the reason + * that the connection was closed or failed. Valid only after the close or + * fail handler is called. + * + * Primarily used if you are using mismatched asio / system_error + * implementations such as `boost::asio` with `std::system_error`. In these + * cases the transport error type is different than the library error type + * and some WebSocket++ functions that return transport errors via the + * library error code type will be coerced into a catch all `pass_through` + * or `tls_error` error. This method will return the original machine + * readable transport error in the native type. + * + * @since 0.7.0 + * + * @return Error code indicating the reason the connection was closed or + * failed + */ + lib::asio::error_code get_transport_ec() const { + return m_tec; + } + + /// Initialize transport for reading + /** + * init_asio is called once immediately after construction to initialize + * Asio components to the io_service + * + * The transport initialization sequence consists of the following steps: + * - Pre-init: the underlying socket is initialized to the point where + * bytes may be written. No bytes are actually written in this stage + * - Proxy negotiation: if a proxy is set, a request is made to it to start + * a tunnel to the final destination. This stage ends when the proxy is + * ready to forward the + * next byte to the remote endpoint. + * - Post-init: Perform any i/o with the remote endpoint, such as setting up + * tunnels for encryption. This stage ends when the connection is ready to + * read or write the WebSocket handshakes. At this point the original + * callback function is called. + */ +protected: + void init(init_handler callback) { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"asio connection init"); + } + + // TODO: pre-init timeout. Right now no implemented socket policies + // actually have an asyncronous pre-init + + socket_con_type::pre_init( + lib::bind( + &type::handle_pre_init, + get_shared(), + callback, + lib::placeholders::_1 + ) + ); + } + + /// initialize the proxy buffers and http parsers + /** + * + * @param authority The address of the server we want the proxy to tunnel to + * in the format of a URI authority (host:port) + * + * @return Status code indicating what errors occurred, if any + */ + lib::error_code proxy_init(std::string const & authority) { + if (!m_proxy_data) { + return websocketpp::error::make_error_code( + websocketpp::error::invalid_state); + } + m_proxy_data->req.set_version("HTTP/1.1"); + m_proxy_data->req.set_method("CONNECT"); + + m_proxy_data->req.set_uri(authority); + m_proxy_data->req.replace_header("Host",authority); + + return lib::error_code(); + } + + /// Finish constructing the transport + /** + * init_asio is called once immediately after construction to initialize + * Asio components to the io_service. + * + * @param io_service A pointer to the io_service to register with this + * connection + * + * @return Status code for the success or failure of the initialization + */ + lib::error_code init_asio (io_service_ptr io_service) { + m_io_service = io_service; + + if (config::enable_multithreading) { + m_strand = lib::make_shared<lib::asio::io_service::strand>( + lib::ref(*io_service)); + } + + lib::error_code ec = socket_con_type::init_asio(io_service, m_strand, + m_is_server); + + return ec; + } + + void handle_pre_init(init_handler callback, lib::error_code const & ec) { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"asio connection handle pre_init"); + } + + if (m_tcp_pre_init_handler) { + m_tcp_pre_init_handler(m_connection_hdl); + } + + if (ec) { + callback(ec); + } + + // If we have a proxy set issue a proxy connect, otherwise skip to + // post_init + if (!m_proxy.empty()) { + proxy_write(callback); + } else { + post_init(callback); + } + } + + void post_init(init_handler callback) { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"asio connection post_init"); + } + + timer_ptr post_timer; + + if (config::timeout_socket_post_init > 0) { + post_timer = set_timer( + config::timeout_socket_post_init, + lib::bind( + &type::handle_post_init_timeout, + get_shared(), + post_timer, + callback, + lib::placeholders::_1 + ) + ); + } + + socket_con_type::post_init( + lib::bind( + &type::handle_post_init, + get_shared(), + post_timer, + callback, + lib::placeholders::_1 + ) + ); + } + + /// Post init timeout callback + /** + * The timer pointer is included to ensure the timer isn't destroyed until + * after it has expired. + * + * @param post_timer Pointer to the timer in question + * @param callback The function to call back + * @param ec The status code + */ + void handle_post_init_timeout(timer_ptr, init_handler callback, + lib::error_code const & ec) + { + lib::error_code ret_ec; + + if (ec) { + if (ec == transport::error::operation_aborted) { + m_alog.write(log::alevel::devel, + "asio post init timer cancelled"); + return; + } + + log_err(log::elevel::devel,"asio handle_post_init_timeout",ec); + ret_ec = ec; + } else { + if (socket_con_type::get_ec()) { + ret_ec = socket_con_type::get_ec(); + } else { + ret_ec = make_error_code(transport::error::timeout); + } + } + + m_alog.write(log::alevel::devel, "Asio transport post-init timed out"); + cancel_socket_checked(); + callback(ret_ec); + } + + /// Post init timeout callback + /** + * The timer pointer is included to ensure the timer isn't destroyed until + * after it has expired. + * + * @param post_timer Pointer to the timer in question + * @param callback The function to call back + * @param ec The status code + */ + void handle_post_init(timer_ptr post_timer, init_handler callback, + lib::error_code const & ec) + { + if (ec == transport::error::operation_aborted || + (post_timer && lib::asio::is_neg(post_timer->expires_from_now()))) + { + m_alog.write(log::alevel::devel,"post_init cancelled"); + return; + } + + if (post_timer) { + post_timer->cancel(); + } + + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"asio connection handle_post_init"); + } + + if (m_tcp_post_init_handler) { + m_tcp_post_init_handler(m_connection_hdl); + } + + callback(ec); + } + + void proxy_write(init_handler callback) { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"asio connection proxy_write"); + } + + if (!m_proxy_data) { + m_elog.write(log::elevel::library, + "assertion failed: !m_proxy_data in asio::connection::proxy_write"); + callback(make_error_code(error::general)); + return; + } + + m_proxy_data->write_buf = m_proxy_data->req.raw(); + + m_bufs.push_back(lib::asio::buffer(m_proxy_data->write_buf.data(), + m_proxy_data->write_buf.size())); + + m_alog.write(log::alevel::devel,m_proxy_data->write_buf); + + // Set a timer so we don't wait forever for the proxy to respond + m_proxy_data->timer = this->set_timer( + m_proxy_data->timeout_proxy, + lib::bind( + &type::handle_proxy_timeout, + get_shared(), + callback, + lib::placeholders::_1 + ) + ); + + // Send proxy request + if (config::enable_multithreading) { + lib::asio::async_write( + socket_con_type::get_next_layer(), + m_bufs, + m_strand->wrap(lib::bind( + &type::handle_proxy_write, get_shared(), + callback, + lib::placeholders::_1 + )) + ); + } else { + lib::asio::async_write( + socket_con_type::get_next_layer(), + m_bufs, + lib::bind( + &type::handle_proxy_write, get_shared(), + callback, + lib::placeholders::_1 + ) + ); + } + } + + void handle_proxy_timeout(init_handler callback, lib::error_code const & ec) + { + if (ec == transport::error::operation_aborted) { + m_alog.write(log::alevel::devel, + "asio handle_proxy_write timer cancelled"); + return; + } else if (ec) { + log_err(log::elevel::devel,"asio handle_proxy_write",ec); + callback(ec); + } else { + m_alog.write(log::alevel::devel, + "asio handle_proxy_write timer expired"); + cancel_socket_checked(); + callback(make_error_code(transport::error::timeout)); + } + } + + void handle_proxy_write(init_handler callback, + lib::asio::error_code const & ec) + { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel, + "asio connection handle_proxy_write"); + } + + m_bufs.clear(); + + // Timer expired or the operation was aborted for some reason. + // Whatever aborted it will be issuing the callback so we are safe to + // return + if (ec == lib::asio::error::operation_aborted || + lib::asio::is_neg(m_proxy_data->timer->expires_from_now())) + { + m_elog.write(log::elevel::devel,"write operation aborted"); + return; + } + + if (ec) { + log_err(log::elevel::info,"asio handle_proxy_write",ec); + m_proxy_data->timer->cancel(); + callback(make_error_code(error::pass_through)); + return; + } + + proxy_read(callback); + } + + void proxy_read(init_handler callback) { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"asio connection proxy_read"); + } + + if (!m_proxy_data) { + m_elog.write(log::elevel::library, + "assertion failed: !m_proxy_data in asio::connection::proxy_read"); + m_proxy_data->timer->cancel(); + callback(make_error_code(error::general)); + return; + } + + if (config::enable_multithreading) { + lib::asio::async_read_until( + socket_con_type::get_next_layer(), + m_proxy_data->read_buf, + "\r\n\r\n", + m_strand->wrap(lib::bind( + &type::handle_proxy_read, get_shared(), + callback, + lib::placeholders::_1, lib::placeholders::_2 + )) + ); + } else { + lib::asio::async_read_until( + socket_con_type::get_next_layer(), + m_proxy_data->read_buf, + "\r\n\r\n", + lib::bind( + &type::handle_proxy_read, get_shared(), + callback, + lib::placeholders::_1, lib::placeholders::_2 + ) + ); + } + } + + /// Proxy read callback + /** + * @param init_handler The function to call back + * @param ec The status code + * @param bytes_transferred The number of bytes read + */ + void handle_proxy_read(init_handler callback, + lib::asio::error_code const & ec, size_t) + { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel, + "asio connection handle_proxy_read"); + } + + // Timer expired or the operation was aborted for some reason. + // Whatever aborted it will be issuing the callback so we are safe to + // return + if (ec == lib::asio::error::operation_aborted || + lib::asio::is_neg(m_proxy_data->timer->expires_from_now())) + { + m_elog.write(log::elevel::devel,"read operation aborted"); + return; + } + + // At this point there is no need to wait for the timer anymore + m_proxy_data->timer->cancel(); + + if (ec) { + m_elog.write(log::elevel::info, + "asio handle_proxy_read error: "+ec.message()); + callback(make_error_code(error::pass_through)); + } else { + if (!m_proxy_data) { + m_elog.write(log::elevel::library, + "assertion failed: !m_proxy_data in asio::connection::handle_proxy_read"); + callback(make_error_code(error::general)); + return; + } + + std::istream input(&m_proxy_data->read_buf); + + m_proxy_data->res.consume(input); + + if (!m_proxy_data->res.headers_ready()) { + // we read until the headers were done in theory but apparently + // they aren't. Internal endpoint error. + callback(make_error_code(error::general)); + return; + } + + m_alog.write(log::alevel::devel,m_proxy_data->res.raw()); + + if (m_proxy_data->res.get_status_code() != http::status_code::ok) { + // got an error response back + // TODO: expose this error in a programmatically accessible way? + // if so, see below for an option on how to do this. + std::stringstream s; + s << "Proxy connection error: " + << m_proxy_data->res.get_status_code() + << " (" + << m_proxy_data->res.get_status_msg() + << ")"; + m_elog.write(log::elevel::info,s.str()); + callback(make_error_code(error::proxy_failed)); + return; + } + + // we have successfully established a connection to the proxy, now + // we can continue and the proxy will transparently forward the + // WebSocket connection. + + // TODO: decide if we want an on_proxy callback that would allow + // access to the proxy response. + + // free the proxy buffers and req/res objects as they aren't needed + // anymore + m_proxy_data.reset(); + + // Continue with post proxy initialization + post_init(callback); + } + } + + /// read at least num_bytes bytes into buf and then call handler. + void async_read_at_least(size_t num_bytes, char *buf, size_t len, + read_handler handler) + { + if (m_alog.static_test(log::alevel::devel)) { + std::stringstream s; + s << "asio async_read_at_least: " << num_bytes; + m_alog.write(log::alevel::devel,s.str()); + } + + // TODO: safety vs speed ? + // maybe move into an if devel block + /*if (num_bytes > len) { + m_elog.write(log::elevel::devel, + "asio async_read_at_least error::invalid_num_bytes"); + handler(make_error_code(transport::error::invalid_num_bytes), + size_t(0)); + return; + }*/ + + if (config::enable_multithreading) { + lib::asio::async_read( + socket_con_type::get_socket(), + lib::asio::buffer(buf,len), + lib::asio::transfer_at_least(num_bytes), + m_strand->wrap(make_custom_alloc_handler( + m_read_handler_allocator, + lib::bind( + &type::handle_async_read, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) + )) + ); + } else { + lib::asio::async_read( + socket_con_type::get_socket(), + lib::asio::buffer(buf,len), + lib::asio::transfer_at_least(num_bytes), + make_custom_alloc_handler( + m_read_handler_allocator, + lib::bind( + &type::handle_async_read, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) + ) + ); + } + + } + + void handle_async_read(read_handler handler, lib::asio::error_code const & ec, + size_t bytes_transferred) + { + m_alog.write(log::alevel::devel, "asio con handle_async_read"); + + // translate asio error codes into more lib::error_codes + lib::error_code tec; + if (ec == lib::asio::error::eof) { + tec = make_error_code(transport::error::eof); + } else if (ec) { + // We don't know much more about the error at this point. As our + // socket/security policy if it knows more: + tec = socket_con_type::translate_ec(ec); + m_tec = ec; + + if (tec == transport::error::tls_error || + tec == transport::error::pass_through) + { + // These are aggregate/catch all errors. Log some human readable + // information to the info channel to give library users some + // more details about why the upstream method may have failed. + log_err(log::elevel::info,"asio async_read_at_least",ec); + } + } + if (handler) { + handler(tec,bytes_transferred); + } else { + // This can happen in cases where the connection is terminated while + // the transport is waiting on a read. + m_alog.write(log::alevel::devel, + "handle_async_read called with null read handler"); + } + } + + /// Initiate a potentially asyncronous write of the given buffer + void async_write(const char* buf, size_t len, write_handler handler) { + m_bufs.push_back(lib::asio::buffer(buf,len)); + + if (config::enable_multithreading) { + lib::asio::async_write( + socket_con_type::get_socket(), + m_bufs, + m_strand->wrap(make_custom_alloc_handler( + m_write_handler_allocator, + lib::bind( + &type::handle_async_write, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) + )) + ); + } else { + lib::asio::async_write( + socket_con_type::get_socket(), + m_bufs, + make_custom_alloc_handler( + m_write_handler_allocator, + lib::bind( + &type::handle_async_write, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) + ) + ); + } + } + + /// Initiate a potentially asyncronous write of the given buffers + void async_write(std::vector<buffer> const & bufs, write_handler handler) { + std::vector<buffer>::const_iterator it; + + for (it = bufs.begin(); it != bufs.end(); ++it) { + m_bufs.push_back(lib::asio::buffer((*it).buf,(*it).len)); + } + + if (config::enable_multithreading) { + lib::asio::async_write( + socket_con_type::get_socket(), + m_bufs, + m_strand->wrap(make_custom_alloc_handler( + m_write_handler_allocator, + lib::bind( + &type::handle_async_write, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) + )) + ); + } else { + lib::asio::async_write( + socket_con_type::get_socket(), + m_bufs, + make_custom_alloc_handler( + m_write_handler_allocator, + lib::bind( + &type::handle_async_write, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) + ) + ); + } + } + + /// Async write callback + /** + * @param ec The status code + * @param bytes_transferred The number of bytes read + */ + void handle_async_write(write_handler handler, lib::asio::error_code const & ec, size_t) { + m_bufs.clear(); + lib::error_code tec; + if (ec) { + log_err(log::elevel::info,"asio async_write",ec); + tec = make_error_code(transport::error::pass_through); + } + if (handler) { + handler(tec); + } else { + // This can happen in cases where the connection is terminated while + // the transport is waiting on a read. + m_alog.write(log::alevel::devel, + "handle_async_write called with null write handler"); + } + } + + /// Set Connection Handle + /** + * See common/connection_hdl.hpp for information + * + * @param hdl A connection_hdl that the transport will use to refer + * to itself + */ + void set_handle(connection_hdl hdl) { + m_connection_hdl = hdl; + socket_con_type::set_handle(hdl); + } + + /// Trigger the on_interrupt handler + /** + * This needs to be thread safe + */ + lib::error_code interrupt(interrupt_handler handler) { + if (config::enable_multithreading) { + m_io_service->post(m_strand->wrap(handler)); + } else { + m_io_service->post(handler); + } + return lib::error_code(); + } + + lib::error_code dispatch(dispatch_handler handler) { + if (config::enable_multithreading) { + m_io_service->post(m_strand->wrap(handler)); + } else { + m_io_service->post(handler); + } + return lib::error_code(); + } + + /*void handle_interrupt(interrupt_handler handler) { + handler(); + }*/ + + /// close and clean up the underlying socket + void async_shutdown(shutdown_handler callback) { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"asio connection async_shutdown"); + } + + timer_ptr shutdown_timer; + shutdown_timer = set_timer( + config::timeout_socket_shutdown, + lib::bind( + &type::handle_async_shutdown_timeout, + get_shared(), + shutdown_timer, + callback, + lib::placeholders::_1 + ) + ); + + socket_con_type::async_shutdown( + lib::bind( + &type::handle_async_shutdown, + get_shared(), + shutdown_timer, + callback, + lib::placeholders::_1 + ) + ); + } + + /// Async shutdown timeout handler + /** + * @param shutdown_timer A pointer to the timer to keep it in scope + * @param callback The function to call back + * @param ec The status code + */ + void handle_async_shutdown_timeout(timer_ptr, init_handler callback, + lib::error_code const & ec) + { + lib::error_code ret_ec; + + if (ec) { + if (ec == transport::error::operation_aborted) { + m_alog.write(log::alevel::devel, + "asio socket shutdown timer cancelled"); + return; + } + + log_err(log::elevel::devel,"asio handle_async_shutdown_timeout",ec); + ret_ec = ec; + } else { + ret_ec = make_error_code(transport::error::timeout); + } + + m_alog.write(log::alevel::devel, + "Asio transport socket shutdown timed out"); + cancel_socket_checked(); + callback(ret_ec); + } + + void handle_async_shutdown(timer_ptr shutdown_timer, shutdown_handler + callback, lib::asio::error_code const & ec) + { + if (ec == lib::asio::error::operation_aborted || + lib::asio::is_neg(shutdown_timer->expires_from_now())) + { + m_alog.write(log::alevel::devel,"async_shutdown cancelled"); + return; + } + + shutdown_timer->cancel(); + + lib::error_code tec; + if (ec) { + if (ec == lib::asio::error::not_connected) { + // The socket was already closed when we tried to close it. This + // happens periodically (usually if a read or write fails + // earlier and if it is a real error will be caught at another + // level of the stack. + } else { + // We don't know anything more about this error, give our + // socket/security policy a crack at it. + tec = socket_con_type::translate_ec(ec); + m_tec = ec; + + if (tec == transport::error::tls_short_read) { + // TLS short read at this point is somewhat expected if both + // sides try and end the connection at the same time or if + // SSLv2 is being used. In general there is nothing that can + // be done here other than a low level development log. + } else { + // all other errors are effectively pass through errors of + // some sort so print some detail on the info channel for + // library users to look up if needed. + log_err(log::elevel::info,"asio async_shutdown",ec); + } + } + } else { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel, + "asio con handle_async_shutdown"); + } + } + callback(tec); + } + + /// Cancel the underlying socket and log any errors + void cancel_socket_checked() { + lib::asio::error_code cec = socket_con_type::cancel_socket(); + if (cec) { + if (cec == lib::asio::error::operation_not_supported) { + // cancel not supported on this OS, ignore and log at dev level + m_alog.write(log::alevel::devel, "socket cancel not supported"); + } else { + log_err(log::elevel::warn, "socket cancel failed", cec); + } + } + } + +private: + /// Convenience method for logging the code and message for an error_code + template <typename error_type> + void log_err(log::level l, const char * msg, const error_type & ec) { + std::stringstream s; + s << msg << " error: " << ec << " (" << ec.message() << ")"; + m_elog.write(l,s.str()); + } + + // static settings + const bool m_is_server; + alog_type& m_alog; + elog_type& m_elog; + + struct proxy_data { + proxy_data() : timeout_proxy(config::timeout_proxy) {} + + request_type req; + response_type res; + std::string write_buf; + lib::asio::streambuf read_buf; + long timeout_proxy; + timer_ptr timer; + }; + + std::string m_proxy; + lib::shared_ptr<proxy_data> m_proxy_data; + + // transport resources + io_service_ptr m_io_service; + strand_ptr m_strand; + connection_hdl m_connection_hdl; + + std::vector<lib::asio::const_buffer> m_bufs; + + /// Detailed internal error code + lib::asio::error_code m_tec; + + // Handlers + tcp_init_handler m_tcp_pre_init_handler; + tcp_init_handler m_tcp_post_init_handler; + + handler_allocator m_read_handler_allocator; + handler_allocator m_write_handler_allocator; +}; + + +} // namespace asio +} // namespace transport +} // namespace websocketpp + +#endif // WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP |