ipc: remove thread for async requests
[dpdk.git] / lib / librte_eal / common / eal_common_proc.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2018 Intel Corporation
3  */
4
5 #include <dirent.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <fnmatch.h>
9 #include <inttypes.h>
10 #include <libgen.h>
11 #include <limits.h>
12 #include <pthread.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/file.h>
17 #include <sys/time.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/un.h>
21 #include <unistd.h>
22
23 #include <rte_alarm.h>
24 #include <rte_common.h>
25 #include <rte_cycles.h>
26 #include <rte_eal.h>
27 #include <rte_errno.h>
28 #include <rte_lcore.h>
29 #include <rte_log.h>
30 #include <rte_tailq.h>
31
32 #include "eal_private.h"
33 #include "eal_filesystem.h"
34 #include "eal_internal_cfg.h"
35
/* fd of this process's mp control socket; -1 until rte_mp_channel_init() */
static int mp_fd = -1;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
/* protects action_entry_list below */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;

/* a registered callback for one named message type */
struct action_entry {
	TAILQ_ENTRY(action_entry) next;        /* linkage in action_entry_list */
	char action_name[RTE_MP_MAX_NAME_LEN]; /* message name this handles */
	rte_mp_t action;                       /* user-supplied handler */
};

/** Double linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);
52
/* wire-level message classes */
enum mp_type {
	MP_MSG, /* Share message with peers, will not block */
	MP_REQ, /* Request for information, Will block for a reply */
	MP_REP, /* Response to previously-received request */
	MP_IGN, /* Response telling requester to ignore this response */
};

/* what actually goes over the socket: a type tag plus the user message */
struct mp_msg_internal {
	int type;              /* one of enum mp_type */
	struct rte_mp_msg msg; /* user-visible payload */
};

/* bookkeeping shared by all per-peer entries of one async request */
struct async_request_param {
	rte_mp_async_reply_t clb;       /* user callback to invoke when done */
	struct rte_mp_reply user_reply; /* replies accumulated so far */
	struct timespec end;            /* absolute deadline for replies */
	int n_responses_processed;      /* replies/timeouts handled so far */
};
71
/* one outstanding request to a single peer */
struct pending_request {
	TAILQ_ENTRY(pending_request) next; /* linkage in pending_requests */
	enum {
		REQUEST_TYPE_SYNC,
		REQUEST_TYPE_ASYNC
	} type;
	char dst[PATH_MAX];         /* destination socket path */
	struct rte_mp_msg *request; /* the message that was sent */
	struct rte_mp_msg *reply;   /* filled in by process_msg() */
	int reply_received;         /* 0: none yet, 1: got reply, -1: ignore */
	RTE_STD_C11
	union {
		struct {
			/* shared accounting; used in async requests only */
			struct async_request_param *param;
		} async;
		struct {
			/* signalled when the reply arrives */
			pthread_cond_t cond;
		} sync;
	};
};

TAILQ_HEAD(pending_request_list, pending_request);

/* all in-flight requests; lock guards the list and every entry on it */
static struct {
	struct pending_request_list requests;
	pthread_mutex_t lock;
} pending_requests = {
	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
	.lock = PTHREAD_MUTEX_INITIALIZER,
};
103
/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg; caller must hold pending_requests.lock */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);
118
119 static struct pending_request *
120 find_pending_request(const char *dst, const char *act_name)
121 {
122         struct pending_request *r;
123
124         TAILQ_FOREACH(r, &pending_requests.requests, next) {
125                 if (!strcmp(r->dst, dst) &&
126                     !strcmp(r->request->name, act_name))
127                         break;
128         }
129
130         return r;
131 }
132
/* Build the socket path for peer @name into @buf (@len bytes).
 * An empty name yields the base mp socket path (the primary's socket).
 */
