#define CRYPTO_ADAPTER_MEM_NAME_LEN 32
#define CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES 100
+#define CRYPTO_ADAPTER_OPS_BUFFER_SZ (BATCH_SIZE + BATCH_SIZE)
+#define CRYPTO_ADAPTER_BUFFER_SZ 1024
+
/* Flush an instance's enqueue buffers every CRYPTO_ENQ_FLUSH_THRESHOLD
* iterations of eca_crypto_adapter_enq_run()
*/
#define CRYPTO_ENQ_FLUSH_THRESHOLD 1024
+struct crypto_ops_circular_buffer {
+ /* index of head element in circular buffer */
+ uint16_t head;
+ /* index of tail element in circular buffer */
+ uint16_t tail;
+ /* number of elements in buffer */
+ uint16_t count;
+ /* size of circular buffer */
+ uint16_t size;
+ /* Pointer to hold rte_crypto_ops for batching */
+ struct rte_crypto_op **op_buffer;
+} __rte_cache_aligned;
+
struct event_crypto_adapter {
/* Event device identifier */
uint8_t eventdev_id;
uint8_t event_port_id;
/* Store event device's implicit release capability */
uint8_t implicit_release_disabled;
+ /* Flag to indicate backpressure at cryptodev
+ * Stop further dequeuing events from eventdev
+ */
+ bool stop_enq_to_cryptodev;
/* Max crypto ops processed in any service function invocation */
uint32_t max_nb;
/* Lock to serialize config updates with service function */
struct crypto_device_info *cdevs;
/* Loop counter to flush crypto ops */
uint16_t transmit_loop_count;
+ /* Circular buffer for batching crypto ops to eventdev */
+ struct crypto_ops_circular_buffer ebuf;
/* Per instance stats structure */
struct rte_event_crypto_adapter_stats crypto_stats;
/* Configuration callback for rte_service configuration */
struct crypto_queue_pair_info {
/* Set to indicate queue pair is enabled */
bool qp_enabled;
- /* Pointer to hold rte_crypto_ops for batching */
- struct rte_crypto_op **op_buffer;
- /* No of crypto ops accumulated */
- uint8_t len;
+ /* Circular buffer for batching crypto ops to cdev */
+ struct crypto_ops_circular_buffer cbuf;
} __rte_cache_aligned;
static struct event_crypto_adapter **event_crypto_adapter;
return 0;
}
+static inline bool
+eca_circular_buffer_batch_ready(struct crypto_ops_circular_buffer *bufp)
+{
+ return bufp->count >= BATCH_SIZE;
+}
+
+static inline bool
+eca_circular_buffer_space_for_batch(struct crypto_ops_circular_buffer *bufp)
+{
+ return (bufp->size - bufp->count) >= BATCH_SIZE;
+}
+
+static inline void
+eca_circular_buffer_free(struct crypto_ops_circular_buffer *bufp)
+{
+ rte_free(bufp->op_buffer);
+}
+
+static inline int
+eca_circular_buffer_init(const char *name,
+ struct crypto_ops_circular_buffer *bufp,
+ uint16_t sz)
+{
+ bufp->op_buffer = rte_zmalloc(name,
+ sizeof(struct rte_crypto_op *) * sz,
+ 0);
+ if (bufp->op_buffer == NULL)
+ return -ENOMEM;
+
+ bufp->size = sz;
+ return 0;
+}
+
+static inline int
+eca_circular_buffer_add(struct crypto_ops_circular_buffer *bufp,
+ struct rte_crypto_op *op)
+{
+ uint16_t *tailp = &bufp->tail;
+
+ bufp->op_buffer[*tailp] = op;
+ /* circular buffer, go round */
+ *tailp = (*tailp + 1) % bufp->size;
+ bufp->count++;
+
+ return 0;
+}
+
+static inline int
+eca_circular_buffer_flush_to_cdev(struct crypto_ops_circular_buffer *bufp,
+ uint8_t cdev_id, uint16_t qp_id,
+ uint16_t *nb_ops_flushed)
+{
+ uint16_t n = 0;
+ uint16_t *headp = &bufp->head;
+ uint16_t *tailp = &bufp->tail;
+ struct rte_crypto_op **ops = bufp->op_buffer;
+
+ if (*tailp > *headp)
+ n = *tailp - *headp;
+ else if (*tailp < *headp)
+ n = bufp->size - *headp;
+ else {
+ *nb_ops_flushed = 0;
+ return 0; /* buffer empty */
+ }
+
+ *nb_ops_flushed = rte_cryptodev_enqueue_burst(cdev_id, qp_id,
+ &ops[*headp], n);
+ bufp->count -= *nb_ops_flushed;
+ if (!bufp->count) {
+ *headp = 0;
+ *tailp = 0;
+ } else
+ *headp = (*headp + *nb_ops_flushed) % bufp->size;
+
+ return *nb_ops_flushed == n ? 0 : -1;
+}
+
static inline struct event_crypto_adapter *
eca_id_to_adapter(uint8_t id)
{
return -ENOMEM;
}
+ if (eca_circular_buffer_init("eca_edev_circular_buffer",
+ &adapter->ebuf,
+ CRYPTO_ADAPTER_BUFFER_SZ)) {
+ RTE_EDEV_LOG_ERR("Failed to get memory for eventdev buffer");
+ rte_free(adapter);
+ return -ENOMEM;
+ }
+
ret = rte_event_dev_info_get(dev_id, &dev_info);
if (ret < 0) {
RTE_EDEV_LOG_ERR("Failed to get info for eventdev %d: %s!",
dev_id, dev_info.driver_name);
+ eca_circular_buffer_free(&adapter->ebuf);
rte_free(adapter);
return ret;
}
socket_id);
if (adapter->cdevs == NULL) {
RTE_EDEV_LOG_ERR("Failed to get mem for crypto devices\n");
+ eca_circular_buffer_free(&adapter->ebuf);
rte_free(adapter);
return -ENOMEM;
}
struct crypto_queue_pair_info *qp_info = NULL;
struct rte_crypto_op *crypto_op;
unsigned int i, n;
- uint16_t qp_id, len, ret;
+ uint16_t qp_id, nb_enqueued = 0;
uint8_t cdev_id;
+ int ret;
- len = 0;
ret = 0;
n = 0;
stats->event_deq_count += cnt;
rte_crypto_op_free(crypto_op);
continue;
}
- len = qp_info->len;
- qp_info->op_buffer[len] = crypto_op;
- len++;
+ eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
} else if (crypto_op->sess_type == RTE_CRYPTO_OP_SESSIONLESS &&
crypto_op->private_data_offset) {
m_data = (union rte_event_crypto_metadata *)
rte_crypto_op_free(crypto_op);
continue;
}
- len = qp_info->len;
- qp_info->op_buffer[len] = crypto_op;
- len++;
+ eca_circular_buffer_add(&qp_info->cbuf, crypto_op);
} else {
rte_pktmbuf_free(crypto_op->sym->m_src);
rte_crypto_op_free(crypto_op);
continue;
}
- if (len == BATCH_SIZE) {
- struct rte_crypto_op **op_buffer = qp_info->op_buffer;
- ret = rte_cryptodev_enqueue_burst(cdev_id,
- qp_id,
- op_buffer,
- BATCH_SIZE);
-
- stats->crypto_enq_count += ret;
-
- while (ret < len) {
- struct rte_crypto_op *op;
- op = op_buffer[ret++];
- stats->crypto_enq_fail++;
- rte_pktmbuf_free(op->sym->m_src);
- rte_crypto_op_free(op);
- }
-
- len = 0;
+ if (eca_circular_buffer_batch_ready(&qp_info->cbuf)) {
+ ret = eca_circular_buffer_flush_to_cdev(&qp_info->cbuf,
+ cdev_id,
+ qp_id,
+ &nb_enqueued);
+ /**
+ * If some crypto ops failed to flush to cdev and
+ * space for another batch is not available, stop
+ * dequeue from eventdev momentarily
+ */
+ if (unlikely(ret < 0 &&
+ !eca_circular_buffer_space_for_batch(
+ &qp_info->cbuf)))
+ adapter->stop_enq_to_cryptodev = true;
}
- if (qp_info)
- qp_info->len = len;
- n += ret;
+ stats->crypto_enq_count += nb_enqueued;
+ n += nb_enqueued;
}
return n;
}
static unsigned int
-eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+eca_crypto_cdev_flush(struct event_crypto_adapter *adapter,
+ uint8_t cdev_id, uint16_t *nb_ops_flushed)
{
- struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
struct crypto_device_info *curr_dev;
struct crypto_queue_pair_info *curr_queue;
- struct rte_crypto_op **op_buffer;
struct rte_cryptodev *dev;
- uint8_t cdev_id;
+ uint16_t nb = 0, nb_enqueued = 0;
uint16_t qp;
- uint16_t ret;
- uint16_t num_cdev = rte_cryptodev_count();
- ret = 0;
- for (cdev_id = 0; cdev_id < num_cdev; cdev_id++) {
- curr_dev = &adapter->cdevs[cdev_id];
- dev = curr_dev->dev;
- if (dev == NULL)
- continue;
- for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
+ curr_dev = &adapter->cdevs[cdev_id];
+ if (unlikely(curr_dev == NULL))
+ return 0;
- curr_queue = &curr_dev->qpairs[qp];
- if (!curr_queue->qp_enabled)
- continue;
+ dev = rte_cryptodev_pmd_get_dev(cdev_id);
+ for (qp = 0; qp < dev->data->nb_queue_pairs; qp++) {
- op_buffer = curr_queue->op_buffer;
- ret = rte_cryptodev_enqueue_burst(cdev_id,
- qp,
- op_buffer,
- curr_queue->len);
- stats->crypto_enq_count += ret;
-
- while (ret < curr_queue->len) {
- struct rte_crypto_op *op;
- op = op_buffer[ret++];
- stats->crypto_enq_fail++;
- rte_pktmbuf_free(op->sym->m_src);
- rte_crypto_op_free(op);
- }
- curr_queue->len = 0;
- }
+ curr_queue = &curr_dev->qpairs[qp];
+ if (unlikely(curr_queue == NULL || !curr_queue->qp_enabled))
+ continue;
+
+ eca_circular_buffer_flush_to_cdev(&curr_queue->cbuf,
+ cdev_id,
+ qp,
+ &nb_enqueued);
+ *nb_ops_flushed += curr_queue->cbuf.count;
+ nb += nb_enqueued;
}
- return ret;
+ return nb;
+}
+
+static unsigned int
+eca_crypto_enq_flush(struct event_crypto_adapter *adapter)
+{
+ struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
+ uint8_t cdev_id;
+ uint16_t nb_enqueued = 0;
+ uint16_t nb_ops_flushed = 0;
+ uint16_t num_cdev = rte_cryptodev_count();
+
+ for (cdev_id = 0; cdev_id < num_cdev; cdev_id++)
+ nb_enqueued += eca_crypto_cdev_flush(adapter,
+ cdev_id,
+ &nb_ops_flushed);
+ /**
+ * Enable dequeue from eventdev if all ops from circular
+ * buffer flushed to cdev
+ */
+ if (!nb_ops_flushed)
+ adapter->stop_enq_to_cryptodev = false;
+
+ stats->crypto_enq_count += nb_enqueued;
+
+ return nb_enqueued;
}
static int
if (adapter->mode == RTE_EVENT_CRYPTO_ADAPTER_OP_NEW)
return 0;
+ if (unlikely(adapter->stop_enq_to_cryptodev)) {
+ nb_enqueued += eca_crypto_enq_flush(adapter);
+
+ if (unlikely(adapter->stop_enq_to_cryptodev))
+ goto skip_event_dequeue_burst;
+ }
+
for (nb_enq = 0; nb_enq < max_enq; nb_enq += n) {
stats->event_poll_count++;
n = rte_event_dequeue_burst(event_dev_id,
nb_enqueued += eca_enq_to_cryptodev(adapter, ev, n);
}
+skip_event_dequeue_burst:
+
if ((++adapter->transmit_loop_count &
(CRYPTO_ENQ_FLUSH_THRESHOLD - 1)) == 0) {
nb_enqueued += eca_crypto_enq_flush(adapter);
return nb_enqueued;
}
-static inline void
+static inline uint16_t
eca_ops_enqueue_burst(struct event_crypto_adapter *adapter,
- struct rte_crypto_op **ops, uint16_t num)
+ struct rte_crypto_op **ops, uint16_t num)
{
struct rte_event_crypto_adapter_stats *stats = &adapter->crypto_stats;
union rte_event_crypto_metadata *m_data = NULL;
num = RTE_MIN(num, BATCH_SIZE);
for (i = 0; i < num; i++) {
struct rte_event *ev = &events[nb_ev++];
+
+ m_data = NULL;
if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) {
m_data = rte_cryptodev_sym_session_get_user_data(
ops[i]->sym->session);
event_port_id,
&events[nb_enqueued],
nb_ev - nb_enqueued);
+
} while (retry++ < CRYPTO_ADAPTER_MAX_EV_ENQ_RETRIES &&
nb_enqueued < nb_ev);
- /* Free mbufs and rte_crypto_ops for failed events */
- for (i = nb_enqueued; i < nb_ev; i++) {
- struct rte_crypto_op *op = events[i].event_ptr;
- rte_pktmbuf_free(op->sym->m_src);
- rte_crypto_op_free(op);
- }
-
stats->event_enq_fail_count += nb_ev - nb_enqueued;
stats->event_enq_count += nb_enqueued;
stats->event_enq_retry_count += retry - 1;
+
+ return nb_enqueued;
+}
+
+static int
+eca_circular_buffer_flush_to_evdev(struct event_crypto_adapter *adapter,
+ struct crypto_ops_circular_buffer *bufp)
+{
+ uint16_t n = 0, nb_ops_flushed;
+ uint16_t *headp = &bufp->head;
+ uint16_t *tailp = &bufp->tail;
+ struct rte_crypto_op **ops = bufp->op_buffer;
+
+ if (*tailp > *headp)
+ n = *tailp - *headp;
+ else if (*tailp < *headp)
+ n = bufp->size - *headp;
+ else
+ return 0; /* buffer empty */
+
+ nb_ops_flushed = eca_ops_enqueue_burst(adapter, ops, n);
+ bufp->count -= nb_ops_flushed;
+ if (!bufp->count) {
+ *headp = 0;
+ *tailp = 0;
+ return 0; /* buffer empty */
+ }
+
+ *headp = (*headp + nb_ops_flushed) % bufp->size;
+ return 1;
}
+
+static void
+eca_ops_buffer_flush(struct event_crypto_adapter *adapter)
+{
+ if (likely(adapter->ebuf.count == 0))
+ return;
+
+ while (eca_circular_buffer_flush_to_evdev(adapter,
+ &adapter->ebuf))
+ ;
+}
static inline unsigned int
eca_crypto_adapter_deq_run(struct event_crypto_adapter *adapter,
unsigned int max_deq)
struct crypto_device_info *curr_dev;
struct crypto_queue_pair_info *curr_queue;
struct rte_crypto_op *ops[BATCH_SIZE];
- uint16_t n, nb_deq;
+ uint16_t n, nb_deq, nb_enqueued, i;
struct rte_cryptodev *dev;
uint8_t cdev_id;
uint16_t qp, dev_qps;
uint16_t num_cdev = rte_cryptodev_count();
nb_deq = 0;
+ eca_ops_buffer_flush(adapter);
+
do {
- uint16_t queues = 0;
done = true;
for (cdev_id = adapter->next_cdev_id;
cdev_id < num_cdev; cdev_id++) {
+ uint16_t queues = 0;
+
curr_dev = &adapter->cdevs[cdev_id];
dev = curr_dev->dev;
- if (dev == NULL)
+ if (unlikely(dev == NULL))
continue;
+
dev_qps = dev->data->nb_queue_pairs;
for (qp = curr_dev->next_queue_pair_id;
queues++) {
curr_queue = &curr_dev->qpairs[qp];
- if (!curr_queue->qp_enabled)
+ if (unlikely(curr_queue == NULL ||
+ !curr_queue->qp_enabled))
continue;
n = rte_cryptodev_dequeue_burst(cdev_id, qp,
continue;
done = false;
+ nb_enqueued = 0;
+
stats->crypto_deq_count += n;
- eca_ops_enqueue_burst(adapter, ops, n);
+
+ if (unlikely(!adapter->ebuf.count))
+ nb_enqueued = eca_ops_enqueue_burst(
+ adapter, ops, n);
+
+ if (likely(nb_enqueued == n))
+ goto check;
+
+ /* Failed to enqueue events case */
+ for (i = nb_enqueued; i < n; i++)
+ eca_circular_buffer_add(
+ &adapter->ebuf,
+ ops[nb_enqueued]);
+
+check:
nb_deq += n;
- if (nb_deq > max_deq) {
+ if (nb_deq >= max_deq) {
if ((qp + 1) == dev_qps) {
adapter->next_cdev_id =
(cdev_id + 1)
}
}
}
+ adapter->next_cdev_id = 0;
} while (done == false);
return nb_deq;
}
return -ENOMEM;
qpairs = dev_info->qpairs;
- qpairs->op_buffer = rte_zmalloc_socket(adapter->mem_name,
- BATCH_SIZE *
- sizeof(struct rte_crypto_op *),
- 0, adapter->socket_id);
- if (!qpairs->op_buffer) {
+
+ if (eca_circular_buffer_init("eca_cdev_circular_buffer",
+ &qpairs->cbuf,
+ CRYPTO_ADAPTER_OPS_BUFFER_SZ)) {
+ RTE_EDEV_LOG_ERR("Failed to get memory for cryptodev "
+ "buffer");
rte_free(qpairs);
return -ENOMEM;
}