crypto/scheduler: optimize crypto op ordering
author     Fan Zhang <roy.fan.zhang@intel.com>
           Thu, 2 Mar 2017 14:18:34 +0000 (14:18 +0000)
committer  Pablo de Lara <pablo.de.lara.guarch@intel.com>
           Wed, 5 Apr 2017 22:17:44 +0000 (00:17 +0200)
This patch optimizes crypto op ordering by replacing the
rte_reorder-based implementation with an rte_ring, avoiding
the unnecessary cost of storing and recovering crypto ops.

Signed-off-by: Fan Zhang <roy.fan.zhang@intel.com>
Signed-off-by: Sergio Gonzalez Monroy <sergio.gonzalez.monroy@intel.com>
Acked-by: Declan Doherty <declan.doherty@intel.com>
drivers/crypto/scheduler/scheduler_pmd_ops.c
drivers/crypto/scheduler/scheduler_pmd_private.h
drivers/crypto/scheduler/scheduler_roundrobin.c

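For context, the sketch below (an illustration, not part of the patch) shows how a scheduling mode wires the new order ring into its ordered enqueue/dequeue paths, assuming the DPDK 17.05-era rte_ring API. get_max_enqueue_order_count(), scheduler_order_insert() and scheduler_order_drain() are the helpers this patch adds to scheduler_pmd_private.h; generic_enqueue() and generic_dequeue() are hypothetical stand-ins for a mode's unordered burst functions.

#include <rte_ring.h>
#include <rte_cryptodev.h>
#include "scheduler_pmd_private.h"

/* hypothetical stand-ins for a scheduling mode's unordered burst
 * functions (e.g. the round-robin schedule_enqueue()/schedule_dequeue())
 */
extern uint16_t generic_enqueue(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops);
extern uint16_t generic_dequeue(void *qp, struct rte_crypto_op **ops,
		uint16_t nb_ops);

static uint16_t
enqueue_ordering_sketch(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;
	/* cap the burst so every accepted op fits in the order ring */
	uint16_t nb_to_enq = get_max_enqueue_order_count(order_ring, nb_ops);
	/* hand the ops to the slaves first... */
	uint16_t nb_enqd = generic_enqueue(qp, ops, nb_to_enq);

	/* ...then record the accepted ops in submission order */
	scheduler_order_insert(order_ring, ops, nb_enqd);

	return nb_enqd;
}

static uint16_t
dequeue_ordering_sketch(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
	struct rte_ring *order_ring =
			((struct scheduler_qp_ctx *)qp)->order_ring;

	/* slaves update op->status in place, so completed ops become
	 * visible to the order ring without extra bookkeeping */
	generic_dequeue(qp, ops, nb_ops);

	/* release ops strictly in submission order; the drain stops at
	 * the first op still RTE_CRYPTO_OP_STATUS_NOT_PROCESSED */
	return scheduler_order_drain(order_ring, ops, nb_ops);
}

Compared with the rte_reorder approach, no op has to be stashed in mbuf->userdata and read back on drain, and no sequence-number window is needed: FIFO submission order plus a per-op status check is sufficient. Note that rte_ring_create() requires a power-of-two count, which is why update_order_ring() below wraps the buffer size in rte_align32pow2().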
diff --git a/drivers/crypto/scheduler/scheduler_pmd_ops.c b/drivers/crypto/scheduler/scheduler_pmd_ops.c
index 56624c7..287b2fb 100644
@@ -63,24 +63,25 @@ scheduler_pmd_config(struct rte_cryptodev *dev)
 }
 
 static int
-update_reorder_buff(struct rte_cryptodev *dev, uint16_t qp_id)
+update_order_ring(struct rte_cryptodev *dev, uint16_t qp_id)
 {
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
 
        if (sched_ctx->reordering_enabled) {
-               char reorder_buff_name[RTE_CRYPTODEV_NAME_MAX_LEN];
-               uint32_t buff_size = sched_ctx->nb_slaves * PER_SLAVE_BUFF_SIZE;
+               char order_ring_name[RTE_CRYPTODEV_NAME_MAX_LEN];
+               uint32_t buff_size = rte_align32pow2(
+                       sched_ctx->nb_slaves * PER_SLAVE_BUFF_SIZE);
 
-               if (qp_ctx->reorder_buf) {
-                       rte_reorder_free(qp_ctx->reorder_buf);
-                       qp_ctx->reorder_buf = NULL;
+               if (qp_ctx->order_ring) {
+                       rte_ring_free(qp_ctx->order_ring);
+                       qp_ctx->order_ring = NULL;
                }
 
                if (!buff_size)
                        return 0;
 
-               if (snprintf(reorder_buff_name, RTE_CRYPTODEV_NAME_MAX_LEN,
+               if (snprintf(order_ring_name, RTE_CRYPTODEV_NAME_MAX_LEN,
                        "%s_rb_%u_%u", RTE_STR(CRYPTODEV_NAME_SCHEDULER_PMD),
                        dev->data->dev_id, qp_id) < 0) {
                        CS_LOG_ERR("failed to create unique reorder buffer "
@@ -88,16 +89,17 @@ update_reorder_buff(struct rte_cryptodev *dev, uint16_t qp_id)
                        return -ENOMEM;
                }
 
-               qp_ctx->reorder_buf = rte_reorder_create(reorder_buff_name,
-                               rte_socket_id(), buff_size);
-               if (!qp_ctx->reorder_buf) {
-                       CS_LOG_ERR("failed to create reorder buffer");
+               qp_ctx->order_ring = rte_ring_create(order_ring_name,
+                               buff_size, rte_socket_id(),
+                               RING_F_SP_ENQ | RING_F_SC_DEQ);
+               if (!qp_ctx->order_ring) {
+                       CS_LOG_ERR("failed to create order ring");
                        return -ENOMEM;
                }
        } else {
-               if (qp_ctx->reorder_buf) {
-                       rte_reorder_free(qp_ctx->reorder_buf);
-                       qp_ctx->reorder_buf = NULL;
+               if (qp_ctx->order_ring) {
+                       rte_ring_free(qp_ctx->order_ring);
+                       qp_ctx->order_ring = NULL;
                }
        }
 
@@ -116,7 +118,7 @@ scheduler_pmd_start(struct rte_cryptodev *dev)
                return 0;
 
        for (i = 0; i < dev->data->nb_queue_pairs; i++) {
-               ret = update_reorder_buff(dev, i);
+               ret = update_order_ring(dev, i);
                if (ret < 0) {
                        CS_LOG_ERR("Failed to update reorder buffer");
                        return ret;
@@ -224,9 +226,9 @@ scheduler_pmd_close(struct rte_cryptodev *dev)
        for (i = 0; i < dev->data->nb_queue_pairs; i++) {
                struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
 
-               if (qp_ctx->reorder_buf) {
-                       rte_reorder_free(qp_ctx->reorder_buf);
-                       qp_ctx->reorder_buf = NULL;
+               if (qp_ctx->order_ring) {
+                       rte_ring_free(qp_ctx->order_ring);
+                       qp_ctx->order_ring = NULL;
                }
 
                if (qp_ctx->private_qp_ctx) {
@@ -324,8 +326,8 @@ scheduler_pmd_qp_release(struct rte_cryptodev *dev, uint16_t qp_id)
        if (!qp_ctx)
                return 0;
 
-       if (qp_ctx->reorder_buf)
-               rte_reorder_free(qp_ctx->reorder_buf);
+       if (qp_ctx->order_ring)
+               rte_ring_free(qp_ctx->order_ring);
        if (qp_ctx->private_qp_ctx)
                rte_free(qp_ctx->private_qp_ctx);
 
diff --git a/drivers/crypto/scheduler/scheduler_pmd_private.h b/drivers/crypto/scheduler/scheduler_pmd_private.h
index 43718cc..faa9c99 100644
@@ -34,8 +34,6 @@
 #ifndef _SCHEDULER_PMD_PRIVATE_H
 #define _SCHEDULER_PMD_PRIVATE_H
 
-#include <rte_hash.h>
-#include <rte_reorder.h>
 #include "rte_cryptodev_scheduler.h"
 
 /**< Maximum number of bonded devices per devices */
@@ -98,7 +96,7 @@ struct scheduler_ctx {
 struct scheduler_qp_ctx {
        void *private_qp_ctx;
 
-       struct rte_reorder_buffer *reorder_buf;
+       struct rte_ring *order_ring;
        uint32_t seqn;
 } __rte_cache_aligned;
 
@@ -106,6 +104,51 @@ struct scheduler_session {
        struct rte_cryptodev_sym_session *sessions[MAX_SLAVES_NUM];
 };
 
+static inline uint16_t __attribute__((always_inline))
+get_max_enqueue_order_count(struct rte_ring *order_ring, uint16_t nb_ops)
+{
+       uint32_t count = rte_ring_free_count(order_ring);
+
+       return count > nb_ops ? nb_ops : count;
+}
+
+static inline void __attribute__((always_inline))
+scheduler_order_insert(struct rte_ring *order_ring,
+               struct rte_crypto_op **ops, uint16_t nb_ops)
+{
+       rte_ring_sp_enqueue_burst(order_ring, (void **)ops, nb_ops, NULL);
+}
+
+#define SCHEDULER_GET_RING_OBJ(order_ring, pos, op) do {            \
+       struct rte_crypto_op **ring = (void *)&order_ring[1];     \
+       op = ring[(order_ring->cons.head + pos) & order_ring->mask]; \
+} while (0)
+
+static inline uint16_t __attribute__((always_inline))
+scheduler_order_drain(struct rte_ring *order_ring,
+               struct rte_crypto_op **ops, uint16_t nb_ops)
+{
+       struct rte_crypto_op *op;
+       uint32_t nb_objs = rte_ring_count(order_ring);
+       uint32_t nb_ops_to_deq = 0;
+       uint32_t nb_ops_deqd = 0;
+
+       if (nb_objs > nb_ops)
+               nb_objs = nb_ops;
+
+       while (nb_ops_to_deq < nb_objs) {
+               SCHEDULER_GET_RING_OBJ(order_ring, nb_ops_to_deq, op);
+               if (op->status == RTE_CRYPTO_OP_STATUS_NOT_PROCESSED)
+                       break;
+               nb_ops_to_deq++;
+       }
+
+       if (nb_ops_to_deq)
+               nb_ops_deqd = rte_ring_sc_dequeue_bulk(order_ring,
+                               (void **)ops, nb_ops_to_deq, NULL);
+
+       return nb_ops_deqd;
+}
 /** device specific operations function pointer structure */
 extern struct rte_cryptodev_ops *rte_crypto_scheduler_pmd_ops;
 
diff --git a/drivers/crypto/scheduler/scheduler_roundrobin.c b/drivers/crypto/scheduler/scheduler_roundrobin.c
index 4990c74..f276132 100644
@@ -115,79 +115,16 @@ static uint16_t
 schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
                uint16_t nb_ops)
 {
-       struct scheduler_qp_ctx *qp_ctx = qp;
-       struct rr_scheduler_qp_ctx *rr_qp_ctx = qp_ctx->private_qp_ctx;
-       uint32_t slave_idx = rr_qp_ctx->last_enq_slave_idx;
-       struct scheduler_slave *slave = &rr_qp_ctx->slaves[slave_idx];
-       uint16_t i, processed_ops;
-       struct rte_cryptodev_sym_session *sessions[nb_ops];
-       struct scheduler_session *sess0, *sess1, *sess2, *sess3;
-
-       if (unlikely(nb_ops == 0))
-               return 0;
-
-       for (i = 0; i < nb_ops && i < 4; i++) {
-               rte_prefetch0(ops[i]->sym->session);
-               rte_prefetch0(ops[i]->sym->m_src);
-       }
-
-       for (i = 0; (i < (nb_ops - 8)) && (nb_ops > 8); i += 4) {
-               sess0 = (struct scheduler_session *)
-                               ops[i]->sym->session->_private;
-               sess1 = (struct scheduler_session *)
-                               ops[i+1]->sym->session->_private;
-               sess2 = (struct scheduler_session *)
-                               ops[i+2]->sym->session->_private;
-               sess3 = (struct scheduler_session *)
-                               ops[i+3]->sym->session->_private;
-
-               sessions[i] = ops[i]->sym->session;
-               sessions[i + 1] = ops[i + 1]->sym->session;
-               sessions[i + 2] = ops[i + 2]->sym->session;
-               sessions[i + 3] = ops[i + 3]->sym->session;
-
-               ops[i]->sym->session = sess0->sessions[slave_idx];
-               ops[i]->sym->m_src->seqn = qp_ctx->seqn++;
-               ops[i + 1]->sym->session = sess1->sessions[slave_idx];
-               ops[i + 1]->sym->m_src->seqn = qp_ctx->seqn++;
-               ops[i + 2]->sym->session = sess2->sessions[slave_idx];
-               ops[i + 2]->sym->m_src->seqn = qp_ctx->seqn++;
-               ops[i + 3]->sym->session = sess3->sessions[slave_idx];
-               ops[i + 3]->sym->m_src->seqn = qp_ctx->seqn++;
-
-               rte_prefetch0(ops[i + 4]->sym->session);
-               rte_prefetch0(ops[i + 4]->sym->m_src);
-               rte_prefetch0(ops[i + 5]->sym->session);
-               rte_prefetch0(ops[i + 5]->sym->m_src);
-               rte_prefetch0(ops[i + 6]->sym->session);
-               rte_prefetch0(ops[i + 6]->sym->m_src);
-               rte_prefetch0(ops[i + 7]->sym->session);
-               rte_prefetch0(ops[i + 7]->sym->m_src);
-       }
-
-       for (; i < nb_ops; i++) {
-               sess0 = (struct scheduler_session *)
-                               ops[i]->sym->session->_private;
-               sessions[i] = ops[i]->sym->session;
-               ops[i]->sym->session = sess0->sessions[slave_idx];
-               ops[i]->sym->m_src->seqn = qp_ctx->seqn++;
-       }
-
-       processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id,
-                       slave->qp_id, ops, nb_ops);
-
-       slave->nb_inflight_cops += processed_ops;
+       struct rte_ring *order_ring =
+                       ((struct scheduler_qp_ctx *)qp)->order_ring;
+       uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
+                       nb_ops);
+       uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
+                       nb_ops_to_enq);
 
-       rr_qp_ctx->last_enq_slave_idx += 1;
-       rr_qp_ctx->last_enq_slave_idx %= rr_qp_ctx->nb_slaves;
+       scheduler_order_insert(order_ring, ops, nb_ops_enqd);
 
-       /* recover session if enqueue is failed */
-       if (unlikely(processed_ops < nb_ops)) {
-               for (i = processed_ops; i < nb_ops; i++)
-                       ops[i]->sym->session = sessions[i];
-       }
-
-       return processed_ops;
+       return nb_ops_enqd;
 }
 
 
@@ -232,105 +169,12 @@ static uint16_t
 schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
                uint16_t nb_ops)
 {
-       struct scheduler_qp_ctx *qp_ctx = (struct scheduler_qp_ctx *)qp;
-       struct rr_scheduler_qp_ctx *rr_qp_ctx = (qp_ctx->private_qp_ctx);
-       struct scheduler_slave *slave;
-       struct rte_reorder_buffer *reorder_buff = qp_ctx->reorder_buf;
-       struct rte_mbuf *mbuf0, *mbuf1, *mbuf2, *mbuf3;
-       uint16_t nb_deq_ops, nb_drained_mbufs;
-       const uint16_t nb_op_ops = nb_ops;
-       struct rte_crypto_op *op_ops[nb_op_ops];
-       struct rte_mbuf *reorder_mbufs[nb_op_ops];
-       uint32_t last_slave_idx = rr_qp_ctx->last_deq_slave_idx;
-       uint16_t i;
+       struct rte_ring *order_ring =
+                       ((struct scheduler_qp_ctx *)qp)->order_ring;
 
-       if (unlikely(rr_qp_ctx->slaves[last_slave_idx].nb_inflight_cops == 0)) {
-               do {
-                       last_slave_idx += 1;
-
-                       if (unlikely(last_slave_idx >= rr_qp_ctx->nb_slaves))
-                               last_slave_idx = 0;
-                       /* looped back, means no inflight cops in the queue */
-                       if (last_slave_idx == rr_qp_ctx->last_deq_slave_idx)
-                               return 0;
-               } while (rr_qp_ctx->slaves[last_slave_idx].nb_inflight_cops
-                               == 0);
-       }
-
-       slave = &rr_qp_ctx->slaves[last_slave_idx];
-
-       nb_deq_ops = rte_cryptodev_dequeue_burst(slave->dev_id,
-                       slave->qp_id, op_ops, nb_ops);
-
-       rr_qp_ctx->last_deq_slave_idx += 1;
-       rr_qp_ctx->last_deq_slave_idx %= rr_qp_ctx->nb_slaves;
-
-       slave->nb_inflight_cops -= nb_deq_ops;
-
-       for (i = 0; i < nb_deq_ops && i < 4; i++)
-               rte_prefetch0(op_ops[i]->sym->m_src);
-
-       for (i = 0; (i < (nb_deq_ops - 8)) && (nb_deq_ops > 8); i += 4) {
-               mbuf0 = op_ops[i]->sym->m_src;
-               mbuf1 = op_ops[i + 1]->sym->m_src;
-               mbuf2 = op_ops[i + 2]->sym->m_src;
-               mbuf3 = op_ops[i + 3]->sym->m_src;
-
-               mbuf0->userdata = op_ops[i];
-               mbuf1->userdata = op_ops[i + 1];
-               mbuf2->userdata = op_ops[i + 2];
-               mbuf3->userdata = op_ops[i + 3];
-
-               rte_reorder_insert(reorder_buff, mbuf0);
-               rte_reorder_insert(reorder_buff, mbuf1);
-               rte_reorder_insert(reorder_buff, mbuf2);
-               rte_reorder_insert(reorder_buff, mbuf3);
-
-               rte_prefetch0(op_ops[i + 4]->sym->m_src);
-               rte_prefetch0(op_ops[i + 5]->sym->m_src);
-               rte_prefetch0(op_ops[i + 6]->sym->m_src);
-               rte_prefetch0(op_ops[i + 7]->sym->m_src);
-       }
-
-       for (; i < nb_deq_ops; i++) {
-               mbuf0 = op_ops[i]->sym->m_src;
-               mbuf0->userdata = op_ops[i];
-               rte_reorder_insert(reorder_buff, mbuf0);
-       }
-
-       nb_drained_mbufs = rte_reorder_drain(reorder_buff, reorder_mbufs,
-                       nb_ops);
-       for (i = 0; i < nb_drained_mbufs && i < 4; i++)
-               rte_prefetch0(reorder_mbufs[i]);
-
-       for (i = 0; (i < (nb_drained_mbufs - 8)) && (nb_drained_mbufs > 8);
-                       i += 4) {
-               ops[i] = *(struct rte_crypto_op **)reorder_mbufs[i]->userdata;
-               ops[i + 1] = *(struct rte_crypto_op **)
-                       reorder_mbufs[i + 1]->userdata;
-               ops[i + 2] = *(struct rte_crypto_op **)
-                       reorder_mbufs[i + 2]->userdata;
-               ops[i + 3] = *(struct rte_crypto_op **)
-                       reorder_mbufs[i + 3]->userdata;
-
-               reorder_mbufs[i]->userdata = NULL;
-               reorder_mbufs[i + 1]->userdata = NULL;
-               reorder_mbufs[i + 2]->userdata = NULL;
-               reorder_mbufs[i + 3]->userdata = NULL;
-
-               rte_prefetch0(reorder_mbufs[i + 4]);
-               rte_prefetch0(reorder_mbufs[i + 5]);
-               rte_prefetch0(reorder_mbufs[i + 6]);
-               rte_prefetch0(reorder_mbufs[i + 7]);
-       }
-
-       for (; i < nb_drained_mbufs; i++) {
-               ops[i] = *(struct rte_crypto_op **)
-                       reorder_mbufs[i]->userdata;
-               reorder_mbufs[i]->userdata = NULL;
-       }
+       schedule_dequeue(qp, ops, nb_ops);
 
-       return nb_drained_mbufs;
+       return scheduler_order_drain(order_ring, ops, nb_ops);
 }
 
 static int