cryptodev: reorganize asymmetric structs
[dpdk.git] / lib / eal / common / eal_common_proc.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2018 Intel Corporation
3  */
4
5 #include <dirent.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <fnmatch.h>
9 #include <inttypes.h>
10 #include <libgen.h>
11 #include <limits.h>
12 #include <pthread.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/file.h>
17 #include <sys/time.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/un.h>
21 #include <unistd.h>
22
23 #include <rte_alarm.h>
24 #include <rte_common.h>
25 #include <rte_cycles.h>
26 #include <rte_eal.h>
27 #include <rte_errno.h>
28 #include <rte_lcore.h>
29 #include <rte_log.h>
30 #include <rte_tailq.h>
31
32 #include "eal_memcfg.h"
33 #include "eal_private.h"
34 #include "eal_filesystem.h"
35 #include "eal_internal_cfg.h"
36
/* fd of this process's IPC datagram socket; -1 while the channel is down */
static int mp_fd = -1;
/* thread servicing incoming IPC messages (see mp_handle()) */
static pthread_t mp_handle_tid;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
/* protects action_entry_list */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
/* unique socket-name suffix of this process (empty for the primary) */
static char peer_name[PATH_MAX];

/* one registered action: a named callback invoked for matching messages */
struct action_entry {
        TAILQ_ENTRY(action_entry) next;
        char action_name[RTE_MP_MAX_NAME_LEN]; /* unique name (lookup key) */
        rte_mp_t action;                       /* user-supplied callback */
};

/** Double linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

static struct action_entry_list action_entry_list =
        TAILQ_HEAD_INITIALIZER(action_entry_list);

/* message classes carried in mp_msg_internal.type */
enum mp_type {
        MP_MSG, /* Share message with peers, will not block */
        MP_REQ, /* Request for information, Will block for a reply */
        MP_REP, /* Response to previously-received request */
        MP_IGN, /* Response telling requester to ignore this response */
};

/* on-wire layout: message class followed by the user-visible message */
struct mp_msg_internal {
        int type;
        struct rte_mp_msg msg;
};

/* per-request bookkeeping shared by all peers of one async request */
struct async_request_param {
        rte_mp_async_reply_t clb;       /* user callback, run on completion */
        struct rte_mp_reply user_reply; /* replies accumulated so far */
        struct timespec end;            /* absolute deadline */
        int n_responses_processed;      /* replies/timeouts accounted for */
};

/* one outstanding request to a single peer */
struct pending_request {
        TAILQ_ENTRY(pending_request) next;
        enum {
                REQUEST_TYPE_SYNC,
                REQUEST_TYPE_ASYNC
        } type;
        char dst[PATH_MAX]; /* destination socket path */
        struct rte_mp_msg *request;
        struct rte_mp_msg *reply;
        /* 0 = still waiting, 1 = reply received, -1 = asked to ignore peer */
        int reply_received;
        RTE_STD_C11
        union {
                struct {
                        struct async_request_param *param;
                } async;
                struct {
                        pthread_cond_t cond; /* signaled when reply arrives */
                } sync;
        };
};

TAILQ_HEAD(pending_request_list, pending_request);

/* all in-flight requests; every access goes through .lock */
static struct {
        struct pending_request_list requests;
        pthread_mutex_t lock;
} pending_requests = {
        .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
        .lock = PTHREAD_MUTEX_INITIALIZER,
        /* NOTE(review): the lock serializes sync requests as well — see
         * pthread_cond_timedwait() in mp_request_sync().
         */
};

/* forward declarations */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);

/* for use with alarm callback */
static void
async_reply_handle(void *arg);

/* for use with process_msg */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg);

static void
trigger_async_action(struct pending_request *req);
121
122 static struct pending_request *
123 find_pending_request(const char *dst, const char *act_name)
124 {
125         struct pending_request *r;
126
127         TAILQ_FOREACH(r, &pending_requests.requests, next) {
128                 if (!strcmp(r->dst, dst) &&
129                     !strcmp(r->request->name, act_name))
130                         break;
131         }
132
133         return r;
134 }
135
/*
 * Build the socket path for a peer: "<prefix>_<name>", or the bare prefix
 * when name is empty (i.e. the primary process socket).
 */
