diff options
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/xpub.cpp')
-rw-r--r-- | external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/xpub.cpp | 207 |
1 files changed, 0 insertions, 207 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/xpub.cpp b/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/xpub.cpp deleted file mode 100644 index 445ef060..00000000 --- a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/xpub.cpp +++ /dev/null @@ -1,207 +0,0 @@ -/* - Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file - - This file is part of libzmq, the ZeroMQ core engine in C++. - - libzmq is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License (LGPL) as published - by the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - As a special exception, the Contributors give you permission to link - this library with independent modules to produce an executable, - regardless of the license terms of these independent modules, and to - copy and distribute the resulting executable under terms of your choice, - provided that you also meet, for each linked independent module, the - terms and conditions of the license of that module. An independent - module is a module which is not derived from or based on this library. - If you modify this library, you must extend this exception to your - version of the library. - - libzmq is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <string.h> - -#include "xpub.hpp" -#include "pipe.hpp" -#include "err.hpp" -#include "msg.hpp" - -zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_), - verbose (false), - more (false), - lossy (true) -{ - options.type = ZMQ_XPUB; -} - -zmq::xpub_t::~xpub_t () -{ -} - -void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) -{ - zmq_assert (pipe_); - dist.attach (pipe_); - - // If subscribe_to_all_ is specified, the caller would like to subscribe - // to all data on this pipe, implicitly. - if (subscribe_to_all_) - subscriptions.add (NULL, 0, pipe_); - - // The pipe is active when attached. Let's read the subscriptions from - // it, if any. - xread_activated (pipe_); -} - -void zmq::xpub_t::xread_activated (pipe_t *pipe_) -{ - // There are some subscriptions waiting. Let's process them. - msg_t sub; - while (pipe_->read (&sub)) { - // Apply the subscription to the trie - unsigned char *const data = (unsigned char *) sub.data (); - const size_t size = sub.size (); - if (size > 0 && (*data == 0 || *data == 1)) { - bool unique; - if (*data == 0) - unique = subscriptions.rm (data + 1, size - 1, pipe_); - else - unique = subscriptions.add (data + 1, size - 1, pipe_); - - // If the subscription is not a duplicate store it so that it can be - // passed to used on next recv call. (Unsubscribe is not verbose.) - if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) { - pending_data.push_back (blob_t (data, size)); - pending_flags.push_back (0); - } - } - else { - // Process user message coming upstream from xsub socket - pending_data.push_back (blob_t (data, size)); - pending_flags.push_back (sub.flags ()); - } - sub.close (); - } -} - -void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) -{ - dist.activated (pipe_); -} - -int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) { - errno = EINVAL; - return -1; - } - if (option_ == ZMQ_XPUB_VERBOSE) - verbose = (*static_cast <const int*> (optval_) != 0); - else - if (option_ == ZMQ_XPUB_NODROP) - lossy = (*static_cast <const int*> (optval_) == 0); - else { - errno = EINVAL; - return -1; - } - return 0; -} - -void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) -{ - // Remove the pipe from the trie. If there are topics that nobody - // is interested in anymore, send corresponding unsubscriptions - // upstream. - subscriptions.rm (pipe_, send_unsubscription, this); - - dist.pipe_terminated (pipe_); -} - -void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) -{ - xpub_t *self = (xpub_t*) arg_; - self->dist.match (pipe_); -} - -int zmq::xpub_t::xsend (msg_t *msg_) -{ - bool msg_more = msg_->flags () & msg_t::more ? true : false; - - // For the first part of multi-part message, find the matching pipes. - if (!more) - subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), - mark_as_matching, this); - - int rc = -1; // Assume we fail - if (lossy || dist.check_hwm ()) { - if (dist.send_to_matching (msg_) == 0) { - // If we are at the end of multi-part message we can mark - // all the pipes as non-matching. - if (!msg_more) - dist.unmatch (); - more = msg_more; - rc = 0; // Yay, sent successfully - } - } - else - errno = EAGAIN; - return rc; -} - -bool zmq::xpub_t::xhas_out () -{ - return dist.has_out (); -} - -int zmq::xpub_t::xrecv (msg_t *msg_) -{ - // If there is at least one - if (pending_data.empty ()) { - errno = EAGAIN; - return -1; - } - - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init_size (pending_data.front ().size ()); - errno_assert (rc == 0); - memcpy (msg_->data (), - pending_data.front ().data (), - pending_data.front ().size ()); - msg_->set_flags (pending_flags.front ()); - pending_data.pop_front (); - pending_flags.pop_front (); - return 0; -} - -bool zmq::xpub_t::xhas_in () -{ - return !pending_data.empty (); -} - -void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, - void *arg_) -{ - xpub_t *self = (xpub_t*) arg_; - - if (self->options.type != ZMQ_PUB) { - // Place the unsubscription to the queue of pending (un)sunscriptions - // to be retrived by the user later on. - blob_t unsub (size_ + 1, 0); - unsub [0] = 0; - if (size_ > 0) - memcpy (&unsub [1], data_, size_); - self->pending_data.push_back (unsub); - self->pending_flags.push_back (0); - } -} |