doc: add Meson coding style to contributors guide
[dpdk.git] / lib / librte_eal / common / malloc_mp.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation
3  */
4
5 #include <string.h>
6 #include <sys/time.h>
7
8 #include <rte_alarm.h>
9 #include <rte_errno.h>
10 #include <rte_string_fns.h>
11
12 #include "eal_memalloc.h"
13 #include "eal_memcfg.h"
14 #include "eal_private.h"
15
16 #include "malloc_elem.h"
17 #include "malloc_mp.h"
18
19 #define MP_ACTION_SYNC "mp_malloc_sync"
20 /**< request sent by primary process to notify of changes in memory map */
21 #define MP_ACTION_ROLLBACK "mp_malloc_rollback"
22 /**< request sent by primary process to notify of changes in memory map. this is
23  * essentially a regular sync request, but we cannot send sync requests while
24  * another one is in progress, and we might have to - therefore, we do this as
25  * a separate callback.
26  */
27 #define MP_ACTION_REQUEST "mp_malloc_request"
28 /**< request sent by secondary process to ask for allocation/deallocation */
29 #define MP_ACTION_RESPONSE "mp_malloc_response"
30 /**< response sent to secondary process to indicate result of request */
31
32 /* forward declarations */
33 static int
34 handle_sync_response(const struct rte_mp_msg *request,
35                 const struct rte_mp_reply *reply);
36 static int
37 handle_rollback_response(const struct rte_mp_msg *request,
38                 const struct rte_mp_reply *reply);
39
40 #define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
41
42 /* when we're allocating, we need to store some state to ensure that we can
43  * roll back later
44  */
45 struct primary_alloc_req_state {
46         struct malloc_heap *heap;
47         struct rte_memseg **ms;
48         int ms_len;
49         struct malloc_elem *elem;
50         void *map_addr;
51         size_t map_len;
52 };
53
54 enum req_state {
55         REQ_STATE_INACTIVE = 0,
56         REQ_STATE_ACTIVE,
57         REQ_STATE_COMPLETE
58 };
59
60 struct mp_request {
61         TAILQ_ENTRY(mp_request) next;
62         struct malloc_mp_req user_req; /**< contents of request */
63         pthread_cond_t cond; /**< variable we use to time out on this request */
64         enum req_state state; /**< indicate status of this request */
65         struct primary_alloc_req_state alloc_state;
66 };
67
68 /*
69  * We could've used just a single request, but it may be possible for
70  * secondaries to timeout earlier than the primary, and send a new request while
71  * primary is still expecting replies to the old one. Therefore, each new
72  * request will get assigned a new ID, which is how we will distinguish between
73  * expected and unexpected messages.
74  */
75 TAILQ_HEAD(mp_request_list, mp_request);
76 static struct {
77         struct mp_request_list list;
78         pthread_mutex_t lock;
79 } mp_request_list = {
80         .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
81         .lock = PTHREAD_MUTEX_INITIALIZER
82 };
83
84 /**
85  * General workflow is the following:
86  *
87  * Allocation:
88  * S: send request to primary
89  * P: attempt to allocate memory
90  *    if failed, sendmsg failure
91  *    if success, send sync request
92  * S: if received msg of failure, quit
93  *    if received sync request, synchronize memory map and reply with result
94  * P: if received sync request result
95  *    if success, sendmsg success
96  *    if failure, roll back allocation and send a rollback request
97  * S: if received msg of success, quit
98  *    if received rollback request, synchronize memory map and reply with result
99  * P: if received sync request result
100  *    sendmsg sync request result
101  * S: if received msg, quit
102  *
103  * Aside from timeouts, there are three points where we can quit:
104  *  - if allocation failed straight away
105  *  - if allocation and sync request succeeded
106  *  - if allocation succeeded, sync request failed, allocation rolled back and
107  *    rollback request received (irrespective of whether it succeeded or failed)
108  *
109  * Deallocation:
110  * S: send request to primary
111  * P: attempt to deallocate memory
112  *    if failed, sendmsg failure
113  *    if success, send sync request
114  * S: if received msg of failure, quit
115  *    if received sync request, synchronize memory map and reply with result
116  * P: if received sync request result
117  *    sendmsg sync request result
118  * S: if received msg, quit
119  *
120  * There is no "rollback" from deallocation, as it's safe to have some memory
121  * mapped in some processes - it's absent from the heap, so it won't get used.
122  */
123
124 static struct mp_request *
125 find_request_by_id(uint64_t id)
126 {
127         struct mp_request *req;
128         TAILQ_FOREACH(req, &mp_request_list.list, next) {
129                 if (req->user_req.id == id)
130                         break;
131         }
132         return req;
133 }
134
135 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
136 static uint64_t
137 get_unique_id(void)
138 {
139         uint64_t id;
140         do {
141                 id = rte_rand();
142         } while (find_request_by_id(id) != NULL);
143         return id;
144 }
145
146 /* secondary will respond to sync requests thusly */
147 static int
148 handle_sync(const struct rte_mp_msg *msg, const void *peer)
149 {
150         struct rte_mp_msg reply;
151         const struct malloc_mp_req *req =
152                         (const struct malloc_mp_req *)msg->param;
153         struct malloc_mp_req *resp =
154                         (struct malloc_mp_req *)reply.param;
155         int ret;
156
157         if (req->t != REQ_TYPE_SYNC) {
158                 RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
159                 return -1;
160         }
161
162         memset(&reply, 0, sizeof(reply));
163
164         reply.num_fds = 0;
165         strlcpy(reply.name, msg->name, sizeof(reply.name));
166         reply.len_param = sizeof(*resp);
167
168         ret = eal_memalloc_sync_with_primary();
169
170         resp->t = REQ_TYPE_SYNC;
171         resp->id = req->id;
172         resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
173
174         rte_mp_reply(&reply, peer);
175
176         return 0;
177 }
178
179 static int
180 handle_free_request(const struct malloc_mp_req *m)
181 {
182         const struct rte_memseg_list *msl;
183         void *start, *end;
184         size_t len;
185
186         len = m->free_req.len;
187         start = m->free_req.addr;
188         end = RTE_PTR_ADD(start, len - 1);
189
190         /* check if the requested memory actually exists */
191         msl = rte_mem_virt2memseg_list(start);
192         if (msl == NULL) {
193                 RTE_LOG(ERR, EAL, "Requested to free unknown memory\n");
194                 return -1;
195         }
196
197         /* check if end is within the same memory region */
198         if (rte_mem_virt2memseg_list(end) != msl) {
199                 RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n");
200                 return -1;
201         }
202
203         /* we're supposed to only free memory that's not external */
204         if (msl->external) {
205                 RTE_LOG(ERR, EAL, "Requested to free external memory\n");
206                 return -1;
207         }
208
209         /* now that we've validated the request, announce it */
210         eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
211                         m->free_req.addr, m->free_req.len);
212
213         /* now, do the actual freeing */
214         return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
215 }
216
217 static int
218 handle_alloc_request(const struct malloc_mp_req *m,
219                 struct mp_request *req)
220 {
221         struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
222         const struct malloc_req_alloc *ar = &m->alloc_req;
223         struct malloc_heap *heap;
224         struct malloc_elem *elem;
225         struct rte_memseg **ms;
226         size_t alloc_sz;
227         int n_segs;
228         void *map_addr;
229
230         /* this is checked by the API, but we need to prevent divide by zero */
231         if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
232                 RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n");
233                 return -1;
234         }
235
236         /* heap idx is index into the heap array, not socket ID */
237         if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
238                 RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n");
239                 return -1;
240         }
241
242         heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];
243
244         /*
245          * for allocations, we must only use internal heaps, but since the
246          * rte_malloc_heap_socket_is_external() is thread-safe and we're already
247          * read-locked, we'll have to take advantage of the fact that internal
248          * socket ID's are always lower than RTE_MAX_NUMA_NODES.
249          */
250         if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
251                 RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n");
252                 return -1;
253         }
254
255         alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
256                         MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
257         n_segs = alloc_sz / ar->page_sz;
258
259         /* we can't know in advance how many pages we'll need, so we malloc */
260         ms = malloc(sizeof(*ms) * n_segs);
261         if (ms == NULL) {
262                 RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
263                 return -1;
264         }
265         memset(ms, 0, sizeof(*ms) * n_segs);
266
267         elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
268                         ar->flags, ar->align, ar->bound, ar->contig, ms,
269                         n_segs);
270
271         if (elem == NULL)
272                 goto fail;
273
274         map_addr = ms[0]->addr;
275
276         eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);
277
278         /* we have succeeded in allocating memory, but we still need to sync
279          * with other processes. however, since DPDK IPC is single-threaded, we
280          * send an asynchronous request and exit this callback.
281          */
282
283         req->alloc_state.ms = ms;
284         req->alloc_state.ms_len = n_segs;
285         req->alloc_state.map_addr = map_addr;
286         req->alloc_state.map_len = alloc_sz;
287         req->alloc_state.elem = elem;
288         req->alloc_state.heap = heap;
289
290         return 0;
291 fail:
292         free(ms);
293         return -1;
294 }
295
296 /* first stage of primary handling requests from secondary */
297 static int
298 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
299 {
300         const struct malloc_mp_req *m =
301                         (const struct malloc_mp_req *)msg->param;
302         struct mp_request *entry;
303         int ret;
304
305         /* lock access to request */
306         pthread_mutex_lock(&mp_request_list.lock);
307
308         /* make sure it's not a dupe */
309         entry = find_request_by_id(m->id);
310         if (entry != NULL) {
311                 RTE_LOG(ERR, EAL, "Duplicate request id\n");
312                 goto fail;
313         }
314
315         entry = malloc(sizeof(*entry));
316         if (entry == NULL) {
317                 RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
318                 goto fail;
319         }
320
321         /* erase all data */
322         memset(entry, 0, sizeof(*entry));
323
324         if (m->t == REQ_TYPE_ALLOC) {
325                 ret = handle_alloc_request(m, entry);
326         } else if (m->t == REQ_TYPE_FREE) {
327                 ret = handle_free_request(m);
328         } else {
329                 RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
330                 goto fail;
331         }
332
333         if (ret != 0) {
334                 struct rte_mp_msg resp_msg;
335                 struct malloc_mp_req *resp =
336                                 (struct malloc_mp_req *)resp_msg.param;
337
338                 /* send failure message straight away */
339                 resp_msg.num_fds = 0;
340                 resp_msg.len_param = sizeof(*resp);
341                 strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
342                                 sizeof(resp_msg.name));
343
344                 resp->t = m->t;
345                 resp->result = REQ_RESULT_FAIL;
346                 resp->id = m->id;
347
348                 if (rte_mp_sendmsg(&resp_msg)) {
349                         RTE_LOG(ERR, EAL, "Couldn't send response\n");
350                         goto fail;
351                 }
352                 /* we did not modify the request */
353                 free(entry);
354         } else {
355                 struct rte_mp_msg sr_msg;
356                 struct malloc_mp_req *sr =
357                                 (struct malloc_mp_req *)sr_msg.param;
358                 struct timespec ts;
359
360                 memset(&sr_msg, 0, sizeof(sr_msg));
361
362                 /* we can do something, so send sync request asynchronously */
363                 sr_msg.num_fds = 0;
364                 sr_msg.len_param = sizeof(*sr);
365                 strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));
366
367                 ts.tv_nsec = 0;
368                 ts.tv_sec = MP_TIMEOUT_S;
369
370                 /* sync requests carry no data */
371                 sr->t = REQ_TYPE_SYNC;
372                 sr->id = m->id;
373
374                 /* there may be stray timeout still waiting */
375                 do {
376                         ret = rte_mp_request_async(&sr_msg, &ts,
377                                         handle_sync_response);
378                 } while (ret != 0 && rte_errno == EEXIST);
379                 if (ret != 0) {
380                         RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
381                         if (m->t == REQ_TYPE_ALLOC)
382                                 free(entry->alloc_state.ms);
383                         goto fail;
384                 }
385
386                 /* mark request as in progress */
387                 memcpy(&entry->user_req, m, sizeof(*m));
388                 entry->state = REQ_STATE_ACTIVE;
389
390                 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
391         }
392         pthread_mutex_unlock(&mp_request_list.lock);
393         return 0;
394 fail:
395         pthread_mutex_unlock(&mp_request_list.lock);
396         free(entry);
397         return -1;
398 }
399
400 /* callback for asynchronous sync requests for primary. this will either do a
401  * sendmsg with results, or trigger rollback request.
402  */
403 static int
404 handle_sync_response(const struct rte_mp_msg *request,
405                 const struct rte_mp_reply *reply)
406 {
407         enum malloc_req_result result;
408         struct mp_request *entry;
409         const struct malloc_mp_req *mpreq =
410                         (const struct malloc_mp_req *)request->param;
411         int i;
412
413         /* lock the request */
414         pthread_mutex_lock(&mp_request_list.lock);
415
416         entry = find_request_by_id(mpreq->id);
417         if (entry == NULL) {
418                 RTE_LOG(ERR, EAL, "Wrong request ID\n");
419                 goto fail;
420         }
421
422         result = REQ_RESULT_SUCCESS;
423
424         if (reply->nb_received != reply->nb_sent)
425                 result = REQ_RESULT_FAIL;
426
427         for (i = 0; i < reply->nb_received; i++) {
428                 struct malloc_mp_req *resp =
429                                 (struct malloc_mp_req *)reply->msgs[i].param;
430
431                 if (resp->t != REQ_TYPE_SYNC) {
432                         RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
433                         result = REQ_RESULT_FAIL;
434                         break;
435                 }
436                 if (resp->id != entry->user_req.id) {
437                         RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
438                         result = REQ_RESULT_FAIL;
439                         break;
440                 }
441                 if (resp->result == REQ_RESULT_FAIL) {
442                         result = REQ_RESULT_FAIL;
443                         break;
444                 }
445         }
446
447         if (entry->user_req.t == REQ_TYPE_FREE) {
448                 struct rte_mp_msg msg;
449                 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
450
451                 memset(&msg, 0, sizeof(msg));
452
453                 /* this is a free request, just sendmsg result */
454                 resp->t = REQ_TYPE_FREE;
455                 resp->result = result;
456                 resp->id = entry->user_req.id;
457                 msg.num_fds = 0;
458                 msg.len_param = sizeof(*resp);
459                 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
460
461                 if (rte_mp_sendmsg(&msg))
462                         RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
463
464                 TAILQ_REMOVE(&mp_request_list.list, entry, next);
465                 free(entry);
466         } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
467                         result == REQ_RESULT_SUCCESS) {
468                 struct malloc_heap *heap = entry->alloc_state.heap;
469                 struct rte_mp_msg msg;
470                 struct malloc_mp_req *resp =
471                                 (struct malloc_mp_req *)msg.param;
472
473                 memset(&msg, 0, sizeof(msg));
474
475                 heap->total_size += entry->alloc_state.map_len;
476
477                 /* result is success, so just notify secondary about this */
478                 resp->t = REQ_TYPE_ALLOC;
479                 resp->result = result;
480                 resp->id = entry->user_req.id;
481                 msg.num_fds = 0;
482                 msg.len_param = sizeof(*resp);
483                 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
484
485                 if (rte_mp_sendmsg(&msg))
486                         RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
487
488                 TAILQ_REMOVE(&mp_request_list.list, entry, next);
489                 free(entry->alloc_state.ms);
490                 free(entry);
491         } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
492                         result == REQ_RESULT_FAIL) {
493                 struct rte_mp_msg rb_msg;
494                 struct malloc_mp_req *rb =
495                                 (struct malloc_mp_req *)rb_msg.param;
496                 struct timespec ts;
497                 struct primary_alloc_req_state *state =
498                                 &entry->alloc_state;
499                 int ret;
500
501                 memset(&rb_msg, 0, sizeof(rb_msg));
502
503                 /* we've failed to sync, so do a rollback */
504                 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
505                                 state->map_addr, state->map_len);
506
507                 rollback_expand_heap(state->ms, state->ms_len, state->elem,
508                                 state->map_addr, state->map_len);
509
510                 /* send rollback request */
511                 rb_msg.num_fds = 0;
512                 rb_msg.len_param = sizeof(*rb);
513                 strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));
514
515                 ts.tv_nsec = 0;
516                 ts.tv_sec = MP_TIMEOUT_S;
517
518                 /* sync requests carry no data */
519                 rb->t = REQ_TYPE_SYNC;
520                 rb->id = entry->user_req.id;
521
522                 /* there may be stray timeout still waiting */
523                 do {
524                         ret = rte_mp_request_async(&rb_msg, &ts,
525                                         handle_rollback_response);
526                 } while (ret != 0 && rte_errno == EEXIST);
527                 if (ret != 0) {
528                         RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
529
530                         /* we couldn't send rollback request, but that's OK -
531                          * secondary will time out, and memory has been removed
532                          * from heap anyway.
533                          */
534                         TAILQ_REMOVE(&mp_request_list.list, entry, next);
535                         free(state->ms);
536                         free(entry);
537                         goto fail;
538                 }
539         } else {
540                 RTE_LOG(ERR, EAL, " to sync request of unknown type\n");
541                 goto fail;
542         }
543
544         pthread_mutex_unlock(&mp_request_list.lock);
545         return 0;
546 fail:
547         pthread_mutex_unlock(&mp_request_list.lock);
548         return -1;
549 }
550
551 static int
552 handle_rollback_response(const struct rte_mp_msg *request,
553                 const struct rte_mp_reply *reply __rte_unused)
554 {
555         struct rte_mp_msg msg;
556         struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
557         const struct malloc_mp_req *mpreq =
558                         (const struct malloc_mp_req *)request->param;
559         struct mp_request *entry;
560
561         /* lock the request */
562         pthread_mutex_lock(&mp_request_list.lock);
563
564         memset(&msg, 0, sizeof(msg));
565
566         entry = find_request_by_id(mpreq->id);
567         if (entry == NULL) {
568                 RTE_LOG(ERR, EAL, "Wrong request ID\n");
569                 goto fail;
570         }
571
572         if (entry->user_req.t != REQ_TYPE_ALLOC) {
573                 RTE_LOG(ERR, EAL, "Unexpected active request\n");
574                 goto fail;
575         }
576
577         /* we don't care if rollback succeeded, request still failed */
578         resp->t = REQ_TYPE_ALLOC;
579         resp->result = REQ_RESULT_FAIL;
580         resp->id = mpreq->id;
581         msg.num_fds = 0;
582         msg.len_param = sizeof(*resp);
583         strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
584
585         if (rte_mp_sendmsg(&msg))
586                 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
587
588         /* clean up */
589         TAILQ_REMOVE(&mp_request_list.list, entry, next);
590         free(entry->alloc_state.ms);
591         free(entry);
592
593         pthread_mutex_unlock(&mp_request_list.lock);
594         return 0;
595 fail:
596         pthread_mutex_unlock(&mp_request_list.lock);
597         return -1;
598 }
599
600 /* final stage of the request from secondary */
601 static int
602 handle_response(const struct rte_mp_msg *msg, const void *peer  __rte_unused)
603 {
604         const struct malloc_mp_req *m =
605                         (const struct malloc_mp_req *)msg->param;
606         struct mp_request *entry;
607
608         pthread_mutex_lock(&mp_request_list.lock);
609
610         entry = find_request_by_id(m->id);
611         if (entry != NULL) {
612                 /* update request status */
613                 entry->user_req.result = m->result;
614
615                 entry->state = REQ_STATE_COMPLETE;
616
617                 /* trigger thread wakeup */
618                 pthread_cond_signal(&entry->cond);
619         }
620
621         pthread_mutex_unlock(&mp_request_list.lock);
622
623         return 0;
624 }
625
626 /* synchronously request memory map sync, this is only called whenever primary
627  * process initiates the allocation.
628  */
629 int
630 request_sync(void)
631 {
632         struct rte_mp_msg msg;
633         struct rte_mp_reply reply;
634         struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
635         struct timespec ts;
636         int i, ret = -1;
637
638         memset(&msg, 0, sizeof(msg));
639         memset(&reply, 0, sizeof(reply));
640
641         /* no need to create tailq entries as this is entirely synchronous */
642
643         msg.num_fds = 0;
644         msg.len_param = sizeof(*req);
645         strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));
646
647         /* sync request carries no data */
648         req->t = REQ_TYPE_SYNC;
649         req->id = get_unique_id();
650
651         ts.tv_nsec = 0;
652         ts.tv_sec = MP_TIMEOUT_S;
653
654         /* there may be stray timeout still waiting */
655         do {
656                 ret = rte_mp_request_sync(&msg, &reply, &ts);
657         } while (ret != 0 && rte_errno == EEXIST);
658         if (ret != 0) {
659                 /* if IPC is unsupported, behave as if the call succeeded */
660                 if (rte_errno != ENOTSUP)
661                         RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
662                 else
663                         ret = 0;
664                 goto out;
665         }
666
667         if (reply.nb_received != reply.nb_sent) {
668                 RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
669                 goto out;
670         }
671
672         for (i = 0; i < reply.nb_received; i++) {
673                 struct malloc_mp_req *resp =
674                                 (struct malloc_mp_req *)reply.msgs[i].param;
675                 if (resp->t != REQ_TYPE_SYNC) {
676                         RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
677                         goto out;
678                 }
679                 if (resp->id != req->id) {
680                         RTE_LOG(ERR, EAL, "Wrong request ID\n");
681                         goto out;
682                 }
683                 if (resp->result != REQ_RESULT_SUCCESS) {
684                         RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
685                         goto out;
686                 }
687         }
688
689         ret = 0;
690 out:
691         free(reply.msgs);
692         return ret;
693 }
694
695 /* this is a synchronous wrapper around a bunch of asynchronous requests to
696  * primary process. this will initiate a request and wait until responses come.
697  */
698 int
699 request_to_primary(struct malloc_mp_req *user_req)
700 {
701         struct rte_mp_msg msg;
702         struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
703         struct mp_request *entry;
704         struct timespec ts;
705         struct timeval now;
706         int ret;
707
708         memset(&msg, 0, sizeof(msg));
709         memset(&ts, 0, sizeof(ts));
710
711         pthread_mutex_lock(&mp_request_list.lock);
712
713         entry = malloc(sizeof(*entry));
714         if (entry == NULL) {
715                 RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
716                 goto fail;
717         }
718
719         memset(entry, 0, sizeof(*entry));
720
721         if (gettimeofday(&now, NULL) < 0) {
722                 RTE_LOG(ERR, EAL, "Cannot get current time\n");
723                 goto fail;
724         }
725
726         ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
727         ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
728                         (now.tv_usec * 1000) / 1000000000;
729
730         /* initialize the request */
731         pthread_cond_init(&entry->cond, NULL);
732
733         msg.num_fds = 0;
734         msg.len_param = sizeof(*msg_req);
735         strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));
736
737         /* (attempt to) get a unique id */
738         user_req->id = get_unique_id();
739
740         /* copy contents of user request into the message */
741         memcpy(msg_req, user_req, sizeof(*msg_req));
742
743         if (rte_mp_sendmsg(&msg)) {
744                 RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
745                 goto fail;
746         }
747
748         /* copy contents of user request into active request */
749         memcpy(&entry->user_req, user_req, sizeof(*user_req));
750
751         /* mark request as in progress */
752         entry->state = REQ_STATE_ACTIVE;
753
754         TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
755
756         /* finally, wait on timeout */
757         do {
758                 ret = pthread_cond_timedwait(&entry->cond,
759                                 &mp_request_list.lock, &ts);
760         } while (ret != 0 && ret != ETIMEDOUT);
761
762         if (entry->state != REQ_STATE_COMPLETE) {
763                 RTE_LOG(ERR, EAL, "Request timed out\n");
764                 ret = -1;
765         } else {
766                 ret = 0;
767                 user_req->result = entry->user_req.result;
768         }
769         TAILQ_REMOVE(&mp_request_list.list, entry, next);
770         free(entry);
771
772         pthread_mutex_unlock(&mp_request_list.lock);
773         return ret;
774 fail:
775         pthread_mutex_unlock(&mp_request_list.lock);
776         free(entry);
777         return -1;
778 }
779
780 int
781 register_mp_requests(void)
782 {
783         if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
784                 /* it's OK for primary to not support IPC */
785                 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
786                                 rte_errno != ENOTSUP) {
787                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
788                                 MP_ACTION_REQUEST);
789                         return -1;
790                 }
791         } else {
792                 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
793                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
794                                 MP_ACTION_SYNC);
795                         return -1;
796                 }
797                 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
798                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
799                                 MP_ACTION_SYNC);
800                         return -1;
801                 }
802                 if (rte_mp_action_register(MP_ACTION_RESPONSE,
803                                 handle_response)) {
804                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
805                                 MP_ACTION_RESPONSE);
806                         return -1;
807                 }
808         }
809         return 0;
810 }