static void
create_socket_path(const char *name, char *buf, int len)
{
        const char *prefix = eal_mp_socket_path();

        if (name[0] == '\0')
                strlcpy(buf, prefix, len);
        else
                snprintf(buf, len, "%s_%s", prefix, name);
}
146
147 int
148 rte_eal_primary_proc_alive(const char *config_file_path)
149 {
150         int config_fd;
151
152         if (config_file_path)
153                 config_fd = open(config_file_path, O_RDONLY);
154         else {
155                 const char *path;
156
157                 path = eal_runtime_config_path();
158                 config_fd = open(path, O_RDONLY);
159         }
160         if (config_fd < 0)
161                 return 0;
162
163         int ret = lockf(config_fd, F_TEST, 0);
164         close(config_fd);
165
166         return !!ret;
167 }
168
169 static struct action_entry *
170 find_action_entry_by_name(const char *name)
171 {
172         struct action_entry *entry;
173
174         TAILQ_FOREACH(entry, &action_entry_list, next) {
175                 if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
176                         break;
177         }
178
179         return entry;
180 }
181
182 static int
183 validate_action_name(const char *name)
184 {
185         if (name == NULL) {
186                 RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
187                 rte_errno = EINVAL;
188                 return -1;
189         }
190         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
191                 RTE_LOG(ERR, EAL, "Length of action name is zero\n");
192                 rte_errno = EINVAL;
193                 return -1;
194         }
195         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
196                 rte_errno = E2BIG;
197                 return -1;
198         }
199         return 0;
200 }
201
202 int
203 rte_mp_action_register(const char *name, rte_mp_t action)
204 {
205         struct action_entry *entry;
206         const struct internal_config *internal_conf =
207                 eal_get_internal_configuration();
208
209         if (validate_action_name(name) != 0)
210                 return -1;
211
212         if (internal_conf->no_shconf) {
213                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
214                 rte_errno = ENOTSUP;
215                 return -1;
216         }
217
218         entry = malloc(sizeof(struct action_entry));
219         if (entry == NULL) {
220                 rte_errno = ENOMEM;
221                 return -1;
222         }
223         strlcpy(entry->action_name, name, sizeof(entry->action_name));
224         entry->action = action;
225
226         pthread_mutex_lock(&mp_mutex_action);
227         if (find_action_entry_by_name(name) != NULL) {
228                 pthread_mutex_unlock(&mp_mutex_action);
229                 rte_errno = EEXIST;
230                 free(entry);
231                 return -1;
232         }
233         TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
234         pthread_mutex_unlock(&mp_mutex_action);
235         return 0;
236 }
237
238 void
239 rte_mp_action_unregister(const char *name)
240 {
241         struct action_entry *entry;
242         const struct internal_config *internal_conf =
243                 eal_get_internal_configuration();
244
245         if (validate_action_name(name) != 0)
246                 return;
247
248         if (internal_conf->no_shconf) {
249                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
250                 return;
251         }
252
253         pthread_mutex_lock(&mp_mutex_action);
254         entry = find_action_entry_by_name(name);
255         if (entry == NULL) {
256                 pthread_mutex_unlock(&mp_mutex_action);
257                 return;
258         }
259         TAILQ_REMOVE(&action_entry_list, entry, next);
260         pthread_mutex_unlock(&mp_mutex_action);
261         free(entry);
262 }
263
/*
 * Receive one message from the IPC socket into *m and record the sender's
 * address in *s. File descriptors arrive as SCM_RIGHTS ancillary data and
 * are copied into m->msg.fds.
 *
 * Returns the payload length on success, 0 when the socket was closed, or
 * -1 on error (recvmsg failure, truncation, or malformed fd/param counts).
 */
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
        int msglen;
        struct iovec iov;
        struct msghdr msgh;
        char control[CMSG_SPACE(sizeof(m->msg.fds))];
        struct cmsghdr *cmsg;
        /* fds travel out-of-band, so the payload excludes the fds array */
        int buflen = sizeof(*m) - sizeof(m->msg.fds);

        memset(&msgh, 0, sizeof(msgh));
        iov.iov_base = m;
        iov.iov_len  = buflen;

        msgh.msg_name = s;
        msgh.msg_namelen = sizeof(*s);
        msgh.msg_iov = &iov;
        msgh.msg_iovlen = 1;
        msgh.msg_control = control;
        msgh.msg_controllen = sizeof(control);

retry:
        msglen = recvmsg(mp_fd, &msgh, 0);

        /* zero length message means socket was closed */
        if (msglen == 0)
                return 0;

        if (msglen < 0) {
                /* restart if interrupted by a signal */
                if (errno == EINTR)
                        goto retry;

                RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
                return -1;
        }

        /* reject short reads and kernel-side truncation of data or control */
        if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
                RTE_LOG(ERR, EAL, "truncated msg\n");
                return -1;
        }

        /* read auxiliary FDs if any */
        for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
                cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
                if ((cmsg->cmsg_level == SOL_SOCKET) &&
                        (cmsg->cmsg_type == SCM_RIGHTS)) {
                        memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
                        break;
                }
        }
        /* sanity-check the response */
        if (m->msg.num_fds < 0 || m->msg.num_fds > RTE_MP_MAX_FD_NUM) {
                RTE_LOG(ERR, EAL, "invalid number of fd's received\n");
                return -1;
        }
        if (m->msg.len_param < 0 || m->msg.len_param > RTE_MP_MAX_PARAM_LEN) {
                RTE_LOG(ERR, EAL, "invalid received data length\n");
                return -1;
        }
        return msglen;
}
325
/*
 * Dispatch one received message.
 *
 * Replies (MP_REP/MP_IGN) are matched to their pending request under
 * pending_requests.lock; sync waiters are woken via their condvar, async
 * requests are finalized and — if complete — their user callback is
 * triggered only after the lock is dropped. All other messages are routed
 * to the action registered under the message name.
 */
