summaryrefslogtreecommitdiffstats
path: root/src/console/zmq/devices/monitoredqueue.pxd
blob: 1e26ed8603f47c5e66471c2ef55a8c09e548e2bb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
"""MonitoredQueue class declarations.

Authors
-------
* MinRK
* Brian Granger
"""

#
#    Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
#
#    This file is part of pyzmq, but is derived and adapted from zmq_queue.cpp
#    originally from libzmq-2.1.6, used under LGPLv3
#
#    pyzmq is free software; you can redistribute it and/or modify it under
#    the terms of the Lesser GNU General Public License as published by
#    the Free Software Foundation; either version 3 of the License, or
#    (at your option) any later version.
#
#    pyzmq is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    Lesser GNU General Public License for more details.
#
#    You should have received a copy of the Lesser GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

from libzmq cimport *

#-----------------------------------------------------------------------------
# MonitoredQueue C functions
#-----------------------------------------------------------------------------

cdef inline int _relay(void *insocket_, void *outsocket_, void *sidesocket_, 
                zmq_msg_t msg, zmq_msg_t side_msg, zmq_msg_t id_msg,
                bint swap_ids) nogil:
    cdef int rc
    cdef int64_t flag_2
    cdef int flag_3
    cdef int flags
    cdef bint more
    cdef size_t flagsz
    cdef void * flag_ptr
    
    if ZMQ_VERSION_MAJOR < 3:
        flagsz = sizeof (int64_t)
        flag_ptr = &flag_2
    else:
        flagsz = sizeof (int)
        flag_ptr = &flag_3
    
    if swap_ids:# both router, must send second identity first
        # recv two ids into msg, id_msg
        rc = zmq_msg_recv(&msg, insocket_, 0)
        if rc < 0: return rc
        
        rc = zmq_msg_recv(&id_msg, insocket_, 0)
        if rc < 0: return rc

        # send second id (id_msg) first
        #!!!! always send a copy before the original !!!!
        rc = zmq_msg_copy(&side_msg, &id_msg)
        if rc < 0: return rc
        rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE)
        if rc < 0: return rc
        rc = zmq_msg_send(&id_msg, sidesocket_, ZMQ_SNDMORE)
        if rc < 0: return rc
        # send first id (msg) second
        rc = zmq_msg_copy(&side_msg, &msg)
        if rc < 0: return rc
        rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE)
        if rc < 0: return rc
        rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE)
        if rc < 0: return rc
    while (True):
        rc = zmq_msg_recv(&msg, insocket_, 0)
        if rc < 0: return rc
        # assert (rc == 0)
        rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, flag_ptr, &flagsz)
        if rc < 0: return rc
        flags = 0
        if ZMQ_VERSION_MAJOR < 3:
            if flag_2:
                flags |= ZMQ_SNDMORE
        else:
            if flag_3:
                flags |= ZMQ_SNDMORE
            # LABEL has been removed:
            # rc = zmq_getsockopt (insocket_, ZMQ_RCVLABEL, flag_ptr, &flagsz)
            # if flag_3:
            #     flags |= ZMQ_SNDLABEL
        # assert (rc == 0)

        rc = zmq_msg_copy(&side_msg, &msg)
        if rc < 0: return rc
        if flags:
            rc = zmq_msg_send(&side_msg, outsocket_, flags)
            if rc < 0: return rc
            # only SNDMORE for side-socket
            rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE)
            if rc < 0: return rc
        else:
            rc = zmq_msg_send(&side_msg, outsocket_, 0)
            if rc < 0: return rc
            rc = zmq_msg_send(&msg, sidesocket_, 0)
            if rc < 0: return rc
            break
    return rc

# the MonitoredQueue C function, adapted from zmq::queue.cpp :
cdef inline int c_monitored_queue (void *insocket_, void *outsocket_,
                        void *sidesocket_, zmq_msg_t *in_msg_ptr, 
                        zmq_msg_t *out_msg_ptr, int swap_ids) nogil:
    """The actual C function for a monitored queue device. 

    See ``monitored_queue()`` for details.
    """
    
    cdef zmq_msg_t msg
    cdef int rc = zmq_msg_init (&msg)
    cdef zmq_msg_t id_msg
    rc = zmq_msg_init (&id_msg)
    if rc < 0: return rc
    cdef zmq_msg_t side_msg
    rc = zmq_msg_init (&side_msg)
    if rc < 0: return rc
    
    cdef zmq_pollitem_t items [2]
    items [0].socket = insocket_
    items [0].fd = 0
    items [0].events = ZMQ_POLLIN
    items [0].revents = 0
    items [1].socket = outsocket_
    items [1].fd = 0
    items [1].events = ZMQ_POLLIN
    items [1].revents = 0
    # I don't think sidesocket should be polled?
    # items [2].socket = sidesocket_
    # items [2].fd = 0
    # items [2].events = ZMQ_POLLIN
    # items [2].revents = 0
    
    while (True):
    
        # //  Wait while there are either requests or replies to process.
        rc = zmq_poll (&items [0], 2, -1)
        if rc < 0: return rc
        # //  The algorithm below asumes ratio of request and replies processed
        # //  under full load to be 1:1. Although processing requests replies
        # //  first is tempting it is suspectible to DoS attacks (overloading
        # //  the system with unsolicited replies).
        # 
        # //  Process a request.
        if (items [0].revents & ZMQ_POLLIN):
            # send in_prefix to side socket
            rc = zmq_msg_copy(&side_msg, in_msg_ptr)
            if rc < 0: return rc
            rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE)
            if rc < 0: return rc
            # relay the rest of the message
            rc = _relay(insocket_, outsocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
            if rc < 0: return rc
        if (items [1].revents & ZMQ_POLLIN):
            # send out_prefix to side socket
            rc = zmq_msg_copy(&side_msg, out_msg_ptr)
            if rc < 0: return rc
            rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE)
            if rc < 0: return rc
            # relay the rest of the message
            rc = _relay(outsocket_, insocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
            if rc < 0: return rc
    return rc