eal: simplify IPC sync request timeout
[dpdk.git] / lib / librte_eal / common / eal_common_proc.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2018 Intel Corporation
3  */
4
5 #include <dirent.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <fnmatch.h>
9 #include <inttypes.h>
10 #include <libgen.h>
11 #include <limits.h>
12 #include <pthread.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/file.h>
17 #include <sys/time.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/un.h>
21 #include <unistd.h>
22
23 #include <rte_common.h>
24 #include <rte_cycles.h>
25 #include <rte_eal.h>
26 #include <rte_errno.h>
27 #include <rte_lcore.h>
28 #include <rte_log.h>
29
30 #include "eal_private.h"
31 #include "eal_filesystem.h"
32 #include "eal_internal_cfg.h"
33
/* Datagram socket used for all mp traffic; set by open_socket_fd() */
static int mp_fd = -1;
static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
/* Taken around every lookup/insert/remove on action_entry_list */
static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
38
/* One registered action: associates an action name with its callback. */
struct action_entry {
	TAILQ_ENTRY(action_entry) next; /* linkage in action_entry_list */
	char action_name[RTE_MP_MAX_NAME_LEN]; /* name incoming messages are matched on */
	rte_mp_t action; /* callback run by process_msg() on a name match */
};
44
/** Double linked list of actions. */
TAILQ_HEAD(action_entry_list, action_entry);

/* All currently registered actions; accessed with mp_mutex_action held. */
static struct action_entry_list action_entry_list =
	TAILQ_HEAD_INITIALIZER(action_entry_list);
50
/* Message kind, carried in mp_msg_internal.type alongside each message. */
enum mp_type {
	MP_MSG, /* Share message with peers, will not block */
	MP_REQ, /* Request for information, Will block for a reply */
	MP_REP, /* Response to previously-received request */
};
56
/*
 * Buffer handed to sendmsg()/recvmsg(): the user message prefixed by its
 * kind. send_msg() and read_msg() transfer
 * sizeof(struct mp_msg_internal) - sizeof(msg.fds) bytes of it as payload;
 * the descriptors in msg.fds are passed as SCM_RIGHTS ancillary data.
 */
struct mp_msg_internal {
	int type;              /* one of enum mp_type */
	struct rte_mp_msg msg; /* the message supplied by / delivered to the user */
};
61
/* Bookkeeping for one outstanding request that is waiting for its reply. */
struct sync_request {
	TAILQ_ENTRY(sync_request) next; /* linkage in sync_requests.requests */
	int reply_received;             /* set to 1 by process_msg() once a reply is stored */
	char dst[PATH_MAX];             /* socket path of the peer the request was sent to */
	struct rte_mp_msg *request;     /* the request; its name is used to match replies */
	struct rte_mp_msg *reply;       /* buffer process_msg() copies the reply into */
	pthread_cond_t cond;            /* signalled by process_msg() when the reply arrives */
};
70
TAILQ_HEAD(sync_request_list, sync_request);

/*
 * List of requests currently waiting for a reply, together with the mutex
 * that is held while the list is searched or modified and that waiters pass
 * to pthread_cond_timedwait().
 */
static struct {
	struct sync_request_list requests;
	pthread_mutex_t lock;
} sync_requests = {
	.requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
	.lock = PTHREAD_MUTEX_INITIALIZER
};
80
81 static struct sync_request *
82 find_sync_request(const char *dst, const char *act_name)
83 {
84         struct sync_request *r;
85
86         TAILQ_FOREACH(r, &sync_requests.requests, next) {
87                 if (!strcmp(r->dst, dst) &&
88                     !strcmp(r->request->name, act_name))
89                         break;
90         }
91
92         return r;
93 }
94
/*
 * Write a socket path into `buf` (at most `len` bytes, always terminated by
 * snprintf): the value of eal_mp_socket_path() on its own when `name` is
 * empty, otherwise that value followed by '_' and `name`.
 */
