rte_pause();
}
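+/* Enqueue helper for the burst worker: rte_event_enqueue_burst() may accept
+ * fewer events than requested, so retry with the remaining events until the
+ * whole dequeued burst has been handed to the event device.
+ */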
+static __rte_always_inline void
+worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+ struct rte_event *ev, const uint16_t nb_rx)
+{
+ uint16_t enq;
+
+ enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+ while (enq < nb_rx) {
+ enq += rte_event_enqueue_burst(dev, port,
+ ev + enq, nb_rx - enq);
+ }
+}
+
static __rte_always_inline void
worker_tx_pkt(struct rte_mbuf *mbuf)
{
...
return 0;
}
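+/* Burst variant of the Tx worker: dequeue up to BATCH_SIZE events, transmit
+ * packets that reach the last stage with atomic scheduling (releasing their
+ * contexts), forward every other event to its next stage, then enqueue the
+ * whole burst back in a single call.
+ */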
+static int
+worker_do_tx_burst(void *arg)
+{
+ struct rte_event ev[BATCH_SIZE];
+
+ struct worker_data *data = (struct worker_data *)arg;
+ uint8_t dev = data->dev_id;
+ uint8_t port = data->port_id;
+ uint8_t lst_qid = cdata.num_stages - 1;
+ size_t fwd = 0, received = 0, tx = 0;
+
+ while (!fdata->done) {
+ uint16_t i;
+ const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
+ ev, BATCH_SIZE, 0);
+
+ if (nb_rx == 0) {
+ rte_pause();
+ continue;
+ }
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+ const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;
+
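+ /* Last pipeline stage: packets already scheduled atomically are
+ * transmitted here; other events are re-injected as ATOMIC so that
+ * each flow is only ever transmitted by one worker at a time.
+ */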
+ if (cq_id >= lst_qid) {
+ if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev[i].mbuf);
+ tx++;
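+ /* Packet already sent: turn the event into a RELEASE so the
+ * enqueue below only releases the atomic scheduling context.
+ */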
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+ continue;
+ }
+ ev[i].queue_id = (cq_id == lst_qid) ?
+ cdata.next_qid[ev[i].queue_id] :
+ ev[i].queue_id;
+
+ worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+ } else {
+ ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
+ worker_fwd_event(&ev[i], cdata.queue_type);
+ }
+ work();
+ }
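+ /* Push the whole burst back in one call: forwarded events plus the
+ * RELEASE markers for packets that were already transmitted.
+ */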
+ worker_event_enqueue_burst(dev, port, ev, nb_rx);
+
+ fwd += nb_rx;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+
+ return 0;
+}
+
static int
setup_eventdev_worker_tx(struct cons_data *cons_data,
struct worker_data *worker_data)
...
void
set_worker_tx_setup_data(struct setup_data *caps, bool burst)
{
- RTE_SET_USED(burst);
- caps->worker = worker_do_tx;
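+ /* Pick the burst Tx worker when the caller indicates burst support,
+ * otherwise keep the single-event worker.
+ */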
+ if (burst)
+ caps->worker = worker_do_tx_burst;
+ else
+ caps->worker = worker_do_tx;
memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);