rwlock: introduce try semantics
[dpdk.git] / lib / librte_eal / common / eal_common_proc.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2018 Intel Corporation
3  */
4
5 #include <dirent.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <fnmatch.h>
9 #include <inttypes.h>
10 #include <libgen.h>
11 #include <limits.h>
12 #include <pthread.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/file.h>
17 #include <sys/time.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/un.h>
21 #include <unistd.h>
22
23 #include <rte_alarm.h>
24 #include <rte_common.h>
25 #include <rte_cycles.h>
26 #include <rte_eal.h>
27 #include <rte_errno.h>
28 #include <rte_lcore.h>
29 #include <rte_log.h>
30 #include <rte_tailq.h>
31
32 #include "eal_private.h"
33 #include "eal_filesystem.h"
34 #include "eal_internal_cfg.h"
35
/* Datagram socket used for all IPC traffic of this process (-1 if not set up) */
36 static int mp_fd = -1;
37 static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
38 static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
/* Guards action_entry_list against concurrent register/unregister/lookup */
39 static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
40
/* One registered message handler: maps an action name to its callback */
41 struct action_entry {
42         TAILQ_ENTRY(action_entry) next;
43         char action_name[RTE_MP_MAX_NAME_LEN]; /* key used by process_msg lookup */
44         rte_mp_t action;                       /* callback invoked on a matching message */
45 };
46
47 /** Double linked list of actions. */
48 TAILQ_HEAD(action_entry_list, action_entry);
49
50 static struct action_entry_list action_entry_list =
51         TAILQ_HEAD_INITIALIZER(action_entry_list);
52
/* Wire-level message categories carried in mp_msg_internal.type */
53 enum mp_type {
54         MP_MSG, /* Share message with peers, will not block */
55         MP_REQ, /* Request for information, Will block for a reply */
56         MP_REP, /* Response to previously-received request */
57         MP_IGN, /* Response telling requester to ignore this response */
58 };
59
/* On-the-wire framing: a type tag prepended to the user-visible message */
60 struct mp_msg_internal {
61         int type;
62         struct rte_mp_msg msg;
63 };
64
/* Bookkeeping for one outstanding async request (shared across replies) */
65 struct async_request_param {
66         rte_mp_async_reply_t clb;        /* user callback fired when done */
67         struct rte_mp_reply user_reply;  /* accumulated replies handed to clb */
68         struct timespec end;             /* absolute deadline for replies */
69         int n_responses_processed;       /* replies (or timeouts) seen so far */
70 };
71
/* One request awaiting a reply; lives on pending_requests.requests */
72 struct pending_request {
73         TAILQ_ENTRY(pending_request) next;
74         enum {
75                 REQUEST_TYPE_SYNC,
76                 REQUEST_TYPE_ASYNC
77         } type;
78         char dst[PATH_MAX];              /* destination socket path (lookup key) */
79         struct rte_mp_msg *request;
80         struct rte_mp_msg *reply;
/* 0 = still pending, 1 = reply received, -1 = peer asked us to ignore it */
81         int reply_received;
82         RTE_STD_C11
83         union {
84                 struct {
85                         struct async_request_param *param;
86                 } async;
87                 struct {
88                         pthread_cond_t cond; /* signalled by process_msg on reply */
89                 } sync;
90         };
91 };
92
93 TAILQ_HEAD(pending_request_list, pending_request);
94
/* All in-flight sync and async requests; lock protects the list and is
 * also the mutex used with each sync request's condition variable.
 */
95 static struct {
96         struct pending_request_list requests;
97         pthread_mutex_t lock;
98 } pending_requests = {
99         .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
100         .lock = PTHREAD_MUTEX_INITIALIZER,
        /* NOTE(review): lock also serializes async reply/alarm handling,
         * see async_reply_handle()
         */
102 };
103
104 /* forward declarations */
105 static int
106 mp_send(struct rte_mp_msg *msg, const char *peer, int type);
107
108 /* for use with alarm callback */
109 static void
110 async_reply_handle(void *arg);
111
112 /* for use with process_msg */
113 static struct pending_request *
114 async_reply_handle_thread_unsafe(void *arg);
115
/* fires the user callback for a completed async request */
116 static void
117 trigger_async_action(struct pending_request *req);
118
/*
 * Look up an in-flight request matching both the destination socket path
 * and the request name.  Returns the entry, or NULL if none matches.
 *
 * Caller must hold pending_requests.lock (all call sites do).
 */
119 static struct pending_request *
120 find_pending_request(const char *dst, const char *act_name)
121 {
122         struct pending_request *r;
123
124         TAILQ_FOREACH(r, &pending_requests.requests, next) {
125                 if (!strcmp(r->dst, dst) &&
126                     !strcmp(r->request->name, act_name))
127                         break;
128         }
129
        /* r is NULL here if the list was exhausted without a match */
130         return r;
131 }
132
/*
 * Compose a socket path into buf: "<prefix>_<name>" for a non-empty name,
 * or the bare EAL mp socket path (the primary's socket) otherwise.
 */
