X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=drivers%2Fevent%2Fsw%2Fsw_evdev_scheduler.c;h=f747b3c6d4ebe5d4bd0ac33f89be44f1535050bf;hb=ca4355e4c7bd107994f35519964466146206c4fd;hp=fb5d44630beaaf8385b6e53812710d0c843c447a;hpb=733fc6ca0b48e4bdf2717a8723243f93a3864bb4;p=dpdk.git

diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
index fb5d44630b..f747b3c6d4 100644
--- a/drivers/event/sw/sw_evdev_scheduler.c
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -7,6 +7,7 @@
 #include <rte_event_ring.h>
 #include "sw_evdev.h"
 #include "iq_chunk.h"
+#include "event_ring.h"
 
 #define SW_IQS_MASK (SW_IQS_MAX-1)
 
@@ -26,6 +27,7 @@
 /* use cheap bit mixing, we only need to lose a few bits */
 #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
 
+
 static inline uint32_t
 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		uint32_t iq_num, unsigned int count)
@@ -127,7 +129,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 	if (keep_order)
 		/* only schedule as many as we have reorder buffer entries */
 		count = RTE_MIN(count,
-				rte_ring_count(qid->reorder_buffer_freelist));
+				rob_ring_count(qid->reorder_buffer_freelist));
 
 	for (i = 0; i < count; i++) {
 		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
@@ -146,9 +148,9 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 				cq_idx = 0;
 			cq = qid->cq_map[cq_idx++];
 
-		} while (rte_event_ring_free_count(
-				sw->ports[cq].cq_worker_ring) == 0 ||
-				sw->ports[cq].inflights == SW_PORT_HIST_LIST);
+		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
+				rte_event_ring_free_count(
+					sw->ports[cq].cq_worker_ring) == 0);
 
 		struct sw_port *p = &sw->ports[cq];
 		if (sw->cq_ring_space[cq] == 0 ||
@@ -164,7 +166,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		p->hist_list[head].qid = qid_id;
 
 		if (keep_order)
-			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
+			rob_ring_dequeue(qid->reorder_buffer_freelist,
 					(void *)&p->hist_list[head].rob_entry);
 
 		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
@@ -229,7 +231,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
 		uint32_t pkts_done = 0;
 		uint32_t count = iq_count(&qid->iq[iq_num]);
 
-		if (count > 0) {
+		if (count >= sw->sched_min_burst) {
 			if (type == SW_SCHED_TYPE_DIRECT)
 				pkts_done += sw_schedule_dir_to_cq(sw, qid,
 						iq_num, count);
@@ -267,14 +269,17 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 
 	for (; qid_start < qid_end; qid_start++) {
 		struct sw_qid *qid = &sw->qids[qid_start];
-		int i, num_entries_in_use;
+		unsigned int i, num_entries_in_use;
 
 		if (qid->type != RTE_SCHED_TYPE_ORDERED)
 			continue;
 
-		num_entries_in_use = rte_ring_free_count(
+		num_entries_in_use = rob_ring_free_count(
 				qid->reorder_buffer_freelist);
 
+		if (num_entries_in_use < sw->sched_min_burst)
+			num_entries_in_use = 0;
+
 		for (i = 0; i < num_entries_in_use; i++) {
 			struct reorder_buffer_entry *entry;
 			int j;
@@ -320,7 +325,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 			if (!entry->ready) {
 				entry->fragment_index = 0;
 
-				rte_ring_sp_enqueue(
+				rob_ring_enqueue(
 						qid->reorder_buffer_freelist,
 						entry);
 
@@ -339,7 +344,7 @@ sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
 	struct rte_event_ring *worker = port->rx_worker_ring;
 	port->pp_buf_start = 0;
 	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
-			RTE_DIM(port->pp_buf), NULL);
+			sw->sched_deq_burst_size, NULL);
 }
 
 static __rte_always_inline uint32_t
@@ -350,7 +355,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 	struct sw_port *port = &sw->ports[port_id];
 
 	/* If shadow ring has 0 pkts, pull from worker ring */
-	if (port->pp_buf_count == 0)
+	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
 		sw_refill_pp_buf(sw, port);
 
 	while (port->pp_buf_count) {
@@ -468,7 +473,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
 	struct sw_port *port = &sw->ports[port_id];
 
 	/* If shadow ring has 0 pkts, pull from worker ring */
-	if (port->pp_buf_count == 0)
+	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
 		sw_refill_pp_buf(sw, port);
 
 	while (port->pp_buf_count) {
@@ -520,13 +525,18 @@ sw_event_schedule(struct rte_eventdev *dev)
 	/* Pull from rx_ring for ports */
 	do {
 		in_pkts = 0;
-		for (i = 0; i < sw->port_count; i++)
+		for (i = 0; i < sw->port_count; i++) {
+			/* ack the unlinks in progress as done */
+			if (sw->ports[i].unlinks_in_progress)
+				sw->ports[i].unlinks_in_progress = 0;
+
 			if (sw->ports[i].is_directed)
 				in_pkts += sw_schedule_pull_port_dir(sw, i);
 			else if (sw->ports[i].num_ordered_qids > 0)
 				in_pkts += sw_schedule_pull_port_lb(sw, i);
 			else
 				in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
+		}
 
 		/* QID scan for re-ordered */
 		in_pkts += sw_schedule_reorder(sw, 0,
@@ -552,12 +562,39 @@
 	/* push all the internal buffered QEs in port->cq_ring to the
 	 * worker cores: aka, do the ring transfers batched.
 	 */
+	int no_enq = 1;
 	for (i = 0; i < sw->port_count; i++) {
-		struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
-		rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
-				sw->ports[i].cq_buf_count,
-				&sw->cq_ring_space[i]);
-		sw->ports[i].cq_buf_count = 0;
+		struct sw_port *port = &sw->ports[i];
+		struct rte_event_ring *worker = port->cq_worker_ring;
+
+		/* If shadow ring has 0 pkts, pull from worker ring */
+		if (sw->refill_once_per_iter && port->pp_buf_count == 0)
+			sw_refill_pp_buf(sw, port);
+
+		if (port->cq_buf_count >= sw->sched_min_burst) {
+			rte_event_ring_enqueue_burst(worker,
+					port->cq_buf,
+					port->cq_buf_count,
+					&sw->cq_ring_space[i]);
+			port->cq_buf_count = 0;
+			no_enq = 0;
+		} else {
+			sw->cq_ring_space[i] =
+					rte_event_ring_free_count(worker) -
+					port->cq_buf_count;
+		}
+	}
+
+	if (no_enq) {
+		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
+			sw->sched_min_burst = 1;
+		else
+			sw->sched_flush_count++;
+	} else {
+		if (sw->sched_flush_count)
+			sw->sched_flush_count--;
+		else
+			sw->sched_min_burst = sw->sched_min_burst_size;
 	}
 
 }
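
Note, not part of the patch: the last hunk makes the CQ flush adaptive. A port's cq_buf is only pushed to its worker ring once it holds at least sched_min_burst events, and if no port enqueues anything for more than SCHED_NO_ENQ_CYCLE_FLUSH consecutive scheduler iterations the threshold temporarily drops to 1 so buffered events still drain. The standalone C sketch below reproduces just that hysteresis; the flush-cycle value and the simplified sched_state struct are stand-ins for illustration, not the driver's sw_evdev definitions.

/*
 * Standalone sketch of the adaptive minimum-burst logic added to
 * sw_event_schedule(). The field names mirror the patch, but the
 * struct and the SCHED_NO_ENQ_CYCLE_FLUSH value are simplified
 * assumptions for illustration (the real state lives in struct
 * sw_evdev and the unlikely() hint is dropped here).
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define SCHED_NO_ENQ_CYCLE_FLUSH 256	/* assumed value */

struct sched_state {
	uint32_t sched_min_burst;	/* current enqueue threshold */
	uint32_t sched_min_burst_size;	/* configured threshold */
	uint32_t sched_flush_count;	/* consecutive idle iterations */
};

/* Apply the same update the patch performs at the end of each
 * scheduler iteration: no_enq is 1 when no port flushed its cq_buf.
 */
static void
update_min_burst(struct sched_state *s, int no_enq)
{
	if (no_enq) {
		/* Long idle streak: fall back to single-event flushing. */
		if (s->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH)
			s->sched_min_burst = 1;
		else
			s->sched_flush_count++;
	} else {
		/* Traffic again: decay the idle count first, then
		 * restore the configured burst size.
		 */
		if (s->sched_flush_count)
			s->sched_flush_count--;
		else
			s->sched_min_burst = s->sched_min_burst_size;
	}
}

int
main(void)
{
	struct sched_state s = { 32, 32, 0 };
	int i;

	/* A long idle period drops the threshold to 1... */
	for (i = 0; i < SCHED_NO_ENQ_CYCLE_FLUSH + 2; i++)
		update_min_burst(&s, 1);
	printf("idle:    min_burst=%" PRIu32 "\n", s.sched_min_burst);

	/* ...and sustained traffic raises it back to the configured 32. */
	for (i = 0; i < SCHED_NO_ENQ_CYCLE_FLUSH + 2; i++)
		update_min_burst(&s, 0);
	printf("traffic: min_burst=%" PRIu32 "\n", s.sched_min_burst);
	return 0;
}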