eal: ignore IPC messages until init is complete
[dpdk.git] / lib / librte_eal / common / eal_common_proc.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2018 Intel Corporation
3  */
4
5 #include <dirent.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <fnmatch.h>
9 #include <inttypes.h>
10 #include <libgen.h>
11 #include <limits.h>
12 #include <pthread.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/file.h>
17 #include <sys/time.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <sys/un.h>
21 #include <unistd.h>
22
23 #include <rte_common.h>
24 #include <rte_cycles.h>
25 #include <rte_eal.h>
26 #include <rte_errno.h>
27 #include <rte_lcore.h>
28 #include <rte_log.h>
29
30 #include "eal_private.h"
31 #include "eal_filesystem.h"
32 #include "eal_internal_cfg.h"
33
34 static int mp_fd = -1;
35 static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
36 static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
37 static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
38
39 struct action_entry {
40         TAILQ_ENTRY(action_entry) next;
41         char action_name[RTE_MP_MAX_NAME_LEN];
42         rte_mp_t action;
43 };
44
45 /** Double linked list of actions. */
46 TAILQ_HEAD(action_entry_list, action_entry);
47
48 static struct action_entry_list action_entry_list =
49         TAILQ_HEAD_INITIALIZER(action_entry_list);
50
51 enum mp_type {
52         MP_MSG, /* Share message with peers, will not block */
53         MP_REQ, /* Request for information, Will block for a reply */
54         MP_REP, /* Response to previously-received request */
55         MP_IGN, /* Response telling requester to ignore this response */
56 };
57
58 struct mp_msg_internal {
59         int type;
60         struct rte_mp_msg msg;
61 };
62
63 struct sync_request {
64         TAILQ_ENTRY(sync_request) next;
65         int reply_received;
66         char dst[PATH_MAX];
67         struct rte_mp_msg *request;
68         struct rte_mp_msg *reply;
69         pthread_cond_t cond;
70 };
71
72 TAILQ_HEAD(sync_request_list, sync_request);
73
74 static struct {
75         struct sync_request_list requests;
76         pthread_mutex_t lock;
77 } sync_requests = {
78         .requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
79         .lock = PTHREAD_MUTEX_INITIALIZER
80 };
81
/* Forward declaration: process_msg() calls mp_send() before its definition. */
static int mp_send(struct rte_mp_msg *msg, const char *peer, int type);
85
86
87 static struct sync_request *
88 find_sync_request(const char *dst, const char *act_name)
89 {
90         struct sync_request *r;
91
92         TAILQ_FOREACH(r, &sync_requests.requests, next) {
93                 if (!strcmp(r->dst, dst) &&
94                     !strcmp(r->request->name, act_name))
95                         break;
96         }
97
98         return r;
99 }
100
/*
 * Write a socket path into `buf` (capacity `len`).
 *
 * An empty `name` yields the bare eal_mp_socket_path() value; otherwise
 * the result is "<socket path>_<name>".  Output is truncated by snprintf()
 * if it does not fit.
 */