133 static void
134 create_socket_path(const char *name, char *buf, int len)
135 {
136         const char *prefix = eal_mp_socket_path();
137
138         if (strlen(name) > 0)
139                 snprintf(buf, len, "%s_%s", prefix, name);
140         else
141                 strlcpy(buf, prefix, len);
142 }
143
/*
 * Check whether a primary process is alive by probing the advisory lock
 * it keeps on its runtime config file.
 *
 * Returns 1 if the primary is alive (file is locked by another process),
 * 0 otherwise (file absent or not locked).
 */
144 int
145 rte_eal_primary_proc_alive(const char *config_file_path)
146 {
147         int config_fd;
148
149         if (config_file_path)
150                 config_fd = open(config_file_path, O_RDONLY);
151         else {
152                 const char *path;
153
154                 path = eal_runtime_config_path();
155                 config_fd = open(path, O_RDONLY);
156         }
157         if (config_fd < 0)
158                 return 0;
159
        /* lockf(F_TEST) returns -1 when another process holds the lock,
         * i.e. a nonzero ret means the primary is still running
         */
160         int ret = lockf(config_fd, F_TEST, 0);
161         close(config_fd);
162
163         return !!ret;
164 }
165
/*
 * Find the registered action whose name matches, or NULL if none.
 *
 * Caller must hold mp_mutex_action (all call sites do).
 */
166 static struct action_entry *
167 find_action_entry_by_name(const char *name)
168 {
169         struct action_entry *entry;
170
171         TAILQ_FOREACH(entry, &action_entry_list, next) {
172                 if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
173                         break;
174         }
175
        /* entry is NULL here if no registered action matched */
176         return entry;
177 }
178
179 static int
180 validate_action_name(const char *name)
181 {
182         if (name == NULL) {
183                 RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
184                 rte_errno = EINVAL;
185                 return -1;
186         }
187         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
188                 RTE_LOG(ERR, EAL, "Length of action name is zero\n");
189                 rte_errno = EINVAL;
190                 return -1;
191         }
192         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
193                 rte_errno = E2BIG;
194                 return -1;
195         }
196         return 0;
197 }
198
/*
 * Register a callback for messages named 'name'.
 *
 * Returns 0 on success; -1 with rte_errno set to EINVAL/E2BIG (bad name),
 * ENOMEM (allocation failure) or EEXIST (name already registered).
 */
199 int __rte_experimental
200 rte_mp_action_register(const char *name, rte_mp_t action)
201 {
202         struct action_entry *entry;
203
204         if (validate_action_name(name))
205                 return -1;
206
        /* allocate and fill the entry before taking the lock to keep the
         * critical section short
         */
207         entry = malloc(sizeof(struct action_entry));
208         if (entry == NULL) {
209                 rte_errno = ENOMEM;
210                 return -1;
211         }
212         strlcpy(entry->action_name, name, sizeof(entry->action_name));
213         entry->action = action;
214
215         pthread_mutex_lock(&mp_mutex_action);
216         if (find_action_entry_by_name(name) != NULL) {
217                 pthread_mutex_unlock(&mp_mutex_action);
218                 rte_errno = EEXIST;
219                 free(entry);
220                 return -1;
221         }
222         TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
223         pthread_mutex_unlock(&mp_mutex_action);
224         return 0;
225 }
226
/*
 * Unregister the callback previously registered under 'name'.
 * Silently does nothing if the name is invalid or not registered.
 */
227 void __rte_experimental
228 rte_mp_action_unregister(const char *name)
229 {
230         struct action_entry *entry;
231
232         if (validate_action_name(name))
233                 return;
234
235         pthread_mutex_lock(&mp_mutex_action);
236         entry = find_action_entry_by_name(name);
237         if (entry == NULL) {
238                 pthread_mutex_unlock(&mp_mutex_action);
239                 return;
240         }
241         TAILQ_REMOVE(&action_entry_list, entry, next);
242         pthread_mutex_unlock(&mp_mutex_action);
        /* safe to free outside the lock: entry is no longer reachable */
243         free(entry);
244 }
245
246 static int
247 read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
248 {
249         int msglen;
250         struct iovec iov;
251         struct msghdr msgh;
252         char control[CMSG_SPACE(sizeof(m->msg.fds))];
253         struct cmsghdr *cmsg;
254         int buflen = sizeof(*m) - sizeof(m->msg.fds);
255
256         memset(&msgh, 0, sizeof(msgh));
257         iov.iov_base = m;
258         iov.iov_len  = buflen;
259
260         msgh.msg_name = s;
261         msgh.msg_namelen = sizeof(*s);
262         msgh.msg_iov = &iov;
263         msgh.msg_iovlen = 1;
264         msgh.msg_control = control;
265         msgh.msg_controllen = sizeof(control);
266
267         msglen = recvmsg(mp_fd, &msgh, 0);
268         if (msglen < 0) {
269                 RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
270                 return -1;
271         }
272
273         if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
274                 RTE_LOG(ERR, EAL, "truncted msg\n");
275                 return -1;
276         }
277
278         /* read auxiliary FDs if any */
279         for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
280                 cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
281                 if ((cmsg->cmsg_level == SOL_SOCKET) &&
282                         (cmsg->cmsg_type == SCM_RIGHTS)) {
283                         memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
284                         break;
285                 }
286         }
287
288         return 0;
289 }
290
/*
 * Dispatch one received message.
 *
 * Replies (MP_REP/MP_IGN) are matched against the pending-request list:
 * a sync requester is woken via its condition variable, an async one is
 * processed under the lock and its user callback (if due) is fired after
 * the lock is dropped.  Everything else is routed to the registered
 * action callback, or answered with MP_IGN if we are still initializing.
 */
