#include "platform.hpp"

#if defined ZMQ_HAVE_NORM

#include "norm_engine.hpp"
#include "session_base.hpp"
#include "v2_protocol.hpp"

// Construct an engine that is not yet attached to any NORM instance/session.
// All NORM handles start out invalid; init() must be called before plug().
zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
                                  const options_t& options_)
 : io_object_t(parent_), zmq_session(NULL), options(options_),
   norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
   is_sender(false), is_receiver(false),
   zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID),
   tx_first_msg(true), tx_more_bit(false),
   zmq_output_ready(false), norm_tx_ready(false),
   tx_index(0), tx_len(0),
   zmq_input_ready(false)
{
    int rc = tx_msg.init();
    errno_assert(0 == rc);
}

zmq::norm_engine_t::~norm_engine_t()
{
    shutdown();  // in case it was not already called
}


// Parse the endpoint string, create the NORM instance/session and start the
// requested sender and/or receiver.
//
//   network_  endpoint text (format described below)
//   send      start a NORM sender and open the tx stream
//   recv      start a NORM receiver
//
// Returns 0 on success; -1 on failure with errno set (EINVAL when the ':'
// separating address and port is missing, otherwise whatever errno the
// failing NORM call left behind). On a NORM failure the instance is destroyed
// and the engine is returned to its "not started" state.
int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
{
    // Parse the "network_" address into "iface", "addr", and "port"
    // norm endpoint format: [id,][<iface>;]<addr>:<port>
    // First, look for optional local NormNodeId
    // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
    NormNodeId localId = NORM_NODE_ANY;
    const char* ifacePtr = strchr(network_, ',');
    if (NULL != ifacePtr)
    {
        size_t idLen = ifacePtr - network_;
        if (idLen > 31) idLen = 31;
        char idText[32];
        strncpy(idText, network_, idLen);
        idText[idLen] = '\0';
        localId = (NormNodeId)atoi(idText);
        ifacePtr++;
    }
    else
    {
        ifacePtr = network_;
    }

    // Second, look for optional multicast ifaceName
    char ifaceName[256];
    const char* addrPtr = strchr(ifacePtr, ';');
    if (NULL != addrPtr)
    {
        size_t ifaceLen = addrPtr - ifacePtr;
        if (ifaceLen > 255) ifaceLen = 255;  // return error instead?
        strncpy(ifaceName, ifacePtr, ifaceLen);
        ifaceName[ifaceLen] = '\0';
        ifacePtr = ifaceName;
        addrPtr++;
    }
    else
    {
        addrPtr = ifacePtr;
        ifacePtr = NULL;
    }

    // Finally, parse IP address and port number
    // (strrchr: the port follows the _last_ ':' in the text)
    const char* portPtr = strrchr(addrPtr, ':');
    if (NULL == portPtr)
    {
        errno = EINVAL;
        return -1;
    }

    char addr[256];
    size_t addrLen = portPtr - addrPtr;
    if (addrLen > 255) addrLen = 255;
    strncpy(addr, addrPtr, addrLen);
    addr[addrLen] = '\0';
    portPtr++;
    unsigned short portNumber = atoi(portPtr);

    if (NORM_INSTANCE_INVALID == norm_instance)
    {
        if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance()))
        {
            // errno set by whatever caused NormCreateInstance() to fail
            return -1;
        }
    }

    // TBD - What do we use for our local NormNodeId?
    //       (for now we use automatic, IP addr based assignment or passed in 'id')
    //       a) Use ZMQ Identity somehow?
    //       b) Add function to use iface addr
    //       c) Randomize and implement a NORM session layer
    //          conflict detection/resolution protocol

    norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
    if (NORM_SESSION_INVALID == norm_session)
    {
        int savedErrno = errno;
        NormDestroyInstance(norm_instance);
        norm_instance = NORM_INSTANCE_INVALID;
        errno = savedErrno;
        return -1;
    }
    // There's many other useful NORM options that could be applied here
    if (NormIsUnicastAddress(addr))
    {
        NormSetDefaultUnicastNack(norm_session, true);
    }
    else
    {
        // These only apply for multicast sessions
        //NormSetTTL(norm_session, options.multicast_hops);  // ZMQ default is 1
        NormSetTTL(norm_session, 255);  // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
        NormSetRxPortReuse(norm_session, true);  // port reuse doesn't work for non-connected unicast
        NormSetLoopback(norm_session, true);  // needed when multicast users on same machine
        if (NULL != ifacePtr)
        {
            // Note a bad interface may not be caught until sender or receiver start
            // (Since sender/receiver is not yet started, this always succeeds here)
            NormSetMulticastInterface(norm_session, ifacePtr);
        }
    }

    if (recv)
    {
        // The alternative NORM_SYNC_CURRENT here would provide "instant"
        // receiver sync to the sender's _current_ message transmission.
        // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
        NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
        if (!NormStartReceiver(norm_session, 2*1024*1024))
        {
            // errno set by whatever failed
            int savedErrno = errno;
            NormDestroyInstance(norm_instance); // session gets closed, too
            norm_session = NORM_SESSION_INVALID;
            norm_instance = NORM_INSTANCE_INVALID;
            errno = savedErrno;
            return -1;
        }
        is_receiver = true;
    }

    if (send)
    {
        // Pick a random sender instance id (aka norm sender session id)
        NormSessionId instanceId = NormGetRandomSessionId();
        // TBD - provide "options" for some NORM sender parameters
        if (!NormStartSender(norm_session, instanceId, 2*1024*1024, 1400, 16, 4))
        {
            // errno set by whatever failed
            int savedErrno = errno;
            NormDestroyInstance(norm_instance); // session gets closed, too
            norm_session = NORM_SESSION_INVALID;
            norm_instance = NORM_INSTANCE_INVALID;
            // The receiver (if it was started above) died with the instance.
            // Clear the flag so a later shutdown() does not call
            // NormStopReceiver() on the now-invalid session handle.
            is_receiver = false;
            errno = savedErrno;
            return -1;
        }
        NormSetCongestionControl(norm_session, true);
        norm_tx_ready = true;
        is_sender = true;
        if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024)))
        {
            // errno set by whatever failed
            int savedErrno = errno;
            NormDestroyInstance(norm_instance); // session gets closed, too
            norm_session = NORM_SESSION_INVALID;
            norm_instance = NORM_INSTANCE_INVALID;
            // Sender and receiver were destroyed along with the instance;
            // roll the state flags back so shutdown() leaves the invalid
            // session handle alone.
            is_sender = false;
            is_receiver = false;
            norm_tx_ready = false;
            errno = savedErrno;
            return -1;
        }
    }

    //NormSetMessageTrace(norm_session, true);
    //NormSetDebugLevel(3);
    //NormOpenDebugLog(norm_instance, "normLog.txt");

    return 0;  // no error
}  // end zmq::norm_engine_t::init()

