return nb_ops_sent;
}
+/* Use this for compression only - but keep consistent with above common
+ * function as much as possible.
+ */
+uint16_t
+qat_enqueue_comp_op_burst(void *qp, void **ops, uint16_t nb_ops)
+{
+ register struct qat_queue *queue;
+ struct qat_qp *tmp_qp = (struct qat_qp *)qp;
+ register uint32_t nb_ops_sent = 0;
+ register int nb_desc_to_build;
+ uint16_t nb_ops_possible = nb_ops;
+ register uint8_t *base_addr;
+ register uint32_t tail;
+
+ int descriptors_built, total_descriptors_built = 0;
+ int nb_remaining_descriptors;
+ int overflow = 0;
+
+ if (unlikely(nb_ops == 0))
+ return 0;
+
+ /* read params used a lot in main loop into registers */
+ queue = &(tmp_qp->tx_q);
+ base_addr = (uint8_t *)queue->base_addr;
+ tail = queue->tail;
+
+ /* Find how many can actually fit on the ring */
+ {
+ /* dequeued can only be written by one thread, but it may not
+ * be this thread. As it's 4-byte aligned it will be read
+ * atomically here by any Intel CPU.
+ * enqueued can wrap before dequeued, but cannot
+ * lap it as var size of enq/deq (uint32_t) > var size of
+ * max_inflights (uint16_t). In reality inflights is never
+ * even as big as max uint16_t, as it's <= ADF_MAX_DESC.
+ * On wrapping, the calculation still returns the correct
+ * positive value as all three vars are unsigned.
+ */
+ uint32_t inflights =
+ tmp_qp->enqueued - tmp_qp->dequeued;
+
+ /* Find how many can actually fit on the ring */
+ overflow = (inflights + nb_ops) - tmp_qp->max_inflights;
+ if (overflow > 0) {
+ nb_ops_possible = nb_ops - overflow;
+ if (nb_ops_possible == 0)
+ return 0;
+ }
+
+ /* QAT has plenty of work queued already, so don't waste cycles
+ * enqueueing, wait til the application has gathered a bigger
+ * burst or some completed ops have been dequeued
+ */
+ if (tmp_qp->min_enq_burst_threshold && inflights >
+ QAT_QP_MIN_INFL_THRESHOLD && nb_ops_possible <
+ tmp_qp->min_enq_burst_threshold) {
+ tmp_qp->stats.threshold_hit_count++;
+ return 0;
+ }
+ }
+
+ /* At this point nb_ops_possible is assuming a 1:1 mapping
+ * between ops and descriptors.
+ * Fewer may be sent if some ops have to be split.
+ * nb_ops_possible is <= burst size.
+ * Find out how many spaces are actually available on the qp in case
+ * more are needed.
+ */
+ nb_remaining_descriptors = nb_ops_possible
+ + ((overflow >= 0) ? 0 : overflow * (-1));
+ QAT_DP_LOG(DEBUG, "Nb ops requested %d, nb descriptors remaining %d",
+ nb_ops, nb_remaining_descriptors);
+
+ while (nb_ops_sent != nb_ops_possible &&
+ nb_remaining_descriptors > 0) {
+ struct qat_comp_op_cookie *cookie =
+ tmp_qp->op_cookies[tail >> queue->trailz];
+
+ descriptors_built = 0;
+
+ QAT_DP_LOG(DEBUG, "--- data length: %u",
+ ((struct rte_comp_op *)*ops)->src.length);
+
+ nb_desc_to_build = qat_comp_build_request(*ops,
+ base_addr + tail, cookie, tmp_qp->qat_dev_gen);
+ QAT_DP_LOG(DEBUG, "%d descriptors built, %d remaining, "
+ "%d ops sent, %d descriptors needed",
+ total_descriptors_built, nb_remaining_descriptors,
+ nb_ops_sent, nb_desc_to_build);
+
+ if (unlikely(nb_desc_to_build < 0)) {
+ /* this message cannot be enqueued */
+ tmp_qp->stats.enqueue_err_count++;
+ if (nb_ops_sent == 0)
+ return 0;
+ goto kick_tail;
+ } else if (unlikely(nb_desc_to_build > 1)) {
+ /* this op is too big and must be split - get more
+ * descriptors and retry
+ */
+
+ QAT_DP_LOG(DEBUG, "Build %d descriptors for this op",
+ nb_desc_to_build);
+
+ nb_remaining_descriptors -= nb_desc_to_build;
+ if (nb_remaining_descriptors >= 0) {
+ /* There are enough remaining descriptors
+ * so retry
+ */
+ int ret2 = qat_comp_build_multiple_requests(
+ *ops, tmp_qp, tail,
+ nb_desc_to_build);
+
+ if (unlikely(ret2 < 1)) {
+ QAT_DP_LOG(DEBUG,
+ "Failed to build (%d) descriptors, status %d",
+ nb_desc_to_build, ret2);
+
+ qat_comp_free_split_op_memzones(cookie,
+ nb_desc_to_build - 1);
+
+ tmp_qp->stats.enqueue_err_count++;
+
+ /* This message cannot be enqueued */
+ if (nb_ops_sent == 0)
+ return 0;
+ goto kick_tail;
+ } else {
+ descriptors_built = ret2;
+ total_descriptors_built +=
+ descriptors_built;
+ nb_remaining_descriptors -=
+ descriptors_built;
+ QAT_DP_LOG(DEBUG,
+ "Multiple descriptors (%d) built ok",
+ descriptors_built);
+ }
+ } else {
+ QAT_DP_LOG(ERR, "For the current op, number of requested descriptors (%d) "
+ "exceeds number of available descriptors (%d)",
+ nb_desc_to_build,
+ nb_remaining_descriptors +
+ nb_desc_to_build);
+
+ qat_comp_free_split_op_memzones(cookie,
+ nb_desc_to_build - 1);
+
+ /* Not enough extra descriptors */
+ if (nb_ops_sent == 0)
+ return 0;
+ goto kick_tail;
+ }
+ } else {
+ descriptors_built = 1;
+ total_descriptors_built++;
+ nb_remaining_descriptors--;
+ QAT_DP_LOG(DEBUG, "Single descriptor built ok");
+ }
+
+ tail = adf_modulo(tail + (queue->msg_size * descriptors_built),
+ queue->modulo_mask);
+ ops++;
+ nb_ops_sent++;
+ }
+
+kick_tail:
+ queue->tail = tail;
+ tmp_qp->enqueued += total_descriptors_built;
+ tmp_qp->stats.enqueued_count += total_descriptors_built;
+ txq_write_tail(tmp_qp, queue);
+ return nb_ops_sent;
+}
+
uint16_t
qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops)
{
struct qat_queue *rx_queue;
struct qat_qp *tmp_qp = (struct qat_qp *)qp;
uint32_t head;
- uint32_t resp_counter = 0;
+ uint32_t op_resp_counter = 0, fw_resp_counter = 0;
uint8_t *resp_msg;
+ int nb_fw_responses = 0;
rx_queue = &(tmp_qp->rx_q);
head = rx_queue->head;
resp_msg = (uint8_t *)rx_queue->base_addr + rx_queue->head;
while (*(uint32_t *)resp_msg != ADF_RING_EMPTY_SIG &&
- resp_counter != nb_ops) {
+ op_resp_counter != nb_ops) {
- if (tmp_qp->service_type == QAT_SERVICE_SYMMETRIC)
+ nb_fw_responses = 0;
+ if (tmp_qp->service_type == QAT_SERVICE_SYMMETRIC) {
qat_sym_process_response(ops, resp_msg);
- else if (tmp_qp->service_type == QAT_SERVICE_COMPRESSION)
- qat_comp_process_response(ops, resp_msg,
+ nb_fw_responses = 1;
+ } else if (tmp_qp->service_type == QAT_SERVICE_COMPRESSION)
+
+ nb_fw_responses = qat_comp_process_response(
+ ops, resp_msg,
tmp_qp->op_cookies[head >> rx_queue->trailz],
&tmp_qp->stats.dequeue_err_count);
+
else if (tmp_qp->service_type == QAT_SERVICE_ASYMMETRIC) {
#ifdef BUILD_QAT_ASYM
qat_asym_process_response(ops, resp_msg,
tmp_qp->op_cookies[head >> rx_queue->trailz]);
+ nb_fw_responses = 1;
#endif
}
rx_queue->modulo_mask);
resp_msg = (uint8_t *)rx_queue->base_addr + head;
- ops++;
- resp_counter++;
+
+ if (ops != NULL && nb_fw_responses) {
+ /* only move on to next op if one was ready to return
+ * to API
+ */
+ ops++;
+ op_resp_counter++;
+ }
+
+ /* A compression op may be broken up into multiple fw requests.
+ * Only count fw responses as complete once ALL the responses
+ * associated with an op have been processed, as the cookie
+ * data from the first response must be available until
+ * finished with all firmware responses.
+ */
+ fw_resp_counter += nb_fw_responses;
}
- if (resp_counter > 0) {
+
+ if (fw_resp_counter > 0) {
rx_queue->head = head;
- tmp_qp->dequeued += resp_counter;
- tmp_qp->stats.dequeued_count += resp_counter;
- rx_queue->nb_processed_responses += resp_counter;
+ tmp_qp->dequeued += fw_resp_counter;
+ tmp_qp->stats.dequeued_count += fw_resp_counter;
+ rx_queue->nb_processed_responses += fw_resp_counter;
if (rx_queue->nb_processed_responses >
- QAT_CSR_HEAD_WRITE_THRESH)
+ QAT_CSR_HEAD_WRITE_THRESH)
rxq_free_desc(tmp_qp, rx_queue);
}
+ QAT_DP_LOG(DEBUG, "Dequeue burst return: %u, QAT responses: %u",
+ op_resp_counter, fw_resp_counter);
- return resp_counter;
+ return op_resp_counter;
}
/* This is almost same as dequeue_op_burst, without the atomic, without stats