static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *prefix = eal_mp_socket_path();

	if (strlen(name) == 0)
		strlcpy(buf, prefix, len);
	else
		snprintf(buf, len, "%s_%s", prefix, name);
}
143
144 int
145 rte_eal_primary_proc_alive(const char *config_file_path)
146 {
147         int config_fd;
148
149         if (config_file_path)
150                 config_fd = open(config_file_path, O_RDONLY);
151         else {
152                 const char *path;
153
154                 path = eal_runtime_config_path();
155                 config_fd = open(path, O_RDONLY);
156         }
157         if (config_fd < 0)
158                 return 0;
159
160         int ret = lockf(config_fd, F_TEST, 0);
161         close(config_fd);
162
163         return !!ret;
164 }
165
166 static struct action_entry *
167 find_action_entry_by_name(const char *name)
168 {
169         struct action_entry *entry;
170
171         TAILQ_FOREACH(entry, &action_entry_list, next) {
172                 if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
173                         break;
174         }
175
176         return entry;
177 }
178
179 static int
180 validate_action_name(const char *name)
181 {
182         if (name == NULL) {
183                 RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
184                 rte_errno = EINVAL;
185                 return -1;
186         }
187         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
188                 RTE_LOG(ERR, EAL, "Length of action name is zero\n");
189                 rte_errno = EINVAL;
190                 return -1;
191         }
192         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
193                 rte_errno = E2BIG;
194                 return -1;
195         }
196         return 0;
197 }
198
199 int __rte_experimental
200 rte_mp_action_register(const char *name, rte_mp_t action)
201 {
202         struct action_entry *entry;
203
204         if (validate_action_name(name))
205                 return -1;
206
207         entry = malloc(sizeof(struct action_entry));
208         if (entry == NULL) {
209                 rte_errno = ENOMEM;
210                 return -1;
211         }
212         strlcpy(entry->action_name, name, sizeof(entry->action_name));
213         entry->action = action;
214
215         pthread_mutex_lock(&mp_mutex_action);
216         if (find_action_entry_by_name(name) != NULL) {
217                 pthread_mutex_unlock(&mp_mutex_action);
218                 rte_errno = EEXIST;
219                 free(entry);
220                 return -1;
221         }
222         TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
223         pthread_mutex_unlock(&mp_mutex_action);
224         return 0;
225 }
226
227 void __rte_experimental
228 rte_mp_action_unregister(const char *name)
229 {
230         struct action_entry *entry;
231
232         if (validate_action_name(name))
233                 return;
234
235         pthread_mutex_lock(&mp_mutex_action);
236         entry = find_action_entry_by_name(name);
237         if (entry == NULL) {
238                 pthread_mutex_unlock(&mp_mutex_action);
239                 return;
240         }
241         TAILQ_REMOVE(&action_entry_list, entry, next);
242         pthread_mutex_unlock(&mp_mutex_action);
243         free(entry);
244 }
245
246 static int
247 read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
248 {
249         int msglen;
250         struct iovec iov;
251         struct msghdr msgh;
252         char control[CMSG_SPACE(sizeof(m->msg.fds))];
253         struct cmsghdr *cmsg;
254         int buflen = sizeof(*m) - sizeof(m->msg.fds);
255
256         memset(&msgh, 0, sizeof(msgh));
257         iov.iov_base = m;
258         iov.iov_len  = buflen;
259
260         msgh.msg_name = s;
261         msgh.msg_namelen = sizeof(*s);
262         msgh.msg_iov = &iov;
263         msgh.msg_iovlen = 1;
264         msgh.msg_control = control;
265         msgh.msg_controllen = sizeof(control);
266
267         msglen = recvmsg(mp_fd, &msgh, 0);
268         if (msglen < 0) {
269                 RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
270                 return -1;
271         }
272
273         if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
274                 RTE_LOG(ERR, EAL, "truncted msg\n");
275                 return -1;
276         }
277
278         /* read auxiliary FDs if any */
279         for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
280                 cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
281                 if ((cmsg->cmsg_level == SOL_SOCKET) &&
282                         (cmsg->cmsg_type == SCM_RIGHTS)) {
283                         memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
284                         break;
285                 }
286         }
287
288         return 0;
289 }
290
/* Dispatch one received message.
 *
 * Replies (MP_REP/MP_IGN) are matched against the pending-request list:
 * sync requesters are woken via their condvar, async requests are handled
 * in place (under pending_requests.lock) and their user callback is
 * triggered only after the lock is dropped. Anything else is routed to
 * the action registered under the message name, if any.
 */
