it's not supported on the current platform. Instead ``rte_stack_create()``
   fails and ``rte_errno`` is set to ``ENOTSUP``.
 
+* raw/ioat: The experimental function ``rte_ioat_completed_ops()`` now takes
+  two additional parameters, ``status`` and ``num_unsuccessful``,
+  to allow the reporting of errors from hardware when performing copy
+  operations.
+
 
 ABI Changes
 -----------
 
                rte_idxd->desc_ring = NULL;
                return -ENOMEM;
        }
+       rte_idxd->hdl_ring_flags = rte_zmalloc(NULL,
+                       sizeof(*rte_idxd->hdl_ring_flags) * max_desc, 0);
+       if (rte_idxd->hdl_ring_flags == NULL) {
+               rte_free(rte_idxd->desc_ring);
+               rte_free(rte_idxd->hdl_ring);
+               rte_idxd->desc_ring = NULL;
+               rte_idxd->hdl_ring = NULL;
+               return -ENOMEM;
+       }
        rte_idxd->hdls_read = rte_idxd->batch_start = 0;
        rte_idxd->batch_size = 0;
 
 
        if (split_completions) {
                /* gather completions in two halves */
                uint16_t half_len = RTE_DIM(srcs) / 2;
-               if (rte_ioat_completed_ops(dev_id, half_len, (void *)completed_src,
+               if (rte_ioat_completed_ops(dev_id, half_len, NULL, NULL,
+                               (void *)completed_src,
                                (void *)completed_dst) != half_len) {
                        PRINT_ERR("Error with rte_ioat_completed_ops - first half request\n");
                        rte_rawdev_dump(dev_id, stdout);
                        return -1;
                }
-               if (rte_ioat_completed_ops(dev_id, half_len, (void *)&completed_src[half_len],
+               if (rte_ioat_completed_ops(dev_id, half_len, NULL, NULL,
+                               (void *)&completed_src[half_len],
                                (void *)&completed_dst[half_len]) != half_len) {
                        PRINT_ERR("Error with rte_ioat_completed_ops - second half request\n");
                        rte_rawdev_dump(dev_id, stdout);
                }
        } else {
                /* gather all completions in one go */
-               if (rte_ioat_completed_ops(dev_id, 64, (void *)completed_src,
+               if (rte_ioat_completed_ops(dev_id, RTE_DIM(completed_src), NULL, NULL,
+                               (void *)completed_src,
                                (void *)completed_dst) != RTE_DIM(srcs)) {
                        PRINT_ERR("Error with rte_ioat_completed_ops\n");
                        rte_rawdev_dump(dev_id, stdout);
                rte_ioat_perform_ops(dev_id);
                usleep(10);
 
-               if (rte_ioat_completed_ops(dev_id, 1, (void *)&completed[0],
+               if (rte_ioat_completed_ops(dev_id, 1, NULL, NULL, (void *)&completed[0],
                                (void *)&completed[1]) != 1) {
                        PRINT_ERR("Error with rte_ioat_completed_ops\n");
                        return -1;
                        }
                rte_pktmbuf_free(src);
                rte_pktmbuf_free(dst);
+
+               /* check ring is now empty */
+               if (rte_ioat_completed_ops(dev_id, 1, NULL, NULL, (void *)&completed[0],
+                               (void *)&completed[1]) != 0) {
+                       PRINT_ERR("Error: got unexpected returned handles from rte_ioat_completed_ops\n");
+                       return -1;
+               }
        } while (0);
 
        /* test doing a multiple single copies */
                }
                usleep(10);
 
-               if (rte_ioat_completed_ops(dev_id, max_completions, (void *)&completed[0],
+               if (rte_ioat_completed_ops(dev_id, max_completions, NULL, NULL,
+                               (void *)&completed[0],
                                (void *)&completed[max_completions]) != max_ops) {
                        PRINT_ERR("Error with rte_ioat_completed_ops\n");
                        rte_rawdev_dump(dev_id, stdout);
                rte_ioat_perform_ops(dev_id);
                usleep(100);
 
-               if (rte_ioat_completed_ops(dev_id, 1, (void *)&completed[0],
+               if (rte_ioat_completed_ops(dev_id, 1, NULL, NULL, (void *)&completed[0],
                        (void *)&completed[1]) != 1) {
                        PRINT_ERR("Error with completed ops\n");
                        return -1;
                        char pat_byte = ((char *)&pattern)[j % 8];
                        if (dst_data[j] != pat_byte) {
                                PRINT_ERR("Error with fill operation (lengths = %u): got (%x), not (%x)\n",
-                                               lengths[i], dst_data[j],
-                                               pat_byte);
+                                               lengths[i], dst_data[j], pat_byte);
                                return -1;
                        }
                }
                usleep(100);
                for (i = 0; i < ring_space / (2 * BURST_SIZE); i++) {
                        if (rte_ioat_completed_ops(dev_id, BURST_SIZE,
+                                       NULL, NULL,
                                        completions, completions) != BURST_SIZE) {
                                PRINT_ERR("Error with completions\n");
                                return -1;
        return 0;
 }
 
+static int
+test_completion_status(int dev_id)
+{
+#define COMP_BURST_SZ  16
+       const unsigned int fail_copy[] = {0, 7, 15};
+       struct rte_mbuf *srcs[COMP_BURST_SZ], *dsts[COMP_BURST_SZ];
+       struct rte_mbuf *completed_src[COMP_BURST_SZ * 2];
+       struct rte_mbuf *completed_dst[COMP_BURST_SZ * 2];
+       unsigned int length = 1024;
+       unsigned int i;
+       uint8_t not_ok = 0;
+
+       /* Test single full batch statuses */
+       for (i = 0; i < RTE_DIM(fail_copy); i++) {
+               uint32_t status[COMP_BURST_SZ] = {0};
+               unsigned int j;
+
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       srcs[j] = rte_pktmbuf_alloc(pool);
+                       dsts[j] = rte_pktmbuf_alloc(pool);
+
+                       if (rte_ioat_enqueue_copy(dev_id,
+                                       (j == fail_copy[i] ? (phys_addr_t)NULL :
+                                                       (srcs[j]->buf_iova + srcs[j]->data_off)),
+                                       dsts[j]->buf_iova + dsts[j]->data_off,
+                                       length,
+                                       (uintptr_t)srcs[j],
+                                       (uintptr_t)dsts[j]) != 1) {
+                               PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n", j);
+                               return -1;
+                       }
+               }
+               rte_ioat_perform_ops(dev_id);
+               usleep(100);
+
+               if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ, status, &not_ok,
+                               (void *)completed_src, (void *)completed_dst) != COMP_BURST_SZ) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (not_ok != 1 || status[fail_copy[i]] == RTE_IOAT_OP_SUCCESS) {
+                       unsigned int j;
+                       PRINT_ERR("Error, missing expected failed copy, %u\n", fail_copy[i]);
+                       for (j = 0; j < COMP_BURST_SZ; j++)
+                               printf("%u ", status[j]);
+                       printf("<-- Statuses\n");
+                       return -1;
+               }
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       rte_pktmbuf_free(completed_src[j]);
+                       rte_pktmbuf_free(completed_dst[j]);
+               }
+       }
+
+       /* Test gathering status for two batches at once */
+       for (i = 0; i < RTE_DIM(fail_copy); i++) {
+               uint32_t status[COMP_BURST_SZ] = {0};
+               unsigned int batch, j;
+               unsigned int expected_failures = 0;
+
+               for (batch = 0; batch < 2; batch++) {
+                       for (j = 0; j < COMP_BURST_SZ/2; j++) {
+                               srcs[j] = rte_pktmbuf_alloc(pool);
+                               dsts[j] = rte_pktmbuf_alloc(pool);
+
+                               if (j == fail_copy[i])
+                                       expected_failures++;
+                               if (rte_ioat_enqueue_copy(dev_id,
+                                               (j == fail_copy[i] ? (phys_addr_t)NULL :
+                                                       (srcs[j]->buf_iova + srcs[j]->data_off)),
+                                               dsts[j]->buf_iova + dsts[j]->data_off,
+                                               length,
+                                               (uintptr_t)srcs[j],
+                                               (uintptr_t)dsts[j]) != 1) {
+                                       PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n",
+                                                       j);
+                                       return -1;
+                               }
+                       }
+                       rte_ioat_perform_ops(dev_id);
+               }
+               usleep(100);
+
+               if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ, status, &not_ok,
+                               (void *)completed_src, (void *)completed_dst) != COMP_BURST_SZ) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (not_ok != expected_failures) {
+                       unsigned int j;
+                       PRINT_ERR("Error, missing expected failed copy, got %u, not %u\n",
+                                       not_ok, expected_failures);
+                       for (j = 0; j < COMP_BURST_SZ; j++)
+                               printf("%u ", status[j]);
+                       printf("<-- Statuses\n");
+                       return -1;
+               }
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       rte_pktmbuf_free(completed_src[j]);
+                       rte_pktmbuf_free(completed_dst[j]);
+               }
+       }
+
+       /* Test gathering status for half batch at a time */
+       for (i = 0; i < RTE_DIM(fail_copy); i++) {
+               uint32_t status[COMP_BURST_SZ] = {0};
+               unsigned int j;
+
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       srcs[j] = rte_pktmbuf_alloc(pool);
+                       dsts[j] = rte_pktmbuf_alloc(pool);
+
+                       if (rte_ioat_enqueue_copy(dev_id,
+                                       (j == fail_copy[i] ? (phys_addr_t)NULL :
+                                                       (srcs[j]->buf_iova + srcs[j]->data_off)),
+                                       dsts[j]->buf_iova + dsts[j]->data_off,
+                                       length,
+                                       (uintptr_t)srcs[j],
+                                       (uintptr_t)dsts[j]) != 1) {
+                               PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n", j);
+                               return -1;
+                       }
+               }
+               rte_ioat_perform_ops(dev_id);
+               usleep(100);
+
+               if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ / 2, status, &not_ok,
+                               (void *)completed_src,
+                               (void *)completed_dst) != (COMP_BURST_SZ / 2)) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (fail_copy[i] < COMP_BURST_SZ / 2 &&
+                               (not_ok != 1 || status[fail_copy[i]] == RTE_IOAT_OP_SUCCESS)) {
+                       PRINT_ERR("Missing expected failure in first half-batch\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (rte_ioat_completed_ops(dev_id, COMP_BURST_SZ / 2, status, &not_ok,
+                               (void *)&completed_src[COMP_BURST_SZ / 2],
+                               (void *)&completed_dst[COMP_BURST_SZ / 2]) != (COMP_BURST_SZ / 2)) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+               if (fail_copy[i] >= COMP_BURST_SZ / 2 && (not_ok != 1 ||
+                               status[fail_copy[i] - (COMP_BURST_SZ / 2)]
+                                       == RTE_IOAT_OP_SUCCESS)) {
+                       PRINT_ERR("Missing expected failure in second half-batch\n");
+                       rte_rawdev_dump(dev_id, stdout);
+                       return -1;
+               }
+
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       rte_pktmbuf_free(completed_src[j]);
+                       rte_pktmbuf_free(completed_dst[j]);
+               }
+       }
+
+       /* Test gathering statuses with fence */
+       for (i = 1; i < RTE_DIM(fail_copy); i++) {
+               uint32_t status[COMP_BURST_SZ * 2] = {0};
+               unsigned int j;
+               uint16_t count;
+
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       srcs[j] = rte_pktmbuf_alloc(pool);
+                       dsts[j] = rte_pktmbuf_alloc(pool);
+
+                       /* always fail the first copy */
+                       if (rte_ioat_enqueue_copy(dev_id,
+                                       (j == 0 ? (phys_addr_t)NULL :
+                                               (srcs[j]->buf_iova + srcs[j]->data_off)),
+                                       dsts[j]->buf_iova + dsts[j]->data_off,
+                                       length,
+                                       (uintptr_t)srcs[j],
+                                       (uintptr_t)dsts[j]) != 1) {
+                               PRINT_ERR("Error with rte_ioat_enqueue_copy for buffer %u\n", j);
+                               return -1;
+                       }
+                       /* put in a fence which will stop any further transactions
+                        * because we had a previous failure.
+                        */
+                       if (j == fail_copy[i])
+                               rte_ioat_fence(dev_id);
+               }
+               rte_ioat_perform_ops(dev_id);
+               usleep(100);
+
+               count = rte_ioat_completed_ops(dev_id, COMP_BURST_SZ * 2, status, &not_ok,
+                               (void *)completed_src, (void *)completed_dst);
+               if (count != COMP_BURST_SZ) {
+                       PRINT_ERR("Error with rte_ioat_completed_ops, got %u not %u\n",
+                                       count, COMP_BURST_SZ);
+                       for (j = 0; j < count; j++)
+                               printf("%u ", status[j]);
+                       printf("<-- Statuses\n");
+                       return -1;
+               }
+               if (not_ok != COMP_BURST_SZ - fail_copy[i]) {
+                       PRINT_ERR("Unexpected failed copy count, got %u, expected %u\n",
+                                       not_ok, COMP_BURST_SZ - fail_copy[i]);
+                       for (j = 0; j < COMP_BURST_SZ; j++)
+                               printf("%u ", status[j]);
+                       printf("<-- Statuses\n");
+                       return -1;
+               }
+               if (status[0] == RTE_IOAT_OP_SUCCESS || status[0] == RTE_IOAT_OP_SKIPPED) {
+                       PRINT_ERR("Error, op 0 unexpectedly did not fail.\n");
+                       return -1;
+               }
+               for (j = 1; j <= fail_copy[i]; j++) {
+                       if (status[j] != RTE_IOAT_OP_SUCCESS) {
+                               PRINT_ERR("Error, op %u unexpectedly failed\n", j);
+                               return -1;
+                       }
+               }
+               for (j = fail_copy[i] + 1; j < COMP_BURST_SZ; j++) {
+                       if (status[j] != RTE_IOAT_OP_SKIPPED) {
+                               PRINT_ERR("Error, all descriptors after fence should be invalid\n");
+                               return -1;
+                       }
+               }
+               for (j = 0; j < COMP_BURST_SZ; j++) {
+                       rte_pktmbuf_free(completed_src[j]);
+                       rte_pktmbuf_free(completed_dst[j]);
+               }
+       }
+
+       return 0;
+}
+
 int
 ioat_rawdev_test(uint16_t dev_id)
 {
 #define IOAT_TEST_RINGSIZE 512
+       const struct rte_idxd_rawdev *idxd =
+                       (struct rte_idxd_rawdev *)rte_rawdevs[dev_id].dev_private;
+       const enum rte_ioat_dev_type ioat_type = idxd->type;
        struct rte_ioat_rawdev_config p = { .ring_size = -1 };
        struct rte_rawdev_info info = { .dev_private = &p };
        struct rte_rawdev_xstats_name *snames = NULL;
        if (test_burst_capacity(dev_id) != 0)
                goto err;
 
+       /* only DSA devices report address errors, and we can only use null pointers
+        * to generate those errors when DPDK is in VA mode.
+        */
+       if (rte_eal_iova_mode() == RTE_IOVA_VA && ioat_type == RTE_IDXD_DEV) {
+               printf("Running Completions Status Test\n");
+               if (test_completion_status(dev_id) != 0)
+                       goto err;
+       }
+
        rte_rawdev_stop(dev_id);
        if (rte_rawdev_xstats_reset(dev_id, NULL, 0) != 0) {
                PRINT_ERR("Error resetting xstat values\n");
 
 
        struct rte_idxd_hw_desc *desc_ring;
        struct rte_idxd_user_hdl *hdl_ring;
+       /* flags to indicate handle validity. Kept separate from the handle ring
+        * to avoid using 8 bytes per flag. Upper 8 bits hold the error code, if any.
+        */
+       uint16_t *hdl_ring_flags;
 };
 
+#define RTE_IDXD_HDL_NORMAL     0
+#define RTE_IDXD_HDL_INVALID    (1 << 0) /* no handle stored for this element */
+#define RTE_IDXD_HDL_OP_FAILED  (1 << 1) /* return failure for this one */
+#define RTE_IDXD_HDL_OP_SKIPPED (1 << 2) /* this op was skipped */
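+/* e.g. a failed op whose hardware status is 0x03 is recorded in the flags as
+ * (0x03 << 8) | RTE_IDXD_HDL_OP_FAILED, and the error code is later recovered
+ * with (hdl_ring_flags[idx] >> 8)
+ */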
+
 static __rte_always_inline uint16_t
 __idxd_burst_capacity(int dev_id)
 {
                write_idx += idxd->desc_ring_mask + 1;
        used_space = write_idx - idxd->hdls_read;
 
-       /* Return amount of free space in the descriptor ring */
-       return idxd->desc_ring_mask - used_space;
+       /* Return the amount of free space in the descriptor ring, subtracting
+        * 1 for the batch descriptor slot and 1 for a possible null descriptor
+        */
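+       /* e.g. with a 512-entry ring (mask 511) and 10 descriptors in flight,
+        * this reports 511 - 10 - 2 = 499 slots free
+        */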
+       return idxd->desc_ring_mask - used_space - 2;
 }
 
 static __rte_always_inline rte_iova_t
        struct rte_idxd_rawdev *idxd =
                        (struct rte_idxd_rawdev *)rte_rawdevs[dev_id].dev_private;
        uint16_t write_idx = idxd->batch_start + idxd->batch_size;
+       uint16_t mask = idxd->desc_ring_mask;
 
        /* first check batch ring space then desc ring space */
        if ((idxd->batch_idx_read == 0 && idxd->batch_idx_write == idxd->max_batches) ||
                        idxd->batch_idx_write + 1 == idxd->batch_idx_read)
                goto failed;
-       if (((write_idx + 1) & idxd->desc_ring_mask) == idxd->hdls_read)
+       /* for descriptor ring, we always need a slot for batch completion */
+       if (((write_idx + 2) & mask) == idxd->hdls_read)
                goto failed;
 
        /* write desc and handle. Note, descriptors don't wrap */
        idxd->desc_ring[write_idx].pasid = 0;
        idxd->desc_ring[write_idx].op_flags = op_flags | IDXD_FLAG_COMPLETION_ADDR_VALID;
-       idxd->desc_ring[write_idx].completion = __desc_idx_to_iova(idxd, write_idx);
+       idxd->desc_ring[write_idx].completion = __desc_idx_to_iova(idxd, write_idx & mask);
        idxd->desc_ring[write_idx].src = src;
        idxd->desc_ring[write_idx].dst = dst;
        idxd->desc_ring[write_idx].size = size;
 
-       idxd->hdl_ring[write_idx & idxd->desc_ring_mask] = *hdl;
+       if (hdl == NULL)
+               idxd->hdl_ring_flags[write_idx & mask] = RTE_IDXD_HDL_INVALID;
+       else
+               idxd->hdl_ring[write_idx & mask] = *hdl;
        idxd->batch_size++;
 
        idxd->xstats.enqueued++;
 static __rte_always_inline int
 __idxd_fence(int dev_id)
 {
-       static const struct rte_idxd_user_hdl null_hdl;
        /* only op field needs filling - zero src, dst and length */
-       return __idxd_write_desc(dev_id, IDXD_FLAG_FENCE, 0, 0, 0, &null_hdl);
+       return __idxd_write_desc(dev_id, IDXD_FLAG_FENCE, 0, 0, 0, NULL);
 }
 
 static __rte_always_inline void
 {
        struct rte_idxd_rawdev *idxd =
                        (struct rte_idxd_rawdev *)rte_rawdevs[dev_id].dev_private;
-       /* write completion to last desc in the batch */
-       uint16_t comp_idx = idxd->batch_start + idxd->batch_size - 1;
-       if (comp_idx > idxd->desc_ring_mask) {
-               comp_idx &= idxd->desc_ring_mask;
-               *((uint64_t *)&idxd->desc_ring[comp_idx]) = 0; /* zero start of desc */
-       }
+
+       if (!idxd->cfg.no_prefetch_completions)
+               rte_prefetch1(&idxd->desc_ring[idxd->batch_idx_ring[idxd->batch_idx_read]]);
 
        if (idxd->batch_size == 0)
                return 0;
 
-       _mm_sfence(); /* fence before writing desc to device */
-       if (idxd->batch_size > 1) {
-               struct rte_idxd_hw_desc batch_desc = {
-                               .op_flags = (idxd_op_batch << IDXD_CMD_OP_SHIFT) |
-                                       IDXD_FLAG_COMPLETION_ADDR_VALID |
-                                       IDXD_FLAG_REQUEST_COMPLETION,
-                               .desc_addr = __desc_idx_to_iova(idxd, idxd->batch_start),
-                               .completion = __desc_idx_to_iova(idxd, comp_idx),
-                               .size = idxd->batch_size,
-               };
-
-               __idxd_movdir64b(idxd->portal, &batch_desc);
-       } else {
-               /* special case batch size of 1, as not allowed by HW */
-               /* comp_idx == batch_start */
-               struct rte_idxd_hw_desc *desc = &idxd->desc_ring[comp_idx];
-               desc->op_flags |= IDXD_FLAG_COMPLETION_ADDR_VALID |
-                               IDXD_FLAG_REQUEST_COMPLETION;
-               desc->completion = __desc_idx_to_iova(idxd, comp_idx);
-
-               __idxd_movdir64b(idxd->portal, desc);
-       }
+       if (idxd->batch_size == 1)
+               /* use a fence as a null descriptor, so batch_size >= 2 */
+               if (__idxd_fence(dev_id) != 1)
+                       return -1;
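+       /* note: a batch referencing a single work descriptor is not accepted by
+        * the hardware, hence the padding fence above
+        */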
+
+       /* write completion beyond last desc in the batch */
+       uint16_t comp_idx = (idxd->batch_start + idxd->batch_size) & idxd->desc_ring_mask;
+       *((uint64_t *)&idxd->desc_ring[comp_idx]) = 0; /* zero start of desc */
+       idxd->hdl_ring_flags[comp_idx] = RTE_IDXD_HDL_INVALID;
+
+       const struct rte_idxd_hw_desc batch_desc = {
+                       .op_flags = (idxd_op_batch << IDXD_CMD_OP_SHIFT) |
+                               IDXD_FLAG_COMPLETION_ADDR_VALID |
+                               IDXD_FLAG_REQUEST_COMPLETION,
+                       .desc_addr = __desc_idx_to_iova(idxd, idxd->batch_start),
+                       .completion = __desc_idx_to_iova(idxd, comp_idx),
+                       .size = idxd->batch_size,
+       };
 
+       _mm_sfence(); /* fence before writing desc to device */
+       __idxd_movdir64b(idxd->portal, &batch_desc);
        idxd->xstats.started += idxd->batch_size;
 
-       idxd->batch_start += idxd->batch_size;
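+       /* advance past this batch's descriptors plus the completion slot
+        * written above
+        */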
+       idxd->batch_start += idxd->batch_size + 1;
        idxd->batch_start &= idxd->desc_ring_mask;
        idxd->batch_size = 0;
 
 }
 
 static __rte_always_inline int
-__idxd_completed_ops(int dev_id, uint8_t max_ops,
+__idxd_completed_ops(int dev_id, uint8_t max_ops, uint32_t *status, uint8_t *num_unsuccessful,
                uintptr_t *src_hdls, uintptr_t *dst_hdls)
 {
        struct rte_idxd_rawdev *idxd =
                uint16_t idx_to_chk = idxd->batch_idx_ring[idxd->batch_idx_read];
                volatile struct rte_idxd_completion *comp_to_chk =
                                (struct rte_idxd_completion *)&idxd->desc_ring[idx_to_chk];
-               if (comp_to_chk->status == 0)
+               uint8_t batch_status = comp_to_chk->status;
+               if (batch_status == 0)
                        break;
+               comp_to_chk->status = 0;
+               if (unlikely(batch_status > 1)) {
+                       /* error occurred somewhere in batch, start where last checked */
+                       uint16_t desc_count = comp_to_chk->completed_size;
+                       uint16_t batch_start = idxd->hdls_avail;
+                       uint16_t batch_end = idx_to_chk;
+
+                       if (batch_start > batch_end)
+                               batch_end += idxd->desc_ring_mask + 1;
+                       /* go through each batch entry and see status */
+                       for (n = 0; n < desc_count; n++) {
+                               uint16_t idx = (batch_start + n) & idxd->desc_ring_mask;
+                               volatile struct rte_idxd_completion *comp =
+                                       (struct rte_idxd_completion *)&idxd->desc_ring[idx];
+                               if (comp->status != 0 &&
+                                               idxd->hdl_ring_flags[idx] == RTE_IDXD_HDL_NORMAL) {
+                                       idxd->hdl_ring_flags[idx] = RTE_IDXD_HDL_OP_FAILED;
+                                       idxd->hdl_ring_flags[idx] |= (comp->status << 8);
+                                       comp->status = 0; /* clear error for next time */
+                               }
+                       }
+                       /* if batch is incomplete, mark rest as skipped */
+                       for ( ; n < batch_end - batch_start; n++) {
+                               uint16_t idx = (batch_start + n) & idxd->desc_ring_mask;
+                               if (idxd->hdl_ring_flags[idx] == RTE_IDXD_HDL_NORMAL)
+                                       idxd->hdl_ring_flags[idx] = RTE_IDXD_HDL_OP_SKIPPED;
+                       }
+               }
                /* avail points to one after the last one written */
                idxd->hdls_avail = (idx_to_chk + 1) & idxd->desc_ring_mask;
                idxd->batch_idx_read++;
                        idxd->batch_idx_read = 0;
        }
 
-       if (idxd->cfg.hdls_disable) {
+       if (idxd->cfg.hdls_disable && status == NULL) {
                n = (idxd->hdls_avail < idxd->hdls_read) ?
                                (idxd->hdls_avail + idxd->desc_ring_mask + 1 - idxd->hdls_read) :
                                (idxd->hdls_avail - idxd->hdls_read);
                goto out;
        }
 
-       for (n = 0, h_idx = idxd->hdls_read;
-                       n < max_ops && h_idx != idxd->hdls_avail; n++) {
-               src_hdls[n] = idxd->hdl_ring[h_idx].src;
-               dst_hdls[n] = idxd->hdl_ring[h_idx].dst;
+       n = 0;
+       h_idx = idxd->hdls_read;
+       while (h_idx != idxd->hdls_avail) {
+               uint16_t flag = idxd->hdl_ring_flags[h_idx];
+               if (flag != RTE_IDXD_HDL_INVALID) {
+                       if (!idxd->cfg.hdls_disable) {
+                               src_hdls[n] = idxd->hdl_ring[h_idx].src;
+                               dst_hdls[n] = idxd->hdl_ring[h_idx].dst;
+                       }
+                       if (unlikely(flag != RTE_IDXD_HDL_NORMAL)) {
+                               if (status != NULL)
+                                       status[n] = flag == RTE_IDXD_HDL_OP_SKIPPED ?
+                                                       RTE_IOAT_OP_SKIPPED :
+                                                       /* failure case, return err code */
+                                                       idxd->hdl_ring_flags[h_idx] >> 8;
+                               if (num_unsuccessful != NULL)
+                                       *num_unsuccessful += 1;
+                       }
+                       n++;
+               }
+               idxd->hdl_ring_flags[h_idx] = RTE_IDXD_HDL_NORMAL;
+               if (++h_idx > idxd->desc_ring_mask)
+                       h_idx = 0;
+               if (n >= max_ops)
+                       break;
+       }
+
+       /* skip over any remaining blank elements, e.g. batch completion */
+       while (idxd->hdl_ring_flags[h_idx] == RTE_IDXD_HDL_INVALID && h_idx != idxd->hdls_avail) {
+               idxd->hdl_ring_flags[h_idx] = RTE_IDXD_HDL_NORMAL;
                if (++h_idx > idxd->desc_ring_mask)
                        h_idx = 0;
        }
 
 struct rte_ioat_rawdev_config {
        unsigned short ring_size; /**< size of job submission descriptor ring */
        bool hdls_disable;    /**< if set, ignore user-supplied handle params */
+       /** set "no_prefetch_completions" if completions are polled on a separate
+        * core from the one submitting the jobs
+        */
+       bool no_prefetch_completions;
 };
 
 /**
 __rte_experimental
 rte_ioat_perform_ops(int dev_id);
 
+/*
+ *  Status codes for operations.
+ */
+#define RTE_IOAT_OP_SUCCESS 0  /**< Operation completed successfully */
+#define RTE_IOAT_OP_SKIPPED 1  /**< Operation was not attempted (an earlier fenced op failed) */
+/* Values >1 indicate a failure condition */
+/* Error codes taken from Intel(R) Data Streaming Accelerator Architecture
+ * Specification, section 5.7
+ */
+#define RTE_IOAT_OP_ADDRESS_ERR 0x03  /**< Page fault or invalid address */
+#define RTE_IOAT_OP_INVALID_LEN 0x13  /**< Invalid/too big length field passed */
+#define RTE_IOAT_OP_OVERLAPPING_BUFS 0x16 /**< Overlapping buffers error */
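+
+/* For example, a copy enqueued with an invalid (null) source address in
+ * IOVA-as-VA mode is expected to complete with a non-zero status such as
+ * RTE_IOAT_OP_ADDRESS_ERR; the driver self-test uses this to exercise the
+ * error reporting path.
+ */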
+
+
 /**
  * Returns details of operations that have been completed
  *
+ * The status of each operation is returned in the status array parameter.
  * If the hdls_disable option was not set when the device was configured,
  * the function will return to the caller the user-provided "handles" for
  * the copy operations which have been completed by the hardware, and not
  * already returned by a previous call to this API.
  * If the hdls_disable option for the device was set on configure, the
- * max_copies, src_hdls and dst_hdls parameters will be ignored, and the
+ * src_hdls and dst_hdls parameters will be ignored, and the
  * function returns the number of newly-completed operations.
+ * If status is also NULL, the max_copies parameter is ignored too, and the
+ * function simply returns a count of all newly-completed operations.
  *
  * @param dev_id
  *   The rawdev device id of the ioat instance
  * @param max_copies
- *   The number of entries which can fit in the src_hdls and dst_hdls
+ *   The number of entries which can fit in the status, src_hdls and dst_hdls
  *   arrays, i.e. max number of completed operations to report.
  *   NOTE: If hdls_disable configuration option for the device is set, this
- *   parameter is ignored.
+ *   parameter applies only to the "status" array, if specified.
+ * @param status
+ *   Array to hold the status of each completed operation. Array should be
+ *   set to zeros on input, as the driver will only write error status values.
+ *   A value of 1 implies an operation was not attempted, and any other non-zero
+ *   value indicates operation failure.
+ *   Parameter may be NULL if no status value checking is required.
+ * @param num_unsuccessful
+ *   Returns the number of elements in status where the value is non-zero,
+ *   i.e. the operation either failed or was not attempted due to an earlier
+ *   failure. If this value is returned as zero (the expected case), the
+ *   status array will not have been modified by the function and need not be
+ *   checked by software.
  * @param src_hdls
  *   Array to hold the source handle parameters of the completed ops.
  *   NOTE: If hdls_disable configuration option for the device is set, this
- *   parameter is ignored.
+ *   parameter is ignored, and may be NULL.
  * @param dst_hdls
  *   Array to hold the destination handle parameters of the completed ops.
  *   NOTE: If hdls_disable configuration option for the device is set, this
- *   parameter is ignored.
+ *   parameter is ignored, and may be NULL.
  * @return
- *   -1 on error, with rte_errno set appropriately.
- *   Otherwise number of completed operations i.e. number of entries written
- *   to the src_hdls and dst_hdls array parameters.
+ *   -1 on device error, with rte_errno set appropriately and parameters
+ *   unmodified.
+ *   Otherwise number of returned operations i.e. number of valid entries
+ *   in the status, src_hdls and dst_hdls array parameters. If status is NULL,
+ *   and the hdls_disable config option is set, this value may be greater than
+ *   max_copies parameter.
  */
 static inline int
 __rte_experimental
 rte_ioat_completed_ops(int dev_id, uint8_t max_copies,
+               uint32_t *status, uint8_t *num_unsuccessful,
                uintptr_t *src_hdls, uintptr_t *dst_hdls);
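+
+/* A minimal usage sketch for checking errors; "BURST", "srcs" and "dsts" are
+ * placeholder application variables:
+ *
+ *     uint32_t status[BURST] = {0};
+ *     uint8_t num_fail = 0;
+ *     int n = rte_ioat_completed_ops(dev_id, BURST, status, &num_fail,
+ *                     (void *)srcs, (void *)dsts);
+ *     if (n < 0) {
+ *             // device error, rte_errno is set
+ *     } else if (num_fail != 0) {
+ *             // scan status[0..n-1] for RTE_IOAT_OP_SKIPPED or error codes
+ *     }
+ */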
 
 /* include the implementation details from a separate file */
 
 
 static inline int
 rte_ioat_completed_ops(int dev_id, uint8_t max_copies,
+               uint32_t *status, uint8_t *num_unsuccessful,
                uintptr_t *src_hdls, uintptr_t *dst_hdls)
 {
        enum rte_ioat_dev_type *type =
                        (enum rte_ioat_dev_type *)rte_rawdevs[dev_id].dev_private;
+       uint8_t tmp; /* used so functions don't need to check for null parameter */
+
+       if (num_unsuccessful == NULL)
+               num_unsuccessful = &tmp;
+
+       *num_unsuccessful = 0;
        if (*type == RTE_IDXD_DEV)
-               return __idxd_completed_ops(dev_id, max_copies,
+               return __idxd_completed_ops(dev_id, max_copies, status, num_unsuccessful,
                                src_hdls, dst_hdls);
        else
-               return __ioat_completed_ops(dev_id,  max_copies,
-                               src_hdls, dst_hdls);
+               return __ioat_completed_ops(dev_id, max_copies, src_hdls, dst_hdls);
 }
 
 static inline void
 rte_ioat_completed_copies(int dev_id, uint8_t max_copies,
                uintptr_t *src_hdls, uintptr_t *dst_hdls)
 {
-       return rte_ioat_completed_ops(dev_id, max_copies, src_hdls, dst_hdls);
+       return rte_ioat_completed_ops(dev_id, max_copies, NULL, NULL,
+                       src_hdls, dst_hdls);
 }
 
 #endif /* _RTE_IOAT_RAWDEV_FNS_H_ */
 
 
        for (i = 0; i < tx_config->nb_queues; i++) {
                if (copy_mode == COPY_MODE_IOAT_NUM) {
-                       /* Deque the mbufs from IOAT device. */
+                       /* Dequeue the mbufs from the IOAT device. Since all
+                        * memory is DPDK-pinned memory, all addresses should be
+                        * valid, so we do not check for copy errors.
+                        */
                        nb_dq = rte_ioat_completed_ops(
-                               tx_config->ioat_ids[i], MAX_PKT_BURST,
+                               tx_config->ioat_ids[i], MAX_PKT_BURST, NULL, NULL,
                                (void *)mbufs_src, (void *)mbufs_dst);
                } else {
-                       /* Deque the mbufs from rx_to_tx_ring. */
+                       /* Dequeue the mbufs from rx_to_tx_ring. */
                        nb_dq = rte_ring_dequeue_burst(
                                tx_config->rx_to_tx_ring, (void *)mbufs_dst,
                                MAX_PKT_BURST, NULL);
 static void
 configure_rawdev_queue(uint32_t dev_id)
 {
-       struct rte_ioat_rawdev_config dev_config = { .ring_size = ring_size };
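+       /* when more than one lcore is in use, completions may be polled on a
+        * different core from the one submitting the copies, so disable the
+        * completion prefetch done at job submission in that case
+        */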
+       struct rte_ioat_rawdev_config dev_config = {
+                       .ring_size = ring_size,
+                       .no_prefetch_completions = (cfg.nb_lcores > 1),
+       };
        struct rte_rawdev_info info = { .dev_private = &dev_config };
 
        if (rte_rawdev_configure(dev_id, &info, sizeof(dev_config)) != 0) {
 
 
                uint16_t dev_id = dma_bind[vid].dmas[queue_id * 2
                                + VIRTIO_RXQ].dev_id;
-               n_seg = rte_ioat_completed_ops(dev_id, 255, dump, dump);
+               n_seg = rte_ioat_completed_ops(dev_id, 255, NULL, NULL, dump, dump);
                if (n_seg < 0) {
                        RTE_LOG(ERR,
                                VHOST_DATA,