X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_eal%2Fcommon%2Feal_common_proc.c;h=f98622f4e9be5077e8b4c5b2f5a3730b8187a6c4;hb=2a04139f66b4;hp=e4d81044fc88559b2c2272bd4c980473325516d7;hpb=620952e060469ca966333c90cab3ca00281eff7d;p=dpdk.git diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c index e4d81044fc..f98622f4e9 100644 --- a/lib/librte_eal/common/eal_common_proc.c +++ b/lib/librte_eal/common/eal_common_proc.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -25,6 +26,7 @@ #include #include #include +#include #include "eal_private.h" #include "eal_filesystem.h" @@ -51,6 +53,7 @@ enum mp_type { MP_MSG, /* Share message with peers, will not block */ MP_REQ, /* Request for information, Will block for a reply */ MP_REP, /* Response to previously-received request */ + MP_IGN, /* Response telling requester to ignore this response */ }; struct mp_msg_internal { @@ -58,31 +61,58 @@ struct mp_msg_internal { struct rte_mp_msg msg; }; -struct sync_request { - TAILQ_ENTRY(sync_request) next; - int reply_received; +struct async_request_param { + rte_mp_async_reply_t clb; + struct rte_mp_reply user_reply; + struct timespec end; + int n_responses_processed; +}; + +struct pending_request { + TAILQ_ENTRY(pending_request) next; + enum { + REQUEST_TYPE_SYNC, + REQUEST_TYPE_ASYNC + } type; char dst[PATH_MAX]; struct rte_mp_msg *request; struct rte_mp_msg *reply; - pthread_cond_t cond; + int reply_received; + RTE_STD_C11 + union { + struct { + struct async_request_param *param; + } async; + struct { + pthread_cond_t cond; + } sync; + }; }; -TAILQ_HEAD(sync_request_list, sync_request); +TAILQ_HEAD(pending_request_list, pending_request); static struct { - struct sync_request_list requests; + struct pending_request_list requests; pthread_mutex_t lock; -} sync_requests = { - .requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests), - .lock = PTHREAD_MUTEX_INITIALIZER + pthread_cond_t async_cond; +} pending_requests = { + .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests), + .lock = PTHREAD_MUTEX_INITIALIZER, + .async_cond = PTHREAD_COND_INITIALIZER + /**< used in async requests only */ }; -static struct sync_request * +/* forward declarations */ +static int +mp_send(struct rte_mp_msg *msg, const char *peer, int type); + + +static struct pending_request * find_sync_request(const char *dst, const char *act_name) { - struct sync_request *r; + struct pending_request *r; - TAILQ_FOREACH(r, &sync_requests.requests, next) { + TAILQ_FOREACH(r, &pending_requests.requests, next) { if (!strcmp(r->dst, dst) && !strcmp(r->request->name, act_name)) break; @@ -91,6 +121,17 @@ find_sync_request(const char *dst, const char *act_name) return r; } +static void +create_socket_path(const char *name, char *buf, int len) +{ + const char *prefix = eal_mp_socket_path(); + + if (strlen(name) > 0) + snprintf(buf, len, "%s_%s", prefix, name); + else + snprintf(buf, len, "%s", prefix); +} + int rte_eal_primary_proc_alive(const char *config_file_path) { @@ -241,23 +282,29 @@ read_msg(struct mp_msg_internal *m, struct sockaddr_un *s) static void process_msg(struct mp_msg_internal *m, struct sockaddr_un *s) { - struct sync_request *sync_req; + struct pending_request *sync_req; struct action_entry *entry; struct rte_mp_msg *msg = &m->msg; rte_mp_t action = NULL; RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name); - if (m->type == MP_REP) { - pthread_mutex_lock(&sync_requests.lock); + if (m->type == MP_REP || m->type == MP_IGN) { + pthread_mutex_lock(&pending_requests.lock); sync_req = find_sync_request(s->sun_path, msg->name); if (sync_req) { memcpy(sync_req->reply, msg, sizeof(*msg)); - sync_req->reply_received = 1; - pthread_cond_signal(&sync_req->cond); + /* -1 indicates that we've been asked to ignore */ + sync_req->reply_received = m->type == MP_REP ? 1 : -1; + + if (sync_req->type == REQUEST_TYPE_SYNC) + pthread_cond_signal(&sync_req->sync.cond); + else if (sync_req->type == REQUEST_TYPE_ASYNC) + pthread_cond_signal( + &pending_requests.async_cond); } else RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name); - pthread_mutex_unlock(&sync_requests.lock); + pthread_mutex_unlock(&pending_requests.lock); return; } @@ -267,10 +314,23 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s) action = entry->action; pthread_mutex_unlock(&mp_mutex_action); - if (!action) - RTE_LOG(ERR, EAL, "Cannot find action: %s\n", msg->name); - else if (action(msg, s->sun_path) < 0) + if (!action) { + if (m->type == MP_REQ && !internal_config.init_complete) { + /* if this is a request, and init is not yet complete, + * and callback wasn't registered, we should tell the + * requester to ignore our existence because we're not + * yet ready to process this request. + */ + struct rte_mp_msg dummy; + memset(&dummy, 0, sizeof(dummy)); + mp_send(&dummy, s->sun_path, MP_IGN); + } else { + RTE_LOG(ERR, EAL, "Cannot find action: %s\n", + msg->name); + } + } else if (action(msg, s->sun_path) < 0) { RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name); + } } static void * @@ -287,11 +347,198 @@ mp_handle(void *arg __rte_unused) return NULL; } +static int +timespec_cmp(const struct timespec *a, const struct timespec *b) +{ + if (a->tv_sec < b->tv_sec) + return -1; + if (a->tv_sec > b->tv_sec) + return 1; + if (a->tv_nsec < b->tv_nsec) + return -1; + if (a->tv_nsec > b->tv_nsec) + return 1; + return 0; +} + +enum async_action { + ACTION_NONE, /**< don't do anything */ + ACTION_FREE, /**< free the action entry, but don't trigger callback */ + ACTION_TRIGGER /**< trigger callback, then free action entry */ +}; + +static enum async_action +process_async_request(struct pending_request *sr, const struct timespec *now) +{ + struct async_request_param *param; + struct rte_mp_reply *reply; + bool timeout, received, last_msg; + + param = sr->async.param; + reply = ¶m->user_reply; + + /* did we timeout? */ + timeout = timespec_cmp(¶m->end, now) <= 0; + + /* did we receive a response? */ + received = sr->reply_received != 0; + + /* if we didn't time out, and we didn't receive a response, ignore */ + if (!timeout && !received) + return ACTION_NONE; + + /* if we received a response, adjust relevant data and copy mesasge. */ + if (sr->reply_received == 1 && sr->reply) { + struct rte_mp_msg *msg, *user_msgs, *tmp; + + msg = sr->reply; + user_msgs = reply->msgs; + + tmp = realloc(user_msgs, sizeof(*msg) * + (reply->nb_received + 1)); + if (!tmp) { + RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n", + sr->dst, sr->request->name); + /* this entry is going to be removed and its message + * dropped, but we don't want to leak memory, so + * continue. + */ + } else { + user_msgs = tmp; + reply->msgs = user_msgs; + memcpy(&user_msgs[reply->nb_received], + msg, sizeof(*msg)); + reply->nb_received++; + } + + /* mark this request as processed */ + param->n_responses_processed++; + } else if (sr->reply_received == -1) { + /* we were asked to ignore this process */ + reply->nb_sent--; + } + free(sr->reply); + + last_msg = param->n_responses_processed == reply->nb_sent; + + return last_msg ? ACTION_TRIGGER : ACTION_FREE; +} + +static void +trigger_async_action(struct pending_request *sr) +{ + struct async_request_param *param; + struct rte_mp_reply *reply; + + param = sr->async.param; + reply = ¶m->user_reply; + + param->clb(sr->request, reply); + + /* clean up */ + free(sr->async.param->user_reply.msgs); + free(sr->async.param); + free(sr->request); +} + +static void * +async_reply_handle(void *arg __rte_unused) +{ + struct pending_request *sr; + struct timeval now; + struct timespec timeout, ts_now; + while (1) { + struct pending_request *trigger = NULL; + int ret; + bool nowait = false; + bool timedwait = false; + + pthread_mutex_lock(&pending_requests.lock); + + /* scan through the list and see if there are any timeouts that + * are earlier than our current timeout. + */ + TAILQ_FOREACH(sr, &pending_requests.requests, next) { + if (sr->type != REQUEST_TYPE_ASYNC) + continue; + if (!timedwait || timespec_cmp(&sr->async.param->end, + &timeout) < 0) { + memcpy(&timeout, &sr->async.param->end, + sizeof(timeout)); + timedwait = true; + } + + /* sometimes, we don't even wait */ + if (sr->reply_received) { + nowait = true; + break; + } + } + + if (nowait) + ret = 0; + else if (timedwait) + ret = pthread_cond_timedwait( + &pending_requests.async_cond, + &pending_requests.lock, &timeout); + else + ret = pthread_cond_wait(&pending_requests.async_cond, + &pending_requests.lock); + + if (gettimeofday(&now, NULL) < 0) { + RTE_LOG(ERR, EAL, "Cannot get current time\n"); + break; + } + ts_now.tv_nsec = now.tv_usec * 1000; + ts_now.tv_sec = now.tv_sec; + + if (ret == 0 || ret == ETIMEDOUT) { + struct pending_request *next; + /* we've either been woken up, or we timed out */ + + /* we have still the lock, check if anything needs + * processing. + */ + TAILQ_FOREACH_SAFE(sr, &pending_requests.requests, next, + next) { + enum async_action action; + if (sr->type != REQUEST_TYPE_ASYNC) + continue; + + action = process_async_request(sr, &ts_now); + if (action == ACTION_FREE) { + TAILQ_REMOVE(&pending_requests.requests, + sr, next); + free(sr); + } else if (action == ACTION_TRIGGER && + trigger == NULL) { + TAILQ_REMOVE(&pending_requests.requests, + sr, next); + trigger = sr; + } + } + } + pthread_mutex_unlock(&pending_requests.lock); + if (trigger) { + trigger_async_action(trigger); + free(trigger); + } + }; + + RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n"); + + return NULL; +} + static int open_socket_fd(void) { + char peer_name[PATH_MAX] = {0}; struct sockaddr_un un; - const char *prefix = eal_mp_socket_path(); + + if (rte_eal_process_type() == RTE_PROC_SECONDARY) + snprintf(peer_name, sizeof(peer_name), + "%d_%"PRIx64, getpid(), rte_rdtsc()); mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0); if (mp_fd < 0) { @@ -301,13 +548,11 @@ open_socket_fd(void) memset(&un, 0, sizeof(un)); un.sun_family = AF_UNIX; - if (rte_eal_process_type() == RTE_PROC_PRIMARY) - snprintf(un.sun_path, sizeof(un.sun_path), "%s", prefix); - else { - snprintf(un.sun_path, sizeof(un.sun_path), "%s_%d_%"PRIx64, - prefix, getpid(), rte_rdtsc()); - } + + create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path)); + unlink(un.sun_path); /* May still exist since last run */ + if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) { RTE_LOG(ERR, EAL, "failed to bind %s: %s\n", un.sun_path, strerror(errno)); @@ -342,44 +587,50 @@ unlink_sockets(const char *filter) return 0; } -static void -unlink_socket_by_path(const char *path) -{ - char *filename; - char *fullpath = strdup(path); - - if (!fullpath) - return; - filename = basename(fullpath); - unlink_sockets(filename); - free(fullpath); - RTE_LOG(INFO, EAL, "Remove socket %s\n", path); -} - int rte_mp_channel_init(void) { char thread_name[RTE_MAX_THREAD_NAME_LEN]; - char *path; - pthread_t tid; + char path[PATH_MAX]; + int dir_fd; + pthread_t mp_handle_tid, async_reply_handle_tid; + + /* create filter path */ + create_socket_path("*", path, sizeof(path)); + snprintf(mp_filter, sizeof(mp_filter), "%s", basename(path)); - snprintf(mp_filter, PATH_MAX, ".%s_unix_*", - internal_config.hugefile_prefix); + /* path may have been modified, so recreate it */ + create_socket_path("*", path, sizeof(path)); + snprintf(mp_dir_path, sizeof(mp_dir_path), "%s", dirname(path)); - path = strdup(eal_mp_socket_path()); - snprintf(mp_dir_path, PATH_MAX, "%s", dirname(path)); - free(path); + /* lock the directory */ + dir_fd = open(mp_dir_path, O_RDONLY); + if (dir_fd < 0) { + RTE_LOG(ERR, EAL, "failed to open %s: %s\n", + mp_dir_path, strerror(errno)); + return -1; + } + + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "failed to lock %s: %s\n", + mp_dir_path, strerror(errno)); + close(dir_fd); + return -1; + } if (rte_eal_process_type() == RTE_PROC_PRIMARY && - unlink_sockets(mp_filter)) { + unlink_sockets(mp_filter)) { RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n"); + close(dir_fd); return -1; } - if (open_socket_fd() < 0) + if (open_socket_fd() < 0) { + close(dir_fd); return -1; + } - if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) { + if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) { RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n", strerror(errno)); close(mp_fd); @@ -387,9 +638,28 @@ rte_mp_channel_init(void) return -1; } + if (pthread_create(&async_reply_handle_tid, NULL, + async_reply_handle, NULL) < 0) { + RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n", + strerror(errno)); + close(mp_fd); + close(dir_fd); + mp_fd = -1; + return -1; + } + /* try best to set thread name */ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle"); - rte_thread_setname(tid, thread_name); + rte_thread_setname(mp_handle_tid, thread_name); + + /* try best to set thread name */ + snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle"); + rte_thread_setname(async_reply_handle_tid, thread_name); + + /* unlock the directory */ + flock(dir_fd, LOCK_UN); + close(dir_fd); + return 0; } @@ -444,13 +714,12 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type) if (snd < 0) { rte_errno = errno; /* Check if it caused by peer process exits */ - if (errno == -ECONNREFUSED) { - /* We don't unlink the primary's socket here */ - if (rte_eal_process_type() == RTE_PROC_PRIMARY) - unlink_socket_by_path(dst_path); + if (errno == ECONNREFUSED && + rte_eal_process_type() == RTE_PROC_PRIMARY) { + unlink(dst_path); return 0; } - if (errno == -ENOBUFS) { + if (errno == ENOBUFS) { RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n", dst_path); return 0; @@ -466,7 +735,7 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type) static int mp_send(struct rte_mp_msg *msg, const char *peer, int type) { - int ret = 0; + int dir_fd, ret = 0; DIR *mp_dir; struct dirent *ent; @@ -488,14 +757,32 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type) rte_errno = errno; return -1; } + + dir_fd = dirfd(mp_dir); + /* lock the directory to prevent processes spinning up while we send */ + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", + mp_dir_path); + rte_errno = errno; + closedir(mp_dir); + return -1; + } + while ((ent = readdir(mp_dir))) { + char path[PATH_MAX]; + if (fnmatch(mp_filter, ent->d_name, 0) != 0) continue; - if (send_msg(ent->d_name, msg, type) < 0) + snprintf(path, sizeof(path), "%s/%s", mp_dir_path, + ent->d_name); + if (send_msg(path, msg, type) < 0) ret = -1; } + /* unlock the dir */ + flock(dir_fd, LOCK_UN); + /* dir_fd automatically closed on closedir */ closedir(mp_dir); return ret; } @@ -539,28 +826,86 @@ rte_mp_sendmsg(struct rte_mp_msg *msg) } static int -mp_request_one(const char *dst, struct rte_mp_msg *req, +mp_request_async(const char *dst, struct rte_mp_msg *req, + struct async_request_param *param) +{ + struct rte_mp_msg *reply_msg; + struct pending_request *sync_req, *exist; + int ret; + + sync_req = malloc(sizeof(*sync_req)); + reply_msg = malloc(sizeof(*reply_msg)); + if (sync_req == NULL || reply_msg == NULL) { + RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n"); + rte_errno = ENOMEM; + ret = -1; + goto fail; + } + + memset(sync_req, 0, sizeof(*sync_req)); + memset(reply_msg, 0, sizeof(*reply_msg)); + + sync_req->type = REQUEST_TYPE_ASYNC; + strcpy(sync_req->dst, dst); + sync_req->request = req; + sync_req->reply = reply_msg; + sync_req->async.param = param; + + /* queue already locked by caller */ + + exist = find_sync_request(dst, req->name); + if (!exist) { + TAILQ_INSERT_TAIL(&pending_requests.requests, sync_req, next); + } else { + RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name); + rte_errno = EEXIST; + ret = -1; + goto fail; + } + + ret = send_msg(dst, req, MP_REQ); + if (ret < 0) { + RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n", + dst, req->name); + ret = -1; + goto fail; + } else if (ret == 0) { + ret = 0; + goto fail; + } + + param->user_reply.nb_sent++; + + return 0; +fail: + free(sync_req); + free(reply_msg); + return ret; +} + +static int +mp_request_sync(const char *dst, struct rte_mp_msg *req, struct rte_mp_reply *reply, const struct timespec *ts) { int ret; - struct timeval now; struct rte_mp_msg msg, *tmp; - struct sync_request sync_req, *exist; + struct pending_request sync_req, *exist; + sync_req.type = REQUEST_TYPE_SYNC; sync_req.reply_received = 0; strcpy(sync_req.dst, dst); sync_req.request = req; sync_req.reply = &msg; - pthread_cond_init(&sync_req.cond, NULL); + pthread_cond_init(&sync_req.sync.cond, NULL); - pthread_mutex_lock(&sync_requests.lock); + pthread_mutex_lock(&pending_requests.lock); exist = find_sync_request(dst, req->name); if (!exist) - TAILQ_INSERT_TAIL(&sync_requests.requests, &sync_req, next); - pthread_mutex_unlock(&sync_requests.lock); + TAILQ_INSERT_TAIL(&pending_requests.requests, &sync_req, next); if (exist) { RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name); rte_errno = EEXIST; + pthread_mutex_unlock(&pending_requests.lock); return -1; } @@ -574,24 +919,14 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, reply->nb_sent++; - pthread_mutex_lock(&sync_requests.lock); do { - pthread_cond_timedwait(&sync_req.cond, &sync_requests.lock, ts); - /* Check spurious wakeups */ - if (sync_req.reply_received == 1) - break; - /* Check if time is out */ - if (gettimeofday(&now, NULL) < 0) - break; - if (ts->tv_sec < now.tv_sec) - break; - else if (now.tv_sec == ts->tv_sec && - now.tv_usec * 1000 < ts->tv_nsec) - break; - } while (1); + ret = pthread_cond_timedwait(&sync_req.sync.cond, + &pending_requests.lock, ts); + } while (ret != 0 && ret != ETIMEDOUT); + /* We got the lock now */ - TAILQ_REMOVE(&sync_requests.requests, &sync_req, next); - pthread_mutex_unlock(&sync_requests.lock); + TAILQ_REMOVE(&pending_requests.requests, &sync_req, next); + pthread_mutex_unlock(&pending_requests.lock); if (sync_req.reply_received == 0) { RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n", @@ -599,6 +934,14 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, rte_errno = ETIMEDOUT; return -1; } + if (sync_req.reply_received == -1) { + RTE_LOG(DEBUG, EAL, "Asked to ignore response\n"); + /* not receiving this message is not an error, so decrement + * number of sent messages + */ + reply->nb_sent--; + return 0; + } tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1)); if (!tmp) { @@ -614,10 +957,10 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, } int __rte_experimental -rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, +rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply, const struct timespec *ts) { - int ret = 0; + int dir_fd, ret = 0; DIR *mp_dir; struct dirent *ent; struct timeval now; @@ -643,7 +986,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, /* for secondary process, send request to the primary process only */ if (rte_eal_process_type() == RTE_PROC_SECONDARY) - return mp_request_one(eal_mp_socket_path(), req, reply, &end); + return mp_request_sync(eal_mp_socket_path(), req, reply, &end); /* for primary process, broadcast request, and collect reply 1 by 1 */ mp_dir = opendir(mp_dir_path); @@ -653,22 +996,189 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, return -1; } + dir_fd = dirfd(mp_dir); + /* lock the directory to prevent processes spinning up while we send */ + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", + mp_dir_path); + closedir(mp_dir); + rte_errno = errno; + return -1; + } + while ((ent = readdir(mp_dir))) { + char path[PATH_MAX]; + if (fnmatch(mp_filter, ent->d_name, 0) != 0) continue; - if (mp_request_one(ent->d_name, req, reply, &end)) + snprintf(path, sizeof(path), "%s/%s", mp_dir_path, + ent->d_name); + + if (mp_request_sync(path, req, reply, &end)) ret = -1; } + /* unlock the directory */ + flock(dir_fd, LOCK_UN); + /* dir_fd automatically closed on closedir */ closedir(mp_dir); return ret; } int __rte_experimental -rte_mp_reply(struct rte_mp_msg *msg, const char *peer) +rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts, + rte_mp_async_reply_t clb) { + struct rte_mp_msg *copy; + struct pending_request *dummy; + struct async_request_param *param; + struct rte_mp_reply *reply; + int dir_fd, ret = 0; + DIR *mp_dir; + struct dirent *ent; + struct timeval now; + struct timespec *end; + bool dummy_used = false; + + RTE_LOG(DEBUG, EAL, "request: %s\n", req->name); + if (check_input(req) == false) + return -1; + if (gettimeofday(&now, NULL) < 0) { + RTE_LOG(ERR, EAL, "Faile to get current time\n"); + rte_errno = errno; + return -1; + } + copy = malloc(sizeof(*copy)); + dummy = malloc(sizeof(*dummy)); + param = malloc(sizeof(*param)); + if (copy == NULL || dummy == NULL || param == NULL) { + RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n"); + rte_errno = ENOMEM; + goto fail; + } + + memset(copy, 0, sizeof(*copy)); + memset(dummy, 0, sizeof(*dummy)); + memset(param, 0, sizeof(*param)); + + /* copy message */ + memcpy(copy, req, sizeof(*copy)); + + param->n_responses_processed = 0; + param->clb = clb; + end = ¶m->end; + reply = ¶m->user_reply; + + end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000; + end->tv_sec = now.tv_sec + ts->tv_sec + + (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000; + reply->nb_sent = 0; + reply->nb_received = 0; + reply->msgs = NULL; + + /* we have to lock the request queue here, as we will be adding a bunch + * of requests to the queue at once, and some of the replies may arrive + * before we add all of the requests to the queue. + */ + pthread_mutex_lock(&pending_requests.lock); + + /* we have to ensure that callback gets triggered even if we don't send + * anything, therefore earlier we have allocated a dummy request. fill + * it, and put it on the queue if we don't send any requests. + */ + dummy->type = REQUEST_TYPE_ASYNC; + dummy->request = copy; + dummy->reply = NULL; + dummy->async.param = param; + dummy->reply_received = 1; /* short-circuit the timeout */ + + /* for secondary process, send request to the primary process only */ + if (rte_eal_process_type() == RTE_PROC_SECONDARY) { + ret = mp_request_async(eal_mp_socket_path(), copy, param); + + /* if we didn't send anything, put dummy request on the queue */ + if (ret == 0 && reply->nb_sent == 0) { + TAILQ_INSERT_TAIL(&pending_requests.requests, dummy, + next); + dummy_used = true; + } + + pthread_mutex_unlock(&pending_requests.lock); + + /* if we couldn't send anything, clean up */ + if (ret != 0) + goto fail; + return 0; + } + + /* for primary process, broadcast request */ + mp_dir = opendir(mp_dir_path); + if (!mp_dir) { + RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path); + rte_errno = errno; + goto unlock_fail; + } + dir_fd = dirfd(mp_dir); + + /* lock the directory to prevent processes spinning up while we send */ + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", + mp_dir_path); + rte_errno = errno; + goto closedir_fail; + } + + while ((ent = readdir(mp_dir))) { + char path[PATH_MAX]; + + if (fnmatch(mp_filter, ent->d_name, 0) != 0) + continue; + + snprintf(path, sizeof(path), "%s/%s", mp_dir_path, + ent->d_name); + + if (mp_request_async(path, copy, param)) + ret = -1; + } + /* if we didn't send anything, put dummy request on the queue */ + if (ret == 0 && reply->nb_sent == 0) { + TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next); + dummy_used = true; + } + + /* trigger async request thread wake up */ + pthread_cond_signal(&pending_requests.async_cond); + + /* finally, unlock the queue */ + pthread_mutex_unlock(&pending_requests.lock); + + /* unlock the directory */ + flock(dir_fd, LOCK_UN); + + /* dir_fd automatically closed on closedir */ + closedir(mp_dir); + + /* if dummy was unused, free it */ + if (!dummy_used) + free(dummy); + + return ret; +closedir_fail: + closedir(mp_dir); +unlock_fail: + pthread_mutex_unlock(&pending_requests.lock); +fail: + free(dummy); + free(param); + free(copy); + return -1; +} + +int __rte_experimental +rte_mp_reply(struct rte_mp_msg *msg, const char *peer) +{ RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name); if (check_input(msg) == false)