static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	struct pending_request *pending_req;
	struct action_entry *entry;
	struct rte_mp_msg *msg = &m->msg;
	rte_mp_t action = NULL;

	RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

	if (m->type == MP_REP || m->type == MP_IGN) {
		struct pending_request *req = NULL;

		pthread_mutex_lock(&pending_requests.lock);
		pending_req = find_pending_request(s->sun_path, msg->name);
		if (pending_req) {
			memcpy(pending_req->reply, msg, sizeof(*msg));
			/* -1 indicates that we've been asked to ignore */
			pending_req->reply_received =
				m->type == MP_REP ? 1 : -1;

			if (pending_req->type == REQUEST_TYPE_SYNC)
				pthread_cond_signal(&pending_req->sync.cond);
			else if (pending_req->type == REQUEST_TYPE_ASYNC)
				req = async_reply_handle_thread_unsafe(
						pending_req);
		} else
			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
		pthread_mutex_unlock(&pending_requests.lock);

		/* callback runs without the lock to avoid deadlock if it
		 * issues further IPC requests
		 */
		if (req != NULL)
			trigger_async_action(req);
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(msg->name);
	if (entry != NULL)
		action = entry->action;
	pthread_mutex_unlock(&mp_mutex_action);

	if (!action) {
		if (m->type == MP_REQ && !internal_config.init_complete) {
			/* if this is a request, and init is not yet complete,
			 * and callback wasn't registered, we should tell the
			 * requester to ignore our existence because we're not
			 * yet ready to process this request.
			 */
			struct rte_mp_msg dummy;

			memset(&dummy, 0, sizeof(dummy));
			strlcpy(dummy.name, msg->name, sizeof(dummy.name));
			mp_send(&dummy, s->sun_path, MP_IGN);
		} else {
			RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
				msg->name);
		}
	} else if (action(msg, s->sun_path) < 0) {
		RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
	}
}
352
353 static void *
354 mp_handle(void *arg __rte_unused)
355 {
356         struct mp_msg_internal msg;
357         struct sockaddr_un sa;
358
359         while (1) {
360                 if (read_msg(&msg, &sa) == 0)
361                         process_msg(&msg, &sa);
362         }
363
364         return NULL;
365 }
366
/* Three-way compare of two timespecs: -1 if a < b, 1 if a > b, 0 if equal. */
static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec != b->tv_sec)
		return a->tv_sec < b->tv_sec ? -1 : 1;
	if (a->tv_nsec != b->tv_nsec)
		return a->tv_nsec < b->tv_nsec ? -1 : 1;
	return 0;
}
380
/* verdict for an async request entry after accounting one response */
enum async_action {
	ACTION_FREE, /**< free the action entry, but don't trigger callback */
	ACTION_TRIGGER /**< trigger callback, then free action entry */
};
385
/* Account one peer's outcome (reply, ignore-request, or timeout) for an
 * async request. Called with pending_requests.lock held.
 *
 * Returns ACTION_TRIGGER when this was the last outstanding peer, meaning
 * the caller should invoke the user callback; ACTION_FREE otherwise.
 */
static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	bool timeout, last_msg;

	param = sr->async.param;
	reply = &param->user_reply;

	/* did we timeout? */
	timeout = timespec_cmp(&param->end, now) <= 0;

	/* if we received a response, adjust relevant data and copy message. */
	if (sr->reply_received == 1 && sr->reply) {
		struct rte_mp_msg *msg, *user_msgs, *tmp;

		msg = sr->reply;
		user_msgs = reply->msgs;

		/* grow the user-visible reply array by one */
		tmp = realloc(user_msgs, sizeof(*msg) *
				(reply->nb_received + 1));
		if (!tmp) {
			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
				sr->dst, sr->request->name);
			/* this entry is going to be removed and its message
			 * dropped, but we don't want to leak memory, so
			 * continue.
			 */
		} else {
			user_msgs = tmp;
			reply->msgs = user_msgs;
			memcpy(&user_msgs[reply->nb_received],
					msg, sizeof(*msg));
			reply->nb_received++;
		}

		/* mark this request as processed */
		param->n_responses_processed++;
	} else if (sr->reply_received == -1) {
		/* we were asked to ignore this process */
		reply->nb_sent--;
	} else if (timeout) {
		/* count it as processed response, but don't increment
		 * nb_received.
		 */
		param->n_responses_processed++;
	}

	/* per-peer reply buffer is no longer needed */
	free(sr->reply);

	last_msg = param->n_responses_processed == reply->nb_sent;

	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}
