/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>
#include <sys/time.h>

#include <rte_errno.h>
#include <rte_string_fns.h>

#include "eal_memalloc.h"
#include "eal_memcfg.h"

#include "malloc_elem.h"
#include "malloc_mp.h"
#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to notify of changes in memory map. this is
 * essentially a regular sync request, but we cannot send sync requests while
 * another one is in progress, and we might have to - therefore, we do this as
 * a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */
/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
41 /* when we're allocating, we need to store some state to ensure that we can
44 struct primary_alloc_req_state {
45 struct malloc_heap *heap;
46 struct rte_memseg **ms;
48 struct malloc_elem *elem;
54 REQ_STATE_INACTIVE = 0,
60 TAILQ_ENTRY(mp_request) next;
61 struct malloc_mp_req user_req; /**< contents of request */
62 pthread_cond_t cond; /**< variable we use to time out on this request */
63 enum req_state state; /**< indicate status of this request */
64 struct primary_alloc_req_state alloc_state;
68 * We could've used just a single request, but it may be possible for
69 * secondaries to timeout earlier than the primary, and send a new request while
70 * primary is still expecting replies to the old one. Therefore, each new
71 * request will get assigned a new ID, which is how we will distinguish between
72 * expected and unexpected messages.
74 TAILQ_HEAD(mp_request_list, mp_request);
76 struct mp_request_list list;
79 .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
80 .lock = PTHREAD_MUTEX_INITIALIZER
/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 * - if allocation failed straight away
 * - if allocation and sync request succeeded
 * - if allocation succeeded, sync request failed, allocation rolled back and
 *   rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
