1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2018 Intel Corporation
10 #include <rte_string_fns.h>
12 #include "eal_memalloc.h"
13 #include "eal_memcfg.h"
14 #include "eal_private.h"
16 #include "malloc_elem.h"
17 #include "malloc_mp.h"
19 #define MP_ACTION_SYNC "mp_malloc_sync"
20 /**< request sent by primary process to notify of changes in memory map */
21 #define MP_ACTION_ROLLBACK "mp_malloc_rollback"
22 /**< request sent by primary process to notify of changes in memory map. this is
23 * essentially a regular sync request, but we cannot send sync requests while
24 * another one is in progress, and we might have to - therefore, we do this as
25 * a separate callback.
27 #define MP_ACTION_REQUEST "mp_malloc_request"
28 /**< request sent by secondary process to ask for allocation/deallocation */
29 #define MP_ACTION_RESPONSE "mp_malloc_response"
30 /**< response sent to secondary process to indicate result of request */
32 /* forward declarations */
34 handle_sync_response(const struct rte_mp_msg *request,
35 const struct rte_mp_reply *reply);
37 handle_rollback_response(const struct rte_mp_msg *request,
38 const struct rte_mp_reply *reply);
40 #define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
42 /* when we're allocating, we need to store some state to ensure that we can
/* NOTE(review): the remainder of the comment above and several struct fields
 * are not visible in this view. The struct below is the primary's bookkeeping
 * for an in-flight allocation, kept so the allocation can be committed or
 * rolled back after the asynchronous sync round-trip with secondaries.
 */