// Stop receiver/sender (if running), free all per-stream receive state and
// destroy the NORM session and instance. Each step is guarded by the
// corresponding flag/handle, so calling this more than once is harmless.
void zmq::norm_engine_t::shutdown()
{
    // TBD - implement a more graceful shutdown option
    if (is_receiver)
    {
        NormStopReceiver(norm_session);

        // delete any active NormRxStreamState
        rx_pending_list.Destroy();
        rx_ready_list.Destroy();
        msg_ready_list.Destroy();

        is_receiver = false;
    }
    if (is_sender)
    {
        NormStopSender(norm_session);
        is_sender = false;
    }
    if (NORM_SESSION_INVALID != norm_session)
    {
        NormDestroySession(norm_session);
        norm_session = NORM_SESSION_INVALID;
    }
    if (NORM_INSTANCE_INVALID != norm_instance)
    {
        NormStopInstance(norm_instance);
        NormDestroyInstance(norm_instance);
        norm_instance = NORM_INSTANCE_INVALID;
    }
}  // end zmq::norm_engine_t::shutdown()

// Attach the engine to its session: remember the session, register the NORM
// event descriptor with the poller and, for senders, start pushing data.
void zmq::norm_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
{
    // TBD - we may assign the NORM engine to an io_thread in the future???
    zmq_session = session_;
    if (is_sender) zmq_output_ready = true;
    if (is_receiver) zmq_input_ready = true;

    fd_t normDescriptor = NormGetDescriptor(norm_instance);
    norm_descriptor_handle = add_fd(normDescriptor);
    // Set POLLIN for notification of pending NormEvents
    set_pollin(norm_descriptor_handle);

    if (is_sender) send_data();

}  // end zmq::norm_engine_t::plug()

// Detach from the poller and forget the session.
void zmq::norm_engine_t::unplug()
{
    rm_fd(norm_descriptor_handle);

    zmq_session = NULL;
}  // end zmq::norm_engine_t::unplug()