441
442 static void
443 trigger_async_action(struct pending_request *sr)
444 {
445         struct async_request_param *param;
446         struct rte_mp_reply *reply;
447
448         param = sr->async.param;
449         reply = &param->user_reply;
450
451         param->clb(sr->request, reply);
452
453         /* clean up */
454         free(sr->async.param->user_reply.msgs);
455         free(sr->async.param);
456         free(sr->request);
457         free(sr);
458 }
459
/* Process one async request entry: account its reply/timeout, unlink it
 * from the pending list and cancel its timeout alarm.
 *
 * Must be called with pending_requests.lock held. Returns the request when
 * the user callback should be triggered by the caller (after dropping the
 * lock); returns NULL otherwise, in which case the entry is freed here.
 */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
	struct pending_request *req = (struct pending_request *)arg;
	enum async_action action;
	struct timespec ts_now;
	struct timeval now;

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto no_trigger;
	}
	ts_now.tv_nsec = now.tv_usec * 1000;
	ts_now.tv_sec = now.tv_sec;

	action = process_async_request(req, &ts_now);

	TAILQ_REMOVE(&pending_requests.requests, req, next);

	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
		/* if we failed to cancel the alarm because it's already in
		 * progress, don't proceed because otherwise we will end up
		 * handling the same message twice.
		 */
		if (rte_errno == EINPROGRESS) {
			RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
			goto no_trigger;
		}
		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
	}

	if (action == ACTION_TRIGGER)
		return req;
no_trigger:
	free(req);
	return NULL;
}
497
498 static void
499 async_reply_handle(void *arg)
500 {
501         struct pending_request *req;
502
503         pthread_mutex_lock(&pending_requests.lock);
504         req = async_reply_handle_thread_unsafe(arg);
505         pthread_mutex_unlock(&pending_requests.lock);
506
507         if (req != NULL)
508                 trigger_async_action(req);
509 }
510
511 static int
512 open_socket_fd(void)
513 {
514         char peer_name[PATH_MAX] = {0};
515         struct sockaddr_un un;
516
517         if (rte_eal_process_type() == RTE_PROC_SECONDARY)
518                 snprintf(peer_name, sizeof(peer_name),
519                                 "%d_%"PRIx64, getpid(), rte_rdtsc());
520
521         mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
522         if (mp_fd < 0) {
523                 RTE_LOG(ERR, EAL, "failed to create unix socket\n");
524                 return -1;
525         }
526
527         memset(&un, 0, sizeof(un));
528         un.sun_family = AF_UNIX;
529
530         create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
531
532         unlink(un.sun_path); /* May still exist since last run */
533
534         if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
535                 RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
536                         un.sun_path, strerror(errno));
537                 close(mp_fd);
538                 return -1;
539         }
540
541         RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
542         return mp_fd;
543 }
544
545 static int
546 unlink_sockets(const char *filter)
547 {
548         int dir_fd;
549         DIR *mp_dir;
550         struct dirent *ent;
551
552         mp_dir = opendir(mp_dir_path);
553         if (!mp_dir) {
554                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
555                 return -1;
556         }
557         dir_fd = dirfd(mp_dir);
558
559         while ((ent = readdir(mp_dir))) {
560                 if (fnmatch(filter, ent->d_name, 0) == 0)
561                         unlinkat(dir_fd, ent->d_name, 0);
562         }
563
564         closedir(mp_dir);
565         return 0;
566 }
567
/* Set up the multi-process communication channel: compute the socket
 * directory/filter paths, create and bind this process's control socket,
 * and spawn the "rte_mp_handle" control thread that reads and dispatches
 * incoming messages.
 *
 * The socket directory is held exclusively locked (flock) for the whole
 * sequence so that stale-socket cleanup and socket creation cannot race
 * with other processes starting up. Returns 0 on success, -1 on failure.
 */