291 static void
292 process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
293 {
294         struct pending_request *pending_req;
295         struct action_entry *entry;
296         struct rte_mp_msg *msg = &m->msg;
297         rte_mp_t action = NULL;
298
299         RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);
300
301         if (m->type == MP_REP || m->type == MP_IGN) {
302                 struct pending_request *req = NULL;
303
304                 pthread_mutex_lock(&pending_requests.lock);
305                 pending_req = find_pending_request(s->sun_path, msg->name);
306                 if (pending_req) {
307                         memcpy(pending_req->reply, msg, sizeof(*msg));
308                         /* -1 indicates that we've been asked to ignore */
309                         pending_req->reply_received =
310                                 m->type == MP_REP ? 1 : -1;
311
312                         if (pending_req->type == REQUEST_TYPE_SYNC)
313                                 pthread_cond_signal(&pending_req->sync.cond);
314                         else if (pending_req->type == REQUEST_TYPE_ASYNC)
315                                 req = async_reply_handle_thread_unsafe(
316                                                 pending_req);
317                 } else
318                         RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
319                 pthread_mutex_unlock(&pending_requests.lock);
320
                /* user callback must run without holding the list lock */
321                 if (req != NULL)
322                         trigger_async_action(req);
323                 return;
324         }
325
        /* copy the callback pointer out so the action itself runs unlocked */
326         pthread_mutex_lock(&mp_mutex_action);
327         entry = find_action_entry_by_name(msg->name);
328         if (entry != NULL)
329                 action = entry->action;
330         pthread_mutex_unlock(&mp_mutex_action);
331
332         if (!action) {
333                 if (m->type == MP_REQ && !internal_config.init_complete) {
334                         /* if this is a request, and init is not yet complete,
335                          * and callback wasn't registered, we should tell the
336                          * requester to ignore our existence because we're not
337                          * yet ready to process this request.
338                          */
339                         struct rte_mp_msg dummy;
340
341                         memset(&dummy, 0, sizeof(dummy));
342                         strlcpy(dummy.name, msg->name, sizeof(dummy.name));
343                         mp_send(&dummy, s->sun_path, MP_IGN);
344                 } else {
345                         RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
346                                 msg->name);
347                 }
348         } else if (action(msg, s->sun_path) < 0) {
349                 RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
350         }
351 }
352
/*
 * Main loop of the "rte_mp_handle" control thread: receive and dispatch
 * IPC messages forever.  Receive errors are logged by read_msg() and the
 * loop simply retries.
 */
353 static void *
354 mp_handle(void *arg __rte_unused)
355 {
356         struct mp_msg_internal msg;
357         struct sockaddr_un sa;
358
359         while (1) {
360                 if (read_msg(&msg, &sa) == 0)
361                         process_msg(&msg, &sa);
362         }
363
        /* never reached; return type required by pthread start routine */
364         return NULL;
365 }
366
/*
 * Three-way comparison of two timespec values, seconds first and
 * nanoseconds as the tie-breaker.
 *
 * Returns -1 if *a is earlier than *b, 1 if later, 0 if equal.
 */
static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec != b->tv_sec)
		return a->tv_sec < b->tv_sec ? -1 : 1;
	if (a->tv_nsec != b->tv_nsec)
		return a->tv_nsec < b->tv_nsec ? -1 : 1;
	return 0;
}
380
381 enum async_action {
382         ACTION_FREE, /**< free the action entry, but don't trigger callback */
383         ACTION_TRIGGER /**< trigger callback, then free action entry */
384 };
385
/*
 * Fold one reply (or a timeout) for an async request into its shared
 * async_request_param, and decide whether all expected responses have
 * now been accounted for.
 *
 * Caller must hold pending_requests.lock.  Frees sr->reply.
 */
