From fd390896f4a3dd27ebdf551673960bece8aff966 Mon Sep 17 00:00:00 2001
From: Anoob Joseph
Date: Mon, 18 Oct 2021 13:21:40 +0530
Subject: [PATCH] crypto/cnxk: allow different cores in pending queue

Rework pending queue to allow producer and consumer cores to be
different.

Signed-off-by: Anoob Joseph
---
 doc/guides/cryptodevs/cnxk.rst            |  6 ---
 drivers/crypto/cnxk/cn10k_cryptodev_ops.c | 36 ++++++++-----
 drivers/crypto/cnxk/cn9k_cryptodev_ops.c  | 63 ++++++++++-------------
 drivers/crypto/cnxk/cnxk_cryptodev_ops.c  | 20 +++++--
 drivers/crypto/cnxk/cnxk_cryptodev_ops.h  | 37 ++++++++++---
 5 files changed, 97 insertions(+), 65 deletions(-)

diff --git a/doc/guides/cryptodevs/cnxk.rst b/doc/guides/cryptodevs/cnxk.rst
index 85171a50a6..b5b6645008 100644
--- a/doc/guides/cryptodevs/cnxk.rst
+++ b/doc/guides/cryptodevs/cnxk.rst
@@ -259,9 +259,3 @@ CN10XX Features supported
 * UDP Encapsulation
 * AES-128/192/256-GCM
 * AES-128/192/256-CBC-SHA1-HMAC
-
-Limitations
------------
-
-Multiple lcores may not operate on the same crypto queue pair. The lcore that
-enqueues to a queue pair is the one that must dequeue from it.
diff --git a/drivers/crypto/cnxk/cn10k_cryptodev_ops.c b/drivers/crypto/cnxk/cn10k_cryptodev_ops.c
index c25c8e67b2..7f724de302 100644
--- a/drivers/crypto/cnxk/cn10k_cryptodev_ops.c
+++ b/drivers/crypto/cnxk/cn10k_cryptodev_ops.c
@@ -196,11 +196,15 @@ cn10k_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 	struct pending_queue *pend_q;
 	struct cpt_inst_s *inst;
 	uint16_t lmt_id;
+	uint64_t head;
 	int ret, i;
 
 	pend_q = &qp->pend_q;
 
-	nb_allowed = qp->lf.nb_desc - pend_q->pending_count;
+	const uint64_t pq_mask = pend_q->pq_mask;
+
+	head = pend_q->head;
+	nb_allowed = pending_queue_free_cnt(head, pend_q->tail, pq_mask);
 	nb_ops = RTE_MIN(nb_ops, nb_allowed);
 
 	if (unlikely(nb_ops == 0))
@@ -214,18 +218,18 @@ cn10k_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 
 again:
 	for (i = 0; i < RTE_MIN(PKTS_PER_LOOP, nb_ops); i++) {
-		infl_req = &pend_q->req_queue[pend_q->enq_tail];
+		infl_req = &pend_q->req_queue[head];
 		infl_req->op_flags = 0;
 
 		ret = cn10k_cpt_fill_inst(qp, ops + i, &inst[2 * i], infl_req);
 		if (unlikely(ret != 1)) {
 			plt_dp_err("Could not process op: %p", ops + i);
 			if (i == 0)
-				goto update_pending;
+				goto pend_q_commit;
 			break;
 		}
 
-		MOD_INC(pend_q->enq_tail, qp->lf.nb_desc);
+		pending_queue_advance(&head, pq_mask);
 	}
 
 	if (i > PKTS_PER_STEORL) {
@@ -251,9 +255,10 @@ again:
 		goto again;
 	}
 
-update_pending:
-	pend_q->pending_count += count + i;
+pend_q_commit:
+	rte_atomic_thread_fence(__ATOMIC_RELEASE);
+
+	pend_q->head = head;
 	pend_q->time_out = rte_get_timer_cycles() +
 			   DEFAULT_COMMAND_TIMEOUT * rte_get_timer_hz();
 
@@ -512,18 +517,23 @@ cn10k_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 	struct cnxk_cpt_qp *qp = qptr;
 	struct pending_queue *pend_q;
 	struct cpt_cn10k_res_s *res;
+	uint64_t infl_cnt, pq_tail;
 	struct rte_crypto_op *cop;
-	int i, nb_pending;
+	int i;
 
 	pend_q = &qp->pend_q;
 
-	nb_pending = pend_q->pending_count;
+	const uint64_t pq_mask = pend_q->pq_mask;
+
+	pq_tail = pend_q->tail;
+	infl_cnt = pending_queue_infl_cnt(pend_q->head, pq_tail, pq_mask);
+	nb_ops = RTE_MIN(nb_ops, infl_cnt);
 
-	if (nb_ops > nb_pending)
-		nb_ops = nb_pending;
+	/* Ensure infl_cnt isn't read before data lands */
+	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
 
 	for (i = 0; i < nb_ops; i++) {
-		infl_req = &pend_q->req_queue[pend_q->deq_head];
+		infl_req = &pend_q->req_queue[pq_tail];
 
 		res = (struct cpt_cn10k_res_s *)&infl_req->res;
 
@@ -538,7 +548,7 @@ cn10k_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 			break;
 		}
 
-		MOD_INC(pend_q->deq_head, qp->lf.nb_desc);
+		pending_queue_advance(&pq_tail, pq_mask);
 
 		cop = infl_req->cop;
 
@@ -550,7 +560,7 @@ cn10k_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 			rte_mempool_put(qp->meta_info.pool, infl_req->mdata);
 	}
 
-	pend_q->pending_count -= i;
+	pend_q->tail = pq_tail;
 
 	return i;
 }
diff --git a/drivers/crypto/cnxk/cn9k_cryptodev_ops.c b/drivers/crypto/cnxk/cn9k_cryptodev_ops.c
index 75277936b0..449208da8f 100644
--- a/drivers/crypto/cnxk/cn9k_cryptodev_ops.c
+++ b/drivers/crypto/cnxk/cn9k_cryptodev_ops.c
@@ -218,14 +218,14 @@ cn9k_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 	uint16_t nb_allowed, count = 0;
 	struct cnxk_cpt_qp *qp = qptr;
 	struct pending_queue *pend_q;
-	uint64_t enq_tail;
+	uint64_t head;
 	int ret;
 
-	const uint32_t nb_desc = qp->lf.nb_desc;
+	pend_q = &qp->pend_q;
+
 	const uint64_t lmt_base = qp->lf.lmt_base;
 	const uint64_t io_addr = qp->lf.io_addr;
-
-	pend_q = &qp->pend_q;
+	const uint64_t pq_mask = pend_q->pq_mask;
 
 	/* Clear w0, w2, w3 of both inst */
 
@@ -236,14 +236,13 @@ cn9k_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 	inst[1].w2.u64 = 0;
 	inst[1].w3.u64 = 0;
 
-	nb_allowed = qp->lf.nb_desc - pend_q->pending_count;
+	head = pend_q->head;
+	nb_allowed = pending_queue_free_cnt(head, pend_q->tail, pq_mask);
 	nb_ops = RTE_MIN(nb_ops, nb_allowed);
 
-	enq_tail = pend_q->enq_tail;
-
 	if (unlikely(nb_ops & 1)) {
 		op_1 = ops[0];
-		infl_req_1 = &pend_q->req_queue[enq_tail];
+		infl_req_1 = &pend_q->req_queue[head];
 		infl_req_1->op_flags = 0;
 
 		ret = cn9k_cpt_inst_prep(qp, op_1, infl_req_1, &inst[0]);
@@ -257,7 +256,7 @@ cn9k_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 		inst[0].res_addr = (uint64_t)&infl_req_1->res;
 
 		cn9k_cpt_inst_submit(&inst[0], lmt_base, io_addr);
-		MOD_INC(enq_tail, nb_desc);
+		pending_queue_advance(&head, pq_mask);
 		count++;
 	}
 
@@ -265,10 +264,10 @@ cn9k_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 		op_1 = ops[count];
 		op_2 = ops[count + 1];
 
-		infl_req_1 = &pend_q->req_queue[enq_tail];
-		MOD_INC(enq_tail, nb_desc);
-		infl_req_2 = &pend_q->req_queue[enq_tail];
-		MOD_INC(enq_tail, nb_desc);
+		infl_req_1 = &pend_q->req_queue[head];
+		pending_queue_advance(&head, pq_mask);
+		infl_req_2 = &pend_q->req_queue[head];
+		pending_queue_advance(&head, pq_mask);
 
 		infl_req_1->cop = op_1;
 		infl_req_2->cop = op_2;
@@ -284,23 +283,14 @@ cn9k_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 		ret = cn9k_cpt_inst_prep(qp, op_1, infl_req_1, &inst[0]);
 		if (unlikely(ret)) {
 			plt_dp_err("Could not process op: %p", op_1);
-			if (enq_tail == 0)
-				enq_tail = nb_desc - 2;
-			else if (enq_tail == 1)
-				enq_tail = nb_desc - 1;
-			else
-				enq_tail--;
+			pending_queue_retreat(&head, pq_mask, 2);
 			break;
 		}
 
 		ret = cn9k_cpt_inst_prep(qp, op_2, infl_req_2, &inst[1]);
 		if (unlikely(ret)) {
 			plt_dp_err("Could not process op: %p", op_2);
-			if (enq_tail == 0)
-				enq_tail = nb_desc - 1;
-			else
-				enq_tail--;
-
+			pending_queue_retreat(&head, pq_mask, 1);
 			cn9k_cpt_inst_submit(&inst[0], lmt_base, io_addr);
 			count++;
 			break;
@@ -311,8 +301,9 @@ cn9k_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 		count += 2;
 	}
 
-	pend_q->enq_tail = enq_tail;
-	pend_q->pending_count += count;
+	rte_atomic_thread_fence(__ATOMIC_RELEASE);
+
+	pend_q->head = head;
 	pend_q->time_out = rte_get_timer_cycles() +
 			   DEFAULT_COMMAND_TIMEOUT * rte_get_timer_hz();
 
@@ -522,20 +513,23 @@ cn9k_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 	struct cnxk_cpt_qp *qp = qptr;
 	struct pending_queue *pend_q;
 	struct cpt_cn9k_res_s *res;
+	uint64_t infl_cnt, pq_tail;
 	struct rte_crypto_op *cop;
-	uint32_t pq_deq_head;
 	int i;
 
-	const uint32_t nb_desc = qp->lf.nb_desc;
-
 	pend_q = &qp->pend_q;
 
-	nb_ops = RTE_MIN(nb_ops, pend_q->pending_count);
+	const uint64_t pq_mask = pend_q->pq_mask;
+
+	pq_tail = pend_q->tail;
+	infl_cnt = pending_queue_infl_cnt(pend_q->head, pq_tail, pq_mask);
+	nb_ops = RTE_MIN(nb_ops, infl_cnt);
 
-	pq_deq_head = pend_q->deq_head;
+	/* Ensure infl_cnt isn't read before data lands */
+	rte_atomic_thread_fence(__ATOMIC_ACQUIRE);
 
 	for (i = 0; i < nb_ops; i++) {
-		infl_req = &pend_q->req_queue[pq_deq_head];
+		infl_req = &pend_q->req_queue[pq_tail];
 
 		res = (struct cpt_cn9k_res_s *)&infl_req->res;
 
@@ -550,7 +544,7 @@ cn9k_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 			break;
 		}
 
-		MOD_INC(pq_deq_head, nb_desc);
+		pending_queue_advance(&pq_tail, pq_mask);
 
 		cop = infl_req->cop;
 
@@ -562,8 +556,7 @@ cn9k_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops)
 			rte_mempool_put(qp->meta_info.pool, infl_req->mdata);
 	}
 
-	pend_q->pending_count -= i;
-	pend_q->deq_head = pq_deq_head;
+	pend_q->tail = pq_tail;
 
 	return i;
 }
diff --git a/drivers/crypto/cnxk/cnxk_cryptodev_ops.c b/drivers/crypto/cnxk/cnxk_cryptodev_ops.c
index e49f826225..a2281fb8de 100644
--- a/drivers/crypto/cnxk/cnxk_cryptodev_ops.c
+++ b/drivers/crypto/cnxk/cnxk_cryptodev_ops.c
@@ -174,9 +174,10 @@ cnxk_cpt_metabuf_mempool_create(const struct rte_cryptodev *dev,
 {
 	char mempool_name[RTE_MEMPOOL_NAMESIZE];
 	struct cpt_qp_meta_info *meta_info;
+	int lcore_cnt = rte_lcore_count();
 	struct rte_mempool *pool;
+	int mb_pool_sz, mlen = 8;
 	uint32_t cache_sz;
-	int mlen = 8;
 
 	if (dev->feature_flags & RTE_CRYPTODEV_FF_SYMMETRIC_CRYPTO) {
 		/* Get meta len */
@@ -189,14 +190,22 @@ cnxk_cpt_metabuf_mempool_create(const struct rte_cryptodev *dev,
 		mlen = RTE_MAX(mlen, cnxk_cpt_asym_get_mlen());
 	}
 
+	mb_pool_sz = nb_elements;
 	cache_sz = RTE_MIN(RTE_MEMPOOL_CACHE_MAX_SIZE, nb_elements / 1.5);
 
+	/* For poll mode, core that enqueues and core that dequeues can be
+	 * different. For event mode, all cores are allowed to use same crypto
+	 * queue pair.
+	 */
+
+	mb_pool_sz += (RTE_MAX(2, lcore_cnt) * cache_sz);
+
 	/* Allocate mempool */
 
 	snprintf(mempool_name, RTE_MEMPOOL_NAMESIZE, "cnxk_cpt_mb_%u:%u",
 		 dev->data->dev_id, qp_id);
 
-	pool = rte_mempool_create(mempool_name, nb_elements, mlen, cache_sz, 0,
+	pool = rte_mempool_create(mempool_name, mb_pool_sz, mlen, cache_sz, 0,
 				  NULL, NULL, NULL, NULL, rte_socket_id(), 0);
 
 	if (pool == NULL) {
@@ -269,9 +278,8 @@ cnxk_cpt_qp_create(const struct rte_cryptodev *dev, uint16_t qp_id,
 
 	/* Initialize pending queue */
 	qp->pend_q.req_queue = pq_mem->addr;
-	qp->pend_q.enq_tail = 0;
-	qp->pend_q.deq_head = 0;
-	qp->pend_q.pending_count = 0;
+	qp->pend_q.head = 0;
+	qp->pend_q.tail = 0;
 
 	return qp;
 
@@ -372,6 +380,8 @@ cnxk_cpt_queue_pair_setup(struct rte_cryptodev *dev, uint16_t qp_id,
 		goto exit;
 	}
 
+	qp->pend_q.pq_mask = qp->lf.nb_desc - 1;
+
 	roc_cpt->lf[qp_id] = &qp->lf;
 
 	ret = roc_cpt_lmtline_init(roc_cpt, &qp->lmtline, qp_id);
diff --git a/drivers/crypto/cnxk/cnxk_cryptodev_ops.h b/drivers/crypto/cnxk/cnxk_cryptodev_ops.h
index c5332dec53..0d363651ff 100644
--- a/drivers/crypto/cnxk/cnxk_cryptodev_ops.h
+++ b/drivers/crypto/cnxk/cnxk_cryptodev_ops.h
@@ -53,14 +53,14 @@ struct cpt_inflight_req {
 } __rte_aligned(16);
 
 struct pending_queue {
-	/** Pending requests count */
-	uint64_t pending_count;
 	/** Array of pending requests */
 	struct cpt_inflight_req *req_queue;
-	/** Tail of queue to be used for enqueue */
-	uint16_t enq_tail;
-	/** Head of queue to be used for dequeue */
-	uint16_t deq_head;
+	/** Head of the queue to be used for enqueue */
+	uint64_t head;
+	/** Tail of the queue to be used for dequeue */
+	uint64_t tail;
+	/** Pending queue mask */
+	uint64_t pq_mask;
 	/** Timeout to track h/w being unresponsive */
 	uint64_t time_out;
 };
@@ -151,4 +151,29 @@ cnxk_event_crypto_mdata_get(struct rte_crypto_op *op)
 	return ec_mdata;
 }
 
+static __rte_always_inline void
+pending_queue_advance(uint64_t *index, const uint64_t mask)
+{
+	*index = (*index + 1) & mask;
+}
+
+static __rte_always_inline void
+pending_queue_retreat(uint64_t *index, const uint64_t mask, uint64_t nb_entry)
+{
+	*index = (*index - nb_entry) & mask;
+}
+
+static __rte_always_inline uint64_t
+pending_queue_infl_cnt(uint64_t head, uint64_t tail, const uint64_t mask)
+{
+	return (head - tail) & mask;
+}
+
+static __rte_always_inline uint64_t
+pending_queue_free_cnt(uint64_t head, uint64_t tail, const uint64_t mask)
+{
+	/* mask is nb_desc - 1 */
+	return mask - pending_queue_infl_cnt(head, tail, mask);
+}
+
 #endif /* _CNXK_CRYPTODEV_OPS_H_ */
-- 
2.39.5
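
The sketch below is illustrative only and is not part of the patch: a minimal,
standalone C program showing the masked head/tail accounting that the
pending_queue helpers above implement. The demo_pending_queue struct, the pq_*
wrapper names, the queue depth of 8 and the single-threaded main() are
assumptions made for the demo; in the driver, the producer core publishes head
after a release fence and the consumer core reads it before an acquire fence,
as in the enqueue/dequeue hunks above.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Demo-only queue; mirrors the head/tail/pq_mask fields this patch adds to
 * struct pending_queue.
 */
struct demo_pending_queue {
	uint64_t head;    /* advanced by the enqueue (producer) core */
	uint64_t tail;    /* advanced by the dequeue (consumer) core */
	uint64_t pq_mask; /* nb_desc - 1; nb_desc must be a power of two */
};

static inline void
pq_advance(uint64_t *index, const uint64_t mask)
{
	*index = (*index + 1) & mask;
}

static inline uint64_t
pq_infl_cnt(uint64_t head, uint64_t tail, const uint64_t mask)
{
	/* Masking makes the subtraction wrap-around safe. */
	return (head - tail) & mask;
}

static inline uint64_t
pq_free_cnt(uint64_t head, uint64_t tail, const uint64_t mask)
{
	/* One slot of nb_desc is never used, so head == tail means empty. */
	return mask - pq_infl_cnt(head, tail, mask);
}

int
main(void)
{
	struct demo_pending_queue q = { .head = 0, .tail = 0, .pq_mask = 8 - 1 };
	uint64_t infl;
	int i;

	/* Producer side: reserve three slots, then publish the new head.
	 * The driver issues rte_atomic_thread_fence(__ATOMIC_RELEASE) before
	 * storing head so the consumer never sees head ahead of the data.
	 */
	for (i = 0; i < 3; i++)
		pq_advance(&q.head, q.pq_mask);

	/* Consumer side: read head (acquire fence in the driver), then drain
	 * up to infl_cnt entries and publish the new tail.
	 */
	infl = pq_infl_cnt(q.head, q.tail, q.pq_mask);
	printf("in-flight=%" PRIu64 " free=%" PRIu64 "\n", infl,
	       pq_free_cnt(q.head, q.tail, q.pq_mask));

	while (infl--)
		pq_advance(&q.tail, q.pq_mask);

	printf("after dequeue: in-flight=%" PRIu64 "\n",
	       pq_infl_cnt(q.head, q.tail, q.pq_mask));

	return 0;
}

The metabuf mempool change in cnxk_cpt_metabuf_mempool_create follows the same
reasoning: once the enqueue and dequeue cores may differ, up to
RTE_MAX(2, lcore_cnt) per-core mempool caches can hold buffers at the same
time, so mb_pool_sz pads nb_elements by that many cache_sz entries.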