// Unplug, shut NORM down and destroy this engine object.
void zmq::norm_engine_t::terminate()
{
    unplug();
    shutdown();
    delete this;
}

// Called when the session has new outbound message data for us.
void zmq::norm_engine_t::restart_output()
{
    // There's new message data available from the session
    zmq_output_ready = true;
    if (norm_tx_ready) send_data();

}  // end zmq::norm_engine_t::restart_output()

// Move as much outbound data as possible from the session, through the
// encoder and tx_buffer, into the NORM tx stream. Stops when either the
// session has no more messages (zmq_output_ready cleared) or the NORM stream
// buffer is full (norm_tx_ready cleared).
void zmq::norm_engine_t::send_data()
{
    // Here we write as much as is available or we can
    while (zmq_output_ready && norm_tx_ready)
    {
        if (0 == tx_len)
        {
            // Our tx_buffer needs data to send
            // The buffer is (re)filled from its start, so the write offset
            // must restart at zero for _every_ refill. This includes
            // continuation chunks of a message larger than the buffer;
            // otherwise a stale tx_index >= tx_len would make the write
            // below a no-op and this loop would never terminate.
            tx_index = 0;
            // Get more data from encoder
            size_t space = BUFFER_SIZE;
            unsigned char* bufPtr = (unsigned char*)tx_buffer;
            tx_len = zmq_encoder.encode(&bufPtr, space);
            if (0 == tx_len)
            {
                if (tx_first_msg)
                {
                    // We don't need to mark eom/flush until a message is sent
                    tx_first_msg = false;
                }
                else
                {
                    // A prior message was completely written to stream, so
                    // mark end-of-message and possibly flush (to force packet transmission,
                    // even if it's not a full segment so message gets delivered quickly)
                    // NormStreamMarkEom(norm_tx_stream);  // the flush below marks eom
                    // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
                    // but makes sure content is delivered quickly.  Positive acknowledgements
                    // with flush override would make NORM more succinct here
                    NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
                }
                // Need to pull and load a new message to send
                if (-1 == zmq_session->pull_msg(&tx_msg))
                {
                    // We need to wait for "restart_output()" to be called by ZMQ
                    zmq_output_ready = false;
                    break;
                }
                zmq_encoder.load_msg(&tx_msg);
                // Should we write message size header for NORM to use? Or expect NORM
                // receiver to decode ZMQ message framing format(s)?
                // OK - we need to use a byte to denote when the ZMQ frame is the _first_
                //      frame of a message so it can be decoded properly when a receiver
                //      'syncs' mid-stream.  We key off the state of the 'more_flag'
                //      I.e., if more_flag _was_ false previously, this is the first
                //      frame of a ZMQ message.
                if (tx_more_bit)
                    tx_buffer[0] = (char)0xff;  // this is not first frame of message
                else
                    tx_buffer[0] = 0x00;  // this is first frame of message
                tx_more_bit = (0 != (tx_msg.flags() & msg_t::more));
                // Go ahead and get a first chunk of the message
                bufPtr++;
                space--;
                tx_len = 1 + zmq_encoder.encode(&bufPtr, space);
            }
        }
        // Do we have data in our tx_buffer pending
        if (tx_index < tx_len)
        {
            // We have data in our tx_buffer to send, so write it to the stream
            tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index);
            if (tx_index < tx_len)
            {
                // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
                norm_tx_ready = false;
                break;
            }
            tx_len = 0;  // all buffered data was written
        }
    }  // end while (zmq_output_ready && norm_tx_ready)
}  // end zmq::norm_engine_t::send_data()

