1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2018 Intel Corporation
10 #include <rte_string_fns.h>
12 #include "eal_memalloc.h"
14 #include "malloc_elem.h"
15 #include "malloc_mp.h"
17 #define MP_ACTION_SYNC "mp_malloc_sync"
18 /**< request sent by primary process to notify of changes in memory map */
19 #define MP_ACTION_ROLLBACK "mp_malloc_rollback"
20 /**< request sent by primary process to notify of changes in memory map. this is
21 * essentially a regular sync request, but we cannot send sync requests while
22 * another one is in progress, and we might have to - therefore, we do this as
23 * a separate callback.
25 #define MP_ACTION_REQUEST "mp_malloc_request"
26 /**< request sent by secondary process to ask for allocation/deallocation */
27 #define MP_ACTION_RESPONSE "mp_malloc_response"
28 /**< response sent to secondary process to indicate result of request */
30 /* forward declarations */
32 handle_sync_response(const struct rte_mp_msg *request,
33 const struct rte_mp_reply *reply);
35 handle_rollback_response(const struct rte_mp_msg *request,
36 const struct rte_mp_reply *reply);
38 #define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
40 /* when we're allocating, we need to store some state to ensure that we can
/* State the primary must retain between starting an allocation on behalf of
 * a secondary and hearing back from all secondaries, so that a failed sync
 * can be rolled back (see workflow description below).
 * NOTE(review): several lines of this extract are elided, so the struct and
 * enum bodies below are partial.
 */
