#include <rte_errno.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
+#include <rte_mbuf_dyn.h>
+
+#ifdef RTE_EXEC_ENV_WINDOWS
+static int
+test_distributor(void)
+{
+ printf("distributor not supported on Windows, skipping test\n");
+ return TEST_SKIPPED;
+}
+
+#else
+
#include <rte_distributor.h>
#include <rte_string_fns.h>
#define BURST 32
#define BIG_BATCH 1024
+typedef uint32_t seq_dynfield_t;
+static int seq_dynfield_offset = -1;
+
+static inline seq_dynfield_t *
+seq_field(struct rte_mbuf *mbuf)
+{
+ return RTE_MBUF_DYNFIELD(mbuf, seq_dynfield_offset, seq_dynfield_t *);
+}
+
struct worker_params {
char name[64];
struct rte_distributor *dist;
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
return -1;
}
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
return -1;
}
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
return -1;
}
clear_packet_count();
struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
unsigned num_returned = 0;
+ unsigned int num_being_processed = 0;
+ unsigned int return_buffer_capacity = 127;/* RTE_DISTRIB_RETURNS_MASK */
/* flush out any remaining packets */
rte_distributor_flush(db);
for (i = 0; i < BIG_BATCH/BURST; i++) {
rte_distributor_process(db,
&many_bufs[i*BURST], BURST);
- count = rte_distributor_returned_pkts(db,
- &return_bufs[num_returned],
- BIG_BATCH - num_returned);
- num_returned += count;
+ num_being_processed += BURST;
+ do {
+ count = rte_distributor_returned_pkts(db,
+ &return_bufs[num_returned],
+ BIG_BATCH - num_returned);
+ num_being_processed -= count;
+ num_returned += count;
+ rte_distributor_flush(db);
+ } while (num_being_processed + BURST > return_buffer_capacity);
}
- rte_distributor_flush(db);
- count = rte_distributor_returned_pkts(db,
- &return_bufs[num_returned],
- BIG_BATCH - num_returned);
- num_returned += count;
retries = 0;
do {
rte_distributor_flush(db);
if (num_returned != BIG_BATCH) {
printf("line %d: Missing packets, expected %d\n",
__LINE__, num_returned);
+ rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
return -1;
}
if (j == BIG_BATCH) {
printf("Error: could not find source packet #%u\n", i);
+ rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
return -1;
}
}
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_RELAXED);
for (i = 0; i < num; i++)
- buf[i]->udata64 += id + 1;
+ *seq_field(buf[i]) += id + 1;
num = rte_distributor_get_pkt(db, id,
buf, buf, num);
}
<< shift;
}
/* Assign a sequence number to each packet. The sequence is shifted,
- * so that lower bits of the udate64 will hold mark from worker.
+ * so that lower bits will hold mark from worker.
*/
for (i = 0; i < buf_count; i++)
- bufs[i]->udata64 = i << seq_shift;
+ *seq_field(bufs[i]) = i << seq_shift;
count = 0;
for (i = 0; i < buf_count/burst; i++) {
/* Sort returned packets by sent order (sequence numbers). */
for (i = 0; i < buf_count; i++) {
- seq = returns[i]->udata64 >> seq_shift;
- id = returns[i]->udata64 - (seq << seq_shift);
+ seq = *seq_field(returns[i]) >> seq_shift;
+ id = *seq_field(returns[i]) - (seq << seq_shift);
sorted[seq] = id;
}
static struct rte_mempool *p;
int i;
+ static const struct rte_mbuf_dynfield seq_dynfield_desc = {
+ .name = "test_distributor_dynfield_seq",
+ .size = sizeof(seq_dynfield_t),
+ .align = __alignof__(seq_dynfield_t),
+ };
+ seq_dynfield_offset =
+ rte_mbuf_dynfield_register(&seq_dynfield_desc);
+ if (seq_dynfield_offset < 0) {
+ printf("Error registering mbuf field\n");
+ return TEST_FAILED;
+ }
+
if (rte_lcore_count() < 2) {
printf("Not enough cores for distributor_autotest, expecting at least 2\n");
return TEST_SKIPPED;
sizeof(worker_params.name));
rte_eal_mp_remote_launch(handle_work,
- &worker_params, SKIP_MASTER);
+ &worker_params, SKIP_MAIN);
if (sanity_test(&worker_params, p) < 0)
goto err;
quit_workers(&worker_params, p);
rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
- &worker_params, SKIP_MASTER);
+ &worker_params, SKIP_MAIN);
if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
goto err;
quit_workers(&worker_params, p);
if (rte_lcore_count() > 2) {
rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
&worker_params,
- SKIP_MASTER);
+ SKIP_MAIN);
if (sanity_test_with_worker_shutdown(&worker_params,
p) < 0)
goto err;
rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
&worker_params,
- SKIP_MASTER);
+ SKIP_MAIN);
if (test_flush_with_worker_shutdown(&worker_params,
p) < 0)
goto err;
quit_workers(&worker_params, p);
rte_eal_mp_remote_launch(handle_and_mark_work,
- &worker_params, SKIP_MASTER);
+ &worker_params, SKIP_MAIN);
if (sanity_mark_test(&worker_params, p) < 0)
goto err;
quit_workers(&worker_params, p);
return -1;
}
+#endif /* !RTE_EXEC_ENV_WINDOWS */
+
REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);