X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_eal%2Fcommon%2Feal_common_proc.c;h=611aba299c937f683445f28c808a95f55a3192ea;hb=6383d2642b62f430c0ddb991dcfa75a5b2a4aec6;hp=da7930f564c524833c2732a33baa98bd5e22b380;hpb=da5957821bdd684a50dcb639fb7798936fb2d981;p=dpdk.git

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index da7930f564..611aba299c 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -25,6 +26,7 @@
 #include
 #include
 #include
+#include
 
 #include "eal_private.h"
 #include "eal_filesystem.h"
@@ -51,6 +53,7 @@ enum mp_type {
     MP_MSG, /* Share message with peers, will not block */
     MP_REQ, /* Request for information, Will block for a reply */
     MP_REP, /* Response to previously-received request */
+    MP_IGN, /* Response telling requester to ignore this response */
 };
 
 struct mp_msg_internal {
@@ -58,31 +61,58 @@
     struct rte_mp_msg msg;
 };
 
-struct sync_request {
-    TAILQ_ENTRY(sync_request) next;
-    int reply_received;
+struct async_request_param {
+    rte_mp_async_reply_t clb;
+    struct rte_mp_reply user_reply;
+    struct timespec end;
+    int n_responses_processed;
+};
+
+struct pending_request {
+    TAILQ_ENTRY(pending_request) next;
+    enum {
+        REQUEST_TYPE_SYNC,
+        REQUEST_TYPE_ASYNC
+    } type;
     char dst[PATH_MAX];
     struct rte_mp_msg *request;
     struct rte_mp_msg *reply;
-    pthread_cond_t cond;
+    int reply_received;
+    RTE_STD_C11
+    union {
+        struct {
+            struct async_request_param *param;
+        } async;
+        struct {
+            pthread_cond_t cond;
+        } sync;
+    };
 };
 
-TAILQ_HEAD(sync_request_list, sync_request);
+TAILQ_HEAD(pending_request_list, pending_request);
 
 static struct {
-    struct sync_request_list requests;
+    struct pending_request_list requests;
     pthread_mutex_t lock;
-} sync_requests = {
-    .requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
-    .lock = PTHREAD_MUTEX_INITIALIZER
+    pthread_cond_t async_cond;
+} pending_requests = {
+    .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
+    .lock = PTHREAD_MUTEX_INITIALIZER,
+    .async_cond = PTHREAD_COND_INITIALIZER
+    /**< used in async requests only */
 };
 
-static struct sync_request *
-find_sync_request(const char *dst, const char *act_name)
+/* forward declarations */
+static int
+mp_send(struct rte_mp_msg *msg, const char *peer, int type);
+
+
+static struct pending_request *
+find_pending_request(const char *dst, const char *act_name)
 {
-    struct sync_request *r;
+    struct pending_request *r;
 
-    TAILQ_FOREACH(r, &sync_requests.requests, next) {
+    TAILQ_FOREACH(r, &pending_requests.requests, next) {
         if (!strcmp(r->dst, dst) &&
             !strcmp(r->request->name, act_name))
             break;
@@ -91,6 +121,17 @@ find_sync_request(const char *dst, const char *act_name)
     return r;
 }
 
+static void
+create_socket_path(const char *name, char *buf, int len)
+{
+    const char *prefix = eal_mp_socket_path();
+
+    if (strlen(name) > 0)
+        snprintf(buf, len, "%s_%s", prefix, name);
+    else
+        strlcpy(buf, prefix, len);
+}
+
 int
 rte_eal_primary_proc_alive(const char *config_file_path)
 {
@@ -159,7 +200,7 @@ rte_mp_action_register(const char *name, rte_mp_t action)
         rte_errno = ENOMEM;
         return -1;
     }
-    strcpy(entry->action_name, name);
+    strlcpy(entry->action_name, name, sizeof(entry->action_name));
     entry->action = action;
 
     pthread_mutex_lock(&mp_mutex_action);
@@ -241,23 +282,30 @@ read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 static void
 process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 {
-    struct sync_request *sync_req;
+    struct pending_request *pending_req;
     struct action_entry *entry;
     struct rte_mp_msg *msg = &m->msg;
     rte_mp_t action = NULL;
 
     RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);
 
-    if (m->type == MP_REP) {
-        pthread_mutex_lock(&sync_requests.lock);
-        sync_req = find_sync_request(s->sun_path, msg->name);
-        if (sync_req) {
-            memcpy(sync_req->reply, msg, sizeof(*msg));
-            sync_req->reply_received = 1;
-            pthread_cond_signal(&sync_req->cond);
+    if (m->type == MP_REP || m->type == MP_IGN) {
+        pthread_mutex_lock(&pending_requests.lock);
+        pending_req = find_pending_request(s->sun_path, msg->name);
+        if (pending_req) {
+            memcpy(pending_req->reply, msg, sizeof(*msg));
+            /* -1 indicates that we've been asked to ignore */
+            pending_req->reply_received =
+                m->type == MP_REP ? 1 : -1;
+
+            if (pending_req->type == REQUEST_TYPE_SYNC)
+                pthread_cond_signal(&pending_req->sync.cond);
+            else if (pending_req->type == REQUEST_TYPE_ASYNC)
+                pthread_cond_signal(
+                    &pending_requests.async_cond);
         } else
             RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
-        pthread_mutex_unlock(&sync_requests.lock);
+        pthread_mutex_unlock(&pending_requests.lock);
         return;
     }
 
@@ -267,10 +315,25 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
         action = entry->action;
     pthread_mutex_unlock(&mp_mutex_action);
 
-    if (!action)
-        RTE_LOG(ERR, EAL, "Cannot find action: %s\n", msg->name);
-    else if (action(msg, s->sun_path) < 0)
+    if (!action) {
+        if (m->type == MP_REQ && !internal_config.init_complete) {
+            /* if this is a request, and init is not yet complete,
+             * and callback wasn't registered, we should tell the
+             * requester to ignore our existence because we're not
+             * yet ready to process this request.
+             */
+            struct rte_mp_msg dummy;
+
+            memset(&dummy, 0, sizeof(dummy));
+            strlcpy(dummy.name, msg->name, sizeof(dummy.name));
+            mp_send(&dummy, s->sun_path, MP_IGN);
+        } else {
+            RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
                msg->name);
+        }
+    } else if (action(msg, s->sun_path) < 0) {
         RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
+    }
 }
 
 static void *
@@ -287,11 +350,227 @@ mp_handle(void *arg __rte_unused)
     return NULL;
 }
 
+static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+    if (a->tv_sec < b->tv_sec)
+        return -1;
+    if (a->tv_sec > b->tv_sec)
+        return 1;
+    if (a->tv_nsec < b->tv_nsec)
+        return -1;
+    if (a->tv_nsec > b->tv_nsec)
+        return 1;
+    return 0;
+}
+
+enum async_action {
+    ACTION_NONE, /**< don't do anything */
+    ACTION_FREE, /**< free the action entry, but don't trigger callback */
+    ACTION_TRIGGER /**< trigger callback, then free action entry */
+};
+
+static enum async_action
+process_async_request(struct pending_request *sr, const struct timespec *now)
+{
+    struct async_request_param *param;
+    struct rte_mp_reply *reply;
+    bool timeout, received, last_msg;
+
+    param = sr->async.param;
+    reply = &param->user_reply;
+
+    /* did we timeout? */
+    timeout = timespec_cmp(&param->end, now) <= 0;
+
+    /* did we receive a response? */
+    received = sr->reply_received != 0;
+
+    /* if we didn't time out, and we didn't receive a response, ignore */
+    if (!timeout && !received)
+        return ACTION_NONE;
+
+    /* if we received a response, adjust relevant data and copy mesasge.
+     */
+    if (sr->reply_received == 1 && sr->reply) {
+        struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+        msg = sr->reply;
+        user_msgs = reply->msgs;
+
+        tmp = realloc(user_msgs, sizeof(*msg) *
+                (reply->nb_received + 1));
+        if (!tmp) {
+            RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+                sr->dst, sr->request->name);
+            /* this entry is going to be removed and its message
+             * dropped, but we don't want to leak memory, so
+             * continue.
+             */
+        } else {
+            user_msgs = tmp;
+            reply->msgs = user_msgs;
+            memcpy(&user_msgs[reply->nb_received],
+                    msg, sizeof(*msg));
+            reply->nb_received++;
+        }
+
+        /* mark this request as processed */
+        param->n_responses_processed++;
+    } else if (sr->reply_received == -1) {
+        /* we were asked to ignore this process */
+        reply->nb_sent--;
+    } else if (timeout) {
+        /* count it as processed response, but don't increment
+         * nb_received.
+         */
+        param->n_responses_processed++;
+    }
+
+    free(sr->reply);
+
+    last_msg = param->n_responses_processed == reply->nb_sent;
+
+    return last_msg ? ACTION_TRIGGER : ACTION_FREE;
+}
+
+static void
+trigger_async_action(struct pending_request *sr)
+{
+    struct async_request_param *param;
+    struct rte_mp_reply *reply;
+
+    param = sr->async.param;
+    reply = &param->user_reply;
+
+    param->clb(sr->request, reply);
+
+    /* clean up */
+    free(sr->async.param->user_reply.msgs);
+    free(sr->async.param);
+    free(sr->request);
+}
+
+static struct pending_request *
+check_trigger(struct timespec *ts)
+{
+    struct pending_request *next, *cur, *trigger = NULL;
+
+    TAILQ_FOREACH_SAFE(cur, &pending_requests.requests, next, next) {
+        enum async_action action;
+        if (cur->type != REQUEST_TYPE_ASYNC)
+            continue;
+
+        action = process_async_request(cur, ts);
+        if (action == ACTION_FREE) {
+            TAILQ_REMOVE(&pending_requests.requests, cur, next);
+            free(cur);
+        } else if (action == ACTION_TRIGGER) {
+            TAILQ_REMOVE(&pending_requests.requests, cur, next);
+            trigger = cur;
+            break;
+        }
+    }
+    return trigger;
+}
+
+static void
+wait_for_async_messages(void)
+{
+    struct pending_request *sr;
+    struct timespec timeout;
+    bool timedwait = false;
+    bool nowait = false;
+    int ret;
+
+    /* scan through the list and see if there are any timeouts that
+     * are earlier than our current timeout.
+     */
+    TAILQ_FOREACH(sr, &pending_requests.requests, next) {
+        if (sr->type != REQUEST_TYPE_ASYNC)
+            continue;
+        if (!timedwait || timespec_cmp(&sr->async.param->end,
+                &timeout) < 0) {
+            memcpy(&timeout, &sr->async.param->end,
+                sizeof(timeout));
+            timedwait = true;
+        }
+
+        /* sometimes, we don't even wait */
+        if (sr->reply_received) {
+            nowait = true;
+            break;
+        }
+    }
+
+    if (nowait)
+        return;
+
+    do {
+        ret = timedwait ?
+            pthread_cond_timedwait(
+                &pending_requests.async_cond,
+                &pending_requests.lock,
+                &timeout) :
+            pthread_cond_wait(
+                &pending_requests.async_cond,
+                &pending_requests.lock);
+    } while (ret != 0 && ret != ETIMEDOUT);
+
+    /* we've been woken up or timed out */
+}
+
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+    struct timeval now;
+    struct timespec ts_now;
+    while (1) {
+        struct pending_request *trigger = NULL;
+
+        pthread_mutex_lock(&pending_requests.lock);
+
+        /* we exit this function holding the lock */
+        wait_for_async_messages();
+
+        if (gettimeofday(&now, NULL) < 0) {
+            pthread_mutex_unlock(&pending_requests.lock);
+            RTE_LOG(ERR, EAL, "Cannot get current time\n");
+            break;
+        }
+        ts_now.tv_nsec = now.tv_usec * 1000;
+        ts_now.tv_sec = now.tv_sec;
+
+        do {
+            trigger = check_trigger(&ts_now);
+            /* unlock request list */
+            pthread_mutex_unlock(&pending_requests.lock);
+
+            if (trigger) {
+                trigger_async_action(trigger);
+                free(trigger);
+
+                /* we've triggered a callback, but there may be
+                 * more, so lock the list and check again.
+                 */
+                pthread_mutex_lock(&pending_requests.lock);
+            }
+        } while (trigger);
+    }
+
+    RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+    return NULL;
+}
+
 static int
 open_socket_fd(void)
 {
+    char peer_name[PATH_MAX] = {0};
     struct sockaddr_un un;
-    const char *prefix = eal_mp_socket_path();
+
+    if (rte_eal_process_type() == RTE_PROC_SECONDARY)
+        snprintf(peer_name, sizeof(peer_name),
+                "%d_%"PRIx64, getpid(), rte_rdtsc());
 
     mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
     if (mp_fd < 0) {
@@ -301,13 +580,11 @@ open_socket_fd(void)
 
     memset(&un, 0, sizeof(un));
     un.sun_family = AF_UNIX;
-    if (rte_eal_process_type() == RTE_PROC_PRIMARY)
-        snprintf(un.sun_path, sizeof(un.sun_path), "%s", prefix);
-    else {
-        snprintf(un.sun_path, sizeof(un.sun_path), "%s_%d_%"PRIx64,
-            prefix, getpid(), rte_rdtsc());
-    }
+
+    create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
+
     unlink(un.sun_path); /* May still exist since last run */
+
     if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
         RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
             un.sun_path, strerror(errno));
@@ -342,54 +619,73 @@ unlink_sockets(const char *filter)
     return 0;
 }
 
-static void
-unlink_socket_by_path(const char *path)
-{
-    char *filename;
-    char *fullpath = strdup(path);
-
-    if (!fullpath)
-        return;
-    filename = basename(fullpath);
-    unlink_sockets(filename);
-    free(fullpath);
-    RTE_LOG(INFO, EAL, "Remove socket %s\n", path);
-}
-
 int
 rte_mp_channel_init(void)
 {
-    char thread_name[RTE_MAX_THREAD_NAME_LEN];
-    char *path;
-    pthread_t tid;
+    char path[PATH_MAX];
+    int dir_fd;
+    pthread_t mp_handle_tid, async_reply_handle_tid;
+
+    /* create filter path */
+    create_socket_path("*", path, sizeof(path));
+    strlcpy(mp_filter, basename(path), sizeof(mp_filter));
 
-    snprintf(mp_filter, PATH_MAX, ".%s_unix_*",
-        internal_config.hugefile_prefix);
+    /* path may have been modified, so recreate it */
+    create_socket_path("*", path, sizeof(path));
+    strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));
+
+    /* lock the directory */
+    dir_fd = open(mp_dir_path, O_RDONLY);
+    if (dir_fd < 0) {
+        RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
            mp_dir_path, strerror(errno));
+        return -1;
+    }
 
-    path = strdup(eal_mp_socket_path());
-    snprintf(mp_dir_path, PATH_MAX, "%s", dirname(path));
-    free(path);
+    if (flock(dir_fd, LOCK_EX)) {
+        RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
            mp_dir_path, strerror(errno));
+        close(dir_fd);
+        return -1;
+    }
 
     if (rte_eal_process_type() == RTE_PROC_PRIMARY &&
-            unlink_sockets(mp_filter)) {
+            unlink_sockets(mp_filter)) {
         RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n");
+        close(dir_fd);
         return -1;
     }
 
-    if (open_socket_fd() < 0)
+    if (open_socket_fd() < 0) {
+        close(dir_fd);
+        return -1;
+    }
+
+    if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
+            NULL, mp_handle, NULL) < 0) {
+        RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
            strerror(errno));
+        close(mp_fd);
+        close(dir_fd);
+        mp_fd = -1;
         return -1;
+    }
 
-    if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+    if (rte_ctrl_thread_create(&async_reply_handle_tid,
+            "rte_mp_async", NULL,
+            async_reply_handle, NULL) < 0) {
         RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
            strerror(errno));
         close(mp_fd);
+        close(dir_fd);
         mp_fd = -1;
         return -1;
     }
 
-    /* try best to set thread name */
-    snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
-    rte_thread_setname(tid, thread_name);
+    /* unlock the directory */
+    flock(dir_fd, LOCK_UN);
+    close(dir_fd);
+
     return 0;
 }
 
@@ -416,7 +712,7 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
 
     memset(&dst, 0, sizeof(dst));
     dst.sun_family = AF_UNIX;
-    snprintf(dst.sun_path, sizeof(dst.sun_path), "%s", dst_path);
+    strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));
 
     memset(&msgh, 0, sizeof(msgh));
     memset(control, 0, sizeof(control));
@@ -444,10 +740,9 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
     if (snd < 0) {
         rte_errno = errno;
         /* Check if it caused by peer process exits */
-        if (errno == ECONNREFUSED) {
-            /* We don't unlink the primary's socket here */
-            if (rte_eal_process_type() == RTE_PROC_PRIMARY)
-                unlink_socket_by_path(dst_path);
+        if (errno == ECONNREFUSED &&
+                rte_eal_process_type() == RTE_PROC_PRIMARY) {
+            unlink(dst_path);
             return 0;
         }
         if (errno == ENOBUFS) {
@@ -466,7 +761,7 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
 static int
 mp_send(struct rte_mp_msg *msg, const char *peer, int type)
 {
-    int ret = 0;
+    int dir_fd, ret = 0;
     DIR *mp_dir;
     struct dirent *ent;
 
@@ -488,6 +783,17 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type)
         rte_errno = errno;
         return -1;
     }
+
+    dir_fd = dirfd(mp_dir);
+    /* lock the directory to prevent processes spinning up while we send */
+    if (flock(dir_fd, LOCK_EX)) {
+        RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
            mp_dir_path);
+        rte_errno = errno;
+        closedir(mp_dir);
+        return -1;
+    }
+
     while ((ent = readdir(mp_dir))) {
         char path[PATH_MAX];
 
@@ -499,7 +805,10 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type)
         if (send_msg(path, msg, type) < 0)
             ret = -1;
     }
+    /* unlock the dir */
+    flock(dir_fd, LOCK_UN);
 
+    /* dir_fd automatically closed on closedir */
     closedir(mp_dir);
     return ret;
 }
@@ -543,28 +852,79 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
 }
 
 static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+        struct async_request_param *param)
+{
+    struct rte_mp_msg *reply_msg;
+    struct pending_request *pending_req, *exist;
+    int ret;
+
+    pending_req = calloc(1, sizeof(*pending_req));
+    reply_msg = calloc(1, sizeof(*reply_msg));
+    if (pending_req == NULL || reply_msg == NULL) {
+        RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+        rte_errno = ENOMEM;
+        ret = -1;
+        goto fail;
+    }
+
+    pending_req->type = REQUEST_TYPE_ASYNC;
+    strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
+    strcpy(pending_req->dst, dst);
+    pending_req->request = req;
+    pending_req->reply = reply_msg;
+    pending_req->async.param = param;
+
+    /* queue already locked by caller */
+
+    exist = find_pending_request(dst, req->name);
+    if (exist) {
+        RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+        rte_errno = EEXIST;
+        ret = -1;
+        goto fail;
+    }
+
+    ret = send_msg(dst, req, MP_REQ);
+    if (ret < 0) {
+        RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
            dst, req->name);
+        ret = -1;
+        goto fail;
+    } else if (ret == 0) {
+        ret = 0;
+        goto fail;
+    }
+    TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);
+
+    param->user_reply.nb_sent++;
+
+    return 0;
+fail:
+    free(pending_req);
+    free(reply_msg);
+    return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
        struct rte_mp_reply *reply, const struct timespec *ts)
 {
     int ret;
-    struct timeval now;
     struct rte_mp_msg msg, *tmp;
-    struct sync_request sync_req, *exist;
-
-    sync_req.reply_received = 0;
-    strcpy(sync_req.dst, dst);
-    sync_req.request = req;
-    sync_req.reply = &msg;
-    pthread_cond_init(&sync_req.cond, NULL);
-
-    pthread_mutex_lock(&sync_requests.lock);
-    exist = find_sync_request(dst, req->name);
-    if (!exist)
-        TAILQ_INSERT_TAIL(&sync_requests.requests, &sync_req, next);
+    struct pending_request pending_req, *exist;
+
+    pending_req.type = REQUEST_TYPE_SYNC;
+    pending_req.reply_received = 0;
+    strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
+    pending_req.request = req;
+    pending_req.reply = &msg;
+    pthread_cond_init(&pending_req.sync.cond, NULL);
+
+    exist = find_pending_request(dst, req->name);
     if (exist) {
         RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
         rte_errno = EEXIST;
-        pthread_mutex_unlock(&sync_requests.lock);
         return -1;
     }
 
@@ -576,32 +936,31 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
     } else if (ret == 0)
         return 0;
 
+    TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);
+
     reply->nb_sent++;
 
     do {
-        pthread_cond_timedwait(&sync_req.cond, &sync_requests.lock, ts);
-        /* Check spurious wakeups */
-        if (sync_req.reply_received == 1)
-            break;
-        /* Check if time is out */
-        if (gettimeofday(&now, NULL) < 0)
-            break;
-        if (ts->tv_sec < now.tv_sec)
-            break;
-        else if (now.tv_sec == ts->tv_sec &&
-            now.tv_usec * 1000 < ts->tv_nsec)
-            break;
-    } while (1);
-    /* We got the lock now */
-    TAILQ_REMOVE(&sync_requests.requests, &sync_req, next);
-    pthread_mutex_unlock(&sync_requests.lock);
+        ret = pthread_cond_timedwait(&pending_req.sync.cond,
                &pending_requests.lock, ts);
+    } while (ret != 0 && ret != ETIMEDOUT);
 
-    if (sync_req.reply_received == 0) {
+    TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);
+
+    if (pending_req.reply_received == 0) {
         RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
            dst, req->name);
         rte_errno = ETIMEDOUT;
         return -1;
     }
+
+    if (pending_req.reply_received == -1) {
+        RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
+        /* not receiving this message is not an error, so decrement
+         * number of sent messages
+         */
+        reply->nb_sent--;
+        return 0;
+    }
 
     tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
     if (!tmp) {
@@ -617,10 +976,10 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
 }
 
 int __rte_experimental
-rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
+rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
        const struct timespec *ts)
 {
-    int ret = 0;
+    int dir_fd, ret = 0;
     DIR *mp_dir;
     struct dirent *ent;
     struct timeval now;
@@ -645,8 +1004,12 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
     reply->msgs = NULL;
 
     /* for secondary process, send request to the primary process only */
-    if (rte_eal_process_type() == RTE_PROC_SECONDARY)
-        return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+    if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+        pthread_mutex_lock(&pending_requests.lock);
+        ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
+        pthread_mutex_unlock(&pending_requests.lock);
+        return ret;
+    }
 
     /* for primary process, broadcast request, and collect reply 1 by 1 */
     mp_dir = opendir(mp_dir_path);
@@ -656,6 +1019,17 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
         return -1;
     }
 
+    dir_fd = dirfd(mp_dir);
+    /* lock the directory to prevent processes spinning up while we send */
+    if (flock(dir_fd, LOCK_EX)) {
+        RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
            mp_dir_path);
+        closedir(mp_dir);
+        rte_errno = errno;
+        return -1;
+    }
+
+    pthread_mutex_lock(&pending_requests.lock);
     while ((ent = readdir(mp_dir))) {
         char path[PATH_MAX];
 
@@ -665,18 +1039,170 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
         snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
            ent->d_name);
 
-        if (mp_request_one(path, req, reply, &end))
+        /* unlocks the mutex while waiting for response,
+         * locks on receive
+         */
+        if (mp_request_sync(path, req, reply, &end))
             ret = -1;
     }
+    pthread_mutex_unlock(&pending_requests.lock);
 
+    /* unlock the directory */
+    flock(dir_fd, LOCK_UN);
+
+    /* dir_fd automatically closed on closedir */
     closedir(mp_dir);
     return ret;
 }
 
 int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
+        rte_mp_async_reply_t clb)
 {
+    struct rte_mp_msg *copy;
+    struct pending_request *dummy;
+    struct async_request_param *param;
+    struct rte_mp_reply *reply;
+    int dir_fd, ret = 0;
+    DIR *mp_dir;
+    struct dirent *ent;
+    struct timeval now;
+    struct timespec *end;
+    bool dummy_used = false;
+    RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+    if (check_input(req) == false)
+        return -1;
+    if (gettimeofday(&now, NULL) < 0) {
+        RTE_LOG(ERR, EAL, "Faile to get current time\n");
+        rte_errno = errno;
+        return -1;
+    }
+    copy = calloc(1, sizeof(*copy));
+    dummy = calloc(1, sizeof(*dummy));
+    param = calloc(1, sizeof(*param));
+    if (copy == NULL || dummy == NULL || param == NULL) {
+        RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+        rte_errno = ENOMEM;
+        goto fail;
+    }
+
+    /* copy message */
+    memcpy(copy, req, sizeof(*copy));
+
+    param->n_responses_processed = 0;
+    param->clb = clb;
+    end = &param->end;
+    reply = &param->user_reply;
+
+    end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+    end->tv_sec = now.tv_sec + ts->tv_sec +
            (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+    reply->nb_sent = 0;
+    reply->nb_received = 0;
+    reply->msgs = NULL;
+
+    /* we have to lock the request queue here, as we will be adding a bunch
+     * of requests to the queue at once, and some of the replies may arrive
+     * before we add all of the requests to the queue.
+     */
+    pthread_mutex_lock(&pending_requests.lock);
+
+    /* we have to ensure that callback gets triggered even if we don't send
+     * anything, therefore earlier we have allocated a dummy request. fill
+     * it, and put it on the queue if we don't send any requests.
+     */
+    dummy->type = REQUEST_TYPE_ASYNC;
+    dummy->request = copy;
+    dummy->reply = NULL;
+    dummy->async.param = param;
+    dummy->reply_received = 1; /* short-circuit the timeout */
+
+    /* for secondary process, send request to the primary process only */
+    if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+        ret = mp_request_async(eal_mp_socket_path(), copy, param);
+
+        /* if we didn't send anything, put dummy request on the queue */
+        if (ret == 0 && reply->nb_sent == 0) {
+            TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
                    next);
+            dummy_used = true;
+        }
+
+        pthread_mutex_unlock(&pending_requests.lock);
+
+        /* if we couldn't send anything, clean up */
+        if (ret != 0)
+            goto fail;
+        return 0;
+    }
+
+    /* for primary process, broadcast request */
+    mp_dir = opendir(mp_dir_path);
+    if (!mp_dir) {
+        RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+        rte_errno = errno;
+        goto unlock_fail;
+    }
+    dir_fd = dirfd(mp_dir);
+
+    /* lock the directory to prevent processes spinning up while we send */
+    if (flock(dir_fd, LOCK_EX)) {
+        RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
            mp_dir_path);
+        rte_errno = errno;
+        goto closedir_fail;
+    }
+
+    while ((ent = readdir(mp_dir))) {
+        char path[PATH_MAX];
+
+        if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+            continue;
+
+        snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
            ent->d_name);
+
+        if (mp_request_async(path, copy, param))
+            ret = -1;
+    }
+    /* if we didn't send anything, put dummy request on the queue */
+    if (ret == 0 && reply->nb_sent == 0) {
+        TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
+        dummy_used = true;
+    }
+
+    /* trigger async request thread wake up */
+    pthread_cond_signal(&pending_requests.async_cond);
+
+    /* finally, unlock the queue */
+    pthread_mutex_unlock(&pending_requests.lock);
+
+    /* unlock the directory */
+    flock(dir_fd, LOCK_UN);
+
+    /* dir_fd automatically closed on closedir */
+    closedir(mp_dir);
+
+    /* if dummy was unused, free it */
+    if (!dummy_used)
+        free(dummy);
+
+    return ret;
+closedir_fail:
+    closedir(mp_dir);
+unlock_fail:
+    pthread_mutex_unlock(&pending_requests.lock);
+fail:
+    free(dummy);
+    free(param);
+    free(copy);
+    return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
     RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
 
     if (check_input(msg) == false)
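
For context, below is a minimal sketch of how the asynchronous request API introduced by this diff (rte_mp_request_async plus a rte_mp_async_reply_t callback) might be used from application code. It is not part of the patch. The callback prototype and the ALLOW_EXPERIMENTAL_API note are assumptions based on the rte_eal.h of the same DPDK release, and the action name "example_action" is purely hypothetical; what is grounded in the diff itself is that the reply callback receives the original request and an rte_mp_reply whose nb_sent/nb_received/msgs fields describe the peers' answers, and that the EAL frees reply->msgs after the callback returns.

    /* Hypothetical usage sketch; build with ALLOW_EXPERIMENTAL_API defined. */
    #include <stdio.h>
    #include <string.h>
    #include <rte_eal.h>
    #include <rte_string_fns.h>

    /* Invoked on the "rte_mp_async" control thread once all peers have
     * replied or the timeout passed to rte_mp_request_async() expires.
     */
    static int
    example_reply_cb(const struct rte_mp_msg *request,
            const struct rte_mp_reply *reply)
    {
        int i;

        printf("request %s: %d of %d peers replied\n",
            request->name, reply->nb_received, reply->nb_sent);
        for (i = 0; i < reply->nb_received; i++)
            printf("  reply from action %s\n", reply->msgs[i].name);
        /* reply->msgs is freed by the EAL after this callback returns */
        return 0;
    }

    static int
    send_example_request(void)
    {
        struct rte_mp_msg req;
        struct timespec ts = {.tv_sec = 5, .tv_nsec = 0}; /* 5 s timeout */

        memset(&req, 0, sizeof(req));
        strlcpy(req.name, "example_action", sizeof(req.name));

        /* Returns immediately; example_reply_cb() runs later on the
         * async-reply control thread created in rte_mp_channel_init().
         */
        return rte_mp_request_async(&req, &ts, example_reply_cb);
    }

Compared with rte_mp_request_sync(), which blocks the caller until every peer answers or times out, the asynchronous variant queues the request and lets the dedicated rte_mp_async thread collect replies, which is why the patch adds the pending_request queue, the async condition variable, and the dummy-request bookkeeping seen above.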