/* * Copyright (c) 2014, Peter Thorson. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of the WebSocket++ Project nor the * names of its contributors may be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #ifndef WEBSOCKETPP_CONNECTION_IMPL_HPP #define WEBSOCKETPP_CONNECTION_IMPL_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include namespace websocketpp { namespace istate = session::internal_state; template void connection::set_termination_handler( termination_handler new_handler) { m_alog.write(log::alevel::devel, "connection set_termination_handler"); //scoped_lock_type lock(m_connection_state_lock); m_termination_handler = new_handler; } template std::string const & connection::get_origin() const { //scoped_lock_type lock(m_connection_state_lock); return m_processor->get_origin(m_request); } template size_t connection::get_buffered_amount() const { //scoped_lock_type lock(m_connection_state_lock); return m_send_buffer_size; } template session::state::value connection::get_state() const { //scoped_lock_type lock(m_connection_state_lock); return m_state; } template lib::error_code connection::send(std::string const & payload, frame::opcode::value op) { message_ptr msg = m_msg_manager->get_message(op,payload.size()); msg->append_payload(payload); msg->set_compressed(true); return send(msg); } template lib::error_code connection::send(void const * payload, size_t len, frame::opcode::value op) { message_ptr msg = m_msg_manager->get_message(op,len); msg->append_payload(payload,len); return send(msg); } template lib::error_code connection::send(typename config::message_type::ptr msg) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"connection send"); } { scoped_lock_type lock(m_connection_state_lock); if (m_state != session::state::open) { return error::make_error_code(error::invalid_state); } } message_ptr outgoing_msg; bool needs_writing = false; if (msg->get_prepared()) { outgoing_msg = msg; scoped_lock_type lock(m_write_lock); write_push(outgoing_msg); needs_writing = !m_write_flag && !m_send_queue.empty(); } else { outgoing_msg = m_msg_manager->get_message(); if (!outgoing_msg) { return error::make_error_code(error::no_outgoing_buffers); } scoped_lock_type lock(m_write_lock); lib::error_code ec = m_processor->prepare_data_frame(msg,outgoing_msg); if (ec) { return ec; } write_push(outgoing_msg); needs_writing = !m_write_flag && !m_send_queue.empty(); } if (needs_writing) { transport_con_type::dispatch(lib::bind( &type::write_frame, type::get_shared() )); } return lib::error_code(); } template void connection::ping(std::string const& payload, lib::error_code& ec) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"connection ping"); } { scoped_lock_type lock(m_connection_state_lock); if (m_state != session::state::open) { std::stringstream ss; ss << "connection::ping called from invalid state " << m_state; m_alog.write(log::alevel::devel,ss.str()); ec = error::make_error_code(error::invalid_state); return; } } message_ptr msg = m_msg_manager->get_message(); if (!msg) { ec = error::make_error_code(error::no_outgoing_buffers); return; } ec = m_processor->prepare_ping(payload,msg); if (ec) {return;} // set ping timer if we are listening for one if (m_pong_timeout_handler) { // Cancel any existing timers if (m_ping_timer) { m_ping_timer->cancel(); } if (m_pong_timeout_dur > 0) { m_ping_timer = transport_con_type::set_timer( m_pong_timeout_dur, lib::bind( &type::handle_pong_timeout, type::get_shared(), payload, lib::placeholders::_1 ) ); } if (!m_ping_timer) { // Our transport doesn't support timers m_elog.write(log::elevel::warn,"Warning: a pong_timeout_handler is \ set but the transport in use does not support timeouts."); } } bool needs_writing = false; { scoped_lock_type lock(m_write_lock); write_push(msg); needs_writing = !m_write_flag && !m_send_queue.empty(); } if (needs_writing) { transport_con_type::dispatch(lib::bind( &type::write_frame, type::get_shared() )); } ec = lib::error_code(); } template void connection::ping(std::string const & payload) { lib::error_code ec; ping(payload,ec); if (ec) { throw exception(ec); } } template void connection::handle_pong_timeout(std::string payload, lib::error_code const & ec) { if (ec) { if (ec == transport::error::operation_aborted) { // ignore, this is expected return; } m_elog.write(log::elevel::devel,"pong_timeout error: "+ec.message()); return; } if (m_pong_timeout_handler) { m_pong_timeout_handler(m_connection_hdl,payload); } } template void connection::pong(std::string const& payload, lib::error_code& ec) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"connection pong"); } { scoped_lock_type lock(m_connection_state_lock); if (m_state != session::state::open) { std::stringstream ss; ss << "connection::pong called from invalid state " << m_state; m_alog.write(log::alevel::devel,ss.str()); ec = error::make_error_code(error::invalid_state); return; } } message_ptr msg = m_msg_manager->get_message(); if (!msg) { ec = error::make_error_code(error::no_outgoing_buffers); return; } ec = m_processor->prepare_pong(payload,msg); if (ec) {return;} bool needs_writing = false; { scoped_lock_type lock(m_write_lock); write_push(msg); needs_writing = !m_write_flag && !m_send_queue.empty(); } if (needs_writing) { transport_con_type::dispatch(lib::bind( &type::write_frame, type::get_shared() )); } ec = lib::error_code(); } template void connection::pong(std::string const & payload) { lib::error_code ec; pong(payload,ec); if (ec) { throw exception(ec); } } template void connection::close(close::status::value const code, std::string const & reason, lib::error_code & ec) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"connection close"); } // Truncate reason to maximum size allowable in a close frame. std::string tr(reason,0,std::min(reason.size(), frame::limits::close_reason_size)); scoped_lock_type lock(m_connection_state_lock); if (m_state != session::state::open) { ec = error::make_error_code(error::invalid_state); return; } ec = this->send_close_frame(code,tr,false,close::status::terminal(code)); } template void connection::close(close::status::value const code, std::string const & reason) { lib::error_code ec; close(code,reason,ec); if (ec) { throw exception(ec); } } /// Trigger the on_interrupt handler /** * This is thread safe if the transport is thread safe */ template lib::error_code connection::interrupt() { m_alog.write(log::alevel::devel,"connection connection::interrupt"); return transport_con_type::interrupt( lib::bind( &type::handle_interrupt, type::get_shared() ) ); } template void connection::handle_interrupt() { if (m_interrupt_handler) { m_interrupt_handler(m_connection_hdl); } } template lib::error_code connection::pause_reading() { m_alog.write(log::alevel::devel,"connection connection::pause_reading"); return transport_con_type::dispatch( lib::bind( &type::handle_pause_reading, type::get_shared() ) ); } /// Pause reading handler. Not safe to call directly template void connection::handle_pause_reading() { m_alog.write(log::alevel::devel,"connection connection::handle_pause_reading"); m_read_flag = false; } template lib::error_code connection::resume_reading() { m_alog.write(log::alevel::devel,"connection connection::resume_reading"); return transport_con_type::dispatch( lib::bind( &type::handle_resume_reading, type::get_shared() ) ); } /// Resume reading helper method. Not safe to call directly template void connection::handle_resume_reading() { m_read_flag = true; read_frame(); } template bool connection::get_secure() const { //scoped_lock_type lock(m_connection_state_lock); return m_uri->get_secure(); } template std::string const & connection::get_host() const { //scoped_lock_type lock(m_connection_state_lock); return m_uri->get_host(); } template std::string const & connection::get_resource() const { //scoped_lock_type lock(m_connection_state_lock); return m_uri->get_resource(); } template uint16_t connection::get_port() const { //scoped_lock_type lock(m_connection_state_lock); return m_uri->get_port(); } template uri_ptr connection::get_uri() const { //scoped_lock_type lock(m_connection_state_lock); return m_uri; } template void connection::set_uri(uri_ptr uri) { //scoped_lock_type lock(m_connection_state_lock); m_uri = uri; } template std::string const & connection::get_subprotocol() const { return m_subprotocol; } template std::vector const & connection::get_requested_subprotocols() const { return m_requested_subprotocols; } template void connection::add_subprotocol(std::string const & value, lib::error_code & ec) { if (m_is_server) { ec = error::make_error_code(error::client_only); return; } // If the value is empty or has a non-RFC2616 token character it is invalid. if (value.empty() || std::find_if(value.begin(),value.end(), http::is_not_token_char) != value.end()) { ec = error::make_error_code(error::invalid_subprotocol); return; } m_requested_subprotocols.push_back(value); } template void connection::add_subprotocol(std::string const & value) { lib::error_code ec; this->add_subprotocol(value,ec); if (ec) { throw exception(ec); } } template void connection::select_subprotocol(std::string const & value, lib::error_code & ec) { if (!m_is_server) { ec = error::make_error_code(error::server_only); return; } if (value.empty()) { ec = lib::error_code(); return; } std::vector::iterator it; it = std::find(m_requested_subprotocols.begin(), m_requested_subprotocols.end(), value); if (it == m_requested_subprotocols.end()) { ec = error::make_error_code(error::unrequested_subprotocol); return; } m_subprotocol = value; } template void connection::select_subprotocol(std::string const & value) { lib::error_code ec; this->select_subprotocol(value,ec); if (ec) { throw exception(ec); } } template std::string const & connection::get_request_header(std::string const & key) const { return m_request.get_header(key); } template std::string const & connection::get_request_body() const { return m_request.get_body(); } template std::string const & connection::get_response_header(std::string const & key) const { return m_response.get_header(key); } // TODO: EXCEPTION_FREE template void connection::set_status(http::status_code::value code) { if (m_internal_state != istate::PROCESS_HTTP_REQUEST) { throw exception("Call to set_status from invalid state", error::make_error_code(error::invalid_state)); } m_response.set_status(code); } // TODO: EXCEPTION_FREE template void connection::set_status(http::status_code::value code, std::string const & msg) { if (m_internal_state != istate::PROCESS_HTTP_REQUEST) { throw exception("Call to set_status from invalid state", error::make_error_code(error::invalid_state)); } m_response.set_status(code,msg); } // TODO: EXCEPTION_FREE template void connection::set_body(std::string const & value) { if (m_internal_state != istate::PROCESS_HTTP_REQUEST) { throw exception("Call to set_status from invalid state", error::make_error_code(error::invalid_state)); } m_response.set_body(value); } // TODO: EXCEPTION_FREE template void connection::append_header(std::string const & key, std::string const & val) { if (m_is_server) { if (m_internal_state == istate::PROCESS_HTTP_REQUEST) { // we are setting response headers for an incoming server connection m_response.append_header(key,val); } else { throw exception("Call to append_header from invalid state", error::make_error_code(error::invalid_state)); } } else { if (m_internal_state == istate::USER_INIT) { // we are setting initial headers for an outgoing client connection m_request.append_header(key,val); } else { throw exception("Call to append_header from invalid state", error::make_error_code(error::invalid_state)); } } } // TODO: EXCEPTION_FREE template void connection::replace_header(std::string const & key, std::string const & val) { if (m_is_server) { if (m_internal_state == istate::PROCESS_HTTP_REQUEST) { // we are setting response headers for an incoming server connection m_response.replace_header(key,val); } else { throw exception("Call to replace_header from invalid state", error::make_error_code(error::invalid_state)); } } else { if (m_internal_state == istate::USER_INIT) { // we are setting initial headers for an outgoing client connection m_request.replace_header(key,val); } else { throw exception("Call to replace_header from invalid state", error::make_error_code(error::invalid_state)); } } } // TODO: EXCEPTION_FREE template void connection::remove_header(std::string const & key) { if (m_is_server) { if (m_internal_state == istate::PROCESS_HTTP_REQUEST) { // we are setting response headers for an incoming server connection m_response.remove_header(key); } else { throw exception("Call to remove_header from invalid state", error::make_error_code(error::invalid_state)); } } else { if (m_internal_state == istate::USER_INIT) { // we are setting initial headers for an outgoing client connection m_request.remove_header(key); } else { throw exception("Call to remove_header from invalid state", error::make_error_code(error::invalid_state)); } } } /// Defer HTTP Response until later /** * Used in the http handler to defer the HTTP response for this connection * until later. Handshake timers will be canceled and the connection will be * left open until `send_http_response` or an equivalent is called. * * Warning: deferred connections won't time out and as a result can tie up * resources. * * @return A status code, zero on success, non-zero otherwise */ template lib::error_code connection::defer_http_response() { // Cancel handshake timer, otherwise the connection will time out and we'll // close the connection before the app has a chance to send a response. if (m_handshake_timer) { m_handshake_timer->cancel(); m_handshake_timer.reset(); } // Do something to signal deferral m_http_state = session::http_state::deferred; return lib::error_code(); } /// Send deferred HTTP Response (exception free) /** * Sends an http response to an HTTP connection that was deferred. This will * send a complete response including all headers, status line, and body * text. The connection will be closed afterwards. * * @since 0.6.0 * * @param ec A status code, zero on success, non-zero otherwise */ template void connection::send_http_response(lib::error_code & ec) { { scoped_lock_type lock(m_connection_state_lock); if (m_http_state != session::http_state::deferred) { ec = error::make_error_code(error::invalid_state); return; } m_http_state = session::http_state::body_written; } this->write_http_response(lib::error_code()); ec = lib::error_code(); } template void connection::send_http_response() { lib::error_code ec; this->send_http_response(ec); if (ec) { throw exception(ec); } } /******** logic thread ********/ template void connection::start() { m_alog.write(log::alevel::devel,"connection start"); if (m_internal_state != istate::USER_INIT) { m_alog.write(log::alevel::devel,"Start called in invalid state"); this->terminate(error::make_error_code(error::invalid_state)); return; } m_internal_state = istate::TRANSPORT_INIT; // Depending on how the transport implements init this function may return // immediately and call handle_transport_init later or call // handle_transport_init from this function. transport_con_type::init( lib::bind( &type::handle_transport_init, type::get_shared(), lib::placeholders::_1 ) ); } template void connection::handle_transport_init(lib::error_code const & ec) { m_alog.write(log::alevel::devel,"connection handle_transport_init"); lib::error_code ecm = ec; if (m_internal_state != istate::TRANSPORT_INIT) { m_alog.write(log::alevel::devel, "handle_transport_init must be called from transport init state"); ecm = error::make_error_code(error::invalid_state); } if (ecm) { std::stringstream s; s << "handle_transport_init received error: "<< ecm.message(); m_elog.write(log::elevel::rerror,s.str()); this->terminate(ecm); return; } // At this point the transport is ready to read and write bytes. if (m_is_server) { m_internal_state = istate::READ_HTTP_REQUEST; this->read_handshake(1); } else { // We are a client. Set the processor to the version specified in the // config file and send a handshake request. m_internal_state = istate::WRITE_HTTP_REQUEST; m_processor = get_processor(config::client_version); this->send_http_request(); } } template void connection::read_handshake(size_t num_bytes) { m_alog.write(log::alevel::devel,"connection read_handshake"); if (m_open_handshake_timeout_dur > 0) { m_handshake_timer = transport_con_type::set_timer( m_open_handshake_timeout_dur, lib::bind( &type::handle_open_handshake_timeout, type::get_shared(), lib::placeholders::_1 ) ); } transport_con_type::async_read_at_least( num_bytes, m_buf, config::connection_read_buffer_size, lib::bind( &type::handle_read_handshake, type::get_shared(), lib::placeholders::_1, lib::placeholders::_2 ) ); } // All exit paths for this function need to call write_http_response() or submit // a new read request with this function as the handler. template void connection::handle_read_handshake(lib::error_code const & ec, size_t bytes_transferred) { m_alog.write(log::alevel::devel,"connection handle_read_handshake"); lib::error_code ecm = ec; if (!ecm) { scoped_lock_type lock(m_connection_state_lock); if (m_state == session::state::connecting) { if (m_internal_state != istate::READ_HTTP_REQUEST) { ecm = error::make_error_code(error::invalid_state); } } else if (m_state == session::state::closed) { // The connection was canceled while the response was being sent, // usually by the handshake timer. This is basically expected // (though hopefully rare) and there is nothing we can do so ignore. m_alog.write(log::alevel::devel, "handle_read_handshake invoked after connection was closed"); return; } else { ecm = error::make_error_code(error::invalid_state); } } if (ecm) { if (ecm == transport::error::eof && m_state == session::state::closed) { // we expect to get eof if the connection is closed already m_alog.write(log::alevel::devel, "got (expected) eof/state error from closed con"); return; } log_err(log::elevel::rerror,"handle_read_handshake",ecm); this->terminate(ecm); return; } // Boundaries checking. TODO: How much of this should be done? if (bytes_transferred > config::connection_read_buffer_size) { m_elog.write(log::elevel::fatal,"Fatal boundaries checking error."); this->terminate(make_error_code(error::general)); return; } size_t bytes_processed = 0; try { bytes_processed = m_request.consume(m_buf,bytes_transferred); } catch (http::exception &e) { // All HTTP exceptions will result in this request failing and an error // response being returned. No more bytes will be read in this con. m_response.set_status(e.m_error_code,e.m_error_msg); this->write_http_response_error(error::make_error_code(error::http_parse_error)); return; } // More paranoid boundaries checking. // TODO: Is this overkill? if (bytes_processed > bytes_transferred) { m_elog.write(log::elevel::fatal,"Fatal boundaries checking error."); this->terminate(make_error_code(error::general)); return; } if (m_alog.static_test(log::alevel::devel)) { std::stringstream s; s << "bytes_transferred: " << bytes_transferred << " bytes, bytes processed: " << bytes_processed << " bytes"; m_alog.write(log::alevel::devel,s.str()); } if (m_request.ready()) { lib::error_code processor_ec = this->initialize_processor(); if (processor_ec) { this->write_http_response_error(processor_ec); return; } if (m_processor && m_processor->get_version() == 0) { // Version 00 has an extra requirement to read some bytes after the // handshake if (bytes_transferred-bytes_processed >= 8) { m_request.replace_header( "Sec-WebSocket-Key3", std::string(m_buf+bytes_processed,m_buf+bytes_processed+8) ); bytes_processed += 8; } else { // TODO: need more bytes m_alog.write(log::alevel::devel,"short key3 read"); m_response.set_status(http::status_code::internal_server_error); this->write_http_response_error(processor::error::make_error_code(processor::error::short_key3)); return; } } if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,m_request.raw()); if (!m_request.get_header("Sec-WebSocket-Key3").empty()) { m_alog.write(log::alevel::devel, utility::to_hex(m_request.get_header("Sec-WebSocket-Key3"))); } } // The remaining bytes in m_buf are frame data. Copy them to the // beginning of the buffer and note the length. They will be read after // the handshake completes and before more bytes are read. std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf); m_buf_cursor = bytes_transferred-bytes_processed; m_internal_state = istate::PROCESS_HTTP_REQUEST; // We have the complete request. Process it. lib::error_code handshake_ec = this->process_handshake_request(); // Write a response if this is a websocket connection or if it is an // HTTP connection for which the response has not been deferred or // started yet by a different system (i.e. still in init state). if (!m_is_http || m_http_state == session::http_state::init) { this->write_http_response(handshake_ec); } } else { // read at least 1 more byte transport_con_type::async_read_at_least( 1, m_buf, config::connection_read_buffer_size, lib::bind( &type::handle_read_handshake, type::get_shared(), lib::placeholders::_1, lib::placeholders::_2 ) ); } } // write_http_response requires the request to be fully read and the connection // to be in the PROCESS_HTTP_REQUEST state. In some cases we can detect errors // before the request is fully read (specifically at a point where we aren't // sure if the hybi00 key3 bytes need to be read). This method sets the correct // state and calls write_http_response template void connection::write_http_response_error(lib::error_code const & ec) { if (m_internal_state != istate::READ_HTTP_REQUEST) { m_alog.write(log::alevel::devel, "write_http_response_error called in invalid state"); this->terminate(error::make_error_code(error::invalid_state)); return; } m_internal_state = istate::PROCESS_HTTP_REQUEST; this->write_http_response(ec); } // All exit paths for this function need to call write_http_response() or submit // a new read request with this function as the handler. template void connection::handle_read_frame(lib::error_code const & ec, size_t bytes_transferred) { //m_alog.write(log::alevel::devel,"connection handle_read_frame"); lib::error_code ecm = ec; if (!ecm && m_internal_state != istate::PROCESS_CONNECTION) { ecm = error::make_error_code(error::invalid_state); } if (ecm) { log::level echannel = log::elevel::rerror; if (ecm == transport::error::eof) { if (m_state == session::state::closed) { // we expect to get eof if the connection is closed already // just ignore it m_alog.write(log::alevel::devel,"got eof from closed con"); return; } else if (m_state == session::state::closing && !m_is_server) { // If we are a client we expect to get eof in the closing state, // this is a signal to terminate our end of the connection after // the closing handshake terminate(lib::error_code()); return; } } else if (ecm == error::invalid_state) { // In general, invalid state errors in the closed state are the // result of handlers that were in the system already when the state // changed and should be ignored as they pose no problems and there // is nothing useful that we can do about them. if (m_state == session::state::closed) { m_alog.write(log::alevel::devel, "handle_read_frame: got invalid istate in closed state"); return; } } else if (ecm == transport::error::tls_short_read) { if (m_state == session::state::closed) { // We expect to get a TLS short read if we try to read after the // connection is closed. If this happens ignore and exit the // read frame path. terminate(lib::error_code()); return; } echannel = log::elevel::rerror; } else if (ecm == transport::error::action_after_shutdown) { echannel = log::elevel::info; } log_err(echannel, "handle_read_frame", ecm); this->terminate(ecm); return; } // Boundaries checking. TODO: How much of this should be done? /*if (bytes_transferred > config::connection_read_buffer_size) { m_elog.write(log::elevel::fatal,"Fatal boundaries checking error"); this->terminate(make_error_code(error::general)); return; }*/ size_t p = 0; if (m_alog.static_test(log::alevel::devel)) { std::stringstream s; s << "p = " << p << " bytes transferred = " << bytes_transferred; m_alog.write(log::alevel::devel,s.str()); } while (p < bytes_transferred) { if (m_alog.static_test(log::alevel::devel)) { std::stringstream s; s << "calling consume with " << bytes_transferred-p << " bytes"; m_alog.write(log::alevel::devel,s.str()); } lib::error_code consume_ec; if (m_alog.static_test(log::alevel::devel)) { std::stringstream s; s << "Processing Bytes: " << utility::to_hex(reinterpret_cast(m_buf)+p,bytes_transferred-p); m_alog.write(log::alevel::devel,s.str()); } p += m_processor->consume( reinterpret_cast(m_buf)+p, bytes_transferred-p, consume_ec ); if (m_alog.static_test(log::alevel::devel)) { std::stringstream s; s << "bytes left after consume: " << bytes_transferred-p; m_alog.write(log::alevel::devel,s.str()); } if (consume_ec) { log_err(log::elevel::rerror, "consume", consume_ec); if (config::drop_on_protocol_error) { this->terminate(consume_ec); return; } else { lib::error_code close_ec; this->close( processor::error::to_ws(consume_ec), consume_ec.message(), close_ec ); if (close_ec) { log_err(log::elevel::fatal, "Protocol error close frame ", close_ec); this->terminate(close_ec); return; } } return; } if (m_processor->ready()) { if (m_alog.static_test(log::alevel::devel)) { std::stringstream s; s << "Complete message received. Dispatching"; m_alog.write(log::alevel::devel,s.str()); } message_ptr msg = m_processor->get_message(); if (!msg) { m_alog.write(log::alevel::devel, "null message from m_processor"); } else if (!is_control(msg->get_opcode())) { // data message, dispatch to user if (m_state != session::state::open) { m_elog.write(log::elevel::warn, "got non-close frame while closing"); } else if (m_message_handler) { m_message_handler(m_connection_hdl, msg); } } else { process_control_frame(msg); } } } read_frame(); } /// Issue a new transport read unless reading is paused. template void connection::read_frame() { if (!m_read_flag) { return; } transport_con_type::async_read_at_least( // std::min wont work with undefined static const values. // TODO: is there a more elegant way to do this? // Need to determine if requesting 1 byte or the exact number of bytes // is better here. 1 byte lets us be a bit more responsive at a // potential expense of additional runs through handle_read_frame /*(m_processor->get_bytes_needed() > config::connection_read_buffer_size ? config::connection_read_buffer_size : m_processor->get_bytes_needed())*/ 1, m_buf, config::connection_read_buffer_size, m_handle_read_frame ); } template lib::error_code connection::initialize_processor() { m_alog.write(log::alevel::devel,"initialize_processor"); // if it isn't a websocket handshake nothing to do. if (!processor::is_websocket_handshake(m_request)) { return lib::error_code(); } int version = processor::get_websocket_version(m_request); if (version < 0) { m_alog.write(log::alevel::devel, "BAD REQUEST: can't determine version"); m_response.set_status(http::status_code::bad_request); return error::make_error_code(error::invalid_version); } m_processor = get_processor(version); // if the processor is not null we are done if (m_processor) { return lib::error_code(); } // We don't have a processor for this version. Return bad request // with Sec-WebSocket-Version header filled with values we do accept m_alog.write(log::alevel::devel, "BAD REQUEST: no processor for version"); m_response.set_status(http::status_code::bad_request); std::stringstream ss; std::string sep; std::vector::const_iterator it; for (it = versions_supported.begin(); it != versions_supported.end(); it++) { ss << sep << *it; sep = ","; } m_response.replace_header("Sec-WebSocket-Version",ss.str()); return error::make_error_code(error::unsupported_version); } template lib::error_code connection::process_handshake_request() { m_alog.write(log::alevel::devel,"process handshake request"); if (!processor::is_websocket_handshake(m_request)) { // this is not a websocket handshake. Process as plain HTTP m_alog.write(log::alevel::devel,"HTTP REQUEST"); // extract URI from request m_uri = processor::get_uri_from_host( m_request, (transport_con_type::is_secure() ? "https" : "http") ); if (!m_uri->get_valid()) { m_alog.write(log::alevel::devel, "Bad request: failed to parse uri"); m_response.set_status(http::status_code::bad_request); return error::make_error_code(error::invalid_uri); } if (m_http_handler) { m_is_http = true; m_http_handler(m_connection_hdl); if (m_state == session::state::closed) { return error::make_error_code(error::http_connection_ended); } } else { set_status(http::status_code::upgrade_required); return error::make_error_code(error::upgrade_required); } return lib::error_code(); } lib::error_code ec = m_processor->validate_handshake(m_request); // Validate: make sure all required elements are present. if (ec){ // Not a valid handshake request m_alog.write(log::alevel::devel, "Bad request " + ec.message()); m_response.set_status(http::status_code::bad_request); return ec; } // Read extension parameters and set up values necessary for the end user // to complete extension negotiation. std::pair neg_results; neg_results = m_processor->negotiate_extensions(m_request); if (neg_results.first) { // There was a fatal error in extension parsing that should result in // a failed connection attempt. m_alog.write(log::alevel::devel, "Bad request: " + neg_results.first.message()); m_response.set_status(http::status_code::bad_request); return neg_results.first; } else { // extension negotiation succeeded, set response header accordingly // we don't send an empty extensions header because it breaks many // clients. if (neg_results.second.size() > 0) { m_response.replace_header("Sec-WebSocket-Extensions", neg_results.second); } } // extract URI from request m_uri = m_processor->get_uri(m_request); if (!m_uri->get_valid()) { m_alog.write(log::alevel::devel, "Bad request: failed to parse uri"); m_response.set_status(http::status_code::bad_request); return error::make_error_code(error::invalid_uri); } // extract subprotocols lib::error_code subp_ec = m_processor->extract_subprotocols(m_request, m_requested_subprotocols); if (subp_ec) { // should we do anything? } // Ask application to validate the connection if (!m_validate_handler || m_validate_handler(m_connection_hdl)) { m_response.set_status(http::status_code::switching_protocols); // Write the appropriate response headers based on request and // processor version ec = m_processor->process_handshake(m_request,m_subprotocol,m_response); if (ec) { std::stringstream s; s << "Processing error: " << ec << "(" << ec.message() << ")"; m_alog.write(log::alevel::devel, s.str()); m_response.set_status(http::status_code::internal_server_error); return ec; } } else { // User application has rejected the handshake m_alog.write(log::alevel::devel, "USER REJECT"); // Use Bad Request if the user handler did not provide a more // specific http response error code. // TODO: is there a better default? if (m_response.get_status_code() == http::status_code::uninitialized) { m_response.set_status(http::status_code::bad_request); } return error::make_error_code(error::rejected); } return lib::error_code(); } template void connection::write_http_response(lib::error_code const & ec) { m_alog.write(log::alevel::devel,"connection write_http_response"); if (ec == error::make_error_code(error::http_connection_ended)) { m_alog.write(log::alevel::http,"An HTTP handler took over the connection."); return; } if (m_response.get_status_code() == http::status_code::uninitialized) { m_response.set_status(http::status_code::internal_server_error); m_ec = error::make_error_code(error::general); } else { m_ec = ec; } m_response.set_version("HTTP/1.1"); // Set server header based on the user agent settings if (m_response.get_header("Server").empty()) { if (!m_user_agent.empty()) { m_response.replace_header("Server",m_user_agent); } else { m_response.remove_header("Server"); } } // have the processor generate the raw bytes for the wire (if it exists) if (m_processor) { m_handshake_buffer = m_processor->get_raw(m_response); } else { // a processor wont exist for raw HTTP responses. m_handshake_buffer = m_response.raw(); } if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"Raw Handshake response:\n"+m_handshake_buffer); if (!m_response.get_header("Sec-WebSocket-Key3").empty()) { m_alog.write(log::alevel::devel, utility::to_hex(m_response.get_header("Sec-WebSocket-Key3"))); } } // write raw bytes transport_con_type::async_write( m_handshake_buffer.data(), m_handshake_buffer.size(), lib::bind( &type::handle_write_http_response, type::get_shared(), lib::placeholders::_1 ) ); } template void connection::handle_write_http_response(lib::error_code const & ec) { m_alog.write(log::alevel::devel,"handle_write_http_response"); lib::error_code ecm = ec; if (!ecm) { scoped_lock_type lock(m_connection_state_lock); if (m_state == session::state::connecting) { if (m_internal_state != istate::PROCESS_HTTP_REQUEST) { ecm = error::make_error_code(error::invalid_state); } } else if (m_state == session::state::closed) { // The connection was canceled while the response was being sent, // usually by the handshake timer. This is basically expected // (though hopefully rare) and there is nothing we can do so ignore. m_alog.write(log::alevel::devel, "handle_write_http_response invoked after connection was closed"); return; } else { ecm = error::make_error_code(error::invalid_state); } } if (ecm) { if (ecm == transport::error::eof && m_state == session::state::closed) { // we expect to get eof if the connection is closed already m_alog.write(log::alevel::devel, "got (expected) eof/state error from closed con"); return; } log_err(log::elevel::rerror,"handle_write_http_response",ecm); this->terminate(ecm); return; } if (m_handshake_timer) { m_handshake_timer->cancel(); m_handshake_timer.reset(); } if (m_response.get_status_code() != http::status_code::switching_protocols) { /*if (m_processor || m_ec == error::http_parse_error || m_ec == error::invalid_version || m_ec == error::unsupported_version || m_ec == error::upgrade_required) {*/ if (!m_is_http) { std::stringstream s; s << "Handshake ended with HTTP error: " << m_response.get_status_code(); m_elog.write(log::elevel::rerror,s.str()); } else { // if this was not a websocket connection, we have written // the expected response and the connection can be closed. this->log_http_result(); if (m_ec) { m_alog.write(log::alevel::devel, "got to writing HTTP results with m_ec set: "+m_ec.message()); } m_ec = make_error_code(error::http_connection_ended); } this->terminate(m_ec); return; } this->log_open_result(); m_internal_state = istate::PROCESS_CONNECTION; m_state = session::state::open; if (m_open_handler) { m_open_handler(m_connection_hdl); } this->handle_read_frame(lib::error_code(), m_buf_cursor); } template void connection::send_http_request() { m_alog.write(log::alevel::devel,"connection send_http_request"); // TODO: origin header? // Have the protocol processor fill in the appropriate fields based on the // selected client version if (m_processor) { lib::error_code ec; ec = m_processor->client_handshake_request(m_request,m_uri, m_requested_subprotocols); if (ec) { log_err(log::elevel::fatal,"Internal library error: Processor",ec); return; } } else { m_elog.write(log::elevel::fatal,"Internal library error: missing processor"); return; } // Unless the user has overridden the user agent, send generic WS++ UA. if (m_request.get_header("User-Agent").empty()) { if (!m_user_agent.empty()) { m_request.replace_header("User-Agent",m_user_agent); } else { m_request.remove_header("User-Agent"); } } m_handshake_buffer = m_request.raw(); if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"Raw Handshake request:\n"+m_handshake_buffer); } if (m_open_handshake_timeout_dur > 0) { m_handshake_timer = transport_con_type::set_timer( m_open_handshake_timeout_dur, lib::bind( &type::handle_open_handshake_timeout, type::get_shared(), lib::placeholders::_1 ) ); } transport_con_type::async_write( m_handshake_buffer.data(), m_handshake_buffer.size(), lib::bind( &type::handle_send_http_request, type::get_shared(), lib::placeholders::_1 ) ); } template void connection::handle_send_http_request(lib::error_code const & ec) { m_alog.write(log::alevel::devel,"handle_send_http_request"); lib::error_code ecm = ec; if (!ecm) { scoped_lock_type lock(m_connection_state_lock); if (m_state == session::state::connecting) { if (m_internal_state != istate::WRITE_HTTP_REQUEST) { ecm = error::make_error_code(error::invalid_state); } else { m_internal_state = istate::READ_HTTP_RESPONSE; } } else if (m_state == session::state::closed) { // The connection was canceled while the response was being sent, // usually by the handshake timer. This is basically expected // (though hopefully rare) and there is nothing we can do so ignore. m_alog.write(log::alevel::devel, "handle_send_http_request invoked after connection was closed"); return; } else { ecm = error::make_error_code(error::invalid_state); } } if (ecm) { if (ecm == transport::error::eof && m_state == session::state::closed) { // we expect to get eof if the connection is closed already m_alog.write(log::alevel::devel, "got (expected) eof/state error from closed con"); return; } log_err(log::elevel::rerror,"handle_send_http_request",ecm); this->terminate(ecm); return; } transport_con_type::async_read_at_least( 1, m_buf, config::connection_read_buffer_size, lib::bind( &type::handle_read_http_response, type::get_shared(), lib::placeholders::_1, lib::placeholders::_2 ) ); } template void connection::handle_read_http_response(lib::error_code const & ec, size_t bytes_transferred) { m_alog.write(log::alevel::devel,"handle_read_http_response"); lib::error_code ecm = ec; if (!ecm) { scoped_lock_type lock(m_connection_state_lock); if (m_state == session::state::connecting) { if (m_internal_state != istate::READ_HTTP_RESPONSE) { ecm = error::make_error_code(error::invalid_state); } } else if (m_state == session::state::closed) { // The connection was canceled while the response was being sent, // usually by the handshake timer. This is basically expected // (though hopefully rare) and there is nothing we can do so ignore. m_alog.write(log::alevel::devel, "handle_read_http_response invoked after connection was closed"); return; } else { ecm = error::make_error_code(error::invalid_state); } } if (ecm) { if (ecm == transport::error::eof && m_state == session::state::closed) { // we expect to get eof if the connection is closed already m_alog.write(log::alevel::devel, "got (expected) eof/state error from closed con"); return; } log_err(log::elevel::rerror,"handle_read_http_response",ecm); this->terminate(ecm); return; } size_t bytes_processed = 0; // TODO: refactor this to use error codes rather than exceptions try { bytes_processed = m_response.consume(m_buf,bytes_transferred); } catch (http::exception & e) { m_elog.write(log::elevel::rerror, std::string("error in handle_read_http_response: ")+e.what()); this->terminate(make_error_code(error::general)); return; } m_alog.write(log::alevel::devel,std::string("Raw response: ")+m_response.raw()); if (m_response.headers_ready()) { if (m_handshake_timer) { m_handshake_timer->cancel(); m_handshake_timer.reset(); } lib::error_code validate_ec = m_processor->validate_server_handshake_response( m_request, m_response ); if (validate_ec) { log_err(log::elevel::rerror,"Server handshake response",validate_ec); this->terminate(validate_ec); return; } // Read extension parameters and set up values necessary for the end // user to complete extension negotiation. std::pair neg_results; neg_results = m_processor->negotiate_extensions(m_response); if (neg_results.first) { // There was a fatal error in extension negotiation. For the moment // kill all connections that fail extension negotiation. // TODO: deal with cases where the response is well formed but // doesn't match the options requested by the client. Its possible // that the best behavior in this cases is to log and continue with // an unextended connection. m_alog.write(log::alevel::devel, "Extension negotiation failed: " + neg_results.first.message()); this->terminate(make_error_code(error::extension_neg_failed)); // TODO: close connection with reason 1010 (and list extensions) } // response is valid, connection can now be assumed to be open m_internal_state = istate::PROCESS_CONNECTION; m_state = session::state::open; this->log_open_result(); if (m_open_handler) { m_open_handler(m_connection_hdl); } // The remaining bytes in m_buf are frame data. Copy them to the // beginning of the buffer and note the length. They will be read after // the handshake completes and before more bytes are read. std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf); m_buf_cursor = bytes_transferred-bytes_processed; this->handle_read_frame(lib::error_code(), m_buf_cursor); } else { transport_con_type::async_read_at_least( 1, m_buf, config::connection_read_buffer_size, lib::bind( &type::handle_read_http_response, type::get_shared(), lib::placeholders::_1, lib::placeholders::_2 ) ); } } template void connection::handle_open_handshake_timeout( lib::error_code const & ec) { if (ec == transport::error::operation_aborted) { m_alog.write(log::alevel::devel,"open handshake timer cancelled"); } else if (ec) { m_alog.write(log::alevel::devel, "open handle_open_handshake_timeout error: "+ec.message()); // TODO: ignore or fail here? } else { m_alog.write(log::alevel::devel,"open handshake timer expired"); terminate(make_error_code(error::open_handshake_timeout)); } } template void connection::handle_close_handshake_timeout( lib::error_code const & ec) { if (ec == transport::error::operation_aborted) { m_alog.write(log::alevel::devel,"asio close handshake timer cancelled"); } else if (ec) { m_alog.write(log::alevel::devel, "asio open handle_close_handshake_timeout error: "+ec.message()); // TODO: ignore or fail here? } else { m_alog.write(log::alevel::devel, "asio close handshake timer expired"); terminate(make_error_code(error::close_handshake_timeout)); } } template void connection::terminate(lib::error_code const & ec) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"connection terminate"); } // Cancel close handshake timer if (m_handshake_timer) { m_handshake_timer->cancel(); m_handshake_timer.reset(); } terminate_status tstat = unknown; if (ec) { m_ec = ec; m_local_close_code = close::status::abnormal_close; m_local_close_reason = ec.message(); } // TODO: does any of this need a mutex? if (m_is_http) { m_http_state = session::http_state::closed; } if (m_state == session::state::connecting) { m_state = session::state::closed; tstat = failed; // Log fail result here before socket is shut down and we can't get // the remote address, etc anymore if (m_ec != error::http_connection_ended) { log_fail_result(); } } else if (m_state != session::state::closed) { m_state = session::state::closed; tstat = closed; } else { m_alog.write(log::alevel::devel, "terminate called on connection that was already terminated"); return; } // TODO: choose between shutdown and close based on error code sent transport_con_type::async_shutdown( lib::bind( &type::handle_terminate, type::get_shared(), tstat, lib::placeholders::_1 ) ); } template void connection::handle_terminate(terminate_status tstat, lib::error_code const & ec) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"connection handle_terminate"); } if (ec) { // there was an error actually shutting down the connection log_err(log::elevel::devel,"handle_terminate",ec); } // clean shutdown if (tstat == failed) { if (m_ec != error::http_connection_ended) { if (m_fail_handler) { m_fail_handler(m_connection_hdl); } } } else if (tstat == closed) { if (m_close_handler) { m_close_handler(m_connection_hdl); } log_close_result(); } else { m_elog.write(log::elevel::rerror,"Unknown terminate_status"); } // call the termination handler if it exists // if it exists it might (but shouldn't) refer to a bad memory location. // If it does, we don't care and should catch and ignore it. if (m_termination_handler) { try { m_termination_handler(type::get_shared()); } catch (std::exception const & e) { m_elog.write(log::elevel::warn, std::string("termination_handler call failed. Reason was: ")+e.what()); } } } template void connection::write_frame() { //m_alog.write(log::alevel::devel,"connection write_frame"); { scoped_lock_type lock(m_write_lock); // Check the write flag. If true, there is an outstanding transport // write already. In this case we just return. The write handler will // start a new write if the write queue isn't empty. If false, we set // the write flag and proceed to initiate a transport write. if (m_write_flag) { return; } // pull off all the messages that are ready to write. // stop if we get a message marked terminal message_ptr next_message = write_pop(); while (next_message) { m_current_msgs.push_back(next_message); if (!next_message->get_terminal()) { next_message = write_pop(); } else { next_message = message_ptr(); } } if (m_current_msgs.empty()) { // there was nothing to send return; } else { // At this point we own the next messages to be sent and are // responsible for holding the write flag until they are // successfully sent or there is some error m_write_flag = true; } } typename std::vector::iterator it; for (it = m_current_msgs.begin(); it != m_current_msgs.end(); ++it) { std::string const & header = (*it)->get_header(); std::string const & payload = (*it)->get_payload(); m_send_buffer.push_back(transport::buffer(header.c_str(),header.size())); m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size())); } // Print detailed send stats if those log levels are enabled if (m_alog.static_test(log::alevel::frame_header)) { if (m_alog.dynamic_test(log::alevel::frame_header)) { std::stringstream general,header,payload; general << "Dispatching write containing " << m_current_msgs.size() <<" message(s) containing "; header << "Header Bytes: \n"; payload << "Payload Bytes: \n"; size_t hbytes = 0; size_t pbytes = 0; for (size_t i = 0; i < m_current_msgs.size(); i++) { hbytes += m_current_msgs[i]->get_header().size(); pbytes += m_current_msgs[i]->get_payload().size(); header << "[" << i << "] (" << m_current_msgs[i]->get_header().size() << ") " << utility::to_hex(m_current_msgs[i]->get_header()) << "\n"; if (m_alog.static_test(log::alevel::frame_payload)) { if (m_alog.dynamic_test(log::alevel::frame_payload)) { payload << "[" << i << "] (" << m_current_msgs[i]->get_payload().size() << ") ["<get_opcode()<<"] " << (m_current_msgs[i]->get_opcode() == frame::opcode::text ? m_current_msgs[i]->get_payload() : utility::to_hex(m_current_msgs[i]->get_payload()) ) << "\n"; } } } general << hbytes << " header bytes and " << pbytes << " payload bytes"; m_alog.write(log::alevel::frame_header,general.str()); m_alog.write(log::alevel::frame_header,header.str()); m_alog.write(log::alevel::frame_payload,payload.str()); } } transport_con_type::async_write( m_send_buffer, m_write_frame_handler ); } template void connection::handle_write_frame(lib::error_code const & ec) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"connection handle_write_frame"); } bool terminal = m_current_msgs.back()->get_terminal(); m_send_buffer.clear(); m_current_msgs.clear(); // TODO: recycle instead of deleting if (ec) { log_err(log::elevel::fatal,"handle_write_frame",ec); this->terminate(ec); return; } if (terminal) { this->terminate(lib::error_code()); return; } bool needs_writing = false; { scoped_lock_type lock(m_write_lock); // release write flag m_write_flag = false; needs_writing = !m_send_queue.empty(); } if (needs_writing) { transport_con_type::dispatch(lib::bind( &type::write_frame, type::get_shared() )); } } template std::vector const & connection::get_supported_versions() const { return versions_supported; } template void connection::process_control_frame(typename config::message_type::ptr msg) { m_alog.write(log::alevel::devel,"process_control_frame"); frame::opcode::value op = msg->get_opcode(); lib::error_code ec; std::stringstream s; s << "Control frame received with opcode " << op; m_alog.write(log::alevel::control,s.str()); if (m_state == session::state::closed) { m_elog.write(log::elevel::warn,"got frame in state closed"); return; } if (op != frame::opcode::CLOSE && m_state != session::state::open) { m_elog.write(log::elevel::warn,"got non-close frame in state closing"); return; } if (op == frame::opcode::PING) { bool should_reply = true; if (m_ping_handler) { should_reply = m_ping_handler(m_connection_hdl, msg->get_payload()); } if (should_reply) { this->pong(msg->get_payload(),ec); if (ec) { log_err(log::elevel::devel,"Failed to send response pong",ec); } } } else if (op == frame::opcode::PONG) { if (m_pong_handler) { m_pong_handler(m_connection_hdl, msg->get_payload()); } if (m_ping_timer) { m_ping_timer->cancel(); } } else if (op == frame::opcode::CLOSE) { m_alog.write(log::alevel::devel,"got close frame"); // record close code and reason somewhere m_remote_close_code = close::extract_code(msg->get_payload(),ec); if (ec) { s.str(""); if (config::drop_on_protocol_error) { s << "Received invalid close code " << m_remote_close_code << " dropping connection per config."; m_elog.write(log::elevel::devel,s.str()); this->terminate(ec); } else { s << "Received invalid close code " << m_remote_close_code << " sending acknowledgement and closing"; m_elog.write(log::elevel::devel,s.str()); ec = send_close_ack(close::status::protocol_error, "Invalid close code"); if (ec) { log_err(log::elevel::devel,"send_close_ack",ec); } } return; } m_remote_close_reason = close::extract_reason(msg->get_payload(),ec); if (ec) { if (config::drop_on_protocol_error) { m_elog.write(log::elevel::devel, "Received invalid close reason. Dropping connection per config"); this->terminate(ec); } else { m_elog.write(log::elevel::devel, "Received invalid close reason. Sending acknowledgement and closing"); ec = send_close_ack(close::status::protocol_error, "Invalid close reason"); if (ec) { log_err(log::elevel::devel,"send_close_ack",ec); } } return; } if (m_state == session::state::open) { s.str(""); s << "Received close frame with code " << m_remote_close_code << " and reason " << m_remote_close_reason; m_alog.write(log::alevel::devel,s.str()); ec = send_close_ack(); if (ec) { log_err(log::elevel::devel,"send_close_ack",ec); } } else if (m_state == session::state::closing && !m_was_clean) { // ack of our close m_alog.write(log::alevel::devel, "Got acknowledgement of close"); m_was_clean = true; // If we are a server terminate the connection now. Clients should // leave the connection open to give the server an opportunity to // initiate the TCP close. The client's timer will handle closing // its side of the connection if the server misbehaves. // // TODO: different behavior if the underlying transport doesn't // support timers? if (m_is_server) { terminate(lib::error_code()); } } else { // spurious, ignore m_elog.write(log::elevel::devel, "Got close frame in wrong state"); } } else { // got an invalid control opcode m_elog.write(log::elevel::devel, "Got control frame with invalid opcode"); // initiate protocol error shutdown } } template lib::error_code connection::send_close_ack(close::status::value code, std::string const & reason) { return send_close_frame(code,reason,true,m_is_server); } template lib::error_code connection::send_close_frame(close::status::value code, std::string const & reason, bool ack, bool terminal) { m_alog.write(log::alevel::devel,"send_close_frame"); // check for special codes // If silent close is set, respect it and blank out close information // Otherwise use whatever has been specified in the parameters. If // parameters specifies close::status::blank then determine what to do // based on whether or not this is an ack. If it is not an ack just // send blank info. If it is an ack then echo the close information from // the remote endpoint. if (config::silent_close) { m_alog.write(log::alevel::devel,"closing silently"); m_local_close_code = close::status::no_status; m_local_close_reason.clear(); } else if (code != close::status::blank) { m_alog.write(log::alevel::devel,"closing with specified codes"); m_local_close_code = code; m_local_close_reason = reason; } else if (!ack) { m_alog.write(log::alevel::devel,"closing with no status code"); m_local_close_code = close::status::no_status; m_local_close_reason.clear(); } else if (m_remote_close_code == close::status::no_status) { m_alog.write(log::alevel::devel, "acknowledging a no-status close with normal code"); m_local_close_code = close::status::normal; m_local_close_reason.clear(); } else { m_alog.write(log::alevel::devel,"acknowledging with remote codes"); m_local_close_code = m_remote_close_code; m_local_close_reason = m_remote_close_reason; } std::stringstream s; s << "Closing with code: " << m_local_close_code << ", and reason: " << m_local_close_reason; m_alog.write(log::alevel::devel,s.str()); message_ptr msg = m_msg_manager->get_message(); if (!msg) { return error::make_error_code(error::no_outgoing_buffers); } lib::error_code ec = m_processor->prepare_close(m_local_close_code, m_local_close_reason,msg); if (ec) { return ec; } // Messages flagged terminal will result in the TCP connection being dropped // after the message has been written. This is typically used when servers // send an ack and when any endpoint encounters a protocol error if (terminal) { msg->set_terminal(true); } m_state = session::state::closing; if (ack) { m_was_clean = true; } // Start a timer so we don't wait forever for the acknowledgement close // frame if (m_close_handshake_timeout_dur > 0) { m_handshake_timer = transport_con_type::set_timer( m_close_handshake_timeout_dur, lib::bind( &type::handle_close_handshake_timeout, type::get_shared(), lib::placeholders::_1 ) ); } bool needs_writing = false; { scoped_lock_type lock(m_write_lock); write_push(msg); needs_writing = !m_write_flag && !m_send_queue.empty(); } if (needs_writing) { transport_con_type::dispatch(lib::bind( &type::write_frame, type::get_shared() )); } return lib::error_code(); } template typename connection::processor_ptr connection::get_processor(int version) const { // TODO: allow disabling certain versions processor_ptr p; switch (version) { case 0: p = lib::make_shared >( transport_con_type::is_secure(), m_is_server, m_msg_manager ); break; case 7: p = lib::make_shared >( transport_con_type::is_secure(), m_is_server, m_msg_manager, lib::ref(m_rng) ); break; case 8: p = lib::make_shared >( transport_con_type::is_secure(), m_is_server, m_msg_manager, lib::ref(m_rng) ); break; case 13: p = lib::make_shared >( transport_con_type::is_secure(), m_is_server, m_msg_manager, lib::ref(m_rng) ); break; default: return p; } // Settings not configured by the constructor p->set_max_message_size(m_max_message_size); return p; } template void connection::write_push(typename config::message_type::ptr msg) { if (!msg) { return; } m_send_buffer_size += msg->get_payload().size(); m_send_queue.push(msg); if (m_alog.static_test(log::alevel::devel)) { std::stringstream s; s << "write_push: message count: " << m_send_queue.size() << " buffer size: " << m_send_buffer_size; m_alog.write(log::alevel::devel,s.str()); } } template typename config::message_type::ptr connection::write_pop() { message_ptr msg; if (m_send_queue.empty()) { return msg; } msg = m_send_queue.front(); m_send_buffer_size -= msg->get_payload().size(); m_send_queue.pop(); if (m_alog.static_test(log::alevel::devel)) { std::stringstream s; s << "write_pop: message count: " << m_send_queue.size() << " buffer size: " << m_send_buffer_size; m_alog.write(log::alevel::devel,s.str()); } return msg; } template void connection::log_open_result() { std::stringstream s; int version; if (!processor::is_websocket_handshake(m_request)) { version = -1; } else { version = processor::get_websocket_version(m_request); } // Connection Type s << (version == -1 ? "HTTP" : "WebSocket") << " Connection "; // Remote endpoint address s << transport_con_type::get_remote_endpoint() << " "; // Version string if WebSocket if (version != -1) { s << "v" << version << " "; } // User Agent std::string ua = m_request.get_header("User-Agent"); if (ua.empty()) { s << "\"\" "; } else { // check if there are any quotes in the user agent s << "\"" << utility::string_replace_all(ua,"\"","\\\"") << "\" "; } // URI s << (m_uri ? m_uri->get_resource() : "NULL") << " "; // Status code s << m_response.get_status_code(); m_alog.write(log::alevel::connect,s.str()); } template void connection::log_close_result() { std::stringstream s; s << "Disconnect " << "close local:[" << m_local_close_code << (m_local_close_reason.empty() ? "" : ","+m_local_close_reason) << "] remote:[" << m_remote_close_code << (m_remote_close_reason.empty() ? "" : ","+m_remote_close_reason) << "]"; m_alog.write(log::alevel::disconnect,s.str()); } template void connection::log_fail_result() { std::stringstream s; int version = processor::get_websocket_version(m_request); // Connection Type s << "WebSocket Connection "; // Remote endpoint address & WebSocket version s << transport_con_type::get_remote_endpoint(); if (version < 0) { s << " -"; } else { s << " v" << version; } // User Agent std::string ua = m_request.get_header("User-Agent"); if (ua.empty()) { s << " \"\" "; } else { // check if there are any quotes in the user agent s << " \"" << utility::string_replace_all(ua,"\"","\\\"") << "\" "; } // URI s << (m_uri ? m_uri->get_resource() : "-"); // HTTP Status code s << " " << m_response.get_status_code(); // WebSocket++ error code & reason s << " " << m_ec << " " << m_ec.message(); m_alog.write(log::alevel::fail,s.str()); } template void connection::log_http_result() { std::stringstream s; if (processor::is_websocket_handshake(m_request)) { m_alog.write(log::alevel::devel,"Call to log_http_result for WebSocket"); return; } // Connection Type s << (m_request.get_header("host").empty() ? "-" : m_request.get_header("host")) << " " << transport_con_type::get_remote_endpoint() << " \"" << m_request.get_method() << " " << (m_uri ? m_uri->get_resource() : "-") << " " << m_request.get_version() << "\" " << m_response.get_status_code() << " " << m_response.get_body().size(); // User Agent std::string ua = m_request.get_header("User-Agent"); if (ua.empty()) { s << " \"\" "; } else { // check if there are any quotes in the user agent s << " \"" << utility::string_replace_all(ua,"\"","\\\"") << "\" "; } m_alog.write(log::alevel::http,s.str()); } } // namespace websocketpp #endif // WEBSOCKETPP_CONNECTION_IMPL_HPP