// Poller callback: the NORM descriptor is readable, i.e. a NormEvent is
// pending. Fetch exactly one event and dispatch on its type.
void zmq::norm_engine_t::in_event()
{
    // This means a NormEvent is pending, so call NormGetNextEvent() and handle
    NormEvent event;
    if (!NormGetNextEvent(norm_instance, &event))
    {
        // NORM has died before we unplugged?!
        zmq_assert(false);
        return;
    }

    switch(event.type)
    {
        case NORM_TX_QUEUE_VACANCY:
        case NORM_TX_QUEUE_EMPTY:
            if (!norm_tx_ready)
            {
                norm_tx_ready = true;
                send_data();
            }
            break;

        case NORM_RX_OBJECT_NEW:
            //break;
        case NORM_RX_OBJECT_UPDATED:
            recv_data(event.object);
            break;

        case NORM_RX_OBJECT_ABORTED:
        {
            NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
            if (NULL != rxState)
            {
                // Remove the state from the list it's in
                // This is now unnecessary since deletion takes care of list removal
                // but in the interest of being clear ...
                NormRxStreamState::List* list = rxState->AccessList();
                if (NULL != list) list->Remove(*rxState);
            }
            delete rxState;
            break;
        }
        case NORM_REMOTE_SENDER_INACTIVE:
            // Here we free resources used for this formerly active sender.
            // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
            //  get some messages delivered twice.  NORM_SYNC_CURRENT would
            // mitigate that but might miss data at startup. Always tradeoffs.
            // Instead of immediately deleting, we could instead initiate a
            // user configurable timeout here to wait some amount of time
            // after this event to declare the remote sender truly dead
            // and delete its state???
            NormNodeDelete(event.sender);
            break;

        default:
            // We ignore some NORM events
            break;
    }
}  // zmq::norm_engine_t::in_event()

// Called when the session is able to accept inbound messages again.
void zmq::norm_engine_t::restart_input()
{
    // TBD - should we check/assert that zmq_input_ready was false???
    zmq_input_ready = true;
    // Process any pending received messages
    if (!msg_ready_list.IsEmpty())
        recv_data(NORM_OBJECT_INVALID);

}  // end zmq::norm_engine_t::restart_input()