386 static enum async_action
387 process_async_request(struct pending_request *sr, const struct timespec *now)
388 {
389         struct async_request_param *param;
390         struct rte_mp_reply *reply;
391         bool timeout, last_msg;
392
393         param = sr->async.param;
394         reply = &param->user_reply;
395
396         /* did we timeout? */
397         timeout = timespec_cmp(&param->end, now) <= 0;
398
399         /* if we received a response, adjust relevant data and copy message. */
400         if (sr->reply_received == 1 && sr->reply) {
401                 struct rte_mp_msg *msg, *user_msgs, *tmp;
402
403                 msg = sr->reply;
404                 user_msgs = reply->msgs;
405
406                 tmp = realloc(user_msgs, sizeof(*msg) *
407                                 (reply->nb_received + 1));
408                 if (!tmp) {
409                         RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
410                                 sr->dst, sr->request->name);
411                         /* this entry is going to be removed and its message
412                          * dropped, but we don't want to leak memory, so
413                          * continue.
414                          */
415                 } else {
416                         user_msgs = tmp;
417                         reply->msgs = user_msgs;
418                         memcpy(&user_msgs[reply->nb_received],
419                                         msg, sizeof(*msg));
420                         reply->nb_received++;
421                 }
422
423                 /* mark this request as processed */
424                 param->n_responses_processed++;
425         } else if (sr->reply_received == -1) {
426                 /* we were asked to ignore this process */
427                 reply->nb_sent--;
428         } else if (timeout) {
429                 /* count it as processed response, but don't increment
430                  * nb_received.
431                  */
432                 param->n_responses_processed++;
433         }
434
435         free(sr->reply);
436
        /* once every sent request is processed or ignored, fire the callback */
437         last_msg = param->n_responses_processed == reply->nb_sent;
438
439         return last_msg ? ACTION_TRIGGER : ACTION_FREE;
440 }
441
/*
 * Invoke the user callback for a completed async request, then release
 * every resource owned by it (accumulated replies, param, the original
 * request copy, and the pending_request itself).
 *
 * Must be called without holding pending_requests.lock.
 */
442 static void
443 trigger_async_action(struct pending_request *sr)
444 {
445         struct async_request_param *param;
446         struct rte_mp_reply *reply;
447
448         param = sr->async.param;
449         reply = &param->user_reply;
450
451         param->clb(sr->request, reply);
452
453         /* clean up */
454         free(sr->async.param->user_reply.msgs);
455         free(sr->async.param);
456         free(sr->request);
457         free(sr);
458 }
459
/*
 * Process a reply (or timeout) for an async request and remove it from
 * the pending list.  Returns the request when its user callback should
 * be fired by the caller (outside the lock), or NULL otherwise (entry
 * already freed, or handling deferred to the in-progress alarm).
 *
 * "thread_unsafe": caller must hold pending_requests.lock.
 */
460 static struct pending_request *
461 async_reply_handle_thread_unsafe(void *arg)
462 {
463         struct pending_request *req = (struct pending_request *)arg;
464         enum async_action action;
465         struct timespec ts_now;
466         struct timeval now;
467
468         if (gettimeofday(&now, NULL) < 0) {
469                 RTE_LOG(ERR, EAL, "Cannot get current time\n");
470                 goto no_trigger;
471         }
472         ts_now.tv_nsec = now.tv_usec * 1000;
473         ts_now.tv_sec = now.tv_sec;
474
475         action = process_async_request(req, &ts_now);
476
477         TAILQ_REMOVE(&pending_requests.requests, req, next);
478
479         if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
480                 /* if we failed to cancel the alarm because it's already in
481                  * progress, don't proceed because otherwise we will end up
482                  * handling the same message twice.
483                  */
484                 if (rte_errno == EINPROGRESS) {
485                         RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
486                         goto no_trigger;
487                 }
488                 RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
489         }
490
491         if (action == ACTION_TRIGGER)
492                 return req;
493 no_trigger:
494         free(req);
495         return NULL;
496 }
497
/*
 * Alarm (timeout) callback for an async request: handle it under the
 * pending-requests lock, then fire the user callback outside the lock
 * if this was the last expected response.
 */
498 static void
499 async_reply_handle(void *arg)
500 {
501         struct pending_request *req;
502
503         pthread_mutex_lock(&pending_requests.lock);
504         req = async_reply_handle_thread_unsafe(arg);
505         pthread_mutex_unlock(&pending_requests.lock);
506
507         if (req != NULL)
508                 trigger_async_action(req);
509 }
510
/*
 * Create and bind this process's IPC datagram socket.  Secondary
 * processes get a unique per-instance name ("<pid>_<tsc>") appended;
 * the primary binds the bare EAL mp socket path.
 *
 * On success stores the fd in mp_fd and returns it; returns -1 on error.
 */
