net/virtio: fix default duplex mode
[dpdk.git] / lib / eal / common / malloc_mp.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Intel Corporation
3  */
4
5 #include <string.h>
6 #include <sys/time.h>
7
8 #include <rte_alarm.h>
9 #include <rte_errno.h>
10 #include <rte_string_fns.h>
11
12 #include "eal_memalloc.h"
13 #include "eal_memcfg.h"
14 #include "eal_private.h"
15
16 #include "malloc_elem.h"
17 #include "malloc_mp.h"
18
19 #define MP_ACTION_SYNC "mp_malloc_sync"
20 /**< request sent by primary process to notify of changes in memory map */
21 #define MP_ACTION_ROLLBACK "mp_malloc_rollback"
22 /**< request sent by primary process to notify of changes in memory map. this is
23  * essentially a regular sync request, but we cannot send sync requests while
24  * another one is in progress, and we might have to - therefore, we do this as
25  * a separate callback.
26  */
27 #define MP_ACTION_REQUEST "mp_malloc_request"
28 /**< request sent by secondary process to ask for allocation/deallocation */
29 #define MP_ACTION_RESPONSE "mp_malloc_response"
30 /**< response sent to secondary process to indicate result of request */
31
32 /* forward declarations */
33 static int
34 handle_sync_response(const struct rte_mp_msg *request,
35                 const struct rte_mp_reply *reply);
36 static int
37 handle_rollback_response(const struct rte_mp_msg *request,
38                 const struct rte_mp_reply *reply);
39
40 #define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
41
42 /* when we're allocating, we need to store some state to ensure that we can
43  * roll back later
44  */
45 struct primary_alloc_req_state {
46         struct malloc_heap *heap;
47         struct rte_memseg **ms;
48         int ms_len;
49         struct malloc_elem *elem;
50         void *map_addr;
51         size_t map_len;
52 };
53
54 enum req_state {
55         REQ_STATE_INACTIVE = 0,
56         REQ_STATE_ACTIVE,
57         REQ_STATE_COMPLETE
58 };
59
60 struct mp_request {
61         TAILQ_ENTRY(mp_request) next;
62         struct malloc_mp_req user_req; /**< contents of request */
63         pthread_cond_t cond; /**< variable we use to time out on this request */
64         enum req_state state; /**< indicate status of this request */
65         struct primary_alloc_req_state alloc_state;
66 };
67
68 /*
69  * We could've used just a single request, but it may be possible for
70  * secondaries to timeout earlier than the primary, and send a new request while
71  * primary is still expecting replies to the old one. Therefore, each new
72  * request will get assigned a new ID, which is how we will distinguish between
73  * expected and unexpected messages.
74  */
75 TAILQ_HEAD(mp_request_list, mp_request);
76 static struct {
77         struct mp_request_list list;
78         pthread_mutex_t lock;
79 } mp_request_list = {
80         .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
81         .lock = PTHREAD_MUTEX_INITIALIZER
82 };
83
84 /**
85  * General workflow is the following:
86  *
87  * Allocation:
88  * S: send request to primary
89  * P: attempt to allocate memory
90  *    if failed, sendmsg failure
91  *    if success, send sync request
92  * S: if received msg of failure, quit
93  *    if received sync request, synchronize memory map and reply with result
94  * P: if received sync request result
95  *    if success, sendmsg success
96  *    if failure, roll back allocation and send a rollback request
97  * S: if received msg of success, quit
98  *    if received rollback request, synchronize memory map and reply with result
99  * P: if received sync request result
100  *    sendmsg sync request result
101  * S: if received msg, quit
102  *
103  * Aside from timeouts, there are three points where we can quit:
104  *  - if allocation failed straight away
105  *  - if allocation and sync request succeeded
106  *  - if allocation succeeded, sync request failed, allocation rolled back and
107  *    rollback request received (irrespective of whether it succeeded or failed)
108  *
109  * Deallocation:
110  * S: send request to primary
111  * P: attempt to deallocate memory
112  *    if failed, sendmsg failure
113  *    if success, send sync request
114  * S: if received msg of failure, quit
115  *    if received sync request, synchronize memory map and reply with result
116  * P: if received sync request result
117  *    sendmsg sync request result
118  * S: if received msg, quit
119  *
120  * There is no "rollback" from deallocation, as it's safe to have some memory
121  * mapped in some processes - it's absent from the heap, so it won't get used.
122  */
123
124 static struct mp_request *
125 find_request_by_id(uint64_t id)
126 {
127         struct mp_request *req;
128         TAILQ_FOREACH(req, &mp_request_list.list, next) {
129                 if (req->user_req.id == id)
130                         break;
131         }
132         return req;
133 }
134
135 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
136 static uint64_t
137 get_unique_id(void)
138 {
139         uint64_t id;
140         do {
141                 id = rte_rand();
142         } while (find_request_by_id(id) != NULL);
143         return id;
144 }
145
146 /* secondary will respond to sync requests thusly */
147 static int
148 handle_sync(const struct rte_mp_msg *msg, const void *peer)
149 {
150         struct rte_mp_msg reply;
151         const struct malloc_mp_req *req =
152                         (const struct malloc_mp_req *)msg->param;
153         struct malloc_mp_req *resp =
154                         (struct malloc_mp_req *)reply.param;
155         int ret;
156
157         if (req->t != REQ_TYPE_SYNC) {
158                 RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
159                 return -1;
160         }
161
162         memset(&reply, 0, sizeof(reply));
163
164         reply.num_fds = 0;
165         strlcpy(reply.name, msg->name, sizeof(reply.name));
166         reply.len_param = sizeof(*resp);
167
168         ret = eal_memalloc_sync_with_primary();
169
170         resp->t = REQ_TYPE_SYNC;
171         resp->id = req->id;
172         resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
173
174         return rte_mp_reply(&reply, peer);
175 }
176
177 static int
178 handle_free_request(const struct malloc_mp_req *m)
179 {
180         const struct rte_memseg_list *msl;
181         void *start, *end;
182         size_t len;
183
184         len = m->free_req.len;
185         start = m->free_req.addr;
186         end = RTE_PTR_ADD(start, len - 1);
187
188         /* check if the requested memory actually exists */
189         msl = rte_mem_virt2memseg_list(start);
190         if (msl == NULL) {
191                 RTE_LOG(ERR, EAL, "Requested to free unknown memory\n");
192                 return -1;
193         }
194
195         /* check if end is within the same memory region */
196         if (rte_mem_virt2memseg_list(end) != msl) {
197                 RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n");
198                 return -1;
199         }
200
201         /* we're supposed to only free memory that's not external */
202         if (msl->external) {
203                 RTE_LOG(ERR, EAL, "Requested to free external memory\n");
204                 return -1;
205         }
206
207         /* now that we've validated the request, announce it */
208         eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
209                         m->free_req.addr, m->free_req.len);
210
211         /* now, do the actual freeing */
212         return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
213 }
214
215 static int
216 handle_alloc_request(const struct malloc_mp_req *m,
217                 struct mp_request *req)
218 {
219         struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
220         const struct malloc_req_alloc *ar = &m->alloc_req;
221         struct malloc_heap *heap;
222         struct malloc_elem *elem;
223         struct rte_memseg **ms;
224         size_t alloc_sz;
225         int n_segs;
226         void *map_addr;
227
228         /* this is checked by the API, but we need to prevent divide by zero */
229         if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
230                 RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n");
231                 return -1;
232         }
233
234         /* heap idx is index into the heap array, not socket ID */
235         if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
236                 RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n");
237                 return -1;
238         }
239
240         heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];
241
242         /*
243          * for allocations, we must only use internal heaps, but since the
244          * rte_malloc_heap_socket_is_external() is thread-safe and we're already
245          * read-locked, we'll have to take advantage of the fact that internal
246          * socket ID's are always lower than RTE_MAX_NUMA_NODES.
247          */
248         if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
249                 RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n");
250                 return -1;
251         }
252
253         alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
254                         MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
255         n_segs = alloc_sz / ar->page_sz;
256
257         /* we can't know in advance how many pages we'll need, so we malloc */
258         ms = malloc(sizeof(*ms) * n_segs);
259         if (ms == NULL) {
260                 RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
261                 return -1;
262         }
263         memset(ms, 0, sizeof(*ms) * n_segs);
264
265         elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
266                         ar->flags, ar->align, ar->bound, ar->contig, ms,
267                         n_segs);
268
269         if (elem == NULL)
270                 goto fail;
271
272         map_addr = ms[0]->addr;
273
274         eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);
275
276         /* we have succeeded in allocating memory, but we still need to sync
277          * with other processes. however, since DPDK IPC is single-threaded, we
278          * send an asynchronous request and exit this callback.
279          */
280
281         req->alloc_state.ms = ms;
282         req->alloc_state.ms_len = n_segs;
283         req->alloc_state.map_addr = map_addr;
284         req->alloc_state.map_len = alloc_sz;
285         req->alloc_state.elem = elem;
286         req->alloc_state.heap = heap;
287
288         return 0;
289 fail:
290         free(ms);
291         return -1;
292 }
293
294 /* first stage of primary handling requests from secondary */
295 static int
296 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
297 {
298         const struct malloc_mp_req *m =
299                         (const struct malloc_mp_req *)msg->param;
300         struct mp_request *entry;
301         int ret;
302
303         /* lock access to request */
304         pthread_mutex_lock(&mp_request_list.lock);
305
306         /* make sure it's not a dupe */
307         entry = find_request_by_id(m->id);
308         if (entry != NULL) {
309                 RTE_LOG(ERR, EAL, "Duplicate request id\n");
310                 goto fail;
311         }
312
313         entry = malloc(sizeof(*entry));
314         if (entry == NULL) {
315                 RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
316                 goto fail;
317         }
318
319         /* erase all data */
320         memset(entry, 0, sizeof(*entry));
321
322         if (m->t == REQ_TYPE_ALLOC) {
323                 ret = handle_alloc_request(m, entry);
324         } else if (m->t == REQ_TYPE_FREE) {
325                 ret = handle_free_request(m);
326         } else {
327                 RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
328                 goto fail;
329         }
330
331         if (ret != 0) {
332                 struct rte_mp_msg resp_msg;
333                 struct malloc_mp_req *resp =
334                                 (struct malloc_mp_req *)resp_msg.param;
335
336                 /* send failure message straight away */
337                 resp_msg.num_fds = 0;
338                 resp_msg.len_param = sizeof(*resp);
339                 strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
340                                 sizeof(resp_msg.name));
341
342                 resp->t = m->t;
343                 resp->result = REQ_RESULT_FAIL;
344                 resp->id = m->id;
345
346                 if (rte_mp_sendmsg(&resp_msg)) {
347                         RTE_LOG(ERR, EAL, "Couldn't send response\n");
348                         goto fail;
349                 }
350                 /* we did not modify the request */
351                 free(entry);
352         } else {
353                 struct rte_mp_msg sr_msg;
354                 struct malloc_mp_req *sr =
355                                 (struct malloc_mp_req *)sr_msg.param;
356                 struct timespec ts;
357
358                 memset(&sr_msg, 0, sizeof(sr_msg));
359
360                 /* we can do something, so send sync request asynchronously */
361                 sr_msg.num_fds = 0;
362                 sr_msg.len_param = sizeof(*sr);
363                 strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));
364
365                 ts.tv_nsec = 0;
366                 ts.tv_sec = MP_TIMEOUT_S;
367
368                 /* sync requests carry no data */
369                 sr->t = REQ_TYPE_SYNC;
370                 sr->id = m->id;
371
372                 /* there may be stray timeout still waiting */
373                 do {
374                         ret = rte_mp_request_async(&sr_msg, &ts,
375                                         handle_sync_response);
376                 } while (ret != 0 && rte_errno == EEXIST);
377                 if (ret != 0) {
378                         RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
379                         if (m->t == REQ_TYPE_ALLOC)
380                                 free(entry->alloc_state.ms);
381                         goto fail;
382                 }
383
384                 /* mark request as in progress */
385                 memcpy(&entry->user_req, m, sizeof(*m));
386                 entry->state = REQ_STATE_ACTIVE;
387
388                 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
389         }
390         pthread_mutex_unlock(&mp_request_list.lock);
391         return 0;
392 fail:
393         pthread_mutex_unlock(&mp_request_list.lock);
394         free(entry);
395         return -1;
396 }
397
398 /* callback for asynchronous sync requests for primary. this will either do a
399  * sendmsg with results, or trigger rollback request.
400  */
401 static int
402 handle_sync_response(const struct rte_mp_msg *request,
403                 const struct rte_mp_reply *reply)
404 {
405         enum malloc_req_result result;
406         struct mp_request *entry;
407         const struct malloc_mp_req *mpreq =
408                         (const struct malloc_mp_req *)request->param;
409         int i;
410
411         /* lock the request */
412         pthread_mutex_lock(&mp_request_list.lock);
413
414         entry = find_request_by_id(mpreq->id);
415         if (entry == NULL) {
416                 RTE_LOG(ERR, EAL, "Wrong request ID\n");
417                 goto fail;
418         }
419
420         result = REQ_RESULT_SUCCESS;
421
422         if (reply->nb_received != reply->nb_sent)
423                 result = REQ_RESULT_FAIL;
424
425         for (i = 0; i < reply->nb_received; i++) {
426                 struct malloc_mp_req *resp =
427                                 (struct malloc_mp_req *)reply->msgs[i].param;
428
429                 if (resp->t != REQ_TYPE_SYNC) {
430                         RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
431                         result = REQ_RESULT_FAIL;
432                         break;
433                 }
434                 if (resp->id != entry->user_req.id) {
435                         RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
436                         result = REQ_RESULT_FAIL;
437                         break;
438                 }
439                 if (resp->result == REQ_RESULT_FAIL) {
440                         result = REQ_RESULT_FAIL;
441                         break;
442                 }
443         }
444
445         if (entry->user_req.t == REQ_TYPE_FREE) {
446                 struct rte_mp_msg msg;
447                 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
448
449                 memset(&msg, 0, sizeof(msg));
450
451                 /* this is a free request, just sendmsg result */
452                 resp->t = REQ_TYPE_FREE;
453                 resp->result = result;
454                 resp->id = entry->user_req.id;
455                 msg.num_fds = 0;
456                 msg.len_param = sizeof(*resp);
457                 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
458
459                 if (rte_mp_sendmsg(&msg))
460                         RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
461
462                 TAILQ_REMOVE(&mp_request_list.list, entry, next);
463                 free(entry);
464         } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
465                         result == REQ_RESULT_SUCCESS) {
466                 struct malloc_heap *heap = entry->alloc_state.heap;
467                 struct rte_mp_msg msg;
468                 struct malloc_mp_req *resp =
469                                 (struct malloc_mp_req *)msg.param;
470
471                 memset(&msg, 0, sizeof(msg));
472
473                 heap->total_size += entry->alloc_state.map_len;
474
475                 /* result is success, so just notify secondary about this */
476                 resp->t = REQ_TYPE_ALLOC;
477                 resp->result = result;
478                 resp->id = entry->user_req.id;
479                 msg.num_fds = 0;
480                 msg.len_param = sizeof(*resp);
481                 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
482
483                 if (rte_mp_sendmsg(&msg))
484                         RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
485
486                 TAILQ_REMOVE(&mp_request_list.list, entry, next);
487                 free(entry->alloc_state.ms);
488                 free(entry);
489         } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
490                         result == REQ_RESULT_FAIL) {
491                 struct rte_mp_msg rb_msg;
492                 struct malloc_mp_req *rb =
493                                 (struct malloc_mp_req *)rb_msg.param;
494                 struct timespec ts;
495                 struct primary_alloc_req_state *state =
496                                 &entry->alloc_state;
497                 int ret;
498
499                 memset(&rb_msg, 0, sizeof(rb_msg));
500
501                 /* we've failed to sync, so do a rollback */
502                 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
503                                 state->map_addr, state->map_len);
504
505                 rollback_expand_heap(state->ms, state->ms_len, state->elem,
506                                 state->map_addr, state->map_len);
507
508                 /* send rollback request */
509                 rb_msg.num_fds = 0;
510                 rb_msg.len_param = sizeof(*rb);
511                 strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));
512
513                 ts.tv_nsec = 0;
514                 ts.tv_sec = MP_TIMEOUT_S;
515
516                 /* sync requests carry no data */
517                 rb->t = REQ_TYPE_SYNC;
518                 rb->id = entry->user_req.id;
519
520                 /* there may be stray timeout still waiting */
521                 do {
522                         ret = rte_mp_request_async(&rb_msg, &ts,
523                                         handle_rollback_response);
524                 } while (ret != 0 && rte_errno == EEXIST);
525                 if (ret != 0) {
526                         RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
527
528                         /* we couldn't send rollback request, but that's OK -
529                          * secondary will time out, and memory has been removed
530                          * from heap anyway.
531                          */
532                         TAILQ_REMOVE(&mp_request_list.list, entry, next);
533                         free(state->ms);
534                         free(entry);
535                         goto fail;
536                 }
537         } else {
538                 RTE_LOG(ERR, EAL, " to sync request of unknown type\n");
539                 goto fail;
540         }
541
542         pthread_mutex_unlock(&mp_request_list.lock);
543         return 0;
544 fail:
545         pthread_mutex_unlock(&mp_request_list.lock);
546         return -1;
547 }
548
549 static int
550 handle_rollback_response(const struct rte_mp_msg *request,
551                 const struct rte_mp_reply *reply __rte_unused)
552 {
553         struct rte_mp_msg msg;
554         struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
555         const struct malloc_mp_req *mpreq =
556                         (const struct malloc_mp_req *)request->param;
557         struct mp_request *entry;
558
559         /* lock the request */
560         pthread_mutex_lock(&mp_request_list.lock);
561
562         memset(&msg, 0, sizeof(msg));
563
564         entry = find_request_by_id(mpreq->id);
565         if (entry == NULL) {
566                 RTE_LOG(ERR, EAL, "Wrong request ID\n");
567                 goto fail;
568         }
569
570         if (entry->user_req.t != REQ_TYPE_ALLOC) {
571                 RTE_LOG(ERR, EAL, "Unexpected active request\n");
572                 goto fail;
573         }
574
575         /* we don't care if rollback succeeded, request still failed */
576         resp->t = REQ_TYPE_ALLOC;
577         resp->result = REQ_RESULT_FAIL;
578         resp->id = mpreq->id;
579         msg.num_fds = 0;
580         msg.len_param = sizeof(*resp);
581         strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
582
583         if (rte_mp_sendmsg(&msg))
584                 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
585
586         /* clean up */
587         TAILQ_REMOVE(&mp_request_list.list, entry, next);
588         free(entry->alloc_state.ms);
589         free(entry);
590
591         pthread_mutex_unlock(&mp_request_list.lock);
592         return 0;
593 fail:
594         pthread_mutex_unlock(&mp_request_list.lock);
595         return -1;
596 }
597
598 /* final stage of the request from secondary */
599 static int
600 handle_response(const struct rte_mp_msg *msg, const void *peer  __rte_unused)
601 {
602         const struct malloc_mp_req *m =
603                         (const struct malloc_mp_req *)msg->param;
604         struct mp_request *entry;
605
606         pthread_mutex_lock(&mp_request_list.lock);
607
608         entry = find_request_by_id(m->id);
609         if (entry != NULL) {
610                 /* update request status */
611                 entry->user_req.result = m->result;
612
613                 entry->state = REQ_STATE_COMPLETE;
614
615                 /* trigger thread wakeup */
616                 pthread_cond_signal(&entry->cond);
617         }
618
619         pthread_mutex_unlock(&mp_request_list.lock);
620
621         return 0;
622 }
623
624 /* synchronously request memory map sync, this is only called whenever primary
625  * process initiates the allocation.
626  */
627 int
628 request_sync(void)
629 {
630         struct rte_mp_msg msg;
631         struct rte_mp_reply reply;
632         struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
633         struct timespec ts;
634         int i, ret = -1;
635
636         memset(&msg, 0, sizeof(msg));
637         memset(&reply, 0, sizeof(reply));
638
639         /* no need to create tailq entries as this is entirely synchronous */
640
641         msg.num_fds = 0;
642         msg.len_param = sizeof(*req);
643         strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));
644
645         /* sync request carries no data */
646         req->t = REQ_TYPE_SYNC;
647         req->id = get_unique_id();
648
649         ts.tv_nsec = 0;
650         ts.tv_sec = MP_TIMEOUT_S;
651
652         /* there may be stray timeout still waiting */
653         do {
654                 ret = rte_mp_request_sync(&msg, &reply, &ts);
655         } while (ret != 0 && rte_errno == EEXIST);
656         if (ret != 0) {
657                 /* if IPC is unsupported, behave as if the call succeeded */
658                 if (rte_errno != ENOTSUP)
659                         RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
660                 else
661                         ret = 0;
662                 goto out;
663         }
664
665         if (reply.nb_received != reply.nb_sent) {
666                 RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
667                 goto out;
668         }
669
670         for (i = 0; i < reply.nb_received; i++) {
671                 struct malloc_mp_req *resp =
672                                 (struct malloc_mp_req *)reply.msgs[i].param;
673                 if (resp->t != REQ_TYPE_SYNC) {
674                         RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
675                         goto out;
676                 }
677                 if (resp->id != req->id) {
678                         RTE_LOG(ERR, EAL, "Wrong request ID\n");
679                         goto out;
680                 }
681                 if (resp->result != REQ_RESULT_SUCCESS) {
682                         RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
683                         goto out;
684                 }
685         }
686
687         ret = 0;
688 out:
689         free(reply.msgs);
690         return ret;
691 }
692
693 /* this is a synchronous wrapper around a bunch of asynchronous requests to
694  * primary process. this will initiate a request and wait until responses come.
695  */
696 int
697 request_to_primary(struct malloc_mp_req *user_req)
698 {
699         struct rte_mp_msg msg;
700         struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
701         struct mp_request *entry;
702         struct timespec ts;
703         struct timeval now;
704         int ret;
705
706         memset(&msg, 0, sizeof(msg));
707         memset(&ts, 0, sizeof(ts));
708
709         pthread_mutex_lock(&mp_request_list.lock);
710
711         entry = malloc(sizeof(*entry));
712         if (entry == NULL) {
713                 RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
714                 goto fail;
715         }
716
717         memset(entry, 0, sizeof(*entry));
718
719         if (gettimeofday(&now, NULL) < 0) {
720                 RTE_LOG(ERR, EAL, "Cannot get current time\n");
721                 goto fail;
722         }
723
724         ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
725         ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
726                         (now.tv_usec * 1000) / 1000000000;
727
728         /* initialize the request */
729         pthread_cond_init(&entry->cond, NULL);
730
731         msg.num_fds = 0;
732         msg.len_param = sizeof(*msg_req);
733         strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));
734
735         /* (attempt to) get a unique id */
736         user_req->id = get_unique_id();
737
738         /* copy contents of user request into the message */
739         memcpy(msg_req, user_req, sizeof(*msg_req));
740
741         if (rte_mp_sendmsg(&msg)) {
742                 RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
743                 goto fail;
744         }
745
746         /* copy contents of user request into active request */
747         memcpy(&entry->user_req, user_req, sizeof(*user_req));
748
749         /* mark request as in progress */
750         entry->state = REQ_STATE_ACTIVE;
751
752         TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
753
754         /* finally, wait on timeout */
755         do {
756                 ret = pthread_cond_timedwait(&entry->cond,
757                                 &mp_request_list.lock, &ts);
758         } while (ret != 0 && ret != ETIMEDOUT);
759
760         if (entry->state != REQ_STATE_COMPLETE) {
761                 RTE_LOG(ERR, EAL, "Request timed out\n");
762                 ret = -1;
763         } else {
764                 ret = 0;
765                 user_req->result = entry->user_req.result;
766         }
767         TAILQ_REMOVE(&mp_request_list.list, entry, next);
768         free(entry);
769
770         pthread_mutex_unlock(&mp_request_list.lock);
771         return ret;
772 fail:
773         pthread_mutex_unlock(&mp_request_list.lock);
774         free(entry);
775         return -1;
776 }
777
778 int
779 register_mp_requests(void)
780 {
781         if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
782                 /* it's OK for primary to not support IPC */
783                 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
784                                 rte_errno != ENOTSUP) {
785                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
786                                 MP_ACTION_REQUEST);
787                         return -1;
788                 }
789         } else {
790                 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
791                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
792                                 MP_ACTION_SYNC);
793                         return -1;
794                 }
795                 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
796                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
797                                 MP_ACTION_SYNC);
798                         return -1;
799                 }
800                 if (rte_mp_action_register(MP_ACTION_RESPONSE,
801                                 handle_response)) {
802                         RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
803                                 MP_ACTION_RESPONSE);
804                         return -1;
805                 }
806         }
807         return 0;
808 }