7e115650ca84b2f3f14d2f88c1852bb5ef940f1b
[dpdk.git] / lib / librte_eal / common / eal_common_proc.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2018 Intel Corporation
3  */
4
5 #include <dirent.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <fnmatch.h>
9 #include <inttypes.h>
10 #include <libgen.h>
11 #include <limits.h>
12 #include <pthread.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/file.h>
17 #include <sys/time.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/un.h>
21 #include <unistd.h>
22
23 #include <rte_alarm.h>
24 #include <rte_common.h>
25 #include <rte_cycles.h>
26 #include <rte_eal.h>
27 #include <rte_errno.h>
28 #include <rte_lcore.h>
29 #include <rte_log.h>
30 #include <rte_tailq.h>
31
32 #include "eal_memcfg.h"
33 #include "eal_private.h"
34 #include "eal_filesystem.h"
35 #include "eal_internal_cfg.h"
36
/* Socket fd this process listens on; -1 until rte_mp_channel_init(). */
static int mp_fd = -1;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
/* Socket-name suffix of this process; left empty in the primary. */
static char peer_name[PATH_MAX];

/* One registered IPC action callback, keyed by its name. */
struct action_entry {
	TAILQ_ENTRY(action_entry) next;
	char action_name[RTE_MP_MAX_NAME_LEN]; /* Unique action name. */
	rte_mp_t action; /* Callback invoked for matching messages. */
};

/** Double linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);

/* Wire-level message type, prepended to every rte_mp_msg sent. */
enum mp_type {
	MP_MSG, /* Share message with peers, will not block */
	MP_REQ, /* Request for information, Will block for a reply */
	MP_REP, /* Response to previously-received request */
	MP_IGN, /* Response telling requester to ignore this response */
};

/* What actually travels over the socket: type tag plus the user message. */
struct mp_msg_internal {
	int type;
	struct rte_mp_msg msg;
};

/* Book-keeping shared by all per-peer entries of one asynchronous request. */
struct async_request_param {
	rte_mp_async_reply_t clb; /* User callback run on completion. */
	struct rte_mp_reply user_reply; /* Replies accumulated so far. */
	struct timespec end; /* Absolute deadline for the request. */
	int n_responses_processed; /* Replies/timeouts accounted for so far. */
};

/* One outstanding request to a single peer. */
struct pending_request {
	TAILQ_ENTRY(pending_request) next;
	enum {
		REQUEST_TYPE_SYNC,
		REQUEST_TYPE_ASYNC
	} type;
	char dst[PATH_MAX]; /* Destination socket path. */
	struct rte_mp_msg *request;
	struct rte_mp_msg *reply;
	int reply_received; /* 0 = none yet, 1 = got reply, -1 = told to ignore. */
	RTE_STD_C11
	union {
		struct {
			/* used in async requests only */
			struct async_request_param *param;
		} async;
		struct {
			/* requester blocks on this; IPC thread signals it */
			pthread_cond_t cond;
		} sync;
	};
};

TAILQ_HEAD(pending_request_list, pending_request);

/* Global queue of outstanding requests, protected by .lock. */
static struct {
	struct pending_request_list requests;
	pthread_mutex_t lock;
} pending_requests = {
	.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
	.lock = PTHREAD_MUTEX_INITIALIZER,
};

/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);
120
121 static struct pending_request *
122 find_pending_request(const char *dst, const char *act_name)
123 {
124         struct pending_request *r;
125
126         TAILQ_FOREACH(r, &pending_requests.requests, next) {
127                 if (!strcmp(r->dst, dst) &&
128                     !strcmp(r->request->name, act_name))
129                         break;
130         }
131
132         return r;
133 }
134
/* Build a socket path in buf: the EAL mp socket prefix, optionally
 * followed by "_<name>" when name is non-empty.
 */
