X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;ds=sidebyside;f=app%2Ftest%2Ftest_distributor.c;h=eb889b91d119f03ff1993823967d0e6cf2360281;hb=cb056611a8ed9ab9024f3b91bf26e97255194514;hp=4343efed14ed2244d5b819ea58e0b4fce5df8411;hpb=2dfdfcb404493a781d152afbc85a2a8a5b90580b;p=dpdk.git diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index 4343efed14..eb889b91d1 100644 --- a/app/test/test_distributor.c +++ b/app/test/test_distributor.c @@ -27,6 +27,7 @@ struct worker_params worker_params; /* statics - all zero-initialized by default */ static volatile int quit; /**< general quit variable for all threads */ static volatile int zero_quit; /**< var for when we just want thr0 to quit*/ +static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping*/ static volatile unsigned worker_idx; static volatile unsigned zero_idx; @@ -102,6 +103,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) struct rte_mbuf *returns[BURST*2]; unsigned int i, count; unsigned int retries; + unsigned int processed; printf("=== Basic distributor sanity tests ===\n"); clear_packet_count(); @@ -115,7 +117,11 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) for (i = 0; i < BURST; i++) bufs[i]->hash.usr = 0; - rte_distributor_process(db, bufs, BURST); + processed = 0; + while (processed < BURST) + processed += rte_distributor_process(db, &bufs[processed], + BURST - processed); + count = 0; do { @@ -128,6 +134,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", __LINE__, BURST, total_packet_count()); + rte_mempool_put_bulk(p, (void *)bufs, BURST); return -1; } @@ -154,6 +161,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", __LINE__, BURST, total_packet_count()); + rte_mempool_put_bulk(p, (void *)bufs, BURST); return -1; } @@ -182,6 +190,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", __LINE__, BURST, total_packet_count()); + rte_mempool_put_bulk(p, (void *)bufs, BURST); return -1; } @@ -237,6 +246,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) if (num_returned != BIG_BATCH) { printf("line %d: Missing packets, expected %d\n", __LINE__, num_returned); + rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); return -1; } @@ -251,6 +261,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) if (j == BIG_BATCH) { printf("Error: could not find source packet #%u\n", i); + rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); return -1; } } @@ -303,6 +314,7 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p) struct rte_distributor *d = wp->dist; unsigned i; struct rte_mbuf *bufs[BURST]; + unsigned int processed; printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name); @@ -315,7 +327,10 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p) bufs[j]->hash.usr = (i+j) << 1; } - rte_distributor_process(d, bufs, BURST); + processed = 0; + while (processed < BURST) + processed += rte_distributor_process(d, + &bufs[processed], BURST - processed); } rte_distributor_flush(d); @@ -377,8 +392,10 @@ handle_work_for_shutdown_test(void *arg) /* for worker zero, allow it to restart to pick up last packet * when all workers are shutting down. */ + __atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE); while (zero_quit) usleep(100); + __atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE); num = rte_distributor_get_pkt(d, id, buf, NULL, 0); @@ -407,6 +424,7 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, struct rte_mbuf *bufs2[BURST]; unsigned int i; unsigned int failed = 0; + unsigned int processed = 0; printf("=== Sanity test of worker shutdown ===\n"); @@ -424,7 +442,10 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, for (i = 0; i < BURST; i++) bufs[i]->hash.usr = 1; - rte_distributor_process(d, bufs, BURST); + processed = 0; + while (processed < BURST) + processed += rte_distributor_process(d, &bufs[processed], + BURST - processed); rte_distributor_flush(d); /* at this point, we will have processed some packets and have a full @@ -446,7 +467,12 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, /* flush the distributor */ rte_distributor_flush(d); - rte_delay_us(10000); + while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) + rte_distributor_flush(d); + + zero_quit = 0; + while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) + rte_delay_us(100); for (i = 0; i < rte_lcore_count() - 1; i++) printf("Worker %u handled %u packets\n", i, @@ -481,6 +507,7 @@ test_flush_with_worker_shutdown(struct worker_params *wp, struct rte_mbuf *bufs[BURST]; unsigned int i; unsigned int failed = 0; + unsigned int processed; printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name); @@ -495,7 +522,10 @@ test_flush_with_worker_shutdown(struct worker_params *wp, for (i = 0; i < BURST; i++) bufs[i]->hash.usr = 0; - rte_distributor_process(d, bufs, BURST); + processed = 0; + while (processed < BURST) + processed += rte_distributor_process(d, &bufs[processed], + BURST - processed); /* at this point, we will have processed some packets and have a full * backlog for the other ones at worker 0. */ @@ -506,9 +536,14 @@ test_flush_with_worker_shutdown(struct worker_params *wp, /* flush the distributor */ rte_distributor_flush(d); - rte_delay_us(10000); + while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) + rte_distributor_flush(d); zero_quit = 0; + + while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) + rte_delay_us(100); + for (i = 0; i < rte_lcore_count() - 1; i++) printf("Worker %u handled %u packets\n", i, __atomic_load_n(&worker_stats[i].handled_packets, @@ -530,6 +565,146 @@ test_flush_with_worker_shutdown(struct worker_params *wp, return 0; } +static int +handle_and_mark_work(void *arg) +{ + struct rte_mbuf *buf[8] __rte_cache_aligned; + struct worker_params *wp = arg; + struct rte_distributor *db = wp->dist; + unsigned int num, i; + unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); + num = rte_distributor_get_pkt(db, id, buf, NULL, 0); + while (!quit) { + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); + for (i = 0; i < num; i++) + buf[i]->udata64 += id + 1; + num = rte_distributor_get_pkt(db, id, + buf, buf, num); + } + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); + rte_distributor_return_pkt(db, id, buf, num); + return 0; +} + +/* sanity_mark_test sends packets to workers which mark them. + * Every packet has also encoded sequence number. + * The returned packets are sorted and verified if they were handled + * by proper workers. + */ +static int +sanity_mark_test(struct worker_params *wp, struct rte_mempool *p) +{ + const unsigned int buf_count = 24; + const unsigned int burst = 8; + const unsigned int shift = 12; + const unsigned int seq_shift = 10; + + struct rte_distributor *db = wp->dist; + struct rte_mbuf *bufs[buf_count]; + struct rte_mbuf *returns[buf_count]; + unsigned int i, count, id; + unsigned int sorted[buf_count], seq; + unsigned int failed = 0; + unsigned int processed; + + printf("=== Marked packets test ===\n"); + clear_packet_count(); + if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) { + printf("line %d: Error getting mbufs from pool\n", __LINE__); + return -1; + } + + /* bufs' hashes will be like these below, but shifted left. + * The shifting is for avoiding collisions with backlogs + * and in-flight tags left by previous tests. + * [1, 1, 1, 1, 1, 1, 1, 1 + * 1, 1, 1, 1, 2, 2, 2, 2 + * 2, 2, 2, 2, 1, 1, 1, 1] + */ + for (i = 0; i < burst; i++) { + bufs[0 * burst + i]->hash.usr = 1 << shift; + bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2) + << shift; + bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1) + << shift; + } + /* Assign a sequence number to each packet. The sequence is shifted, + * so that lower bits of the udate64 will hold mark from worker. + */ + for (i = 0; i < buf_count; i++) + bufs[i]->udata64 = i << seq_shift; + + count = 0; + for (i = 0; i < buf_count/burst; i++) { + processed = 0; + while (processed < burst) + processed += rte_distributor_process(db, + &bufs[i * burst + processed], + burst - processed); + count += rte_distributor_returned_pkts(db, &returns[count], + buf_count - count); + } + + do { + rte_distributor_flush(db); + count += rte_distributor_returned_pkts(db, &returns[count], + buf_count - count); + } while (count < buf_count); + + for (i = 0; i < rte_lcore_count() - 1; i++) + printf("Worker %u handled %u packets\n", i, + __atomic_load_n(&worker_stats[i].handled_packets, + __ATOMIC_RELAXED)); + + /* Sort returned packets by sent order (sequence numbers). */ + for (i = 0; i < buf_count; i++) { + seq = returns[i]->udata64 >> seq_shift; + id = returns[i]->udata64 - (seq << seq_shift); + sorted[seq] = id; + } + + /* Verify that packets [0-11] and [20-23] were processed + * by the same worker + */ + for (i = 1; i < 12; i++) { + if (sorted[i] != sorted[0]) { + printf("Packet number %u processed by worker %u," + " but should be processes by worker %u\n", + i, sorted[i], sorted[0]); + failed = 1; + } + } + for (i = 20; i < 24; i++) { + if (sorted[i] != sorted[0]) { + printf("Packet number %u processed by worker %u," + " but should be processes by worker %u\n", + i, sorted[i], sorted[0]); + failed = 1; + } + } + /* And verify that packets [12-19] were processed + * by the another worker + */ + for (i = 13; i < 20; i++) { + if (sorted[i] != sorted[12]) { + printf("Packet number %u processed by worker %u," + " but should be processes by worker %u\n", + i, sorted[i], sorted[12]); + failed = 1; + } + } + + rte_mempool_put_bulk(p, (void *)bufs, buf_count); + + if (failed) + return -1; + + printf("Marked packets test passed\n"); + return 0; +} + static int test_error_distributor_create_name(void) { @@ -591,6 +766,7 @@ quit_workers(struct worker_params *wp, struct rte_mempool *p) const unsigned num_workers = rte_lcore_count() - 1; unsigned i; struct rte_mbuf *bufs[RTE_MAX_LCORE]; + struct rte_mbuf *returns[RTE_MAX_LCORE]; if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) { printf("line %d: Error getting mbufs from pool\n", __LINE__); return; @@ -598,19 +774,26 @@ quit_workers(struct worker_params *wp, struct rte_mempool *p) zero_quit = 0; quit = 1; - for (i = 0; i < num_workers; i++) + for (i = 0; i < num_workers; i++) { bufs[i]->hash.usr = i << 1; - rte_distributor_process(d, bufs, num_workers); + rte_distributor_process(d, &bufs[i], 1); + } rte_distributor_process(d, NULL, 0); rte_distributor_flush(d); rte_eal_mp_wait_lcore(); + while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE)) + ; + + rte_distributor_clear_returns(d); rte_mempool_put_bulk(p, (void *)bufs, num_workers); quit = 0; worker_idx = 0; zero_idx = RTE_MAX_LCORE; + zero_quit = 0; + zero_sleep = 0; } static int @@ -679,13 +862,13 @@ test_distributor(void) sizeof(worker_params.name)); rte_eal_mp_remote_launch(handle_work, - &worker_params, SKIP_MASTER); + &worker_params, SKIP_MAIN); if (sanity_test(&worker_params, p) < 0) goto err; quit_workers(&worker_params, p); rte_eal_mp_remote_launch(handle_work_with_free_mbufs, - &worker_params, SKIP_MASTER); + &worker_params, SKIP_MAIN); if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0) goto err; quit_workers(&worker_params, p); @@ -693,7 +876,7 @@ test_distributor(void) if (rte_lcore_count() > 2) { rte_eal_mp_remote_launch(handle_work_for_shutdown_test, &worker_params, - SKIP_MASTER); + SKIP_MAIN); if (sanity_test_with_worker_shutdown(&worker_params, p) < 0) goto err; @@ -701,12 +884,18 @@ test_distributor(void) rte_eal_mp_remote_launch(handle_work_for_shutdown_test, &worker_params, - SKIP_MASTER); + SKIP_MAIN); if (test_flush_with_worker_shutdown(&worker_params, p) < 0) goto err; quit_workers(&worker_params, p); + rte_eal_mp_remote_launch(handle_and_mark_work, + &worker_params, SKIP_MAIN); + if (sanity_mark_test(&worker_params, p) < 0) + goto err; + quit_workers(&worker_params, p); + } else { printf("Too few cores to run worker shutdown test\n"); }