aboutsummaryrefslogtreecommitdiffstats
path: root/libccnx-transport-rta/ccnx/transport/test_tools
diff options
context:
space:
mode:
Diffstat (limited to 'libccnx-transport-rta/ccnx/transport/test_tools')
-rw-r--r--libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.c850
-rw-r--r--libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.h148
-rw-r--r--libccnx-transport-rta/ccnx/transport/test_tools/ethersend.c211
-rw-r--r--libccnx-transport-rta/ccnx/transport/test_tools/pktgen.c195
-rw-r--r--libccnx-transport-rta/ccnx/transport/test_tools/test/.gitignore1
-rw-r--r--libccnx-transport-rta/ccnx/transport/test_tools/test/test_bent_pipe.c397
-rw-r--r--libccnx-transport-rta/ccnx/transport/test_tools/traffic_tools.c292
-rw-r--r--libccnx-transport-rta/ccnx/transport/test_tools/traffic_tools.h284
-rw-r--r--libccnx-transport-rta/ccnx/transport/test_tools/write_packets.c84
9 files changed, 2462 insertions, 0 deletions
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.c b/libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.c
new file mode 100644
index 00000000..bf2c68db
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.c
@@ -0,0 +1,850 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include <LongBow/runtime.h>
+#include <LongBow/debugging.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <string.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <math.h>
+#include <signal.h>
+#include <errno.h>
+
+#include <sys/time.h>
+#include <sys/queue.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+#include <parc/algol/parc_Memory.h>
+#include <parc/algol/parc_EventTimer.h>
+#include <parc/algol/parc_EventQueue.h>
+#include <parc/algol/parc_EventBuffer.h>
+#include <parc/algol/parc_EventSocket.h>
+#include "bent_pipe.h"
+
+#define MAX_CONN 10
+
+struct packet_wrapper {
+ struct timeval deadline;
+ int ingress_fd;
+ uint8_t *pbuff;
+ size_t pbuff_len;
+
+ TAILQ_ENTRY(packet_wrapper) list;
+};
+
+struct bentpipe_conn {
+ int client_fd;
+ PARCEventQueue *bev;
+ BentPipeState *parent;
+
+ // after reading a header, this is how long the next message is
+ size_t msg_length;
+
+ // queue to send stuff up the connection
+ unsigned bytes_in_queue;
+ unsigned count_in_queue;
+ struct timeval last_deadline;
+ TAILQ_HEAD(, packet_wrapper) output_queue;
+
+ PARCEventTimer *timer_event;
+};
+
+struct bentpipe_state {
+ char *local_name;
+ PARCEventScheduler *base;
+ PARCEventSocket *listenerUnix;
+
+ struct bentpipe_conn conns[MAX_CONN];
+ unsigned conn_count;
+
+ pthread_t router_thread;
+
+ bool use_params;
+ double loss_rate;
+ unsigned buffer_bytes;
+ double mean_sec_delay;
+ double bytes_per_sec;
+
+ pthread_mutex_t startup_mutex;
+ pthread_cond_t startup_cond;
+ unsigned magic;
+ bool startup_running;
+
+ // used to signal into the thread to stop
+ bool killme;
+ PARCEventTimer *keep_alive_event;
+
+ // these track the state of sigpipe before we
+ // go into a write, so we can re-set the state
+
+ // sigpipe_pending indicates that it was pending before our call
+ bool sigpipe_pending;
+
+ // sigpipe_blocked means that it was blocked before our call, so
+ // don't unblock it when we're done
+ bool sigpipe_blocked;
+
+ // debugging output
+ bool chattyOutput;
+};
+
+typedef struct {
+ uint32_t pid;
+ uint32_t fd;
+ uint32_t length;
+ uint32_t pad; // pad out to 16 bytes
+} __attribute__ ((packed)) localhdr;
+
+static int setup_local(BentPipeState*bp);
+static void
+listener_cb(int fd, struct sockaddr *sa, int socklen, void *user_data);
+
+static void *run_bentpipe(void *arg);
+
+static void conn_readcb(PARCEventQueue *bev, PARCEventType event, void *connection);
+static void reflect(struct bentpipe_conn *conn, uint8_t *pbuff, size_t pbuff_len);
+
+static void
+queue_with_delay(struct bentpipe_conn *conn, uint8_t *pbuff, size_t pbuff_len, int i);
+
+static void timer_cb(int fd, PARCEventType what, void *user_data);
+static void set_timer(struct bentpipe_conn *conn, struct timeval delay);
+static void keepalive_cb(int fd, PARCEventType what, void *user_data);
+void conn_errorcb(PARCEventQueue *bev, PARCEventQueueEventType events, void *user_framework);
+
+#define MAGIC 0x01020304
+
+// ===============
+
+static void
+lock_bentpipe(BentPipeState *bp)
+{
+ int res = pthread_mutex_lock(&bp->startup_mutex);
+ assertTrue(res == 0, "error from pthread_mutex_lock: %d", res);
+}
+
+static void
+unlock_bentpipe(BentPipeState *bp)
+{
+ int res = pthread_mutex_unlock(&bp->startup_mutex);
+ assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res);
+}
+
+static void
+wait_bentpipe(BentPipeState *bp)
+{
+ int res = pthread_cond_wait(&bp->startup_cond, &bp->startup_mutex);
+ assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res);
+}
+
+static void
+signal_bentpipe(BentPipeState *bp)
+{
+ int res = pthread_cond_signal(&bp->startup_cond);
+ assertTrue(res == 0, "error from pthread_mutex_unlock: %d", res);
+}
+
+/**
+ * if SIGPIPE was pending before we are called, don't do anything.
+ * Otherwise, mask SIGPIPE
+ */
+static void
+capture_sigpipe(BentPipeState *bp)
+{
+#if !defined(SO_NOSIGPIPE)
+ sigset_t pending;
+ sigemptyset(&pending);
+ sigpending(&pending);
+ bp->sigpipe_pending = sigismember(&pending, SIGPIPE);
+ if (!bp->sigpipe_pending) {
+ sigset_t sigpipe_mask;
+ sigemptyset(&sigpipe_mask);
+ sigaddset(&sigpipe_mask, SIGPIPE);
+
+ sigset_t blocked;
+ sigemptyset(&blocked);
+ pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
+ bp->sigpipe_blocked = sigismember(&blocked, SIGPIPE);
+ }
+#endif
+}
+
+static void
+release_sigpipe(BentPipeState *bp)
+{
+#if !defined(SO_NOSIGPIPE)
+ // If sigpipe was previously pending, we didnt block it, so
+ // nothing new to do
+ if (!bp->sigpipe_pending) {
+ sigset_t pending;
+
+ sigset_t sigpipe_mask;
+ sigemptyset(&sigpipe_mask);
+ sigaddset(&sigpipe_mask, SIGPIPE);
+
+ sigemptyset(&pending);
+ sigpending(&pending);
+
+ if (!bp->sigpipe_blocked) {
+ pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
+ }
+ }
+#endif
+}
+
+BentPipeState *
+bentpipe_Create(const char *local_name)
+{
+ assertNotNull(local_name, "Parameter local_name must be non-null");
+
+ struct timeval keepalive_timeout = { .tv_sec = 0, .tv_usec = 500000 };
+
+ BentPipeState *bp = parcMemory_AllocateAndClear(sizeof(struct bentpipe_state));
+ assertNotNull(bp, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(struct bentpipe_state));
+
+ bp->local_name = parcMemory_StringDuplicate((char *) local_name, 1024);
+ bp->conn_count = 0;
+ bp->base = parcEventScheduler_Create();
+ bp->use_params = false;
+ bp->chattyOutput = false;
+
+ if (!bp->base) {
+ fprintf(stderr, "Could not initialize PARCEventScheduler!\n");
+ return NULL;
+ }
+
+ pthread_mutex_init(&bp->startup_mutex, NULL);
+ pthread_cond_init(&bp->startup_cond, NULL);
+ bp->startup_running = false;
+ bp->magic = MAGIC;
+ bp->keep_alive_event = parcEventTimer_Create(bp->base, PARCEventType_Persist, keepalive_cb, bp);
+ parcEventTimer_Start(bp->keep_alive_event, &keepalive_timeout);
+
+ for (int i = 0; i < MAX_CONN; i++) {
+ bp->conns[i].bytes_in_queue = 0;
+ TAILQ_INIT(&bp->conns[i].output_queue);
+ bp->conns[i].timer_event = parcEventTimer_Create(bp->base, 0, timer_cb, &bp->conns[i]);
+ }
+ setup_local(bp);
+ capture_sigpipe(bp);
+ return bp;
+}
+
+void
+bentpipe_Destroy(BentPipeState **bpPtr)
+{
+ BentPipeState *bp;
+ assertNotNull(bpPtr, "%s got null double pointer\n", __func__);
+ bp = *bpPtr;
+ assertNotNull(bp, "%s got null dereference\n", __func__);
+
+ assertFalse(bp->startup_running, "calling destroy on a running bentpipe\n");
+
+ int i;
+ for (i = 0; i < MAX_CONN; i++) {
+ if (bp->conns[i].client_fd > 0) {
+ // this closes it too
+ parcEventQueue_Destroy(&(bp->conns[i].bev));
+ bp->conns[i].client_fd = 0;
+ }
+ parcEventTimer_Destroy(&(bp->conns[i].timer_event));
+ }
+
+ parcEventTimer_Destroy(&(bp->keep_alive_event));
+ parcEventSocket_Destroy(&(bp->listenerUnix));
+ parcEventScheduler_Destroy(&(bp->base));
+ bp->conn_count = 0;
+
+ int failure = unlink(bp->local_name);
+ if (failure) {
+ printf("Error unlinking '%s': (%d) %s\n", bp->local_name, errno, strerror(errno));
+ }
+
+ release_sigpipe(bp);
+ parcMemory_Deallocate((void **) &(bp->local_name));
+ parcMemory_Deallocate((void **) &bp);
+}
+
+void
+bentpipe_SetChattyOutput(BentPipeState *bp, bool chattyOutput)
+{
+ bp->chattyOutput = chattyOutput;
+}
+
+int
+bentpipe_Start(BentPipeState *bp)
+{
+ assertFalse(bp->startup_running, "bentpipe_Start already running");
+ pthread_attr_t attr;
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+
+ assertTrue(pthread_create(&bp->router_thread, &attr, run_bentpipe, bp) == 0, "pthread_create failed.");
+// if (pthread_create(&bp->router_thread, &attr, run_bentpipe, bp) != 0) {
+// perror("pthread_create router thread");
+// abort();
+// }
+
+ lock_bentpipe(bp);
+ while (!bp->startup_running) {
+ wait_bentpipe(bp);
+ }
+ unlock_bentpipe(bp);
+
+ return 0;
+}
+
+int
+bentpipe_Stop(BentPipeState *bp)
+{
+ assertTrue(bp->magic == MAGIC, "Magic is not magic value! got %08X", bp->magic);
+ assertTrue(bp->startup_running, "Calling stop on a stopped bentpipe\n");
+
+ bp->killme = true;
+
+ // wait until exist
+ lock_bentpipe(bp);
+ while (bp->startup_running) {
+ wait_bentpipe(bp);
+ }
+ unlock_bentpipe(bp);
+
+ return 0;
+}
+
+// ==================================
+
+void
+listener_errorcb(PARCEventScheduler *base, int error, char *errorString, void *addr_unix)
+{
+ fprintf(stderr, "Got an error %d (%s) on the listener. "
+ "Shutting down.\n", error, errorString);
+
+ parcEventScheduler_Stop(base, NULL);
+}
+
+static int
+setup_local(BentPipeState*bp)
+{
+ // cleanup anything left on file system
+ unlink(bp->local_name);
+
+ struct sockaddr_un addr_unix;
+ memset(&addr_unix, 0, sizeof(addr_unix));
+
+ addr_unix.sun_family = PF_UNIX;
+ strcpy(addr_unix.sun_path, bp->local_name);
+
+ if (bp->chattyOutput) {
+ printf("bent_pipe Creating '%s'", bp->local_name);
+ }
+
+ bp->listenerUnix = parcEventSocket_Create(bp->base,
+ listener_cb,
+ listener_errorcb,
+ (void *) bp,
+ (struct sockaddr*) &addr_unix,
+ sizeof(addr_unix));
+
+ assertNotNull(bp->listenerUnix, "parcEventSocket_Create failed: unix %s", bp->local_name);
+
+ return 0;
+}
+
+static void *
+run_bentpipe(void *arg)
+{
+ BentPipeState *bp = (BentPipeState *) arg;
+
+ if (bp->chattyOutput) {
+ printf("%s starting\n", __func__);
+ }
+
+ // The keepalive timer will signal that we have started running
+ parcEventScheduler_Start(bp->base, PARCEventSchedulerDispatchType_Blocking);
+
+ if (bp->chattyOutput) {
+ printf("%s exiting\n", __func__);
+ }
+
+ lock_bentpipe(bp);
+ bp->startup_running = false;
+ signal_bentpipe(bp);
+ unlock_bentpipe(bp);
+
+ pthread_exit(NULL);
+}
+
+static struct bentpipe_conn *
+allocate_connection(BentPipeState *bp)
+{
+ if (bp->conn_count == MAX_CONN) {
+ printf("allocate_connection: connection count is %d, maximum count is %d\n",
+ bp->conn_count, MAX_CONN);
+ return NULL;
+ }
+
+ for (int i = 0; i < MAX_CONN; i++) {
+ if (bp->conns[i].client_fd == 0) {
+ bp->conn_count++;
+ return &bp->conns[i];
+ }
+ }
+
+ // should never get here
+ abort();
+}
+
+static void
+deallocate_connection(BentPipeState *bp, struct bentpipe_conn *conn)
+{
+ assertTrue(bp->conn_count > 0, "invalid state, called deallocate_connection when conn_count is zero");
+
+ conn->client_fd = 0;
+
+ if (bp->chattyOutput) {
+ printf("destroying connection %p eventqueue %p\n", (void *) conn, (void *) conn->bev);
+ }
+
+ parcEventQueue_Disable(conn->bev, PARCEventType_Read);
+ parcEventQueue_Destroy(&(conn->bev));
+
+ // this unschedules any callbacks, but the timer is still allocated
+ // timer_event is freed in bentPipe_Destroy()
+ parcEventTimer_Stop(conn->timer_event);
+
+ bp->conn_count--;
+}
+
+/*
+ * Server accepts a new client
+ */
+static void
+listener_cb(int fd, struct sockaddr *sa, int socklen, void *user_data)
+{
+ BentPipeState *bp = (BentPipeState *) user_data;
+
+ // allocate a connection
+ struct bentpipe_conn *conn = allocate_connection(bp);
+
+ conn->parent = bp;
+ conn->client_fd = fd;
+
+ // Set non-blocking flag
+ int flags = fcntl(fd, F_GETFL, NULL);
+ assertTrue(flags != -1, "fcntl failed to obtain file descriptor flags (%d)\n", errno);
+ int failure = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ assertFalse(failure, "fcntl failed to set file descriptor flags (%d)\n", errno);
+
+#if defined(SO_NOSIGPIPE)
+ int set = 1;
+ setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, (void *) &set, sizeof(int));
+#endif
+
+ conn->bev = parcEventQueue_Create(bp->base, fd, PARCEventQueueOption_CloseOnFree | PARCEventQueueOption_DeferCallbacks);
+ if (!conn->bev) {
+ fprintf(stderr, "Error constructing parcEventQueue!");
+ parcEventScheduler_Abort(bp->base);
+ return;
+ }
+
+ parcEventQueue_SetCallbacks(conn->bev, conn_readcb, NULL, conn_errorcb, (void *) conn);
+ parcEventQueue_Enable(conn->bev, PARCEventType_Read);
+
+ if (bp->chattyOutput) {
+ printf("%s accepted connection on fd %d conn %p eventqueue %p\n",
+ __func__, fd, (void *) conn, (void *) conn->bev);
+ }
+}
+
+/*
+ * Return 1 if we read an entire message and we can try another one.
+ * return 0 if we're waiting for bytes.
+ */
+static int
+single_read(PARCEventQueue *bev, struct bentpipe_conn *conn)
+{
+ BentPipeState *bp = conn->parent;
+ PARCEventBuffer *input = parcEventBuffer_GetQueueBufferInput(bev);
+ size_t read_length = parcEventBuffer_GetLength(input);
+ int ret = 0;
+
+ if (bp->chattyOutput) {
+ printf("single_read: connid %d read %zu bytes\n", conn->client_fd, read_length);
+ }
+
+ if (read_length == 0) {
+ // 0 length means EOF, close the connection
+ if (bp->chattyOutput) {
+ printf("single_read: connid %d read_length %zu, EOF, closing connection\n",
+ conn->client_fd,
+ read_length);
+ }
+ deallocate_connection(conn->parent, conn);
+ parcEventBuffer_Destroy(&input);
+ return 0;
+ }
+
+ // head we read the message header, which sets the message length?
+ if (conn->msg_length == 0) {
+ // did we read a whole header?
+ if (read_length >= sizeof(localhdr)) {
+ // yes we did, we can now read the message header then try to read the whole message
+ // Note that this does not remove the header from the buffer
+
+ localhdr *msg_hdr = (localhdr *) parcEventBuffer_Pullup(input, sizeof(localhdr));
+ conn->msg_length = msg_hdr->length;
+
+ assertTrue(conn->msg_length > 0, "single_read: msg_hdr length is 0!");
+ assertTrue(conn->msg_length < 64000, "single_read: msg_hdr length too large: %zu", conn->msg_length);
+
+ if (bp->chattyOutput) {
+ printf("single_read: %s start read_length %zu msg_length %zu\n", __func__, read_length, conn->msg_length);
+ }
+ } else {
+ // no we did not, wait for more data before procesing.
+
+ if (bp->chattyOutput) {
+ printf("single_read: %s short read %zu\n", __func__, read_length);
+ }
+ }
+ }
+
+ // if read_length < sizeof(localhdr), then this is false.
+ // Otherwise, we've read the header and set msg_length, so if read_length
+ // is greater than the sum we have a full message plus header
+
+ if (read_length >= sizeof(localhdr) + conn->msg_length) {
+ size_t pbuff_len = sizeof(localhdr) + conn->msg_length;
+ int bytes_removed;
+
+ uint8_t *pbuff = parcMemory_Allocate(pbuff_len);
+ assertNotNull(pbuff, "parcMemory_Allocate(%zu) returned NULL", pbuff_len);
+
+ // dequeue into packet buffer
+ bytes_removed = parcEventBuffer_Read(input, (void *) pbuff, pbuff_len);
+ assertTrue(bytes_removed == pbuff_len, "parcEventBuffer read wrong length, expected %zu got %d", pbuff_len, bytes_removed);
+
+ // now reset message length for next packet
+ conn->msg_length = 0;
+
+ if (bp->chattyOutput) {
+ printf("connid %d msg_length %zu read_length %zu, resetting low water mark\n",
+ conn->client_fd,
+ pbuff_len,
+ read_length);
+
+ longBowDebug_MemoryDump((char *) pbuff, pbuff_len);
+ }
+
+ // reflect will free the memory
+ reflect(conn, pbuff, pbuff_len);
+
+ // we could do more after this
+ if (read_length > pbuff_len) {
+ ret = 1;
+ }
+ } else {
+ // we do not have an entire message, so skip and wait for more to be read
+ }
+ parcEventBuffer_Destroy(&input);
+ return ret;
+}
+
+static void
+conn_readcb(PARCEventQueue *bev, PARCEventType event, void *user_data)
+{
+ struct bentpipe_conn *conn = (struct bentpipe_conn *) user_data;
+
+ // drain the input buffer
+ while (single_read(bev, conn)) {
+ // empty
+ }
+}
+
+/*
+ * reflect a message to other connections
+ *
+ * We should use the zero-copy deferred write...
+ */
+static void
+reflect(struct bentpipe_conn *conn, uint8_t *pbuff, size_t pbuff_len)
+{
+ int i;
+ BentPipeState *bp = conn->parent;
+
+ for (i = 0; i < MAX_CONN; i++) {
+ if (bp->conns[i].client_fd > 0 && bp->conns[i].client_fd != conn->client_fd) {
+ int res;
+
+ if (bp->chattyOutput) {
+ printf("%s connid %d adding buffer length %zu\n", __func__, conn[i].client_fd, pbuff_len);
+ }
+
+ if (bp->use_params) {
+ uint8_t *copy = parcMemory_Allocate(pbuff_len);
+ assertNotNull(copy, "parcMemory_Allocate(%zu) returned NULL", pbuff_len);
+ memcpy(copy, pbuff, pbuff_len);
+ queue_with_delay(conn, copy, pbuff_len, i);
+ } else {
+ res = parcEventQueue_Write(bp->conns[i].bev, pbuff, pbuff_len);
+ assertTrue(res == 0, "%s got parcEventQueue_Write error\n", __func__);
+
+ localhdr *msg_hdr = (localhdr *) pbuff;
+ assertTrue(msg_hdr->length + sizeof(localhdr) == pbuff_len,
+ "msg_hdr messed up! expected %zu got %zu",
+ msg_hdr->length + sizeof(localhdr),
+ pbuff_len);
+ }
+ }
+ }
+
+ parcMemory_Deallocate((void **) &pbuff);
+}
+
+/**
+ * Queue a packet for later delivery. We calculate the needed delay and insert it
+ * in the connection's output_queue. If there is not a timer running (i.e. there are now
+ * exactly 1 elements in the queue), we start the timer for the connection.
+ *
+ * If the output queue is full, the packet might be freed and not added to the output queue.
+ */
+static void
+queue_with_delay(struct bentpipe_conn *conn, uint8_t *pbuff, size_t pbuff_len, int i)
+{
+ BentPipeState *bp = conn->parent;
+ double delay_sec;
+
+ struct timeval now;
+ struct timeval delay_tv;
+ struct timeval deadline;
+
+ gettimeofday(&now, NULL);
+
+ // 1) Apply loss rate
+ if (drand48() < bp->loss_rate) {
+ if (bp->chattyOutput) {
+ printf("%s random drop\n", __func__);
+ }
+ parcMemory_Deallocate((void **) &pbuff);
+ return;
+ }
+
+ // 2) will it fit?
+ if (pbuff_len + bp->conns[i].bytes_in_queue >= bp->buffer_bytes) {
+ if (bp->chattyOutput) {
+ printf("%s queue full\n", __func__);
+ }
+ parcMemory_Deallocate((void **) &pbuff);
+ return;
+ }
+
+ // 3) Determine delay
+ delay_sec = (double) pbuff_len / bp->bytes_per_sec;
+
+ // 4) exponential delay
+ delay_sec += -1 * log(drand48()) * bp->mean_sec_delay;
+
+ delay_tv.tv_sec = (time_t) floor(delay_sec);
+ delay_tv.tv_usec = (suseconds_t) floor((delay_sec - floor(delay_sec)) * 1E+6);
+
+ struct packet_wrapper *wrapper = parcMemory_Allocate(sizeof(struct packet_wrapper));
+ assertNotNull(wrapper, "parcMemory_Allocate(%zu) returned NULL", sizeof(struct packet_wrapper));
+ wrapper->ingress_fd = conn->client_fd;
+ wrapper->pbuff = pbuff;
+ wrapper->pbuff_len = pbuff_len;
+
+ timeradd(&now, &delay_tv, &deadline);
+
+ wrapper->deadline = deadline;
+ bp->conns[i].last_deadline = deadline;
+ bp->conns[i].bytes_in_queue += pbuff_len;
+
+ bp->conns[i].count_in_queue++;
+ TAILQ_INSERT_TAIL(&bp->conns[i].output_queue, wrapper, list);
+
+ if (bp->chattyOutput) {
+ printf("%s queue %d fd %d count %d\n", __func__, i, bp->conns[i].client_fd, bp->conns[i].count_in_queue);
+ }
+
+ // if this is first item in queue, set a timer
+ if (bp->conns[i].count_in_queue == 1) {
+ set_timer(&bp->conns[i], delay_tv);
+ }
+}
+
+int
+bentpipe_Params(BentPipeState *bp, double loss_rate, unsigned buffer_bytes, double mean_sec_delay, double bytes_per_sec)
+{
+ bp->use_params = true;
+ bp->loss_rate = loss_rate;
+ bp->buffer_bytes = buffer_bytes;
+ bp->mean_sec_delay = mean_sec_delay;
+ bp->bytes_per_sec = bytes_per_sec;
+ return 0;
+}
+
+static
+void
+keepalive_cb(int fd, PARCEventType what, void *user_data)
+{
+ struct bentpipe_state *bp = (struct bentpipe_state *) user_data;
+
+ if (!bp->startup_running) {
+ // indicate to anyone waiting that we're really running
+ if (bp->chattyOutput) {
+ printf("%s signalling startup_running\n", __func__);
+ }
+
+ lock_bentpipe(bp);
+ bp->startup_running = true;
+ signal_bentpipe(bp);
+ unlock_bentpipe(bp);
+ return;
+ }
+
+ if (bp->killme) {
+ parcEventScheduler_Abort(bp->base);
+ return;
+ }
+}
+
+/**
+ * Each connection has its own timer. The timer is used to defer sending packets to a later time,
+ * such as to realize traffic shaping.
+ */
+static void
+timer_cb(int fd, PARCEventType what, void *user_data)
+{
+ struct bentpipe_conn *conn = (struct bentpipe_conn *) user_data;
+ BentPipeState *bp = conn->parent;
+ struct timeval now;
+
+ gettimeofday(&now, NULL);
+ while (!TAILQ_EMPTY(&conn->output_queue)) {
+ int res;
+ struct packet_wrapper *wrapper = TAILQ_FIRST(&conn->output_queue);
+
+ assertTrue(conn->count_in_queue > 0, "invalid state: count_in_queue is 0");
+
+ if (timercmp(&now, &wrapper->deadline, <)) {
+ break;
+ }
+
+ conn->bytes_in_queue -= wrapper->pbuff_len;
+ conn->count_in_queue--;
+
+ res = parcEventQueue_Write(conn->bev, wrapper->pbuff, wrapper->pbuff_len);
+ assertTrue(res == 0, "got parcEventQueue_Write error\n");
+
+ if (bp->chattyOutput) {
+ printf("%3.9f output conn %d bytes %zu\n",
+ now.tv_sec + now.tv_usec * 1E-6, conn->client_fd, wrapper->pbuff_len);
+ }
+
+ TAILQ_REMOVE(&conn->output_queue, wrapper, list);
+
+ parcMemory_Deallocate((void **) &(wrapper->pbuff));
+ parcMemory_Deallocate((void **) &wrapper);
+ }
+
+ if (!TAILQ_EMPTY(&conn->output_queue)) {
+ struct packet_wrapper *wrapper = TAILQ_FIRST(&conn->output_queue);
+ struct timeval delay;
+ timersub(&wrapper->deadline, &now, &delay);
+
+ if (bp->chattyOutput) {
+ printf("connid %d scheduling next timer delay %.6f\n",
+ conn->client_fd, delay.tv_sec + 1E-6 * delay.tv_usec);
+ }
+
+ assertTrue(delay.tv_sec >= 0 && delay.tv_usec >= 0,
+ "Got negative delay: now %.6f deadline %.6f delay %.6f",
+ now.tv_sec + 1E-6 * now.tv_usec,
+ wrapper->deadline.tv_sec + 1E-6 * wrapper->deadline.tv_usec,
+ delay.tv_sec + 1E-6 * delay.tv_usec);
+
+ if (delay.tv_sec == 0 && delay.tv_usec < 1000) {
+ delay.tv_usec = 1000;
+ }
+
+ set_timer(conn, delay);
+ }
+}
+
+static void
+set_timer(struct bentpipe_conn *conn, struct timeval delay)
+{
+ BentPipeState *bp = conn->parent;
+ // this replaces any prior events
+
+ if (delay.tv_usec < 1000) {
+ delay.tv_usec = 1000;
+ }
+
+ if (delay.tv_sec < 0) {
+ delay.tv_sec = 0;
+ }
+
+ if (bp->chattyOutput) {
+ printf("%s connid %d delay %.6f timer_event %p\n",
+ __func__,
+ conn->client_fd,
+ delay.tv_sec + 1E-6 * delay.tv_usec,
+ (void *) conn->timer_event);
+ }
+
+ parcEventTimer_Start(conn->timer_event, &delay);
+}
+
+void
+conn_errorcb(PARCEventQueue *bev, PARCEventQueueEventType events, void *user_data)
+{
+ if (events & PARCEventQueueEventType_EOF) {
+ struct bentpipe_conn *conn = (struct bentpipe_conn *) user_data;
+
+ if (conn->parent->chattyOutput) {
+ printf("%s Got EOF on connid %d fd %d socket\n",
+ __func__,
+ conn->client_fd,
+ parcEventQueue_GetFileDescriptor(bev));
+ }
+
+ deallocate_connection(conn->parent, conn);
+ }
+
+ if (events & PARCEventQueueEventType_Error) {
+ struct bentpipe_conn *conn = (struct bentpipe_conn *) user_data;
+
+ printf("%s Got error on connid %d fd %d socket: %s\n",
+ __func__,
+ conn->client_fd,
+ parcEventQueue_GetFileDescriptor(bev),
+ strerror(errno));
+
+ deallocate_connection(conn->parent, conn);
+ }
+}
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.h b/libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.h
new file mode 100644
index 00000000..8c726b8b
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/test_tools/bent_pipe.h
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file bent_pipe.h
+ * @brief <#Brief Description#>
+ *
+ * PF_UNIX "forwarder" for testing. It's a bent pipe. Whatever comes down
+ * one connection goes up all the others.
+ *
+ * It runs in its own thread and uses an event model.
+ *
+ * We capture SIG_PIPE when doing a write so this test code does not
+ * trigger a process-wide SIG_PIPE. Unless we have SO_SIGPIPE, then we'll use that.
+ *
+ */
+#ifndef Libccnx_bent_pipe_h
+#define Libccnx_bent_pipe_h
+
+#include <stdbool.h>
+
+struct bentpipe_state;
+/**
+ *
+ * @see bentpipe_Create
+ */
+typedef struct bentpipe_state BentPipeState;
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+BentPipeState *bentpipe_Create(const char *local_name);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+void bentpipe_Destroy(BentPipeState **statePtr);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+int bentpipe_Start(BentPipeState *bp);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+int bentpipe_Stop(BentPipeState *bp);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+void bentpipe_SetChattyOutput(BentPipeState *bp, bool chattyOutput);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+int bentpipe_Params(BentPipeState *bp, double loss_rate, unsigned buffer_bytes, double mean_sec_delay, double bytes_per_sec);
+#endif // Libccnx_bent_pipe_h
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/ethersend.c b/libccnx-transport-rta/ccnx/transport/test_tools/ethersend.c
new file mode 100644
index 00000000..9db1a7e3
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/test_tools/ethersend.c
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <ifaddrs.h>
+#include <assert.h>
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+
+#include <sys/types.h>
+#include <netinet/in.h>
+
+typedef uint8_t u_char;
+typedef uint16_t u_short;
+typedef uint32_t u_int;
+
+
+#include <pcap.h>
+
+#define ETHERTYPE 0x0801
+
+struct _packet {
+ u_int8_t dst[6];
+ u_int8_t src[6];
+ u_int16_t ethertype;
+ u_int8_t data[0];
+} __attribute__((packed));
+
+
+static void send_file(pcap_t *handle, uint8_t smac[], uint8_t dmac[], const char *filename);
+
+
+static
+void
+printhex(u_int8_t *buffer, int length)
+{
+ int i;
+ for (i = 0; i < length; i++) {
+ printf("%02X", buffer[i]);
+ }
+}
+
+
+
+static void
+send_file(pcap_t *handle, uint8_t smac[], uint8_t dmac[], const char *filename)
+{
+ struct stat statbuf;
+
+ struct _packet *packet = malloc(1500);
+ memset(packet, 0, 1500);
+
+ int fd = open(filename, O_RDONLY);
+ if (fd < 0) {
+ perror("Error opening file: ");
+ abort();
+ }
+
+ if (fstat(fd, &statbuf) < 0) {
+ perror("fstat error");
+ abort();
+ }
+
+ uint8_t *src = mmap(0, statbuf.st_size, PROT_READ, MAP_SHARED, fd, 0);
+ if (src == MAP_FAILED) {
+ perror("mmap error");
+ abort();
+ }
+
+ assert(statbuf.st_size <= 1500);
+
+ size_t len = sizeof(struct _packet) + statbuf.st_size;
+ printf("Sending config/query size %zu\n", len);
+
+ memcpy(packet->dst, dmac, 6);
+ memcpy(packet->src, smac, 6);
+
+ packet->ethertype = htons(ETHERTYPE);
+
+ memcpy(packet->data, src, statbuf.st_size);
+
+ int x = pcap_inject(handle, packet, len);
+ printf("%s wrote %d bytes\n", __func__, x);
+
+ free(packet);
+}
+
+static
+void
+get_mac_address(const char *deviceName, u_int8_t *mac)
+{
+ struct ifaddrs *ifap;
+ struct ifaddrs *next;
+
+ int x = getifaddrs(&ifap);
+ if (x != 0) {
+ perror("getifaddrs");
+ exit(EXIT_FAILURE);
+ }
+
+ next = ifap;
+ while (next != NULL) {
+#if defined(__APPLE__)
+ if (strstr(deviceName, next->ifa_name) != NULL && next->ifa_addr->sa_family == AF_LINK)
+#elif defined(__linux__)
+ if (strstr(deviceName, next->ifa_name) != NULL && next->ifa_addr->sa_family == AF_PACKET)
+#else
+#error Unsupported platform
+#endif
+ {
+ memcpy(mac, next->ifa_addr->sa_data + 9, 6);
+ break;
+ }
+ next = next->ifa_next;
+ }
+ freeifaddrs(ifap);
+}
+
+static void
+macStringToArray(const char *string, size_t outputLength, uint8_t output[])
+{
+ assert(outputLength == 6);
+
+ sscanf(string, "%02x:%02x:%02x:%02x:%02x:%02x", &output[0], &output[1], &output[2], &output[3], &output[4], &output[5]);
+}
+
+int
+main(int argc, const char *argv[])
+{
+ if (argc != 4 || argv[1][0] == '-') {
+ printf("usage: ethersend dev dst filename\n");
+ printf("\n");
+ printf("Will send filename as the payload of an ethernet frame to dst\n");
+ printf("\n");
+ printf("example: ethersend eth0 a8:20:66:3b:30:bc interest.bin\n");
+ printf("\n");
+ exit(EXIT_FAILURE);
+ }
+
+ pcap_t *handle; /* Session handle */
+ const char *dev = argv[1]; /* The device to sniff on */
+ char errbuf[PCAP_ERRBUF_SIZE]; /* Error string */
+ struct bpf_program fp; /* The compiled filter */
+ char filter_exp[1024]; /* The filter expression */
+ bpf_u_int32 mask; /* Our netmask */
+ bpf_u_int32 net; /* Our IP */
+
+ sprintf(filter_exp, "ether proto 0x%04X", ETHERTYPE);
+
+ printf("dev = %s\n", dev);
+
+ /* Find the properties for the device */
+ if (pcap_lookupnet(dev, &net, &mask, errbuf) == -1) {
+ fprintf(stderr, "Couldn't get netmask for device %s: %s\n", dev, errbuf);
+ net = 0;
+ mask = 0;
+ }
+
+ u_int8_t mymac[6];
+ get_mac_address(dev, mymac);
+ printf("My mac address: "); printhex(mymac, 6); printf("\n");
+
+ u_int8_t dmac[6];
+ macStringToArray(argv[2], 6, dmac);
+ printf("dmac address : "); printhex(dmac, 6); printf("\n");
+
+
+ /* Open the session in promiscuous mode */
+ handle = pcap_open_live(dev, 1500, 1, 1000, errbuf);
+ if (handle == NULL) {
+ fprintf(stderr, "Couldn't open device %s: %s\n", dev, errbuf);
+ return (2);
+ }
+
+ /* Compile and apply the filter */
+ if (pcap_compile(handle, &fp, filter_exp, 0, net) == -1) {
+ fprintf(stderr, "Couldn't parse filter %s: %s\n", filter_exp, pcap_geterr(handle));
+ return (2);
+ }
+
+ if (pcap_setfilter(handle, &fp) == -1) {
+ fprintf(stderr, "Couldn't install filter %s: %s\n", filter_exp, pcap_geterr(handle));
+ return (2);
+ }
+
+ send_file(handle, mymac, dmac, argv[3]);
+
+
+ pcap_close(handle);
+ return (0);
+}
+
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/pktgen.c b/libccnx-transport-rta/ccnx/transport/test_tools/pktgen.c
new file mode 100644
index 00000000..8f1cc087
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/test_tools/pktgen.c
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Generate packets
+ *
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <LongBow/runtime.h>
+
+typedef enum {
+ MODE_SEND,
+ MODE_REPLY
+} PktGenMode;
+
+typedef enum {
+ ENCAP_ETHER,
+ ENCAP_UDP
+} PktGenEncap;
+
+typedef enum {
+ PKTGEN_STREAM,
+ PKTGEN_STOPWAIT
+} PktGenFlow;
+
+typedef struct {
+ PktGenMode mode;
+ PktGenEncap encap;
+ PktGenFlow flow;
+
+ char *ifname;
+ char *etherOrIp;
+ char *etherType;
+ unsigned count;
+
+ struct timeval startTime;
+ struct timeval stopTime;
+ unsigned packetCount;
+} PktGen;
+
+// ======================================================================
+
+static void
+usage(void)
+{
+ printf("usage: \n");
+ printf(" This program functions as a requester and a responder. They operate in a pair.\n");
+ printf(" The test can run over raw Ethernet encapsulation or over UDP\n");
+ printf(" The <count> parameter can be an integer or use a 'kmg' suffix for 1000, 1E+6, or 1E+9\n");
+ printf("\n");
+ printf(" pktgen send ether <ifname> <dstmac> [ethertype] count <n> (stream | stopwait)\n");
+ printf(" pktgen reply ether <ifname> [count <n>]\n");
+ printf("\n");
+ printf(" This mode sends either a stream or stop-and-wait request to an Ethernet peer\n");
+ printf(" pktgen send udp <ifname> <dstip> <dstport> count <n> (stream | stopwait)\n");
+ printf(" pktgen reply udp <ifname> [count <n>]\n");
+ printf("\n");
+ printf(" Examples:\n");
+ printf(" This uses the standard Ethertype of 0x0801. The replier will stay running forever.\n");
+ printf(" pktgen send ether em1 bc:30:5b:f2:2f:60 count 1M stream\n");
+ printf(" pktgen reply ether em1\n");
+ printf("\n");
+ printf(" This uses a custom ethertype. The replier will stay running forever.\n");
+ printf(" pktgen send ether em1 bc:30:5b:f2:2f:60 0x9000 count 1M stream\n");
+ printf(" pktgen reply ether em1\n");
+ printf("\n");
+ printf(" An example with UDP\n");
+ printf(" pktgen send udp em1 10.1.0.2 9695 count 1M stopwait\n");
+ printf(" pktgen reply udp em1\n");
+ printf("\n");
+}
+
+static PktGen
+parseCommandLine(int argc, char *argv[argc])
+{
+ PktGen pktgen;
+ memset(&pktgen, 0, sizeof(PktGen));
+
+ usage();
+
+ return pktgen;
+}
+
+// ======================================================================
+
+static void
+generateEther(PktGen *pktgen)
+{
+ printf("Generating %u ethernet interest messages\n", pktgen->count);
+}
+
+static void
+replyEther(PktGen *pktgen)
+{
+ printf("replying up to %u ethernet content objects messages\n", pktgen->count);
+}
+
+// ======================================================================
+
+static void
+generateUdp(PktGen *pktgen)
+{
+ printf("Generating %u UDP interest messages\n", pktgen->count);
+}
+
+static void
+replyUdp(PktGen *pktgen)
+{
+ printf("replying up to %u UDP content objects messages\n", pktgen->count);
+}
+
+
+// ======================================================================
+
+static void
+displayStatistics(PktGen *pktgen)
+{
+ printf("stats.... coming soon\n");
+}
+
+// ======================================================================
+
+static void
+runSender(PktGen *pktgen)
+{
+ switch (pktgen->encap) {
+ case ENCAP_ETHER:
+ generateEther(pktgen);
+ break;
+
+ case ENCAP_UDP:
+ generateUdp(pktgen);
+ break;
+
+ default:
+ trapIllegalValue(pktgen.encap, "Unknown encapsulation: %d", pktgen->encap);
+ }
+}
+
+static void
+runReplier(PktGen *pktgen)
+{
+ switch (pktgen->encap) {
+ case ENCAP_ETHER:
+ replyEther(pktgen);
+ break;
+
+ case ENCAP_UDP:
+ replyUdp(pktgen);
+ break;
+
+ default:
+ trapIllegalValue(pktgen.encap, "Unknown encapsulation: %d", pktgen->encap);
+ }
+}
+
+// ======================================================================
+
+int
+main(int argc, char *argv[argc])
+{
+ PktGen pktgen = parseCommandLine(argc, argv);
+
+ switch (pktgen.mode) {
+ case MODE_SEND:
+ runSender(&pktgen);
+ break;
+
+ case MODE_REPLY:
+ runReplier(&pktgen);
+ break;
+
+ default:
+ trapIllegalValue(pktgen.mode, "Unknown mode: %d", pktgen.mode);
+ }
+
+ displayStatistics(&pktgen);
+}
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/test/.gitignore b/libccnx-transport-rta/ccnx/transport/test_tools/test/.gitignore
new file mode 100644
index 00000000..e1bb21dc
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/test_tools/test/.gitignore
@@ -0,0 +1 @@
+test_bent_pipe
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/test/test_bent_pipe.c b/libccnx-transport-rta/ccnx/transport/test_tools/test/test_bent_pipe.c
new file mode 100644
index 00000000..0fbe9683
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/test_tools/test/test_bent_pipe.c
@@ -0,0 +1,397 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//
+// test_bent_pipe.c
+// Libccnx
+//
+
+#include "../bent_pipe.c"
+
+#include <stdio.h>
+#include <sys/un.h>
+#include <strings.h>
+#include <sys/queue.h>
+#include <errno.h>
+
+#include <LongBow/unit-test.h>
+#include <LongBow/runtime.h>
+
+#include <parc/algol/parc_Memory.h>
+
+#define CHATTY 1
+
+static const char *local_name = "/tmp/alpha";
+
+LONGBOW_TEST_RUNNER(BentPipe)
+{
+ LONGBOW_RUN_TEST_FIXTURE(CreateDestroy);
+ LONGBOW_RUN_TEST_FIXTURE(System);
+}
+
+LONGBOW_TEST_RUNNER_SETUP(BentPipe)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_RUNNER_TEARDOWN(BentPipe)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+// ================================
+
+LONGBOW_TEST_FIXTURE(CreateDestroy)
+{
+ LONGBOW_RUN_TEST_CASE(CreateDestroy, create_destroy);
+ LONGBOW_RUN_TEST_CASE(CreateDestroy, create_start_stop_destroy);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(CreateDestroy)
+{
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(CreateDestroy)
+{
+ if (parcMemory_Outstanding() != 0) {
+ printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding());
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_CASE(CreateDestroy, create_destroy)
+{
+ BentPipeState *bp = bentpipe_Create(local_name);
+ bentpipe_Destroy(&bp);
+}
+
+LONGBOW_TEST_CASE(CreateDestroy, create_start_stop_destroy)
+{
+ BentPipeState *bp = bentpipe_Create(local_name);
+ bentpipe_Start(bp);
+ bentpipe_Stop(bp);
+ bentpipe_Destroy(&bp);
+}
+
+// ================================
+
+BentPipeState *system_bp;
+
+LONGBOW_TEST_FIXTURE(System)
+{
+ LONGBOW_RUN_TEST_CASE(System, two_connections);
+}
+
+LONGBOW_TEST_FIXTURE_SETUP(System)
+{
+ system_bp = bentpipe_Create(local_name);
+ bentpipe_Start(system_bp);
+
+ printf("%s created system_bp %p, running %d\n", __func__, (void *) system_bp, system_bp->startup_running);
+
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+LONGBOW_TEST_FIXTURE_TEARDOWN(System)
+{
+ printf("%s stopping system_bp %p, running %d\n", __func__, (void *) system_bp, system_bp->startup_running);
+
+ bentpipe_Stop(system_bp);
+ bentpipe_Destroy(&system_bp);
+ if (parcMemory_Outstanding() != 0) {
+ printf("('%s' leaks memory by %d (allocs - frees)) ", longBowTestCase_GetName(testCase), parcMemory_Outstanding());
+ return LONGBOW_STATUS_MEMORYLEAK;
+ }
+
+ return LONGBOW_STATUS_SUCCEEDED;
+}
+
+int
+connect_to_bentpipe(const char *pipe_name)
+{
+ int res;
+ struct sockaddr_un addr_unix;
+ int fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0) {
+ perror("socket PF_LOCAL");
+ }
+
+ assertFalse(fd < 0, "socket PF_LOCAL error");
+
+ memset(&addr_unix, 0, sizeof(addr_unix));
+ addr_unix.sun_family = AF_UNIX;
+ strcpy(addr_unix.sun_path, pipe_name);
+
+ res = connect(fd, (struct sockaddr *) &addr_unix, sizeof(addr_unix));
+ if (res < 0) {
+ perror("connect");
+ }
+ assertTrue(res == 0, "error on connect");
+ return fd;
+}
+
+#define MAXSEND 1024
+#define CONN_COUNT 3
+#define MAXPENDING 128
+
+struct sendlist {
+ uint8_t buffer[MAXSEND];
+ size_t length;
+ unsigned refcount;
+};
+
+struct fdstate {
+ int fd;
+
+ struct sendlist *expected[MAXPENDING];
+ unsigned head;
+ unsigned tail;
+ unsigned count_expected;
+
+ unsigned count_send;
+ unsigned count_recv;
+
+ // these are for the next message being read
+ size_t total_read_length;
+ size_t current_read_length;
+ uint8_t pbuff[MAXSEND + 16];
+};
+
+void
+sendbuffer(int fd, struct fdstate *state)
+{
+ struct sendlist *entry = parcMemory_AllocateAndClear(sizeof(struct sendlist));
+ assertNotNull(entry, "parcMemory_AllocateAndClear(%zu) returned NULL", sizeof(struct sendlist));
+ size_t len, total_len;
+ localhdr *hdr = (localhdr *) entry->buffer;
+ int i;
+ ssize_t res;
+
+ len = random() % (MAXSEND - sizeof(localhdr) - 1) + 1;
+ hdr->pid = getpid();
+ hdr->fd = fd;
+ hdr->length = (int) len;
+
+ total_len = len + sizeof(localhdr);
+
+ entry->length = total_len;
+ entry->refcount = 0;
+
+ for (i = 0; i < CONN_COUNT; i++) {
+ if (state[i].fd != fd) {
+ entry->refcount++;
+ state[i].expected[ state[i].tail ] = entry;
+ state[i].tail = (state[i].tail + 1) % MAXPENDING;
+ state[i].count_expected++;
+
+ assertFalse(state[i].tail == state[i].head,
+ "%s buffer wrap around on fd %d",
+ __func__,
+ state[i].fd);
+
+ if (CHATTY) {
+ printf("conn %2d added expected cnt %u length %zu\n",
+ i,
+ state[i].count_expected,
+ entry->length);
+ }
+ } else {
+ state[i].count_send++;
+
+ if (CHATTY) {
+ printf("conn %2d sent count %u\n", i, state[i].count_send);
+ }
+ }
+ }
+
+ res = write(fd, entry->buffer, entry->length);
+ assertTrue(res == entry->length, "%s write error %s\n", __func__, strerror(errno));
+}
+
+static int
+compare_sends(struct fdstate *s, uint8_t *buffer, size_t buffer_len)
+{
+ struct sendlist *expected = s->expected[ s->head ];
+ int res;
+
+ s->head = (s->head + 1) % MAXPENDING;
+
+ assertTrue(expected->length == buffer_len, "%s lengths do not match, expected %zu got %zu\n",
+ __func__, expected->length, buffer_len);
+
+ if (expected->length != buffer_len) {
+ return -1;
+ }
+
+ res = memcmp(expected->buffer, buffer, buffer_len);
+ assertTrue(res == 0, "%s buffers did not match\n", __func__);
+ if (res != 0) {
+ return -1;
+ }
+
+ assertTrue(expected->refcount > 0, "%s invalid refcount\n", __func__);
+
+ expected->refcount--;
+ if (expected->refcount == 0) {
+ memset(expected, 0, sizeof(struct sendlist));
+ parcMemory_Deallocate((void **) &expected);
+ }
+
+ return 0;
+}
+
+/*
+ * Create two connections to bent pipe and make sure they reflect
+ */
+LONGBOW_TEST_CASE(System, two_connections)
+{
+ struct fdstate state[CONN_COUNT];
+ fd_set fdset, readset;
+ int number_writes = 100;
+ int count_writes = 0;
+ int i;
+ struct timeval timeout = { 0, 10000 };
+ unsigned pending_expected = 0;
+
+ assertNotNull(system_bp, "%s running with null system_bp\n", __func__);
+
+ FD_ZERO(&fdset);
+ for (i = 0; i < CONN_COUNT; i++) {
+ memset(&state[i], 0, sizeof(struct fdstate));
+ state[i].fd = connect_to_bentpipe(local_name);
+ FD_SET(state[i].fd, &fdset);
+ }
+
+ sleep(1);
+
+ assertTrue(system_bp->conn_count == CONN_COUNT, "bp conn count wrong");
+
+ while (count_writes < number_writes || pending_expected > 0) {
+ int res;
+ memcpy(&readset, &fdset, sizeof(readset));
+
+ res = select(FD_SETSIZE, &readset, NULL, NULL, &timeout);
+ if (res < 0) {
+ perror("select");
+ abort();
+ }
+
+ if (res > 0) {
+ if (CHATTY) {
+ printf("%s got res %d\n", __func__, res);
+ }
+ for (i = 0; i < CONN_COUNT; i++) {
+ if (FD_ISSET(state[i].fd, &readset)) {
+ ssize_t res;
+ localhdr *hdr = (localhdr *) state[i].pbuff;
+
+ if (state[i].total_read_length == 0) {
+ size_t remaining = sizeof(localhdr) - state[i].current_read_length;
+ // we need to read a header
+ res = read(state[i].fd,
+ state[i].pbuff + state[i].current_read_length,
+ remaining);
+
+ assertFalse(res < 0, "%s got read error: %s", __func__, strerror(errno));
+
+ state[i].current_read_length += res;
+ if (state[i].current_read_length == sizeof(localhdr)) {
+ state[i].total_read_length = sizeof(localhdr) + hdr->length;
+
+ if (CHATTY) {
+ printf("%s conn %d fd %d set total length %zu\n",
+ __func__,
+ i,
+ state[i].fd,
+ state[i].total_read_length);
+ }
+ }
+ }
+
+ if (state[i].current_read_length < state[i].total_read_length) {
+ size_t remaining = state[i].total_read_length - state[i].current_read_length;
+ // we need to read a header
+ res = read(state[i].fd,
+ state[i].pbuff + state[i].current_read_length,
+ remaining);
+
+ assertFalse(res < 0, "%s got read error: %s", __func__, strerror(errno));
+ state[i].current_read_length += res;
+ }
+
+ if (state[i].current_read_length == state[i].total_read_length) {
+ // verify that it's the same as the top expected stack
+ res = compare_sends(&state[i], state[i].pbuff, state[i].total_read_length);
+
+ assertTrue(res == 0, "%s invalid receive compare\n", __func__);
+
+ state[i].count_recv++;
+ state[i].count_expected--;
+
+ if (CHATTY) {
+ printf("%s conn %d fd %d cnt_recv %u cnt_expected %u\n",
+ __func__,
+ i, state[i].fd, state[i].count_recv, state[i].count_expected);
+ }
+
+ // done with it
+ state[i].current_read_length = 0;
+ state[i].total_read_length = 0;
+ }
+ }
+ }
+ }
+
+ if ((random() % 4) == 0) {
+ // do a write
+ int out = random() % CONN_COUNT;
+
+ if (CHATTY) {
+ printf("%s sendbuffer for conn %d fd %d\n", __func__, out, state[out].fd);
+ }
+
+ sendbuffer(state[out].fd, state);
+ count_writes++;
+ }
+
+ pending_expected = 0;
+ for (i = 0; i < CONN_COUNT; i++) {
+ pending_expected += state[i].count_expected;
+ }
+ }
+
+ for (i = 0; i < CONN_COUNT; i++) {
+ printf("conn %2d fd %2d send %4u recv %4u\n",
+ i,
+ state[i].fd,
+ state[i].count_send,
+ state[i].count_recv);
+
+ assertTrue(state[i].count_recv == number_writes - state[i].count_send + 1,
+ "%s conn %d incorrect counts\n",
+ __func__);
+ close(state[i].fd);
+ }
+}
+
+int
+main(int argc, char *argv[])
+{
+ LongBowRunner *testRunner = LONGBOW_TEST_RUNNER_CREATE(BentPipe);
+ exit(longBowMain(argc, argv, testRunner, NULL));
+}
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/traffic_tools.c b/libccnx-transport-rta/ccnx/transport/test_tools/traffic_tools.c
new file mode 100644
index 00000000..73b321e4
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/test_tools/traffic_tools.c
@@ -0,0 +1,292 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <LongBow/runtime.h>
+#include <parc/algol/parc_Memory.h>
+
+#include "traffic_tools.h"
+#include <ccnx/common/ccnx_NameSegmentNumber.h>
+#include <ccnx/transport/transport_rta/core/rta_Component.h>
+
+#include <ccnx/common/codec/schema_v1/testdata/v1_interest_nameA.h>
+
+#include <ccnx/common/internal/ccnx_InterestDefault.h>
+
+#include <ccnx/api/control/cpi_ControlFacade.h>
+
+/**
+ * @function decode_last_component_as_segment
+ * @abstract Returns true is last name component is a segment, and returns the segment
+ * @discussion
+ * The outputSegment may be null, in which case this function is just a true/false for the
+ * last path segment being an object segment number.
+ *
+ * @param outputSegment is an output parameter of the segment number, if returns true. May be null.
+ * @return <#return#>
+ */
+bool
+trafficTools_GetObjectSegmentFromName(CCNxName *name, uint64_t *outputSegment)
+{
+ assertNotNull(name, "Name must be non-null");
+ bool success = false;
+ size_t segmentCount = ccnxName_GetSegmentCount(name);
+ if (segmentCount > 0) {
+ CCNxNameSegment *lastSegment = ccnxName_GetSegment(name, segmentCount - 1);
+ if (ccnxNameSegment_GetType(lastSegment) == CCNxNameLabelType_CHUNK) {
+ if (outputSegment) {
+ *outputSegment = ccnxNameSegmentNumber_Value(lastSegment);
+ }
+ success = true;
+ }
+ }
+ return success;
+}
+
+bool
+trafficTools_ReadAndVerifySegment(PARCEventQueue *queue, CCNxName *basename, uint64_t expected, PARCBuffer *expectedPayload)
+{
+ TransportMessage *test_tm;
+ CCNxName *test_name;
+ uint64_t segnum;
+ CCNxName *name_copy;
+
+ test_tm = rtaComponent_GetMessage(queue);
+ assertNotNull(test_tm, "got null transport message down the stack, expecting interest\n");
+
+ assertTrue(transportMessage_IsInterest(test_tm),
+ "Got wrong transport message pointer, is not an interest");
+
+ CCNxTlvDictionary *interestDictionary = transportMessage_GetDictionary(test_tm);
+ test_name = ccnxInterest_GetName(interestDictionary);
+
+ bool success = trafficTools_GetObjectSegmentFromName(test_name, &segnum);
+ assertTrue(success, "got error decoding last component as segnum: %s", ccnxName_ToString(test_name));
+ assertTrue(expected == segnum, "Got wrong segnum, expected %" PRIu64 ", got %" PRIu64 "\n",
+ expected, segnum);
+
+ name_copy = ccnxName_Copy(test_name);
+ ccnxName_Trim(name_copy, 1);
+ assertTrue(ccnxName_Compare(basename, name_copy) == 0,
+ "\nName '%s'\ndid not match\nexpected '%s'\nInterest name '%s'\n\n",
+ ccnxName_ToString(name_copy),
+ ccnxName_ToString(basename),
+ ccnxName_ToString(test_name));
+
+ ccnxName_Release(&name_copy);
+
+ if (expectedPayload != NULL) {
+ assertTrue(parcBuffer_Equals(expectedPayload, ccnxInterest_GetPayload(interestDictionary)),
+ "Expected the same Interest payload out as was sent in originally.");
+ }
+
+ transportMessage_Destroy(&test_tm);
+
+ return true;
+}
+
+CCNxContentObject *
+trafficTools_CreateSignedContentObject()
+{
+ CCNxName *name = ccnxName_CreateFromCString("lci:/hello/dolly");
+ PARCBuffer *payload = parcBuffer_WrapCString("hello");
+
+ CCNxContentObject *result = ccnxContentObject_CreateWithNameAndPayload(name, payload);
+
+ PARCBuffer *keyId = parcBuffer_WrapCString("keyhash");
+ PARCBuffer *sigbits = parcBuffer_WrapCString("siggybits");
+
+ PARCSignature *signature = parcSignature_Create(PARCSigningAlgorithm_RSA, PARCCryptoHashType_SHA256, sigbits);
+ parcBuffer_Release(&sigbits);
+
+ ccnxContentObject_SetSignature(result, keyId, signature, NULL);
+
+ parcBuffer_Release(&payload);
+ parcBuffer_Release(&keyId);
+ parcSignature_Release(&signature);
+ ccnxName_Release(&name);
+
+ return result;
+}
+
+CCNxContentObject *
+trafficTools_CreateContentObjectWithPayload(PARCBuffer *contents)
+{
+ CCNxName *name = ccnxName_CreateFromCString("lci:/hello/dolly");
+
+ CCNxContentObject *result = ccnxContentObject_CreateWithNameAndPayload(name, contents);
+
+ ccnxName_Release(&name);
+
+ return result;
+}
+
+TransportMessage *
+trafficTools_CreateTransportMessageWithSignedContentObject(RtaConnection *connection)
+{
+ CCNxContentObject *unsignedObject = trafficTools_CreateSignedContentObject();
+ CCNxMetaMessage *message = ccnxMetaMessage_CreateFromContentObject(unsignedObject);
+ TransportMessage *tm = transportMessage_CreateFromDictionary(message);
+
+ transportMessage_SetInfo(tm, connection, NULL);
+
+ ccnxContentObject_Release(&unsignedObject);
+ ccnxMetaMessage_Release(&message);
+
+ return tm;
+}
+
+TransportMessage *
+trafficTools_CreateTransportMessageWithSignedContentObjectWithName(RtaConnection *connection, CCNxName *name, const char *keystorePath, const char *keystorePassword)
+{
+ PARCBuffer *payload = parcBuffer_WrapCString("hello");
+
+ CCNxContentObject *contentObject = ccnxContentObject_CreateWithNameAndPayload(name, payload);
+ PARCBuffer *keyId = parcBuffer_WrapCString("hash of key");
+ PARCBuffer *sigbits = parcBuffer_WrapCString("sig bits");
+ PARCSignature *signature = parcSignature_Create(PARCSigningAlgorithm_RSA, PARCCryptoHashType_SHA256, sigbits);
+ parcBuffer_Release(&sigbits);
+
+ ccnxContentObject_SetSignature(contentObject, keyId, signature, NULL);
+
+ CCNxMetaMessage *message = ccnxMetaMessage_CreateFromContentObject(contentObject);
+ TransportMessage *tm = transportMessage_CreateFromDictionary(message);
+ transportMessage_SetInfo(tm, connection, NULL);
+
+ ccnxMetaMessage_Release(&message);
+ ccnxContentObject_Release(&contentObject);
+ parcSignature_Release(&signature);
+ parcBuffer_Release(&keyId);
+ return tm;
+}
+
+CCNxInterest *
+trafficTools_CreateInterest(void)
+{
+ CCNxName *name = ccnxName_CreateFromCString("lci:/there/were/bells/on/the/hill");
+ CCNxInterest *interest = ccnxInterest_CreateSimple(name);
+ ccnxName_Release(&name);
+
+ return interest;
+}
+
+CCNxTlvDictionary *
+trafficTools_CreateDictionaryInterest(void)
+{
+ CCNxName *name = ccnxName_CreateFromCString("lci:/there/were/bells/on/the/hill");
+ CCNxTlvDictionary *interest = ccnxInterest_CreateSimple(name);
+ ccnxName_Release(&name);
+
+ return interest;
+}
+
+TransportMessage *
+trafficTools_CreateTransportMessageWithInterest(RtaConnection *connection)
+{
+ return trafficTools_CreateTransportMessageWithDictionaryInterest(connection, CCNxTlvDictionary_SchemaVersion_V1);
+}
+
+TransportMessage *
+trafficTools_CreateTransportMessageWithControlMessage(RtaConnection *connection)
+{
+ return trafficTools_CreateTransportMessageWithDictionaryControl(connection, CCNxTlvDictionary_SchemaVersion_V1);
+}
+
+TransportMessage *
+trafficTools_CreateTransportMessageWithRaw(RtaConnection *connection)
+{
+ return trafficTools_CreateTransportMessageWithDictionaryRaw(connection, CCNxTlvDictionary_SchemaVersion_V1);
+}
+
+TransportMessage *
+trafficTools_CreateTransportMessageWithDictionaryInterest(RtaConnection *connection, CCNxTlvDictionary_SchemaVersion schema)
+{
+ CCNxTlvDictionary *interest;
+ CCNxName *name = ccnxName_CreateFromCString("lci:/lost/in/space");
+
+ CCNxInterestInterface *impl = NULL;
+
+ switch (schema) {
+ case CCNxTlvDictionary_SchemaVersion_V1:
+ impl = &CCNxInterestFacadeV1_Implementation;
+ break;
+
+ default:
+ trapIllegalValue(schema, "Unsupported schema version");
+ }
+
+ // impl should be set if we get here.
+ interest = ccnxInterest_CreateWithImpl(impl,
+ name,
+ CCNxInterestDefault_LifetimeMilliseconds,
+ NULL,
+ NULL,
+ CCNxInterestDefault_HopLimit);
+
+
+ TransportMessage *tm = transportMessage_CreateFromDictionary(interest);
+ ccnxTlvDictionary_Release(&interest);
+
+ transportMessage_SetInfo(tm, connection, NULL);
+ ccnxName_Release(&name);
+ return tm;
+}
+
+TransportMessage *
+trafficTools_CreateTransportMessageWithDictionaryRaw(RtaConnection *connection, unsigned schema)
+{
+ PARCBuffer *buffer = parcBuffer_Allocate(sizeof(v1_interest_nameA));
+ parcBuffer_PutArray(buffer, sizeof(v1_interest_nameA), v1_interest_nameA);
+ parcBuffer_Flip(buffer);
+ CCNxTlvDictionary *wireformat = ccnxWireFormatMessage_FromInterestPacketType(schema, buffer);
+
+ TransportMessage *tm = transportMessage_CreateFromDictionary(wireformat);
+ ccnxTlvDictionary_Release(&wireformat);
+ parcBuffer_Release(&buffer);
+
+ transportMessage_SetInfo(tm, connection, NULL);
+ return tm;
+}
+
+TransportMessage *
+trafficTools_CreateTransportMessageWithDictionaryControl(RtaConnection *connection, unsigned schema)
+{
+ char *jsonstring = "{\"CPI_REQUEST\":{\"SEQUENCE\":22,\"REGISTER\":{\"PREFIX\":\"lci:/howdie/stranger\",\"INTERFACE\":55,\"FLAGS\":0,\"PROTOCOL\":\"STATIC\",\"ROUTETYPE\":\"LONGEST\",\"COST\":200}}}";
+
+ PARCJSON *json = parcJSON_ParseString(jsonstring);
+ CCNxTlvDictionary *control = NULL;
+
+ switch (schema) {
+ case 1:
+ control = ccnxControlFacade_CreateCPI(json);
+
+ break;
+
+ default:
+ break;
+ }
+
+
+ TransportMessage *tm = transportMessage_CreateFromDictionary(control);
+ transportMessage_SetInfo(tm, connection, NULL);
+
+ parcJSON_Release(&json);
+ ccnxTlvDictionary_Release(&control);
+
+ return tm;
+}
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/traffic_tools.h b/libccnx-transport-rta/ccnx/transport/test_tools/traffic_tools.h
new file mode 100644
index 00000000..09a8caf9
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/test_tools/traffic_tools.h
@@ -0,0 +1,284 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file traffic_tools.h
+ * @brief <#Brief Description#>
+ *
+ * <#Detailed Description#>
+ *
+ */
+#ifndef Libccnx_traffic_generators_h
+#define Libccnx_traffic_generators_h
+
+#include <ccnx/transport/transport_rta/core/rta_Connection.h>
+#include <ccnx/transport/transport_rta/core/rta_ComponentQueue.h>
+
+#include <ccnx/common/ccnx_ContentObject.h>
+#include <ccnx/common/ccnx_Interest.h>
+
+#include <ccnx/common/internal/ccnx_TlvDictionary.h>
+
+/**
+ * <#One Line Description#>
+ *
+ * Read the provided queue and verify the received message (Interest or Object)
+ * has the given basename (not including segment) and the provided segment number
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+bool trafficTools_ReadAndVerifySegment(PARCEventQueue *queue, CCNxName *basename,
+ uint64_t expectedSegnum, PARCBuffer *expectedPayload);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+bool trafficTools_GetObjectSegmentFromName(CCNxName *name, uint64_t *outputSegment);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+CCNxContentObject *trafficTools_CreateSignedContentObject();
+
+/**
+ * Create a `CCNxUnsignedContentObject` with the given payload.
+ *
+ * The payload is contained within the given `PARCBuffer` from its position to its limit.
+ *
+ * @param [in] payload <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * {
+ * <#example#>
+ * }
+ * @endcode
+ *
+ * @see PARCBuffer
+ */
+CCNxContentObject *trafficTools_CreateContentObjectWithPayload(PARCBuffer *payload);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+CCNxInterest *trafficTools_CreateInterest(void);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+TransportMessage *trafficTools_CreateTransportMessageWithControlMessage(RtaConnection *connection);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+TransportMessage *trafficTools_CreateTransportMessageWithInterest(RtaConnection *connection);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+TransportMessage *trafficTools_CreateTransportMessageWithSignedContentObject(RtaConnection *connection);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+TransportMessage *trafficTools_CreateTransportMessageWithSignedContentObjectWithName(RtaConnection *connection,
+ CCNxName *name, const char *keystorePath, const char *keystorePassword);
+
+/**
+ * <#One Line Description#>
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ *
+ * @see <#references#>
+ */
+TransportMessage *trafficTools_CreateTransportMessageWithRaw(RtaConnection *connection);
+
+
+/**
+ * Creates a Dictionary format RAW message
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+TransportMessage *trafficTools_CreateTransportMessageWithDictionaryRaw(RtaConnection *connection, unsigned schema);
+
+/**
+ * Creates a dictionary format Interest message
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+TransportMessage *trafficTools_CreateTransportMessageWithDictionaryInterest(RtaConnection *connection, unsigned schema);
+
+/**
+ * Creates a dictioary format Control message
+ *
+ * <#Paragraphs Of Explanation#>
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+TransportMessage *trafficTools_CreateTransportMessageWithDictionaryControl(RtaConnection *connection, unsigned schema);
+
+/**
+ * Creates an interest in Dictionary format
+ *
+ * Does not have a wire format
+ *
+ * @param [<#in out in,out#>] <#name#> <#description#>
+ *
+ * @return <#value#> <#explanation#>
+ *
+ * Example:
+ * @code
+ * <#example#>
+ * @endcode
+ */
+CCNxTlvDictionary *trafficTools_CreateDictionaryInterest(void);
+#endif // Libccnx_traffic_generators_h
diff --git a/libccnx-transport-rta/ccnx/transport/test_tools/write_packets.c b/libccnx-transport-rta/ccnx/transport/test_tools/write_packets.c
new file mode 100644
index 00000000..16c3e5ff
--- /dev/null
+++ b/libccnx-transport-rta/ccnx/transport/test_tools/write_packets.c
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility function to write the test data to data files. The data files will be in a format
+ * you can use to import with "text2pcap". For example:
+ *
+ * text2pcap -u 9695,9695 file
+ *
+ * would add a fake UPD/IP/Ethernet header with UDP ports 9695 for source and destination
+ *
+ */
+
+#include <stdio.h>
+
+#include <ccnx/common/codec/schema_v0/testdata/testrig_truthSet.h>
+#include <ccnx/common/codec/schema_v1/testdata/v1_testrig_truthSet.h>
+
+/*
+ * typedef struct testrig_truth_table
+ * {
+ * const char * testname;
+ * uint8_t * packet;
+ * size_t length;
+ *
+ * TlvErrorCodes expectedError;
+ *
+ * // the array is terminated by a T_INVALID value
+ * // for "arrayIndexOrTypeKey"
+ * TruthTableEntry * entry;
+ * } TruthTable;
+ *
+ */
+
+static void
+writePacket(TruthTable *table)
+{
+ char filename[1024];
+ snprintf(filename, 1024, "%s.txt", table->testname);
+ FILE *fh = fopen(filename, "w+");
+ printf("name %s\n", filename);
+
+ int linewidth = 8;
+ for (int i = 0; i < table->length; i++) {
+ if ((i % linewidth) == 0) {
+ fprintf(fh, "\n%06X ", i);
+ }
+ fprintf(fh, "%02X ", table->packet[i]);
+ }
+ fprintf(fh, "\n");
+ fclose(fh);
+}
+
+static void
+loopTruthTable(TruthTable truthset[])
+{
+ for (int i = 0; truthset[i].packet != NULL; i++) {
+ writePacket(&truthset[i]);
+ }
+}
+
+int
+main(int argc, char **argv)
+{
+ loopTruthTable(interests_truthSet);
+ loopTruthTable(contentObject_truthSet);
+ loopTruthTable(cpi_truthSet);
+
+ loopTruthTable(v1_interests_truthSet);
+ loopTruthTable(v1_contentObject_truthSet);
+ loopTruthTable(v1_cpi_truthSet);
+}