X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_eal%2Fcommon%2Feal_common_proc.c;h=4131b67b0289b67efd6572e10d2dfd845d23bc78;hb=579a4ccc345c1932ff75259463ffde64722a16a9;hp=1aab3acd4e1fda378a8db067b6255ee6cb2a4eb8;hpb=2e30c3fac4e94417fe2299dea3bb4555e76da4c6;p=dpdk.git diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c index 1aab3acd4e..4131b67b02 100644 --- a/lib/librte_eal/common/eal_common_proc.c +++ b/lib/librte_eal/common/eal_common_proc.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,7 @@ enum mp_type { MP_MSG, /* Share message with peers, will not block */ MP_REQ, /* Request for information, Will block for a reply */ MP_REP, /* Response to previously-received request */ + MP_IGN, /* Response telling requester to ignore this response */ }; struct mp_msg_internal { @@ -77,6 +79,11 @@ static struct { .lock = PTHREAD_MUTEX_INITIALIZER }; +/* forward declarations */ +static int +mp_send(struct rte_mp_msg *msg, const char *peer, int type); + + static struct sync_request * find_sync_request(const char *dst, const char *act_name) { @@ -259,12 +266,13 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s) RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name); - if (m->type == MP_REP) { + if (m->type == MP_REP || m->type == MP_IGN) { pthread_mutex_lock(&sync_requests.lock); sync_req = find_sync_request(s->sun_path, msg->name); if (sync_req) { memcpy(sync_req->reply, msg, sizeof(*msg)); - sync_req->reply_received = 1; + /* -1 indicates that we've been asked to ignore */ + sync_req->reply_received = m->type == MP_REP ? 1 : -1; pthread_cond_signal(&sync_req->cond); } else RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name); @@ -278,10 +286,23 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s) action = entry->action; pthread_mutex_unlock(&mp_mutex_action); - if (!action) - RTE_LOG(ERR, EAL, "Cannot find action: %s\n", msg->name); - else if (action(msg, s->sun_path) < 0) + if (!action) { + if (m->type == MP_REQ && !internal_config.init_complete) { + /* if this is a request, and init is not yet complete, + * and callback wasn't registered, we should tell the + * requester to ignore our existence because we're not + * yet ready to process this request. + */ + struct rte_mp_msg dummy; + memset(&dummy, 0, sizeof(dummy)); + mp_send(&dummy, s->sun_path, MP_IGN); + } else { + RTE_LOG(ERR, EAL, "Cannot find action: %s\n", + msg->name); + } + } else if (action(msg, s->sun_path) < 0) { RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name); + } } static void * @@ -359,29 +380,50 @@ int rte_mp_channel_init(void) { char thread_name[RTE_MAX_THREAD_NAME_LEN]; - char *path; + char path[PATH_MAX]; + int dir_fd; pthread_t tid; - snprintf(mp_filter, PATH_MAX, ".%s_unix_*", - internal_config.hugefile_prefix); + /* create filter path */ + create_socket_path("*", path, sizeof(path)); + snprintf(mp_filter, sizeof(mp_filter), "%s", basename(path)); + + /* path may have been modified, so recreate it */ + create_socket_path("*", path, sizeof(path)); + snprintf(mp_dir_path, sizeof(mp_dir_path), "%s", dirname(path)); - path = strdup(eal_mp_socket_path()); - snprintf(mp_dir_path, PATH_MAX, "%s", dirname(path)); - free(path); + /* lock the directory */ + dir_fd = open(mp_dir_path, O_RDONLY); + if (dir_fd < 0) { + RTE_LOG(ERR, EAL, "failed to open %s: %s\n", + mp_dir_path, strerror(errno)); + return -1; + } + + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "failed to lock %s: %s\n", + mp_dir_path, strerror(errno)); + close(dir_fd); + return -1; + } if (rte_eal_process_type() == RTE_PROC_PRIMARY && - unlink_sockets(mp_filter)) { + unlink_sockets(mp_filter)) { RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n"); + close(dir_fd); return -1; } - if (open_socket_fd() < 0) + if (open_socket_fd() < 0) { + close(dir_fd); return -1; + } if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) { RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n", strerror(errno)); close(mp_fd); + close(dir_fd); mp_fd = -1; return -1; } @@ -389,6 +431,11 @@ rte_mp_channel_init(void) /* try best to set thread name */ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle"); rte_thread_setname(tid, thread_name); + + /* unlock the directory */ + flock(dir_fd, LOCK_UN); + close(dir_fd); + return 0; } @@ -464,7 +511,7 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type) static int mp_send(struct rte_mp_msg *msg, const char *peer, int type) { - int ret = 0; + int dir_fd, ret = 0; DIR *mp_dir; struct dirent *ent; @@ -486,6 +533,17 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type) rte_errno = errno; return -1; } + + dir_fd = dirfd(mp_dir); + /* lock the directory to prevent processes spinning up while we send */ + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", + mp_dir_path); + rte_errno = errno; + closedir(mp_dir); + return -1; + } + while ((ent = readdir(mp_dir))) { char path[PATH_MAX]; @@ -497,7 +555,10 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type) if (send_msg(path, msg, type) < 0) ret = -1; } + /* unlock the dir */ + flock(dir_fd, LOCK_UN); + /* dir_fd automatically closed on closedir */ closedir(mp_dir); return ret; } @@ -545,7 +606,6 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, struct rte_mp_reply *reply, const struct timespec *ts) { int ret; - struct timeval now; struct rte_mp_msg msg, *tmp; struct sync_request sync_req, *exist; @@ -577,19 +637,10 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, reply->nb_sent++; do { - pthread_cond_timedwait(&sync_req.cond, &sync_requests.lock, ts); - /* Check spurious wakeups */ - if (sync_req.reply_received == 1) - break; - /* Check if time is out */ - if (gettimeofday(&now, NULL) < 0) - break; - if (ts->tv_sec < now.tv_sec) - break; - else if (now.tv_sec == ts->tv_sec && - now.tv_usec * 1000 < ts->tv_nsec) - break; - } while (1); + ret = pthread_cond_timedwait(&sync_req.cond, + &sync_requests.lock, ts); + } while (ret != 0 && ret != ETIMEDOUT); + /* We got the lock now */ TAILQ_REMOVE(&sync_requests.requests, &sync_req, next); pthread_mutex_unlock(&sync_requests.lock); @@ -600,6 +651,14 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, rte_errno = ETIMEDOUT; return -1; } + if (sync_req.reply_received == -1) { + RTE_LOG(DEBUG, EAL, "Asked to ignore response\n"); + /* not receiving this message is not an error, so decrement + * number of sent messages + */ + reply->nb_sent--; + return 0; + } tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1)); if (!tmp) { @@ -618,7 +677,7 @@ int __rte_experimental rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, const struct timespec *ts) { - int ret = 0; + int dir_fd, ret = 0; DIR *mp_dir; struct dirent *ent; struct timeval now; @@ -654,6 +713,16 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, return -1; } + dir_fd = dirfd(mp_dir); + /* lock the directory to prevent processes spinning up while we send */ + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", + mp_dir_path); + closedir(mp_dir); + rte_errno = errno; + return -1; + } + while ((ent = readdir(mp_dir))) { char path[PATH_MAX]; @@ -666,7 +735,10 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, if (mp_request_one(path, req, reply, &end)) ret = -1; } + /* unlock the directory */ + flock(dir_fd, LOCK_UN); + /* dir_fd automatically closed on closedir */ closedir(mp_dir); return ret; }