drivers/event/dsw/dsw_event.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4
5 #include "dsw_evdev.h"
6
7 #ifdef DSW_SORT_DEQUEUED
8 #include "dsw_sort.h"
9 #endif
10
11 #include <stdbool.h>
12 #include <string.h>
13
14 #include <rte_atomic.h>
15 #include <rte_cycles.h>
16 #include <rte_memcpy.h>
17 #include <rte_random.h>
18
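/* Make sure the port has enough credits to enqueue 'credits' new
 * events. Credits are taken from the port-local pool first; if that is
 * insufficient, at least DSW_PORT_MIN_CREDITS are borrowed from the
 * device-wide credits_on_loan pool, subject to the max_inflight limit.
 */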
19 static bool
20 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
21                          int32_t credits)
22 {
23         int32_t inflight_credits = port->inflight_credits;
24         int32_t missing_credits = credits - inflight_credits;
25         int32_t total_on_loan;
26         int32_t available;
27         int32_t acquired_credits;
28         int32_t new_total_on_loan;
29
30         if (likely(missing_credits <= 0)) {
31                 port->inflight_credits -= credits;
32                 return true;
33         }
34
35         total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
36         available = dsw->max_inflight - total_on_loan;
37         acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
38
39         if (available < acquired_credits)
40                 return false;
41
42         /* This is a race; no locks are involved, and thus some other
43          * thread can allocate tokens between the check and the
44          * allocation.
45          */
46         new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
47                                                     acquired_credits);
48
49         if (unlikely(new_total_on_loan > dsw->max_inflight)) {
50                 /* Some other port took the last credits */
51                 rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
52                 return false;
53         }
54
55         DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
56                         acquired_credits);
57
58         port->inflight_credits += acquired_credits;
59         port->inflight_credits -= credits;
60
61         return true;
62 }
63
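/* Return credits to the port-local pool. Should the local pool exceed
 * DSW_PORT_MAX_CREDITS, everything above DSW_PORT_MIN_CREDITS is handed
 * back to the device-wide pool.
 */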
64 static void
65 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
66                         int32_t credits)
67 {
68         port->inflight_credits += credits;
69
70         if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
71                 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
72                 int32_t return_credits =
73                         port->inflight_credits - leave_credits;
74
75                 port->inflight_credits = leave_credits;
76
77                 rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
78
79                 DSW_LOG_DP_PORT(DEBUG, port->id,
80                                 "Returned %d tokens to pool.\n",
81                                 return_credits);
82         }
83 }
84
85 static void
86 dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
87                        uint16_t num_forward, uint16_t num_release)
88 {
89         port->new_enqueued += num_new;
90         port->forward_enqueued += num_forward;
91         port->release_enqueued += num_release;
92 }
93
94 static void
95 dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
96 {
97         source_port->queue_enqueued[queue_id]++;
98 }
99
100 static void
101 dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
102 {
103         port->dequeued += num;
104 }
105
106 static void
107 dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
108 {
109         source_port->queue_dequeued[queue_id]++;
110 }
111
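/* Track transitions between busy (events dequeued) and idle periods,
 * accumulating the busy cycles used for port load estimation.
 */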
112 static void
113 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
114 {
115         if (dequeued > 0 && port->busy_start == 0)
116                 /* work period begins */
117                 port->busy_start = rte_get_timer_cycles();
118         else if (dequeued == 0 && port->busy_start > 0) {
119                 /* work period ends */
120                 uint64_t work_period =
121                         rte_get_timer_cycles() - port->busy_start;
122                 port->busy_cycles += work_period;
123                 port->busy_start = 0;
124         }
125 }
126
127 static int16_t
128 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
129 {
130         uint64_t passed = now - port->measurement_start;
131         uint64_t busy_cycles = port->busy_cycles;
132
133         if (port->busy_start > 0) {
134                 busy_cycles += (now - port->busy_start);
135                 port->busy_start = now;
136         }
137
138         int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
139
140         port->measurement_start = now;
141         port->busy_cycles = 0;
142
143         port->total_busy_cycles += busy_cycles;
144
145         return load;
146 }
147
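/* Compute the load for the just-ended measurement period and fold it
 * into the weighted average published in port->load.
 */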
148 static void
149 dsw_port_load_update(struct dsw_port *port, uint64_t now)
150 {
151         int16_t old_load;
152         int16_t period_load;
153         int16_t new_load;
154
155         old_load = rte_atomic16_read(&port->load);
156
157         period_load = dsw_port_load_close_period(port, now);
158
159         new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
160                 (DSW_OLD_LOAD_WEIGHT+1);
161
162         rte_atomic16_set(&port->load, new_load);
163 }
164
165 static void
166 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
167 {
168         if (now < port->next_load_update)
169                 return;
170
171         port->next_load_update = now + port->load_update_interval;
172
173         dsw_port_load_update(port, now);
174 }
175
176 static void
177 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
178 {
179         /* there's always room on the ring */
180         while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
181                 rte_pause();
182 }
183
184 static int
185 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
186 {
187         return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
188 }
189
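/* Send a control message to all ports except the originating one. */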
190 static void
191 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
192                        uint8_t type, uint8_t queue_id, uint16_t flow_hash)
193 {
194         uint16_t port_id;
195         struct dsw_ctl_msg msg = {
196                 .type = type,
197                 .originating_port_id = source_port->id,
198                 .queue_id = queue_id,
199                 .flow_hash = flow_hash
200         };
201
202         for (port_id = 0; port_id < dsw->num_ports; port_id++)
203                 if (port_id != source_port->id)
204                         dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
205 }
206
207 static bool
208 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
209                         uint16_t flow_hash)
210 {
211         uint16_t i;
212
213         for (i = 0; i < port->paused_flows_len; i++) {
214                 struct dsw_queue_flow *qf = &port->paused_flows[i];
215                 if (qf->queue_id == queue_id &&
216                     qf->flow_hash == flow_hash)
217                         return true;
218         }
219         return false;
220 }
221
222 static void
223 dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
224                          uint16_t paused_flow_hash)
225 {
226         port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
227                 .queue_id = queue_id,
228                 .flow_hash = paused_flow_hash
229         };
230         port->paused_flows_len++;
231 }
232
233 static void
234 dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
235                             uint16_t paused_flow_hash)
236 {
237         uint16_t i;
238
239         for (i = 0; i < port->paused_flows_len; i++) {
240                 struct dsw_queue_flow *qf = &port->paused_flows[i];
241
242                 if (qf->queue_id == queue_id &&
243                     qf->flow_hash == paused_flow_hash) {
244                         uint16_t last_idx = port->paused_flows_len-1;
245                         if (i != last_idx)
246                                 port->paused_flows[i] =
247                                         port->paused_flows[last_idx];
248                         port->paused_flows_len--;
249                         break;
250                 }
251         }
252 }
253
254 static void
255 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
256
257 static void
258 dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
259                            uint8_t originating_port_id, uint8_t queue_id,
260                            uint16_t paused_flow_hash)
261 {
262         struct dsw_ctl_msg cfm = {
263                 .type = DSW_CTL_CFM,
264                 .originating_port_id = port->id,
265                 .queue_id = queue_id,
266                 .flow_hash = paused_flow_hash
267         };
268
269         DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
270                         queue_id, paused_flow_hash);
271
272         /* There might be already-scheduled events belonging to the
273          * paused flow in the output buffers.
274          */
275         dsw_port_flush_out_buffers(dsw, port);
276
277         dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
278
279         /* Make sure any stores to the original port's in_ring are seen
280          * before the ctl message.
281          */
282         rte_smp_wmb();
283
284         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
285 }
286
287 static void
288 dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
289                           uint8_t exclude_port_id, int16_t *port_loads,
290                           uint8_t *target_port_id, int16_t *target_load)
291 {
292         int16_t candidate_port_id = -1;
293         int16_t candidate_load = DSW_MAX_LOAD;
294         uint16_t i;
295
296         for (i = 0; i < num_port_ids; i++) {
297                 uint8_t port_id = port_ids[i];
298                 if (port_id != exclude_port_id) {
299                         int16_t load = port_loads[port_id];
300                         if (candidate_port_id == -1 ||
301                             load < candidate_load) {
302                                 candidate_port_id = port_id;
303                                 candidate_load = load;
304                         }
305                 }
306         }
307         *target_port_id = candidate_port_id;
308         *target_load = candidate_load;
309 }
310
311 struct dsw_queue_flow_burst {
312         struct dsw_queue_flow queue_flow;
313         uint16_t count;
314 };
315
316 static inline int
317 dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
318 {
319         const struct dsw_queue_flow_burst *burst_a = v_burst_a;
320         const struct dsw_queue_flow_burst *burst_b = v_burst_b;
321
322         int a_count = burst_a->count;
323         int b_count = burst_b->count;
324
325         return a_count - b_count;
326 }
327
328 #define DSW_QF_TO_INT(_qf)                                      \
329         ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
330
331 static inline int
332 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
333 {
334         const struct dsw_queue_flow *qf_a = v_qf_a;
335         const struct dsw_queue_flow *qf_b = v_qf_b;
336
337         return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
338 }
339
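/* Sort the recorded queue/flow pairs and collapse consecutive
 * duplicates into bursts. The bursts are ordered by ascending event
 * count, so the smallest (least busy) flows come first.
 */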
340 static uint16_t
341 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
342                        struct dsw_queue_flow_burst *bursts)
343 {
344         uint16_t i;
345         struct dsw_queue_flow_burst *current_burst = NULL;
346         uint16_t num_bursts = 0;
347
348         /* We don't need the stable property, and the list is likely
349          * large enough for qsort() to outperform dsw_stable_sort(),
350          * so we use qsort() here.
351          */
352         qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
353
354         /* arrange the (now-consecutive) events into bursts */
355         for (i = 0; i < qfs_len; i++) {
356                 if (i == 0 ||
357                     dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
358                         current_burst = &bursts[num_bursts];
359                         current_burst->queue_flow = qfs[i];
360                         current_burst->count = 0;
361                         num_bursts++;
362                 }
363                 current_burst->count++;
364         }
365
366         qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
367
368         return num_bursts;
369 }
370
371 static bool
372 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
373                         int16_t load_limit)
374 {
375         bool below_limit = false;
376         uint16_t i;
377
378         for (i = 0; i < dsw->num_ports; i++) {
379                 int16_t load = rte_atomic16_read(&dsw->ports[i].load);
380                 if (load < load_limit)
381                         below_limit = true;
382                 port_loads[i] = load;
383         }
384         return below_limit;
385 }
386
387 static bool
388 dsw_select_migration_target(struct dsw_evdev *dsw,
389                             struct dsw_port *source_port,
390                             struct dsw_queue_flow_burst *bursts,
391                             uint16_t num_bursts, int16_t *port_loads,
392                             int16_t max_load, struct dsw_queue_flow *target_qf,
393                             uint8_t *target_port_id)
394 {
395         uint16_t source_load = port_loads[source_port->id];
396         uint16_t i;
397
398         for (i = 0; i < num_bursts; i++) {
399                 struct dsw_queue_flow *qf = &bursts[i].queue_flow;
400
401                 if (dsw_port_is_flow_paused(source_port, qf->queue_id,
402                                             qf->flow_hash))
403                         continue;
404
405                 struct dsw_queue *queue = &dsw->queues[qf->queue_id];
406                 int16_t target_load;
407
408                 dsw_find_lowest_load_port(queue->serving_ports,
409                                           queue->num_serving_ports,
410                                           source_port->id, port_loads,
411                                           target_port_id, &target_load);
412
413                 if (target_load < source_load &&
414                     target_load < max_load) {
415                         *target_qf = *qf;
416                         return true;
417                 }
418         }
419
420         DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
421                         "no target port found with load less than %d.\n",
422                         num_bursts, DSW_LOAD_TO_PERCENT(max_load));
423
424         return false;
425 }
426
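/* Map a queue_id/flow_hash pair to the port currently serving that
 * flow, using the queue's flow-to-port table when the queue has more
 * than one serving port.
 */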
427 static uint8_t
428 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
429 {
430         struct dsw_queue *queue = &dsw->queues[queue_id];
431         uint8_t port_id;
432
433         if (queue->num_serving_ports > 1)
434                 port_id = queue->flow_to_port_map[flow_hash];
435         else
436                 /* A single-link queue, or atomic/ordered/parallel but
437                  * with just a single serving port.
438                  */
439                 port_id = queue->serving_ports[0];
440
441         DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
442                    "to port %d.\n", queue_id, flow_hash, port_id);
443
444         return port_id;
445 }
446
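/* Flush the output buffer destined for 'dest_port_id' into that port's
 * in_ring, looping until every buffered event has been enqueued.
 */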
447 static void
448 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
449                            uint8_t dest_port_id)
450 {
451         struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
452         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
453         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
454         uint16_t enqueued = 0;
455
456         if (*buffer_len == 0)
457                 return;
458
459         /* The rings are dimensioned to fit all in-flight events (even
460          * on a single ring), so looping will work.
461          */
462         do {
463                 enqueued +=
464                         rte_event_ring_enqueue_burst(dest_port->in_ring,
465                                                      buffer+enqueued,
466                                                      *buffer_len-enqueued,
467                                                      NULL);
468         } while (unlikely(enqueued != *buffer_len));
469
470         (*buffer_len) = 0;
471 }
472
473 static uint16_t
474 dsw_port_get_parallel_flow_id(struct dsw_port *port)
475 {
476         uint16_t flow_id = port->next_parallel_flow_id;
477
478         port->next_parallel_flow_id =
479                 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
480
481         return flow_id;
482 }
483
484 static void
485 dsw_port_buffer_paused(struct dsw_port *port,
486                        const struct rte_event *paused_event)
487 {
488         port->paused_events[port->paused_events_len] = *paused_event;
489         port->paused_events_len++;
490 }
491
492 static void
493 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
494                            uint8_t dest_port_id, const struct rte_event *event)
495 {
496         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
497         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
498
499         if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
500                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
501
502         buffer[*buffer_len] = *event;
503
504         (*buffer_len)++;
505 }
506
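/* Fold the 24-bit event flow_id into a flow hash by XOR-ing
 * DSW_MAX_FLOWS_BITS-wide chunks of it.
 */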
507 #define DSW_FLOW_ID_BITS (24)
508 static uint16_t
509 dsw_flow_id_hash(uint32_t flow_id)
510 {
511         uint16_t hash = 0;
512         uint16_t offset = 0;
513
514         do {
515                 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
516                 offset += DSW_MAX_FLOWS_BITS;
517         } while (offset < DSW_FLOW_ID_BITS);
518
519         return hash;
520 }
521
522 static void
523 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
524                          struct rte_event event)
525 {
526         uint8_t dest_port_id;
527
528         event.flow_id = dsw_port_get_parallel_flow_id(source_port);
529
530         dest_port_id = dsw_schedule(dsw, event.queue_id,
531                                     dsw_flow_id_hash(event.flow_id));
532
533         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
534 }
535
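/* Route an event: parallel-queue events get a synthetic flow id,
 * events belonging to a paused flow are held back in the paused-events
 * buffer, and everything else goes into the per-destination output
 * buffer.
 */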
536 static void
537 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
538                       const struct rte_event *event)
539 {
540         uint16_t flow_hash;
541         uint8_t dest_port_id;
542
543         if (unlikely(dsw->queues[event->queue_id].schedule_type ==
544                      RTE_SCHED_TYPE_PARALLEL)) {
545                 dsw_port_buffer_parallel(dsw, source_port, *event);
546                 return;
547         }
548
549         flow_hash = dsw_flow_id_hash(event->flow_id);
550
551         if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
552                                              flow_hash))) {
553                 dsw_port_buffer_paused(source_port, event);
554                 return;
555         }
556
557         dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
558
559         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
560 }
561
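/* Re-buffer events held back while the queue_id/paused_flow_hash flow
 * was paused. Events belonging to the now-unpaused flow are sent
 * toward its (possibly new) destination port; the rest stay in the
 * paused-events buffer.
 */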
562 static void
563 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
564                              struct dsw_port *source_port,
565                              uint8_t queue_id, uint16_t paused_flow_hash)
566 {
567         uint16_t paused_events_len = source_port->paused_events_len;
568         struct rte_event paused_events[paused_events_len];
569         uint8_t dest_port_id;
570         uint16_t i;
571
572         if (paused_events_len == 0)
573                 return;
574
575         if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
576                 return;
577
578         rte_memcpy(paused_events, source_port->paused_events,
579                    paused_events_len * sizeof(struct rte_event));
580
581         source_port->paused_events_len = 0;
582
583         dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
584
585         for (i = 0; i < paused_events_len; i++) {
586                 struct rte_event *event = &paused_events[i];
587                 uint16_t flow_hash;
588
589                 flow_hash = dsw_flow_id_hash(event->flow_id);
590
591                 if (event->queue_id == queue_id &&
592                     flow_hash == paused_flow_hash)
593                         dsw_port_buffer_non_paused(dsw, source_port,
594                                                    dest_port_id, event);
595                 else
596                         dsw_port_buffer_paused(source_port, event);
597         }
598 }
599
600 static void
601 dsw_port_migration_stats(struct dsw_port *port)
602 {
603         uint64_t migration_latency;
604
605         migration_latency = (rte_get_timer_cycles() - port->migration_start);
606         port->migration_latency += migration_latency;
607         port->migrations++;
608 }
609
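/* Conclude a migration: go back to the idle state, clear the record of
 * seen events and, for non-parallel queues, unpause the migrated flow
 * and flush any events buffered for it.
 */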
610 static void
611 dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
612 {
613         uint8_t queue_id = port->migration_target_qf.queue_id;
614         uint16_t flow_hash = port->migration_target_qf.flow_hash;
615
616         port->migration_state = DSW_MIGRATION_STATE_IDLE;
617         port->seen_events_len = 0;
618
619         dsw_port_migration_stats(port);
620
621         if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
622                 dsw_port_remove_paused_flow(port, queue_id, flow_hash);
623                 dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
624         }
625
626         DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
627                         "%d flow_hash %d.\n", queue_id, flow_hash);
628 }
629
630 static void
631 dsw_port_consider_migration(struct dsw_evdev *dsw,
632                             struct dsw_port *source_port,
633                             uint64_t now)
634 {
635         bool any_port_below_limit;
636         struct dsw_queue_flow *seen_events = source_port->seen_events;
637         uint16_t seen_events_len = source_port->seen_events_len;
638         struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
639         uint16_t num_bursts;
640         int16_t source_port_load;
641         int16_t port_loads[dsw->num_ports];
642
643         if (now < source_port->next_migration)
644                 return;
645
646         if (dsw->num_ports == 1)
647                 return;
648
649         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
650
651         /* Randomize interval to avoid having all threads considering
652          * migration at the same point in time, which might lead to
653          * all choosing the same target port.
654          */
655         source_port->next_migration = now +
656                 source_port->migration_interval / 2 +
657                 rte_rand() % source_port->migration_interval;
658
659         if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
660                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
661                                 "Migration already in progress.\n");
662                 return;
663         }
664
665         /* For simplicity, avoid migration in the unlikely case there
666          * are still events to consume in the in_buffer (from the last
667          * migration).
668          */
669         if (source_port->in_buffer_len > 0) {
670                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
671                                 "events in the input buffer.\n");
672                 return;
673         }
674
675         source_port_load = rte_atomic16_read(&source_port->load);
676         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
677                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
678                                 "Load %d is below threshold level %d.\n",
679                                 DSW_LOAD_TO_PERCENT(source_port_load),
680                        DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
681                 return;
682         }
683
684         /* Avoid starting any expensive operations (sorting etc.) in
685          * the case where all ports are above the load limit.
686          */
687         any_port_below_limit =
688                 dsw_retrieve_port_loads(dsw, port_loads,
689                                         DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
690         if (!any_port_below_limit) {
691                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
692                                 "Candidate target ports are all too highly "
693                                 "loaded.\n");
694                 return;
695         }
696
697         /* Sort flows into 'bursts' to allow attempting to migrate
698          * small (but still active) flows first - this is to avoid
699          * having large flows moving around the worker cores too much
700          * (to avoid cache misses, among other things). Of course, the
701          * number of recorded events (queue+flow ids) is limited and
702          * provides only a snapshot, so only so many conclusions can
703          * be drawn from this data.
704          */
705         num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
706                                             bursts);
707         /* For non-big-little systems, there's no point in moving the
708          * only (known) flow.
709          */
710         if (num_bursts < 2) {
711                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
712                                 "queue_id %d flow_hash %d has been seen.\n",
713                                 bursts[0].queue_flow.queue_id,
714                                 bursts[0].queue_flow.flow_hash);
715                 return;
716         }
717
718         /* The strategy is to first try to find a flow to move to a
719          * port with low load (below the migration-attempt
720          * threshold). If that fails, we try to find a port which is
721          * below the max threshold, and also less loaded than this
722          * port is.
723          */
724         if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
725                                          port_loads,
726                                          DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
727                                          &source_port->migration_target_qf,
728                                          &source_port->migration_target_port_id)
729             &&
730             !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
731                                          port_loads,
732                                          DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
733                                          &source_port->migration_target_qf,
734                                        &source_port->migration_target_port_id))
735                 return;
736
737         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
738                         "flow_hash %d from port %d to port %d.\n",
739                         source_port->migration_target_qf.queue_id,
740                         source_port->migration_target_qf.flow_hash,
741                         source_port->id, source_port->migration_target_port_id);
742
743         /* We have a winner. */
744
745         source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
746         source_port->migration_start = rte_get_timer_cycles();
747
748         /* No need to go through the whole pause procedure for
749          * parallel queues, since atomic/ordered semantics need not
750          * be maintained.
751          */
752
753         if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
754             == RTE_SCHED_TYPE_PARALLEL) {
755                 uint8_t queue_id = source_port->migration_target_qf.queue_id;
756                 uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
757                 uint8_t dest_port_id = source_port->migration_target_port_id;
758
759                 /* Single byte-sized stores are always atomic. */
760                 dsw->queues[queue_id].flow_to_port_map[flow_hash] =
761                         dest_port_id;
762                 rte_smp_wmb();
763
764                 dsw_port_end_migration(dsw, source_port);
765
766                 return;
767         }
768
769         /* There might be 'loopback' events already scheduled in the
770          * output buffers.
771          */
772         dsw_port_flush_out_buffers(dsw, source_port);
773
774         dsw_port_add_paused_flow(source_port,
775                                  source_port->migration_target_qf.queue_id,
776                                  source_port->migration_target_qf.flow_hash);
777
778         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
779                                source_port->migration_target_qf.queue_id,
780                                source_port->migration_target_qf.flow_hash);
781         source_port->cfm_cnt = 0;
782 }
783
784 static void
785 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
786                              struct dsw_port *source_port,
787                              uint8_t queue_id, uint16_t paused_flow_hash);
788
789 static void
790 dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
791                              uint8_t originating_port_id, uint8_t queue_id,
792                              uint16_t paused_flow_hash)
793 {
794         struct dsw_ctl_msg cfm = {
795                 .type = DSW_CTL_CFM,
796                 .originating_port_id = port->id,
797                 .queue_id = queue_id,
798                 .flow_hash = paused_flow_hash
799         };
800
801         DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
802                         queue_id, paused_flow_hash);
803
804         dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
805
806         rte_smp_rmb();
807
808         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
809
810         dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
811 }
812
813 #define FORWARD_BURST_SIZE (32)
814
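/* Drain the source port's in_ring: events belonging to the migrating
 * queue/flow are forwarded to the destination port's in_ring, while
 * all other events are stashed in the source port's in_buffer for
 * later local dequeue.
 */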
815 static void
816 dsw_port_forward_migrated_flow(struct dsw_port *source_port,
817                                struct rte_event_ring *dest_ring,
818                                uint8_t queue_id,
819                                uint16_t flow_hash)
820 {
821         uint16_t events_left;
822
823         /* The control ring message should be seen before the ring count
824          * is read from the port's in_ring.
825          */
826         rte_smp_rmb();
827
828         events_left = rte_event_ring_count(source_port->in_ring);
829
830         while (events_left > 0) {
831                 uint16_t in_burst_size =
832                         RTE_MIN(FORWARD_BURST_SIZE, events_left);
833                 struct rte_event in_burst[in_burst_size];
834                 uint16_t in_len;
835                 uint16_t i;
836
837                 in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
838                                                       in_burst,
839                                                       in_burst_size, NULL);
840                 /* No need to care about bursting forwarded events (to
841                  * the destination port's in_ring), since migration
842                  * doesn't happen very often, and also the majority of
843                  * the dequeued events will likely *not* be forwarded.
844                  */
845                 for (i = 0; i < in_len; i++) {
846                         struct rte_event *e = &in_burst[i];
847                         if (e->queue_id == queue_id &&
848                             dsw_flow_id_hash(e->flow_id) == flow_hash) {
849                                 while (rte_event_ring_enqueue_burst(dest_ring,
850                                                                     e, 1,
851                                                                     NULL) != 1)
852                                         rte_pause();
853                         } else {
854                                 uint16_t last_idx = source_port->in_buffer_len;
855                                 source_port->in_buffer[last_idx] = *e;
856                                 source_port->in_buffer_len++;
857                         }
858                 }
859
860                 events_left -= in_len;
861         }
862 }
863
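/* Executed in the forwarding state: point the flow-to-port map at the
 * destination port, forward the flow's already-enqueued events, and
 * broadcast unpause requests to the other ports.
 */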
864 static void
865 dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
866                              struct dsw_port *source_port)
867 {
868         uint8_t queue_id = source_port->migration_target_qf.queue_id;
869         uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
870         uint8_t dest_port_id = source_port->migration_target_port_id;
871         struct dsw_port *dest_port = &dsw->ports[dest_port_id];
872
873         dsw_port_flush_out_buffers(dsw, source_port);
874
875         rte_smp_wmb();
876
877         dsw->queues[queue_id].flow_to_port_map[flow_hash] =
878                 dest_port_id;
879
880         dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
881                                        queue_id, flow_hash);
882
883         /* Flow table update and migration destination port's enqueues
884          * must be seen before the control message.
885          */
886         rte_smp_wmb();
887
888         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
889                                flow_hash);
890         source_port->cfm_cnt = 0;
891         source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
892 }
893
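/* Count confirmations; once all other ports have replied, advance the
 * migration state machine (pausing -> forwarding, unpausing -> idle).
 */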
894 static void
895 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
896 {
897         port->cfm_cnt++;
898
899         if (port->cfm_cnt == (dsw->num_ports-1)) {
900                 switch (port->migration_state) {
901                 case DSW_MIGRATION_STATE_PAUSING:
902                         DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
903                                         "migration state.\n");
904                         port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
905                         break;
906                 case DSW_MIGRATION_STATE_UNPAUSING:
907                         dsw_port_end_migration(dsw, port);
908                         break;
909                 default:
910                         RTE_ASSERT(0);
911                         break;
912                 }
913         }
914 }
915
916 static void
917 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
918 {
919         struct dsw_ctl_msg msg;
920
921         /* So any table loads happen before the ring dequeue, in the
922          * case of a 'paus' message.
923          */
924         rte_smp_rmb();
925
926         if (dsw_port_ctl_dequeue(port, &msg) == 0) {
927                 switch (msg.type) {
928                 case DSW_CTL_PAUS_REQ:
929                         dsw_port_handle_pause_flow(dsw, port,
930                                                    msg.originating_port_id,
931                                                    msg.queue_id, msg.flow_hash);
932                         break;
933                 case DSW_CTL_UNPAUS_REQ:
934                         dsw_port_handle_unpause_flow(dsw, port,
935                                                      msg.originating_port_id,
936                                                      msg.queue_id,
937                                                      msg.flow_hash);
938                         break;
939                 case DSW_CTL_CFM:
940                         dsw_port_handle_confirm(dsw, port);
941                         break;
942                 }
943         }
944 }
945
946 static void
947 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
948 {
949         /* To pull the control ring reasonably often on busy ports,
950          * each dequeued/enqueued event is considered an 'op' too.
951          */
952         port->ops_since_bg_task += (num_events+1);
953 }
954
955 static void
956 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
957 {
958         if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
959                      port->pending_releases == 0))
960                 dsw_port_move_migrating_flow(dsw, port);
961
962         /* Polling the control ring is relatively inexpensive, and
963          * polling it often helps bring down migration latency, so
964          * do this for every iteration.
965          */
966         dsw_port_ctl_process(dsw, port);
967
968         /* To avoid considering migration and flushing output buffers
969          * on every dequeue/enqueue call, the scheduler only performs
970          * such 'background' tasks every nth
971          * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
972          */
973         if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
974                 uint64_t now;
975
976                 now = rte_get_timer_cycles();
977
978                 port->last_bg = now;
979
980                 /* Logic to avoid having events linger in the output
981                  * buffer too long.
982                  */
983                 dsw_port_flush_out_buffers(dsw, port);
984
985                 dsw_port_consider_load_update(port, now);
986
987                 dsw_port_consider_migration(dsw, port, now);
988
989                 port->ops_since_bg_task = 0;
990         }
991 }
992
993 static void
994 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
995 {
996         uint16_t dest_port_id;
997
998         for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
999                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1000 }
1001
1002 uint16_t
1003 dsw_event_enqueue(void *port, const struct rte_event *ev)
1004 {
1005         return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1006 }
1007
1008 static __rte_always_inline uint16_t
1009 dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1010                                 const struct rte_event events[],
1011                                 uint16_t events_len, bool op_types_known,
1012                                 uint16_t num_new, uint16_t num_release,
1013                                 uint16_t num_non_release)
1014 {
1015         struct dsw_evdev *dsw = source_port->dsw;
1016         bool enough_credits;
1017         uint16_t i;
1018
1019         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1020                         "events to port %d.\n", events_len, source_port->id);
1021
1022         dsw_port_bg_process(dsw, source_port);
1023
1024         /* XXX: For performance (=ring efficiency) reasons, the
1025          * scheduler relies on internal non-ring buffers instead of
1026          * immediately sending the event to the destination ring. For
1027          * a producer that doesn't intend to produce or consume any
1028          * more events, the scheduler provides a way to flush the
1029          * buffer, by means of doing an enqueue of zero events. In
1030          * addition, a port cannot be left "unattended" (e.g. unused)
1031          * for long periods of time, since that would stall
1032          * migration. Eventdev API extensions to provide a cleaner way
1033          * to achieve both of these functions should be
1034          * considered.
1035          */
1036         if (unlikely(events_len == 0)) {
1037                 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1038                 dsw_port_flush_out_buffers(dsw, source_port);
1039                 return 0;
1040         }
1041
1042         dsw_port_note_op(source_port, events_len);
1043
1044         if (!op_types_known)
1045                 for (i = 0; i < events_len; i++) {
1046                         switch (events[i].op) {
1047                         case RTE_EVENT_OP_RELEASE:
1048                                 num_release++;
1049                                 break;
1050                         case RTE_EVENT_OP_NEW:
1051                                 num_new++;
1052                                 /* Falls through. */
1053                         default:
1054                                 num_non_release++;
1055                                 break;
1056                         }
1057                 }
1058
1059         /* Technically, we could allow the non-new events up to the
1060          * first new event in the array into the system, but for
1061          * simplicity, we deny the whole burst if the port is
1062          * above the water mark.
1063          */
1064         if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
1065                      source_port->new_event_threshold))
1066                 return 0;
1067
1068         enough_credits = dsw_port_acquire_credits(dsw, source_port,
1069                                                   num_non_release);
1070         if (unlikely(!enough_credits))
1071                 return 0;
1072
1073         source_port->pending_releases -= num_release;
1074
1075         dsw_port_enqueue_stats(source_port, num_new,
1076                                num_non_release-num_new, num_release);
1077
1078         for (i = 0; i < events_len; i++) {
1079                 const struct rte_event *event = &events[i];
1080
1081                 if (likely(num_release == 0 ||
1082                            event->op != RTE_EVENT_OP_RELEASE))
1083                         dsw_port_buffer_event(dsw, source_port, event);
1084                 dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1085         }
1086
1087         DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1088                         "accepted.\n", num_non_release);
1089
1090         return num_non_release;
1091 }
1092
1093 uint16_t
1094 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1095                         uint16_t events_len)
1096 {
1097         struct dsw_port *source_port = port;
1098
1099         if (unlikely(events_len > source_port->enqueue_depth))
1100                 events_len = source_port->enqueue_depth;
1101
1102         return dsw_event_enqueue_burst_generic(source_port, events,
1103                                                events_len, false, 0, 0, 0);
1104 }
1105
1106 uint16_t
1107 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1108                             uint16_t events_len)
1109 {
1110         struct dsw_port *source_port = port;
1111
1112         if (unlikely(events_len > source_port->enqueue_depth))
1113                 events_len = source_port->enqueue_depth;
1114
1115         return dsw_event_enqueue_burst_generic(source_port, events,
1116                                                events_len, true, events_len,
1117                                                0, events_len);
1118 }
1119
1120 uint16_t
1121 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1122                                 uint16_t events_len)
1123 {
1124         struct dsw_port *source_port = port;
1125
1126         if (unlikely(events_len > source_port->enqueue_depth))
1127                 events_len = source_port->enqueue_depth;
1128
1129         return dsw_event_enqueue_burst_generic(source_port, events,
1130                                                events_len, true, 0, 0,
1131                                                events_len);
1132 }
1133
1134 uint16_t
1135 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1136 {
1137         return dsw_event_dequeue_burst(port, events, 1, wait);
1138 }
1139
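/* Record the queue/flow pairs of dequeued events in a circular buffer.
 * This record serves as the candidate list for future migrations.
 */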
1140 static void
1141 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1142                             uint16_t num)
1143 {
1144         uint16_t i;
1145
1146         dsw_port_dequeue_stats(port, num);
1147
1148         for (i = 0; i < num; i++) {
1149                 uint16_t l_idx = port->seen_events_idx;
1150                 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1151                 struct rte_event *event = &events[i];
1152                 qf->queue_id = event->queue_id;
1153                 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1154
1155                 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1156
1157                 dsw_port_queue_dequeued_stats(port, event->queue_id);
1158         }
1159
1160         if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1161                 port->seen_events_len =
1162                         RTE_MIN(port->seen_events_len + num,
1163                                 DSW_MAX_EVENTS_RECORDED);
1164 }
1165
1166 #ifdef DSW_SORT_DEQUEUED
1167
1168 #define DSW_EVENT_TO_INT(_event)                                \
1169         ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1170
1171 static inline int
1172 dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1173 {
1174         const struct rte_event *event_a = v_event_a;
1175         const struct rte_event *event_b = v_event_b;
1176
1177         return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1178 }
1179 #endif
1180
1181 static uint16_t
1182 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1183                        uint16_t num)
1184 {
1185         struct dsw_port *source_port = port;
1186         struct dsw_evdev *dsw = source_port->dsw;
1187
1188         dsw_port_ctl_process(dsw, source_port);
1189
1190         if (unlikely(port->in_buffer_len > 0)) {
1191                 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1192
1193                 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1194                            dequeued * sizeof(struct rte_event));
1195
1196                 port->in_buffer_start += dequeued;
1197                 port->in_buffer_len -= dequeued;
1198
1199                 if (port->in_buffer_len == 0)
1200                         port->in_buffer_start = 0;
1201
1202                 return dequeued;
1203         }
1204
1205         return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1206 }
1207
1208 uint16_t
1209 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1210                         uint64_t wait __rte_unused)
1211 {
1212         struct dsw_port *source_port = port;
1213         struct dsw_evdev *dsw = source_port->dsw;
1214         uint16_t dequeued;
1215
1216         source_port->pending_releases = 0;
1217
1218         dsw_port_bg_process(dsw, source_port);
1219
1220         if (unlikely(num > source_port->dequeue_depth))
1221                 num = source_port->dequeue_depth;
1222
1223         dequeued = dsw_port_dequeue_burst(source_port, events, num);
1224
1225         source_port->pending_releases = dequeued;
1226
1227         dsw_port_load_record(source_port, dequeued);
1228
1229         dsw_port_note_op(source_port, dequeued);
1230
1231         if (dequeued > 0) {
1232                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1233                                 dequeued);
1234
1235                 dsw_port_return_credits(dsw, source_port, dequeued);
1236
1237                 /* One potential optimization one might think of is to
1238                  * add a migration state (prior to 'pausing'), and
1239                  * only record seen events when the port is in this
1240                  * state (and transit to 'pausing' when enough events
1241                  * state (and transition to 'pausing' when enough events
1242                  * have been gathered). However, that scheme doesn't
1243                  */
1244                 dsw_port_record_seen_events(port, events, dequeued);
1245         }
1246         /* XXX: Assuming the port can't produce any more work,
1247          *      consider flushing the output buffer when dequeued ==
1248          *      0.
1249          */
1250
1251 #ifdef DSW_SORT_DEQUEUED
1252         dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1253 #endif
1254
1255         return dequeued;
1256 }