#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"
+#include "event_ring.h"
#define SW_IQS_MASK (SW_IQS_MAX-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
+
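/* Schedule up to 'count' events from IQ 'iq_num' of the given QID onto the
 * CQs mapped to it, and return the number of events actually scheduled.
 */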
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
uint32_t iq_num, unsigned int count)
if (keep_order)
/* only schedule as many as we have reorder buffer entries */
count = RTE_MIN(count,
- rte_ring_count(qid->reorder_buffer_freelist));
+ rob_ring_count(qid->reorder_buffer_freelist));
for (i = 0; i < count; i++) {
const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
cq_idx = 0;
cq = qid->cq_map[cq_idx++];
- } while (rte_event_ring_free_count(
- sw->ports[cq].cq_worker_ring) == 0 ||
- sw->ports[cq].inflights == SW_PORT_HIST_LIST);
+ } while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
+ rte_event_ring_free_count(
+ sw->ports[cq].cq_worker_ring) == 0);
struct sw_port *p = &sw->ports[cq];
if (sw->cq_ring_space[cq] == 0 ||
p->hist_list[head].qid = qid_id;
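		/* for ordered QIDs, claim a reorder buffer entry from the
		 * freelist and attach it to this event's history slot
		 */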
if (keep_order)
- rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
+ rob_ring_dequeue(qid->reorder_buffer_freelist,
(void *)&p->hist_list[head].rob_entry);
sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
uint32_t pkts_done = 0;
uint32_t count = iq_count(&qid->iq[iq_num]);
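+		/* only dequeue from this IQ once at least sched_min_burst
+		 * events have accumulated; the threshold drops back to 1 when
+		 * the scheduler detects it is not enqueueing (see the flush
+		 * logic at the end of the schedule loop)
+		 */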
- if (count > 0) {
+ if (count >= sw->sched_min_burst) {
if (type == SW_SCHED_TYPE_DIRECT)
pkts_done += sw_schedule_dir_to_cq(sw, qid,
iq_num, count);
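	/* scan each ordered QID for reorder buffer entries whose events are
	 * ready, release them in order, and put the entries back on the
	 * freelist
	 */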
for (; qid_start < qid_end; qid_start++) {
struct sw_qid *qid = &sw->qids[qid_start];
- int i, num_entries_in_use;
+ unsigned int i, num_entries_in_use;
if (qid->type != RTE_SCHED_TYPE_ORDERED)
continue;
- num_entries_in_use = rte_ring_free_count(
+ num_entries_in_use = rob_ring_free_count(
qid->reorder_buffer_freelist);
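+		/* skip the reorder buffer scan entirely until a minimum burst
+		 * of entries is outstanding
+		 */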
+ if (num_entries_in_use < sw->sched_min_burst)
+ num_entries_in_use = 0;
+
for (i = 0; i < num_entries_in_use; i++) {
struct reorder_buffer_entry *entry;
int j;
if (!entry->ready) {
entry->fragment_index = 0;
- rte_ring_sp_enqueue(
+ rob_ring_enqueue(
qid->reorder_buffer_freelist,
entry);
struct rte_event_ring *worker = port->rx_worker_ring;
port->pp_buf_start = 0;
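	/* pull up to the configured dequeue burst size of events from the
	 * worker's rx ring into the port's shadow (pre-pull) buffer
	 */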
port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
- RTE_DIM(port->pp_buf), NULL);
+ sw->sched_deq_burst_size, NULL);
}
static __rte_always_inline uint32_t
struct sw_port *port = &sw->ports[port_id];
/* If shadow ring has 0 pkts, pull from worker ring */
- if (port->pp_buf_count == 0)
+ if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
sw_refill_pp_buf(sw, port);
while (port->pp_buf_count) {
struct sw_port *port = &sw->ports[port_id];
/* If shadow ring has 0 pkts, pull from worker ring */
- if (port->pp_buf_count == 0)
+ if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
sw_refill_pp_buf(sw, port);
while (port->pp_buf_count) {
/* Pull from rx_ring for ports */
do {
in_pkts = 0;
- for (i = 0; i < sw->port_count; i++)
+ for (i = 0; i < sw->port_count; i++) {
+ /* ack the unlinks in progress as done */
+ if (sw->ports[i].unlinks_in_progress)
+ sw->ports[i].unlinks_in_progress = 0;
+
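		/* pull new events from this port's rx ring using the variant
		 * that matches the port's configuration
		 */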
if (sw->ports[i].is_directed)
in_pkts += sw_schedule_pull_port_dir(sw, i);
else if (sw->ports[i].num_ordered_qids > 0)
in_pkts += sw_schedule_pull_port_lb(sw, i);
else
in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
+ }
/* QID scan for re-ordered */
in_pkts += sw_schedule_reorder(sw, 0,
/* push all the internal buffered QEs in port->cq_ring to the
* worker cores: aka, do the ring transfers batched.
*/
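+	/* track whether any CQ buffer was flushed during this iteration */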
+ int no_enq = 1;
for (i = 0; i < sw->port_count; i++) {
- struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
- rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
- sw->ports[i].cq_buf_count,
- &sw->cq_ring_space[i]);
- sw->ports[i].cq_buf_count = 0;
+ struct sw_port *port = &sw->ports[i];
+ struct rte_event_ring *worker = port->cq_worker_ring;
+
+		/* deferred refill: when refilling only once per schedule
+		 * iteration, pull from the worker ring here if the shadow
+		 * ring is empty
+		 */
+ if (sw->refill_once_per_iter && port->pp_buf_count == 0)
+ sw_refill_pp_buf(sw, port);
+
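+		/* only transfer to the worker ring once a minimum burst of
+		 * events has built up; otherwise just refresh the free-space
+		 * count used by the schedulers
+		 */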
+ if (port->cq_buf_count >= sw->sched_min_burst) {
+ rte_event_ring_enqueue_burst(worker,
+ port->cq_buf,
+ port->cq_buf_count,
+ &sw->cq_ring_space[i]);
+ port->cq_buf_count = 0;
+ no_enq = 0;
+ } else {
+ sw->cq_ring_space[i] =
+ rte_event_ring_free_count(worker) -
+ port->cq_buf_count;
+ }
+ }
+
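+	/* if nothing was enqueued, count the idle iterations and, after
+	 * SCHED_NO_ENQ_CYCLE_FLUSH of them, drop the minimum burst to 1 so
+	 * any buffered events get flushed; once enqueues resume, wind the
+	 * counter back down before restoring the configured minimum burst
+	 */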
+ if (no_enq) {
+ if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
+ sw->sched_min_burst = 1;
+ else
+ sw->sched_flush_count++;
+ } else {
+ if (sw->sched_flush_count)
+ sw->sched_flush_count--;
+ else
+ sw->sched_min_burst = sw->sched_min_burst_size;
}
}