4 * Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in
14 * the documentation and/or other materials provided with the
16 * * Neither the name of Intel Corporation nor the names of its
17 * contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 #include <rte_hash_crc.h>
37 #include "event_ring.h"
39 #define SW_IQS_MASK (SW_IQS_MAX-1)
41 /* Retrieve the highest priority IQ or -1 if no pkts available. Doing the
42 * CLZ twice is faster than caching the value due to data dependencies
44 #define PKT_MASK_TO_IQ(pkts) \
45 (__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
48 #error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
50 #define PRIO_TO_IQ(prio) (prio >> 6)
52 #define MAX_PER_IQ_DEQUEUE 48
53 #define FLOWID_MASK (SW_QID_NUM_FIDS-1)
55 static inline uint32_t
56 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
57 uint32_t iq_num, unsigned int count)
59 struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
60 struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
61 uint32_t nb_blocked = 0;
64 if (count > MAX_PER_IQ_DEQUEUE)
65 count = MAX_PER_IQ_DEQUEUE;
67 /* This is the QID ID. The QID ID is static, hence it can be
68 * used to identify the stage of processing in history lists etc
70 uint32_t qid_id = qid->id;
72 iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
73 for (i = 0; i < count; i++) {
74 const struct rte_event *qe = &qes[i];
75 /* use cheap bit mixing, we only need to lose a few bits */
76 uint32_t flow_id32 = (qes[i].flow_id) ^ (qes[i].flow_id >> 10);
77 const uint16_t flow_id = FLOWID_MASK & flow_id32;
78 struct sw_fid_t *fid = &qid->fids[flow_id];
82 uint32_t cq_idx = qid->cq_next_tx++;
83 if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
85 cq = qid->cq_map[cq_idx];
88 int cq_free_cnt = sw->cq_ring_space[cq];
89 for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
91 int test_cq = qid->cq_map[cq_idx];
92 int test_cq_free = sw->cq_ring_space[test_cq];
93 if (test_cq_free > cq_free_cnt) {
95 cq_free_cnt = test_cq_free;
99 fid->cq = cq; /* this pins early */
102 if (sw->cq_ring_space[cq] == 0 ||
103 sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
104 blocked_qes[nb_blocked++] = *qe;
108 struct sw_port *p = &sw->ports[cq];
110 /* at this point we can queue up the packet on the cq_buf */
112 p->cq_buf[p->cq_buf_count++] = *qe;
114 sw->cq_ring_space[cq]--;
116 int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
117 p->hist_list[head].fid = flow_id;
118 p->hist_list[head].qid = qid_id;
121 qid->stats.tx_pkts++;
123 /* if we just filled in the last slot, flush the buffer */
124 if (sw->cq_ring_space[cq] == 0) {
125 struct qe_ring *worker = p->cq_worker_ring;
126 qe_ring_enqueue_burst(worker, p->cq_buf,
128 &sw->cq_ring_space[cq]);
132 iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);
134 return count - nb_blocked;
137 static inline uint32_t
138 sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
139 uint32_t iq_num, unsigned int count, int keep_order)
142 uint32_t cq_idx = qid->cq_next_tx;
144 /* This is the QID ID. The QID ID is static, hence it can be
145 * used to identify the stage of processing in history lists etc
147 uint32_t qid_id = qid->id;
149 if (count > MAX_PER_IQ_DEQUEUE)
150 count = MAX_PER_IQ_DEQUEUE;
153 /* only schedule as many as we have reorder buffer entries */
154 count = RTE_MIN(count,
155 rte_ring_count(qid->reorder_buffer_freelist));
157 for (i = 0; i < count; i++) {
158 const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
159 uint32_t cq_check_count = 0;
163 * for parallel, just send to next available CQ in round-robin
164 * fashion. So scan for an available CQ. If all CQs are full
165 * just return and move on to next QID
168 if (++cq_check_count > qid->cq_num_mapped_cqs)
170 cq = qid->cq_map[cq_idx];
171 if (++cq_idx == qid->cq_num_mapped_cqs)
173 } while (qe_ring_free_count(sw->ports[cq].cq_worker_ring) == 0 ||
174 sw->ports[cq].inflights == SW_PORT_HIST_LIST);
176 struct sw_port *p = &sw->ports[cq];
177 if (sw->cq_ring_space[cq] == 0 ||
178 p->inflights == SW_PORT_HIST_LIST)
181 sw->cq_ring_space[cq]--;
183 qid->stats.tx_pkts++;
185 const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
187 p->hist_list[head].fid = qe->flow_id;
188 p->hist_list[head].qid = qid_id;
191 rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
192 (void *)&p->hist_list[head].rob_entry);
194 sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
195 iq_ring_pop(qid->iq[iq_num]);
197 rte_compiler_barrier();
203 qid->cq_next_tx = cq_idx;
208 sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
209 uint32_t iq_num, unsigned int count __rte_unused)
211 uint32_t cq_id = qid->cq_map[0];
212 struct sw_port *port = &sw->ports[cq_id];
214 /* get max burst enq size for cq_ring */
215 uint32_t count_free = sw->cq_ring_space[cq_id];
219 /* burst dequeue from the QID IQ ring */
220 struct iq_ring *ring = qid->iq[iq_num];
221 uint32_t ret = iq_ring_dequeue_burst(ring,
222 &port->cq_buf[port->cq_buf_count], count_free);
223 port->cq_buf_count += ret;
225 /* Update QID, Port and Total TX stats */
226 qid->stats.tx_pkts += ret;
227 port->stats.tx_pkts += ret;
229 /* Subtract credits from cached value */
230 sw->cq_ring_space[cq_id] -= ret;
236 sw_schedule_qid_to_cq(struct sw_evdev *sw)
241 sw->sched_cq_qid_called++;
243 for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
244 struct sw_qid *qid = sw->qids_prioritized[qid_idx];
246 int type = qid->type;
247 int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);
249 /* zero mapped CQs indicates directed */
250 if (iq_num >= SW_IQS_MAX)
253 uint32_t pkts_done = 0;
254 uint32_t count = iq_ring_count(qid->iq[iq_num]);
257 if (type == SW_SCHED_TYPE_DIRECT)
258 pkts_done += sw_schedule_dir_to_cq(sw, qid,
260 else if (type == RTE_SCHED_TYPE_ATOMIC)
261 pkts_done += sw_schedule_atomic_to_cq(sw, qid,
264 pkts_done += sw_schedule_parallel_to_cq(sw, qid,
266 type == RTE_SCHED_TYPE_ORDERED);
269 /* Check if the IQ that was polled is now empty, and unset it
270 * in the IQ mask if its empty.
272 int all_done = (pkts_done == count);
274 qid->iq_pkt_mask &= ~(all_done << (iq_num));
281 /* This function will perform re-ordering of packets, and injecting into
282 * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
283 * contiguous in that array, this function accepts a "range" of QIDs to scan.
286 sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
288 /* Perform egress reordering */
289 struct rte_event *qe;
290 uint32_t pkts_iter = 0;
292 for (; qid_start < qid_end; qid_start++) {
293 struct sw_qid *qid = &sw->qids[qid_start];
294 int i, num_entries_in_use;
296 if (qid->type != RTE_SCHED_TYPE_ORDERED)
299 num_entries_in_use = rte_ring_free_count(
300 qid->reorder_buffer_freelist);
302 for (i = 0; i < num_entries_in_use; i++) {
303 struct reorder_buffer_entry *entry;
306 entry = &qid->reorder_buffer[qid->reorder_buffer_index];
311 for (j = 0; j < entry->num_fragments; j++) {
315 int idx = entry->fragment_index + j;
316 qe = &entry->fragments[idx];
318 dest_qid = qe->queue_id;
319 dest_iq = PRIO_TO_IQ(qe->priority);
321 if (dest_qid >= sw->qid_count) {
322 sw->stats.rx_dropped++;
326 struct sw_qid *dest_qid_ptr =
328 const struct iq_ring *dest_iq_ptr =
329 dest_qid_ptr->iq[dest_iq];
330 if (iq_ring_free_count(dest_iq_ptr) == 0)
335 struct sw_qid *q = &sw->qids[dest_qid];
336 struct iq_ring *r = q->iq[dest_iq];
338 /* we checked for space above, so enqueue must
341 iq_ring_enqueue(r, qe);
342 q->iq_pkt_mask |= (1 << (dest_iq));
343 q->iq_pkt_count[dest_iq]++;
347 entry->ready = (j != entry->num_fragments);
348 entry->num_fragments -= j;
349 entry->fragment_index += j;
352 entry->fragment_index = 0;
355 qid->reorder_buffer_freelist,
358 qid->reorder_buffer_index++;
359 qid->reorder_buffer_index %= qid->window_size;
366 static inline void __attribute__((always_inline))
367 sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
370 struct qe_ring *worker = port->rx_worker_ring;
371 port->pp_buf_start = 0;
372 port->pp_buf_count = qe_ring_dequeue_burst(worker, port->pp_buf,
373 RTE_DIM(port->pp_buf));
376 static inline uint32_t __attribute__((always_inline))
377 __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
379 static const struct reorder_buffer_entry dummy_rob;
380 uint32_t pkts_iter = 0;
381 struct sw_port *port = &sw->ports[port_id];
383 /* If shadow ring has 0 pkts, pull from worker ring */
384 if (port->pp_buf_count == 0)
385 sw_refill_pp_buf(sw, port);
387 while (port->pp_buf_count) {
388 const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
389 struct sw_hist_list_entry *hist_entry = NULL;
390 uint8_t flags = qe->op;
391 const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
392 int needs_reorder = 0;
393 /* if no-reordering, having PARTIAL == NEW */
394 if (!allow_reorder && !eop)
395 flags = QE_FLAG_VALID;
398 * if we don't have space for this packet in an IQ,
399 * then move on to next queue. Technically, for a
400 * packet that needs reordering, we don't need to check
401 * here, but it simplifies things not to special-case
403 uint32_t iq_num = PRIO_TO_IQ(qe->priority);
404 struct sw_qid *qid = &sw->qids[qe->queue_id];
406 if ((flags & QE_FLAG_VALID) &&
407 iq_ring_free_count(qid->iq[iq_num]) == 0)
410 /* now process based on flags. Note that for directed
411 * queues, the enqueue_flush masks off all but the
412 * valid flag. This makes FWD and PARTIAL enqueues just
413 * NEW type, and makes DROPS no-op calls.
415 if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
416 const uint32_t hist_tail = port->hist_tail &
417 (SW_PORT_HIST_LIST - 1);
419 hist_entry = &port->hist_list[hist_tail];
420 const uint32_t hist_qid = hist_entry->qid;
421 const uint32_t hist_fid = hist_entry->fid;
423 struct sw_fid_t *fid =
424 &sw->qids[hist_qid].fids[hist_fid];
426 if (fid->pcount == 0)
430 /* set reorder ready if an ordered QID */
432 (uintptr_t)hist_entry->rob_entry;
433 const uintptr_t valid = (rob_ptr != 0);
434 needs_reorder = valid;
436 ((valid - 1) & (uintptr_t)&dummy_rob);
437 struct reorder_buffer_entry *tmp_rob_ptr =
438 (struct reorder_buffer_entry *)rob_ptr;
439 tmp_rob_ptr->ready = eop * needs_reorder;
442 port->inflights -= eop;
443 port->hist_tail += eop;
445 if (flags & QE_FLAG_VALID) {
446 port->stats.rx_pkts++;
448 if (allow_reorder && needs_reorder) {
449 struct reorder_buffer_entry *rob_entry =
450 hist_entry->rob_entry;
452 /* Although fragmentation not currently
453 * supported by eventdev API, we support it
454 * here. Open: How do we alert the user that
455 * they've exceeded max frags?
457 int num_frag = rob_entry->num_fragments;
458 if (num_frag == SW_FRAGMENTS_MAX)
459 sw->stats.rx_dropped++;
461 int idx = rob_entry->num_fragments++;
462 rob_entry->fragments[idx] = *qe;
467 /* Use the iq_num from above to push the QE
468 * into the qid at the right priority
471 qid->iq_pkt_mask |= (1 << (iq_num));
472 iq_ring_enqueue(qid->iq[iq_num], qe);
473 qid->iq_pkt_count[iq_num]++;
474 qid->stats.rx_pkts++;
479 port->pp_buf_start++;
480 port->pp_buf_count--;
481 } /* while (avail_qes) */
487 sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
489 return __pull_port_lb(sw, port_id, 1);
493 sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
495 return __pull_port_lb(sw, port_id, 0);
499 sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
501 uint32_t pkts_iter = 0;
502 struct sw_port *port = &sw->ports[port_id];
504 /* If shadow ring has 0 pkts, pull from worker ring */
505 if (port->pp_buf_count == 0)
506 sw_refill_pp_buf(sw, port);
508 while (port->pp_buf_count) {
509 const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
510 uint8_t flags = qe->op;
512 if ((flags & QE_FLAG_VALID) == 0)
515 uint32_t iq_num = PRIO_TO_IQ(qe->priority);
516 struct sw_qid *qid = &sw->qids[qe->queue_id];
517 struct iq_ring *iq_ring = qid->iq[iq_num];
519 if (iq_ring_free_count(iq_ring) == 0)
520 break; /* move to next port */
522 port->stats.rx_pkts++;
524 /* Use the iq_num from above to push the QE
525 * into the qid at the right priority
527 qid->iq_pkt_mask |= (1 << (iq_num));
528 iq_ring_enqueue(iq_ring, qe);
529 qid->iq_pkt_count[iq_num]++;
530 qid->stats.rx_pkts++;
534 port->pp_buf_start++;
535 port->pp_buf_count--;
536 } /* while port->pp_buf_count */
542 sw_event_schedule(struct rte_eventdev *dev)
544 struct sw_evdev *sw = sw_pmd_priv(dev);
545 uint32_t in_pkts, out_pkts;
546 uint32_t out_pkts_total = 0, in_pkts_total = 0;
547 int32_t sched_quanta = sw->sched_quanta;
555 uint32_t in_pkts_this_iteration = 0;
557 /* Pull from rx_ring for ports */
560 for (i = 0; i < sw->port_count; i++)
561 if (sw->ports[i].is_directed)
562 in_pkts += sw_schedule_pull_port_dir(sw, i);
563 else if (sw->ports[i].num_ordered_qids > 0)
564 in_pkts += sw_schedule_pull_port_lb(sw, i);
566 in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
568 /* QID scan for re-ordered */
569 in_pkts += sw_schedule_reorder(sw, 0,
571 in_pkts_this_iteration += in_pkts;
572 } while (in_pkts > 4 &&
573 (int)in_pkts_this_iteration < sched_quanta);
576 out_pkts += sw_schedule_qid_to_cq(sw);
577 out_pkts_total += out_pkts;
578 in_pkts_total += in_pkts_this_iteration;
580 if (in_pkts == 0 && out_pkts == 0)
582 } while ((int)out_pkts_total < sched_quanta);
584 /* push all the internal buffered QEs in port->cq_ring to the
585 * worker cores: aka, do the ring transfers batched.
587 for (i = 0; i < sw->port_count; i++) {
588 struct qe_ring *worker = sw->ports[i].cq_worker_ring;
589 qe_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
590 sw->ports[i].cq_buf_count,
591 &sw->cq_ring_space[i]);
592 sw->ports[i].cq_buf_count = 0;
595 sw->stats.tx_pkts += out_pkts_total;
596 sw->stats.rx_pkts += in_pkts_total;
598 sw->sched_no_iq_enqueues += (in_pkts_total == 0);
599 sw->sched_no_cq_enqueues += (out_pkts_total == 0);