X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_eal%2Fcommon%2Feal_common_proc.c;h=4131b67b0289b67efd6572e10d2dfd845d23bc78;hb=579a4ccc345c1932ff75259463ffde64722a16a9;hp=caa8774aef02f2d70096aac2583863b6fa247ddc;hpb=d9fff8a318264644d20674e62cddcd365253e7fe;p=dpdk.git diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c index caa8774aef..4131b67b02 100644 --- a/lib/librte_eal/common/eal_common_proc.c +++ b/lib/librte_eal/common/eal_common_proc.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,7 @@ enum mp_type { MP_MSG, /* Share message with peers, will not block */ MP_REQ, /* Request for information, Will block for a reply */ MP_REP, /* Response to previously-received request */ + MP_IGN, /* Response telling requester to ignore this response */ }; struct mp_msg_internal { @@ -77,6 +79,11 @@ static struct { .lock = PTHREAD_MUTEX_INITIALIZER }; +/* forward declarations */ +static int +mp_send(struct rte_mp_msg *msg, const char *peer, int type); + + static struct sync_request * find_sync_request(const char *dst, const char *act_name) { @@ -91,6 +98,17 @@ find_sync_request(const char *dst, const char *act_name) return r; } +static void +create_socket_path(const char *name, char *buf, int len) +{ + const char *prefix = eal_mp_socket_path(); + + if (strlen(name) > 0) + snprintf(buf, len, "%s_%s", prefix, name); + else + snprintf(buf, len, "%s", prefix); +} + int rte_eal_primary_proc_alive(const char *config_file_path) { @@ -248,12 +266,13 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s) RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name); - if (m->type == MP_REP) { + if (m->type == MP_REP || m->type == MP_IGN) { pthread_mutex_lock(&sync_requests.lock); sync_req = find_sync_request(s->sun_path, msg->name); if (sync_req) { memcpy(sync_req->reply, msg, sizeof(*msg)); - sync_req->reply_received = 1; + /* -1 indicates that we've been asked to ignore */ + sync_req->reply_received = m->type == MP_REP ? 1 : -1; pthread_cond_signal(&sync_req->cond); } else RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name); @@ -267,10 +286,23 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s) action = entry->action; pthread_mutex_unlock(&mp_mutex_action); - if (!action) - RTE_LOG(ERR, EAL, "Cannot find action: %s\n", msg->name); - else if (action(msg, s->sun_path) < 0) + if (!action) { + if (m->type == MP_REQ && !internal_config.init_complete) { + /* if this is a request, and init is not yet complete, + * and callback wasn't registered, we should tell the + * requester to ignore our existence because we're not + * yet ready to process this request. + */ + struct rte_mp_msg dummy; + memset(&dummy, 0, sizeof(dummy)); + mp_send(&dummy, s->sun_path, MP_IGN); + } else { + RTE_LOG(ERR, EAL, "Cannot find action: %s\n", + msg->name); + } + } else if (action(msg, s->sun_path) < 0) { RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name); + } } static void * @@ -290,8 +322,12 @@ mp_handle(void *arg __rte_unused) static int open_socket_fd(void) { + char peer_name[PATH_MAX] = {0}; struct sockaddr_un un; - const char *prefix = eal_mp_socket_path(); + + if (rte_eal_process_type() == RTE_PROC_SECONDARY) + snprintf(peer_name, sizeof(peer_name), + "%d_%"PRIx64, getpid(), rte_rdtsc()); mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0); if (mp_fd < 0) { @@ -301,13 +337,11 @@ open_socket_fd(void) memset(&un, 0, sizeof(un)); un.sun_family = AF_UNIX; - if (rte_eal_process_type() == RTE_PROC_PRIMARY) - snprintf(un.sun_path, sizeof(un.sun_path), "%s", prefix); - else { - snprintf(un.sun_path, sizeof(un.sun_path), "%s_%d_%"PRIx64, - prefix, getpid(), rte_rdtsc()); - } + + create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path)); + unlink(un.sun_path); /* May still exist since last run */ + if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) { RTE_LOG(ERR, EAL, "failed to bind %s: %s\n", un.sun_path, strerror(errno)); @@ -342,47 +376,54 @@ unlink_sockets(const char *filter) return 0; } -static void -unlink_socket_by_path(const char *path) -{ - char *filename; - char *fullpath = strdup(path); - - if (!fullpath) - return; - filename = basename(fullpath); - unlink_sockets(filename); - free(fullpath); - RTE_LOG(INFO, EAL, "Remove socket %s\n", path); -} - int rte_mp_channel_init(void) { char thread_name[RTE_MAX_THREAD_NAME_LEN]; - char *path; + char path[PATH_MAX]; + int dir_fd; pthread_t tid; - snprintf(mp_filter, PATH_MAX, ".%s_unix_*", - internal_config.hugefile_prefix); + /* create filter path */ + create_socket_path("*", path, sizeof(path)); + snprintf(mp_filter, sizeof(mp_filter), "%s", basename(path)); + + /* path may have been modified, so recreate it */ + create_socket_path("*", path, sizeof(path)); + snprintf(mp_dir_path, sizeof(mp_dir_path), "%s", dirname(path)); + + /* lock the directory */ + dir_fd = open(mp_dir_path, O_RDONLY); + if (dir_fd < 0) { + RTE_LOG(ERR, EAL, "failed to open %s: %s\n", + mp_dir_path, strerror(errno)); + return -1; + } - path = strdup(eal_mp_socket_path()); - snprintf(mp_dir_path, PATH_MAX, "%s", dirname(path)); - free(path); + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "failed to lock %s: %s\n", + mp_dir_path, strerror(errno)); + close(dir_fd); + return -1; + } if (rte_eal_process_type() == RTE_PROC_PRIMARY && - unlink_sockets(mp_filter)) { + unlink_sockets(mp_filter)) { RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n"); + close(dir_fd); return -1; } - if (open_socket_fd() < 0) + if (open_socket_fd() < 0) { + close(dir_fd); return -1; + } if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) { RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n", strerror(errno)); close(mp_fd); + close(dir_fd); mp_fd = -1; return -1; } @@ -390,6 +431,11 @@ rte_mp_channel_init(void) /* try best to set thread name */ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle"); rte_thread_setname(tid, thread_name); + + /* unlock the directory */ + flock(dir_fd, LOCK_UN); + close(dir_fd); + return 0; } @@ -444,13 +490,12 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type) if (snd < 0) { rte_errno = errno; /* Check if it caused by peer process exits */ - if (errno == -ECONNREFUSED) { - /* We don't unlink the primary's socket here */ - if (rte_eal_process_type() == RTE_PROC_PRIMARY) - unlink_socket_by_path(dst_path); + if (errno == ECONNREFUSED && + rte_eal_process_type() == RTE_PROC_PRIMARY) { + unlink(dst_path); return 0; } - if (errno == -ENOBUFS) { + if (errno == ENOBUFS) { RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n", dst_path); return 0; @@ -466,7 +511,7 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type) static int mp_send(struct rte_mp_msg *msg, const char *peer, int type) { - int ret = 0; + int dir_fd, ret = 0; DIR *mp_dir; struct dirent *ent; @@ -488,14 +533,32 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type) rte_errno = errno; return -1; } + + dir_fd = dirfd(mp_dir); + /* lock the directory to prevent processes spinning up while we send */ + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", + mp_dir_path); + rte_errno = errno; + closedir(mp_dir); + return -1; + } + while ((ent = readdir(mp_dir))) { + char path[PATH_MAX]; + if (fnmatch(mp_filter, ent->d_name, 0) != 0) continue; - if (send_msg(ent->d_name, msg, type) < 0) + snprintf(path, sizeof(path), "%s/%s", mp_dir_path, + ent->d_name); + if (send_msg(path, msg, type) < 0) ret = -1; } + /* unlock the dir */ + flock(dir_fd, LOCK_UN); + /* dir_fd automatically closed on closedir */ closedir(mp_dir); return ret; } @@ -543,7 +606,6 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, struct rte_mp_reply *reply, const struct timespec *ts) { int ret; - struct timeval now; struct rte_mp_msg msg, *tmp; struct sync_request sync_req, *exist; @@ -557,10 +619,10 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, exist = find_sync_request(dst, req->name); if (!exist) TAILQ_INSERT_TAIL(&sync_requests.requests, &sync_req, next); - pthread_mutex_unlock(&sync_requests.lock); if (exist) { RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name); rte_errno = EEXIST; + pthread_mutex_unlock(&sync_requests.lock); return -1; } @@ -574,21 +636,11 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, reply->nb_sent++; - pthread_mutex_lock(&sync_requests.lock); do { - pthread_cond_timedwait(&sync_req.cond, &sync_requests.lock, ts); - /* Check spurious wakeups */ - if (sync_req.reply_received == 1) - break; - /* Check if time is out */ - if (gettimeofday(&now, NULL) < 0) - break; - if (now.tv_sec < ts->tv_sec) - break; - else if (now.tv_sec == ts->tv_sec && - now.tv_usec * 1000 < ts->tv_nsec) - break; - } while (1); + ret = pthread_cond_timedwait(&sync_req.cond, + &sync_requests.lock, ts); + } while (ret != 0 && ret != ETIMEDOUT); + /* We got the lock now */ TAILQ_REMOVE(&sync_requests.requests, &sync_req, next); pthread_mutex_unlock(&sync_requests.lock); @@ -599,6 +651,14 @@ mp_request_one(const char *dst, struct rte_mp_msg *req, rte_errno = ETIMEDOUT; return -1; } + if (sync_req.reply_received == -1) { + RTE_LOG(DEBUG, EAL, "Asked to ignore response\n"); + /* not receiving this message is not an error, so decrement + * number of sent messages + */ + reply->nb_sent--; + return 0; + } tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1)); if (!tmp) { @@ -617,7 +677,7 @@ int __rte_experimental rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, const struct timespec *ts) { - int ret = 0; + int dir_fd, ret = 0; DIR *mp_dir; struct dirent *ent; struct timeval now; @@ -653,14 +713,32 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply, return -1; } + dir_fd = dirfd(mp_dir); + /* lock the directory to prevent processes spinning up while we send */ + if (flock(dir_fd, LOCK_EX)) { + RTE_LOG(ERR, EAL, "Unable to lock directory %s\n", + mp_dir_path); + closedir(mp_dir); + rte_errno = errno; + return -1; + } + while ((ent = readdir(mp_dir))) { + char path[PATH_MAX]; + if (fnmatch(mp_filter, ent->d_name, 0) != 0) continue; - if (mp_request_one(ent->d_name, req, reply, &end)) + snprintf(path, sizeof(path), "%s/%s", mp_dir_path, + ent->d_name); + + if (mp_request_one(path, req, reply, &end)) ret = -1; } + /* unlock the directory */ + flock(dir_fd, LOCK_UN); + /* dir_fd automatically closed on closedir */ closedir(mp_dir); return ret; }