/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2018 Intel Corporation
 */

#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <inttypes.h>
#include <libgen.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <rte_alarm.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
#include <rte_tailq.h>

#include "eal_memcfg.h"
#include "eal_private.h"
#include "eal_filesystem.h"
#include "eal_internal_cfg.h"

static int mp_fd = -1;
static pthread_t mp_handle_tid;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
static char peer_name[PATH_MAX];

struct action_entry {
	TAILQ_ENTRY(action_entry) next;
	char action_name[RTE_MP_MAX_NAME_LEN];
	rte_mp_t action;
};

/** Doubly-linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);

enum mp_type {
	MP_MSG, /* Share message with peers; will not block */
	MP_REQ, /* Request for information; will block for a reply */
	MP_REP, /* Response to previously-received request */
	MP_IGN, /* Response telling requester to ignore this response */
};

struct mp_msg_internal {
	int type;
	struct rte_mp_msg msg;
};

struct async_request_param {
	rte_mp_async_reply_t clb;
	struct rte_mp_reply user_reply;
	struct timespec end;
	int n_responses_processed;
};

struct pending_request {
	TAILQ_ENTRY(pending_request) next;
	enum {
		REQUEST_TYPE_SYNC,
		REQUEST_TYPE_ASYNC
	} type;
	char dst[PATH_MAX];
	struct rte_mp_msg *request;
	struct rte_mp_msg *reply;
	int reply_received;
	RTE_STD_C11
	union {
		struct {
			struct async_request_param *param;
		} async;
		struct {
			pthread_cond_t cond;
		} sync;
	};
};

TAILQ_HEAD(pending_request_list, pending_request);

static struct {
	struct pending_request_list requests;
	pthread_mutex_t lock;
} pending_requests = {
	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
	.lock = PTHREAD_MUTEX_INITIALIZER,
};

/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);

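/* Find a pending sync/async request by destination socket path and action
 * name. Must be called with pending_requests.lock held. Returns NULL if no
 * matching request is queued.
 */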
static struct pending_request *
find_pending_request(const char *dst, const char *act_name)
{
	struct pending_request *r;

	TAILQ_FOREACH(r, &pending_requests.requests, next) {
		if (!strcmp(r->dst, dst) &&
		    !strcmp(r->request->name, act_name))
			break;
	}

	return r;
}

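/* Compose the full socket path for a given peer name: the EAL mp socket
 * path, suffixed with "_<name>" when a non-empty name is given.
 */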
static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *prefix = eal_mp_socket_path();

	if (strlen(name) > 0)
		snprintf(buf, len, "%s_%s", prefix, name);
	else
		strlcpy(buf, prefix, len);
}

int
rte_eal_primary_proc_alive(const char *config_file_path)
{
	int config_fd;

	if (config_file_path)
		config_fd = open(config_file_path, O_RDONLY);
	else {
		const char *path;

		path = eal_runtime_config_path();
		config_fd = open(path, O_RDONLY);
	}
	if (config_fd < 0)
		return 0;

	int ret = lockf(config_fd, F_TEST, 0);
	close(config_fd);

	return !!ret;
}

static struct action_entry *
find_action_entry_by_name(const char *name)
{
	struct action_entry *entry;

	TAILQ_FOREACH(entry, &action_entry_list, next) {
		if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
			break;
	}

	return entry;
}

static int
validate_action_name(const char *name)
{
	if (name == NULL) {
		RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
		RTE_LOG(ERR, EAL, "Length of action name is zero\n");
		rte_errno = EINVAL;
		return -1;
	}
	if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
		rte_errno = E2BIG;
		return -1;
	}
	return 0;
}

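/* Typical usage of the IPC action/request API, as a minimal sketch (the
 * handler signature follows the rte_mp_t typedef from rte_eal.h, and
 * "app_ping" is a hypothetical action name chosen for illustration):
 *
 *	static int
 *	handle_ping(const struct rte_mp_msg *msg, const void *peer)
 *	{
 *		struct rte_mp_msg reply;
 *
 *		memset(&reply, 0, sizeof(reply));
 *		strlcpy(reply.name, msg->name, sizeof(reply.name));
 *		return rte_mp_reply(&reply, peer);
 *	}
 *
 *	...
 *	rte_mp_action_register("app_ping", handle_ping);
 *
 * A peer process can then issue a blocking request:
 *
 *	struct rte_mp_msg req;
 *	struct rte_mp_reply replies;
 *	struct timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
 *
 *	memset(&req, 0, sizeof(req));
 *	strlcpy(req.name, "app_ping", sizeof(req.name));
 *	if (rte_mp_request_sync(&req, &replies, &ts) == 0)
 *		free(replies.msgs); // on success, the caller frees the replies
 */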
int
rte_mp_action_register(const char *name, rte_mp_t action)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	entry = malloc(sizeof(struct action_entry));
	if (entry == NULL) {
		rte_errno = ENOMEM;
		return -1;
	}
	strlcpy(entry->action_name, name, sizeof(entry->action_name));
	entry->action = action;

	pthread_mutex_lock(&mp_mutex_action);
	if (find_action_entry_by_name(name) != NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		rte_errno = EEXIST;
		free(entry);
		return -1;
	}
	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	return 0;
}

void
rte_mp_action_unregister(const char *name)
{
	struct action_entry *entry;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (validate_action_name(name) != 0)
		return;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(name);
	if (entry == NULL) {
		pthread_mutex_unlock(&mp_mutex_action);
		return;
	}
	TAILQ_REMOVE(&action_entry_list, entry, next);
	pthread_mutex_unlock(&mp_mutex_action);
	free(entry);
}

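/* Receive one message from the mp socket, along with any file descriptors
 * passed as SCM_RIGHTS ancillary data. Returns 0 on success, -1 on receive
 * error, truncation, or a malformed header.
 */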
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	int msglen;
	struct iovec iov;
	struct msghdr msgh;
	char control[CMSG_SPACE(sizeof(m->msg.fds))];
	struct cmsghdr *cmsg;
	int buflen = sizeof(*m) - sizeof(m->msg.fds);

	memset(&msgh, 0, sizeof(msgh));
	iov.iov_base = m;
	iov.iov_len  = buflen;

	msgh.msg_name = s;
	msgh.msg_namelen = sizeof(*s);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	msglen = recvmsg(mp_fd, &msgh, 0);
	if (msglen < 0) {
		RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
		return -1;
	}

	if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
		RTE_LOG(ERR, EAL, "truncated msg\n");
		return -1;
	}

	/* read auxiliary FDs if any */
	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
		if ((cmsg->cmsg_level == SOL_SOCKET) &&
			(cmsg->cmsg_type == SCM_RIGHTS)) {
			memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
			break;
		}
	}
	/* sanity-check the response */
	if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "invalid number of fds received\n");
		return -1;
	}
	if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "invalid received data length\n");
		return -1;
	}
	return 0;
}

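/* Dispatch one received message: route replies to their pending request, and
 * everything else to the registered action, telling the peer to ignore us if
 * a request arrives before initialization has completed.
 */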
static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	struct pending_request *pending_req;
	struct action_entry *entry;
	struct rte_mp_msg *msg = &m->msg;
	rte_mp_t action = NULL;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

	if (m->type == MP_REP || m->type == MP_IGN) {
		struct pending_request *req = NULL;

		pthread_mutex_lock(&pending_requests.lock);
		pending_req = find_pending_request(s->sun_path, msg->name);
		if (pending_req) {
			memcpy(pending_req->reply, msg, sizeof(*msg));
			/* -1 indicates that we've been asked to ignore */
			pending_req->reply_received =
				m->type == MP_REP ? 1 : -1;

			if (pending_req->type == REQUEST_TYPE_SYNC)
				pthread_cond_signal(&pending_req->sync.cond);
			else if (pending_req->type == REQUEST_TYPE_ASYNC)
				req = async_reply_handle_thread_unsafe(
						pending_req);
		} else
			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
		pthread_mutex_unlock(&pending_requests.lock);

		if (req != NULL)
			trigger_async_action(req);
		return;
	}

	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(msg->name);
	if (entry != NULL)
		action = entry->action;
	pthread_mutex_unlock(&mp_mutex_action);

	if (!action) {
		if (m->type == MP_REQ && !internal_conf->init_complete) {
			/* if this is a request, and init is not yet complete,
			 * and callback wasn't registered, we should tell the
			 * requester to ignore our existence because we're not
			 * yet ready to process this request.
			 */
			struct rte_mp_msg dummy;

			memset(&dummy, 0, sizeof(dummy));
			strlcpy(dummy.name, msg->name, sizeof(dummy.name));
			mp_send(&dummy, s->sun_path, MP_IGN);
		} else {
			RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
				msg->name);
		}
	} else if (action(msg, s->sun_path) < 0) {
		RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
	}
}

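/* Main loop of the IPC control thread: keep receiving and dispatching
 * messages until the mp socket is closed by rte_mp_channel_cleanup().
 */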
static void *
mp_handle(void *arg __rte_unused)
{
	struct mp_msg_internal msg;
	struct sockaddr_un sa;

	while (mp_fd >= 0) {
		if (read_msg(&msg, &sa) == 0)
			process_msg(&msg, &sa);
	}

	return NULL;
}

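/* Compare two timespecs; returns -1, 0 or 1 if a is earlier than, equal to,
 * or later than b.
 */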
static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec < b->tv_sec)
		return -1;
	if (a->tv_sec > b->tv_sec)
		return 1;
	if (a->tv_nsec < b->tv_nsec)
		return -1;
	if (a->tv_nsec > b->tv_nsec)
		return 1;
	return 0;
}

enum async_action {
	ACTION_FREE, /**< free the action entry, but don't trigger callback */
	ACTION_TRIGGER /**< trigger callback, then free action entry */
};

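/* Fold one reply (or a timeout) into the async request's accumulated state.
 * Returns ACTION_TRIGGER once all expected responses have been processed,
 * ACTION_FREE otherwise. Called with pending_requests.lock held.
 */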
static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	bool timeout, last_msg;

	param = sr->async.param;
	reply = &param->user_reply;

	/* did we time out? */
	timeout = timespec_cmp(&param->end, now) <= 0;

	/* if we received a response, adjust relevant data and copy message. */
	if (sr->reply_received == 1 && sr->reply) {
		struct rte_mp_msg *msg, *user_msgs, *tmp;

		msg = sr->reply;
		user_msgs = reply->msgs;

		tmp = realloc(user_msgs, sizeof(*msg) *
				(reply->nb_received + 1));
		if (!tmp) {
			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
				sr->dst, sr->request->name);
			/* this entry is going to be removed and its message
			 * dropped, but we don't want to leak memory, so
			 * continue.
			 */
		} else {
			user_msgs = tmp;
			reply->msgs = user_msgs;
			memcpy(&user_msgs[reply->nb_received],
					msg, sizeof(*msg));
			reply->nb_received++;
		}

		/* mark this request as processed */
		param->n_responses_processed++;
	} else if (sr->reply_received == -1) {
		/* we were asked to ignore this process */
		reply->nb_sent--;
	} else if (timeout) {
		/* count it as a processed response, but don't increment
		 * nb_received.
		 */
		param->n_responses_processed++;
	}

	free(sr->reply);

	last_msg = param->n_responses_processed == reply->nb_sent;

	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}

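/* Invoke the user callback with the accumulated replies, then free the
 * request and everything attached to it.
 */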
static void
trigger_async_action(struct pending_request *sr)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;

	param = sr->async.param;
	reply = &param->user_reply;

	param->clb(sr->request, reply);

	/* clean up */
	free(sr->async.param->user_reply.msgs);
	free(sr->async.param);
	free(sr->request);
	free(sr);
}

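/* Process an async reply or timeout and dequeue the request. Returns the
 * request if the caller must trigger its callback (after dropping the lock),
 * or NULL if it was freed or is being handled elsewhere. Called with
 * pending_requests.lock held.
 */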
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
	struct pending_request *req = (struct pending_request *)arg;
	enum async_action action;
	struct timespec ts_now;

	if (clock_gettime(CLOCK_MONOTONIC, &ts_now) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto no_trigger;
	}

	action = process_async_request(req, &ts_now);

	TAILQ_REMOVE(&pending_requests.requests, req, next);

	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
		/* if we failed to cancel the alarm because it's already in
		 * progress, don't proceed because otherwise we will end up
		 * handling the same message twice.
		 */
		if (rte_errno == EINPROGRESS) {
			RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
			goto no_trigger;
		}
		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
	}

	if (action == ACTION_TRIGGER)
		return req;
no_trigger:
	free(req);
	return NULL;
}

static void
async_reply_handle(void *arg)
{
	struct pending_request *req;

	pthread_mutex_lock(&pending_requests.lock);
	req = async_reply_handle_thread_unsafe(arg);
	pthread_mutex_unlock(&pending_requests.lock);

	if (req != NULL)
		trigger_async_action(req);
}

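/* Create and bind the datagram socket for this process. The primary process
 * binds the plain EAL mp socket path; secondary processes append a unique
 * "<pid>_<tsc>" suffix. Returns the socket fd, or -1 on failure.
 */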
static int
open_socket_fd(void)
{
	struct sockaddr_un un;

	peer_name[0] = '\0';
	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
		snprintf(peer_name, sizeof(peer_name),
				"%d_%"PRIx64, getpid(), rte_rdtsc());

	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (mp_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to create unix socket\n");
		return -1;
	}

	memset(&un, 0, sizeof(un));
	un.sun_family = AF_UNIX;

	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));

	unlink(un.sun_path); /* may be left over from a previous run */

	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
		RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
			un.sun_path, strerror(errno));
		close(mp_fd);
		return -1;
	}

	RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
	return mp_fd;
}

static void
close_socket_fd(int fd)
{
	char path[PATH_MAX];

	close(fd);
	create_socket_path(peer_name, path, sizeof(path));
	unlink(path);
}

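/* Initialize the IPC channel: derive the socket filter and directory paths,
 * open and bind the mp socket under an exclusive directory lock, and start
 * the control thread that services incoming messages.
 */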
int
rte_mp_channel_init(void)
{
	char path[PATH_MAX];
	int dir_fd;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	/* in no shared files mode, we do not have secondary process support,
	 * so no need to initialize IPC.
	 */
	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	/* create filter path */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_filter, basename(path), sizeof(mp_filter));

	/* path may have been modified, so recreate it */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

	/* lock the directory */
	dir_fd = open(mp_dir_path, O_RDONLY);
	if (dir_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
			mp_dir_path, strerror(errno));
		return -1;
	}

	if (flock(dir_fd, LOCK_EX)) {
		RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
			mp_dir_path, strerror(errno));
		close(dir_fd);
		return -1;
	}

	if (open_socket_fd() < 0) {
		close(dir_fd);
		return -1;
	}

	if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
			NULL, mp_handle, NULL) < 0) {
		RTE_LOG(ERR, EAL, "failed to create mp thread: %s\n",
			strerror(errno));
		close(mp_fd);
		close(dir_fd);
		mp_fd = -1;
		return -1;
	}

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);
	close(dir_fd);

	return 0;
}

void
rte_mp_channel_cleanup(void)
{
	int fd;

	if (mp_fd < 0)
		return;

	fd = mp_fd;
	mp_fd = -1;
	pthread_cancel(mp_handle_tid);
	pthread_join(mp_handle_tid, NULL);
	close_socket_fd(fd);
}

/*
 * Return -1 if sending the message failed due to a local error.
 * Return 0 if sending the message failed due to the remote side
 * (e.g. the peer process has exited).
 * Return 1 if the message was sent successfully.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
	int snd;
	struct iovec iov;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	struct sockaddr_un dst;
	struct mp_msg_internal m;
	int fd_size = msg->num_fds * sizeof(int);
	char control[CMSG_SPACE(fd_size)];

	m.type = type;
	memcpy(&m.msg, msg, sizeof(*msg));

	memset(&dst, 0, sizeof(dst));
	dst.sun_family = AF_UNIX;
	strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

	memset(&msgh, 0, sizeof(msgh));
	memset(control, 0, sizeof(control));

	iov.iov_base = &m;
	iov.iov_len = sizeof(m) - sizeof(msg->fds);

	msgh.msg_name = &dst;
	msgh.msg_namelen = sizeof(dst);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	cmsg = CMSG_FIRSTHDR(&msgh);
	cmsg->cmsg_len = CMSG_LEN(fd_size);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

	do {
		snd = sendmsg(mp_fd, &msgh, 0);
	} while (snd < 0 && errno == EINTR);

	if (snd < 0) {
		rte_errno = errno;
		/* Check if it was caused by the peer process exiting */
		if (errno == ECONNREFUSED &&
				rte_eal_process_type() == RTE_PROC_PRIMARY) {
			unlink(dst_path);
			return 0;
		}
		RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
			dst_path, strerror(errno));
		return -1;
	}

	return 1;
}

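/* Send a message to the given peer. A NULL peer means "the primary process"
 * when called from a secondary, and "broadcast to all secondary process
 * sockets in the mp directory" when called from the primary; the directory
 * is share-locked during the broadcast so no process can spin up mid-send.
 * Returns 0 on success, -1 on failure.
 */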
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;

	if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
		peer = eal_mp_socket_path();

	if (peer) {
		if (send_msg(peer, msg, type) < 0)
			return -1;
		else
			return 0;
	}

	/* broadcast to all secondary processes */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
				mp_dir_path);
		rte_errno = errno;
		return -1;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		closedir(mp_dir);
		return -1;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);
		if (send_msg(path, msg, type) < 0)
			ret = -1;
	}
	/* unlock the dir */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);
	return ret;
}

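/* Validate a user-supplied message: non-NULL, a valid action name, and
 * parameter length and fd count within the protocol limits.
 */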
static int
check_input(const struct rte_mp_msg *msg)
{
	if (msg == NULL) {
		RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (validate_action_name(msg->name) != 0)
		return -1;

	if (msg->len_param < 0) {
		RTE_LOG(ERR, EAL, "Message data length is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->num_fds < 0) {
		RTE_LOG(ERR, EAL, "Number of fds is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "Message data is too long\n");
		rte_errno = E2BIG;
		return -1;
	}

	if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
			RTE_MP_MAX_FD_NUM);
		rte_errno = E2BIG;
		return -1;
	}

	return 0;
}

int
rte_mp_sendmsg(struct rte_mp_msg *msg)
{
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
	return mp_send(msg, NULL, MP_MSG);
}

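/* Send one async request to the given destination and queue it, arming an
 * alarm that fires at the timeout. Must be called with pending_requests.lock
 * held. Returns 0 on success (or if the peer has gone away, in which case
 * nothing is queued), -1 on failure.
 */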
static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
		struct async_request_param *param, const struct timespec *ts)
{
	struct rte_mp_msg *reply_msg;
	struct pending_request *pending_req, *exist;
	int ret = -1;

	pending_req = calloc(1, sizeof(*pending_req));
	reply_msg = calloc(1, sizeof(*reply_msg));
	if (pending_req == NULL || reply_msg == NULL) {
		RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
		rte_errno = ENOMEM;
		ret = -1;
		goto fail;
	}

	pending_req->type = REQUEST_TYPE_ASYNC;
	strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
	pending_req->request = req;
	pending_req->reply = reply_msg;
	pending_req->async.param = param;

	/* queue already locked by caller */

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		ret = -1;
		goto fail;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	} else if (ret == 0) {
		ret = 0;
		goto fail;
	}
	param->user_reply.nb_sent++;

	/* if setting the alarm fails, the request is never queued, so any
	 * reply that arrives will simply be dropped
	 */
	if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
			      async_reply_handle, pending_req) < 0) {
		RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	}
	TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);

	return 0;
fail:
	free(pending_req);
	free(reply_msg);
	return ret;
}

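/* Send one sync request to the given destination and block on a condition
 * variable until the reply arrives or the absolute deadline passes. Must be
 * called with pending_requests.lock held; the lock is released while waiting.
 */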
static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
	       struct rte_mp_reply *reply, const struct timespec *ts)
{
	int ret;
	pthread_condattr_t attr;
	struct rte_mp_msg msg, *tmp;
	struct pending_request pending_req, *exist;

	pending_req.type = REQUEST_TYPE_SYNC;
	pending_req.reply_received = 0;
	strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
	pending_req.request = req;
	pending_req.reply = &msg;
	pthread_condattr_init(&attr);
	pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
	pthread_cond_init(&pending_req.sync.cond, &attr);

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		return -1;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		return -1;
	} else if (ret == 0)
		return 0;

	TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

	reply->nb_sent++;

	do {
		ret = pthread_cond_timedwait(&pending_req.sync.cond,
				&pending_requests.lock, ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

	if (pending_req.reply_received == 0) {
		RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ETIMEDOUT;
		return -1;
	}
	if (pending_req.reply_received == -1) {
		RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
		/* not receiving this message is not an error, so decrement
		 * number of sent messages
		 */
		reply->nb_sent--;
		return 0;
	}

	tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
	if (!tmp) {
		RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ENOMEM;
		return -1;
	}
	memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
	reply->msgs = tmp;
	reply->nb_received++;
	return 0;
}

int
rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
		const struct timespec *ts)
{
	int dir_fd, ret = -1;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now, end;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	if (check_input(req) != 0)
		goto end;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		goto end;
	}

	end.tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end.tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		pthread_mutex_lock(&pending_requests.lock);
		ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
		pthread_mutex_unlock(&pending_requests.lock);
		goto end;
	}

	/* for primary process, broadcast request and collect replies
	 * one by one
	 */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto end;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto close_end;
	}

	pthread_mutex_lock(&pending_requests.lock);
	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		/* unlocks the mutex while waiting for response,
		 * locks on receive
		 */
		if (mp_request_sync(path, req, reply, &end))
			goto unlock_end;
	}
	ret = 0;

unlock_end:
	pthread_mutex_unlock(&pending_requests.lock);
	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

close_end:
	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

end:
	if (ret) {
		free(reply->msgs);
		reply->nb_received = 0;
		reply->msgs = NULL;
	}
	return ret;
}

int
rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
		rte_mp_async_reply_t clb)
{
	struct rte_mp_msg *copy;
	struct pending_request *dummy;
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now;
	struct timespec *end;
	bool dummy_used = false;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	if (check_input(req) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		return -1;
	}
	copy = calloc(1, sizeof(*copy));
	dummy = calloc(1, sizeof(*dummy));
	param = calloc(1, sizeof(*param));
	if (copy == NULL || dummy == NULL || param == NULL) {
		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
		rte_errno = ENOMEM;
		goto fail;
	}

	/* copy message */
	memcpy(copy, req, sizeof(*copy));

	param->n_responses_processed = 0;
	param->clb = clb;
	end = &param->end;
	reply = &param->user_reply;

	end->tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end->tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;
	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	/* we have to lock the request queue here, as we will be adding a bunch
	 * of requests to the queue at once, and some of the replies may arrive
	 * before we add all of the requests to the queue.
	 */
	pthread_mutex_lock(&pending_requests.lock);

	/* we have to ensure that the callback gets triggered even if we don't
	 * send anything, so we have allocated a dummy request beforehand; fill
	 * it in, and put it on the queue if no requests end up being sent.
	 */
	dummy->type = REQUEST_TYPE_ASYNC;
	dummy->request = copy;
	dummy->reply = NULL;
	dummy->async.param = param;
	dummy->reply_received = 1; /* short-circuit the timeout */

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);

		/* if we didn't send anything, put dummy request on the queue */
		if (ret == 0 && reply->nb_sent == 0) {
			TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
					next);
			dummy_used = true;
		}

		pthread_mutex_unlock(&pending_requests.lock);

		/* if we couldn't send anything, clean up */
		if (ret != 0)
			goto fail;
		return 0;
	}

	/* for primary process, broadcast request */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto unlock_fail;
	}
	dir_fd = dirfd(mp_dir);

	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto closedir_fail;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		if (mp_request_async(path, copy, param, ts))
			ret = -1;
	}
	/* if we didn't send anything, put dummy request on the queue */
	if (ret == 0 && reply->nb_sent == 0) {
		TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
		dummy_used = true;
	}

	/* finally, unlock the queue */
	pthread_mutex_unlock(&pending_requests.lock);

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

	/* if dummy was unused, free it */
	if (!dummy_used)
		free(dummy);

	return ret;
closedir_fail:
	closedir(mp_dir);
unlock_fail:
	pthread_mutex_unlock(&pending_requests.lock);
fail:
	free(dummy);
	free(param);
	free(copy);
	return -1;
}

int
rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
{
	RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (peer == NULL) {
		RTE_LOG(ERR, EAL, "peer is not specified\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		return 0;
	}

	return mp_send(msg, peer, MP_REP);
}

/* Internally, the status of the mp feature is represented as a three-state
 * value:
 * - "unknown" as long as no secondary process has attached to the primary
 *   process and rte_mp_disable has not been called yet,
 * - "enabled" as soon as a secondary process attaches to the primary process,
 * - "disabled" once the primary process has successfully called
 *   rte_mp_disable.
 */
enum mp_status {
	MP_STATUS_UNKNOWN,
	MP_STATUS_DISABLED,
	MP_STATUS_ENABLED,
};

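/* Atomically move the shared mp status from "unknown" to the desired state.
 * Returns true if the transition succeeded, or if the status had already
 * been set to the desired state by someone else; false otherwise.
 */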
static bool
set_mp_status(enum mp_status status)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	uint8_t expected;
	uint8_t desired;

	expected = MP_STATUS_UNKNOWN;
	desired = status;
	if (__atomic_compare_exchange_n(&mcfg->mp_status, &expected, desired,
			false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
		return true;

	return __atomic_load_n(&mcfg->mp_status, __ATOMIC_RELAXED) == desired;
}

bool
rte_mp_disable(void)
{
	return set_mp_status(MP_STATUS_DISABLED);
}

bool
__rte_mp_enable(void)
{
	return set_mp_status(MP_STATUS_ENABLED);
}