raw/ioat: report status of completed jobs
author: Bruce Richardson <bruce.richardson@intel.com>
Tue, 4 May 2021 13:14:58 +0000 (14:14 +0100)
committer: Thomas Monjalon <thomas@monjalon.net>
Tue, 4 May 2021 15:43:50 +0000 (17:43 +0200)
Add improved error handling to rte_ioat_completed_ops(). This patch adds
new parameters to the function to enable the user to track the completion
status of each individual operation in a batch. With this addition, the
function can help the user to determine firstly, how many operations may
have failed or been skipped and then secondly, which specific operations
did not complete successfully.

Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
Signed-off-by: Bruce Richardson <bruce.richardson@intel.com>
doc/guides/rel_notes/release_21_05.rst
drivers/raw/ioat/ioat_common.c
drivers/raw/ioat/ioat_rawdev_test.c
drivers/raw/ioat/rte_idxd_rawdev_fns.h
drivers/raw/ioat/rte_ioat_rawdev.h
drivers/raw/ioat/rte_ioat_rawdev_fns.h
examples/ioat/ioatfwd.c
examples/vhost/ioat.c

index cd33898..064810a 100644 (file)
@@ -333,6 +333,11 @@ API Changes
   it's not supported on the current platform. Instead ``rte_stack_create()``
   fails and ``rte_errno`` is set to ``ENOTSUP``.
 
+* raw/ioat: The experimental function ``rte_ioat_completed_ops()`` now
+  supports two additional parameters, ``status`` and ``num_unsuccessful``,
+  to allow the reporting of errors from hardware when performing copy
+  operations.
+
 
 ABI Changes
 -----------
index fcb3057..d01c1ee 100644 (file)
@@ -162,6 +162,15 @@ idxd_dev_configure(const struct rte_rawdev *dev,
                rte_idxd->desc_ring = NULL;
                return -ENOMEM;
        }
+       rte_idxd->hdl_ring_flags = rte_zmalloc(NULL,
+                       sizeof(*rte_idxd->hdl_ring_flags) * max_desc, 0);
+       if (rte_idxd->hdl_ring_flags == NULL) {
+               rte_free(rte_idxd->desc_ring);
+               rte_free(rte_idxd->hdl_ring);
+               rte_idxd->desc_ring = NULL;
+               rte_idxd->hdl_ring = NULL;
+               return -ENOMEM;
+       }
        rte_idxd->hdls_read = rte_idxd->batch_start = 0;
        rte_idxd->batch_size = 0;
 
