[dpdk.git] drivers/event/sw/sw_evdev_scheduler.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"
#include "event_ring.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ, or SW_IQS_MAX if no pkts are available.
 * Computing the CTZ each time it is needed is faster than caching the value
 * due to data dependencies.
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))

#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
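/* Map the 8-bit event priority to one of the four IQs: priorities 0..63
 * (the highest) land in IQ 0, which the scheduler services first, and
 * priorities 192..255 land in IQ 3.
 */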
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)


static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* The QID ID is static, so it can be used to identify the stage of
	 * processing in history lists, etc.
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx;
			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq_idx = qid->cq_next_tx++;

			cq = qid->cq_map[cq_idx];

			/* find least used */
			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
		}
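		/* An atomic flow stays pinned to the CQ chosen above until its
		 * in-flight count (fid->pcount) drains to zero, at which point
		 * the completion path resets fid->cq to -1 and the flow may be
		 * re-assigned to a less loaded CQ.
		 */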

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;
		qid->to_port[cq]++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
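	/* Events that could not be placed (destination CQ full, or the port's
	 * history list exhausted) are returned to the IQ so a later scheduling
	 * pass can retry them.
	 */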
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}

static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* The QID ID is static, so it can be used to identify the stage of
	 * processing in history lists, etc.
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rob_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * for parallel, just send to next available CQ in round-robin
		 * fashion. So scan for an available CQ. If all CQs are full
		 * just return and move on to next QID
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			if (cq_idx >= qid->cq_num_mapped_cqs)
				cq_idx = 0;
			cq = qid->cq_map[cq_idx++];

		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
				rte_event_ring_free_count(
					sw->ports[cq].cq_worker_ring) == 0);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

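		/* For ordered QIDs, claim a free reorder-buffer entry and store
		 * its pointer in the history entry; the completion path in
		 * __pull_port_lb() uses that pointer to mark the entry ready
		 * for sw_schedule_reorder().
		 */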
		if (keep_order)
			rob_ring_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}

static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
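	/* Directed scheduling only ever uses cq_map[0], the single CQ linked
	 * to this QID, so events can be copied straight into its buffer.
	 */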
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
			&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}

static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* zero mapped CQs indicates directed */
		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

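		/* Batching: leave events in the IQ until at least
		 * sched_min_burst have accumulated. The flush logic at the end
		 * of sw_event_schedule() drops sched_min_burst to 1 when the
		 * device goes idle, so stragglers are not held back forever.
		 */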
		if (count >= sw->sched_min_burst) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* If the IQ that was polled is now empty, clear its bit in the
		 * IQ mask.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}

/* This function re-orders packets and injects them into the appropriate
 * QID IQ. As LB and DIR QIDs are in the same array, but *NOT* contiguous in
 * that array, this function accepts a "range" of QIDs to scan.
 */
static uint16_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		unsigned int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

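		/* The freelist's free count equals the number of reorder-buffer
		 * entries currently handed out to ports, i.e. the upper bound
		 * on entries that may have become ready. Skip the QID when that
		 * number is below the minimum burst, to keep the idle path
		 * cheap.
		 */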
		num_entries_in_use = rob_ring_free_count(
					qid->reorder_buffer_freelist);

		if (num_entries_in_use < sw->sched_min_burst)
			num_entries_in_use = 0;

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq  = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
				q->stats.rx_pkts++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rob_ring_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}

static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			sw->sched_deq_burst_size, NULL);
}

static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* without reordering, a PARTIAL event is treated as NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * if we don't have space for this packet in an IQ,
		 * then move on to next queue. Technically, for a
		 * packet that needs reordering, we don't need to check
		 * here, but it simplifies things not to special-case
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

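			/* Branch-free completion of the history entry: if this
			 * event came from an ordered QID, rob_entry is a valid
			 * pointer and its ready flag must be set on EOP;
			 * otherwise (valid - 1) is all-ones and the write is
			 * redirected into the static dummy_rob, avoiding a
			 * branch in this hot path.
			 */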
			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation is not currently
				 * supported by the eventdev API, we support it
				 * here. Open question: how do we alert the user
				 * that they've exceeded the max fragment count?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */

			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}

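/* Because __pull_port_lb() is always inlined, allow_reorder is a compile-time
 * constant in each wrapper below, which lets the compiler strip the reorder
 * handling from the no-reorder variant.
 */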
static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}

static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}

void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (unlikely(!sw->started))
		return;

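	/* Scheduling proceeds in two phases per quanta: (1) pull new events
	 * from each port's rx worker ring into the destination QID IQs,
	 * replaying any reordered events that have become ready, then (2) move
	 * IQ events onto candidate CQs. Buffered CQ events are flushed to the
	 * worker rings in a batch after the loop.
	 */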
	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++) {
				/* ack the unlinks in progress as done */
				if (sw->ports[i].unlinks_in_progress)
					sw->ports[i].unlinks_in_progress = 0;

				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
			}

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

	/* Push all the internally buffered QEs in port->cq_ring to the worker
	 * cores, i.e. do the ring transfers in batches.
	 */
	int no_enq = 1;
	for (i = 0; i < sw->port_count; i++) {
		struct sw_port *port = &sw->ports[i];
		struct rte_event_ring *worker = port->cq_worker_ring;

		/* If shadow ring has 0 pkts, pull from worker ring */
		if (sw->refill_once_per_iter && port->pp_buf_count == 0)
			sw_refill_pp_buf(sw, port);

		if (port->cq_buf_count >= sw->sched_min_burst) {
			rte_event_ring_enqueue_burst(worker,
					port->cq_buf,
					port->cq_buf_count,
					&sw->cq_ring_space[i]);
			port->cq_buf_count = 0;
			no_enq = 0;
		} else {
			sw->cq_ring_space[i] =
					rte_event_ring_free_count(worker) -
					port->cq_buf_count;
		}
	}

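	/* Adaptive flushing: if nothing was pushed to any worker ring for
	 * SCHED_NO_ENQ_CYCLE_FLUSH consecutive calls, drop the minimum burst
	 * to 1 so buffered events are not stranded; once enqueues resume (and
	 * the flush counter decays), restore the configured burst size.
	 */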
	if (no_enq) {
		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
			sw->sched_min_burst = 1;
		else
			sw->sched_flush_count++;
	} else {
		if (sw->sched_flush_count)
			sw->sched_flush_count--;
		else
			sw->sched_min_burst = sw->sched_min_burst_size;
	}

}