eal: rename IPC request as synchronous one
[dpdk.git] / lib / librte_eal / common / eal_common_proc.c
index 1aab3ac..b704f5a 100644 (file)
@@ -13,6 +13,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/file.h>
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/socket.h>
@@ -51,6 +52,7 @@ enum mp_type {
        MP_MSG, /* Share message with peers, will not block */
        MP_REQ, /* Request for information, Will block for a reply */
        MP_REP, /* Response to previously-received request */
+       MP_IGN, /* Response telling requester to ignore this response */
 };
 
 struct mp_msg_internal {
@@ -58,8 +60,8 @@ struct mp_msg_internal {
        struct rte_mp_msg msg;
 };
 
-struct sync_request {
-       TAILQ_ENTRY(sync_request) next;
+struct pending_request {
+       TAILQ_ENTRY(pending_request) next;
        int reply_received;
        char dst[PATH_MAX];
        struct rte_mp_msg *request;
@@ -67,22 +69,27 @@ struct sync_request {
        pthread_cond_t cond;
 };
 
-TAILQ_HEAD(sync_request_list, sync_request);
+TAILQ_HEAD(pending_request_list, pending_request);
 
 static struct {
-       struct sync_request_list requests;
+       struct pending_request_list requests;
        pthread_mutex_t lock;
-} sync_requests = {
-       .requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
+} pending_requests = {
+       .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
        .lock = PTHREAD_MUTEX_INITIALIZER
 };
 
-static struct sync_request *
+/* forward declarations */
+static int
+mp_send(struct rte_mp_msg *msg, const char *peer, int type);
+
+
+static struct pending_request *
 find_sync_request(const char *dst, const char *act_name)
 {
-       struct sync_request *r;
+       struct pending_request *r;
 
-       TAILQ_FOREACH(r, &sync_requests.requests, next) {
+       TAILQ_FOREACH(r, &pending_requests.requests, next) {
                if (!strcmp(r->dst, dst) &&
                    !strcmp(r->request->name, act_name))
                        break;
@@ -252,23 +259,24 @@ read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 static void
 process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
 {
-       struct sync_request *sync_req;
+       struct pending_request *sync_req;
        struct action_entry *entry;
        struct rte_mp_msg *msg = &m->msg;
        rte_mp_t action = NULL;
 
        RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);
 
-       if (m->type == MP_REP) {
-               pthread_mutex_lock(&sync_requests.lock);
+       if (m->type == MP_REP || m->type == MP_IGN) {
+               pthread_mutex_lock(&pending_requests.lock);
                sync_req = find_sync_request(s->sun_path, msg->name);
                if (sync_req) {
                        memcpy(sync_req->reply, msg, sizeof(*msg));
-                       sync_req->reply_received = 1;
+                       /* -1 indicates that we've been asked to ignore */
+                       sync_req->reply_received = m->type == MP_REP ? 1 : -1;
                        pthread_cond_signal(&sync_req->cond);
                } else
                        RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
-               pthread_mutex_unlock(&sync_requests.lock);
+               pthread_mutex_unlock(&pending_requests.lock);
                return;
        }
 
@@ -278,10 +286,23 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
                action = entry->action;
        pthread_mutex_unlock(&mp_mutex_action);
 
-       if (!action)
-               RTE_LOG(ERR, EAL, "Cannot find action: %s\n", msg->name);
-       else if (action(msg, s->sun_path) < 0)
+       if (!action) {
+               if (m->type == MP_REQ && !internal_config.init_complete) {
+                       /* if this is a request, and init is not yet complete,
+                        * and callback wasn't registered, we should tell the
+                        * requester to ignore our existence because we're not
+                        * yet ready to process this request.
+                        */
+                       struct rte_mp_msg dummy;
+                       memset(&dummy, 0, sizeof(dummy));
+                       mp_send(&dummy, s->sun_path, MP_IGN);
+               } else {
+                       RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
+                               msg->name);
+               }
+       } else if (action(msg, s->sun_path) < 0) {
                RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
+       }
 }
 
 static void *
@@ -359,29 +380,50 @@ int
 rte_mp_channel_init(void)
 {
        char thread_name[RTE_MAX_THREAD_NAME_LEN];
-       char *path;
+       char path[PATH_MAX];
+       int dir_fd;
        pthread_t tid;
 
-       snprintf(mp_filter, PATH_MAX, ".%s_unix_*",
-                internal_config.hugefile_prefix);
+       /* create filter path */
+       create_socket_path("*", path, sizeof(path));
+       snprintf(mp_filter, sizeof(mp_filter), "%s", basename(path));
+
+       /* path may have been modified, so recreate it */
+       create_socket_path("*", path, sizeof(path));
+       snprintf(mp_dir_path, sizeof(mp_dir_path), "%s", dirname(path));
 
-       path = strdup(eal_mp_socket_path());
-       snprintf(mp_dir_path, PATH_MAX, "%s", dirname(path));
-       free(path);
+       /* lock the directory */
+       dir_fd = open(mp_dir_path, O_RDONLY);
+       if (dir_fd < 0) {
+               RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
+                       mp_dir_path, strerror(errno));
+               return -1;
+       }
+
+       if (flock(dir_fd, LOCK_EX)) {
+               RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
+                       mp_dir_path, strerror(errno));
+               close(dir_fd);
+               return -1;
+       }
 
        if (rte_eal_process_type() == RTE_PROC_PRIMARY &&
-           unlink_sockets(mp_filter)) {
+                       unlink_sockets(mp_filter)) {
                RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n");
+               close(dir_fd);
                return -1;
        }
 
-       if (open_socket_fd() < 0)
+       if (open_socket_fd() < 0) {
+               close(dir_fd);
                return -1;
+       }
 
        if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
                RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
                        strerror(errno));
                close(mp_fd);
+               close(dir_fd);
                mp_fd = -1;
                return -1;
        }
@@ -389,6 +431,11 @@ rte_mp_channel_init(void)
        /* try best to set thread name */
        snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
        rte_thread_setname(tid, thread_name);
+
+       /* unlock the directory */
+       flock(dir_fd, LOCK_UN);
+       close(dir_fd);
+
        return 0;
 }
 
@@ -464,7 +511,7 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
 static int
 mp_send(struct rte_mp_msg *msg, const char *peer, int type)
 {
-       int ret = 0;
+       int dir_fd, ret = 0;
        DIR *mp_dir;
        struct dirent *ent;
 
@@ -486,6 +533,17 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type)
                rte_errno = errno;
                return -1;
        }
