X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=drivers%2Fevent%2Fsw%2Fsw_evdev_scheduler.c;h=f747b3c6d4ebe5d4bd0ac33f89be44f1535050bf;hb=ca4355e4c7bd107994f35519964466146206c4fd;hp=e008b519ef3cedbe9d104905bc75c3148b38207d;hpb=7904c82a787cef322907850325b3b2c8b9dbe1c1;p=dpdk.git

diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
index e008b519ef..f747b3c6d4 100644
--- a/drivers/event/sw/sw_evdev_scheduler.c
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -1,39 +1,12 @@
-/*-
- *   BSD LICENSE
- *
- *   Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
- *
- *   Redistribution and use in source and binary forms, with or without
- *   modification, are permitted provided that the following conditions
- *   are met:
- *
- *     * Redistributions of source code must retain the above copyright
- *       notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in
- *       the documentation and/or other materials provided with the
- *       distribution.
- *     * Neither the name of Intel Corporation nor the names of its
- *       contributors may be used to endorse or promote products derived
- *       from this software without specific prior written permission.
- *
- *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2016-2017 Intel Corporation
  */
 
 #include <rte_ring.h>
 #include <rte_hash_crc.h>
+#include <rte_event_ring.h>
 #include "sw_evdev.h"
-#include "iq_ring.h"
+#include "iq_chunk.h"
 #include "event_ring.h"
 
 #define SW_IQS_MASK (SW_IQS_MAX-1)
@@ -54,6 +27,7 @@
 /* use cheap bit mixing, we only need to lose a few bits */
 #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
 
+
 static inline uint32_t
 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		uint32_t iq_num, unsigned int count)
@@ -71,7 +45,7 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 	 */
 	uint32_t qid_id = qid->id;
 
-	iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
+	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
 	for (i = 0; i < count; i++) {
 		const struct rte_event *qe = &qes[i];
 		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
@@ -79,9 +53,11 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		int cq = fid->cq;
 
 		if (cq < 0) {
-			uint32_t cq_idx = qid->cq_next_tx++;
-			if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
+			uint32_t cq_idx;
+			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
 				qid->cq_next_tx = 0;
+			cq_idx = qid->cq_next_tx++;
+
 			cq = qid->cq_map[cq_idx];
 
 			/* find least used */
@@ -119,17 +95,18 @@ sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 
 		p->stats.tx_pkts++;
 		qid->stats.tx_pkts++;
+		qid->to_port[cq]++;
 
 		/* if we just filled in the last slot, flush the buffer */
 		if (sw->cq_ring_space[cq] == 0) {
-			struct qe_ring *worker = p->cq_worker_ring;
-			qe_ring_enqueue_burst(worker, p->cq_buf,
+			struct rte_event_ring *worker = p->cq_worker_ring;
+			rte_event_ring_enqueue_burst(worker, p->cq_buf,
 					p->cq_buf_count,
 					&sw->cq_ring_space[cq]);
 			p->cq_buf_count = 0;
 		}
 	}
-	iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);
+	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);
 
 	return count - nb_blocked;
 }
@@ -152,10 +129,10 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 	if (keep_order)
 		/* only schedule as many as we have reorder buffer entries */
 		count = RTE_MIN(count,
-				rte_ring_count(qid->reorder_buffer_freelist));
+				rob_ring_count(qid->reorder_buffer_freelist));
 
 	for (i = 0; i < count; i++) {
-		const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
+		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
 		uint32_t cq_check_count = 0;
 		uint32_t cq;
 
@@ -167,11 +144,13 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		do {
 			if (++cq_check_count > qid->cq_num_mapped_cqs)
 				goto exit;
-			cq = qid->cq_map[cq_idx];
-			if (++cq_idx == qid->cq_num_mapped_cqs)
+			if (cq_idx >= qid->cq_num_mapped_cqs)
 				cq_idx = 0;
-		} while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
-				sw->ports[cq].inflights == SW_PORT_HIST_LIST);
+			cq = qid->cq_map[cq_idx++];
+
+		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
+				rte_event_ring_free_count(
+					sw->ports[cq].cq_worker_ring) == 0);
 
 		struct sw_port *p = &sw->ports[cq];
 		if (sw->cq_ring_space[cq] == 0 ||
@@ -187,11 +166,11 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		p->hist_list[head].qid = qid_id;
 
 		if (keep_order)
-			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
+			rob_ring_dequeue(qid->reorder_buffer_freelist,
 					(void *)&p->hist_list[head].rob_entry);
 
 		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
-		iq_ring_pop(qid->iq[iq_num]);
+		iq_pop(sw, &qid->iq[iq_num]);
 
 		rte_compiler_barrier();
 		p->inflights++;
@@ -216,8 +195,8 @@ sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
 		return 0;
 
 	/* burst dequeue from the QID IQ ring */
-	struct iq_ring *ring = qid->iq[iq_num];
-	uint32_t ret = iq_ring_dequeue_burst(ring,
+	struct sw_iq *iq = &qid->iq[iq_num];
+	uint32_t ret = iq_dequeue_burst(sw, iq,
 			&port->cq_buf[port->cq_buf_count], count_free);
 
 	port->cq_buf_count += ret;
@@ -246,13 +225,13 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
 		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
 
 		/* zero mapped CQs indicates directed */
-		if (iq_num >= SW_IQS_MAX)
+		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
 			continue;
 
 		uint32_t pkts_done = 0;
-		uint32_t count = iq_ring_count(qid->iq[iq_num]);
+		uint32_t count = iq_count(&qid->iq[iq_num]);
 
-		if (count > 0) {
+		if (count >= sw->sched_min_burst) {
 			if (type == SW_SCHED_TYPE_DIRECT)
 				pkts_done += sw_schedule_dir_to_cq(sw, qid,
 						iq_num, count);
@@ -290,14 +269,17 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 
 	for (; qid_start < qid_end; qid_start++) {
 		struct sw_qid *qid = &sw->qids[qid_start];
-		int i, num_entries_in_use;
+		unsigned int i, num_entries_in_use;
 
 		if (qid->type != RTE_SCHED_TYPE_ORDERED)
 			continue;
 
-		num_entries_in_use = rte_ring_free_count(
+		num_entries_in_use = rob_ring_free_count(
 				qid->reorder_buffer_freelist);
 
+		if (num_entries_in_use < sw->sched_min_burst)
+			num_entries_in_use = 0;
+
 		for (i = 0; i < num_entries_in_use; i++) {
 			struct reorder_buffer_entry *entry;
 			int j;
@@ -322,22 +304,15 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 					continue;
 				}
 
-				struct sw_qid *dest_qid_ptr =
-					&sw->qids[dest_qid];
-				const struct iq_ring *dest_iq_ptr =
-					dest_qid_ptr->iq[dest_iq];
-				if (iq_ring_free_count(dest_iq_ptr) == 0)
-					break;
-
 				pkts_iter++;
 
 				struct sw_qid *q = &sw->qids[dest_qid];
-				struct iq_ring *r = q->iq[dest_iq];
+				struct sw_iq *iq = &q->iq[dest_iq];
 
 				/* we checked for space above, so enqueue must
 				 * succeed
 				 */
-				iq_ring_enqueue(r, qe);
+				iq_enqueue(sw, iq, qe);
 				q->iq_pkt_mask |= (1 << (dest_iq));
 				q->iq_pkt_count[dest_iq]++;
 				q->stats.rx_pkts++;
@@ -350,7 +325,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 			if (!entry->ready) {
 				entry->fragment_index = 0;
 
-				rte_ring_sp_enqueue(
+				rob_ring_enqueue(
 						qid->reorder_buffer_freelist,
 						entry);
 
@@ -362,17 +337,17 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 	return pkts_iter;
 }
 
-static inline void __attribute__((always_inline))
+static __rte_always_inline void
 sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
 {
 	RTE_SET_USED(sw);
-	struct qe_ring *worker = port->rx_worker_ring;
+	struct rte_event_ring *worker = port->rx_worker_ring;
 	port->pp_buf_start = 0;
-	port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
-			RTE_DIM(port->pp_buf));
+	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
+			sw->sched_deq_burst_size, NULL);
 }
 
-static inline uint32_t __attribute__((always_inline))
+static __rte_always_inline uint32_t
 __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 {
 	static struct reorder_buffer_entry dummy_rob;
@@ -380,7 +355,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 	struct sw_port *port = &sw->ports[port_id];
 
 	/* If shadow ring has 0 pkts, pull from worker ring */
-	if (port->pp_buf_count == 0)
+	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
 		sw_refill_pp_buf(sw, port);
 
 	while (port->pp_buf_count) {
@@ -402,10 +377,6 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
 		struct sw_qid *qid = &sw->qids[qe->queue_id];
 
-		if ((flags & QE_FLAG_VALID) &&
-				iq_ring_free_count(qid->iq[iq_num]) == 0)
-			break;
-
 		/* now process based on flags. Note that for directed
 		 * queues, the enqueue_flush masks off all but the
 		 * valid flag. This makes FWD and PARTIAL enqueues just
@@ -448,6 +419,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 				struct reorder_buffer_entry *rob_entry =
 						hist_entry->rob_entry;
 
+				hist_entry->rob_entry = NULL;
 				/* Although fragmentation not currently
 				 * supported by eventdev API, we support it
 				 * here. Open: How do we alert the user that
@@ -468,7 +440,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
 		 */
 
 		qid->iq_pkt_mask |= (1 << (iq_num));
-		iq_ring_enqueue(qid->iq[iq_num], qe);
+		iq_enqueue(sw, &qid->iq[iq_num], qe);
 		qid->iq_pkt_count[iq_num]++;
 		qid->stats.rx_pkts++;
 		pkts_iter++;
@@ -501,7 +473,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
 	struct sw_port *port = &sw->ports[port_id];
 
 	/* If shadow ring has 0 pkts, pull from worker ring */
-	if (port->pp_buf_count == 0)
+	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
 		sw_refill_pp_buf(sw, port);
 
 	while (port->pp_buf_count) {
@@ -513,10 +485,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
 
 		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
 		struct sw_qid *qid = &sw->qids[qe->queue_id];
-		struct iq_ring *iq_ring = qid->iq[iq_num];
-
-		if (iq_ring_free_count(iq_ring) == 0)
-			break; /* move to next port */
+		struct sw_iq *iq = &qid->iq[iq_num];
 
 		port->stats.rx_pkts++;
 
@@ -524,7 +493,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
 		 * into the qid at the right priority
 		 */
 		qid->iq_pkt_mask |= (1 << (iq_num));
-		iq_ring_enqueue(iq_ring, qe);
+		iq_enqueue(sw, iq, qe);
 		qid->iq_pkt_count[iq_num]++;
 		qid->stats.rx_pkts++;
 		pkts_iter++;
@@ -547,7 +516,7 @@ sw_event_schedule(struct rte_eventdev *dev)
 	uint32_t i;
 
 	sw->sched_called++;
-	if (!sw->started)
+	if (unlikely(!sw->started))
 		return;
 
 	do {
@@ -556,13 +525,18 @@ sw_event_schedule(struct rte_eventdev *dev)
 		/* Pull from rx_ring for ports */
 		do {
 			in_pkts = 0;
-			for (i = 0; i < sw->port_count; i++)
+			for (i = 0; i < sw->port_count; i++) {
+				/* ack the unlinks in progress as done */
+				if (sw->ports[i].unlinks_in_progress)
+					sw->ports[i].unlinks_in_progress = 0;
+
 				if (sw->ports[i].is_directed)
 					in_pkts += sw_schedule_pull_port_dir(sw, i);
 				else if (sw->ports[i].num_ordered_qids > 0)
 					in_pkts += sw_schedule_pull_port_lb(sw, i);
 				else
 					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
+			}
 
 			/* QID scan for re-ordered */
 			in_pkts += sw_schedule_reorder(sw, 0,
@@ -571,8 +545,7 @@ sw_event_schedule(struct rte_eventdev *dev)
 		} while (in_pkts > 4 &&
 				(int)in_pkts_this_iteration < sched_quanta);
 
-		out_pkts = 0;
-		out_pkts += sw_schedule_qid_to_cq(sw);
+		out_pkts = sw_schedule_qid_to_cq(sw);
 		out_pkts_total += out_pkts;
 		in_pkts_total += in_pkts_this_iteration;
 
@@ -580,21 +553,48 @@ sw_event_schedule(struct rte_eventdev *dev)
 			break;
 	} while ((int)out_pkts_total < sched_quanta);
 
+	sw->stats.tx_pkts += out_pkts_total;
+	sw->stats.rx_pkts += in_pkts_total;
+
+	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
+	sw->sched_no_cq_enqueues += (out_pkts_total == 0);
+
 	/* push all the internal buffered QEs in port->cq_ring to the
 	 * worker cores: aka, do the ring transfers batched.
 	 */
+	int no_enq = 1;
 	for (i = 0; i < sw->port_count; i++) {
-		struct qe_ring *worker = sw->ports[i].cq_worker_ring;
-		qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
-				sw->ports[i].cq_buf_count,
-				&sw->cq_ring_space[i]);
-		sw->ports[i].cq_buf_count = 0;
+		struct sw_port *port = &sw->ports[i];
+		struct rte_event_ring *worker = port->cq_worker_ring;
+
+		/* If shadow ring has 0 pkts, pull from worker ring */
+		if (sw->refill_once_per_iter && port->pp_buf_count == 0)
+			sw_refill_pp_buf(sw, port);
+
+		if (port->cq_buf_count >= sw->sched_min_burst) {
+			rte_event_ring_enqueue_burst(worker,
+					port->cq_buf,
+					port->cq_buf_count,
+					&sw->cq_ring_space[i]);
+			port->cq_buf_count = 0;
+			no_enq = 0;
+		} else {
+			sw->cq_ring_space[i] =
+					rte_event_ring_free_count(worker) -
+					port->cq_buf_count;
+		}
 	}
 
-	sw->stats.tx_pkts += out_pkts_total;
-	sw->stats.rx_pkts += in_pkts_total;
-
-	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
-	sw->sched_no_cq_enqueues += (out_pkts_total == 0);
+	if (no_enq) {
+		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
+			sw->sched_min_burst = 1;
+		else
+			sw->sched_flush_count++;
+	} else {
+		if (sw->sched_flush_count)
+			sw->sched_flush_count--;
+		else
+			sw->sched_min_burst = sw->sched_min_burst_size;
+	}
 
 }
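
Note on the tail of sw_event_schedule() above: the patch adds an adaptive flush heuristic. A port's buffered CQ events are pushed to its worker ring only once sched_min_burst events have accumulated, and each scheduler call that produces no CQ enqueues bumps sched_flush_count; past SCHED_NO_ENQ_CYCLE_FLUSH idle calls the threshold drops to 1 so stragglers still drain, and once traffic resumes the counter decays and the configured burst size is restored. The following standalone C sketch restates that heuristic in isolation; the struct, function name and the constant's value here are illustrative stand-ins, not the driver's definitions.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Illustrative stand-ins only; not the definitions from the sw PMD headers. */
#define SCHED_NO_ENQ_CYCLE_FLUSH 256	/* idle calls tolerated before forcing a flush */

struct sched_burst_state {
	uint32_t min_burst;      /* current flush threshold */
	uint32_t min_burst_size; /* configured threshold */
	uint32_t flush_count;    /* consecutive scheduler calls with no CQ enqueue */
};

/* Update the threshold after one scheduler call; 'enqueued' is true when at
 * least one port flushed its buffered events to a worker ring in this call.
 */
static void
update_min_burst(struct sched_burst_state *st, bool enqueued)
{
	if (!enqueued) {
		/* Nothing was pushed out: once enough idle calls pass,
		 * drop the threshold to 1 so single events still flush.
		 */
		if (st->flush_count > SCHED_NO_ENQ_CYCLE_FLUSH)
			st->min_burst = 1;
		else
			st->flush_count++;
	} else {
		/* Traffic is flowing again: decay the idle counter first,
		 * then restore the configured batching threshold.
		 */
		if (st->flush_count)
			st->flush_count--;
		else
			st->min_burst = st->min_burst_size;
	}
}

int
main(void)
{
	struct sched_burst_state st = { 32, 32, 0 };
	uint32_t i;

	/* Simulate a long idle period: the threshold eventually drops to 1. */
	for (i = 0; i < SCHED_NO_ENQ_CYCLE_FLUSH + 2; i++)
		update_min_burst(&st, false);
	printf("after idle: min_burst=%u\n", (unsigned int)st.min_burst);

	/* One busy call starts decaying the counter back toward batching. */
	update_min_burst(&st, true);
	printf("after traffic: flush_count=%u\n", (unsigned int)st.flush_count);
	return 0;
}

The trade-off mirrors the diff: batching amortises the ring-enqueue cost while the scheduler is busy, and the idle counter bounds how long a partially filled CQ buffer can sit once traffic goes quiet.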