/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <string.h>
#include <sys/time.h>

#include <rte_errno.h>

#include "eal_memalloc.h"

#include "malloc_elem.h"
#include "malloc_mp.h"
#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to notify of changes in memory map. this is
 * essentially a regular sync request, but we cannot send sync requests while
 * another one is in progress, and we might have to - therefore, we do this as
 * a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */
/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5-second request timeout */
/* when we're allocating, we need to store some state to ensure that we can
 * roll back later
 */
struct primary_alloc_req_state {
	struct malloc_heap *heap;
	struct rte_memseg **ms;
	int ms_len;
	struct malloc_elem *elem;
	void *map_addr;
	size_t map_len;
};

enum req_state {
	REQ_STATE_INACTIVE = 0,
	REQ_STATE_ACTIVE,
	REQ_STATE_COMPLETE
};
struct mp_request {
	TAILQ_ENTRY(mp_request) next;
	struct malloc_mp_req user_req; /**< contents of request */
	pthread_cond_t cond; /**< variable we use to time out on this request */
	enum req_state state; /**< indicate status of this request */
	struct primary_alloc_req_state alloc_state;
};
/*
 * We could've used just a single request, but it may be possible for
 * secondaries to time out earlier than the primary, and send a new request
 * while the primary is still expecting replies to the old one. Therefore, each
 * new request will get assigned a new ID, which is how we will distinguish
 * between expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);
static struct {
	struct mp_request_list list;
	pthread_mutex_t lock;
} mp_request_list = {
	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
	.lock = PTHREAD_MUTEX_INITIALIZER
};
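
/* all access to the request list - both the tailq and the requests on it -
 * must be done while holding mp_request_list.lock; the handlers below take
 * the lock on entry and release it on every exit path.
 */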
/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 * - if allocation failed straight away
 * - if allocation and sync request succeeded
 * - if allocation succeeded, sync request failed, allocation rolled back and
 *   rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
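
/* To make the flow above concrete: a rough sketch of what a secondary-side
 * caller of request_to_primary() looks like. This is illustrative only - the
 * real callers live in the malloc heap code, and the local variables here
 * (pg_sz, size, socket, align) are hypothetical placeholders:
 *
 *	struct malloc_mp_req req;
 *
 *	memset(&req, 0, sizeof(req));
 *	req.t = REQ_TYPE_ALLOC;
 *	req.alloc_req.page_sz = pg_sz;
 *	req.alloc_req.elt_size = size;
 *	req.alloc_req.socket = socket;
 *	req.alloc_req.align = align;
 *
 *	if (request_to_primary(&req) != 0 ||
 *			req.result != REQ_RESULT_SUCCESS)
 *		return NULL; // request failed or timed out
 */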
static struct mp_request *
find_request_by_id(uint64_t id)
{
	struct mp_request *req;

	TAILQ_FOREACH(req, &mp_request_list.list, next) {
		if (req->user_req.id == id)
			break;
	}
	return req;
}
/* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
static uint64_t
get_unique_id(void)
{
	uint64_t id;

	do {
		id = rte_rand();
	} while (find_request_by_id(id) != NULL);
	return id;
}
/* secondary will respond to sync requests thusly */
static int
handle_sync(const struct rte_mp_msg *msg, const void *peer)
{
	struct rte_mp_msg reply;
	const struct malloc_mp_req *req =
			(const struct malloc_mp_req *)msg->param;
	struct malloc_mp_req *resp =
			(struct malloc_mp_req *)reply.param;
	int ret;

	if (req->t != REQ_TYPE_SYNC) {
		RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
		return -1;
	}

	memset(&reply, 0, sizeof(reply));

	snprintf(reply.name, sizeof(reply.name), "%s", msg->name);
	reply.len_param = sizeof(*resp);

	ret = eal_memalloc_sync_with_primary();

	resp->t = REQ_TYPE_SYNC;
	resp->id = req->id;
	resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;

	rte_mp_reply(&reply, peer);

	return 0;
}
static int
handle_alloc_request(const struct malloc_mp_req *m,
		struct mp_request *req)
{
	const struct malloc_req_alloc *ar = &m->alloc_req;
	struct malloc_heap *heap;
	struct malloc_elem *elem;
	struct rte_memseg **ms;
	void *map_addr;
	size_t alloc_sz;
	int n_segs;

	alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
			MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
	n_segs = alloc_sz / ar->page_sz;

	heap = ar->heap;

	/* we can't know in advance how many pages we'll need, so we malloc */
	ms = malloc(sizeof(*ms) * n_segs);
	if (ms == NULL) {
		RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
		return -1;
	}
	memset(ms, 0, sizeof(*ms) * n_segs);

	elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
			ar->flags, ar->align, ar->bound, ar->contig, ms,
			n_segs);
	if (elem == NULL) {
		free(ms);
		return -1;
	}

	map_addr = ms[0]->addr;

	/* we have succeeded in allocating memory, but we still need to sync
	 * with other processes. however, since DPDK IPC is single-threaded, we
	 * send an asynchronous request and exit this callback.
	 */

	req->alloc_state.ms = ms;
	req->alloc_state.ms_len = n_segs;
	req->alloc_state.map_addr = map_addr;
	req->alloc_state.map_len = alloc_sz;
	req->alloc_state.elem = elem;
	req->alloc_state.heap = heap;

	return 0;
}
/* first stage of primary handling requests from secondary */
static int
handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;
	int ret;

	/* lock access to request */
	pthread_mutex_lock(&mp_request_list.lock);

	/* make sure it's not a dupe */
	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		RTE_LOG(ERR, EAL, "Duplicate request id\n");
		/* the entry found belongs to another request - don't free it */
		entry = NULL;
		goto fail;
	}

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
		goto fail;
	}

	memset(entry, 0, sizeof(*entry));

	if (m->t == REQ_TYPE_ALLOC) {
		ret = handle_alloc_request(m, entry);
	} else if (m->t == REQ_TYPE_FREE) {
		ret = malloc_heap_free_pages(m->free_req.addr,
				m->free_req.len);
	} else {
		RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
		goto fail;
	}

	if (ret != 0) {
		struct rte_mp_msg resp_msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)resp_msg.param;

		/* send failure message straight away */
		resp_msg.num_fds = 0;
		resp_msg.len_param = sizeof(*resp);
		snprintf(resp_msg.name, sizeof(resp_msg.name), "%s",
				MP_ACTION_RESPONSE);

		resp->t = m->t;
		resp->result = REQ_RESULT_FAIL;
		resp->id = m->id;

		if (rte_mp_sendmsg(&resp_msg)) {
			RTE_LOG(ERR, EAL, "Couldn't send response\n");
			goto fail;
		}
		/* we did not modify the request */
		free(entry);
	} else {
		struct rte_mp_msg sr_msg;
		struct malloc_mp_req *sr =
				(struct malloc_mp_req *)sr_msg.param;
		struct timespec ts;

		memset(&sr_msg, 0, sizeof(sr_msg));

		/* we can do something, so send sync request asynchronously */
		sr_msg.len_param = sizeof(*sr);
		snprintf(sr_msg.name, sizeof(sr_msg.name), "%s",
				MP_ACTION_SYNC);

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		sr->t = REQ_TYPE_SYNC;
		sr->id = m->id;

		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&sr_msg, &ts,
					handle_sync_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
			if (m->t == REQ_TYPE_ALLOC)
				free(entry->alloc_state.ms);
			goto fail;
		}

		/* mark request as in progress */
		memcpy(&entry->user_req, m, sizeof(*m));
		entry->state = REQ_STATE_ACTIVE;

		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
	}
	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}
/* callback for asynchronous sync requests for primary. this will either do a
 * sendmsg with results, or trigger rollback request.
 */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply)
{
	enum malloc_req_result result;
	struct mp_request *entry;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	int i;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	result = REQ_RESULT_SUCCESS;

	if (reply->nb_received != reply->nb_sent)
		result = REQ_RESULT_FAIL;

	for (i = 0; i < reply->nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply->msgs[i].param;

		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->id != entry->user_req.id) {
			RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->result == REQ_RESULT_FAIL) {
			result = REQ_RESULT_FAIL;
			break;
		}
	}

	if (entry->user_req.t == REQ_TYPE_FREE) {
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		/* this is a free request, just sendmsg result */
		resp->t = REQ_TYPE_FREE;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.len_param = sizeof(*resp);
		snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_RESPONSE);

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_SUCCESS) {
		struct malloc_heap *heap = entry->alloc_state.heap;
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		heap->total_size += entry->alloc_state.map_len;

		/* result is success, so just notify secondary about this */
		resp->t = REQ_TYPE_ALLOC;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.len_param = sizeof(*resp);
		snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_RESPONSE);

		if (rte_mp_sendmsg(&msg))
			RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry->alloc_state.ms);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_FAIL) {
		struct rte_mp_msg rb_msg;
		struct malloc_mp_req *rb =
				(struct malloc_mp_req *)rb_msg.param;
		struct timespec ts;
		struct primary_alloc_req_state *state =
				&entry->alloc_state;
		int ret;

		memset(&rb_msg, 0, sizeof(rb_msg));

		/* we've failed to sync, so do a rollback */
		rollback_expand_heap(state->ms, state->ms_len, state->elem,
				state->map_addr, state->map_len);

		/* send rollback request */
		rb_msg.len_param = sizeof(*rb);
		snprintf(rb_msg.name, sizeof(rb_msg.name), "%s",
				MP_ACTION_ROLLBACK);

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		rb->t = REQ_TYPE_SYNC;
		rb->id = entry->user_req.id;

		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&rb_msg, &ts,
					handle_rollback_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");

			/* we couldn't send rollback request, but that's OK -
			 * secondary will time out, and memory has been removed
			 * from the heap anyway
			 */
			TAILQ_REMOVE(&mp_request_list.list, entry, next);

			free(state->ms);
			free(entry);
			goto fail;
		}
	} else {
		RTE_LOG(ERR, EAL, "Unexpected response to sync request of unknown type\n");
		goto fail;
	}

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}
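
/* callback for asynchronous rollback requests for primary. by the time we get
 * here, the local rollback has already been performed and the secondaries have
 * re-synchronized - all that's left is to tell the secondary that its
 * allocation request has failed.
 */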
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply __rte_unused)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	struct mp_request *entry;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	memset(&msg, 0, sizeof(msg));

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Wrong request ID\n");
		goto fail;
	}

	if (entry->user_req.t != REQ_TYPE_ALLOC) {
		RTE_LOG(ERR, EAL, "Unexpected active request\n");
		goto fail;
	}

	/* we don't care if rollback succeeded, request still failed */
	resp->t = REQ_TYPE_ALLOC;
	resp->result = REQ_RESULT_FAIL;
	resp->id = mpreq->id;
	msg.len_param = sizeof(*resp);
	snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_RESPONSE);

	if (rte_mp_sendmsg(&msg))
		RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");

	/* clean up */
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry->alloc_state.ms);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}
/* final stage of the request from secondary */
static int
handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		/* update request status */
		entry->user_req.result = m->result;

		entry->state = REQ_STATE_COMPLETE;

		/* trigger thread wakeup */
		pthread_cond_signal(&entry->cond);
	}

	pthread_mutex_unlock(&mp_request_list.lock);

	return 0;
}
/* synchronously request memory map sync. this is only called when the primary
 * process initiates an allocation.
 */
int
request_sync(void)
{
	struct rte_mp_msg msg;
	struct rte_mp_reply reply;
	struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
	struct timespec ts;
	int i, ret;

	memset(&msg, 0, sizeof(msg));
	memset(&reply, 0, sizeof(reply));

	/* no need to create tailq entries as this is entirely synchronous */

	msg.len_param = sizeof(*req);
	snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_SYNC);

	/* sync request carries no data */
	req->t = REQ_TYPE_SYNC;
	req->id = get_unique_id();

	ts.tv_nsec = 0;
	ts.tv_sec = MP_TIMEOUT_S;

	/* there may be stray timeout still waiting */
	do {
		ret = rte_mp_request_sync(&msg, &reply, &ts);
	} while (ret != 0 && rte_errno == EEXIST);
	if (ret != 0) {
		RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
		ret = -1;
		goto out;
	}

	if (reply.nb_received != reply.nb_sent) {
		RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
		ret = -1;
		goto out;
	}

	for (i = 0; i < reply.nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply.msgs[i].param;
		if (resp->t != REQ_TYPE_SYNC) {
			RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
			ret = -1;
			goto out;
		}
		if (resp->id != req->id) {
			RTE_LOG(ERR, EAL, "Wrong request ID\n");
			ret = -1;
			goto out;
		}
		if (resp->result != REQ_RESULT_SUCCESS) {
			RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
			ret = -1;
			goto out;
		}
	}

	ret = 0;
out:
	/* reply messages are allocated by the IPC layer - caller must free */
	free(reply.msgs);
	return ret;
}
/* this is a synchronous wrapper around a bunch of asynchronous requests to
 * primary process. this will initiate a request and wait until responses come.
 */
int
request_to_primary(struct malloc_mp_req *user_req)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
	struct mp_request *entry;
	struct timespec ts;
	struct timeval now;
	int ret;

	memset(&msg, 0, sizeof(msg));
	memset(&ts, 0, sizeof(ts));

	pthread_mutex_lock(&mp_request_list.lock);

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
		goto fail;
	}

	memset(entry, 0, sizeof(*entry));

	if (gettimeofday(&now, NULL) < 0) {
		RTE_LOG(ERR, EAL, "Cannot get current time\n");
		goto fail;
	}

	/* pthread_cond_timedwait() expects an absolute deadline, so set it to
	 * now + MP_TIMEOUT_S
	 */
	ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
	ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
			(now.tv_usec * 1000) / 1000000000;

	/* initialize the request */
	pthread_cond_init(&entry->cond, NULL);

	msg.len_param = sizeof(*msg_req);
	snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_REQUEST);

	/* (attempt to) get a unique id */
	user_req->id = get_unique_id();

	/* copy contents of user request into the message */
	memcpy(msg_req, user_req, sizeof(*msg_req));

	if (rte_mp_sendmsg(&msg)) {
		RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
		goto fail;
	}

	/* copy contents of user request into active request */
	memcpy(&entry->user_req, user_req, sizeof(*user_req));

	/* mark request as in progress */
	entry->state = REQ_STATE_ACTIVE;

	TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);

	/* finally, wait on timeout */
	do {
		ret = pthread_cond_timedwait(&entry->cond,
				&mp_request_list.lock, &ts);
	} while (ret != 0 && ret != ETIMEDOUT);

	if (entry->state != REQ_STATE_COMPLETE) {
		RTE_LOG(ERR, EAL, "Request timed out\n");
		ret = -1;
	} else {
		ret = 0;
		user_req->result = entry->user_req.result;
	}
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}
int
register_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
					MP_ACTION_REQUEST);
			return -1;
		}
	} else {
		if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
					MP_ACTION_SYNC);
			return -1;
		}
		/* a rollback is handled like a regular sync - same handler */
		if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
					MP_ACTION_ROLLBACK);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_RESPONSE,
				handle_response)) {
			RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
					MP_ACTION_RESPONSE);
			return -1;
		}
	}
	return 0;
}