static void
create_socket_path(const char *name, char *buf, int len)
{
        const char *base = eal_mp_socket_path();

        if (name[0] == '\0') {
                snprintf(buf, len, "%s", base);
                return;
        }

        snprintf(buf, len, "%s_%s", base, name);
}
111
112 int
113 rte_eal_primary_proc_alive(const char *config_file_path)
114 {
115         int config_fd;
116
117         if (config_file_path)
118                 config_fd = open(config_file_path, O_RDONLY);
119         else {
120                 const char *path;
121
122                 path = eal_runtime_config_path();
123                 config_fd = open(path, O_RDONLY);
124         }
125         if (config_fd < 0)
126                 return 0;
127
128         int ret = lockf(config_fd, F_TEST, 0);
129         close(config_fd);
130
131         return !!ret;
132 }
133
134 static struct action_entry *
135 find_action_entry_by_name(const char *name)
136 {
137         struct action_entry *entry;
138
139         TAILQ_FOREACH(entry, &action_entry_list, next) {
140                 if (strncmp(entry->action_name, name, RTE_MP_MAX_NAME_LEN) == 0)
141                         break;
142         }
143
144         return entry;
145 }
146
147 static int
148 validate_action_name(const char *name)
149 {
150         if (name == NULL) {
151                 RTE_LOG(ERR, EAL, "Action name cannot be NULL\n");
152                 rte_errno = EINVAL;
153                 return -1;
154         }
155         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == 0) {
156                 RTE_LOG(ERR, EAL, "Length of action name is zero\n");
157                 rte_errno = EINVAL;
158                 return -1;
159         }
160         if (strnlen(name, RTE_MP_MAX_NAME_LEN) == RTE_MP_MAX_NAME_LEN) {
161                 rte_errno = E2BIG;
162                 return -1;
163         }
164         return 0;
165 }
166
167 int __rte_experimental
168 rte_mp_action_register(const char *name, rte_mp_t action)
169 {
170         struct action_entry *entry;
171
172         if (validate_action_name(name))
173                 return -1;
174
175         entry = malloc(sizeof(struct action_entry));
176         if (entry == NULL) {
177                 rte_errno = ENOMEM;
178                 return -1;
179         }
180         strcpy(entry->action_name, name);
181         entry->action = action;
182
183         pthread_mutex_lock(&mp_mutex_action);
184         if (find_action_entry_by_name(name) != NULL) {
185                 pthread_mutex_unlock(&mp_mutex_action);
186                 rte_errno = EEXIST;
187                 free(entry);
188                 return -1;
189         }
190         TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
191         pthread_mutex_unlock(&mp_mutex_action);
192         return 0;
193 }
194
195 void __rte_experimental
196 rte_mp_action_unregister(const char *name)
197 {
198         struct action_entry *entry;
199
200         if (validate_action_name(name))
201                 return;
202
203         pthread_mutex_lock(&mp_mutex_action);
204         entry = find_action_entry_by_name(name);
205         if (entry == NULL) {
206                 pthread_mutex_unlock(&mp_mutex_action);
207                 return;
208         }
209         TAILQ_REMOVE(&action_entry_list, entry, next);
210         pthread_mutex_unlock(&mp_mutex_action);
211         free(entry);
212 }
213
214 static int
215 read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
216 {
217         int msglen;
218         struct iovec iov;
219         struct msghdr msgh;
220         char control[CMSG_SPACE(sizeof(m->msg.fds))];
221         struct cmsghdr *cmsg;
222         int buflen = sizeof(*m) - sizeof(m->msg.fds);
223
224         memset(&msgh, 0, sizeof(msgh));
225         iov.iov_base = m;
226         iov.iov_len  = buflen;
227
228         msgh.msg_name = s;
229         msgh.msg_namelen = sizeof(*s);
230         msgh.msg_iov = &iov;
231         msgh.msg_iovlen = 1;
232         msgh.msg_control = control;
233         msgh.msg_controllen = sizeof(control);
234
235         msglen = recvmsg(mp_fd, &msgh, 0);
236         if (msglen < 0) {
237                 RTE_LOG(ERR, EAL, "recvmsg failed, %s\n", strerror(errno));
238                 return -1;
239         }
240
241         if (msglen != buflen || (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC))) {
242                 RTE_LOG(ERR, EAL, "truncted msg\n");
243                 return -1;
244         }
245
246         /* read auxiliary FDs if any */
247         for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL;
248                 cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
249                 if ((cmsg->cmsg_level == SOL_SOCKET) &&
250                         (cmsg->cmsg_type == SCM_RIGHTS)) {
251                         memcpy(m->msg.fds, CMSG_DATA(cmsg), sizeof(m->msg.fds));
252                         break;
253                 }
254         }
255
256         return 0;
257 }
258
259 static void
260 process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
261 {
262         struct sync_request *sync_req;
263         struct action_entry *entry;
264         struct rte_mp_msg *msg = &m->msg;
265         rte_mp_t action = NULL;
266
267         RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);
268
269         if (m->type == MP_REP || m->type == MP_IGN) {
270                 pthread_mutex_lock(&sync_requests.lock);
271                 sync_req = find_sync_request(s->sun_path, msg->name);
272                 if (sync_req) {
273                         memcpy(sync_req->reply, msg, sizeof(*msg));
274                         /* -1 indicates that we've been asked to ignore */
275                         sync_req->reply_received = m->type == MP_REP ? 1 : -1;
276                         pthread_cond_signal(&sync_req->cond);
277                 } else
278                         RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
279                 pthread_mutex_unlock(&sync_requests.lock);
280                 return;
281         }
282
283         pthread_mutex_lock(&mp_mutex_action);
284         entry = find_action_entry_by_name(msg->name);
285         if (entry != NULL)
286                 action = entry->action;
287         pthread_mutex_unlock(&mp_mutex_action);
288
289         if (!action) {
290                 if (m->type == MP_REQ && !internal_config.init_complete) {
291                         /* if this is a request, and init is not yet complete,
292                          * and callback wasn't registered, we should tell the
293                          * requester to ignore our existence because we're not
294                          * yet ready to process this request.
295                          */
296                         struct rte_mp_msg dummy;
297                         memset(&dummy, 0, sizeof(dummy));
298                         mp_send(&dummy, s->sun_path, MP_IGN);
299                 } else {
300                         RTE_LOG(ERR, EAL, "Cannot find action: %s\n",
301                                 msg->name);
302                 }
303         } else if (action(msg, s->sun_path) < 0) {
304                 RTE_LOG(ERR, EAL, "Fail to handle message: %s\n", msg->name);
305         }
306 }
307
308 static void *
309 mp_handle(void *arg __rte_unused)
310 {
311         struct mp_msg_internal msg;
312         struct sockaddr_un sa;
313
314         while (1) {
315                 if (read_msg(&msg, &sa) == 0)
316                         process_msg(&msg, &sa);
317         }
318
319         return NULL;
320 }
321
322 static int
323 open_socket_fd(void)
324 {
325         char peer_name[PATH_MAX] = {0};
326         struct sockaddr_un un;
327
328         if (rte_eal_process_type() == RTE_PROC_SECONDARY)
329                 snprintf(peer_name, sizeof(peer_name),
330                                 "%d_%"PRIx64, getpid(), rte_rdtsc());
331
332         mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
333         if (mp_fd < 0) {
334                 RTE_LOG(ERR, EAL, "failed to create unix socket\n");
335                 return -1;
336         }
337
338         memset(&un, 0, sizeof(un));
339         un.sun_family = AF_UNIX;
340
341         create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
342
343         unlink(un.sun_path); /* May still exist since last run */
344
345         if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
346                 RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
347                         un.sun_path, strerror(errno));
348                 close(mp_fd);
349                 return -1;
350         }
351
352         RTE_LOG(INFO, EAL, "Multi-process socket %s\n", un.sun_path);
353         return mp_fd;
354 }
355
356 static int
357 unlink_sockets(const char *filter)
358 {
359         int dir_fd;
360         DIR *mp_dir;
361         struct dirent *ent;
362
363         mp_dir = opendir(mp_dir_path);
364         if (!mp_dir) {
365                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
366                 return -1;
367         }
368         dir_fd = dirfd(mp_dir);
369
370         while ((ent = readdir(mp_dir))) {
371                 if (fnmatch(filter, ent->d_name, 0) == 0)
372                         unlinkat(dir_fd, ent->d_name, 0);
373         }
374
375         closedir(mp_dir);
376         return 0;
377 }
378
379 int
380 rte_mp_channel_init(void)
381 {
382         char thread_name[RTE_MAX_THREAD_NAME_LEN];
383         char path[PATH_MAX];
384         int dir_fd;
385         pthread_t tid;
386
387         /* create filter path */
388         create_socket_path("*", path, sizeof(path));
389         snprintf(mp_filter, sizeof(mp_filter), "%s", basename(path));
390
391         /* path may have been modified, so recreate it */
392         create_socket_path("*", path, sizeof(path));
393         snprintf(mp_dir_path, sizeof(mp_dir_path), "%s", dirname(path));
394
395         /* lock the directory */
396         dir_fd = open(mp_dir_path, O_RDONLY);
397         if (dir_fd < 0) {
398                 RTE_LOG(ERR, EAL, "failed to open %s: %s\n",
399                         mp_dir_path, strerror(errno));
400                 return -1;
401         }
402
403         if (flock(dir_fd, LOCK_EX)) {
404                 RTE_LOG(ERR, EAL, "failed to lock %s: %s\n",
405                         mp_dir_path, strerror(errno));
406                 close(dir_fd);
407                 return -1;
408         }
409
410         if (rte_eal_process_type() == RTE_PROC_PRIMARY &&
411                         unlink_sockets(mp_filter)) {
412                 RTE_LOG(ERR, EAL, "failed to unlink mp sockets\n");
413                 close(dir_fd);
414                 return -1;
415         }
416
417         if (open_socket_fd() < 0) {
418                 close(dir_fd);
419                 return -1;
420         }
421
422         if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
423                 RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
424                         strerror(errno));
425                 close(mp_fd);
426                 close(dir_fd);
427                 mp_fd = -1;
428                 return -1;
429         }
430
431         /* try best to set thread name */
432         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
433         rte_thread_setname(tid, thread_name);
434
435         /* unlock the directory */
436         flock(dir_fd, LOCK_UN);
437         close(dir_fd);
438
439         return 0;
440 }
441
442 /**
443  * Return -1, as fail to send message and it's caused by the local side.
444  * Return 0, as fail to send message and it's caused by the remote side.
445  * Return 1, as succeed to send message.
446  *
447  */
448 static int
449 send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
450 {
451         int snd;
452         struct iovec iov;
453         struct msghdr msgh;
454         struct cmsghdr *cmsg;
455         struct sockaddr_un dst;
456         struct mp_msg_internal m;
457         int fd_size = msg->num_fds * sizeof(int);
458         char control[CMSG_SPACE(fd_size)];
459
460         m.type = type;
461         memcpy(&m.msg, msg, sizeof(*msg));
462
463         memset(&dst, 0, sizeof(dst));
464         dst.sun_family = AF_UNIX;
465         snprintf(dst.sun_path, sizeof(dst.sun_path), "%s", dst_path);
466
467         memset(&msgh, 0, sizeof(msgh));
468         memset(control, 0, sizeof(control));
469
470         iov.iov_base = &m;
471         iov.iov_len = sizeof(m) - sizeof(msg->fds);
472
473         msgh.msg_name = &dst;
474         msgh.msg_namelen = sizeof(dst);
475         msgh.msg_iov = &iov;
476         msgh.msg_iovlen = 1;
477         msgh.msg_control = control;
478         msgh.msg_controllen = sizeof(control);
479
480         cmsg = CMSG_FIRSTHDR(&msgh);
481         cmsg->cmsg_len = CMSG_LEN(fd_size);
482         cmsg->cmsg_level = SOL_SOCKET;
483         cmsg->cmsg_type = SCM_RIGHTS;
484         memcpy(CMSG_DATA(cmsg), msg->fds, fd_size);
485
486         do {
487                 snd = sendmsg(mp_fd, &msgh, 0);
488         } while (snd < 0 && errno == EINTR);
489
490         if (snd < 0) {
491                 rte_errno = errno;
492                 /* Check if it caused by peer process exits */
493                 if (errno == ECONNREFUSED &&
494                                 rte_eal_process_type() == RTE_PROC_PRIMARY) {
495                         unlink(dst_path);
496                         return 0;
497                 }
498                 if (errno == ENOBUFS) {
499                         RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
500                                 dst_path);
501                         return 0;
502                 }
503                 RTE_LOG(ERR, EAL, "failed to send to (%s) due to %s\n",
504                         dst_path, strerror(errno));
505                 return -1;
506         }
507
508         return 1;
509 }
510
511 static int
512 mp_send(struct rte_mp_msg *msg, const char *peer, int type)
513 {
514         int dir_fd, ret = 0;
515         DIR *mp_dir;
516         struct dirent *ent;
517
518         if (!peer && (rte_eal_process_type() == RTE_PROC_SECONDARY))
519                 peer = eal_mp_socket_path();
520
521         if (peer) {
522                 if (send_msg(peer, msg, type) < 0)
523                         return -1;
524                 else
525                         return 0;
526         }
527
528         /* broadcast to all secondary processes */
529         mp_dir = opendir(mp_dir_path);
530         if (!mp_dir) {
531                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n",
532                                 mp_dir_path);
533                 rte_errno = errno;
534                 return -1;
535         }
536
537         dir_fd = dirfd(mp_dir);
538         /* lock the directory to prevent processes spinning up while we send */
539         if (flock(dir_fd, LOCK_EX)) {
540                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
541                         mp_dir_path);
542                 rte_errno = errno;
543                 closedir(mp_dir);
544                 return -1;
545         }
546
547         while ((ent = readdir(mp_dir))) {
548                 char path[PATH_MAX];
549
550                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
551                         continue;
552
553                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
554                          ent->d_name);
555                 if (send_msg(path, msg, type) < 0)
556                         ret = -1;
557         }
558         /* unlock the dir */
559         flock(dir_fd, LOCK_UN);
560
561         /* dir_fd automatically closed on closedir */
562         closedir(mp_dir);
563         return ret;
564 }
565
566 static bool
567 check_input(const struct rte_mp_msg *msg)
568 {
569         if (msg == NULL) {
570                 RTE_LOG(ERR, EAL, "Msg cannot be NULL\n");
571                 rte_errno = EINVAL;
572                 return false;
573         }
574
575         if (validate_action_name(msg->name))
576                 return false;
577
578         if (msg->len_param > RTE_MP_MAX_PARAM_LEN) {
579                 RTE_LOG(ERR, EAL, "Message data is too long\n");
580                 rte_errno = E2BIG;
581                 return false;
582         }
583
584         if (msg->num_fds > RTE_MP_MAX_FD_NUM) {
585                 RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n",
586                         RTE_MP_MAX_FD_NUM);
587                 rte_errno = E2BIG;
588                 return false;
589         }
590
591         return true;
592 }
593
594 int __rte_experimental
595 rte_mp_sendmsg(struct rte_mp_msg *msg)
596 {
597         if (!check_input(msg))
598                 return -1;
599
600         RTE_LOG(DEBUG, EAL, "sendmsg: %s\n", msg->name);
601         return mp_send(msg, NULL, MP_MSG);
602 }
603
604 static int
605 mp_request_one(const char *dst, struct rte_mp_msg *req,
606                struct rte_mp_reply *reply, const struct timespec *ts)
607 {
608         int ret;
609         struct rte_mp_msg msg, *tmp;
610         struct sync_request sync_req, *exist;
611
612         sync_req.reply_received = 0;
613         strcpy(sync_req.dst, dst);
614         sync_req.request = req;
615         sync_req.reply = &msg;
616         pthread_cond_init(&sync_req.cond, NULL);
617
618         pthread_mutex_lock(&sync_requests.lock);
619         exist = find_sync_request(dst, req->name);
620         if (!exist)
621                 TAILQ_INSERT_TAIL(&sync_requests.requests, &sync_req, next);
622         if (exist) {
623                 RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
624                 rte_errno = EEXIST;
625                 pthread_mutex_unlock(&sync_requests.lock);
626                 return -1;
627         }
628
629         ret = send_msg(dst, req, MP_REQ);
630         if (ret < 0) {
631                 RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
632                         dst, req->name);
633                 return -1;
634         } else if (ret == 0)
635                 return 0;
636
637         reply->nb_sent++;
638
639         do {
640                 ret = pthread_cond_timedwait(&sync_req.cond,
641                                 &sync_requests.lock, ts);
642         } while (ret != 0 && ret != ETIMEDOUT);
643
644         /* We got the lock now */
645         TAILQ_REMOVE(&sync_requests.requests, &sync_req, next);
646         pthread_mutex_unlock(&sync_requests.lock);
647
648         if (sync_req.reply_received == 0) {
649                 RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
650                         dst, req->name);
651                 rte_errno = ETIMEDOUT;
652                 return -1;
653         }
654         if (sync_req.reply_received == -1) {
655                 RTE_LOG(DEBUG, EAL, "Asked to ignore response\n");
656                 /* not receiving this message is not an error, so decrement
657                  * number of sent messages
658                  */
659                 reply->nb_sent--;
660                 return 0;
661         }
662
663         tmp = realloc(reply->msgs, sizeof(msg) * (reply->nb_received + 1));
664         if (!tmp) {
665                 RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
666                         dst, req->name);
667                 rte_errno = ENOMEM;
668                 return -1;
669         }
670         memcpy(&tmp[reply->nb_received], &msg, sizeof(msg));
671         reply->msgs = tmp;
672         reply->nb_received++;
673         return 0;
674 }
675
676 int __rte_experimental
677 rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
678                 const struct timespec *ts)
679 {
680         int dir_fd, ret = 0;
681         DIR *mp_dir;
682         struct dirent *ent;
683         struct timeval now;
684         struct timespec end;
685
686         RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
687
688         if (check_input(req) == false)
689                 return -1;
690         if (gettimeofday(&now, NULL) < 0) {
691                 RTE_LOG(ERR, EAL, "Faile to get current time\n");
692                 rte_errno = errno;
693                 return -1;
694         }
695
696         end.tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
697         end.tv_sec = now.tv_sec + ts->tv_sec +
698                         (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
699
700         reply->nb_sent = 0;
701         reply->nb_received = 0;
702         reply->msgs = NULL;
703
704         /* for secondary process, send request to the primary process only */
705         if (rte_eal_process_type() == RTE_PROC_SECONDARY)
706                 return mp_request_one(eal_mp_socket_path(), req, reply, &end);
707
708         /* for primary process, broadcast request, and collect reply 1 by 1 */
709         mp_dir = opendir(mp_dir_path);
710         if (!mp_dir) {
711                 RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
712                 rte_errno = errno;
713                 return -1;
714         }
715
716         dir_fd = dirfd(mp_dir);
717         /* lock the directory to prevent processes spinning up while we send */
718         if (flock(dir_fd, LOCK_EX)) {
719                 RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
720                         mp_dir_path);
721                 closedir(mp_dir);
722                 rte_errno = errno;
723                 return -1;
724         }
725
726         while ((ent = readdir(mp_dir))) {
727                 char path[PATH_MAX];
728
729                 if (fnmatch(mp_filter, ent->d_name, 0) != 0)
730                         continue;
731
732                 snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
733                          ent->d_name);
734
735                 if (mp_request_one(path, req, reply, &end))
736                         ret = -1;
737         }
738         /* unlock the directory */
739         flock(dir_fd, LOCK_UN);
740
741         /* dir_fd automatically closed on closedir */
742         closedir(mp_dir);
743         return ret;
744 }
745
746 int __rte_experimental
747 rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
748 {
749
750         RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
751
752         if (check_input(msg) == false)
753                 return -1;
754
755         if (peer == NULL) {
756                 RTE_LOG(ERR, EAL, "peer is not specified\n");
757                 rte_errno = EINVAL;
758                 return -1;
759         }
760
761         return mp_send(msg, peer, MP_REP);
762 }