int
rte_mp_channel_init(void)
{
	char path[PATH_MAX];
	int dir_fd;
	pthread_t mp_handle_tid;

	/* create filter path */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_filter, basename(path), sizeof(mp_filter));

	/* path may have been modified, so recreate it */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

	/* lock the directory */
	dir_fd = open(mp_dir_path, O_RDONLY);
	if (dir_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
			mp_dir_path, strerror(errno));
		return -1;
	}

	if (flock(dir_fd, LOCK_EX)) {
		RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
			mp_dir_path, strerror(errno));
		close(dir_fd);
		return -1;
	}

	/* only the primary may wipe stale secondary sockets */
	if (rte_eal_process_type() == RTE_PROC_PRIMARY &&
			unlink_sockets(mp_filter)) {
		RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n");
		close(dir_fd);
		return -1;
	}

	if (open_socket_fd() < 0) {
		close(dir_fd);
		return -1;
	}

	if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
			NULL, mp_handle, NULL) < 0) {
		RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
			strerror(errno));
		close(mp_fd);
		close(dir_fd);
		mp_fd = -1;
		return -1;
	}

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);
	close(dir_fd);

	return 0;
}
626
/**
 * Send a message of the given @type to the socket at @dst_path.
 *
 * Return -1 when sending fails due to the local side.
 * Return 0 when sending fails due to the remote side (peer has exited or
 * cannot receive right now); the primary unlinks the stale socket.
 * Return 1 when the message was sent successfully.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
	int snd;
	struct iovec iov;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	struct sockaddr_un dst;
	struct mp_msg_internal m;
	int fd_size = msg->num_fds * sizeof(int);
	char control[CMSG_SPACE(fd_size)];

	m.type = type;
	memcpy(&m.msg, msg, sizeof(*msg));

	memset(&dst, 0, sizeof(dst));
	dst.sun_family = AF_UNIX;
	strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

	memset(&msgh, 0, sizeof(msgh));
	memset(control, 0, sizeof(control));

	/* only the used portion of the fds array is sent over the wire */
	iov.iov_base = &m;
	iov.iov_len = sizeof(m) - sizeof(msg->fds);

	msgh.msg_name = &dst;
	msgh.msg_namelen = sizeof(dst);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	/* attach the fds as SCM_RIGHTS ancillary data */
	cmsg = CMSG_FIRSTHDR(&msgh);
	cmsg->cmsg_len = CMSG_LEN(fd_size);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

	do {
		snd = sendmsg(mp_fd, &msgh, 0);
	} while (snd < 0 && errno == EINTR);

	if (snd < 0) {
		rte_errno = errno;
		/* Check if it caused by peer process exits */
		if (errno == ECONNREFUSED &&
				rte_eal_process_type() == RTE_PROC_PRIMARY) {
			unlink(dst_path);
			return 0;
		}
		if (errno == ENOBUFS) {
			RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
				dst_path);
			return 0;
		}
		RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
			dst_path, strerror(errno));
		return -1;
	}

	return 1;
}
695
696 static int
697 mp_send(struct rte_mp_msg *msg, const char *peer, int type)
698 {
699         int dir_fd, ret = 0;
700         DIR *mp_dir;
701         struct dirent *ent;
702
703         if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
704                 peer = eal_mp_socket_path();
705
706         if (peer) {
707                 if (send_msg(peer, msg, type) < 0)
708                         return -1;
709                 else
710                         return 0;
711         }
712
713         /* broadcast to all secondary processes */
714         mp_dir = opendir(mp_dir_path);
715         if (!mp_dir) {
716                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
717                                 mp_dir_path);
718                 rte_errno = errno;
719                 return -1;
720         }
721
722         dir_fd = dirfd(mp_dir);
723         /* lock the directory to prevent processes spinning up while we send */
724         if (flock(dir_fd, LOCK_SH)) {
725                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
726                         mp_dir_path);
727                 rte_errno = errno;
728                 closedir(mp_dir);
729                 return -1;
730         }
731
732         while ((ent = readdir(mp_dir))) {
733                 char path[PATH_MAX];
734
735                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
736                         continue;
737
738                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
739                          ent->d_name);
740                 if (send_msg(path, msg, type) < 0)
741                         ret = -1;
742         }
743         /* unlock the dir */
744         flock(dir_fd, LOCK_UN);
745
746         /* dir_fd automatically closed on closedir */
747         closedir(mp_dir);
748         return ret;
749 }
750
751 static bool
752 check_input(const struct rte_mp_msg *msg)
753 {
754         if (msg == NULL) {
755                 RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
756                 rte_errno = EINVAL;
757                 return false;
758         }
759
760         if (validate_action_name(msg->name))
761                 return false;
762
763         if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
764                 RTE_LOG(ERR, EAL, "Message data is too long\n");
765                 rte_errno = E2BIG;
766                 return false;
767         }
768
769         if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
770                 RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
771                         RTE_MP_MAX_FD_NUM);
772                 rte_errno = E2BIG;
773                 return false;
774         }
775
776         return true;
777 }
778
779 int __rte_experimental
780 rte_mp_sendmsg(struct rte_mp_msg *msg)
781 {
782         if (!check_input(msg))
783                 return -1;
784
785         RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
786         return mp_send(msg, NULL, MP_MSG);
787 }
788
789 static int
790 mp_request_async(const char *dst, struct rte_mp_msg *req,
791                 struct async_request_param *param, const struct timespec *ts)
792 {
793         struct rte_mp_msg *reply_msg;
794         struct pending_request *pending_req, *exist;
795         int ret;
796
797         pending_req = calloc(1, sizeof(*pending_req));
798         reply_msg = calloc(1, sizeof(*reply_msg));
799         if (pending_req == NULL || reply_msg == NULL) {
800                 RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
801                 rte_errno = ENOMEM;
802                 ret = -1;
803                 goto fail;
804         }
805
806         pending_req->type = REQUEST_TYPE_ASYNC;
807         strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
808         pending_req->request = req;
809         pending_req->reply = reply_msg;
810         pending_req->async.param = param;
811
812         /* queue already locked by caller */
813
814         exist = find_pending_request(dst, req->name);
815         if (exist) {
816                 RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
817                 rte_errno = EEXIST;
818                 ret = -1;
819                 goto fail;
820         }
821
822         ret = send_msg(dst, req, MP_REQ);
823         if (ret < 0) {
824                 RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
825                         dst, req->name);
826                 ret = -1;
827                 goto fail;
828         } else if (ret == 0) {
829                 ret = 0;
830                 goto fail;
831         }
832         TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);
833
834         param->user_reply.nb_sent++;
835
836         if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
837                               async_reply_handle, pending_req) < 0) {
838                 RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
839                         dst, req->name);
840                 rte_panic("Fix the above shit to properly free all memory\n");
841         }
842
843         return 0;
844 fail:
845         free(pending_req);
846         free(reply_msg);
847         return ret;
848 }
849
/* Send one request to @dst and block until its reply arrives or the
 * absolute deadline @ts passes.
 *
 * Called with pending_requests.lock held; pthread_cond_timedwait() drops
 * the lock while waiting and re-acquires it on wakeup. On success the
 * reply is appended to reply->msgs (realloc'd by one entry). Returns 0 on
 * success or benign non-delivery, -1 on error (rte_errno set).
 */
