From bea84d5592eb2658cf8c36214a9db75d398efa1c Mon Sep 17 00:00:00 2001 From: Lukasz Wojciechowski Date: Sat, 17 Oct 2020 05:06:45 +0200 Subject: [PATCH] distributor: fix handshake synchronization rte_distributor_return_pkt function which is run on worker cores must wait for distributor core to clear handshake on retptr64 before using those buffers. While the handshake is set distributor core controls buffers and any operations on worker side might overwrite buffers which are unread yet. Same situation appears in the legacy single distributor. Function rte_distributor_return_pkt_single shouldn't modify the bufptr64 until handshake on it is cleared by distributor lcore. Fixes: 775003ad2f96 ("distributor: add new burst-capable library") Cc: stable@dpdk.org Signed-off-by: Lukasz Wojciechowski Acked-by: David Hunt Reviewed-by: Honnappa Nagarahalli --- lib/librte_distributor/rte_distributor.c | 12 ++++++++++++ lib/librte_distributor/rte_distributor_single.c | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c index 1c047f065a..c6b19a3886 100644 --- a/lib/librte_distributor/rte_distributor.c +++ b/lib/librte_distributor/rte_distributor.c @@ -169,6 +169,18 @@ rte_distributor_return_pkt(struct rte_distributor *d, return -EINVAL; } + /* Spin while handshake bits are set (scheduler clears it). + * Sync with worker on GET_BUF flag. + */ + while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED) + & RTE_DISTRIB_GET_BUF)) { + rte_pause(); + uint64_t t = rte_rdtsc()+100; + + while (rte_rdtsc() < t) + rte_pause(); + } + /* Sync with distributor to acquire retptrs */ __atomic_thread_fence(__ATOMIC_ACQUIRE); for (i = 0; i < RTE_DIST_BURST_SIZE; i++) diff --git a/lib/librte_distributor/rte_distributor_single.c b/lib/librte_distributor/rte_distributor_single.c index abaf7730c3..f4725b1d0b 100644 --- a/lib/librte_distributor/rte_distributor_single.c +++ b/lib/librte_distributor/rte_distributor_single.c @@ -74,6 +74,10 @@ rte_distributor_return_pkt_single(struct rte_distributor_single *d, union rte_distributor_buffer_single *buf = &d->bufs[worker_id]; uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF; + while (unlikely(__atomic_load_n(&buf->bufptr64, __ATOMIC_RELAXED) + & RTE_DISTRIB_FLAGS_MASK)) + rte_pause(); + /* Sync with distributor on RETURN_BUF flag. */ __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE); return 0; -- 2.20.1