// Read inbound data from NORM rx streams, decode it into ZMQ messages and
// push completed messages up to the session.
//
//   object  the stream that triggered a NORM rx notification, or
//           NORM_OBJECT_INVALID when called only to drain already-decoded
//           messages (see restart_input()).
//
// Each stream's state lives in exactly one of three lists:
//   rx_pending_list  - waiting for NORM to signal more data
//   rx_ready_list    - NORM has (or may have) data to read
//   msg_ready_list   - a complete message is waiting to be pushed
void zmq::norm_engine_t::recv_data(NormObjectHandle object)
{
    if (NORM_OBJECT_INVALID != object)
    {
        // Call result of NORM_RX_OBJECT_UPDATED notification
        // This is a rx_ready indication for a new or existing rx stream
        // First, determine if this is a stream we already know
        zmq_assert(NORM_OBJECT_STREAM == NormObjectGetType(object));
        // Since there can be multiple senders (publishers), we keep
        // state for each separate rx stream.
        NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(object);
        if (NULL == rxState)
        {
            // This is a new stream, so create rxState with zmq decoder, etc
            rxState = new NormRxStreamState(object, options.maxmsgsize);
            if (!rxState->Init())
            {
                errno_assert(false);
                delete rxState;
                return;
            }
            NormObjectSetUserData(object, rxState);
        }
        else if (!rxState->IsRxReady())
        {
            // Existing non-ready stream, so remove from pending
            // list to be promoted to rx_ready_list ...
            rx_pending_list.Remove(*rxState);
        }
        if (!rxState->IsRxReady())
        {
            // TBD - prepend up front for immediate service?
            rxState->SetRxReady(true);
            rx_ready_list.Append(*rxState);
        }
    }
    // This loop repeats until we've read all data available from "rx ready" inbound streams
    // and pushed any accumulated messages we can up to the zmq session.
    while (!rx_ready_list.IsEmpty() || (zmq_input_ready && !msg_ready_list.IsEmpty()))
    {
        // Iterate through our rx_ready streams, reading data into the decoder
        // (This services incoming "rx ready" streams in a round-robin fashion)
        NormRxStreamState::List::Iterator iterator(rx_ready_list);
        NormRxStreamState* rxState;
        while (NULL != (rxState = iterator.GetNextItem()))
        {
            switch(rxState->Decode())
            {
                case 1:  // msg completed
                    // Complete message decoded, move this stream to msg_ready_list
                    // to push the message up to the session below.  Note the stream
                    // will be returned to the "rx_ready_list" after that's done
                    rx_ready_list.Remove(*rxState);
                    msg_ready_list.Append(*rxState);
                    continue;

                case -1: // decoding error (shouldn't happen w/ NORM, but ...)
                    // We need to re-sync this stream (decoder buffer was reset)
                    rxState->SetSync(false);
                    break;

                default:  // 0 - need more data
                    break;
            }
            // Get more data from this stream
            NormObjectHandle stream = rxState->GetStreamHandle();
            // First, make sure we're in sync ...
            while (!rxState->InSync())
            {
                // seek NORM message start
                if (!NormStreamSeekMsgStart(stream))
                {
                    // Need to wait for more data
                    break;
                }
                // read message 'flag' byte to see if this is a 'final' frame
                char syncFlag;
                unsigned int numBytes = 1;
                if (!NormStreamRead(stream, &syncFlag, &numBytes))
                {
                    // broken stream (shouldn't happen after seek msg start?)
                    zmq_assert(false);
                    continue;
                }
                if (0 == numBytes)
                {
                    // This probably shouldn't happen either since we found msg start
                    // Need to wait for more data
                    break;
                }
                if (0 == syncFlag) rxState->SetSync(true);
                // else keep seeking ...
            }  // end while(!rxState->InSync())
            if (!rxState->InSync())
            {
                // Need more data for this stream, so remove from "rx ready"
                // list and iterate to next "rx ready" stream
                rxState->SetRxReady(false);
                // Move from rx_ready_list to rx_pending_list
                rx_ready_list.Remove(*rxState);
                rx_pending_list.Append(*rxState);
                continue;
            }
            // Now we're actually ready to read data from the NORM stream to the zmq_decoder
            // the underlying zmq_decoder->get_buffer() call sets how much is needed.
            unsigned int numBytes = rxState->GetBytesNeeded();
            if (!NormStreamRead(stream, rxState->AccessBuffer(), &numBytes))
            {
                // broken NORM stream, so re-sync
                rxState->Init();  // TBD - check result
                // The stream is left on rx_ready_list, so syncing and reading
                // are retried for it on the next pass of the enclosing loop
                continue;
            }
            rxState->IncrementBufferCount(numBytes);
            if (0 == numBytes)
            {
                // All the data available has been read
                // Need to wait for NORM_RX_OBJECT_UPDATED for this stream
                rxState->SetRxReady(false);
                // Move from rx_ready_list to rx_pending_list
                rx_ready_list.Remove(*rxState);
                rx_pending_list.Append(*rxState);
            }
        }  // end while(NULL != (rxState = iterator.GetNextItem()))

        if (zmq_input_ready)
        {
            // At this point, we've made a pass through the "rx_ready" stream list
            // Now make a pass through the "msg_pending" list (if the zmq session
            // ready for more input).  This may possibly return streams back to
            // the "rx ready" stream list after their pending message is handled
            NormRxStreamState::List::Iterator iterator(msg_ready_list);
            NormRxStreamState* rxState;
            while (NULL != (rxState = iterator.GetNextItem()))
            {
                msg_t* msg = rxState->AccessMsg();
                int rc = zmq_session->push_msg(msg);
                if (-1 == rc)
                {
                    if (EAGAIN == errno)
                    {
                        // need to wait until session calls "restart_input()"
                        zmq_input_ready = false;
                        break;
                    }
                    else
                    {
                        // session rejected message?
                        // TBD - handle this better
                        zmq_assert(false);
                    }
                }
                // else message was accepted.
                msg_ready_list.Remove(*rxState);
                if (rxState->IsRxReady())
                {
                    // Move back to "rx_ready" list to read more data
                    rx_ready_list.Append(*rxState);
                }
                else
                {
                    // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED.
                    // (It must NOT go back on msg_ready_list: its message has
                    // already been handed to the session, and the
                    // NORM_RX_OBJECT_UPDATED path above expects every
                    // non-ready stream to be on rx_pending_list.)
                    rx_pending_list.Append(*rxState);
                }
            }  // end while(NULL != (rxState = iterator.GetNextItem()))
        }  // end if (zmq_input_ready)
    }  // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))

    // Alert zmq of the messages we have pushed up
    zmq_session->flush();

}  // end zmq::norm_engine_t::recv_data()

// Per-stream receive state. The decoder is created later by Init().
zmq::norm_engine_t::NormRxStreamState::NormRxStreamState(NormObjectHandle normStream,
                                                         int64_t          maxMsgSize)
 : norm_stream(normStream), max_msg_size(maxMsgSize),
   in_sync(false), rx_ready(false), zmq_decoder(NULL), skip_norm_sync(false),
   buffer_ptr(NULL), buffer_size(0), buffer_count(0),
   prev(NULL), next(NULL), list(NULL)
{
}

// Frees the decoder and unlinks this state from whatever list holds it.
zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState()
{
    if (NULL != zmq_decoder)
    {
        delete zmq_decoder;
        zmq_decoder = NULL;
    }
    if (NULL != list)
    {
        list->Remove(*this);
        list = NULL;
    }
}