45 struct primary_alloc_req_state {
/* heap the allocation was made from (used later to adjust total_size) */
46 struct malloc_heap *heap;
/* array of memsegs backing the allocation; heap-allocated, freed with entry */
47 struct rte_memseg **ms;
/* malloc element created for the allocation, needed for rollback */
49 struct malloc_elem *elem;
/* request lifecycle states (enum header/other values elided in this view) */
55 REQ_STATE_INACTIVE = 0,
/* one entry per outstanding request, linked into mp_request_list below */
61 TAILQ_ENTRY(mp_request) next;
62 struct malloc_mp_req user_req; /**< contents of request */
63 pthread_cond_t cond; /**< variable we use to time out on this request */
64 enum req_state state; /**< indicate status of this request */
65 struct primary_alloc_req_state alloc_state;
69 * We could've used just a single request, but it may be possible for
70 * secondaries to timeout earlier than the primary, and send a new request while
71 * primary is still expecting replies to the old one. Therefore, each new
72 * request will get assigned a new ID, which is how we will distinguish between
73 * expected and unexpected messages.
75 TAILQ_HEAD(mp_request_list, mp_request);
/* global list of outstanding requests; guarded by the .lock member below */
77 struct mp_request_list list;
80 .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
81 .lock = PTHREAD_MUTEX_INITIALIZER
85 * General workflow is the following:
88 * S: send request to primary
89 * P: attempt to allocate memory
90 * if failed, sendmsg failure
91 * if success, send sync request
92 * S: if received msg of failure, quit
93 * if received sync request, synchronize memory map and reply with result
94 * P: if received sync request result
95 * if success, sendmsg success
96 * if failure, roll back allocation and send a rollback request
97 * S: if received msg of success, quit
98 * if received rollback request, synchronize memory map and reply with result
99 * P: if received sync request result
100 * sendmsg sync request result
101 * S: if received msg, quit
103 * Aside from timeouts, there are three points where we can quit:
104 * - if allocation failed straight away
105 * - if allocation and sync request succeeded
106 * - if allocation succeeded, sync request failed, allocation rolled back and
107 * rollback request received (irrespective of whether it succeeded or failed)
110 * S: send request to primary
111 * P: attempt to deallocate memory
112 * if failed, sendmsg failure
113 * if success, send sync request
114 * S: if received msg of failure, quit
115 * if received sync request, synchronize memory map and reply with result
116 * P: if received sync request result
117 * sendmsg sync request result
118 * S: if received msg, quit
120 * There is no "rollback" from deallocation, as it's safe to have some memory
121 * mapped in some processes - it's absent from the heap, so it won't get used.
124 static struct mp_request *
/* Linear scan of the outstanding-request list for an entry whose user
 * request carries the given ID. All callers visible in this file take
 * mp_request_list.lock before calling. Returns the entry, or NULL when
 * no match exists (return statements are elided in this view).
 */
125 find_request_by_id(uint64_t id)
127 struct mp_request *req;
128 TAILQ_FOREACH(req, &mp_request_list.list, next) {
129 if (req->user_req.id == id)
135 /* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
/* NOTE(review): the loop tail below belongs to the ID-generator function
 * (header elided here); it retries until the candidate ID is not already
 * used by an outstanding request.
 */
142 } while (find_request_by_id(id) != NULL);
146 /* secondary will respond to sync requests thusly */
/* IPC callback run in a secondary process; registered for both
 * MP_ACTION_SYNC and MP_ACTION_ROLLBACK (see register_mp_requests).
 * Synchronizes the local memory map with the primary and replies to the
 * peer with the outcome.
 */
148 handle_sync(const struct rte_mp_msg *msg, const void *peer)
150 struct rte_mp_msg reply;
151 const struct malloc_mp_req *req =
152 (const struct malloc_mp_req *)msg->param;
153 struct malloc_mp_req *resp =
154 (struct malloc_mp_req *)reply.param;
/* both sync and rollback requests are sent as REQ_TYPE_SYNC */
157 if (req->t != REQ_TYPE_SYNC) {
158 RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
162 memset(&reply, 0, sizeof(reply));
/* echo the incoming action name so the reply matches the request */
165 strlcpy(reply.name, msg->name, sizeof(reply.name));
166 reply.len_param = sizeof(*resp);
/* pull in any memory map changes made by the primary */
168 ret = eal_memalloc_sync_with_primary();
170 resp->t = REQ_TYPE_SYNC;
172 resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
174 return rte_mp_reply(&reply, peer);
/* Primary-side handling of a secondary's free request: validate that the
 * [addr, addr+len) range lies within a single known memseg list and is not
 * external memory, announce a FREE memory event, then free the pages.
 * (Return type line and some declarations are elided in this view.)
 */
178 handle_free_request(const struct malloc_mp_req *m)
180 const struct rte_memseg_list *msl;
184 len = m->free_req.len;
185 start = m->free_req.addr;
/* last byte of the range, inclusive */
186 end = RTE_PTR_ADD(start, len - 1);
188 /* check if the requested memory actually exists */
189 msl = rte_mem_virt2memseg_list(start);
191 RTE_LOG(ERR, EAL, "Requested to free unknown memory\n");
195 /* check if end is within the same memory region */
196 if (rte_mem_virt2memseg_list(end) != msl) {
197 RTE_LOG(ERR, EAL, "Requested to free memory spanning multiple regions\n");
201 /* we're supposed to only free memory that's not external */
203 RTE_LOG(ERR, EAL, "Requested to free external memory\n");
207 /* now that we've validated the request, announce it */
208 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
209 m->free_req.addr, m->free_req.len);
211 /* now, do the actual freeing */
212 return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
/* Primary-side handling of a secondary's allocation request: validate the
 * parameters, allocate pages on the requested internal heap, announce an
 * ALLOC memory event, and stash the allocation state in `req` so the caller
 * can later commit it (handle_sync_response) or roll it back. The `ms`
 * array is owned by `req->alloc_state` on success.
 */
216 handle_alloc_request(const struct malloc_mp_req *m,
217 struct mp_request *req)
219 struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
220 const struct malloc_req_alloc *ar = &m->alloc_req;
221 struct malloc_heap *heap;
222 struct malloc_elem *elem;
223 struct rte_memseg **ms;
228 /* this is checked by the API, but we need to prevent divide by zero */
229 if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
230 RTE_LOG(ERR, EAL, "Attempting to allocate with invalid page size\n");
234 /* heap idx is index into the heap array, not socket ID */
235 if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
236 RTE_LOG(ERR, EAL, "Attempting to allocate from invalid heap\n");
240 heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];
243 * for allocations, we must only use internal heaps, but since the
244 * rte_malloc_heap_socket_is_external() is thread-safe and we're already
245 * read-locked, we'll have to take advantage of the fact that internal
246 * socket ID's are always lower than RTE_MAX_NUMA_NODES.
248 if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
249 RTE_LOG(ERR, EAL, "Attempting to allocate from external heap\n");
/* worst-case size: element + alignment slack + trailer, page-rounded */
253 alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
254 MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
255 n_segs = alloc_sz / ar->page_sz;
257 /* we can't know in advance how many pages we'll need, so we malloc */
258 ms = malloc(sizeof(*ms) * n_segs);
260 RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
263 memset(ms, 0, sizeof(*ms) * n_segs);
/* the actual page allocation; fills `ms` with the backing memsegs
 * (failure handling for this call is elided in this view) */
265 elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
266 ar->flags, ar->align, ar->bound, ar->contig, ms,
272 map_addr = ms[0]->addr;
274 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);
276 /* we have succeeded in allocating memory, but we still need to sync
277 * with other processes. however, since DPDK IPC is single-threaded, we
278 * send an asynchronous request and exit this callback.
/* record everything needed for later commit or rollback */
281 req->alloc_state.ms = ms;
282 req->alloc_state.ms_len = n_segs;
283 req->alloc_state.map_addr = map_addr;
284 req->alloc_state.map_len = alloc_sz;
285 req->alloc_state.elem = elem;
286 req->alloc_state.heap = heap;
294 /* first stage of primary handling requests from secondary */
/* IPC callback (registered for MP_ACTION_REQUEST) run in the primary:
 * under mp_request_list.lock, reject duplicate request IDs, create a new
 * tracking entry, dispatch to the alloc/free handler, and either send an
 * immediate failure response or kick off an asynchronous sync request to
 * all secondaries (completion continues in handle_sync_response).
 */
