diff options
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/pgm_sender.cpp')
-rw-r--r-- | external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/pgm_sender.cpp | 249 |
1 files changed, 0 insertions, 249 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/pgm_sender.cpp b/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/pgm_sender.cpp deleted file mode 100644 index 593aa641..00000000 --- a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/pgm_sender.cpp +++ /dev/null @@ -1,249 +0,0 @@ -/* - Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file - - This file is part of libzmq, the ZeroMQ core engine in C++. - - libzmq is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License (LGPL) as published - by the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - As a special exception, the Contributors give you permission to link - this library with independent modules to produce an executable, - regardless of the license terms of these independent modules, and to - copy and distribute the resulting executable under terms of your choice, - provided that you also meet, for each linked independent module, the - terms and conditions of the license of that module. An independent - module is a module which is not derived from or based on this library. - If you modify this library, you must extend this exception to your - version of the library. - - libzmq is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "platform.hpp" - -#if defined ZMQ_HAVE_OPENPGM - -#ifdef ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#endif - -#include <stdlib.h> - -#include "io_thread.hpp" -#include "pgm_sender.hpp" -#include "session_base.hpp" -#include "err.hpp" -#include "wire.hpp" -#include "stdint.hpp" - -zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, - const options_t &options_) : - io_object_t (parent_), - has_tx_timer (false), - has_rx_timer (false), - session (NULL), - encoder (0), - more_flag (false), - pgm_socket (false, options_), - options (options_), - out_buffer (NULL), - out_buffer_size (0), - write_size (0) -{ - int rc = msg.init (); - errno_assert (rc == 0); -} - -int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) -{ - int rc = pgm_socket.init (udp_encapsulation_, network_); - if (rc != 0) - return rc; - - out_buffer_size = pgm_socket.get_max_tsdu_size (); - out_buffer = (unsigned char*) malloc (out_buffer_size); - alloc_assert (out_buffer); - - return rc; -} - -void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) -{ - // Alocate 2 fds for PGM socket. - fd_t downlink_socket_fd = retired_fd; - fd_t uplink_socket_fd = retired_fd; - fd_t rdata_notify_fd = retired_fd; - fd_t pending_notify_fd = retired_fd; - - session = session_; - - // Fill fds from PGM transport and add them to the poller. - pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, - &rdata_notify_fd, &pending_notify_fd); - - handle = add_fd (downlink_socket_fd); - uplink_handle = add_fd (uplink_socket_fd); - rdata_notify_handle = add_fd (rdata_notify_fd); - pending_notify_handle = add_fd (pending_notify_fd); - - // Set POLLIN. We wont never want to stop polling for uplink = we never - // want to stop porocess NAKs. - set_pollin (uplink_handle); - set_pollin (rdata_notify_handle); - set_pollin (pending_notify_handle); - - // Set POLLOUT for downlink_socket_handle. - set_pollout (handle); -} - -void zmq::pgm_sender_t::unplug () -{ - if (has_rx_timer) { - cancel_timer (rx_timer_id); - has_rx_timer = false; - } - - if (has_tx_timer) { - cancel_timer (tx_timer_id); - has_tx_timer = false; - } - - rm_fd (handle); - rm_fd (uplink_handle); - rm_fd (rdata_notify_handle); - rm_fd (pending_notify_handle); - session = NULL; -} - -void zmq::pgm_sender_t::terminate () -{ - unplug (); - delete this; -} - -void zmq::pgm_sender_t::restart_output () -{ - set_pollout (handle); - out_event (); -} - -void zmq::pgm_sender_t::restart_input () -{ - zmq_assert (false); -} - -zmq::pgm_sender_t::~pgm_sender_t () -{ - int rc = msg.close (); - errno_assert (rc == 0); - - if (out_buffer) { - free (out_buffer); - out_buffer = NULL; - } -} - -void zmq::pgm_sender_t::in_event () -{ - if (has_rx_timer) { - cancel_timer (rx_timer_id); - has_rx_timer = false; - } - - // In-event on sender side means NAK or SPMR receiving from some peer. - pgm_socket.process_upstream (); - if (errno == ENOMEM || errno == EBUSY) { - const long timeout = pgm_socket.get_rx_timeout (); - add_timer (timeout, rx_timer_id); - has_rx_timer = true; - } -} - -void zmq::pgm_sender_t::out_event () -{ - // POLLOUT event from send socket. If write buffer is empty, - // try to read new data from the encoder. - if (write_size == 0) { - - // First two bytes (sizeof uint16_t) are used to store message - // offset in following steps. Note that by passing our buffer to - // the get data function we prevent it from returning its own buffer. - unsigned char *bf = out_buffer + sizeof (uint16_t); - size_t bfsz = out_buffer_size - sizeof (uint16_t); - uint16_t offset = 0xffff; - - size_t bytes = encoder.encode (&bf, bfsz); - while (bytes < bfsz) { - if (!more_flag && offset == 0xffff) - offset = static_cast <uint16_t> (bytes); - int rc = session->pull_msg (&msg); - if (rc == -1) - break; - more_flag = msg.flags () & msg_t::more; - encoder.load_msg (&msg); - bf = out_buffer + sizeof (uint16_t) + bytes; - bytes += encoder.encode (&bf, bfsz - bytes); - } - - // If there are no data to write stop polling for output. - if (bytes == 0) { - reset_pollout (handle); - return; - } - - write_size = sizeof (uint16_t) + bytes; - - // Put offset information in the buffer. - put_uint16 (out_buffer, offset); - } - - if (has_tx_timer) { - cancel_timer (tx_timer_id); - has_tx_timer = false; - } - - // Send the data. - size_t nbytes = pgm_socket.send (out_buffer, write_size); - - // We can write either all data or 0 which means rate limit reached. - if (nbytes == write_size) - write_size = 0; - else { - zmq_assert (nbytes == 0); - - if (errno == ENOMEM) { - const long timeout = pgm_socket.get_tx_timeout (); - add_timer (timeout, tx_timer_id); - has_tx_timer = true; - } - else - errno_assert (errno == EBUSY); - } -} - -void zmq::pgm_sender_t::timer_event (int token) -{ - // Timer cancels on return by poller_base. - if (token == rx_timer_id) { - has_rx_timer = false; - in_event (); - } - else - if (token == tx_timer_id) { - has_tx_timer = false; - out_event (); - } - else - zmq_assert (false); -} - -#endif - |