43 struct primary_alloc_req_state {
44 struct malloc_heap *heap;
45 struct rte_memseg **ms;
47 struct malloc_elem *elem;
53 REQ_STATE_INACTIVE = 0,
59 TAILQ_ENTRY(mp_request) next;
60 struct malloc_mp_req user_req; /**< contents of request */
61 pthread_cond_t cond; /**< variable we use to time out on this request */
62 enum req_state state; /**< indicate status of this request */
63 struct primary_alloc_req_state alloc_state;
67  * We could've used just a single request, but it may be possible for
68  * secondaries to timeout earlier than the primary, and send a new request while
69  * primary is still expecting replies to the old one. Therefore, each new
70  * request will get assigned a new ID, which is how we will distinguish between
71  * expected and unexpected messages.
73 TAILQ_HEAD(mp_request_list, mp_request);
/* Global list of in-flight requests; every access is serialized by .lock. */
75 struct mp_request_list list;
78 .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
79 .lock = PTHREAD_MUTEX_INITIALIZER
83 * General workflow is the following:
86 * S: send request to primary
87 * P: attempt to allocate memory
88 * if failed, sendmsg failure
89 * if success, send sync request
90 * S: if received msg of failure, quit
91 * if received sync request, synchronize memory map and reply with result
92 * P: if received sync request result
93 * if success, sendmsg success
94 * if failure, roll back allocation and send a rollback request
95 * S: if received msg of success, quit
96 * if received rollback request, synchronize memory map and reply with result
97 * P: if received sync request result
98 * sendmsg sync request result
99 * S: if received msg, quit
101 * Aside from timeouts, there are three points where we can quit:
102 * - if allocation failed straight away
103 * - if allocation and sync request succeeded
104 * - if allocation succeeded, sync request failed, allocation rolled back and
105 * rollback request received (irrespective of whether it succeeded or failed)
108 * S: send request to primary
109 * P: attempt to deallocate memory
110 * if failed, sendmsg failure
111 * if success, send sync request
112 * S: if received msg of failure, quit
113 * if received sync request, synchronize memory map and reply with result
114 * P: if received sync request result
115 * sendmsg sync request result
116 * S: if received msg, quit
118 * There is no "rollback" from deallocation, as it's safe to have some memory
119 * mapped in some processes - it's absent from the heap, so it won't get used.
/* Look up an in-flight request by its unique ID; returns NULL if not found.
 * Caller must hold mp_request_list.lock.
 */
122 static struct mp_request *
123 find_request_by_id(uint64_t id)
125 struct mp_request *req;
126 TAILQ_FOREACH(req, &mp_request_list.list, next) {
127 if (req->user_req.id == id)
133 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
/* keep drawing IDs until one is not already used by an active request */
140 } while (find_request_by_id(id) != NULL);
144 /* secondary will respond to sync requests thusly */
/* IPC action handler run in the secondary process: on a sync (or rollback,
 * which is registered to this same handler) request from the primary,
 * re-sync the local memory map and reply with the outcome.
 */
146 handle_sync(const struct rte_mp_msg *msg, const void *peer)
148 struct rte_mp_msg reply;
149 const struct malloc_mp_req *req =
150 (const struct malloc_mp_req *)msg->param;
151 struct malloc_mp_req *resp =
152 (struct malloc_mp_req *)reply.param;
/* primary only ever sends sync-type requests through this action */
155 if (req->t != REQ_TYPE_SYNC) {
156 RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
160 memset(&reply, 0, sizeof(reply));
/* reply under the same action name the request arrived on */
163 strlcpy(reply.name, msg->name, sizeof(reply.name));
164 reply.len_param = sizeof(*resp);
/* bring this process's view of hugepage memory in line with the primary */
166 ret = eal_memalloc_sync_with_primary();
168 resp->t = REQ_TYPE_SYNC;
170 resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
172 rte_mp_reply(&reply, peer);
/* Primary-side helper: attempt the actual heap allocation for a secondary's
 * request, and stash everything needed for a later rollback into @req.
 * The IPC sync with other processes happens in the caller (handle_request).
 */
178 handle_alloc_request(const struct malloc_mp_req *m,
179 struct mp_request *req)
181 const struct malloc_req_alloc *ar = &m->alloc_req;
182 struct malloc_heap *heap;
183 struct malloc_elem *elem;
184 struct rte_memseg **ms;
/* total mapping size: element + alignment slack + trailer, page-aligned */
189 alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
190 MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
191 n_segs = alloc_sz / ar->page_sz;
195 /* we can't know in advance how many pages we'll need, so we malloc */
196 ms = malloc(sizeof(*ms) * n_segs);
198 RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
201 memset(ms, 0, sizeof(*ms) * n_segs);
/* expand the heap; on success ms[] is filled with the new memsegs */
203 elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
204 ar->flags, ar->align, ar->bound, ar->contig, ms,
210 map_addr = ms[0]->addr;
212 /* we have succeeded in allocating memory, but we still need to sync
213  * with other processes. however, since DPDK IPC is single-threaded, we
214  * send an asynchronous request and exit this callback.
217 req->alloc_state.ms = ms;
218 req->alloc_state.ms_len = n_segs;
219 req->alloc_state.map_addr = map_addr;
220 req->alloc_state.map_len = alloc_sz;
221 req->alloc_state.elem = elem;
222 req->alloc_state.heap = heap;
230 /* first stage of primary handling requests from secondary */
/* IPC action handler run in the primary: perform the requested alloc/free
 * locally, then either report immediate failure to the secondary or kick off
 * an asynchronous sync request to all secondaries (handled later by
 * handle_sync_response).
 */
232 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
234 const struct malloc_mp_req *m =
235 (const struct malloc_mp_req *)msg->param;
236 struct mp_request *entry;
239 /* lock access to request */
240 pthread_mutex_lock(&mp_request_list.lock)
242 /* make sure it's not a dupe */
243 entry = find_request_by_id(m->id);
245 RTE_LOG(ERR, EAL, "Duplicate request id\n");
249 entry = malloc(sizeof(*entry));
251 RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
256 memset(entry, 0, sizeof(*entry));
/* dispatch on request type: alloc goes through the rollback-capable path */
258 if (m->t == REQ_TYPE_ALLOC) {
259 ret = handle_alloc_request(m, entry);
260 } else if (m->t == REQ_TYPE_FREE) {
261 ret = malloc_heap_free_pages(m->free_req.addr,
264 RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
/* local operation failed: no sync needed, answer the secondary directly */
269 struct rte_mp_msg resp_msg;
270 struct malloc_mp_req *resp =
271 (struct malloc_mp_req *)resp_msg.param;
273 /* send failure message straight away */
274 resp_msg.num_fds = 0;
275 resp_msg.len_param = sizeof(*resp);
276 strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
277 sizeof(resp_msg.name));
280 resp->result = REQ_RESULT_FAIL;
283 if (rte_mp_sendmsg(&resp_msg)) {
284 RTE_LOG(ERR, EAL, "Couldn't send response\n");
287 /* we did not modify the request */
/* local operation succeeded: fan out an async sync request to secondaries */
290 struct rte_mp_msg sr_msg;
291 struct malloc_mp_req *sr =
292 (struct malloc_mp_req *)sr_msg.param;
295 memset(&sr_msg, 0, sizeof(sr_msg));
297 /* we can do something, so send sync request asynchronously */
299 sr_msg.len_param = sizeof(*sr);
300 strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));
303 ts.tv_sec = MP_TIMEOUT_S;
305 /* sync requests carry no data */
306 sr->t = REQ_TYPE_SYNC;
309 /* there may be stray timeout still waiting */
/* retry while a previous async request with same name is still pending */
311 ret = rte_mp_request_async(&sr_msg, &ts,
312 handle_sync_response);
313 } while (ret != 0 && rte_errno == EEXIST);
315 RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
/* on send failure, release the per-alloc memseg array before bailing */
316 if (m->t == REQ_TYPE_ALLOC)
317 free(entry->alloc_state.ms);
321 /* mark request as in progress */
322 memcpy(&entry->user_req, m, sizeof(*m));
323 entry->state = REQ_STATE_ACTIVE;
325 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
327 pthread_mutex_unlock(&mp_request_list.lock);
330 pthread_mutex_unlock(&mp_request_list.lock);
335 /* callback for asynchronous sync requests for primary. this will either do a
336  * sendmsg with results, or trigger rollback request.
/* Invoked in the primary once all secondaries replied to the async sync
 * request (or it timed out). Aggregates per-secondary results, then:
 *  - FREE request: just forward the aggregate result to the secondary;
 *  - ALLOC + success: grow the heap accounting and report success;
 *  - ALLOC + failure: roll the allocation back locally and fan out an
 *    async rollback request (handled by handle_rollback_response).
 */
339 handle_sync_response(const struct rte_mp_msg *request,
340 const struct rte_mp_reply *reply)
342 enum malloc_req_result result;
343 struct mp_request *entry;
344 const struct malloc_mp_req *mpreq =
345 (const struct malloc_mp_req *)request->param;
348 /* lock the request */
349 pthread_mutex_lock(&mp_request_list.lock);
351 entry = find_request_by_id(mpreq->id);
353 RTE_LOG(ERR, EAL, "Wrong request ID\n");
/* assume success until any secondary proves otherwise */
357 result = REQ_RESULT_SUCCESS;
/* missing replies (timeouts) count as failure */
359 if (reply->nb_received != reply->nb_sent)
360 result = REQ_RESULT_FAIL;
362 for (i = 0; i < reply->nb_received; i++) {
363 struct malloc_mp_req *resp =
364 (struct malloc_mp_req *)reply->msgs[i].param;
366 if (resp->t != REQ_TYPE_SYNC) {
367 RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
368 result = REQ_RESULT_FAIL;
371 if (resp->id != entry->user_req.id) {
372 RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
373 result = REQ_RESULT_FAIL;
376 if (resp->result == REQ_RESULT_FAIL) {
377 result = REQ_RESULT_FAIL;
382 if (entry->user_req.t == REQ_TYPE_FREE) {
383 struct rte_mp_msg msg;
384 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
386 memset(&msg, 0, sizeof(msg));
388 /* this is a free request, just sendmsg result */
389 resp->t = REQ_TYPE_FREE;
390 resp->result = result;
391 resp->id = entry->user_req.id;
393 msg.len_param = sizeof(*resp);
394 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
396 if (rte_mp_sendmsg(&msg))
397 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
/* free requests are terminal either way - drop the tracking entry */
399 TAILQ_REMOVE(&mp_request_list.list, entry, next);
401 } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
402 result == REQ_RESULT_SUCCESS) {
403 struct malloc_heap *heap = entry->alloc_state.heap;
404 struct rte_mp_msg msg;
405 struct malloc_mp_req *resp =
406 (struct malloc_mp_req *)msg.param;
408 memset(&msg, 0, sizeof(msg));
/* commit: all secondaries synced, so account the new memory */
410 heap->total_size += entry->alloc_state.map_len;
412 /* result is success, so just notify secondary about this */
413 resp->t = REQ_TYPE_ALLOC;
414 resp->result = result;
415 resp->id = entry->user_req.id;
417 msg.len_param = sizeof(*resp);
418 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
420 if (rte_mp_sendmsg(&msg))
421 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
423 TAILQ_REMOVE(&mp_request_list.list, entry, next);
424 free(entry->alloc_state.ms);
426 } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
427 result == REQ_RESULT_FAIL) {
428 struct rte_mp_msg rb_msg;
429 struct malloc_mp_req *rb =
430 (struct malloc_mp_req *)rb_msg.param;
432 struct primary_alloc_req_state *state =
436 memset(&rb_msg, 0, sizeof(rb_msg));
438 /* we've failed to sync, so do a rollback */
439 rollback_expand_heap(state->ms, state->ms_len, state->elem,
440 state->map_addr, state->map_len);
442 /* send rollback request */
444 rb_msg.len_param = sizeof(*rb);
445 strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));
448 ts.tv_sec = MP_TIMEOUT_S;
450 /* sync requests carry no data */
/* rollback is carried as a sync-type request on the ROLLBACK action */
451 rb->t = REQ_TYPE_SYNC;
452 rb->id = entry->user_req.id;
454 /* there may be stray timeout still waiting */
456 ret = rte_mp_request_async(&rb_msg, &ts,
457 handle_rollback_response);
458 } while (ret != 0 && rte_errno == EEXIST);
460 RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
462 /* we couldn't send rollback request, but that's OK -
463  * secondary will time out, and memory has been removed
466 TAILQ_REMOVE(&mp_request_list.list, entry, next);
/* NOTE(review): message text below looks truncated in this extract -
 * confirm against the full source.
 */
472 RTE_LOG(ERR, EAL, " to sync request of unknown type\n");
476 pthread_mutex_unlock(&mp_request_list.lock);
479 pthread_mutex_unlock(&mp_request_list.lock);
484 handle_rollback_response(const struct rte_mp_msg *request,
485 const struct rte_mp_reply *reply __rte_unused)
487 struct rte_mp_msg msg;
488 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
489 const struct malloc_mp_req *mpreq =
490 (const struct malloc_mp_req *)request->param;
491 struct mp_request *entry;
493 /* lock the request */
494 pthread_mutex_lock(&mp_request_list.lock);
496 memset(&msg, 0, sizeof(0));
498 entry = find_request_by_id(mpreq->id);
500 RTE_LOG(ERR, EAL, "Wrong request ID\n");
504 if (entry->user_req.t != REQ_TYPE_ALLOC) {
505 RTE_LOG(ERR, EAL, "Unexpected active request\n");
509 /* we don't care if rollback succeeded, request still failed */
510 resp->t = REQ_TYPE_ALLOC;
511 resp->result = REQ_RESULT_FAIL;
512 resp->id = mpreq->id;
514 msg.len_param = sizeof(*resp);
515 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
517 if (rte_mp_sendmsg(&msg))
518 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
521 TAILQ_REMOVE(&mp_request_list.list, entry, next);
522 free(entry->alloc_state.ms);
525 pthread_mutex_unlock(&mp_request_list.lock);
528 pthread_mutex_unlock(&mp_request_list.lock);
532 /* final stage of the request from secondary */
/* IPC action handler run in the secondary: the primary's final verdict on a
 * request arrives here; record it and wake the thread blocked in
 * request_to_primary().
 */
534 handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
536 const struct malloc_mp_req *m =
537 (const struct malloc_mp_req *)msg->param;
538 struct mp_request *entry;
540 pthread_mutex_lock(&mp_request_list.lock);
542 entry = find_request_by_id(m->id);
544 /* update request status */
545 entry->user_req.result = m->result;
547 entry->state = REQ_STATE_COMPLETE;
549 /* trigger thread wakeup */
550 pthread_cond_signal(&entry->cond);
553 pthread_mutex_unlock(&mp_request_list.lock);
558 /* synchronously request memory map sync, this is only called whenever primary
559  * process initiates the allocation.
/* NOTE(review): the function signature line is elided from this extract. */
564 struct rte_mp_msg msg;
565 struct rte_mp_reply reply;
566 struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
570 memset(&msg, 0, sizeof(msg));
571 memset(&reply, 0, sizeof(reply));
573 /* no need to create tailq entries as this is entirely synchronous */
576 msg.len_param = sizeof(*req);
577 strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));
579 /* sync request carries no data */
580 req->t = REQ_TYPE_SYNC;
581 req->id = get_unique_id();
584 ts.tv_sec = MP_TIMEOUT_S;
586 /* there may be stray timeout still waiting */
/* retry while a previous request with the same name is still pending */
588 ret = rte_mp_request_sync(&msg, &reply, &ts);
589 } while (ret != 0 && rte_errno == EEXIST);
591 RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
/* a missing reply (timed-out secondary) fails the whole sync */
596 if (reply.nb_received != reply.nb_sent) {
597 RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
/* validate each secondary's reply: type, ID, and result must all match */
602 for (i = 0; i < reply.nb_received; i++) {
603 struct malloc_mp_req *resp =
604 (struct malloc_mp_req *)reply.msgs[i].param;
605 if (resp->t != REQ_TYPE_SYNC) {
606 RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
610 if (resp->id != req->id) {
611 RTE_LOG(ERR, EAL, "Wrong request ID\n");
615 if (resp->result != REQ_RESULT_SUCCESS) {
616 RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
628 /* this is a synchronous wrapper around a bunch of asynchronous requests to
629  * primary process. this will initiate a request and wait until responses come.
/* Runs in the secondary: send @user_req to the primary, then block on a
 * condition variable (with an absolute MP_TIMEOUT_S deadline) until
 * handle_response() marks the entry complete or the wait times out.
 * On success, user_req->result carries the primary's verdict.
 */
632 request_to_primary(struct malloc_mp_req *user_req)
634 struct rte_mp_msg msg;
635 struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
636 struct mp_request *entry;
641 memset(&msg, 0, sizeof(msg));
642 memset(&ts, 0, sizeof(ts));
644 pthread_mutex_lock(&mp_request_list.lock);
646 entry = malloc(sizeof(*entry));
648 RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
652 memset(entry, 0, sizeof(*entry));
654 if (gettimeofday(&now, NULL) < 0) {
655 RTE_LOG(ERR, EAL, "Cannot get current time\n");
/* convert "now + MP_TIMEOUT_S" into the absolute timespec that
 * pthread_cond_timedwait expects, carrying usec overflow into seconds
 */
659 ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
660 ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
661 (now.tv_usec * 1000) / 1000000000;
663 /* initialize the request */
664 pthread_cond_init(&entry->cond, NULL);
667 msg.len_param = sizeof(*msg_req);
668 strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));
670 /* (attempt to) get a unique id */
671 user_req->id = get_unique_id();
673 /* copy contents of user request into the message */
674 memcpy(msg_req, user_req, sizeof(*msg_req));
676 if (rte_mp_sendmsg(&msg)) {
677 RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
681 /* copy contents of user request into active request */
682 memcpy(&entry->user_req, user_req, sizeof(*user_req));
684 /* mark request as in progress */
685 entry->state = REQ_STATE_ACTIVE;
687 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
689 /* finally, wait on timeout */
/* loop guards against spurious wakeups; only 0 or ETIMEDOUT ends the wait */
691 ret = pthread_cond_timedwait(&entry->cond,
692 &mp_request_list.lock, &ts);
693 } while (ret != 0 && ret != ETIMEDOUT);
695 if (entry->state != REQ_STATE_COMPLETE) {
696 RTE_LOG(ERR, EAL, "Request timed out\n");
/* hand the primary's verdict back to the caller */
700 user_req->result = entry->user_req.result;
702 TAILQ_REMOVE(&mp_request_list.list, entry, next);
705 pthread_mutex_unlock(&mp_request_list.lock);
708 pthread_mutex_unlock(&mp_request_list.lock);
/* Register the IPC action handlers for this process's role. The primary
 * handles incoming REQUESTs; the registrations below that point are
 * presumably in the secondary branch (the `else` is elided from this
 * extract - TODO confirm). ROLLBACK deliberately reuses handle_sync: per
 * the header comment, a rollback is just a sync request under a separate
 * action name.
 * NOTE(review): definition continues past the end of this extract.
 */
714 register_mp_requests(void)
716 if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
717 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request)) {
718 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
723 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
724 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
728 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
729 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
733 if (rte_mp_action_register(MP_ACTION_RESPONSE,
735 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",