511 static int
512 open_socket_fd(void)
513 {
514         char peer_name[PATH_MAX] = {0};
515         struct sockaddr_un un;
516
517         if (rte_eal_process_type() == RTE_PROC_SECONDARY)
518                 snprintf(peer_name, sizeof(peer_name),
519                                 "%d_%"PRIx64, getpid(), rte_rdtsc());
520
521         mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
522         if (mp_fd < 0) {
523                 RTE_LOG(ERR, EAL, "failed to create unix socket\n");
524                 return -1;
525         }
526
527         memset(&un, 0, sizeof(un));
528         un.sun_family = AF_UNIX;
529
530         create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
531
532         unlink(un.sun_path); /* May still exist since last run */
533
534         if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
535                 RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
536                         un.sun_path, strerror(errno));
537                 close(mp_fd);
538                 return -1;
539         }
540
541         RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
542         return mp_fd;
543 }
544
545 int
546 rte_mp_channel_init(void)
547 {
548         char path[PATH_MAX];
549         int dir_fd;
550         pthread_t mp_handle_tid;
551
552         /* in no shared files mode, we do not have secondary processes support,
553          * so no need to initialize IPC.
554          */
555         if (internal_config.no_shconf) {
556                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC will be disabled\n");
557                 return 0;
558         }
559
560         /* create filter path */
561         create_socket_path("*", path, sizeof(path));
562         strlcpy(mp_filter, basename(path), sizeof(mp_filter));
563
564         /* path may have been modified, so recreate it */
565         create_socket_path("*", path, sizeof(path));
566         strlcpy(mp_dir_path, dirname(path), sizeof(mp_dir_path));
567
568         /* lock the directory */
569         dir_fd = open(mp_dir_path, O_RDONLY);
570         if (dir_fd < 0) {
571                 RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
572                         mp_dir_path, strerror(errno));
573                 return -1;
574         }
575
576         if (flock(dir_fd, LOCK_EX)) {
577                 RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
578                         mp_dir_path, strerror(errno));
579                 close(dir_fd);
580                 return -1;
581         }
582
583         if (open_socket_fd() < 0) {
584                 close(dir_fd);
585                 return -1;
586         }
587
588         if (rte_ctrl_thread_create(&mp_handle_tid, "rte_mp_handle",
589                         NULL, mp_handle, NULL) < 0) {
590                 RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
591                         strerror(errno));
592                 close(mp_fd);
593                 close(dir_fd);
594                 mp_fd = -1;
595                 return -1;
596         }
597
598         /* unlock the directory */
599         flock(dir_fd, LOCK_UN);
600         close(dir_fd);
601
602         return 0;
603 }
604
605 /**
606  * Return -1, as fail to send message and it's caused by the local side.
607  * Return 0, as fail to send message and it's caused by the remote side.
608  * Return 1, as succeed to send message.
609  *
610  */
611 static int
612 send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
613 {
614         int snd;
615         struct iovec iov;
616         struct msghdr msgh;
617         struct cmsghdr *cmsg;
618         struct sockaddr_un dst;
619         struct mp_msg_internal m;
620         int fd_size = msg->num_fds * sizeof(int);
621         char control[CMSG_SPACE(fd_size)];
622
        /* wrap the user message with the wire-level type tag */
623         m.type = type;
624         memcpy(&m.msg, msg, sizeof(*msg));
625
626         memset(&dst, 0, sizeof(dst));
627         dst.sun_family = AF_UNIX;
628         strlcpy(dst.sun_path, dst_path, sizeof(dst.sun_path));
629
630         memset(&msgh, 0, sizeof(msgh));
631         memset(control, 0, sizeof(control));
632
        /* fds are carried as SCM_RIGHTS ancillary data, not in the payload */
633         iov.iov_base = &m;
634         iov.iov_len = sizeof(m) - sizeof(msg->fds);
635
636         msgh.msg_name = &dst;
637         msgh.msg_namelen = sizeof(dst);
638         msgh.msg_iov = &iov;
639         msgh.msg_iovlen = 1;
640         msgh.msg_control = control;
641         msgh.msg_controllen = sizeof(control);
642
643         cmsg = CMSG_FIRSTHDR(&msgh);
644         cmsg->cmsg_len = CMSG_LEN(fd_size);
645         cmsg->cmsg_level = SOL_SOCKET;
646         cmsg->cmsg_type = SCM_RIGHTS;
647         memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);
648
        /* retry if interrupted by a signal */
649         do {
650                 snd = sendmsg(mp_fd, &msgh, 0);
651         } while (snd < 0 && errno == EINTR);
652
653         if (snd < 0) {
654                 rte_errno = errno;
655                 /* check if the failure was caused by peer process exiting */
656                 if (errno == ECONNREFUSED &&
657                                 rte_eal_process_type() == RTE_PROC_PRIMARY) {
658                         unlink(dst_path);
659                         return 0;
660                 }
661                 if (errno == ENOBUFS) {
662                         RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
663                                 dst_path);
664                         return 0;
665                 }
666                 RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
667                         dst_path, strerror(errno));
668                 return -1;
669         }
670
671         return 1;
672 }
673
/*
 * Send a message to one peer, or broadcast it to all secondary
 * processes when peer is NULL and we are the primary (a NULL peer in a
 * secondary defaults to the primary's socket).
 *
 * Returns 0 on success, -1 if at least one send failed locally.
 */
674 static int
675 mp_send(struct rte_mp_msg *msg, const char *peer, int type)
676 {
677         int dir_fd, ret = 0;
678         DIR *mp_dir;
679         struct dirent *ent;
680
681         if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
682                 peer = eal_mp_socket_path();
683
684         if (peer) {
685                 if (send_msg(peer, msg, type) < 0)
686                         return -1;
687                 else
688                         return 0;
689         }
690
691         /* broadcast to all secondary processes */
692         mp_dir = opendir(mp_dir_path);
693         if (!mp_dir) {
694                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
695                                 mp_dir_path);
696                 rte_errno = errno;
697                 return -1;
698         }
699
700         dir_fd = dirfd(mp_dir);
701         /* lock the directory to prevent processes spinning up while we send */
702         if (flock(dir_fd, LOCK_SH)) {
703                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
704                         mp_dir_path);
705                 rte_errno = errno;
706                 closedir(mp_dir);
707                 return -1;
708         }
709
        /* send to every socket matching the secondary-process filter */
