return evt_nr_active_lcores(opt->wlcores) + 1 /* producer */;
}
+static inline __attribute__((always_inline)) void
+order_process_stage_1(struct test_order *const t,
+ struct rte_event *const ev, const uint32_t nb_flows,
+ uint32_t *const expected_flow_seq,
+ rte_atomic64_t *const outstand_pkts)
+{
+ const uint32_t flow = (uintptr_t)ev->mbuf % nb_flows;
+ /* compare the seqn against expected value */
+ if (ev->mbuf->seqn != expected_flow_seq[flow]) {
+ evt_err("flow=%x seqn mismatch got=%x expected=%x",
+ flow, ev->mbuf->seqn, expected_flow_seq[flow]);
+ t->err = true;
+ rte_smp_wmb();
+ }
+ /*
+ * Events from an atomic flow of an event queue can be scheduled only to
+ * a single port at a time. The port is guaranteed to have exclusive
+ * (atomic) access for given atomic flow.So we don't need to update
+ * expected_flow_seq in critical section.
+ */
+ expected_flow_seq[flow]++;
+ rte_pktmbuf_free(ev->mbuf);
+ rte_atomic64_sub(outstand_pkts, 1);
+}
+
+static inline __attribute__((always_inline)) void
+order_process_stage_invalid(struct test_order *const t,
+ struct rte_event *const ev)
+{
+ evt_err("invalid queue %d", ev->queue_id);
+ t->err = true;
+ rte_smp_wmb();
+}
+
+#define ORDER_WORKER_INIT\
+ struct worker_data *w = arg;\
+ struct test_order *t = w->t;\
+ struct evt_options *opt = t->opt;\
+ const uint8_t dev_id = w->dev_id;\
+ const uint8_t port = w->port_id;\
+ const uint32_t nb_flows = t->nb_flows;\
+ uint32_t *expected_flow_seq = t->expected_flow_seq;\
+ rte_atomic64_t *outstand_pkts = &t->outstand_pkts;\
+ if (opt->verbose_level > 1)\
+ printf("%s(): lcore %d dev_id %d port=%d\n",\
+ __func__, rte_lcore_id(), dev_id, port)
+
int order_test_result(struct evt_test *test, struct evt_options *opt);
int order_opt_check(struct evt_options *opt);
int order_test_setup(struct evt_test *test, struct evt_options *opt);
/* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */
+static inline __attribute__((always_inline)) void
+order_queue_process_stage_0(struct rte_event *const ev)
+{
+ ev->queue_id = 1; /* q1 atomic queue */
+ ev->op = RTE_EVENT_OP_FORWARD;
+ ev->sched_type = RTE_SCHED_TYPE_ATOMIC;
+ ev->event_type = RTE_EVENT_TYPE_CPU;
+}
+
+static int
+order_queue_worker(void *arg)
+{
+ ORDER_WORKER_INIT;
+ struct rte_event ev;
+
+ while (t->err == false) {
+ uint16_t event = rte_event_dequeue_burst(dev_id, port,
+ &ev, 1, 0);
+ if (!event) {
+ if (rte_atomic64_read(outstand_pkts) <= 0)
+ break;
+ rte_pause();
+ continue;
+ }
+
+ if (ev.queue_id == 0) { /* from ordered queue */
+ order_queue_process_stage_0(&ev);
+ while (rte_event_enqueue_burst(dev_id, port, &ev, 1)
+ != 1)
+ rte_pause();
+ } else if (ev.queue_id == 1) { /* from atomic queue */
+ order_process_stage_1(t, &ev, nb_flows,
+ expected_flow_seq, outstand_pkts);
+ } else {
+ order_process_stage_invalid(t, &ev);
+ }
+ }
+ return 0;
+}
+
+static int
+order_queue_worker_burst(void *arg)
+{
+ ORDER_WORKER_INIT;
+ struct rte_event ev[BURST_SIZE];
+ uint16_t i;
+
+ while (t->err == false) {
+ uint16_t const nb_rx = rte_event_dequeue_burst(dev_id, port, ev,
+ BURST_SIZE, 0);
+
+ if (nb_rx == 0) {
+ if (rte_atomic64_read(outstand_pkts) <= 0)
+ break;
+ rte_pause();
+ continue;
+ }
+
+ for (i = 0; i < nb_rx; i++) {
+ if (ev[i].queue_id == 0) { /* from ordered queue */
+ order_queue_process_stage_0(&ev[i]);
+ } else if (ev[i].queue_id == 1) {/* from atomic queue */
+ order_process_stage_1(t, &ev[i], nb_flows,
+ expected_flow_seq, outstand_pkts);
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+ } else {
+ order_process_stage_invalid(t, &ev[i]);
+ }
+ }
+
+ uint16_t enq;
+
+ enq = rte_event_enqueue_burst(dev_id, port, ev, nb_rx);
+ while (enq < nb_rx) {
+ enq += rte_event_enqueue_burst(dev_id, port,
+ ev + enq, nb_rx - enq);
+ }
+ }
+ return 0;
+}
+
+static int
+worker_wrapper(void *arg)
+{
+ struct worker_data *w = arg;
+ const bool burst = evt_has_burst_mode(w->dev_id);
+
+ if (burst)
+ return order_queue_worker_burst(arg);
+ else
+ return order_queue_worker(arg);
+}
+
+static int
+order_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
+{
+ return order_launch_lcores(test, opt, worker_wrapper);
+}
+
#define NB_QUEUES 2
static int
order_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
.test_setup = order_test_setup,
.mempool_setup = order_mempool_setup,
.eventdev_setup = order_queue_eventdev_setup,
+ .launch_lcores = order_queue_launch_lcores,
.eventdev_destroy = order_eventdev_destroy,
.mempool_destroy = order_mempool_destroy,
.test_result = order_test_result,