}
/* Allocate the queue pair data structure. */
- qp = rte_zmalloc("qat PMD qp metadata",
- sizeof(*qp), RTE_CACHE_LINE_SIZE);
+ qp = rte_zmalloc_socket("qat PMD qp metadata",
+ sizeof(*qp), RTE_CACHE_LINE_SIZE,
+ qat_qp_conf->socket_id);
if (qp == NULL) {
QAT_LOG(ERR, "Failed to alloc mem for qp struct");
return -ENOMEM;
}
qp->nb_descriptors = qat_qp_conf->nb_descriptors;
- qp->op_cookies = rte_zmalloc("qat PMD op cookie pointer",
+ qp->op_cookies = rte_zmalloc_socket("qat PMD op cookie pointer",
qat_qp_conf->nb_descriptors * sizeof(*qp->op_cookies),
- RTE_CACHE_LINE_SIZE);
+ RTE_CACHE_LINE_SIZE, qat_qp_conf->socket_id);
if (qp->op_cookies == NULL) {
QAT_LOG(ERR, "Failed to alloc mem for cookie");
rte_free(qp);
return -ENOMEM;
}
qp->mmap_bar_addr = pci_dev->mem_resource[0].addr;
- qp->inflights16 = 0;
+ qp->enqueued = qp->dequeued = 0;
if (qat_queue_create(qat_dev, &(qp->tx_q), qat_qp_conf,
ADF_RING_DIR_TX) != 0) {
goto create_err;
}
+ qp->max_inflights = ADF_MAX_INFLIGHTS(qp->tx_q.queue_size,
+ ADF_BYTES_TO_MSG_SIZE(qp->tx_q.msg_size));
+
+ if (qp->max_inflights < 2) {
+ QAT_LOG(ERR, "Invalid num inflights");
+ qat_queue_delete(&(qp->tx_q));
+ goto create_err;
+ }
+
if (qat_queue_create(qat_dev, &(qp->rx_q), qat_qp_conf,
ADF_RING_DIR_RX) != 0) {
QAT_LOG(ERR, "Rx queue create failed "
qp->op_cookie_pool = rte_mempool_create(op_cookie_pool_name,
qp->nb_descriptors,
qat_qp_conf->cookie_size, 64, 0,
- NULL, NULL, NULL, NULL, qat_qp_conf->socket_id,
+ NULL, NULL, NULL, NULL,
+ qat_dev->pci_dev->device.numa_node,
0);
if (!qp->op_cookie_pool) {
QAT_LOG(ERR, "QAT PMD Cannot create"
qp->qat_dev->qat_dev_id);
/* Don't free memory if there are still responses to be processed */
- if (qp->inflights16 == 0) {
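+ /* (enqueued - dequeued) is the same wrap-safe unsigned inflight
+ * count used on the enqueue/dequeue path; zero means no responses
+ * remain outstanding.
+ */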
+ if ((qp->enqueued - qp->dequeued) == 0) {
qat_queue_delete(&(qp->tx_q));
qat_queue_delete(&(qp->rx_q));
} else {
qp_conf->service_str, "qp_mem",
queue->hw_bundle_number, queue->hw_queue_number);
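+ /* Note: the ring memzone now comes from the device's NUMA node
+ * (pci_dev->device.numa_node) rather than the caller-supplied
+ * socket_id, presumably to keep the DMA-able ring memory local to
+ * the QAT device.
+ */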
qp_mz = queue_dma_zone_reserve(queue->memz_name, queue_size_bytes,
- qp_conf->socket_id);
+ qat_dev->pci_dev->device.numa_node);
if (qp_mz == NULL) {
QAT_LOG(ERR, "Failed to allocate ring memzone");
return -ENOMEM;
goto queue_create_err;
}
- queue->max_inflights = ADF_MAX_INFLIGHTS(queue->queue_size,
- ADF_BYTES_TO_MSG_SIZE(desc_size));
queue->modulo_mask = (1 << ADF_RING_SIZE_MODULO(queue->queue_size)) - 1;
-
- if (queue->max_inflights < 2) {
- QAT_LOG(ERR, "Invalid num inflights");
- ret = -EINVAL;
- goto queue_create_err;
- }
queue->head = 0;
queue->tail = 0;
queue->msg_size = desc_size;
queue->hw_queue_number, queue_base);
QAT_LOG(DEBUG, "RING: Name:%s, size in CSR: %u, in bytes %u,"
- " nb msgs %u, msg_size %u, max_inflights %u modulo mask %u",
+ " nb msgs %u, msg_size %u, modulo mask %u",
queue->memz_name,
queue->queue_size, queue_size_bytes,
qp_conf->nb_descriptors, desc_size,
- queue->max_inflights, queue->modulo_mask);
+ queue->modulo_mask);
return 0;
txq_write_tail(struct qat_qp *qp, struct qat_queue *q) {
WRITE_CSR_RING_TAIL(qp->mmap_bar_addr, q->hw_bundle_number,
q->hw_queue_number, q->tail);
- q->nb_pending_requests = 0;
q->csr_tail = q->tail;
}
uint16_t nb_ops_possible = nb_ops;
register uint8_t *base_addr;
register uint32_t tail;
- int overflow;
if (unlikely(nb_ops == 0))
return 0;
tail = queue->tail;
/* Find how many can actually fit on the ring */
- tmp_qp->inflights16 += nb_ops;
- overflow = tmp_qp->inflights16 - queue->max_inflights;
- if (overflow > 0) {
- tmp_qp->inflights16 -= overflow;
- nb_ops_possible = nb_ops - overflow;
- if (nb_ops_possible == 0)
+ {
+ /* dequeued can only be written by one thread, but it may not
+ * be this thread. As it's 4-byte aligned it will be read
+ * atomically here by any Intel CPU.
* enqueued can wrap before dequeued, but cannot
* lap it, as the enqueued/dequeued counters (uint32_t) are
* wider than max_inflights (uint16_t). In reality inflights
* never gets anywhere near UINT16_MAX, as it's <= ADF_MAX_DESC.
+ * On wrapping, the calculation still returns the correct
+ * positive value as all three vars are unsigned.
+ */
+ uint32_t inflights =
+ tmp_qp->enqueued - tmp_qp->dequeued;
+
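+ /* Illustrative example (not part of this patch): if enqueued has
+ * wrapped around to 5 while dequeued is still 0xFFFFFFFB, the
+ * unsigned subtraction above yields 5 - 0xFFFFFFFB = 10, the true
+ * number of requests still in flight.
+ */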
+ if ((inflights + nb_ops) > tmp_qp->max_inflights) {
+ nb_ops_possible = tmp_qp->max_inflights - inflights;
+ if (nb_ops_possible == 0)
+ return 0;
+ }
+ /* QAT has plenty of work queued already, so don't waste cycles
+ * enqueueing; wait until the application has gathered a bigger
+ * burst or some completed ops have been dequeued
+ */
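+ /* For illustration (hypothetical values, not from this patch):
+ * with min_enq_burst_threshold configured to 32, inflights already
+ * above QAT_QP_MIN_INFL_THRESHOLD and only 8 ops offered, the burst
+ * is refused below, threshold_hit_count is incremented and the
+ * caller is expected to retry with a bigger burst or after some
+ * responses have been dequeued.
+ */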
+ if (tmp_qp->min_enq_burst_threshold && inflights >
+ QAT_QP_MIN_INFL_THRESHOLD && nb_ops_possible <
+ tmp_qp->min_enq_burst_threshold) {
+ tmp_qp->stats.threshold_hit_count++;
return 0;
+ }
}
+
while (nb_ops_sent != nb_ops_possible) {
ret = tmp_qp->build_request(*ops, base_addr + tail,
tmp_qp->op_cookies[tail / queue->msg_size],
tmp_qp->qat_dev_gen);
if (ret != 0) {
tmp_qp->stats.enqueue_err_count++;
- /*
- * This message cannot be enqueued,
- * decrease number of ops that wasn't sent
- */
- tmp_qp->inflights16 -= nb_ops_possible - nb_ops_sent;
+ /* This message cannot be enqueued */
if (nb_ops_sent == 0)
return 0;
goto kick_tail;
}
kick_tail:
queue->tail = tail;
+ tmp_qp->enqueued += nb_ops_sent;
tmp_qp->stats.enqueued_count += nb_ops_sent;
- queue->nb_pending_requests += nb_ops_sent;
- if (tmp_qp->inflights16 < QAT_CSR_TAIL_FORCE_WRITE_THRESH ||
- queue->nb_pending_requests > QAT_CSR_TAIL_WRITE_THRESH) {
- txq_write_tail(tmp_qp, queue);
- }
+ txq_write_tail(tmp_qp, queue);
return nb_ops_sent;
}
uint16_t
qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops)
{
- struct qat_queue *rx_queue, *tx_queue;
+ struct qat_queue *rx_queue;
struct qat_qp *tmp_qp = (struct qat_qp *)qp;
uint32_t head;
uint32_t resp_counter = 0;
uint8_t *resp_msg;
rx_queue = &(tmp_qp->rx_q);
- tx_queue = &(tmp_qp->tx_q);
head = rx_queue->head;
resp_msg = (uint8_t *)rx_queue->base_addr + rx_queue->head;
qat_sym_process_response(ops, resp_msg);
else if (tmp_qp->service_type == QAT_SERVICE_COMPRESSION)
qat_comp_process_response(ops, resp_msg,
- &tmp_qp->stats.dequeue_err_count);
+ tmp_qp->op_cookies[head / rx_queue->msg_size],
+ &tmp_qp->stats.dequeue_err_count);
else if (tmp_qp->service_type == QAT_SERVICE_ASYMMETRIC) {
#ifdef BUILD_QAT_ASYM
qat_asym_process_response(ops, resp_msg,
}
if (resp_counter > 0) {
rx_queue->head = head;
+ tmp_qp->dequeued += resp_counter;
tmp_qp->stats.dequeued_count += resp_counter;
rx_queue->nb_processed_responses += resp_counter;
- tmp_qp->inflights16 -= resp_counter;
if (rx_queue->nb_processed_responses >
QAT_CSR_HEAD_WRITE_THRESH)
rxq_free_desc(tmp_qp, rx_queue);
}
- /* also check if tail needs to be advanced */
- if (tmp_qp->inflights16 <= QAT_CSR_TAIL_FORCE_WRITE_THRESH &&
- tx_queue->tail != tx_queue->csr_tail) {
- txq_write_tail(tmp_qp, tx_queue);
- }
+
return resp_counter;
}
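+ /* The __rte_weak stub below presumably lets the common QAT code link
+ * when the compression PMD, which provides the real
+ * qat_comp_process_response(), is not built; the added op_cookie
+ * parameter must match that real definition as well.
+ */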
__rte_weak int
qat_comp_process_response(void **op __rte_unused, uint8_t *resp __rte_unused,
+ void *op_cookie __rte_unused,
uint64_t *dequeue_err_count __rte_unused)
{
return 0;