diff options
author | Jakub Grajciar <jgrajcia@cisco.com> | 2021-01-04 11:10:42 +0100 |
---|---|---|
committer | Damjan Marion <dmarion@me.com> | 2021-09-27 14:35:45 +0000 |
commit | 134f1e02251b27ad73c3ff4591954c2e0919c984 (patch) | |
tree | 30e84c2bea30ad760a6629a7b1d5843bdd7f8a7c /extras/libmemif/src | |
parent | 45cf1fc3f58ee465d2e7e4689158e79fd706658e (diff) |
libmemif: refactor connection establishment
per_thread_ namespace fuctionality replaced by memif socket.
Interfaces are grouped by memif socket which holds interface database.
Each thread can create it's unique memif socket. The path name
can be equal across threads so that the app only uses one
UNIX socket. In case of listener socket, listener fd
can be obtained and set using APIs.
This change allows:
- No lookup on file descriptor events
- improves interrupt handling
- Loopback support (connect two interfaces in one app)
- usefull for debugging and testing
- Improves code readability by providing control channel
abstraction for each interface and listener sockets
Type: refactor
Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com>
Change-Id: I1b8261042431c0376646ab4c4c831f6e59dd3eed
Diffstat (limited to 'extras/libmemif/src')
-rw-r--r-- | extras/libmemif/src/libmemif.h | 331 | ||||
-rw-r--r-- | extras/libmemif/src/main.c | 1393 | ||||
-rw-r--r-- | extras/libmemif/src/memif_private.h | 134 | ||||
-rw-r--r-- | extras/libmemif/src/socket.c | 818 | ||||
-rw-r--r-- | extras/libmemif/src/socket.h | 56 |
5 files changed, 1160 insertions, 1572 deletions
diff --git a/extras/libmemif/src/libmemif.h b/extras/libmemif/src/libmemif.h index f9e3dcd09b3..bb36c39047e 100644 --- a/extras/libmemif/src/libmemif.h +++ b/extras/libmemif/src/libmemif.h @@ -35,50 +35,51 @@ /*! Error codes */ typedef enum { - MEMIF_ERR_SUCCESS = 0, /*!< success */ -/* SYSCALL ERRORS */ - MEMIF_ERR_SYSCALL, /*!< other syscall error */ - MEMIF_ERR_CONNREFUSED, /*!< connection refused */ - MEMIF_ERR_ACCES, /*!< permission denied */ - MEMIF_ERR_NO_FILE, /*!< file does not exist */ - MEMIF_ERR_FILE_LIMIT, /*!< system open file limit */ - MEMIF_ERR_PROC_FILE_LIMIT, /*!< process open file limit */ - MEMIF_ERR_ALREADY, /*!< connection already requested */ - MEMIF_ERR_AGAIN, /*!< fd is not socket, or operation would block */ - MEMIF_ERR_BAD_FD, /*!< invalid fd */ - MEMIF_ERR_NOMEM, /*!< out of memory */ -/* LIBMEMIF ERRORS */ - MEMIF_ERR_INVAL_ARG, /*!< invalid argument */ - MEMIF_ERR_NOCONN, /*!< handle points to no connection */ - MEMIF_ERR_CONN, /*!< handle points to existing connection */ - MEMIF_ERR_CB_FDUPDATE, /*!< user defined callback memif_control_fd_update_t error */ - MEMIF_ERR_FILE_NOT_SOCK, /*!< file specified by socket filename - exists, but it's not socket */ - MEMIF_ERR_NO_SHMFD, /*!< missing shm fd */ - MEMIF_ERR_COOKIE, /*!< wrong cookie on ring */ - MEMIF_ERR_NOBUF_RING, /*!< ring buffer full */ - MEMIF_ERR_NOBUF, /*!< not enough memif buffers */ - MEMIF_ERR_NOBUF_DET, /*!< memif details needs larger buffer */ - MEMIF_ERR_INT_WRITE, /*!< send interrupt error */ - MEMIF_ERR_MFMSG, /*!< malformed msg received */ - MEMIF_ERR_QID, /*!< invalid queue id */ -/* MEMIF PROTO ERRORS */ - MEMIF_ERR_PROTO, /*!< incompatible protocol version */ - MEMIF_ERR_ID, /*!< unmatched interface id */ - MEMIF_ERR_ACCSLAVE, /*!< slave cannot accept connection requests */ - MEMIF_ERR_ALRCONN, /*!< memif is already connected */ - MEMIF_ERR_MODE, /*!< mode mismatch */ - MEMIF_ERR_SECRET, /*!< secret mismatch */ - MEMIF_ERR_NOSECRET, /*!< secret required */ - MEMIF_ERR_MAXREG, /*!< max region limit reached */ - MEMIF_ERR_MAXRING, /*!< max ring limit reached */ - MEMIF_ERR_NO_INTFD, /*!< missing interrupt fd */ - MEMIF_ERR_DISCONNECT, /*!< disconnect received */ - MEMIF_ERR_DISCONNECTED, /*!< peer interface disconnected */ - MEMIF_ERR_UNKNOWN_MSG, /*!< unknown message type */ - MEMIF_ERR_POLL_CANCEL, /*!< memif_poll_event() was cancelled */ - MEMIF_ERR_MAX_RING, /*!< too large ring size */ - MEMIF_ERR_PRIVHDR, /*!< private hdrs not supported */ + MEMIF_ERR_SUCCESS = 0, /*!< success */ + /* SYSCALL ERRORS */ + MEMIF_ERR_SYSCALL, /*!< other syscall error */ + MEMIF_ERR_CONNREFUSED, /*!< connection refused */ + MEMIF_ERR_ACCES, /*!< permission denied */ + MEMIF_ERR_NO_FILE, /*!< file does not exist */ + MEMIF_ERR_FILE_LIMIT, /*!< system open file limit */ + MEMIF_ERR_PROC_FILE_LIMIT, /*!< process open file limit */ + MEMIF_ERR_ALREADY, /*!< connection already requested */ + MEMIF_ERR_AGAIN, /*!< fd is not socket, or operation would block */ + MEMIF_ERR_BAD_FD, /*!< invalid fd */ + MEMIF_ERR_NOMEM, /*!< out of memory */ + /* LIBMEMIF ERRORS */ + MEMIF_ERR_INVAL_ARG, /*!< invalid argument */ + MEMIF_ERR_NOCONN, /*!< handle points to no connection */ + MEMIF_ERR_CONN, /*!< handle points to existing connection */ + MEMIF_ERR_CB_FDUPDATE, /*!< user defined callback memif_control_fd_update_t + error */ + MEMIF_ERR_FILE_NOT_SOCK, /*!< file specified by socket path + exists, but it's not socket */ + MEMIF_ERR_NO_SHMFD, /*!< missing shm fd */ + MEMIF_ERR_COOKIE, /*!< wrong cookie on ring */ + MEMIF_ERR_NOBUF_RING, /*!< ring buffer full */ + MEMIF_ERR_NOBUF, /*!< not enough memif buffers */ + MEMIF_ERR_NOBUF_DET, /*!< memif details needs larger buffer */ + MEMIF_ERR_INT_WRITE, /*!< send interrupt error */ + MEMIF_ERR_MFMSG, /*!< malformed msg received */ + MEMIF_ERR_QID, /*!< invalid queue id */ + /* MEMIF PROTO ERRORS */ + MEMIF_ERR_PROTO, /*!< incompatible protocol version */ + MEMIF_ERR_ID, /*!< unmatched interface id */ + MEMIF_ERR_ACCSLAVE, /*!< slave cannot accept connection requests */ + MEMIF_ERR_ALRCONN, /*!< memif is already connected */ + MEMIF_ERR_MODE, /*!< mode mismatch */ + MEMIF_ERR_SECRET, /*!< secret mismatch */ + MEMIF_ERR_NOSECRET, /*!< secret required */ + MEMIF_ERR_MAXREG, /*!< max region limit reached */ + MEMIF_ERR_MAXRING, /*!< max ring limit reached */ + MEMIF_ERR_NO_INTFD, /*!< missing interrupt fd */ + MEMIF_ERR_DISCONNECT, /*!< disconnect received */ + MEMIF_ERR_DISCONNECTED, /*!< peer interface disconnected */ + MEMIF_ERR_UNKNOWN_MSG, /*!< unknown message type */ + MEMIF_ERR_POLL_CANCEL, /*!< memif_poll_event() was cancelled */ + MEMIF_ERR_MAX_RING, /*!< too large ring size */ + MEMIF_ERR_PRIVHDR, /*!< private hdrs not supported */ } memif_err_t; /** @@ -87,15 +88,22 @@ typedef enum * @{ */ -/** user needs to set events that occurred on fd and pass them to memif_control_fd_handler */ -#define MEMIF_FD_EVENT_READ (1 << 0) -#define MEMIF_FD_EVENT_WRITE (1 << 1) -/** inform libmemif that error occurred on fd */ -#define MEMIF_FD_EVENT_ERROR (1 << 2) -/** if set, informs that fd is going to be closed (user may want to stop watching for events on this fd) */ -#define MEMIF_FD_EVENT_DEL (1 << 3) -/** update events */ -#define MEMIF_FD_EVENT_MOD (1 << 4) +/** \brief Memif fd events + * User needs to set events that occurred on fd and pass them to + * memif_control_fd_handler + */ +typedef enum memif_fd_event_type +{ + MEMIF_FD_EVENT_READ = 1, /* 00001 */ + MEMIF_FD_EVENT_WRITE = 2, /* 00010 */ + /** inform libmemif that error occurred on fd */ + MEMIF_FD_EVENT_ERROR = 4, /* 00100 */ + /** if set, informs that fd is going to be closed (user may want to stop + watching for events on this fd) */ + MEMIF_FD_EVENT_DEL = 8, /* 01000 */ + /** update events */ + MEMIF_FD_EVENT_MOD = 16 /* 10000 */ +} memif_fd_event_type_t; /** @} */ /** \brief Memif connection handle @@ -138,19 +146,29 @@ typedef void (memif_free_t) (void *ptr); * @{ */ +/** \brief Memif fd event + @param fd - interrupt file descriptor + @param type - memif fd event type + @param private_ctx - private event data +*/ +typedef struct memif_fd_event +{ + int fd; + memif_fd_event_type_t type; + void *private_ctx; +} memif_fd_event_t; + /** \brief Memif control file descriptor update (callback function) - @param fd - new file descriptor to watch - @param events - event type(s) to watch for - @param private_ctx - libmemif main private context. Is NULL for - libmemif main created by memif_init() + @param fde - memif fd event + @param private_ctx - private context of socket this fd belongs to This callback is called when there is new fd to watch for events on - or if fd is about to be closed (user mey want to stop watching for events on this fd). - Private context is taken from libmemif_main, 'private_ctx' passed to memif_per_thread_init() - or NULL in case of memif_init() + or if fd is about to be closed (user mey want to stop watching for events + on this fd). Private context is taken from libmemif_main, 'private_ctx' + passed to memif_per_thread_init() or NULL in case of memif_init() */ -typedef int (memif_control_fd_update_t) (int fd, uint8_t events, +typedef int (memif_control_fd_update_t) (memif_fd_event_t fde, void *private_ctx); /** \brief Memif connection status update (callback function) @@ -170,8 +188,8 @@ typedef int (memif_connection_update_t) (memif_conn_handle_t conn, Called when event is received on interrupt fd. */ -typedef int (memif_interrupt_t) (memif_conn_handle_t conn, void *private_ctx, - uint16_t qid); +typedef int (memif_on_interrupt_t) (memif_conn_handle_t conn, + void *private_ctx, uint16_t qid); /** @} */ @@ -229,10 +247,11 @@ typedef int (memif_del_external_region_t) (void *addr, uint32_t size, int fd, @param dr - delete external region callback @param go - get external buffer offset callback (optional) */ -void memif_register_external_region (memif_add_external_region_t * ar, - memif_get_external_region_addr_t * gr, - memif_del_external_region_t * dr, - memif_get_external_buffer_offset_t * go); +void memif_register_external_region (memif_socket_handle_t sock, + memif_add_external_region_t *ar, + memif_get_external_region_addr_t *gr, + memif_del_external_region_t *dr, + memif_get_external_buffer_offset_t *go); /** \brief Register external region @param pt_main - per thread main handle @@ -270,6 +289,37 @@ typedef enum } memif_interface_mode_t; #endif /* _MEMIF_H_ */ +/** \brief Memif socket arguments + @param path - UNIX socket path, supports abstract socket (have '\0' or '@' + as the first char of the path) + @param app_name - application name + @param connection_request_timer - automaticaly request connection each time + this timer expires, must be non-zero to enable this feature + @param on_control_fd_update - if control fd updates inform user to watch + new fd + @param alloc - custom memory allocator, NULL = default + @param realloc - custom memory reallocation, NULL = default + @param free - custom memory free, NULL = default + + If param on_control_fd_update is set to NULL, + libmemif will handle file descriptor event polling + if a valid callback is set, file descriptor event polling needs to be done + by user application, all file descriptors and event types will be passed in + this callback to user application +*/ +typedef struct memif_socket_args +{ + char path[108]; + char app_name[32]; + + struct itimerspec connection_request_timer; + + memif_control_fd_update_t *on_control_fd_update; + memif_alloc_t *alloc; + memif_realloc_t *realloc; + memif_free_t *free; +} memif_socket_args_t; + /** \brief Memif connection arguments @param socket - Memif socket handle, if NULL default socket will be used. Default socket is only supported in global database (see memif_init). @@ -347,7 +397,7 @@ typedef struct typedef struct { uint8_t region; - uint8_t qid; + uint16_t qid; uint32_t ring_size; /** if set queue is in polling mode, else in interrupt mode */ #define MEMIF_QUEUE_FLAG_POLLING 1 @@ -382,7 +432,7 @@ typedef struct @param secret - secret @param role - 0 = master, 1 = slave @param mode - 0 = ethernet, 1 = ip , 2 = punt/inject - @param socket_filename - socket filename + @param socket_path - socket path @param regions_num - number of regions @param regions - struct containing region details @param rx_queues_num - number of receive queues @@ -403,7 +453,7 @@ typedef struct uint8_t *secret; /* optional */ uint8_t role; /* 0 = master, 1 = slave */ uint8_t mode; /* 0 = ethernet, 1 = ip, 2 = punt/inject */ - uint8_t *socket_filename; + uint8_t *socket_path; uint8_t regions_num; memif_region_details_t *regions; uint8_t rx_queues_num; @@ -429,6 +479,11 @@ typedef struct */ uint16_t memif_get_version (); +/** \brief Get memif version as string + \return major.minor +*/ +const char *memif_get_version_str (); + /** \brief Memif get queue event file descriptor @param conn - memif connection handle @param qid - queue id @@ -469,37 +524,6 @@ char *memif_strerror (int err_code); int memif_get_details (memif_conn_handle_t conn, memif_details_t * md, char *buf, ssize_t buflen); -/** \brief Memif initialization - @param on_control_fd_update - if control fd updates inform user to watch new fd - @param app_name - application name (will be truncated to 32 chars) - @param memif_alloc - custom memory allocator, NULL = default - @param memif_realloc - custom memory reallocation, NULL = default - @param memif_free - custom memory free, NULL = default - - if param on_control_fd_update is set to NULL, - libmemif will handle file descriptor event polling - if a valid callback is set, file descriptor event polling needs to be done by - user application, all file descriptors and event types will be passed in - this callback to user application - - Initialize internal libmemif structures. Create timerfd (used to periodically request connection by - disconnected memifs in slave mode, with no additional API call). This fd is passed to user with memif_control_fd_update_t - timer is inactive at this state. It activates with if there is at least one memif in slave mode. - - \return memif_err_t -*/ -int memif_init (memif_control_fd_update_t * on_control_fd_update, - char *app_name, memif_alloc_t * memif_alloc, - memif_realloc_t * memif_realloc, memif_free_t * memif_free); - -/** \brief Memif cleanup - - Free libmemif internal allocations. - - \return 0 -*/ -int memif_cleanup (); - /** \brief Memory interface create function @param conn - connection handle for client app @param args - memory interface connection arguments @@ -523,30 +547,19 @@ int memif_cleanup (); \return memif_err_t */ -int memif_create (memif_conn_handle_t * conn, memif_conn_args_t * args, - memif_connection_update_t * on_connect, - memif_connection_update_t * on_disconnect, - memif_interrupt_t * on_interrupt, void *private_ctx); +int memif_create (memif_conn_handle_t *conn, memif_conn_args_t *args, + memif_connection_update_t *on_connect, + memif_connection_update_t *on_disconnect, + memif_on_interrupt_t *on_interrupt, void *private_ctx); /** \brief Memif control file descriptor handler - @param fd - file descriptor on which the event occurred + @param ptr - pointer to event data @param events - event type(s) that occurred - If event occurs on any control fd, call memif_control_fd_handler. - Internal - lib will "identify" fd (timerfd, listener, control) and handle event accordingly. - - FD-TYPE - - TIMERFD - - Every disconnected memif in slave mode will request connection. - LISTENER or CONTROL - - Handle socket messaging (internal connection establishment). - INTERRUPT - - Call on_interrupt callback (if set). - \return memif_err_t */ -int memif_control_fd_handler (int fd, uint8_t events); +int memif_control_fd_handler (void *ptr, memif_fd_event_type_t events); /** \brief Memif delete @param conn - pointer to memif connection handle @@ -650,21 +663,19 @@ int memif_rx_burst (memif_conn_handle_t conn, uint16_t qid, memif_buffer_t * bufs, uint16_t count, uint16_t * rx); /** \brief Memif poll event + @param sock - socket to poll events on @param timeout - timeout in seconds Passive event polling - - timeout = 0 - dont wait for event, check event queue if there is an event and return. - timeout = -1 - wait until event + timeout = 0 - dont wait for event, check event queue if there is an event + and return. timeout = -1 - wait until event \return memif_err_t */ -int memif_poll_event (int timeout); - -/** \brief Memif per thread poll event - @param pt_main - per thread main handle - @param timeout - timeout in seconds +int memif_poll_event (memif_socket_handle_t sock, int timeout); /** \brief Send signal to stop concurrently running memif_poll_event(). + @param sock - stop polling on this socket The function, however, does not wait for memif_poll_event() to stop. memif_poll_event() may still return simply because an event has occurred @@ -677,17 +688,7 @@ int memif_poll_event (int timeout); \return memif_err_t */ #define MEMIF_HAVE_CANCEL_POLL_EVENT 1 -int memif_cancel_poll_event (); - -/** \brief Set connection request timer value - @param timer - new timer value - - Timer on which all disconnected slaves request connection. - See system call 'timer_settime' man-page. - - \return memif_err_t -*/ -int memif_set_connection_request_timer (struct itimerspec timer); +int memif_cancel_poll_event (memif_socket_handle_t sock); /** \brief Send connection request @param conn - memif connection handle @@ -700,20 +701,27 @@ int memif_request_connection (memif_conn_handle_t conn); /** \brief Create memif socket @param sock - socket handle for client app - @param filename - path to socket file + @param args - memif socket arguments @param private_ctx - private context The first time an interface is assigned a socket, its type is determined. - For master role it's 'listener', for slave role it's 'client'. Each interface - requires socket of its respective type. Default socket is created if no - socket handle is passed to memif_create(). It's private context is NULL. - If all interfaces using this socket are deleted, the socket returns - to its default state. + For master role it's 'listener', for slave role it's 'client'. Each + interface requires socket of its respective type. Default socket is created + if no socket handle is passed to memif_create(). It's private context is + NULL. If all interfaces using this socket are deleted, the socket returns to + its default state. \return memif_err_t */ -int memif_create_socket (memif_socket_handle_t * sock, const char *filename, - void *private_ctx); +int memif_create_socket (memif_socket_handle_t *sock, + memif_socket_args_t *args, void *private_ctx); + +/** \brief Get memif socket handle from connection + @param conn - memif connection handle + + \return memif_socket_handle_t +*/ +memif_socket_handle_t memif_get_socket_handle (memif_conn_handle_t conn); /** \brief Delete memif socket @param sock - socket handle for client app @@ -725,15 +733,42 @@ int memif_create_socket (memif_socket_handle_t * sock, const char *filename, */ int memif_delete_socket (memif_socket_handle_t * sock); -/** \brief Get socket filename +/** \brief Get socket path @param sock - socket handle for client app - Return constant pointer to socket filename. + Return constant pointer to socket path. \return const char * */ -const char *memif_get_socket_filename (memif_socket_handle_t sock); +const char *memif_get_socket_path (memif_socket_handle_t sock); + +/** \brief Get listener file descriptor + @param sock - memif socket handle + + \return listener fd +*/ +int memif_get_listener_fd (memif_socket_handle_t sock); + +/** \brief Set listener file descriptor + @param sock - memif socket handle + @param if - file descriptor + + \return memif_err_t +*/ +int memif_set_listener_fd (memif_socket_handle_t sock, int fd); + +/** \brief Set connection request timer value + @param sock - memif socket handle + @param timer - new timer value + + Timer on which all disconnected slaves request connection. + If the timer doesn't exist (timerspec is 0) create new timer. + See system call 'timer_settime' man-page. + \return memif_err_t +*/ +int memif_set_connection_request_timer (memif_socket_handle_t sock, + struct itimerspec timer); /** @} */ -#endif /* _LIBMEMIF_H_ */ +#endif /* _LIBMEMIF_H_ */
\ No newline at end of file diff --git a/extras/libmemif/src/main.c b/extras/libmemif/src/main.c index 967e033cda3..6a8e8b942dd 100644 --- a/extras/libmemif/src/main.c +++ b/extras/libmemif/src/main.c @@ -65,8 +65,6 @@ #define MEMIF_MEMORY_BARRIER() __sync_synchronize () #endif /* __x86_x64__ */ -libmemif_main_t libmemif_main; - static char memif_buf[MAX_ERRBUF_LEN]; const char *memif_errlist[ERRLIST_LEN] = { /* MEMIF_ERR_SUCCESS */ @@ -163,6 +161,7 @@ memif_strerror (int err_code) else { strlcpy (memif_buf, memif_errlist[err_code], sizeof (memif_buf)); + memif_buf[strlen (memif_errlist[err_code])] = '\0'; } return memif_buf; } @@ -173,6 +172,16 @@ memif_get_version () return MEMIF_VERSION; } +const char * +memif_get_version_str () +{ +#define __STR_HELPER(x) #x +#define __STR(x) __STR_HELPER (x) + return __STR (MEMIF_VERSION_MAJOR) "." __STR (MEMIF_VERSION_MINOR); +#undef __STR +#undef __STR_HELPER +} + #define DBG_TX_BUF (0) #define DBG_RX_BUF (1) @@ -231,398 +240,128 @@ memif_syscall_error_handler (int err_code) return MEMIF_ERR_SYSCALL; } -/* Always valid */ -libmemif_main_t * -get_libmemif_main (memif_socket_t * ms) -{ - if (ms != NULL && ms->lm != NULL) - return ms->lm; - return &libmemif_main; -} - static int -memif_add_epoll_fd (libmemif_main_t * lm, int fd, uint32_t events) +memif_add_epoll_fd (memif_socket_t *ms, memif_fd_event_t fde, uint32_t events) { - if (fd < 0) + if (fde.fd < 0) { - DBG ("invalid fd %d", fd); + DBG ("invalid fd %d", fde.fd); return -1; } struct epoll_event evt; memset (&evt, 0, sizeof (evt)); evt.events = events; - evt.data.fd = fd; - if (epoll_ctl (lm->epfd, EPOLL_CTL_ADD, fd, &evt) < 0) + evt.data.ptr = fde.private_ctx; + if (epoll_ctl (ms->epfd, EPOLL_CTL_ADD, fde.fd, &evt) < 0) { - DBG ("epoll_ctl: %s fd %d", strerror (errno), fd); + DBG ("epoll_ctl: %s fd %d", strerror (errno), fde.fd); return -1; } - DBG ("fd %d added to epoll", fd); + DBG ("fd %d added to epoll", fde.fd); return 0; } static int -memif_mod_epoll_fd (libmemif_main_t * lm, int fd, uint32_t events) +memif_mod_epoll_fd (memif_socket_t *ms, memif_fd_event_t fde, uint32_t events) { - if (fd < 0) + if (fde.fd < 0) { - DBG ("invalid fd %d", fd); + DBG ("invalid fd %d", fde.fd); return -1; } struct epoll_event evt; memset (&evt, 0, sizeof (evt)); evt.events = events; - evt.data.fd = fd; - if (epoll_ctl (lm->epfd, EPOLL_CTL_MOD, fd, &evt) < 0) + evt.data.ptr = fde.private_ctx; + if (epoll_ctl (ms->epfd, EPOLL_CTL_MOD, fde.fd, &evt) < 0) { - DBG ("epoll_ctl: %s fd %d", strerror (errno), fd); + DBG ("epoll_ctl: %s fd %d", strerror (errno), fde.fd); return -1; } - DBG ("fd %d modified on epoll", fd); + DBG ("fd %d modified on epoll", fde.fd); return 0; } static int -memif_del_epoll_fd (libmemif_main_t * lm, int fd) +memif_del_epoll_fd (memif_socket_t *ms, memif_fd_event_t fde) { - if (fd < 0) + if (fde.fd < 0) { - DBG ("invalid fd %d", fd); + DBG ("invalid fd %d", fde.fd); return -1; } struct epoll_event evt; memset (&evt, 0, sizeof (evt)); - if (epoll_ctl (lm->epfd, EPOLL_CTL_DEL, fd, &evt) < 0) + if (epoll_ctl (ms->epfd, EPOLL_CTL_DEL, fde.fd, &evt) < 0) { - DBG ("epoll_ctl: %s fd %d", strerror (errno), fd); + DBG ("epoll_ctl: %s fd %d", strerror (errno), fde.fd); return -1; } - DBG ("fd %d removed from epoll", fd); + DBG ("fd %d removed from epoll", fde.fd); return 0; } int -memif_control_fd_update (int fd, uint8_t events, void *private_ctx) +memif_control_fd_update (memif_fd_event_t fde, void *private_ctx) { - libmemif_main_t *lm; + memif_socket_t *ms = (memif_socket_t *) private_ctx; + int fd; - lm = (private_ctx == NULL) ? &libmemif_main : (libmemif_main_t *) private_ctx; + if (ms == NULL) + return MEMIF_ERR_INVAL_ARG; - if (events & MEMIF_FD_EVENT_DEL) - return memif_del_epoll_fd (lm, fd); + if (fde.type & MEMIF_FD_EVENT_DEL) + return memif_del_epoll_fd (ms, fde); uint32_t evt = 0; - if (events & MEMIF_FD_EVENT_READ) + if (fde.type & MEMIF_FD_EVENT_READ) evt |= EPOLLIN; - if (events & MEMIF_FD_EVENT_WRITE) + if (fde.type & MEMIF_FD_EVENT_WRITE) evt |= EPOLLOUT; - if (events & MEMIF_FD_EVENT_MOD) - return memif_mod_epoll_fd (lm, fd, evt); + if (fde.type & MEMIF_FD_EVENT_MOD) + return memif_mod_epoll_fd (ms, fde, evt); - return memif_add_epoll_fd (lm, fd, evt); -} - -int -add_list_elt (libmemif_main_t * lm, memif_list_elt_t * e, - memif_list_elt_t ** list, uint16_t * len) -{ - memif_list_elt_t *tmp; - int i; - - for (i = 0; i < *len; i++) - { - if ((*list)[i].data_struct == NULL) - { - (*list)[i].key = e->key; - (*list)[i].data_struct = e->data_struct; - return i; - } - } - - tmp = lm->realloc (*list, sizeof (memif_list_elt_t) * *len * 2); - if (tmp == NULL) - return -1; - - for (i = *len; i < *len * 2; i++) - { - tmp[i].key = -1; - tmp[i].data_struct = NULL; - } - - tmp[*len].key = e->key; - tmp[*len].data_struct = e->data_struct; - i = *len; - *len = *len * 2; - *list = tmp; - - return i; -} - -int -get_list_elt (memif_list_elt_t ** e, memif_list_elt_t * list, uint16_t len, - int key) -{ - int i; - if (key == -1) - { - *e = NULL; - return -1; - } - - for (i = 0; i < len; i++) - { - if (list[i].key == key) - { - *e = &list[i]; - return 0; - } - } - *e = NULL; - return -1; -} - -/* does not free memory, only marks element as free */ -int -free_list_elt (memif_list_elt_t * list, uint16_t len, int key) -{ - int i; - for (i = 0; i < len; i++) - { - if (list[i].key == key) - { - list[i].key = -1; - list[i].data_struct = NULL; - return 0; - } - } - - return -1; -} - -int -free_list_elt_ctx (memif_list_elt_t * list, uint16_t len, - memif_connection_t * ctx) -{ - int i; - for (i = 0; i < len; i++) - { - if (list[i].key == -1) - { - if (list[i].data_struct == ctx) - { - list[i].data_struct = NULL; - return 0; - } - } - } - - return -1; + return memif_add_epoll_fd (ms, fde, evt); } static void -memif_control_fd_update_register (libmemif_main_t * lm, - memif_control_fd_update_t * cb) +memif_control_fd_update_register (memif_socket_t *ms, + memif_control_fd_update_t *cb) { - lm->control_fd_update = cb; + ms->args.on_control_fd_update = cb; } void -memif_register_external_region (memif_add_external_region_t * ar, - memif_get_external_region_addr_t * gr, - memif_del_external_region_t * dr, - memif_get_external_buffer_offset_t * go) +memif_register_external_region (memif_socket_handle_t sock, + memif_add_external_region_t *ar, + memif_get_external_region_addr_t *gr, + memif_del_external_region_t *dr, + memif_get_external_buffer_offset_t *go) { - libmemif_main_t *lm = &libmemif_main; - lm->add_external_region = ar; - lm->get_external_region_addr = gr; - lm->del_external_region = dr; - lm->get_external_buffer_offset = go; + memif_socket_t *ms = (memif_socket_t *) sock; + ms->add_external_region = ar; + ms->get_external_region_addr = gr; + ms->del_external_region = dr; + ms->get_external_buffer_offset = go; } static void -memif_alloc_register (libmemif_main_t * lm, memif_alloc_t * ma) +memif_alloc_register (memif_socket_t *ms, memif_alloc_t *ma) { - lm->alloc = ma; + ms->args.alloc = ma; } static void -memif_realloc_register (libmemif_main_t * lm, memif_realloc_t * mr) +memif_realloc_register (memif_socket_t *ms, memif_realloc_t *mr) { - lm->realloc = mr; + ms->args.realloc = mr; } static void -memif_free_register (libmemif_main_t * lm, memif_free_t * mf) +memif_free_register (memif_socket_t *ms, memif_free_t *mf) { - lm->free = mf; -} - -int -memif_set_connection_request_timer (struct itimerspec timer) -{ - libmemif_main_t *lm = &libmemif_main; - int err = MEMIF_ERR_SUCCESS; - - lm->arm = timer; - - /* overwrite timer, if already armed */ - if (lm->disconn_slaves != 0) - { - if (timerfd_settime (lm->timerfd, 0, &lm->arm, NULL) < 0) - { - err = memif_syscall_error_handler (errno); - } - } - return err; -} - -int -memif_init (memif_control_fd_update_t * on_control_fd_update, char *app_name, - memif_alloc_t * memif_alloc, memif_realloc_t * memif_realloc, - memif_free_t * memif_free) -{ - int err = MEMIF_ERR_SUCCESS; /* 0 */ - libmemif_main_t *lm = &libmemif_main; - memset (lm, 0, sizeof (libmemif_main_t)); - - /* register custom memory management */ - if (memif_alloc != NULL) - { - memif_alloc_register (lm, memif_alloc); - } - else - memif_alloc_register (lm, malloc); - - if (memif_realloc != NULL) - { - memif_realloc_register (lm, memif_realloc); - } - else - memif_realloc_register (lm, realloc); - - if (memif_free != NULL) - memif_free_register (lm, memif_free); - else - memif_free_register (lm, free); - - if (app_name != NULL) - { - strlcpy ((char *) lm->app_name, app_name, sizeof (lm->app_name)); - } - else - { - strlcpy ((char *) lm->app_name, MEMIF_DEFAULT_APP_NAME, - sizeof (lm->app_name)); - } - - lm->poll_cancel_fd = -1; - /* register control fd update callback */ - if (on_control_fd_update != NULL) - memif_control_fd_update_register (lm, on_control_fd_update); - else - { - lm->epfd = epoll_create (1); - memif_control_fd_update_register (lm, memif_control_fd_update); - if ((lm->poll_cancel_fd = eventfd (0, EFD_NONBLOCK)) < 0) - { - err = errno; - DBG ("eventfd: %s", strerror (err)); - return memif_syscall_error_handler (err); - } - lm->control_fd_update (lm->poll_cancel_fd, MEMIF_FD_EVENT_READ, lm->private_ctx); - DBG ("libmemif event polling initialized"); - } - - lm->control_list_len = 2; - lm->interrupt_list_len = 2; - lm->socket_list_len = 1; - lm->pending_list_len = 1; - - lm->control_list = - lm->alloc (sizeof (memif_list_elt_t) * lm->control_list_len); - if (lm->control_list == NULL) - { - err = MEMIF_ERR_NOMEM; - goto error; - } - lm->interrupt_list = - lm->alloc (sizeof (memif_list_elt_t) * lm->interrupt_list_len); - if (lm->interrupt_list == NULL) - { - err = MEMIF_ERR_NOMEM; - goto error; - } - lm->socket_list = - lm->alloc (sizeof (memif_list_elt_t) * lm->socket_list_len); - if (lm->socket_list == NULL) - { - err = MEMIF_ERR_NOMEM; - goto error; - } - lm->pending_list = - lm->alloc (sizeof (memif_list_elt_t) * lm->pending_list_len); - if (lm->pending_list == NULL) - { - err = MEMIF_ERR_NOMEM; - goto error; - } - - int i; - for (i = 0; i < lm->control_list_len; i++) - { - lm->control_list[i].key = -1; - lm->control_list[i].data_struct = NULL; - } - for (i = 0; i < lm->interrupt_list_len; i++) - { - lm->interrupt_list[i].key = -1; - lm->interrupt_list[i].data_struct = NULL; - } - for (i = 0; i < lm->socket_list_len; i++) - { - lm->socket_list[i].key = -1; - lm->socket_list[i].data_struct = NULL; - } - for (i = 0; i < lm->pending_list_len; i++) - { - lm->pending_list[i].key = -1; - lm->pending_list[i].data_struct = NULL; - } - - lm->disconn_slaves = 0; - - lm->timerfd = timerfd_create (CLOCK_REALTIME, TFD_NONBLOCK); - if (lm->timerfd < 0) - { - err = memif_syscall_error_handler (errno); - goto error; - } - - lm->arm.it_value.tv_sec = MEMIF_DEFAULT_RECONNECT_PERIOD_SEC; - lm->arm.it_value.tv_nsec = MEMIF_DEFAULT_RECONNECT_PERIOD_NSEC; - lm->arm.it_interval.tv_sec = MEMIF_DEFAULT_RECONNECT_PERIOD_SEC; - lm->arm.it_interval.tv_nsec = MEMIF_DEFAULT_RECONNECT_PERIOD_NSEC; - - if (lm->control_fd_update (lm->timerfd, MEMIF_FD_EVENT_READ, lm->private_ctx) < 0) - { - DBG ("callback type memif_control_fd_update_t error!"); - err = MEMIF_ERR_CB_FDUPDATE; - goto error; - } - - /* Create default socket */ - err = memif_create_socket ((memif_socket_handle_t *) & - lm->default_socket, - MEMIF_DEFAULT_SOCKET_PATH, NULL); - if (err != MEMIF_ERR_SUCCESS) - goto error; - - return err; - -error: - memif_cleanup (); - return err; + ms->args.free = mf; } static inline memif_ring_t * @@ -658,122 +397,156 @@ memif_set_rx_mode (memif_conn_handle_t c, memif_rx_mode_t rx_mode, return MEMIF_ERR_SUCCESS; } -static int -memif_socket_start_listening (memif_socket_t * ms) +int +memif_poll_cancel_handler (memif_fd_event_type_t type, void *private_ctx) { - libmemif_main_t *lm = get_libmemif_main (ms); - memif_list_elt_t elt; - struct stat file_stat; - struct sockaddr_un un = { 0 }; - int on = 1; - int err = MEMIF_ERR_SUCCESS; + return MEMIF_ERR_POLL_CANCEL; +} + +int +memif_connect_handler (memif_fd_event_type_t type, void *private_ctx) +{ + memif_socket_t *ms = (memif_socket_t *) private_ctx; + memif_connection_t *c; - if (ms->type == MEMIF_SOCKET_TYPE_CLIENT) + /* loop ms->slave_interfaces and request connection for disconnected ones */ + TAILQ_FOREACH (c, &ms->slave_interfaces, next) + { + /* connected or connecting */ + if (c->control_channel != NULL) + continue; + + /* ignore errors */ + memif_request_connection (c); + } + + return MEMIF_ERR_SUCCESS; +} + +int +memif_set_connection_request_timer (memif_socket_handle_t sock, + struct itimerspec timer) +{ + memif_socket_t *ms = (memif_socket_t *) sock; + memif_fd_event_t fde; + memif_fd_event_data_t *fdata; + void *ctx; + + if (ms == NULL) return MEMIF_ERR_INVAL_ARG; - /* check if file exists */ - if (stat ((char *) ms->filename, &file_stat) == 0) + if (ms->timer_fd < 0) { - if (S_ISSOCK (file_stat.st_mode)) - unlink ((char *) ms->filename); - else + /* only create new timer if there is a valid interval */ + if (timer.it_interval.tv_sec == 0 && timer.it_interval.tv_nsec == 0) + return MEMIF_ERR_SUCCESS; + + /* create timerfd */ + ms->timer_fd = timerfd_create (CLOCK_REALTIME, TFD_NONBLOCK); + if (ms->timer_fd < 0) return memif_syscall_error_handler (errno); - } - ms->fd = socket (AF_UNIX, SOCK_SEQPACKET, 0); - if (ms->fd < 0) - { - err = memif_syscall_error_handler (errno); - goto error; - } + /* start listening for events */ + fdata = ms->args.alloc (sizeof (*fdata)); + fdata->event_handler = memif_connect_handler; + fdata->private_ctx = ms; - DBG ("socket %d created", ms->fd); - un.sun_family = AF_UNIX; - strlcpy ((char *) un.sun_path, (char *) ms->filename, sizeof (un.sun_path)); - if (setsockopt (ms->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on)) < 0) - { - err = memif_syscall_error_handler (errno); - goto error; - } - if (bind (ms->fd, (struct sockaddr *) &un, sizeof (un)) < 0) - { - err = memif_syscall_error_handler (errno); - goto error; - } - if (listen (ms->fd, 1) < 0) - { - err = memif_syscall_error_handler (errno); - goto error; - } - if (stat ((char *) ms->filename, &file_stat) < 0) - { - err = memif_syscall_error_handler (errno); - goto error; - } + fde.fd = ms->timer_fd; + fde.type = MEMIF_FD_EVENT_READ; + fde.private_ctx = fdata; - /* add socket to libmemif main */ - elt.key = ms->fd; - elt.data_struct = ms; - add_list_elt (lm, &elt, &lm->socket_list, &lm->socket_list_len); - /* if lm->private_ctx == lm event polling is done by libmemif */ - lm->control_fd_update (ms->fd, MEMIF_FD_EVENT_READ, lm->private_ctx); + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + ms->args.on_control_fd_update (fde, ctx); + } - ms->type = MEMIF_SOCKET_TYPE_LISTENER; + ms->args.connection_request_timer = timer; - return err; + /* arm the timer */ + if (timerfd_settime (ms->timer_fd, 0, &ms->args.connection_request_timer, + NULL) < 0) + return memif_syscall_error_handler (errno); -error: - if (ms->fd > 0) - { - close (ms->fd); - ms->fd = -1; - } - return err; + return MEMIF_ERR_SUCCESS; } int -memif_create_socket (memif_socket_handle_t * sock, const char *filename, +memif_create_socket (memif_socket_handle_t *sock, memif_socket_args_t *args, void *private_ctx) { - libmemif_main_t *lm = &libmemif_main; memif_socket_t *ms = (memif_socket_t *) * sock; + memif_fd_event_t fde; + memif_fd_event_data_t *fdata; int i, err = MEMIF_ERR_SUCCESS; - - for (i = 0; i < lm->socket_list_len; i++) - { - if ((ms = (memif_socket_t *) lm->socket_list[i].data_struct) != NULL) - { - if (strncmp ((char *) ms->filename, filename, - strlen ((char *) ms->filename)) == 0) - return MEMIF_ERR_INVAL_ARG; - } - } + void *ctx; /* allocate memif_socket_t */ ms = NULL; - ms = lm->alloc (sizeof (memif_socket_t)); + if (args->alloc != NULL) + ms = args->alloc (sizeof (memif_socket_t)); + else + ms = malloc (sizeof (memif_socket_t)); if (ms == NULL) { err = MEMIF_ERR_NOMEM; goto error; } + + /* default values */ memset (ms, 0, sizeof (memif_socket_t)); - /* set filename */ - memset (ms->filename, 0, sizeof (ms->filename)); - strlcpy ((char *) ms->filename, filename, sizeof (ms->filename)); + ms->epfd = -1; + ms->listener_fd = -1; + ms->poll_cancel_fd = -1; + ms->timer_fd = -1; + + /* copy arguments to internal struct */ + memcpy (&ms->args, args, sizeof (*args)); + /* Handle abstract socket by converting '@' -> '\0' */ + if (ms->args.path[0] == '@') + ms->args.path[0] = '\0'; + ms->private_ctx = private_ctx; + + if (ms->args.alloc == NULL) + memif_alloc_register (ms, malloc); + if (ms->args.realloc == NULL) + memif_realloc_register (ms, realloc); + if (ms->args.free == NULL) + memif_free_register (ms, free); + + TAILQ_INIT (&ms->master_interfaces); + TAILQ_INIT (&ms->slave_interfaces); + + /* FIXME: implement connection request timer */ + + /* initialize internal epoll */ + if (ms->args.on_control_fd_update == NULL) + { + ms->epfd = epoll_create (1); + /* register default fd update callback */ + memif_control_fd_update_register (ms, memif_control_fd_update); + ms->poll_cancel_fd = eventfd (0, EFD_NONBLOCK); + if (ms->poll_cancel_fd < 0) + { + err = errno; + DBG ("eventfd: %s", strerror (err)); + return memif_syscall_error_handler (err); + } + /* add interrupt fd to epfd */ + fdata = ms->args.alloc (sizeof (*fdata)); + fdata->event_handler = memif_poll_cancel_handler; + fdata->private_ctx = ms; - ms->type = MEMIF_SOCKET_TYPE_NONE; + fde.fd = ms->poll_cancel_fd; + fde.type = MEMIF_FD_EVENT_READ; + fde.private_ctx = fdata; - ms->interface_list_len = 1; - ms->interface_list = - lm->alloc (sizeof (memif_list_elt_t) * ms->interface_list_len); - if (ms->interface_list == NULL) - { - err = MEMIF_ERR_NOMEM; - goto error; + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + ms->args.on_control_fd_update (fde, ctx); } - ms->interface_list[0].key = -1; - ms->interface_list[0].data_struct = NULL; + + err = + memif_set_connection_request_timer (ms, ms->args.connection_request_timer); + if (err != MEMIF_ERR_SUCCESS) + goto error; *sock = ms; @@ -782,35 +555,86 @@ memif_create_socket (memif_socket_handle_t * sock, const char *filename, error: if (ms != NULL) { - if (ms->fd > 0) - { - close (ms->fd); - ms->fd = -1; - } - if (ms->interface_list != NULL) - { - lm->free (ms->interface_list); - ms->interface_list = NULL; - ms->interface_list_len = 0; - } - lm->free (ms); - *sock = ms = NULL; + ms->args.free (ms); + if (ms->epfd != -1) + close (ms->epfd); + if (ms->poll_cancel_fd != -1) + close (ms->poll_cancel_fd); } return err; } +memif_socket_handle_t +memif_get_socket_handle (memif_conn_handle_t conn) +{ + memif_connection_t *c = (memif_connection_t *) conn; + + if (c == NULL) + return NULL; + + return c->args.socket; +} + +const char * +memif_get_socket_path (memif_socket_handle_t sock) +{ + memif_socket_t *ms = (memif_socket_t *) sock; + + if (ms == NULL) + return NULL; + + return ms->args.path; +} + +int +memif_get_listener_fd (memif_socket_handle_t sock) +{ + memif_socket_t *ms = (memif_socket_t *) sock; + + if (ms == NULL) + return -1; + + return ms->listener_fd; +} int -memif_create (memif_conn_handle_t * c, memif_conn_args_t * args, - memif_connection_update_t * on_connect, - memif_connection_update_t * on_disconnect, - memif_interrupt_t * on_interrupt, void *private_ctx) +memif_set_listener_fd (memif_socket_handle_t sock, int fd) +{ + memif_socket_t *ms = (memif_socket_t *) sock; + memif_fd_event_t fde; + memif_fd_event_data_t *fdata; + void *ctx; + + if ((ms == NULL) || (fd < 0)) + return MEMIF_ERR_INVAL_ARG; + + fdata = ms->args.alloc (sizeof (*fdata)); + if (fdata == NULL) + return MEMIF_ERR_NOMEM; + + ms->listener_fd = fd; + + fdata->event_handler = memif_listener_handler; + fdata->private_ctx = ms; + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + /* send fd to epoll */ + fde.fd = ms->listener_fd; + fde.type = MEMIF_FD_EVENT_READ; + fde.private_ctx = fdata; + ms->args.on_control_fd_update (fde, ctx); + + return MEMIF_ERR_SUCCESS; +} + +int +memif_create (memif_conn_handle_t *c, memif_conn_args_t *args, + memif_connection_update_t *on_connect, + memif_connection_update_t *on_disconnect, + memif_on_interrupt_t *on_interrupt, void *private_ctx) { - libmemif_main_t *lm = get_libmemif_main (args->socket); int err, index = 0; - memif_list_elt_t elt; memif_connection_t *conn = (memif_connection_t *) * c; - memif_socket_t *ms; + memif_socket_t *ms = (memif_socket_t *) args->socket; if (conn != NULL) { @@ -818,7 +642,13 @@ memif_create (memif_conn_handle_t * c, memif_conn_args_t * args, return MEMIF_ERR_CONN; } - conn = (memif_connection_t *) lm->alloc (sizeof (memif_connection_t)); + if (ms == NULL) + { + DBG ("Missing memif socket"); + return MEMIF_ERR_INVAL_ARG; + } + + conn = (memif_connection_t *) ms->args.alloc (sizeof (*conn)); if (conn == NULL) { err = MEMIF_ERR_NOMEM; @@ -848,92 +678,47 @@ memif_create (memif_conn_handle_t * c, memif_conn_args_t * args, conn->args.log2_ring_size = args->log2_ring_size; conn->args.is_master = args->is_master; conn->args.mode = args->mode; - conn->msg_queue = NULL; + conn->args.socket = args->socket; conn->regions = NULL; conn->tx_queues = NULL; conn->rx_queues = NULL; - conn->fd = -1; + conn->control_channel = NULL; conn->on_connect = on_connect; conn->on_disconnect = on_disconnect; conn->on_interrupt = on_interrupt; conn->private_ctx = private_ctx; memset (&conn->run_args, 0, sizeof (memif_conn_run_args_t)); + uint8_t l = sizeof (conn->args.interface_name); strlcpy ((char *) conn->args.interface_name, (char *) args->interface_name, - sizeof (conn->args.interface_name)); + l); - if ((strlen ((char *) args->secret)) > 0) + if ((l = strlen ((char *) args->secret)) > 0) strlcpy ((char *) conn->args.secret, (char *) args->secret, sizeof (conn->args.secret)); - if (args->socket != NULL) - conn->args.socket = args->socket; - else if (lm->default_socket != NULL) - conn->args.socket = lm->default_socket; + if (args->is_master) + TAILQ_INSERT_TAIL (&ms->master_interfaces, conn, next); else - { - err = MEMIF_ERR_INVAL_ARG; - goto error; - } + TAILQ_INSERT_TAIL (&ms->slave_interfaces, conn, next); - ms = (memif_socket_t *) conn->args.socket; - - if ((conn->args.is_master && ms->type == MEMIF_SOCKET_TYPE_CLIENT) || - (!conn->args.is_master && ms->type == MEMIF_SOCKET_TYPE_LISTENER)) + err = memif_request_connection (conn); + if (err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_CONNREFUSED) { - err = MEMIF_ERR_INVAL_ARG; + if (args->is_master) + TAILQ_REMOVE (&ms->master_interfaces, conn, next); + else + TAILQ_REMOVE (&ms->slave_interfaces, conn, next); goto error; } - elt.key = conn->args.interface_id; - elt.data_struct = conn; - add_list_elt (lm, &elt, &ms->interface_list, &ms->interface_list_len); - ms->use_count++; - - if (conn->args.is_master) - { - if (ms->type == MEMIF_SOCKET_TYPE_NONE) - { - err = memif_socket_start_listening (ms); - if (err != MEMIF_ERR_SUCCESS) - goto error; - } - } - else - { - elt.key = -1; - elt.data_struct = conn; - if ((index = - add_list_elt (lm, &elt, &lm->control_list, - &lm->control_list_len)) < 0) - { - err = MEMIF_ERR_NOMEM; - goto error; - } - - conn->index = index; - - /* try connecting to master */ - err = memif_request_connection (conn); - if ((err != MEMIF_ERR_SUCCESS) && (lm->disconn_slaves == 0)) - { - /* connection failed, arm reconnect timer (if not armed) */ - if (timerfd_settime (lm->timerfd, 0, &lm->arm, NULL) < 0) - { - err = memif_syscall_error_handler (errno); - goto error; - } - } - lm->disconn_slaves++; - } - *c = conn; return 0; error: if (conn != NULL) - lm->free (conn); + ms->args.free (conn); *c = conn = NULL; return err; } @@ -942,23 +727,29 @@ int memif_request_connection (memif_conn_handle_t c) { memif_connection_t *conn = (memif_connection_t *) c; - libmemif_main_t *lm; memif_socket_t *ms; int err = MEMIF_ERR_SUCCESS; int sockfd = -1; - struct sockaddr_un sun; + struct sockaddr_un un = { 0 }; + struct stat file_stat; + int on = 1; + memif_control_channel_t *cc = NULL; + memif_fd_event_t fde; + memif_fd_event_data_t *fdata = NULL; + void *ctx; if (conn == NULL) return MEMIF_ERR_NOCONN; ms = (memif_socket_t *) conn->args.socket; - lm = get_libmemif_main (ms); - - if (conn->args.is_master || ms->type == MEMIF_SOCKET_TYPE_LISTENER) - return MEMIF_ERR_INVAL_ARG; - if (conn->fd > 0) + /* if control channel is assigned, the interface is either connected or + * connecting */ + if (conn->control_channel != NULL) return MEMIF_ERR_ALRCONN; + /* if interface is master and the socket is already listener we are done */ + if (conn->args.is_master && (ms->listener_fd != -1)) + return MEMIF_ERR_SUCCESS; sockfd = socket (AF_UNIX, SOCK_SEQPACKET, 0); if (sockfd < 0) @@ -967,41 +758,90 @@ memif_request_connection (memif_conn_handle_t c) goto error; } - sun.sun_family = AF_UNIX; + un.sun_family = AF_UNIX; - strlcpy (sun.sun_path, (char *) ms->filename, sizeof (sun.sun_path)); + /* use memcpy to support abstract socket + * ms->args.path is already a valid socket path + */ + memcpy (un.sun_path, ms->args.path, sizeof (un.sun_path) - 1); - if (connect (sockfd, (struct sockaddr *) &sun, - sizeof (struct sockaddr_un)) == 0) + /* allocate fd event data */ + fdata = ms->args.alloc (sizeof (*fdata)); + if (fdata == NULL) { - conn->fd = sockfd; - conn->read_fn = memif_conn_fd_read_ready; - conn->write_fn = memif_conn_fd_write_ready; - conn->error_fn = memif_conn_fd_error; - - lm->control_list[conn->index].key = conn->fd; - lm->control_fd_update (sockfd, - MEMIF_FD_EVENT_READ | - MEMIF_FD_EVENT_WRITE, lm->private_ctx); - - lm->disconn_slaves--; - if (lm->disconn_slaves == 0) + err = MEMIF_ERR_NOMEM; + goto error; + } + + if (conn->args.is_master != 0) + { + /* Configure socket optins */ + if (setsockopt (sockfd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on)) < 0) + { + err = memif_syscall_error_handler (errno); + goto error; + } + if (bind (sockfd, (struct sockaddr *) &un, sizeof (un)) < 0) + { + err = memif_syscall_error_handler (errno); + goto error; + } + if (listen (sockfd, 1) < 0) + { + err = memif_syscall_error_handler (errno); + goto error; + } + if (ms->args.path[0] != '\0') { - if (timerfd_settime (lm->timerfd, 0, &lm->disarm, NULL) < 0) + /* Verify that the socket was created */ + if (stat ((char *) ms->args.path, &file_stat) < 0) { err = memif_syscall_error_handler (errno); - return err; + goto error; } } + + /* assign listener fd */ + ms->listener_fd = sockfd; + + fdata->event_handler = memif_listener_handler; + fdata->private_ctx = ms; } else { - err = memif_syscall_error_handler (errno); - strcpy ((char *) conn->remote_disconnect_string, memif_strerror (err)); - goto error; + cc = ms->args.alloc (sizeof (*cc)); + if (cc == NULL) + { + err = MEMIF_ERR_NOMEM; + goto error; + } + if (connect (sockfd, (struct sockaddr *) &un, + sizeof (struct sockaddr_un)) != 0) + { + err = MEMIF_ERR_CONNREFUSED; + goto error; + } + + /* Create control channel */ + cc->fd = sockfd; + cc->sock = ms; + cc->conn = conn; + TAILQ_INIT (&cc->msg_queue); + + /* assign control channel to endpoint */ + conn->control_channel = cc; + + fdata->event_handler = memif_control_channel_handler; + fdata->private_ctx = cc; } - ms->type = MEMIF_SOCKET_TYPE_CLIENT; + /* if event polling is done internally, send memif socket as context */ + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + /* send fd to epoll */ + fde.fd = sockfd; + fde.type = MEMIF_FD_EVENT_READ; + fde.private_ctx = fdata; + ms->args.on_control_fd_update (fde, ctx); return err; @@ -1009,136 +849,55 @@ error: if (sockfd > 0) close (sockfd); sockfd = -1; + if (fdata != NULL) + ms->args.free (fdata); + fdata = NULL; + if (cc != NULL) + ms->args.free (cc); + conn->control_channel = cc = NULL; return err; } int -memif_control_fd_handler (int fd, uint8_t events) +memif_control_fd_handler (void *ptr, memif_fd_event_type_t events) { - int i, err = MEMIF_ERR_SUCCESS; /* 0 */ - uint16_t num; - memif_list_elt_t *e = NULL; - memif_connection_t *conn; - libmemif_main_t *lm = &libmemif_main; - if (fd == lm->timerfd) - { - uint64_t b; - ssize_t size; - size = read (fd, &b, sizeof (b)); + memif_fd_event_data_t *fdata = (memif_fd_event_data_t *) ptr; - if (size == -1) - goto error; - - for (i = 0; i < lm->control_list_len; i++) - { - if ((lm->control_list[i].key < 0) - && (lm->control_list[i].data_struct != NULL)) - { - conn = lm->control_list[i].data_struct; - if (conn->args.is_master) - continue; - err = memif_request_connection (conn); - if (err != MEMIF_ERR_SUCCESS) - DBG ("memif_request_connection: %s", memif_strerror (err)); - } - } - } - else - { - get_list_elt (&e, lm->interrupt_list, lm->interrupt_list_len, fd); - if (e != NULL) - { - if (((memif_connection_t *) e->data_struct)->on_interrupt != NULL) - { - num = - (((memif_connection_t *) e->data_struct)-> - args.is_master) ? ((memif_connection_t *) e-> - data_struct)->run_args. - num_s2m_rings : ((memif_connection_t *) e->data_struct)-> - run_args.num_m2s_rings; - for (i = 0; i < num; i++) - { - if (((memif_connection_t *) e->data_struct)-> - rx_queues[i].int_fd == fd) - { - ((memif_connection_t *) e->data_struct)-> - on_interrupt ((void *) e->data_struct, - ((memif_connection_t *) e-> - data_struct)->private_ctx, i); - return MEMIF_ERR_SUCCESS; - } - } - } - return MEMIF_ERR_SUCCESS; - } - get_list_elt (&e, lm->socket_list, lm->socket_list_len, fd); - if (e != NULL - && ((memif_socket_t *) e->data_struct)->type == - MEMIF_SOCKET_TYPE_LISTENER) - { - err = - memif_conn_fd_accept_ready ((memif_socket_t *) e->data_struct); - return err; - } + if (fdata == NULL) + return MEMIF_ERR_INVAL_ARG; - get_list_elt (&e, lm->pending_list, lm->pending_list_len, fd); - if (e != NULL) - { - err = memif_read_ready (lm, fd); - return err; - } + return fdata->event_handler (events, fdata->private_ctx); +} - get_list_elt (&e, lm->control_list, lm->control_list_len, fd); - if (e != NULL) - { - if (events & MEMIF_FD_EVENT_READ) - { - err = - ((memif_connection_t *) e->data_struct)-> - read_fn (e->data_struct); - if (err != MEMIF_ERR_SUCCESS) - return err; - } - if (events & MEMIF_FD_EVENT_WRITE) - { - err = - ((memif_connection_t *) e->data_struct)-> - write_fn (e->data_struct); - if (err != MEMIF_ERR_SUCCESS) - return err; - } - if (events & MEMIF_FD_EVENT_ERROR) - { - err = - ((memif_connection_t *) e->data_struct)-> - error_fn (e->data_struct); - if (err != MEMIF_ERR_SUCCESS) - return err; - } - } - } +int +memif_interrupt_handler (memif_fd_event_type_t type, void *private_ctx) +{ + memif_interrupt_t *idata = (memif_interrupt_t *) private_ctx; - return MEMIF_ERR_SUCCESS; /* 0 */ + if (idata == NULL) + return MEMIF_ERR_INVAL_ARG; -error: - return err; + return idata->c->on_interrupt (idata->c, idata->c->private_ctx, idata->qid); } - int -memif_poll_event (int timeout) +memif_poll_event (memif_socket_handle_t sock, int timeout) { - libmemif_main_t *lm = &libmemif_main; + memif_socket_t *ms = (memif_socket_t *) sock; struct epoll_event evt; int en = 0, err = MEMIF_ERR_SUCCESS; /* 0 */ - uint32_t events = 0; + memif_fd_event_type_t events = 0; uint64_t counter = 0; ssize_t r = 0; + sigset_t sigset; + + if (ms == NULL) + return MEMIF_ERR_INVAL_ARG; + memset (&evt, 0, sizeof (evt)); evt.events = EPOLLIN | EPOLLOUT; - sigset_t sigset; sigemptyset (&sigset); - en = epoll_pwait (lm->epfd, &evt, 1, timeout, &sigset); + en = epoll_pwait (ms->epfd, &evt, 1, timeout, &sigset); if (en < 0) { err = errno; @@ -1147,51 +906,58 @@ memif_poll_event (int timeout) } if (en > 0) { - if (evt.data.fd == lm->poll_cancel_fd) - { - r = read (evt.data.fd, &counter, sizeof (counter)); - if (r == -1) - return MEMIF_ERR_DISCONNECTED; - - return MEMIF_ERR_POLL_CANCEL; - } if (evt.events & EPOLLIN) events |= MEMIF_FD_EVENT_READ; if (evt.events & EPOLLOUT) events |= MEMIF_FD_EVENT_WRITE; if (evt.events & EPOLLERR) events |= MEMIF_FD_EVENT_ERROR; - err = memif_control_fd_handler (evt.data.fd, events); - return err; + return memif_control_fd_handler (evt.data.ptr, events); } - return 0; + return MEMIF_ERR_SUCCESS; } int -memif_cancel_poll_event () +memif_cancel_poll_event (memif_socket_handle_t sock) { - libmemif_main_t *lm = &libmemif_main; + memif_socket_t *ms = (memif_socket_t *) sock; uint64_t counter = 1; ssize_t w = 0; - if (lm->poll_cancel_fd == -1) - return 0; - w = write (lm->poll_cancel_fd, &counter, sizeof (counter)); + if (ms->poll_cancel_fd == -1) + return MEMIF_ERR_INVAL_ARG; + w = write (ms->poll_cancel_fd, &counter, sizeof (counter)); if (w < sizeof (counter)) return MEMIF_ERR_INT_WRITE; - return 0; + return MEMIF_ERR_SUCCESS; } -static void -memif_msg_queue_free (libmemif_main_t * lm, memif_msg_queue_elt_t ** e) +void +memif_close_queues (memif_socket_t *ms, memif_queue_t *queues, int nqueues) { - if (*e == NULL) - return; - memif_msg_queue_free (lm, &(*e)->next); - lm->free (*e); - *e = NULL; - return; + memif_fd_event_t fde; + memif_queue_t *mq; + void *ctx; + + int i; + for (i = 0; i < nqueues; i++) + { + mq = &queues[i]; + if (mq != NULL) + { + if (mq->int_fd > 0) + { + /* Stop listening for events */ + fde.fd = mq->int_fd; + fde.type = MEMIF_FD_EVENT_DEL; + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + ms->args.on_control_fd_update (fde, ctx); + close (mq->int_fd); + } + mq->int_fd = -1; + } + } } /* send disconnect msg and close interface */ @@ -1200,85 +966,43 @@ memif_disconnect_internal (memif_connection_t * c) { int err = MEMIF_ERR_SUCCESS, i; /* 0 */ memif_queue_t *mq; - libmemif_main_t *lm; - memif_list_elt_t *e; - - if (c == NULL) - { - DBG ("no connection"); - return MEMIF_ERR_NOCONN; - } - - lm = get_libmemif_main (c->args.socket); + memif_socket_t *ms = (memif_socket_t *) c->args.socket; + memif_fd_event_t fde; + void *ctx; c->on_disconnect ((void *) c, c->private_ctx); - if (c->fd > 0) - { - memif_msg_send_disconnect (c->fd, (uint8_t *) "interface deleted", 0); - lm->control_fd_update (c->fd, MEMIF_FD_EVENT_DEL, lm->private_ctx); - close (c->fd); - } - get_list_elt (&e, lm->control_list, lm->control_list_len, c->fd); - if (e != NULL) - { - if (c->args.is_master) - free_list_elt (lm->control_list, lm->control_list_len, c->fd); - e->key = c->fd = -1; - } + /* Delete control channel */ + if (c->control_channel != NULL) + memif_delete_control_channel (c->control_channel); if (c->tx_queues != NULL) { - for (i = 0; i < c->tx_queues_num; i++) - { - mq = &c->tx_queues[i]; - if (mq != NULL) - { - if (mq->int_fd > 0) - close (mq->int_fd); - free_list_elt (lm->interrupt_list, lm->interrupt_list_len, - mq->int_fd); - mq->int_fd = -1; - } - } - lm->free (c->tx_queues); + memif_close_queues (ms, c->tx_queues, c->tx_queues_num); + ms->args.free (c->tx_queues); c->tx_queues = NULL; } c->tx_queues_num = 0; if (c->rx_queues != NULL) { - for (i = 0; i < c->rx_queues_num; i++) - { - mq = &c->rx_queues[i]; - if (mq != NULL) - { - if (mq->int_fd > 0) - { - if (c->on_interrupt != NULL) - lm->control_fd_update (mq->int_fd, MEMIF_FD_EVENT_DEL, - lm->private_ctx); - close (mq->int_fd); - } - free_list_elt (lm->interrupt_list, lm->interrupt_list_len, - mq->int_fd); - mq->int_fd = -1; - } - } - lm->free (c->rx_queues); + memif_close_queues (ms, c->rx_queues, c->rx_queues_num); + ms->args.free (c->rx_queues); c->rx_queues = NULL; } c->rx_queues_num = 0; + /* TODO: Slave reuse regions */ + for (i = 0; i < c->regions_num; i++) { if (&c->regions[i] == NULL) continue; if (c->regions[i].is_external != 0) { - lm->del_external_region (c->regions[i].addr, - c->regions[i].region_size, - c->regions[i].fd, c->private_ctx); + ms->del_external_region (c->regions[i].addr, + c->regions[i].region_size, c->regions[i].fd, + c->private_ctx); } else { @@ -1289,27 +1013,12 @@ memif_disconnect_internal (memif_connection_t * c) c->regions[i].fd = -1; } } - lm->free (c->regions); + ms->args.free (c->regions); c->regions = NULL; c->regions_num = 0; memset (&c->run_args, 0, sizeof (memif_conn_run_args_t)); - memif_msg_queue_free (lm, &c->msg_queue); - - if (!(c->args.is_master)) - { - if (lm->disconn_slaves == 0) - { - if (timerfd_settime (lm->timerfd, 0, &lm->arm, NULL) < 0) - { - err = memif_syscall_error_handler (errno); - DBG ("timerfd_settime: arm"); - } - } - lm->disconn_slaves++; - } - return err; } @@ -1321,24 +1030,44 @@ memif_get_socket_filename (memif_socket_handle_t sock) if (ms == NULL) return NULL; - return (char *) ms->filename; + return (char *) ms->args.path; } int memif_delete_socket (memif_socket_handle_t * sock) { memif_socket_t *ms = (memif_socket_t *) * sock; - libmemif_main_t *lm; + memif_fd_event_t fde; + void *ctx; /* check if socket is in use */ - if (ms == NULL || ms->use_count > 0) + if (ms == NULL || !TAILQ_EMPTY (&ms->master_interfaces) || + !TAILQ_EMPTY (&ms->slave_interfaces)) return MEMIF_ERR_INVAL_ARG; - lm = get_libmemif_main (ms); + if (ms->listener_fd > 0) + { + fde.fd = ms->listener_fd; + fde.type = MEMIF_FD_EVENT_DEL; + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + ms->args.on_control_fd_update (fde, ctx); + } + ms->listener_fd = -1; + + if (ms->poll_cancel_fd > 0) + { + fde.fd = ms->poll_cancel_fd; + fde.type = MEMIF_FD_EVENT_DEL; + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + ms->args.on_control_fd_update (fde, ctx); + } + ms->poll_cancel_fd = -1; + + if (ms->epfd > 0) + close (ms->epfd); + ms->epfd = -1; - lm->free (ms->interface_list); - ms->interface_list = NULL; - lm->free (ms); + ms->args.free (ms); *sock = ms = NULL; return MEMIF_ERR_SUCCESS; @@ -1348,8 +1077,7 @@ int memif_delete (memif_conn_handle_t * conn) { memif_connection_t *c = (memif_connection_t *) * conn; - libmemif_main_t *lm; - memif_socket_t *ms = NULL; + memif_socket_t *ms; int err = MEMIF_ERR_SUCCESS; if (c == NULL) @@ -1358,50 +1086,17 @@ memif_delete (memif_conn_handle_t * conn) return MEMIF_ERR_NOCONN; } - if (c->fd > 0) - { - DBG ("DISCONNECTING"); - err = memif_disconnect_internal (c); - if (err == MEMIF_ERR_NOCONN) - return err; - } - - lm = get_libmemif_main (c->args.socket); - - free_list_elt_ctx (lm->control_list, lm->control_list_len, c); + err = memif_disconnect_internal (c); ms = (memif_socket_t *) c->args.socket; - ms->use_count--; - free_list_elt (ms->interface_list, ms->interface_list_len, - c->args.interface_id); - if (ms->use_count <= 0) - { - /* stop listening on this socket */ - if (ms->type == MEMIF_SOCKET_TYPE_LISTENER) - { - lm->control_fd_update (ms->fd, MEMIF_FD_EVENT_DEL, lm->private_ctx); - free_list_elt (lm->socket_list, lm->socket_list_len, ms->fd); - close (ms->fd); - ms->fd = -1; - } - /* socket not in use */ - ms->type = MEMIF_SOCKET_TYPE_NONE; - } - if (!c->args.is_master) - { - lm->disconn_slaves--; - if (lm->disconn_slaves <= 0) - { - if (timerfd_settime (lm->timerfd, 0, &lm->disarm, NULL) < 0) - { - err = memif_syscall_error_handler (errno); - DBG ("timerfd_settime: disarm"); - } - } - } + if (c->args.is_master) + TAILQ_REMOVE (&ms->master_interfaces, c, next); + else + TAILQ_REMOVE (&ms->slave_interfaces, c, next); + /* TODO: don't listen with empty interface queue */ - lm->free (c); + ms->args.free (c); c = NULL; *conn = c; @@ -1411,7 +1106,7 @@ memif_delete (memif_conn_handle_t * conn) int memif_connect1 (memif_connection_t * c) { - libmemif_main_t *lm; + memif_socket_t *ms; memif_region_t *mr; memif_queue_t *mq; int i; @@ -1419,7 +1114,7 @@ memif_connect1 (memif_connection_t * c) if (c == NULL) return MEMIF_ERR_INVAL_ARG; - lm = get_libmemif_main (c->args.socket); + ms = (memif_socket_t *) c->args.socket; for (i = 0; i < c->regions_num; i++) { @@ -1430,11 +1125,10 @@ memif_connect1 (memif_connection_t * c) { if (mr->is_external) { - if (lm->get_external_region_addr == NULL) + if (ms->get_external_region_addr == NULL) return MEMIF_ERR_INVAL_ARG; - mr->addr = - lm->get_external_region_addr (mr->region_size, mr->fd, - c->private_ctx); + mr->addr = ms->get_external_region_addr ( + mr->region_size, mr->fd, c->private_ctx); } else { @@ -1442,8 +1136,8 @@ memif_connect1 (memif_connection_t * c) return MEMIF_ERR_NO_SHMFD; if ((mr->addr = - mmap (NULL, mr->region_size, PROT_READ | PROT_WRITE, - MAP_SHARED, mr->fd, 0)) == MAP_FAILED) + mmap (NULL, mr->region_size, PROT_READ | PROT_WRITE, + MAP_SHARED, mr->fd, 0)) == MAP_FAILED) { return memif_syscall_error_handler (errno); } @@ -1482,21 +1176,17 @@ memif_connect1 (memif_connection_t * c) } } - lm->control_fd_update (c->fd, MEMIF_FD_EVENT_READ | MEMIF_FD_EVENT_MOD, - lm->private_ctx); - return 0; } static inline int -memif_add_region (libmemif_main_t * lm, memif_connection_t * conn, - uint8_t has_buffers) +memif_add_region (memif_connection_t *conn, uint8_t has_buffers) { memif_region_t *r; + memif_socket_t *ms = (memif_socket_t *) conn->args.socket; - r = - lm->realloc (conn->regions, - sizeof (memif_region_t) * ++conn->regions_num); + r = ms->args.realloc (conn->regions, + sizeof (memif_region_t) * ++conn->regions_num); if (r == NULL) return MEMIF_ERR_NOMEM; @@ -1539,10 +1229,11 @@ memif_add_region (libmemif_main_t * lm, memif_connection_t * conn, } static inline int -memif_init_queues (libmemif_main_t * lm, memif_connection_t * conn) +memif_init_queues (memif_connection_t *conn) { int i, j; memif_ring_t *ring; + memif_socket_t *ms = (memif_socket_t *) conn->args.socket; for (i = 0; i < conn->run_args.num_s2m_rings; i++) { @@ -1580,23 +1271,17 @@ memif_init_queues (libmemif_main_t * lm, memif_connection_t * conn) } } memif_queue_t *mq; - DBG ("alloc: %p", lm->alloc); - DBG ("size: %lu", sizeof (memif_queue_t) * conn->run_args.num_s2m_rings); - mq = - (memif_queue_t *) lm->alloc (sizeof (memif_queue_t) * - conn->run_args.num_s2m_rings); + mq = (memif_queue_t *) ms->args.alloc (sizeof (memif_queue_t) * + conn->run_args.num_s2m_rings); if (mq == NULL) return MEMIF_ERR_NOMEM; int x; - memif_list_elt_t e; + for (x = 0; x < conn->run_args.num_s2m_rings; x++) { if ((mq[x].int_fd = eventfd (0, EFD_NONBLOCK)) < 0) return memif_syscall_error_handler (errno); - e.key = mq[x].int_fd; - e.data_struct = conn; - add_list_elt (lm, &e, &lm->interrupt_list, &lm->interrupt_list_len); mq[x].ring = memif_get_ring (conn, MEMIF_RING_S2M, x); DBG ("RING: %p I: %d", mq[x].ring, x); @@ -1610,9 +1295,8 @@ memif_init_queues (libmemif_main_t * lm, memif_connection_t * conn) conn->tx_queues = mq; conn->tx_queues_num = conn->run_args.num_s2m_rings; - mq = - (memif_queue_t *) lm->alloc (sizeof (memif_queue_t) * - conn->run_args.num_m2s_rings); + mq = (memif_queue_t *) ms->args.alloc (sizeof (memif_queue_t) * + conn->run_args.num_m2s_rings); if (mq == NULL) return MEMIF_ERR_NOMEM; @@ -1620,9 +1304,6 @@ memif_init_queues (libmemif_main_t * lm, memif_connection_t * conn) { if ((mq[x].int_fd = eventfd (0, EFD_NONBLOCK)) < 0) return memif_syscall_error_handler (errno); - e.key = mq[x].int_fd; - e.data_struct = conn; - add_list_elt (lm, &e, &lm->interrupt_list, &lm->interrupt_list_len); mq[x].ring = memif_get_ring (conn, MEMIF_RING_M2S, x); DBG ("RING: %p I: %d", mq[x].ring, x); @@ -1643,23 +1324,16 @@ int memif_init_regions_and_queues (memif_connection_t * conn) { memif_region_t *r; - libmemif_main_t *lm; - - if (conn == NULL) - return MEMIF_ERR_INVAL_ARG; - - lm = get_libmemif_main (conn->args.socket); + memif_socket_t *ms = (memif_socket_t *) conn->args.socket; /* region 0. rings */ - memif_add_region (lm, conn, /* has_buffers */ 0); + memif_add_region (conn, /* has_buffers */ 0); /* region 1. buffers */ - if (lm->add_external_region) + if (ms->add_external_region) { - r = - (memif_region_t *) lm->realloc (conn->regions, - sizeof (memif_region_t) * - ++conn->regions_num); + r = (memif_region_t *) ms->args.realloc ( + conn->regions, sizeof (memif_region_t) * ++conn->regions_num); if (r == NULL) return MEMIF_ERR_NOMEM; conn->regions = r; @@ -1668,17 +1342,17 @@ memif_init_regions_and_queues (memif_connection_t * conn) conn->run_args.buffer_size * (1 << conn->run_args.log2_ring_size) * (conn->run_args.num_s2m_rings + conn->run_args.num_m2s_rings); conn->regions[1].buffer_offset = 0; - lm->add_external_region (&conn->regions[1].addr, + ms->add_external_region (&conn->regions[1].addr, conn->regions[1].region_size, &conn->regions[1].fd, conn->private_ctx); conn->regions[1].is_external = 1; } else { - memif_add_region (lm, conn, 1); + memif_add_region (conn, 1); } - memif_init_queues (lm, conn); + memif_init_queues (conn); return 0; } @@ -1772,7 +1446,7 @@ memif_buffer_enq_tx (memif_conn_handle_t conn, uint16_t qid, memif_connection_t *c = (memif_connection_t *) conn; if (EXPECT_FALSE (c == NULL)) return MEMIF_ERR_NOCONN; - if (EXPECT_FALSE (c->fd < 0)) + if (EXPECT_FALSE (c->control_channel == NULL)) return MEMIF_ERR_DISCONNECTED; if (EXPECT_FALSE (qid >= c->tx_queues_num)) return MEMIF_ERR_QID; @@ -1833,7 +1507,7 @@ memif_buffer_alloc (memif_conn_handle_t conn, uint16_t qid, memif_connection_t *c = (memif_connection_t *) conn; if (EXPECT_FALSE (c == NULL)) return MEMIF_ERR_NOCONN; - if (EXPECT_FALSE (c->fd < 0)) + if (EXPECT_FALSE (c->control_channel == NULL)) return MEMIF_ERR_DISCONNECTED; uint8_t num = (c->args.is_master) ? c->run_args.num_m2s_rings : c-> @@ -1843,7 +1517,7 @@ memif_buffer_alloc (memif_conn_handle_t conn, uint16_t qid, if (EXPECT_FALSE (!count_out)) return MEMIF_ERR_INVAL_ARG; - libmemif_main_t *lm = get_libmemif_main (c->args.socket); + memif_socket_t *ms = (memif_socket_t *) c->args.socket; memif_queue_t *mq = &c->tx_queues[qid]; memif_ring_t *ring = mq->ring; memif_buffer_t *b0; @@ -1855,6 +1529,7 @@ memif_buffer_alloc (memif_conn_handle_t conn, uint16_t qid, uint16_t dst_left, src_left; uint16_t saved_count; uint16_t saved_next_buf; + uint16_t slot; memif_buffer_t *saved_b; *count_out = 0; @@ -1917,9 +1592,9 @@ memif_buffer_alloc (memif_conn_handle_t conn, uint16_t qid, /* slave resets buffer offset */ if (c->args.is_master == 0) { - memif_desc_t *d = &ring->desc[mq->next_buf & mask]; - if (lm->get_external_buffer_offset) - d->offset = lm->get_external_buffer_offset (c->private_ctx); + memif_desc_t *d = &ring->desc[slot & mask]; + if (ms->get_external_buffer_offset) + d->offset = ms->get_external_buffer_offset (c->private_ctx); else d->offset = d->offset - (d->offset & offset_mask); } @@ -1956,14 +1631,14 @@ memif_refill_queue (memif_conn_handle_t conn, uint16_t qid, uint16_t count, memif_connection_t *c = (memif_connection_t *) conn; if (EXPECT_FALSE (c == NULL)) return MEMIF_ERR_NOCONN; - if (EXPECT_FALSE (c->fd < 0)) + if (EXPECT_FALSE (c->control_channel == NULL)) return MEMIF_ERR_DISCONNECTED; uint8_t num = (c->args.is_master) ? c->run_args.num_s2m_rings : c-> run_args.num_m2s_rings; if (EXPECT_FALSE (qid >= num)) return MEMIF_ERR_QID; - libmemif_main_t *lm = get_libmemif_main (c->args.socket); + memif_socket_t *ms = (memif_socket_t *) c->args.socket; memif_queue_t *mq = &c->rx_queues[qid]; memif_ring_t *ring = mq->ring; uint16_t mask = (1 << mq->log2_ring_size) - 1; @@ -1990,8 +1665,8 @@ memif_refill_queue (memif_conn_handle_t conn, uint16_t qid, uint16_t count, d = &ring->desc[slot & mask]; d->region = 1; d->length = c->run_args.buffer_size - headroom; - if (lm->get_external_buffer_offset) - d->offset = lm->get_external_buffer_offset (c->private_ctx); + if (ms->get_external_buffer_offset) + d->offset = ms->get_external_buffer_offset (c->private_ctx); else d->offset = d->offset - (d->offset & offset_mask) + headroom; slot++; @@ -2011,7 +1686,7 @@ memif_tx_burst (memif_conn_handle_t conn, uint16_t qid, memif_connection_t *c = (memif_connection_t *) conn; if (EXPECT_FALSE (c == NULL)) return MEMIF_ERR_NOCONN; - if (EXPECT_FALSE (c->fd < 0)) + if (EXPECT_FALSE (c->control_channel == NULL)) return MEMIF_ERR_DISCONNECTED; uint8_t num = (c->args.is_master) ? c->run_args.num_m2s_rings : c-> @@ -2112,7 +1787,7 @@ memif_rx_burst (memif_conn_handle_t conn, uint16_t qid, memif_connection_t *c = (memif_connection_t *) conn; if (EXPECT_FALSE (c == NULL)) return MEMIF_ERR_NOCONN; - if (EXPECT_FALSE (c->fd < 0)) + if (EXPECT_FALSE (c->control_channel == NULL)) return MEMIF_ERR_DISCONNECTED; uint8_t num = (c->args.is_master) ? c->run_args.num_s2m_rings : c-> @@ -2202,7 +1877,6 @@ memif_get_details (memif_conn_handle_t conn, memif_details_t * md, char *buf, ssize_t buflen) { memif_connection_t *c = (memif_connection_t *) conn; - libmemif_main_t *lm; memif_socket_t *ms; int err = MEMIF_ERR_SUCCESS, i; ssize_t l0 = 0, l1; @@ -2211,7 +1885,6 @@ memif_get_details (memif_conn_handle_t conn, memif_details_t * md, return MEMIF_ERR_NOCONN; ms = (memif_socket_t *) c->args.socket; - lm = get_libmemif_main (ms); l1 = strlen ((char *) c->args.interface_name); if (l0 + l1 < buflen) @@ -2223,10 +1896,11 @@ memif_get_details (memif_conn_handle_t conn, memif_details_t * md, else err = MEMIF_ERR_NOBUF_DET; - l1 = strlen ((char *) lm->app_name); + l1 = strlen ((char *) ms->args.app_name); if (l0 + l1 < buflen) { - md->inst_name = (uint8_t *) strcpy (buf + l0, (char *) lm->app_name); + md->inst_name = + (uint8_t *) strcpy (buf + l0, (char *) ms->args.app_name); l0 += l1 + 1; } else @@ -2269,12 +1943,15 @@ memif_get_details (memif_conn_handle_t conn, memif_details_t * md, md->role = (c->args.is_master) ? 0 : 1; md->mode = c->args.mode; - l1 = strlen ((char *) ms->filename); + l1 = 108; if (l0 + l1 < buflen) { - md->socket_filename = - (uint8_t *) strcpy (buf + l0, (char *) ms->filename); - l0 += l1 + 1; + md->socket_path = (uint8_t *) memcpy (buf + l0, ms->args.path, 108); + if (md->socket_path[0] == '\0') + { + md->socket_path[0] = '@'; + } + l0 += l1; } else err = MEMIF_ERR_NOBUF_DET; @@ -2353,7 +2030,9 @@ memif_get_details (memif_conn_handle_t conn, memif_details_t * md, else err = MEMIF_ERR_NOBUF_DET; - md->link_up_down = (c->fd > 0) ? 1 : 0; + /* This is not completely true, clients should relay on + * on_connect/on_disconnect callbacks */ + md->link_up_down = (c->control_channel != NULL) ? 1 : 0; return err; /* 0 */ } @@ -2367,7 +2046,7 @@ memif_get_queue_efd (memif_conn_handle_t conn, uint16_t qid, int *efd) *efd = -1; if (c == NULL) return MEMIF_ERR_NOCONN; - if (c->fd < 0) + if (c->control_channel == NULL) return MEMIF_ERR_DISCONNECTED; num = @@ -2380,31 +2059,3 @@ memif_get_queue_efd (memif_conn_handle_t conn, uint16_t qid, int *efd) return MEMIF_ERR_SUCCESS; } - -int -memif_cleanup () -{ - libmemif_main_t *lm = &libmemif_main; - int err; - - err = memif_delete_socket ((memif_socket_handle_t *) & lm->default_socket); - if (err != MEMIF_ERR_SUCCESS) - return err; - - if (lm->control_list) - lm->free (lm->control_list); - lm->control_list = NULL; - if (lm->interrupt_list) - lm->free (lm->interrupt_list); - lm->interrupt_list = NULL; - if (lm->socket_list) - lm->free (lm->socket_list); - lm->socket_list = NULL; - if (lm->pending_list) - lm->free (lm->pending_list); - lm->pending_list = NULL; - if (lm->poll_cancel_fd != -1) - close (lm->poll_cancel_fd); - - return MEMIF_ERR_SUCCESS; /* 0 */ -} diff --git a/extras/libmemif/src/memif_private.h b/extras/libmemif/src/memif_private.h index 9542936983e..71a4bc879f4 100644 --- a/extras/libmemif/src/memif_private.h +++ b/extras/libmemif/src/memif_private.h @@ -27,6 +27,7 @@ #include <limits.h> #include <sys/timerfd.h> #include <string.h> +#include <sys/queue.h> #include <memif.h> #include <libmemif.h> @@ -103,7 +104,7 @@ typedef enum typedef struct { void *addr; - uint32_t region_size; + memif_region_size_t region_size; uint32_t buffer_offset; int fd; uint8_t is_external; @@ -125,20 +126,10 @@ typedef struct uint32_t next_buf; /* points to next free buffer */ } memif_queue_t; -typedef struct memif_msg_queue_elt -{ - memif_msg_t msg; - int fd; - struct memif_msg_queue_elt *next; -} memif_msg_queue_elt_t; - struct memif_connection; typedef struct memif_connection memif_connection_t; -/* functions called by memif_control_fd_handler */ -typedef int (memif_fn) (memif_connection_t * conn); - typedef struct { uint8_t num_s2m_rings; @@ -147,7 +138,7 @@ typedef struct memif_log2_ring_size_t log2_ring_size; } memif_conn_run_args_t; -struct libmemif_main; +struct memif_control_channel; typedef struct memif_connection { @@ -155,17 +146,12 @@ typedef struct memif_connection memif_conn_args_t args; memif_conn_run_args_t run_args; - int fd; - - memif_fn *write_fn, *read_fn, *error_fn; + struct memif_control_channel *control_channel; memif_connection_update_t *on_connect, *on_disconnect; - memif_interrupt_t *on_interrupt; + memif_on_interrupt_t *on_interrupt; void *private_ctx; - /* connection message queue */ - memif_msg_queue_elt_t *msg_queue; - uint8_t remote_if_name[MEMIF_NAME_LEN]; uint8_t remote_name[MEMIF_NAME_LEN]; uint8_t remote_disconnect_string[96]; @@ -180,61 +166,85 @@ typedef struct memif_connection uint16_t flags; #define MEMIF_CONNECTION_FLAG_WRITE (1 << 0) + + TAILQ_ENTRY (memif_connection) next; } memif_connection_t; -typedef struct +/** \brief Memif message queue element + * @param msg - memif control message (defined in memif.h) + * @param nex - tailq entry + * @param fd - File descriptor to be shared with peer endpoint + */ +typedef struct memif_msg_queue_elt { - int key; - void *data_struct; -} memif_list_elt_t; + memif_msg_t msg; + TAILQ_ENTRY (memif_msg_queue_elt) next; + int fd; +} memif_msg_queue_elt_t; -typedef struct +struct memif_socket; + +/** \brief Memif control channel + * @param fd - fd used for communbication + * @param msg_queue - message queue + * @param conn - memif connection using this control channel + * @param sock - socket this control channel belongs to + * + * Memif controll channel represents one end of communication between two memif + * endpoints. The controll channel is responsible for receiving and + * transmitting memif control messages via UNIX domain socket. + */ +typedef struct memif_control_channel { int fd; - uint16_t use_count; - memif_socket_type_t type; - uint8_t filename[108]; - /* unique database */ - struct libmemif_main *lm; - uint16_t interface_list_len; - void *private_ctx; - memif_list_elt_t *interface_list; /* memif master interfaces listening on this socket */ -} memif_socket_t; - -typedef struct libmemif_main + TAILQ_HEAD (, memif_msg_queue_elt) msg_queue; + memif_connection_t *conn; + struct memif_socket *sock; +} memif_control_channel_t; + +/** \brief Memif socket + * @param args - memif socket arguments (from libmemif.h) + * @param epfd - epoll fd, used for internal fd polling + * @param poll_cancel_fd - if event is received on this fd, interrupt polling + * @param listener_fd - listener fd if this socket is listener else -1 + * @param private_ctx - private context + * @param master_interfaces - master interface queue + * @param slave_interfaces - slave interface queue + * @param control_channels - controll channel queue + */ +typedef struct memif_socket { - memif_control_fd_update_t *control_fd_update; - int timerfd; + memif_socket_args_t args; int epfd; int poll_cancel_fd; - struct itimerspec arm, disarm; - uint16_t disconn_slaves; - uint8_t app_name[MEMIF_NAME_LEN]; - + int listener_fd; + int timer_fd; + struct itimerspec timer; void *private_ctx; + TAILQ_HEAD (, memif_connection) master_interfaces; + TAILQ_HEAD (, memif_connection) slave_interfaces; - memif_socket_handle_t default_socket; - + /* External region callbacks */ memif_add_external_region_t *add_external_region; memif_get_external_region_addr_t *get_external_region_addr; memif_del_external_region_t *del_external_region; memif_get_external_buffer_offset_t *get_external_buffer_offset; +} memif_socket_t; - memif_alloc_t *alloc; - memif_realloc_t *realloc; - memif_free_t *free; +typedef int (memif_fd_event_handler_t) (memif_fd_event_type_t type, + void *private_ctx); - uint16_t control_list_len; - uint16_t interrupt_list_len; - uint16_t socket_list_len; - uint16_t pending_list_len; - memif_list_elt_t *control_list; - memif_list_elt_t *interrupt_list; - memif_list_elt_t *socket_list; - memif_list_elt_t *pending_list; -} libmemif_main_t; +typedef struct memif_fd_event_data +{ + memif_fd_event_handler_t *event_handler; + void *private_ctx; +} memif_fd_event_data_t; -extern libmemif_main_t libmemif_main; +typedef struct memif_interrupt +{ + memif_connection_t *c; + uint16_t qid; +} memif_interrupt_t; /* main.c */ @@ -246,19 +256,11 @@ int memif_init_regions_and_queues (memif_connection_t * c); int memif_disconnect_internal (memif_connection_t * c); +int memif_interrupt_handler (memif_fd_event_type_t type, void *private_ctx); + /* map errno to memif error code */ int memif_syscall_error_handler (int err_code); -int add_list_elt (libmemif_main_t *lm, memif_list_elt_t * e, memif_list_elt_t ** list, - uint16_t * len); - -int get_list_elt (memif_list_elt_t ** e, memif_list_elt_t * list, - uint16_t len, int key); - -int free_list_elt (memif_list_elt_t * list, uint16_t len, int key); - -libmemif_main_t *get_libmemif_main (memif_socket_t * ms); - #ifndef __NR_memfd_create #if defined __x86_64__ #define __NR_memfd_create 319 diff --git a/extras/libmemif/src/socket.c b/extras/libmemif/src/socket.c index b801cac75ba..a9db7705fe9 100644 --- a/extras/libmemif/src/socket.c +++ b/extras/libmemif/src/socket.c @@ -35,20 +35,27 @@ #include <memif_private.h> /* sends msg to socket */ -static_fn int -memif_msg_send (int fd, memif_msg_t * msg, int afd) +static int +memif_msg_send_from_queue (memif_control_channel_t *cc) { struct msghdr mh = { 0 }; struct iovec iov[1]; char ctl[CMSG_SPACE (sizeof (int))]; int rv, err = MEMIF_ERR_SUCCESS; /* 0 */ + memif_msg_queue_elt_t *e; - iov[0].iov_base = (void *) msg; + /* Pick the first message */ + e = TAILQ_FIRST (&cc->msg_queue); + if (e == NULL) + return MEMIF_ERR_SUCCESS; + + /* Construct the message */ + iov[0].iov_base = (void *) &e->msg; iov[0].iov_len = sizeof (memif_msg_t); mh.msg_iov = iov; mh.msg_iovlen = 1; - if (afd > 0) + if (e->fd > 0) { struct cmsghdr *cmsg; memset (&ctl, 0, sizeof (ctl)); @@ -58,52 +65,50 @@ memif_msg_send (int fd, memif_msg_t * msg, int afd) cmsg->cmsg_len = CMSG_LEN (sizeof (int)); cmsg->cmsg_level = SOL_SOCKET; cmsg->cmsg_type = SCM_RIGHTS; - memcpy (CMSG_DATA (cmsg), &afd, sizeof (int)); + memcpy (CMSG_DATA (cmsg), &e->fd, sizeof (int)); } - rv = sendmsg (fd, &mh, 0); + rv = sendmsg (cc->fd, &mh, 0); if (rv < 0) err = memif_syscall_error_handler (errno); - DBG ("Message type %u sent", msg->type); + DBG ("Message type %u sent", e->msg.type); + + /* If sent successfully, remove the msg from queue */ + if (err == MEMIF_ERR_SUCCESS) + { + TAILQ_REMOVE (&cc->msg_queue, e, next); + cc->sock->args.free (e); + } + return err; } -/* response from memif master - master is ready to handle next message */ -static_fn int -memif_msg_enq_ack (memif_connection_t * c) +static memif_msg_queue_elt_t * +memif_msg_enq (memif_control_channel_t *cc) { - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - memif_msg_queue_elt_t *e = - (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t)); + memif_msg_queue_elt_t *e; + + e = cc->sock->args.alloc (sizeof (*e)); if (e == NULL) - return memif_syscall_error_handler (errno); + return NULL; - memset (&e->msg, 0, sizeof (e->msg)); - e->msg.type = MEMIF_MSG_TYPE_ACK; e->fd = -1; + TAILQ_INSERT_TAIL (&cc->msg_queue, e, next); - e->next = NULL; - if (c->msg_queue == NULL) - { - c->msg_queue = e; - return MEMIF_ERR_SUCCESS; /* 0 */ - } - - memif_msg_queue_elt_t *cur = c->msg_queue; - while (cur->next != NULL) - { - cur = cur->next; - } - cur->next = e; - - return MEMIF_ERR_SUCCESS; /* 0 */ + return e; } -static_fn int -memif_msg_send_hello (libmemif_main_t * lm, int fd) +static int +memif_msg_enq_hello (memif_control_channel_t *cc) { - memif_msg_t msg = { 0 }; - memif_msg_hello_t *h = &msg.hello; - msg.type = MEMIF_MSG_TYPE_HELLO; + memif_msg_hello_t *h; + memif_msg_queue_elt_t *e = memif_msg_enq (cc); + + if (e == NULL) + return MEMIF_ERR_NOMEM; + + e->msg.type = MEMIF_MSG_TYPE_HELLO; + + h = &e->msg.hello; h->min_version = MEMIF_VERSION; h->max_version = MEMIF_VERSION; h->max_s2m_ring = MEMIF_MAX_S2M_RING; @@ -111,67 +116,63 @@ memif_msg_send_hello (libmemif_main_t * lm, int fd) h->max_region = MEMIF_MAX_REGION; h->max_log2_ring_size = MEMIF_MAX_LOG2_RING_SIZE; - strlcpy ((char *) h->name, (char *) lm->app_name, sizeof (h->name)); + strlcpy ((char *) h->name, (char *) cc->sock->args.app_name, + sizeof (h->name)); - /* msg hello is not enqueued but sent directly, - because it is the first msg to be sent */ - return memif_msg_send (fd, &msg, -1); + return MEMIF_ERR_SUCCESS; +} + +/* response from memif master - master is ready to handle next message */ +static int +memif_msg_enq_ack (memif_control_channel_t *cc) +{ + memif_msg_queue_elt_t *e = memif_msg_enq (cc); + + if (e == NULL) + return MEMIF_ERR_NOMEM; + + e->msg.type = MEMIF_MSG_TYPE_ACK; + e->fd = -1; + + return MEMIF_ERR_SUCCESS; /* 0 */ } /* send id and secret (optional) for interface identification */ -static_fn int -memif_msg_enq_init (memif_connection_t * c) +static int +memif_msg_enq_init (memif_control_channel_t *cc) { - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - memif_msg_queue_elt_t *e = - (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t)); + memif_msg_queue_elt_t *e = memif_msg_enq (cc); + if (e == NULL) - return memif_syscall_error_handler (errno); - memset (e, 0, sizeof (memif_msg_queue_elt_t)); + return MEMIF_ERR_NOMEM; - memset (&e->msg, 0, sizeof (e->msg)); memif_msg_init_t *i = &e->msg.init; e->msg.type = MEMIF_MSG_TYPE_INIT; e->fd = -1; i->version = MEMIF_VERSION; - i->id = c->args.interface_id; - i->mode = c->args.mode; - - strlcpy ((char *) i->name, (char *) lm->app_name, sizeof (i->name)); - if (strlen ((char *) c->args.secret) > 0) - strncpy ((char *) i->secret, (char *) c->args.secret, sizeof (i->secret)); - - e->next = NULL; - if (c->msg_queue == NULL) - { - c->msg_queue = e; - return MEMIF_ERR_SUCCESS; /* 0 */ - } + i->id = cc->conn->args.interface_id; + i->mode = cc->conn->args.mode; - memif_msg_queue_elt_t *cur = c->msg_queue; - while (cur->next != NULL) - { - cur = cur->next; - } - cur->next = e; + strlcpy ((char *) i->name, (char *) cc->sock->args.app_name, + sizeof (i->name)); + if (strlen ((char *) cc->conn->args.secret) > 0) + strlcpy ((char *) i->secret, (char *) cc->conn->args.secret, + sizeof (i->secret)); return MEMIF_ERR_SUCCESS; /* 0 */ } /* send information about region specified by region_index */ -static_fn int -memif_msg_enq_add_region (memif_connection_t * c, uint8_t region_index) +static int +memif_msg_enq_add_region (memif_control_channel_t *cc, uint8_t region_index) { - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - memif_region_t *mr = &c->regions[region_index]; + memif_region_t *mr = &cc->conn->regions[region_index]; + memif_msg_queue_elt_t *e = memif_msg_enq (cc); - memif_msg_queue_elt_t *e = - (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t)); if (e == NULL) - return memif_syscall_error_handler (errno); + return MEMIF_ERR_NOMEM; - memset (&e->msg, 0, sizeof (e->msg)); memif_msg_add_region_t *ar = &e->msg.add_region; e->msg.type = MEMIF_MSG_TYPE_ADD_REGION; @@ -179,34 +180,19 @@ memif_msg_enq_add_region (memif_connection_t * c, uint8_t region_index) ar->index = region_index; ar->size = mr->region_size; - e->next = NULL; - if (c->msg_queue == NULL) - { - c->msg_queue = e; - return MEMIF_ERR_SUCCESS; /* 0 */ - } - - memif_msg_queue_elt_t *cur = c->msg_queue; - while (cur->next != NULL) - { - cur = cur->next; - } - cur->next = e; - return MEMIF_ERR_SUCCESS; /* 0 */ } /* send information about ring specified by direction (S2M | M2S) and index */ -static_fn int -memif_msg_enq_add_ring (memif_connection_t * c, uint8_t index, uint8_t dir) +static int +memif_msg_enq_add_ring (memif_control_channel_t *cc, uint8_t index, + uint8_t dir) { - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - memif_msg_queue_elt_t *e = - (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t)); + memif_msg_queue_elt_t *e = memif_msg_enq (cc); + if (e == NULL) - return memif_syscall_error_handler (errno); + return MEMIF_ERR_NOMEM; - memset (&e->msg, 0, sizeof (e->msg)); memif_msg_add_ring_t *ar = &e->msg.add_ring; e->msg.type = MEMIF_MSG_TYPE_ADD_RING; @@ -214,9 +200,9 @@ memif_msg_enq_add_ring (memif_connection_t * c, uint8_t index, uint8_t dir) /* TODO: support multiple rings */ memif_queue_t *mq; if (dir == MEMIF_RING_M2S) - mq = &c->rx_queues[index]; + mq = &cc->conn->rx_queues[index]; else - mq = &c->tx_queues[index]; + mq = &cc->conn->tx_queues[index]; e->fd = mq->int_fd; ar->index = index; @@ -226,119 +212,81 @@ memif_msg_enq_add_ring (memif_connection_t * c, uint8_t index, uint8_t dir) ar->flags = (dir == MEMIF_RING_S2M) ? MEMIF_MSG_ADD_RING_FLAG_S2M : 0; ar->private_hdr_size = 0; - e->next = NULL; - if (c->msg_queue == NULL) - { - c->msg_queue = e; - return MEMIF_ERR_SUCCESS; /* 0 */ - } - - memif_msg_queue_elt_t *cur = c->msg_queue; - while (cur->next != NULL) - { - cur = cur->next; - } - cur->next = e; - return MEMIF_ERR_SUCCESS; /* 0 */ } /* used as connection request from slave */ -static_fn int -memif_msg_enq_connect (memif_connection_t * c) +static int +memif_msg_enq_connect (memif_control_channel_t *cc) { - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - memif_msg_queue_elt_t *e = - (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t)); + memif_msg_queue_elt_t *e = memif_msg_enq (cc); + if (e == NULL) - return memif_syscall_error_handler (errno); + return MEMIF_ERR_NOMEM; - memset (&e->msg, 0, sizeof (e->msg)); memif_msg_connect_t *cm = &e->msg.connect; e->msg.type = MEMIF_MSG_TYPE_CONNECT; e->fd = -1; - strlcpy ((char *) cm->if_name, (char *) c->args.interface_name, + strlcpy ((char *) cm->if_name, (char *) cc->conn->args.interface_name, sizeof (cm->if_name)); - e->next = NULL; - if (c->msg_queue == NULL) - { - c->msg_queue = e; - return MEMIF_ERR_SUCCESS; /* 0 */ - } - - memif_msg_queue_elt_t *cur = c->msg_queue; - while (cur->next != NULL) - { - cur = cur->next; - } - cur->next = e; - return MEMIF_ERR_SUCCESS; /* 0 */ } /* used as confirmation of connection by master */ -static_fn int -memif_msg_enq_connected (memif_connection_t * c) +static int +memif_msg_enq_connected (memif_control_channel_t *cc) { - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - memif_msg_queue_elt_t *e = - (memif_msg_queue_elt_t *) lm->alloc (sizeof (memif_msg_queue_elt_t)); + memif_msg_queue_elt_t *e = memif_msg_enq (cc); + if (e == NULL) - return memif_syscall_error_handler (errno); + return MEMIF_ERR_NOMEM; - memset (&e->msg, 0, sizeof (e->msg)); memif_msg_connected_t *cm = &e->msg.connected; e->msg.type = MEMIF_MSG_TYPE_CONNECTED; e->fd = -1; - strlcpy ((char *) cm->if_name, (char *) c->args.interface_name, + strlcpy ((char *) cm->if_name, (char *) cc->conn->args.interface_name, sizeof (cm->if_name)); - e->next = NULL; - if (c->msg_queue == NULL) - { - c->msg_queue = e; - return MEMIF_ERR_SUCCESS; /* 0 */ - } - - memif_msg_queue_elt_t *cur = c->msg_queue; - while (cur->next != NULL) - { - cur = cur->next; - } - cur->next = e; - return MEMIF_ERR_SUCCESS; /* 0 */ } -/* immediately send disconnect msg */ - /* specify protocol for disconnect msg err_code - so that it will be compatible with VPP? (header/doc) */ int -memif_msg_send_disconnect (int fd, uint8_t * err_string, uint32_t err_code) +memif_msg_enq_disconnect (memif_control_channel_t *cc, uint8_t *err_string, + uint32_t err_code) { - memif_msg_t msg = { 0 }; - memif_msg_disconnect_t *d = &msg.disconnect; + memif_msg_queue_elt_t *e; + + e = cc->sock->args.alloc (sizeof (*e)); + if (e == NULL) + return MEMIF_ERR_NOMEM; - msg.type = MEMIF_MSG_TYPE_DISCONNECT; + e->fd = -1; + /* Insert disconenct message at the top of the msg queue */ + TAILQ_INSERT_HEAD (&cc->msg_queue, e, next); + + memif_msg_disconnect_t *d = &e->msg.disconnect; + + e->msg.type = MEMIF_MSG_TYPE_DISCONNECT; d->code = err_code; - uint16_t l = strlen ((char *) err_string); - if (l > sizeof (d->string) - 1) + uint16_t l = sizeof (d->string); + if (l > 96) { DBG ("Disconnect string too long. Sending the first %d characters.", sizeof (d->string) - 1); } strlcpy ((char *) d->string, (char *) err_string, sizeof (d->string)); - return memif_msg_send (fd, &msg, -1); + return MEMIF_ERR_SUCCESS; } -static_fn int -memif_msg_receive_hello (memif_connection_t * c, memif_msg_t * msg) +static int +memif_msg_parse_hello (memif_control_channel_t *cc, memif_msg_t *msg) { memif_msg_hello_t *h = &msg->hello; + memif_connection_t *c = cc->conn; if (msg->hello.min_version > MEMIF_VERSION || msg->hello.max_version < MEMIF_VERSION) @@ -360,139 +308,73 @@ memif_msg_receive_hello (memif_connection_t * c, memif_msg_t * msg) } /* handle interface identification (id, secret (optional)) */ -static_fn int -memif_msg_receive_init (memif_socket_t * ms, int fd, memif_msg_t * msg) +static int +memif_msg_parse_init (memif_control_channel_t *cc, memif_msg_t *msg) { memif_msg_init_t *i = &msg->init; - memif_list_elt_t *elt = NULL; - memif_list_elt_t elt2; memif_connection_t *c = NULL; - libmemif_main_t *lm = get_libmemif_main (ms); uint8_t err_string[96]; memset (err_string, 0, sizeof (char) * 96); int err = MEMIF_ERR_SUCCESS; /* 0 */ + /* Check compatible meimf version */ if (i->version != MEMIF_VERSION) { DBG ("MEMIF_VER_ERR"); - strncpy ((char *) err_string, MEMIF_VER_ERR, strlen (MEMIF_VER_ERR)); - err = MEMIF_ERR_PROTO; - goto error; - } - - get_list_elt (&elt, ms->interface_list, ms->interface_list_len, i->id); - if (elt == NULL) - { - DBG ("MEMIF_ID_ERR"); - strncpy ((char *) err_string, MEMIF_ID_ERR, strlen (MEMIF_ID_ERR)); - err = MEMIF_ERR_ID; - goto error; - } - - c = (memif_connection_t *) elt->data_struct; - - if (!(c->args.is_master)) - { - DBG ("MEMIF_SLAVE_ERR"); - strncpy ((char *) err_string, MEMIF_SLAVE_ERR, - strlen (MEMIF_SLAVE_ERR)); - err = MEMIF_ERR_ACCSLAVE; - goto error; - } - if (c->fd != -1) - { - DBG ("MEMIF_CONN_ERR"); - strncpy ((char *) err_string, MEMIF_CONN_ERR, strlen (MEMIF_CONN_ERR)); - err = MEMIF_ERR_ALRCONN; - goto error; - } - - c->fd = fd; - - if (i->mode != c->args.mode) - { - DBG ("MEMIF_MODE_ERR"); - strncpy ((char *) err_string, MEMIF_MODE_ERR, strlen (MEMIF_MODE_ERR)); - err = MEMIF_ERR_MODE; - goto error; - } - - strlcpy ((char *) c->remote_name, (char *) i->name, sizeof (c->remote_name)); - - if (strlen ((char *) c->args.secret) > 0) - { - int r; - if (strlen ((char *) i->secret) > 0) - { - if (strlen ((char *) c->args.secret) != strlen ((char *) i->secret)) - { - DBG ("MEMIF_SECRET_ERR"); - strncpy ((char *) err_string, - MEMIF_SECRET_ERR, strlen (MEMIF_SECRET_ERR)); - err = MEMIF_ERR_SECRET; - goto error; - } - r = strncmp ((char *) i->secret, (char *) c->args.secret, - strlen ((char *) c->args.secret)); - if (r != 0) - { - DBG ("MEMIF_SECRET_ERR"); - strncpy ((char *) err_string, - MEMIF_SECRET_ERR, strlen (MEMIF_SECRET_ERR)); - err = MEMIF_ERR_SECRET; - goto error; - } - } - else - { - DBG ("MEMIF_NOSECRET_ERR"); - strncpy ((char *) err_string, - MEMIF_NOSECRET_ERR, strlen (MEMIF_NOSECRET_ERR)); - err = MEMIF_ERR_NOSECRET; - goto error; - } + memif_msg_enq_disconnect (cc, MEMIF_VER_ERR, 0); + return MEMIF_ERR_PROTO; } - c->read_fn = memif_conn_fd_read_ready; - c->write_fn = memif_conn_fd_write_ready; - c->error_fn = memif_conn_fd_error; + /* Find endpoint on the socket */ + TAILQ_FOREACH (c, &cc->sock->master_interfaces, next) + { + /* Match interface id */ + if (c->args.interface_id != i->id) + continue; + /* If control channel is present, interface is connected (or connecting) */ + if (c->control_channel != NULL) + { + memif_msg_enq_disconnect (cc, "Already connected", 0); + return MEMIF_ERR_ALRCONN; + } + /* Verify secret */ + if (c->args.secret[0] != '\0') + { + if (strncmp ((char *) c->args.secret, (char *) i->secret, 24) != 0) + { + memif_msg_enq_disconnect (cc, "Incorrect secret", 0); + return MEMIF_ERR_SECRET; + } + } + + /* Assign the control channel to this interface */ + c->control_channel = cc; + cc->conn = c; + + strlcpy ((char *) c->remote_name, (char *) i->name, + sizeof (c->remote_name)); + } - elt2.key = c->fd; - elt2.data_struct = c; - - add_list_elt (lm, &elt2, &lm->control_list, &lm->control_list_len); - free_list_elt (lm->pending_list, lm->pending_list_len, fd); - - return err; - -error: - memif_msg_send_disconnect (fd, err_string, 0); - lm->control_fd_update (fd, MEMIF_FD_EVENT_DEL, lm->private_ctx); - free_list_elt (lm->pending_list, lm->pending_list_len, fd); - close (fd); - fd = -1; return err; } /* receive region information and add new region to connection (if possible) */ -static_fn int -memif_msg_receive_add_region (memif_connection_t * c, memif_msg_t * msg, - int fd) +static int +memif_msg_parse_add_region (memif_control_channel_t *cc, memif_msg_t *msg, + int fd) { - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - memif_msg_add_region_t *ar = &msg->add_region; memif_region_t *mr; + memif_connection_t *c = cc->conn; + if (fd < 0) return MEMIF_ERR_NO_SHMFD; if (ar->index > MEMIF_MAX_REGION) return MEMIF_ERR_MAXREG; - mr = - (memif_region_t *) lm->realloc (c->regions, - sizeof (memif_region_t) * - (++c->regions_num)); + mr = (memif_region_t *) cc->sock->args.realloc ( + c->regions, sizeof (memif_region_t) * (++c->regions_num)); if (mr == NULL) return memif_syscall_error_handler (errno); memset (mr + ar->index, 0, sizeof (memif_region_t)); @@ -500,9 +382,8 @@ memif_msg_receive_add_region (memif_connection_t * c, memif_msg_t * msg, c->regions[ar->index].fd = fd; c->regions[ar->index].region_size = ar->size; c->regions[ar->index].addr = NULL; - /* region 0 is never external */ - if (lm->get_external_region_addr && (ar->index != 0)) + if (cc->sock->get_external_region_addr && (ar->index != 0)) c->regions[ar->index].is_external = 1; return MEMIF_ERR_SUCCESS; /* 0 */ @@ -510,12 +391,12 @@ memif_msg_receive_add_region (memif_connection_t * c, memif_msg_t * msg, /* receive ring information and add new ring to connection queue (based on direction S2M | M2S) */ -static_fn int -memif_msg_receive_add_ring (memif_connection_t * c, memif_msg_t * msg, int fd) +static int +memif_msg_parse_add_ring (memif_control_channel_t *cc, memif_msg_t *msg, + int fd) { - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - memif_msg_add_ring_t *ar = &msg->add_ring; + memif_connection_t *c = cc->conn; memif_queue_t *mq; @@ -532,10 +413,8 @@ memif_msg_receive_add_ring (memif_connection_t * c, memif_msg_t * msg, int fd) if (ar->index >= c->args.num_s2m_rings) return MEMIF_ERR_MAXRING; - mq = - (memif_queue_t *) lm->realloc (c->rx_queues, - sizeof (memif_queue_t) * - (++c->rx_queues_num)); + mq = (memif_queue_t *) cc->sock->args.realloc ( + c->rx_queues, sizeof (memif_queue_t) * (++c->rx_queues_num)); memset (mq + ar->index, 0, sizeof (memif_queue_t)); if (mq == NULL) return memif_syscall_error_handler (errno); @@ -553,10 +432,8 @@ memif_msg_receive_add_ring (memif_connection_t * c, memif_msg_t * msg, int fd) if (ar->index >= c->args.num_m2s_rings) return MEMIF_ERR_MAXRING; - mq = - (memif_queue_t *) lm->realloc (c->tx_queues, - sizeof (memif_queue_t) * - (++c->tx_queues_num)); + mq = (memif_queue_t *) cc->sock->args.realloc ( + c->tx_queues, sizeof (memif_queue_t) * (++c->tx_queues_num)); memset (mq + ar->index, 0, sizeof (memif_queue_t)); if (mq == NULL) return memif_syscall_error_handler (errno); @@ -571,15 +448,65 @@ memif_msg_receive_add_ring (memif_connection_t * c, memif_msg_t * msg, int fd) return MEMIF_ERR_SUCCESS; /* 0 */ } +static int +memif_configure_rx_interrupt (memif_connection_t *c) +{ + memif_socket_t *ms = (memif_socket_t *) c->args.socket; + memif_interrupt_t *idata; + memif_fd_event_t fde; + memif_fd_event_data_t *fdata; + void *ctx; + int i; + + if (c->on_interrupt != NULL) + { + for (i = 0; i < c->run_args.num_m2s_rings; i++) + { + /* Allocate fd event data */ + fdata = ms->args.alloc (sizeof (*fdata)); + if (fdata == NULL) + { + memif_msg_enq_disconnect (c->control_channel, "Internal error", + 0); + return MEMIF_ERR_NOMEM; + } + /* Allocate interrupt data */ + idata = ms->args.alloc (sizeof (*fdata)); + if (idata == NULL) + { + ms->args.free (fdata); + memif_msg_enq_disconnect (c->control_channel, "Internal error", + 0); + return MEMIF_ERR_NOMEM; + } + + /* configure interrupt data */ + idata->c = c; + idata->qid = i; + /* configure fd event data */ + fdata->event_handler = memif_interrupt_handler; + fdata->private_ctx = idata; + fde.fd = c->rx_queues[i].int_fd; + fde.type = MEMIF_FD_EVENT_READ; + fde.private_ctx = fdata; + + /* Start listening for events */ + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + ms->args.on_control_fd_update (fde, ctx); + } + } + + return MEMIF_ERR_SUCCESS; +} + /* slave -> master */ -static_fn int -memif_msg_receive_connect (memif_connection_t * c, memif_msg_t * msg) +static int +memif_msg_parse_connect (memif_control_channel_t *cc, memif_msg_t *msg) { memif_msg_connect_t *cm = &msg->connect; - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - memif_list_elt_t elt; - + memif_connection_t *c = cc->conn; int err; + err = memif_connect1 (c); if (err != MEMIF_ERR_SUCCESS) return err; @@ -587,21 +514,9 @@ memif_msg_receive_connect (memif_connection_t * c, memif_msg_t * msg) strlcpy ((char *) c->remote_if_name, (char *) cm->if_name, sizeof (c->remote_if_name)); - int i; - if (c->on_interrupt != NULL) - { - for (i = 0; i < c->run_args.num_m2s_rings; i++) - { - elt.key = c->rx_queues[i].int_fd; - elt.data_struct = c; - add_list_elt (lm, &elt, &lm->interrupt_list, - &lm->interrupt_list_len); - - lm->control_fd_update (c->rx_queues[i].int_fd, MEMIF_FD_EVENT_READ, - lm->private_ctx); - } - - } + err = memif_configure_rx_interrupt (c); + if (err != MEMIF_ERR_SUCCESS) + return err; c->on_connect ((void *) c, c->private_ctx); @@ -609,43 +524,38 @@ memif_msg_receive_connect (memif_connection_t * c, memif_msg_t * msg) } /* master -> slave */ -static_fn int -memif_msg_receive_connected (memif_connection_t * c, memif_msg_t * msg) +static int +memif_msg_parse_connected (memif_control_channel_t *cc, memif_msg_t *msg) { memif_msg_connect_t *cm = &msg->connect; - libmemif_main_t *lm = get_libmemif_main (c->args.socket); + memif_connection_t *c = cc->conn; int err; err = memif_connect1 (c); if (err != MEMIF_ERR_SUCCESS) return err; - strncpy ((char *) c->remote_if_name, (char *) cm->if_name, + strlcpy ((char *) c->remote_if_name, (char *) cm->if_name, sizeof (c->remote_if_name)); - int i; - if (c->on_interrupt != NULL) - { - for (i = 0; i < c->run_args.num_s2m_rings; i++) - { - lm->control_fd_update (c->rx_queues[i].int_fd, MEMIF_FD_EVENT_READ, - lm->private_ctx); - } - } + err = memif_configure_rx_interrupt (c); + if (err != MEMIF_ERR_SUCCESS) + return err; c->on_connect ((void *) c, c->private_ctx); return err; } -static_fn int -memif_msg_receive_disconnect (memif_connection_t * c, memif_msg_t * msg) +static int +memif_msg_parse_disconnect (memif_control_channel_t *cc, memif_msg_t *msg) { memif_msg_disconnect_t *d = &msg->disconnect; + memif_connection_t *c = cc->conn; memset (c->remote_disconnect_string, 0, sizeof (c->remote_disconnect_string)); - strncpy ((char *) c->remote_disconnect_string, (char *) d->string, + strlcpy ((char *) c->remote_disconnect_string, (char *) d->string, sizeof (c->remote_disconnect_string)); /* on returning error, handle function will call memif_disconnect () */ @@ -654,8 +564,8 @@ memif_msg_receive_disconnect (memif_connection_t * c, memif_msg_t * msg) return MEMIF_ERR_DISCONNECT; } -static_fn int -memif_msg_receive (libmemif_main_t * lm, int ifd) +static int +memif_msg_receive_and_parse (memif_control_channel_t *cc) { char ctl[CMSG_SPACE (sizeof (int)) + CMSG_SPACE (sizeof (struct ucred))] = { 0 }; @@ -666,9 +576,7 @@ memif_msg_receive (libmemif_main_t * lm, int ifd) int err = MEMIF_ERR_SUCCESS; /* 0 */ int fd = -1; int i; - memif_connection_t *c = NULL; memif_socket_t *ms = NULL; - memif_list_elt_t *elt = NULL; iov[0].iov_base = (void *) &msg; iov[0].iov_len = sizeof (memif_msg_t); @@ -677,8 +585,8 @@ memif_msg_receive (libmemif_main_t * lm, int ifd) mh.msg_control = ctl; mh.msg_controllen = sizeof (ctl); - DBG ("recvmsg fd %d", ifd); - size = recvmsg (ifd, &mh, 0); + DBG ("recvmsg fd %d", cc->fd); + size = recvmsg (cc->fd, &mh, 0); if (size != sizeof (memif_msg_t)) { if (size == 0) @@ -709,93 +617,79 @@ memif_msg_receive (libmemif_main_t * lm, int ifd) DBG ("Message type %u received", msg.type); - get_list_elt (&elt, lm->control_list, lm->control_list_len, ifd); - if (elt != NULL) - c = (memif_connection_t *) elt->data_struct; - switch (msg.type) { case MEMIF_MSG_TYPE_ACK: break; case MEMIF_MSG_TYPE_HELLO: - if ((err = memif_msg_receive_hello (c, &msg)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_parse_hello (cc, &msg)) != MEMIF_ERR_SUCCESS) return err; - if ((err = memif_init_regions_and_queues (c)) != MEMIF_ERR_SUCCESS) + if ((err = memif_init_regions_and_queues (cc->conn)) != + MEMIF_ERR_SUCCESS) return err; - if ((err = memif_msg_enq_init (c)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_enq_init (cc)) != MEMIF_ERR_SUCCESS) return err; - for (i = 0; i < c->regions_num; i++) + for (i = 0; i < cc->conn->regions_num; i++) { - if ((err = memif_msg_enq_add_region (c, i)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_enq_add_region (cc, i)) != MEMIF_ERR_SUCCESS) return err; } - for (i = 0; i < c->run_args.num_s2m_rings; i++) + for (i = 0; i < cc->conn->run_args.num_s2m_rings; i++) { - if ((err = - memif_msg_enq_add_ring (c, i, - MEMIF_RING_S2M)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_enq_add_ring (cc, i, MEMIF_RING_S2M)) != + MEMIF_ERR_SUCCESS) return err; } - for (i = 0; i < c->run_args.num_m2s_rings; i++) + for (i = 0; i < cc->conn->run_args.num_m2s_rings; i++) { - if ((err = - memif_msg_enq_add_ring (c, i, - MEMIF_RING_M2S)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_enq_add_ring (cc, i, MEMIF_RING_M2S)) != + MEMIF_ERR_SUCCESS) return err; } - if ((err = memif_msg_enq_connect (c)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_enq_connect (cc)) != MEMIF_ERR_SUCCESS) return err; break; case MEMIF_MSG_TYPE_INIT: - get_list_elt (&elt, lm->pending_list, lm->pending_list_len, ifd); - if (elt == NULL) - return -1; - ms = (memif_socket_t *) elt->data_struct; - if ((err = memif_msg_receive_init (ms, ifd, &msg)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_parse_init (cc, &msg)) != MEMIF_ERR_SUCCESS) return err; /* c->remote_pid = cr->pid */ /* c->remote_uid = cr->uid */ /* c->remote_gid = cr->gid */ - get_list_elt (&elt, lm->control_list, lm->control_list_len, ifd); - if (elt == NULL) - return -1; - c = (memif_connection_t *) elt->data_struct; - if ((err = memif_msg_enq_ack (c)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_enq_ack (cc)) != MEMIF_ERR_SUCCESS) return err; break; case MEMIF_MSG_TYPE_ADD_REGION: - if ((err = - memif_msg_receive_add_region (c, &msg, fd)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_parse_add_region (cc, &msg, fd)) != + MEMIF_ERR_SUCCESS) return err; - if ((err = memif_msg_enq_ack (c)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_enq_ack (cc)) != MEMIF_ERR_SUCCESS) return err; break; case MEMIF_MSG_TYPE_ADD_RING: - if ((err = - memif_msg_receive_add_ring (c, &msg, fd)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_parse_add_ring (cc, &msg, fd)) != MEMIF_ERR_SUCCESS) return err; - if ((err = memif_msg_enq_ack (c)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_enq_ack (cc)) != MEMIF_ERR_SUCCESS) return err; break; case MEMIF_MSG_TYPE_CONNECT: - if ((err = memif_msg_receive_connect (c, &msg)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_parse_connect (cc, &msg)) != MEMIF_ERR_SUCCESS) return err; - if ((err = memif_msg_enq_connected (c)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_enq_connected (cc)) != MEMIF_ERR_SUCCESS) return err; break; case MEMIF_MSG_TYPE_CONNECTED: - if ((err = memif_msg_receive_connected (c, &msg)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_parse_connected (cc, &msg)) != MEMIF_ERR_SUCCESS) return err; break; case MEMIF_MSG_TYPE_DISCONNECT: - if ((err = memif_msg_receive_disconnect (c, &msg)) != MEMIF_ERR_SUCCESS) + if ((err = memif_msg_parse_disconnect (cc, &msg)) != MEMIF_ERR_SUCCESS) return err; break; @@ -804,101 +698,155 @@ memif_msg_receive (libmemif_main_t * lm, int ifd) break; } - if (c != NULL) - c->flags |= MEMIF_CONNECTION_FLAG_WRITE; - return MEMIF_ERR_SUCCESS; /* 0 */ } -int -memif_conn_fd_error (memif_connection_t * c) +void +memif_delete_control_channel (memif_control_channel_t *cc) { - DBG ("connection fd error"); - strncpy ((char *) c->remote_disconnect_string, "connection fd error", 19); - int err = memif_disconnect_internal (c); - return err; -} + memif_msg_queue_elt_t *e, *next; + memif_socket_t *ms = cc->sock; + memif_fd_event_t fde; + void *ctx; -/* calls memif_msg_receive to handle pending messages on socket */ -int -memif_conn_fd_read_ready (memif_connection_t * c) -{ - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - int err; + fde.fd = cc->fd; + fde.type = MEMIF_FD_EVENT_DEL; + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + cc->sock->args.on_control_fd_update (fde, ctx); - err = memif_msg_receive (lm, c->fd); - if (err != 0) + if (cc->fd > 0) + close (cc->fd); + + /* Clear control message queue */ + for (e = TAILQ_FIRST (&cc->msg_queue); e != NULL; e = next) { - err = memif_disconnect_internal (c); + next = TAILQ_NEXT (e, next); + TAILQ_REMOVE (&cc->msg_queue, e, next); + cc->sock->args.free (e); } - return err; + + /* remove reference */ + if (cc->conn != NULL) + cc->conn->control_channel = NULL; + cc->conn = NULL; + cc->sock->args.free (cc); + + return; } -/* get msg from msg queue buffer and send it to socket */ int -memif_conn_fd_write_ready (memif_connection_t * c) +memif_control_channel_handler (memif_fd_event_type_t type, void *private_ctx) { - libmemif_main_t *lm = get_libmemif_main (c->args.socket); - int err = MEMIF_ERR_SUCCESS; /* 0 */ - - - if ((c->flags & MEMIF_CONNECTION_FLAG_WRITE) == 0) - goto done; + memif_control_channel_t *cc = (memif_control_channel_t *) private_ctx; + int err; - memif_msg_queue_elt_t *e = c->msg_queue; - if (e == NULL) - goto done; + /* Receive the message, parse the message and + * enqueue next message(s). + */ + err = memif_msg_receive_and_parse (cc); + /* Can't assign to endpoint */ + if (cc->conn == NULL) + { + /* A disconnect message is already in the queue */ + memif_msg_send_from_queue (cc); + memif_delete_control_channel (cc); - c->msg_queue = c->msg_queue->next; + return MEMIF_ERR_SUCCESS; + } + /* error in memif_msg_receive */ + if (err != MEMIF_ERR_SUCCESS) + goto disconnect; - c->flags &= ~MEMIF_CONNECTION_FLAG_WRITE; + /* Continue connecting, send next message from the queue */ + err = memif_msg_send_from_queue (cc); + if (err != MEMIF_ERR_SUCCESS) + goto disconnect; - err = memif_msg_send (c->fd, &e->msg, e->fd); - lm->free (e); - goto done; + return MEMIF_ERR_SUCCESS; -done: - return err; +disconnect: + memif_disconnect_internal (cc->conn); + return MEMIF_ERR_SUCCESS; } int -memif_conn_fd_accept_ready (memif_socket_t * ms) +memif_listener_handler (memif_fd_event_type_t type, void *private_ctx) { - int addr_len; - struct sockaddr_un client; - int conn_fd; - libmemif_main_t *lm = get_libmemif_main (ms); + memif_socket_t *ms = (memif_socket_t *) private_ctx; + memif_control_channel_t *cc; + memif_fd_event_t fde; + memif_fd_event_data_t *fdata; + struct sockaddr_un un; + int err, sockfd, addr_len = sizeof (un); + void *ctx; + + if (ms == NULL) + return MEMIF_ERR_INVAL_ARG; + + if (type & MEMIF_FD_EVENT_READ) + { + /* Accept connection to the listener socket */ + sockfd = accept (ms->listener_fd, (struct sockaddr *) &un, + (socklen_t *) &addr_len); + if (sockfd < 0) + { + return memif_syscall_error_handler (errno); + } - DBG ("accept called"); + /* Create new control channel */ + cc = ms->args.alloc (sizeof (*cc)); + if (cc == NULL) + { + err = MEMIF_ERR_NOMEM; + goto error; + } - addr_len = sizeof (client); - conn_fd = - accept (ms->fd, (struct sockaddr *) &client, (socklen_t *) & addr_len); + cc->fd = sockfd; + /* The connection will be assigned after parsing MEMIF_MSG_TYPE_INIT msg + */ + cc->conn = NULL; + cc->sock = ms; + TAILQ_INIT (&cc->msg_queue); - if (conn_fd < 0) - { - return memif_syscall_error_handler (errno); - } - DBG ("accept fd %d", ms->fd); - DBG ("conn fd %d", conn_fd); + /* Create memif fd event */ + fdata = ms->args.alloc (sizeof (*fdata)); + if (fdata == NULL) + { + err = MEMIF_ERR_NOMEM; + goto error; + } - memif_list_elt_t elt; - elt.key = conn_fd; - elt.data_struct = ms; + fdata->event_handler = memif_control_channel_handler; + fdata->private_ctx = cc; - add_list_elt (lm, &elt, &lm->pending_list, &lm->pending_list_len); - lm->control_fd_update (conn_fd, MEMIF_FD_EVENT_READ | MEMIF_FD_EVENT_WRITE, - lm->private_ctx); + fde.fd = sockfd; + fde.type = MEMIF_FD_EVENT_READ; + fde.private_ctx = fdata; - return memif_msg_send_hello (lm, conn_fd); -} + /* Start listenning for events on the new control channel */ + ctx = ms->epfd != -1 ? ms : ms->private_ctx; + ms->args.on_control_fd_update (fde, ctx); -int -memif_read_ready (libmemif_main_t * lm, int fd) -{ - int err; + /* enqueue HELLO msg */ + err = memif_msg_enq_hello (cc); + if (err != MEMIF_ERR_SUCCESS) + goto error; + + /* send HELLO msg */ + err = memif_msg_send_from_queue (cc); + if (err != MEMIF_ERR_SUCCESS) + goto error; + } + + return MEMIF_ERR_SUCCESS; - err = memif_msg_receive (lm, fd); +error: + if (sockfd > 0) + close (sockfd); + if (cc != NULL) + ms->args.free (cc); + if (fdata != NULL) + ms->args.free (fdata); return err; } diff --git a/extras/libmemif/src/socket.h b/extras/libmemif/src/socket.h index ea6979d6bd5..0c1a848a677 100644 --- a/extras/libmemif/src/socket.h +++ b/extras/libmemif/src/socket.h @@ -31,59 +31,11 @@ /* socket.c */ -int memif_conn_fd_read_ready (memif_connection_t * c); +int memif_listener_handler (memif_fd_event_type_t type, void *private_ctx); -int memif_conn_fd_write_ready (memif_connection_t * c); +int memif_control_channel_handler (memif_fd_event_type_t type, + void *private_ctx); -int memif_conn_fd_error (memif_connection_t * c); - -int memif_conn_fd_accept_ready (memif_socket_t * ms); - -int memif_read_ready (libmemif_main_t *lm, int fd); - -int memif_msg_send_disconnect (int fd, uint8_t * err_string, - uint32_t err_code); - -/* when compiling unit tests, compile functions without static keyword - and declare functions in header file */ -#ifdef MEMIF_UNIT_TEST -#define static_fn - -int memif_msg_send (int fd, memif_msg_t * msg, int afd); - -int memif_msg_enq_ack (memif_connection_t * c); - -int memif_msg_send_hello (libmemif_main_t *lm, int fd); - -int memif_msg_enq_init (memif_connection_t * c); - -int memif_msg_enq_add_region (memif_connection_t * c, uint8_t region); - -int memif_msg_enq_add_ring (memif_connection_t * c, uint8_t index, - uint8_t dir); - -int memif_msg_receive_hello (memif_connection_t * c, memif_msg_t * msg); - -int memif_msg_receive_init (memif_socket_t * ms, int fd, memif_msg_t * msg); - -int memif_msg_receive_add_region (memif_connection_t * c, memif_msg_t * msg, - int fd); - -int memif_msg_receive_add_ring (memif_connection_t * c, memif_msg_t * msg, - int fd); - -int memif_msg_enq_connect (memif_connection_t * c); - -int memif_msg_enq_connected (memif_connection_t * c); - -int memif_msg_receive_connect (memif_connection_t * c, memif_msg_t * msg); - -int memif_msg_receive_connected (memif_connection_t * c, memif_msg_t * msg); - -int memif_msg_receive_disconnect (memif_connection_t * c, memif_msg_t * msg); - -#else -#define static_fn static -#endif /* MEMIF_UNIT_TEST */ +void memif_delete_control_channel (memif_control_channel_t *cc); #endif /* _SOCKET_H_ */ |