diff options
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.c')
-rw-r--r-- | libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.c | 850 |
1 files changed, 850 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.c b/libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.c new file mode 100644 index 00000000..bf2c68db --- /dev/null +++ b/libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.c @@ -0,0 +1,850 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> + +#include <LongBow/runtime.h> +#include <LongBow/debugging.h> + +#include <stdio.h> +#include <stdlib.h> +#include <pthread.h> +#include <fcntl.h> +#include <unistd.h> +#include <string.h> +#include <pthread.h> +#include <stdbool.h> +#include <math.h> +#include <signal.h> +#include <errno.h> + +#include <sys/time.h> +#include <sys/queue.h> +#include <sys/socket.h> +#include <sys/un.h> + +#include <parc/algol/parc_Memory.h> +#include <parc/algol/parc_EventTimer.h> +#include <parc/algol/parc_EventQueue.h> +#include <parc/algol/parc_EventBuffer.h> +#include <parc/algol/parc_EventSocket.h> +#include "bent_pipe.h" + +#define MAX_CONN 10 + +struct packet_wrapper { + struct timeval deadline; + int ingress_fd; + uint8_t *pbuff; + size_t pbuff_len; + + TAILQ_ENTRY(packet_wrapper) list; +}; + +struct bentpipe_conn { + int client_fd; + PARCEventQueue *bev; + BentPipeState *parent; + + // after reading a header, this is how long the next message is + size_t msg_length; + + // queue to send stuff up the connection + unsigned bytes_in_queue; + unsigned count_in_queue; + struct timeval last_deadline; + TAILQ_HEAD(, packet_wrapper) output_queue; + + PARCEventTimer *timer_event; +}; + +struct bentpipe_state { + char *local_name; + PARCEventScheduler *base; + PARCEventSocket *listenerUnix; + + struct bentpipe_conn conns[MAX_CONN]; + unsigned conn_count; + + pthread_t router_thread; + + bool use_params; + double loss_rate; + unsigned buffer_bytes; + double mean_sec_delay; + double bytes_per_sec; + + pthread_mutex_t startup_mutex; + pthread_cond_t startup_cond; + unsigned magic; + bool startup_running; + + // used to signal into the thread to stop + bool killme; + PARCEventTimer *keep_alive_event; + + // these track the state of sigpipe before we + // go into a write, so we can re-set the state + + // sigpipe_pending indicates that it was pending before our call + bool sigpipe_pending; + + // sigpipe_blocked means that it was blocked before our call, so + // don't unblock it when we're done + bool sigpipe_blocked; + + // debugging output + bool chattyOutput; +}; + +typedef struct { + uint32_t pid; + uint32_t fd; + uint32_t length; + uint32_t pad; // pad out to 16 bytes +} __attribute__ ((packed)) localhdr; + +static int setup_local(BentPipeState*bp); +static void +listener_cb(int fd, struct sockaddr *sa, int socklen, void *user_data); + +static void *run_bentpipe(void *arg); + +static void conn_readcb(PARCEventQueue *bev, PARCEventType event, void *connection); +static void reflect(struct bentpipe_conn *conn, uint8_t *pbuff, size_t pbuff_len); + +static void +queue_with_delay(struct bentpipe_conn *conn, uint8_t *pbuff, size_t pbuff_len, int i); + +static void timer_cb(int fd, PARCEventType what, void *user_data); +static void set_timer(struct bentpipe_conn *conn, struct timeval delay); +static void keepalive_cb(int fd, PARCEventType what, void *user_data); +void conn_errorcb(PARCEventQueue *bev, PARCEventQueueEventType events, void *user_framework); + +#define MAGIC 0x01020304 + +// =============== + +static void +lock_bentpipe(BentPipeState *bp) +{ + int res = pthread_mutex_lock(&bp->startup_mutex); + assertTrue(res == 0, "error from pthread_mutex_lock: %d", res); +} + +static void +unlock_bentpipe(BentPipeState *bp) +{ + int res = pthread_mutex_unlock(&bp->startup_mutex); + assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res); +} + +static void +wait_bentpipe(BentPipeState *bp) +{ + int res = pthread_cond_wait(&bp->startup_cond, &bp->startup_mutex); + assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res); +} + +static void +signal_bentpipe(BentPipeState *bp) +{ + int res = pthread_cond_signal(&bp->startup_cond); + assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res); +} + +/** + * if SIGPIPE was pending before we are called, don't do anything. + * Otherwise, mask SIGPIPE + */ +static void +capture_sigpipe(BentPipeState *bp) +{ +#if !defined(SO_NOSIGPIPE) + sigset_t pending; + sigemptyset(&pending); + sigpending(&pending); + bp->sigpipe_pending = sigismember(&pending, SIGPIPE); + if (!bp->sigpipe_pending) { + sigset_t sigpipe_mask; + sigemptyset(&sigpipe_mask); + sigaddset(&sigpipe_mask, SIGPIPE); + + sigset_t blocked; + sigemptyset(&blocked); + pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked); + bp->sigpipe_blocked = sigismember(&blocked, SIGPIPE); + } +#endif +} + +static void +release_sigpipe(BentPipeState *bp) +{ +#if !defined(SO_NOSIGPIPE) + // If sigpipe was previously pending, we didnt block it, so + // nothing new to do + if (!bp->sigpipe_pending) { + sigset_t pending; + + sigset_t sigpipe_mask; + sigemptyset(&sigpipe_mask); + sigaddset(&sigpipe_mask, SIGPIPE); + + sigemptyset(&pending); + sigpending(&pending); + + if (!bp->sigpipe_blocked) { + pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL); + } + } +#endif +} + +BentPipeState * +bentpipe_Create(const char *local_name) +{ + assertNotNull(local_name, "Parameter local_name must be non-null"); + + struct timeval keepalive_timeout = { .tv_sec = 0, .tv_usec = 500000 }; + + BentPipeState *bp = parcMemory_AllocateAndClear(sizeof(struct bentpipe_state)); + assertNotNull(bp, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(struct bentpipe_state)); + + bp->local_name = parcMemory_StringDuplicate((char *) local_name, 1024); + bp->conn_count = 0; + bp->base = parcEventScheduler_Create(); + bp->use_params = false; + bp->chattyOutput = false; + + if (!bp->base) { + fprintf(stderr, "Could not initialize PARCEventScheduler!\n"); + return NULL; + } + + pthread_mutex_init(&bp->startup_mutex, NULL); + pthread_cond_init(&bp->startup_cond, NULL); + bp->startup_running = false; + bp->magic = MAGIC; + bp->keep_alive_event = parcEventTimer_Create(bp->base, PARCEventType_Persist, keepalive_cb, bp); + parcEventTimer_Start(bp->keep_alive_event, &keepalive_timeout); + + for (int i = 0; i < MAX_CONN; i++) { + bp->conns[i].bytes_in_queue = 0; + TAILQ_INIT(&bp->conns[i].output_queue); + bp->conns[i].timer_event = parcEventTimer_Create(bp->base, 0, timer_cb, &bp->conns[i]); + } + setup_local(bp); + capture_sigpipe(bp); + return bp; +} + +void +bentpipe_Destroy(BentPipeState **bpPtr) +{ + BentPipeState *bp; + assertNotNull(bpPtr, "%s got null double pointer\n", __func__); + bp = *bpPtr; + assertNotNull(bp, "%s got null dereference\n", __func__); + + assertFalse(bp->startup_running, "calling destroy on a running bentpipe\n"); + + int i; + for (i = 0; i < MAX_CONN; i++) { + if (bp->conns[i].client_fd > 0) { + // this closes it too + parcEventQueue_Destroy(&(bp->conns[i].bev)); + bp->conns[i].client_fd = 0; + } + parcEventTimer_Destroy(&(bp->conns[i].timer_event)); + } + + parcEventTimer_Destroy(&(bp->keep_alive_event)); + parcEventSocket_Destroy(&(bp->listenerUnix)); + parcEventScheduler_Destroy(&(bp->base)); + bp->conn_count = 0; + + int failure = unlink(bp->local_name); + if (failure) { + printf("Error unlinking '%s': (%d) %s\n", bp->local_name, errno, strerror(errno)); + } + + release_sigpipe(bp); + parcMemory_Deallocate((void **) &(bp->local_name)); + parcMemory_Deallocate((void **) &bp); +} + +void +bentpipe_SetChattyOutput(BentPipeState *bp, bool chattyOutput) +{ + bp->chattyOutput = chattyOutput; +} + +int +bentpipe_Start(BentPipeState *bp) +{ + assertFalse(bp->startup_running, "bentpipe_Start already running"); + pthread_attr_t attr; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + + assertTrue(pthread_create(&bp->router_thread, &attr, run_bentpipe, bp) == 0, "pthread_create failed."); +// if (pthread_create(&bp->router_thread, &attr, run_bentpipe, bp) != 0) { +// perror("pthread_create router thread"); +// abort(); +// } + + lock_bentpipe(bp); + while (!bp->startup_running) { + wait_bentpipe(bp); + } + unlock_bentpipe(bp); + + return 0; +} + +int +bentpipe_Stop(BentPipeState *bp) +{ + assertTrue(bp->magic == MAGIC, "Magic is not magic value! got %08X", bp->magic); + assertTrue(bp->startup_running, "Calling stop on a stopped bentpipe\n"); + + bp->killme = true; + + // wait until exist + lock_bentpipe(bp); + while (bp->startup_running) { + wait_bentpipe(bp); + } + unlock_bentpipe(bp); + + return 0; +} + +// ================================== + +void +listener_errorcb(PARCEventScheduler *base, int error, char *errorString, void *addr_unix) +{ + fprintf(stderr, "Got an error %d (%s) on the listener. " + "Shutting down.\n", error, errorString); + + parcEventScheduler_Stop(base, NULL); +} + +static int +setup_local(BentPipeState*bp) +{ + // cleanup anything left on file system + unlink(bp->local_name); + + struct sockaddr_un addr_unix; + memset(&addr_unix, 0, sizeof(addr_unix)); + + addr_unix.sun_family = PF_UNIX; + strcpy(addr_unix.sun_path, bp->local_name); + + if (bp->chattyOutput) { + printf("bent_pipe Creating '%s'", bp->local_name); + } + + bp->listenerUnix = parcEventSocket_Create(bp->base, + listener_cb, + listener_errorcb, + (void *) bp, + (struct sockaddr*) &addr_unix, + sizeof(addr_unix)); + + assertNotNull(bp->listenerUnix, "parcEventSocket_Create failed: unix %s", bp->local_name); + + return 0; +} + +static void * +run_bentpipe(void *arg) +{ + BentPipeState *bp = (BentPipeState *) arg; + + if (bp->chattyOutput) { + printf("%s starting\n", __func__); + } + + // The keepalive timer will signal that we have started running + parcEventScheduler_Start(bp->base, PARCEventSchedulerDispatchType_Blocking); + + if (bp->chattyOutput) { + printf("%s exiting\n", __func__); + } + + lock_bentpipe(bp); + bp->startup_running = false; + signal_bentpipe(bp); + unlock_bentpipe(bp); + + pthread_exit(NULL); +} + +static struct bentpipe_conn * +allocate_connection(BentPipeState *bp) +{ + if (bp->conn_count == MAX_CONN) { + printf("allocate_connection: connection count is %d, maximum count is %d\n", + bp->conn_count, MAX_CONN); + return NULL; + } + + for (int i = 0; i < MAX_CONN; i++) { + if (bp->conns[i].client_fd == 0) { + bp->conn_count++; + return &bp->conns[i]; + } + } + + // should never get here + abort(); +} + +static void +deallocate_connection(BentPipeState *bp, struct bentpipe_conn *conn) +{ + assertTrue(bp->conn_count > 0, "invalid state, called deallocate_connection when conn_count is zero"); + + conn->client_fd = 0; + + if (bp->chattyOutput) { + printf("destroying connection %p eventqueue %p\n", (void *) conn, (void *) conn->bev); + } + + parcEventQueue_Disable(conn->bev, PARCEventType_Read); + parcEventQueue_Destroy(&(conn->bev)); + + // this unschedules any callbacks, but the timer is still allocated + // timer_event is freed in bentPipe_Destroy() + parcEventTimer_Stop(conn->timer_event); + + bp->conn_count--; +} + +/* + * Server accepts a new client + */ +static void +listener_cb(int fd, struct sockaddr *sa, int socklen, void *user_data) +{ + BentPipeState *bp = (BentPipeState *) user_data; + + // allocate a connection + struct bentpipe_conn *conn = allocate_connection(bp); + + conn->parent = bp; + conn->client_fd = fd; + + // Set non-blocking flag + int flags = fcntl(fd, F_GETFL, NULL); + assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno); + int failure = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + assertFalse(failure, "fcntl failed to set file descriptor flags (%d)\n", errno); + +#if defined(SO_NOSIGPIPE) + int set = 1; + setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, (void *) &set, sizeof(int)); +#endif + + conn->bev = parcEventQueue_Create(bp->base, fd, PARCEventQueueOption_CloseOnFree | PARCEventQueueOption_DeferCallbacks); + if (!conn->bev) { + fprintf(stderr, "Error constructing parcEventQueue!"); + parcEventScheduler_Abort(bp->base); + return; + } + + parcEventQueue_SetCallbacks(conn->bev, conn_readcb, NULL, conn_errorcb, (void *) conn); + parcEventQueue_Enable(conn->bev, PARCEventType_Read); + + if (bp->chattyOutput) { + printf("%s accepted connection on fd %d conn %p eventqueue %p\n", + __func__, fd, (void *) conn, (void *) conn->bev); + } +} + +/* + * Return 1 if we read an entire message and we can try another one. + * return 0 if we're waiting for bytes. + */ +static int +single_read(PARCEventQueue *bev, struct bentpipe_conn *conn) +{ + BentPipeState *bp = conn->parent; + PARCEventBuffer *input = parcEventBuffer_GetQueueBufferInput(bev); + size_t read_length = parcEventBuffer_GetLength(input); + int ret = 0; + + if (bp->chattyOutput) { + printf("single_read: connid %d read %zu bytes\n", conn->client_fd, read_length); + } + + if (read_length == 0) { + // 0 length means EOF, close the connection + if (bp->chattyOutput) { + printf("single_read: connid %d read_length %zu, EOF, closing connection\n", + conn->client_fd, + read_length); + } + deallocate_connection(conn->parent, conn); + parcEventBuffer_Destroy(&input); + return 0; + } + + // head we read the message header, which sets the message length? + if (conn->msg_length == 0) { + // did we read a whole header? + if (read_length >= sizeof(localhdr)) { + // yes we did, we can now read the message header then try to read the whole message + // Note that this does not remove the header from the buffer + + localhdr *msg_hdr = (localhdr *) parcEventBuffer_Pullup(input, sizeof(localhdr)); + conn->msg_length = msg_hdr->length; + + assertTrue(conn->msg_length > 0, "single_read: msg_hdr length is 0!"); + assertTrue(conn->msg_length < 64000, "single_read: msg_hdr length too large: %zu", conn->msg_length); + + if (bp->chattyOutput) { + printf("single_read: %s start read_length %zu msg_length %zu\n", __func__, read_length, conn->msg_length); + } + } else { + // no we did not, wait for more data before procesing. + + if (bp->chattyOutput) { + printf("single_read: %s short read %zu\n", __func__, read_length); + } + } + } + + // if read_length < sizeof(localhdr), then this is false. + // Otherwise, we've read the header and set msg_length, so if read_length + // is greater than the sum we have a full message plus header + + if (read_length >= sizeof(localhdr) + conn->msg_length) { + size_t pbuff_len = sizeof(localhdr) + conn->msg_length; + int bytes_removed; + + uint8_t *pbuff = parcMemory_Allocate(pbuff_len); + assertNotNull(pbuff, "parcMemory_Allocate(%zu) returned NULL", pbuff_len); + + // dequeue into packet buffer + bytes_removed = parcEventBuffer_Read(input, (void *) pbuff, pbuff_len); + assertTrue(bytes_removed == pbuff_len, "parcEventBuffer read wrong length, expected %zu got %d", pbuff_len, bytes_removed); + + // now reset message length for next packet + conn->msg_length = 0; + + if (bp->chattyOutput) { + printf("connid %d msg_length %zu read_length %zu, resetting low water mark\n", + conn->client_fd, + pbuff_len, + read_length); + + longBowDebug_MemoryDump((char *) pbuff, pbuff_len); + } + + // reflect will free the memory + reflect(conn, pbuff, pbuff_len); + + // we could do more after this + if (read_length > pbuff_len) { + ret = 1; + } + } else { + // we do not have an entire message, so skip and wait for more to be read + } + parcEventBuffer_Destroy(&input); + return ret; +} + +static void +conn_readcb(PARCEventQueue *bev, PARCEventType event, void *user_data) +{ + struct bentpipe_conn *conn = (struct bentpipe_conn *) user_data; + + // drain the input buffer + while (single_read(bev, conn)) { + // empty + } +} + +/* + * reflect a message to other connections + * + * We should use the zero-copy deferred write... + */ +static void +reflect(struct bentpipe_conn *conn, uint8_t *pbuff, size_t pbuff_len) +{ + int i; + BentPipeState *bp = conn->parent; + + for (i = 0; i < MAX_CONN; i++) { + if (bp->conns[i].client_fd > 0 && bp->conns[i].client_fd != conn->client_fd) { + int res; + + if (bp->chattyOutput) { + printf("%s connid %d adding buffer length %zu\n", __func__, conn[i].client_fd, pbuff_len); + } + + if (bp->use_params) { + uint8_t *copy = parcMemory_Allocate(pbuff_len); + assertNotNull(copy, "parcMemory_Allocate(%zu) returned NULL", pbuff_len); + memcpy(copy, pbuff, pbuff_len); + queue_with_delay(conn, copy, pbuff_len, i); + } else { + res = parcEventQueue_Write(bp->conns[i].bev, pbuff, pbuff_len); + assertTrue(res == 0, "%s got parcEventQueue_Write error\n", __func__); + + localhdr *msg_hdr = (localhdr *) pbuff; + assertTrue(msg_hdr->length + sizeof(localhdr) == pbuff_len, + "msg_hdr messed up! expected %zu got %zu", + msg_hdr->length + sizeof(localhdr), + pbuff_len); + } + } + } + + parcMemory_Deallocate((void **) &pbuff); +} + +/** + * Queue a packet for later delivery. We calculate the needed delay and insert it + * in the connection's output_queue. If there is not a timer running (i.e. there are now + * exactly 1 elements in the queue), we start the timer for the connection. + * + * If the output queue is full, the packet might be freed and not added to the output queue. + */ +static void +queue_with_delay(struct bentpipe_conn *conn, uint8_t *pbuff, size_t pbuff_len, int i) +{ + BentPipeState *bp = conn->parent; + double delay_sec; + + struct timeval now; + struct timeval delay_tv; + struct timeval deadline; + + gettimeofday(&now, NULL); + + // 1) Apply loss rate + if (drand48() < bp->loss_rate) { + if (bp->chattyOutput) { + printf("%s random drop\n", __func__); + } + parcMemory_Deallocate((void **) &pbuff); + return; + } + + // 2) will it fit? + if (pbuff_len + bp->conns[i].bytes_in_queue >= bp->buffer_bytes) { + if (bp->chattyOutput) { + printf("%s queue full\n", __func__); + } + parcMemory_Deallocate((void **) &pbuff); + return; + } + + // 3) Determine delay + delay_sec = (double) pbuff_len / bp->bytes_per_sec; + + // 4) exponential delay + delay_sec += -1 * log(drand48()) * bp->mean_sec_delay; + + delay_tv.tv_sec = (time_t) floor(delay_sec); + delay_tv.tv_usec = (suseconds_t) floor((delay_sec - floor(delay_sec)) * 1E+6); + + struct packet_wrapper *wrapper = parcMemory_Allocate(sizeof(struct packet_wrapper)); + assertNotNull(wrapper, "parcMemory_Allocate(%zu) returned NULL", sizeof(struct packet_wrapper)); + wrapper->ingress_fd = conn->client_fd; + wrapper->pbuff = pbuff; + wrapper->pbuff_len = pbuff_len; + + timeradd(&now, &delay_tv, &deadline); + + wrapper->deadline = deadline; + bp->conns[i].last_deadline = deadline; + bp->conns[i].bytes_in_queue += pbuff_len; + + bp->conns[i].count_in_queue++; + TAILQ_INSERT_TAIL(&bp->conns[i].output_queue, wrapper, list); + + if (bp->chattyOutput) { + printf("%s queue %d fd %d count %d\n", __func__, i, bp->conns[i].client_fd, bp->conns[i].count_in_queue); + } + + // if this is first item in queue, set a timer + if (bp->conns[i].count_in_queue == 1) { + set_timer(&bp->conns[i], delay_tv); + } +} + +int +bentpipe_Params(BentPipeState *bp, double loss_rate, unsigned buffer_bytes, double mean_sec_delay, double bytes_per_sec) +{ + bp->use_params = true; + bp->loss_rate = loss_rate; + bp->buffer_bytes = buffer_bytes; + bp->mean_sec_delay = mean_sec_delay; + bp->bytes_per_sec = bytes_per_sec; + return 0; +} + +static +void +keepalive_cb(int fd, PARCEventType what, void *user_data) +{ + struct bentpipe_state *bp = (struct bentpipe_state *) user_data; + + if (!bp->startup_running) { + // indicate to anyone waiting that we're really running + if (bp->chattyOutput) { + printf("%s signalling startup_running\n", __func__); + } + + lock_bentpipe(bp); + bp->startup_running = true; + signal_bentpipe(bp); + unlock_bentpipe(bp); + return; + } + + if (bp->killme) { + parcEventScheduler_Abort(bp->base); + return; + } +} + +/** + * Each connection has its own timer. The timer is used to defer sending packets to a later time, + * such as to realize traffic shaping. + */ +static void +timer_cb(int fd, PARCEventType what, void *user_data) +{ + struct bentpipe_conn *conn = (struct bentpipe_conn *) user_data; + BentPipeState *bp = conn->parent; + struct timeval now; + + gettimeofday(&now, NULL); + while (!TAILQ_EMPTY(&conn->output_queue)) { + int res; + struct packet_wrapper *wrapper = TAILQ_FIRST(&conn->output_queue); + + assertTrue(conn->count_in_queue > 0, "invalid state: count_in_queue is 0"); + + if (timercmp(&now, &wrapper->deadline, <)) { + break; + } + + conn->bytes_in_queue -= wrapper->pbuff_len; + conn->count_in_queue--; + + res = parcEventQueue_Write(conn->bev, wrapper->pbuff, wrapper->pbuff_len); + assertTrue(res == 0, "got parcEventQueue_Write error\n"); + + if (bp->chattyOutput) { + printf("%3.9f output conn %d bytes %zu\n", + now.tv_sec + now.tv_usec * 1E-6, conn->client_fd, wrapper->pbuff_len); + } + + TAILQ_REMOVE(&conn->output_queue, wrapper, list); + + parcMemory_Deallocate((void **) &(wrapper->pbuff)); + parcMemory_Deallocate((void **) &wrapper); + } + + if (!TAILQ_EMPTY(&conn->output_queue)) { + struct packet_wrapper *wrapper = TAILQ_FIRST(&conn->output_queue); + struct timeval delay; + timersub(&wrapper->deadline, &now, &delay); + + if (bp->chattyOutput) { + printf("connid %d scheduling next timer delay %.6f\n", + conn->client_fd, delay.tv_sec + 1E-6 * delay.tv_usec); + } + + assertTrue(delay.tv_sec >= 0 && delay.tv_usec >= 0, + "Got negative delay: now %.6f deadline %.6f delay %.6f", + now.tv_sec + 1E-6 * now.tv_usec, + wrapper->deadline.tv_sec + 1E-6 * wrapper->deadline.tv_usec, + delay.tv_sec + 1E-6 * delay.tv_usec); + + if (delay.tv_sec == 0 && delay.tv_usec < 1000) { + delay.tv_usec = 1000; + } + + set_timer(conn, delay); + } +} + +static void +set_timer(struct bentpipe_conn *conn, struct timeval delay) +{ + BentPipeState *bp = conn->parent; + // this replaces any prior events + + if (delay.tv_usec < 1000) { + delay.tv_usec = 1000; + } + + if (delay.tv_sec < 0) { + delay.tv_sec = 0; + } + + if (bp->chattyOutput) { + printf("%s connid %d delay %.6f timer_event %p\n", + __func__, + conn->client_fd, + delay.tv_sec + 1E-6 * delay.tv_usec, + (void *) conn->timer_event); + } + + parcEventTimer_Start(conn->timer_event, &delay); +} + +void +conn_errorcb(PARCEventQueue *bev, PARCEventQueueEventType events, void *user_data) +{ + if (events & PARCEventQueueEventType_EOF) { + struct bentpipe_conn *conn = (struct bentpipe_conn *) user_data; + + if (conn->parent->chattyOutput) { + printf("%s Got EOF on connid %d fd %d socket\n", + __func__, + conn->client_fd, + parcEventQueue_GetFileDescriptor(bev)); + } + + deallocate_connection(conn->parent, conn); + } + + if (events & PARCEventQueueEventType_Error) { + struct bentpipe_conn *conn = (struct bentpipe_conn *) user_data; + + printf("%s Got error on connid %d fd %d socket: %s\n", + __func__, + conn->client_fd, + parcEventQueue_GetFileDescriptor(bev), + strerror(errno)); + + deallocate_connection(conn->parent, conn); + } +} |