diff options
author | Dan Klein <danklei@cisco.com> | 2015-08-24 17:28:17 +0300 |
---|---|---|
committer | Dan Klein <danklei@cisco.com> | 2015-08-24 17:28:17 +0300 |
commit | 7d3be8c612e295820649779335288c197b80ccb2 (patch) | |
tree | 78e9636bc8780dedc919c30378a621f425e1cbfc /external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/req.cpp | |
parent | dab741a80699f86e86c91718872a052cca9bbb25 (diff) |
Changes location of console and fixed dependencies
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/req.cpp')
-rw-r--r-- | external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/req.cpp | 289 |
1 files changed, 0 insertions, 289 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/req.cpp b/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/req.cpp deleted file mode 100644 index 3a930349..00000000 --- a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/req.cpp +++ /dev/null @@ -1,289 +0,0 @@ -/* - Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file - - This file is part of libzmq, the ZeroMQ core engine in C++. - - libzmq is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License (LGPL) as published - by the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - As a special exception, the Contributors give you permission to link - this library with independent modules to produce an executable, - regardless of the license terms of these independent modules, and to - copy and distribute the resulting executable under terms of your choice, - provided that you also meet, for each linked independent module, the - terms and conditions of the license of that module. An independent - module is a module which is not derived from or based on this library. - If you modify this library, you must extend this exception to your - version of the library. - - libzmq is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "req.hpp" -#include "err.hpp" -#include "msg.hpp" -#include "wire.hpp" -#include "random.hpp" -#include "likely.hpp" - -zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - dealer_t (parent_, tid_, sid_), - receiving_reply (false), - message_begins (true), - reply_pipe (NULL), - request_id_frames_enabled (false), - request_id (generate_random()), - strict (true) -{ - options.type = ZMQ_REQ; -} - -zmq::req_t::~req_t () -{ -} - -int zmq::req_t::xsend (msg_t *msg_) -{ - // If we've sent a request and we still haven't got the reply, - // we can't send another request unless the strict option is disabled. - if (receiving_reply) { - if (strict) { - errno = EFSM; - return -1; - } - - if (reply_pipe) - reply_pipe->terminate (false); - receiving_reply = false; - message_begins = true; - } - - // First part of the request is the request identity. - if (message_begins) { - reply_pipe = NULL; - - if (request_id_frames_enabled) { - request_id++; - - msg_t id; - int rc = id.init_data (&request_id, sizeof (request_id), NULL, NULL); - errno_assert (rc == 0); - id.set_flags (msg_t::more); - - rc = dealer_t::sendpipe (&id, &reply_pipe); - if (rc != 0) - return -1; - } - - msg_t bottom; - int rc = bottom.init (); - errno_assert (rc == 0); - bottom.set_flags (msg_t::more); - - rc = dealer_t::sendpipe (&bottom, &reply_pipe); - if (rc != 0) - return -1; - zmq_assert (reply_pipe); - - message_begins = false; - - // Eat all currently avaliable messages before the request is fully - // sent. This is done to avoid: - // REQ sends request to A, A replies, B replies too. - // A's reply was first and matches, that is used. - // An hour later REQ sends a request to B. B's old reply is used. - msg_t drop; - while (true) { - rc = drop.init (); - errno_assert (rc == 0); - rc = dealer_t::xrecv (&drop); - if (rc != 0) - break; - drop.close (); - } - } - - bool more = msg_->flags () & msg_t::more ? true : false; - - int rc = dealer_t::xsend (msg_); - if (rc != 0) - return rc; - - // If the request was fully sent, flip the FSM into reply-receiving state. - if (!more) { - receiving_reply = true; - message_begins = true; - } - - return 0; -} - -int zmq::req_t::xrecv (msg_t *msg_) -{ - // If request wasn't send, we can't wait for reply. - if (!receiving_reply) { - errno = EFSM; - return -1; - } - - // Skip messages until one with the right first frames is found. - while (message_begins) { - // If enabled, the first frame must have the correct request_id. - if (request_id_frames_enabled) { - int rc = recv_reply_pipe (msg_); - if (rc != 0) - return rc; - - if (unlikely (!(msg_->flags () & msg_t::more) || - msg_->size () != sizeof (request_id) || - *static_cast<uint32_t *> (msg_->data ()) != request_id)) { - // Skip the remaining frames and try the next message - while (msg_->flags () & msg_t::more) { - rc = recv_reply_pipe (msg_); - errno_assert (rc == 0); - } - continue; - } - } - - // The next frame must be 0. - // TODO: Failing this check should also close the connection with the peer! - int rc = recv_reply_pipe (msg_); - if (rc != 0) - return rc; - - if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) { - // Skip the remaining frames and try the next message - while (msg_->flags () & msg_t::more) { - rc = recv_reply_pipe (msg_); - errno_assert (rc == 0); - } - continue; - } - - message_begins = false; - } - - int rc = recv_reply_pipe (msg_); - if (rc != 0) - return rc; - - // If the reply is fully received, flip the FSM into request-sending state. - if (!(msg_->flags () & msg_t::more)) { - receiving_reply = false; - message_begins = true; - } - - return 0; -} - -bool zmq::req_t::xhas_in () -{ - // TODO: Duplicates should be removed here. - - if (!receiving_reply) - return false; - - return dealer_t::xhas_in (); -} - -bool zmq::req_t::xhas_out () -{ - if (receiving_reply) - return false; - - return dealer_t::xhas_out (); -} - -int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) -{ - bool is_int = (optvallen_ == sizeof (int)); - int value = is_int? *((int *) optval_): 0; - switch (option_) { - case ZMQ_REQ_CORRELATE: - if (is_int && value >= 0) { - request_id_frames_enabled = (value != 0); - return 0; - } - break; - - case ZMQ_REQ_RELAXED: - if (is_int && value >= 0) { - strict = (value == 0); - return 0; - } - break; - - default: - break; - } - - return dealer_t::xsetsockopt (option_, optval_, optvallen_); -} - -void zmq::req_t::xpipe_terminated (pipe_t *pipe_) -{ - if (reply_pipe == pipe_) - reply_pipe = NULL; - dealer_t::xpipe_terminated (pipe_); -} - -int zmq::req_t::recv_reply_pipe (msg_t *msg_) -{ - while (true) { - pipe_t *pipe = NULL; - int rc = dealer_t::recvpipe (msg_, &pipe); - if (rc != 0) - return rc; - if (!reply_pipe || pipe == reply_pipe) - return 0; - } -} - -zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, - socket_base_t *socket_, const options_t &options_, - address_t *addr_) : - session_base_t (io_thread_, connect_, socket_, options_, addr_), - state (bottom) -{ -} - -zmq::req_session_t::~req_session_t () -{ -} - -int zmq::req_session_t::push_msg (msg_t *msg_) -{ - switch (state) { - case bottom: - if (msg_->flags () == msg_t::more && msg_->size () == 0) { - state = body; - return session_base_t::push_msg (msg_); - } - break; - case body: - if (msg_->flags () == msg_t::more) - return session_base_t::push_msg (msg_); - if (msg_->flags () == 0) { - state = bottom; - return session_base_t::push_msg (msg_); - } - break; - } - errno = EFAULT; - return -1; -} - -void zmq::req_session_t::reset () -{ - session_base_t::reset (); - state = bottom; -} |