+
+ printf("Flush test with worker shutdown passed\n\n");
+ return 0;
+}
+
+static int
+handle_and_mark_work(void *arg)
+{
+ struct rte_mbuf *buf[8] __rte_cache_aligned;
+ struct worker_params *wp = arg;
+ struct rte_distributor *db = wp->dist;
+ unsigned int num, i;
+ unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
+ num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
+ while (!quit) {
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_RELAXED);
+ for (i = 0; i < num; i++)
+ buf[i]->udata64 += id + 1;
+ num = rte_distributor_get_pkt(db, id,
+ buf, buf, num);
+ }
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_RELAXED);
+ rte_distributor_return_pkt(db, id, buf, num);
+ return 0;
+}
+
+/* sanity_mark_test sends packets to workers which mark them.
+ * Every packet has also encoded sequence number.
+ * The returned packets are sorted and verified if they were handled
+ * by proper workers.
+ */
+static int
+sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
+{
+ const unsigned int buf_count = 24;
+ const unsigned int burst = 8;
+ const unsigned int shift = 12;
+ const unsigned int seq_shift = 10;
+
+ struct rte_distributor *db = wp->dist;
+ struct rte_mbuf *bufs[buf_count];
+ struct rte_mbuf *returns[buf_count];
+ unsigned int i, count, id;
+ unsigned int sorted[buf_count], seq;
+ unsigned int failed = 0;
+ unsigned int processed;
+
+ printf("=== Marked packets test ===\n");
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* bufs' hashes will be like these below, but shifted left.
+ * The shifting is for avoiding collisions with backlogs
+ * and in-flight tags left by previous tests.
+ * [1, 1, 1, 1, 1, 1, 1, 1
+ * 1, 1, 1, 1, 2, 2, 2, 2
+ * 2, 2, 2, 2, 1, 1, 1, 1]
+ */
+ for (i = 0; i < burst; i++) {
+ bufs[0 * burst + i]->hash.usr = 1 << shift;
+ bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
+ << shift;
+ bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
+ << shift;
+ }
+ /* Assign a sequence number to each packet. The sequence is shifted,
+ * so that lower bits of the udate64 will hold mark from worker.
+ */
+ for (i = 0; i < buf_count; i++)
+ bufs[i]->udata64 = i << seq_shift;
+
+ count = 0;
+ for (i = 0; i < buf_count/burst; i++) {
+ processed = 0;
+ while (processed < burst)
+ processed += rte_distributor_process(db,
+ &bufs[i * burst + processed],
+ burst - processed);
+ count += rte_distributor_returned_pkts(db, &returns[count],
+ buf_count - count);