From: Lukasz Wojciechowski Date: Sat, 17 Oct 2020 03:06:46 +0000 (+0200) Subject: distributor: fix handshake deadlock X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=5acce079e7dc773c17bf2043fc18106392b4a863;p=dpdk.git distributor: fix handshake deadlock Synchronization of data exchange between distributor and worker cores is based on 2 handshakes: retptr64 for returning mbufs from workers to distributor and bufptr64 for passing mbufs to workers. Without proper order of verifying those 2 handshakes a deadlock may occur. This can happen when worker core wants to return back mbufs and waits for retptr handshake to be cleared while distributor core waits for bufptr to send mbufs to worker. This can happen as worker core first returns mbufs to distributor and later gets new mbufs, while distributor first releases mbufs to worker and later handle returning packets. This patch fixes possibility of the deadlock by always taking care of returning packets first on the distributor side and handling packets while waiting to release new. Fixes: 775003ad2f96 ("distributor: add new burst-capable library") Cc: stable@dpdk.org Signed-off-by: Lukasz Wojciechowski Acked-by: David Hunt --- diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c index c6b19a3886..d6d4350a28 100644 --- a/lib/librte_distributor/rte_distributor.c +++ b/lib/librte_distributor/rte_distributor.c @@ -319,12 +319,14 @@ release(struct rte_distributor *d, unsigned int wkr) struct rte_distributor_buffer *buf = &(d->bufs[wkr]); unsigned int i; + handle_returns(d, wkr); + /* Sync with worker on GET_BUF flag */ while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE) - & RTE_DISTRIB_GET_BUF)) + & RTE_DISTRIB_GET_BUF)) { + handle_returns(d, wkr); rte_pause(); - - handle_returns(d, wkr); + } buf->count = 0; @@ -374,6 +376,7 @@ rte_distributor_process(struct rte_distributor *d, /* Flush out all non-full cache-lines to workers. */ for (wid = 0 ; wid < d->num_workers; wid++) { /* Sync with worker on GET_BUF flag. */ + handle_returns(d, wid); if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]), __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) { release(d, wid);