summaryrefslogtreecommitdiffstats
path: root/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/dist.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/dist.cpp')
-rw-r--r--external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/dist.cpp216
1 files changed, 216 insertions, 0 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/dist.cpp b/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/dist.cpp
new file mode 100644
index 00000000..e7e18233
--- /dev/null
+++ b/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/dist.cpp
@@ -0,0 +1,216 @@
+/*
+ Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
+
+ This file is part of libzmq, the ZeroMQ core engine in C++.
+
+ libzmq is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License (LGPL) as published
+ by the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ As a special exception, the Contributors give you permission to link
+ this library with independent modules to produce an executable,
+ regardless of the license terms of these independent modules, and to
+ copy and distribute the resulting executable under terms of your choice,
+ provided that you also meet, for each linked independent module, the
+ terms and conditions of the license of that module. An independent
+ module is a module which is not derived from or based on this library.
+ If you modify this library, you must extend this exception to your
+ version of the library.
+
+ libzmq is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "dist.hpp"
+#include "pipe.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+#include "likely.hpp"
+
+zmq::dist_t::dist_t () :
+ matching (0),
+ active (0),
+ eligible (0),
+ more (false)
+{
+}
+
+zmq::dist_t::~dist_t ()
+{
+ zmq_assert (pipes.empty ());
+}
+
+void zmq::dist_t::attach (pipe_t *pipe_)
+{
+ // If we are in the middle of sending a message, we'll add new pipe
+ // into the list of eligible pipes. Otherwise we add it to the list
+ // of active pipes.
+ if (more) {
+ pipes.push_back (pipe_);
+ pipes.swap (eligible, pipes.size () - 1);
+ eligible++;
+ }
+ else {
+ pipes.push_back (pipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+ eligible++;
+ }
+}
+
+void zmq::dist_t::match (pipe_t *pipe_)
+{
+ // If pipe is already matching do nothing.
+ if (pipes.index (pipe_) < matching)
+ return;
+
+ // If the pipe isn't eligible, ignore it.
+ if (pipes.index (pipe_) >= eligible)
+ return;
+
+ // Mark the pipe as matching.
+ pipes.swap (pipes.index (pipe_), matching);
+ matching++;
+}
+
+void zmq::dist_t::unmatch ()
+{
+ matching = 0;
+}
+
+void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
+{
+ // Remove the pipe from the list; adjust number of matching, active and/or
+ // eligible pipes accordingly.
+ if (pipes.index (pipe_) < matching) {
+ pipes.swap (pipes.index (pipe_), matching - 1);
+ matching--;
+ }
+ if (pipes.index (pipe_) < active) {
+ pipes.swap (pipes.index (pipe_), active - 1);
+ active--;
+ }
+ if (pipes.index (pipe_) < eligible) {
+ pipes.swap (pipes.index (pipe_), eligible - 1);
+ eligible--;
+ }
+
+ pipes.erase (pipe_);
+}
+
+void zmq::dist_t::activated (pipe_t *pipe_)
+{
+ // Move the pipe from passive to eligible state.
+ pipes.swap (pipes.index (pipe_), eligible);
+ eligible++;
+
+ // If there's no message being sent at the moment, move it to
+ // the active state.
+ if (!more) {
+ pipes.swap (eligible - 1, active);
+ active++;
+ }
+}
+
+int zmq::dist_t::send_to_all (msg_t *msg_)
+{
+ matching = active;
+ return send_to_matching (msg_);
+}
+
+int zmq::dist_t::send_to_matching (msg_t *msg_)
+{
+ // Is this end of a multipart message?
+ bool msg_more = msg_->flags () & msg_t::more ? true : false;
+
+ // Push the message to matching pipes.
+ distribute (msg_);
+
+ // If mutlipart message is fully sent, activate all the eligible pipes.
+ if (!msg_more)
+ active = eligible;
+
+ more = msg_more;
+
+ return 0;
+}
+
+void zmq::dist_t::distribute (msg_t *msg_)
+{
+ // If there are no matching pipes available, simply drop the message.
+ if (matching == 0) {
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
+ return;
+ }
+
+ if (msg_->is_vsm ()) {
+ for (pipes_t::size_type i = 0; i < matching; ++i)
+ if(!write (pipes [i], msg_))
+ --i; // Retry last write because index will have been swapped
+ int rc = msg_->close();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
+ return;
+ }
+
+ // Add matching-1 references to the message. We already hold one reference,
+ // that's why -1.
+ msg_->add_refs ((int) matching - 1);
+
+ // Push copy of the message to each matching pipe.
+ int failed = 0;
+ for (pipes_t::size_type i = 0; i < matching; ++i)
+ if (!write (pipes [i], msg_)) {
+ ++failed;
+ --i; // Retry last write because index will have been swapped
+ }
+ if (unlikely (failed))
+ msg_->rm_refs (failed);
+
+ // Detach the original message from the data buffer. Note that we don't
+ // close the message. That's because we've already used all the references.
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
+}
+
+bool zmq::dist_t::has_out ()
+{
+ return true;
+}
+
+bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
+{
+ if (!pipe_->write (msg_)) {
+ pipes.swap (pipes.index (pipe_), matching - 1);
+ matching--;
+ pipes.swap (pipes.index (pipe_), active - 1);
+ active--;
+ pipes.swap (active, eligible - 1);
+ eligible--;
+ return false;
+ }
+ if (!(msg_->flags () & msg_t::more))
+ pipe_->flush ();
+ return true;
+}
+
+bool zmq::dist_t::check_hwm ()
+{
+ for (pipes_t::size_type i = 0; i < matching; ++i)
+ if (!pipes [i]->check_hwm ())
+ return false;
+
+ return true;
+}
+
+