static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *prefix = eal_mp_socket_path();

	if (strlen(name) == 0)
		strlcpy(buf, prefix, len);
	else
		snprintf(buf, len, "%s_%s", prefix, name);
}
145
146 int
147 rte_eal_primary_proc_alive(const char *config_file_path)
148 {
149         int config_fd;
150
151         if (config_file_path)
152                 config_fd = open(config_file_path, O_RDONLY);
153         else {
154                 const char *path;
155
156                 path = eal_runtime_config_path();
157                 config_fd = open(path, O_RDONLY);
158         }
159         if (config_fd < 0)
160                 return 0;
161
162         int ret = lockf(config_fd, F_TEST, 0);
163         close(config_fd);
164
165         return !!ret;
166 }
167
168 static struct action_entry *
169 find_action_entry_by_name(const char *name)
170 {
171         struct action_entry *entry;
172
173         TAILQ_FOREACH(entry, &action_entry_list, next) {
174                 if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
175                         break;
176         }
177
178         return entry;
179 }
180
181 static int
182 validate_action_name(const char *name)
183 {
184         if (name == NULL) {
185                 RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
186                 rte_errno = EINVAL;
187                 return -1;
188         }
189         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
190                 RTE_LOG(ERR, EAL, "Length of action name is zero\n");
191                 rte_errno = EINVAL;
192                 return -1;
193         }
194         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
195                 rte_errno = E2BIG;
196                 return -1;
197         }
198         return 0;
199 }
200
201 int
202 rte_mp_action_register(const char *name, rte_mp_t action)
203 {
204         struct action_entry *entry;
205         const struct internal_config *internal_conf =
206                 eal_get_internal_configuration();
207
208         if (validate_action_name(name) != 0)
209                 return -1;
210
211         if (internal_conf->no_shconf) {
212                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
213                 rte_errno = ENOTSUP;
214                 return -1;
215         }
216
217         entry = malloc(sizeof(struct action_entry));
218         if (entry == NULL) {
219                 rte_errno = ENOMEM;
220                 return -1;
221         }
222         strlcpy(entry->action_name, name, sizeof(entry->action_name));
223         entry->action = action;
224
225         pthread_mutex_lock(&mp_mutex_action);
226         if (find_action_entry_by_name(name) != NULL) {
227                 pthread_mutex_unlock(&mp_mutex_action);
228                 rte_errno = EEXIST;
229                 free(entry);
230                 return -1;
231         }
232         TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
233         pthread_mutex_unlock(&mp_mutex_action);
234         return 0;
235 }
236
237 void
238 rte_mp_action_unregister(const char *name)
239 {
240         struct action_entry *entry;
241         const struct internal_config *internal_conf =
242                 eal_get_internal_configuration();
243
244         if (validate_action_name(name) != 0)
245                 return;
246
247         if (internal_conf->no_shconf) {
248                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
249                 return;
250         }
251
252         pthread_mutex_lock(&mp_mutex_action);
253         entry = find_action_entry_by_name(name);
254         if (entry == NULL) {
255                 pthread_mutex_unlock(&mp_mutex_action);
256                 return;
257         }
258         TAILQ_REMOVE(&action_entry_list, entry, next);
259         pthread_mutex_unlock(&mp_mutex_action);
260         free(entry);
261 }
262
/* Receive one datagram from the IPC socket into *m, recording the sender's
 * address in *s. Any file descriptors passed as SCM_RIGHTS ancillary data
 * are copied into m->msg.fds. Returns 0 on success, -1 on error (logged).
 */
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	int msglen;
	struct iovec iov;
	struct msghdr msgh;
	char control[CMSG_SPACE(sizeof(m->msg.fds))];
	struct cmsghdr *cmsg;
	/* fds travel as ancillary data, so the payload excludes them */
	int buflen = sizeof(*m) - sizeof(m->msg.fds);

	memset(&msgh, 0, sizeof(msgh));
	iov.iov_base = m;
	iov.iov_len  = buflen;

	msgh.msg_name = s;
	msgh.msg_namelen = sizeof(*s);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	msglen = recvmsg(mp_fd, &msgh, 0);
	if (msglen < 0) {
		RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
		return -1;
	}

	/* a well-formed message is always exactly buflen bytes */
	if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
		RTE_LOG(ERR, EAL, "truncated msg\n");
		return -1;
	}

	/* read auxiliary FDs if any */
	for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
		cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
		if ((cmsg->cmsg_level == SOL_SOCKET) &&
			(cmsg->cmsg_type == SCM_RIGHTS)) {
			memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
			break;
		}
	}
	/* sanity-check the response: reject out-of-range counts from peers */
	if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "invalid number of fd's received\n");
		return -1;
	}
	if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "invalid received data length\n");
		return -1;
	}
	return 0;
}
315
/* Dispatch one received message.
 *
 * Replies (MP_REP/MP_IGN) are matched against the pending-request queue:
 * the reply payload is copied into the waiter's buffer, then a sync waiter
 * is signalled, or an async request is advanced (with its callback run
 * outside the queue lock). All other types are routed to the action
 * registered under the message name; if init is not complete and no action
 * exists, an MP_IGN is sent back so the requester does not wait on us.
 */