static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
        struct pending_request *pending_req;
        struct action_entry *entry;
        struct rte_mp_msg *msg = &m->msg;
        rte_mp_t action = NULL;
        const struct internal_config *internal_conf =
                eal_get_internal_configuration();

        RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

        if (m->type == MP_REP || m->type == MP_IGN) {
                struct pending_request *req = NULL;

                pthread_mutex_lock(&pending_requests.lock);
                pending_req = find_pending_request(s->sun_path, msg->name);
                if (pending_req) {
                        memcpy(pending_req->reply, msg, sizeof(*msg));
                        /* -1 indicates that we've been asked to ignore */
                        pending_req->reply_received =
                                m->type == MP_REP ? 1 : -1;

                        if (pending_req->type == REQUEST_TYPE_SYNC)
                                pthread_cond_signal(&pending_req->sync.cond);
                        else if (pending_req->type == REQUEST_TYPE_ASYNC)
                                req = async_reply_handle_thread_unsafe(
                                                pending_req);
                } else
                        RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
                pthread_mutex_unlock(&pending_requests.lock);

                /* run the user callback outside of pending_requests.lock */
                if (req != NULL)
                        trigger_async_action(req);
                return;
        }

        /* look up the registered handler under the action lock */
        pthread_mutex_lock(&mp_mutex_action);
        entry = find_action_entry_by_name(msg->name);
        if (entry != NULL)
                action = entry->action;
        pthread_mutex_unlock(&mp_mutex_action);

        if (!action) {
                if (m->type == MP_REQ && !internal_conf->init_complete) {
                        /* if this is a request, and init is not yet complete,
                         * and callback wasn't registered, we should tell the
                         * requester to ignore our existence because we're not
                         * yet ready to process this request.
                         */
                        struct rte_mp_msg dummy;

                        memset(&dummy, 0, sizeof(dummy));
                        strlcpy(dummy.name, msg->name, sizeof(dummy.name));
                        mp_send(&dummy, s->sun_path, MP_IGN);
                } else {
                        RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
                                msg->name);
                }
        } else if (action(msg, s->sun_path) < 0) {
                RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
        }
}
389
390 static void *
391 mp_handle(void *arg __rte_unused)
392 {
393         struct mp_msg_internal msg;
394         struct sockaddr_un sa;
395
396         while (mp_fd >= 0) {
397                 int ret;
398
399                 ret = read_msg(&msg, &sa);
400                 if (ret <= 0)
401                         break;
402
403                 process_msg(&msg, &sa);
404         }
405
406         return NULL;
407 }
408
/*
 * Three-way comparison of two timespec values.
 * Returns -1 when a < b, 1 when a > b, 0 when equal.
 */
static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
        if (a->tv_sec != b->tv_sec)
                return a->tv_sec < b->tv_sec ? -1 : 1;
        if (a->tv_nsec != b->tv_nsec)
                return a->tv_nsec < b->tv_nsec ? -1 : 1;
        return 0;
}
422
/* disposition of an async request entry after its result was processed */
enum async_action {
        ACTION_FREE, /**< free the action entry, but don't trigger callback */
        ACTION_TRIGGER /**< trigger callback, then free action entry */
};
427
/*
 * Fold one peer's outcome (reply received, asked-to-ignore, or timeout)
 * into the async request's accumulated state. Caller must hold
 * pending_requests.lock.
 *
 * Returns ACTION_TRIGGER once all expected peers are accounted for
 * (n_responses_processed == nb_sent), otherwise ACTION_FREE.
 */