296 handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
298 const struct malloc_mp_req *m =
299 (const struct malloc_mp_req *)msg->param;
300 struct mp_request *entry;
303 /* lock access to request */
304 pthread_mutex_lock(&mp_request_list.lock);
306 /* make sure it's not a dupe */
307 entry = find_request_by_id(m->id);
309 RTE_LOG(ERR, EAL, "Duplicate request id\n");
313 entry = malloc(sizeof(*entry));
315 RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
320 memset(entry, 0, sizeof(*entry));
/* dispatch on request type; anything else is rejected below */
322 if (m->t == REQ_TYPE_ALLOC) {
323 ret = handle_alloc_request(m, entry);
324 } else if (m->t == REQ_TYPE_FREE) {
325 ret = handle_free_request(m);
327 RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
332 struct rte_mp_msg resp_msg;
333 struct malloc_mp_req *resp =
334 (struct malloc_mp_req *)resp_msg.param;
336 /* send failure message straight away */
337 resp_msg.num_fds = 0;
338 resp_msg.len_param = sizeof(*resp);
339 strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
340 sizeof(resp_msg.name));
343 resp->result = REQ_RESULT_FAIL;
346 if (rte_mp_sendmsg(&resp_msg)) {
347 RTE_LOG(ERR, EAL, "Couldn't send response\n");
350 /* we did not modify the request */
353 struct rte_mp_msg sr_msg;
354 struct malloc_mp_req *sr =
355 (struct malloc_mp_req *)sr_msg.param;
358 memset(&sr_msg, 0, sizeof(sr_msg));
360 /* we can do something, so send sync request asynchronously */
362 sr_msg.len_param = sizeof(*sr);
363 strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));
366 ts.tv_sec = MP_TIMEOUT_S;
368 /* sync requests carry no data */
369 sr->t = REQ_TYPE_SYNC;
372 /* there may be stray timeout still waiting */
/* retry while a previous async request with the same name is pending */
374 ret = rte_mp_request_async(&sr_msg, &ts,
375 handle_sync_response);
376 } while (ret != 0 && rte_errno == EEXIST);
378 RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
/* undo alloc bookkeeping: the ms array was allocated in
 * handle_alloc_request and would otherwise leak */