static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
	struct pending_request *pending_req;
	struct action_entry *entry;
	struct rte_mp_msg *msg = &m->msg;
	rte_mp_t action = NULL;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

	if (m->type == MP_REP || m->type == MP_IGN) {
		struct pending_request *req = NULL;

		pthread_mutex_lock(&pending_requests.lock);
		pending_req = find_pending_request(s->sun_path, msg->name);
		if (pending_req) {
			memcpy(pending_req->reply, msg, sizeof(*msg));
			/* -1 indicates that we've been asked to ignore */
			pending_req->reply_received =
				m->type == MP_REP ? 1 : -1;

			if (pending_req->type == REQUEST_TYPE_SYNC)
				pthread_cond_signal(&pending_req->sync.cond);
			else if (pending_req->type == REQUEST_TYPE_ASYNC)
				req = async_reply_handle_thread_unsafe(
						pending_req);
		} else
			RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
		pthread_mutex_unlock(&pending_requests.lock);

		/* user callback must run without holding the queue lock */
		if (req != NULL)
			trigger_async_action(req);
		return;
	}

	/* look up the registered handler under the action lock only */
	pthread_mutex_lock(&mp_mutex_action);
	entry = find_action_entry_by_name(msg->name);
	if (entry != NULL)
		action = entry->action;
	pthread_mutex_unlock(&mp_mutex_action);

	if (!action) {
		if (m->type == MP_REQ && !internal_conf->init_complete) {
			/* if this is a request, and init is not yet complete,
			 * and callback wasn't registered, we should tell the
			 * requester to ignore our existence because we're not
			 * yet ready to process this request.
			 */
			struct rte_mp_msg dummy;

			memset(&dummy, 0, sizeof(dummy));
			strlcpy(dummy.name, msg->name, sizeof(dummy.name));
			mp_send(&dummy, s->sun_path, MP_IGN);
		} else {
			RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
				msg->name);
		}
	} else if (action(msg, s->sun_path) < 0) {
		RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
	}
}
379
380 static void *
381 mp_handle(void *arg __rte_unused)
382 {
383         struct mp_msg_internal msg;
384         struct sockaddr_un sa;
385
386         while (1) {
387                 if (read_msg(&msg, &sa) == 0)
388                         process_msg(&msg, &sa);
389         }
390
391         return NULL;
392 }
393
/* Three-way compare of two timespecs: -1 if a < b, 1 if a > b, 0 if equal.
 * Seconds are compared first; nanoseconds break ties.
 */
static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec != b->tv_sec)
		return a->tv_sec < b->tv_sec ? -1 : 1;
	if (a->tv_nsec != b->tv_nsec)
		return a->tv_nsec < b->tv_nsec ? -1 : 1;
	return 0;
}
407
/* What to do with an async request entry after processing one response. */
enum async_action {
	ACTION_FREE, /**< free the action entry, but don't trigger callback */
	ACTION_TRIGGER /**< trigger callback, then free action entry */
};

/* Fold one received reply (or a timeout) into an async request's shared
 * accounting. Appends a received message to the user reply array, or
 * decrements nb_sent when the peer asked to be ignored, or counts a
 * timeout as processed without incrementing nb_received. Frees sr->reply.
 * Returns ACTION_TRIGGER once every sent request has been accounted for.
 */
static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	bool timeout, last_msg;

	param = sr->async.param;
	reply = &param->user_reply;

	/* did we timeout? */
	timeout = timespec_cmp(&param->end, now) <= 0;

	/* if we received a response, adjust relevant data and copy message. */
	if (sr->reply_received == 1 && sr->reply) {
		struct rte_mp_msg *msg, *user_msgs, *tmp;

		msg = sr->reply;
		user_msgs = reply->msgs;

		tmp = realloc(user_msgs, sizeof(*msg) *
				(reply->nb_received + 1));
		if (!tmp) {
			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
				sr->dst, sr->request->name);
			/* this entry is going to be removed and its message
			 * dropped, but we don't want to leak memory, so
			 * continue.
			 */
		} else {
			user_msgs = tmp;
			reply->msgs = user_msgs;
			memcpy(&user_msgs[reply->nb_received],
					msg, sizeof(*msg));
			reply->nb_received++;
		}

		/* mark this request as processed */
		param->n_responses_processed++;
	} else if (sr->reply_received == -1) {
		/* we were asked to ignore this process */
		reply->nb_sent--;
	} else if (timeout) {
		/* count it as processed response, but don't increment
		 * nb_received.
		 */
		param->n_responses_processed++;
	}

	/* the per-peer reply buffer is no longer needed */
	free(sr->reply);

	last_msg = param->n_responses_processed == reply->nb_sent;

	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}
