summaryrefslogtreecommitdiffstats
path: root/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/router.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/router.cpp')
-rw-r--r--external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/router.cpp479
1 files changed, 0 insertions, 479 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/router.cpp b/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/router.cpp
deleted file mode 100644
index fc46ae19..00000000
--- a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/router.cpp
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
-
- This file is part of libzmq, the ZeroMQ core engine in C++.
-
- libzmq is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License (LGPL) as published
- by the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- As a special exception, the Contributors give you permission to link
- this library with independent modules to produce an executable,
- regardless of the license terms of these independent modules, and to
- copy and distribute the resulting executable under terms of your choice,
- provided that you also meet, for each linked independent module, the
- terms and conditions of the license of that module. An independent
- module is a module which is not derived from or based on this library.
- If you modify this library, you must extend this exception to your
- version of the library.
-
- libzmq is distributed in the hope that it will be useful, but WITHOUT
- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
- License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "router.hpp"
-#include "pipe.hpp"
-#include "wire.hpp"
-#include "random.hpp"
-#include "likely.hpp"
-#include "err.hpp"
-
-zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
- socket_base_t (parent_, tid_, sid_),
- prefetched (false),
- identity_sent (false),
- more_in (false),
- current_out (NULL),
- more_out (false),
- next_rid (generate_random ()),
- mandatory (false),
- // raw_sock functionality in ROUTER is deprecated
- raw_sock (false),
- probe_router (false),
- handover (false)
-{
- options.type = ZMQ_ROUTER;
- options.recv_identity = true;
- options.raw_sock = false;
-
- prefetched_id.init ();
- prefetched_msg.init ();
-}
-
-zmq::router_t::~router_t ()
-{
- zmq_assert (anonymous_pipes.empty ());;
- zmq_assert (outpipes.empty ());
- prefetched_id.close ();
- prefetched_msg.close ();
-}
-
-void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
-{
- // subscribe_to_all_ is unused
- (void)subscribe_to_all_;
-
- zmq_assert (pipe_);
-
- if (probe_router) {
- msg_t probe_msg_;
- int rc = probe_msg_.init ();
- errno_assert (rc == 0);
-
- rc = pipe_->write (&probe_msg_);
- // zmq_assert (rc) is not applicable here, since it is not a bug.
- pipe_->flush ();
-
- rc = probe_msg_.close ();
- errno_assert (rc == 0);
- }
-
- bool identity_ok = identify_peer (pipe_);
- if (identity_ok)
- fq.attach (pipe_);
- else
- anonymous_pipes.insert (pipe_);
-}
-
-int zmq::router_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- bool is_int = (optvallen_ == sizeof (int));
- int value = is_int? *((int *) optval_): 0;
-
- switch (option_) {
- case ZMQ_CONNECT_RID:
- if (optval_ && optvallen_) {
- connect_rid.assign ((char *) optval_, optvallen_);
- return 0;
- }
- break;
- case ZMQ_ROUTER_RAW:
- if (is_int && value >= 0) {
- raw_sock = (value != 0);
- if (raw_sock) {
- options.recv_identity = false;
- options.raw_sock = true;
- }
- return 0;
- }
- break;
-
- case ZMQ_ROUTER_MANDATORY:
- if (is_int && value >= 0) {
- mandatory = (value != 0);
- return 0;
- }
- break;
-
- case ZMQ_PROBE_ROUTER:
- if (is_int && value >= 0) {
- probe_router = (value != 0);
- return 0;
- }
- break;
-
- case ZMQ_ROUTER_HANDOVER:
- if (is_int && value >= 0) {
- handover = (value != 0);
- return 0;
- }
- break;
-
- default:
- break;
- }
- errno = EINVAL;
- return -1;
-}
-
-
-void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
-{
- std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
- if (it != anonymous_pipes.end ())
- anonymous_pipes.erase (it);
- else {
- outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
- zmq_assert (it != outpipes.end ());
- outpipes.erase (it);
- fq.pipe_terminated (pipe_);
- if (pipe_ == current_out)
- current_out = NULL;
- }
-}
-
-void zmq::router_t::xread_activated (pipe_t *pipe_)
-{
- std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
- if (it == anonymous_pipes.end ())
- fq.activated (pipe_);
- else {
- bool identity_ok = identify_peer (pipe_);
- if (identity_ok) {
- anonymous_pipes.erase (it);
- fq.attach (pipe_);
- }
- }
-}
-
-void zmq::router_t::xwrite_activated (pipe_t *pipe_)
-{
- outpipes_t::iterator it;
- for (it = outpipes.begin (); it != outpipes.end (); ++it)
- if (it->second.pipe == pipe_)
- break;
-
- zmq_assert (it != outpipes.end ());
- zmq_assert (!it->second.active);
- it->second.active = true;
-}
-
-int zmq::router_t::xsend (msg_t *msg_)
-{
- // If this is the first part of the message it's the ID of the
- // peer to send the message to.
- if (!more_out) {
- zmq_assert (!current_out);
-
- // If we have malformed message (prefix with no subsequent message)
- // then just silently ignore it.
- // TODO: The connections should be killed instead.
- if (msg_->flags () & msg_t::more) {
-
- more_out = true;
-
- // Find the pipe associated with the identity stored in the prefix.
- // If there's no such pipe just silently ignore the message, unless
- // router_mandatory is set.
- blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
- outpipes_t::iterator it = outpipes.find (identity);
-
- if (it != outpipes.end ()) {
- current_out = it->second.pipe;
- if (!current_out->check_write ()) {
- it->second.active = false;
- current_out = NULL;
- if (mandatory) {
- more_out = false;
- errno = EAGAIN;
- return -1;
- }
- }
- }
- else
- if (mandatory) {
- more_out = false;
- errno = EHOSTUNREACH;
- return -1;
- }
- }
-
- int rc = msg_->close ();
- errno_assert (rc == 0);
- rc = msg_->init ();
- errno_assert (rc == 0);
- return 0;
- }
-
- // Ignore the MORE flag for raw-sock or assert?
- if (options.raw_sock)
- msg_->reset_flags (msg_t::more);
-
- // Check whether this is the last part of the message.
- more_out = msg_->flags () & msg_t::more ? true : false;
-
- // Push the message into the pipe. If there's no out pipe, just drop it.
- if (current_out) {
-
- // Close the remote connection if user has asked to do so
- // by sending zero length message.
- // Pending messages in the pipe will be dropped (on receiving term- ack)
- if (raw_sock && msg_->size() == 0) {
- current_out->terminate (false);
- int rc = msg_->close ();
- errno_assert (rc == 0);
- rc = msg_->init ();
- errno_assert (rc == 0);
- current_out = NULL;
- return 0;
- }
-
- bool ok = current_out->write (msg_);
- if (unlikely (!ok)) {
- // Message failed to send - we must close it ourselves.
- int rc = msg_->close ();
- errno_assert (rc == 0);
- current_out = NULL;
- } else {
- if (!more_out) {
- current_out->flush ();
- current_out = NULL;
- }
- }
- }
- else {
- int rc = msg_->close ();
- errno_assert (rc == 0);
- }
-
- // Detach the message from the data buffer.
- int rc = msg_->init ();
- errno_assert (rc == 0);
-
- return 0;
-}
-
-int zmq::router_t::xrecv (msg_t *msg_)
-{
- if (prefetched) {
- if (!identity_sent) {
- int rc = msg_->move (prefetched_id);
- errno_assert (rc == 0);
- identity_sent = true;
- }
- else {
- int rc = msg_->move (prefetched_msg);
- errno_assert (rc == 0);
- prefetched = false;
- }
- more_in = msg_->flags () & msg_t::more ? true : false;
- return 0;
- }
-
- pipe_t *pipe = NULL;
- int rc = fq.recvpipe (msg_, &pipe);
-
- // It's possible that we receive peer's identity. That happens
- // after reconnection. The current implementation assumes that
- // the peer always uses the same identity.
- while (rc == 0 && msg_->is_identity ())
- rc = fq.recvpipe (msg_, &pipe);
-
- if (rc != 0)
- return -1;
-
- zmq_assert (pipe != NULL);
-
- // If we are in the middle of reading a message, just return the next part.
- if (more_in)
- more_in = msg_->flags () & msg_t::more ? true : false;
- else {
- // We are at the beginning of a message.
- // Keep the message part we have in the prefetch buffer
- // and return the ID of the peer instead.
- rc = prefetched_msg.move (*msg_);
- errno_assert (rc == 0);
- prefetched = true;
-
- blob_t identity = pipe->get_identity ();
- rc = msg_->init_size (identity.size ());
- errno_assert (rc == 0);
- memcpy (msg_->data (), identity.data (), identity.size ());
- msg_->set_flags (msg_t::more);
- identity_sent = true;
- }
-
- return 0;
-}
-
-int zmq::router_t::rollback (void)
-{
- if (current_out) {
- current_out->rollback ();
- current_out = NULL;
- more_out = false;
- }
- return 0;
-}
-
-bool zmq::router_t::xhas_in ()
-{
- // If we are in the middle of reading the messages, there are
- // definitely more parts available.
- if (more_in)
- return true;
-
- // We may already have a message pre-fetched.
- if (prefetched)
- return true;
-
- // Try to read the next message.
- // The message, if read, is kept in the pre-fetch buffer.
- pipe_t *pipe = NULL;
- int rc = fq.recvpipe (&prefetched_msg, &pipe);
-
- // It's possible that we receive peer's identity. That happens
- // after reconnection. The current implementation assumes that
- // the peer always uses the same identity.
- // TODO: handle the situation when the peer changes its identity.
- while (rc == 0 && prefetched_msg.is_identity ())
- rc = fq.recvpipe (&prefetched_msg, &pipe);
-
- if (rc != 0)
- return false;
-
- zmq_assert (pipe != NULL);
-
- blob_t identity = pipe->get_identity ();
- rc = prefetched_id.init_size (identity.size ());
- errno_assert (rc == 0);
- memcpy (prefetched_id.data (), identity.data (), identity.size ());
- prefetched_id.set_flags (msg_t::more);
-
- prefetched = true;
- identity_sent = false;
-
- return true;
-}
-
-bool zmq::router_t::xhas_out ()
-{
- // In theory, ROUTER socket is always ready for writing. Whether actual
- // attempt to write succeeds depends on whitch pipe the message is going
- // to be routed to.
- return true;
-}
-
-zmq::blob_t zmq::router_t::get_credential () const
-{
- return fq.get_credential ();
-}
-
-bool zmq::router_t::identify_peer (pipe_t *pipe_)
-{
- msg_t msg;
- blob_t identity;
- bool ok;
-
- if (connect_rid.length()) {
- identity = blob_t ((unsigned char*) connect_rid.c_str (),
- connect_rid.length());
- connect_rid.clear ();
- outpipes_t::iterator it = outpipes.find (identity);
- if (it != outpipes.end ())
- zmq_assert(false); // Not allowed to duplicate an existing rid
- }
- else
- if (options.raw_sock) { // Always assign identity for raw-socket
- unsigned char buf [5];
- buf [0] = 0;
- put_uint32 (buf + 1, next_rid++);
- identity = blob_t (buf, sizeof buf);
- }
- else
- if (!options.raw_sock) {
- // Pick up handshake cases and also case where next identity is set
- msg.init ();
- ok = pipe_->read (&msg);
- if (!ok)
- return false;
-
- if (msg.size () == 0) {
- // Fall back on the auto-generation
- unsigned char buf [5];
- buf [0] = 0;
- put_uint32 (buf + 1, next_rid++);
- identity = blob_t (buf, sizeof buf);
- msg.close ();
- }
- else {
- identity = blob_t ((unsigned char*) msg.data (), msg.size ());
- outpipes_t::iterator it = outpipes.find (identity);
- msg.close ();
-
- if (it != outpipes.end ()) {
- if (!handover)
- // Ignore peers with duplicate ID
- return false;
- else {
- // We will allow the new connection to take over this
- // identity. Temporarily assign a new identity to the
- // existing pipe so we can terminate it asynchronously.
- unsigned char buf [5];
- buf [0] = 0;
- put_uint32 (buf + 1, next_rid++);
- blob_t new_identity = blob_t (buf, sizeof buf);
-
- it->second.pipe->set_identity (new_identity);
- outpipe_t existing_outpipe =
- {it->second.pipe, it->second.active};
-
- ok = outpipes.insert (outpipes_t::value_type (
- new_identity, existing_outpipe)).second;
- zmq_assert (ok);
-
- // Remove the existing identity entry to allow the new
- // connection to take the identity.
- outpipes.erase (it);
-
- existing_outpipe.pipe->terminate (true);
- }
- }
- }
- }
-
- pipe_->set_identity (identity);
- // Add the record into output pipes lookup table
- outpipe_t outpipe = {pipe_, true};
- ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
- zmq_assert (ok);
-
- return true;
-}