static enum async_action
process_async_request(struct pending_request *sr, const struct timespec *now)
{
        struct async_request_param *param;
        struct rte_mp_reply *reply;
        bool timeout, last_msg;

        param = sr->async.param;
        reply = &param->user_reply;

        /* did we timeout? */
        timeout = timespec_cmp(&param->end, now) <= 0;

        /* if we received a response, adjust relevant data and copy message. */
        if (sr->reply_received == 1 && sr->reply) {
                struct rte_mp_msg *msg, *user_msgs, *tmp;

                msg = sr->reply;
                user_msgs = reply->msgs;

                /* grow the user-visible reply array by one slot */
                tmp = realloc(user_msgs, sizeof(*msg) *
                                (reply->nb_received + 1));
                if (!tmp) {
                        RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
                                sr->dst, sr->request->name);
                        /* this entry is going to be removed and its message
                         * dropped, but we don't want to leak memory, so
                         * continue.
                         */
                } else {
                        user_msgs = tmp;
                        reply->msgs = user_msgs;
                        memcpy(&user_msgs[reply->nb_received],
                                        msg, sizeof(*msg));
                        reply->nb_received++;
                }

                /* mark this request as processed */
                param->n_responses_processed++;
        } else if (sr->reply_received == -1) {
                /* we were asked to ignore this process */
                reply->nb_sent--;
        } else if (timeout) {
                /* count it as processed response, but don't increment
                 * nb_received.
                 */
                param->n_responses_processed++;
        }

        /* the reply buffer has been copied out (or dropped) either way */
        free(sr->reply);

        last_msg = param->n_responses_processed == reply->nb_sent;

        return last_msg ? ACTION_TRIGGER : ACTION_FREE;
}
483
484 static void
485 trigger_async_action(struct pending_request *sr)
486 {
487         struct async_request_param *param;
488         struct rte_mp_reply *reply;
489
490         param = sr->async.param;
491         reply = &param->user_reply;
492
493         param->clb(sr->request, reply);
494
495         /* clean up */
496         free(sr->async.param->user_reply.msgs);
497         free(sr->async.param);
498         free(sr->request);
499         free(sr);
500 }
501
/*
 * Finish handling one async request entry: fold its result into the
 * request's state, unlink it from the pending list, and cancel its timeout
 * alarm. Caller must hold pending_requests.lock.
 *
 * Returns the entry when the user callback should be triggered (caller does
 * so after dropping the lock); otherwise frees the entry and returns NULL.
 */
static struct pending_request *
async_reply_handle_thread_unsafe(void *arg)
{
        struct pending_request *req = (struct pending_request *)arg;
        enum async_action action;
        struct timespec ts_now;

        if (clock_gettime(CLOCK_MONOTONIC, &ts_now) < 0) {
                RTE_LOG(ERR, EAL, "Cannot get current time\n");
                /* NOTE(review): this path frees req below while it is still
                 * on pending_requests.requests and its alarm may still be
                 * armed — looks like a use-after-free hazard; confirm.
                 */
                goto no_trigger;
        }

        action = process_async_request(req, &ts_now);

        TAILQ_REMOVE(&pending_requests.requests, req, next);

        if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
                /* if we failed to cancel the alarm because it's already in
                 * progress, don't proceed because otherwise we will end up
                 * handling the same message twice.
                 */
                if (rte_errno == EINPROGRESS) {
                        RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
                        goto no_trigger;
                }
                RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
        }

        if (action == ACTION_TRIGGER)
                return req;
no_trigger:
        free(req);
        return NULL;
}
536
537 static void
538 async_reply_handle(void *arg)
539 {
540         struct pending_request *req;
541
542         pthread_mutex_lock(&pending_requests.lock);
543         req = async_reply_handle_thread_unsafe(arg);
544         pthread_mutex_unlock(&pending_requests.lock);
545
546         if (req != NULL)
547                 trigger_async_action(req);
548 }
549
550 static int
551 open_socket_fd(void)
552 {
553         struct sockaddr_un un;
554
555         peer_name[0] = '\0';
556         if (rte_eal_process_type() == RTE_PROC_SECONDARY)
557                 snprintf(peer_name, sizeof(peer_name),
558                                 "%d_%"PRIx64, getpid(), rte_rdtsc());
559
560         mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
561         if (mp_fd < 0) {
562                 RTE_LOG(ERR, EAL, "failed to create unix socket\n");
563                 return -1;
564         }
565
566         memset(&un, 0, sizeof(un));
567         un.sun_family = AF_UNIX;
568
569         create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
570
571         unlink(un.sun_path); /* May still exist since last run */
572
573         if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
574                 RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
575                         un.sun_path, strerror(errno));
576                 close(mp_fd);
577                 return -1;
578         }
579
580         RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
581         return mp_fd;
582 }
583
584 static void
585 close_socket_fd(int fd)
586 {
587         char path[PATH_MAX];
588
589         close(fd);
590         create_socket_path(peer_name, path, sizeof(path));
591         unlink(path);
592 }
593
/*
 * Initialize the multi-process IPC channel: derive the socket directory and
 * filename filter, create and bind this process's socket, and start the
 * message-handling thread. The socket directory is held under an exclusive
 * flock() for the duration so other processes cannot spin up mid-init.
 *
 * Returns 0 on success, -1 on failure (rte_errno = ENOTSUP when IPC is
 * disabled by no-shared-files mode).
 */