468
469 static void
470 trigger_async_action(struct pending_request *sr)
471 {
472         struct async_request_param *param;
473         struct rte_mp_reply *reply;
474
475         param = sr->async.param;
476         reply = &param->user_reply;
477
478         param->clb(sr->request, reply);
479
480         /* clean up */
481         free(sr->async.param->user_reply.msgs);
482         free(sr->async.param);
483         free(sr->request);
484         free(sr);
485 }
486
/* Process a reply (or the timeout alarm) for one async request: account
 * for it, unlink it from the pending list and cancel its alarm. Must be
 * called with pending_requests.lock held — hence "thread_unsafe".
 * Returns the request when its user callback should be triggered by the
 * caller (outside the lock); otherwise frees it and returns NULL.
 */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
	struct pending_request *req = (struct pending_request *)arg;
	enum async_action action;
	struct timespec ts_now;
	struct timeval now;

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		/* NOTE(review): this path frees req while it is still linked
		 * in pending_requests.requests and its alarm is still armed;
		 * it relies on gettimeofday() effectively never failing.
		 */
		goto no_trigger;
	}
	ts_now.tv_nsec = now.tv_usec * 1000;
	ts_now.tv_sec = now.tv_sec;

	action = process_async_request(req, &ts_now);

	TAILQ_REMOVE(&pending_requests.requests, req, next);

	if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
		/* if we failed to cancel the alarm because it's already in
		 * progress, don't proceed because otherwise we will end up
		 * handling the same message twice.
		 */
		if (rte_errno == EINPROGRESS) {
			RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
			goto no_trigger;
		}
		RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
	}

	if (action == ACTION_TRIGGER)
		return req;
no_trigger:
	free(req);
	return NULL;
}
524
525 static void
526 async_reply_handle(void *arg)
527 {
528         struct pending_request *req;
529
530         pthread_mutex_lock(&pending_requests.lock);
531         req = async_reply_handle_thread_unsafe(arg);
532         pthread_mutex_unlock(&pending_requests.lock);
533
534         if (req != NULL)
535                 trigger_async_action(req);
536 }
537
538 static int
539 open_socket_fd(void)
540 {
541         struct sockaddr_un un;
542
543         peer_name[0] = '\0';
544         if (rte_eal_process_type() == RTE_PROC_SECONDARY)
545                 snprintf(peer_name, sizeof(peer_name),
546                                 "%d_%"PRIx64, getpid(), rte_rdtsc());
547
548         mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
549         if (mp_fd < 0) {
550                 RTE_LOG(ERR, EAL, "failed to create unix socket\n");
551                 return -1;
552         }
553
554         memset(&un, 0, sizeof(un));
555         un.sun_family = AF_UNIX;
556
557         create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
558
559         unlink(un.sun_path); /* May still exist since last run */
560
561         if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
562                 RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
563                         un.sun_path, strerror(errno));
564                 close(mp_fd);
565                 return -1;
566         }
567
568         RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
569         return mp_fd;
570 }
571
572 static void
573 close_socket_fd(void)
574 {
575         char path[PATH_MAX];
576
577         if (mp_fd < 0)
578                 return;
579
580         close(mp_fd);
581         create_socket_path(peer_name, path, sizeof(path));
582         unlink(path);
583 }
584
/* Initialize the IPC channel: compute the socket directory and filename
 * filter, create and bind the local socket while holding an exclusive
 * flock() on the socket directory (so other processes cannot race with
 * socket creation/enumeration), and spawn the rte_mp_handle control
 * thread. Returns 0 on success, -1 on failure.
 */
