#include <sys/queue.h>
#include <rte_memory.h>
+#include <rte_errno.h>
#include <rte_eal.h>
#include <rte_eal_memconfig.h>
#include <rte_launch.h>
#include "eal_memalloc.h"
#include "malloc_elem.h"
#include "malloc_heap.h"
+#include "malloc_mp.h"
static unsigned
check_hugepage_sz(unsigned flags, uint64_t hugepage_sz)
malloc_elem_free_list_insert(elem);
- heap->total_size += len;
-
return elem;
}
return elem == NULL ? NULL : (void *)(&elem[1]);
}
-static int
-try_expand_heap(struct malloc_heap *heap, size_t pg_sz, size_t elt_size,
+/* this function is exposed in malloc_mp.h */
+void
+rollback_expand_heap(struct rte_memseg **ms, int n_segs,
+ struct malloc_elem *elem, void *map_addr, size_t map_len)
+{
+ if (elem != NULL) {
+ malloc_elem_free_list_remove(elem);
+ malloc_elem_hide_region(elem, map_addr, map_len);
+ }
+
+ eal_memalloc_free_seg_bulk(ms, n_segs);
+}
+
+/* this function is exposed in malloc_mp.h */
+struct malloc_elem *
+alloc_pages_on_heap(struct malloc_heap *heap, uint64_t pg_sz, size_t elt_size,
int socket, unsigned int flags, size_t align, size_t bound,
- bool contig)
+ bool contig, struct rte_memseg **ms, int n_segs)
{
- size_t map_len;
struct rte_memseg_list *msl;
- struct rte_memseg **ms;
- struct malloc_elem *elem;
- int n_segs, allocd_pages;
+ struct malloc_elem *elem = NULL;
+ size_t alloc_sz;
+ int allocd_pages;
void *ret, *map_addr;
- align = RTE_MAX(align, MALLOC_ELEM_HEADER_LEN);
- map_len = RTE_ALIGN_CEIL(align + elt_size + MALLOC_ELEM_TRAILER_LEN,
- pg_sz);
-
- n_segs = map_len / pg_sz;
-
- /* we can't know in advance how many pages we'll need, so malloc */
- ms = malloc(sizeof(*ms) * n_segs);
-
allocd_pages = eal_memalloc_alloc_seg_bulk(ms, n_segs, pg_sz,
socket, true);
/* make sure we've allocated our pages... */
if (allocd_pages < 0)
- goto free_ms;
+ return NULL;
map_addr = ms[0]->addr;
msl = rte_mem_virt2memseg_list(map_addr);
+ alloc_sz = (size_t)msl->page_sz * allocd_pages;
/* check if we wanted contiguous memory but didn't get it */
- if (contig && !eal_memalloc_is_contig(msl, map_addr, map_len)) {
+ if (contig && !eal_memalloc_is_contig(msl, map_addr, alloc_sz)) {
RTE_LOG(DEBUG, EAL, "%s(): couldn't allocate physically contiguous space\n",
__func__);
- goto free_pages;
+ goto fail;
}
/* add newly minted memsegs to malloc heap */
- elem = malloc_heap_add_memory(heap, msl, map_addr, map_len);
+ elem = malloc_heap_add_memory(heap, msl, map_addr, alloc_sz);
/* try once more, as now we have allocated new memory */
ret = find_suitable_element(heap, elt_size, flags, align, bound,
contig);
if (ret == NULL)
+ goto fail;
+
+ return elem;
+
+fail:
+ rollback_expand_heap(ms, n_segs, elem, map_addr, alloc_sz);
+ return NULL;
+}
+
+static int
+try_expand_heap_primary(struct malloc_heap *heap, uint64_t pg_sz,
+ size_t elt_size, int socket, unsigned int flags, size_t align,
+ size_t bound, bool contig)
+{
+ struct malloc_elem *elem;
+ struct rte_memseg **ms;
+ void *map_addr;
+ size_t alloc_sz;
+ int n_segs;
+
+ alloc_sz = RTE_ALIGN_CEIL(align + elt_size +
+ MALLOC_ELEM_TRAILER_LEN, pg_sz);
+ n_segs = alloc_sz / pg_sz;
+
+ /* we can't know in advance how many pages we'll need, so we malloc */
+ ms = malloc(sizeof(*ms) * n_segs);
+
+ memset(ms, 0, sizeof(*ms) * n_segs);
+
+ if (ms == NULL)
+ return -1;
+
+ elem = alloc_pages_on_heap(heap, pg_sz, elt_size, socket, flags, align,
+ bound, contig, ms, n_segs);
+
+ if (elem == NULL)
+ goto free_ms;
+
+ map_addr = ms[0]->addr;
+
+ /* notify other processes that this has happened */
+ if (request_sync()) {
+ /* we couldn't ensure all processes have mapped memory,
+ * so free it back and notify everyone that it's been
+ * freed back.
+ */
goto free_elem;
+ }
+ heap->total_size += alloc_sz;
RTE_LOG(DEBUG, EAL, "Heap on socket %d was expanded by %zdMB\n",
- socket, map_len >> 20ULL);
+ socket, alloc_sz >> 20ULL);
free(ms);
return 0;
free_elem:
- malloc_elem_free_list_remove(elem);
- malloc_elem_hide_region(elem, map_addr, map_len);
- heap->total_size -= map_len;
+ rollback_expand_heap(ms, n_segs, elem, map_addr, alloc_sz);
-free_pages:
- eal_memalloc_free_seg_bulk(ms, n_segs);
+ request_sync();
free_ms:
free(ms);
return -1;
}
+static int
+try_expand_heap_secondary(struct malloc_heap *heap, uint64_t pg_sz,
+ size_t elt_size, int socket, unsigned int flags, size_t align,
+ size_t bound, bool contig)
+{
+ struct malloc_mp_req req;
+ int req_result;
+
+ memset(&req, 0, sizeof(req));
+
+ req.t = REQ_TYPE_ALLOC;
+ req.alloc_req.align = align;
+ req.alloc_req.bound = bound;
+ req.alloc_req.contig = contig;
+ req.alloc_req.flags = flags;
+ req.alloc_req.elt_size = elt_size;
+ req.alloc_req.page_sz = pg_sz;
+ req.alloc_req.socket = socket;
+ req.alloc_req.heap = heap; /* it's in shared memory */
+
+ req_result = request_to_primary(&req);
+
+ if (req_result != 0)
+ return -1;
+
+ if (req.result != REQ_RESULT_SUCCESS)
+ return -1;
+
+ return 0;
+}
+
+static int
+try_expand_heap(struct malloc_heap *heap, uint64_t pg_sz, size_t elt_size,
+ int socket, unsigned int flags, size_t align, size_t bound,
+ bool contig)
+{
+ struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
+ int ret;
+
+ rte_rwlock_write_lock(&mcfg->memory_hotplug_lock);
+
+ if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+ ret = try_expand_heap_primary(heap, pg_sz, elt_size, socket,
+ flags, align, bound, contig);
+ } else {
+ ret = try_expand_heap_secondary(heap, pg_sz, elt_size, socket,
+ flags, align, bound, contig);
+ }
+
+ rte_rwlock_write_unlock(&mcfg->memory_hotplug_lock);
+ return ret;
+}
+
static int
compare_pagesz(const void *a, const void *b)
{
}
static int
-alloc_mem_on_socket(size_t size, int socket, unsigned int flags, size_t align,
- size_t bound, bool contig)
+alloc_more_mem_on_socket(struct malloc_heap *heap, size_t size, int socket,
+ unsigned int flags, size_t align, size_t bound, bool contig)
{
struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
- struct malloc_heap *heap = &mcfg->malloc_heaps[socket];
struct rte_memseg_list *requested_msls[RTE_MAX_MEMSEG_LISTS];
struct rte_memseg_list *other_msls[RTE_MAX_MEMSEG_LISTS];
uint64_t requested_pg_sz[RTE_MAX_MEMSEG_LISTS];
if (ret != NULL)
goto alloc_unlock;
- if (!alloc_mem_on_socket(size, socket, flags, align, bound, contig)) {
+ if (!alloc_more_mem_on_socket(heap, size, socket, flags, align, bound,
+ contig)) {
ret = heap_alloc(heap, type, size, flags, align, bound, contig);
/* this should have succeeded */
return NULL;
}
+/* this function is exposed in malloc_mp.h */
+int
+malloc_heap_free_pages(void *aligned_start, size_t aligned_len)
+{
+ int n_segs, seg_idx, max_seg_idx;
+ struct rte_memseg_list *msl;
+ size_t page_sz;
+
+ msl = rte_mem_virt2memseg_list(aligned_start);
+ if (msl == NULL)
+ return -1;
+
+ page_sz = (size_t)msl->page_sz;
+ n_segs = aligned_len / page_sz;
+ seg_idx = RTE_PTR_DIFF(aligned_start, msl->base_va) / page_sz;
+ max_seg_idx = seg_idx + n_segs;
+
+ for (; seg_idx < max_seg_idx; seg_idx++) {
+ struct rte_memseg *ms;
+
+ ms = rte_fbarray_get(&msl->memseg_arr, seg_idx);
+ eal_memalloc_free_seg(ms);
+ }
+ return 0;
+}
+
int
malloc_heap_free(struct malloc_elem *elem)
{
+ struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
struct malloc_heap *heap;
void *start, *aligned_start, *end, *aligned_end;
size_t len, aligned_len, page_sz;
struct rte_memseg_list *msl;
- int n_segs, seg_idx, max_seg_idx, ret;
+ int ret;
if (!malloc_elem_cookies_ok(elem) || elem->state != ELEM_BUSY)
return -1;
if (aligned_len < page_sz)
goto free_unlock;
+ rte_rwlock_write_lock(&mcfg->memory_hotplug_lock);
+
+ /*
+ * we allow secondary processes to clear the heap of this allocated
+ * memory because it is safe to do so, as even if notifications about
+ * unmapped pages don't make it to other processes, heap is shared
+ * across all processes, and will become empty of this memory anyway,
+ * and nothing can allocate it back unless primary process will be able
+ * to deliver allocation message to every single running process.
+ */
+
malloc_elem_free_list_remove(elem);
malloc_elem_hide_region(elem, (void *) aligned_start, aligned_len);
- /* we don't really care if we fail to deallocate memory */
- n_segs = aligned_len / page_sz;
- seg_idx = RTE_PTR_DIFF(aligned_start, msl->base_va) / page_sz;
- max_seg_idx = seg_idx + n_segs;
+ heap->total_size -= aligned_len;
- for (; seg_idx < max_seg_idx; seg_idx++) {
- struct rte_memseg *ms;
+ if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+ /* don't care if any of this fails */
+ malloc_heap_free_pages(aligned_start, aligned_len);
- ms = rte_fbarray_get(&msl->memseg_arr, seg_idx);
- eal_memalloc_free_seg(ms);
+ request_sync();
+ } else {
+ struct malloc_mp_req req;
+
+ memset(&req, 0, sizeof(req));
+
+ req.t = REQ_TYPE_FREE;
+ req.free_req.addr = aligned_start;
+ req.free_req.len = aligned_len;
+
+ /*
+ * we request primary to deallocate pages, but we don't do it
+ * in this thread. instead, we notify primary that we would like
+ * to deallocate pages, and this process will receive another
+ * request (in parallel) that will do it for us on another
+ * thread.
+ *
+ * we also don't really care if this succeeds - the data is
+ * already removed from the heap, so it is, for all intents and
+ * purposes, hidden from the rest of DPDK even if some other
+ * process (including this one) may have these pages mapped.
+ */
+ request_to_primary(&req);
}
- heap->total_size -= aligned_len;
RTE_LOG(DEBUG, EAL, "Heap on socket %d was shrunk by %zdMB\n",
msl->socket_id, aligned_len >> 20ULL);
+
+ rte_rwlock_write_unlock(&mcfg->memory_hotplug_lock);
free_unlock:
rte_spinlock_unlock(&(heap->lock));
return ret;
{
struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
- if (mcfg == NULL)
+ if (register_mp_requests()) {
+ RTE_LOG(ERR, EAL, "Couldn't register malloc multiprocess actions\n");
return -1;
+ }
+
+ /* unlock mem hotplug here. it's safe for primary as no requests can
+ * even come before primary itself is fully initialized, and secondaries
+ * do not need to initialize the heap.
+ */
+ rte_rwlock_read_unlock(&mcfg->memory_hotplug_lock);
/* secondary process does not need to initialize anything */
if (rte_eal_process_type() != RTE_PROC_PRIMARY)
--- /dev/null
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Intel Corporation
+ */
+
+#include <string.h>
+#include <sys/time.h>
+
+#include <rte_alarm.h>
+#include <rte_errno.h>
+
+#include "eal_memalloc.h"
+
+#include "malloc_elem.h"
+#include "malloc_mp.h"
+
+#define MP_ACTION_SYNC "mp_malloc_sync"
+/**< request sent by primary process to notify of changes in memory map */
+#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
+/**< request sent by primary process to notify of changes in memory map. this is
+ * essentially a regular sync request, but we cannot send sync requests while
+ * another one is in progress, and we might have to - therefore, we do this as
+ * a separate callback.
+ */
+#define MP_ACTION_REQUEST "mp_malloc_request"
+/**< request sent by secondary process to ask for allocation/deallocation */
+#define MP_ACTION_RESPONSE "mp_malloc_response"
+/**< response sent to secondary process to indicate result of request */
+
+/* forward declarations */
+static int
+handle_sync_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply);
+static int
+handle_rollback_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply);
+
+#define MP_TIMEOUT_S 5 /**< 5 seconds timeouts */
+
+/* when we're allocating, we need to store some state to ensure that we can
+ * roll back later
+ */
+struct primary_alloc_req_state {
+ struct malloc_heap *heap;
+ struct rte_memseg **ms;
+ int ms_len;
+ struct malloc_elem *elem;
+ void *map_addr;
+ size_t map_len;
+};
+
+enum req_state {
+ REQ_STATE_INACTIVE = 0,
+ REQ_STATE_ACTIVE,
+ REQ_STATE_COMPLETE
+};
+
+struct mp_request {
+ TAILQ_ENTRY(mp_request) next;
+ struct malloc_mp_req user_req; /**< contents of request */
+ pthread_cond_t cond; /**< variable we use to time out on this request */
+ enum req_state state; /**< indicate status of this request */
+ struct primary_alloc_req_state alloc_state;
+};
+
+/*
+ * We could've used just a single request, but it may be possible for
+ * secondaries to timeout earlier than the primary, and send a new request while
+ * primary is still expecting replies to the old one. Therefore, each new
+ * request will get assigned a new ID, which is how we will distinguish between
+ * expected and unexpected messages.
+ */
+TAILQ_HEAD(mp_request_list, mp_request);
+static struct {
+ struct mp_request_list list;
+ pthread_mutex_t lock;
+} mp_request_list = {
+ .list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
+ .lock = PTHREAD_MUTEX_INITIALIZER
+};
+
+/**
+ * General workflow is the following:
+ *
+ * Allocation:
+ * S: send request to primary
+ * P: attempt to allocate memory
+ * if failed, sendmsg failure
+ * if success, send sync request
+ * S: if received msg of failure, quit
+ * if received sync request, synchronize memory map and reply with result
+ * P: if received sync request result
+ * if success, sendmsg success
+ * if failure, roll back allocation and send a rollback request
+ * S: if received msg of success, quit
+ * if received rollback request, synchronize memory map and reply with result
+ * P: if received sync request result
+ * sendmsg sync request result
+ * S: if received msg, quit
+ *
+ * Aside from timeouts, there are three points where we can quit:
+ * - if allocation failed straight away
+ * - if allocation and sync request succeeded
+ * - if allocation succeeded, sync request failed, allocation rolled back and
+ * rollback request received (irrespective of whether it succeeded or failed)
+ *
+ * Deallocation:
+ * S: send request to primary
+ * P: attempt to deallocate memory
+ * if failed, sendmsg failure
+ * if success, send sync request
+ * S: if received msg of failure, quit
+ * if received sync request, synchronize memory map and reply with result
+ * P: if received sync request result
+ * sendmsg sync request result
+ * S: if received msg, quit
+ *
+ * There is no "rollback" from deallocation, as it's safe to have some memory
+ * mapped in some processes - it's absent from the heap, so it won't get used.
+ */
+
+static struct mp_request *
+find_request_by_id(uint64_t id)
+{
+ struct mp_request *req;
+ TAILQ_FOREACH(req, &mp_request_list.list, next) {
+ if (req->user_req.id == id)
+ break;
+ }
+ return req;
+}
+
+/* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
+static uint64_t
+get_unique_id(void)
+{
+ uint64_t id;
+ do {
+ id = rte_rand();
+ } while (find_request_by_id(id) != NULL);
+ return id;
+}
+
+/* secondary will respond to sync requests thusly */
+static int
+handle_sync(const struct rte_mp_msg *msg, const void *peer)
+{
+ struct rte_mp_msg reply;
+ const struct malloc_mp_req *req =
+ (const struct malloc_mp_req *)msg->param;
+ struct malloc_mp_req *resp =
+ (struct malloc_mp_req *)reply.param;
+ int ret;
+
+ if (req->t != REQ_TYPE_SYNC) {
+ RTE_LOG(ERR, EAL, "Unexpected request from primary\n");
+ return -1;
+ }
+
+ memset(&reply, 0, sizeof(reply));
+
+ reply.num_fds = 0;
+ snprintf(reply.name, sizeof(reply.name), "%s", msg->name);
+ reply.len_param = sizeof(*resp);
+
+ ret = eal_memalloc_sync_with_primary();
+
+ resp->t = REQ_TYPE_SYNC;
+ resp->id = req->id;
+ resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;
+
+ rte_mp_reply(&reply, peer);
+
+ return 0;
+}
+
+static int
+handle_alloc_request(const struct malloc_mp_req *m,
+ struct mp_request *req)
+{
+ const struct malloc_req_alloc *ar = &m->alloc_req;
+ struct malloc_heap *heap;
+ struct malloc_elem *elem;
+ struct rte_memseg **ms;
+ size_t alloc_sz;
+ int n_segs;
+ void *map_addr;
+
+ alloc_sz = RTE_ALIGN_CEIL(ar->align + ar->elt_size +
+ MALLOC_ELEM_TRAILER_LEN, ar->page_sz);
+ n_segs = alloc_sz / ar->page_sz;
+
+ heap = ar->heap;
+
+ /* we can't know in advance how many pages we'll need, so we malloc */
+ ms = malloc(sizeof(*ms) * n_segs);
+
+ memset(ms, 0, sizeof(*ms) * n_segs);
+
+ if (ms == NULL) {
+ RTE_LOG(ERR, EAL, "Couldn't allocate memory for request state\n");
+ goto fail;
+ }
+
+ elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
+ ar->flags, ar->align, ar->bound, ar->contig, ms,
+ n_segs);
+
+ if (elem == NULL)
+ goto fail;
+
+ map_addr = ms[0]->addr;
+
+ /* we have succeeded in allocating memory, but we still need to sync
+ * with other processes. however, since DPDK IPC is single-threaded, we
+ * send an asynchronous request and exit this callback.
+ */
+
+ req->alloc_state.ms = ms;
+ req->alloc_state.ms_len = n_segs;
+ req->alloc_state.map_addr = map_addr;
+ req->alloc_state.map_len = alloc_sz;
+ req->alloc_state.elem = elem;
+ req->alloc_state.heap = heap;
+
+ return 0;
+fail:
+ free(ms);
+ return -1;
+}
+
+/* first stage of primary handling requests from secondary */
+static int
+handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
+{
+ const struct malloc_mp_req *m =
+ (const struct malloc_mp_req *)msg->param;
+ struct mp_request *entry;
+ int ret;
+
+ /* lock access to request */
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ /* make sure it's not a dupe */
+ entry = find_request_by_id(m->id);
+ if (entry != NULL) {
+ RTE_LOG(ERR, EAL, "Duplicate request id\n");
+ goto fail;
+ }
+
+ entry = malloc(sizeof(*entry));
+ if (entry == NULL) {
+ RTE_LOG(ERR, EAL, "Unable to allocate memory for request\n");
+ goto fail;
+ }
+
+ /* erase all data */
+ memset(entry, 0, sizeof(*entry));
+
+ if (m->t == REQ_TYPE_ALLOC) {
+ ret = handle_alloc_request(m, entry);
+ } else if (m->t == REQ_TYPE_FREE) {
+ ret = malloc_heap_free_pages(m->free_req.addr,
+ m->free_req.len);
+ } else {
+ RTE_LOG(ERR, EAL, "Unexpected request from secondary\n");
+ goto fail;
+ }
+
+ if (ret != 0) {
+ struct rte_mp_msg resp_msg;
+ struct malloc_mp_req *resp =
+ (struct malloc_mp_req *)resp_msg.param;
+
+ /* send failure message straight away */
+ resp_msg.num_fds = 0;
+ resp_msg.len_param = sizeof(*resp);
+ snprintf(resp_msg.name, sizeof(resp_msg.name), "%s",
+ MP_ACTION_RESPONSE);
+
+ resp->t = m->t;
+ resp->result = REQ_RESULT_FAIL;
+ resp->id = m->id;
+
+ if (rte_mp_sendmsg(&resp_msg)) {
+ RTE_LOG(ERR, EAL, "Couldn't send response\n");
+ goto fail;
+ }
+ /* we did not modify the request */
+ free(entry);
+ } else {
+ struct rte_mp_msg sr_msg;
+ struct malloc_mp_req *sr =
+ (struct malloc_mp_req *)sr_msg.param;
+ struct timespec ts;
+
+ memset(&sr_msg, 0, sizeof(sr_msg));
+
+ /* we can do something, so send sync request asynchronously */
+ sr_msg.num_fds = 0;
+ sr_msg.len_param = sizeof(*sr);
+ snprintf(sr_msg.name, sizeof(sr_msg.name), "%s",
+ MP_ACTION_SYNC);
+
+ ts.tv_nsec = 0;
+ ts.tv_sec = MP_TIMEOUT_S;
+
+ /* sync requests carry no data */
+ sr->t = REQ_TYPE_SYNC;
+ sr->id = m->id;
+
+ /* there may be stray timeout still waiting */
+ do {
+ ret = rte_mp_request_async(&sr_msg, &ts,
+ handle_sync_response);
+ } while (ret != 0 && rte_errno == EEXIST);
+ if (ret != 0) {
+ RTE_LOG(ERR, EAL, "Couldn't send sync request\n");
+ if (m->t == REQ_TYPE_ALLOC)
+ free(entry->alloc_state.ms);
+ goto fail;
+ }
+
+ /* mark request as in progress */
+ memcpy(&entry->user_req, m, sizeof(*m));
+ entry->state = REQ_STATE_ACTIVE;
+
+ TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
+ }
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return 0;
+fail:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ free(entry);
+ return -1;
+}
+
+/* callback for asynchronous sync requests for primary. this will either do a
+ * sendmsg with results, or trigger rollback request.
+ */
+static int
+handle_sync_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply)
+{
+ enum malloc_req_result result;
+ struct mp_request *entry;
+ const struct malloc_mp_req *mpreq =
+ (const struct malloc_mp_req *)request->param;
+ int i;
+
+ /* lock the request */
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(mpreq->id);
+ if (entry == NULL) {
+ RTE_LOG(ERR, EAL, "Wrong request ID\n");
+ goto fail;
+ }
+
+ result = REQ_RESULT_SUCCESS;
+
+ if (reply->nb_received != reply->nb_sent)
+ result = REQ_RESULT_FAIL;
+
+ for (i = 0; i < reply->nb_received; i++) {
+ struct malloc_mp_req *resp =
+ (struct malloc_mp_req *)reply->msgs[i].param;
+
+ if (resp->t != REQ_TYPE_SYNC) {
+ RTE_LOG(ERR, EAL, "Unexpected response to sync request\n");
+ result = REQ_RESULT_FAIL;
+ break;
+ }
+ if (resp->id != entry->user_req.id) {
+ RTE_LOG(ERR, EAL, "Response to wrong sync request\n");
+ result = REQ_RESULT_FAIL;
+ break;
+ }
+ if (resp->result == REQ_RESULT_FAIL) {
+ result = REQ_RESULT_FAIL;
+ break;
+ }
+ }
+
+ if (entry->user_req.t == REQ_TYPE_FREE) {
+ struct rte_mp_msg msg;
+ struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
+
+ memset(&msg, 0, sizeof(msg));
+
+ /* this is a free request, just sendmsg result */
+ resp->t = REQ_TYPE_FREE;
+ resp->result = result;
+ resp->id = entry->user_req.id;
+ msg.num_fds = 0;
+ msg.len_param = sizeof(*resp);
+ snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_RESPONSE);
+
+ if (rte_mp_sendmsg(&msg))
+ RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
+
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+ } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
+ result == REQ_RESULT_SUCCESS) {
+ struct malloc_heap *heap = entry->alloc_state.heap;
+ struct rte_mp_msg msg;
+ struct malloc_mp_req *resp =
+ (struct malloc_mp_req *)msg.param;
+
+ memset(&msg, 0, sizeof(msg));
+
+ heap->total_size += entry->alloc_state.map_len;
+
+ /* result is success, so just notify secondary about this */
+ resp->t = REQ_TYPE_ALLOC;
+ resp->result = result;
+ resp->id = entry->user_req.id;
+ msg.num_fds = 0;
+ msg.len_param = sizeof(*resp);
+ snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_RESPONSE);
+
+ if (rte_mp_sendmsg(&msg))
+ RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
+
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry->alloc_state.ms);
+ free(entry);
+ } else if (entry->user_req.t == REQ_TYPE_ALLOC &&
+ result == REQ_RESULT_FAIL) {
+ struct rte_mp_msg rb_msg;
+ struct malloc_mp_req *rb =
+ (struct malloc_mp_req *)rb_msg.param;
+ struct timespec ts;
+ struct primary_alloc_req_state *state =
+ &entry->alloc_state;
+ int ret;
+
+ memset(&rb_msg, 0, sizeof(rb_msg));
+
+ /* we've failed to sync, so do a rollback */
+ rollback_expand_heap(state->ms, state->ms_len, state->elem,
+ state->map_addr, state->map_len);
+
+ /* send rollback request */
+ rb_msg.num_fds = 0;
+ rb_msg.len_param = sizeof(*rb);
+ snprintf(rb_msg.name, sizeof(rb_msg.name), "%s",
+ MP_ACTION_ROLLBACK);
+
+ ts.tv_nsec = 0;
+ ts.tv_sec = MP_TIMEOUT_S;
+
+ /* sync requests carry no data */
+ rb->t = REQ_TYPE_SYNC;
+ rb->id = entry->user_req.id;
+
+ /* there may be stray timeout still waiting */
+ do {
+ ret = rte_mp_request_async(&rb_msg, &ts,
+ handle_rollback_response);
+ } while (ret != 0 && rte_errno == EEXIST);
+ if (ret != 0) {
+ RTE_LOG(ERR, EAL, "Could not send rollback request to secondary process\n");
+
+ /* we couldn't send rollback request, but that's OK -
+ * secondary will time out, and memory has been removed
+ * from heap anyway.
+ */
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(state->ms);
+ free(entry);
+ goto fail;
+ }
+ } else {
+ RTE_LOG(ERR, EAL, " to sync request of unknown type\n");
+ goto fail;
+ }
+
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return 0;
+fail:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return -1;
+}
+
+static int
+handle_rollback_response(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply __rte_unused)
+{
+ struct rte_mp_msg msg;
+ struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
+ const struct malloc_mp_req *mpreq =
+ (const struct malloc_mp_req *)request->param;
+ struct mp_request *entry;
+
+ /* lock the request */
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ memset(&msg, 0, sizeof(0));
+
+ entry = find_request_by_id(mpreq->id);
+ if (entry == NULL) {
+ RTE_LOG(ERR, EAL, "Wrong request ID\n");
+ goto fail;
+ }
+
+ if (entry->user_req.t != REQ_TYPE_ALLOC) {
+ RTE_LOG(ERR, EAL, "Unexpected active request\n");
+ goto fail;
+ }
+
+ /* we don't care if rollback succeeded, request still failed */
+ resp->t = REQ_TYPE_ALLOC;
+ resp->result = REQ_RESULT_FAIL;
+ resp->id = mpreq->id;
+ msg.num_fds = 0;
+ msg.len_param = sizeof(*resp);
+ snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_RESPONSE);
+
+ if (rte_mp_sendmsg(&msg))
+ RTE_LOG(ERR, EAL, "Could not send message to secondary process\n");
+
+ /* clean up */
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry->alloc_state.ms);
+ free(entry);
+
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return 0;
+fail:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return -1;
+}
+
+/* final stage of the request from secondary */
+static int
+handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
+{
+ const struct malloc_mp_req *m =
+ (const struct malloc_mp_req *)msg->param;
+ struct mp_request *entry;
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = find_request_by_id(m->id);
+ if (entry != NULL) {
+ /* update request status */
+ entry->user_req.result = m->result;
+
+ entry->state = REQ_STATE_COMPLETE;
+
+ /* trigger thread wakeup */
+ pthread_cond_signal(&entry->cond);
+ }
+
+ pthread_mutex_unlock(&mp_request_list.lock);
+
+ return 0;
+}
+
+/* synchronously request memory map sync, this is only called whenever primary
+ * process initiates the allocation.
+ */
+int
+request_sync(void)
+{
+ struct rte_mp_msg msg;
+ struct rte_mp_reply reply;
+ struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
+ struct timespec ts;
+ int i, ret;
+
+ memset(&msg, 0, sizeof(msg));
+ memset(&reply, 0, sizeof(reply));
+
+ /* no need to create tailq entries as this is entirely synchronous */
+
+ msg.num_fds = 0;
+ msg.len_param = sizeof(*req);
+ snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_SYNC);
+
+ /* sync request carries no data */
+ req->t = REQ_TYPE_SYNC;
+ req->id = get_unique_id();
+
+ ts.tv_nsec = 0;
+ ts.tv_sec = MP_TIMEOUT_S;
+
+ /* there may be stray timeout still waiting */
+ do {
+ ret = rte_mp_request_sync(&msg, &reply, &ts);
+ } while (ret != 0 && rte_errno == EEXIST);
+ if (ret != 0) {
+ RTE_LOG(ERR, EAL, "Could not send sync request to secondary process\n");
+ ret = -1;
+ goto out;
+ }
+
+ if (reply.nb_received != reply.nb_sent) {
+ RTE_LOG(ERR, EAL, "Not all secondaries have responded\n");
+ ret = -1;
+ goto out;
+ }
+
+ for (i = 0; i < reply.nb_received; i++) {
+ struct malloc_mp_req *resp =
+ (struct malloc_mp_req *)reply.msgs[i].param;
+ if (resp->t != REQ_TYPE_SYNC) {
+ RTE_LOG(ERR, EAL, "Unexpected response from secondary\n");
+ ret = -1;
+ goto out;
+ }
+ if (resp->id != req->id) {
+ RTE_LOG(ERR, EAL, "Wrong request ID\n");
+ ret = -1;
+ goto out;
+ }
+ if (resp->result != REQ_RESULT_SUCCESS) {
+ RTE_LOG(ERR, EAL, "Secondary process failed to synchronize\n");
+ ret = -1;
+ goto out;
+ }
+ }
+
+ ret = 0;
+out:
+ free(reply.msgs);
+ return ret;
+}
+
+/* this is a synchronous wrapper around a bunch of asynchronous requests to
+ * primary process. this will initiate a request and wait until responses come.
+ */
+int
+request_to_primary(struct malloc_mp_req *user_req)
+{
+ struct rte_mp_msg msg;
+ struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
+ struct mp_request *entry;
+ struct timespec ts;
+ struct timeval now;
+ int ret;
+
+ memset(&msg, 0, sizeof(msg));
+ memset(&ts, 0, sizeof(ts));
+
+ pthread_mutex_lock(&mp_request_list.lock);
+
+ entry = malloc(sizeof(*entry));
+ if (entry == NULL) {
+ RTE_LOG(ERR, EAL, "Cannot allocate memory for request\n");
+ goto fail;
+ }
+
+ memset(entry, 0, sizeof(*entry));
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ goto fail;
+ }
+
+ ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
+ ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
+ (now.tv_usec * 1000) / 1000000000;
+
+ /* initialize the request */
+ pthread_cond_init(&entry->cond, NULL);
+
+ msg.num_fds = 0;
+ msg.len_param = sizeof(*msg_req);
+ snprintf(msg.name, sizeof(msg.name), "%s", MP_ACTION_REQUEST);
+
+ /* (attempt to) get a unique id */
+ user_req->id = get_unique_id();
+
+ /* copy contents of user request into the message */
+ memcpy(msg_req, user_req, sizeof(*msg_req));
+
+ if (rte_mp_sendmsg(&msg)) {
+ RTE_LOG(ERR, EAL, "Cannot send message to primary\n");
+ goto fail;
+ }
+
+ /* copy contents of user request into active request */
+ memcpy(&entry->user_req, user_req, sizeof(*user_req));
+
+ /* mark request as in progress */
+ entry->state = REQ_STATE_ACTIVE;
+
+ TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
+
+ /* finally, wait on timeout */
+ do {
+ ret = pthread_cond_timedwait(&entry->cond,
+ &mp_request_list.lock, &ts);
+ } while (ret != 0 && ret != ETIMEDOUT);
+
+ if (entry->state != REQ_STATE_COMPLETE) {
+ RTE_LOG(ERR, EAL, "Request timed out\n");
+ ret = -1;
+ } else {
+ ret = 0;
+ user_req->result = entry->user_req.result;
+ }
+ TAILQ_REMOVE(&mp_request_list.list, entry, next);
+ free(entry);
+
+ pthread_mutex_unlock(&mp_request_list.lock);
+ return ret;
+fail:
+ pthread_mutex_unlock(&mp_request_list.lock);
+ free(entry);
+ return -1;
+}
+
+int
+register_mp_requests(void)
+{
+ if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+ if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request)) {
+ RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
+ MP_ACTION_REQUEST);
+ return -1;
+ }
+ } else {
+ if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
+ RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
+ MP_ACTION_SYNC);
+ return -1;
+ }
+ if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
+ RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
+ MP_ACTION_SYNC);
+ return -1;
+ }
+ if (rte_mp_action_register(MP_ACTION_RESPONSE,
+ handle_response)) {
+ RTE_LOG(ERR, EAL, "Couldn't register '%s' action\n",
+ MP_ACTION_RESPONSE);
+ return -1;
+ }
+ }
+ return 0;
+}