710         while ((ent = readdir(mp_dir))) {
711                 char path[PATH_MAX];
712
713                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
714                         continue;
715
716                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
717                          ent->d_name);
718                 if (send_msg(path, msg, type) < 0)
719                         ret = -1;
720         }
721         /* unlock the dir */
722         flock(dir_fd, LOCK_UN);
723
724         /* dir_fd automatically closed on closedir */
725         closedir(mp_dir);
726         return ret;
727 }
728
/*
 * Validate a user-supplied message: non-NULL, valid name, parameter
 * length and fd count within protocol limits.
 *
 * Returns true if the message may be sent; false with rte_errno set
 * (EINVAL/E2BIG) otherwise.
 */
729 static bool
730 check_input(const struct rte_mp_msg *msg)
731 {
732         if (msg == NULL) {
733                 RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
734                 rte_errno = EINVAL;
735                 return false;
736         }
737
738         if (validate_action_name(msg->name))
739                 return false;
740
741         if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
742                 RTE_LOG(ERR, EAL, "Message data is too long\n");
743                 rte_errno = E2BIG;
744                 return false;
745         }
746
747         if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
748                 RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
749                         RTE_MP_MAX_FD_NUM);
750                 rte_errno = E2BIG;
751                 return false;
752         }
753
754         return true;
755 }
756
/*
 * Fire-and-forget broadcast of a message to all peers (no reply is
 * expected).  Returns 0 on success, -1 on validation or send failure.
 */
757 int __rte_experimental
758 rte_mp_sendmsg(struct rte_mp_msg *msg)
759 {
760         if (!check_input(msg))
761                 return -1;
762
763         RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
764         return mp_send(msg, NULL, MP_MSG);
765 }
766
/*
 * Send one async request to 'dst' and register it on the pending list
 * with a timeout alarm; the reply (or timeout) is handled later by
 * async_reply_handle().
 *
 * Caller must hold pending_requests.lock.  Returns 0 on success or on a
 * remote-side send failure (peer gone), -1 on local failure.
 */
767 static int
768 mp_request_async(const char *dst, struct rte_mp_msg *req,
769                 struct async_request_param *param, const struct timespec *ts)
770 {
771         struct rte_mp_msg *reply_msg;
772         struct pending_request *pending_req, *exist;
773         int ret = -1;
774
775         pending_req = calloc(1, sizeof(*pending_req));
776         reply_msg = calloc(1, sizeof(*reply_msg));
777         if (pending_req == NULL || reply_msg == NULL) {
778                 RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
779                 rte_errno = ENOMEM;
780                 ret = -1;
781                 goto fail;
782         }
783
784         pending_req->type = REQUEST_TYPE_ASYNC;
785         strlcpy(pending_req->dst, dst, sizeof(pending_req->dst));
786         pending_req->request = req;
787         pending_req->reply = reply_msg;
788         pending_req->async.param = param;
789
790         /* queue already locked by caller */
791
792         exist = find_pending_request(dst, req->name);
793         if (exist) {
794                 RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
795                 rte_errno = EEXIST;
796                 ret = -1;
797                 goto fail;
798         }
799
800         ret = send_msg(dst, req, MP_REQ);
801         if (ret < 0) {
802                 RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
803                         dst, req->name);
804                 ret = -1;
805                 goto fail;
806         } else if (ret == 0) {
                /* remote side gone; not counted as sent, not an error */
807                 ret = 0;
808                 goto fail;
809         }
810         param->user_reply.nb_sent++;
811
812         /* if alarm set fails, we simply ignore the reply */
813         if (rte_eal_alarm_set(ts->tv_sec * 1000000 + ts->tv_nsec / 1000,
814                               async_reply_handle, pending_req) < 0) {
815                 RTE_LOG(ERR, EAL, "Fail to set alarm for request %s:%s\n",
816                         dst, req->name);
817                 ret = -1;
818                 goto fail;
819         }
820         TAILQ_INSERT_TAIL(&pending_requests.requests, pending_req, next);
821
822         return 0;
823 fail:
        /* free(NULL) is a no-op, so partial allocation is handled too */
824         free(pending_req);
825         free(reply_msg);
826         return ret;
827 }
828
/*
 * Send one sync request to 'dst' and block (on a stack-allocated pending
 * entry) until the reply arrives or the absolute deadline 'ts' passes.
 * A received reply is appended to reply->msgs.
 *
 * Caller must hold pending_requests.lock; the condition wait releases it
 * while sleeping and reacquires it on wakeup.  Returns 0 on success or
 * when the peer is gone/asked to be ignored, -1 on failure.
 */
