eal: add channel for multi-process communication
[dpdk.git] / lib / librte_eal / common / eal_common_proc.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2018 Intel Corporation
3  */
4
5 #include <dirent.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <fnmatch.h>
9 #include <inttypes.h>
10 #include <libgen.h>
11 #include <limits.h>
12 #include <pthread.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18 #include <sys/un.h>
19 #include <unistd.h>
20
21 #include <rte_common.h>
22 #include <rte_cycles.h>
23 #include <rte_eal.h>
24 #include <rte_errno.h>
25 #include <rte_lcore.h>
26 #include <rte_log.h>
27
28 #include "eal_private.h"
29 #include "eal_filesystem.h"
30 #include "eal_internal_cfg.h"
31
32 static int mp_fd = -1;
33 static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
34 static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
35 static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
36
37 struct action_entry {
38         TAILQ_ENTRY(action_entry) next;
39         char action_name[RTE_MP_MAX_NAME_LEN];
40         rte_mp_t action;
41 };
42
43 /** Double linked list of actions. */
44 TAILQ_HEAD(action_entry_list, action_entry);
45
46 static struct action_entry_list action_entry_list =
47         TAILQ_HEAD_INITIALIZER(action_entry_list);
48
49 int
50 rte_eal_primary_proc_alive(const char *config_file_path)
51 {
52         int config_fd;
53
54         if (config_file_path)
55                 config_fd = open(config_file_path, O_RDONLY);
56         else {
57                 const char *path;
58
59                 path = eal_runtime_config_path();
60                 config_fd = open(path, O_RDONLY);
61         }
62         if (config_fd < 0)
63                 return 0;
64
65         int ret = lockf(config_fd, F_TEST, 0);
66         close(config_fd);
67
68         return !!ret;
69 }
70
71 static struct action_entry *
72 find_action_entry_by_name(const char *name)
73 {
74         struct action_entry *entry;
75
76         TAILQ_FOREACH(entry, &action_entry_list, next) {
77                 if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
78                         break;
79         }
80
81         return entry;
82 }
83
84 static int
85 validate_action_name(const char *name)
86 {
87         if (name == NULL) {
88                 RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
89                 rte_errno = -EINVAL;
90                 return -1;
91         }
92         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
93                 RTE_LOG(ERR, EAL, "Length of action name is zero\n");
94                 rte_errno = -EINVAL;
95                 return -1;
96         }
97         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
98                 rte_errno = -E2BIG;
99                 return -1;
100         }
101         return 0;
102 }
103
104 int __rte_experimental
105 rte_mp_action_register(const char *name, rte_mp_t action)
106 {
107         struct action_entry *entry;
108
109         if (validate_action_name(name))
110                 return -1;
111
112         entry = malloc(sizeof(struct action_entry));
113         if (entry == NULL) {
114                 rte_errno = -ENOMEM;
115                 return -1;
116         }
117         strcpy(entry->action_name, name);
118         entry->action = action;
119
120         pthread_mutex_lock(&mp_mutex_action);
121         if (find_action_entry_by_name(name) != NULL) {
122                 pthread_mutex_unlock(&mp_mutex_action);
123                 rte_errno = -EEXIST;
124                 free(entry);
125                 return -1;
126         }
127         TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
128         pthread_mutex_unlock(&mp_mutex_action);
129         return 0;
130 }
131
132 void __rte_experimental
133 rte_mp_action_unregister(const char *name)
134 {
135         struct action_entry *entry;
136
137         if (validate_action_name(name))
138                 return;
139
140         pthread_mutex_lock(&mp_mutex_action);
141         entry = find_action_entry_by_name(name);
142         if (entry == NULL) {
143                 pthread_mutex_unlock(&mp_mutex_action);
144                 return;
145         }
146         TAILQ_REMOVE(&action_entry_list, entry, next);
147         pthread_mutex_unlock(&mp_mutex_action);
148         free(entry);
149 }
150
151 static int
152 read_msg(struct rte_mp_msg *msg)
153 {
154         int msglen;
155         struct iovec iov;
156         struct msghdr msgh;
157         char control[CMSG_SPACE(sizeof(msg->fds))];
158         struct cmsghdr *cmsg;
159         int buflen = sizeof(*msg) - sizeof(msg->fds);
160
161         memset(&msgh, 0, sizeof(msgh));
162         iov.iov_base = msg;
163         iov.iov_len  = buflen;
164
165         msgh.msg_iov = &iov;
166         msgh.msg_iovlen = 1;
167         msgh.msg_control = control;
168         msgh.msg_controllen = sizeof(control);
169
170         msglen = recvmsg(mp_fd, &msgh, 0);
171         if (msglen < 0) {
172                 RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
173                 return -1;
174         }
175
176         if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
177                 RTE_LOG(ERR, EAL, "truncted msg\n");
178                 return -1;
179         }
180
181         /* read auxiliary FDs if any */
182         for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
183                 cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
184                 if ((cmsg->cmsg_level == SOL_SOCKET) &&
185                         (cmsg->cmsg_type == SCM_RIGHTS)) {
186                         memcpy(msg->fds, CMSG_DATA(cmsg), sizeof(msg->fds));
187                         break;
188                 }
189         }
190
191         return 0;
192 }
193
194 static void
195 process_msg(struct rte_mp_msg *msg)
196 {
197         struct action_entry *entry;
198         rte_mp_t action = NULL;
199
200         RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);
201         pthread_mutex_lock(&mp_mutex_action);
202         entry = find_action_entry_by_name(msg->name);
203         if (entry != NULL)
204                 action = entry->action;
205         pthread_mutex_unlock(&mp_mutex_action);
206
207         if (!action)
208                 RTE_LOG(ERR, EAL, "Cannot find action: %s\n", msg->name);
209         else if (action(msg) < 0)
210                 RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
211 }
212
213 static void *
214 mp_handle(void *arg __rte_unused)
215 {
216         struct rte_mp_msg msg;
217
218         while (1) {
219                 if (read_msg(&msg) == 0)
220                         process_msg(&msg);
221         }
222
223         return NULL;
224 }
225
226 static int
227 open_socket_fd(void)
228 {
229         struct sockaddr_un un;
230         const char *prefix = eal_mp_socket_path();
231
232         mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
233         if (mp_fd < 0) {
234                 RTE_LOG(ERR, EAL, "failed to create unix socket\n");
235                 return -1;
236         }
237
238         memset(&un, 0, sizeof(un));
239         un.sun_family = AF_UNIX;
240         if (rte_eal_process_type() == RTE_PROC_PRIMARY)
241                 snprintf(un.sun_path, sizeof(un.sun_path), "%s", prefix);
242         else {
243                 snprintf(un.sun_path, sizeof(un.sun_path), "%s_%d_%"PRIx64,
244                          prefix, getpid(), rte_rdtsc());
245         }
246         unlink(un.sun_path); /* May still exist since last run */
247         if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
248                 RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
249                         un.sun_path, strerror(errno));
250                 close(mp_fd);
251                 return -1;
252         }
253
254         RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
255         return mp_fd;
256 }
257
258 static int
259 unlink_sockets(const char *filter)
260 {
261         int dir_fd;
262         DIR *mp_dir;
263         struct dirent *ent;
264
265         mp_dir = opendir(mp_dir_path);
266         if (!mp_dir) {
267                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
268                 return -1;
269         }
270         dir_fd = dirfd(mp_dir);
271
272         while ((ent = readdir(mp_dir))) {
273                 if (fnmatch(filter, ent->d_name, 0) == 0)
274                         unlinkat(dir_fd, ent->d_name, 0);
275         }
276
277         closedir(mp_dir);
278         return 0;
279 }
280
281 static void
282 unlink_socket_by_path(const char *path)
283 {
284         char *filename;
285         char *fullpath = strdup(path);
286
287         if (!fullpath)
288                 return;
289         filename = basename(fullpath);
290         unlink_sockets(filename);
291         free(fullpath);
292         RTE_LOG(INFO, EAL, "Remove socket %s\n", path);
293 }
294
295 int
296 rte_mp_channel_init(void)
297 {
298         char thread_name[RTE_MAX_THREAD_NAME_LEN];
299         char *path;
300         pthread_t tid;
301
302         snprintf(mp_filter, PATH_MAX, ".%s_unix_*",
303                  internal_config.hugefile_prefix);
304
305         path = strdup(eal_mp_socket_path());
306         snprintf(mp_dir_path, PATH_MAX, "%s", dirname(path));
307         free(path);
308
309         if (rte_eal_process_type() == RTE_PROC_PRIMARY &&
310             unlink_sockets(mp_filter)) {
311                 RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n");
312                 return -1;
313         }
314
315         if (open_socket_fd() < 0)
316                 return -1;
317
318         if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
319                 RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
320                         strerror(errno));
321                 close(mp_fd);
322                 mp_fd = -1;
323                 return -1;
324         }
325
326         /* try best to set thread name */
327         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
328         rte_thread_setname(tid, thread_name);
329         return 0;
330 }
331
332 /**
333  * Return -1, as fail to send message and it's caused by the local side.
334  * Return 0, as fail to send message and it's caused by the remote side.
335  * Return 1, as succeed to send message.
336  *
337  */
338 static int
339 send_msg(const char *dst_path, struct rte_mp_msg *msg)
340 {
341         int snd;
342         struct iovec iov;
343         struct msghdr msgh;
344         struct cmsghdr *cmsg;
345         struct sockaddr_un dst;
346         int fd_size = msg->num_fds * sizeof(int);
347         char control[CMSG_SPACE(fd_size)];
348
349         memset(&dst, 0, sizeof(dst));
350         dst.sun_family = AF_UNIX;
351         snprintf(dst.sun_path, sizeof(dst.sun_path), "%s", dst_path);
352
353         memset(&msgh, 0, sizeof(msgh));
354         memset(control, 0, sizeof(control));
355
356         iov.iov_base = msg;
357         iov.iov_len = sizeof(*msg) - sizeof(msg->fds);
358
359         msgh.msg_name = &dst;
360         msgh.msg_namelen = sizeof(dst);
361         msgh.msg_iov = &iov;
362         msgh.msg_iovlen = 1;
363         msgh.msg_control = control;
364         msgh.msg_controllen = sizeof(control);
365
366         cmsg = CMSG_FIRSTHDR(&msgh);
367         cmsg->cmsg_len = CMSG_LEN(fd_size);
368         cmsg->cmsg_level = SOL_SOCKET;
369         cmsg->cmsg_type = SCM_RIGHTS;
370         memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);
371
372         do {
373                 snd = sendmsg(mp_fd, &msgh, 0);
374         } while (snd < 0 && errno == EINTR);
375
376         if (snd < 0) {
377                 rte_errno = errno;
378                 /* Check if it caused by peer process exits */
379                 if (errno == -ECONNREFUSED) {
380                         /* We don't unlink the primary's socket here */
381                         if (rte_eal_process_type() == RTE_PROC_PRIMARY)
382                                 unlink_socket_by_path(dst_path);
383                         return 0;
384                 }
385                 if (errno == -ENOBUFS) {
386                         RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
387                                 dst_path);
388                         return 0;
389                 }
390                 RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
391                         dst_path, strerror(errno));
392                 return -1;
393         }
394
395         return 1;
396 }
397
398 static int
399 mp_send(struct rte_mp_msg *msg)
400 {
401         int ret = 0;
402         DIR *mp_dir;
403         struct dirent *ent;
404
405         if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
406                 if (send_msg(eal_mp_socket_path(), msg) < 0)
407                         return -1;
408                 else
409                         return 0;
410         }
411
412         /* broadcast to all secondary processes */
413         mp_dir = opendir(mp_dir_path);
414         if (!mp_dir) {
415                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
416                                 mp_dir_path);
417                 rte_errno = errno;
418                 return -1;
419         }
420         while ((ent = readdir(mp_dir))) {
421                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
422                         continue;
423
424                 if (send_msg(ent->d_name, msg) < 0)
425                         ret = -1;
426         }
427         closedir(mp_dir);
428
429         return ret;
430 }
431
432 static bool
433 check_input(const struct rte_mp_msg *msg)
434 {
435         if (msg == NULL) {
436                 RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
437                 rte_errno = -EINVAL;
438                 return false;
439         }
440
441         if (validate_action_name(msg->name))
442                 return false;
443
444         if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
445                 RTE_LOG(ERR, EAL, "Message data is too long\n");
446                 rte_errno = -E2BIG;
447                 return false;
448         }
449
450         if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
451                 RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
452                         RTE_MP_MAX_FD_NUM);
453                 rte_errno = -E2BIG;
454                 return false;
455         }
456
457         return true;
458 }
459
460 int __rte_experimental
461 rte_mp_sendmsg(struct rte_mp_msg *msg)
462 {
463         if (!check_input(msg))
464                 return -1;
465
466         RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
467         return mp_send(msg);
468 }