aboutsummaryrefslogtreecommitdiffstats
path: root/lib/librte_eal/common/eal_common_proc.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/librte_eal/common/eal_common_proc.c')
-rw-r--r--lib/librte_eal/common/eal_common_proc.c222
1 files changed, 93 insertions, 129 deletions
diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 707d8ab3..9fcb9121 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -20,6 +20,7 @@
#include <sys/un.h>
#include <unistd.h>
+#include <rte_alarm.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
@@ -94,11 +95,9 @@ TAILQ_HEAD(pending_request_list, pending_request);
static struct {
struct pending_request_list requests;
pthread_mutex_t lock;
- pthread_cond_t async_cond;
} pending_requests = {
.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
.lock = PTHREAD_MUTEX_INITIALIZER,
- .async_cond = PTHREAD_COND_INITIALIZER
/**< used in async requests only */
};
@@ -106,6 +105,16 @@ static struct {
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);
+/* for use with alarm callback */
+static void
+async_reply_handle(void *arg);
+
+/* for use with process_msg */
+static struct pending_request *
+async_reply_handle_thread_unsafe(void *arg);
+
+static void
+trigger_async_action(struct pending_request *req);
static struct pending_request *
find_pending_request(const char *dst, const char *act_name)
@@ -290,6 +299,8 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);
if (m->type == MP_REP || m->type == MP_IGN) {
+ struct pending_request *req = NULL;
+
pthread_mutex_lock(&pending_requests.lock);
pending_req = find_pending_request(s->sun_path, msg->name);
if (pending_req) {
@@ -301,11 +312,14 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
if (pending_req->type == REQUEST_TYPE_SYNC)
pthread_cond_signal(&pending_req->sync.cond);
else if (pending_req->type == REQUEST_TYPE_ASYNC)
- pthread_cond_signal(
- &pending_requests.async_cond);
+ req = async_reply_handle_thread_unsafe(
+ pending_req);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
pthread_mutex_unlock(&pending_requests.lock);
+
+ if (req != NULL)
+ trigger_async_action(req);
return;
}
@@ -365,7 +379,6 @@ timespec_cmp(const struct timespec *a, const struct timespec *b)
}
enum async_action {
- ACTION_NONE, /**< don't do anything */
ACTION_FREE, /**< free the action entry, but don't trigger callback */
ACTION_TRIGGER /**< trigger callback, then free action entry */
};
@@ -375,7 +388,7 @@ process_async_request(struct pending_request *sr, const struct timespec *now)
{
struct async_request_param *param;
struct rte_mp_reply *reply;
- bool timeout, received, last_msg;
+ bool timeout, last_msg;
param = sr->async.param;
reply = &param->user_reply;
@@ -383,13 +396,6 @@ process_async_request(struct pending_request *sr, const struct timespec *now)
/* did we timeout? */
timeout = timespec_cmp(&param->end, now) <= 0;
- /* did we receive a response? */
- received = sr->reply_received != 0;
-
- /* if we didn't time out, and we didn't receive a response, ignore */
- if (!timeout && !received)
- return ACTION_NONE;
-
/* if we received a response, adjust relevant data and copy mesasge. */
if (sr->reply_received == 1 && sr->reply) {
struct rte_mp_msg *msg, *user_msgs, *tmp;
@@ -448,118 +454,58 @@ trigger_async_action(struct pending_request *sr)
free(sr->async.param->user_reply.msgs);
free(sr->async.param);
free(sr->request);
+ free(sr);
}
static struct pending_request *
-check_trigger(struct timespec *ts)
+async_reply_handle_thread_unsafe(void *arg)
{
- struct pending_request *next, *cur, *trigger = NULL;
-
- TAILQ_FOREACH_SAFE(cur, &pending_requests.requests, next, next) {
- enum async_action action;
- if (cur->type != REQUEST_TYPE_ASYNC)
- continue;
+ struct pending_request *req = (struct pending_request *)arg;
+ enum async_action action;
+ struct timespec ts_now;
+ struct timeval now;
- action = process_async_request(cur, ts);
- if (action == ACTION_FREE) {
- TAILQ_REMOVE(&pending_requests.requests, cur, next);
- free(cur);
- } else if (action == ACTION_TRIGGER) {
- TAILQ_REMOVE(&pending_requests.requests, cur, next);
- trigger = cur;
- break;
- }
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ goto no_trigger;
}
- return trigger;
-}
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
-static void
-wait_for_async_messages(void)
-{
- struct pending_request *sr;
- struct timespec timeout;
- bool timedwait = false;
- bool nowait = false;
- int ret;
+ action = process_async_request(req, &ts_now);
- /* scan through the list and see if there are any timeouts that
- * are earlier than our current timeout.
- */
- TAILQ_FOREACH(sr, &pending_requests.requests, next) {
- if (sr->type != REQUEST_TYPE_ASYNC)
- continue;
- if (!timedwait || timespec_cmp(&sr->async.param->end,
- &timeout) < 0) {
- memcpy(&timeout, &sr->async.param->end,
- sizeof(timeout));
- timedwait = true;
- }
+ TAILQ_REMOVE(&pending_requests.requests, req, next);
- /* sometimes, we don't even wait */
- if (sr->reply_received) {
- nowait = true;
- break;
+ if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
+ /* if we failed to cancel the alarm because it's already in
+ * progress, don't proceed because otherwise we will end up
+ * handling the same message twice.
+ */
+ if (rte_errno == EINPROGRESS) {
+ RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
+ goto no_trigger;
}
+ RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
}
- if (nowait)
- return;
-
- do {
- ret = timedwait ?
- pthread_cond_timedwait(
- &pending_requests.async_cond,
- &pending_requests.lock,
- &timeout) :
- pthread_cond_wait(
- &pending_requests.async_cond,
- &pending_requests.lock);
- } while (ret != 0 && ret != ETIMEDOUT);
-
- /* we've been woken up or timed out */
+ if (action == ACTION_TRIGGER)
+ return req;
+no_trigger:
+ free(req);
+ return NULL;
}
-static void *
-async_reply_handle(void *arg __rte_unused)
+static void
+async_reply_handle(void *arg)
{
- struct timeval now;
- struct timespec ts_now;
- while (1) {
- struct pending_request *trigger = NULL;
-
- pthread_mutex_lock(&pending_requests.lock);
+ struct pending_request *req;
- /* we exit this function holding the lock */
- wait_for_async_messages();
-
- if (gettimeofday(&now, NULL) < 0) {
- pthread_mutex_unlock(&pending_requests.lock);
- RTE_LOG(ERR, EAL, "Cannot get current time\n");
- break;
- }
- ts_now.tv_nsec = now.tv_usec * 1000;
- ts_now.tv_sec = now.tv_sec;
-
- do {
- trigger = check_trigger(&ts_now);
- /* unlock request list */
- pthread_mutex_unlock(&pending_requests.lock);
-
- if (trigger) {
- trigger_async_action(trigger);
- free(trigger);
-
- /* we've triggered a callback, but there may be
- * more, so lock the list and check again.
- */
- pthread_mutex_lock(&pending_requests.lock);
- }
- } while (trigger);
- }
-
- RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+ pthread_mutex_lock(&pending_requests.lock);
+ req = async_reply_handle_thread_unsafe(arg);
+ pthread_mutex_unlock(&pending_requests.lock);
- return NULL;
+ if (req != NULL)
+ trigger_async_action(req);
}
static int
@@ -624,7 +570,15 @@ rte_mp_channel_init(void)
{
char path[PATH_MAX];
int dir_fd;
- pthread_t mp_handle_tid, async_reply_handle_tid;
+ pthread_t mp_handle_tid;
+
+ /* in no shared files mode, we do not have secondary processes support,
+ * so no need to initialize IPC.
+ */
+ if (internal_config.no_shconf) {
+ RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
+ return 0;
+ }
/* create filter path */
create_socket_path("*", path, sizeof(path));
@@ -671,17 +625,6 @@ rte_mp_channel_init(void)
return -1;
}
- if (rte_ctrl_thread_create(&async_reply_handle_tid,
- "rte_mp_async", NULL,
- async_reply_handle, NULL) < 0) {
- RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
- strerror(errno));
- close(mp_fd);
- close(dir_fd);
- mp_fd = -1;
- return -1;
- }
-
/* unlock the directory */
flock(dir_fd, LOCK_UN);
close(dir_fd);
@@ -786,7 +729,7 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type)
dir_fd = dirfd(mp_dir);
/* lock the directory to prevent processes spinning up while we send */
- if (flock(dir_fd, LOCK_EX)) {
+ if (flock(dir_fd, LOCK_SH)) {
RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
mp_dir_path);
rte_errno = errno;
@@ -853,7 +796,7 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
- struct async_request_param *param)
+ struct async_request_param *param, const struct timespec *ts)
{
struct rte_mp_msg *reply_msg;
struct pending_request *pending_req, *exist;
@@ -898,6 +841,13 @@ mp_request_async(const char *dst, struct rte_mp_msg *req,
param->user_reply.nb_sent++;
+ if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
+ async_reply_handle, pending_req) < 0) {
+ RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
+ dst, req->name);
+ rte_panic("Fix the above shit to properly free all memory\n");
+ }
+
return 0;
fail:
free(pending_req);
@@ -988,6 +938,12 @@ rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
if (check_input(req) == false)
return -1;
+
+ if (internal_config.no_shconf) {
+ RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
+ return 0;
+ }
+
if (gettimeofday(&now, NULL) < 0) {
RTE_LOG(ERR, EAL, "Faile to get current time\n");
rte_errno = errno;
@@ -1020,7 +976,7 @@ rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
dir_fd = dirfd(mp_dir);
/* lock the directory to prevent processes spinning up while we send */
- if (flock(dir_fd, LOCK_EX)) {
+ if (flock(dir_fd, LOCK_SH)) {
RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
mp_dir_path);
closedir(mp_dir);
@@ -1072,6 +1028,12 @@ rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
if (check_input(req) == false)
return -1;
+
+ if (internal_config.no_shconf) {
+ RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
+ return 0;
+ }
+
if (gettimeofday(&now, NULL) < 0) {
RTE_LOG(ERR, EAL, "Faile to get current time\n");
rte_errno = errno;
@@ -1119,7 +1081,7 @@ rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
/* for secondary process, send request to the primary process only */
if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
- ret = mp_request_async(eal_mp_socket_path(), copy, param);
+ ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);
/* if we didn't send anything, put dummy request on the queue */
if (ret == 0 && reply->nb_sent == 0) {
@@ -1146,7 +1108,7 @@ rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
dir_fd = dirfd(mp_dir);
/* lock the directory to prevent processes spinning up while we send */
- if (flock(dir_fd, LOCK_EX)) {
+ if (flock(dir_fd, LOCK_SH)) {
RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
mp_dir_path);
rte_errno = errno;
@@ -1162,7 +1124,7 @@ rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
ent->d_name);
- if (mp_request_async(path, copy, param))
+ if (mp_request_async(path, copy, param, ts))
ret = -1;
}
/* if we didn't send anything, put dummy request on the queue */
@@ -1171,9 +1133,6 @@ rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
dummy_used = true;
}
- /* trigger async request thread wake up */
- pthread_cond_signal(&pending_requests.async_cond);
-
/* finally, unlock the queue */
pthread_mutex_unlock(&pending_requests.lock);
@@ -1213,5 +1172,10 @@ rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
return -1;
}
+ if (internal_config.no_shconf) {
+ RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
+ return 0;
+ }
+
return mp_send(msg, peer, MP_REP);
}