int
rte_mp_channel_init(void)
{
	char path[PATH_MAX];
	int dir_fd;
	pthread_t mp_handle_tid;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	/* in no shared files mode, we do not have secondary processes support,
	 * so no need to initialize IPC.
	 */
	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	/* create filter path */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_filter, basename(path), sizeof(mp_filter));

	/* path may have been modified by basename(), so recreate it */
	create_socket_path("*", path, sizeof(path));
	strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

	/* lock the directory */
	dir_fd = open(mp_dir_path, O_RDONLY);
	if (dir_fd < 0) {
		RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
			mp_dir_path, strerror(errno));
		return -1;
	}

	if (flock(dir_fd, LOCK_EX)) {
		RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
			mp_dir_path, strerror(errno));
		close(dir_fd);
		return -1;
	}

	if (open_socket_fd() < 0) {
		close(dir_fd);
		return -1;
	}

	if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
			NULL, mp_handle, NULL) < 0) {
		RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
			strerror(errno));
		close(mp_fd);
		close(dir_fd);
		mp_fd = -1;
		return -1;
	}

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);
	close(dir_fd);

	return 0;
}
647
/* Tear down the IPC channel: close the socket and unlink its file. */
void
rte_mp_channel_cleanup(void)
{
	close_socket_fd();
}
653
/**
 * Send one message of the given type to the peer socket at dst_path.
 *
 * Return -1, as fail to send message and it's caused by the local side.
 * Return 0, as fail to send message and it's caused by the remote side
 *   (peer exited; the primary also unlinks the stale socket file).
 * Return 1, as succeed to send message.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
	int snd;
	struct iovec iov;
	struct msghdr msgh;
	struct cmsghdr *cmsg;
	struct sockaddr_un dst;
	struct mp_msg_internal m;
	int fd_size = msg->num_fds * sizeof(int);
	char control[CMSG_SPACE(fd_size)];

	m.type = type;
	memcpy(&m.msg, msg, sizeof(*msg));

	memset(&dst, 0, sizeof(dst));
	dst.sun_family = AF_UNIX;
	strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

	memset(&msgh, 0, sizeof(msgh));
	memset(control, 0, sizeof(control));

	iov.iov_base = &m;
	/* fds are carried as ancillary data, not in the payload */
	iov.iov_len = sizeof(m) - sizeof(msg->fds);

	msgh.msg_name = &dst;
	msgh.msg_namelen = sizeof(dst);
	msgh.msg_iov = &iov;
	msgh.msg_iovlen = 1;
	msgh.msg_control = control;
	msgh.msg_controllen = sizeof(control);

	/* attach the file descriptors via SCM_RIGHTS */
	cmsg = CMSG_FIRSTHDR(&msgh);
	cmsg->cmsg_len = CMSG_LEN(fd_size);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

	/* retry if interrupted by a signal */
	do {
		snd = sendmsg(mp_fd, &msgh, 0);
	} while (snd < 0 && errno == EINTR);

	if (snd < 0) {
		rte_errno = errno;
		/* Check if it caused by peer process exits */
		if (errno == ECONNREFUSED &&
				rte_eal_process_type() == RTE_PROC_PRIMARY) {
			unlink(dst_path);
			return 0;
		}
		RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
			dst_path, strerror(errno));
		return -1;
	}

	return 1;
}
717
/* Send a message either to one explicit peer, or — when peer is NULL in
 * the primary — broadcast it to every secondary socket in mp_dir_path
 * (matched with mp_filter, under a shared directory lock so no secondary
 * spins up mid-broadcast). A NULL peer in a secondary is rewritten to the
 * primary's socket. Returns 0 on success, -1 if any send failed locally.
 */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;

	/* a secondary with no explicit peer always talks to the primary */
	if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
		peer = eal_mp_socket_path();

	if (peer) {
		if (send_msg(peer, msg, type) < 0)
			return -1;
		else
			return 0;
	}

	/* broadcast to all secondary processes */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
				mp_dir_path);
		rte_errno = errno;
		return -1;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		closedir(mp_dir);
		return -1;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		/* skip entries that are not secondary process sockets */
		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);
		if (send_msg(path, msg, type) < 0)
			ret = -1;
	}
	/* unlock the dir */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);
	return ret;
}
772
/* Validate a user-supplied message before sending: non-NULL, valid action
 * name, len_param within [0, RTE_MP_MAX_PARAM_LEN] and num_fds within
 * [0, RTE_MP_MAX_FD_NUM]. Sets rte_errno (EINVAL or E2BIG) and returns -1
 * on failure, 0 if the message is well-formed.
 */