379 if (m->t == REQ_TYPE_ALLOC)
380 free(entry->alloc_state.ms);
384 /* mark request as in progress */
385 memcpy(&entry->user_req, m, sizeof(*m));
386 entry->state = REQ_STATE_ACTIVE;
388 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
390 pthread_mutex_unlock(&mp_request_list.lock);
/* error path unlock (intervening cleanup elided in this view) */
393 pthread_mutex_unlock(&mp_request_list.lock);
398 /* callback for asynchronous sync requests for primary. this will either do a
399 * sendmsg with results, or trigger rollback request.
/* Invoked by the IPC layer in the primary when all secondaries have replied
 * to (or timed out on) the async sync request sent by handle_request.
 * Aggregates the per-secondary results, then: for FREE requests, sends the
 * final response; for successful ALLOC, commits the allocation and sends
 * success; for failed ALLOC, rolls the allocation back and sends an async
 * rollback request (completion continues in handle_rollback_response).
 */
402 handle_sync_response(const struct rte_mp_msg *request,
403 const struct rte_mp_reply *reply)
405 enum malloc_req_result result;
406 struct mp_request *entry;
407 const struct malloc_mp_req *mpreq =
408 (const struct malloc_mp_req *)request->param;
411 /* lock the request */
412 pthread_mutex_lock(&mp_request_list.lock);
414 entry = find_request_by_id(mpreq->id);
416 RTE_LOG(ERR, EAL, "Wrong request ID\n");
420 result = REQ_RESULT_SUCCESS;
/* any missing, malformed, mismatched or failed reply fails the whole sync */
422 if (reply->nb_received != reply->nb_sent)
423 result = REQ_RESULT_FAIL;
425 for (i = 0; i < reply->nb_received; i++) {
426 struct malloc_mp_req *resp =
427 (struct malloc_mp_req *)reply->msgs[i].param;
429 if (resp->t != REQ_TYPE_SYNC) {
430 RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
431 result = REQ_RESULT_FAIL;
434 if (resp->id != entry->user_req.id) {
435 RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
436 result = REQ_RESULT_FAIL;
439 if (resp->result == REQ_RESULT_FAIL) {
440 result = REQ_RESULT_FAIL;
/* FREE: no commit/rollback needed, just report the aggregate result */
445 if (entry->user_req.t == REQ_TYPE_FREE) {
446 struct rte_mp_msg msg;
447 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
449 memset(&msg, 0, sizeof(msg));
451 /* this is a free request, just sendmsg result */
452 resp->t = REQ_TYPE_FREE;
453 resp->result = result;
454 resp->id = entry->user_req.id;
456 msg.len_param = sizeof(*resp);
457 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
459 if (rte_mp_sendmsg(&msg))
460 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
462 TAILQ_REMOVE(&mp_request_list.list, entry, next);
/* ALLOC + all secondaries synced: commit the allocation */
464 } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
465 result == REQ_RESULT_SUCCESS) {
466 struct malloc_heap *heap = entry->alloc_state.heap;
467 struct rte_mp_msg msg;
468 struct malloc_mp_req *resp =
469 (struct malloc_mp_req *)msg.param;
471 memset(&msg, 0, sizeof(msg));
/* account the newly mapped area into the heap's total size */
473 heap->total_size += entry->alloc_state.map_len;
475 /* result is success, so just notify secondary about this */
476 resp->t = REQ_TYPE_ALLOC;
477 resp->result = result;
478 resp->id = entry->user_req.id;
480 msg.len_param = sizeof(*resp);
481 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
483 if (rte_mp_sendmsg(&msg))
484 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
486 TAILQ_REMOVE(&mp_request_list.list, entry, next);
487 free(entry->alloc_state.ms);
/* ALLOC but sync failed: undo the allocation and ask secondaries to
 * roll back their memory maps */
489 } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
490 result == REQ_RESULT_FAIL) {
491 struct rte_mp_msg rb_msg;
492 struct malloc_mp_req *rb =
493 (struct malloc_mp_req *)rb_msg.param;
495 struct primary_alloc_req_state *state =
499 memset(&rb_msg, 0, sizeof(rb_msg));
501 /* we've failed to sync, so do a rollback */
502 eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
503 state->map_addr, state->map_len);
505 rollback_expand_heap(state->ms, state->ms_len, state->elem,
506 state->map_addr, state->map_len);
508 /* send rollback request */
510 rb_msg.len_param = sizeof(*rb);
511 strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));
514 ts.tv_sec = MP_TIMEOUT_S;
516 /* sync requests carry no data */
517 rb->t = REQ_TYPE_SYNC;
518 rb->id = entry->user_req.id;
520 /* there may be stray timeout still waiting */
/* retry while a previous async request with the same name is pending */
522 ret = rte_mp_request_async(&rb_msg, &ts,
523 handle_rollback_response);
524 } while (ret != 0 && rte_errno == EEXIST);
526 RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
528 /* we couldn't send rollback request, but that's OK -
529 * secondary will time out, and memory has been removed
532 TAILQ_REMOVE(&mp_request_list.list, entry, next);
/* NOTE(review): log text below appears truncated in this view */
538 RTE_LOG(ERR, EAL, " to sync request of unknown type\n");
542 pthread_mutex_unlock(&mp_request_list.lock);
/* error path unlock (intervening code elided in this view) */
545 pthread_mutex_unlock(&mp_request_list.lock);
/* Invoked by the IPC layer in the primary when secondaries have replied to
 * (or timed out on) the rollback request sent by handle_sync_response.
 * Regardless of the rollback outcome, reports the original allocation
 * request as failed and releases the tracking entry's state.
 */
