eventdev: switch sequence number to dynamic mbuf field
[dpdk.git] drivers/event/sw/sw_evdev_scheduler.c
index cff747d..f747b3c 100644
--- a/drivers/event/sw/sw_evdev_scheduler.c
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -7,6 +7,7 @@
 #include <rte_event_ring.h>
 #include "sw_evdev.h"
 #include "iq_chunk.h"
+#include "event_ring.h"
 
 #define SW_IQS_MASK (SW_IQS_MAX-1)
 
@@ -26,6 +27,7 @@
 /* use cheap bit mixing, we only need to lose a few bits */
 #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
 
+
 static inline uint32_t
 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count)
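
As a side note on the context above: the cheap bit-mixing hash folds the upper flow-ID bits into the low ones before masking, so flows whose IDs differ only above the mask can still land in different atomic flow slots. A tiny standalone illustration follows; the FLOWID_MASK value is a placeholder, the real one is defined earlier in this file, outside the diff.

#include <stdint.h>
#include <stdio.h>

/* Placeholder for the driver's FLOWID_MASK (defined elsewhere in
 * sw_evdev_scheduler.c, not shown in this diff).
 */
#define FLOWID_MASK 0xfff
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)

int main(void)
{
        /* Two flow IDs that collide under a plain mask but separate once
         * the upper bits are mixed in.
         */
        uint32_t a = 0x00001;
        uint32_t b = 0x10001;

        printf("mask only: %#x vs %#x\n",
                        (unsigned int)(a & FLOWID_MASK),
                        (unsigned int)(b & FLOWID_MASK));
        printf("bit mixed: %#x vs %#x\n",
                        (unsigned int)SW_HASH_FLOWID(a),
                        (unsigned int)SW_HASH_FLOWID(b));
        return 0;
}
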
@@ -127,7 +129,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
        if (keep_order)
                /* only schedule as many as we have reorder buffer entries */
                count = RTE_MIN(count,
-                               rte_ring_count(qid->reorder_buffer_freelist));
+                               rob_ring_count(qid->reorder_buffer_freelist));
 
        for (i = 0; i < count; i++) {
                const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
@@ -146,9 +148,9 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                                cq_idx = 0;
                        cq = qid->cq_map[cq_idx++];
 
-               } while (rte_event_ring_free_count(
-                               sw->ports[cq].cq_worker_ring) == 0 ||
-                               sw->ports[cq].inflights == SW_PORT_HIST_LIST);
+               } while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
+                               rte_event_ring_free_count(
+                                       sw->ports[cq].cq_worker_ring) == 0);
 
                struct sw_port *p = &sw->ports[cq];
                if (sw->cq_ring_space[cq] == 0 ||
@@ -164,7 +166,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                p->hist_list[head].qid = qid_id;
 
                if (keep_order)
-                       rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
+                       rob_ring_dequeue(qid->reorder_buffer_freelist,
                                        (void *)&p->hist_list[head].rob_entry);
 
                sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
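
The rob_ring_count() and rob_ring_dequeue() calls above, together with rob_ring_free_count() and rob_ring_enqueue() further down, replace the generic rte_ring operations on the reorder-buffer freelist; they come from the newly included event_ring.h, which is not part of this diff. A minimal sketch of the kind of single-threaded ring such helpers suggest, with hypothetical field names and simplified semantics (the scheduler runs on one core, so no atomics or barriers are needed):

#include <stdint.h>

/* Hypothetical shape of the ROB freelist ring; the real definitions live
 * in drivers/event/sw/event_ring.h and may differ.
 */
struct rob_ring {
        uint32_t ring_size;     /* capacity, a power of two */
        uint32_t mask;          /* ring_size - 1 */
        uint32_t write_idx;     /* free-running producer index */
        uint32_t read_idx;      /* free-running consumer index */
        void *ring[];           /* stored entry pointers */
};

static inline uint32_t
rob_ring_count(const struct rob_ring *r)
{
        return r->write_idx - r->read_idx;
}

static inline uint32_t
rob_ring_free_count(const struct rob_ring *r)
{
        return r->ring_size - rob_ring_count(r);
}

static inline int
rob_ring_enqueue(struct rob_ring *r, void *entry)
{
        if (rob_ring_free_count(r) == 0)
                return 0;
        r->ring[r->write_idx & r->mask] = entry;
        r->write_idx++;
        return 1;
}

static inline int
rob_ring_dequeue(struct rob_ring *r, void **entry)
{
        if (rob_ring_count(r) == 0)
                return 0;
        *entry = r->ring[r->read_idx & r->mask];
        r->read_idx++;
        return 1;
}

The appeal over rte_ring here is presumably that counts and indices are plain loads and stores, with none of the multi-producer or memory-ordering machinery a generic ring has to carry.
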
@@ -229,7 +231,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
                uint32_t pkts_done = 0;
                uint32_t count = iq_count(&qid->iq[iq_num]);
 
-               if (count > 0) {
+               if (count >= sw->sched_min_burst) {
                        if (type == SW_SCHED_TYPE_DIRECT)
                                pkts_done += sw_schedule_dir_to_cq(sw, qid,
                                                iq_num, count);
@@ -267,14 +269,17 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 
        for (; qid_start < qid_end; qid_start++) {
                struct sw_qid *qid = &sw->qids[qid_start];
-               int i, num_entries_in_use;
+               unsigned int i, num_entries_in_use;
 
                if (qid->type != RTE_SCHED_TYPE_ORDERED)
                        continue;
 
-               num_entries_in_use = rte_ring_free_count(
+               num_entries_in_use = rob_ring_free_count(
                                        qid->reorder_buffer_freelist);
 
+               if (num_entries_in_use < sw->sched_min_burst)
+                       num_entries_in_use = 0;
+
                for (i = 0; i < num_entries_in_use; i++) {
                        struct reorder_buffer_entry *entry;
                        int j;
@@ -320,7 +325,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
                        if (!entry->ready) {
                                entry->fragment_index = 0;
 
-                               rte_ring_sp_enqueue(
+                               rob_ring_enqueue(
                                                qid->reorder_buffer_freelist,
                                                entry);
 
@@ -339,7 +344,7 @@ sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
        struct rte_event_ring *worker = port->rx_worker_ring;
        port->pp_buf_start = 0;
        port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
-                       RTE_DIM(port->pp_buf), NULL);
+                       sw->sched_deq_burst_size, NULL);
 }
 
 static __rte_always_inline uint32_t
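
sw_refill_pp_buf() now pulls at most sw->sched_deq_burst_size events per refill instead of always trying to fill the whole pp_buf. Whatever value gets configured has to stay within RTE_DIM(port->pp_buf), or the dequeue burst above would overrun the shadow buffer. A hedged illustration of that clamp follows; the capacity constant and helper name are made up for the example, and the real validation presumably happens where the device arguments are parsed, outside this diff.

#include <stdint.h>

#include <rte_common.h>         /* RTE_MIN */

/* Stand-in for RTE_DIM(port->pp_buf); the real capacity comes from the
 * port definition in sw_evdev.h.
 */
#define DEMO_PP_BUF_CAPACITY 32

/* Illustration only: clamp a requested dequeue burst size to the shadow
 * buffer capacity and avoid a zero burst, which would stall the refill.
 */
static inline uint32_t
demo_clamp_deq_burst(uint32_t requested)
{
        if (requested == 0)
                requested = 1;
        return RTE_MIN(requested, (uint32_t)DEMO_PP_BUF_CAPACITY);
}
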
@@ -350,7 +355,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
        struct sw_port *port = &sw->ports[port_id];
 
        /* If shadow ring has 0 pkts, pull from worker ring */
-       if (port->pp_buf_count == 0)
+       if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);
 
        while (port->pp_buf_count) {
@@ -468,7 +473,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
        struct sw_port *port = &sw->ports[port_id];
 
        /* If shadow ring has 0 pkts, pull from worker ring */
-       if (port->pp_buf_count == 0)
+       if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);
 
        while (port->pp_buf_count) {
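
Both the load-balanced and the directed pull paths now skip the per-call refill when sw->refill_once_per_iter is set; in that mode each port is refilled at most once per sw_event_schedule() iteration, in the enqueue loop at the end of this diff. A minimal sketch of the two policies, using hypothetical demo_* names rather than the driver's structures:

#include <stdbool.h>
#include <stdint.h>

/* Hypothetical port state, for illustration only. */
struct demo_port {
        uint32_t pp_buf_count;  /* events left in the shadow buffer */
};

/* Stand-in for sw_refill_pp_buf(): pretend a burst was dequeued. */
static inline void
demo_refill(struct demo_port *p)
{
        p->pp_buf_count = 8;
}

/* Mirrors the pull paths above: with refill_once_per_iter set, an empty
 * shadow buffer is left alone here...
 */
static inline void
demo_pull_port(struct demo_port *p, bool refill_once_per_iter)
{
        if (!refill_once_per_iter && p->pp_buf_count == 0)
                demo_refill(p);
        /* ... drain p->pp_buf_count as the scheduler does ... */
}

/* ... because the end of the schedule iteration refills every empty port
 * in one pass instead, keeping the worker-ring accesses batched.
 */
static inline void
demo_refill_all_once(struct demo_port *ports, uint32_t n,
                bool refill_once_per_iter)
{
        uint32_t i;

        for (i = 0; i < n; i++)
                if (refill_once_per_iter && ports[i].pp_buf_count == 0)
                        demo_refill(&ports[i]);
}
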
@@ -557,12 +562,39 @@ sw_event_schedule(struct rte_eventdev *dev)
        /* push all the internal buffered QEs in port->cq_ring to the
         * worker cores: aka, do the ring transfers batched.
         */
+       int no_enq = 1;
        for (i = 0; i < sw->port_count; i++) {
-               struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
-               rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
-                               sw->ports[i].cq_buf_count,
-                               &sw->cq_ring_space[i]);
-               sw->ports[i].cq_buf_count = 0;
+               struct sw_port *port = &sw->ports[i];
+               struct rte_event_ring *worker = port->cq_worker_ring;
+
+               /* If shadow ring has 0 pkts, pull from worker ring */
+               if (sw->refill_once_per_iter && port->pp_buf_count == 0)
+                       sw_refill_pp_buf(sw, port);
+
+               if (port->cq_buf_count >= sw->sched_min_burst) {
+                       rte_event_ring_enqueue_burst(worker,
+                                       port->cq_buf,
+                                       port->cq_buf_count,
+                                       &sw->cq_ring_space[i]);
+                       port->cq_buf_count = 0;
+                       no_enq = 0;
+               } else {
+                       sw->cq_ring_space[i] =
+                                       rte_event_ring_free_count(worker) -
+                                       port->cq_buf_count;
+               }
+       }
+
+       if (no_enq) {
+               if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
+                       sw->sched_min_burst = 1;
+               else
+                       sw->sched_flush_count++;
+       } else {
+               if (sw->sched_flush_count)
+                       sw->sched_flush_count--;
+               else
+                       sw->sched_min_burst = sw->sched_min_burst_size;
        }
 
 }
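
The no_enq bookkeeping above gives the minimum-burst threshold a simple hysteresis: iterations that enqueue nothing to any worker ring tick sched_flush_count up, and once it passes SCHED_NO_ENQ_CYCLE_FLUSH the threshold drops to 1 so the held-back events drain; productive iterations first wind the counter back down and only then restore the configured minimum. A standalone sketch of that state machine, with placeholder constants rather than the driver's values:

#include <stdbool.h>
#include <stdint.h>

/* Placeholder values; SCHED_NO_ENQ_CYCLE_FLUSH and the configured minimum
 * burst live in the driver's headers, outside this diff.
 */
#define DEMO_NO_ENQ_CYCLE_FLUSH 256
#define DEMO_MIN_BURST_SIZE 32

struct demo_sched {
        uint32_t sched_min_burst;       /* threshold currently in force */
        uint32_t sched_flush_count;     /* idle-iteration counter, decays
                                         * on productive iterations
                                         */
};

/* Call once per schedule iteration with "did any port enqueue?". */
static inline void
demo_update_min_burst(struct demo_sched *s, bool enqueued_something)
{
        if (!enqueued_something) {
                if (s->sched_flush_count > DEMO_NO_ENQ_CYCLE_FLUSH)
                        s->sched_min_burst = 1;         /* flush mode */
                else
                        s->sched_flush_count++;
        } else {
                if (s->sched_flush_count)
                        s->sched_flush_count--;         /* decay first */
                else
                        s->sched_min_burst = DEMO_MIN_BURST_SIZE;
        }
}

Note the asymmetry: the counter has to decay all the way back to zero before the full minimum burst is reinstated, which keeps the scheduler from flapping between the two thresholds under bursty traffic.
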