829 static int
830 mp_request_sync(const char *dst, struct rte_mp_msg *req,
831                struct rte_mp_reply *reply, const struct timespec *ts)
832 {
833         int ret;
834         struct rte_mp_msg msg, *tmp;
835         struct pending_request pending_req, *exist;
836
837         pending_req.type = REQUEST_TYPE_SYNC;
838         pending_req.reply_received = 0;
839         strlcpy(pending_req.dst, dst, sizeof(pending_req.dst));
840         pending_req.request = req;
841         pending_req.reply = &msg;
842         pthread_cond_init(&pending_req.sync.cond, NULL);
843
844         exist = find_pending_request(dst, req->name);
845         if (exist) {
846                 RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
847                 rte_errno = EEXIST;
848                 return -1;
849         }
850
851         ret = send_msg(dst, req, MP_REQ);
852         if (ret < 0) {
853                 RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
854                         dst, req->name);
855                 return -1;
856         } else if (ret == 0)
857                 return 0;
858
859         TAILQ_INSERT_TAIL(&pending_requests.requests, &pending_req, next);
860
861         reply->nb_sent++;
862
        /* loop handles spurious wakeups; exit on signal (0) or timeout */
863         do {
864                 ret = pthread_cond_timedwait(&pending_req.sync.cond,
865                                 &pending_requests.lock, ts);
866         } while (ret != 0 && ret != ETIMEDOUT);
867
868         TAILQ_REMOVE(&pending_requests.requests, &pending_req, next);
869
870         if (pending_req.reply_received == 0) {
871                 RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
872                         dst, req->name);
873                 rte_errno = ETIMEDOUT;
874                 return -1;
875         }
876         if (pending_req.reply_received == -1) {
877                 RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
878                 /* not receiving this message is not an error, so decrement
879                  * number of sent messages
880                  */
881                 reply->nb_sent--;
882                 return 0;
883         }
884
885         tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
886         if (!tmp) {
887                 RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
888                         dst, req->name);
889                 rte_errno = ENOMEM;
890                 return -1;
891         }
892         memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
893         reply->msgs = tmp;
894         reply->nb_received++;
895         return 0;
896 }
897
/*
 * Send a request and block until all replies arrive or 'ts' (a relative
 * timeout) expires.  A secondary process queries only the primary; the
 * primary broadcasts to every secondary and collects replies one by one.
 * On success, reply->msgs (caller-freed) holds the received messages.
 *
 * Returns 0 on success, -1 on failure (rte_errno set).
 */