int
rte_mp_channel_init(void)
{
        char path[PATH_MAX];
        int dir_fd;
        const struct internal_config *internal_conf =
                eal_get_internal_configuration();

        /* in no shared files mode, we do not have secondary processes support,
         * so no need to initialize IPC.
         */
        if (internal_conf->no_shconf) {
                RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
                rte_errno = ENOTSUP;
                return -1;
        }

        /* create filter path */
        create_socket_path("*", path, sizeof(path));
        strlcpy(mp_filter, basename(path), sizeof(mp_filter));

        /* path may have been modified, so recreate it */
        create_socket_path("*", path, sizeof(path));
        strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));

        /* lock the directory */
        dir_fd = open(mp_dir_path, O_RDONLY);
        if (dir_fd < 0) {
                RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
                        mp_dir_path, strerror(errno));
                return -1;
        }

        if (flock(dir_fd, LOCK_EX)) {
                RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
                        mp_dir_path, strerror(errno));
                close(dir_fd);
                return -1;
        }

        if (open_socket_fd() < 0) {
                close(dir_fd);
                return -1;
        }

        if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
                        NULL, mp_handle, NULL) < 0) {
                RTE_LOG(ERR, EAL, "failed to create mp thread: %s\n",
                        strerror(errno));
                /* NOTE(review): unlike close_socket_fd(), this path does not
                 * unlink() the bound socket file — verify intentional.
                 */
                close(mp_fd);
                close(dir_fd);
                mp_fd = -1;
                return -1;
        }

        /* unlock the directory */
        flock(dir_fd, LOCK_UN);
        close(dir_fd);

        return 0;
}
655
656 void
657 rte_mp_channel_cleanup(void)
658 {
659         int fd;
660
661         if (mp_fd < 0)
662                 return;
663
664         fd = mp_fd;
665         mp_fd = -1;
666         pthread_cancel(mp_handle_tid);
667         pthread_join(mp_handle_tid, NULL);
668         close_socket_fd(fd);
669 }
670
/**
 * Send one message to the peer socket at dst_path.
 *
 * Return -1, as fail to send message and it's caused by the local side
 *           (rte_errno is set from errno).
 * Return 0, as fail to send message and it's caused by the remote side
 *           (peer exited; the primary also unlinks the stale socket).
 * Return 1, as succeed to send message.
 */
static int
send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
{
        int snd;
        struct iovec iov;
        struct msghdr msgh;
        struct cmsghdr *cmsg;
        struct sockaddr_un dst;
        struct mp_msg_internal m;
        int fd_size = msg->num_fds * sizeof(int);
        char control[CMSG_SPACE(fd_size)];

        m.type = type;
        memcpy(&m.msg, msg, sizeof(*msg));

        memset(&dst, 0, sizeof(dst));
        dst.sun_family = AF_UNIX;
        strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));

        memset(&msgh, 0, sizeof(msgh));
        memset(control, 0, sizeof(control));

        /* fds travel as SCM_RIGHTS ancillary data, not in the payload */
        iov.iov_base = &m;
        iov.iov_len = sizeof(m) - sizeof(msg->fds);

        msgh.msg_name = &dst;
        msgh.msg_namelen = sizeof(dst);
        msgh.msg_iov = &iov;
        msgh.msg_iovlen = 1;
        msgh.msg_control = control;
        msgh.msg_controllen = sizeof(control);

        cmsg = CMSG_FIRSTHDR(&msgh);
        cmsg->cmsg_len = CMSG_LEN(fd_size);
        cmsg->cmsg_level = SOL_SOCKET;
        cmsg->cmsg_type = SCM_RIGHTS;
        memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);

        /* retry when interrupted by a signal */
        do {
                snd = sendmsg(mp_fd, &msgh, 0);
        } while (snd < 0 && errno == EINTR);

        if (snd < 0) {
                rte_errno = errno;
                /* Check if it caused by peer process exits */
                if (errno == ECONNREFUSED &&
                                rte_eal_process_type() == RTE_PROC_PRIMARY) {
                        unlink(dst_path);
                        return 0;
                }
                RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
                        dst_path, strerror(errno));
                return -1;
        }

        return 1;
}
734
/*
 * Send a message either to one specific peer or broadcast it to all
 * secondary-process sockets in the mp socket directory.
 *
 * With peer == NULL, a secondary targets the primary's socket and a primary
 * broadcasts; during broadcast the directory is share-locked so new
 * processes cannot appear mid-send. Returns 0 on success, -1 otherwise.
 */
