From: David George Date: Fri, 24 Sep 2021 11:20:36 +0000 (+0530) Subject: common/cpt: rework pending queue X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=c9902a15bd005b6d4fe072cf7b60fe4ee679155f;p=dpdk.git common/cpt: rework pending queue Replace pending queue with one that allows concurrent single producer and single consumer. This relaxes the restriction of only allowing a single lcore to operate on a given queue pair. Signed-off-by: David George Signed-off-by: Anoob Joseph --- diff --git a/doc/guides/cryptodevs/octeontx.rst b/doc/guides/cryptodevs/octeontx.rst index 497227424a..a39f3f3d02 100644 --- a/doc/guides/cryptodevs/octeontx.rst +++ b/doc/guides/cryptodevs/octeontx.rst @@ -135,9 +135,3 @@ application: ./dpdk-test RTE>>cryptodev_octeontx_asym_autotest - -Limitations ------------ - -Multiple lcores may not operate on the same crypto queue pair. The lcore that -enqueues to a queue pair is the one that must dequeue from it. diff --git a/doc/guides/cryptodevs/octeontx2.rst b/doc/guides/cryptodevs/octeontx2.rst index f0beb92a49..811e61a1f6 100644 --- a/doc/guides/cryptodevs/octeontx2.rst +++ b/doc/guides/cryptodevs/octeontx2.rst @@ -186,9 +186,3 @@ Features supported * AES-128/192/256-GCM * AES-128/192/256-CBC-SHA1-HMAC * AES-128/192/256-CBC-SHA256-128-HMAC - -Limitations ------------ - -Multiple lcores may not operate on the same crypto queue pair. The lcore that -enqueues to a queue pair is the one that must dequeue from it. diff --git a/drivers/common/cpt/cpt_common.h b/drivers/common/cpt/cpt_common.h index 724e5ec736..d70668a3c2 100644 --- a/drivers/common/cpt/cpt_common.h +++ b/drivers/common/cpt/cpt_common.h @@ -5,6 +5,7 @@ #ifndef _CPT_COMMON_H_ #define _CPT_COMMON_H_ +#include #include /* @@ -32,14 +33,12 @@ struct cpt_qp_meta_info { * */ struct pending_queue { - /** Pending requests count */ - uint64_t pending_count; /** Array of pending requests */ - uintptr_t *req_queue; + void **rid_queue; /** Tail of queue to be used for enqueue */ - uint16_t enq_tail; + unsigned int tail; /** Head of queue to be used for dequeue */ - uint16_t deq_head; + unsigned int head; }; struct cpt_request_info { @@ -61,4 +60,68 @@ struct cpt_request_info { uint8_t extra_time; } __rte_aligned(8); +static __rte_always_inline void +pending_queue_push(struct pending_queue *q, void *rid, unsigned int off, + const int qsize) +{ + /* NOTE: no free space check, but it is expected that one is made */ + q->rid_queue[(q->tail + off) & (qsize - 1)] = rid; +} + +static __rte_always_inline void +pending_queue_commit(struct pending_queue *q, unsigned int cnt, + const unsigned int qsize) +{ + /* Ensure ordering between setting the entry and updating the tail */ + rte_atomic_thread_fence(__ATOMIC_RELEASE); + + q->tail = (q->tail + cnt) & (qsize - 1); +} + +static __rte_always_inline void +pending_queue_pop(struct pending_queue *q, const int qsize) +{ + /* NOTE: no empty check, but it is expected that one is made prior */ + + q->head = (q->head + 1) & (qsize - 1); +} + +static __rte_always_inline void +pending_queue_peek(struct pending_queue *q, void **rid, const int qsize, + int prefetch_next) +{ + void *next_rid; + /* NOTE: no empty check, but it is expected that one is made */ + + *rid = q->rid_queue[q->head]; + + if (likely(prefetch_next)) { + next_rid = q->rid_queue[(q->head + 1) & (qsize - 1)]; + rte_prefetch_non_temporal((void *)next_rid); + } +} + +static __rte_always_inline unsigned int +pending_queue_level(struct pending_queue *q, const int qsize) +{ + return (q->tail - q->head) & (qsize - 1); +} + +static __rte_always_inline unsigned int +pending_queue_free_slots(struct pending_queue *q, const int qsize, + const int reserved_slots) +{ + int free_slots; + + free_slots = qsize - pending_queue_level(q, qsize); + + /* Use only use qsize - 1 */ + free_slots -= 1 + reserved_slots; + + if (unlikely(free_slots < 0)) + return 0; + + return free_slots; +} + #endif /* _CPT_COMMON_H_ */ diff --git a/drivers/crypto/octeontx/otx_cryptodev_hw_access.c b/drivers/crypto/octeontx/otx_cryptodev_hw_access.c index ab335c6a62..7b89a62d81 100644 --- a/drivers/crypto/octeontx/otx_cryptodev_hw_access.c +++ b/drivers/crypto/octeontx/otx_cryptodev_hw_access.c @@ -527,10 +527,10 @@ otx_cpt_get_resource(const struct rte_cryptodev *dev, uint8_t group, memset(&cptvf->pqueue, 0, sizeof(cptvf->pqueue)); /* Chunks are of fixed size buffers */ + + qlen = DEFAULT_CMD_QLEN; chunks = DEFAULT_CMD_QCHUNKS; chunk_len = DEFAULT_CMD_QCHUNK_SIZE; - - qlen = chunks * chunk_len; /* Chunk size includes 8 bytes of next chunk ptr */ chunk_size = chunk_len * CPT_INST_SIZE + CPT_NEXT_CHUNK_PTR_SIZE; @@ -538,7 +538,7 @@ otx_cpt_get_resource(const struct rte_cryptodev *dev, uint8_t group, len = chunks * RTE_ALIGN(sizeof(struct command_chunk), 8); /* For pending queue */ - len += qlen * sizeof(uintptr_t); + len += qlen * RTE_ALIGN(sizeof(cptvf->pqueue.rid_queue[0]), 8); /* So that instruction queues start as pg size aligned */ len = RTE_ALIGN(len, pg_sz); @@ -573,14 +573,11 @@ otx_cpt_get_resource(const struct rte_cryptodev *dev, uint8_t group, } /* Pending queue setup */ - cptvf->pqueue.req_queue = (uintptr_t *)mem; - cptvf->pqueue.enq_tail = 0; - cptvf->pqueue.deq_head = 0; - cptvf->pqueue.pending_count = 0; - - mem += qlen * sizeof(uintptr_t); - len -= qlen * sizeof(uintptr_t); - dma_addr += qlen * sizeof(uintptr_t); + cptvf->pqueue.rid_queue = (void **)mem; + + mem += qlen * RTE_ALIGN(sizeof(cptvf->pqueue.rid_queue[0]), 8); + len -= qlen * RTE_ALIGN(sizeof(cptvf->pqueue.rid_queue[0]), 8); + dma_addr += qlen * RTE_ALIGN(sizeof(cptvf->pqueue.rid_queue[0]), 8); /* Alignment wastage */ used_len = alloc_len - len; diff --git a/drivers/crypto/octeontx/otx_cryptodev_hw_access.h b/drivers/crypto/octeontx/otx_cryptodev_hw_access.h index f7b1e93402..7c6b1e45b4 100644 --- a/drivers/crypto/octeontx/otx_cryptodev_hw_access.h +++ b/drivers/crypto/octeontx/otx_cryptodev_hw_access.h @@ -23,10 +23,16 @@ #define CPT_INTR_POLL_INTERVAL_MS (50) /* Default command queue length */ -#define DEFAULT_CMD_QCHUNKS 2 -#define DEFAULT_CMD_QCHUNK_SIZE 1023 -#define DEFAULT_CMD_QLEN \ - (DEFAULT_CMD_QCHUNK_SIZE * DEFAULT_CMD_QCHUNKS) +#define DEFAULT_CMD_QLEN 2048 +#define DEFAULT_CMD_QCHUNKS 2 + +/* Instruction memory benefits from being 1023, so introduce + * reserved entries so we can't overrun the instruction queue + */ +#define DEFAULT_CMD_QRSVD_SLOTS DEFAULT_CMD_QCHUNKS +#define DEFAULT_CMD_QCHUNK_SIZE \ + ((DEFAULT_CMD_QLEN - DEFAULT_CMD_QRSVD_SLOTS) / \ + DEFAULT_CMD_QCHUNKS) #define CPT_CSR_REG_BASE(cpt) ((cpt)->reg_base) diff --git a/drivers/crypto/octeontx/otx_cryptodev_ops.c b/drivers/crypto/octeontx/otx_cryptodev_ops.c index 9b5bde53f8..9e8fd495cf 100644 --- a/drivers/crypto/octeontx/otx_cryptodev_ops.c +++ b/drivers/crypto/octeontx/otx_cryptodev_ops.c @@ -431,16 +431,10 @@ otx_cpt_asym_session_clear(struct rte_cryptodev *dev, static __rte_always_inline void * __rte_hot otx_cpt_request_enqueue(struct cpt_instance *instance, - struct pending_queue *pqueue, void *req, uint64_t cpt_inst_w7) { struct cpt_request_info *user_req = (struct cpt_request_info *)req; - if (unlikely(pqueue->pending_count >= DEFAULT_CMD_QLEN)) { - rte_errno = EAGAIN; - return NULL; - } - fill_cpt_inst(instance, req, cpt_inst_w7); CPT_LOG_DP_DEBUG("req: %p op: %p ", req, user_req->op); @@ -460,8 +454,7 @@ otx_cpt_request_enqueue(struct cpt_instance *instance, static __rte_always_inline void * __rte_hot otx_cpt_enq_single_asym(struct cpt_instance *instance, - struct rte_crypto_op *op, - struct pending_queue *pqueue) + struct rte_crypto_op *op) { struct cpt_qp_meta_info *minfo = &instance->meta_info; struct rte_crypto_asym_op *asym_op = op->asym; @@ -525,8 +518,7 @@ otx_cpt_enq_single_asym(struct cpt_instance *instance, goto req_fail; } - req = otx_cpt_request_enqueue(instance, pqueue, params.req, - sess->cpt_inst_w7); + req = otx_cpt_request_enqueue(instance, params.req, sess->cpt_inst_w7); if (unlikely(req == NULL)) { CPT_LOG_DP_ERR("Could not enqueue crypto req"); goto req_fail; @@ -542,8 +534,7 @@ req_fail: static __rte_always_inline void * __rte_hot otx_cpt_enq_single_sym(struct cpt_instance *instance, - struct rte_crypto_op *op, - struct pending_queue *pqueue) + struct rte_crypto_op *op) { struct cpt_sess_misc *sess; struct rte_crypto_sym_op *sym_op = op->sym; @@ -573,8 +564,7 @@ otx_cpt_enq_single_sym(struct cpt_instance *instance, } /* Enqueue prepared instruction to h/w */ - req = otx_cpt_request_enqueue(instance, pqueue, prep_req, - sess->cpt_inst_w7); + req = otx_cpt_request_enqueue(instance, prep_req, sess->cpt_inst_w7); if (unlikely(req == NULL)) /* Buffer allocated for request preparation need to be freed */ free_op_meta(mdata, instance->meta_info.pool); @@ -584,8 +574,7 @@ otx_cpt_enq_single_sym(struct cpt_instance *instance, static __rte_always_inline void * __rte_hot otx_cpt_enq_single_sym_sessless(struct cpt_instance *instance, - struct rte_crypto_op *op, - struct pending_queue *pend_q) + struct rte_crypto_op *op) { const int driver_id = otx_cryptodev_driver_id; struct rte_crypto_sym_op *sym_op = op->sym; @@ -607,8 +596,8 @@ otx_cpt_enq_single_sym_sessless(struct cpt_instance *instance, sym_op->session = sess; - req = otx_cpt_enq_single_sym(instance, op, pend_q); - + /* Enqueue op with the tmp session set */ + req = otx_cpt_enq_single_sym(instance, op); if (unlikely(req == NULL)) goto priv_put; @@ -627,22 +616,20 @@ sess_put: static __rte_always_inline void *__rte_hot otx_cpt_enq_single(struct cpt_instance *inst, struct rte_crypto_op *op, - struct pending_queue *pqueue, const uint8_t op_type) { /* Check for the type */ if (op_type == OP_TYPE_SYM) { if (op->sess_type == RTE_CRYPTO_OP_WITH_SESSION) - return otx_cpt_enq_single_sym(inst, op, pqueue); + return otx_cpt_enq_single_sym(inst, op); else - return otx_cpt_enq_single_sym_sessless(inst, op, - pqueue); + return otx_cpt_enq_single_sym_sessless(inst, op); } if (op_type == OP_TYPE_ASYM) { if (op->sess_type == RTE_CRYPTO_OP_WITH_SESSION) - return otx_cpt_enq_single_asym(inst, op, pqueue); + return otx_cpt_enq_single_asym(inst, op); } /* Should not reach here */ @@ -655,30 +642,33 @@ otx_cpt_pkt_enqueue(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops, const uint8_t op_type) { struct cpt_instance *instance = (struct cpt_instance *)qptr; - uint16_t count; + uint16_t count, free_slots; void *req; struct cpt_vf *cptvf = (struct cpt_vf *)instance; struct pending_queue *pqueue = &cptvf->pqueue; - count = DEFAULT_CMD_QLEN - pqueue->pending_count; - if (nb_ops > count) - nb_ops = count; + free_slots = pending_queue_free_slots(pqueue, DEFAULT_CMD_QLEN, + DEFAULT_CMD_QRSVD_SLOTS); + if (nb_ops > free_slots) + nb_ops = free_slots; count = 0; while (likely(count < nb_ops)) { /* Enqueue single op */ - req = otx_cpt_enq_single(instance, ops[count], pqueue, op_type); + req = otx_cpt_enq_single(instance, ops[count], op_type); if (unlikely(req == NULL)) break; - pqueue->req_queue[pqueue->enq_tail] = (uintptr_t)req; - MOD_INC(pqueue->enq_tail, DEFAULT_CMD_QLEN); - pqueue->pending_count += 1; + pending_queue_push(pqueue, req, count, DEFAULT_CMD_QLEN); count++; } - otx_cpt_ring_dbell(instance, count); + + if (likely(count)) { + pending_queue_commit(pqueue, count, DEFAULT_CMD_QLEN); + otx_cpt_ring_dbell(instance, count); + } return count; } @@ -756,8 +746,7 @@ otx_crypto_adapter_enqueue(void *port, struct rte_crypto_op *op) op_type = op->type == RTE_CRYPTO_OP_TYPE_SYMMETRIC ? OP_TYPE_SYM : OP_TYPE_ASYM; - req = otx_cpt_enq_single(instance, op, - &((struct cpt_vf *)instance)->pqueue, op_type); + req = otx_cpt_enq_single(instance, op, op_type); if (unlikely(req == NULL)) return 0; @@ -971,17 +960,16 @@ otx_cpt_pkt_dequeue(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops, int nb_completed; struct pending_queue *pqueue = &cptvf->pqueue; - pcount = pqueue->pending_count; + pcount = pending_queue_level(pqueue, DEFAULT_CMD_QLEN); + + /* Ensure pcount isn't read before data lands */ + rte_atomic_thread_fence(__ATOMIC_ACQUIRE); + count = (nb_ops > pcount) ? pcount : nb_ops; for (i = 0; i < count; i++) { - user_req = (struct cpt_request_info *) - pqueue->req_queue[pqueue->deq_head]; - - if (likely((i+1) < count)) { - rte_prefetch_non_temporal( - (void *)pqueue->req_queue[i+1]); - } + pending_queue_peek(pqueue, (void **) &user_req, + DEFAULT_CMD_QLEN, i + 1 < count); ret = check_nb_command_id(user_req, instance); @@ -997,8 +985,7 @@ otx_cpt_pkt_dequeue(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops, CPT_LOG_DP_DEBUG("Request %p Op %p completed with code %d", user_req, user_req->op, ret); - MOD_INC(pqueue->deq_head, DEFAULT_CMD_QLEN); - pqueue->pending_count -= 1; + pending_queue_pop(pqueue, DEFAULT_CMD_QLEN); } nb_completed = i; diff --git a/drivers/crypto/octeontx2/otx2_cryptodev_hw_access.h b/drivers/crypto/octeontx2/otx2_cryptodev_hw_access.h index f9981ea8c9..90a338e05a 100644 --- a/drivers/crypto/octeontx2/otx2_cryptodev_hw_access.h +++ b/drivers/crypto/octeontx2/otx2_cryptodev_hw_access.h @@ -17,10 +17,10 @@ #include "otx2_dev.h" #include "otx2_cryptodev_qp.h" -/* CPT instruction queue length */ -#define OTX2_CPT_IQ_LEN 8200 - -#define OTX2_CPT_DEFAULT_CMD_QLEN OTX2_CPT_IQ_LEN +/* CPT instruction queue length. + * Use queue size as power of 2 for aiding in pending queue calculations. + */ +#define OTX2_CPT_DEFAULT_CMD_QLEN 8192 /* Mask which selects all engine groups */ #define OTX2_CPT_ENG_GRPS_MASK 0xFF diff --git a/drivers/crypto/octeontx2/otx2_cryptodev_ops.c b/drivers/crypto/octeontx2/otx2_cryptodev_ops.c index 09ddbb5f34..37fad11d91 100644 --- a/drivers/crypto/octeontx2/otx2_cryptodev_ops.c +++ b/drivers/crypto/octeontx2/otx2_cryptodev_ops.c @@ -49,6 +49,7 @@ otx2_cpt_metabuf_mempool_create(const struct rte_cryptodev *dev, { char mempool_name[RTE_MEMPOOL_NAMESIZE]; struct cpt_qp_meta_info *meta_info; + int lcore_cnt = rte_lcore_count(); int ret, max_mlen, mb_pool_sz; struct rte_mempool *pool; int asym_mlen = 0; @@ -87,7 +88,13 @@ otx2_cpt_metabuf_mempool_create(const struct rte_cryptodev *dev, snprintf(mempool_name, RTE_MEMPOOL_NAMESIZE, "otx2_cpt_mb_%u:%u", dev->data->dev_id, qp_id); - mb_pool_sz = RTE_MAX(nb_elements, (METABUF_POOL_CACHE_SIZE * rte_lcore_count())); + mb_pool_sz = nb_elements; + + /* For poll mode, core that enqueues and core that dequeues can be + * different. For event mode, all cores are allowed to use same crypto + * queue pair. + */ + mb_pool_sz += (RTE_MAX(2, lcore_cnt) * METABUF_POOL_CACHE_SIZE); pool = rte_mempool_create_empty(mempool_name, mb_pool_sz, max_mlen, METABUF_POOL_CACHE_SIZE, 0, @@ -187,7 +194,13 @@ otx2_cpt_qp_create(const struct rte_cryptodev *dev, uint16_t qp_id, return NULL; } - iq_len = OTX2_CPT_IQ_LEN; + /* + * Pending queue updates make assumption that queue size is a power + * of 2. + */ + RTE_BUILD_BUG_ON(!RTE_IS_POWER_OF_2(OTX2_CPT_DEFAULT_CMD_QLEN)); + + iq_len = OTX2_CPT_DEFAULT_CMD_QLEN; /* * Queue size must be a multiple of 40 and effective queue size to @@ -196,7 +209,7 @@ otx2_cpt_qp_create(const struct rte_cryptodev *dev, uint16_t qp_id, size_div40 = (iq_len + 40 - 1) / 40 + 1; /* For pending queue */ - len = iq_len * sizeof(uintptr_t); + len = iq_len * RTE_ALIGN(sizeof(qp->pend_q.rid_queue[0]), 8); /* Space for instruction group memory */ len += size_div40 * 16; @@ -205,7 +218,7 @@ otx2_cpt_qp_create(const struct rte_cryptodev *dev, uint16_t qp_id, len = RTE_ALIGN(len, pg_sz); /* For instruction queues */ - len += OTX2_CPT_IQ_LEN * sizeof(union cpt_inst_s); + len += OTX2_CPT_DEFAULT_CMD_QLEN * sizeof(union cpt_inst_s); /* Wastage after instruction queues */ len = RTE_ALIGN(len, pg_sz); @@ -233,12 +246,11 @@ otx2_cpt_qp_create(const struct rte_cryptodev *dev, uint16_t qp_id, } /* Initialize pending queue */ - qp->pend_q.req_queue = (uintptr_t *)va; - qp->pend_q.enq_tail = 0; - qp->pend_q.deq_head = 0; - qp->pend_q.pending_count = 0; + qp->pend_q.rid_queue = (void **)va; + qp->pend_q.tail = 0; + qp->pend_q.head = 0; - used_len = iq_len * sizeof(uintptr_t); + used_len = iq_len * RTE_ALIGN(sizeof(qp->pend_q.rid_queue[0]), 8); used_len += size_div40 * 16; used_len = RTE_ALIGN(used_len, pg_sz); iova += used_len; @@ -514,7 +526,8 @@ otx2_cpt_enqueue_req(const struct otx2_cpt_qp *qp, struct pending_queue *pend_q, struct cpt_request_info *req, struct rte_crypto_op *op, - uint64_t cpt_inst_w7) + uint64_t cpt_inst_w7, + unsigned int burst_index) { void *lmtline = qp->lmtline; union cpt_inst_s inst; @@ -523,9 +536,6 @@ otx2_cpt_enqueue_req(const struct otx2_cpt_qp *qp, if (qp->ca_enable) return otx2_ca_enqueue_req(qp, req, lmtline, op, cpt_inst_w7); - if (unlikely(pend_q->pending_count >= OTX2_CPT_DEFAULT_CMD_QLEN)) - return -EAGAIN; - inst.u[0] = 0; inst.s9x.res_addr = req->comp_baddr; inst.u[2] = 0; @@ -553,11 +563,7 @@ otx2_cpt_enqueue_req(const struct otx2_cpt_qp *qp, lmt_status = otx2_lmt_submit(qp->lf_nq_reg); } while (lmt_status == 0); - pend_q->req_queue[pend_q->enq_tail] = (uintptr_t)req; - - /* We will use soft queue length here to limit requests */ - MOD_INC(pend_q->enq_tail, OTX2_CPT_DEFAULT_CMD_QLEN); - pend_q->pending_count += 1; + pending_queue_push(pend_q, req, burst_index, OTX2_CPT_DEFAULT_CMD_QLEN); return 0; } @@ -565,7 +571,8 @@ otx2_cpt_enqueue_req(const struct otx2_cpt_qp *qp, static __rte_always_inline int32_t __rte_hot otx2_cpt_enqueue_asym(struct otx2_cpt_qp *qp, struct rte_crypto_op *op, - struct pending_queue *pend_q) + struct pending_queue *pend_q, + unsigned int burst_index) { struct cpt_qp_meta_info *minfo = &qp->meta_info; struct rte_crypto_asym_op *asym_op = op->asym; @@ -626,8 +633,7 @@ otx2_cpt_enqueue_asym(struct otx2_cpt_qp *qp, } ret = otx2_cpt_enqueue_req(qp, pend_q, params.req, op, - sess->cpt_inst_w7); - + sess->cpt_inst_w7, burst_index); if (unlikely(ret)) { CPT_LOG_DP_ERR("Could not enqueue crypto req"); goto req_fail; @@ -643,7 +649,7 @@ req_fail: static __rte_always_inline int __rte_hot otx2_cpt_enqueue_sym(struct otx2_cpt_qp *qp, struct rte_crypto_op *op, - struct pending_queue *pend_q) + struct pending_queue *pend_q, unsigned int burst_index) { struct rte_crypto_sym_op *sym_op = op->sym; struct cpt_request_info *req; @@ -670,8 +676,8 @@ otx2_cpt_enqueue_sym(struct otx2_cpt_qp *qp, struct rte_crypto_op *op, return ret; } - ret = otx2_cpt_enqueue_req(qp, pend_q, req, op, sess->cpt_inst_w7); - + ret = otx2_cpt_enqueue_req(qp, pend_q, req, op, sess->cpt_inst_w7, + burst_index); if (unlikely(ret)) { /* Free buffer allocated by fill params routines */ free_op_meta(mdata, qp->meta_info.pool); @@ -682,7 +688,8 @@ otx2_cpt_enqueue_sym(struct otx2_cpt_qp *qp, struct rte_crypto_op *op, static __rte_always_inline int __rte_hot otx2_cpt_enqueue_sec(struct otx2_cpt_qp *qp, struct rte_crypto_op *op, - struct pending_queue *pend_q) + struct pending_queue *pend_q, + const unsigned int burst_index) { uint32_t winsz, esn_low = 0, esn_hi = 0, seql = 0, seqh = 0; struct rte_mbuf *m_src = op->sym->m_src; @@ -739,7 +746,8 @@ otx2_cpt_enqueue_sec(struct otx2_cpt_qp *qp, struct rte_crypto_op *op, return ret; } - ret = otx2_cpt_enqueue_req(qp, pend_q, req, op, sess->cpt_inst_w7); + ret = otx2_cpt_enqueue_req(qp, pend_q, req, op, sess->cpt_inst_w7, + burst_index); if (winsz && esn) { seq_in_sa = ((uint64_t)esn_hi << 32) | esn_low; @@ -754,7 +762,8 @@ otx2_cpt_enqueue_sec(struct otx2_cpt_qp *qp, struct rte_crypto_op *op, static __rte_always_inline int __rte_hot otx2_cpt_enqueue_sym_sessless(struct otx2_cpt_qp *qp, struct rte_crypto_op *op, - struct pending_queue *pend_q) + struct pending_queue *pend_q, + unsigned int burst_index) { const int driver_id = otx2_cryptodev_driver_id; struct rte_crypto_sym_op *sym_op = op->sym; @@ -773,7 +782,7 @@ otx2_cpt_enqueue_sym_sessless(struct otx2_cpt_qp *qp, struct rte_crypto_op *op, sym_op->session = sess; - ret = otx2_cpt_enqueue_sym(qp, op, pend_q); + ret = otx2_cpt_enqueue_sym(qp, op, pend_q, burst_index); if (unlikely(ret)) goto priv_put; @@ -798,23 +807,26 @@ otx2_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops) pend_q = &qp->pend_q; - nb_allowed = OTX2_CPT_DEFAULT_CMD_QLEN - pend_q->pending_count; - if (nb_ops > nb_allowed) - nb_ops = nb_allowed; + nb_allowed = pending_queue_free_slots(pend_q, + OTX2_CPT_DEFAULT_CMD_QLEN, 0); + nb_ops = RTE_MIN(nb_ops, nb_allowed); for (count = 0; count < nb_ops; count++) { op = ops[count]; if (op->type == RTE_CRYPTO_OP_TYPE_SYMMETRIC) { if (op->sess_type == RTE_CRYPTO_OP_SECURITY_SESSION) - ret = otx2_cpt_enqueue_sec(qp, op, pend_q); + ret = otx2_cpt_enqueue_sec(qp, op, pend_q, + count); else if (op->sess_type == RTE_CRYPTO_OP_WITH_SESSION) - ret = otx2_cpt_enqueue_sym(qp, op, pend_q); + ret = otx2_cpt_enqueue_sym(qp, op, pend_q, + count); else ret = otx2_cpt_enqueue_sym_sessless(qp, op, - pend_q); + pend_q, count); } else if (op->type == RTE_CRYPTO_OP_TYPE_ASYMMETRIC) { if (op->sess_type == RTE_CRYPTO_OP_WITH_SESSION) - ret = otx2_cpt_enqueue_asym(qp, op, pend_q); + ret = otx2_cpt_enqueue_asym(qp, op, pend_q, + count); else break; } else @@ -824,6 +836,9 @@ otx2_cpt_enqueue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops) break; } + if (unlikely(!qp->ca_enable)) + pending_queue_commit(pend_q, count, OTX2_CPT_DEFAULT_CMD_QLEN); + return count; } @@ -1059,14 +1074,16 @@ otx2_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops) pend_q = &qp->pend_q; - nb_pending = pend_q->pending_count; + nb_pending = pending_queue_level(pend_q, OTX2_CPT_DEFAULT_CMD_QLEN); + + /* Ensure pcount isn't read before data lands */ + rte_atomic_thread_fence(__ATOMIC_ACQUIRE); - if (nb_ops > nb_pending) - nb_ops = nb_pending; + nb_ops = RTE_MIN(nb_ops, nb_pending); for (i = 0; i < nb_ops; i++) { - req = (struct cpt_request_info *) - pend_q->req_queue[pend_q->deq_head]; + pending_queue_peek(pend_q, (void **)&req, + OTX2_CPT_DEFAULT_CMD_QLEN, 0); cc[i] = otx2_cpt_compcode_get(req); @@ -1075,8 +1092,7 @@ otx2_cpt_dequeue_burst(void *qptr, struct rte_crypto_op **ops, uint16_t nb_ops) ops[i] = req->op; - MOD_INC(pend_q->deq_head, OTX2_CPT_DEFAULT_CMD_QLEN); - pend_q->pending_count -= 1; + pending_queue_pop(pend_q, OTX2_CPT_DEFAULT_CMD_QLEN); } nb_completed = i;