event/sw: change worker rings to standard event rings
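
The diff below converts the sw PMD's private qe_ring worker rings to the public rte_event_ring API. As background, here is a minimal sketch of that API in isolation (create, space check, burst enqueue/dequeue), assuming EAL has already been initialized by the caller; the ring name, size and flags are illustrative and not taken from the driver.

/* Minimal sketch of the rte_event_ring burst API this patch adopts; ring
 * name, size and flags are illustrative examples, not the driver's values.
 */
#include <rte_common.h>
#include <rte_lcore.h>
#include <rte_event_ring.h>

static int
example_event_ring_roundtrip(void)
{
	struct rte_event tx[32] = {0}, rx[32];
	uint16_t free_space, available, nb_rx;

	struct rte_event_ring *r = rte_event_ring_create("example_ring", 512,
			rte_socket_id(),
			RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ);
	if (r == NULL)
		return -1;

	/* producer: check for room first, as the scheduler does with
	 * rte_event_ring_free_count() before selecting a port
	 */
	if (rte_event_ring_free_count(r) >= RTE_DIM(tx))
		rte_event_ring_enqueue_burst(r, tx, RTE_DIM(tx), &free_space);

	/* consumer: pull a burst, mirroring sw_refill_pp_buf() below */
	nb_rx = rte_event_ring_dequeue_burst(r, rx, RTE_DIM(rx), &available);

	rte_event_ring_free(r);
	return nb_rx;
}
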
diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
index c0fe6a3..8a2c9d4 100644
--- a/drivers/event/sw/sw_evdev_scheduler.c
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -32,9 +32,9 @@
 
 #include <rte_ring.h>
 #include <rte_hash_crc.h>
+#include <rte_event_ring.h>
 #include "sw_evdev.h"
 #include "iq_ring.h"
-#include "event_ring.h"
 
 #define SW_IQS_MASK (SW_IQS_MAX-1)
 
@@ -51,6 +51,8 @@
 
 #define MAX_PER_IQ_DEQUEUE 48
 #define FLOWID_MASK (SW_QID_NUM_FIDS-1)
+/* use cheap bit mixing, we only need to lose a few bits */
+#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
 
 static inline uint32_t
 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
@@ -72,9 +74,7 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
        iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
        for (i = 0; i < count; i++) {
                const struct rte_event *qe = &qes[i];
-               /* use cheap bit mixing, we only need to lose a few bits */
-               uint32_t flow_id32 = (qes[i].flow_id) ^ (qes[i].flow_id >> 10);
-               const uint16_t flow_id = FLOWID_MASK & flow_id32;
+               const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
                struct sw_fid_t *fid = &qid->fids[flow_id];
                int cq = fid->cq;
 
@@ -119,11 +119,12 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 
                p->stats.tx_pkts++;
                qid->stats.tx_pkts++;
+               qid->to_port[cq]++;
 
                /* if we just filled in the last slot, flush the buffer */
                if (sw->cq_ring_space[cq] == 0) {
-                       struct qe_ring *worker = p->cq_worker_ring;
-                       qe_ring_enqueue_burst(worker, p->cq_buf,
+                       struct rte_event_ring *worker = p->cq_worker_ring;
+                       rte_event_ring_enqueue_burst(worker, p->cq_buf,
                                        p->cq_buf_count,
                                        &sw->cq_ring_space[cq]);
                        p->cq_buf_count = 0;
@@ -170,7 +171,8 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                        cq = qid->cq_map[cq_idx];
                        if (++cq_idx == qid->cq_num_mapped_cqs)
                                cq_idx = 0;
-               } while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
+               } while (rte_event_ring_free_count(
+                               sw->ports[cq].cq_worker_ring) == 0 ||
                                sw->ports[cq].inflights == SW_PORT_HIST_LIST);
 
                struct sw_port *p = &sw->ports[cq];
@@ -183,8 +185,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                qid->stats.tx_pkts++;
 
                const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
-
-               p->hist_list[head].fid = qe->flow_id;
+               p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
                p->hist_list[head].qid = qid_id;
 
                if (keep_order)
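
The SW_HASH_FLOWID() macro introduced above factors the existing cheap bit-mixing out of sw_schedule_atomic_to_cq() and applies it to the flow id recorded in the port history list as well. A standalone sketch of what the mixing does follows; SW_QID_NUM_FIDS below is a placeholder (the real constant lives in sw_evdev.h) and must be a power of two for the mask to work.

/* Standalone illustration of the flow-id mixing used above. */
#include <stdint.h>
#include <stdio.h>

#define SW_QID_NUM_FIDS 4096            /* placeholder, power of two */
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)

int main(void)
{
	/* two 20-bit flow ids that differ only above bit 11 land in
	 * different FID slots instead of colliding on the low bits
	 */
	uint32_t a = 0x00123;
	uint32_t b = 0x81123;

	printf("fid(a)=%u fid(b)=%u\n",
			(unsigned)SW_HASH_FLOWID(a),
			(unsigned)SW_HASH_FLOWID(b));
	return 0;
}
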
@@ -363,20 +364,20 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
        return pkts_iter;
 }
 
-static inline void __attribute__((always_inline))
+static __rte_always_inline void
 sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
 {
        RTE_SET_USED(sw);
-       struct qe_ring *worker = port->rx_worker_ring;
+       struct rte_event_ring *worker = port->rx_worker_ring;
        port->pp_buf_start = 0;
-       port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
-                       RTE_DIM(port->pp_buf));
+       port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
+                       RTE_DIM(port->pp_buf), NULL);
 }
 
-static inline uint32_t __attribute__((always_inline))
+static __rte_always_inline uint32_t
 __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 {
-       static const struct reorder_buffer_entry dummy_rob;
+       static struct reorder_buffer_entry dummy_rob;
        uint32_t pkts_iter = 0;
        struct sw_port *port = &sw->ports[port_id];
 
@@ -449,6 +450,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
                                struct reorder_buffer_entry *rob_entry =
                                                hist_entry->rob_entry;
 
+                               hist_entry->rob_entry = NULL;
                                /* Although fragmentation not currently
                                 * supported by eventdev API, we support it
                                 * here. Open: How do we alert the user that
@@ -585,8 +587,8 @@ sw_event_schedule(struct rte_eventdev *dev)
         * worker cores: aka, do the ring transfers batched.
         */
        for (i = 0; i < sw->port_count; i++) {
-               struct qe_ring *worker = sw->ports[i].cq_worker_ring;
-               qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
+               struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
+               rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
                                sw->ports[i].cq_buf_count,
                                &sw->cq_ring_space[i]);
                sw->ports[i].cq_buf_count = 0;
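
For reference, the batched-transfer pattern seen in the flush-on-full hunk earlier and in the end-of-schedule loop above, reduced to a standalone sketch: events are staged in a small per-port buffer and pushed to the rte_event_ring in one burst, with the enqueue call's free_space out-parameter refreshing the cached space counter. Structure names and sizes are illustrative, not the driver's.

/* Sketch of the "stage, then flush in one burst" pattern used for the
 * per-port cq_buf above. Names and sizes are illustrative only.
 */
#include <rte_common.h>
#include <rte_event_ring.h>

struct example_port {
	struct rte_event_ring *worker_ring;
	struct rte_event buf[32];	/* staging buffer, like cq_buf */
	uint16_t buf_count;
	uint16_t ring_space;		/* cached free space, like cq_ring_space[] */
};

static inline void
example_flush(struct example_port *p)
{
	/* one burst call per flush; the out-parameter refreshes the cached
	 * free-space count so later checks avoid touching the ring again
	 */
	rte_event_ring_enqueue_burst(p->worker_ring, p->buf, p->buf_count,
			&p->ring_space);
	p->buf_count = 0;
}

static inline void
example_stage(struct example_port *p, const struct rte_event *ev)
{
	/* caller must ensure ring_space > 0 before staging, just as the
	 * scheduler only picks ports whose worker ring has free space
	 */
	p->buf[p->buf_count++] = *ev;
	p->ring_space--;

	/* flush when the cached ring space runs out or the buffer is full */
	if (p->ring_space == 0 || p->buf_count == RTE_DIM(p->buf))
		example_flush(p);
}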