diff options
author | Luca Muscariello <lumuscar+fdio@cisco.com> | 2017-02-25 23:42:31 +0100 |
---|---|---|
committer | Luca Muscariello <lumuscar+fdio@cisco.com> | 2017-02-25 23:42:31 +0100 |
commit | 05c1a838c881ea502888659848d8792843b28718 (patch) | |
tree | cf0b05b58bd725a1eb6c80325ba986c63dea42aa /websocketpp/impl | |
parent | 9b30fc10fb1cbebe651e5a107e8ca5b24de54675 (diff) |
Initial commit: video player - viper
Change-Id: Id5aa33598ce34659bad4a7a9ae5006bfb84f9bd1
Signed-off-by: Luca Muscariello <lumuscar+fdio@cisco.com>
Diffstat (limited to 'websocketpp/impl')
-rw-r--r-- | websocketpp/impl/connection_impl.hpp | 2372 | ||||
-rw-r--r-- | websocketpp/impl/endpoint_impl.hpp | 269 | ||||
-rw-r--r-- | websocketpp/impl/utilities_impl.hpp | 87 |
3 files changed, 2728 insertions, 0 deletions
diff --git a/websocketpp/impl/connection_impl.hpp b/websocketpp/impl/connection_impl.hpp new file mode 100644 index 00000000..d1f8dff2 --- /dev/null +++ b/websocketpp/impl/connection_impl.hpp @@ -0,0 +1,2372 @@ +/* + * Copyright (c) 2014, Peter Thorson. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the WebSocket++ Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef WEBSOCKETPP_CONNECTION_IMPL_HPP +#define WEBSOCKETPP_CONNECTION_IMPL_HPP + +#include <websocketpp/processors/hybi00.hpp> +#include <websocketpp/processors/hybi07.hpp> +#include <websocketpp/processors/hybi08.hpp> +#include <websocketpp/processors/hybi13.hpp> + +#include <websocketpp/processors/processor.hpp> + +#include <websocketpp/common/platforms.hpp> +#include <websocketpp/common/system_error.hpp> + +#include <algorithm> +#include <exception> +#include <sstream> +#include <string> +#include <utility> +#include <vector> + +namespace websocketpp { + +namespace istate = session::internal_state; + +template <typename config> +void connection<config>::set_termination_handler( + termination_handler new_handler) +{ + m_alog.write(log::alevel::devel, + "connection set_termination_handler"); + + //scoped_lock_type lock(m_connection_state_lock); + + m_termination_handler = new_handler; +} + +template <typename config> +std::string const & connection<config>::get_origin() const { + //scoped_lock_type lock(m_connection_state_lock); + return m_processor->get_origin(m_request); +} + +template <typename config> +size_t connection<config>::get_buffered_amount() const { + //scoped_lock_type lock(m_connection_state_lock); + return m_send_buffer_size; +} + +template <typename config> +session::state::value connection<config>::get_state() const { + //scoped_lock_type lock(m_connection_state_lock); + return m_state; +} + +template <typename config> +lib::error_code connection<config>::send(std::string const & payload, + frame::opcode::value op) +{ + message_ptr msg = m_msg_manager->get_message(op,payload.size()); + msg->append_payload(payload); + msg->set_compressed(true); + + return send(msg); +} + +template <typename config> +lib::error_code connection<config>::send(void const * payload, size_t len, + frame::opcode::value op) +{ + message_ptr msg = m_msg_manager->get_message(op,len); + msg->append_payload(payload,len); + + return send(msg); +} + +template <typename config> +lib::error_code connection<config>::send(typename config::message_type::ptr msg) +{ + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"connection send"); + } + + { + scoped_lock_type lock(m_connection_state_lock); + if (m_state != session::state::open) { + return error::make_error_code(error::invalid_state); + } + } + + message_ptr outgoing_msg; + bool needs_writing = false; + + if (msg->get_prepared()) { + outgoing_msg = msg; + + scoped_lock_type lock(m_write_lock); + write_push(outgoing_msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); + } else { + outgoing_msg = m_msg_manager->get_message(); + + if (!outgoing_msg) { + return error::make_error_code(error::no_outgoing_buffers); + } + + scoped_lock_type lock(m_write_lock); + lib::error_code ec = m_processor->prepare_data_frame(msg,outgoing_msg); + + if (ec) { + return ec; + } + + write_push(outgoing_msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); + } + + if (needs_writing) { + transport_con_type::dispatch(lib::bind( + &type::write_frame, + type::get_shared() + )); + } + + return lib::error_code(); +} + +template <typename config> +void connection<config>::ping(std::string const& payload, lib::error_code& ec) { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"connection ping"); + } + + { + scoped_lock_type lock(m_connection_state_lock); + if (m_state != session::state::open) { + std::stringstream ss; + ss << "connection::ping called from invalid state " << m_state; + m_alog.write(log::alevel::devel,ss.str()); + ec = error::make_error_code(error::invalid_state); + return; + } + } + + message_ptr msg = m_msg_manager->get_message(); + if (!msg) { + ec = error::make_error_code(error::no_outgoing_buffers); + return; + } + + ec = m_processor->prepare_ping(payload,msg); + if (ec) {return;} + + // set ping timer if we are listening for one + if (m_pong_timeout_handler) { + // Cancel any existing timers + if (m_ping_timer) { + m_ping_timer->cancel(); + } + + if (m_pong_timeout_dur > 0) { + m_ping_timer = transport_con_type::set_timer( + m_pong_timeout_dur, + lib::bind( + &type::handle_pong_timeout, + type::get_shared(), + payload, + lib::placeholders::_1 + ) + ); + } + + if (!m_ping_timer) { + // Our transport doesn't support timers + m_elog.write(log::elevel::warn,"Warning: a pong_timeout_handler is \ + set but the transport in use does not support timeouts."); + } + } + + bool needs_writing = false; + { + scoped_lock_type lock(m_write_lock); + write_push(msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); + } + + if (needs_writing) { + transport_con_type::dispatch(lib::bind( + &type::write_frame, + type::get_shared() + )); + } + + ec = lib::error_code(); +} + +template<typename config> +void connection<config>::ping(std::string const & payload) { + lib::error_code ec; + ping(payload,ec); + if (ec) { + throw exception(ec); + } +} + +template<typename config> +void connection<config>::handle_pong_timeout(std::string payload, + lib::error_code const & ec) +{ + if (ec) { + if (ec == transport::error::operation_aborted) { + // ignore, this is expected + return; + } + + m_elog.write(log::elevel::devel,"pong_timeout error: "+ec.message()); + return; + } + + if (m_pong_timeout_handler) { + m_pong_timeout_handler(m_connection_hdl,payload); + } +} + +template <typename config> +void connection<config>::pong(std::string const& payload, lib::error_code& ec) { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"connection pong"); + } + + { + scoped_lock_type lock(m_connection_state_lock); + if (m_state != session::state::open) { + std::stringstream ss; + ss << "connection::pong called from invalid state " << m_state; + m_alog.write(log::alevel::devel,ss.str()); + ec = error::make_error_code(error::invalid_state); + return; + } + } + + message_ptr msg = m_msg_manager->get_message(); + if (!msg) { + ec = error::make_error_code(error::no_outgoing_buffers); + return; + } + + ec = m_processor->prepare_pong(payload,msg); + if (ec) {return;} + + bool needs_writing = false; + { + scoped_lock_type lock(m_write_lock); + write_push(msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); + } + + if (needs_writing) { + transport_con_type::dispatch(lib::bind( + &type::write_frame, + type::get_shared() + )); + } + + ec = lib::error_code(); +} + +template<typename config> +void connection<config>::pong(std::string const & payload) { + lib::error_code ec; + pong(payload,ec); + if (ec) { + throw exception(ec); + } +} + +template <typename config> +void connection<config>::close(close::status::value const code, + std::string const & reason, lib::error_code & ec) +{ + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"connection close"); + } + + // Truncate reason to maximum size allowable in a close frame. + std::string tr(reason,0,std::min<size_t>(reason.size(), + frame::limits::close_reason_size)); + + scoped_lock_type lock(m_connection_state_lock); + + if (m_state != session::state::open) { + ec = error::make_error_code(error::invalid_state); + return; + } + + ec = this->send_close_frame(code,tr,false,close::status::terminal(code)); +} + +template<typename config> +void connection<config>::close(close::status::value const code, + std::string const & reason) +{ + lib::error_code ec; + close(code,reason,ec); + if (ec) { + throw exception(ec); + } +} + +/// Trigger the on_interrupt handler +/** + * This is thread safe if the transport is thread safe + */ +template <typename config> +lib::error_code connection<config>::interrupt() { + m_alog.write(log::alevel::devel,"connection connection::interrupt"); + return transport_con_type::interrupt( + lib::bind( + &type::handle_interrupt, + type::get_shared() + ) + ); +} + + +template <typename config> +void connection<config>::handle_interrupt() { + if (m_interrupt_handler) { + m_interrupt_handler(m_connection_hdl); + } +} + +template <typename config> +lib::error_code connection<config>::pause_reading() { + m_alog.write(log::alevel::devel,"connection connection::pause_reading"); + return transport_con_type::dispatch( + lib::bind( + &type::handle_pause_reading, + type::get_shared() + ) + ); +} + +/// Pause reading handler. Not safe to call directly +template <typename config> +void connection<config>::handle_pause_reading() { + m_alog.write(log::alevel::devel,"connection connection::handle_pause_reading"); + m_read_flag = false; +} + +template <typename config> +lib::error_code connection<config>::resume_reading() { + m_alog.write(log::alevel::devel,"connection connection::resume_reading"); + return transport_con_type::dispatch( + lib::bind( + &type::handle_resume_reading, + type::get_shared() + ) + ); +} + +/// Resume reading helper method. Not safe to call directly +template <typename config> +void connection<config>::handle_resume_reading() { + m_read_flag = true; + read_frame(); +} + + + + + + + + + + + +template <typename config> +bool connection<config>::get_secure() const { + //scoped_lock_type lock(m_connection_state_lock); + return m_uri->get_secure(); +} + +template <typename config> +std::string const & connection<config>::get_host() const { + //scoped_lock_type lock(m_connection_state_lock); + return m_uri->get_host(); +} + +template <typename config> +std::string const & connection<config>::get_resource() const { + //scoped_lock_type lock(m_connection_state_lock); + return m_uri->get_resource(); +} + +template <typename config> +uint16_t connection<config>::get_port() const { + //scoped_lock_type lock(m_connection_state_lock); + return m_uri->get_port(); +} + +template <typename config> +uri_ptr connection<config>::get_uri() const { + //scoped_lock_type lock(m_connection_state_lock); + return m_uri; +} + +template <typename config> +void connection<config>::set_uri(uri_ptr uri) { + //scoped_lock_type lock(m_connection_state_lock); + m_uri = uri; +} + + + + + + +template <typename config> +std::string const & connection<config>::get_subprotocol() const { + return m_subprotocol; +} + +template <typename config> +std::vector<std::string> const & +connection<config>::get_requested_subprotocols() const { + return m_requested_subprotocols; +} + +template <typename config> +void connection<config>::add_subprotocol(std::string const & value, + lib::error_code & ec) +{ + if (m_is_server) { + ec = error::make_error_code(error::client_only); + return; + } + + // If the value is empty or has a non-RFC2616 token character it is invalid. + if (value.empty() || std::find_if(value.begin(),value.end(), + http::is_not_token_char) != value.end()) + { + ec = error::make_error_code(error::invalid_subprotocol); + return; + } + + m_requested_subprotocols.push_back(value); +} + +template <typename config> +void connection<config>::add_subprotocol(std::string const & value) { + lib::error_code ec; + this->add_subprotocol(value,ec); + if (ec) { + throw exception(ec); + } +} + + +template <typename config> +void connection<config>::select_subprotocol(std::string const & value, + lib::error_code & ec) +{ + if (!m_is_server) { + ec = error::make_error_code(error::server_only); + return; + } + + if (value.empty()) { + ec = lib::error_code(); + return; + } + + std::vector<std::string>::iterator it; + + it = std::find(m_requested_subprotocols.begin(), + m_requested_subprotocols.end(), + value); + + if (it == m_requested_subprotocols.end()) { + ec = error::make_error_code(error::unrequested_subprotocol); + return; + } + + m_subprotocol = value; +} + +template <typename config> +void connection<config>::select_subprotocol(std::string const & value) { + lib::error_code ec; + this->select_subprotocol(value,ec); + if (ec) { + throw exception(ec); + } +} + + +template <typename config> +std::string const & +connection<config>::get_request_header(std::string const & key) const { + return m_request.get_header(key); +} + +template <typename config> +std::string const & +connection<config>::get_request_body() const { + return m_request.get_body(); +} + +template <typename config> +std::string const & +connection<config>::get_response_header(std::string const & key) const { + return m_response.get_header(key); +} + +// TODO: EXCEPTION_FREE +template <typename config> +void connection<config>::set_status(http::status_code::value code) +{ + if (m_internal_state != istate::PROCESS_HTTP_REQUEST) { + throw exception("Call to set_status from invalid state", + error::make_error_code(error::invalid_state)); + } + m_response.set_status(code); +} + +// TODO: EXCEPTION_FREE +template <typename config> +void connection<config>::set_status(http::status_code::value code, + std::string const & msg) +{ + if (m_internal_state != istate::PROCESS_HTTP_REQUEST) { + throw exception("Call to set_status from invalid state", + error::make_error_code(error::invalid_state)); + } + + m_response.set_status(code,msg); +} + +// TODO: EXCEPTION_FREE +template <typename config> +void connection<config>::set_body(std::string const & value) { + if (m_internal_state != istate::PROCESS_HTTP_REQUEST) { + throw exception("Call to set_status from invalid state", + error::make_error_code(error::invalid_state)); + } + + m_response.set_body(value); +} + +// TODO: EXCEPTION_FREE +template <typename config> +void connection<config>::append_header(std::string const & key, + std::string const & val) +{ + if (m_is_server) { + if (m_internal_state == istate::PROCESS_HTTP_REQUEST) { + // we are setting response headers for an incoming server connection + m_response.append_header(key,val); + } else { + throw exception("Call to append_header from invalid state", + error::make_error_code(error::invalid_state)); + } + } else { + if (m_internal_state == istate::USER_INIT) { + // we are setting initial headers for an outgoing client connection + m_request.append_header(key,val); + } else { + throw exception("Call to append_header from invalid state", + error::make_error_code(error::invalid_state)); + } + } +} + +// TODO: EXCEPTION_FREE +template <typename config> +void connection<config>::replace_header(std::string const & key, + std::string const & val) +{ + if (m_is_server) { + if (m_internal_state == istate::PROCESS_HTTP_REQUEST) { + // we are setting response headers for an incoming server connection + m_response.replace_header(key,val); + } else { + throw exception("Call to replace_header from invalid state", + error::make_error_code(error::invalid_state)); + } + } else { + if (m_internal_state == istate::USER_INIT) { + // we are setting initial headers for an outgoing client connection + m_request.replace_header(key,val); + } else { + throw exception("Call to replace_header from invalid state", + error::make_error_code(error::invalid_state)); + } + } +} + +// TODO: EXCEPTION_FREE +template <typename config> +void connection<config>::remove_header(std::string const & key) +{ + if (m_is_server) { + if (m_internal_state == istate::PROCESS_HTTP_REQUEST) { + // we are setting response headers for an incoming server connection + m_response.remove_header(key); + } else { + throw exception("Call to remove_header from invalid state", + error::make_error_code(error::invalid_state)); + } + } else { + if (m_internal_state == istate::USER_INIT) { + // we are setting initial headers for an outgoing client connection + m_request.remove_header(key); + } else { + throw exception("Call to remove_header from invalid state", + error::make_error_code(error::invalid_state)); + } + } +} + +/// Defer HTTP Response until later +/** + * Used in the http handler to defer the HTTP response for this connection + * until later. Handshake timers will be canceled and the connection will be + * left open until `send_http_response` or an equivalent is called. + * + * Warning: deferred connections won't time out and as a result can tie up + * resources. + * + * @return A status code, zero on success, non-zero otherwise + */ +template <typename config> +lib::error_code connection<config>::defer_http_response() { + // Cancel handshake timer, otherwise the connection will time out and we'll + // close the connection before the app has a chance to send a response. + if (m_handshake_timer) { + m_handshake_timer->cancel(); + m_handshake_timer.reset(); + } + + // Do something to signal deferral + m_http_state = session::http_state::deferred; + + return lib::error_code(); +} + +/// Send deferred HTTP Response (exception free) +/** + * Sends an http response to an HTTP connection that was deferred. This will + * send a complete response including all headers, status line, and body + * text. The connection will be closed afterwards. + * + * @since 0.6.0 + * + * @param ec A status code, zero on success, non-zero otherwise + */ +template <typename config> +void connection<config>::send_http_response(lib::error_code & ec) { + { + scoped_lock_type lock(m_connection_state_lock); + if (m_http_state != session::http_state::deferred) { + ec = error::make_error_code(error::invalid_state); + return; + } + + m_http_state = session::http_state::body_written; + } + + this->write_http_response(lib::error_code()); + ec = lib::error_code(); +} + +template <typename config> +void connection<config>::send_http_response() { + lib::error_code ec; + this->send_http_response(ec); + if (ec) { + throw exception(ec); + } +} + + + + +/******** logic thread ********/ + +template <typename config> +void connection<config>::start() { + m_alog.write(log::alevel::devel,"connection start"); + + if (m_internal_state != istate::USER_INIT) { + m_alog.write(log::alevel::devel,"Start called in invalid state"); + this->terminate(error::make_error_code(error::invalid_state)); + return; + } + + m_internal_state = istate::TRANSPORT_INIT; + + // Depending on how the transport implements init this function may return + // immediately and call handle_transport_init later or call + // handle_transport_init from this function. + transport_con_type::init( + lib::bind( + &type::handle_transport_init, + type::get_shared(), + lib::placeholders::_1 + ) + ); +} + +template <typename config> +void connection<config>::handle_transport_init(lib::error_code const & ec) { + m_alog.write(log::alevel::devel,"connection handle_transport_init"); + + lib::error_code ecm = ec; + + if (m_internal_state != istate::TRANSPORT_INIT) { + m_alog.write(log::alevel::devel, + "handle_transport_init must be called from transport init state"); + ecm = error::make_error_code(error::invalid_state); + } + + if (ecm) { + std::stringstream s; + s << "handle_transport_init received error: "<< ecm.message(); + m_elog.write(log::elevel::rerror,s.str()); + + this->terminate(ecm); + return; + } + + // At this point the transport is ready to read and write bytes. + if (m_is_server) { + m_internal_state = istate::READ_HTTP_REQUEST; + this->read_handshake(1); + } else { + // We are a client. Set the processor to the version specified in the + // config file and send a handshake request. + m_internal_state = istate::WRITE_HTTP_REQUEST; + m_processor = get_processor(config::client_version); + this->send_http_request(); + } +} + +template <typename config> +void connection<config>::read_handshake(size_t num_bytes) { + m_alog.write(log::alevel::devel,"connection read_handshake"); + + if (m_open_handshake_timeout_dur > 0) { + m_handshake_timer = transport_con_type::set_timer( + m_open_handshake_timeout_dur, + lib::bind( + &type::handle_open_handshake_timeout, + type::get_shared(), + lib::placeholders::_1 + ) + ); + } + + transport_con_type::async_read_at_least( + num_bytes, + m_buf, + config::connection_read_buffer_size, + lib::bind( + &type::handle_read_handshake, + type::get_shared(), + lib::placeholders::_1, + lib::placeholders::_2 + ) + ); +} + +// All exit paths for this function need to call write_http_response() or submit +// a new read request with this function as the handler. +template <typename config> +void connection<config>::handle_read_handshake(lib::error_code const & ec, + size_t bytes_transferred) +{ + m_alog.write(log::alevel::devel,"connection handle_read_handshake"); + + lib::error_code ecm = ec; + + if (!ecm) { + scoped_lock_type lock(m_connection_state_lock); + + if (m_state == session::state::connecting) { + if (m_internal_state != istate::READ_HTTP_REQUEST) { + ecm = error::make_error_code(error::invalid_state); + } + } else if (m_state == session::state::closed) { + // The connection was canceled while the response was being sent, + // usually by the handshake timer. This is basically expected + // (though hopefully rare) and there is nothing we can do so ignore. + m_alog.write(log::alevel::devel, + "handle_read_handshake invoked after connection was closed"); + return; + } else { + ecm = error::make_error_code(error::invalid_state); + } + } + + if (ecm) { + if (ecm == transport::error::eof && m_state == session::state::closed) { + // we expect to get eof if the connection is closed already + m_alog.write(log::alevel::devel, + "got (expected) eof/state error from closed con"); + return; + } + + log_err(log::elevel::rerror,"handle_read_handshake",ecm); + this->terminate(ecm); + return; + } + + // Boundaries checking. TODO: How much of this should be done? + if (bytes_transferred > config::connection_read_buffer_size) { + m_elog.write(log::elevel::fatal,"Fatal boundaries checking error."); + this->terminate(make_error_code(error::general)); + return; + } + + size_t bytes_processed = 0; + try { + bytes_processed = m_request.consume(m_buf,bytes_transferred); + } catch (http::exception &e) { + // All HTTP exceptions will result in this request failing and an error + // response being returned. No more bytes will be read in this con. + m_response.set_status(e.m_error_code,e.m_error_msg); + this->write_http_response_error(error::make_error_code(error::http_parse_error)); + return; + } + + // More paranoid boundaries checking. + // TODO: Is this overkill? + if (bytes_processed > bytes_transferred) { + m_elog.write(log::elevel::fatal,"Fatal boundaries checking error."); + this->terminate(make_error_code(error::general)); + return; + } + + if (m_alog.static_test(log::alevel::devel)) { + std::stringstream s; + s << "bytes_transferred: " << bytes_transferred + << " bytes, bytes processed: " << bytes_processed << " bytes"; + m_alog.write(log::alevel::devel,s.str()); + } + + if (m_request.ready()) { + lib::error_code processor_ec = this->initialize_processor(); + if (processor_ec) { + this->write_http_response_error(processor_ec); + return; + } + + if (m_processor && m_processor->get_version() == 0) { + // Version 00 has an extra requirement to read some bytes after the + // handshake + if (bytes_transferred-bytes_processed >= 8) { + m_request.replace_header( + "Sec-WebSocket-Key3", + std::string(m_buf+bytes_processed,m_buf+bytes_processed+8) + ); + bytes_processed += 8; + } else { + // TODO: need more bytes + m_alog.write(log::alevel::devel,"short key3 read"); + m_response.set_status(http::status_code::internal_server_error); + this->write_http_response_error(processor::error::make_error_code(processor::error::short_key3)); + return; + } + } + + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,m_request.raw()); + if (!m_request.get_header("Sec-WebSocket-Key3").empty()) { + m_alog.write(log::alevel::devel, + utility::to_hex(m_request.get_header("Sec-WebSocket-Key3"))); + } + } + + // The remaining bytes in m_buf are frame data. Copy them to the + // beginning of the buffer and note the length. They will be read after + // the handshake completes and before more bytes are read. + std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf); + m_buf_cursor = bytes_transferred-bytes_processed; + + + m_internal_state = istate::PROCESS_HTTP_REQUEST; + + // We have the complete request. Process it. + lib::error_code handshake_ec = this->process_handshake_request(); + + // Write a response if this is a websocket connection or if it is an + // HTTP connection for which the response has not been deferred or + // started yet by a different system (i.e. still in init state). + if (!m_is_http || m_http_state == session::http_state::init) { + this->write_http_response(handshake_ec); + } + } else { + // read at least 1 more byte + transport_con_type::async_read_at_least( + 1, + m_buf, + config::connection_read_buffer_size, + lib::bind( + &type::handle_read_handshake, + type::get_shared(), + lib::placeholders::_1, + lib::placeholders::_2 + ) + ); + } +} + +// write_http_response requires the request to be fully read and the connection +// to be in the PROCESS_HTTP_REQUEST state. In some cases we can detect errors +// before the request is fully read (specifically at a point where we aren't +// sure if the hybi00 key3 bytes need to be read). This method sets the correct +// state and calls write_http_response +template <typename config> +void connection<config>::write_http_response_error(lib::error_code const & ec) { + if (m_internal_state != istate::READ_HTTP_REQUEST) { + m_alog.write(log::alevel::devel, + "write_http_response_error called in invalid state"); + this->terminate(error::make_error_code(error::invalid_state)); + return; + } + + m_internal_state = istate::PROCESS_HTTP_REQUEST; + + this->write_http_response(ec); +} + +// All exit paths for this function need to call write_http_response() or submit +// a new read request with this function as the handler. +template <typename config> +void connection<config>::handle_read_frame(lib::error_code const & ec, + size_t bytes_transferred) +{ + //m_alog.write(log::alevel::devel,"connection handle_read_frame"); + + lib::error_code ecm = ec; + + if (!ecm && m_internal_state != istate::PROCESS_CONNECTION) { + ecm = error::make_error_code(error::invalid_state); + } + + if (ecm) { + log::level echannel = log::elevel::rerror; + + if (ecm == transport::error::eof) { + if (m_state == session::state::closed) { + // we expect to get eof if the connection is closed already + // just ignore it + m_alog.write(log::alevel::devel,"got eof from closed con"); + return; + } else if (m_state == session::state::closing && !m_is_server) { + // If we are a client we expect to get eof in the closing state, + // this is a signal to terminate our end of the connection after + // the closing handshake + terminate(lib::error_code()); + return; + } + } else if (ecm == error::invalid_state) { + // In general, invalid state errors in the closed state are the + // result of handlers that were in the system already when the state + // changed and should be ignored as they pose no problems and there + // is nothing useful that we can do about them. + if (m_state == session::state::closed) { + m_alog.write(log::alevel::devel, + "handle_read_frame: got invalid istate in closed state"); + return; + } + } else if (ecm == transport::error::tls_short_read) { + if (m_state == session::state::closed) { + // We expect to get a TLS short read if we try to read after the + // connection is closed. If this happens ignore and exit the + // read frame path. + terminate(lib::error_code()); + return; + } + echannel = log::elevel::rerror; + } else if (ecm == transport::error::action_after_shutdown) { + echannel = log::elevel::info; + } + + log_err(echannel, "handle_read_frame", ecm); + this->terminate(ecm); + return; + } + + // Boundaries checking. TODO: How much of this should be done? + /*if (bytes_transferred > config::connection_read_buffer_size) { + m_elog.write(log::elevel::fatal,"Fatal boundaries checking error"); + this->terminate(make_error_code(error::general)); + return; + }*/ + + size_t p = 0; + + if (m_alog.static_test(log::alevel::devel)) { + std::stringstream s; + s << "p = " << p << " bytes transferred = " << bytes_transferred; + m_alog.write(log::alevel::devel,s.str()); + } + + while (p < bytes_transferred) { + if (m_alog.static_test(log::alevel::devel)) { + std::stringstream s; + s << "calling consume with " << bytes_transferred-p << " bytes"; + m_alog.write(log::alevel::devel,s.str()); + } + + lib::error_code consume_ec; + + if (m_alog.static_test(log::alevel::devel)) { + std::stringstream s; + s << "Processing Bytes: " << utility::to_hex(reinterpret_cast<uint8_t*>(m_buf)+p,bytes_transferred-p); + m_alog.write(log::alevel::devel,s.str()); + } + + p += m_processor->consume( + reinterpret_cast<uint8_t*>(m_buf)+p, + bytes_transferred-p, + consume_ec + ); + + if (m_alog.static_test(log::alevel::devel)) { + std::stringstream s; + s << "bytes left after consume: " << bytes_transferred-p; + m_alog.write(log::alevel::devel,s.str()); + } + if (consume_ec) { + log_err(log::elevel::rerror, "consume", consume_ec); + + if (config::drop_on_protocol_error) { + this->terminate(consume_ec); + return; + } else { + lib::error_code close_ec; + this->close( + processor::error::to_ws(consume_ec), + consume_ec.message(), + close_ec + ); + + if (close_ec) { + log_err(log::elevel::fatal, "Protocol error close frame ", close_ec); + this->terminate(close_ec); + return; + } + } + return; + } + + if (m_processor->ready()) { + if (m_alog.static_test(log::alevel::devel)) { + std::stringstream s; + s << "Complete message received. Dispatching"; + m_alog.write(log::alevel::devel,s.str()); + } + + message_ptr msg = m_processor->get_message(); + + if (!msg) { + m_alog.write(log::alevel::devel, "null message from m_processor"); + } else if (!is_control(msg->get_opcode())) { + // data message, dispatch to user + if (m_state != session::state::open) { + m_elog.write(log::elevel::warn, "got non-close frame while closing"); + } else if (m_message_handler) { + m_message_handler(m_connection_hdl, msg); + } + } else { + process_control_frame(msg); + } + } + } + + read_frame(); +} + +/// Issue a new transport read unless reading is paused. +template <typename config> +void connection<config>::read_frame() { + if (!m_read_flag) { + return; + } + + transport_con_type::async_read_at_least( + // std::min wont work with undefined static const values. + // TODO: is there a more elegant way to do this? + // Need to determine if requesting 1 byte or the exact number of bytes + // is better here. 1 byte lets us be a bit more responsive at a + // potential expense of additional runs through handle_read_frame + /*(m_processor->get_bytes_needed() > config::connection_read_buffer_size ? + config::connection_read_buffer_size : m_processor->get_bytes_needed())*/ + 1, + m_buf, + config::connection_read_buffer_size, + m_handle_read_frame + ); +} + +template <typename config> +lib::error_code connection<config>::initialize_processor() { + m_alog.write(log::alevel::devel,"initialize_processor"); + + // if it isn't a websocket handshake nothing to do. + if (!processor::is_websocket_handshake(m_request)) { + return lib::error_code(); + } + + int version = processor::get_websocket_version(m_request); + + if (version < 0) { + m_alog.write(log::alevel::devel, "BAD REQUEST: can't determine version"); + m_response.set_status(http::status_code::bad_request); + return error::make_error_code(error::invalid_version); + } + + m_processor = get_processor(version); + + // if the processor is not null we are done + if (m_processor) { + return lib::error_code(); + } + + // We don't have a processor for this version. Return bad request + // with Sec-WebSocket-Version header filled with values we do accept + m_alog.write(log::alevel::devel, "BAD REQUEST: no processor for version"); + m_response.set_status(http::status_code::bad_request); + + std::stringstream ss; + std::string sep; + std::vector<int>::const_iterator it; + for (it = versions_supported.begin(); it != versions_supported.end(); it++) + { + ss << sep << *it; + sep = ","; + } + + m_response.replace_header("Sec-WebSocket-Version",ss.str()); + return error::make_error_code(error::unsupported_version); +} + +template <typename config> +lib::error_code connection<config>::process_handshake_request() { + m_alog.write(log::alevel::devel,"process handshake request"); + + if (!processor::is_websocket_handshake(m_request)) { + // this is not a websocket handshake. Process as plain HTTP + m_alog.write(log::alevel::devel,"HTTP REQUEST"); + + // extract URI from request + m_uri = processor::get_uri_from_host( + m_request, + (transport_con_type::is_secure() ? "https" : "http") + ); + + if (!m_uri->get_valid()) { + m_alog.write(log::alevel::devel, "Bad request: failed to parse uri"); + m_response.set_status(http::status_code::bad_request); + return error::make_error_code(error::invalid_uri); + } + + if (m_http_handler) { + m_is_http = true; + m_http_handler(m_connection_hdl); + + if (m_state == session::state::closed) { + return error::make_error_code(error::http_connection_ended); + } + } else { + set_status(http::status_code::upgrade_required); + return error::make_error_code(error::upgrade_required); + } + + return lib::error_code(); + } + + lib::error_code ec = m_processor->validate_handshake(m_request); + + // Validate: make sure all required elements are present. + if (ec){ + // Not a valid handshake request + m_alog.write(log::alevel::devel, "Bad request " + ec.message()); + m_response.set_status(http::status_code::bad_request); + return ec; + } + + // Read extension parameters and set up values necessary for the end user + // to complete extension negotiation. + std::pair<lib::error_code,std::string> neg_results; + neg_results = m_processor->negotiate_extensions(m_request); + + if (neg_results.first) { + // There was a fatal error in extension parsing that should result in + // a failed connection attempt. + m_alog.write(log::alevel::devel, "Bad request: " + neg_results.first.message()); + m_response.set_status(http::status_code::bad_request); + return neg_results.first; + } else { + // extension negotiation succeeded, set response header accordingly + // we don't send an empty extensions header because it breaks many + // clients. + if (neg_results.second.size() > 0) { + m_response.replace_header("Sec-WebSocket-Extensions", + neg_results.second); + } + } + + // extract URI from request + m_uri = m_processor->get_uri(m_request); + + + if (!m_uri->get_valid()) { + m_alog.write(log::alevel::devel, "Bad request: failed to parse uri"); + m_response.set_status(http::status_code::bad_request); + return error::make_error_code(error::invalid_uri); + } + + // extract subprotocols + lib::error_code subp_ec = m_processor->extract_subprotocols(m_request, + m_requested_subprotocols); + + if (subp_ec) { + // should we do anything? + } + + // Ask application to validate the connection + if (!m_validate_handler || m_validate_handler(m_connection_hdl)) { + m_response.set_status(http::status_code::switching_protocols); + + // Write the appropriate response headers based on request and + // processor version + ec = m_processor->process_handshake(m_request,m_subprotocol,m_response); + + if (ec) { + std::stringstream s; + s << "Processing error: " << ec << "(" << ec.message() << ")"; + m_alog.write(log::alevel::devel, s.str()); + + m_response.set_status(http::status_code::internal_server_error); + return ec; + } + } else { + // User application has rejected the handshake + m_alog.write(log::alevel::devel, "USER REJECT"); + + // Use Bad Request if the user handler did not provide a more + // specific http response error code. + // TODO: is there a better default? + if (m_response.get_status_code() == http::status_code::uninitialized) { + m_response.set_status(http::status_code::bad_request); + } + + return error::make_error_code(error::rejected); + } + + return lib::error_code(); +} + +template <typename config> +void connection<config>::write_http_response(lib::error_code const & ec) { + m_alog.write(log::alevel::devel,"connection write_http_response"); + + if (ec == error::make_error_code(error::http_connection_ended)) { + m_alog.write(log::alevel::http,"An HTTP handler took over the connection."); + return; + } + + if (m_response.get_status_code() == http::status_code::uninitialized) { + m_response.set_status(http::status_code::internal_server_error); + m_ec = error::make_error_code(error::general); + } else { + m_ec = ec; + } + + m_response.set_version("HTTP/1.1"); + + // Set server header based on the user agent settings + if (m_response.get_header("Server").empty()) { + if (!m_user_agent.empty()) { + m_response.replace_header("Server",m_user_agent); + } else { + m_response.remove_header("Server"); + } + } + + // have the processor generate the raw bytes for the wire (if it exists) + if (m_processor) { + m_handshake_buffer = m_processor->get_raw(m_response); + } else { + // a processor wont exist for raw HTTP responses. + m_handshake_buffer = m_response.raw(); + } + + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"Raw Handshake response:\n"+m_handshake_buffer); + if (!m_response.get_header("Sec-WebSocket-Key3").empty()) { + m_alog.write(log::alevel::devel, + utility::to_hex(m_response.get_header("Sec-WebSocket-Key3"))); + } + } + + // write raw bytes + transport_con_type::async_write( + m_handshake_buffer.data(), + m_handshake_buffer.size(), + lib::bind( + &type::handle_write_http_response, + type::get_shared(), + lib::placeholders::_1 + ) + ); +} + +template <typename config> +void connection<config>::handle_write_http_response(lib::error_code const & ec) { + m_alog.write(log::alevel::devel,"handle_write_http_response"); + + lib::error_code ecm = ec; + + if (!ecm) { + scoped_lock_type lock(m_connection_state_lock); + + if (m_state == session::state::connecting) { + if (m_internal_state != istate::PROCESS_HTTP_REQUEST) { + ecm = error::make_error_code(error::invalid_state); + } + } else if (m_state == session::state::closed) { + // The connection was canceled while the response was being sent, + // usually by the handshake timer. This is basically expected + // (though hopefully rare) and there is nothing we can do so ignore. + m_alog.write(log::alevel::devel, + "handle_write_http_response invoked after connection was closed"); + return; + } else { + ecm = error::make_error_code(error::invalid_state); + } + } + + if (ecm) { + if (ecm == transport::error::eof && m_state == session::state::closed) { + // we expect to get eof if the connection is closed already + m_alog.write(log::alevel::devel, + "got (expected) eof/state error from closed con"); + return; + } + + log_err(log::elevel::rerror,"handle_write_http_response",ecm); + this->terminate(ecm); + return; + } + + if (m_handshake_timer) { + m_handshake_timer->cancel(); + m_handshake_timer.reset(); + } + + if (m_response.get_status_code() != http::status_code::switching_protocols) + { + /*if (m_processor || m_ec == error::http_parse_error || + m_ec == error::invalid_version || m_ec == error::unsupported_version + || m_ec == error::upgrade_required) + {*/ + if (!m_is_http) { + std::stringstream s; + s << "Handshake ended with HTTP error: " + << m_response.get_status_code(); + m_elog.write(log::elevel::rerror,s.str()); + } else { + // if this was not a websocket connection, we have written + // the expected response and the connection can be closed. + + this->log_http_result(); + + if (m_ec) { + m_alog.write(log::alevel::devel, + "got to writing HTTP results with m_ec set: "+m_ec.message()); + } + m_ec = make_error_code(error::http_connection_ended); + } + + this->terminate(m_ec); + return; + } + + this->log_open_result(); + + m_internal_state = istate::PROCESS_CONNECTION; + m_state = session::state::open; + + if (m_open_handler) { + m_open_handler(m_connection_hdl); + } + + this->handle_read_frame(lib::error_code(), m_buf_cursor); +} + +template <typename config> +void connection<config>::send_http_request() { + m_alog.write(log::alevel::devel,"connection send_http_request"); + + // TODO: origin header? + + // Have the protocol processor fill in the appropriate fields based on the + // selected client version + if (m_processor) { + lib::error_code ec; + ec = m_processor->client_handshake_request(m_request,m_uri, + m_requested_subprotocols); + + if (ec) { + log_err(log::elevel::fatal,"Internal library error: Processor",ec); + return; + } + } else { + m_elog.write(log::elevel::fatal,"Internal library error: missing processor"); + return; + } + + // Unless the user has overridden the user agent, send generic WS++ UA. + if (m_request.get_header("User-Agent").empty()) { + if (!m_user_agent.empty()) { + m_request.replace_header("User-Agent",m_user_agent); + } else { + m_request.remove_header("User-Agent"); + } + } + + m_handshake_buffer = m_request.raw(); + + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"Raw Handshake request:\n"+m_handshake_buffer); + } + + if (m_open_handshake_timeout_dur > 0) { + m_handshake_timer = transport_con_type::set_timer( + m_open_handshake_timeout_dur, + lib::bind( + &type::handle_open_handshake_timeout, + type::get_shared(), + lib::placeholders::_1 + ) + ); + } + + transport_con_type::async_write( + m_handshake_buffer.data(), + m_handshake_buffer.size(), + lib::bind( + &type::handle_send_http_request, + type::get_shared(), + lib::placeholders::_1 + ) + ); +} + +template <typename config> +void connection<config>::handle_send_http_request(lib::error_code const & ec) { + m_alog.write(log::alevel::devel,"handle_send_http_request"); + + lib::error_code ecm = ec; + + if (!ecm) { + scoped_lock_type lock(m_connection_state_lock); + + if (m_state == session::state::connecting) { + if (m_internal_state != istate::WRITE_HTTP_REQUEST) { + ecm = error::make_error_code(error::invalid_state); + } else { + m_internal_state = istate::READ_HTTP_RESPONSE; + } + } else if (m_state == session::state::closed) { + // The connection was canceled while the response was being sent, + // usually by the handshake timer. This is basically expected + // (though hopefully rare) and there is nothing we can do so ignore. + m_alog.write(log::alevel::devel, + "handle_send_http_request invoked after connection was closed"); + return; + } else { + ecm = error::make_error_code(error::invalid_state); + } + } + + if (ecm) { + if (ecm == transport::error::eof && m_state == session::state::closed) { + // we expect to get eof if the connection is closed already + m_alog.write(log::alevel::devel, + "got (expected) eof/state error from closed con"); + return; + } + + log_err(log::elevel::rerror,"handle_send_http_request",ecm); + this->terminate(ecm); + return; + } + + transport_con_type::async_read_at_least( + 1, + m_buf, + config::connection_read_buffer_size, + lib::bind( + &type::handle_read_http_response, + type::get_shared(), + lib::placeholders::_1, + lib::placeholders::_2 + ) + ); +} + +template <typename config> +void connection<config>::handle_read_http_response(lib::error_code const & ec, + size_t bytes_transferred) +{ + m_alog.write(log::alevel::devel,"handle_read_http_response"); + + lib::error_code ecm = ec; + + if (!ecm) { + scoped_lock_type lock(m_connection_state_lock); + + if (m_state == session::state::connecting) { + if (m_internal_state != istate::READ_HTTP_RESPONSE) { + ecm = error::make_error_code(error::invalid_state); + } + } else if (m_state == session::state::closed) { + // The connection was canceled while the response was being sent, + // usually by the handshake timer. This is basically expected + // (though hopefully rare) and there is nothing we can do so ignore. + m_alog.write(log::alevel::devel, + "handle_read_http_response invoked after connection was closed"); + return; + } else { + ecm = error::make_error_code(error::invalid_state); + } + } + + if (ecm) { + if (ecm == transport::error::eof && m_state == session::state::closed) { + // we expect to get eof if the connection is closed already + m_alog.write(log::alevel::devel, + "got (expected) eof/state error from closed con"); + return; + } + + log_err(log::elevel::rerror,"handle_read_http_response",ecm); + this->terminate(ecm); + return; + } + + size_t bytes_processed = 0; + // TODO: refactor this to use error codes rather than exceptions + try { + bytes_processed = m_response.consume(m_buf,bytes_transferred); + } catch (http::exception & e) { + m_elog.write(log::elevel::rerror, + std::string("error in handle_read_http_response: ")+e.what()); + this->terminate(make_error_code(error::general)); + return; + } + + m_alog.write(log::alevel::devel,std::string("Raw response: ")+m_response.raw()); + + if (m_response.headers_ready()) { + if (m_handshake_timer) { + m_handshake_timer->cancel(); + m_handshake_timer.reset(); + } + + lib::error_code validate_ec = m_processor->validate_server_handshake_response( + m_request, + m_response + ); + if (validate_ec) { + log_err(log::elevel::rerror,"Server handshake response",validate_ec); + this->terminate(validate_ec); + return; + } + + // Read extension parameters and set up values necessary for the end + // user to complete extension negotiation. + std::pair<lib::error_code,std::string> neg_results; + neg_results = m_processor->negotiate_extensions(m_response); + + if (neg_results.first) { + // There was a fatal error in extension negotiation. For the moment + // kill all connections that fail extension negotiation. + + // TODO: deal with cases where the response is well formed but + // doesn't match the options requested by the client. Its possible + // that the best behavior in this cases is to log and continue with + // an unextended connection. + m_alog.write(log::alevel::devel, "Extension negotiation failed: " + + neg_results.first.message()); + this->terminate(make_error_code(error::extension_neg_failed)); + // TODO: close connection with reason 1010 (and list extensions) + } + + // response is valid, connection can now be assumed to be open + m_internal_state = istate::PROCESS_CONNECTION; + m_state = session::state::open; + + this->log_open_result(); + + if (m_open_handler) { + m_open_handler(m_connection_hdl); + } + + // The remaining bytes in m_buf are frame data. Copy them to the + // beginning of the buffer and note the length. They will be read after + // the handshake completes and before more bytes are read. + std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf); + m_buf_cursor = bytes_transferred-bytes_processed; + + this->handle_read_frame(lib::error_code(), m_buf_cursor); + } else { + transport_con_type::async_read_at_least( + 1, + m_buf, + config::connection_read_buffer_size, + lib::bind( + &type::handle_read_http_response, + type::get_shared(), + lib::placeholders::_1, + lib::placeholders::_2 + ) + ); + } +} + +template <typename config> +void connection<config>::handle_open_handshake_timeout( + lib::error_code const & ec) +{ + if (ec == transport::error::operation_aborted) { + m_alog.write(log::alevel::devel,"open handshake timer cancelled"); + } else if (ec) { + m_alog.write(log::alevel::devel, + "open handle_open_handshake_timeout error: "+ec.message()); + // TODO: ignore or fail here? + } else { + m_alog.write(log::alevel::devel,"open handshake timer expired"); + terminate(make_error_code(error::open_handshake_timeout)); + } +} + +template <typename config> +void connection<config>::handle_close_handshake_timeout( + lib::error_code const & ec) +{ + if (ec == transport::error::operation_aborted) { + m_alog.write(log::alevel::devel,"asio close handshake timer cancelled"); + } else if (ec) { + m_alog.write(log::alevel::devel, + "asio open handle_close_handshake_timeout error: "+ec.message()); + // TODO: ignore or fail here? + } else { + m_alog.write(log::alevel::devel, "asio close handshake timer expired"); + terminate(make_error_code(error::close_handshake_timeout)); + } +} + +template <typename config> +void connection<config>::terminate(lib::error_code const & ec) { + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"connection terminate"); + } + + // Cancel close handshake timer + if (m_handshake_timer) { + m_handshake_timer->cancel(); + m_handshake_timer.reset(); + } + + terminate_status tstat = unknown; + if (ec) { + m_ec = ec; + m_local_close_code = close::status::abnormal_close; + m_local_close_reason = ec.message(); + } + + // TODO: does any of this need a mutex? + if (m_is_http) { + m_http_state = session::http_state::closed; + } + if (m_state == session::state::connecting) { + m_state = session::state::closed; + tstat = failed; + + // Log fail result here before socket is shut down and we can't get + // the remote address, etc anymore + if (m_ec != error::http_connection_ended) { + log_fail_result(); + } + } else if (m_state != session::state::closed) { + m_state = session::state::closed; + tstat = closed; + } else { + m_alog.write(log::alevel::devel, + "terminate called on connection that was already terminated"); + return; + } + + // TODO: choose between shutdown and close based on error code sent + + transport_con_type::async_shutdown( + lib::bind( + &type::handle_terminate, + type::get_shared(), + tstat, + lib::placeholders::_1 + ) + ); +} + +template <typename config> +void connection<config>::handle_terminate(terminate_status tstat, + lib::error_code const & ec) +{ + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"connection handle_terminate"); + } + + if (ec) { + // there was an error actually shutting down the connection + log_err(log::elevel::devel,"handle_terminate",ec); + } + + // clean shutdown + if (tstat == failed) { + if (m_ec != error::http_connection_ended) { + if (m_fail_handler) { + m_fail_handler(m_connection_hdl); + } + } + } else if (tstat == closed) { + if (m_close_handler) { + m_close_handler(m_connection_hdl); + } + log_close_result(); + } else { + m_elog.write(log::elevel::rerror,"Unknown terminate_status"); + } + + // call the termination handler if it exists + // if it exists it might (but shouldn't) refer to a bad memory location. + // If it does, we don't care and should catch and ignore it. + if (m_termination_handler) { + try { + m_termination_handler(type::get_shared()); + } catch (std::exception const & e) { + m_elog.write(log::elevel::warn, + std::string("termination_handler call failed. Reason was: ")+e.what()); + } + } +} + +template <typename config> +void connection<config>::write_frame() { + //m_alog.write(log::alevel::devel,"connection write_frame"); + + { + scoped_lock_type lock(m_write_lock); + + // Check the write flag. If true, there is an outstanding transport + // write already. In this case we just return. The write handler will + // start a new write if the write queue isn't empty. If false, we set + // the write flag and proceed to initiate a transport write. + if (m_write_flag) { + return; + } + + // pull off all the messages that are ready to write. + // stop if we get a message marked terminal + message_ptr next_message = write_pop(); + while (next_message) { + m_current_msgs.push_back(next_message); + if (!next_message->get_terminal()) { + next_message = write_pop(); + } else { + next_message = message_ptr(); + } + } + + if (m_current_msgs.empty()) { + // there was nothing to send + return; + } else { + // At this point we own the next messages to be sent and are + // responsible for holding the write flag until they are + // successfully sent or there is some error + m_write_flag = true; + } + } + + typename std::vector<message_ptr>::iterator it; + for (it = m_current_msgs.begin(); it != m_current_msgs.end(); ++it) { + std::string const & header = (*it)->get_header(); + std::string const & payload = (*it)->get_payload(); + + m_send_buffer.push_back(transport::buffer(header.c_str(),header.size())); + m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size())); + } + + // Print detailed send stats if those log levels are enabled + if (m_alog.static_test(log::alevel::frame_header)) { + if (m_alog.dynamic_test(log::alevel::frame_header)) { + std::stringstream general,header,payload; + + general << "Dispatching write containing " << m_current_msgs.size() + <<" message(s) containing "; + header << "Header Bytes: \n"; + payload << "Payload Bytes: \n"; + + size_t hbytes = 0; + size_t pbytes = 0; + + for (size_t i = 0; i < m_current_msgs.size(); i++) { + hbytes += m_current_msgs[i]->get_header().size(); + pbytes += m_current_msgs[i]->get_payload().size(); + + + header << "[" << i << "] (" + << m_current_msgs[i]->get_header().size() << ") " + << utility::to_hex(m_current_msgs[i]->get_header()) << "\n"; + + if (m_alog.static_test(log::alevel::frame_payload)) { + if (m_alog.dynamic_test(log::alevel::frame_payload)) { + payload << "[" << i << "] (" + << m_current_msgs[i]->get_payload().size() << ") ["<<m_current_msgs[i]->get_opcode()<<"] " + << (m_current_msgs[i]->get_opcode() == frame::opcode::text ? + m_current_msgs[i]->get_payload() : + utility::to_hex(m_current_msgs[i]->get_payload()) + ) + << "\n"; + } + } + } + + general << hbytes << " header bytes and " << pbytes << " payload bytes"; + + m_alog.write(log::alevel::frame_header,general.str()); + m_alog.write(log::alevel::frame_header,header.str()); + m_alog.write(log::alevel::frame_payload,payload.str()); + } + } + + transport_con_type::async_write( + m_send_buffer, + m_write_frame_handler + ); +} + +template <typename config> +void connection<config>::handle_write_frame(lib::error_code const & ec) +{ + if (m_alog.static_test(log::alevel::devel)) { + m_alog.write(log::alevel::devel,"connection handle_write_frame"); + } + + bool terminal = m_current_msgs.back()->get_terminal(); + + m_send_buffer.clear(); + m_current_msgs.clear(); + // TODO: recycle instead of deleting + + if (ec) { + log_err(log::elevel::fatal,"handle_write_frame",ec); + this->terminate(ec); + return; + } + + if (terminal) { + this->terminate(lib::error_code()); + return; + } + + bool needs_writing = false; + { + scoped_lock_type lock(m_write_lock); + + // release write flag + m_write_flag = false; + + needs_writing = !m_send_queue.empty(); + } + + if (needs_writing) { + transport_con_type::dispatch(lib::bind( + &type::write_frame, + type::get_shared() + )); + } +} + +template <typename config> +std::vector<int> const & connection<config>::get_supported_versions() const +{ + return versions_supported; +} + +template <typename config> +void connection<config>::process_control_frame(typename config::message_type::ptr msg) +{ + m_alog.write(log::alevel::devel,"process_control_frame"); + + frame::opcode::value op = msg->get_opcode(); + lib::error_code ec; + + std::stringstream s; + s << "Control frame received with opcode " << op; + m_alog.write(log::alevel::control,s.str()); + + if (m_state == session::state::closed) { + m_elog.write(log::elevel::warn,"got frame in state closed"); + return; + } + if (op != frame::opcode::CLOSE && m_state != session::state::open) { + m_elog.write(log::elevel::warn,"got non-close frame in state closing"); + return; + } + + if (op == frame::opcode::PING) { + bool should_reply = true; + + if (m_ping_handler) { + should_reply = m_ping_handler(m_connection_hdl, msg->get_payload()); + } + + if (should_reply) { + this->pong(msg->get_payload(),ec); + if (ec) { + log_err(log::elevel::devel,"Failed to send response pong",ec); + } + } + } else if (op == frame::opcode::PONG) { + if (m_pong_handler) { + m_pong_handler(m_connection_hdl, msg->get_payload()); + } + if (m_ping_timer) { + m_ping_timer->cancel(); + } + } else if (op == frame::opcode::CLOSE) { + m_alog.write(log::alevel::devel,"got close frame"); + // record close code and reason somewhere + + m_remote_close_code = close::extract_code(msg->get_payload(),ec); + if (ec) { + s.str(""); + if (config::drop_on_protocol_error) { + s << "Received invalid close code " << m_remote_close_code + << " dropping connection per config."; + m_elog.write(log::elevel::devel,s.str()); + this->terminate(ec); + } else { + s << "Received invalid close code " << m_remote_close_code + << " sending acknowledgement and closing"; + m_elog.write(log::elevel::devel,s.str()); + ec = send_close_ack(close::status::protocol_error, + "Invalid close code"); + if (ec) { + log_err(log::elevel::devel,"send_close_ack",ec); + } + } + return; + } + + m_remote_close_reason = close::extract_reason(msg->get_payload(),ec); + if (ec) { + if (config::drop_on_protocol_error) { + m_elog.write(log::elevel::devel, + "Received invalid close reason. Dropping connection per config"); + this->terminate(ec); + } else { + m_elog.write(log::elevel::devel, + "Received invalid close reason. Sending acknowledgement and closing"); + ec = send_close_ack(close::status::protocol_error, + "Invalid close reason"); + if (ec) { + log_err(log::elevel::devel,"send_close_ack",ec); + } + } + return; + } + + if (m_state == session::state::open) { + s.str(""); + s << "Received close frame with code " << m_remote_close_code + << " and reason " << m_remote_close_reason; + m_alog.write(log::alevel::devel,s.str()); + + ec = send_close_ack(); + if (ec) { + log_err(log::elevel::devel,"send_close_ack",ec); + } + } else if (m_state == session::state::closing && !m_was_clean) { + // ack of our close + m_alog.write(log::alevel::devel, "Got acknowledgement of close"); + + m_was_clean = true; + + // If we are a server terminate the connection now. Clients should + // leave the connection open to give the server an opportunity to + // initiate the TCP close. The client's timer will handle closing + // its side of the connection if the server misbehaves. + // + // TODO: different behavior if the underlying transport doesn't + // support timers? + if (m_is_server) { + terminate(lib::error_code()); + } + } else { + // spurious, ignore + m_elog.write(log::elevel::devel, "Got close frame in wrong state"); + } + } else { + // got an invalid control opcode + m_elog.write(log::elevel::devel, "Got control frame with invalid opcode"); + // initiate protocol error shutdown + } +} + +template <typename config> +lib::error_code connection<config>::send_close_ack(close::status::value code, + std::string const & reason) +{ + return send_close_frame(code,reason,true,m_is_server); +} + +template <typename config> +lib::error_code connection<config>::send_close_frame(close::status::value code, + std::string const & reason, bool ack, bool terminal) +{ + m_alog.write(log::alevel::devel,"send_close_frame"); + + // check for special codes + + // If silent close is set, respect it and blank out close information + // Otherwise use whatever has been specified in the parameters. If + // parameters specifies close::status::blank then determine what to do + // based on whether or not this is an ack. If it is not an ack just + // send blank info. If it is an ack then echo the close information from + // the remote endpoint. + if (config::silent_close) { + m_alog.write(log::alevel::devel,"closing silently"); + m_local_close_code = close::status::no_status; + m_local_close_reason.clear(); + } else if (code != close::status::blank) { + m_alog.write(log::alevel::devel,"closing with specified codes"); + m_local_close_code = code; + m_local_close_reason = reason; + } else if (!ack) { + m_alog.write(log::alevel::devel,"closing with no status code"); + m_local_close_code = close::status::no_status; + m_local_close_reason.clear(); + } else if (m_remote_close_code == close::status::no_status) { + m_alog.write(log::alevel::devel, + "acknowledging a no-status close with normal code"); + m_local_close_code = close::status::normal; + m_local_close_reason.clear(); + } else { + m_alog.write(log::alevel::devel,"acknowledging with remote codes"); + m_local_close_code = m_remote_close_code; + m_local_close_reason = m_remote_close_reason; + } + + std::stringstream s; + s << "Closing with code: " << m_local_close_code << ", and reason: " + << m_local_close_reason; + m_alog.write(log::alevel::devel,s.str()); + + message_ptr msg = m_msg_manager->get_message(); + if (!msg) { + return error::make_error_code(error::no_outgoing_buffers); + } + + lib::error_code ec = m_processor->prepare_close(m_local_close_code, + m_local_close_reason,msg); + if (ec) { + return ec; + } + + // Messages flagged terminal will result in the TCP connection being dropped + // after the message has been written. This is typically used when servers + // send an ack and when any endpoint encounters a protocol error + if (terminal) { + msg->set_terminal(true); + } + + m_state = session::state::closing; + + if (ack) { + m_was_clean = true; + } + + // Start a timer so we don't wait forever for the acknowledgement close + // frame + if (m_close_handshake_timeout_dur > 0) { + m_handshake_timer = transport_con_type::set_timer( + m_close_handshake_timeout_dur, + lib::bind( + &type::handle_close_handshake_timeout, + type::get_shared(), + lib::placeholders::_1 + ) + ); + } + + bool needs_writing = false; + { + scoped_lock_type lock(m_write_lock); + write_push(msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); + } + + if (needs_writing) { + transport_con_type::dispatch(lib::bind( + &type::write_frame, + type::get_shared() + )); + } + + return lib::error_code(); +} + +template <typename config> +typename connection<config>::processor_ptr +connection<config>::get_processor(int version) const { + // TODO: allow disabling certain versions + + processor_ptr p; + + switch (version) { + case 0: + p = lib::make_shared<processor::hybi00<config> >( + transport_con_type::is_secure(), + m_is_server, + m_msg_manager + ); + break; + case 7: + p = lib::make_shared<processor::hybi07<config> >( + transport_con_type::is_secure(), + m_is_server, + m_msg_manager, + lib::ref(m_rng) + ); + break; + case 8: + p = lib::make_shared<processor::hybi08<config> >( + transport_con_type::is_secure(), + m_is_server, + m_msg_manager, + lib::ref(m_rng) + ); + break; + case 13: + p = lib::make_shared<processor::hybi13<config> >( + transport_con_type::is_secure(), + m_is_server, + m_msg_manager, + lib::ref(m_rng) + ); + break; + default: + return p; + } + + // Settings not configured by the constructor + p->set_max_message_size(m_max_message_size); + + return p; +} + +template <typename config> +void connection<config>::write_push(typename config::message_type::ptr msg) +{ + if (!msg) { + return; + } + + m_send_buffer_size += msg->get_payload().size(); + m_send_queue.push(msg); + + if (m_alog.static_test(log::alevel::devel)) { + std::stringstream s; + s << "write_push: message count: " << m_send_queue.size() + << " buffer size: " << m_send_buffer_size; + m_alog.write(log::alevel::devel,s.str()); + } +} + +template <typename config> +typename config::message_type::ptr connection<config>::write_pop() +{ + message_ptr msg; + + if (m_send_queue.empty()) { + return msg; + } + + msg = m_send_queue.front(); + + m_send_buffer_size -= msg->get_payload().size(); + m_send_queue.pop(); + + if (m_alog.static_test(log::alevel::devel)) { + std::stringstream s; + s << "write_pop: message count: " << m_send_queue.size() + << " buffer size: " << m_send_buffer_size; + m_alog.write(log::alevel::devel,s.str()); + } + return msg; +} + +template <typename config> +void connection<config>::log_open_result() +{ + std::stringstream s; + + int version; + if (!processor::is_websocket_handshake(m_request)) { + version = -1; + } else { + version = processor::get_websocket_version(m_request); + } + + // Connection Type + s << (version == -1 ? "HTTP" : "WebSocket") << " Connection "; + + // Remote endpoint address + s << transport_con_type::get_remote_endpoint() << " "; + + // Version string if WebSocket + if (version != -1) { + s << "v" << version << " "; + } + + // User Agent + std::string ua = m_request.get_header("User-Agent"); + if (ua.empty()) { + s << "\"\" "; + } else { + // check if there are any quotes in the user agent + s << "\"" << utility::string_replace_all(ua,"\"","\\\"") << "\" "; + } + + // URI + s << (m_uri ? m_uri->get_resource() : "NULL") << " "; + + // Status code + s << m_response.get_status_code(); + + m_alog.write(log::alevel::connect,s.str()); +} + +template <typename config> +void connection<config>::log_close_result() +{ + std::stringstream s; + + s << "Disconnect " + << "close local:[" << m_local_close_code + << (m_local_close_reason.empty() ? "" : ","+m_local_close_reason) + << "] remote:[" << m_remote_close_code + << (m_remote_close_reason.empty() ? "" : ","+m_remote_close_reason) << "]"; + + m_alog.write(log::alevel::disconnect,s.str()); +} + +template <typename config> +void connection<config>::log_fail_result() +{ + std::stringstream s; + + int version = processor::get_websocket_version(m_request); + + // Connection Type + s << "WebSocket Connection "; + + // Remote endpoint address & WebSocket version + s << transport_con_type::get_remote_endpoint(); + if (version < 0) { + s << " -"; + } else { + s << " v" << version; + } + + // User Agent + std::string ua = m_request.get_header("User-Agent"); + if (ua.empty()) { + s << " \"\" "; + } else { + // check if there are any quotes in the user agent + s << " \"" << utility::string_replace_all(ua,"\"","\\\"") << "\" "; + } + + // URI + s << (m_uri ? m_uri->get_resource() : "-"); + + // HTTP Status code + s << " " << m_response.get_status_code(); + + // WebSocket++ error code & reason + s << " " << m_ec << " " << m_ec.message(); + + m_alog.write(log::alevel::fail,s.str()); +} + +template <typename config> +void connection<config>::log_http_result() { + std::stringstream s; + + if (processor::is_websocket_handshake(m_request)) { + m_alog.write(log::alevel::devel,"Call to log_http_result for WebSocket"); + return; + } + + // Connection Type + s << (m_request.get_header("host").empty() ? "-" : m_request.get_header("host")) + << " " << transport_con_type::get_remote_endpoint() + << " \"" << m_request.get_method() + << " " << (m_uri ? m_uri->get_resource() : "-") + << " " << m_request.get_version() << "\" " << m_response.get_status_code() + << " " << m_response.get_body().size(); + + // User Agent + std::string ua = m_request.get_header("User-Agent"); + if (ua.empty()) { + s << " \"\" "; + } else { + // check if there are any quotes in the user agent + s << " \"" << utility::string_replace_all(ua,"\"","\\\"") << "\" "; + } + + m_alog.write(log::alevel::http,s.str()); +} + +} // namespace websocketpp + +#endif // WEBSOCKETPP_CONNECTION_IMPL_HPP diff --git a/websocketpp/impl/endpoint_impl.hpp b/websocketpp/impl/endpoint_impl.hpp new file mode 100644 index 00000000..e09cda95 --- /dev/null +++ b/websocketpp/impl/endpoint_impl.hpp @@ -0,0 +1,269 @@ +/* + * Copyright (c) 2014, Peter Thorson. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the WebSocket++ Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef WEBSOCKETPP_ENDPOINT_IMPL_HPP +#define WEBSOCKETPP_ENDPOINT_IMPL_HPP + +#include <string> + +namespace websocketpp { + +template <typename connection, typename config> +typename endpoint<connection,config>::connection_ptr +endpoint<connection,config>::create_connection() { + m_alog.write(log::alevel::devel,"create_connection"); + //scoped_lock_type lock(m_state_lock); + + /*if (m_state == STOPPING || m_state == STOPPED) { + return connection_ptr(); + }*/ + + //scoped_lock_type guard(m_mutex); + // Create a connection on the heap and manage it using a shared pointer + connection_ptr con = lib::make_shared<connection_type>(m_is_server, + m_user_agent, lib::ref(m_alog), lib::ref(m_elog), lib::ref(m_rng)); + + connection_weak_ptr w(con); + + // Create a weak pointer on the heap using that shared_ptr. + // Cast that weak pointer to void* and manage it using another shared_ptr + // connection_hdl hdl(reinterpret_cast<void*>(new connection_weak_ptr(con))); + + con->set_handle(w); + + // Copy default handlers from the endpoint + con->set_open_handler(m_open_handler); + con->set_close_handler(m_close_handler); + con->set_fail_handler(m_fail_handler); + con->set_ping_handler(m_ping_handler); + con->set_pong_handler(m_pong_handler); + con->set_pong_timeout_handler(m_pong_timeout_handler); + con->set_interrupt_handler(m_interrupt_handler); + con->set_http_handler(m_http_handler); + con->set_validate_handler(m_validate_handler); + con->set_message_handler(m_message_handler); + + if (m_open_handshake_timeout_dur != config::timeout_open_handshake) { + con->set_open_handshake_timeout(m_open_handshake_timeout_dur); + } + if (m_close_handshake_timeout_dur != config::timeout_close_handshake) { + con->set_close_handshake_timeout(m_close_handshake_timeout_dur); + } + if (m_pong_timeout_dur != config::timeout_pong) { + con->set_pong_timeout(m_pong_timeout_dur); + } + if (m_max_message_size != config::max_message_size) { + con->set_max_message_size(m_max_message_size); + } + con->set_max_http_body_size(m_max_http_body_size); + + lib::error_code ec; + + ec = transport_type::init(con); + if (ec) { + m_elog.write(log::elevel::fatal,ec.message()); + return connection_ptr(); + } + + return con; +} + +template <typename connection, typename config> +void endpoint<connection,config>::interrupt(connection_hdl hdl, lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + + m_alog.write(log::alevel::devel,"Interrupting connection"); + + ec = con->interrupt(); +} + +template <typename connection, typename config> +void endpoint<connection,config>::interrupt(connection_hdl hdl) { + lib::error_code ec; + interrupt(hdl,ec); + if (ec) { throw exception(ec); } +} + +template <typename connection, typename config> +void endpoint<connection,config>::pause_reading(connection_hdl hdl, lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + + ec = con->pause_reading(); +} + +template <typename connection, typename config> +void endpoint<connection,config>::pause_reading(connection_hdl hdl) { + lib::error_code ec; + pause_reading(hdl,ec); + if (ec) { throw exception(ec); } +} + +template <typename connection, typename config> +void endpoint<connection,config>::resume_reading(connection_hdl hdl, lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + + ec = con->resume_reading(); +} + +template <typename connection, typename config> +void endpoint<connection,config>::resume_reading(connection_hdl hdl) { + lib::error_code ec; + resume_reading(hdl,ec); + if (ec) { throw exception(ec); } +} + +template <typename connection, typename config> +void endpoint<connection,config>::send_http_response(connection_hdl hdl, + lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + con->send_http_response(ec); +} + +template <typename connection, typename config> +void endpoint<connection,config>::send_http_response(connection_hdl hdl) { + lib::error_code ec; + send_http_response(hdl,ec); + if (ec) { throw exception(ec); } +} + +template <typename connection, typename config> +void endpoint<connection,config>::send(connection_hdl hdl, std::string const & payload, + frame::opcode::value op, lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + + ec = con->send(payload,op); +} + +template <typename connection, typename config> +void endpoint<connection,config>::send(connection_hdl hdl, std::string const & payload, + frame::opcode::value op) +{ + lib::error_code ec; + send(hdl,payload,op,ec); + if (ec) { throw exception(ec); } +} + +template <typename connection, typename config> +void endpoint<connection,config>::send(connection_hdl hdl, void const * payload, + size_t len, frame::opcode::value op, lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + ec = con->send(payload,len,op); +} + +template <typename connection, typename config> +void endpoint<connection,config>::send(connection_hdl hdl, void const * payload, + size_t len, frame::opcode::value op) +{ + lib::error_code ec; + send(hdl,payload,len,op,ec); + if (ec) { throw exception(ec); } +} + +template <typename connection, typename config> +void endpoint<connection,config>::send(connection_hdl hdl, message_ptr msg, + lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + ec = con->send(msg); +} + +template <typename connection, typename config> +void endpoint<connection,config>::send(connection_hdl hdl, message_ptr msg) { + lib::error_code ec; + send(hdl,msg,ec); + if (ec) { throw exception(ec); } +} + +template <typename connection, typename config> +void endpoint<connection,config>::close(connection_hdl hdl, close::status::value + const code, std::string const & reason, + lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + con->close(code,reason,ec); +} + +template <typename connection, typename config> +void endpoint<connection,config>::close(connection_hdl hdl, close::status::value + const code, std::string const & reason) +{ + lib::error_code ec; + close(hdl,code,reason,ec); + if (ec) { throw exception(ec); } +} + +template <typename connection, typename config> +void endpoint<connection,config>::ping(connection_hdl hdl, std::string const & + payload, lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + con->ping(payload,ec); +} + +template <typename connection, typename config> +void endpoint<connection,config>::ping(connection_hdl hdl, std::string const & payload) +{ + lib::error_code ec; + ping(hdl,payload,ec); + if (ec) { throw exception(ec); } +} + +template <typename connection, typename config> +void endpoint<connection,config>::pong(connection_hdl hdl, std::string const & payload, + lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + con->pong(payload,ec); +} + +template <typename connection, typename config> +void endpoint<connection,config>::pong(connection_hdl hdl, std::string const & payload) +{ + lib::error_code ec; + pong(hdl,payload,ec); + if (ec) { throw exception(ec); } +} + +} // namespace websocketpp + +#endif // WEBSOCKETPP_ENDPOINT_IMPL_HPP diff --git a/websocketpp/impl/utilities_impl.hpp b/websocketpp/impl/utilities_impl.hpp new file mode 100644 index 00000000..6f86e22f --- /dev/null +++ b/websocketpp/impl/utilities_impl.hpp @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2014, Peter Thorson. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the WebSocket++ Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef WEBSOCKETPP_UTILITIES_IMPL_HPP +#define WEBSOCKETPP_UTILITIES_IMPL_HPP + +#include <algorithm> +#include <string> + +namespace websocketpp { +namespace utility { + +inline std::string to_lower(std::string const & in) { + std::string out = in; + std::transform(out.begin(),out.end(),out.begin(),::tolower); + return out; +} + +inline std::string to_hex(std::string const & input) { + std::string output; + std::string hex = "0123456789ABCDEF"; + + for (size_t i = 0; i < input.size(); i++) { + output += hex[(input[i] & 0xF0) >> 4]; + output += hex[input[i] & 0x0F]; + output += " "; + } + + return output; +} + +inline std::string to_hex(uint8_t const * input, size_t length) { + std::string output; + std::string hex = "0123456789ABCDEF"; + + for (size_t i = 0; i < length; i++) { + output += hex[(input[i] & 0xF0) >> 4]; + output += hex[input[i] & 0x0F]; + output += " "; + } + + return output; +} + +inline std::string to_hex(const char* input,size_t length) { + return to_hex(reinterpret_cast<const uint8_t*>(input),length); +} + +inline std::string string_replace_all(std::string subject, std::string const & + search, std::string const & replace) +{ + size_t pos = 0; + while((pos = subject.find(search, pos)) != std::string::npos) { + subject.replace(pos, search.length(), replace); + pos += replace.length(); + } + return subject; +} + +} // namespace utility +} // namespace websocketpp + +#endif // WEBSOCKETPP_UTILITIES_IMPL_HPP |