drivers/event/sw/sw_evdev_scheduler.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ, or SW_IQS_MAX if no pkts are available.
 * Doing the CTZ twice is faster than caching the value due to data
 * dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
        (__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
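/* Example (SW_IQS_MAX == 4): an iq_pkt_mask of 0x6 gives
 * __builtin_ctz(0x16) == 1, so IQ 1 is the highest priority non-empty IQ;
 * an empty mask gives SW_IQS_MAX, i.e. nothing to schedule.
 */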

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)
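/* With 8-bit event priorities (0 = highest), the shift by 6 maps
 * priorities 0-63 to IQ 0, 64-127 to IQ 1, 128-191 to IQ 2 and
 * 192-255 to IQ 3.
 */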

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)

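/* Schedule up to count events from the given IQ of an atomic QID to CQs.
 * Flows without a pinned CQ are assigned the mapped CQ with the most free
 * ring space; events whose CQ is full, or whose destination port history
 * list is full, are put back on the IQ. Returns the number of events moved
 * to CQ buffers.
 */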
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count)
{
        struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
        struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
        uint32_t nb_blocked = 0;
        uint32_t i;

        if (count > MAX_PER_IQ_DEQUEUE)
                count = MAX_PER_IQ_DEQUEUE;

        /* The QID ID is static, hence it can be used to identify the
         * stage of processing in history lists etc.
         */
        uint32_t qid_id = qid->id;

        iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
        for (i = 0; i < count; i++) {
                const struct rte_event *qe = &qes[i];
                const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
                struct sw_fid_t *fid = &qid->fids[flow_id];
                int cq = fid->cq;

                if (cq < 0) {
                        uint32_t cq_idx = qid->cq_next_tx++;
                        if (qid->cq_next_tx == qid->cq_num_mapped_cqs)
                                qid->cq_next_tx = 0;
                        cq = qid->cq_map[cq_idx];

                        /* find least used */
                        int cq_free_cnt = sw->cq_ring_space[cq];
                        for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
                                        cq_idx++) {
                                int test_cq = qid->cq_map[cq_idx];
                                int test_cq_free = sw->cq_ring_space[test_cq];
                                if (test_cq_free > cq_free_cnt) {
                                        cq = test_cq;
                                        cq_free_cnt = test_cq_free;
                                }
                        }

                        fid->cq = cq; /* this pins early */
                }

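                /* If the chosen CQ has no ring space, or the destination
                 * port's history list is full, this event cannot be
                 * scheduled now; save it and return it to the IQ below.
                 */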
                if (sw->cq_ring_space[cq] == 0 ||
                                sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
                        blocked_qes[nb_blocked++] = *qe;
                        continue;
                }

                struct sw_port *p = &sw->ports[cq];

                /* at this point we can queue up the packet on the cq_buf */
                fid->pcount++;
                p->cq_buf[p->cq_buf_count++] = *qe;
                p->inflights++;
                sw->cq_ring_space[cq]--;

                int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
                p->hist_list[head].fid = flow_id;
                p->hist_list[head].qid = qid_id;

                p->stats.tx_pkts++;
                qid->stats.tx_pkts++;
                qid->to_port[cq]++;

                /* if we just filled in the last slot, flush the buffer */
                if (sw->cq_ring_space[cq] == 0) {
                        struct rte_event_ring *worker = p->cq_worker_ring;
                        rte_event_ring_enqueue_burst(worker, p->cq_buf,
                                        p->cq_buf_count,
                                        &sw->cq_ring_space[cq]);
                        p->cq_buf_count = 0;
                }
        }
        iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);

        return count - nb_blocked;
}

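/* Schedule up to count events from the given IQ of a parallel (or ordered)
 * QID, distributing them round-robin across the mapped CQs. With keep_order
 * set (ordered QIDs), scheduling is limited to the number of free reorder
 * buffer entries and each event is tied to a reorder buffer entry through
 * the port history list. Returns the number of events scheduled.
 */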
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count, int keep_order)
{
        uint32_t i;
        uint32_t cq_idx = qid->cq_next_tx;

        /* The QID ID is static, hence it can be used to identify the
         * stage of processing in history lists etc.
         */
        uint32_t qid_id = qid->id;

        if (count > MAX_PER_IQ_DEQUEUE)
                count = MAX_PER_IQ_DEQUEUE;

        if (keep_order)
                /* only schedule as many as we have reorder buffer entries */
                count = RTE_MIN(count,
                                rte_ring_count(qid->reorder_buffer_freelist));

        for (i = 0; i < count; i++) {
                const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
                uint32_t cq_check_count = 0;
                uint32_t cq;

                /*
                 * For parallel QIDs, just send to the next available CQ in
                 * round-robin fashion. Scan for an available CQ; if all CQs
                 * are full, return and move on to the next QID.
                 */
                do {
                        if (++cq_check_count > qid->cq_num_mapped_cqs)
                                goto exit;
                        cq = qid->cq_map[cq_idx];
                        if (++cq_idx == qid->cq_num_mapped_cqs)
                                cq_idx = 0;
                } while (rte_event_ring_free_count(
                                sw->ports[cq].cq_worker_ring) == 0 ||
                                sw->ports[cq].inflights == SW_PORT_HIST_LIST);

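                /* The loop above checks the worker ring itself; cq_ring_space
                 * is the scheduler's cached credit count, which also covers
                 * events still sitting in cq_buf, so it can be exhausted even
                 * when the ring reports free slots, hence the extra check
                 * below.
                 */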
                struct sw_port *p = &sw->ports[cq];
                if (sw->cq_ring_space[cq] == 0 ||
                                p->inflights == SW_PORT_HIST_LIST)
                        break;

                sw->cq_ring_space[cq]--;

                qid->stats.tx_pkts++;

                const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
                p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
                p->hist_list[head].qid = qid_id;

                if (keep_order)
                        rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
                                        (void *)&p->hist_list[head].rob_entry);

                sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
                iq_ring_pop(qid->iq[iq_num]);

                rte_compiler_barrier();
                p->inflights++;
                p->stats.tx_pkts++;
                p->hist_head++;
        }
exit:
        qid->cq_next_tx = cq_idx;
        return i;
}

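/* Schedule events from a directed QID straight to its single mapped CQ.
 * Events are burst-dequeued from the IQ directly into the port's CQ buffer,
 * limited by the cached CQ ring space. Returns the number of events moved.
 */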
static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count __rte_unused)
{
        uint32_t cq_id = qid->cq_map[0];
        struct sw_port *port = &sw->ports[cq_id];

        /* get max burst enq size for cq_ring */
        uint32_t count_free = sw->cq_ring_space[cq_id];
        if (count_free == 0)
                return 0;

        /* burst dequeue from the QID IQ ring */
        struct iq_ring *ring = qid->iq[iq_num];
        uint32_t ret = iq_ring_dequeue_burst(ring,
                        &port->cq_buf[port->cq_buf_count], count_free);
        port->cq_buf_count += ret;

        /* Update QID, Port and Total TX stats */
        qid->stats.tx_pkts += ret;
        port->stats.tx_pkts += ret;

        /* Subtract credits from cached value */
        sw->cq_ring_space[cq_id] -= ret;

        return ret;
}

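/* Walk all QIDs in priority order and schedule the highest priority
 * non-empty IQ of each to CQs, dispatching to the directed, atomic or
 * parallel/ordered handler based on the QID type. An IQ that has been
 * fully drained is cleared from the QID's packet mask. Returns the total
 * number of events scheduled to CQs.
 */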
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
        uint32_t pkts = 0;
        uint32_t qid_idx;

        sw->sched_cq_qid_called++;

        for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
                struct sw_qid *qid = sw->qids_prioritized[qid_idx];

                int type = qid->type;
                int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

                /* PKT_MASK_TO_IQ returns SW_IQS_MAX when no IQ has packets,
                 * so there is nothing to schedule for this QID
                 */
                if (iq_num >= SW_IQS_MAX)
                        continue;

                uint32_t pkts_done = 0;
                uint32_t count = iq_ring_count(qid->iq[iq_num]);

                if (count > 0) {
                        if (type == SW_SCHED_TYPE_DIRECT)
                                pkts_done += sw_schedule_dir_to_cq(sw, qid,
                                                iq_num, count);
                        else if (type == RTE_SCHED_TYPE_ATOMIC)
                                pkts_done += sw_schedule_atomic_to_cq(sw, qid,
                                                iq_num, count);
                        else
                                pkts_done += sw_schedule_parallel_to_cq(sw, qid,
                                                iq_num, count,
                                                type == RTE_SCHED_TYPE_ORDERED);
                }

                /* If the IQ that was polled is now empty, unset it in the
                 * IQ mask.
                 */
                int all_done = (pkts_done == count);

                qid->iq_pkt_mask &= ~(all_done << (iq_num));
                pkts += pkts_done;
        }

        return pkts;
}

/* This function performs re-ordering of packets, injecting them into the
 * appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
        /* Perform egress reordering */
        struct rte_event *qe;
        uint32_t pkts_iter = 0;

        for (; qid_start < qid_end; qid_start++) {
                struct sw_qid *qid = &sw->qids[qid_start];
                int i, num_entries_in_use;

                if (qid->type != RTE_SCHED_TYPE_ORDERED)
                        continue;

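                /* Entries currently in flight are exactly those missing from
                 * the freelist, so its free count is the number of reorder
                 * buffer entries that may need to be released.
                 */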
                num_entries_in_use = rte_ring_free_count(
                                        qid->reorder_buffer_freelist);

                for (i = 0; i < num_entries_in_use; i++) {
                        struct reorder_buffer_entry *entry;
                        int j;

                        entry = &qid->reorder_buffer[qid->reorder_buffer_index];

                        if (!entry->ready)
                                break;

                        for (j = 0; j < entry->num_fragments; j++) {
                                uint16_t dest_qid;
                                uint16_t dest_iq;

                                int idx = entry->fragment_index + j;
                                qe = &entry->fragments[idx];

                                dest_qid = qe->queue_id;
                                dest_iq  = PRIO_TO_IQ(qe->priority);

                                if (dest_qid >= sw->qid_count) {
                                        sw->stats.rx_dropped++;
                                        continue;
                                }

                                struct sw_qid *dest_qid_ptr =
                                        &sw->qids[dest_qid];
                                const struct iq_ring *dest_iq_ptr =
                                        dest_qid_ptr->iq[dest_iq];
                                if (iq_ring_free_count(dest_iq_ptr) == 0)
                                        break;

                                pkts_iter++;

                                struct sw_qid *q = &sw->qids[dest_qid];
                                struct iq_ring *r = q->iq[dest_iq];

                                /* we checked for space above, so enqueue must
                                 * succeed
                                 */
                                iq_ring_enqueue(r, qe);
                                q->iq_pkt_mask |= (1 << (dest_iq));
                                q->iq_pkt_count[dest_iq]++;
                                q->stats.rx_pkts++;
                        }

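                        /* If a destination IQ filled up part-way through, j is
                         * less than num_fragments: keep the entry ready with
                         * the remaining fragments for a later pass. Otherwise
                         * recycle the entry to the freelist and advance the
                         * reorder window index.
                         */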
                        entry->ready = (j != entry->num_fragments);
                        entry->num_fragments -= j;
                        entry->fragment_index += j;

                        if (!entry->ready) {
                                entry->fragment_index = 0;

                                rte_ring_sp_enqueue(
                                                qid->reorder_buffer_freelist,
                                                entry);

                                qid->reorder_buffer_index++;
                                qid->reorder_buffer_index %= qid->window_size;
                        }
                }
        }
        return pkts_iter;
}

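/* Refill a port's pre-processing (shadow) buffer by burst-dequeuing new
 * events from the port's rx worker ring.
 */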
static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
        RTE_SET_USED(sw);
        struct rte_event_ring *worker = port->rx_worker_ring;
        port->pp_buf_start = 0;
        port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
                        RTE_DIM(port->pp_buf), NULL);
}

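/* Pull events from a load-balanced port's shadow buffer and process their
 * enqueue flags: completions retire history list entries (and, for ordered
 * QIDs, mark reorder buffer entries ready), while valid events are placed
 * into the destination QID's IQ at the priority-selected level. allow_reorder
 * selects whether the ordered-QID bookkeeping is performed. Returns the
 * number of events placed directly into IQs.
 */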
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
        static struct reorder_buffer_entry dummy_rob;
        uint32_t pkts_iter = 0;
        struct sw_port *port = &sw->ports[port_id];

        /* If shadow ring has 0 pkts, pull from worker ring */
        if (port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);

        while (port->pp_buf_count) {
                const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
                struct sw_hist_list_entry *hist_entry = NULL;
                uint8_t flags = qe->op;
                const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
                int needs_reorder = 0;
                /* without reordering, a PARTIAL (non-EOP) event is treated
                 * as NEW
                 */
                if (!allow_reorder && !eop)
                        flags = QE_FLAG_VALID;

                /*
                 * If we don't have space for this packet in its IQ, move on
                 * to the next port. Technically, for a packet that needs
                 * reordering, we don't need to check here, but it simplifies
                 * things not to special-case it.
                 */
                uint32_t iq_num = PRIO_TO_IQ(qe->priority);
                struct sw_qid *qid = &sw->qids[qe->queue_id];

                if ((flags & QE_FLAG_VALID) &&
                                iq_ring_free_count(qid->iq[iq_num]) == 0)
                        break;

                /* now process based on flags. Note that for directed
                 * queues, the enqueue_flush masks off all but the
                 * valid flag. This makes FWD and PARTIAL enqueues just
                 * NEW type, and makes DROPS no-op calls.
                 */
                if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
                        const uint32_t hist_tail = port->hist_tail &
                                        (SW_PORT_HIST_LIST - 1);

                        hist_entry = &port->hist_list[hist_tail];
                        const uint32_t hist_qid = hist_entry->qid;
                        const uint32_t hist_fid = hist_entry->fid;

                        struct sw_fid_t *fid =
                                &sw->qids[hist_qid].fids[hist_fid];
                        fid->pcount -= eop;
                        if (fid->pcount == 0)
                                fid->cq = -1;

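                        /* Branchless update of the reorder buffer entry: if
                         * the history entry carries no rob_entry (the QID is
                         * not ordered), the write is redirected to a static
                         * dummy entry instead of taking a branch.
                         */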
                        if (allow_reorder) {
                                /* set reorder ready if an ordered QID */
                                uintptr_t rob_ptr =
                                        (uintptr_t)hist_entry->rob_entry;
                                const uintptr_t valid = (rob_ptr != 0);
                                needs_reorder = valid;
                                rob_ptr |=
                                        ((valid - 1) & (uintptr_t)&dummy_rob);
                                struct reorder_buffer_entry *tmp_rob_ptr =
                                        (struct reorder_buffer_entry *)rob_ptr;
                                tmp_rob_ptr->ready = eop * needs_reorder;
                        }

                        port->inflights -= eop;
                        port->hist_tail += eop;
                }
                if (flags & QE_FLAG_VALID) {
                        port->stats.rx_pkts++;

                        if (allow_reorder && needs_reorder) {
                                struct reorder_buffer_entry *rob_entry =
                                                hist_entry->rob_entry;

                                hist_entry->rob_entry = NULL;
                                /* Although fragmentation is not currently
                                 * supported by the eventdev API, we support it
                                 * here. Open question: how do we alert the
                                 * user that they've exceeded the max number
                                 * of fragments?
                                 */
                                int num_frag = rob_entry->num_fragments;
                                if (num_frag == SW_FRAGMENTS_MAX)
                                        sw->stats.rx_dropped++;
                                else {
                                        int idx = rob_entry->num_fragments++;
                                        rob_entry->fragments[idx] = *qe;
                                }
                                goto end_qe;
                        }

                        /* Use the iq_num from above to push the QE
                         * into the qid at the right priority
                         */

                        qid->iq_pkt_mask |= (1 << (iq_num));
                        iq_ring_enqueue(qid->iq[iq_num], qe);
                        qid->iq_pkt_count[iq_num]++;
                        qid->stats.rx_pkts++;
                        pkts_iter++;
                }

end_qe:
                port->pp_buf_start++;
                port->pp_buf_count--;
        } /* while (avail_qes) */

        return pkts_iter;
}

static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
        return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
        return __pull_port_lb(sw, port_id, 0);
}

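/* Pull events from a directed port's shadow buffer and enqueue valid events
 * straight into the destination QID IQ; no history list or reordering state
 * is maintained for directed traffic.
 */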
static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
        uint32_t pkts_iter = 0;
        struct sw_port *port = &sw->ports[port_id];

        /* If shadow ring has 0 pkts, pull from worker ring */
        if (port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);

        while (port->pp_buf_count) {
                const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
                uint8_t flags = qe->op;

                if ((flags & QE_FLAG_VALID) == 0)
                        goto end_qe;

                uint32_t iq_num = PRIO_TO_IQ(qe->priority);
                struct sw_qid *qid = &sw->qids[qe->queue_id];
                struct iq_ring *iq_ring = qid->iq[iq_num];

                if (iq_ring_free_count(iq_ring) == 0)
                        break; /* move to next port */

                port->stats.rx_pkts++;

                /* Use the iq_num from above to push the QE
                 * into the qid at the right priority
                 */
                qid->iq_pkt_mask |= (1 << (iq_num));
                iq_ring_enqueue(iq_ring, qe);
                qid->iq_pkt_count[iq_num]++;
                qid->stats.rx_pkts++;
                pkts_iter++;

end_qe:
                port->pp_buf_start++;
                port->pp_buf_count--;
        } /* while port->pp_buf_count */

        return pkts_iter;
}

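/* PMD scheduling entry point. Each call:
 *  1. pulls new events from each port's rx worker ring into the QID IQs,
 *     running egress reordering for ordered QIDs,
 *  2. schedules events from the QID IQs to the candidate CQs,
 *  3. repeats 1-2 until the scheduling quanta is reached or no work remains,
 * then flushes the per-port CQ buffers to the worker rings in bursts.
 */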
void
sw_event_schedule(struct rte_eventdev *dev)
{
        struct sw_evdev *sw = sw_pmd_priv(dev);
        uint32_t in_pkts, out_pkts;
        uint32_t out_pkts_total = 0, in_pkts_total = 0;
        int32_t sched_quanta = sw->sched_quanta;
        uint32_t i;

        sw->sched_called++;
        if (!sw->started)
                return;

        do {
                uint32_t in_pkts_this_iteration = 0;

                /* Pull from rx_ring for ports */
                do {
                        in_pkts = 0;
                        for (i = 0; i < sw->port_count; i++)
                                if (sw->ports[i].is_directed)
                                        in_pkts += sw_schedule_pull_port_dir(sw, i);
                                else if (sw->ports[i].num_ordered_qids > 0)
                                        in_pkts += sw_schedule_pull_port_lb(sw, i);
                                else
                                        in_pkts += sw_schedule_pull_port_no_reorder(sw, i);

                        /* QID scan for re-ordered */
                        in_pkts += sw_schedule_reorder(sw, 0,
                                        sw->qid_count);
                        in_pkts_this_iteration += in_pkts;
                } while (in_pkts > 4 &&
                                (int)in_pkts_this_iteration < sched_quanta);

                out_pkts = 0;
                out_pkts += sw_schedule_qid_to_cq(sw);
                out_pkts_total += out_pkts;
                in_pkts_total += in_pkts_this_iteration;

                if (in_pkts == 0 && out_pkts == 0)
                        break;
        } while ((int)out_pkts_total < sched_quanta);

        /* push all the internally buffered QEs in port->cq_buf to the
         * worker cores: aka, do the ring transfers batched.
         */
        for (i = 0; i < sw->port_count; i++) {
                struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
                rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
                                sw->ports[i].cq_buf_count,
                                &sw->cq_ring_space[i]);
                sw->ports[i].cq_buf_count = 0;
        }

        sw->stats.tx_pkts += out_pkts_total;
        sw->stats.rx_pkts += in_pkts_total;

        sw->sched_no_iq_enqueues += (in_pkts_total == 0);
        sw->sched_no_cq_enqueues += (out_pkts_total == 0);
}