+
+       dir_fd = dirfd(mp_dir);
+       /* lock the directory to prevent processes spinning up while we send */
+       if (flock(dir_fd, LOCK_EX)) {
+               RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+                       mp_dir_path);
+               rte_errno = errno;
+               closedir(mp_dir);
+               return -1;
+       }
+
        while ((ent = readdir(mp_dir))) {
                char path[PATH_MAX];
 
@@ -497,7 +555,10 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type)
                if (send_msg(path, msg, type) < 0)
                        ret = -1;
        }
+       /* unlock the dir */
+       flock(dir_fd, LOCK_UN);
 
+       /* dir_fd automatically closed on closedir */
        closedir(mp_dir);
        return ret;
 }
@@ -545,9 +606,8 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
               struct rte_mp_reply *reply, const struct timespec *ts)
 {
        int ret;
-       struct timeval now;
        struct rte_mp_msg msg, *tmp;
-       struct sync_request sync_req, *exist;
+       struct pending_request sync_req, *exist;
 
        sync_req.reply_received = 0;
        strcpy(sync_req.dst, dst);
@@ -555,14 +615,14 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
        sync_req.reply = &msg;
        pthread_cond_init(&sync_req.cond, NULL);
 
-       pthread_mutex_lock(&sync_requests.lock);
+       pthread_mutex_lock(&pending_requests.lock);
        exist = find_sync_request(dst, req->name);
        if (!exist)
-               TAILQ_INSERT_TAIL(&sync_requests.requests, &sync_req, next);
+               TAILQ_INSERT_TAIL(&pending_requests.requests, &sync_req, next);
        if (exist) {
                RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
                rte_errno = EEXIST;
-               pthread_mutex_unlock(&sync_requests.lock);
+               pthread_mutex_unlock(&pending_requests.lock);
                return -1;
        }
 
@@ -577,22 +637,13 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
        reply->nb_sent++;
 
        do {
-               pthread_cond_timedwait(&sync_req.cond, &sync_requests.lock, ts);
-               /* Check spurious wakeups */
-               if (sync_req.reply_received == 1)
-                       break;
-               /* Check if time is out */
-               if (gettimeofday(&now, NULL) < 0)
-                       break;
-               if (ts->tv_sec < now.tv_sec)
-                       break;
-               else if (now.tv_sec == ts->tv_sec &&
-                        now.tv_usec * 1000 < ts->tv_nsec)
-                       break;
-       } while (1);
+               ret = pthread_cond_timedwait(&sync_req.cond,
+                               &pending_requests.lock, ts);
+       } while (ret != 0 && ret != ETIMEDOUT);
+
        /* We got the lock now */
-       TAILQ_REMOVE(&sync_requests.requests, &sync_req, next);
-       pthread_mutex_unlock(&sync_requests.lock);
+       TAILQ_REMOVE(&pending_requests.requests, &sync_req, next);
+       pthread_mutex_unlock(&pending_requests.lock);
 
        if (sync_req.reply_received == 0) {
                RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
@@ -600,6 +651,14 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
                rte_errno = ETIMEDOUT;
                return -1;
        }
+       if (sync_req.reply_received == -1) {
+               RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
+               /* not receiving this message is not an error, so decrement
+                * number of sent messages
+                */
+               reply->nb_sent--;
+               return 0;
+       }
 
        tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
        if (!tmp) {
@@ -615,10 +674,10 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
 }
 
 int __rte_experimental
-rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
+rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
                const struct timespec *ts)
 {
-       int ret = 0;
+       int dir_fd, ret = 0;
        DIR *mp_dir;
        struct dirent *ent;
        struct timeval now;
@@ -654,6 +713,16 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
                return -1;
        }
 
+       dir_fd = dirfd(mp_dir);
+       /* lock the directory to prevent processes spinning up while we send */
+       if (flock(dir_fd, LOCK_EX)) {
+               RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+                       mp_dir_path);
+               closedir(mp_dir);
+               rte_errno = errno;
+               return -1;
+       }
+
        while ((ent = readdir(mp_dir))) {
                char path[PATH_MAX];
 
@@ -666,7 +735,10 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
                if (mp_request_one(path, req, reply, &end))
                        ret = -1;
        }
+       /* unlock the directory */
+       flock(dir_fd, LOCK_UN);
 
+       /* dir_fd automatically closed on closedir */
        closedir(mp_dir);
        return ret;
 }