123 static struct mp_request *
124 find_request_by_id(uint64_t id)
126 struct mp_request *req;
127 TAILQ_FOREACH(req, &mp_request_list.list, next) {
128 if (req->user_req.id == id)
134 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
141 } while (find_request_by_id(id) != NULL);
145 /* secondary will respond to sync requests thusly */
147 handle_sync(const struct rte_mp_msg *msg, const void *peer)
149 struct rte_mp_msg reply;
150 const struct malloc_mp_req *req =
151 (const struct malloc_mp_req *)msg->param;
152 struct malloc_mp_req *resp =
153 (struct malloc_mp_req *)reply.param;
156 if (req->t != REQ_TYPE_SYNC) {
157 RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
161 memset(&reply, 0, sizeof(reply));
164 strlcpy(reply.name, msg->name, sizeof(reply.name));
165 reply.len_param = sizeof(*resp);
167 ret = eal_memalloc_sync_with_primary();
169 resp->t = REQ_TYPE_SYNC;
171 resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
173 rte_mp_reply(&reply, peer);
179 handle_alloc_request(const struct malloc_mp_req *m,
180 struct mp_request *req)
182 const struct malloc_req_alloc *ar = &m->alloc_req;
183 struct malloc_heap *heap;
184 struct malloc_elem *elem;
185 struct rte_memseg **ms;
190 alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
191 MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
192 n_segs = alloc_sz / ar->page_sz;
196 /* we can't know in advance how many pages we'll need, so we malloc */
197 ms = malloc(sizeof(*ms) * n_segs);
199 RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
202 memset(ms, 0, sizeof(*ms) * n_segs);
204 elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
205 ar->flags, ar->align, ar->bound, ar->contig, ms,
211 map_addr = ms[0]->addr;
213 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);
215 /* we have succeeded in allocating memory, but we still need to sync
216 * with other processes. however, since DPDK IPC is single-threaded, we
217 * send an asynchronous request and exit this callback.
220 req->alloc_state.ms = ms;
221 req->alloc_state.ms_len = n_segs;
222 req->alloc_state.map_addr = map_addr;
223 req->alloc_state.map_len = alloc_sz;
224 req->alloc_state.elem = elem;
225 req->alloc_state.heap = heap;
233 /* first stage of primary handling requests from secondary */
235 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
237 const struct malloc_mp_req *m =
238 (const struct malloc_mp_req *)msg->param;
239 struct mp_request *entry;
242 /* lock access to request */
243 pthread_mutex_lock(&mp_request_list.lock);
245 /* make sure it's not a dupe */
246 entry = find_request_by_id(m->id);
248 RTE_LOG(ERR, EAL, "Duplicate request id\n");
252 entry = malloc(sizeof(*entry));
254 RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
259 memset(entry, 0, sizeof(*entry));
261 if (m->t == REQ_TYPE_ALLOC) {
262 ret = handle_alloc_request(m, entry);
263 } else if (m->t == REQ_TYPE_FREE) {
264 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
265 m->free_req.addr, m->free_req.len);
267 ret = malloc_heap_free_pages(m->free_req.addr,
270 RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
275 struct rte_mp_msg resp_msg;
276 struct malloc_mp_req *resp =
277 (struct malloc_mp_req *)resp_msg.param;
279 /* send failure message straight away */
280 resp_msg.num_fds = 0;
281 resp_msg.len_param = sizeof(*resp);
282 strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
283 sizeof(resp_msg.name));
286 resp->result = REQ_RESULT_FAIL;
289 if (rte_mp_sendmsg(&resp_msg)) {
290 RTE_LOG(ERR, EAL, "Couldn't send response\n");
293 /* we did not modify the request */
296 struct rte_mp_msg sr_msg;
297 struct malloc_mp_req *sr =
298 (struct malloc_mp_req *)sr_msg.param;
301 memset(&sr_msg, 0, sizeof(sr_msg));
303 /* we can do something, so send sync request asynchronously */
305 sr_msg.len_param = sizeof(*sr);
306 strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));
309 ts.tv_sec = MP_TIMEOUT_S;
311 /* sync requests carry no data */
312 sr->t = REQ_TYPE_SYNC;
315 /* there may be stray timeout still waiting */
317 ret = rte_mp_request_async(&sr_msg, &ts,
318 handle_sync_response);
319 } while (ret != 0 && rte_errno == EEXIST);
321 RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
322 if (m->t == REQ_TYPE_ALLOC)
323 free(entry->alloc_state.ms);
327 /* mark request as in progress */
328 memcpy(&entry->user_req, m, sizeof(*m));
329 entry->state = REQ_STATE_ACTIVE;
331 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
333 pthread_mutex_unlock(&mp_request_list.lock);
336 pthread_mutex_unlock(&mp_request_list.lock);
341 /* callback for asynchronous sync requests for primary. this will either do a
342 * sendmsg with results, or trigger rollback request.
345 handle_sync_response(const struct rte_mp_msg *request,
346 const struct rte_mp_reply *reply)
348 enum malloc_req_result result;
349 struct mp_request *entry;
350 const struct malloc_mp_req *mpreq =
351 (const struct malloc_mp_req *)request->param;
354 /* lock the request */
355 pthread_mutex_lock(&mp_request_list.lock);
357 entry = find_request_by_id(mpreq->id);
359 RTE_LOG(ERR, EAL, "Wrong request ID\n");
363 result = REQ_RESULT_SUCCESS;
365 if (reply->nb_received != reply->nb_sent)
366 result = REQ_RESULT_FAIL;
368 for (i = 0; i < reply->nb_received; i++) {
369 struct malloc_mp_req *resp =
370 (struct malloc_mp_req *)reply->msgs[i].param;
372 if (resp->t != REQ_TYPE_SYNC) {
373 RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
374 result = REQ_RESULT_FAIL;
377 if (resp->id != entry->user_req.id) {
378 RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
379 result = REQ_RESULT_FAIL;
382 if (resp->result == REQ_RESULT_FAIL) {
383 result = REQ_RESULT_FAIL;
388 if (entry->user_req.t == REQ_TYPE_FREE) {
389 struct rte_mp_msg msg;
390 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
392 memset(&msg, 0, sizeof(msg));
394 /* this is a free request, just sendmsg result */
395 resp->t = REQ_TYPE_FREE;
396 resp->result = result;
397 resp->id = entry->user_req.id;
399 msg.len_param = sizeof(*resp);
400 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
402 if (rte_mp_sendmsg(&msg))
403 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
405 TAILQ_REMOVE(&mp_request_list.list, entry, next);
407 } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
408 result == REQ_RESULT_SUCCESS) {
409 struct malloc_heap *heap = entry->alloc_state.heap;
410 struct rte_mp_msg msg;
411 struct malloc_mp_req *resp =
412 (struct malloc_mp_req *)msg.param;
414 memset(&msg, 0, sizeof(msg));
416 heap->total_size += entry->alloc_state.map_len;
418 /* result is success, so just notify secondary about this */
419 resp->t = REQ_TYPE_ALLOC;
420 resp->result = result;
421 resp->id = entry->user_req.id;
423 msg.len_param = sizeof(*resp);
424 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
426 if (rte_mp_sendmsg(&msg))
427 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
429 TAILQ_REMOVE(&mp_request_list.list, entry, next);
430 free(entry->alloc_state.ms);
432 } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
433 result == REQ_RESULT_FAIL) {
434 struct rte_mp_msg rb_msg;
435 struct malloc_mp_req *rb =
436 (struct malloc_mp_req *)rb_msg.param;
438 struct primary_alloc_req_state *state =
442 memset(&rb_msg, 0, sizeof(rb_msg));
444 /* we've failed to sync, so do a rollback */
445 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
446 state->map_addr, state->map_len);
448 rollback_expand_heap(state->ms, state->ms_len, state->elem,
449 state->map_addr, state->map_len);
451 /* send rollback request */
453 rb_msg.len_param = sizeof(*rb);
454 strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));
457 ts.tv_sec = MP_TIMEOUT_S;
459 /* sync requests carry no data */
460 rb->t = REQ_TYPE_SYNC;
461 rb->id = entry->user_req.id;
463 /* there may be stray timeout still waiting */
465 ret = rte_mp_request_async(&rb_msg, &ts,
466 handle_rollback_response);
467 } while (ret != 0 && rte_errno == EEXIST);
469 RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
471 /* we couldn't send rollback request, but that's OK -
472 * secondary will time out, and memory has been removed
475 TAILQ_REMOVE(&mp_request_list.list, entry, next);
481 RTE_LOG(ERR, EAL, " to sync request of unknown type\n");
485 pthread_mutex_unlock(&mp_request_list.lock);
488 pthread_mutex_unlock(&mp_request_list.lock);
493 handle_rollback_response(const struct rte_mp_msg *request,
494 const struct rte_mp_reply *reply __rte_unused)
496 struct rte_mp_msg msg;
497 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
498 const struct malloc_mp_req *mpreq =
499 (const struct malloc_mp_req *)request->param;
500 struct mp_request *entry;
502 /* lock the request */
503 pthread_mutex_lock(&mp_request_list.lock);
505 memset(&msg, 0, sizeof(msg));
507 entry = find_request_by_id(mpreq->id);
509 RTE_LOG(ERR, EAL, "Wrong request ID\n");
513 if (entry->user_req.t != REQ_TYPE_ALLOC) {
514 RTE_LOG(ERR, EAL, "Unexpected active request\n");
518 /* we don't care if rollback succeeded, request still failed */
519 resp->t = REQ_TYPE_ALLOC;
520 resp->result = REQ_RESULT_FAIL;
521 resp->id = mpreq->id;
523 msg.len_param = sizeof(*resp);
524 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
526 if (rte_mp_sendmsg(&msg))
527 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
530 TAILQ_REMOVE(&mp_request_list.list, entry, next);
531 free(entry->alloc_state.ms);
534 pthread_mutex_unlock(&mp_request_list.lock);
537 pthread_mutex_unlock(&mp_request_list.lock);
541 /* final stage of the request from secondary */
543 handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
545 const struct malloc_mp_req *m =
546 (const struct malloc_mp_req *)msg->param;
547 struct mp_request *entry;
549 pthread_mutex_lock(&mp_request_list.lock);
551 entry = find_request_by_id(m->id);
553 /* update request status */
554 entry->user_req.result = m->result;
556 entry->state = REQ_STATE_COMPLETE;
558 /* trigger thread wakeup */
559 pthread_cond_signal(&entry->cond);
562 pthread_mutex_unlock(&mp_request_list.lock);
567 /* synchronously request memory map sync, this is only called whenever primary
568 * process initiates the allocation.
573 struct rte_mp_msg msg;
574 struct rte_mp_reply reply;
575 struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
579 memset(&msg, 0, sizeof(msg));
580 memset(&reply, 0, sizeof(reply));
582 /* no need to create tailq entries as this is entirely synchronous */
585 msg.len_param = sizeof(*req);
586 strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));
588 /* sync request carries no data */
589 req->t = REQ_TYPE_SYNC;
590 req->id = get_unique_id();
593 ts.tv_sec = MP_TIMEOUT_S;
595 /* there may be stray timeout still waiting */
597 ret = rte_mp_request_sync(&msg, &reply, &ts);
598 } while (ret != 0 && rte_errno == EEXIST);
600 /* if IPC is unsupported, behave as if the call succeeded */
601 if (rte_errno != ENOTSUP)
602 RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
608 if (reply.nb_received != reply.nb_sent) {
609 RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
613 for (i = 0; i < reply.nb_received; i++) {
614 struct malloc_mp_req *resp =
615 (struct malloc_mp_req *)reply.msgs[i].param;
616 if (resp->t != REQ_TYPE_SYNC) {
617 RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
620 if (resp->id != req->id) {
621 RTE_LOG(ERR, EAL, "Wrong request ID\n");
624 if (resp->result != REQ_RESULT_SUCCESS) {
625 RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
636 /* this is a synchronous wrapper around a bunch of asynchronous requests to
637 * primary process. this will initiate a request and wait until responses come.
640 request_to_primary(struct malloc_mp_req *user_req)
642 struct rte_mp_msg msg;
643 struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
644 struct mp_request *entry;
649 memset(&msg, 0, sizeof(msg));
650 memset(&ts, 0, sizeof(ts));
652 pthread_mutex_lock(&mp_request_list.lock);
654 entry = malloc(sizeof(*entry));
656 RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
660 memset(entry, 0, sizeof(*entry));
662 if (gettimeofday(&now, NULL) < 0) {
663 RTE_LOG(ERR, EAL, "Cannot get current time\n");
667 ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
668 ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
669 (now.tv_usec * 1000) / 1000000000;
671 /* initialize the request */
672 pthread_cond_init(&entry->cond, NULL);
675 msg.len_param = sizeof(*msg_req);
676 strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));
678 /* (attempt to) get a unique id */
679 user_req->id = get_unique_id();
681 /* copy contents of user request into the message */
682 memcpy(msg_req, user_req, sizeof(*msg_req));
684 if (rte_mp_sendmsg(&msg)) {
685 RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
689 /* copy contents of user request into active request */
690 memcpy(&entry->user_req, user_req, sizeof(*user_req));
692 /* mark request as in progress */
693 entry->state = REQ_STATE_ACTIVE;
695 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
697 /* finally, wait on timeout */
699 ret = pthread_cond_timedwait(&entry->cond,
700 &mp_request_list.lock, &ts);
701 } while (ret != 0 && ret != ETIMEDOUT);
703 if (entry->state != REQ_STATE_COMPLETE) {
704 RTE_LOG(ERR, EAL, "Request timed out\n");
708 user_req->result = entry->user_req.result;
710 TAILQ_REMOVE(&mp_request_list.list, entry, next);
713 pthread_mutex_unlock(&mp_request_list.lock);
716 pthread_mutex_unlock(&mp_request_list.lock);
722 register_mp_requests(void)
724 if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
725 /* it's OK for primary to not support IPC */
726 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
727 rte_errno != ENOTSUP) {
728 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
733 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
734 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
738 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
739 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
743 if (rte_mp_action_register(MP_ACTION_RESPONSE,
745 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",