static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type)
{
        int dir_fd, ret = 0;
        DIR *mp_dir;
        struct dirent *ent;

        /* a secondary with no explicit peer talks to the primary */
        if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
                peer = eal_mp_socket_path();

        if (peer) {
                if (send_msg(peer, msg, type) < 0)
                        return -1;
                else
                        return 0;
        }

        /* broadcast to all secondary processes */
        mp_dir = opendir(mp_dir_path);
        if (!mp_dir) {
                RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
                                mp_dir_path);
                rte_errno = errno;
                return -1;
        }

        dir_fd = dirfd(mp_dir);
        /* lock the directory to prevent processes spinning up while we send */
        if (flock(dir_fd, LOCK_SH)) {
                RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
                        mp_dir_path);
                rte_errno = errno;
                closedir(mp_dir);
                return -1;
        }

        while ((ent = readdir(mp_dir))) {
                char path[PATH_MAX];

                /* only consider entries matching the secondary-socket glob */
                if (fnmatch(mp_filter, ent->d_name, 0) != 0)
                        continue;

                snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
                         ent->d_name);
                if (send_msg(path, msg, type) < 0)
                        ret = -1;
        }
        /* unlock the dir */
        flock(dir_fd, LOCK_UN);

        /* dir_fd automatically closed on closedir */
        closedir(mp_dir);
        return ret;
}
789
/*
 * Validate a user-supplied message before sending. Checks are ordered so
 * that rte_errno reflects the first violation found. Returns 0 when valid;
 * -1 with rte_errno set to EINVAL or E2BIG otherwise.
 */
static int
check_input(const struct rte_mp_msg *msg)
{
        if (msg == NULL) {
                RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
                rte_errno = EINVAL;
                return -1;
        }

        if (validate_action_name(msg->name) != 0)
                return -1;

        if (msg->len_param < 0) {
                RTE_LOG(ERR, EAL, "Message data length is negative\n");
                rte_errno = EINVAL;
                return -1;
        }

        if (msg->num_fds < 0) {
                RTE_LOG(ERR, EAL, "Number of fd's is negative\n");
                rte_errno = EINVAL;
                return -1;
        }

        if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
                RTE_LOG(ERR, EAL, "Message data is too long\n");
                rte_errno = E2BIG;
                return -1;
        }

        if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
                RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
                        RTE_MP_MAX_FD_NUM);
                rte_errno = E2BIG;
                return -1;
        }

        return 0;
}
829
830 int
831 rte_mp_sendmsg(struct rte_mp_msg *msg)
832 {
833         const struct internal_config *internal_conf =
834                 eal_get_internal_configuration();
835
836         if (check_input(msg) != 0)
837                 return -1;
838
839         if (internal_conf->no_shconf) {
840                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
841                 rte_errno = ENOTSUP;
842                 return -1;
843         }
844
845         RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
846         return mp_send(msg, NULL, MP_MSG);
847 }
848
/*
 * Send an async request to one peer and queue it on the pending list with a
 * timeout alarm. Caller must hold pending_requests.lock.
 *
 * Returns 0 on success, and also when the peer no longer exists (in that
 * case nothing is queued and nb_sent is not incremented); -1 on local
 * failure with rte_errno set.
 */
static int
mp_request_async(const char *dst, struct rte_mp_msg *req,
                struct async_request_param *param, const struct timespec *ts)
{
        struct rte_mp_msg *reply_msg;
        struct pending_request *pending_req, *exist;
        int ret = -1;

        pending_req = calloc(1, sizeof(*pending_req));
        reply_msg = calloc(1, sizeof(*reply_msg));
        if (pending_req == NULL || reply_msg == NULL) {
                RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
                rte_errno = ENOMEM;
                ret = -1;
                goto fail;
        }

        pending_req->type = REQUEST_TYPE_ASYNC;
        strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
        pending_req->request = req;
        pending_req->reply = reply_msg;
        pending_req->async.param = param;

        /* queue already locked by caller */

        exist = find_pending_request(dst, req->name);
        if (exist) {
                RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
                rte_errno = EEXIST;
                ret = -1;
                goto fail;
        }

        ret = send_msg(dst, req, MP_REQ);
        if (ret < 0) {
                RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
                        dst, req->name);
                ret = -1;
                goto fail;
        } else if (ret == 0) {
                /* peer has gone away: report success but wait for nothing */
                ret = 0;
                goto fail;
        }
        param->user_reply.nb_sent++;

        /* if alarm set fails, we simply ignore the reply */
        if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
                              async_reply_handle, pending_req) < 0) {
                RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
                        dst, req->name);
                ret = -1;
                goto fail;
        }
        TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);

        return 0;
fail:
        free(pending_req);
        free(reply_msg);
        return ret;
}
910
/*
 * Send a sync request to one peer and block on a stack-allocated pending
 * entry until the reply arrives or the absolute CLOCK_MONOTONIC deadline
 * *ts expires. Caller must hold pending_requests.lock — the condvar wait
 * below releases it while blocked.
 *
 * Returns 0 on success (or when the peer vanished / asked to be ignored);
 * -1 on failure with rte_errno set.
 */
static int
mp_request_sync(const char *dst, struct rte_mp_msg *req,
               struct rte_mp_reply *reply, const struct timespec *ts)
{
        int ret;
        pthread_condattr_t attr;
        struct rte_mp_msg msg, *tmp;
        struct pending_request pending_req, *exist;

        pending_req.type = REQUEST_TYPE_SYNC;
        pending_req.reply_received = 0;
        strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
        pending_req.request = req;
        pending_req.reply = &msg;
        /* time out against the same clock the deadline was computed with */
        pthread_condattr_init(&attr);
        pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
        pthread_cond_init(&pending_req.sync.cond, &attr);

