eventdev/crypto: store operations in circular buffer
author Ganapati Kundapura <ganapati.kundapura@intel.com>
Thu, 10 Feb 2022 17:41:16 +0000 (11:41 -0600)
committer Jerin Jacob <jerinj@marvell.com>
Mon, 14 Feb 2022 15:23:43 +0000 (16:23 +0100)
Store crypto ops in circular buffers so that they are retained
instead of dropped when the cryptodev/eventdev is temporarily full.
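
Each queue pair now buffers ops bound for the cryptodev in its own
circular buffer (cbuf), and the adapter buffers ops bound for the
eventdev in a per-adapter circular buffer (ebuf). When a flush to a
cryptodev queue pair fails and the buffer has no room for another
batch, the adapter stops dequeuing from the eventdev until the
backlog drains.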

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
Acked-by: Abhinandan Gujjar <abhinandan.gujjar@intel.com>
lib/eventdev/rte_event_crypto_adapter.c

index d840803..0b484f3 100644
 #define CRYPTO_ADAPTER_MEM_NAME_LEN 32
 #define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
 
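+/* The per-queue-pair buffer toward the cryptodev holds two batches, so
+ * ops can keep accumulating while an unflushed batch awaits a retry;
+ * the larger per-adapter buffer absorbs ops the eventdev cannot yet take.
+ */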
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
 /* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
  * iterations of eca_crypto_adapter_enq_run()
  */
 #define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
 
+struct crypto_ops_circular_buffer {
+       /* Index of head element in circular buffer */
+       uint16_t head;
+       /* Index of tail element in circular buffer */
+       uint16_t tail;
+       /* Number of elements in buffer */
+       uint16_t count;
+       /* Size of circular buffer */
+       uint16_t size;
+       /* Pointer to hold rte_crypto_ops for batching */
+       struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
 struct event_crypto_adapter {
        /* Event device identifier */
        uint8_t eventdev_id;
@@ -37,6 +53,10 @@ struct event_crypto_adapter {
        uint8_t event_port_id;
        /* Store event device's implicit release capability */
        uint8_t implicit_release_disabled;
+       /* Flag to indicate backpressure at the cryptodev;
+        * stop dequeuing further events from the eventdev
+        */
+       bool stop_enq_to_cryptodev;
        /* Max crypto ops processed in any service function invocation */
        uint32_t max_nb;
        /* Lock to serialize config updates with service function */
@@ -47,6 +67,8 @@ struct event_crypto_adapter {
        struct crypto_device_info *cdevs;
        /* Loop counter to flush crypto ops */
        uint16_t transmit_loop_count;
+       /* Circular buffer for batching crypto ops to eventdev */
+       struct crypto_ops_circular_buffer ebuf;
        /* Per instance stats structure */
        struct rte_event_crypto_adapter_stats crypto_stats;
        /* Configuration callback for rte_service configuration */
@@ -93,10 +115,8 @@ struct crypto_device_info {
 struct crypto_queue_pair_info {
        /* Set to indicate queue pair is enabled */
        bool qp_enabled;
-       /* Pointer to hold rte_crypto_ops for batching */
-       struct rte_crypto_op **op_buffer;
-       /* No of crypto ops accumulated */
-       uint8_t len;
+       /* Circular buffer for batching crypto ops to cdev */
+       struct crypto_ops_circular_buffer cbuf;
 } __rte_cache_aligned;
 
 static struct event_crypto_adapter **event_crypto_adapter;
@@ -141,6 +161,84 @@ eca_init(void)
        return 0;
 }
 
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+       return bufp->count >= BATCH_SIZE;
+}
+
+static inline bool
+eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer *bufp)
+{
+       return (bufp->size - bufp->count) >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+       rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+                        struct crypto_ops_circular_buffer *bufp,
+                        uint16_t sz)
+{
+       bufp->op_buffer = rte_zmalloc(name,
+                                     sizeof(struct rte_crypto_op *) * sz,
+                                     0);
+       if (bufp->op_buffer == NULL)
+               return -ENOMEM;
+
+       bufp->size = sz;
+       return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+                       struct rte_crypto_op *op)
+{
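+       /* Caller must ensure free space in the buffer; always returns 0 */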
+       uint16_t *tailp = &bufp->tail;
+
+       bufp->op_buffer[*tailp] = op;
+       /* circular buffer, go round */
+       *tailp = (*tailp + 1) % bufp->size;
+       bufp->count++;
+
+       return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+                                 uint8_t cdev_id, uint16_t qp_id,
+                                 uint16_t *nb_ops_flushed)
+{
+       uint16_t n = 0;
+       uint16_t *headp = &bufp->head;
+       uint16_t *tailp = &bufp->tail;
+       struct rte_crypto_op **ops = bufp->op_buffer;
+
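+       /* Flush only the contiguous region from head up to tail or to
+        * the end of the buffer; a wrapped buffer drains over two calls.
+        */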
+       if (*tailp > *headp) {
+               n = *tailp - *headp;
+       } else if (*tailp < *headp) {
+               n = bufp->size - *headp;
+       } else {
+               *nb_ops_flushed = 0;
+               return 0;  /* buffer empty */
+       }
+
+       *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+                                                     &ops[*headp], n);
+       bufp->count -= *nb_ops_flushed;
+       if (!bufp->count) {
+               *headp = 0;
+               *tailp = 0;
+       } else {
+               *headp = (*headp + *nb_ops_flushed) % bufp->size;
+       }
+
+       return *nb_ops_flushed == n ? 0 : -1;
+}
+
 static inline struct event_crypto_adapter *
 eca_id_to_adapter(uint8_t id)
 {
@@ -237,10 +335,19 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
                return -ENOMEM;
        }
 
+       if (eca_circular_buffer_init("eca_edev_circular_buffer",
+                                    &adapter->ebuf,
+                                    CRYPTO_ADAPTER_BUFFER_SZ)) {
+               RTE_EDEV_LOG_ERR("Failed to get memory for eventdev buffer");
+               rte_free(adapter);
+               return -ENOMEM;
+       }
+
        ret = rte_event_dev_info_get(dev_id, &dev_info);
        if (ret < 0) {
                RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
                                 dev_id, dev_info.driver_name);
+               eca_circular_buffer_free(&adapter->ebuf);
                rte_free(adapter);
                return ret;
        }
@@ -259,6 +366,7 @@ rte_event_crypto_adapter_create_ext(uint8_t id, uint8_t dev_id,
                                        socket_id);
        if (adapter->cdevs == NULL) {
                RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+               eca_circular_buffer_free(&adapter->ebuf);
                rte_free(adapter);
                return -ENOMEM;
        }
@@ -337,10 +445,10 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
        struct crypto_queue_pair_info *qp_info = NULL;
        struct rte_crypto_op *crypto_op;
        unsigned int i, n;
-       uint16_t qp_id, len, ret;
+       uint16_t qp_id, nb_enqueued = 0;
        uint8_t cdev_id;
+       int ret;
 
-       len = 0;
        ret = 0;
        n = 0;
        stats->event_deq_count += cnt;
@@ -366,9 +474,7 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
                                rte_crypto_op_free(crypto_op);
                                continue;
                        }
-                       len = qp_info->len;
-                       qp_info->op_buffer[len] = crypto_op;
-                       len++;
+                       eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
                } else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
                                crypto_op->private_data_offset) {
                        m_data = (union rte_event_crypto_metadata *)
@@ -382,87 +488,91 @@ eca_enq_to_cryptodev(struct event_crypto_adapter *adapter, struct rte_event *ev,
                                rte_crypto_op_free(crypto_op);
                                continue;
                        }
-                       len = qp_info->len;
-                       qp_info->op_buffer[len] = crypto_op;
-                       len++;
+                       eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
                } else {
                        rte_pktmbuf_free(crypto_op->sym->m_src);
                        rte_crypto_op_free(crypto_op);
                        continue;
                }
 
-               if (len == BATCH_SIZE) {
-                       struct rte_crypto_op **op_buffer = qp_info->op_buffer;
-                       ret = rte_cryptodev_enqueue_burst(cdev_id,
-                                                         qp_id,
-                                                         op_buffer,
-                                                         BATCH_SIZE);
-
-                       stats->crypto_enq_count += ret;
-
-                       while (ret < len) {
-                               struct rte_crypto_op *op;
-                               op = op_buffer[ret++];
-                               stats->crypto_enq_fail++;
-                               rte_pktmbuf_free(op->sym->m_src);
-                               rte_crypto_op_free(op);
-                       }
-
-                       len = 0;
+               if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+                       ret = eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+                                                               cdev_id,
+                                                               qp_id,
+                                                               &nb_enqueued);
+                       stats->crypto_enq_count += nb_enqueued;
+                       n += nb_enqueued;
+
+                       /* If some crypto ops failed to flush to the cdev
+                        * and there is no space for another batch, stop
+                        * dequeuing events from the eventdev momentarily.
+                        */
+                       if (unlikely(ret < 0 &&
+                               !eca_circular_buffer_space_for_batch(
+                                                       &qp_info->cbuf)))
+                               adapter->stop_enq_to_cryptodev = true;
                }
 
-               if (qp_info)
-                       qp_info->len = len;
-               n += ret;
        }
 
        return n;
 }
 
 static unsigned int
-eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
+                     uint8_t cdev_id, uint16_t *nb_ops_pending)
 {
-       struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
        struct crypto_device_info *curr_dev;
        struct crypto_queue_pair_info *curr_queue;
-       struct rte_crypto_op **op_buffer;
        struct rte_cryptodev *dev;
-       uint8_t cdev_id;
+       uint16_t nb = 0, nb_enqueued = 0;
        uint16_t qp;
-       uint16_t ret;
-       uint16_t num_cdev = rte_cryptodev_count();
 
-       ret = 0;
-       for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
-               curr_dev = &adapter->cdevs[cdev_id];
-               dev = curr_dev->dev;
-               if (dev == NULL)
-                       continue;
-               for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
+       curr_dev = &adapter->cdevs[cdev_id];
+       if (unlikely(curr_dev->dev == NULL))
+               return 0;
 
-                       curr_queue = &curr_dev->qpairs[qp];
-                       if (!curr_queue->qp_enabled)
-                               continue;
+       dev = rte_cryptodev_pmd_get_dev(cdev_id);
+       for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
 
-                       op_buffer = curr_queue->op_buffer;
-                       ret = rte_cryptodev_enqueue_burst(cdev_id,
-                                                         qp,
-                                                         op_buffer,
-                                                         curr_queue->len);
-                       stats->crypto_enq_count += ret;
-
-                       while (ret < curr_queue->len) {
-                               struct rte_crypto_op *op;
-                               op = op_buffer[ret++];
-                               stats->crypto_enq_fail++;
-                               rte_pktmbuf_free(op->sym->m_src);
-                               rte_crypto_op_free(op);
-                       }
-                       curr_queue->len = 0;
-               }
+               curr_queue = &curr_dev->qpairs[qp];
+               if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
+                       continue;
+
+               eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+                                                 cdev_id,
+                                                 qp,
+                                                 &nb_enqueued);
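+               /* Count ops still held in this queue pair's buffer */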
+               *nb_ops_pending += curr_queue->cbuf.count;
+               nb += nb_enqueued;
        }
 
-       return ret;
+       return nb;
+}
+
+static unsigned int
+eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+{
+       struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
+       uint8_t cdev_id;
+       uint16_t nb_enqueued = 0;
+       uint16_t nb_ops_pending = 0;
+       uint16_t num_cdev = rte_cryptodev_count();
+
+       for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
+               nb_enqueued += eca_crypto_cdev_flush(adapter,
+                                                   cdev_id,
+                                                   &nb_ops_pending);
+       /* Re-enable dequeue from the eventdev once no ops remain
+        * buffered for the cdevs
+        */
+       if (!nb_ops_pending)
+               adapter->stop_enq_to_cryptodev = false;
+
+       stats->crypto_enq_count += nb_enqueued;
+
+       return nb_enqueued;
 }
 
 static int
@@ -480,6 +590,13 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
        if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
                return 0;
 
+       if (unlikely(adapter->stop_enq_to_cryptodev)) {
+               nb_enqueued += eca_crypto_enq_flush(adapter);
+
+               if (unlikely(adapter->stop_enq_to_cryptodev))
+                       goto skip_event_dequeue_burst;
+       }
+
        for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
                stats->event_poll_count++;
                n = rte_event_dequeue_burst(event_dev_id,
@@ -491,6 +608,8 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
                nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
        }
 
+skip_event_dequeue_burst:
+
        if ((++adapter->transmit_loop_count &
                (CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
                nb_enqueued += eca_crypto_enq_flush(adapter);
@@ -499,9 +618,9 @@ eca_crypto_adapter_enq_run(struct event_crypto_adapter *adapter,
        return nb_enqueued;
 }
 
-static inline void
+static inline uint16_t
 eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
                      struct rte_crypto_op **ops, uint16_t num)
 {
        struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
        union rte_event_crypto_metadata *m_data = NULL;
@@ -518,6 +637,8 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
        num = RTE_MIN(num, BATCH_SIZE);
        for (i = 0; i < num; i++) {
                struct rte_event *ev = &events[nb_ev++];
+
+               m_data = NULL;
                if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
                        m_data = rte_cryptodev_sym_session_get_user_data(
                                        ops[i]->sym->session);
@@ -548,21 +669,56 @@ eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
                                                  event_port_id,
                                                  &events[nb_enqueued],
                                                  nb_ev - nb_enqueued);
        } while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
                 nb_enqueued < nb_ev);
 
-       /* Free mbufs and rte_crypto_ops for failed events */
-       for (i = nb_enqueued; i < nb_ev; i++) {
-               struct rte_crypto_op *op = events[i].event_ptr;
-               rte_pktmbuf_free(op->sym->m_src);
-               rte_crypto_op_free(op);
-       }
-
        stats->event_enq_fail_count += nb_ev - nb_enqueued;
        stats->event_enq_count += nb_enqueued;
        stats->event_enq_retry_count += retry - 1;
+
+       return nb_enqueued;
+}
+
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+                                  struct crypto_ops_circular_buffer *bufp)
+{
+       uint16_t n = 0, nb_ops_flushed;
+       uint16_t *headp = &bufp->head;
+       uint16_t *tailp = &bufp->tail;
+       struct rte_crypto_op **ops = bufp->op_buffer;
+
+       if (*tailp > *headp)
+               n = *tailp - *headp;
+       else if (*tailp < *headp)
+               n = bufp->size - *headp;
+       else
+               return 0;  /* buffer empty */
+
+       nb_ops_flushed = eca_ops_enqueue_burst(adapter, ops, n);
+       bufp->count -= nb_ops_flushed;
+       if (!bufp->count) {
+               *headp = 0;
+               *tailp = 0;
+               return 0;  /* buffer empty */
+       }
+
+       *headp = (*headp + nb_ops_flushed) % bufp->size;
+       /* Stop iterating if the eventdev accepted nothing this round */
+       return nb_ops_flushed != 0;
 }
 
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+       if (likely(adapter->ebuf.count == 0))
+               return;
+
+       while (eca_circular_buffer_flush_to_evdev(adapter,
+                                                 &adapter->ebuf))
+               ;
+}
+
 static inline unsigned int
 eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
                           unsigned int max_deq)
@@ -571,7 +727,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
        struct crypto_device_info *curr_dev;
        struct crypto_queue_pair_info *curr_queue;
        struct rte_crypto_op *ops[BATCH_SIZE];
-       uint16_t n, nb_deq;
+       uint16_t n, nb_deq, nb_enqueued, i;
        struct rte_cryptodev *dev;
        uint8_t cdev_id;
        uint16_t qp, dev_qps;
@@ -579,16 +735,20 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
        uint16_t num_cdev = rte_cryptodev_count();
 
        nb_deq = 0;
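+       /* First flush crypto ops still buffered from a previous run */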
+       eca_ops_buffer_flush(adapter);
+
        do {
-               uint16_t queues = 0;
                done = true;
 
                for (cdev_id = adapter->next_cdev_id;
                        cdev_id < num_cdev; cdev_id++) {
+                       uint16_t queues = 0;
+
                        curr_dev = &adapter->cdevs[cdev_id];
                        dev = curr_dev->dev;
-                       if (dev == NULL)
+                       if (unlikely(dev == NULL))
                                continue;
+
                        dev_qps = dev->data->nb_queue_pairs;
 
                        for (qp = curr_dev->next_queue_pair_id;
@@ -596,7 +756,8 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
                                queues++) {
 
                                curr_queue = &curr_dev->qpairs[qp];
-                               if (!curr_queue->qp_enabled)
+                               if (unlikely(curr_queue == NULL ||
+                                   !curr_queue->qp_enabled))
                                        continue;
 
                                n = rte_cryptodev_dequeue_burst(cdev_id, qp,
@@ -605,11 +766,27 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
                                        continue;
 
                                done = false;
+                               nb_enqueued = 0;
+
                                stats->crypto_deq_count += n;
-                               eca_ops_enqueue_burst(adapter, ops, n);
+
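+                               /* Enqueue to the eventdev directly only
+                                * when nothing is buffered; otherwise
+                                * buffer behind the pending ops so that
+                                * their order is preserved.
+                                */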
+                               if (unlikely(!adapter->ebuf.count))
+                                       nb_enqueued = eca_ops_enqueue_burst(
+                                                       adapter, ops, n);
+
+                               if (likely(nb_enqueued == n))
+                                       goto check;
+
+                               /* Buffer ops whose events failed to enqueue */
+                               for (i = nb_enqueued; i < n; i++)
+                                       eca_circular_buffer_add(
+                                               &adapter->ebuf,
+                                               ops[i]);
+
+check:
                                nb_deq += n;
 
-                               if (nb_deq > max_deq) {
+                               if (nb_deq >= max_deq) {
                                        if ((qp + 1) == dev_qps) {
                                                adapter->next_cdev_id =
                                                        (cdev_id + 1)
@@ -622,6 +799,7 @@ eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
                                }
                        }
                }
+               adapter->next_cdev_id = 0;
        } while (done == false);
        return nb_deq;
 }
@@ -751,11 +929,12 @@ eca_add_queue_pair(struct event_crypto_adapter *adapter, uint8_t cdev_id,
                        return -ENOMEM;
 
                qpairs = dev_info->qpairs;
-               qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
-                                       BATCH_SIZE *
-                                       sizeof(struct rte_crypto_op *),
-                                       0, adapter->socket_id);
-               if (!qpairs->op_buffer) {
+
+               if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+                                            &qpairs->cbuf,
+                                            CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
+                       RTE_EDEV_LOG_ERR("Failed to get memory for cryptodev "
+                                        "buffer");
                        rte_free(qpairs);
                        return -ENOMEM;
                }