X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=app%2Ftest%2Ftest_distributor.c;h=eb889b91d119f03ff1993823967d0e6cf2360281;hb=cb056611a8ed9ab9024f3b91bf26e97255194514;hp=6cd7a2edda1d15d2a83382f5dfd8e8d5c1e6ee38;hpb=cf669d6930116b80493d67cdc5d7a1a568eed8e9;p=dpdk.git diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index 6cd7a2edda..eb889b91d1 100644 --- a/app/test/test_distributor.c +++ b/app/test/test_distributor.c @@ -27,6 +27,7 @@ struct worker_params worker_params; /* statics - all zero-initialized by default */ static volatile int quit; /**< general quit variable for all threads */ static volatile int zero_quit; /**< var for when we just want thr0 to quit*/ +static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping*/ static volatile unsigned worker_idx; static volatile unsigned zero_idx; @@ -43,7 +44,8 @@ total_packet_count(void) { unsigned i, count = 0; for (i = 0; i < worker_idx; i++) - count += worker_stats[i].handled_packets; + count += __atomic_load_n(&worker_stats[i].handled_packets, + __ATOMIC_RELAXED); return count; } @@ -51,7 +53,10 @@ total_packet_count(void) static inline void clear_packet_count(void) { - memset(&worker_stats, 0, sizeof(worker_stats)); + unsigned int i; + for (i = 0; i < RTE_MAX_LCORE; i++) + __atomic_store_n(&worker_stats[i].handled_packets, 0, + __ATOMIC_RELAXED); } /* this is the basic worker function for sanity test @@ -63,20 +68,18 @@ handle_work(void *arg) struct rte_mbuf *buf[8] __rte_cache_aligned; struct worker_params *wp = arg; struct rte_distributor *db = wp->dist; - unsigned int count = 0, num; + unsigned int num; unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); num = rte_distributor_get_pkt(db, id, buf, NULL, 0); while (!quit) { __atomic_fetch_add(&worker_stats[id].handled_packets, num, __ATOMIC_RELAXED); - count += num; num = rte_distributor_get_pkt(db, id, buf, buf, num); } __atomic_fetch_add(&worker_stats[id].handled_packets, num, __ATOMIC_RELAXED); - count += num; rte_distributor_return_pkt(db, id, buf, num); return 0; } @@ -100,6 +103,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) struct rte_mbuf *returns[BURST*2]; unsigned int i, count; unsigned int retries; + unsigned int processed; printf("=== Basic distributor sanity tests ===\n"); clear_packet_count(); @@ -113,7 +117,11 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) for (i = 0; i < BURST; i++) bufs[i]->hash.usr = 0; - rte_distributor_process(db, bufs, BURST); + processed = 0; + while (processed < BURST) + processed += rte_distributor_process(db, &bufs[processed], + BURST - processed); + count = 0; do { @@ -126,12 +134,14 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", __LINE__, BURST, total_packet_count()); + rte_mempool_put_bulk(p, (void *)bufs, BURST); return -1; } for (i = 0; i < rte_lcore_count() - 1; i++) printf("Worker %u handled %u packets\n", i, - worker_stats[i].handled_packets); + __atomic_load_n(&worker_stats[i].handled_packets, + __ATOMIC_RELAXED)); printf("Sanity test with all zero hashes done.\n"); /* pick two flows and check they go correctly */ @@ -151,12 +161,15 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", __LINE__, BURST, total_packet_count()); + rte_mempool_put_bulk(p, (void *)bufs, BURST); return -1; } for (i = 0; i < rte_lcore_count() - 1; i++) printf("Worker %u handled %u packets\n", i, - worker_stats[i].handled_packets); + __atomic_load_n( + &worker_stats[i].handled_packets, + __ATOMIC_RELAXED)); printf("Sanity test with two hash values done\n"); } @@ -177,12 +190,14 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", __LINE__, BURST, total_packet_count()); + rte_mempool_put_bulk(p, (void *)bufs, BURST); return -1; } for (i = 0; i < rte_lcore_count() - 1; i++) printf("Worker %u handled %u packets\n", i, - worker_stats[i].handled_packets); + __atomic_load_n(&worker_stats[i].handled_packets, + __ATOMIC_RELAXED)); printf("Sanity test with non-zero hashes done\n"); rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -231,6 +246,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) if (num_returned != BIG_BATCH) { printf("line %d: Missing packets, expected %d\n", __LINE__, num_returned); + rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); return -1; } @@ -245,6 +261,7 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) if (j == BIG_BATCH) { printf("Error: could not find source packet #%u\n", i); + rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH); return -1; } } @@ -268,21 +285,20 @@ handle_work_with_free_mbufs(void *arg) struct rte_mbuf *buf[8] __rte_cache_aligned; struct worker_params *wp = arg; struct rte_distributor *d = wp->dist; - unsigned int count = 0; unsigned int i; unsigned int num; unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); num = rte_distributor_get_pkt(d, id, buf, NULL, 0); while (!quit) { - worker_stats[id].handled_packets += num; - count += num; + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); for (i = 0; i < num; i++) rte_pktmbuf_free(buf[i]); num = rte_distributor_get_pkt(d, id, buf, NULL, 0); } - worker_stats[id].handled_packets += num; - count += num; + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); rte_distributor_return_pkt(d, id, buf, num); return 0; } @@ -298,6 +314,7 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p) struct rte_distributor *d = wp->dist; unsigned i; struct rte_mbuf *bufs[BURST]; + unsigned int processed; printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name); @@ -308,10 +325,12 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p) rte_distributor_process(d, NULL, 0); for (j = 0; j < BURST; j++) { bufs[j]->hash.usr = (i+j) << 1; - rte_mbuf_refcnt_set(bufs[j], 1); } - rte_distributor_process(d, bufs, BURST); + processed = 0; + while (processed < BURST) + processed += rte_distributor_process(d, + &bufs[processed], BURST - processed); } rte_distributor_flush(d); @@ -332,15 +351,10 @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p) static int handle_work_for_shutdown_test(void *arg) { - struct rte_mbuf *pkt = NULL; struct rte_mbuf *buf[8] __rte_cache_aligned; struct worker_params *wp = arg; struct rte_distributor *d = wp->dist; - unsigned int count = 0; unsigned int num; - unsigned int total = 0; - unsigned int i; - unsigned int returned = 0; unsigned int zero_id = 0; unsigned int zero_unset; const unsigned int id = __atomic_fetch_add(&worker_idx, 1, @@ -358,10 +372,8 @@ handle_work_for_shutdown_test(void *arg) /* wait for quit single globally, or for worker zero, wait * for zero_quit */ while (!quit && !(id == zero_id && zero_quit)) { - worker_stats[id].handled_packets += num; - count += num; - for (i = 0; i < num; i++) - rte_pktmbuf_free(buf[i]); + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); num = rte_distributor_get_pkt(d, id, buf, NULL, 0); if (num > 0) { @@ -370,32 +382,30 @@ handle_work_for_shutdown_test(void *arg) false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE); } zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE); - - total += num; } - worker_stats[id].handled_packets += num; - count += num; - returned = rte_distributor_return_pkt(d, id, buf, num); + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); if (id == zero_id) { + rte_distributor_return_pkt(d, id, NULL, 0); + /* for worker zero, allow it to restart to pick up last packet * when all workers are shutting down. */ + __atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE); while (zero_quit) usleep(100); + __atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE); num = rte_distributor_get_pkt(d, id, buf, NULL, 0); while (!quit) { - worker_stats[id].handled_packets += num; - count += num; - rte_pktmbuf_free(pkt); + __atomic_fetch_add(&worker_stats[id].handled_packets, + num, __ATOMIC_RELAXED); num = rte_distributor_get_pkt(d, id, buf, NULL, 0); } - returned = rte_distributor_return_pkt(d, - id, buf, num); - printf("Num returned = %d\n", returned); } + rte_distributor_return_pkt(d, id, buf, num); return 0; } @@ -411,7 +421,10 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, { struct rte_distributor *d = wp->dist; struct rte_mbuf *bufs[BURST]; - unsigned i; + struct rte_mbuf *bufs2[BURST]; + unsigned int i; + unsigned int failed = 0; + unsigned int processed = 0; printf("=== Sanity test of worker shutdown ===\n"); @@ -429,7 +442,10 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, for (i = 0; i < BURST; i++) bufs[i]->hash.usr = 1; - rte_distributor_process(d, bufs, BURST); + processed = 0; + while (processed < BURST) + processed += rte_distributor_process(d, &bufs[processed], + BURST - processed); rte_distributor_flush(d); /* at this point, we will have processed some packets and have a full @@ -437,32 +453,45 @@ sanity_test_with_worker_shutdown(struct worker_params *wp, */ /* get more buffers to queue up, again setting them to the same flow */ - if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { + if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) { printf("line %d: Error getting mbufs from pool\n", __LINE__); + rte_mempool_put_bulk(p, (void *)bufs, BURST); return -1; } for (i = 0; i < BURST; i++) - bufs[i]->hash.usr = 1; + bufs2[i]->hash.usr = 1; /* get worker zero to quit */ zero_quit = 1; - rte_distributor_process(d, bufs, BURST); + rte_distributor_process(d, bufs2, BURST); /* flush the distributor */ rte_distributor_flush(d); - rte_delay_us(10000); + while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) + rte_distributor_flush(d); + + zero_quit = 0; + while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) + rte_delay_us(100); for (i = 0; i < rte_lcore_count() - 1; i++) printf("Worker %u handled %u packets\n", i, - worker_stats[i].handled_packets); + __atomic_load_n(&worker_stats[i].handled_packets, + __ATOMIC_RELAXED)); if (total_packet_count() != BURST * 2) { printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", __LINE__, BURST * 2, total_packet_count()); - return -1; + failed = 1; } + rte_mempool_put_bulk(p, (void *)bufs, BURST); + rte_mempool_put_bulk(p, (void *)bufs2, BURST); + + if (failed) + return -1; + printf("Sanity test with worker shutdown passed\n\n"); return 0; } @@ -476,7 +505,9 @@ test_flush_with_worker_shutdown(struct worker_params *wp, { struct rte_distributor *d = wp->dist; struct rte_mbuf *bufs[BURST]; - unsigned i; + unsigned int i; + unsigned int failed = 0; + unsigned int processed; printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name); @@ -491,7 +522,10 @@ test_flush_with_worker_shutdown(struct worker_params *wp, for (i = 0; i < BURST; i++) bufs[i]->hash.usr = 0; - rte_distributor_process(d, bufs, BURST); + processed = 0; + while (processed < BURST) + processed += rte_distributor_process(d, &bufs[processed], + BURST - processed); /* at this point, we will have processed some packets and have a full * backlog for the other ones at worker 0. */ @@ -502,24 +536,175 @@ test_flush_with_worker_shutdown(struct worker_params *wp, /* flush the distributor */ rte_distributor_flush(d); - rte_delay_us(10000); + while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) + rte_distributor_flush(d); zero_quit = 0; + + while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE)) + rte_delay_us(100); + for (i = 0; i < rte_lcore_count() - 1; i++) printf("Worker %u handled %u packets\n", i, - worker_stats[i].handled_packets); + __atomic_load_n(&worker_stats[i].handled_packets, + __ATOMIC_RELAXED)); if (total_packet_count() != BURST) { printf("Line %d: Error, not all packets flushed. " "Expected %u, got %u\n", __LINE__, BURST, total_packet_count()); - return -1; + failed = 1; } + rte_mempool_put_bulk(p, (void *)bufs, BURST); + + if (failed) + return -1; + printf("Flush test with worker shutdown passed\n\n"); return 0; } +static int +handle_and_mark_work(void *arg) +{ + struct rte_mbuf *buf[8] __rte_cache_aligned; + struct worker_params *wp = arg; + struct rte_distributor *db = wp->dist; + unsigned int num, i; + unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED); + num = rte_distributor_get_pkt(db, id, buf, NULL, 0); + while (!quit) { + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); + for (i = 0; i < num; i++) + buf[i]->udata64 += id + 1; + num = rte_distributor_get_pkt(db, id, + buf, buf, num); + } + __atomic_fetch_add(&worker_stats[id].handled_packets, num, + __ATOMIC_RELAXED); + rte_distributor_return_pkt(db, id, buf, num); + return 0; +} + +/* sanity_mark_test sends packets to workers which mark them. + * Every packet has also encoded sequence number. + * The returned packets are sorted and verified if they were handled + * by proper workers. + */ +static int +sanity_mark_test(struct worker_params *wp, struct rte_mempool *p) +{ + const unsigned int buf_count = 24; + const unsigned int burst = 8; + const unsigned int shift = 12; + const unsigned int seq_shift = 10; + + struct rte_distributor *db = wp->dist; + struct rte_mbuf *bufs[buf_count]; + struct rte_mbuf *returns[buf_count]; + unsigned int i, count, id; + unsigned int sorted[buf_count], seq; + unsigned int failed = 0; + unsigned int processed; + + printf("=== Marked packets test ===\n"); + clear_packet_count(); + if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) { + printf("line %d: Error getting mbufs from pool\n", __LINE__); + return -1; + } + + /* bufs' hashes will be like these below, but shifted left. + * The shifting is for avoiding collisions with backlogs + * and in-flight tags left by previous tests. + * [1, 1, 1, 1, 1, 1, 1, 1 + * 1, 1, 1, 1, 2, 2, 2, 2 + * 2, 2, 2, 2, 1, 1, 1, 1] + */ + for (i = 0; i < burst; i++) { + bufs[0 * burst + i]->hash.usr = 1 << shift; + bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2) + << shift; + bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1) + << shift; + } + /* Assign a sequence number to each packet. The sequence is shifted, + * so that lower bits of the udate64 will hold mark from worker. + */ + for (i = 0; i < buf_count; i++) + bufs[i]->udata64 = i << seq_shift; + + count = 0; + for (i = 0; i < buf_count/burst; i++) { + processed = 0; + while (processed < burst) + processed += rte_distributor_process(db, + &bufs[i * burst + processed], + burst - processed); + count += rte_distributor_returned_pkts(db, &returns[count], + buf_count - count); + } + + do { + rte_distributor_flush(db); + count += rte_distributor_returned_pkts(db, &returns[count], + buf_count - count); + } while (count < buf_count); + + for (i = 0; i < rte_lcore_count() - 1; i++) + printf("Worker %u handled %u packets\n", i, + __atomic_load_n(&worker_stats[i].handled_packets, + __ATOMIC_RELAXED)); + + /* Sort returned packets by sent order (sequence numbers). */ + for (i = 0; i < buf_count; i++) { + seq = returns[i]->udata64 >> seq_shift; + id = returns[i]->udata64 - (seq << seq_shift); + sorted[seq] = id; + } + + /* Verify that packets [0-11] and [20-23] were processed + * by the same worker + */ + for (i = 1; i < 12; i++) { + if (sorted[i] != sorted[0]) { + printf("Packet number %u processed by worker %u," + " but should be processes by worker %u\n", + i, sorted[i], sorted[0]); + failed = 1; + } + } + for (i = 20; i < 24; i++) { + if (sorted[i] != sorted[0]) { + printf("Packet number %u processed by worker %u," + " but should be processes by worker %u\n", + i, sorted[i], sorted[0]); + failed = 1; + } + } + /* And verify that packets [12-19] were processed + * by the another worker + */ + for (i = 13; i < 20; i++) { + if (sorted[i] != sorted[12]) { + printf("Packet number %u processed by worker %u," + " but should be processes by worker %u\n", + i, sorted[i], sorted[12]); + failed = 1; + } + } + + rte_mempool_put_bulk(p, (void *)bufs, buf_count); + + if (failed) + return -1; + + printf("Marked packets test passed\n"); + return 0; +} + static int test_error_distributor_create_name(void) { @@ -581,22 +766,34 @@ quit_workers(struct worker_params *wp, struct rte_mempool *p) const unsigned num_workers = rte_lcore_count() - 1; unsigned i; struct rte_mbuf *bufs[RTE_MAX_LCORE]; - rte_mempool_get_bulk(p, (void *)bufs, num_workers); + struct rte_mbuf *returns[RTE_MAX_LCORE]; + if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) { + printf("line %d: Error getting mbufs from pool\n", __LINE__); + return; + } zero_quit = 0; quit = 1; - for (i = 0; i < num_workers; i++) + for (i = 0; i < num_workers; i++) { bufs[i]->hash.usr = i << 1; - rte_distributor_process(d, bufs, num_workers); - - rte_mempool_put_bulk(p, (void *)bufs, num_workers); + rte_distributor_process(d, &bufs[i], 1); + } rte_distributor_process(d, NULL, 0); rte_distributor_flush(d); rte_eal_mp_wait_lcore(); + + while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE)) + ; + + rte_distributor_clear_returns(d); + rte_mempool_put_bulk(p, (void *)bufs, num_workers); + quit = 0; worker_idx = 0; zero_idx = RTE_MAX_LCORE; + zero_quit = 0; + zero_sleep = 0; } static int @@ -665,13 +862,13 @@ test_distributor(void) sizeof(worker_params.name)); rte_eal_mp_remote_launch(handle_work, - &worker_params, SKIP_MASTER); + &worker_params, SKIP_MAIN); if (sanity_test(&worker_params, p) < 0) goto err; quit_workers(&worker_params, p); rte_eal_mp_remote_launch(handle_work_with_free_mbufs, - &worker_params, SKIP_MASTER); + &worker_params, SKIP_MAIN); if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0) goto err; quit_workers(&worker_params, p); @@ -679,7 +876,7 @@ test_distributor(void) if (rte_lcore_count() > 2) { rte_eal_mp_remote_launch(handle_work_for_shutdown_test, &worker_params, - SKIP_MASTER); + SKIP_MAIN); if (sanity_test_with_worker_shutdown(&worker_params, p) < 0) goto err; @@ -687,12 +884,18 @@ test_distributor(void) rte_eal_mp_remote_launch(handle_work_for_shutdown_test, &worker_params, - SKIP_MASTER); + SKIP_MAIN); if (test_flush_with_worker_shutdown(&worker_params, p) < 0) goto err; quit_workers(&worker_params, p); + rte_eal_mp_remote_launch(handle_and_mark_work, + &worker_params, SKIP_MAIN); + if (sanity_mark_test(&worker_params, p) < 0) + goto err; + quit_workers(&worker_params, p); + } else { printf("Too few cores to run worker shutdown test\n"); }