        exist = find_pending_request(dst, req->name);
        if (exist) {
                RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
                rte_errno = EEXIST;
                return -1;
        }

        ret = send_msg(dst, req, MP_REQ);
        if (ret < 0) {
                RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
                        dst, req->name);
                return -1;
        } else if (ret == 0)
                /* peer has gone away; not an error, nothing to wait for */
                return 0;

        TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);

        reply->nb_sent++;

        /* wait until process_msg() signals the condvar, or until timeout */
        do {
                ret = pthread_cond_timedwait(&pending_req.sync.cond,
                                &pending_requests.lock, ts);
        } while (ret != 0 && ret != ETIMEDOUT);

        TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);

        if (pending_req.reply_received == 0) {
                RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
                        dst, req->name);
                rte_errno = ETIMEDOUT;
                return -1;
        }
        if (pending_req.reply_received == -1) {
                RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
                /* not receiving this message is not an error, so decrement
                 * number of sent messages
                 */
                reply->nb_sent--;
                return 0;
        }

        /* append the received message to the user-visible reply array */
        tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
        if (!tmp) {
                RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
                        dst, req->name);
                rte_errno = ENOMEM;
                return -1;
        }
        memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
        reply->msgs = tmp;
        reply->nb_received++;
        return 0;
}
982
/* Public entry point: send 'req' synchronously and collect all replies into
 * 'reply' before the relative timeout 'ts' elapses.
 *
 * A secondary process only talks to the primary; the primary broadcasts to
 * every secondary socket found in mp_dir_path. On any failure the partially
 * collected reply->msgs array is freed and the reply is reset.
 */
int
rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
		const struct timespec *ts)
{
	int dir_fd, ret = -1;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now, end;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	/* start from a clean reply so the error path can free safely */
	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	if (check_input(req) != 0)
		goto end;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		goto end;
	}

	/* convert the relative timeout into an absolute monotonic deadline,
	 * carrying nanosecond overflow into the seconds field
	 */
	end.tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end.tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		pthread_mutex_lock(&pending_requests.lock);
		ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
		pthread_mutex_unlock(&pending_requests.lock);
		goto end;
	}

	/* for primary process, broadcast request, and collect reply 1 by 1 */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto end;
	}

	dir_fd = dirfd(mp_dir);
	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto close_end;
	}

	pthread_mutex_lock(&pending_requests.lock);
	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		/* only consider sockets matching the secondary-process filter */
		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		/* unlocks the mutex while waiting for response,
		 * locks on receive
		 */
		if (mp_request_sync(path, req, reply, &end))
			goto unlock_end;
	}
	ret = 0;

unlock_end:
	pthread_mutex_unlock(&pending_requests.lock);
	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

close_end:
	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

end:
	/* on failure, drop any replies already collected */
	if (ret) {
		free(reply->msgs);
		reply->nb_received = 0;
		reply->msgs = NULL;
	}
	return ret;
}
1079
/* Public entry point: send 'req' asynchronously; 'clb' is invoked later
 * (from the alarm/handler context) once all replies arrive or 'ts' expires.
 *
 * Allocates a copy of the request, a shared async parameter block and a
 * "dummy" pending request used to guarantee the callback fires even when
 * nothing was actually sent. Ownership of these allocations passes to the
 * queued requests on success; on the failure paths visible here they are
 * freed locally.
 */