index 839a716..5e33669 100644 (file)
@@ -73,13 +73,15 @@ do_multi_copies(int dev_id, int split_batches, int split_completions)
        if (split_completions) {
                /* gather completions in two halves */
                uint16_t half_len = RTE_DIM(srcs) / 2;
-               if (rte_ioat_completed_ops(dev_id, half_len, (void *)completed_src,
+               if (rte_ioat_completed_ops(dev_id, half_len, NULL, NULL,
+                               (void *)completed_src,
                                (void *)completed_dst) != half_len) {
                        PRINT_ERR("Error with rte_ioat_completed_ops - first half request\n");
                        rte_rawdev_dump(dev_id, stdout);
                        return -1;
                }
-               if (rte_ioat_completed_ops(dev_id, half_len, (void *)&completed_src[half_len],
+               if (rte_ioat_completed_ops(dev_id, half_len, NULL, NULL,
+                               (void *)&completed_src[half_len],
                                (void *)&completed_dst[half_len]) != half_len) {
                        PRINT_ERR("Error with rte_ioat_completed_ops - second half request\n");
                        rte_rawdev_dump(dev_id, stdout);
@@ -87,7 +89,8 @@ do_multi_copies(int dev_id, int split_batches, int split_completions)
                }
        } else {
                /* gather all completions in one go */
-               if (rte_ioat_completed_ops(dev_id, 64, (void *)completed_src,
+               if (rte_ioat_completed_ops(dev_id, RTE_DIM(completed_src), NULL, NULL,
+                               (void *)completed_src,
                                (void *)completed_dst) != RTE_DIM(srcs)) {
                        PRINT_ERR("Error with rte_ioat_completed_ops\n");
                        rte_rawdev_dump(dev_id, stdout);
@@ -151,7 +154,7 @@ test_enqueue_copies(int dev_id)
                rte_ioat_perform_ops(dev_id);
                usleep(10);
 
-               if (rte_ioat_completed_ops(dev_id, 1, (void *)&completed[0],
+               if (rte_ioat_completed_ops(dev_id, 1, NULL, NULL, (void *)&completed[0],
                                (void *)&completed[1]) != 1) {
                        PRINT_ERR("Error with rte_ioat_completed_ops\n");
                        return -1;
@@ -170,6 +173,13 @@ test_enqueue_copies(int dev_id)
                        }
                rte_pktmbuf_free(src);
                rte_pktmbuf_free(dst);
+
+               /* check ring is now empty */
+               if (rte_ioat_completed_ops(dev_id, 1, NULL, NULL, (void *)&completed[0],
+                               (void *)&completed[1]) != 0) {
+                       PRINT_ERR("Error: got unexpected returned handles from rte_ioat_completed_ops\n");
+                       return -1;
+               }
        } while (0);
 
        /* test doing a multiple single copies */
@@ -203,7 +213,8 @@ test_enqueue_copies(int dev_id)
                }
                usleep(10);
 
-               if (rte_ioat_completed_ops(dev_id, max_completions, (void *)&completed[0],
+               if (rte_ioat_completed_ops(dev_id, max_completions, NULL, NULL,
+                               (void *)&completed[0],
                                (void *)&completed[max_completions]) != max_ops) {
                        PRINT_ERR("Error with rte_ioat_completed_ops\n");
                        rte_rawdev_dump(dev_id, stdout);
@@ -256,7 +267,7 @@ test_enqueue_fill(int dev_id)
                rte_ioat_perform_ops(dev_id);
                usleep(100);
 
-               if (rte_ioat_completed_ops(dev_id, 1, (void *)&completed[0],
+               if (rte_ioat_completed_ops(dev_id, 1, NULL, NULL, (void *)&completed[0],
                        (void *)&completed[1]) != 1) {
                        PRINT_ERR("Error with completed ops\n");
                        return -1;
@@ -266,8 +277,7 @@ test_enqueue_fill(int dev_id)
                        char pat_byte = ((char *)&pattern)[j % 8];
                        if (dst_data[j] != pat_byte) {
                                PRINT_ERR("Error with fill operation (lengths = %u): got (%x), not (%x)\n",
-                                               lengths[i], dst_data[j],
-                                               pat_byte);
+                                               lengths[i], dst_data[j], pat_byte);
                                return -1;
                        }
                }
@@ -323,6 +333,7 @@ test_burst_capacity(int dev_id)
                usleep(100);
                for (i = 0; i < ring_space / (2 * BURST_SIZE); i++) {
                        if (rte_ioat_completed_ops(dev_id, BURST_SIZE,
+                                       NULL, NULL,
                                        completions, completions) != BURST_SIZE) {
                                PRINT_ERR("Error with completions\n");
                                return -1;
@@ -341,10 +352,248 @@ test_burst_capacity(int dev_id)
        return 0;
 }
 
+static int
+test_completion_status(int dev_id)
+{
+#define COMP_BURST_SZ  16
+       const unsigned int fail_copy[] = {0, 7, 15};
+       struct rte_mbuf *srcs[COMP_BURST_SZ], *dsts[COMP_BURST_SZ];
+       struct rte_mbuf *completed_src[COMP_BURST_SZ * 2];
+       struct rte_mbuf *completed_dst[COMP_BURST_SZ * 2];
+       unsigned int length = 1024;
+       unsigned int i;
+       uint8_t not_ok = 0;
+
+       /* Test single full batch statuses */
+       for (i = 0; i < RTE_DIM(fail_copy); i++) {
+               uint32_t status[COMP_BURST_SZ] = {0};
+               unsigned int j;
+
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       srcs[j] = rte_pktmbuf_alloc(pool);
+                       dsts[j] = rte_pktmbuf_alloc(pool);
+
+                       if (rte_ioat_enqueue_copy(dev_id,
+                                       (j == fail_copy[i] ? (phys_addr_t)NULL :
+                                                       (srcs[j]->buf_iova + srcs[j]->data_off)),
+                                       dsts[j]->buf_iova + dsts[j]->data_off,
+                                       length,
+                                       (uintptr_t)srcs[j],
+                                       (uintptr_t)dsts[j]) != 1) {
+                               PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n", j);
+                               return -1;
+                       }
+               }
+               rte_ioat_perform_ops(dev_id);
+               usleep(100);
+
+               if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ, status, &not_ok,
+                               (void *)completed_src, (void *)completed_dst) != COMP_BURST_SZ) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (not_ok != 1 || status[fail_copy[i]] == RTE_IOAT_OP_SUCCESS) {
+                       unsigned int j;
+                       PRINT_ERR("Error, missing expected failed copy, %u\n", fail_copy[i]);
+                       for (j = 0; j < COMP_BURST_SZ; j++)
+                               printf("%u ", status[j]);
+                       printf("<-- Statuses\n");
+                       return -1;
+               }
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       rte_pktmbuf_free(completed_src[j]);
+                       rte_pktmbuf_free(completed_dst[j]);
+               }
+       }
+
+       /* Test gathering status for two batches at once */
+       for (i = 0; i < RTE_DIM(fail_copy); i++) {
+               uint32_t status[COMP_BURST_SZ] = {0};
+               unsigned int batch, j;
+               unsigned int expected_failures = 0;
+
+               for (batch = 0; batch < 2; batch++) {
+                       for (j = 0; j < COMP_BURST_SZ/2; j++) {
+                               srcs[j] = rte_pktmbuf_alloc(pool);
+                               dsts[j] = rte_pktmbuf_alloc(pool);
+
+                               if (j == fail_copy[i])
+                                       expected_failures++;
+                               if (rte_ioat_enqueue_copy(dev_id,
+                                               (j == fail_copy[i] ? (phys_addr_t)NULL :
+                                                       (srcs[j]->buf_iova + srcs[j]->data_off)),
+                                               dsts[j]->buf_iova + dsts[j]->data_off,
+                                               length,
+                                               (uintptr_t)srcs[j],
+                                               (uintptr_t)dsts[j]) != 1) {
+                                       PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n",
+                                                       j);
+                                       return -1;
+                               }
+                       }
+                       rte_ioat_perform_ops(dev_id);
+               }
+               usleep(100);
+
+               if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ, status, &not_ok,
+                               (void *)completed_src, (void *)completed_dst) != COMP_BURST_SZ) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (not_ok != expected_failures) {
+                       unsigned int j;
+                       PRINT_ERR("Error, missing expected failed copy, got %u, not %u\n",
+                                       not_ok, expected_failures);
+                       for (j = 0; j < COMP_BURST_SZ; j++)
+                               printf("%u ", status[j]);
+                       printf("<-- Statuses\n");
+                       return -1;
+               }
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       rte_pktmbuf_free(completed_src[j]);
+                       rte_pktmbuf_free(completed_dst[j]);
+               }
+       }
+
+       /* Test gathering status for half batch at a time */
+       for (i = 0; i < RTE_DIM(fail_copy); i++) {
+               uint32_t status[COMP_BURST_SZ] = {0};
+               unsigned int j;
+
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       srcs[j] = rte_pktmbuf_alloc(pool);
+                       dsts[j] = rte_pktmbuf_alloc(pool);
+
+                       if (rte_ioat_enqueue_copy(dev_id,
+                                       (j == fail_copy[i] ? (phys_addr_t)NULL :
+                                                       (srcs[j]->buf_iova + srcs[j]->data_off)),
+                                       dsts[j]->buf_iova + dsts[j]->data_off,
+                                       length,
+                                       (uintptr_t)srcs[j],
+                                       (uintptr_t)dsts[j]) != 1) {
+                               PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n", j);
+                               return -1;
+                       }
+               }
+               rte_ioat_perform_ops(dev_id);
+               usleep(100);
+
+               if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ / 2, status, &not_ok,
+                               (void *)completed_src,
+                               (void *)completed_dst) != (COMP_BURST_SZ / 2)) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (fail_copy[i] < COMP_BURST_SZ / 2 &&
+                               (not_ok != 1 || status[fail_copy[i]] == RTE_IOAT_OP_SUCCESS)) {
+                       PRINT_ERR("Missing expected failure in first half-batch\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ / 2, status, &not_ok,
+                               (void *)&completed_src[COMP_BURST_SZ / 2],
+                               (void *)&completed_dst[COMP_BURST_SZ / 2]) != (COMP_BURST_SZ / 2)) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (fail_copy[i] >= COMP_BURST_SZ / 2 && (not_ok != 1 ||
+                               status[fail_copy[i] - (COMP_BURST_SZ / 2)]
+                                       == RTE_IOAT_OP_SUCCESS)) {
+                       PRINT_ERR("Missing expected failure in second half-batch\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       rte_pktmbuf_free(completed_src[j]);
+                       rte_pktmbuf_free(completed_dst[j]);
+               }
+       }
+
+       /* Test gathering statuses with fence */
+       for (i = 1; i < RTE_DIM(fail_copy); i++) {
+               uint32_t status[COMP_BURST_SZ * 2] = {0};
+               unsigned int j;
+               uint16_t count;
+
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       srcs[j] = rte_pktmbuf_alloc(pool);
+                       dsts[j] = rte_pktmbuf_alloc(pool);
+
+                       /* always fail the first copy */
+                       if (rte_ioat_enqueue_copy(dev_id,
+                                       (j == 0 ? (phys_addr_t)NULL :
+                                               (srcs[j]->buf_iova + srcs[j]->data_off)),
+                                       dsts[j]->buf_iova + dsts[j]->data_off,
+                                       length,
+                                       (uintptr_t)srcs[j],
+                                       (uintptr_t)dsts[j]) != 1) {
+                               PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n", j);
+                               return -1;
+                       }
+                       /* put in a fence which will stop any further transactions
+                        * because we had a previous failure.
+                        */
+                       if (j == fail_copy[i])
+                               rte_ioat_fence(dev_id);
+               }
+               rte_ioat_perform_ops(dev_id);
+               usleep(100);
+
+               count = rte_ioat_completed_ops(dev_id, COMP_BURST_SZ * 2, status, &not_ok,
+                               (void *)completed_src, (void *)completed_dst);
+               if (count != COMP_BURST_SZ) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops, got %u not %u\n",
+                                       count, COMP_BURST_SZ);
+                       for (j = 0; j < count; j++)
+                               printf("%u ", status[j]);
+                       printf("<-- Statuses\n");
+                       return -1;
+               }
+               if (not_ok != COMP_BURST_SZ - fail_copy[i]) {
+                       PRINT_ERR("Unexpected failed copy count, got %u, expected %u\n",
+                                       not_ok, COMP_BURST_SZ - fail_copy[i]);
+                       for (j = 0; j < COMP_BURST_SZ; j++)
+                               printf("%u ", status[j]);
+                       printf("<-- Statuses\n");
+                       return -1;
+               }
+               if (status[0] == RTE_IOAT_OP_SUCCESS || status[0] == RTE_IOAT_OP_SKIPPED) {
+                       PRINT_ERR("Error, op 0 unexpectedly did not fail.\n");
+                       return -1;
+               }
+               for (j = 1; j <= fail_copy[i]; j++) {
+                       if (status[j] != RTE_IOAT_OP_SUCCESS) {
+                               PRINT_ERR("Error, op %u unexpectedly failed\n", j);
+                               return -1;
+                       }
+               }
+               for (j = fail_copy[i] + 1; j < COMP_BURST_SZ; j++) {
+                       if (status[j] != RTE_IOAT_OP_SKIPPED) {
+                               PRINT_ERR("Error, all descriptors after fence should be invalid\n");
+                               return -1;
+                       }
+               }
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       rte_pktmbuf_free(completed_src[j]);
+                       rte_pktmbuf_free(completed_dst[j]);
+               }
+       }
+
+       return 0;
+}
+
 int
 ioat_rawdev_test(uint16_t dev_id)
 {
 #define IOAT_TEST_RINGSIZE 512
+       const struct rte_idxd_rawdev *idxd =
+                       (struct rte_idxd_rawdev *)rte_rawdevs[dev_id].dev_private;
+       const enum rte_ioat_dev_type ioat_type = idxd->type;
        struct rte_ioat_rawdev_config p = { .ring_size = -1 };
        struct rte_rawdev_info info = { .dev_private = &p };
        struct rte_rawdev_xstats_name *snames = NULL;
@@ -453,6 +702,15 @@ ioat_rawdev_test(uint16_t dev_id)
        if (test_burst_capacity(dev_id) != 0)
                goto err;
 
+       /* only DSA devices report address errors, and we can only use null pointers
+        * to generate those errors when DPDK is in VA mode.
+        */
+       if (rte_eal_iova_mode() == RTE_IOVA_VA && ioat_type == RTE_IDXD_DEV) {
+               printf("Running Completions Status Test\n");
+               if (test_completion_status(dev_id) != 0)
+                       goto err;
+       }
+
        rte_rawdev_stop(dev_id);
        if (rte_rawdev_xstats_reset(dev_id, NULL, 0) != 0) {
                PRINT_ERR("Error resetting xstat values\n");
index 0bd9cfb..862e0eb 100644 (file)
@@ -115,8 +115,17 @@ struct rte_idxd_rawdev {
 
        struct rte_idxd_hw_desc *desc_ring;
        struct rte_idxd_user_hdl *hdl_ring;
+       /* flags to indicate handle validity. Kept separate from the ring, to avoid
+        * using 8 bytes per flag. The upper 8 bits hold the error code, if any.
+        */
+       uint16_t *hdl_ring_flags;
 };
 
+#define RTE_IDXD_HDL_NORMAL     0
+#define RTE_IDXD_HDL_INVALID    (1 << 0) /* no handle stored for this element */
+#define RTE_IDXD_HDL_OP_FAILED  (1 << 1) /* return failure for this one */
+#define RTE_IDXD_HDL_OP_SKIPPED (1 << 2) /* this op was skipped */
+
 static __rte_always_inline uint16_t
 __idxd_burst_capacity(int dev_id)
 {
@@ -135,8 +144,10 @@ __idxd_burst_capacity(int dev_id)
                write_idx += idxd->desc_ring_mask + 1;
        used_space = write_idx - idxd->hdls_read;
 
-       /* Return amount of free space in the descriptor ring */
-       return idxd->desc_ring_mask - used_space;
+       /* Return the amount of free space in the descriptor ring, after
+        * subtracting 1 for the batch descriptor and 1 for a possible null desc
+        */
+       return idxd->desc_ring_mask - used_space - 2;
 }
 
 static __rte_always_inline rte_iova_t
@@ -156,23 +167,28 @@ __idxd_write_desc(int dev_id,
        struct rte_idxd_rawdev *idxd =
                        (struct rte_idxd_rawdev *)rte_rawdevs[dev_id].dev_private;
        uint16_t write_idx = idxd->batch_start + idxd->batch_size;
+       uint16_t mask = idxd->desc_ring_mask;
 
        /* first check batch ring space then desc ring space */
        if ((idxd->batch_idx_read == 0 && idxd->batch_idx_write == idxd->max_batches) ||
                        idxd->batch_idx_write + 1 == idxd->batch_idx_read)
                goto failed;
-       if (((write_idx + 1) & idxd->desc_ring_mask) == idxd->hdls_read)
+       /* for descriptor ring, we always need a slot for batch completion */
+       if (((write_idx + 2) & mask) == idxd->hdls_read)
                goto failed;
 
        /* write desc and handle. Note, descriptors don't wrap */
        idxd->desc_ring[write_idx].pasid = 0;
        idxd->desc_ring[write_idx].op_flags = op_flags | IDXD_FLAG_COMPLETION_ADDR_VALID;
-       idxd->desc_ring[write_idx].completion = __desc_idx_to_iova(idxd, write_idx);
+       idxd->desc_ring[write_idx].completion = __desc_idx_to_iova(idxd, write_idx & mask);
        idxd->desc_ring[write_idx].src = src;
        idxd->desc_ring[write_idx].dst = dst;
        idxd->desc_ring[write_idx].size = size;
 
-       idxd->hdl_ring[write_idx & idxd->desc_ring_mask] = *hdl;
+       if (hdl == NULL)
+               idxd->hdl_ring_flags[write_idx & mask] = RTE_IDXD_HDL_INVALID;
+       else
+               idxd->hdl_ring[write_idx & mask] = *hdl;
        idxd->batch_size++;
 
        idxd->xstats.enqueued++;
@@ -214,9 +230,8 @@ __idxd_enqueue_copy(int dev_id, rte_iova_t src, rte_iova_t dst,
 static __rte_always_inline int
 __idxd_fence(int dev_id)
 {
-       static const struct rte_idxd_user_hdl null_hdl;
        /* only op field needs filling - zero src, dst and length */
-       return __idxd_write_desc(dev_id, IDXD_FLAG_FENCE, 0, 0, 0, &null_hdl);
+       return __idxd_write_desc(dev_id, IDXD_FLAG_FENCE, 0, 0, 0, NULL);
 }
 
 static __rte_always_inline void
@@ -233,42 +248,37 @@ __idxd_perform_ops(int dev_id)
 {
        struct rte_idxd_rawdev *idxd =
                        (struct rte_idxd_rawdev *)rte_rawdevs[dev_id].dev_private;
-       /* write completion to last desc in the batch */
-       uint16_t comp_idx = idxd->batch_start + idxd->batch_size - 1;
-       if (comp_idx > idxd->desc_ring_mask) {
-               comp_idx &= idxd->desc_ring_mask;
-               *((uint64_t *)&idxd->desc_ring[comp_idx]) = 0; /* zero start of desc */
-       }
+
+       if (!idxd->cfg.no_prefetch_completions)
+               rte_prefetch1(&idxd->desc_ring[idxd->batch_idx_ring[idxd->batch_idx_read]]);
 
        if (idxd->batch_size == 0)
                return 0;
 
-       _mm_sfence(); /* fence before writing desc to device */
-       if (idxd->batch_size > 1) {
-               struct rte_idxd_hw_desc batch_desc = {
-                               .op_flags = (idxd_op_batch << IDXD_CMD_OP_SHIFT) |
-                                       IDXD_FLAG_COMPLETION_ADDR_VALID |
-                                       IDXD_FLAG_REQUEST_COMPLETION,
-                               .desc_addr = __desc_idx_to_iova(idxd, idxd->batch_start),
-                               .completion = __desc_idx_to_iova(idxd, comp_idx),
-                               .size = idxd->batch_size,
-               };
-
-               __idxd_movdir64b(idxd->portal, &batch_desc);
-       } else {
-               /* special case batch size of 1, as not allowed by HW */
-               /* comp_idx == batch_start */
-               struct rte_idxd_hw_desc *desc = &idxd->desc_ring[comp_idx];
-               desc->op_flags |= IDXD_FLAG_COMPLETION_ADDR_VALID |
-                               IDXD_FLAG_REQUEST_COMPLETION;
-               desc->completion = __desc_idx_to_iova(idxd, comp_idx);
-
-               __idxd_movdir64b(idxd->portal, desc);
-       }
+       if (idxd->batch_size == 1)
+               /* use a fence as a null descriptor, so batch_size >= 2 */
+               if (__idxd_fence(dev_id) != 1)
+                       return -1;
+
+       /* write completion beyond last desc in the batch */
+       uint16_t comp_idx = (idxd->batch_start + idxd->batch_size) & idxd->desc_ring_mask;
+       *((uint64_t *)&idxd->desc_ring[comp_idx]) = 0; /* zero start of desc */
+       idxd->hdl_ring_flags[comp_idx] = RTE_IDXD_HDL_INVALID;
+
+       const struct rte_idxd_hw_desc batch_desc = {
+                       .op_flags = (idxd_op_batch << IDXD_CMD_OP_SHIFT) |
+                               IDXD_FLAG_COMPLETION_ADDR_VALID |
+                               IDXD_FLAG_REQUEST_COMPLETION,
+                       .desc_addr = __desc_idx_to_iova(idxd, idxd->batch_start),
+                       .completion = __desc_idx_to_iova(idxd, comp_idx),
+                       .size = idxd->batch_size,
+       };
 
+       _mm_sfence(); /* fence before writing desc to device */
+       __idxd_movdir64b(idxd->portal, &batch_desc);
        idxd->xstats.started += idxd->batch_size;
 
-       idxd->batch_start += idxd->batch_size;
+       idxd->batch_start += idxd->batch_size + 1;
        idxd->batch_start &= idxd->desc_ring_mask;
        idxd->batch_size = 0;
 
@@ -280,7 +290,7 @@ __idxd_perform_ops(int dev_id)
 }
 
 static __rte_always_inline int
-__idxd_completed_ops(int dev_id, uint8_t max_ops,
+__idxd_completed_ops(int dev_id, uint8_t max_ops, uint32_t *status, uint8_t *num_unsuccessful,
                uintptr_t *src_hdls, uintptr_t *dst_hdls)
 {
        struct rte_idxd_rawdev *idxd =
@@ -291,8 +301,37 @@ __idxd_completed_ops(int dev_id, uint8_t max_ops,
                uint16_t idx_to_chk = idxd->batch_idx_ring[idxd->batch_idx_read];
                volatile struct rte_idxd_completion *comp_to_chk =
                                (struct rte_idxd_completion *)&idxd->desc_ring[idx_to_chk];
-               if (comp_to_chk->status == 0)
+               uint8_t status = comp_to_chk->status;
+               if (status == 0)
                        break;
+               comp_to_chk->status = 0;
+               if (unlikely(status > 1)) {
+                       /* error occurred somewhere in batch, start where last checked */
+                       uint16_t desc_count = comp_to_chk->completed_size;
+                       uint16_t batch_start = idxd->hdls_avail;
+                       uint16_t batch_end = idx_to_chk;
+
+                       if (batch_start > batch_end)
+                               batch_end += idxd->desc_ring_mask + 1;
+                       /* go through each batch entry and see status */
+                       for (n = 0; n < desc_count; n++) {
+                               uint16_t idx = (batch_start + n) & idxd->desc_ring_mask;
+                               volatile struct rte_idxd_completion *comp =
+                                       (struct rte_idxd_completion *)&idxd->desc_ring[idx];
+                               if (comp->status != 0 &&
+                                               idxd->hdl_ring_flags[idx] == RTE_IDXD_HDL_NORMAL) {
+                                       idxd->hdl_ring_flags[idx] = RTE_IDXD_HDL_OP_FAILED;
+                                       idxd->hdl_ring_flags[idx] |= (comp->status << 8);
+                                       comp->status = 0; /* clear error for next time */
+                               }
+                       }
+                       /* if batch is incomplete, mark rest as skipped */
+                       for ( ; n < batch_end - batch_start; n++) {
+                               uint16_t idx = (batch_start + n) & idxd->desc_ring_mask;
+                               if (idxd->hdl_ring_flags[idx] == RTE_IDXD_HDL_NORMAL)
+                                       idxd->hdl_ring_flags[idx] = RTE_IDXD_HDL_OP_SKIPPED;
+                       }
+               }
                /* avail points to one after the last one written */
                idxd->hdls_avail = (idx_to_chk + 1) & idxd->desc_ring_mask;
                idxd->batch_idx_read++;
@@ -300,7 +339,7 @@ __idxd_completed_ops(int dev_id, uint8_t max_ops,
                        idxd->batch_idx_read = 0;
        }
 
-       if (idxd->cfg.hdls_disable) {
+       if (idxd->cfg.hdls_disable && status == NULL) {
                n = (idxd->hdls_avail < idxd->hdls_read) ?
                                (idxd->hdls_avail + idxd->desc_ring_mask + 1 - idxd->hdls_read) :
                                (idxd->hdls_avail - idxd->hdls_read);
@@ -308,10 +347,36 @@ __idxd_completed_ops(int dev_id, uint8_t max_ops,
                goto out;
        }
 
-       for (n = 0, h_idx = idxd->hdls_read;
-                       n < max_ops && h_idx != idxd->hdls_avail; n++) {
-               src_hdls[n] = idxd->hdl_ring[h_idx].src;
-               dst_hdls[n] = idxd->hdl_ring[h_idx].dst;
+       n = 0;
+       h_idx = idxd->hdls_read;
+       while (h_idx != idxd->hdls_avail) {
+               uint16_t flag = idxd->hdl_ring_flags[h_idx];
+               if (flag != RTE_IDXD_HDL_INVALID) {
+                       if (!idxd->cfg.hdls_disable) {
+                               src_hdls[n] = idxd->hdl_ring[h_idx].src;
+                               dst_hdls[n] = idxd->hdl_ring[h_idx].dst;
+                       }
+                       if (unlikely(flag != RTE_IDXD_HDL_NORMAL)) {
+                               if (status != NULL)
+                                       status[n] = flag == RTE_IDXD_HDL_OP_SKIPPED ?
+                                                       RTE_IOAT_OP_SKIPPED :
+                                                       /* failure case, return err code */
+                                                       idxd->hdl_ring_flags[h_idx] >> 8;
+                               if (num_unsuccessful != NULL)
+                                       *num_unsuccessful += 1;
+                       }
+                       n++;
+               }
+               idxd->hdl_ring_flags[h_idx] = RTE_IDXD_HDL_NORMAL;
+               if (++h_idx > idxd->desc_ring_mask)
+                       h_idx = 0;
+               if (n >= max_ops)
+                       break;
+       }
+
+       /* skip over any remaining blank elements, e.g. batch completion */
+       while (idxd->hdl_ring_flags[h_idx] == RTE_IDXD_HDL_INVALID && h_idx != idxd->hdls_avail) {
+               idxd->hdl_ring_flags[h_idx] = RTE_IDXD_HDL_NORMAL;
                if (++h_idx > idxd->desc_ring_mask)
                        h_idx = 0;
        }
index e5a22a0..6cc1560 100644 (file)
@@ -35,6 +35,10 @@ extern "C" {
 struct rte_ioat_rawdev_config {
        unsigned short ring_size; /**< size of job submission descriptor ring */
        bool hdls_disable;    /**< if set, ignore user-supplied handle params */
+       /** set "no_prefetch_completions" if completions are polled on a different
+        * core from the one submitting the jobs
+        */
+       bool no_prefetch_completions;
 };
 
 /**
@@ -131,40 +135,73 @@ static inline int
 __rte_experimental
 rte_ioat_perform_ops(int dev_id);
 
+/*
+ *  Status codes for operations.
+ */
+#define RTE_IOAT_OP_SUCCESS 0  /**< Operation completed successfully */
+#define RTE_IOAT_OP_SKIPPED 1  /**< Operation was not attempted (Earlier fenced op failed) */
+/* Values >1 indicate a failure condition */
+/* Error codes taken from Intel(R) Data Streaming Accelerator Architecture
+ * Specification, section 5.7
+ */
+#define RTE_IOAT_OP_ADDRESS_ERR 0x03  /**< Page fault or invalid address */
+#define RTE_IOAT_OP_INVALID_LEN 0x13  /**< Invalid/too big length field passed */
+#define RTE_IOAT_OP_OVERLAPPING_BUFS 0x16 /**< Overlapping buffers error */
+
+
 /**
  * Returns details of operations that have been completed
  *
+ * The status of each operation is returned in the status array parameter.
  * If the hdls_disable option was not set when the device was configured,
  * the function will return to the caller the user-provided "handles" for
  * the copy operations which have been completed by the hardware, and not
  * already returned by a previous call to this API.
  * If the hdls_disable option for the device was set on configure, the
- * max_copies, src_hdls and dst_hdls parameters will be ignored, and the
+ * src_hdls and dst_hdls parameters will be ignored, and the
  * function returns the number of newly-completed operations.
+ * In that case, if status is NULL too, the max_copies parameter is ignored as
+ * well and the function returns a count of the newly-completed operations.
  *
  * @param dev_id
  *   The rawdev device id of the ioat instance
  * @param max_copies
- *   The number of entries which can fit in the src_hdls and dst_hdls
+ *   The number of entries which can fit in the status, src_hdls and dst_hdls
  *   arrays, i.e. max number of completed operations to report.
  *   NOTE: If hdls_disable configuration option for the device is set, this
- *   parameter is ignored.
+ *   parameter applies only to the "status" array, if one is provided.
+ * @param status
+ *   Array to hold the status of each completed operation. Array should be
+ *   set to zeros on input, as the driver will only write error status values.
+ *   A value of 1 implies an operation was not attempted, and any other non-zero
+ *   value indicates operation failure.
+ *   Parameter may be NULL if no status value checking is required.
+ * @param num_unsuccessful
+ *   Returns the number of elements in status where the value is non-zero,
+ *   i.e. the operation either failed or was not attempted due to an earlier
+ *   failure. If this value is returned as zero (the expected case), the
+ *   status array will not have been modified by the function and need not be
+ *   checked by software. May be NULL if the count is not required.
  * @param src_hdls
  *   Array to hold the source handle parameters of the completed ops.
  *   NOTE: If hdls_disable configuration option for the device is set, this
- *   parameter is ignored.
+ *   parameter is ignored, and may be NULL
  * @param dst_hdls
  *   Array to hold the destination handle parameters of the completed ops.
  *   NOTE: If hdls_disable configuration option for the device is set, this
- *   parameter is ignored.
+ *   parameter is ignored, and may be NULL
  * @return
- *   -1 on error, with rte_errno set appropriately.
- *   Otherwise number of completed operations i.e. number of entries written
- *   to the src_hdls and dst_hdls array parameters.
+ *   -1 on device error, with rte_errno set appropriately and parameters
+ *   unmodified.
+ *   Otherwise number of returned operations i.e. number of valid entries
+ *   in the status, src_hdls and dst_hdls array parameters. If status is NULL,
+ *   and the hdls_disable config option is set, this value may be greater than
+ *   max_copies parameter.
  */
 static inline int
 __rte_experimental
 rte_ioat_completed_ops(int dev_id, uint8_t max_copies,
+               uint32_t *status, uint8_t *num_unsuccessful,
                uintptr_t *src_hdls, uintptr_t *dst_hdls);
 
 /* include the implementation details from a separate file */
index 1eff75e..6049e3b 100644 (file)
@@ -345,16 +345,22 @@ rte_ioat_perform_ops(int dev_id)
 
 static inline int
 rte_ioat_completed_ops(int dev_id, uint8_t max_copies,
+               uint32_t *status, uint8_t *num_unsuccessful,
                uintptr_t *src_hdls, uintptr_t *dst_hdls)
 {
        enum rte_ioat_dev_type *type =
                        (enum rte_ioat_dev_type *)rte_rawdevs[dev_id].dev_private;
+       uint8_t tmp; /* used so functions don't need to check for null parameter */
+
+       if (num_unsuccessful == NULL)
+               num_unsuccessful = &tmp;
+
+       *num_unsuccessful = 0;
        if (*type == RTE_IDXD_DEV)
-               return __idxd_completed_ops(dev_id, max_copies,
+               return __idxd_completed_ops(dev_id, max_copies, status, num_unsuccessful,
                                src_hdls, dst_hdls);
        else
-               return __ioat_completed_ops(dev_id,  max_copies,
-                               src_hdls, dst_hdls);
+               return __ioat_completed_ops(dev_id, max_copies, src_hdls, dst_hdls);
 }
 
 static inline void
@@ -366,7 +372,8 @@ __rte_deprecated_msg("use rte_ioat_completed_ops() instead")
 rte_ioat_completed_copies(int dev_id, uint8_t max_copies,
                uintptr_t *src_hdls, uintptr_t *dst_hdls)
 {
-       return rte_ioat_completed_ops(dev_id, max_copies, src_hdls, dst_hdls);
+       return rte_ioat_completed_ops(dev_id, max_copies, NULL, NULL,
+                       src_hdls, dst_hdls);
 }
 
 #endif /* _RTE_IOAT_RAWDEV_FNS_H_ */
index 845301a..2e377e2 100644 (file)
@@ -447,12 +447,15 @@ ioat_tx_port(struct rxtx_port_config *tx_config)
 
        for (i = 0; i < tx_config->nb_queues; i++) {
                if (copy_mode == COPY_MODE_IOAT_NUM) {
-                       /* Deque the mbufs from IOAT device. */
+                       /* Dequeue the mbufs from IOAT device. Since all memory
+                        * is DPDK pinned memory and therefore all addresses should
+                        * be valid, we don't check for copy errors
+                        */
                        nb_dq = rte_ioat_completed_ops(
-                               tx_config->ioat_ids[i], MAX_PKT_BURST,
+                               tx_config->ioat_ids[i], MAX_PKT_BURST, NULL, NULL,
                                (void *)mbufs_src, (void *)mbufs_dst);
                } else {
-                       /* Deque the mbufs from rx_to_tx_ring. */
+                       /* Dequeue the mbufs from rx_to_tx_ring. */
                        nb_dq = rte_ring_dequeue_burst(
                                tx_config->rx_to_tx_ring, (void *)mbufs_dst,
                                MAX_PKT_BURST, NULL);
@@ -725,7 +728,10 @@ check_link_status(uint32_t port_mask)
 static void
 configure_rawdev_queue(uint32_t dev_id)
 {
-       struct rte_ioat_rawdev_config dev_config = { .ring_size = ring_size };
+       struct rte_ioat_rawdev_config dev_config = {
+                       .ring_size = ring_size,
+                       .no_prefetch_completions = (cfg.nb_lcores > 1),
+       };
        struct rte_rawdev_info info = { .dev_private = &dev_config };
 
        if (rte_rawdev_configure(dev_id, &info, sizeof(dev_config)) != 0) {
index 60b73be..efdd3f6 100644 (file)
@@ -183,7 +183,7 @@ ioat_check_completed_copies_cb(int vid, uint16_t queue_id,
 
                uint16_t dev_id = dma_bind[vid].dmas[queue_id * 2
                                + VIRTIO_RXQ].dev_id;
-               n_seg = rte_ioat_completed_ops(dev_id, 255, dump, dump);
+               n_seg = rte_ioat_completed_ops(dev_id, 255, NULL, NULL, dump, dump);
                if (n_seg < 0) {
                        RTE_LOG(ERR,
                                VHOST_DATA,