static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
	       struct rte_mp_reply *reply, const struct timespec *ts)
{
	int ret;
	struct rte_mp_msg msg, *tmp;
	struct pending_request pending_req, *exist;

	/* entry lives on our stack; it is unlinked before we return */
	pending_req.type = REQUEST_TYPE_SYNC;
	pending_req.reply_received = 0;
	strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
	pending_req.request = req;
	pending_req.reply = &msg;
	pthread_cond_init(&pending_req.sync.cond, NULL);

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		return -1;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		return -1;
	} else if (ret == 0)
		return 0;

	TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

	reply->nb_sent++;

	/* wait for the reply; retry on spurious/interrupted wakeups */
	do {
		ret = pthread_cond_timedwait(&pending_req.sync.cond,
				&pending_requests.lock, ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

	if (pending_req.reply_received == 0) {
		RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ETIMEDOUT;
		return -1;
	}
	if (pending_req.reply_received == -1) {
		RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
		/* not receiving this message is not an error, so decrement
		 * number of sent messages
		 */
		reply->nb_sent--;
		return 0;
	}

	tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
	if (!tmp) {
		RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ENOMEM;
		return -1;
	}
	memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
	reply->msgs = tmp;
	reply->nb_received++;
	return 0;
}
918
919 int __rte_experimental
920 rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
921                 const struct timespec *ts)
922 {
923         int dir_fd, ret = 0;
924         DIR *mp_dir;
925         struct dirent *ent;
926         struct timeval now;
927         struct timespec end;
928
929         RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
930
931         if (check_input(req) == false)
932                 return -1;
933         if (gettimeofday(&now, NULL) < 0) {
934                 RTE_LOG(ERR, EAL, "Faile to get current time\n");
935                 rte_errno = errno;
936                 return -1;
937         }
938
939         end.tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
940         end.tv_sec = now.tv_sec + ts->tv_sec +
941                         (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
942
943         reply->nb_sent = 0;
944         reply->nb_received = 0;
945         reply->msgs = NULL;
946
947         /* for secondary process, send request to the primary process only */
948         if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
949                 pthread_mutex_lock(&pending_requests.lock);
950                 ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
951                 pthread_mutex_unlock(&pending_requests.lock);
952                 return ret;
953         }
954
955         /* for primary process, broadcast request, and collect reply 1 by 1 */
956         mp_dir = opendir(mp_dir_path);
957         if (!mp_dir) {
958                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
959                 rte_errno = errno;
960                 return -1;
961         }
962
963         dir_fd = dirfd(mp_dir);
964         /* lock the directory to prevent processes spinning up while we send */
965         if (flock(dir_fd, LOCK_SH)) {
966                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
967                         mp_dir_path);
968                 closedir(mp_dir);
969                 rte_errno = errno;
970                 return -1;
971         }
972
973         pthread_mutex_lock(&pending_requests.lock);
974         while ((ent = readdir(mp_dir))) {
975                 char path[PATH_MAX];
976
977                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
978                         continue;
979
980                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
981                          ent->d_name);
982
983                 /* unlocks the mutex while waiting for response,
984                  * locks on receive
985                  */
986                 if (mp_request_sync(path, req, reply, &end))
987                         ret = -1;
988         }
989         pthread_mutex_unlock(&pending_requests.lock);
990         /* unlock the directory */
991         flock(dir_fd, LOCK_UN);
992
993         /* dir_fd automatically closed on closedir */
994         closedir(mp_dir);
995         return ret;
996 }
997
998 int __rte_experimental
999 rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
1000                 rte_mp_async_reply_t clb)
1001 {
1002         struct rte_mp_msg *copy;
1003         struct pending_request *dummy;
1004         struct async_request_param *param;
1005         struct rte_mp_reply *reply;
1006         int dir_fd, ret = 0;
1007         DIR *mp_dir;
1008         struct dirent *ent;
1009         struct timeval now;
1010         struct timespec *end;
1011         bool dummy_used = false;
1012
1013         RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
1014
1015         if (check_input(req) == false)
1016                 return -1;
1017         if (gettimeofday(&now, NULL) < 0) {
1018                 RTE_LOG(ERR, EAL, "Faile to get current time\n");
1019                 rte_errno = errno;
1020                 return -1;
1021         }
1022         copy = calloc(1, sizeof(*copy));
1023         dummy = calloc(1, sizeof(*dummy));
1024         param = calloc(1, sizeof(*param));
1025         if (copy == NULL || dummy == NULL || param == NULL) {
1026                 RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
1027                 rte_errno = ENOMEM;
1028                 goto fail;
1029         }
1030
1031         /* copy message */
1032         memcpy(copy, req, sizeof(*copy));
1033
1034         param->n_responses_processed = 0;
1035         param->clb = clb;
1036         end = &param->end;
1037         reply = &param->user_reply;
1038
1039         end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
1040         end->tv_sec = now.tv_sec + ts->tv_sec +
1041                         (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
1042         reply->nb_sent = 0;
1043         reply->nb_received = 0;
1044         reply->msgs = NULL;
1045
1046         /* we have to lock the request queue here, as we will be adding a bunch
1047          * of requests to the queue at once, and some of the replies may arrive
1048          * before we add all of the requests to the queue.
1049          */
1050         pthread_mutex_lock(&pending_requests.lock);
1051
1052         /* we have to ensure that callback gets triggered even if we don't send
1053          * anything, therefore earlier we have allocated a dummy request. fill
1054          * it, and put it on the queue if we don't send any requests.
1055          */
1056         dummy->type = REQUEST_TYPE_ASYNC;
1057         dummy->request = copy;
1058         dummy->reply = NULL;
1059         dummy->async.param = param;
1060         dummy->reply_received = 1; /* short-circuit the timeout */
1061
1062         /* for secondary process, send request to the primary process only */
1063         if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1064                 ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);
1065
1066                 /* if we didn't send anything, put dummy request on the queue */
1067                 if (ret == 0 && reply->nb_sent == 0) {
1068                         TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
1069                                         next);
1070                         dummy_used = true;
1071                 }
1072
1073                 pthread_mutex_unlock(&pending_requests.lock);
1074
1075                 /* if we couldn't send anything, clean up */
1076                 if (ret != 0)
1077                         goto fail;
1078                 return 0;
1079         }
1080
1081         /* for primary process, broadcast request */
1082         mp_dir = opendir(mp_dir_path);
1083         if (!mp_dir) {
1084                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
1085                 rte_errno = errno;
1086                 goto unlock_fail;
1087         }
1088         dir_fd = dirfd(mp_dir);
1089
1090         /* lock the directory to prevent processes spinning up while we send */
1091         if (flock(dir_fd, LOCK_SH)) {
1092                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
1093                         mp_dir_path);
1094                 rte_errno = errno;
1095                 goto closedir_fail;
1096         }
1097
1098         while ((ent = readdir(mp_dir))) {
1099                 char path[PATH_MAX];
1100
1101                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
1102                         continue;
1103
1104                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
1105                          ent->d_name);
1106
1107                 if (mp_request_async(path, copy, param, ts))
1108                         ret = -1;
1109         }
1110         /* if we didn't send anything, put dummy request on the queue */
1111         if (ret == 0 && reply->nb_sent == 0) {
1112                 TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
1113                 dummy_used = true;
1114         }
1115
1116         /* finally, unlock the queue */
1117         pthread_mutex_unlock(&pending_requests.lock);
1118
1119         /* unlock the directory */
1120         flock(dir_fd, LOCK_UN);
1121
1122         /* dir_fd automatically closed on closedir */
1123         closedir(mp_dir);
1124
1125         /* if dummy was unused, free it */
1126         if (!dummy_used)
1127                 free(dummy);
1128
1129         return ret;
1130 closedir_fail:
1131         closedir(mp_dir);
1132 unlock_fail:
1133         pthread_mutex_unlock(&pending_requests.lock);
1134 fail:
1135         free(dummy);
1136         free(param);
1137         free(copy);
1138         return -1;
1139 }
1140
1141 int __rte_experimental
1142 rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
1143 {
1144         RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
1145
1146         if (check_input(msg) == false)
1147                 return -1;
1148
1149         if (peer == NULL) {
1150                 RTE_LOG(ERR, EAL, "peer is not specified\n");
1151                 rte_errno = EINVAL;
1152                 return -1;
1153         }
1154
1155         return mp_send(msg, peer, MP_REP);
1156 }