summaryrefslogtreecommitdiffstats
path: root/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/norm_engine.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/norm_engine.hpp')
-rw-r--r--external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/norm_engine.hpp188
1 files changed, 188 insertions, 0 deletions
diff --git a/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/norm_engine.hpp b/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/norm_engine.hpp
new file mode 100644
index 00000000..72542e19
--- /dev/null
+++ b/external_libs/python/pyzmq-14.7.0/bundled/zeromq/src/norm_engine.hpp
@@ -0,0 +1,188 @@
+
+#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
+#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
+
+#if defined ZMQ_HAVE_NORM
+
+#include "io_object.hpp"
+#include "i_engine.hpp"
+#include "options.hpp"
+#include "v2_decoder.hpp"
+#include "v2_encoder.hpp"
+
+#include <normApi.h>
+
+namespace zmq
+{
+ class io_thread_t;
+ class session_base_t;
+
+ class norm_engine_t : public io_object_t, public i_engine
+ {
+ public:
+ norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
+ ~norm_engine_t ();
+
+ // create NORM instance, session, etc
+ int init(const char* network_, bool send, bool recv);
+ void shutdown();
+
+ // i_engine interface implementation.
+ // Plug the engine to the session.
+ virtual void plug (zmq::io_thread_t *io_thread_,
+ class session_base_t *session_);
+
+ // Terminate and deallocate the engine. Note that 'detached'
+ // events are not fired on termination.
+ virtual void terminate ();
+
+ // This method is called by the session to signalise that more
+ // messages can be written to the pipe.
+ virtual void restart_input ();
+
+ // This method is called by the session to signalise that there
+ // are messages to send available.
+ virtual void restart_output ();
+
+ virtual void zap_msg_available () {};
+
+ // i_poll_events interface implementation.
+ // (we only need in_event() for NormEvent notification)
+ // (i.e., don't have any output events or timers (yet))
+ void in_event ();
+
+ private:
+ void unplug();
+ void send_data();
+ void recv_data(NormObjectHandle stream);
+
+
+ enum {BUFFER_SIZE = 2048};
+
+ // Used to keep track of streams from multiple senders
+ class NormRxStreamState
+ {
+ public:
+ NormRxStreamState(NormObjectHandle normStream,
+ int64_t maxMsgSize);
+ ~NormRxStreamState();
+
+ NormObjectHandle GetStreamHandle() const
+ {return norm_stream;}
+
+ bool Init();
+
+ void SetRxReady(bool state)
+ {rx_ready = state;}
+ bool IsRxReady() const
+ {return rx_ready;}
+
+ void SetSync(bool state)
+ {in_sync = state;}
+ bool InSync() const
+ {return in_sync;}
+
+ // These are used to feed data to decoder
+ // and its underlying "msg" buffer
+ char* AccessBuffer()
+ {return (char*)(buffer_ptr + buffer_count);}
+ size_t GetBytesNeeded() const
+ {return (buffer_size - buffer_count);}
+ void IncrementBufferCount(size_t count)
+ {buffer_count += count;}
+ msg_t* AccessMsg()
+ {return zmq_decoder->msg();}
+ // This invokes the decoder "decode" method
+ // returning 0 if more data is needed,
+ // 1 if the message is complete, If an error
+ // occurs the 'sync' is dropped and the
+ // decoder re-initialized
+ int Decode();
+
+ class List
+ {
+ public:
+ List();
+ ~List();
+
+ void Append(NormRxStreamState& item);
+ void Remove(NormRxStreamState& item);
+
+ bool IsEmpty() const
+ {return (NULL == head);}
+
+ void Destroy();
+
+ class Iterator
+ {
+ public:
+ Iterator(const List& list);
+ NormRxStreamState* GetNextItem();
+ private:
+ NormRxStreamState* next_item;
+ };
+ friend class Iterator;
+
+ private:
+ NormRxStreamState* head;
+ NormRxStreamState* tail;
+
+ }; // end class zmq::norm_engine_t::NormRxStreamState::List
+
+ friend class List;
+
+ List* AccessList()
+ {return list;}
+
+
+ private:
+ NormObjectHandle norm_stream;
+ int64_t max_msg_size;
+ bool in_sync;
+ bool rx_ready;
+ v2_decoder_t* zmq_decoder;
+ bool skip_norm_sync;
+ unsigned char* buffer_ptr;
+ size_t buffer_size;
+ size_t buffer_count;
+
+ NormRxStreamState* prev;
+ NormRxStreamState* next;
+ NormRxStreamState::List* list;
+
+ }; // end class zmq::norm_engine_t::NormRxStreamState
+
+ session_base_t* zmq_session;
+ options_t options;
+ NormInstanceHandle norm_instance;
+ handle_t norm_descriptor_handle;
+ NormSessionHandle norm_session;
+ bool is_sender;
+ bool is_receiver;
+ // Sender state
+ msg_t tx_msg;
+ v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
+ NormObjectHandle norm_tx_stream;
+ bool tx_first_msg;
+ bool tx_more_bit;
+ bool zmq_output_ready; // zmq has msg(s) to send
+ bool norm_tx_ready; // norm has tx queue vacancy
+ // tbd - maybe don't need buffer if can access zmq message buffer directly?
+ char tx_buffer[BUFFER_SIZE];
+ unsigned int tx_index;
+ unsigned int tx_len;
+
+ // Receiver state
+ // Lists of norm rx streams from remote senders
+ bool zmq_input_ready; // zmq ready to receive msg(s)
+ NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception
+ NormRxStreamState::List rx_ready_list; // rx streams ready for NormStreamRead()
+ NormRxStreamState::List msg_ready_list; // rx streams w/ msg ready for push to zmq
+
+
+ }; // end class norm_engine_t
+}
+
+#endif // ZMQ_HAVE_NORM
+
+#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__