898 int __rte_experimental
899 rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply *reply,
900                 const struct timespec *ts)
901 {
902         int dir_fd, ret = 0;
903         DIR *mp_dir;
904         struct dirent *ent;
905         struct timeval now;
906         struct timespec end;
907
908         RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
909
910         if (check_input(req) == false)
911                 return -1;
912
913         reply->nb_sent = 0;
914         reply->nb_received = 0;
915         reply->msgs = NULL;
916
917         if (internal_config.no_shconf) {
918                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
919                 return 0;
920         }
921
922         if (gettimeofday(&now, NULL) < 0) {
923                 RTE_LOG(ERR, EAL, "Failed to get current time\n");
924                 rte_errno = errno;
925                 return -1;
926         }
927
        /* convert the relative timeout to an absolute deadline, carrying
         * nanosecond overflow into seconds
         */
928         end.tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
929         end.tv_sec = now.tv_sec + ts->tv_sec +
930                         (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
931
932         /* for secondary process, send request to the primary process only */
933         if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
934                 pthread_mutex_lock(&pending_requests.lock);
935                 ret = mp_request_sync(eal_mp_socket_path(), req, reply, &end);
936                 pthread_mutex_unlock(&pending_requests.lock);
937                 return ret;
938         }
939
940         /* for primary process, broadcast request, and collect reply 1 by 1 */
941         mp_dir = opendir(mp_dir_path);
942         if (!mp_dir) {
943                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
944                 rte_errno = errno;
945                 return -1;
946         }
947
948         dir_fd = dirfd(mp_dir);
949         /* lock the directory to prevent processes spinning up while we send */
950         if (flock(dir_fd, LOCK_SH)) {
951                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
952                         mp_dir_path);
953                 closedir(mp_dir);
954                 rte_errno = errno;
955                 return -1;
956         }
957
958         pthread_mutex_lock(&pending_requests.lock);
959         while ((ent = readdir(mp_dir))) {
960                 char path[PATH_MAX];
961
962                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
963                         continue;
964
965                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
966                          ent->d_name);
967
968                 /* unlocks the mutex while waiting for response,
969                  * locks on receive
970                  */
971                 if (mp_request_sync(path, req, reply, &end))
972                         ret = -1;
973         }
974         pthread_mutex_unlock(&pending_requests.lock);
975         /* unlock the directory */
976         flock(dir_fd, LOCK_UN);
977
978         /* dir_fd automatically closed on closedir */
979         closedir(mp_dir);
980         return ret;
981 }
982
983 int __rte_experimental
984 rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
985                 rte_mp_async_reply_t clb)
986 {
987         struct rte_mp_msg *copy;
988         struct pending_request *dummy;
989         struct async_request_param *param;
990         struct rte_mp_reply *reply;
991         int dir_fd, ret = 0;
992         DIR *mp_dir;
993         struct dirent *ent;
994         struct timeval now;
995         struct timespec *end;
996         bool dummy_used = false;
997
998         RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
999
1000         if (check_input(req) == false)
1001                 return -1;
1002
1003         if (internal_config.no_shconf) {
1004                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
1005                 return 0;
1006         }
1007
1008         if (gettimeofday(&now, NULL) < 0) {
1009                 RTE_LOG(ERR, EAL, "Faile to get current time\n");
1010                 rte_errno = errno;
1011                 return -1;
1012         }
1013         copy = calloc(1, sizeof(*copy));
1014         dummy = calloc(1, sizeof(*dummy));
1015         param = calloc(1, sizeof(*param));
1016         if (copy == NULL || dummy == NULL || param == NULL) {
1017                 RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
1018                 rte_errno = ENOMEM;
1019                 goto fail;
1020         }
1021
1022         /* copy message */
1023         memcpy(copy, req, sizeof(*copy));
1024
1025         param->n_responses_processed = 0;
1026         param->clb = clb;
1027         end = &param->end;
1028         reply = &param->user_reply;
1029
1030         end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
1031         end->tv_sec = now.tv_sec + ts->tv_sec +
1032                         (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
1033         reply->nb_sent = 0;
1034         reply->nb_received = 0;
1035         reply->msgs = NULL;
1036
1037         /* we have to lock the request queue here, as we will be adding a bunch
1038          * of requests to the queue at once, and some of the replies may arrive
1039          * before we add all of the requests to the queue.
1040          */
1041         pthread_mutex_lock(&pending_requests.lock);
1042
1043         /* we have to ensure that callback gets triggered even if we don't send
1044          * anything, therefore earlier we have allocated a dummy request. fill
1045          * it, and put it on the queue if we don't send any requests.
1046          */
1047         dummy->type = REQUEST_TYPE_ASYNC;
1048         dummy->request = copy;
1049         dummy->reply = NULL;
1050         dummy->async.param = param;
1051         dummy->reply_received = 1; /* short-circuit the timeout */
1052
1053         /* for secondary process, send request to the primary process only */
1054         if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
1055                 ret = mp_request_async(eal_mp_socket_path(), copy, param, ts);
1056
1057                 /* if we didn't send anything, put dummy request on the queue */
1058                 if (ret == 0 && reply->nb_sent == 0) {
1059                         TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
1060                                         next);
1061                         dummy_used = true;
1062                 }
1063
1064                 pthread_mutex_unlock(&pending_requests.lock);
1065
1066                 /* if we couldn't send anything, clean up */
1067                 if (ret != 0)
1068                         goto fail;
1069                 return 0;
1070         }
1071
1072         /* for primary process, broadcast request */
1073         mp_dir = opendir(mp_dir_path);
1074         if (!mp_dir) {
1075                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
1076                 rte_errno = errno;
1077                 goto unlock_fail;
1078         }
1079         dir_fd = dirfd(mp_dir);
1080
1081         /* lock the directory to prevent processes spinning up while we send */
1082         if (flock(dir_fd, LOCK_SH)) {
1083                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
1084                         mp_dir_path);
1085                 rte_errno = errno;
1086                 goto closedir_fail;
1087         }
1088
1089         while ((ent = readdir(mp_dir))) {
1090                 char path[PATH_MAX];
1091
1092                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
1093                         continue;
1094
1095                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
1096                          ent->d_name);
1097
1098                 if (mp_request_async(path, copy, param, ts))
1099                         ret = -1;
1100         }
1101         /* if we didn't send anything, put dummy request on the queue */
1102         if (ret == 0 && reply->nb_sent == 0) {
1103                 TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
1104                 dummy_used = true;
1105         }
1106
1107         /* finally, unlock the queue */
1108         pthread_mutex_unlock(&pending_requests.lock);
1109
1110         /* unlock the directory */
1111         flock(dir_fd, LOCK_UN);
1112
1113         /* dir_fd automatically closed on closedir */
1114         closedir(mp_dir);
1115
1116         /* if dummy was unused, free it */
1117         if (!dummy_used)
1118                 free(dummy);
1119
1120         return ret;
1121 closedir_fail:
1122         closedir(mp_dir);
1123 unlock_fail:
1124         pthread_mutex_unlock(&pending_requests.lock);
1125 fail:
1126         free(dummy);
1127         free(param);
1128         free(copy);
1129         return -1;
1130 }
1131
1132 int __rte_experimental
1133 rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
1134 {
1135         RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
1136
1137         if (check_input(msg) == false)
1138                 return -1;
1139
1140         if (peer == NULL) {
1141                 RTE_LOG(ERR, EAL, "peer is not specified\n");
1142                 rte_errno = EINVAL;
1143                 return -1;
1144         }
1145
1146         if (internal_config.no_shconf) {
1147                 RTE_LOG(DEBUG, EAL, "No shared files mode enabled, IPC is disabled\n");
1148                 return 0;
1149         }
1150
1151         return mp_send(msg, peer, MP_REP);
1152 }