static int
check_input(const struct rte_mp_msg *msg)
{
	if (msg == NULL) {
		RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (validate_action_name(msg->name) != 0)
		return -1;

	if (msg->len_param < 0) {
		RTE_LOG(ERR, EAL, "Message data length is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->num_fds < 0) {
		RTE_LOG(ERR, EAL, "Number of fd's is negative\n");
		rte_errno = EINVAL;
		return -1;
	}

	if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
		RTE_LOG(ERR, EAL, "Message data is too long\n");
		rte_errno = E2BIG;
		return -1;
	}

	if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
			RTE_MP_MAX_FD_NUM);
		rte_errno = E2BIG;
		return -1;
	}

	return 0;
}
812
/* Fire-and-forget IPC send: from the primary, broadcast to all
 * secondaries; from a secondary, send to the primary. Does not wait for
 * replies. Returns 0 on success, -1 on failure with rte_errno set
 * (ENOTSUP when IPC is disabled in no-shared-files mode).
 */
int
rte_mp_sendmsg(struct rte_mp_msg *msg)
{
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	if (check_input(msg) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
	return mp_send(msg, NULL, MP_MSG);
}
831
/* Queue an asynchronous request to a single destination and arm its
 * timeout alarm. Caller must hold pending_requests.lock. On success the
 * request is linked into the pending list and 0 is returned. Also returns
 * 0 (without queueing) when the peer has gone away (send_msg() returned 0);
 * returns -1 on error with rte_errno set.
 */
static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
		struct async_request_param *param, const struct timespec *ts)
{
	struct rte_mp_msg *reply_msg;
	struct pending_request *pending_req, *exist;
	int ret = -1;

	pending_req = calloc(1, sizeof(*pending_req));
	reply_msg = calloc(1, sizeof(*reply_msg));
	if (pending_req == NULL || reply_msg == NULL) {
		RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
		rte_errno = ENOMEM;
		ret = -1;
		goto fail;
	}

	pending_req->type = REQUEST_TYPE_ASYNC;
	strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
	pending_req->request = req;
	pending_req->reply = reply_msg;
	pending_req->async.param = param;

	/* queue already locked by caller */

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		ret = -1;
		goto fail;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	} else if (ret == 0) {
		/* peer has gone away: not an error, but nothing to wait for */
		ret = 0;
		goto fail;
	}
	param->user_reply.nb_sent++;

	/* if alarm set fails, we simply ignore the reply */
	if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
			      async_reply_handle, pending_req) < 0) {
		RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
			dst, req->name);
		ret = -1;
		goto fail;
	}
	TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);

	return 0;
fail:
	free(pending_req);
	free(reply_msg);
	return ret;
}
893
/* Send a request to one destination and block until a reply arrives or
 * the absolute deadline *ts passes.
 *
 * Caller must hold pending_requests.lock: it doubles as the mutex for the
 * per-request condition variable. The pending_request itself lives on
 * this stack frame and is unlinked before returning. On a successful
 * reply, the message is appended to reply->msgs (realloc'd) and
 * nb_received is incremented. Returns 0 on success, peer-gone, or
 * asked-to-ignore; -1 on failure with rte_errno set.
 */
static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
	       struct rte_mp_reply *reply, const struct timespec *ts)
{
	int ret;
	struct rte_mp_msg msg, *tmp;
	struct pending_request pending_req, *exist;

	pending_req.type = REQUEST_TYPE_SYNC;
	pending_req.reply_received = 0;
	strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
	pending_req.request = req;
	pending_req.reply = &msg;
	pthread_cond_init(&pending_req.sync.cond, NULL);

	exist = find_pending_request(dst, req->name);
	if (exist) {
		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
		rte_errno = EEXIST;
		return -1;
	}

	ret = send_msg(dst, req, MP_REQ);
	if (ret < 0) {
		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
			dst, req->name);
		return -1;
	} else if (ret == 0)
		/* peer has gone away: nothing to wait for */
		return 0;

	TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

	reply->nb_sent++;

	/* wait for the IPC thread to copy the reply in and signal us */
	do {
		ret = pthread_cond_timedwait(&pending_req.sync.cond,
				&pending_requests.lock, ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

	if (pending_req.reply_received == 0) {
		RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ETIMEDOUT;
		return -1;
	}
	if (pending_req.reply_received == -1) {
		RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
		/* not receiving this message is not an error, so decrement
		 * number of sent messages
		 */
		reply->nb_sent--;
		return 0;
	}

	tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
	if (!tmp) {
		RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
			dst, req->name);
		rte_errno = ENOMEM;
		return -1;
	}
	memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
	reply->msgs = tmp;
	reply->nb_received++;
	return 0;
}
962
/* Send a request and synchronously collect all replies, with a relative
 * timeout of @ts.
 *
 * A secondary process talks to the primary only; the primary broadcasts
 * to every secondary socket matching mp_filter under mp_dir_path, holding
 * a shared flock on the directory so no new peer can appear mid-broadcast.
 *
 * On failure any partially collected replies are freed and *reply is
 * reset. Returns 0 on success, -1 on failure with rte_errno set.
 */
