struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *db = wp->dist;
- unsigned int count = 0, num;
+ unsigned int num;
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
while (!quit) {
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_RELAXED);
- count += num;
num = rte_distributor_get_pkt(db, id,
buf, buf, num);
}
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
__ATOMIC_RELAXED);
- count += num;
rte_distributor_return_pkt(db, id, buf, num);
return 0;
}
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *d = wp->dist;
- unsigned int count = 0;
unsigned int i;
unsigned int num;
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
while (!quit) {
worker_stats[id].handled_packets += num;
- count += num;
for (i = 0; i < num; i++)
rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
}
worker_stats[id].handled_packets += num;
- count += num;
rte_distributor_return_pkt(d, id, buf, num);
return 0;
}
/* Re-queue the same burst under new flow ids.
 *
 * NOTE(review): fragment of a larger test function; unresolved diff
 * markers merged — the patch drops the rte_mbuf_refcnt_set() call, since
 * the refcount is already 1 for mbufs freshly taken from the pool.
 */
rte_distributor_process(d, NULL, 0);
for (j = 0; j < BURST; j++) {
	/* spread packets across flows: shifted so the low bit is free */
	bufs[j]->hash.usr = (i+j) << 1;
}
rte_distributor_process(d, bufs, BURST);
static int
handle_work_for_shutdown_test(void *arg)
{
- struct rte_mbuf *pkt = NULL;
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *d = wp->dist;
- unsigned int count = 0;
unsigned int num;
- unsigned int total = 0;
- unsigned int i;
- unsigned int returned = 0;
unsigned int zero_id = 0;
unsigned int zero_unset;
const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
* for zero_quit */
while (!quit && !(id == zero_id && zero_quit)) {
worker_stats[id].handled_packets += num;
- count += num;
- for (i = 0; i < num; i++)
- rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
if (num > 0) {
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
}
zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
-
- total += num;
}
worker_stats[id].handled_packets += num;
- count += num;
- returned = rte_distributor_return_pkt(d, id, buf, num);
if (id == zero_id) {
+ rte_distributor_return_pkt(d, id, NULL, 0);
+
/* for worker zero, allow it to restart to pick up last packet
* when all workers are shutting down.
*/
while (!quit) {
worker_stats[id].handled_packets += num;
- count += num;
- rte_pktmbuf_free(pkt);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
}
- returned = rte_distributor_return_pkt(d,
- id, buf, num);
- printf("Num returned = %d\n", returned);
}
+ rte_distributor_return_pkt(d, id, buf, num);
return 0;
}
{
struct rte_distributor *d = wp->dist;
struct rte_mbuf *bufs[BURST];
- unsigned i;
+ struct rte_mbuf *bufs2[BURST];
+ unsigned int i;
+ unsigned int failed = 0;
printf("=== Sanity test of worker shutdown ===\n");
*/
/* get more buffers to queue up, again setting them to the same flow */
- if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+ if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
return -1;
}
for (i = 0; i < BURST; i++)
- bufs[i]->hash.usr = 1;
+ bufs2[i]->hash.usr = 1;
/* get worker zero to quit */
zero_quit = 1;
- rte_distributor_process(d, bufs, BURST);
+ rte_distributor_process(d, bufs2, BURST);
/* flush the distributor */
rte_distributor_flush(d);
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST * 2, total_packet_count());
- return -1;
+ failed = 1;
}
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+ rte_mempool_put_bulk(p, (void *)bufs2, BURST);
+
+ if (failed)
+ return -1;
+
printf("Sanity test with worker shutdown passed\n\n");
return 0;
}
{
struct rte_distributor *d = wp->dist;
struct rte_mbuf *bufs[BURST];
- unsigned i;
+ unsigned int i;
+ unsigned int failed = 0;
printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
- return -1;
+ failed = 1;
}
+ rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+ if (failed)
+ return -1;
+
printf("Flush test with worker shutdown passed\n\n");
return 0;
}
const unsigned num_workers = rte_lcore_count() - 1;
unsigned i;
struct rte_mbuf *bufs[RTE_MAX_LCORE];
- rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+ if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
+ printf("line %d: Error getting mbufs from pool\n", __LINE__);
+ return;
+ }
zero_quit = 0;
quit = 1;
bufs[i]->hash.usr = i << 1;
rte_distributor_process(d, bufs, num_workers);
- rte_mempool_put_bulk(p, (void *)bufs, num_workers);
-
rte_distributor_process(d, NULL, 0);
rte_distributor_flush(d);
rte_eal_mp_wait_lcore();
+
+ rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
quit = 0;
worker_idx = 0;
zero_idx = RTE_MAX_LCORE;