compress/qat: support IM buffer too small operation
diff --git a/drivers/common/qat/qat_qp.c b/drivers/common/qat/qat_qp.c
index eb1da72..64dfd85 100644
--- a/drivers/common/qat/qat_qp.c
+++ b/drivers/common/qat/qat_qp.c
@@ -650,32 +650,212 @@ kick_tail:
        return nb_ops_sent;
 }
 
+/* Used for compression only - kept as consistent as possible with the
+ * common enqueue function above.
+ */
+uint16_t
+qat_enqueue_comp_op_burst(void *qp, void **ops, uint16_t nb_ops)
+{
+       register struct qat_queue *queue;
+       struct qat_qp *tmp_qp = (struct qat_qp *)qp;
+       register uint32_t nb_ops_sent = 0;
+       register int nb_desc_to_build;
+       uint16_t nb_ops_possible = nb_ops;
+       register uint8_t *base_addr;
+       register uint32_t tail;
+
+       int descriptors_built, total_descriptors_built = 0;
+       int nb_remaining_descriptors;
+       int overflow = 0;
+
+       if (unlikely(nb_ops == 0))
+               return 0;
+
+       /* read params used a lot in main loop into registers */
+       queue = &(tmp_qp->tx_q);
+       base_addr = (uint8_t *)queue->base_addr;
+       tail = queue->tail;
+
+       /* Find how many can actually fit on the ring */
+       {
+               /* dequeued can only be written by one thread, but it may not
+                * be this thread. As it's 4-byte aligned it will be read
+                * atomically here by any Intel CPU.
+                * enqueued can wrap before dequeued, but cannot
+                * lap it as var size of enq/deq (uint32_t) > var size of
+                * max_inflights (uint16_t). In reality inflights is never
+                * even as big as max uint16_t, as it's <= ADF_MAX_DESC.
+                * On wrapping, the calculation still returns the correct
+                * positive value as all three vars are unsigned.
+                */
+               uint32_t inflights =
+                       tmp_qp->enqueued - tmp_qp->dequeued;
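+               /* Worked example of the wrap-around case (hypothetical
+                * values): enqueued == 5 after wrapping past UINT32_MAX and
+                * dequeued == UINT32_MAX - 4 gives 5 - (UINT32_MAX - 4) == 10
+                * in uint32_t arithmetic, i.e. 10 requests still in flight.
+                */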
+
+               /* Check how far this burst would overflow the ring */
+               overflow = (inflights + nb_ops) - tmp_qp->max_inflights;
+               if (overflow > 0) {
+                       nb_ops_possible = nb_ops - overflow;
+                       if (nb_ops_possible == 0)
+                               return 0;
+               }
+
+               /* QAT has plenty of work queued already, so don't waste cycles
+                * enqueueing; wait until the application has gathered a bigger
+                * burst or some completed ops have been dequeued.
+                */
+               if (tmp_qp->min_enq_burst_threshold && inflights >
+                               QAT_QP_MIN_INFL_THRESHOLD && nb_ops_possible <
+                               tmp_qp->min_enq_burst_threshold) {
+                       tmp_qp->stats.threshold_hit_count++;
+                       return 0;
+               }
+       }
+
+       /* At this point nb_ops_possible assumes a 1:1 mapping between ops
+        * and descriptors; fewer ops may be sent if some of them have to be
+        * split. nb_ops_possible is <= burst size.
+        * Find out how many descriptor slots are actually available on the
+        * qp in case more are needed.
+        */
+       nb_remaining_descriptors = nb_ops_possible
+                        + ((overflow >= 0) ? 0 : overflow * (-1));
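+       /* A negative overflow means the ring has free slots beyond this
+        * burst; those -overflow extra descriptors can absorb ops that have
+        * to be split into more than one request.
+        */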
+       QAT_DP_LOG(DEBUG, "Nb ops requested %d, nb descriptors remaining %d",
+                       nb_ops, nb_remaining_descriptors);
+
+       while (nb_ops_sent != nb_ops_possible &&
+                               nb_remaining_descriptors > 0) {
+               struct qat_comp_op_cookie *cookie =
+                               tmp_qp->op_cookies[tail >> queue->trailz];
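+               /* tail is a byte offset into the ring; shifting right by
+                * queue->trailz (the log2 of the descriptor size) converts it
+                * to the descriptor index used for the per-descriptor cookie.
+                */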
+
+               descriptors_built = 0;
+
+               QAT_DP_LOG(DEBUG, "--- data length: %u",
+                          ((struct rte_comp_op *)*ops)->src.length);
+
+               nb_desc_to_build = qat_comp_build_request(*ops,
+                               base_addr + tail, cookie, tmp_qp->qat_dev_gen);
+               QAT_DP_LOG(DEBUG, "%d descriptors built, %d remaining, "
+                       "%d ops sent, %d descriptors needed",
+                       total_descriptors_built, nb_remaining_descriptors,
+                       nb_ops_sent, nb_desc_to_build);
+
+               if (unlikely(nb_desc_to_build < 0)) {
+                       /* this message cannot be enqueued */
+                       tmp_qp->stats.enqueue_err_count++;
+                       if (nb_ops_sent == 0)
+                               return 0;
+                       goto kick_tail;
+               } else if (unlikely(nb_desc_to_build > 1)) {
+                       /* this op is too big and must be split - get more
+                        * descriptors and retry
+                        */
+
+                       QAT_DP_LOG(DEBUG, "Build %d descriptors for this op",
+                                       nb_desc_to_build);
+
+                       nb_remaining_descriptors -= nb_desc_to_build;
+                       if (nb_remaining_descriptors >= 0) {
+                               /* There are enough remaining descriptors
+                                * so retry
+                                */
+                               int ret2 = qat_comp_build_multiple_requests(
+                                               *ops, tmp_qp, tail,
+                                               nb_desc_to_build);
+
+                               if (unlikely(ret2 < 1)) {
+                                       QAT_DP_LOG(DEBUG,
+                                                       "Failed to build (%d) descriptors, status %d",
+                                                       nb_desc_to_build, ret2);
+
+                                       qat_comp_free_split_op_memzones(cookie,
+                                                       nb_desc_to_build - 1);
+
+                                       tmp_qp->stats.enqueue_err_count++;
+
+                                       /* This message cannot be enqueued */
+                                       if (nb_ops_sent == 0)
+                                               return 0;
+                                       goto kick_tail;
+                               } else {
+                                       descriptors_built = ret2;
+                                       total_descriptors_built +=
+                                                       descriptors_built;
+                                       nb_remaining_descriptors -=
+                                                       descriptors_built;
+                                       QAT_DP_LOG(DEBUG,
+                                                       "Multiple descriptors (%d) built ok",
+                                                       descriptors_built);
+                               }
+                       } else {
+                               QAT_DP_LOG(ERR, "For the current op, number of requested descriptors (%d) "
+                                               "exceeds number of available descriptors (%d)",
+                                               nb_desc_to_build,
+                                               nb_remaining_descriptors +
+                                                       nb_desc_to_build);
+
+                               qat_comp_free_split_op_memzones(cookie,
+                                               nb_desc_to_build - 1);
+
+                               /* Not enough extra descriptors */
+                               if (nb_ops_sent == 0)
+                                       return 0;
+                               goto kick_tail;
+                       }
+               } else {
+                       descriptors_built = 1;
+                       total_descriptors_built++;
+                       nb_remaining_descriptors--;
+                       QAT_DP_LOG(DEBUG, "Single descriptor built ok");
+               }
+
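+               /* Advance the tail by every descriptor built for this op; a
+                * split op therefore consumes several ring slots but still
+                * counts as a single op sent back to the caller.
+                */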
+               tail = adf_modulo(tail + (queue->msg_size * descriptors_built),
+                                 queue->modulo_mask);
+               ops++;
+               nb_ops_sent++;
+       }
+
+kick_tail:
+       queue->tail = tail;
+       tmp_qp->enqueued += total_descriptors_built;
+       tmp_qp->stats.enqueued_count += total_descriptors_built;
+       txq_write_tail(tmp_qp, queue);
+       return nb_ops_sent;
+}
+
 uint16_t
 qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops)
 {
        struct qat_queue *rx_queue;
        struct qat_qp *tmp_qp = (struct qat_qp *)qp;
        uint32_t head;
-       uint32_t resp_counter = 0;
+       uint32_t op_resp_counter = 0, fw_resp_counter = 0;
        uint8_t *resp_msg;
+       int nb_fw_responses = 0;
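+       /* A single op may map to several firmware descriptors, so ops handed
+        * back to the API (op_resp_counter) and firmware responses consumed
+        * from the ring (fw_resp_counter) are counted separately.
+        */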
 
        rx_queue = &(tmp_qp->rx_q);
        head = rx_queue->head;
        resp_msg = (uint8_t *)rx_queue->base_addr + rx_queue->head;
 
        while (*(uint32_t *)resp_msg != ADF_RING_EMPTY_SIG &&
-                       resp_counter != nb_ops) {
+                       op_resp_counter != nb_ops) {
 
-               if (tmp_qp->service_type == QAT_SERVICE_SYMMETRIC)
+               nb_fw_responses = 0;
+               if (tmp_qp->service_type == QAT_SERVICE_SYMMETRIC) {
                        qat_sym_process_response(ops, resp_msg);
-               else if (tmp_qp->service_type == QAT_SERVICE_COMPRESSION)
-                       qat_comp_process_response(ops, resp_msg,
+                       nb_fw_responses = 1;
+               } else if (tmp_qp->service_type == QAT_SERVICE_COMPRESSION)
+                       nb_fw_responses = qat_comp_process_response(
+                               ops, resp_msg,
                                tmp_qp->op_cookies[head >> rx_queue->trailz],
                                &tmp_qp->stats.dequeue_err_count);
                else if (tmp_qp->service_type == QAT_SERVICE_ASYMMETRIC) {
 #ifdef BUILD_QAT_ASYM
                        qat_asym_process_response(ops, resp_msg,
                                tmp_qp->op_cookies[head >> rx_queue->trailz]);
+                       nb_fw_responses = 1;
 #endif
                }
 
@@ -683,21 +863,38 @@ qat_dequeue_op_burst(void *qp, void **ops, uint16_t nb_ops)
                                  rx_queue->modulo_mask);
 
                resp_msg = (uint8_t *)rx_queue->base_addr + head;
-               ops++;
-               resp_counter++;
+
+               if (ops != NULL && nb_fw_responses) {
+                       /* only move on to next op if one was ready to return
+                        * to API
+                        */
+                       ops++;
+                       op_resp_counter++;
+               }
+
+               /* A compression op may be broken up into multiple fw requests.
+                * Only count fw responses as complete once ALL the responses
+                * associated with an op have been processed, as the cookie
+                * data from the first response must be available until
+                * finished with all firmware responses.
+                */
+               fw_resp_counter += nb_fw_responses;
        }
-       if (resp_counter > 0) {
+
+       if (fw_resp_counter > 0) {
                rx_queue->head = head;
-               tmp_qp->dequeued += resp_counter;
-               tmp_qp->stats.dequeued_count += resp_counter;
-               rx_queue->nb_processed_responses += resp_counter;
+               tmp_qp->dequeued += fw_resp_counter;
+               tmp_qp->stats.dequeued_count += fw_resp_counter;
+               rx_queue->nb_processed_responses += fw_resp_counter;
 
                if (rx_queue->nb_processed_responses >
-                                               QAT_CSR_HEAD_WRITE_THRESH)
+                               QAT_CSR_HEAD_WRITE_THRESH)
                        rxq_free_desc(tmp_qp, rx_queue);
        }
+       QAT_DP_LOG(DEBUG, "Dequeue burst return: %u, QAT responses: %u",
+                       op_resp_counter, fw_resp_counter);
 
-       return resp_counter;
+       return op_resp_counter;
 }
 
 /* This is almost same as dequeue_op_burst, without the atomic, without stats
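
A minimal caller-side sketch of how the behaviour above surfaces through the
public compressdev API (illustrative only: the run_burst() helper and DEQ_SZ
are made up, dev_id, qp_id and ops[] are assumed to be set up elsewhere, and
status checking/freeing of completed ops is omitted). Enqueue may accept fewer
ops than offered when an op has to be split into extra descriptors, and
dequeue hands back one completed op per original request regardless of how
many firmware responses it produced:

#include <rte_comp.h>
#include <rte_compressdev.h>

#define DEQ_SZ 32

static void
run_burst(uint8_t dev_id, uint16_t qp_id,
                struct rte_comp_op **ops, uint16_t nb_ops)
{
        struct rte_comp_op *done[DEQ_SZ];
        uint16_t sent = 0, completed = 0;

        while (completed < nb_ops) {
                if (sent < nb_ops) {
                        /* May accept fewer ops than offered, e.g. when an op
                         * is split into several descriptors and the ring runs
                         * out of free slots.
                         */
                        sent += rte_compressdev_enqueue_burst(dev_id, qp_id,
                                        &ops[sent], nb_ops - sent);
                }

                /* One op comes back per original request, no matter how many
                 * firmware responses that request generated.
                 */
                completed += rte_compressdev_dequeue_burst(dev_id, qp_id,
                                done, DEQ_SZ);
        }
}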