+ failed = 1;
+ }
+
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ if (failed)
+ return -1;
+
+ printf("Flush test with worker shutdown passed\n\n");
+ return 0;
+}
+
+static int
+handle_and_mark_work(void *arg)
+{
+ struct rte_mbuf *buf[8] __rte_cache_aligned;
+ struct worker_params *wp = arg;
+ struct rte_distributor *db = wp->dist;
+ unsigned int num, i;
+ unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
+ num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
+ while (!quit) {
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_RELAXED);
+ for (i = 0; i < num; i++)
+ buf[i]->udata64 += id + 1;
+ num = rte_distributor_get_pkt(db, id,
+ buf, buf, num);
+ }
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_RELAXED);
+ rte_distributor_return_pkt(db, id, buf, num);
+ return 0;
+}
+
+/* sanity_mark_test sends packets to workers which mark them.
+ * Every packet has also encoded sequence number.
+ * The returned packets are sorted and verified if they were handled
+ * by proper workers.
+ */
+static int
+sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
+{
+ const unsigned int buf_count = 24;
+ const unsigned int burst = 8;
+ const unsigned int shift = 12;
+ const unsigned int seq_shift = 10;
+
+ struct rte_distributor *db = wp->dist;
+ struct rte_mbuf *bufs[buf_count];
+ struct rte_mbuf *returns[buf_count];
+ unsigned int i, count, id;
+ unsigned int sorted[buf_count], seq;
+ unsigned int failed = 0;
+ unsigned int processed;
+
+ printf("=== Marked packets test ===\n");
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);