int
rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
		const struct timespec *ts)
{
	int dir_fd, ret = -1;
	DIR *mp_dir;
	struct dirent *ent;
	struct timeval now;
	struct timespec end;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	/* reset the accumulator up front so the cleanup path at 'end' is
	 * safe no matter where we bail out
	 */
	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	if (check_input(req) != 0)
		goto end;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		goto end;
	}

	/* convert the relative timeout to an absolute wall-clock deadline
	 * for pthread_cond_timedwait(), carrying ns overflow into seconds
	 */
	end.tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
	end.tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		pthread_mutex_lock(&pending_requests.lock);
		ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
		pthread_mutex_unlock(&pending_requests.lock);
		goto end;
	}

	/* for primary process, broadcast request, and collect reply 1 by 1 */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto end;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto close_end;
	}

	pthread_mutex_lock(&pending_requests.lock);
	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		/* unlocks the mutex while waiting for response,
		 * locks on receive
		 */
		if (mp_request_sync(path, req, reply, &end))
			goto unlock_end;
	}
	ret = 0;

unlock_end:
	pthread_mutex_unlock(&pending_requests.lock);
	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

close_end:
	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

end:
	/* on failure, release any replies collected before the error */
	if (ret) {
		free(reply->msgs);
		reply->nb_received = 0;
		reply->msgs = NULL;
	}
	return ret;
}
1060
1061 int
1062 rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
1063                 rte_mp_async_reply_t clb)
1064 {
1065         struct rte_mp_msg *copy;
1066         struct pending_request *dummy;
1067         struct async_request_param *param;
1068         struct rte_mp_reply *reply;
1069         int dir_fd, ret = 0;
1070         DIR *mp_dir;
1071         struct dirent *ent;
1072         struct timeval now;
1073         struct timespec *end;
1074         bool dummy_used = false;
1075         const struct internal_config *internal_conf =
1076                 eal_get_internal_configuration();
1077
1078         RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
1079
1080         if (check_input(req) != 0)
1081                 return -1;
1082
1083         if (internal_conf->no_shconf) {
1084                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
1085                 rte_errno = ENOTSUP;
1086                 return -1;
1087         }
1088
1089         if (gettimeofday(&now, NULL) < 0) {
1090                 RTE_LOG(ERR, EAL, "Failed to get current time\n");
1091                 rte_errno = errno;
1092                 return -1;
1093         }
1094         copy = calloc(1, sizeof(*copy));
1095         dummy = calloc(1, sizeof(*dummy));
1096         param = calloc(1, sizeof(*param));
1097         if (copy == NULL || dummy == NULL || param == NULL) {
1098                 RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
1099                 rte_errno = ENOMEM;
1100                 goto fail;
1101         }
1102
1103         /* copy message */
1104         memcpy(copy, req, sizeof(*copy));
1105
1106         param->n_responses_processed = 0;
1107         param->clb = clb;
1108         end = &param->end;
1109         reply = &param->user_reply;
1110
1111         end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
1112         end->tv_sec = now.tv_sec + ts->tv_sec +
1113                         (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
1114         reply->nb_sent = 0;
1115         reply->nb_received = 0;
1116         reply->msgs = NULL;
1117
1118         /* we have to lock the request queue here, as we will be adding a bunch
1119          * of requests to the queue at once, and some of the replies may arrive
1120          * before we add all of the requests to the queue.
1121          */
1122         pthread_mutex_lock(&pending_requests.lock);
1123
1124         /* we have to ensure that callback gets triggered even if we don't send
1125          * anything, therefore earlier we have allocated a dummy request. fill
1126          * it, and put it on the queue if we don't send any requests.
1127          */
1128         dummy->type = REQUEST_TYPE_ASYNC;
1129         dummy->request = copy;
1130         dummy->reply = NULL;
1131         dummy->async.param = param;
1132         dummy->reply_received = 1; /* short-circuit the timeout */
1133
1134         /* for secondary process, send request to the primary process only */
1135         if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1136                 ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);
1137
1138                 /* if we didn't send anything, put dummy request on the queue */
1139                 if (ret == 0 && reply->nb_sent == 0) {
1140                         TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
1141                                         next);
1142                         dummy_used = true;
1143                 }
1144
1145                 pthread_mutex_unlock(&pending_requests.lock);
1146
1147                 /* if we couldn't send anything, clean up */
1148                 if (ret != 0)
1149                         goto fail;
1150                 return 0;
1151         }
1152
1153         /* for primary process, broadcast request */
1154         mp_dir = opendir(mp_dir_path);
1155         if (!mp_dir) {
1156                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
1157                 rte_errno = errno;
1158                 goto unlock_fail;
1159         }
1160         dir_fd = dirfd(mp_dir);
1161
1162         /* lock the directory to prevent processes spinning up while we send */
1163         if (flock(dir_fd, LOCK_SH)) {
1164                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
1165                         mp_dir_path);
1166                 rte_errno = errno;
1167                 goto closedir_fail;
1168         }
1169
1170         while ((ent = readdir(mp_dir))) {
1171                 char path[PATH_MAX];
1172
1173                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
1174                         continue;
1175
1176                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
1177                          ent->d_name);
1178
1179                 if (mp_request_async(path, copy, param, ts))
1180                         ret = -1;
1181         }
1182         /* if we didn't send anything, put dummy request on the queue */
1183         if (ret == 0 && reply->nb_sent == 0) {
1184                 TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
1185                 dummy_used = true;
1186         }
1187
1188         /* finally, unlock the queue */
1189         pthread_mutex_unlock(&pending_requests.lock);
1190
1191         /* unlock the directory */
1192         flock(dir_fd, LOCK_UN);
1193
1194         /* dir_fd automatically closed on closedir */
1195         closedir(mp_dir);
1196
1197         /* if dummy was unused, free it */
1198         if (!dummy_used)
1199                 free(dummy);
1200
1201         return ret;
1202 closedir_fail:
1203         closedir(mp_dir);
1204 unlock_fail:
1205         pthread_mutex_unlock(&pending_requests.lock);
1206 fail:
1207         free(dummy);
1208         free(param);
1209         free(copy);
1210         return -1;
1211 }
1212
1213 int
1214 rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
1215 {
1216         RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
1217         const struct internal_config *internal_conf =
1218                 eal_get_internal_configuration();
1219
1220         if (check_input(msg) != 0)
1221                 return -1;
1222
1223         if (peer == NULL) {
1224                 RTE_LOG(ERR, EAL, "peer is not specified\n");
1225                 rte_errno = EINVAL;
1226                 return -1;
1227         }
1228
1229         if (internal_conf->no_shconf) {
1230                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
1231                 return 0;
1232         }
1233
1234         return mp_send(msg, peer, MP_REP);
1235 }
1236
/* Internally, the status of the mp feature is represented as a three-state:
 * - "unknown" as long as no secondary process attached to a primary process
 *   and there was no call to __rte_mp_disable yet,
 * - "enabled" as soon as a secondary process attaches to a primary process,
 * - "disabled" when a primary process successfully called __rte_mp_disable,
 *
 * The value is stored in the shared memory config (mcfg->mp_status, a
 * uint8_t updated atomically by set_mp_status() below), so all processes
 * observe the same state.
 */
enum mp_status {
	MP_STATUS_UNKNOWN,
	MP_STATUS_DISABLED,
	MP_STATUS_ENABLED,
};
1248
1249 static bool
1250 set_mp_status(enum mp_status status)
1251 {
1252         struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
1253         uint8_t expected;
1254         uint8_t desired;
1255
1256         expected = MP_STATUS_UNKNOWN;
1257         desired = status;
1258         if (__atomic_compare_exchange_n(&mcfg->mp_status, &expected, desired,
1259                         false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
1260                 return true;
1261
1262         return __atomic_load_n(&mcfg->mp_status, __ATOMIC_RELAXED) == desired;
1263 }
1264
/* Permanently disable multi-process support in this primary process.
 * Returns false if a secondary has already attached (status "enabled").
 */
bool
__rte_mp_disable(void)
{
	return set_mp_status(MP_STATUS_DISABLED);
}
1270
/* Mark multi-process support as in use (a secondary has attached).
 * Returns false if mp was already disabled via __rte_mp_disable().
 */
bool
__rte_mp_enable(void)
{
	return set_mp_status(MP_STATUS_ENABLED);
}