test/distributor: fix return buffer queue overload
author    Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
          Tue, 19 Jan 2021 03:59:10 +0000 (04:59 +0100)
committer David Marchand <david.marchand@redhat.com>
          Fri, 29 Jan 2021 07:48:45 +0000 (08:48 +0100)
The distributor library implementation uses a cyclic queue to store
packets returned from workers. These packets can later be collected
with the rte_distributor_returned_pkts() call.
However, the queue has limited capacity: it can hold only
127 packets (RTE_DISTRIB_RETURNS_MASK).
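
For reference, a minimal sketch of how a mask-based cyclic return buffer
of this kind silently loses packets once it is full. The structure and
helper names below are illustrative only and do not claim to match the
library's internal layout:

	#include <rte_mbuf.h>

	#define RETURNS_MASK 127 /* mirrors RTE_DISTRIB_RETURNS_MASK */

	/* Illustrative cyclic return buffer: 'start' is the oldest entry,
	 * 'count' the number of stored packets (at most RETURNS_MASK). */
	struct return_ring {
		unsigned int start;
		unsigned int count;
		struct rte_mbuf *mbufs[RETURNS_MASK + 1];
	};

	/* Store one returned packet. Once the ring holds RETURNS_MASK
	 * packets, the oldest entry falls out of the valid window, i.e.
	 * a previously returned packet is silently lost. */
	static inline void
	store_return(struct return_ring *r, struct rte_mbuf *buf)
	{
		r->mbufs[(r->start + r->count) & RETURNS_MASK] = buf;
		if (r->count == RETURNS_MASK)
			r->start = (r->start + 1) & RETURNS_MASK;
		else
			r->count++;
	}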

The big burst test sent 1024 packets in 32-packet bursts without waiting
for the distributor to process them. When the test was run with a large
number of worker threads, more than 127 packets could be returned from
the workers and put into the cyclic queue. The queue then dropped
packets, making it impossible to collect them later with
rte_distributor_returned_pkts() calls, while the test waited
indefinitely for all packets to be returned.

This patch fixes the big burst test by never allowing more packets than
the queue capacity to be in flight at the same time, so that no packet
can be dropped.
It also removes duplicated code in the same test.
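
Stripped of the test scaffolding, the pattern introduced by the patch
looks roughly like the sketch below; the helper name and parameters are
made up for illustration, the real change is in sanity_test() in the
diff that follows:

	#include <rte_distributor.h>
	#include <rte_mbuf.h>

	/* Feed 'total' packets to the distributor in bursts of 'burst'
	 * packets, draining returned packets often enough that the number
	 * in flight never exceeds the return queue capacity (127). */
	static unsigned int
	feed_without_overload(struct rte_distributor *db,
			struct rte_mbuf **bufs, struct rte_mbuf **returns,
			unsigned int total, unsigned int burst)
	{
		const unsigned int capacity = 127; /* RTE_DISTRIB_RETURNS_MASK */
		unsigned int in_flight = 0, returned = 0, i;
		int count;

		for (i = 0; i < total / burst; i++) {
			rte_distributor_process(db, &bufs[i * burst], burst);
			in_flight += burst;
			/* collect returns until the next burst is sure to fit */
			do {
				count = rte_distributor_returned_pkts(db,
						&returns[returned], total - returned);
				in_flight -= count;
				returned += count;
				rte_distributor_flush(db);
			} while (in_flight + burst > capacity);
		}
		return returned;
	}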

Bugzilla ID: 612
Fixes: c0de0eb82e40 ("distributor: switch over to new API")
Cc: stable@dpdk.org
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Tested-by: David Marchand <david.marchand@redhat.com>
Reviewed-by: David Hunt <david.hunt@intel.com>
diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index f4c6229..961f326 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -217,6 +217,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
        clear_packet_count();
        struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
        unsigned num_returned = 0;
+       unsigned int num_being_processed = 0;
+       unsigned int return_buffer_capacity = 127;/* RTE_DISTRIB_RETURNS_MASK */
 
        /* flush out any remaining packets */
        rte_distributor_flush(db);
@@ -233,16 +235,16 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
        for (i = 0; i < BIG_BATCH/BURST; i++) {
                rte_distributor_process(db,
                                &many_bufs[i*BURST], BURST);
-               count = rte_distributor_returned_pkts(db,
-                               &return_bufs[num_returned],
-                               BIG_BATCH - num_returned);
-               num_returned += count;
+               num_being_processed += BURST;
+               do {
+                       count = rte_distributor_returned_pkts(db,
+                                       &return_bufs[num_returned],
+                                       BIG_BATCH - num_returned);
+                       num_being_processed -= count;
+                       num_returned += count;
+                       rte_distributor_flush(db);
+               } while (num_being_processed + BURST > return_buffer_capacity);
        }
-       rte_distributor_flush(db);
-       count = rte_distributor_returned_pkts(db,
-               &return_bufs[num_returned],
-                       BIG_BATCH - num_returned);
-       num_returned += count;
        retries = 0;
        do {
                rte_distributor_flush(db);