550 handle_rollback_response(const struct rte_mp_msg *request,
551 const struct rte_mp_reply *reply __rte_unused)
553 struct rte_mp_msg msg;
554 struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
555 const struct malloc_mp_req *mpreq =
556 (const struct malloc_mp_req *)request->param;
557 struct mp_request *entry;
559 /* lock the request */
560 pthread_mutex_lock(&mp_request_list.lock);
562 memset(&msg, 0, sizeof(msg));
564 entry = find_request_by_id(mpreq->id);
566 RTE_LOG(ERR, EAL, "Wrong request ID\n");
/* rollback only ever follows an allocation request */
570 if (entry->user_req.t != REQ_TYPE_ALLOC) {
571 RTE_LOG(ERR, EAL, "Unexpected active request\n");
575 /* we don't care if rollback succeeded, request still failed */
576 resp->t = REQ_TYPE_ALLOC;
577 resp->result = REQ_RESULT_FAIL;
578 resp->id = mpreq->id;
580 msg.len_param = sizeof(*resp);
581 strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));
583 if (rte_mp_sendmsg(&msg))
584 RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
/* drop the tracking entry and its memseg array */
587 TAILQ_REMOVE(&mp_request_list.list, entry, next);
588 free(entry->alloc_state.ms);
591 pthread_mutex_unlock(&mp_request_list.lock);
/* error path unlock (intervening code elided in this view) */
594 pthread_mutex_unlock(&mp_request_list.lock);
598 /* final stage of the request from secondary */
/* IPC callback (registered for MP_ACTION_RESPONSE) run in the secondary:
 * matches the primary's final answer to the locally tracked request, stores
 * the result, marks the request complete, and wakes the thread blocked in
 * request_to_primary's pthread_cond_timedwait.
 */
600 handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
602 const struct malloc_mp_req *m =
603 (const struct malloc_mp_req *)msg->param;
604 struct mp_request *entry;
606 pthread_mutex_lock(&mp_request_list.lock);
608 entry = find_request_by_id(m->id);
610 /* update request status */
611 entry->user_req.result = m->result;
613 entry->state = REQ_STATE_COMPLETE;
615 /* trigger thread wakeup */
616 pthread_cond_signal(&entry->cond);
619 pthread_mutex_unlock(&mp_request_list.lock);
624 /* synchronously request memory map sync, this is only called whenever primary
625 * process initiates the allocation.
/* NOTE(review): the function signature line is elided in this view
 * (presumably `request_sync`). Sends a blocking REQ_TYPE_SYNC request to
 * all secondaries, treats unsupported IPC (ENOTSUP) as success, and
 * validates every reply's type, ID and result.
 */