int
rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
		rte_mp_async_reply_t clb)
{
	struct rte_mp_msg *copy;
	struct pending_request *dummy;
	struct async_request_param *param;
	struct rte_mp_reply *reply;
	int dir_fd, ret = 0;
	DIR *mp_dir;
	struct dirent *ent;
	struct timespec now;
	struct timespec *end;
	bool dummy_used = false;
	const struct internal_config *internal_conf =
		eal_get_internal_configuration();

	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);

	if (check_input(req) != 0)
		return -1;

	if (internal_conf->no_shconf) {
		RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
		rte_errno = ENOTSUP;
		return -1;
	}

	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) {
		RTE_LOG(ERR, EAL, "Failed to get current time\n");
		rte_errno = errno;
		return -1;
	}
	/* allocate everything up front so failure handling stays in one place */
	copy = calloc(1, sizeof(*copy));
	dummy = calloc(1, sizeof(*dummy));
	param = calloc(1, sizeof(*param));
	if (copy == NULL || dummy == NULL || param == NULL) {
		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
		rte_errno = ENOMEM;
		goto fail;
	}

	/* copy message */
	memcpy(copy, req, sizeof(*copy));

	param->n_responses_processed = 0;
	param->clb = clb;
	end = &param->end;
	reply = &param->user_reply;

	/* absolute monotonic deadline; nanosecond overflow carries into secs */
	end->tv_nsec = (now.tv_nsec + ts->tv_nsec) % 1000000000;
	end->tv_sec = now.tv_sec + ts->tv_sec +
			(now.tv_nsec + ts->tv_nsec) / 1000000000;
	reply->nb_sent = 0;
	reply->nb_received = 0;
	reply->msgs = NULL;

	/* we have to lock the request queue here, as we will be adding a bunch
	 * of requests to the queue at once, and some of the replies may arrive
	 * before we add all of the requests to the queue.
	 */
	pthread_mutex_lock(&pending_requests.lock);

	/* we have to ensure that callback gets triggered even if we don't send
	 * anything, therefore earlier we have allocated a dummy request. fill
	 * it, and put it on the queue if we don't send any requests.
	 */
	dummy->type = REQUEST_TYPE_ASYNC;
	dummy->request = copy;
	dummy->reply = NULL;
	dummy->async.param = param;
	dummy->reply_received = 1; /* short-circuit the timeout */

	/* for secondary process, send request to the primary process only */
	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
		ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);

		/* if we didn't send anything, put dummy request on the queue */
		if (ret == 0 && reply->nb_sent == 0) {
			TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
					next);
			dummy_used = true;
		}

		pthread_mutex_unlock(&pending_requests.lock);

		/* if we couldn't send anything, clean up */
		if (ret != 0)
			goto fail;
		return 0;
	}

	/* for primary process, broadcast request */
	mp_dir = opendir(mp_dir_path);
	if (!mp_dir) {
		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
		rte_errno = errno;
		goto unlock_fail;
	}
	dir_fd = dirfd(mp_dir);

	/* lock the directory to prevent processes spinning up while we send */
	if (flock(dir_fd, LOCK_SH)) {
		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
			mp_dir_path);
		rte_errno = errno;
		goto closedir_fail;
	}

	while ((ent = readdir(mp_dir))) {
		char path[PATH_MAX];

		/* only consider sockets matching the secondary-process filter */
		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
			 ent->d_name);

		/* keep going on per-peer failure; report -1 at the end */
		if (mp_request_async(path, copy, param, ts))
			ret = -1;
	}
	/* if we didn't send anything, put dummy request on the queue */
	if (ret == 0 && reply->nb_sent == 0) {
		TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
		dummy_used = true;
	}

	/* finally, unlock the queue */
	pthread_mutex_unlock(&pending_requests.lock);

	/* unlock the directory */
	flock(dir_fd, LOCK_UN);

	/* dir_fd automatically closed on closedir */
	closedir(mp_dir);

	/* if dummy was unused, free it */
	if (!dummy_used)
		free(dummy);

	return ret;
closedir_fail:
	closedir(mp_dir);
unlock_fail:
	pthread_mutex_unlock(&pending_requests.lock);
fail:
	free(dummy);
	free(param);
	free(copy);
	return -1;
}
1231
1232 int
1233 rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
1234 {
1235         RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
1236         const struct internal_config *internal_conf =
1237                 eal_get_internal_configuration();
1238
1239         if (check_input(msg) != 0)
1240                 return -1;
1241
1242         if (peer == NULL) {
1243                 RTE_LOG(ERR, EAL, "peer is not specified\n");
1244                 rte_errno = EINVAL;
1245                 return -1;
1246         }
1247
1248         if (internal_conf->no_shconf) {
1249                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
1250                 return 0;
1251         }
1252
1253         return mp_send(msg, peer, MP_REP);
1254 }
1255
/* Internally, the status of the mp feature is represented as a three-state:
 * - "unknown" as long as no secondary process attached to a primary process
 *   and there was no call to rte_mp_disable yet,
 * - "enabled" as soon as a secondary process attaches to a primary process,
 * - "disabled" when a primary process successfully called rte_mp_disable,
 */
enum mp_status {
	MP_STATUS_UNKNOWN,  /* initial state; may still transition either way */
	MP_STATUS_DISABLED, /* rte_mp_disable() succeeded; IPC is off for good */
	MP_STATUS_ENABLED,  /* a secondary process has attached; IPC is active */
};
1267
1268 static bool
1269 set_mp_status(enum mp_status status)
1270 {
1271         struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
1272         uint8_t expected;
1273         uint8_t desired;
1274
1275         expected = MP_STATUS_UNKNOWN;
1276         desired = status;
1277         if (__atomic_compare_exchange_n(&mcfg->mp_status, &expected, desired,
1278                         false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
1279                 return true;
1280
1281         return __atomic_load_n(&mcfg->mp_status, __ATOMIC_RELAXED) == desired;
1282 }
1283
1284 bool
1285 rte_mp_disable(void)
1286 {
1287         return set_mp_status(MP_STATUS_DISABLED);
1288 }
1289
1290 bool
1291 __rte_mp_enable(void)
1292 {
1293         return set_mp_status(MP_STATUS_ENABLED);
1294 }