ipc: handle unsupported IPC in action register
[dpdk.git] / lib / librte_eal / common / malloc_mp.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation
3  */
4
5 #include <string.h>
6 #include <sys/time.h>
7
8 #include <rte_alarm.h>
9 #include <rte_errno.h>
10 #include <rte_string_fns.h>
11
12 #include "eal_memalloc.h"
13
14 #include "malloc_elem.h"
15 #include "malloc_mp.h"
16
17 #define MP_ACTION_SYNC "mp_malloc_sync"
18 /**< request sent by primary process to notify of changes in memory map */
19 #define MP_ACTION_ROLLBACK "mp_malloc_rollback"
20 /**< request sent by primary process to notify of changes in memory map. this is
21  * essentially a regular sync request, but we cannot send sync requests while
22  * another one is in progress, and we might have to - therefore, we do this as
23  * a separate callback.
24  */
25 #define MP_ACTION_REQUEST "mp_malloc_request"
26 /**< request sent by secondary process to ask for allocation/deallocation */
27 #define MP_ACTION_RESPONSE "mp_malloc_response"
28 /**< response sent to secondary process to indicate result of request */
29
30 /* forward declarations */
31 static int
32 handle_sync_response(const struct rte_mp_msg *request,
33                 const struct rte_mp_reply *reply);
34 static int
35 handle_rollback_response(const struct rte_mp_msg *request,
36                 const struct rte_mp_reply *reply);
37
38 #define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
39
40 /* when we're allocating, we need to store some state to ensure that we can
41  * roll back later
42  */
43 struct primary_alloc_req_state {
44         struct malloc_heap *heap;
45         struct rte_memseg **ms;
46         int ms_len;
47         struct malloc_elem *elem;
48         void *map_addr;
49         size_t map_len;
50 };
51
52 enum req_state {
53         REQ_STATE_INACTIVE = 0,
54         REQ_STATE_ACTIVE,
55         REQ_STATE_COMPLETE
56 };
57
58 struct mp_request {
59         TAILQ_ENTRY(mp_request) next;
60         struct malloc_mp_req user_req; /**< contents of request */
61         pthread_cond_t cond; /**< variable we use to time out on this request */
62         enum req_state state; /**< indicate status of this request */
63         struct primary_alloc_req_state alloc_state;
64 };
65
66 /*
67  * We could've used just a single request, but it may be possible for
68  * secondaries to timeout earlier than the primary, and send a new request while
69  * primary is still expecting replies to the old one. Therefore, each new
70  * request will get assigned a new ID, which is how we will distinguish between
71  * expected and unexpected messages.
72  */
73 TAILQ_HEAD(mp_request_list, mp_request);
74 static struct {
75         struct mp_request_list list;
76         pthread_mutex_t lock;
77 } mp_request_list = {
78         .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
79         .lock = PTHREAD_MUTEX_INITIALIZER
80 };
81
82 /**
83  * General workflow is the following:
84  *
85  * Allocation:
86  * S: send request to primary
87  * P: attempt to allocate memory
88  *    if failed, sendmsg failure
89  *    if success, send sync request
90  * S: if received msg of failure, quit
91  *    if received sync request, synchronize memory map and reply with result
92  * P: if received sync request result
93  *    if success, sendmsg success
94  *    if failure, roll back allocation and send a rollback request
95  * S: if received msg of success, quit
96  *    if received rollback request, synchronize memory map and reply with result
97  * P: if received sync request result
98  *    sendmsg sync request result
99  * S: if received msg, quit
100  *
101  * Aside from timeouts, there are three points where we can quit:
102  *  - if allocation failed straight away
103  *  - if allocation and sync request succeeded
104  *  - if allocation succeeded, sync request failed, allocation rolled back and
105  *    rollback request received (irrespective of whether it succeeded or failed)
106  *
107  * Deallocation:
108  * S: send request to primary
109  * P: attempt to deallocate memory
110  *    if failed, sendmsg failure
111  *    if success, send sync request
112  * S: if received msg of failure, quit
113  *    if received sync request, synchronize memory map and reply with result
114  * P: if received sync request result
115  *    sendmsg sync request result
116  * S: if received msg, quit
117  *
118  * There is no "rollback" from deallocation, as it's safe to have some memory
119  * mapped in some processes - it's absent from the heap, so it won't get used.
120  */
121
122 static struct mp_request *
123 find_request_by_id(uint64_t id)
124 {
125         struct mp_request *req;
126         TAILQ_FOREACH(req, &mp_request_list.list, next) {
127                 if (req->user_req.id == id)
128                         break;
129         }
130         return req;
131 }
132
133 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
134 static uint64_t
135 get_unique_id(void)
136 {
137         uint64_t id;
138         do {
139                 id = rte_rand();
140         } while (find_request_by_id(id) != NULL);
141         return id;
142 }
143
144 /* secondary will respond to sync requests thusly */
145 static int
146 handle_sync(const struct rte_mp_msg *msg, const void *peer)
147 {
148         struct rte_mp_msg reply;
149         const struct malloc_mp_req *req =
150                         (const struct malloc_mp_req *)msg->param;
151         struct malloc_mp_req *resp =
152                         (struct malloc_mp_req *)reply.param;
153         int ret;
154
155         if (req->t != REQ_TYPE_SYNC) {
156                 RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
157                 return -1;
158         }
159
160         memset(&reply, 0, sizeof(reply));
161
162         reply.num_fds = 0;
163         strlcpy(reply.name, msg->name, sizeof(reply.name));
164         reply.len_param = sizeof(*resp);
165
166         ret = eal_memalloc_sync_with_primary();
167
168         resp->t = REQ_TYPE_SYNC;
169         resp->id = req->id;
170         resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
171
172         rte_mp_reply(&reply, peer);
173
174         return 0;
175 }
176
177 static int
178 handle_alloc_request(const struct malloc_mp_req *m,
179                 struct mp_request *req)
180 {
181         const struct malloc_req_alloc *ar = &m->alloc_req;
182         struct malloc_heap *heap;
183         struct malloc_elem *elem;
184         struct rte_memseg **ms;
185         size_t alloc_sz;
186         int n_segs;
187         void *map_addr;
188
189         alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
190                         MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
191         n_segs = alloc_sz / ar->page_sz;
192
193         heap = ar->heap;
194
195         /* we can't know in advance how many pages we'll need, so we malloc */
196         ms = malloc(sizeof(*ms) * n_segs);
197         if (ms == NULL) {
198                 RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
199                 goto fail;
200         }
201         memset(ms, 0, sizeof(*ms) * n_segs);
202
203         elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
204                         ar->flags, ar->align, ar->bound, ar->contig, ms,
205                         n_segs);
206
207         if (elem == NULL)
208                 goto fail;
209
210         map_addr = ms[0]->addr;
211
212         eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);
213
214         /* we have succeeded in allocating memory, but we still need to sync
215          * with other processes. however, since DPDK IPC is single-threaded, we
216          * send an asynchronous request and exit this callback.
217          */
218
219         req->alloc_state.ms = ms;
220         req->alloc_state.ms_len = n_segs;
221         req->alloc_state.map_addr = map_addr;
222         req->alloc_state.map_len = alloc_sz;
223         req->alloc_state.elem = elem;
224         req->alloc_state.heap = heap;
225
226         return 0;
227 fail:
228         free(ms);
229         return -1;
230 }
231
232 /* first stage of primary handling requests from secondary */
233 static int
234 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
235 {
236         const struct malloc_mp_req *m =
237                         (const struct malloc_mp_req *)msg->param;
238         struct mp_request *entry;
239         int ret;
240
241         /* lock access to request */
242         pthread_mutex_lock(&mp_request_list.lock);
243
244         /* make sure it's not a dupe */
245         entry = find_request_by_id(m->id);
246         if (entry != NULL) {
247                 RTE_LOG(ERR, EAL, "Duplicate request id\n");
248                 goto fail;
249         }
250
251         entry = malloc(sizeof(*entry));
252         if (entry == NULL) {
253                 RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
254                 goto fail;
255         }
256
257         /* erase all data */
258         memset(entry, 0, sizeof(*entry));
259
260         if (m->t == REQ_TYPE_ALLOC) {
261                 ret = handle_alloc_request(m, entry);
262         } else if (m->t == REQ_TYPE_FREE) {
263                 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
264                                 m->free_req.addr, m->free_req.len);
265
266                 ret = malloc_heap_free_pages(m->free_req.addr,
267                                 m->free_req.len);
268         } else {
269                 RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
270                 goto fail;
271         }
272
273         if (ret != 0) {
274                 struct rte_mp_msg resp_msg;
275                 struct malloc_mp_req *resp =
276                                 (struct malloc_mp_req *)resp_msg.param;
277
278                 /* send failure message straight away */
279                 resp_msg.num_fds = 0;
280                 resp_msg.len_param = sizeof(*resp);
281                 strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
282                                 sizeof(resp_msg.name));
283
284                 resp->t = m->t;
285                 resp->result = REQ_RESULT_FAIL;
286                 resp->id = m->id;
287
288                 if (rte_mp_sendmsg(&resp_msg)) {
289                         RTE_LOG(ERR, EAL, "Couldn't send response\n");
290                         goto fail;
291                 }
292                 /* we did not modify the request */
293                 free(entry);
294         } else {
295                 struct rte_mp_msg sr_msg;
296                 struct malloc_mp_req *sr =
297                                 (struct malloc_mp_req *)sr_msg.param;
298                 struct timespec ts;
299
300                 memset(&sr_msg, 0, sizeof(sr_msg));
301
302                 /* we can do something, so send sync request asynchronously */
303                 sr_msg.num_fds = 0;
304                 sr_msg.len_param = sizeof(*sr);
305                 strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));
306
307                 ts.tv_nsec = 0;
308                 ts.tv_sec = MP_TIMEOUT_S;
309
310                 /* sync requests carry no data */
311                 sr->t = REQ_TYPE_SYNC;
312                 sr->id = m->id;
313
314                 /* there may be stray timeout still waiting */
315                 do {
316                         ret = rte_mp_request_async(&sr_msg, &ts,
317                                         handle_sync_response);
318                 } while (ret != 0 && rte_errno == EEXIST);
319                 if (ret != 0) {
320                         RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
321                         if (m->t == REQ_TYPE_ALLOC)
322                                 free(entry->alloc_state.ms);
323                         goto fail;
324                 }
325
326                 /* mark request as in progress */
327                 memcpy(&entry->user_req, m, sizeof(*m));
328                 entry->state = REQ_STATE_ACTIVE;
329
330                 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
331         }
332         pthread_mutex_unlock(&mp_request_list.lock);
333         return 0;
334 fail:
335         pthread_mutex_unlock(&mp_request_list.lock);
336         free(entry);
337         return -1;
338 }
339
340 /* callback for asynchronous sync requests for primary. this will either do a
341  * sendmsg with results, or trigger rollback request.
342  */
343 static int
344 handle_sync_response(const struct rte_mp_msg *request,
345                 const struct rte_mp_reply *reply)
346 {
347         enum malloc_req_result result;
348         struct mp_request *entry;
349         const struct malloc_mp_req *mpreq =
350                         (const struct malloc_mp_req *)request->param;
351         int i;
352
353         /* lock the request */
354         pthread_mutex_lock(&mp_request_list.lock);
355
356         entry = find_request_by_id(mpreq->id);
357         if (entry == NULL) {
358                 RTE_LOG(ERR, EAL, "Wrong request ID\n");
359                 goto fail;
360         }
361
362         result = REQ_RESULT_SUCCESS;
363
364         if (reply->nb_received != reply->nb_sent)
365                 result = REQ_RESULT_FAIL;
366
367         for (i = 0; i < reply->nb_received; i++) {
368                 struct malloc_mp_req *resp =
369                                 (struct malloc_mp_req *)reply->msgs[i].param;
370
371                 if (resp->t != REQ_TYPE_SYNC) {
372                         RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
373                         result = REQ_RESULT_FAIL;
374                         break;
375                 }
376                 if (resp->id != entry->user_req.id) {
377                         RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
378                         result = REQ_RESULT_FAIL;
379                         break;
380                 }
381                 if (resp->result == REQ_RESULT_FAIL) {
382                         result = REQ_RESULT_FAIL;
383                         break;
384                 }
385         }
386
387         if (entry->user_req.t == REQ_TYPE_FREE) {
388                 struct rte_mp_msg msg;
389                 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
390
391                 memset(&msg, 0, sizeof(msg));
392
393                 /* this is a free request, just sendmsg result */
394                 resp->t = REQ_TYPE_FREE;
395                 resp->result = result;
396                 resp->id = entry->user_req.id;
397                 msg.num_fds = 0;
398                 msg.len_param = sizeof(*resp);
399                 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
400
401                 if (rte_mp_sendmsg(&msg))
402                         RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
403
404                 TAILQ_REMOVE(&mp_request_list.list, entry, next);
405                 free(entry);
406         } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
407                         result == REQ_RESULT_SUCCESS) {
408                 struct malloc_heap *heap = entry->alloc_state.heap;
409                 struct rte_mp_msg msg;
410                 struct malloc_mp_req *resp =
411                                 (struct malloc_mp_req *)msg.param;
412
413                 memset(&msg, 0, sizeof(msg));
414
415                 heap->total_size += entry->alloc_state.map_len;
416
417                 /* result is success, so just notify secondary about this */
418                 resp->t = REQ_TYPE_ALLOC;
419                 resp->result = result;
420                 resp->id = entry->user_req.id;
421                 msg.num_fds = 0;
422                 msg.len_param = sizeof(*resp);
423                 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
424
425                 if (rte_mp_sendmsg(&msg))
426                         RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
427
428                 TAILQ_REMOVE(&mp_request_list.list, entry, next);
429                 free(entry->alloc_state.ms);
430                 free(entry);
431         } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
432                         result == REQ_RESULT_FAIL) {
433                 struct rte_mp_msg rb_msg;
434                 struct malloc_mp_req *rb =
435                                 (struct malloc_mp_req *)rb_msg.param;
436                 struct timespec ts;
437                 struct primary_alloc_req_state *state =
438                                 &entry->alloc_state;
439                 int ret;
440
441                 memset(&rb_msg, 0, sizeof(rb_msg));
442
443                 /* we've failed to sync, so do a rollback */
444                 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
445                                 state->map_addr, state->map_len);
446
447                 rollback_expand_heap(state->ms, state->ms_len, state->elem,
448                                 state->map_addr, state->map_len);
449
450                 /* send rollback request */
451                 rb_msg.num_fds = 0;
452                 rb_msg.len_param = sizeof(*rb);
453                 strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));
454
455                 ts.tv_nsec = 0;
456                 ts.tv_sec = MP_TIMEOUT_S;
457
458                 /* sync requests carry no data */
459                 rb->t = REQ_TYPE_SYNC;
460                 rb->id = entry->user_req.id;
461
462                 /* there may be stray timeout still waiting */
463                 do {
464                         ret = rte_mp_request_async(&rb_msg, &ts,
465                                         handle_rollback_response);
466                 } while (ret != 0 && rte_errno == EEXIST);
467                 if (ret != 0) {
468                         RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
469
470                         /* we couldn't send rollback request, but that's OK -
471                          * secondary will time out, and memory has been removed
472                          * from heap anyway.
473                          */
474                         TAILQ_REMOVE(&mp_request_list.list, entry, next);
475                         free(state->ms);
476                         free(entry);
477                         goto fail;
478                 }
479         } else {
480                 RTE_LOG(ERR, EAL, " to sync request of unknown type\n");
481                 goto fail;
482         }
483
484         pthread_mutex_unlock(&mp_request_list.lock);
485         return 0;
486 fail:
487         pthread_mutex_unlock(&mp_request_list.lock);
488         return -1;
489 }
490
491 static int
492 handle_rollback_response(const struct rte_mp_msg *request,
493                 const struct rte_mp_reply *reply __rte_unused)
494 {
495         struct rte_mp_msg msg;
496         struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
497         const struct malloc_mp_req *mpreq =
498                         (const struct malloc_mp_req *)request->param;
499         struct mp_request *entry;
500
501         /* lock the request */
502         pthread_mutex_lock(&mp_request_list.lock);
503
504         memset(&msg, 0, sizeof(msg));
505
506         entry = find_request_by_id(mpreq->id);
507         if (entry == NULL) {
508                 RTE_LOG(ERR, EAL, "Wrong request ID\n");
509                 goto fail;
510         }
511
512         if (entry->user_req.t != REQ_TYPE_ALLOC) {
513                 RTE_LOG(ERR, EAL, "Unexpected active request\n");
514                 goto fail;
515         }
516
517         /* we don't care if rollback succeeded, request still failed */
518         resp->t = REQ_TYPE_ALLOC;
519         resp->result = REQ_RESULT_FAIL;
520         resp->id = mpreq->id;
521         msg.num_fds = 0;
522         msg.len_param = sizeof(*resp);
523         strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
524
525         if (rte_mp_sendmsg(&msg))
526                 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
527
528         /* clean up */
529         TAILQ_REMOVE(&mp_request_list.list, entry, next);
530         free(entry->alloc_state.ms);
531         free(entry);
532
533         pthread_mutex_unlock(&mp_request_list.lock);
534         return 0;
535 fail:
536         pthread_mutex_unlock(&mp_request_list.lock);
537         return -1;
538 }
539
540 /* final stage of the request from secondary */
541 static int
542 handle_response(const struct rte_mp_msg *msg, const void *peer  __rte_unused)
543 {
544         const struct malloc_mp_req *m =
545                         (const struct malloc_mp_req *)msg->param;
546         struct mp_request *entry;
547
548         pthread_mutex_lock(&mp_request_list.lock);
549
550         entry = find_request_by_id(m->id);
551         if (entry != NULL) {
552                 /* update request status */
553                 entry->user_req.result = m->result;
554
555                 entry->state = REQ_STATE_COMPLETE;
556
557                 /* trigger thread wakeup */
558                 pthread_cond_signal(&entry->cond);
559         }
560
561         pthread_mutex_unlock(&mp_request_list.lock);
562
563         return 0;
564 }
565
566 /* synchronously request memory map sync, this is only called whenever primary
567  * process initiates the allocation.
568  */
569 int
570 request_sync(void)
571 {
572         struct rte_mp_msg msg;
573         struct rte_mp_reply reply;
574         struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
575         struct timespec ts;
576         int i, ret;
577
578         memset(&msg, 0, sizeof(msg));
579         memset(&reply, 0, sizeof(reply));
580
581         /* no need to create tailq entries as this is entirely synchronous */
582
583         msg.num_fds = 0;
584         msg.len_param = sizeof(*req);
585         strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));
586
587         /* sync request carries no data */
588         req->t = REQ_TYPE_SYNC;
589         req->id = get_unique_id();
590
591         ts.tv_nsec = 0;
592         ts.tv_sec = MP_TIMEOUT_S;
593
594         /* there may be stray timeout still waiting */
595         do {
596                 ret = rte_mp_request_sync(&msg, &reply, &ts);
597         } while (ret != 0 && rte_errno == EEXIST);
598         if (ret != 0) {
599                 RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
600                 ret = -1;
601                 goto out;
602         }
603
604         if (reply.nb_received != reply.nb_sent) {
605                 RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
606                 ret = -1;
607                 goto out;
608         }
609
610         for (i = 0; i < reply.nb_received; i++) {
611                 struct malloc_mp_req *resp =
612                                 (struct malloc_mp_req *)reply.msgs[i].param;
613                 if (resp->t != REQ_TYPE_SYNC) {
614                         RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
615                         ret = -1;
616                         goto out;
617                 }
618                 if (resp->id != req->id) {
619                         RTE_LOG(ERR, EAL, "Wrong request ID\n");
620                         ret = -1;
621                         goto out;
622                 }
623                 if (resp->result != REQ_RESULT_SUCCESS) {
624                         RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
625                         ret = -1;
626                         goto out;
627                 }
628         }
629
630         ret = 0;
631 out:
632         free(reply.msgs);
633         return ret;
634 }
635
636 /* this is a synchronous wrapper around a bunch of asynchronous requests to
637  * primary process. this will initiate a request and wait until responses come.
638  */
639 int
640 request_to_primary(struct malloc_mp_req *user_req)
641 {
642         struct rte_mp_msg msg;
643         struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
644         struct mp_request *entry;
645         struct timespec ts;
646         struct timeval now;
647         int ret;
648
649         memset(&msg, 0, sizeof(msg));
650         memset(&ts, 0, sizeof(ts));
651
652         pthread_mutex_lock(&mp_request_list.lock);
653
654         entry = malloc(sizeof(*entry));
655         if (entry == NULL) {
656                 RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
657                 goto fail;
658         }
659
660         memset(entry, 0, sizeof(*entry));
661
662         if (gettimeofday(&now, NULL) < 0) {
663                 RTE_LOG(ERR, EAL, "Cannot get current time\n");
664                 goto fail;
665         }
666
667         ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
668         ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
669                         (now.tv_usec * 1000) / 1000000000;
670
671         /* initialize the request */
672         pthread_cond_init(&entry->cond, NULL);
673
674         msg.num_fds = 0;
675         msg.len_param = sizeof(*msg_req);
676         strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));
677
678         /* (attempt to) get a unique id */
679         user_req->id = get_unique_id();
680
681         /* copy contents of user request into the message */
682         memcpy(msg_req, user_req, sizeof(*msg_req));
683
684         if (rte_mp_sendmsg(&msg)) {
685                 RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
686                 goto fail;
687         }
688
689         /* copy contents of user request into active request */
690         memcpy(&entry->user_req, user_req, sizeof(*user_req));
691
692         /* mark request as in progress */
693         entry->state = REQ_STATE_ACTIVE;
694
695         TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
696
697         /* finally, wait on timeout */
698         do {
699                 ret = pthread_cond_timedwait(&entry->cond,
700                                 &mp_request_list.lock, &ts);
701         } while (ret != 0 && ret != ETIMEDOUT);
702
703         if (entry->state != REQ_STATE_COMPLETE) {
704                 RTE_LOG(ERR, EAL, "Request timed out\n");
705                 ret = -1;
706         } else {
707                 ret = 0;
708                 user_req->result = entry->user_req.result;
709         }
710         TAILQ_REMOVE(&mp_request_list.list, entry, next);
711         free(entry);
712
713         pthread_mutex_unlock(&mp_request_list.lock);
714         return ret;
715 fail:
716         pthread_mutex_unlock(&mp_request_list.lock);
717         free(entry);
718         return -1;
719 }
720
721 int
722 register_mp_requests(void)
723 {
724         if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
725                 /* it's OK for primary to not support IPC */
726                 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
727                                 rte_errno != ENOTSUP) {
728                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
729                                 MP_ACTION_REQUEST);
730                         return -1;
731                 }
732         } else {
733                 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
734                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
735                                 MP_ACTION_SYNC);
736                         return -1;
737                 }
738                 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
739                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
740                                 MP_ACTION_SYNC);
741                         return -1;
742                 }
743                 if (rte_mp_action_register(MP_ACTION_RESPONSE,
744                                 handle_response)) {
745                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
746                                 MP_ACTION_RESPONSE);
747                         return -1;
748                 }
749         }
750         return 0;
751 }