// (Re)initialize the stream state: drop sync, replace the decoder with a
// fresh one and fetch its input buffer. Returns true on success, false if
// the decoder could not be allocated.
bool zmq::norm_engine_t::NormRxStreamState::Init()
{
    in_sync = false;
    skip_norm_sync = false;
    if (NULL != zmq_decoder) delete zmq_decoder;
    // Note "in_batch_size" comes from config.h
    zmq_decoder = new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size);
    alloc_assert (zmq_decoder);
    if (NULL != zmq_decoder)
    {
        buffer_count = 0;
        buffer_size = 0;
        zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
        return true;
    }
    else
    {
        return false;
    }
}  // end zmq::norm_engine_t::NormRxStreamState::Init()

// This decodes any pending data sitting in our stream decoder buffer.
// It returns 1 upon message completion and 0 when more data is needed.
// A decoder error is handled here by resetting the decoder via Init()
// (which also clears in_sync and empties the buffer); the function then
// returns 0 as well.
int zmq::norm_engine_t::NormRxStreamState::Decode()
{
    // If we have pending bytes to decode, process those first
    while (buffer_count > 0)
    {
        // There's pending data for the decoder to decode
        size_t processed = 0;

        // This a bit of a kludgy approach used to weed
        // out the NORM ZMQ message transport "syncFlag" byte
        // from the ZMQ message stream being decoded (but it works!)
        if (skip_norm_sync)
        {
            buffer_ptr++;
            buffer_count--;
            skip_norm_sync = false;
        }

        int rc = zmq_decoder->decode(buffer_ptr, buffer_count, processed);
        buffer_ptr += processed;
        buffer_count -= processed;
        switch (rc)
        {
            case 1:
                // msg completed
                if (0 == buffer_count)
                {
                    buffer_size = 0;
                    zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
                }
                // the next byte in the stream is the next frame's sync flag
                skip_norm_sync = true;
                return 1;
            case -1:
                // decoder error (reset decoder and state variables)
                in_sync = false;
                skip_norm_sync = false;  // will get consumed by norm sync check
                Init();
                break;

            case 0:
                // need more data, keep decoding until buffer exhausted
                break;
        }
    }
    // Reset buffer pointer/count for next read
    buffer_count = 0;
    buffer_size = 0;
    zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
    return 0;  //  need more data

}  // end zmq::norm_engine_t::NormRxStreamState::Decode()

// Intrusive doubly-linked list of stream states (links live in the items).
zmq::norm_engine_t::NormRxStreamState::List::List()
 : head(NULL), tail(NULL)
{
}

zmq::norm_engine_t::NormRxStreamState::List::~List()
{
    Destroy();
}

// Unlink and delete every item in the list.
void zmq::norm_engine_t::NormRxStreamState::List::Destroy()
{
    NormRxStreamState* item = head;
    while (NULL != item)
    {
        Remove(*item);
        delete item;
        item = head;
    }
}  // end zmq::norm_engine_t::NormRxStreamState::List::Destroy()

// Link "item" at the tail and record this list as its owner.
void zmq::norm_engine_t::NormRxStreamState::List::Append(NormRxStreamState& item)
{
    item.prev = tail;
    if (NULL != tail)
        tail->next = &item;
    else
        head = &item;
    item.next = NULL;
    tail = &item;
    item.list = this;
}  // end zmq::norm_engine_t::NormRxStreamState::List::Append()

// Unlink "item" and clear its links and owner. No check is made that the
// item actually belongs to this list, so callers must pass the right list.
void zmq::norm_engine_t::NormRxStreamState::List::Remove(NormRxStreamState& item)
{
    if (NULL != item.prev)
        item.prev->next = item.next;
    else
        head = item.next;
    if (NULL != item.next)
        item.next ->prev = item.prev;
    else
        tail = item.prev;
    item.prev = item.next = NULL;
    item.list = NULL;
}  // end zmq::norm_engine_t::NormRxStreamState::List::Remove()

// Iterator starts at the list head.
zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator(const List& list)
 : next_item(list.head)
{
}

// Return the current item (NULL at end) and advance. The successor is
// captured before returning, so the caller may Remove() the returned item
// without disturbing the iteration.
zmq::norm_engine_t::NormRxStreamState* zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
{
    NormRxStreamState* nextItem = next_item;
    if (NULL != nextItem) next_item = nextItem->next;
    return nextItem;
}  // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()

#endif // ZMQ_HAVE_NORM