return 0;
}
+static int
+handle_and_mark_work(void *arg)
+{
+ struct rte_mbuf *buf[8] __rte_cache_aligned;
+ struct worker_params *wp = arg;
+ struct rte_distributor *db = wp->dist;
+ unsigned int num, i;
+ unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
+ num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
+ while (!quit) {
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_RELAXED);
+ for (i = 0; i < num; i++)
+ buf[i]->udata64 += id + 1;
+ num = rte_distributor_get_pkt(db, id,
+ buf, buf, num);
+ }
+ __atomic_fetch_add(&worker_stats[id].handled_packets, num,
+ __ATOMIC_RELAXED);
+ rte_distributor_return_pkt(db, id, buf, num);
+ return 0;
+}
+
+/* sanity_mark_test sends packets to workers which mark them.
+ * Every packet has also encoded sequence number.
+ * The returned packets are sorted and verified if they were handled
+ * by proper workers.
+ */
+static int
+sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
+{
+ const unsigned int buf_count = 24;
+ const unsigned int burst = 8;
+ const unsigned int shift = 12;
+ const unsigned int seq_shift = 10;
+
+ struct rte_distributor *db = wp->dist;
+ struct rte_mbuf *bufs[buf_count];
+ struct rte_mbuf *returns[buf_count];
+ unsigned int i, count, id;
+ unsigned int sorted[buf_count], seq;
+ unsigned int failed = 0;
+
+ printf("=== Marked packets test ===\n");
+ clear_packet_count();
+ if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return -1;
+ }
+
+ /* bufs' hashes will be like these below, but shifted left.
+ * The shifting is for avoiding collisions with backlogs
+ * and in-flight tags left by previous tests.
+ * [1, 1, 1, 1, 1, 1, 1, 1
+ * 1, 1, 1, 1, 2, 2, 2, 2
+ * 2, 2, 2, 2, 1, 1, 1, 1]
+ */
+ for (i = 0; i < burst; i++) {
+ bufs[0 * burst + i]->hash.usr = 1 << shift;
+ bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
+ << shift;
+ bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
+ << shift;
+ }
+ /* Assign a sequence number to each packet. The sequence is shifted,
+ * so that lower bits of the udate64 will hold mark from worker.
+ */
+ for (i = 0; i < buf_count; i++)
+ bufs[i]->udata64 = i << seq_shift;
+
+ count = 0;
+ for (i = 0; i < buf_count/burst; i++) {
+ rte_distributor_process(db, &bufs[i * burst], burst);
+ count += rte_distributor_returned_pkts(db, &returns[count],
+ buf_count - count);
+ }
+
+ do {
+ rte_distributor_flush(db);
+ count += rte_distributor_returned_pkts(db, &returns[count],
+ buf_count - count);
+ } while (count < buf_count);
+
+ for (i = 0; i < rte_lcore_count() - 1; i++)
+ printf("Worker %u handled %u packets\n", i,
+ __atomic_load_n(&worker_stats[i].handled_packets,
+ __ATOMIC_RELAXED));
+
+ /* Sort returned packets by sent order (sequence numbers). */
+ for (i = 0; i < buf_count; i++) {
+ seq = returns[i]->udata64 >> seq_shift;
+ id = returns[i]->udata64 - (seq << seq_shift);
+ sorted[seq] = id;
+ }
+
+ /* Verify that packets [0-11] and [20-23] were processed
+ * by the same worker
+ */
+ for (i = 1; i < 12; i++) {
+ if (sorted[i] != sorted[0]) {
+ printf("Packet number %u processed by worker %u,"
+ " but should be processes by worker %u\n",
+ i, sorted[i], sorted[0]);
+ failed = 1;
+ }
+ }
+ for (i = 20; i < 24; i++) {
+ if (sorted[i] != sorted[0]) {
+ printf("Packet number %u processed by worker %u,"
+ " but should be processes by worker %u\n",
+ i, sorted[i], sorted[0]);
+ failed = 1;
+ }
+ }
+ /* And verify that packets [12-19] were processed
+ * by the another worker
+ */
+ for (i = 13; i < 20; i++) {
+ if (sorted[i] != sorted[12]) {
+ printf("Packet number %u processed by worker %u,"
+ " but should be processes by worker %u\n",
+ i, sorted[i], sorted[12]);
+ failed = 1;
+ }
+ }
+
+ rte_mempool_put_bulk(p, (void *)bufs, buf_count);
+
+ if (failed)
+ return -1;
+
+ printf("Marked packets test passed\n");
+ return 0;
+}
+
static
int test_error_distributor_create_name(void)
{
goto err;
quit_workers(&worker_params, p);
+ rte_eal_mp_remote_launch(handle_and_mark_work,
+ &worker_params, SKIP_MASTER);
+ if (sanity_mark_test(&worker_params, p) < 0)
+ goto err;
+ quit_workers(&worker_params, p);
+
} else {
printf("Too few cores to run worker shutdown test\n");
}