630 struct rte_mp_msg msg;
631 struct rte_mp_reply reply;
632 struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
636 memset(&msg, 0, sizeof(msg));
637 memset(&reply, 0, sizeof(reply));
639 /* no need to create tailq entries as this is entirely synchronous */
642 msg.len_param = sizeof(*req);
643 strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));
645 /* sync request carries no data */
646 req->t = REQ_TYPE_SYNC;
647 req->id = get_unique_id();
650 ts.tv_sec = MP_TIMEOUT_S;
652 /* there may be stray timeout still waiting */
/* retry while a previous request with the same name is still pending */
654 ret = rte_mp_request_sync(&msg, &reply, &ts);
655 } while (ret != 0 && rte_errno == EEXIST);
657 /* if IPC is unsupported, behave as if the call succeeded */
658 if (rte_errno != ENOTSUP)
659 RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
/* every secondary must have answered, or the sync is considered failed */
665 if (reply.nb_received != reply.nb_sent) {
666 RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
670 for (i = 0; i < reply.nb_received; i++) {
671 struct malloc_mp_req *resp =
672 (struct malloc_mp_req *)reply.msgs[i].param;
673 if (resp->t != REQ_TYPE_SYNC) {
674 RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
677 if (resp->id != req->id) {
678 RTE_LOG(ERR, EAL, "Wrong request ID\n");
681 if (resp->result != REQ_RESULT_SUCCESS) {
682 RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
693 /* this is a synchronous wrapper around a bunch of asynchronous requests to
694 * primary process. this will initiate a request and wait until responses come.
/* Called from a secondary: registers a tracking entry, sends the user
 * request to the primary (MP_ACTION_REQUEST), then blocks on a condition
 * variable with an absolute MP_TIMEOUT_S deadline until handle_response
 * marks the entry complete or the wait times out. On success, copies the
 * primary's result back into user_req->result.
 */
697 request_to_primary(struct malloc_mp_req *user_req)
699 struct rte_mp_msg msg;
700 struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
701 struct mp_request *entry;
706 memset(&msg, 0, sizeof(msg));
707 memset(&ts, 0, sizeof(ts));
709 pthread_mutex_lock(&mp_request_list.lock);
711 entry = malloc(sizeof(*entry));
713 RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
717 memset(entry, 0, sizeof(*entry));
719 if (gettimeofday(&now, NULL) < 0) {
720 RTE_LOG(ERR, EAL, "Cannot get current time\n");
/* convert "now + MP_TIMEOUT_S" into the absolute timespec that
 * pthread_cond_timedwait expects, carrying usec overflow into seconds */
724 ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
725 ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
726 (now.tv_usec * 1000) / 1000000000;
728 /* initialize the request */
729 pthread_cond_init(&entry->cond, NULL);
732 msg.len_param = sizeof(*msg_req);
733 strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));
735 /* (attempt to) get a unique id */
736 user_req->id = get_unique_id();
738 /* copy contents of user request into the message */
739 memcpy(msg_req, user_req, sizeof(*msg_req));
741 if (rte_mp_sendmsg(&msg)) {
742 RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
746 /* copy contents of user request into active request */
747 memcpy(&entry->user_req, user_req, sizeof(*user_req));
749 /* mark request as in progress */
750 entry->state = REQ_STATE_ACTIVE;
752 TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
754 /* finally, wait on timeout */
/* loop to tolerate spurious wakeups; exit on completion or ETIMEDOUT */
756 ret = pthread_cond_timedwait(&entry->cond,
757 &mp_request_list.lock, &ts);
758 } while (ret != 0 && ret != ETIMEDOUT);
760 if (entry->state != REQ_STATE_COMPLETE) {
761 RTE_LOG(ERR, EAL, "Request timed out\n");
765 user_req->result = entry->user_req.result;
767 TAILQ_REMOVE(&mp_request_list.list, entry, next);
770 pthread_mutex_unlock(&mp_request_list.lock);
/* error path unlock (intervening cleanup elided in this view) */
773 pthread_mutex_unlock(&mp_request_list.lock);
779 register_mp_requests(void)
781 if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
782 /* it's OK for primary to not support IPC */
783 if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
784 rte_errno != ENOTSUP) {
785 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
790 if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
791 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
795 if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
796 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
800 if (rte_mp_action_register(MP_ACTION_RESPONSE,
802 RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",