1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2016-2017 Intel Corporation
6 #include <rte_hash_crc.h>
7 #include <rte_event_ring.h>
11 #define SW_IQS_MASK (SW_IQS_MAX-1)
13 /* Retrieve the highest priority IQ or -1 if no pkts available. Doing the
14 * CLZ twice is faster than caching the value due to data dependencies
16 #define PKT_MASK_TO_IQ(pkts) \
17 (__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
20 #error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
22 #define PRIO_TO_IQ(prio) (prio >> 6)
24 #define MAX_PER_IQ_DEQUEUE 48
25 #define FLOWID_MASK (SW_QID_NUM_FIDS-1)
26 /* use cheap bit mixing, we only need to lose a few bits */
27 #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
29 static inline uint32_t
30 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
31 uint32_t iq_num, unsigned int count)
33 struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
34 struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
35 uint32_t nb_blocked = 0;
38 if (count > MAX_PER_IQ_DEQUEUE)
39 count = MAX_PER_IQ_DEQUEUE;
41 /* This is the QID ID. The QID ID is static, hence it can be
42 * used to identify the stage of processing in history lists etc
44 uint32_t qid_id = qid->id;
46 iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
47 for (i = 0; i < count; i++) {
48 const struct rte_event *qe = &qes[i];
49 const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
50 struct sw_fid_t *fid = &qid->fids[flow_id];
54 uint32_t cq_idx = qid->cq_next_tx++;
55 if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
57 cq = qid->cq_map[cq_idx];
60 int cq_free_cnt = sw->cq_ring_space[cq];
61 for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
63 int test_cq = qid->cq_map[cq_idx];
64 int test_cq_free = sw->cq_ring_space[test_cq];
65 if (test_cq_free > cq_free_cnt) {
67 cq_free_cnt = test_cq_free;
71 fid->cq = cq; /* this pins early */
74 if (sw->cq_ring_space[cq] == 0 ||
75 sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
76 blocked_qes[nb_blocked++] = *qe;
80 struct sw_port *p = &sw->ports[cq];
82 /* at this point we can queue up the packet on the cq_buf */
84 p->cq_buf[p->cq_buf_count++] = *qe;
86 sw->cq_ring_space[cq]--;
88 int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
89 p->hist_list[head].fid = flow_id;
90 p->hist_list[head].qid = qid_id;
96 /* if we just filled in the last slot, flush the buffer */
97 if (sw->cq_ring_space[cq] == 0) {
98 struct rte_event_ring *worker = p->cq_worker_ring;
99 rte_event_ring_enqueue_burst(worker, p->cq_buf,
101 &sw->cq_ring_space[cq]);
105 iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);
107 return count - nb_blocked;
110 static inline uint32_t
111 sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
112 uint32_t iq_num, unsigned int count, int keep_order)
115 uint32_t cq_idx = qid->cq_next_tx;
117 /* This is the QID ID. The QID ID is static, hence it can be
118 * used to identify the stage of processing in history lists etc
120 uint32_t qid_id = qid->id;
122 if (count > MAX_PER_IQ_DEQUEUE)
123 count = MAX_PER_IQ_DEQUEUE;
126 /* only schedule as many as we have reorder buffer entries */
127 count = RTE_MIN(count,
128 rte_ring_count(qid->reorder_buffer_freelist));
130 for (i = 0; i < count; i++) {
131 const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
132 uint32_t cq_check_count = 0;
136 * for parallel, just send to next available CQ in round-robin
137 * fashion. So scan for an available CQ. If all CQs are full
138 * just return and move on to next QID
141 if (++cq_check_count > qid->cq_num_mapped_cqs)
143 cq = qid->cq_map[cq_idx];
144 if (++cq_idx == qid->cq_num_mapped_cqs)
146 } while (rte_event_ring_free_count(
147 sw->ports[cq].cq_worker_ring) == 0 ||
148 sw->ports[cq].inflights == SW_PORT_HIST_LIST);
150 struct sw_port *p = &sw->ports[cq];
151 if (sw->cq_ring_space[cq] == 0 ||
152 p->inflights == SW_PORT_HIST_LIST)
155 sw->cq_ring_space[cq]--;
157 qid->stats.tx_pkts++;
159 const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
160 p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
161 p->hist_list[head].qid = qid_id;
164 rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
165 (void *)&p->hist_list[head].rob_entry);
167 sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
168 iq_ring_pop(qid->iq[iq_num]);
170 rte_compiler_barrier();
176 qid->cq_next_tx = cq_idx;
181 sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
182 uint32_t iq_num, unsigned int count __rte_unused)
184 uint32_t cq_id = qid->cq_map[0];
185 struct sw_port *port = &sw->ports[cq_id];
187 /* get max burst enq size for cq_ring */
188 uint32_t count_free = sw->cq_ring_space[cq_id];
192 /* burst dequeue from the QID IQ ring */
193 struct iq_ring *ring = qid->iq[iq_num];
194 uint32_t ret = iq_ring_dequeue_burst(ring,
195 &port->cq_buf[port->cq_buf_count], count_free);
196 port->cq_buf_count += ret;
198 /* Update QID, Port and Total TX stats */
199 qid->stats.tx_pkts += ret;
200 port->stats.tx_pkts += ret;
202 /* Subtract credits from cached value */
203 sw->cq_ring_space[cq_id] -= ret;
209 sw_schedule_qid_to_cq(struct sw_evdev *sw)
214 sw->sched_cq_qid_called++;
216 for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
217 struct sw_qid *qid = sw->qids_prioritized[qid_idx];
219 int type = qid->type;
220 int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
222 /* zero mapped CQs indicates directed */
223 if (iq_num >= SW_IQS_MAX)
226 uint32_t pkts_done = 0;
227 uint32_t count = iq_ring_count(qid->iq[iq_num]);
230 if (type == SW_SCHED_TYPE_DIRECT)
231 pkts_done += sw_schedule_dir_to_cq(sw, qid,
233 else if (type == RTE_SCHED_TYPE_ATOMIC)
234 pkts_done += sw_schedule_atomic_to_cq(sw, qid,
237 pkts_done += sw_schedule_parallel_to_cq(sw, qid,
239 type == RTE_SCHED_TYPE_ORDERED);
242 /* Check if the IQ that was polled is now empty, and unset it
243 * in the IQ mask if its empty.
245 int all_done = (pkts_done == count);
247 qid->iq_pkt_mask &= ~(all_done << (iq_num));
254 /* This function will perform re-ordering of packets, and injecting into
255 * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
256 * contiguous in that array, this function accepts a "range" of QIDs to scan.
259 sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
261 /* Perform egress reordering */
262 struct rte_event *qe;
263 uint32_t pkts_iter = 0;
265 for (; qid_start < qid_end; qid_start++) {
266 struct sw_qid *qid = &sw->qids[qid_start];
267 int i, num_entries_in_use;
269 if (qid->type != RTE_SCHED_TYPE_ORDERED)
272 num_entries_in_use = rte_ring_free_count(
273 qid->reorder_buffer_freelist);
275 for (i = 0; i < num_entries_in_use; i++) {
276 struct reorder_buffer_entry *entry;
279 entry = &qid->reorder_buffer[qid->reorder_buffer_index];
284 for (j = 0; j < entry->num_fragments; j++) {
288 int idx = entry->fragment_index + j;
289 qe = &entry->fragments[idx];
291 dest_qid = qe->queue_id;
292 dest_iq = PRIO_TO_IQ(qe->priority);
294 if (dest_qid >= sw->qid_count) {
295 sw->stats.rx_dropped++;
299 struct sw_qid *dest_qid_ptr =
301 const struct iq_ring *dest_iq_ptr =
302 dest_qid_ptr->iq[dest_iq];
303 if (iq_ring_free_count(dest_iq_ptr) == 0)
308 struct sw_qid *q = &sw->qids[dest_qid];
309 struct iq_ring *r = q->iq[dest_iq];
311 /* we checked for space above, so enqueue must
314 iq_ring_enqueue(r, qe);
315 q->iq_pkt_mask |= (1 << (dest_iq));
316 q->iq_pkt_count[dest_iq]++;
320 entry->ready = (j != entry->num_fragments);
321 entry->num_fragments -= j;
322 entry->fragment_index += j;
325 entry->fragment_index = 0;
328 qid->reorder_buffer_freelist,
331 qid->reorder_buffer_index++;
332 qid->reorder_buffer_index %= qid->window_size;
339 static __rte_always_inline void
340 sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
343 struct rte_event_ring *worker = port->rx_worker_ring;
344 port->pp_buf_start = 0;
345 port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
346 RTE_DIM(port->pp_buf), NULL);
349 static __rte_always_inline uint32_t
350 __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
352 static struct reorder_buffer_entry dummy_rob;
353 uint32_t pkts_iter = 0;
354 struct sw_port *port = &sw->ports[port_id];
356 /* If shadow ring has 0 pkts, pull from worker ring */
357 if (port->pp_buf_count == 0)
358 sw_refill_pp_buf(sw, port);
360 while (port->pp_buf_count) {
361 const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
362 struct sw_hist_list_entry *hist_entry = NULL;
363 uint8_t flags = qe->op;
364 const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
365 int needs_reorder = 0;
366 /* if no-reordering, having PARTIAL == NEW */
367 if (!allow_reorder && !eop)
368 flags = QE_FLAG_VALID;
371 * if we don't have space for this packet in an IQ,
372 * then move on to next queue. Technically, for a
373 * packet that needs reordering, we don't need to check
374 * here, but it simplifies things not to special-case
376 uint32_t iq_num = PRIO_TO_IQ(qe->priority);
377 struct sw_qid *qid = &sw->qids[qe->queue_id];
379 if ((flags & QE_FLAG_VALID) &&
380 iq_ring_free_count(qid->iq[iq_num]) == 0)
383 /* now process based on flags. Note that for directed
384 * queues, the enqueue_flush masks off all but the
385 * valid flag. This makes FWD and PARTIAL enqueues just
386 * NEW type, and makes DROPS no-op calls.
388 if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
389 const uint32_t hist_tail = port->hist_tail &
390 (SW_PORT_HIST_LIST - 1);
392 hist_entry = &port->hist_list[hist_tail];
393 const uint32_t hist_qid = hist_entry->qid;
394 const uint32_t hist_fid = hist_entry->fid;
396 struct sw_fid_t *fid =
397 &sw->qids[hist_qid].fids[hist_fid];
399 if (fid->pcount == 0)
403 /* set reorder ready if an ordered QID */
405 (uintptr_t)hist_entry->rob_entry;
406 const uintptr_t valid = (rob_ptr != 0);
407 needs_reorder = valid;
409 ((valid - 1) & (uintptr_t)&dummy_rob);
410 struct reorder_buffer_entry *tmp_rob_ptr =
411 (struct reorder_buffer_entry *)rob_ptr;
412 tmp_rob_ptr->ready = eop * needs_reorder;
415 port->inflights -= eop;
416 port->hist_tail += eop;
418 if (flags & QE_FLAG_VALID) {
419 port->stats.rx_pkts++;
421 if (allow_reorder && needs_reorder) {
422 struct reorder_buffer_entry *rob_entry =
423 hist_entry->rob_entry;
425 hist_entry->rob_entry = NULL;
426 /* Although fragmentation not currently
427 * supported by eventdev API, we support it
428 * here. Open: How do we alert the user that
429 * they've exceeded max frags?
431 int num_frag = rob_entry->num_fragments;
432 if (num_frag == SW_FRAGMENTS_MAX)
433 sw->stats.rx_dropped++;
435 int idx = rob_entry->num_fragments++;
436 rob_entry->fragments[idx] = *qe;
441 /* Use the iq_num from above to push the QE
442 * into the qid at the right priority
445 qid->iq_pkt_mask |= (1 << (iq_num));
446 iq_ring_enqueue(qid->iq[iq_num], qe);
447 qid->iq_pkt_count[iq_num]++;
448 qid->stats.rx_pkts++;
453 port->pp_buf_start++;
454 port->pp_buf_count--;
455 } /* while (avail_qes) */
461 sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
463 return __pull_port_lb(sw, port_id, 1);
467 sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
469 return __pull_port_lb(sw, port_id, 0);
473 sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
475 uint32_t pkts_iter = 0;
476 struct sw_port *port = &sw->ports[port_id];
478 /* If shadow ring has 0 pkts, pull from worker ring */
479 if (port->pp_buf_count == 0)
480 sw_refill_pp_buf(sw, port);
482 while (port->pp_buf_count) {
483 const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
484 uint8_t flags = qe->op;
486 if ((flags & QE_FLAG_VALID) == 0)
489 uint32_t iq_num = PRIO_TO_IQ(qe->priority);
490 struct sw_qid *qid = &sw->qids[qe->queue_id];
491 struct iq_ring *iq_ring = qid->iq[iq_num];
493 if (iq_ring_free_count(iq_ring) == 0)
494 break; /* move to next port */
496 port->stats.rx_pkts++;
498 /* Use the iq_num from above to push the QE
499 * into the qid at the right priority
501 qid->iq_pkt_mask |= (1 << (iq_num));
502 iq_ring_enqueue(iq_ring, qe);
503 qid->iq_pkt_count[iq_num]++;
504 qid->stats.rx_pkts++;
508 port->pp_buf_start++;
509 port->pp_buf_count--;
510 } /* while port->pp_buf_count */
516 sw_event_schedule(struct rte_eventdev *dev)
518 struct sw_evdev *sw = sw_pmd_priv(dev);
519 uint32_t in_pkts, out_pkts;
520 uint32_t out_pkts_total = 0, in_pkts_total = 0;
521 int32_t sched_quanta = sw->sched_quanta;
529 uint32_t in_pkts_this_iteration = 0;
531 /* Pull from rx_ring for ports */
534 for (i = 0; i < sw->port_count; i++)
535 if (sw->ports[i].is_directed)
536 in_pkts += sw_schedule_pull_port_dir(sw, i);
537 else if (sw->ports[i].num_ordered_qids > 0)
538 in_pkts += sw_schedule_pull_port_lb(sw, i);
540 in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
542 /* QID scan for re-ordered */
543 in_pkts += sw_schedule_reorder(sw, 0,
545 in_pkts_this_iteration += in_pkts;
546 } while (in_pkts > 4 &&
547 (int)in_pkts_this_iteration < sched_quanta);
550 out_pkts += sw_schedule_qid_to_cq(sw);
551 out_pkts_total += out_pkts;
552 in_pkts_total += in_pkts_this_iteration;
554 if (in_pkts == 0 && out_pkts == 0)
556 } while ((int)out_pkts_total < sched_quanta);
558 /* push all the internal buffered QEs in port->cq_ring to the
559 * worker cores: aka, do the ring transfers batched.
561 for (i = 0; i < sw->port_count; i++) {
562 struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
563 rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
564 sw->ports[i].cq_buf_count,
565 &sw->cq_ring_space[i]);
566 sw->ports[i].cq_buf_count = 0;
569 sw->stats.tx_pkts += out_pkts_total;
570 sw->stats.rx_pkts += in_pkts_total;
572 sw->sched_no_iq_enqueues += (in_pkts_total == 0);
573 sw->sched_no_cq_enqueues += (out_pkts_total == 0);