summaryrefslogtreecommitdiffstats
path: root/extras/libmemif/src/socket.c
diff options
context:
space:
mode:
authorJakub Grajciar <jgrajcia@cisco.com>2021-01-04 11:10:42 +0100
committerDamjan Marion <dmarion@me.com>2021-09-27 14:35:45 +0000
commit134f1e02251b27ad73c3ff4591954c2e0919c984 (patch)
tree30e84c2bea30ad760a6629a7b1d5843bdd7f8a7c /extras/libmemif/src/socket.c
parent45cf1fc3f58ee465d2e7e4689158e79fd706658e (diff)
libmemif: refactor connection establishment
per_thread_ namespace fuctionality replaced by memif socket. Interfaces are grouped by memif socket which holds interface database. Each thread can create it's unique memif socket. The path name can be equal across threads so that the app only uses one UNIX socket. In case of listener socket, listener fd can be obtained and set using APIs. This change allows: - No lookup on file descriptor events - improves interrupt handling - Loopback support (connect two interfaces in one app) - usefull for debugging and testing - Improves code readability by providing control channel abstraction for each interface and listener sockets Type: refactor Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com> Change-Id: I1b8261042431c0376646ab4c4c831f6e59dd3eed
Diffstat (limited to 'extras/libmemif/src/socket.c')
-rw-r--r--extras/libmemif/src/socket.c818
1 files changed, 383 insertions, 435 deletions
diff --git a/extras/libmemif/src/socket.c b/extras/libmemif/src/socket.c
index b801cac75ba..a9db7705fe9 100644
--- a/extras/libmemif/src/socket.c
+++ b/extras/libmemif/src/socket.c
@@ -35,20 +35,27 @@
#include <memif_private.h>
/* sends msg to socket */
-static_fn int
-memif_msg_send (int fd, memif_msg_t * msg, int afd)
+static int
+memif_msg_send_from_queue (memif_control_channel_t *cc)
{
struct msghdr mh = { 0 };
struct iovec iov[1];
char ctl[CMSG_SPACE (sizeof (int))];
int rv, err = MEMIF_ERR_SUCCESS; /* 0 */
+ memif_msg_queue_elt_t *e;
- iov[0].iov_base = (void *) msg;
+ /* Pick the first message */
+ e = TAILQ_FIRST (&cc->msg_queue);
+ if (e == NULL)
+ return MEMIF_ERR_SUCCESS;
+
+ /* Construct the message */
+ iov[0].iov_base = (void *) &e->msg;
iov[0].iov_len = sizeof (memif_msg_t);
mh.msg_iov = iov;
mh.msg_iovlen = 1;
- if (afd > 0)
+ if (e->fd > 0)
{
struct cmsghdr *cmsg;
memset (&ctl, 0, sizeof (ctl));
@@ -58,52 +65,50 @@ memif_msg_send (int fd, memif_msg_t * msg, int afd)
cmsg->cmsg_len = CMSG_LEN (sizeof (int));
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
- memcpy (CMSG_DATA (cmsg), &afd, sizeof (int));
+ memcpy (CMSG_DATA (cmsg), &e->fd, sizeof (int));
}
- rv = sendmsg (fd, &mh, 0);
+ rv = sendmsg (cc->fd, &mh, 0);
if (rv < 0)
err = memif_syscall_error_handler (errno);
- DBG ("Message type %u sent", msg->type);
+ DBG ("Message type %u sent", e->msg.type);
+
+ /* If sent successfully, remove the msg from queue */
+ if (err == MEMIF_ERR_SUCCESS)
+ {
+ TAILQ_REMOVE (&cc->msg_queue, e, next);
+ cc->sock->args.free (e);
+ }
+
return err;
}
-/* response from memif master - master is ready to handle next message */
-static_fn int
-memif_msg_enq_ack (memif_connection_t * c)
+static memif_msg_queue_elt_t *
+memif_msg_enq (memif_control_channel_t *cc)
{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
- memif_msg_queue_elt_t *e =
- (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t));
+ memif_msg_queue_elt_t *e;
+
+ e = cc->sock->args.alloc (sizeof (*e));
if (e == NULL)
- return memif_syscall_error_handler (errno);
+ return NULL;
- memset (&e->msg, 0, sizeof (e->msg));
- e->msg.type = MEMIF_MSG_TYPE_ACK;
e->fd = -1;
+ TAILQ_INSERT_TAIL (&cc->msg_queue, e, next);
- e->next = NULL;
- if (c->msg_queue == NULL)
- {
- c->msg_queue = e;
- return MEMIF_ERR_SUCCESS; /* 0 */
- }
-
- memif_msg_queue_elt_t *cur = c->msg_queue;
- while (cur->next != NULL)
- {
- cur = cur->next;
- }
- cur->next = e;
-
- return MEMIF_ERR_SUCCESS; /* 0 */
+ return e;
}
-static_fn int
-memif_msg_send_hello (libmemif_main_t * lm, int fd)
+static int
+memif_msg_enq_hello (memif_control_channel_t *cc)
{
- memif_msg_t msg = { 0 };
- memif_msg_hello_t *h = &msg.hello;
- msg.type = MEMIF_MSG_TYPE_HELLO;
+ memif_msg_hello_t *h;
+ memif_msg_queue_elt_t *e = memif_msg_enq (cc);
+
+ if (e == NULL)
+ return MEMIF_ERR_NOMEM;
+
+ e->msg.type = MEMIF_MSG_TYPE_HELLO;
+
+ h = &e->msg.hello;
h->min_version = MEMIF_VERSION;
h->max_version = MEMIF_VERSION;
h->max_s2m_ring = MEMIF_MAX_S2M_RING;
@@ -111,67 +116,63 @@ memif_msg_send_hello (libmemif_main_t * lm, int fd)
h->max_region = MEMIF_MAX_REGION;
h->max_log2_ring_size = MEMIF_MAX_LOG2_RING_SIZE;
- strlcpy ((char *) h->name, (char *) lm->app_name, sizeof (h->name));
+ strlcpy ((char *) h->name, (char *) cc->sock->args.app_name,
+ sizeof (h->name));
- /* msg hello is not enqueued but sent directly,
- because it is the first msg to be sent */
- return memif_msg_send (fd, &msg, -1);
+ return MEMIF_ERR_SUCCESS;
+}
+
+/* response from memif master - master is ready to handle next message */
+static int
+memif_msg_enq_ack (memif_control_channel_t *cc)
+{
+ memif_msg_queue_elt_t *e = memif_msg_enq (cc);
+
+ if (e == NULL)
+ return MEMIF_ERR_NOMEM;
+
+ e->msg.type = MEMIF_MSG_TYPE_ACK;
+ e->fd = -1;
+
+ return MEMIF_ERR_SUCCESS; /* 0 */
}
/* send id and secret (optional) for interface identification */
-static_fn int
-memif_msg_enq_init (memif_connection_t * c)
+static int
+memif_msg_enq_init (memif_control_channel_t *cc)
{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
- memif_msg_queue_elt_t *e =
- (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t));
+ memif_msg_queue_elt_t *e = memif_msg_enq (cc);
+
if (e == NULL)
- return memif_syscall_error_handler (errno);
- memset (e, 0, sizeof (memif_msg_queue_elt_t));
+ return MEMIF_ERR_NOMEM;
- memset (&e->msg, 0, sizeof (e->msg));
memif_msg_init_t *i = &e->msg.init;
e->msg.type = MEMIF_MSG_TYPE_INIT;
e->fd = -1;
i->version = MEMIF_VERSION;
- i->id = c->args.interface_id;
- i->mode = c->args.mode;
-
- strlcpy ((char *) i->name, (char *) lm->app_name, sizeof (i->name));
- if (strlen ((char *) c->args.secret) > 0)
- strncpy ((char *) i->secret, (char *) c->args.secret, sizeof (i->secret));
-
- e->next = NULL;
- if (c->msg_queue == NULL)
- {
- c->msg_queue = e;
- return MEMIF_ERR_SUCCESS; /* 0 */
- }
+ i->id = cc->conn->args.interface_id;
+ i->mode = cc->conn->args.mode;
- memif_msg_queue_elt_t *cur = c->msg_queue;
- while (cur->next != NULL)
- {
- cur = cur->next;
- }
- cur->next = e;
+ strlcpy ((char *) i->name, (char *) cc->sock->args.app_name,
+ sizeof (i->name));
+ if (strlen ((char *) cc->conn->args.secret) > 0)
+ strlcpy ((char *) i->secret, (char *) cc->conn->args.secret,
+ sizeof (i->secret));
return MEMIF_ERR_SUCCESS; /* 0 */
}
/* send information about region specified by region_index */
-static_fn int
-memif_msg_enq_add_region (memif_connection_t * c, uint8_t region_index)
+static int
+memif_msg_enq_add_region (memif_control_channel_t *cc, uint8_t region_index)
{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
- memif_region_t *mr = &c->regions[region_index];
+ memif_region_t *mr = &cc->conn->regions[region_index];
+ memif_msg_queue_elt_t *e = memif_msg_enq (cc);
- memif_msg_queue_elt_t *e =
- (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t));
if (e == NULL)
- return memif_syscall_error_handler (errno);
+ return MEMIF_ERR_NOMEM;
- memset (&e->msg, 0, sizeof (e->msg));
memif_msg_add_region_t *ar = &e->msg.add_region;
e->msg.type = MEMIF_MSG_TYPE_ADD_REGION;
@@ -179,34 +180,19 @@ memif_msg_enq_add_region (memif_connection_t * c, uint8_t region_index)
ar->index = region_index;
ar->size = mr->region_size;
- e->next = NULL;
- if (c->msg_queue == NULL)
- {
- c->msg_queue = e;
- return MEMIF_ERR_SUCCESS; /* 0 */
- }
-
- memif_msg_queue_elt_t *cur = c->msg_queue;
- while (cur->next != NULL)
- {
- cur = cur->next;
- }
- cur->next = e;
-
return MEMIF_ERR_SUCCESS; /* 0 */
}
/* send information about ring specified by direction (S2M | M2S) and index */
-static_fn int
-memif_msg_enq_add_ring (memif_connection_t * c, uint8_t index, uint8_t dir)
+static int
+memif_msg_enq_add_ring (memif_control_channel_t *cc, uint8_t index,
+ uint8_t dir)
{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
- memif_msg_queue_elt_t *e =
- (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t));
+ memif_msg_queue_elt_t *e = memif_msg_enq (cc);
+
if (e == NULL)
- return memif_syscall_error_handler (errno);
+ return MEMIF_ERR_NOMEM;
- memset (&e->msg, 0, sizeof (e->msg));
memif_msg_add_ring_t *ar = &e->msg.add_ring;
e->msg.type = MEMIF_MSG_TYPE_ADD_RING;
@@ -214,9 +200,9 @@ memif_msg_enq_add_ring (memif_connection_t * c, uint8_t index, uint8_t dir)
/* TODO: support multiple rings */
memif_queue_t *mq;
if (dir == MEMIF_RING_M2S)
- mq = &c->rx_queues[index];
+ mq = &cc->conn->rx_queues[index];
else
- mq = &c->tx_queues[index];
+ mq = &cc->conn->tx_queues[index];
e->fd = mq->int_fd;
ar->index = index;
@@ -226,119 +212,81 @@ memif_msg_enq_add_ring (memif_connection_t * c, uint8_t index, uint8_t dir)
ar->flags = (dir == MEMIF_RING_S2M) ? MEMIF_MSG_ADD_RING_FLAG_S2M : 0;
ar->private_hdr_size = 0;
- e->next = NULL;
- if (c->msg_queue == NULL)
- {
- c->msg_queue = e;
- return MEMIF_ERR_SUCCESS; /* 0 */
- }
-
- memif_msg_queue_elt_t *cur = c->msg_queue;
- while (cur->next != NULL)
- {
- cur = cur->next;
- }
- cur->next = e;
-
return MEMIF_ERR_SUCCESS; /* 0 */
}
/* used as connection request from slave */
-static_fn int
-memif_msg_enq_connect (memif_connection_t * c)
+static int
+memif_msg_enq_connect (memif_control_channel_t *cc)
{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
- memif_msg_queue_elt_t *e =
- (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t));
+ memif_msg_queue_elt_t *e = memif_msg_enq (cc);
+
if (e == NULL)
- return memif_syscall_error_handler (errno);
+ return MEMIF_ERR_NOMEM;
- memset (&e->msg, 0, sizeof (e->msg));
memif_msg_connect_t *cm = &e->msg.connect;
e->msg.type = MEMIF_MSG_TYPE_CONNECT;
e->fd = -1;
- strlcpy ((char *) cm->if_name, (char *) c->args.interface_name,
+ strlcpy ((char *) cm->if_name, (char *) cc->conn->args.interface_name,
sizeof (cm->if_name));
- e->next = NULL;
- if (c->msg_queue == NULL)
- {
- c->msg_queue = e;
- return MEMIF_ERR_SUCCESS; /* 0 */
- }
-
- memif_msg_queue_elt_t *cur = c->msg_queue;
- while (cur->next != NULL)
- {
- cur = cur->next;
- }
- cur->next = e;
-
return MEMIF_ERR_SUCCESS; /* 0 */
}
/* used as confirmation of connection by master */
-static_fn int
-memif_msg_enq_connected (memif_connection_t * c)
+static int
+memif_msg_enq_connected (memif_control_channel_t *cc)
{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
- memif_msg_queue_elt_t *e =
- (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t));
+ memif_msg_queue_elt_t *e = memif_msg_enq (cc);
+
if (e == NULL)
- return memif_syscall_error_handler (errno);
+ return MEMIF_ERR_NOMEM;
- memset (&e->msg, 0, sizeof (e->msg));
memif_msg_connected_t *cm = &e->msg.connected;
e->msg.type = MEMIF_MSG_TYPE_CONNECTED;
e->fd = -1;
- strlcpy ((char *) cm->if_name, (char *) c->args.interface_name,
+ strlcpy ((char *) cm->if_name, (char *) cc->conn->args.interface_name,
sizeof (cm->if_name));
- e->next = NULL;
- if (c->msg_queue == NULL)
- {
- c->msg_queue = e;
- return MEMIF_ERR_SUCCESS; /* 0 */
- }
-
- memif_msg_queue_elt_t *cur = c->msg_queue;
- while (cur->next != NULL)
- {
- cur = cur->next;
- }
- cur->next = e;
-
return MEMIF_ERR_SUCCESS; /* 0 */
}
-/* immediately send disconnect msg */
- /* specify protocol for disconnect msg err_code
- so that it will be compatible with VPP? (header/doc) */
int
-memif_msg_send_disconnect (int fd, uint8_t * err_string, uint32_t err_code)
+memif_msg_enq_disconnect (memif_control_channel_t *cc, uint8_t *err_string,
+ uint32_t err_code)
{
- memif_msg_t msg = { 0 };
- memif_msg_disconnect_t *d = &msg.disconnect;
+ memif_msg_queue_elt_t *e;
+
+ e = cc->sock->args.alloc (sizeof (*e));
+ if (e == NULL)
+ return MEMIF_ERR_NOMEM;
- msg.type = MEMIF_MSG_TYPE_DISCONNECT;
+ e->fd = -1;
+ /* Insert disconenct message at the top of the msg queue */
+ TAILQ_INSERT_HEAD (&cc->msg_queue, e, next);
+
+ memif_msg_disconnect_t *d = &e->msg.disconnect;
+
+ e->msg.type = MEMIF_MSG_TYPE_DISCONNECT;
d->code = err_code;
- uint16_t l = strlen ((char *) err_string);
- if (l > sizeof (d->string) - 1)
+ uint16_t l = sizeof (d->string);
+ if (l > 96)
{
DBG ("Disconnect string too long. Sending the first %d characters.",
sizeof (d->string) - 1);
}
strlcpy ((char *) d->string, (char *) err_string, sizeof (d->string));
- return memif_msg_send (fd, &msg, -1);
+ return MEMIF_ERR_SUCCESS;
}
-static_fn int
-memif_msg_receive_hello (memif_connection_t * c, memif_msg_t * msg)
+static int
+memif_msg_parse_hello (memif_control_channel_t *cc, memif_msg_t *msg)
{
memif_msg_hello_t *h = &msg->hello;
+ memif_connection_t *c = cc->conn;
if (msg->hello.min_version > MEMIF_VERSION ||
msg->hello.max_version < MEMIF_VERSION)
@@ -360,139 +308,73 @@ memif_msg_receive_hello (memif_connection_t * c, memif_msg_t * msg)
}
/* handle interface identification (id, secret (optional)) */
-static_fn int
-memif_msg_receive_init (memif_socket_t * ms, int fd, memif_msg_t * msg)
+static int
+memif_msg_parse_init (memif_control_channel_t *cc, memif_msg_t *msg)
{
memif_msg_init_t *i = &msg->init;
- memif_list_elt_t *elt = NULL;
- memif_list_elt_t elt2;
memif_connection_t *c = NULL;
- libmemif_main_t *lm = get_libmemif_main (ms);
uint8_t err_string[96];
memset (err_string, 0, sizeof (char) * 96);
int err = MEMIF_ERR_SUCCESS; /* 0 */
+ /* Check compatible meimf version */
if (i->version != MEMIF_VERSION)
{
DBG ("MEMIF_VER_ERR");
- strncpy ((char *) err_string, MEMIF_VER_ERR, strlen (MEMIF_VER_ERR));
- err = MEMIF_ERR_PROTO;
- goto error;
- }
-
- get_list_elt (&elt, ms->interface_list, ms->interface_list_len, i->id);
- if (elt == NULL)
- {
- DBG ("MEMIF_ID_ERR");
- strncpy ((char *) err_string, MEMIF_ID_ERR, strlen (MEMIF_ID_ERR));
- err = MEMIF_ERR_ID;
- goto error;
- }
-
- c = (memif_connection_t *) elt->data_struct;
-
- if (!(c->args.is_master))
- {
- DBG ("MEMIF_SLAVE_ERR");
- strncpy ((char *) err_string, MEMIF_SLAVE_ERR,
- strlen (MEMIF_SLAVE_ERR));
- err = MEMIF_ERR_ACCSLAVE;
- goto error;
- }
- if (c->fd != -1)
- {
- DBG ("MEMIF_CONN_ERR");
- strncpy ((char *) err_string, MEMIF_CONN_ERR, strlen (MEMIF_CONN_ERR));
- err = MEMIF_ERR_ALRCONN;
- goto error;
- }
-
- c->fd = fd;
-
- if (i->mode != c->args.mode)
- {
- DBG ("MEMIF_MODE_ERR");
- strncpy ((char *) err_string, MEMIF_MODE_ERR, strlen (MEMIF_MODE_ERR));
- err = MEMIF_ERR_MODE;
- goto error;
- }
-
- strlcpy ((char *) c->remote_name, (char *) i->name, sizeof (c->remote_name));
-
- if (strlen ((char *) c->args.secret) > 0)
- {
- int r;
- if (strlen ((char *) i->secret) > 0)
- {
- if (strlen ((char *) c->args.secret) != strlen ((char *) i->secret))
- {
- DBG ("MEMIF_SECRET_ERR");
- strncpy ((char *) err_string,
- MEMIF_SECRET_ERR, strlen (MEMIF_SECRET_ERR));
- err = MEMIF_ERR_SECRET;
- goto error;
- }
- r = strncmp ((char *) i->secret, (char *) c->args.secret,
- strlen ((char *) c->args.secret));
- if (r != 0)
- {
- DBG ("MEMIF_SECRET_ERR");
- strncpy ((char *) err_string,
- MEMIF_SECRET_ERR, strlen (MEMIF_SECRET_ERR));
- err = MEMIF_ERR_SECRET;
- goto error;
- }
- }
- else
- {
- DBG ("MEMIF_NOSECRET_ERR");
- strncpy ((char *) err_string,
- MEMIF_NOSECRET_ERR, strlen (MEMIF_NOSECRET_ERR));
- err = MEMIF_ERR_NOSECRET;
- goto error;
- }
+ memif_msg_enq_disconnect (cc, MEMIF_VER_ERR, 0);
+ return MEMIF_ERR_PROTO;
}
- c->read_fn = memif_conn_fd_read_ready;
- c->write_fn = memif_conn_fd_write_ready;
- c->error_fn = memif_conn_fd_error;
+ /* Find endpoint on the socket */
+ TAILQ_FOREACH (c, &cc->sock->master_interfaces, next)
+ {
+ /* Match interface id */
+ if (c->args.interface_id != i->id)
+ continue;
+ /* If control channel is present, interface is connected (or connecting) */
+ if (c->control_channel != NULL)
+ {
+ memif_msg_enq_disconnect (cc, "Already connected", 0);
+ return MEMIF_ERR_ALRCONN;
+ }
+ /* Verify secret */
+ if (c->args.secret[0] != '\0')
+ {
+ if (strncmp ((char *) c->args.secret, (char *) i->secret, 24) != 0)
+ {
+ memif_msg_enq_disconnect (cc, "Incorrect secret", 0);
+ return MEMIF_ERR_SECRET;
+ }
+ }
+
+ /* Assign the control channel to this interface */
+ c->control_channel = cc;
+ cc->conn = c;
+
+ strlcpy ((char *) c->remote_name, (char *) i->name,
+ sizeof (c->remote_name));
+ }
- elt2.key = c->fd;
- elt2.data_struct = c;
-
- add_list_elt (lm, &elt2, &lm->control_list, &lm->control_list_len);
- free_list_elt (lm->pending_list, lm->pending_list_len, fd);
-
- return err;
-
-error:
- memif_msg_send_disconnect (fd, err_string, 0);
- lm->control_fd_update (fd, MEMIF_FD_EVENT_DEL, lm->private_ctx);
- free_list_elt (lm->pending_list, lm->pending_list_len, fd);
- close (fd);
- fd = -1;
return err;
}
/* receive region information and add new region to connection (if possible) */
-static_fn int
-memif_msg_receive_add_region (memif_connection_t * c, memif_msg_t * msg,
- int fd)
+static int
+memif_msg_parse_add_region (memif_control_channel_t *cc, memif_msg_t *msg,
+ int fd)
{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
-
memif_msg_add_region_t *ar = &msg->add_region;
memif_region_t *mr;
+ memif_connection_t *c = cc->conn;
+
if (fd < 0)
return MEMIF_ERR_NO_SHMFD;
if (ar->index > MEMIF_MAX_REGION)
return MEMIF_ERR_MAXREG;
- mr =
- (memif_region_t *) lm->realloc (c->regions,
- sizeof (memif_region_t) *
- (++c->regions_num));
+ mr = (memif_region_t *) cc->sock->args.realloc (
+ c->regions, sizeof (memif_region_t) * (++c->regions_num));
if (mr == NULL)
return memif_syscall_error_handler (errno);
memset (mr + ar->index, 0, sizeof (memif_region_t));
@@ -500,9 +382,8 @@ memif_msg_receive_add_region (memif_connection_t * c, memif_msg_t * msg,
c->regions[ar->index].fd = fd;
c->regions[ar->index].region_size = ar->size;
c->regions[ar->index].addr = NULL;
-
/* region 0 is never external */
- if (lm->get_external_region_addr && (ar->index != 0))
+ if (cc->sock->get_external_region_addr && (ar->index != 0))
c->regions[ar->index].is_external = 1;
return MEMIF_ERR_SUCCESS; /* 0 */
@@ -510,12 +391,12 @@ memif_msg_receive_add_region (memif_connection_t * c, memif_msg_t * msg,
/* receive ring information and add new ring to connection queue
(based on direction S2M | M2S) */
-static_fn int
-memif_msg_receive_add_ring (memif_connection_t * c, memif_msg_t * msg, int fd)
+static int
+memif_msg_parse_add_ring (memif_control_channel_t *cc, memif_msg_t *msg,
+ int fd)
{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
-
memif_msg_add_ring_t *ar = &msg->add_ring;
+ memif_connection_t *c = cc->conn;
memif_queue_t *mq;
@@ -532,10 +413,8 @@ memif_msg_receive_add_ring (memif_connection_t * c, memif_msg_t * msg, int fd)
if (ar->index >= c->args.num_s2m_rings)
return MEMIF_ERR_MAXRING;
- mq =
- (memif_queue_t *) lm->realloc (c->rx_queues,
- sizeof (memif_queue_t) *
- (++c->rx_queues_num));
+ mq = (memif_queue_t *) cc->sock->args.realloc (
+ c->rx_queues, sizeof (memif_queue_t) * (++c->rx_queues_num));
memset (mq + ar->index, 0, sizeof (memif_queue_t));
if (mq == NULL)
return memif_syscall_error_handler (errno);
@@ -553,10 +432,8 @@ memif_msg_receive_add_ring (memif_connection_t * c, memif_msg_t * msg, int fd)
if (ar->index >= c->args.num_m2s_rings)
return MEMIF_ERR_MAXRING;
- mq =
- (memif_queue_t *) lm->realloc (c->tx_queues,
- sizeof (memif_queue_t) *
- (++c->tx_queues_num));
+ mq = (memif_queue_t *) cc->sock->args.realloc (
+ c->tx_queues, sizeof (memif_queue_t) * (++c->tx_queues_num));
memset (mq + ar->index, 0, sizeof (memif_queue_t));
if (mq == NULL)
return memif_syscall_error_handler (errno);
@@ -571,15 +448,65 @@ memif_msg_receive_add_ring (memif_connection_t * c, memif_msg_t * msg, int fd)
return MEMIF_ERR_SUCCESS; /* 0 */
}
+static int
+memif_configure_rx_interrupt (memif_connection_t *c)
+{
+ memif_socket_t *ms = (memif_socket_t *) c->args.socket;
+ memif_interrupt_t *idata;
+ memif_fd_event_t fde;
+ memif_fd_event_data_t *fdata;
+ void *ctx;
+ int i;
+
+ if (c->on_interrupt != NULL)
+ {
+ for (i = 0; i < c->run_args.num_m2s_rings; i++)
+ {
+ /* Allocate fd event data */
+ fdata = ms->args.alloc (sizeof (*fdata));
+ if (fdata == NULL)
+ {
+ memif_msg_enq_disconnect (c->control_channel, "Internal error",
+ 0);
+ return MEMIF_ERR_NOMEM;
+ }
+ /* Allocate interrupt data */
+ idata = ms->args.alloc (sizeof (*fdata));
+ if (idata == NULL)
+ {
+ ms->args.free (fdata);
+ memif_msg_enq_disconnect (c->control_channel, "Internal error",
+ 0);
+ return MEMIF_ERR_NOMEM;
+ }
+
+ /* configure interrupt data */
+ idata->c = c;
+ idata->qid = i;
+ /* configure fd event data */
+ fdata->event_handler = memif_interrupt_handler;
+ fdata->private_ctx = idata;
+ fde.fd = c->rx_queues[i].int_fd;
+ fde.type = MEMIF_FD_EVENT_READ;
+ fde.private_ctx = fdata;
+
+ /* Start listening for events */
+ ctx = ms->epfd != -1 ? ms : ms->private_ctx;
+ ms->args.on_control_fd_update (fde, ctx);
+ }
+ }
+
+ return MEMIF_ERR_SUCCESS;
+}
+
/* slave -> master */
-static_fn int
-memif_msg_receive_connect (memif_connection_t * c, memif_msg_t * msg)
+static int
+memif_msg_parse_connect (memif_control_channel_t *cc, memif_msg_t *msg)
{
memif_msg_connect_t *cm = &msg->connect;
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
- memif_list_elt_t elt;
-
+ memif_connection_t *c = cc->conn;
int err;
+
err = memif_connect1 (c);
if (err != MEMIF_ERR_SUCCESS)
return err;
@@ -587,21 +514,9 @@ memif_msg_receive_connect (memif_connection_t * c, memif_msg_t * msg)
strlcpy ((char *) c->remote_if_name, (char *) cm->if_name,
sizeof (c->remote_if_name));
- int i;
- if (c->on_interrupt != NULL)
- {
- for (i = 0; i < c->run_args.num_m2s_rings; i++)
- {
- elt.key = c->rx_queues[i].int_fd;
- elt.data_struct = c;
- add_list_elt (lm, &elt, &lm->interrupt_list,
- &lm->interrupt_list_len);
-
- lm->control_fd_update (c->rx_queues[i].int_fd, MEMIF_FD_EVENT_READ,
- lm->private_ctx);
- }
-
- }
+ err = memif_configure_rx_interrupt (c);
+ if (err != MEMIF_ERR_SUCCESS)
+ return err;
c->on_connect ((void *) c, c->private_ctx);
@@ -609,43 +524,38 @@ memif_msg_receive_connect (memif_connection_t * c, memif_msg_t * msg)
}
/* master -> slave */
-static_fn int
-memif_msg_receive_connected (memif_connection_t * c, memif_msg_t * msg)
+static int
+memif_msg_parse_connected (memif_control_channel_t *cc, memif_msg_t *msg)
{
memif_msg_connect_t *cm = &msg->connect;
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
+ memif_connection_t *c = cc->conn;
int err;
err = memif_connect1 (c);
if (err != MEMIF_ERR_SUCCESS)
return err;
- strncpy ((char *) c->remote_if_name, (char *) cm->if_name,
+ strlcpy ((char *) c->remote_if_name, (char *) cm->if_name,
sizeof (c->remote_if_name));
- int i;
- if (c->on_interrupt != NULL)
- {
- for (i = 0; i < c->run_args.num_s2m_rings; i++)
- {
- lm->control_fd_update (c->rx_queues[i].int_fd, MEMIF_FD_EVENT_READ,
- lm->private_ctx);
- }
- }
+ err = memif_configure_rx_interrupt (c);
+ if (err != MEMIF_ERR_SUCCESS)
+ return err;
c->on_connect ((void *) c, c->private_ctx);
return err;
}
-static_fn int
-memif_msg_receive_disconnect (memif_connection_t * c, memif_msg_t * msg)
+static int
+memif_msg_parse_disconnect (memif_control_channel_t *cc, memif_msg_t *msg)
{
memif_msg_disconnect_t *d = &msg->disconnect;
+ memif_connection_t *c = cc->conn;
memset (c->remote_disconnect_string, 0,
sizeof (c->remote_disconnect_string));
- strncpy ((char *) c->remote_disconnect_string, (char *) d->string,
+ strlcpy ((char *) c->remote_disconnect_string, (char *) d->string,
sizeof (c->remote_disconnect_string));
/* on returning error, handle function will call memif_disconnect () */
@@ -654,8 +564,8 @@ memif_msg_receive_disconnect (memif_connection_t * c, memif_msg_t * msg)
return MEMIF_ERR_DISCONNECT;
}
-static_fn int
-memif_msg_receive (libmemif_main_t * lm, int ifd)
+static int
+memif_msg_receive_and_parse (memif_control_channel_t *cc)
{
char ctl[CMSG_SPACE (sizeof (int)) +
CMSG_SPACE (sizeof (struct ucred))] = { 0 };
@@ -666,9 +576,7 @@ memif_msg_receive (libmemif_main_t * lm, int ifd)
int err = MEMIF_ERR_SUCCESS; /* 0 */
int fd = -1;
int i;
- memif_connection_t *c = NULL;
memif_socket_t *ms = NULL;
- memif_list_elt_t *elt = NULL;
iov[0].iov_base = (void *) &msg;
iov[0].iov_len = sizeof (memif_msg_t);
@@ -677,8 +585,8 @@ memif_msg_receive (libmemif_main_t * lm, int ifd)
mh.msg_control = ctl;
mh.msg_controllen = sizeof (ctl);
- DBG ("recvmsg fd %d", ifd);
- size = recvmsg (ifd, &mh, 0);
+ DBG ("recvmsg fd %d", cc->fd);
+ size = recvmsg (cc->fd, &mh, 0);
if (size != sizeof (memif_msg_t))
{
if (size == 0)
@@ -709,93 +617,79 @@ memif_msg_receive (libmemif_main_t * lm, int ifd)
DBG ("Message type %u received", msg.type);
- get_list_elt (&elt, lm->control_list, lm->control_list_len, ifd);
- if (elt != NULL)
- c = (memif_connection_t *) elt->data_struct;
-
switch (msg.type)
{
case MEMIF_MSG_TYPE_ACK:
break;
case MEMIF_MSG_TYPE_HELLO:
- if ((err = memif_msg_receive_hello (c, &msg)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_parse_hello (cc, &msg)) != MEMIF_ERR_SUCCESS)
return err;
- if ((err = memif_init_regions_and_queues (c)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_init_regions_and_queues (cc->conn)) !=
+ MEMIF_ERR_SUCCESS)
return err;
- if ((err = memif_msg_enq_init (c)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_enq_init (cc)) != MEMIF_ERR_SUCCESS)
return err;
- for (i = 0; i < c->regions_num; i++)
+ for (i = 0; i < cc->conn->regions_num; i++)
{
- if ((err = memif_msg_enq_add_region (c, i)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_enq_add_region (cc, i)) != MEMIF_ERR_SUCCESS)
return err;
}
- for (i = 0; i < c->run_args.num_s2m_rings; i++)
+ for (i = 0; i < cc->conn->run_args.num_s2m_rings; i++)
{
- if ((err =
- memif_msg_enq_add_ring (c, i,
- MEMIF_RING_S2M)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_enq_add_ring (cc, i, MEMIF_RING_S2M)) !=
+ MEMIF_ERR_SUCCESS)
return err;
}
- for (i = 0; i < c->run_args.num_m2s_rings; i++)
+ for (i = 0; i < cc->conn->run_args.num_m2s_rings; i++)
{
- if ((err =
- memif_msg_enq_add_ring (c, i,
- MEMIF_RING_M2S)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_enq_add_ring (cc, i, MEMIF_RING_M2S)) !=
+ MEMIF_ERR_SUCCESS)
return err;
}
- if ((err = memif_msg_enq_connect (c)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_enq_connect (cc)) != MEMIF_ERR_SUCCESS)
return err;
break;
case MEMIF_MSG_TYPE_INIT:
- get_list_elt (&elt, lm->pending_list, lm->pending_list_len, ifd);
- if (elt == NULL)
- return -1;
- ms = (memif_socket_t *) elt->data_struct;
- if ((err = memif_msg_receive_init (ms, ifd, &msg)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_parse_init (cc, &msg)) != MEMIF_ERR_SUCCESS)
return err;
/* c->remote_pid = cr->pid */
/* c->remote_uid = cr->uid */
/* c->remote_gid = cr->gid */
- get_list_elt (&elt, lm->control_list, lm->control_list_len, ifd);
- if (elt == NULL)
- return -1;
- c = (memif_connection_t *) elt->data_struct;
- if ((err = memif_msg_enq_ack (c)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_enq_ack (cc)) != MEMIF_ERR_SUCCESS)
return err;
break;
case MEMIF_MSG_TYPE_ADD_REGION:
- if ((err =
- memif_msg_receive_add_region (c, &msg, fd)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_parse_add_region (cc, &msg, fd)) !=
+ MEMIF_ERR_SUCCESS)
return err;
- if ((err = memif_msg_enq_ack (c)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_enq_ack (cc)) != MEMIF_ERR_SUCCESS)
return err;
break;
case MEMIF_MSG_TYPE_ADD_RING:
- if ((err =
- memif_msg_receive_add_ring (c, &msg, fd)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_parse_add_ring (cc, &msg, fd)) != MEMIF_ERR_SUCCESS)
return err;
- if ((err = memif_msg_enq_ack (c)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_enq_ack (cc)) != MEMIF_ERR_SUCCESS)
return err;
break;
case MEMIF_MSG_TYPE_CONNECT:
- if ((err = memif_msg_receive_connect (c, &msg)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_parse_connect (cc, &msg)) != MEMIF_ERR_SUCCESS)
return err;
- if ((err = memif_msg_enq_connected (c)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_enq_connected (cc)) != MEMIF_ERR_SUCCESS)
return err;
break;
case MEMIF_MSG_TYPE_CONNECTED:
- if ((err = memif_msg_receive_connected (c, &msg)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_parse_connected (cc, &msg)) != MEMIF_ERR_SUCCESS)
return err;
break;
case MEMIF_MSG_TYPE_DISCONNECT:
- if ((err = memif_msg_receive_disconnect (c, &msg)) != MEMIF_ERR_SUCCESS)
+ if ((err = memif_msg_parse_disconnect (cc, &msg)) != MEMIF_ERR_SUCCESS)
return err;
break;
@@ -804,101 +698,155 @@ memif_msg_receive (libmemif_main_t * lm, int ifd)
break;
}
- if (c != NULL)
- c->flags |= MEMIF_CONNECTION_FLAG_WRITE;
-
return MEMIF_ERR_SUCCESS; /* 0 */
}
-int
-memif_conn_fd_error (memif_connection_t * c)
+void
+memif_delete_control_channel (memif_control_channel_t *cc)
{
- DBG ("connection fd error");
- strncpy ((char *) c->remote_disconnect_string, "connection fd error", 19);
- int err = memif_disconnect_internal (c);
- return err;
-}
+ memif_msg_queue_elt_t *e, *next;
+ memif_socket_t *ms = cc->sock;
+ memif_fd_event_t fde;
+ void *ctx;
-/* calls memif_msg_receive to handle pending messages on socket */
-int
-memif_conn_fd_read_ready (memif_connection_t * c)
-{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
- int err;
+ fde.fd = cc->fd;
+ fde.type = MEMIF_FD_EVENT_DEL;
+ ctx = ms->epfd != -1 ? ms : ms->private_ctx;
+ cc->sock->args.on_control_fd_update (fde, ctx);
- err = memif_msg_receive (lm, c->fd);
- if (err != 0)
+ if (cc->fd > 0)
+ close (cc->fd);
+
+ /* Clear control message queue */
+ for (e = TAILQ_FIRST (&cc->msg_queue); e != NULL; e = next)
{
- err = memif_disconnect_internal (c);
+ next = TAILQ_NEXT (e, next);
+ TAILQ_REMOVE (&cc->msg_queue, e, next);
+ cc->sock->args.free (e);
}
- return err;
+
+ /* remove reference */
+ if (cc->conn != NULL)
+ cc->conn->control_channel = NULL;
+ cc->conn = NULL;
+ cc->sock->args.free (cc);
+
+ return;
}
-/* get msg from msg queue buffer and send it to socket */
int
-memif_conn_fd_write_ready (memif_connection_t * c)
+memif_control_channel_handler (memif_fd_event_type_t type, void *private_ctx)
{
- libmemif_main_t *lm = get_libmemif_main (c->args.socket);
- int err = MEMIF_ERR_SUCCESS; /* 0 */
-
-
- if ((c->flags & MEMIF_CONNECTION_FLAG_WRITE) == 0)
- goto done;
+ memif_control_channel_t *cc = (memif_control_channel_t *) private_ctx;
+ int err;
- memif_msg_queue_elt_t *e = c->msg_queue;
- if (e == NULL)
- goto done;
+ /* Receive the message, parse the message and
+ * enqueue next message(s).
+ */
+ err = memif_msg_receive_and_parse (cc);
+ /* Can't assign to endpoint */
+ if (cc->conn == NULL)
+ {
+ /* A disconnect message is already in the queue */
+ memif_msg_send_from_queue (cc);
+ memif_delete_control_channel (cc);
- c->msg_queue = c->msg_queue->next;
+ return MEMIF_ERR_SUCCESS;
+ }
+ /* error in memif_msg_receive */
+ if (err != MEMIF_ERR_SUCCESS)
+ goto disconnect;
- c->flags &= ~MEMIF_CONNECTION_FLAG_WRITE;
+ /* Continue connecting, send next message from the queue */
+ err = memif_msg_send_from_queue (cc);
+ if (err != MEMIF_ERR_SUCCESS)
+ goto disconnect;
- err = memif_msg_send (c->fd, &e->msg, e->fd);
- lm->free (e);
- goto done;
+ return MEMIF_ERR_SUCCESS;
-done:
- return err;
+disconnect:
+ memif_disconnect_internal (cc->conn);
+ return MEMIF_ERR_SUCCESS;
}
int
-memif_conn_fd_accept_ready (memif_socket_t * ms)
+memif_listener_handler (memif_fd_event_type_t type, void *private_ctx)
{
- int addr_len;
- struct sockaddr_un client;
- int conn_fd;
- libmemif_main_t *lm = get_libmemif_main (ms);
+ memif_socket_t *ms = (memif_socket_t *) private_ctx;
+ memif_control_channel_t *cc;
+ memif_fd_event_t fde;
+ memif_fd_event_data_t *fdata;
+ struct sockaddr_un un;
+ int err, sockfd, addr_len = sizeof (un);
+ void *ctx;
+
+ if (ms == NULL)
+ return MEMIF_ERR_INVAL_ARG;
+
+ if (type & MEMIF_FD_EVENT_READ)
+ {
+ /* Accept connection to the listener socket */
+ sockfd = accept (ms->listener_fd, (struct sockaddr *) &un,
+ (socklen_t *) &addr_len);
+ if (sockfd < 0)
+ {
+ return memif_syscall_error_handler (errno);
+ }
- DBG ("accept called");
+ /* Create new control channel */
+ cc = ms->args.alloc (sizeof (*cc));
+ if (cc == NULL)
+ {
+ err = MEMIF_ERR_NOMEM;
+ goto error;
+ }
- addr_len = sizeof (client);
- conn_fd =
- accept (ms->fd, (struct sockaddr *) &client, (socklen_t *) & addr_len);
+ cc->fd = sockfd;
+ /* The connection will be assigned after parsing MEMIF_MSG_TYPE_INIT msg
+ */
+ cc->conn = NULL;
+ cc->sock = ms;
+ TAILQ_INIT (&cc->msg_queue);
- if (conn_fd < 0)
- {
- return memif_syscall_error_handler (errno);
- }
- DBG ("accept fd %d", ms->fd);
- DBG ("conn fd %d", conn_fd);
+ /* Create memif fd event */
+ fdata = ms->args.alloc (sizeof (*fdata));
+ if (fdata == NULL)
+ {
+ err = MEMIF_ERR_NOMEM;
+ goto error;
+ }
- memif_list_elt_t elt;
- elt.key = conn_fd;
- elt.data_struct = ms;
+ fdata->event_handler = memif_control_channel_handler;
+ fdata->private_ctx = cc;
- add_list_elt (lm, &elt, &lm->pending_list, &lm->pending_list_len);
- lm->control_fd_update (conn_fd, MEMIF_FD_EVENT_READ | MEMIF_FD_EVENT_WRITE,
- lm->private_ctx);
+ fde.fd = sockfd;
+ fde.type = MEMIF_FD_EVENT_READ;
+ fde.private_ctx = fdata;
- return memif_msg_send_hello (lm, conn_fd);
-}
+ /* Start listenning for events on the new control channel */
+ ctx = ms->epfd != -1 ? ms : ms->private_ctx;
+ ms->args.on_control_fd_update (fde, ctx);
-int
-memif_read_ready (libmemif_main_t * lm, int fd)
-{
- int err;
+ /* enqueue HELLO msg */
+ err = memif_msg_enq_hello (cc);
+ if (err != MEMIF_ERR_SUCCESS)
+ goto error;
+
+ /* send HELLO msg */
+ err = memif_msg_send_from_queue (cc);
+ if (err != MEMIF_ERR_SUCCESS)
+ goto error;
+ }
+
+ return MEMIF_ERR_SUCCESS;
- err = memif_msg_receive (lm, fd);
+error:
+ if (sockfd > 0)
+ close (sockfd);
+ if (cc != NULL)
+ ms->args.free (cc);
+ if (fdata != NULL)
+ ms->args.free (fdata);
return err;
}