static void
create_socket_path(const char *name, char *buf, int len)
{
	const char *base = eal_mp_socket_path();

	if (name[0] == '\0') {
		snprintf(buf, len, "%s", base);
		return;
	}

	snprintf(buf, len, "%s_%s", base, name);
}
105
106 int
107 rte_eal_primary_proc_alive(const char *config_file_path)
108 {
109         int config_fd;
110
111         if (config_file_path)
112                 config_fd = open(config_file_path, O_RDONLY);
113         else {
114                 const char *path;
115
116                 path = eal_runtime_config_path();
117                 config_fd = open(path, O_RDONLY);
118         }
119         if (config_fd < 0)
120                 return 0;
121
122         int ret = lockf(config_fd, F_TEST, 0);
123         close(config_fd);
124
125         return !!ret;
126 }
127
128 static struct action_entry *
129 find_action_entry_by_name(const char *name)
130 {
131         struct action_entry *entry;
132
133         TAILQ_FOREACH(entry, &action_entry_list, next) {
134                 if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
135                         break;
136         }
137
138         return entry;
139 }
140
141 static int
142 validate_action_name(const char *name)
143 {
144         if (name == NULL) {
145                 RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
146                 rte_errno = EINVAL;
147                 return -1;
148         }
149         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
150                 RTE_LOG(ERR, EAL, "Length of action name is zero\n");
151                 rte_errno = EINVAL;
152                 return -1;
153         }
154         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
155                 rte_errno = E2BIG;
156                 return -1;
157         }
158         return 0;
159 }
160
161 int __rte_experimental
162 rte_mp_action_register(const char *name, rte_mp_t action)
163 {
164         struct action_entry *entry;
165
166         if (validate_action_name(name))
167                 return -1;
168
169         entry = malloc(sizeof(struct action_entry));
170         if (entry == NULL) {
171                 rte_errno = ENOMEM;
172                 return -1;
173         }
174         strcpy(entry->action_name, name);
175         entry->action = action;
176
177         pthread_mutex_lock(&mp_mutex_action);
178         if (find_action_entry_by_name(name) != NULL) {
179                 pthread_mutex_unlock(&mp_mutex_action);
180                 rte_errno = EEXIST;
181                 free(entry);
182                 return -1;
183         }
184         TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
185         pthread_mutex_unlock(&mp_mutex_action);
186         return 0;
187 }
188
189 void __rte_experimental
190 rte_mp_action_unregister(const char *name)
191 {
192         struct action_entry *entry;
193
194         if (validate_action_name(name))
195                 return;
196
197         pthread_mutex_lock(&mp_mutex_action);
198         entry = find_action_entry_by_name(name);
199         if (entry == NULL) {
200                 pthread_mutex_unlock(&mp_mutex_action);
201                 return;
202         }
203         TAILQ_REMOVE(&action_entry_list, entry, next);
204         pthread_mutex_unlock(&mp_mutex_action);
205         free(entry);
206 }
207
208 static int
209 read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
210 {
211         int msglen;
212         struct iovec iov;
213         struct msghdr msgh;
214         char control[CMSG_SPACE(sizeof(m->msg.fds))];
215         struct cmsghdr *cmsg;
216         int buflen = sizeof(*m) - sizeof(m->msg.fds);
217
218         memset(&msgh, 0, sizeof(msgh));
219         iov.iov_base = m;
220         iov.iov_len  = buflen;
221
222         msgh.msg_name = s;
223         msgh.msg_namelen = sizeof(*s);
224         msgh.msg_iov = &iov;
225         msgh.msg_iovlen = 1;
226         msgh.msg_control = control;
227         msgh.msg_controllen = sizeof(control);
228
229         msglen = recvmsg(mp_fd, &msgh, 0);
230         if (msglen < 0) {
231                 RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
232                 return -1;
233         }
234
235         if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
236                 RTE_LOG(ERR, EAL, "truncted msg\n");
237                 return -1;
238         }
239
240         /* read auxiliary FDs if any */
241         for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
242                 cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
243                 if ((cmsg->cmsg_level == SOL_SOCKET) &&
244                         (cmsg->cmsg_type == SCM_RIGHTS)) {
245                         memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
246                         break;
247                 }
248         }
249
250         return 0;
251 }
252
253 static void
254 process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
255 {
256         struct sync_request *sync_req;
257         struct action_entry *entry;
258         struct rte_mp_msg *msg = &m->msg;
259         rte_mp_t action = NULL;
260
261         RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);
262
263         if (m->type == MP_REP) {
264                 pthread_mutex_lock(&sync_requests.lock);
265                 sync_req = find_sync_request(s->sun_path, msg->name);
266                 if (sync_req) {
267                         memcpy(sync_req->reply, msg, sizeof(*msg));
268                         sync_req->reply_received = 1;
269                         pthread_cond_signal(&sync_req->cond);
270                 } else
271                         RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
272                 pthread_mutex_unlock(&sync_requests.lock);
273                 return;
274         }
275
276         pthread_mutex_lock(&mp_mutex_action);
277         entry = find_action_entry_by_name(msg->name);
278         if (entry != NULL)
279                 action = entry->action;
280         pthread_mutex_unlock(&mp_mutex_action);
281
282         if (!action)
283                 RTE_LOG(ERR, EAL, "Cannot find action: %s\n", msg->name);
284         else if (action(msg, s->sun_path) < 0)
285                 RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
286 }
287
288 static void *
289 mp_handle(void *arg __rte_unused)
290 {
291         struct mp_msg_internal msg;
292         struct sockaddr_un sa;
293
294         while (1) {
295                 if (read_msg(&msg, &sa) == 0)
296                         process_msg(&msg, &sa);
297         }
298
299         return NULL;
300 }
301
302 static int
303 open_socket_fd(void)
304 {
305         char peer_name[PATH_MAX] = {0};
306         struct sockaddr_un un;
307
308         if (rte_eal_process_type() == RTE_PROC_SECONDARY)
309                 snprintf(peer_name, sizeof(peer_name),
310                                 "%d_%"PRIx64, getpid(), rte_rdtsc());
311
312         mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
313         if (mp_fd < 0) {
314                 RTE_LOG(ERR, EAL, "failed to create unix socket\n");
315                 return -1;
316         }
317
318         memset(&un, 0, sizeof(un));
319         un.sun_family = AF_UNIX;
320
321         create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
322
323         unlink(un.sun_path); /* May still exist since last run */
324
325         if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
326                 RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
327                         un.sun_path, strerror(errno));
328                 close(mp_fd);
329                 return -1;
330         }
331
332         RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
333         return mp_fd;
334 }
335
336 static int
337 unlink_sockets(const char *filter)
338 {
339         int dir_fd;
340         DIR *mp_dir;
341         struct dirent *ent;
342
343         mp_dir = opendir(mp_dir_path);
344         if (!mp_dir) {
345                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
346                 return -1;
347         }
348         dir_fd = dirfd(mp_dir);
349
350         while ((ent = readdir(mp_dir))) {
351                 if (fnmatch(filter, ent->d_name, 0) == 0)
352                         unlinkat(dir_fd, ent->d_name, 0);
353         }
354
355         closedir(mp_dir);
356         return 0;
357 }
358
359 int
360 rte_mp_channel_init(void)
361 {
362         char thread_name[RTE_MAX_THREAD_NAME_LEN];
363         char path[PATH_MAX];
364         int dir_fd;
365         pthread_t tid;
366
367         /* create filter path */
368         create_socket_path("*", path, sizeof(path));
369         snprintf(mp_filter, sizeof(mp_filter), "%s", basename(path));
370
371         /* path may have been modified, so recreate it */
372         create_socket_path("*", path, sizeof(path));
373         snprintf(mp_dir_path, sizeof(mp_dir_path), "%s", dirname(path));
374
375         /* lock the directory */
376         dir_fd = open(mp_dir_path, O_RDONLY);
377         if (dir_fd < 0) {
378                 RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
379                         mp_dir_path, strerror(errno));
380                 return -1;
381         }
382
383         if (flock(dir_fd, LOCK_EX)) {
384                 RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
385                         mp_dir_path, strerror(errno));
386                 close(dir_fd);
387                 return -1;
388         }
389
390         if (rte_eal_process_type() == RTE_PROC_PRIMARY &&
391                         unlink_sockets(mp_filter)) {
392                 RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n");
393                 close(dir_fd);
394                 return -1;
395         }
396
397         if (open_socket_fd() < 0) {
398                 close(dir_fd);
399                 return -1;
400         }
401
402         if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
403                 RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
404                         strerror(errno));
405                 close(mp_fd);
406                 close(dir_fd);
407                 mp_fd = -1;
408                 return -1;
409         }
410
411         /* try best to set thread name */
412         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
413         rte_thread_setname(tid, thread_name);
414
415         /* unlock the directory */
416         flock(dir_fd, LOCK_UN);
417         close(dir_fd);
418
419         return 0;
420 }
421
422 /**
423  * Return -1, as fail to send message and it's caused by the local side.
424  * Return 0, as fail to send message and it's caused by the remote side.
425  * Return 1, as succeed to send message.
426  *
427  */
428 static int
429 send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
430 {
431         int snd;
432         struct iovec iov;
433         struct msghdr msgh;
434         struct cmsghdr *cmsg;
435         struct sockaddr_un dst;
436         struct mp_msg_internal m;
437         int fd_size = msg->num_fds * sizeof(int);
438         char control[CMSG_SPACE(fd_size)];
439
440         m.type = type;
441         memcpy(&m.msg, msg, sizeof(*msg));
442
443         memset(&dst, 0, sizeof(dst));
444         dst.sun_family = AF_UNIX;
445         snprintf(dst.sun_path, sizeof(dst.sun_path), "%s", dst_path);
446
447         memset(&msgh, 0, sizeof(msgh));
448         memset(control, 0, sizeof(control));
449
450         iov.iov_base = &m;
451         iov.iov_len = sizeof(m) - sizeof(msg->fds);
452
453         msgh.msg_name = &dst;
454         msgh.msg_namelen = sizeof(dst);
455         msgh.msg_iov = &iov;
456         msgh.msg_iovlen = 1;
457         msgh.msg_control = control;
458         msgh.msg_controllen = sizeof(control);
459
460         cmsg = CMSG_FIRSTHDR(&msgh);
461         cmsg->cmsg_len = CMSG_LEN(fd_size);
462         cmsg->cmsg_level = SOL_SOCKET;
463         cmsg->cmsg_type = SCM_RIGHTS;
464         memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);
465
466         do {
467                 snd = sendmsg(mp_fd, &msgh, 0);
468         } while (snd < 0 && errno == EINTR);
469
470         if (snd < 0) {
471                 rte_errno = errno;
472                 /* Check if it caused by peer process exits */
473                 if (errno == ECONNREFUSED &&
474                                 rte_eal_process_type() == RTE_PROC_PRIMARY) {
475                         unlink(dst_path);
476                         return 0;
477                 }
478                 if (errno == ENOBUFS) {
479                         RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
480                                 dst_path);
481                         return 0;
482                 }
483                 RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
484                         dst_path, strerror(errno));
485                 return -1;
486         }
487
488         return 1;
489 }
490
491 static int
492 mp_send(struct rte_mp_msg *msg, const char *peer, int type)
493 {
494         int dir_fd, ret = 0;
495         DIR *mp_dir;
496         struct dirent *ent;
497
498         if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
499                 peer = eal_mp_socket_path();
500
501         if (peer) {
502                 if (send_msg(peer, msg, type) < 0)
503                         return -1;
504                 else
505                         return 0;
506         }
507
508         /* broadcast to all secondary processes */
509         mp_dir = opendir(mp_dir_path);
510         if (!mp_dir) {
511                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
512                                 mp_dir_path);
513                 rte_errno = errno;
514                 return -1;
515         }
516
517         dir_fd = dirfd(mp_dir);
518         /* lock the directory to prevent processes spinning up while we send */
519         if (flock(dir_fd, LOCK_EX)) {
520                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
521                         mp_dir_path);
522                 rte_errno = errno;
523                 closedir(mp_dir);
524                 return -1;
525         }
526
527         while ((ent = readdir(mp_dir))) {
528                 char path[PATH_MAX];
529
530                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
531                         continue;
532
533                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
534                          ent->d_name);
535                 if (send_msg(path, msg, type) < 0)
536                         ret = -1;
537         }
538         /* unlock the dir */
539         flock(dir_fd, LOCK_UN);
540
541         /* dir_fd automatically closed on closedir */
542         closedir(mp_dir);
543         return ret;
544 }
545
546 static bool
547 check_input(const struct rte_mp_msg *msg)
548 {
549         if (msg == NULL) {
550                 RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
551                 rte_errno = EINVAL;
552                 return false;
553         }
554
555         if (validate_action_name(msg->name))
556                 return false;
557
558         if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
559                 RTE_LOG(ERR, EAL, "Message data is too long\n");
560                 rte_errno = E2BIG;
561                 return false;
562         }
563
564         if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
565                 RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
566                         RTE_MP_MAX_FD_NUM);
567                 rte_errno = E2BIG;
568                 return false;
569         }
570
571         return true;
572 }
573
574 int __rte_experimental
575 rte_mp_sendmsg(struct rte_mp_msg *msg)
576 {
577         if (!check_input(msg))
578                 return -1;
579
580         RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
581         return mp_send(msg, NULL, MP_MSG);
582 }
583
584 static int
585 mp_request_one(const char *dst, struct rte_mp_msg *req,
586                struct rte_mp_reply *reply, const struct timespec *ts)
587 {
588         int ret;
589         struct rte_mp_msg msg, *tmp;
590         struct sync_request sync_req, *exist;
591
592         sync_req.reply_received = 0;
593         strcpy(sync_req.dst, dst);
594         sync_req.request = req;
595         sync_req.reply = &msg;
596         pthread_cond_init(&sync_req.cond, NULL);
597
598         pthread_mutex_lock(&sync_requests.lock);
599         exist = find_sync_request(dst, req->name);
600         if (!exist)
601                 TAILQ_INSERT_TAIL(&sync_requests.requests, &sync_req, next);
602         if (exist) {
603                 RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
604                 rte_errno = EEXIST;
605                 pthread_mutex_unlock(&sync_requests.lock);
606                 return -1;
607         }
608
609         ret = send_msg(dst, req, MP_REQ);
610         if (ret < 0) {
611                 RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
612                         dst, req->name);
613                 return -1;
614         } else if (ret == 0)
615                 return 0;
616
617         reply->nb_sent++;
618
619         do {
620                 ret = pthread_cond_timedwait(&sync_req.cond,
621                                 &sync_requests.lock, ts);
622         } while (ret != 0 && ret != ETIMEDOUT);
623
624         /* We got the lock now */
625         TAILQ_REMOVE(&sync_requests.requests, &sync_req, next);
626         pthread_mutex_unlock(&sync_requests.lock);
627
628         if (sync_req.reply_received == 0) {
629                 RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
630                         dst, req->name);
631                 rte_errno = ETIMEDOUT;
632                 return -1;
633         }
634
635         tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
636         if (!tmp) {
637                 RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
638                         dst, req->name);
639                 rte_errno = ENOMEM;
640                 return -1;
641         }
642         memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
643         reply->msgs = tmp;
644         reply->nb_received++;
645         return 0;
646 }
647
648 int __rte_experimental
649 rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
650                 const struct timespec *ts)
651 {
652         int dir_fd, ret = 0;
653         DIR *mp_dir;
654         struct dirent *ent;
655         struct timeval now;
656         struct timespec end;
657
658         RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
659
660         if (check_input(req) == false)
661                 return -1;
662         if (gettimeofday(&now, NULL) < 0) {
663                 RTE_LOG(ERR, EAL, "Faile to get current time\n");
664                 rte_errno = errno;
665                 return -1;
666         }
667
668         end.tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
669         end.tv_sec = now.tv_sec + ts->tv_sec +
670                         (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
671
672         reply->nb_sent = 0;
673         reply->nb_received = 0;
674         reply->msgs = NULL;
675
676         /* for secondary process, send request to the primary process only */
677         if (rte_eal_process_type() == RTE_PROC_SECONDARY)
678                 return mp_request_one(eal_mp_socket_path(), req, reply, &end);
679
680         /* for primary process, broadcast request, and collect reply 1 by 1 */
681         mp_dir = opendir(mp_dir_path);
682         if (!mp_dir) {
683                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
684                 rte_errno = errno;
685                 return -1;
686         }
687
688         dir_fd = dirfd(mp_dir);
689         /* lock the directory to prevent processes spinning up while we send */
690         if (flock(dir_fd, LOCK_EX)) {
691                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
692                         mp_dir_path);
693                 closedir(mp_dir);
694                 rte_errno = errno;
695                 return -1;
696         }
697
698         while ((ent = readdir(mp_dir))) {
699                 char path[PATH_MAX];
700
701                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
702                         continue;
703
704                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
705                          ent->d_name);
706
707                 if (mp_request_one(path, req, reply, &end))
708                         ret = -1;
709         }
710         /* unlock the directory */
711         flock(dir_fd, LOCK_UN);
712
713         /* dir_fd automatically closed on closedir */
714         closedir(mp_dir);
715         return ret;
716 }
717
718 int __rte_experimental
719 rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
720 {
721
722         RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
723
724         if (check_input(msg) == false)
725                 return -1;
726
727         if (peer == NULL) {
728                 RTE_LOG(ERR, EAL, "peer is not specified\n");
729                 rte_errno = EINVAL;
730                 return -1;
731         }
732
733         return mp_send(msg, peer, MP_REP);
734 }