From: Harry van Haaren Date: Thu, 30 Mar 2017 19:30:47 +0000 (+0100) Subject: test/eventdev: add SW deadlock tests X-Git-Tag: spdx-start~3833 X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=b1d518db9be83e64e5c1272e1115244e9302c351;p=dpdk.git test/eventdev: add SW deadlock tests This commit adds the worker loopback test to verify that the deadlock avoidance scheme is functioning, and a holb (head-of-line-blocking) test to ensure the head of line blocking avoidance is correct. Signed-off-by: Bruce Richardson Signed-off-by: David Hunt Signed-off-by: Harry van Haaren Acked-by: Anatoly Burakov --- diff --git a/test/test/test_eventdev_sw.c b/test/test/test_eventdev_sw.c index 89e17b4ac8..fd6447e9b0 100644 --- a/test/test/test_eventdev_sw.c +++ b/test/test/test_eventdev_sw.c @@ -100,6 +100,69 @@ rte_gen_arp(int portid, struct rte_mempool *mp) return m; } +static void +xstats_print(void) +{ + const uint32_t XSTATS_MAX = 1024; + uint32_t i; + uint32_t ids[XSTATS_MAX]; + uint64_t values[XSTATS_MAX]; + struct rte_event_dev_xstats_name xstats_names[XSTATS_MAX]; + + for (i = 0; i < XSTATS_MAX; i++) + ids[i] = i; + + /* Device names / values */ + int ret = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, 0, + xstats_names, ids, XSTATS_MAX); + if (ret < 0) { + printf("%d: xstats names get() returned error\n", + __LINE__); + return; + } + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, + 0, ids, values, ret); + if (ret > (signed int)XSTATS_MAX) + printf("%s %d: more xstats available than space\n", + __func__, __LINE__); + for (i = 0; (signed int)i < ret; i++) { + printf("%d : %s : %"PRIu64"\n", + i, xstats_names[i].name, values[i]); + } + + /* Port names / values */ + ret = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, 0, + xstats_names, ids, XSTATS_MAX); + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, 1, + ids, values, ret); + if (ret > (signed int)XSTATS_MAX) + printf("%s %d: more xstats available than space\n", + __func__, __LINE__); + for (i = 0; (signed int)i < ret; i++) { + printf("%d : %s : %"PRIu64"\n", + i, xstats_names[i].name, values[i]); + } + + /* Queue names / values */ + ret = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, 0, + xstats_names, ids, XSTATS_MAX); + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, + 1, ids, values, ret); + if (ret > (signed int)XSTATS_MAX) + printf("%s %d: more xstats available than space\n", + __func__, __LINE__); + for (i = 0; (signed int)i < ret; i++) { + printf("%d : %s : %"PRIu64"\n", + i, xstats_names[i].name, values[i]); + } +} + /* initialization and config */ static inline int init(struct test *t, int nb_queues, int nb_ports) @@ -2600,6 +2663,324 @@ unordered_basic(struct test *t) return parallel_basic(t, 0); } +static int +holb(struct test *t) /* test to check we avoid basic head-of-line blocking */ +{ + const struct rte_event new_ev = { + .op = RTE_EVENT_OP_NEW + /* all other fields zero */ + }; + struct rte_event ev = new_ev; + unsigned int rx_port = 0; /* port we get the first flow on */ + char rx_port_used_stat[64]; + char rx_port_free_stat[64]; + char other_port_used_stat[64]; + + if (init(t, 1, 2) < 0 || + create_ports(t, 2) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + int nb_links = rte_event_port_link(evdev, t->port[1], NULL, NULL, 0); + if (rte_event_port_link(evdev, t->port[0], NULL, NULL, 0) != 1 || + nb_links != 1) { + printf("%d: Error links queue to ports\n", __LINE__); + goto err; + } + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + goto err; + } + + /* send one packet and see where it goes, port 0 or 1 */ + if (rte_event_enqueue_burst(evdev, t->port[0], &ev, 1) != 1) { + printf("%d: Error doing first enqueue\n", __LINE__); + goto err; + } + rte_event_schedule(evdev); + + if (rte_event_dev_xstats_by_name_get(evdev, "port_0_cq_ring_used", NULL) + != 1) + rx_port = 1; + + snprintf(rx_port_used_stat, sizeof(rx_port_used_stat), + "port_%u_cq_ring_used", rx_port); + snprintf(rx_port_free_stat, sizeof(rx_port_free_stat), + "port_%u_cq_ring_free", rx_port); + snprintf(other_port_used_stat, sizeof(other_port_used_stat), + "port_%u_cq_ring_used", rx_port ^ 1); + if (rte_event_dev_xstats_by_name_get(evdev, rx_port_used_stat, NULL) + != 1) { + printf("%d: Error, first event not scheduled\n", __LINE__); + goto err; + } + + /* now fill up the rx port's queue with one flow to cause HOLB */ + do { + ev = new_ev; + if (rte_event_enqueue_burst(evdev, t->port[0], &ev, 1) != 1) { + printf("%d: Error with enqueue\n", __LINE__); + goto err; + } + rte_event_schedule(evdev); + } while (rte_event_dev_xstats_by_name_get(evdev, + rx_port_free_stat, NULL) != 0); + + /* one more packet, which needs to stay in IQ - i.e. HOLB */ + ev = new_ev; + if (rte_event_enqueue_burst(evdev, t->port[0], &ev, 1) != 1) { + printf("%d: Error with enqueue\n", __LINE__); + goto err; + } + rte_event_schedule(evdev); + + /* check that the other port still has an empty CQ */ + if (rte_event_dev_xstats_by_name_get(evdev, other_port_used_stat, NULL) + != 0) { + printf("%d: Error, second port CQ is not empty\n", __LINE__); + goto err; + } + /* check IQ now has one packet */ + if (rte_event_dev_xstats_by_name_get(evdev, "qid_0_iq_0_used", NULL) + != 1) { + printf("%d: Error, QID does not have exactly 1 packet\n", + __LINE__); + goto err; + } + + /* send another flow, which should pass the other IQ entry */ + ev = new_ev; + ev.flow_id = 1; + if (rte_event_enqueue_burst(evdev, t->port[0], &ev, 1) != 1) { + printf("%d: Error with enqueue\n", __LINE__); + goto err; + } + rte_event_schedule(evdev); + + if (rte_event_dev_xstats_by_name_get(evdev, other_port_used_stat, NULL) + != 1) { + printf("%d: Error, second flow did not pass out first\n", + __LINE__); + goto err; + } + + if (rte_event_dev_xstats_by_name_get(evdev, "qid_0_iq_0_used", NULL) + != 1) { + printf("%d: Error, QID does not have exactly 1 packet\n", + __LINE__); + goto err; + } + cleanup(t); + return 0; +err: + rte_event_dev_dump(evdev, stdout); + cleanup(t); + return -1; +} + +static int +worker_loopback_worker_fn(void *arg) +{ + struct test *t = arg; + uint8_t port = t->port[1]; + int count = 0; + int enqd; + + /* + * Takes packets from the input port and then loops them back through + * the Eventdev. Each packet gets looped through QIDs 0-8, 16 times + * so each packet goes through 8*16 = 128 times. + */ + printf("%d: \tWorker function started\n", __LINE__); + while (count < NUM_PACKETS) { +#define BURST_SIZE 32 + struct rte_event ev[BURST_SIZE]; + uint16_t i, nb_rx = rte_event_dequeue_burst(evdev, port, ev, + BURST_SIZE, 0); + if (nb_rx == 0) { + rte_pause(); + continue; + } + + for (i = 0; i < nb_rx; i++) { + ev[i].queue_id++; + if (ev[i].queue_id != 8) { + ev[i].op = RTE_EVENT_OP_FORWARD; + enqd = rte_event_enqueue_burst(evdev, port, + &ev[i], 1); + if (enqd != 1) { + printf("%d: Can't enqueue FWD!!\n", + __LINE__); + return -1; + } + continue; + } + + ev[i].queue_id = 0; + ev[i].mbuf->udata64++; + if (ev[i].mbuf->udata64 != 16) { + ev[i].op = RTE_EVENT_OP_FORWARD; + enqd = rte_event_enqueue_burst(evdev, port, + &ev[i], 1); + if (enqd != 1) { + printf("%d: Can't enqueue FWD!!\n", + __LINE__); + return -1; + } + continue; + } + /* we have hit 16 iterations through system - drop */ + rte_pktmbuf_free(ev[i].mbuf); + count++; + ev[i].op = RTE_EVENT_OP_RELEASE; + enqd = rte_event_enqueue_burst(evdev, port, &ev[i], 1); + if (enqd != 1) { + printf("%d drop enqueue failed\n", __LINE__); + return -1; + } + } + } + + return 0; +} + +static int +worker_loopback_producer_fn(void *arg) +{ + struct test *t = arg; + uint8_t port = t->port[0]; + uint64_t count = 0; + + printf("%d: \tProducer function started\n", __LINE__); + while (count < NUM_PACKETS) { + struct rte_mbuf *m = 0; + do { + m = rte_pktmbuf_alloc(t->mbuf_pool); + } while (m == NULL); + + m->udata64 = 0; + + struct rte_event ev = { + .op = RTE_EVENT_OP_NEW, + .queue_id = t->qid[0], + .flow_id = (uintptr_t)m & 0xFFFF, + .mbuf = m, + }; + + if (rte_event_enqueue_burst(evdev, port, &ev, 1) != 1) { + while (rte_event_enqueue_burst(evdev, port, &ev, 1) != + 1) + rte_pause(); + } + + count++; + } + + return 0; +} + +static int +worker_loopback(struct test *t) +{ + /* use a single producer core, and a worker core to see what happens + * if the worker loops packets back multiple times + */ + struct test_event_dev_stats stats; + uint64_t print_cycles = 0, cycles = 0; + uint64_t tx_pkts = 0; + int err; + int w_lcore, p_lcore; + + if (init(t, 8, 2) < 0 || + create_atomic_qids(t, 8) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* RX with low max events */ + static struct rte_event_port_conf conf = { + .dequeue_depth = 32, + .enqueue_depth = 64, + }; + /* beware: this cannot be initialized in the static above as it would + * only be initialized once - and this needs to be set for multiple runs + */ + conf.new_event_threshold = 512; + + if (rte_event_port_setup(evdev, 0, &conf) < 0) { + printf("Error setting up RX port\n"); + return -1; + } + t->port[0] = 0; + /* TX with higher max events */ + conf.new_event_threshold = 4096; + if (rte_event_port_setup(evdev, 1, &conf) < 0) { + printf("Error setting up TX port\n"); + return -1; + } + t->port[1] = 1; + + /* CQ mapping to QID */ + err = rte_event_port_link(evdev, t->port[1], NULL, NULL, 0); + if (err != 8) { /* should have mapped all queues*/ + printf("%d: error mapping port 2 to all qids\n", __LINE__); + return -1; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + p_lcore = rte_get_next_lcore( + /* start core */ -1, + /* skip master */ 1, + /* wrap */ 0); + w_lcore = rte_get_next_lcore(p_lcore, 1, 0); + + rte_eal_remote_launch(worker_loopback_producer_fn, t, p_lcore); + rte_eal_remote_launch(worker_loopback_worker_fn, t, w_lcore); + + print_cycles = cycles = rte_get_timer_cycles(); + while (rte_eal_get_lcore_state(p_lcore) != FINISHED || + rte_eal_get_lcore_state(w_lcore) != FINISHED) { + + rte_event_schedule(evdev); + + uint64_t new_cycles = rte_get_timer_cycles(); + + if (new_cycles - print_cycles > rte_get_timer_hz()) { + test_event_dev_stats_get(evdev, &stats); + printf( + "%d: \tSched Rx = %"PRIu64", Tx = %"PRIu64"\n", + __LINE__, stats.rx_pkts, stats.tx_pkts); + + print_cycles = new_cycles; + } + if (new_cycles - cycles > rte_get_timer_hz() * 3) { + test_event_dev_stats_get(evdev, &stats); + if (stats.tx_pkts == tx_pkts) { + rte_event_dev_dump(evdev, stdout); + printf("Dumping xstats:\n"); + xstats_print(); + printf( + "%d: No schedules for seconds, deadlock\n", + __LINE__); + return -1; + } + tx_pkts = stats.tx_pkts; + cycles = new_cycles; + } + } + rte_event_schedule(evdev); /* ensure all completions are flushed */ + + rte_eal_mp_wait_lcore(); + + cleanup(t); + return 0; +} + static struct rte_mempool *eventdev_func_mempool; static int @@ -2778,6 +3159,23 @@ test_sw_eventdev(void) printf("ERROR - Port Reconfig Credits Reset test FAILED.\n"); return ret; } + printf("*** Running Head-of-line-blocking test...\n"); + ret = holb(t); + if (ret != 0) { + printf("ERROR - Head-of-line-blocking test FAILED.\n"); + return ret; + } + if (rte_lcore_count() >= 3) { + printf("*** Running Worker loopback test...\n"); + ret = worker_loopback(t); + if (ret != 0) { + printf("ERROR - Worker loopback test FAILED.\n"); + return ret; + } + } else { + printf("### Not enough cores for worker loopback test.\n"); + printf("### Need at least 3 cores for test.\n"); + } /* * Free test instance, leaving mempool initialized, and a pointer to it * in static eventdev_func_mempool, as it is re-used on re-runs