drivers/event/dsw/dsw_event.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4
5 #include "dsw_evdev.h"
6
7 #ifdef DSW_SORT_DEQUEUED
8 #include "dsw_sort.h"
9 #endif
10
11 #include <stdbool.h>
12 #include <string.h>
13
14 #include <rte_atomic.h>
15 #include <rte_cycles.h>
16 #include <rte_memcpy.h>
17 #include <rte_random.h>
18
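/* DSW limits the number of in-flight events using a credit scheme.
 * Each port keeps a small local cache of credits (inflight_credits)
 * and acquires/returns credits from/to the device-global pool
 * (credits_on_loan) in larger chunks, to avoid hitting the shared
 * atomic counter on every enqueue.
 */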
19 static bool
20 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
21                          int32_t credits)
22 {
23         int32_t inflight_credits = port->inflight_credits;
24         int32_t missing_credits = credits - inflight_credits;
25         int32_t total_on_loan;
26         int32_t available;
27         int32_t acquired_credits;
28         int32_t new_total_on_loan;
29
30         if (likely(missing_credits <= 0)) {
31                 port->inflight_credits -= credits;
32                 return true;
33         }
34
35         total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
36         available = dsw->max_inflight - total_on_loan;
37         acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
38
39         if (available < acquired_credits)
40                 return false;
41
42         /* This is a race; no locks are involved, and thus some other
43          * thread may allocate tokens between the check and the
44          * allocation.
45          */
46         new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
47                                                     acquired_credits);
48
49         if (unlikely(new_total_on_loan > dsw->max_inflight)) {
50                 /* Some other port took the last credits */
51                 rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
52                 return false;
53         }
54
55         DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
56                         acquired_credits);
57
58         port->inflight_credits += acquired_credits;
59         port->inflight_credits -= credits;
60
61         return true;
62 }
63
64 static void
65 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
66                         int32_t credits)
67 {
68         port->inflight_credits += credits;
69
70         if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
71                 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
72                 int32_t return_credits =
73                         port->inflight_credits - leave_credits;
74
75                 port->inflight_credits = leave_credits;
76
77                 rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
78
79                 DSW_LOG_DP_PORT(DEBUG, port->id,
80                                 "Returned %d tokens to pool.\n",
81                                 return_credits);
82         }
83 }
84
85 static void
86 dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
87                        uint16_t num_forward, uint16_t num_release)
88 {
89         port->new_enqueued += num_new;
90         port->forward_enqueued += num_forward;
91         port->release_enqueued += num_release;
92 }
93
94 static void
95 dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
96 {
97         source_port->queue_enqueued[queue_id]++;
98 }
99
100 static void
101 dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
102 {
103         port->dequeued += num;
104 }
105
106 static void
107 dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
108 {
109         source_port->queue_dequeued[queue_id]++;
110 }
111
112 static void
113 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
114 {
115         if (dequeued > 0 && port->busy_start == 0)
116                 /* work period begins */
117                 port->busy_start = rte_get_timer_cycles();
118         else if (dequeued == 0 && port->busy_start > 0) {
119                 /* work period ends */
120                 uint64_t work_period =
121                         rte_get_timer_cycles() - port->busy_start;
122                 port->busy_cycles += work_period;
123                 port->busy_start = 0;
124         }
125 }
126
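/* Close the current load measurement period and return the port load
 * for that period: the fraction of cycles the port spent busy, scaled
 * to DSW_MAX_LOAD.
 */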
127 static int16_t
128 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
129 {
130         uint64_t passed = now - port->measurement_start;
131         uint64_t busy_cycles = port->busy_cycles;
132
133         if (port->busy_start > 0) {
134                 busy_cycles += (now - port->busy_start);
135                 port->busy_start = now;
136         }
137
138         int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
139
140         port->measurement_start = now;
141         port->busy_cycles = 0;
142
143         port->total_busy_cycles += busy_cycles;
144
145         return load;
146 }
147
148 static void
149 dsw_port_load_update(struct dsw_port *port, uint64_t now)
150 {
151         int16_t old_load;
152         int16_t period_load;
153         int16_t new_load;
154
155         old_load = rte_atomic16_read(&port->load);
156
157         period_load = dsw_port_load_close_period(port, now);
158
159         new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
160                 (DSW_OLD_LOAD_WEIGHT+1);
161
162         rte_atomic16_set(&port->load, new_load);
163 }
164
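/* Fold the just-closed period's load into the published port load as
 * an exponentially weighted moving average, with the old value
 * weighted by DSW_OLD_LOAD_WEIGHT.
 */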
165 static void
166 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
167 {
168         if (now < port->next_load_update)
169                 return;
170
171         port->next_load_update = now + port->load_update_interval;
172
173         dsw_port_load_update(port, now);
174 }
175
176 static void
177 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
178 {
179         /* there's always room on the ring */
180         while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
181                 rte_pause();
182 }
183
184 static int
185 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
186 {
187         return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
188 }
189
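/* Send a control message of the given type, covering the specified
 * queue id/flow hash pairs, to all ports except the source port.
 */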
190 static void
191 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
192                        uint8_t type, struct dsw_queue_flow *qfs,
193                        uint8_t qfs_len)
194 {
195         uint16_t port_id;
196         struct dsw_ctl_msg msg = {
197                 .type = type,
198                 .originating_port_id = source_port->id,
199                 .qfs_len = qfs_len
200         };
201
202         memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
203
204         for (port_id = 0; port_id < dsw->num_ports; port_id++)
205                 if (port_id != source_port->id)
206                         dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
207 }
208
209 static __rte_always_inline bool
210 dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
211                          uint8_t queue_id, uint16_t flow_hash)
212 {
213         uint16_t i;
214
215         for (i = 0; i < qfs_len; i++)
216                 if (qfs[i].queue_id == queue_id &&
217                     qfs[i].flow_hash == flow_hash)
218                         return true;
219
220         return false;
221 }
222
223 static __rte_always_inline bool
224 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
225                         uint16_t flow_hash)
226 {
227         return dsw_is_queue_flow_in_ary(port->paused_flows,
228                                         port->paused_flows_len,
229                                         queue_id, flow_hash);
230 }
231
232 static void
233 dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
234                           uint8_t qfs_len)
235 {
236         uint8_t i;
237
238         for (i = 0; i < qfs_len; i++) {
239                 struct dsw_queue_flow *qf = &qfs[i];
240
241                 DSW_LOG_DP_PORT(DEBUG, port->id,
242                                 "Pausing queue_id %d flow_hash %d.\n",
243                                 qf->queue_id, qf->flow_hash);
244
245                 port->paused_flows[port->paused_flows_len] = *qf;
246                 port->paused_flows_len++;
247         }
248 }
249
250 static void
251 dsw_port_remove_paused_flow(struct dsw_port *port,
252                             struct dsw_queue_flow *target_qf)
253 {
254         uint16_t i;
255
256         for (i = 0; i < port->paused_flows_len; i++) {
257                 struct dsw_queue_flow *qf = &port->paused_flows[i];
258
259                 if (qf->queue_id == target_qf->queue_id &&
260                     qf->flow_hash == target_qf->flow_hash) {
261                         uint16_t last_idx = port->paused_flows_len-1;
262                         if (i != last_idx)
263                                 port->paused_flows[i] =
264                                         port->paused_flows[last_idx];
265                         port->paused_flows_len--;
266                         break;
267                 }
268         }
269 }
270
271 static void
272 dsw_port_remove_paused_flows(struct dsw_port *port,
273                              struct dsw_queue_flow *qfs, uint8_t qfs_len)
274 {
275         uint8_t i;
276
277         for (i = 0; i < qfs_len; i++)
278                 dsw_port_remove_paused_flow(port, &qfs[i]);
279
280 }
281
282 static void
283 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
284
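/* Handle a pause request from another port: flush the output buffers
 * (which may hold already-scheduled events for the affected flows),
 * mark the flows as paused, and send a confirmation back to the
 * originating port.
 */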
285 static void
286 dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
287                             uint8_t originating_port_id,
288                             struct dsw_queue_flow *paused_qfs,
289                             uint8_t qfs_len)
290 {
291         struct dsw_ctl_msg cfm = {
292                 .type = DSW_CTL_CFM,
293                 .originating_port_id = port->id
294         };
295
296         /* There might be already-scheduled events belonging to the
297          * paused flows in the output buffers.
298          */
299         dsw_port_flush_out_buffers(dsw, port);
300
301         dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
302
303         /* Make sure any stores to the original port's in_ring are seen
304          * before the ctl message.
305          */
306         rte_smp_wmb();
307
308         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
309 }
310
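/* A burst represents a particular queue id/flow hash combination,
 * along with the number of times it occurred among the recorded
 * (seen) events.
 */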
311 struct dsw_queue_flow_burst {
312         struct dsw_queue_flow queue_flow;
313         uint16_t count;
314 };
315
316 #define DSW_QF_TO_INT(_qf)                                      \
317         ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
318
319 static inline int
320 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
321 {
322         const struct dsw_queue_flow *qf_a = v_qf_a;
323         const struct dsw_queue_flow *qf_b = v_qf_b;
324
325         return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
326 }
327
328 static uint16_t
329 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
330                        struct dsw_queue_flow_burst *bursts)
331 {
332         uint16_t i;
333         struct dsw_queue_flow_burst *current_burst = NULL;
334         uint16_t num_bursts = 0;
335
336         /* We don't need the stable property, and the list is likely
337          * large enough for qsort() to outperform dsw_stable_sort(),
338          * so we use qsort() here.
339          */
340         qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
341
342         /* arrange the (now-consecutive) events into bursts */
343         for (i = 0; i < qfs_len; i++) {
344                 if (i == 0 ||
345                     dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
346                         current_burst = &bursts[num_bursts];
347                         current_burst->queue_flow = qfs[i];
348                         current_burst->count = 0;
349                         num_bursts++;
350                 }
351                 current_burst->count++;
352         }
353
354         return num_bursts;
355 }
356
357 static bool
358 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
359                         int16_t load_limit)
360 {
361         bool below_limit = false;
362         uint16_t i;
363
364         for (i = 0; i < dsw->num_ports; i++) {
365                 int16_t load = rte_atomic16_read(&dsw->ports[i].load);
366                 if (load < load_limit)
367                         below_limit = true;
368                 port_loads[i] = load;
369         }
370         return below_limit;
371 }
372
373 static int16_t
374 dsw_flow_load(uint16_t num_events, int16_t port_load)
375 {
376         return ((int32_t)port_load * (int32_t)num_events) /
377                 DSW_MAX_EVENTS_RECORDED;
378 }
379
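/* Rate how attractive it would be to move a flow with the given load
 * from the source to the target port. Returns a weight (the higher,
 * the better), or a negative value if the migration is not
 * worthwhile.
 */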
380 static int16_t
381 dsw_evaluate_migration(int16_t source_load, int16_t target_load,
382                        int16_t flow_load)
383 {
384         int32_t res_target_load;
385         int32_t imbalance;
386
387         if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
388                 return -1;
389
390         imbalance = source_load - target_load;
391
392         if (imbalance < DSW_REBALANCE_THRESHOLD)
393                 return -1;
394
395         res_target_load = target_load + flow_load;
396
397         /* If the estimated load of the target port would be higher
398          * than the source port's load, it doesn't make sense to move
399          * the flow.
400          */
401         if (res_target_load > source_load)
402                 return -1;
403
404         /* The more idle the target will be, the better. This will
405          * make migration prefer moving smaller flows, and flows to
406          * lightly loaded ports.
407          */
408         return DSW_MAX_LOAD - res_target_load;
409 }
410
411 static bool
412 dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
413 {
414         struct dsw_queue *queue = &dsw->queues[queue_id];
415         uint16_t i;
416
417         for (i = 0; i < queue->num_serving_ports; i++)
418                 if (queue->serving_ports[i] == port_id)
419                         return true;
420
421         return false;
422 }
423
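/* Pick, among the seen flows not already selected, the flow/target
 * port combination with the highest migration weight, and append it
 * to the emigration target arrays. Returns false if no suitable
 * combination was found.
 */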
424 static bool
425 dsw_select_emigration_target(struct dsw_evdev *dsw,
426                             struct dsw_queue_flow_burst *bursts,
427                             uint16_t num_bursts, uint8_t source_port_id,
428                             int16_t *port_loads, uint16_t num_ports,
429                             uint8_t *target_port_ids,
430                             struct dsw_queue_flow *target_qfs,
431                             uint8_t *targets_len)
432 {
433         int16_t source_port_load = port_loads[source_port_id];
434         struct dsw_queue_flow *candidate_qf;
435         uint8_t candidate_port_id;
436         int16_t candidate_weight = -1;
437         int16_t candidate_flow_load;
438         uint16_t i;
439
440         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
441                 return false;
442
443         for (i = 0; i < num_bursts; i++) {
444                 struct dsw_queue_flow_burst *burst = &bursts[i];
445                 struct dsw_queue_flow *qf = &burst->queue_flow;
446                 int16_t flow_load;
447                 uint16_t port_id;
448
449                 if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
450                                              qf->queue_id, qf->flow_hash))
451                         continue;
452
453                 flow_load = dsw_flow_load(burst->count, source_port_load);
454
455                 for (port_id = 0; port_id < num_ports; port_id++) {
456                         int16_t weight;
457
458                         if (port_id == source_port_id)
459                                 continue;
460
461                         if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
462                                 continue;
463
464                         weight = dsw_evaluate_migration(source_port_load,
465                                                         port_loads[port_id],
466                                                         flow_load);
467
468                         if (weight > candidate_weight) {
469                                 candidate_qf = qf;
470                                 candidate_port_id = port_id;
471                                 candidate_weight = weight;
472                                 candidate_flow_load = flow_load;
473                         }
474                 }
475         }
476
477         if (candidate_weight < 0)
478                 return false;
479
480         DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
481                         "flow_hash %d (with flow load %d) for migration "
482                         "to port %d.\n", candidate_qf->queue_id,
483                         candidate_qf->flow_hash,
484                         DSW_LOAD_TO_PERCENT(candidate_flow_load),
485                         candidate_port_id);
486
487         port_loads[candidate_port_id] += candidate_flow_load;
488         port_loads[source_port_id] -= candidate_flow_load;
489
490         target_port_ids[*targets_len] = candidate_port_id;
491         target_qfs[*targets_len] = *candidate_qf;
492         (*targets_len)++;
493
494         return true;
495 }
496
497 static void
498 dsw_select_emigration_targets(struct dsw_evdev *dsw,
499                               struct dsw_port *source_port,
500                               struct dsw_queue_flow_burst *bursts,
501                               uint16_t num_bursts, int16_t *port_loads)
502 {
503         struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
504         uint8_t *target_port_ids = source_port->emigration_target_port_ids;
505         uint8_t *targets_len = &source_port->emigration_targets_len;
506         uint8_t i;
507
508         for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
509                 bool found;
510
511                 found = dsw_select_emigration_target(dsw, bursts, num_bursts,
512                                                      source_port->id,
513                                                      port_loads, dsw->num_ports,
514                                                      target_port_ids,
515                                                      target_qfs,
516                                                      targets_len);
517                 if (!found)
518                         break;
519         }
520
521         if (*targets_len == 0)
522                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
523                                 "For the %d flows considered, no target port "
524                                 "was found.\n", num_bursts);
525 }
526
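/* Return the id of the port currently serving the specified queue
 * id/flow hash combination, as recorded in the queue's
 * flow_to_port_map.
 */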
527 static uint8_t
528 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
529 {
530         struct dsw_queue *queue = &dsw->queues[queue_id];
531         uint8_t port_id;
532
533         if (queue->num_serving_ports > 1)
534                 port_id = queue->flow_to_port_map[flow_hash];
535         else
536                 /* A single-link queue, or atomic/ordered/parallel but
537                  * with just a single serving port.
538                  */
539                 port_id = queue->serving_ports[0];
540
541         DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
542                    "to port %d.\n", queue_id, flow_hash, port_id);
543
544         return port_id;
545 }
546
547 static void
548 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
549                            uint8_t dest_port_id)
550 {
551         struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
552         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
553         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
554         uint16_t enqueued = 0;
555
556         if (*buffer_len == 0)
557                 return;
558
559         /* The rings are dimensioned to fit all in-flight events (even
560          * on a single ring), so looping will work.
561          */
562         do {
563                 enqueued +=
564                         rte_event_ring_enqueue_burst(dest_port->in_ring,
565                                                      buffer+enqueued,
566                                                      *buffer_len-enqueued,
567                                                      NULL);
568         } while (unlikely(enqueued != *buffer_len));
569
570         (*buffer_len) = 0;
571 }
572
573 static uint16_t
574 dsw_port_get_parallel_flow_id(struct dsw_port *port)
575 {
576         uint16_t flow_id = port->next_parallel_flow_id;
577
578         port->next_parallel_flow_id =
579                 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
580
581         return flow_id;
582 }
583
584 static void
585 dsw_port_buffer_paused(struct dsw_port *port,
586                        const struct rte_event *paused_event)
587 {
588         port->paused_events[port->paused_events_len] = *paused_event;
589         port->paused_events_len++;
590 }
591
592 static void
593 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
594                            uint8_t dest_port_id, const struct rte_event *event)
595 {
596         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
597         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
598
599         if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
600                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
601
602         buffer[*buffer_len] = *event;
603
604         (*buffer_len)++;
605 }
606
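/* Reduce the 24-bit eventdev flow id to a small flow hash by XOR
 * folding it in DSW_MAX_FLOWS_BITS-sized chunks.
 */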
607 #define DSW_FLOW_ID_BITS (24)
608 static uint16_t
609 dsw_flow_id_hash(uint32_t flow_id)
610 {
611         uint16_t hash = 0;
612         uint16_t offset = 0;
613
614         do {
615                 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
616                 offset += DSW_MAX_FLOWS_BITS;
617         } while (offset < DSW_FLOW_ID_BITS);
618
619         return hash;
620 }
621
622 static void
623 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
624                          struct rte_event event)
625 {
626         uint8_t dest_port_id;
627
628         event.flow_id = dsw_port_get_parallel_flow_id(source_port);
629
630         dest_port_id = dsw_schedule(dsw, event.queue_id,
631                                     dsw_flow_id_hash(event.flow_id));
632
633         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
634 }
635
636 static void
637 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
638                       const struct rte_event *event)
639 {
640         uint16_t flow_hash;
641         uint8_t dest_port_id;
642
643         if (unlikely(dsw->queues[event->queue_id].schedule_type ==
644                      RTE_SCHED_TYPE_PARALLEL)) {
645                 dsw_port_buffer_parallel(dsw, source_port, *event);
646                 return;
647         }
648
649         flow_hash = dsw_flow_id_hash(event->flow_id);
650
651         if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
652                                              flow_hash))) {
653                 dsw_port_buffer_paused(source_port, event);
654                 return;
655         }
656
657         dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
658
659         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
660 }
661
662 static void
663 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
664                              struct dsw_port *source_port,
665                              const struct dsw_queue_flow *qf)
666 {
667         uint16_t paused_events_len = source_port->paused_events_len;
668         struct rte_event paused_events[paused_events_len];
669         uint8_t dest_port_id;
670         uint16_t i;
671
672         if (paused_events_len == 0)
673                 return;
674
675         if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
676                 return;
677
678         rte_memcpy(paused_events, source_port->paused_events,
679                    paused_events_len * sizeof(struct rte_event));
680
681         source_port->paused_events_len = 0;
682
683         dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
684
685         for (i = 0; i < paused_events_len; i++) {
686                 struct rte_event *event = &paused_events[i];
687                 uint16_t flow_hash;
688
689                 flow_hash = dsw_flow_id_hash(event->flow_id);
690
691                 if (event->queue_id == qf->queue_id &&
692                     flow_hash == qf->flow_hash)
693                         dsw_port_buffer_non_paused(dsw, source_port,
694                                                    dest_port_id, event);
695                 else
696                         dsw_port_buffer_paused(source_port, event);
697         }
698 }
699
700 static void
701 dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
702 {
703         uint64_t flow_migration_latency;
704
705         flow_migration_latency =
706                 (rte_get_timer_cycles() - port->emigration_start);
707         port->emigration_latency += (flow_migration_latency * finished);
708         port->emigrations += finished;
709 }
710
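/* Finish the ongoing emigration for all target flows residing on
 * queues of the specified schedule type. Flows on queues of other
 * schedule types are left as pending emigration targets.
 */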
711 static void
712 dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
713                         uint8_t schedule_type)
714 {
715         uint8_t i;
716         struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
717         uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
718         uint8_t left_qfs_len = 0;
719         uint8_t finished;
720
721         for (i = 0; i < port->emigration_targets_len; i++) {
722                 struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
723                 uint8_t queue_id = qf->queue_id;
724                 uint8_t queue_schedule_type =
725                         dsw->queues[queue_id].schedule_type;
726                 uint16_t flow_hash = qf->flow_hash;
727
728                 if (queue_schedule_type != schedule_type) {
729                         left_port_ids[left_qfs_len] =
730                                 port->emigration_target_port_ids[i];
731                         left_qfs[left_qfs_len] = *qf;
732                         left_qfs_len++;
733                         continue;
734                 }
735
736                 DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
737                                 "queue_id %d flow_hash %d.\n", queue_id,
738                                 flow_hash);
739
740                 if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
741                         dsw_port_remove_paused_flow(port, qf);
742                         dsw_port_flush_paused_events(dsw, port, qf);
743                 }
744         }
745
746         finished = port->emigration_targets_len - left_qfs_len;
747
748         if (finished > 0)
749                 dsw_port_emigration_stats(port, finished);
750
751         for (i = 0; i < left_qfs_len; i++) {
752                 port->emigration_target_port_ids[i] = left_port_ids[i];
753                 port->emigration_target_qfs[i] = left_qfs[i];
754         }
755         port->emigration_targets_len = left_qfs_len;
756
757         if (port->emigration_targets_len == 0) {
758                 port->migration_state = DSW_MIGRATION_STATE_IDLE;
759                 port->seen_events_len = 0;
760         }
761 }
762
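/* Flows on parallel queues can be moved immediately, by updating the
 * flow-to-port map, since neither atomicity nor ordering needs to be
 * preserved for such queues.
 */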
763 static void
764 dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
765                              struct dsw_port *source_port)
766 {
767         uint8_t i;
768
769         for (i = 0; i < source_port->emigration_targets_len; i++) {
770                 struct dsw_queue_flow *qf =
771                         &source_port->emigration_target_qfs[i];
772                 uint8_t queue_id = qf->queue_id;
773
774                 if (dsw->queues[queue_id].schedule_type ==
775                     RTE_SCHED_TYPE_PARALLEL) {
776                         uint8_t dest_port_id =
777                                 source_port->emigration_target_port_ids[i];
778                         uint16_t flow_hash = qf->flow_hash;
779
780                         /* Single byte-sized stores are always atomic. */
781                         dsw->queues[queue_id].flow_to_port_map[flow_hash] =
782                                 dest_port_id;
783                 }
784         }
785
786         rte_smp_wmb();
787
788         dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
789 }
790
791 static void
792 dsw_port_consider_emigration(struct dsw_evdev *dsw,
793                              struct dsw_port *source_port,
794                              uint64_t now)
795 {
796         bool any_port_below_limit;
797         struct dsw_queue_flow *seen_events = source_port->seen_events;
798         uint16_t seen_events_len = source_port->seen_events_len;
799         struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
800         uint16_t num_bursts;
801         int16_t source_port_load;
802         int16_t port_loads[dsw->num_ports];
803
804         if (now < source_port->next_emigration)
805                 return;
806
807         if (dsw->num_ports == 1)
808                 return;
809
810         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
811
812         /* Randomize the interval to avoid having all threads consider
813          * emigration at the same point in time, which might lead
814          * to all choosing the same target port.
815          */
816         source_port->next_emigration = now +
817                 source_port->migration_interval / 2 +
818                 rte_rand() % source_port->migration_interval;
819
820         if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
821                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
822                                 "Emigration already in progress.\n");
823                 return;
824         }
825
826         /* For simplicity, avoid migration in the unlikely case there
827          * are still events to consume in the in_buffer (from the last
828          * emigration).
829          */
830         if (source_port->in_buffer_len > 0) {
831                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
832                                 "events in the input buffer.\n");
833                 return;
834         }
835
836         source_port_load = rte_atomic16_read(&source_port->load);
837         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
838                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
839                       "Load %d is below threshold level %d.\n",
840                       DSW_LOAD_TO_PERCENT(source_port_load),
841                       DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
842                 return;
843         }
844
845         /* Avoid starting any expensive operations (sorting etc), in
846          * case of a scenario with all ports above the load limit.
847          */
848         any_port_below_limit =
849                 dsw_retrieve_port_loads(dsw, port_loads,
850                                         DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
851         if (!any_port_below_limit) {
852                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
853                                 "Candidate target ports are all too highly "
854                                 "loaded.\n");
855                 return;
856         }
857
858         num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
859                                             bursts);
860
861         /* For non-big-little systems, there's no point in moving the
862          * only (known) flow.
863          */
864         if (num_bursts < 2) {
865                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
866                                 "queue_id %d flow_hash %d has been seen.\n",
867                                 bursts[0].queue_flow.queue_id,
868                                 bursts[0].queue_flow.flow_hash);
869                 return;
870         }
871
872         dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
873                                       port_loads);
874
875         if (source_port->emigration_targets_len == 0)
876                 return;
877
878         source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
879         source_port->emigration_start = rte_get_timer_cycles();
880
881         /* No need to go through the whole pause procedure for
882          * parallel queues, since atomic/ordered semantics need not
883          * be maintained.
884          */
885         dsw_port_move_parallel_flows(dsw, source_port);
886
887         /* All flows were on PARALLEL queues. */
888         if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
889                 return;
890
891         /* There might be 'loopback' events already scheduled in the
892          * output buffers.
893          */
894         dsw_port_flush_out_buffers(dsw, source_port);
895
896         dsw_port_add_paused_flows(source_port,
897                                   source_port->emigration_target_qfs,
898                                   source_port->emigration_targets_len);
899
900         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
901                                source_port->emigration_target_qfs,
902                                source_port->emigration_targets_len);
903         source_port->cfm_cnt = 0;
904 }
905
906 static void
907 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
908                              struct dsw_port *source_port,
909                              const struct dsw_queue_flow *qf);
910
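/* Handle an unpause request: remove the flows from the paused set,
 * confirm to the originating port, and flush any events buffered for
 * the now-unpaused flows.
 */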
911 static void
912 dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
913                               uint8_t originating_port_id,
914                               struct dsw_queue_flow *paused_qfs,
915                               uint8_t qfs_len)
916 {
917         uint16_t i;
918         struct dsw_ctl_msg cfm = {
919                 .type = DSW_CTL_CFM,
920                 .originating_port_id = port->id
921         };
922
923         dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
924
925         rte_smp_rmb();
926
927         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
928
929         for (i = 0; i < qfs_len; i++) {
930                 struct dsw_queue_flow *qf = &paused_qfs[i];
931
932                 if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
933                         port->immigrations++;
934
935                 dsw_port_flush_paused_events(dsw, port, qf);
936         }
937 }
938
939 #define FORWARD_BURST_SIZE (32)
940
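/* Drain the source port's in_ring, forwarding events that belong to
 * the emigrated flow to the destination port's ring. Events belonging
 * to other flows are stashed in the source port's in_buffer, to be
 * dequeued locally later.
 */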
941 static void
942 dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
943                                 struct rte_event_ring *dest_ring,
944                                 uint8_t queue_id,
945                                 uint16_t flow_hash)
946 {
947         uint16_t events_left;
948
949         /* The control ring message should be seen before the ring count
950          * is read on the port's in_ring.
951          */
952         rte_smp_rmb();
953
954         events_left = rte_event_ring_count(source_port->in_ring);
955
956         while (events_left > 0) {
957                 uint16_t in_burst_size =
958                         RTE_MIN(FORWARD_BURST_SIZE, events_left);
959                 struct rte_event in_burst[in_burst_size];
960                 uint16_t in_len;
961                 uint16_t i;
962
963                 in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
964                                                       in_burst,
965                                                       in_burst_size, NULL);
966                 /* No need to care about bursting forwarded events (to
967                  * the destination port's in_ring), since migration
968                  * doesn't happen very often, and also the majority of
969                  * the dequeued events will likely *not* be forwarded.
970                  */
971                 for (i = 0; i < in_len; i++) {
972                         struct rte_event *e = &in_burst[i];
973                         if (e->queue_id == queue_id &&
974                             dsw_flow_id_hash(e->flow_id) == flow_hash) {
975                                 while (rte_event_ring_enqueue_burst(dest_ring,
976                                                                     e, 1,
977                                                                     NULL) != 1)
978                                         rte_pause();
979                         } else {
980                                 uint16_t last_idx = source_port->in_buffer_len;
981                                 source_port->in_buffer[last_idx] = *e;
982                                 source_port->in_buffer_len++;
983                         }
984                 }
985
986                 events_left -= in_len;
987         }
988 }
989
990 static void
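/* All other ports have confirmed that the emigrating flows are
 * paused. Update the flow-to-port map and forward any events for
 * those flows that have already arrived on the source port's in_ring,
 * then ask the other ports to unpause the flows.
 */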
991 dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
992                                struct dsw_port *source_port)
993 {
994         uint8_t i;
995
996         dsw_port_flush_out_buffers(dsw, source_port);
997
998         rte_smp_wmb();
999
1000         for (i = 0; i < source_port->emigration_targets_len; i++) {
1001                 struct dsw_queue_flow *qf =
1002                         &source_port->emigration_target_qfs[i];
1003                 uint8_t dest_port_id =
1004                         source_port->emigration_target_port_ids[i];
1005                 struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1006
1007                 dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1008                         dest_port_id;
1009
1010                 dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
1011                                                 qf->queue_id, qf->flow_hash);
1012         }
1013
1014         /* The flow table updates and the migration destination ports'
1015          * enqueues must be seen before the control message.
1016          */
1017         rte_smp_wmb();
1018
1019         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
1020                                source_port->emigration_target_qfs,
1021                                source_port->emigration_targets_len);
1022         source_port->cfm_cnt = 0;
1023         source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1024 }
1025
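/* A confirmation (CFM) message has arrived. Once all other ports have
 * confirmed, the migration state machine moves on: from pausing to
 * forwarding, or from unpausing to completed emigration.
 */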
1026 static void
1027 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1028 {
1029         port->cfm_cnt++;
1030
1031         if (port->cfm_cnt == (dsw->num_ports-1)) {
1032                 switch (port->migration_state) {
1033                 case DSW_MIGRATION_STATE_PAUSING:
1034                         DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
1035                                         "migration state.\n");
1036                         port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
1037                         break;
1038                 case DSW_MIGRATION_STATE_UNPAUSING:
1039                         dsw_port_end_emigration(dsw, port,
1040                                                 RTE_SCHED_TYPE_ATOMIC);
1041                         break;
1042                 default:
1043                         RTE_ASSERT(0);
1044                         break;
1045                 }
1046         }
1047 }
1048
1049 static void
1050 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1051 {
1052         struct dsw_ctl_msg msg;
1053
1054         /* So that any table loads happen before the ring dequeue, in
1055          * the case of a pause request message.
1056          */
1057         rte_smp_rmb();
1058
1059         if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1060                 switch (msg.type) {
1061                 case DSW_CTL_PAUS_REQ:
1062                         dsw_port_handle_pause_flows(dsw, port,
1063                                                     msg.originating_port_id,
1064                                                     msg.qfs, msg.qfs_len);
1065                         break;
1066                 case DSW_CTL_UNPAUS_REQ:
1067                         dsw_port_handle_unpause_flows(dsw, port,
1068                                                       msg.originating_port_id,
1069                                                       msg.qfs, msg.qfs_len);
1070                         break;
1071                 case DSW_CTL_CFM:
1072                         dsw_port_handle_confirm(dsw, port);
1073                         break;
1074                 }
1075         }
1076 }
1077
1078 static void
1079 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1080 {
1081         /* To poll the control ring reasonably often on busy ports,
1082          * each dequeued/enqueued event is considered an 'op' too.
1083          */
1084         port->ops_since_bg_task += (num_events+1);
1085 }
1086
1087 static void
1088 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1089 {
1090         if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
1091                      port->pending_releases == 0))
1092                 dsw_port_move_emigrating_flows(dsw, port);
1093
1094         /* Polling the control ring is relatively inexpensive, and
1095          * polling it often helps bringing down migration latency, so
1096          * do this for every iteration.
1097          */
1098         dsw_port_ctl_process(dsw, port);
1099
1100         /* To avoid considering migration and flushing output buffers
1101          * on every dequeue/enqueue call, the scheduler only performs
1102          * such 'background' tasks every nth
1103          * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
1104          */
1105         if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1106                 uint64_t now;
1107
1108                 now = rte_get_timer_cycles();
1109
1110                 port->last_bg = now;
1111
1112                 /* Logic to avoid having events linger in the output
1113                  * buffer too long.
1114                  */
1115                 dsw_port_flush_out_buffers(dsw, port);
1116
1117                 dsw_port_consider_load_update(port, now);
1118
1119                 dsw_port_consider_emigration(dsw, port, now);
1120
1121                 port->ops_since_bg_task = 0;
1122         }
1123 }
1124
1125 static void
1126 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1127 {
1128         uint16_t dest_port_id;
1129
1130         for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
1131                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1132 }
1133
1134 uint16_t
1135 dsw_event_enqueue(void *port, const struct rte_event *ev)
1136 {
1137         return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1138 }
1139
1140 static __rte_always_inline uint16_t
1141 dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1142                                 const struct rte_event events[],
1143                                 uint16_t events_len, bool op_types_known,
1144                                 uint16_t num_new, uint16_t num_release,
1145                                 uint16_t num_non_release)
1146 {
1147         struct dsw_evdev *dsw = source_port->dsw;
1148         bool enough_credits;
1149         uint16_t i;
1150
1151         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1152                         "events to port %d.\n", events_len, source_port->id);
1153
1154         dsw_port_bg_process(dsw, source_port);
1155
1156         /* XXX: For performance (=ring efficiency) reasons, the
1157          * scheduler relies on internal non-ring buffers instead of
1158          * immediately sending the event to the destination ring. For
1159          * a producer that doesn't intend to produce or consume any
1160          * more events, the scheduler provides a way to flush the
1161          * buffer, by means of doing an enqueue of zero events. In
1162          * addition, a port cannot be left "unattended" (e.g. unused)
1163          * for long periods of time, since that would stall
1164          * migration. Eventdev API extensions to provide a cleaner way
1165          * to achieve both of these functions should be
1166          * considered.
1167          */
1168         if (unlikely(events_len == 0)) {
1169                 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1170                 dsw_port_flush_out_buffers(dsw, source_port);
1171                 return 0;
1172         }
1173
1174         dsw_port_note_op(source_port, events_len);
1175
1176         if (!op_types_known)
1177                 for (i = 0; i < events_len; i++) {
1178                         switch (events[i].op) {
1179                         case RTE_EVENT_OP_RELEASE:
1180                                 num_release++;
1181                                 break;
1182                         case RTE_EVENT_OP_NEW:
1183                                 num_new++;
1184                                 /* Falls through. */
1185                         default:
1186                                 num_non_release++;
1187                                 break;
1188                         }
1189                 }
1190
1191         /* Technically, we could allow the non-new events up to the
1192          * first new event in the array into the system, but for
1193          * simplicity reasons, we deny the whole burst if the port is
1194          * above the water mark.
1195          */
1196         if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
1197                      source_port->new_event_threshold))
1198                 return 0;
1199
1200         enough_credits = dsw_port_acquire_credits(dsw, source_port,
1201                                                   num_non_release);
1202         if (unlikely(!enough_credits))
1203                 return 0;
1204
1205         source_port->pending_releases -= num_release;
1206
1207         dsw_port_enqueue_stats(source_port, num_new,
1208                                num_non_release-num_new, num_release);
1209
1210         for (i = 0; i < events_len; i++) {
1211                 const struct rte_event *event = &events[i];
1212
1213                 if (likely(num_release == 0 ||
1214                            event->op != RTE_EVENT_OP_RELEASE))
1215                         dsw_port_buffer_event(dsw, source_port, event);
1216                 dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1217         }
1218
1219         DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1220                         "accepted.\n", num_non_release);
1221
1222         return num_non_release;
1223 }
1224
1225 uint16_t
1226 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1227                         uint16_t events_len)
1228 {
1229         struct dsw_port *source_port = port;
1230
1231         if (unlikely(events_len > source_port->enqueue_depth))
1232                 events_len = source_port->enqueue_depth;
1233
1234         return dsw_event_enqueue_burst_generic(source_port, events,
1235                                                events_len, false, 0, 0, 0);
1236 }
1237
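/* Like dsw_event_enqueue_burst(), but the caller guarantees that all
 * events are RTE_EVENT_OP_NEW, which allows the per-event op type
 * inspection to be skipped.
 */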
1238 uint16_t
1239 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1240                             uint16_t events_len)
1241 {
1242         struct dsw_port *source_port = port;
1243
1244         if (unlikely(events_len > source_port->enqueue_depth))
1245                 events_len = source_port->enqueue_depth;
1246
1247         return dsw_event_enqueue_burst_generic(source_port, events,
1248                                                events_len, true, events_len,
1249                                                0, events_len);
1250 }
1251
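/* Like dsw_event_enqueue_burst(), but the caller guarantees that no
 * events are RTE_EVENT_OP_NEW or RTE_EVENT_OP_RELEASE, so both the
 * new event threshold check and the per-event op type inspection can
 * be skipped.
 */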
1252 uint16_t
1253 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1254                                 uint16_t events_len)
1255 {
1256         struct dsw_port *source_port = port;
1257
1258         if (unlikely(events_len > source_port->enqueue_depth))
1259                 events_len = source_port->enqueue_depth;
1260
1261         return dsw_event_enqueue_burst_generic(source_port, events,
1262                                                events_len, true, 0, 0,
1263                                                events_len);
1264 }
1265
1266 uint16_t
1267 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1268 {
1269         return dsw_event_dequeue_burst(port, events, 1, wait);
1270 }
1271
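/* Record the queue id and flow hash of the dequeued events in a
 * circular buffer, for later use as candidates when considering
 * emigration.
 */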
1272 static void
1273 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1274                             uint16_t num)
1275 {
1276         uint16_t i;
1277
1278         dsw_port_dequeue_stats(port, num);
1279
1280         for (i = 0; i < num; i++) {
1281                 uint16_t l_idx = port->seen_events_idx;
1282                 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1283                 struct rte_event *event = &events[i];
1284                 qf->queue_id = event->queue_id;
1285                 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1286
1287                 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1288
1289                 dsw_port_queue_dequeued_stats(port, event->queue_id);
1290         }
1291
1292         if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1293                 port->seen_events_len =
1294                         RTE_MIN(port->seen_events_len + num,
1295                                 DSW_MAX_EVENTS_RECORDED);
1296 }
1297
1298 #ifdef DSW_SORT_DEQUEUED
1299
1300 #define DSW_EVENT_TO_INT(_event)                                \
1301         ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1302
1303 static inline int
1304 dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1305 {
1306         const struct rte_event *event_a = v_event_a;
1307         const struct rte_event *event_b = v_event_b;
1308
1309         return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1310 }
1311 #endif
1312
1313 static uint16_t
1314 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1315                        uint16_t num)
1316 {
1317         struct dsw_port *source_port = port;
1318         struct dsw_evdev *dsw = source_port->dsw;
1319
1320         dsw_port_ctl_process(dsw, source_port);
1321
1322         if (unlikely(port->in_buffer_len > 0)) {
1323                 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1324
1325                 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1326                            dequeued * sizeof(struct rte_event));
1327
1328                 port->in_buffer_start += dequeued;
1329                 port->in_buffer_len -= dequeued;
1330
1331                 if (port->in_buffer_len == 0)
1332                         port->in_buffer_start = 0;
1333
1334                 return dequeued;
1335         }
1336
1337         return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1338 }
1339
1340 uint16_t
1341 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1342                         uint64_t wait __rte_unused)
1343 {
1344         struct dsw_port *source_port = port;
1345         struct dsw_evdev *dsw = source_port->dsw;
1346         uint16_t dequeued;
1347
1348         source_port->pending_releases = 0;
1349
1350         dsw_port_bg_process(dsw, source_port);
1351
1352         if (unlikely(num > source_port->dequeue_depth))
1353                 num = source_port->dequeue_depth;
1354
1355         dequeued = dsw_port_dequeue_burst(source_port, events, num);
1356
1357         source_port->pending_releases = dequeued;
1358
1359         dsw_port_load_record(source_port, dequeued);
1360
1361         dsw_port_note_op(source_port, dequeued);
1362
1363         if (dequeued > 0) {
1364                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1365                                 dequeued);
1366
1367                 dsw_port_return_credits(dsw, source_port, dequeued);
1368
1369                 /* One potential optimization would be to
1370                  * add a migration state (prior to 'pausing'), and
1371                  * only record seen events when the port is in this
1372                  * state (and transition to 'pausing' when enough events
1373                  * have been gathered). However, that scheme doesn't
1374                  * seem to improve performance.
1375                  */
1376                 dsw_port_record_seen_events(port, events, dequeued);
1377         } else /* Zero-size dequeue means a likely idle port, and thus
1378                 * we can afford trading some efficiency for a slightly
1379                 * reduced event wall-time latency.
1380                 */
1381                 dsw_port_flush_out_buffers(dsw, port);
1382
1383 #ifdef DSW_SORT_DEQUEUED
1384         dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1385 #endif
1386
1387         return dequeued;
1388 }