From 3c57ec0098752c8a5371cfe7befb29f65984de31 Mon Sep 17 00:00:00 2001 From: Lukasz Wojciechowski Date: Sat, 17 Oct 2020 05:06:57 +0200 Subject: [PATCH] test/distributor: add test with packets marking All of the former tests analyzed only statistics of packets processed by all workers. The new test verifies also if packets are processed on workers as expected. Every packets processed by the worker is marked and analyzed after it is returned to distributor. This test allows finding issues in matching algorithms. Signed-off-by: Lukasz Wojciechowski Acked-by: David Hunt --- app/test/test_distributor.c | 141 ++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index fdb6ea9ceb..cfae5a1ac9 100644 --- a/app/test/test_distributor.c +++ b/app/test/test_distributor.c @@ -543,6 +543,141 @@ test_flush_with_worker_shutdown(struct worker_params *wp, return 0; } +static int +handle_and_mark_work(void *arg) +{ + struct rte_mbuf *buf[8] __rte_cache_aligned; + struct worker_params *wp = arg; + struct rte_distributor *db = wp->dist; + unsigned int num, i; + unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); + num = rte_distributor_get_pkt(db, id, buf, NULL, 0); + while (!quit) { + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); + for (i = 0; i < num; i++) + buf[i]->udata64 += id + 1; + num = rte_distributor_get_pkt(db, id, + buf, buf, num); + } + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); + rte_distributor_return_pkt(db, id, buf, num); + return 0; +} + +/* sanity_mark_test sends packets to workers which mark them. + * Every packet has also encoded sequence number. + * The returned packets are sorted and verified if they were handled + * by proper workers. + */ +static int +sanity_mark_test(struct worker_params *wp, struct rte_mempool *p) +{ + const unsigned int buf_count = 24; + const unsigned int burst = 8; + const unsigned int shift = 12; + const unsigned int seq_shift = 10; + + struct rte_distributor *db = wp->dist; + struct rte_mbuf *bufs[buf_count]; + struct rte_mbuf *returns[buf_count]; + unsigned int i, count, id; + unsigned int sorted[buf_count], seq; + unsigned int failed = 0; + + printf("=== Marked packets test ===\n"); + clear_packet_count(); + if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) { + printf("line %d: Error getting mbufs from pool\n", __LINE__); + return -1; + } + + /* bufs' hashes will be like these below, but shifted left. + * The shifting is for avoiding collisions with backlogs + * and in-flight tags left by previous tests. + * [1, 1, 1, 1, 1, 1, 1, 1 + * 1, 1, 1, 1, 2, 2, 2, 2 + * 2, 2, 2, 2, 1, 1, 1, 1] + */ + for (i = 0; i < burst; i++) { + bufs[0 * burst + i]->hash.usr = 1 << shift; + bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2) + << shift; + bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1) + << shift; + } + /* Assign a sequence number to each packet. The sequence is shifted, + * so that lower bits of the udate64 will hold mark from worker. + */ + for (i = 0; i < buf_count; i++) + bufs[i]->udata64 = i << seq_shift; + + count = 0; + for (i = 0; i < buf_count/burst; i++) { + rte_distributor_process(db, &bufs[i * burst], burst); + count += rte_distributor_returned_pkts(db, &returns[count], + buf_count - count); + } + + do { + rte_distributor_flush(db); + count += rte_distributor_returned_pkts(db, &returns[count], + buf_count - count); + } while (count < buf_count); + + for (i = 0; i < rte_lcore_count() - 1; i++) + printf("Worker %u handled %u packets\n", i, + __atomic_load_n(&worker_stats[i].handled_packets, + __ATOMIC_RELAXED)); + + /* Sort returned packets by sent order (sequence numbers). */ + for (i = 0; i < buf_count; i++) { + seq = returns[i]->udata64 >> seq_shift; + id = returns[i]->udata64 - (seq << seq_shift); + sorted[seq] = id; + } + + /* Verify that packets [0-11] and [20-23] were processed + * by the same worker + */ + for (i = 1; i < 12; i++) { + if (sorted[i] != sorted[0]) { + printf("Packet number %u processed by worker %u," + " but should be processes by worker %u\n", + i, sorted[i], sorted[0]); + failed = 1; + } + } + for (i = 20; i < 24; i++) { + if (sorted[i] != sorted[0]) { + printf("Packet number %u processed by worker %u," + " but should be processes by worker %u\n", + i, sorted[i], sorted[0]); + failed = 1; + } + } + /* And verify that packets [12-19] were processed + * by the another worker + */ + for (i = 13; i < 20; i++) { + if (sorted[i] != sorted[12]) { + printf("Packet number %u processed by worker %u," + " but should be processes by worker %u\n", + i, sorted[i], sorted[12]); + failed = 1; + } + } + + rte_mempool_put_bulk(p, (void *)bufs, buf_count); + + if (failed) + return -1; + + printf("Marked packets test passed\n"); + return 0; +} + static int test_error_distributor_create_name(void) { @@ -727,6 +862,12 @@ test_distributor(void) goto err; quit_workers(&worker_params, p); + rte_eal_mp_remote_launch(handle_and_mark_work, + &worker_params, SKIP_MASTER); + if (sanity_mark_test(&worker_params, p) < 0) + goto err; + quit_workers(&worker_params, p); + } else { printf("Too few cores to run worker shutdown test\n"); } -- 2.20.1