malloc: support multiprocess memory hotplug
[dpdk.git] / lib / librte_eal / common / malloc_mp.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation
3  */
4
5 #include <string.h>
6 #include <sys/time.h>
7
8 #include <rte_alarm.h>
9 #include <rte_errno.h>
10
11 #include "eal_memalloc.h"
12
13 #include "malloc_elem.h"
14 #include "malloc_mp.h"
15
16 #define MP_ACTION_SYNC "mp_malloc_sync"
17 /**< request sent by primary process to notify of changes in memory map */
18 #define MP_ACTION_ROLLBACK "mp_malloc_rollback"
19 /**< request sent by primary process to notify of changes in memory map. this is
20  * essentially a regular sync request, but we cannot send sync requests while
21  * another one is in progress, and we might have to - therefore, we do this as
22  * a separate callback.
23  */
24 #define MP_ACTION_REQUEST "mp_malloc_request"
25 /**< request sent by secondary process to ask for allocation/deallocation */
26 #define MP_ACTION_RESPONSE "mp_malloc_response"
27 /**< response sent to secondary process to indicate result of request */
28
29 /* forward declarations */
30 static int
31 handle_sync_response(const struct rte_mp_msg *request,
32                 const struct rte_mp_reply *reply);
33 static int
34 handle_rollback_response(const struct rte_mp_msg *request,
35                 const struct rte_mp_reply *reply);
36
37 #define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
38
39 /* when we're allocating, we need to store some state to ensure that we can
40  * roll back later
41  */
42 struct primary_alloc_req_state {
43         struct malloc_heap *heap;
44         struct rte_memseg **ms;
45         int ms_len;
46         struct malloc_elem *elem;
47         void *map_addr;
48         size_t map_len;
49 };
50
51 enum req_state {
52         REQ_STATE_INACTIVE = 0,
53         REQ_STATE_ACTIVE,
54         REQ_STATE_COMPLETE
55 };
56
57 struct mp_request {
58         TAILQ_ENTRY(mp_request) next;
59         struct malloc_mp_req user_req; /**< contents of request */
60         pthread_cond_t cond; /**< variable we use to time out on this request */
61         enum req_state state; /**< indicate status of this request */
62         struct primary_alloc_req_state alloc_state;
63 };
64
65 /*
66  * We could've used just a single request, but it may be possible for
67  * secondaries to timeout earlier than the primary, and send a new request while
68  * primary is still expecting replies to the old one. Therefore, each new
69  * request will get assigned a new ID, which is how we will distinguish between
70  * expected and unexpected messages.
71  */
72 TAILQ_HEAD(mp_request_list, mp_request);
73 static struct {
74         struct mp_request_list list;
75         pthread_mutex_t lock;
76 } mp_request_list = {
77         .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
78         .lock = PTHREAD_MUTEX_INITIALIZER
79 };
80
81 /**
82  * General workflow is the following:
83  *
84  * Allocation:
85  * S: send request to primary
86  * P: attempt to allocate memory
87  *    if failed, sendmsg failure
88  *    if success, send sync request
89  * S: if received msg of failure, quit
90  *    if received sync request, synchronize memory map and reply with result
91  * P: if received sync request result
92  *    if success, sendmsg success
93  *    if failure, roll back allocation and send a rollback request
94  * S: if received msg of success, quit
95  *    if received rollback request, synchronize memory map and reply with result
96  * P: if received sync request result
97  *    sendmsg sync request result
98  * S: if received msg, quit
99  *
100  * Aside from timeouts, there are three points where we can quit:
101  *  - if allocation failed straight away
102  *  - if allocation and sync request succeeded
103  *  - if allocation succeeded, sync request failed, allocation rolled back and
104  *    rollback request received (irrespective of whether it succeeded or failed)
105  *
106  * Deallocation:
107  * S: send request to primary
108  * P: attempt to deallocate memory
109  *    if failed, sendmsg failure
110  *    if success, send sync request
111  * S: if received msg of failure, quit
112  *    if received sync request, synchronize memory map and reply with result
113  * P: if received sync request result
114  *    sendmsg sync request result
115  * S: if received msg, quit
116  *
117  * There is no "rollback" from deallocation, as it's safe to have some memory
118  * mapped in some processes - it's absent from the heap, so it won't get used.
119  */
120
121 static struct mp_request *
122 find_request_by_id(uint64_t id)
123 {
124         struct mp_request *req;
125         TAILQ_FOREACH(req, &mp_request_list.list, next) {
126                 if (req->user_req.id == id)
127                         break;
128         }
129         return req;
130 }
131
132 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
133 static uint64_t
134 get_unique_id(void)
135 {
136         uint64_t id;
137         do {
138                 id = rte_rand();
139         } while (find_request_by_id(id) != NULL);
140         return id;
141 }
142
143 /* secondary will respond to sync requests thusly */
144 static int
145 handle_sync(const struct rte_mp_msg *msg, const void *peer)
146 {
147         struct rte_mp_msg reply;
148         const struct malloc_mp_req *req =
149                         (const struct malloc_mp_req *)msg->param;
150         struct malloc_mp_req *resp =
151                         (struct malloc_mp_req *)reply.param;
152         int ret;
153
154         if (req->t != REQ_TYPE_SYNC) {
155                 RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
156                 return -1;
157         }
158
159         memset(&reply, 0, sizeof(reply));
160
161         reply.num_fds = 0;
162         snprintf(reply.name, sizeof(reply.name), "%s", msg->name);
163         reply.len_param = sizeof(*resp);
164
165         ret = eal_memalloc_sync_with_primary();
166
167         resp->t = REQ_TYPE_SYNC;
168         resp->id = req->id;
169         resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
170
171         rte_mp_reply(&reply, peer);
172
173         return 0;
174 }
175
176 static int
177 handle_alloc_request(const struct malloc_mp_req *m,
178                 struct mp_request *req)
179 {
180         const struct malloc_req_alloc *ar = &m->alloc_req;
181         struct malloc_heap *heap;
182         struct malloc_elem *elem;
183         struct rte_memseg **ms;
184         size_t alloc_sz;
185         int n_segs;
186         void *map_addr;
187
188         alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
189                         MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
190         n_segs = alloc_sz / ar->page_sz;
191
192         heap = ar->heap;
193
194         /* we can't know in advance how many pages we'll need, so we malloc */
195         ms = malloc(sizeof(*ms) * n_segs);
196
197         memset(ms, 0, sizeof(*ms) * n_segs);
198
199         if (ms == NULL) {
200                 RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
201                 goto fail;
202         }
203
204         elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
205                         ar->flags, ar->align, ar->bound, ar->contig, ms,
206                         n_segs);
207
208         if (elem == NULL)
209                 goto fail;
210
211         map_addr = ms[0]->addr;
212
213         /* we have succeeded in allocating memory, but we still need to sync
214          * with other processes. however, since DPDK IPC is single-threaded, we
215          * send an asynchronous request and exit this callback.
216          */
217
218         req->alloc_state.ms = ms;
219         req->alloc_state.ms_len = n_segs;
220         req->alloc_state.map_addr = map_addr;
221         req->alloc_state.map_len = alloc_sz;
222         req->alloc_state.elem = elem;
223         req->alloc_state.heap = heap;
224
225         return 0;
226 fail:
227         free(ms);
228         return -1;
229 }
230
231 /* first stage of primary handling requests from secondary */
232 static int
233 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
234 {
235         const struct malloc_mp_req *m =
236                         (const struct malloc_mp_req *)msg->param;
237         struct mp_request *entry;
238         int ret;
239
240         /* lock access to request */
241         pthread_mutex_lock(&mp_request_list.lock);
242
243         /* make sure it's not a dupe */
244         entry = find_request_by_id(m->id);
245         if (entry != NULL) {
246                 RTE_LOG(ERR, EAL, "Duplicate request id\n");
247                 goto fail;
248         }
249
250         entry = malloc(sizeof(*entry));
251         if (entry == NULL) {
252                 RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
253                 goto fail;
254         }
255
256         /* erase all data */
257         memset(entry, 0, sizeof(*entry));
258
259         if (m->t == REQ_TYPE_ALLOC) {
260                 ret = handle_alloc_request(m, entry);
261         } else if (m->t == REQ_TYPE_FREE) {
262                 ret = malloc_heap_free_pages(m->free_req.addr,
263                                 m->free_req.len);
264         } else {
265                 RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
266                 goto fail;
267         }
268
269         if (ret != 0) {
270                 struct rte_mp_msg resp_msg;
271                 struct malloc_mp_req *resp =
272                                 (struct malloc_mp_req *)resp_msg.param;
273
274                 /* send failure message straight away */
275                 resp_msg.num_fds = 0;
276                 resp_msg.len_param = sizeof(*resp);
277                 snprintf(resp_msg.name, sizeof(resp_msg.name), "%s",
278                                 MP_ACTION_RESPONSE);
279
280                 resp->t = m->t;
281                 resp->result = REQ_RESULT_FAIL;
282                 resp->id = m->id;
283
284                 if (rte_mp_sendmsg(&resp_msg)) {
285                         RTE_LOG(ERR, EAL, "Couldn't send response\n");
286                         goto fail;
287                 }
288                 /* we did not modify the request */
289                 free(entry);
290         } else {
291                 struct rte_mp_msg sr_msg;
292                 struct malloc_mp_req *sr =
293                                 (struct malloc_mp_req *)sr_msg.param;
294                 struct timespec ts;
295
296                 memset(&sr_msg, 0, sizeof(sr_msg));
297
298                 /* we can do something, so send sync request asynchronously */
299                 sr_msg.num_fds = 0;
300                 sr_msg.len_param = sizeof(*sr);
301                 snprintf(sr_msg.name, sizeof(sr_msg.name), "%s",
302                                 MP_ACTION_SYNC);
303
304                 ts.tv_nsec = 0;
305                 ts.tv_sec = MP_TIMEOUT_S;
306
307                 /* sync requests carry no data */
308                 sr->t = REQ_TYPE_SYNC;
309                 sr->id = m->id;
310
311                 /* there may be stray timeout still waiting */
312                 do {
313                         ret = rte_mp_request_async(&sr_msg, &ts,
314                                         handle_sync_response);
315                 } while (ret != 0 && rte_errno == EEXIST);
316                 if (ret != 0) {
317                         RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
318                         if (m->t == REQ_TYPE_ALLOC)
319                                 free(entry->alloc_state.ms);
320                         goto fail;
321                 }
322
323                 /* mark request as in progress */
324                 memcpy(&entry->user_req, m, sizeof(*m));
325                 entry->state = REQ_STATE_ACTIVE;
326
327                 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
328         }
329         pthread_mutex_unlock(&mp_request_list.lock);
330         return 0;
331 fail:
332         pthread_mutex_unlock(&mp_request_list.lock);
333         free(entry);
334         return -1;
335 }
336
337 /* callback for asynchronous sync requests for primary. this will either do a
338  * sendmsg with results, or trigger rollback request.
339  */
340 static int
341 handle_sync_response(const struct rte_mp_msg *request,
342                 const struct rte_mp_reply *reply)
343 {
344         enum malloc_req_result result;
345         struct mp_request *entry;
346         const struct malloc_mp_req *mpreq =
347                         (const struct malloc_mp_req *)request->param;
348         int i;
349
350         /* lock the request */
351         pthread_mutex_lock(&mp_request_list.lock);
352
353         entry = find_request_by_id(mpreq->id);
354         if (entry == NULL) {
355                 RTE_LOG(ERR, EAL, "Wrong request ID\n");
356                 goto fail;
357         }
358
359         result = REQ_RESULT_SUCCESS;
360
361         if (reply->nb_received != reply->nb_sent)
362                 result = REQ_RESULT_FAIL;
363
364         for (i = 0; i < reply->nb_received; i++) {
365                 struct malloc_mp_req *resp =
366                                 (struct malloc_mp_req *)reply->msgs[i].param;
367
368                 if (resp->t != REQ_TYPE_SYNC) {
369                         RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
370                         result = REQ_RESULT_FAIL;
371                         break;
372                 }
373                 if (resp->id != entry->user_req.id) {
374                         RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
375                         result = REQ_RESULT_FAIL;
376                         break;
377                 }
378                 if (resp->result == REQ_RESULT_FAIL) {
379                         result = REQ_RESULT_FAIL;
380                         break;
381                 }
382         }
383
384         if (entry->user_req.t == REQ_TYPE_FREE) {
385                 struct rte_mp_msg msg;
386                 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
387
388                 memset(&msg, 0, sizeof(msg));
389
390                 /* this is a free request, just sendmsg result */
391                 resp->t = REQ_TYPE_FREE;
392                 resp->result = result;
393                 resp->id = entry->user_req.id;
394                 msg.num_fds = 0;
395                 msg.len_param = sizeof(*resp);
396                 snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_RESPONSE);
397
398                 if (rte_mp_sendmsg(&msg))
399                         RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
400
401                 TAILQ_REMOVE(&mp_request_list.list, entry, next);
402                 free(entry);
403         } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
404                         result == REQ_RESULT_SUCCESS) {
405                 struct malloc_heap *heap = entry->alloc_state.heap;
406                 struct rte_mp_msg msg;
407                 struct malloc_mp_req *resp =
408                                 (struct malloc_mp_req *)msg.param;
409
410                 memset(&msg, 0, sizeof(msg));
411
412                 heap->total_size += entry->alloc_state.map_len;
413
414                 /* result is success, so just notify secondary about this */
415                 resp->t = REQ_TYPE_ALLOC;
416                 resp->result = result;
417                 resp->id = entry->user_req.id;
418                 msg.num_fds = 0;
419                 msg.len_param = sizeof(*resp);
420                 snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_RESPONSE);
421
422                 if (rte_mp_sendmsg(&msg))
423                         RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
424
425                 TAILQ_REMOVE(&mp_request_list.list, entry, next);
426                 free(entry->alloc_state.ms);
427                 free(entry);
428         } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
429                         result == REQ_RESULT_FAIL) {
430                 struct rte_mp_msg rb_msg;
431                 struct malloc_mp_req *rb =
432                                 (struct malloc_mp_req *)rb_msg.param;
433                 struct timespec ts;
434                 struct primary_alloc_req_state *state =
435                                 &entry->alloc_state;
436                 int ret;
437
438                 memset(&rb_msg, 0, sizeof(rb_msg));
439
440                 /* we've failed to sync, so do a rollback */
441                 rollback_expand_heap(state->ms, state->ms_len, state->elem,
442                                 state->map_addr, state->map_len);
443
444                 /* send rollback request */
445                 rb_msg.num_fds = 0;
446                 rb_msg.len_param = sizeof(*rb);
447                 snprintf(rb_msg.name, sizeof(rb_msg.name), "%s",
448                                 MP_ACTION_ROLLBACK);
449
450                 ts.tv_nsec = 0;
451                 ts.tv_sec = MP_TIMEOUT_S;
452
453                 /* sync requests carry no data */
454                 rb->t = REQ_TYPE_SYNC;
455                 rb->id = entry->user_req.id;
456
457                 /* there may be stray timeout still waiting */
458                 do {
459                         ret = rte_mp_request_async(&rb_msg, &ts,
460                                         handle_rollback_response);
461                 } while (ret != 0 && rte_errno == EEXIST);
462                 if (ret != 0) {
463                         RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
464
465                         /* we couldn't send rollback request, but that's OK -
466                          * secondary will time out, and memory has been removed
467                          * from heap anyway.
468                          */
469                         TAILQ_REMOVE(&mp_request_list.list, entry, next);
470                         free(state->ms);
471                         free(entry);
472                         goto fail;
473                 }
474         } else {
475                 RTE_LOG(ERR, EAL, " to sync request of unknown type\n");
476                 goto fail;
477         }
478
479         pthread_mutex_unlock(&mp_request_list.lock);
480         return 0;
481 fail:
482         pthread_mutex_unlock(&mp_request_list.lock);
483         return -1;
484 }
485
486 static int
487 handle_rollback_response(const struct rte_mp_msg *request,
488                 const struct rte_mp_reply *reply __rte_unused)
489 {
490         struct rte_mp_msg msg;
491         struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
492         const struct malloc_mp_req *mpreq =
493                         (const struct malloc_mp_req *)request->param;
494         struct mp_request *entry;
495
496         /* lock the request */
497         pthread_mutex_lock(&mp_request_list.lock);
498
499         memset(&msg, 0, sizeof(0));
500
501         entry = find_request_by_id(mpreq->id);
502         if (entry == NULL) {
503                 RTE_LOG(ERR, EAL, "Wrong request ID\n");
504                 goto fail;
505         }
506
507         if (entry->user_req.t != REQ_TYPE_ALLOC) {
508                 RTE_LOG(ERR, EAL, "Unexpected active request\n");
509                 goto fail;
510         }
511
512         /* we don't care if rollback succeeded, request still failed */
513         resp->t = REQ_TYPE_ALLOC;
514         resp->result = REQ_RESULT_FAIL;
515         resp->id = mpreq->id;
516         msg.num_fds = 0;
517         msg.len_param = sizeof(*resp);
518         snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_RESPONSE);
519
520         if (rte_mp_sendmsg(&msg))
521                 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
522
523         /* clean up */
524         TAILQ_REMOVE(&mp_request_list.list, entry, next);
525         free(entry->alloc_state.ms);
526         free(entry);
527
528         pthread_mutex_unlock(&mp_request_list.lock);
529         return 0;
530 fail:
531         pthread_mutex_unlock(&mp_request_list.lock);
532         return -1;
533 }
534
535 /* final stage of the request from secondary */
536 static int
537 handle_response(const struct rte_mp_msg *msg, const void *peer  __rte_unused)
538 {
539         const struct malloc_mp_req *m =
540                         (const struct malloc_mp_req *)msg->param;
541         struct mp_request *entry;
542
543         pthread_mutex_lock(&mp_request_list.lock);
544
545         entry = find_request_by_id(m->id);
546         if (entry != NULL) {
547                 /* update request status */
548                 entry->user_req.result = m->result;
549
550                 entry->state = REQ_STATE_COMPLETE;
551
552                 /* trigger thread wakeup */
553                 pthread_cond_signal(&entry->cond);
554         }
555
556         pthread_mutex_unlock(&mp_request_list.lock);
557
558         return 0;
559 }
560
561 /* synchronously request memory map sync, this is only called whenever primary
562  * process initiates the allocation.
563  */
564 int
565 request_sync(void)
566 {
567         struct rte_mp_msg msg;
568         struct rte_mp_reply reply;
569         struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
570         struct timespec ts;
571         int i, ret;
572
573         memset(&msg, 0, sizeof(msg));
574         memset(&reply, 0, sizeof(reply));
575
576         /* no need to create tailq entries as this is entirely synchronous */
577
578         msg.num_fds = 0;
579         msg.len_param = sizeof(*req);
580         snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_SYNC);
581
582         /* sync request carries no data */
583         req->t = REQ_TYPE_SYNC;
584         req->id = get_unique_id();
585
586         ts.tv_nsec = 0;
587         ts.tv_sec = MP_TIMEOUT_S;
588
589         /* there may be stray timeout still waiting */
590         do {
591                 ret = rte_mp_request_sync(&msg, &reply, &ts);
592         } while (ret != 0 && rte_errno == EEXIST);
593         if (ret != 0) {
594                 RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
595                 ret = -1;
596                 goto out;
597         }
598
599         if (reply.nb_received != reply.nb_sent) {
600                 RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
601                 ret = -1;
602                 goto out;
603         }
604
605         for (i = 0; i < reply.nb_received; i++) {
606                 struct malloc_mp_req *resp =
607                                 (struct malloc_mp_req *)reply.msgs[i].param;
608                 if (resp->t != REQ_TYPE_SYNC) {
609                         RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
610                         ret = -1;
611                         goto out;
612                 }
613                 if (resp->id != req->id) {
614                         RTE_LOG(ERR, EAL, "Wrong request ID\n");
615                         ret = -1;
616                         goto out;
617                 }
618                 if (resp->result != REQ_RESULT_SUCCESS) {
619                         RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
620                         ret = -1;
621                         goto out;
622                 }
623         }
624
625         ret = 0;
626 out:
627         free(reply.msgs);
628         return ret;
629 }
630
631 /* this is a synchronous wrapper around a bunch of asynchronous requests to
632  * primary process. this will initiate a request and wait until responses come.
633  */
634 int
635 request_to_primary(struct malloc_mp_req *user_req)
636 {
637         struct rte_mp_msg msg;
638         struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
639         struct mp_request *entry;
640         struct timespec ts;
641         struct timeval now;
642         int ret;
643
644         memset(&msg, 0, sizeof(msg));
645         memset(&ts, 0, sizeof(ts));
646
647         pthread_mutex_lock(&mp_request_list.lock);
648
649         entry = malloc(sizeof(*entry));
650         if (entry == NULL) {
651                 RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
652                 goto fail;
653         }
654
655         memset(entry, 0, sizeof(*entry));
656
657         if (gettimeofday(&now, NULL) < 0) {
658                 RTE_LOG(ERR, EAL, "Cannot get current time\n");
659                 goto fail;
660         }
661
662         ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
663         ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
664                         (now.tv_usec * 1000) / 1000000000;
665
666         /* initialize the request */
667         pthread_cond_init(&entry->cond, NULL);
668
669         msg.num_fds = 0;
670         msg.len_param = sizeof(*msg_req);
671         snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_REQUEST);
672
673         /* (attempt to) get a unique id */
674         user_req->id = get_unique_id();
675
676         /* copy contents of user request into the message */
677         memcpy(msg_req, user_req, sizeof(*msg_req));
678
679         if (rte_mp_sendmsg(&msg)) {
680                 RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
681                 goto fail;
682         }
683
684         /* copy contents of user request into active request */
685         memcpy(&entry->user_req, user_req, sizeof(*user_req));
686
687         /* mark request as in progress */
688         entry->state = REQ_STATE_ACTIVE;
689
690         TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
691
692         /* finally, wait on timeout */
693         do {
694                 ret = pthread_cond_timedwait(&entry->cond,
695                                 &mp_request_list.lock, &ts);
696         } while (ret != 0 && ret != ETIMEDOUT);
697
698         if (entry->state != REQ_STATE_COMPLETE) {
699                 RTE_LOG(ERR, EAL, "Request timed out\n");
700                 ret = -1;
701         } else {
702                 ret = 0;
703                 user_req->result = entry->user_req.result;
704         }
705         TAILQ_REMOVE(&mp_request_list.list, entry, next);
706         free(entry);
707
708         pthread_mutex_unlock(&mp_request_list.lock);
709         return ret;
710 fail:
711         pthread_mutex_unlock(&mp_request_list.lock);
712         free(entry);
713         return -1;
714 }
715
716 int
717 register_mp_requests(void)
718 {
719         if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
720                 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request)) {
721                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
722                                 MP_ACTION_REQUEST);
723                         return -1;
724                 }
725         } else {
726                 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
727                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
728                                 MP_ACTION_SYNC);
729                         return -1;
730                 }
731                 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
732                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
733                                 MP_ACTION_SYNC);
734                         return -1;
735                 }
736                 if (rte_mp_action_register(MP_ACTION_RESPONSE,
737                                 handle_response)) {
738                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
739                                 MP_ACTION_RESPONSE);
740                         return -1;
741                 }
742         }
743         return 0;
744 }