/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include "dsw_evdev.h"

#ifdef DSW_SORT_DEQUEUED
#include "dsw_sort.h"
#endif

#include <stdbool.h>
#include <string.h>

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_memcpy.h>
#include <rte_random.h>

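/* Credit management: to bound the total number of in-flight events,
 * each port must hold credits for the events it enqueues. Credits are
 * borrowed from the global pool (dsw->credits_on_loan) in chunks of at
 * least DSW_PORT_MIN_CREDITS and cached in port->inflight_credits, so
 * that the shared atomic counter is not touched on every enqueue.
 */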
static bool
dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
                         int32_t credits)
{
        int32_t inflight_credits = port->inflight_credits;
        int32_t missing_credits = credits - inflight_credits;
        int32_t total_on_loan;
        int32_t available;
        int32_t acquired_credits;
        int32_t new_total_on_loan;

        if (likely(missing_credits <= 0)) {
                port->inflight_credits -= credits;
                return true;
        }

        total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
        available = dsw->max_inflight - total_on_loan;
        acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);

        if (available < acquired_credits)
                return false;

        /* This is a race, no locks are involved, and thus some other
         * thread can allocate tokens in between the check and the
         * allocation.
         */
        new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
                                                    acquired_credits);

        if (unlikely(new_total_on_loan > dsw->max_inflight)) {
                /* Some other port took the last credits */
                rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
                return false;
        }

        DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
                        acquired_credits);

        port->inflight_credits += acquired_credits;
        port->inflight_credits -= credits;

        return true;
}

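/* Return credits to the global pool, but only once the port's cached
 * credits exceed DSW_PORT_MAX_CREDITS; a cache of DSW_PORT_MIN_CREDITS
 * is kept to avoid hitting the shared counter on every release.
 */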
static void
dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
                        int32_t credits)
{
        port->inflight_credits += credits;

        if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
                int32_t leave_credits = DSW_PORT_MIN_CREDITS;
                int32_t return_credits =
                        port->inflight_credits - leave_credits;

                port->inflight_credits = leave_credits;

                rte_atomic32_sub(&dsw->credits_on_loan, return_credits);

                DSW_LOG_DP_PORT(DEBUG, port->id,
                                "Returned %d tokens to pool.\n",
                                return_credits);
        }
}

static void
dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
                       uint16_t num_forward, uint16_t num_release)
{
        port->new_enqueued += num_new;
        port->forward_enqueued += num_forward;
        port->release_enqueued += num_release;
}

static void
dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
{
        source_port->queue_enqueued[queue_id]++;
}

static void
dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
{
        port->dequeued += num;
}

static void
dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
{
        source_port->queue_dequeued[queue_id]++;
}

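/* Port load tracking: a port is considered busy from the first
 * non-empty dequeue until the first empty one. Busy cycles accumulated
 * during a measurement period are turned into a load figure scaled to
 * DSW_MAX_LOAD, and smoothed with an exponentially weighted moving
 * average: new_load = (period_load + old_load * DSW_OLD_LOAD_WEIGHT) /
 * (DSW_OLD_LOAD_WEIGHT + 1).
 */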
static void
dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
{
        if (dequeued > 0 && port->busy_start == 0)
                /* work period begins */
                port->busy_start = rte_get_timer_cycles();
        else if (dequeued == 0 && port->busy_start > 0) {
                /* work period ends */
                uint64_t work_period =
                        rte_get_timer_cycles() - port->busy_start;
                port->busy_cycles += work_period;
                port->busy_start = 0;
        }
}

static int16_t
dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
{
        uint64_t passed = now - port->measurement_start;
        uint64_t busy_cycles = port->busy_cycles;

        if (port->busy_start > 0) {
                busy_cycles += (now - port->busy_start);
                port->busy_start = now;
        }

        int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;

        port->measurement_start = now;
        port->busy_cycles = 0;

        port->total_busy_cycles += busy_cycles;

        return load;
}

static void
dsw_port_load_update(struct dsw_port *port, uint64_t now)
{
        int16_t old_load;
        int16_t period_load;
        int16_t new_load;

        old_load = rte_atomic16_read(&port->load);

        period_load = dsw_port_load_close_period(port, now);

        new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
                (DSW_OLD_LOAD_WEIGHT+1);

        rte_atomic16_set(&port->load, new_load);

        /* The load of the recently immigrated flows should hopefully
         * be reflected in the load estimate by now.
         */
        rte_atomic32_set(&port->immigration_load, 0);
}

static void
dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
{
        if (now < port->next_load_update)
                return;

        port->next_load_update = now + port->load_update_interval;

        dsw_port_load_update(port, now);
}

static void
dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
{
        /* there's always room on the ring */
        while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
                rte_pause();
}

static int
dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
{
        return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
}

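/* Control messages (pause/unpause requests and confirmations) are
 * passed between ports over per-port control rings. A request is
 * broadcast to all other ports, and the originating port waits for a
 * DSW_CTL_CFM from each of them before moving on to the next
 * migration step.
 */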
static void
dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
                       uint8_t type, struct dsw_queue_flow *qfs,
                       uint8_t qfs_len)
{
        uint16_t port_id;
        struct dsw_ctl_msg msg = {
                .type = type,
                .originating_port_id = source_port->id,
                .qfs_len = qfs_len
        };

        memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);

        for (port_id = 0; port_id < dsw->num_ports; port_id++)
                if (port_id != source_port->id)
                        dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
}

static __rte_always_inline bool
dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
                         uint8_t queue_id, uint16_t flow_hash)
{
        uint16_t i;

        for (i = 0; i < qfs_len; i++)
                if (qfs[i].queue_id == queue_id &&
                    qfs[i].flow_hash == flow_hash)
                        return true;

        return false;
}

static __rte_always_inline bool
dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
                        uint16_t flow_hash)
{
        return dsw_is_queue_flow_in_ary(port->paused_flows,
                                        port->paused_flows_len,
                                        queue_id, flow_hash);
}

static void
dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
                          uint8_t qfs_len)
{
        uint8_t i;

        for (i = 0; i < qfs_len; i++) {
                struct dsw_queue_flow *qf = &qfs[i];

                DSW_LOG_DP_PORT(DEBUG, port->id,
                                "Pausing queue_id %d flow_hash %d.\n",
                                qf->queue_id, qf->flow_hash);

                port->paused_flows[port->paused_flows_len] = *qf;
                port->paused_flows_len++;
        }
}

static void
dsw_port_remove_paused_flow(struct dsw_port *port,
                            struct dsw_queue_flow *target_qf)
{
        uint16_t i;

        for (i = 0; i < port->paused_flows_len; i++) {
                struct dsw_queue_flow *qf = &port->paused_flows[i];

                if (qf->queue_id == target_qf->queue_id &&
                    qf->flow_hash == target_qf->flow_hash) {
                        uint16_t last_idx = port->paused_flows_len-1;
                        if (i != last_idx)
                                port->paused_flows[i] =
                                        port->paused_flows[last_idx];
                        port->paused_flows_len--;
                        break;
                }
        }
}

static void
dsw_port_remove_paused_flows(struct dsw_port *port,
                             struct dsw_queue_flow *qfs, uint8_t qfs_len)
{
        uint8_t i;

        for (i = 0; i < qfs_len; i++)
                dsw_port_remove_paused_flow(port, &qfs[i]);
}

static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);

static void
dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
                            uint8_t originating_port_id,
                            struct dsw_queue_flow *paused_qfs,
                            uint8_t qfs_len)
{
        struct dsw_ctl_msg cfm = {
                .type = DSW_CTL_CFM,
                .originating_port_id = port->id
        };

        /* There might be already-scheduled events belonging to the
         * paused flow in the output buffers.
         */
        dsw_port_flush_out_buffers(dsw, port);

        dsw_port_add_paused_flows(port, paused_qfs, qfs_len);

        /* Make sure any stores to the original port's in_ring are seen
         * before the ctl message.
         */
        rte_smp_wmb();

        dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
}

struct dsw_queue_flow_burst {
        struct dsw_queue_flow queue_flow;
        uint16_t count;
};

#define DSW_QF_TO_INT(_qf)                                      \
        ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))

static inline int
dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
{
        const struct dsw_queue_flow *qf_a = v_qf_a;
        const struct dsw_queue_flow *qf_b = v_qf_b;

        return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
}

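/* Collapse the list of recorded (queue_id, flow_hash) pairs into
 * "bursts": the array is sorted so that identical pairs become
 * adjacent, and each unique pair gets a count of how many times it
 * was seen. The counts later serve as a rough per-flow share of the
 * port's load.
 */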
static uint16_t
dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
                       struct dsw_queue_flow_burst *bursts)
{
        uint16_t i;
        struct dsw_queue_flow_burst *current_burst = NULL;
        uint16_t num_bursts = 0;

        /* We don't need the stable property, and the list is likely
         * large enough for qsort() to outperform dsw_stable_sort(),
         * so we use qsort() here.
         */
        qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);

        /* arrange the (now-consecutive) events into bursts */
        for (i = 0; i < qfs_len; i++) {
                if (i == 0 ||
                    dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
                        current_burst = &bursts[num_bursts];
                        current_burst->queue_flow = qfs[i];
                        current_burst->count = 0;
                        num_bursts++;
                }
                current_burst->count++;
        }

        return num_bursts;
}

static bool
dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
                        int16_t load_limit)
{
        bool below_limit = false;
        uint16_t i;

        for (i = 0; i < dsw->num_ports; i++) {
                int16_t measured_load = rte_atomic16_read(&dsw->ports[i].load);
                int32_t immigration_load =
                        rte_atomic32_read(&dsw->ports[i].immigration_load);
                int32_t load = measured_load + immigration_load;

                load = RTE_MIN(load, DSW_MAX_LOAD);

                if (load < load_limit)
                        below_limit = true;
                port_loads[i] = load;
        }
        return below_limit;
}

static int16_t
dsw_flow_load(uint16_t num_events, int16_t port_load)
{
        return ((int32_t)port_load * (int32_t)num_events) /
                DSW_MAX_EVENTS_RECORDED;
}

static int16_t
dsw_evaluate_migration(int16_t source_load, int16_t target_load,
                       int16_t flow_load)
{
        int32_t res_target_load;
        int32_t imbalance;

        if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
                return -1;

        imbalance = source_load - target_load;

        if (imbalance < DSW_REBALANCE_THRESHOLD)
                return -1;

        res_target_load = target_load + flow_load;

        /* If the estimated load of the target port will be higher
         * than the source port's load, it doesn't make sense to move
         * the flow.
         */
        if (res_target_load > source_load)
                return -1;

        /* The more idle the target will be, the better. This will
         * make migration prefer moving smaller flows, and flows to
         * lightly loaded ports.
         */
        return DSW_MAX_LOAD - res_target_load;
}

static bool
dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
{
        struct dsw_queue *queue = &dsw->queues[queue_id];
        uint16_t i;

        for (i = 0; i < queue->num_serving_ports; i++)
                if (queue->serving_ports[i] == port_id)
                        return true;

        return false;
}

static bool
dsw_select_emigration_target(struct dsw_evdev *dsw,
                             struct dsw_queue_flow_burst *bursts,
                             uint16_t num_bursts, uint8_t source_port_id,
                             int16_t *port_loads, uint16_t num_ports,
                             uint8_t *target_port_ids,
                             struct dsw_queue_flow *target_qfs,
                             uint8_t *targets_len)
{
        int16_t source_port_load = port_loads[source_port_id];
        struct dsw_queue_flow *candidate_qf = NULL;
        uint8_t candidate_port_id = 0;
        int16_t candidate_weight = -1;
        int16_t candidate_flow_load = -1;
        uint16_t i;

        if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
                return false;

        for (i = 0; i < num_bursts; i++) {
                struct dsw_queue_flow_burst *burst = &bursts[i];
                struct dsw_queue_flow *qf = &burst->queue_flow;
                int16_t flow_load;
                uint16_t port_id;

                if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
                                             qf->queue_id, qf->flow_hash))
                        continue;

                flow_load = dsw_flow_load(burst->count, source_port_load);

                for (port_id = 0; port_id < num_ports; port_id++) {
                        int16_t weight;

                        if (port_id == source_port_id)
                                continue;

                        if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
                                continue;

                        weight = dsw_evaluate_migration(source_port_load,
                                                        port_loads[port_id],
                                                        flow_load);

                        if (weight > candidate_weight) {
                                candidate_qf = qf;
                                candidate_port_id = port_id;
                                candidate_weight = weight;
                                candidate_flow_load = flow_load;
                        }
                }
        }

        if (candidate_weight < 0)
                return false;

        DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
                        "flow_hash %d (with flow load %d) for migration "
                        "to port %d.\n", candidate_qf->queue_id,
                        candidate_qf->flow_hash,
                        DSW_LOAD_TO_PERCENT(candidate_flow_load),
                        candidate_port_id);

        port_loads[candidate_port_id] += candidate_flow_load;
        port_loads[source_port_id] -= candidate_flow_load;

        target_port_ids[*targets_len] = candidate_port_id;
        target_qfs[*targets_len] = *candidate_qf;
        (*targets_len)++;

        rte_atomic32_add(&dsw->ports[candidate_port_id].immigration_load,
                         candidate_flow_load);

        return true;
}

static void
dsw_select_emigration_targets(struct dsw_evdev *dsw,
                              struct dsw_port *source_port,
                              struct dsw_queue_flow_burst *bursts,
                              uint16_t num_bursts, int16_t *port_loads)
{
        struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
        uint8_t *target_port_ids = source_port->emigration_target_port_ids;
        uint8_t *targets_len = &source_port->emigration_targets_len;
        uint16_t i;

        for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
                bool found;

                found = dsw_select_emigration_target(dsw, bursts, num_bursts,
                                                     source_port->id,
                                                     port_loads, dsw->num_ports,
                                                     target_port_ids,
                                                     target_qfs,
                                                     targets_len);
                if (!found)
                        break;
        }

        if (*targets_len == 0)
                DSW_LOG_DP_PORT(DEBUG, source_port->id,
                                "For the %d flows considered, no target port "
                                "was found.\n", num_bursts);
}

static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
        struct dsw_queue *queue = &dsw->queues[queue_id];
        uint8_t port_id;

        if (queue->num_serving_ports > 1)
                port_id = queue->flow_to_port_map[flow_hash];
        else
                /* A single-link queue, or atomic/ordered/parallel but
                 * with just a single serving port.
                 */
                port_id = queue->serving_ports[0];

        DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
                   "to port %d.\n", queue_id, flow_hash, port_id);

        return port_id;
}

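/* Move any events buffered for a particular destination port into
 * that port's in_ring. The in_rings are sized to hold all in-flight
 * events, so the enqueue loop below is guaranteed to terminate.
 */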
static void
dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
                           uint8_t dest_port_id)
{
        struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
        uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
        struct rte_event *buffer = source_port->out_buffer[dest_port_id];
        uint16_t enqueued = 0;

        if (*buffer_len == 0)
                return;

        /* The rings are dimensioned to fit all in-flight events (even
         * on a single ring), so looping will work.
         */
        do {
                enqueued +=
                        rte_event_ring_enqueue_burst(dest_port->in_ring,
                                                     buffer+enqueued,
                                                     *buffer_len-enqueued,
                                                     NULL);
        } while (unlikely(enqueued != *buffer_len));

        (*buffer_len) = 0;
}

static uint16_t
dsw_port_get_parallel_flow_id(struct dsw_port *port)
{
        uint16_t flow_id = port->next_parallel_flow_id;

        port->next_parallel_flow_id =
                (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;

        return flow_id;
}

static void
dsw_port_buffer_paused(struct dsw_port *port,
                       const struct rte_event *paused_event)
{
        port->paused_events[port->paused_events_len] = *paused_event;
        port->paused_events_len++;
}

static void
dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
                           uint8_t dest_port_id, const struct rte_event *event)
{
        struct rte_event *buffer = source_port->out_buffer[dest_port_id];
        uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];

        if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
                dsw_port_transmit_buffered(dsw, source_port, dest_port_id);

        buffer[*buffer_len] = *event;

        (*buffer_len)++;
}

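/* Fold the DSW_FLOW_ID_BITS significant bits of the eventdev flow_id
 * down to a DSW_MAX_FLOWS_BITS-wide flow hash, by XOR-ing the id in
 * DSW_MAX_FLOWS_BITS-sized chunks.
 */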
#define DSW_FLOW_ID_BITS (24)
static uint16_t
dsw_flow_id_hash(uint32_t flow_id)
{
        uint16_t hash = 0;
        uint16_t offset = 0;

        do {
                hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
                offset += DSW_MAX_FLOWS_BITS;
        } while (offset < DSW_FLOW_ID_BITS);

        return hash;
}

static void
dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
                         struct rte_event event)
{
        uint8_t dest_port_id;

        event.flow_id = dsw_port_get_parallel_flow_id(source_port);

        dest_port_id = dsw_schedule(dsw, event.queue_id,
                                    dsw_flow_id_hash(event.flow_id));

        dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
}

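/* Buffer an event for later transmission. Parallel-queue events are
 * given a fresh (round-robin) flow id, events belonging to a paused
 * flow are held back locally until the flow is unpaused, and all
 * other events are scheduled to a destination port and placed in the
 * corresponding output buffer.
 */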
static void
dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
                      const struct rte_event *event)
{
        uint16_t flow_hash;
        uint8_t dest_port_id;

        if (unlikely(dsw->queues[event->queue_id].schedule_type ==
                     RTE_SCHED_TYPE_PARALLEL)) {
                dsw_port_buffer_parallel(dsw, source_port, *event);
                return;
        }

        flow_hash = dsw_flow_id_hash(event->flow_id);

        if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
                                             flow_hash))) {
                dsw_port_buffer_paused(source_port, event);
                return;
        }

        dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);

        dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}

static void
dsw_port_flush_paused_events(struct dsw_evdev *dsw,
                             struct dsw_port *source_port,
                             const struct dsw_queue_flow *qf)
{
        uint16_t paused_events_len = source_port->paused_events_len;
        struct rte_event paused_events[paused_events_len];
        uint8_t dest_port_id;
        uint16_t i;

        if (paused_events_len == 0)
                return;

        if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
                return;

        rte_memcpy(paused_events, source_port->paused_events,
                   paused_events_len * sizeof(struct rte_event));

        source_port->paused_events_len = 0;

        dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);

        for (i = 0; i < paused_events_len; i++) {
                struct rte_event *event = &paused_events[i];
                uint16_t flow_hash;

                flow_hash = dsw_flow_id_hash(event->flow_id);

                if (event->queue_id == qf->queue_id &&
                    flow_hash == qf->flow_hash)
                        dsw_port_buffer_non_paused(dsw, source_port,
                                                   dest_port_id, event);
                else
                        dsw_port_buffer_paused(source_port, event);
        }
}

static void
dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
{
        uint64_t flow_migration_latency;

        flow_migration_latency =
                (rte_get_timer_cycles() - port->emigration_start);
        port->emigration_latency += (flow_migration_latency * finished);
        port->emigrations += finished;
}

static void
dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
                        uint8_t schedule_type)
{
        uint8_t i;
        struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
        uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
        uint8_t left_qfs_len = 0;
        uint8_t finished;

        for (i = 0; i < port->emigration_targets_len; i++) {
                struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
                uint8_t queue_id = qf->queue_id;
                uint8_t queue_schedule_type =
                        dsw->queues[queue_id].schedule_type;
                uint16_t flow_hash = qf->flow_hash;

                if (queue_schedule_type != schedule_type) {
                        left_port_ids[left_qfs_len] =
                                port->emigration_target_port_ids[i];
                        left_qfs[left_qfs_len] = *qf;
                        left_qfs_len++;
                        continue;
                }

                DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
                                "queue_id %d flow_hash %d.\n", queue_id,
                                flow_hash);

                if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
                        dsw_port_remove_paused_flow(port, qf);
                        dsw_port_flush_paused_events(dsw, port, qf);
                }
        }

        finished = port->emigration_targets_len - left_qfs_len;

        if (finished > 0)
                dsw_port_emigration_stats(port, finished);

        for (i = 0; i < left_qfs_len; i++) {
                port->emigration_target_port_ids[i] = left_port_ids[i];
                port->emigration_target_qfs[i] = left_qfs[i];
        }
        port->emigration_targets_len = left_qfs_len;

        if (port->emigration_targets_len == 0) {
                port->migration_state = DSW_MIGRATION_STATE_IDLE;
                port->seen_events_len = 0;
        }
}

static void
dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
                             struct dsw_port *source_port)
{
        uint8_t i;

        for (i = 0; i < source_port->emigration_targets_len; i++) {
                struct dsw_queue_flow *qf =
                        &source_port->emigration_target_qfs[i];
                uint8_t queue_id = qf->queue_id;

                if (dsw->queues[queue_id].schedule_type ==
                    RTE_SCHED_TYPE_PARALLEL) {
                        uint8_t dest_port_id =
                                source_port->emigration_target_port_ids[i];
                        uint16_t flow_hash = qf->flow_hash;

                        /* Single byte-sized stores are always atomic. */
                        dsw->queues[queue_id].flow_to_port_map[flow_hash] =
                                dest_port_id;
                }
        }

        rte_smp_wmb();

        dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
}

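/* Emigration overview: a port whose load is high enough picks one or
 * more (queue, flow) pairs and target ports, and then walks through a
 * small state machine:
 *
 *   IDLE -> PAUSING:        the flows are marked paused and a
 *                           DSW_CTL_PAUS_REQ is broadcast.
 *   PAUSING -> FORWARDING:  entered when all peers have confirmed.
 *   FORWARDING -> UNPAUSING: the flow-to-port map is updated, events
 *                           for the migrating flows are forwarded to
 *                           the target ports, and DSW_CTL_UNPAUS_REQ
 *                           is broadcast.
 *   UNPAUSING -> IDLE:      entered when all peers have confirmed
 *                           again.
 *
 * Parallel-queue flows skip the pause/unpause steps, since no
 * atomicity or ordering needs to be preserved for them.
 */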
static void
dsw_port_consider_emigration(struct dsw_evdev *dsw,
                             struct dsw_port *source_port,
                             uint64_t now)
{
        bool any_port_below_limit;
        struct dsw_queue_flow *seen_events = source_port->seen_events;
        uint16_t seen_events_len = source_port->seen_events_len;
        struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
        uint16_t num_bursts;
        int16_t source_port_load;
        int16_t port_loads[dsw->num_ports];

        if (now < source_port->next_emigration)
                return;

        if (dsw->num_ports == 1)
                return;

        DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");

        /* Randomize the interval to avoid having all threads consider
         * emigration at the same point in time, which might lead to
         * all choosing the same target port.
         */
        source_port->next_emigration = now +
                source_port->migration_interval / 2 +
                rte_rand() % source_port->migration_interval;

        if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id,
                                "Emigration already in progress.\n");
                return;
        }

        /* For simplicity, avoid migration in the unlikely case there
         * are still events to consume in the in_buffer (from the last
         * emigration).
         */
        if (source_port->in_buffer_len > 0) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
                                "events in the input buffer.\n");
                return;
        }

        source_port_load = rte_atomic16_read(&source_port->load);
        if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id,
                      "Load %d is below threshold level %d.\n",
                      DSW_LOAD_TO_PERCENT(source_port_load),
                      DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
                return;
        }

        /* Avoid starting any expensive operations (sorting etc), in
         * case of a scenario with all ports above the load limit.
         */
        any_port_below_limit =
                dsw_retrieve_port_loads(dsw, port_loads,
                                        DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
        if (!any_port_below_limit) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id,
                                "Candidate target ports are all too highly "
                                "loaded.\n");
                return;
        }

        num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
                                            bursts);

        /* For non-big-little systems, there's no point in moving the
         * only (known) flow.
         */
        if (num_bursts < 2) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
                                "queue_id %d flow_hash %d has been seen.\n",
                                bursts[0].queue_flow.queue_id,
                                bursts[0].queue_flow.flow_hash);
                return;
        }

        dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
                                      port_loads);

        if (source_port->emigration_targets_len == 0)
                return;

        source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
        source_port->emigration_start = rte_get_timer_cycles();

        /* No need to go through the whole pause procedure for
         * parallel queues, since atomic/ordered semantics need not be
         * maintained.
         */
        dsw_port_move_parallel_flows(dsw, source_port);

        /* All flows were on PARALLEL queues. */
        if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
                return;

        /* There might be 'loopback' events already scheduled in the
         * output buffers.
         */
        dsw_port_flush_out_buffers(dsw, source_port);

        dsw_port_add_paused_flows(source_port,
                                  source_port->emigration_target_qfs,
                                  source_port->emigration_targets_len);

        dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
                               source_port->emigration_target_qfs,
                               source_port->emigration_targets_len);
        source_port->cfm_cnt = 0;
}

static void
dsw_port_flush_paused_events(struct dsw_evdev *dsw,
                             struct dsw_port *source_port,
                             const struct dsw_queue_flow *qf);

static void
dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
                              uint8_t originating_port_id,
                              struct dsw_queue_flow *paused_qfs,
                              uint8_t qfs_len)
{
        uint16_t i;
        struct dsw_ctl_msg cfm = {
                .type = DSW_CTL_CFM,
                .originating_port_id = port->id
        };

        dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);

        rte_smp_rmb();

        dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);

        for (i = 0; i < qfs_len; i++) {
                struct dsw_queue_flow *qf = &paused_qfs[i];

                if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
                        port->immigrations++;

                dsw_port_flush_paused_events(dsw, port, qf);
        }
}

#define FORWARD_BURST_SIZE (32)

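/* Drain the source port's in_ring: events that belong to the
 * migrating (queue, flow) pair are forwarded to the destination
 * port's in_ring, while all other events are set aside in the source
 * port's in_buffer, to be handed out on later dequeue calls.
 */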
static void
dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
                                struct rte_event_ring *dest_ring,
                                uint8_t queue_id,
                                uint16_t flow_hash)
{
        uint16_t events_left;

        /* The control ring message should have been seen before the
         * ring count is read on the port's in_ring.
         */
        rte_smp_rmb();

        events_left = rte_event_ring_count(source_port->in_ring);

        while (events_left > 0) {
                uint16_t in_burst_size =
                        RTE_MIN(FORWARD_BURST_SIZE, events_left);
                struct rte_event in_burst[in_burst_size];
                uint16_t in_len;
                uint16_t i;

                in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
                                                      in_burst,
                                                      in_burst_size, NULL);
                /* No need to care about bursting forwarded events (to
                 * the destination port's in_ring), since migration
                 * doesn't happen very often, and the majority of the
                 * dequeued events will likely *not* be forwarded.
                 */
                for (i = 0; i < in_len; i++) {
                        struct rte_event *e = &in_burst[i];
                        if (e->queue_id == queue_id &&
                            dsw_flow_id_hash(e->flow_id) == flow_hash) {
                                while (rte_event_ring_enqueue_burst(dest_ring,
                                                                    e, 1,
                                                                    NULL) != 1)
                                        rte_pause();
                        } else {
                                uint16_t last_idx = source_port->in_buffer_len;
                                source_port->in_buffer[last_idx] = *e;
                                source_port->in_buffer_len++;
                        }
                }

                events_left -= in_len;
        }
}

static void
dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
                               struct dsw_port *source_port)
{
        uint8_t i;

        dsw_port_flush_out_buffers(dsw, source_port);

        rte_smp_wmb();

        for (i = 0; i < source_port->emigration_targets_len; i++) {
                struct dsw_queue_flow *qf =
                        &source_port->emigration_target_qfs[i];
                uint8_t dest_port_id =
                        source_port->emigration_target_port_ids[i];
                struct dsw_port *dest_port = &dsw->ports[dest_port_id];

                dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
                        dest_port_id;

                dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
                                                qf->queue_id, qf->flow_hash);
        }

        /* Flow table update and migration destination port's enqueues
         * must be seen before the control message.
         */
        rte_smp_wmb();

        dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
                               source_port->emigration_target_qfs,
                               source_port->emigration_targets_len);
        source_port->cfm_cnt = 0;
        source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
}

static void
dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
{
        port->cfm_cnt++;

        if (port->cfm_cnt == (dsw->num_ports-1)) {
                switch (port->migration_state) {
                case DSW_MIGRATION_STATE_PAUSING:
                        DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
                                        "migration state.\n");
                        port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
                        break;
                case DSW_MIGRATION_STATE_UNPAUSING:
                        dsw_port_end_emigration(dsw, port,
                                                RTE_SCHED_TYPE_ATOMIC);
                        break;
                default:
                        RTE_ASSERT(0);
                        break;
                }
        }
}

static void
dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
        struct dsw_ctl_msg msg;

        if (dsw_port_ctl_dequeue(port, &msg) == 0) {
                switch (msg.type) {
                case DSW_CTL_PAUS_REQ:
                        dsw_port_handle_pause_flows(dsw, port,
                                                    msg.originating_port_id,
                                                    msg.qfs, msg.qfs_len);
                        break;
                case DSW_CTL_UNPAUS_REQ:
                        dsw_port_handle_unpause_flows(dsw, port,
                                                      msg.originating_port_id,
                                                      msg.qfs, msg.qfs_len);
                        break;
                case DSW_CTL_CFM:
                        dsw_port_handle_confirm(dsw, port);
                        break;
                }
        }
}

static void
dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
{
        /* To poll the control ring reasonably often on busy ports,
         * each dequeued/enqueued event is considered an 'op' too.
         */
        port->ops_since_bg_task += (num_events+1);
}

static void
dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
        if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
                     port->pending_releases == 0))
                dsw_port_move_emigrating_flows(dsw, port);

        /* Polling the control ring is relatively inexpensive, and
         * polling it often helps bring down migration latency, so
         * do this for every iteration.
         */
        dsw_port_ctl_process(dsw, port);

        /* To avoid considering migration and flushing output buffers
         * on every dequeue/enqueue call, the scheduler only performs
         * such 'background' tasks every nth
         * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
         */
        if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
                uint64_t now;

                now = rte_get_timer_cycles();

                port->last_bg = now;

                /* Logic to avoid having events linger in the output
                 * buffer too long.
                 */
                dsw_port_flush_out_buffers(dsw, port);

                dsw_port_consider_load_update(port, now);

                dsw_port_consider_emigration(dsw, port, now);

                port->ops_since_bg_task = 0;
        }
}

static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
{
        uint16_t dest_port_id;

        for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
                dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
}

uint16_t
dsw_event_enqueue(void *port, const struct rte_event *ev)
{
        return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
}

static __rte_always_inline uint16_t
dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
                                const struct rte_event events[],
                                uint16_t events_len, bool op_types_known,
                                uint16_t num_new, uint16_t num_release,
                                uint16_t num_non_release)
{
        struct dsw_evdev *dsw = source_port->dsw;
        bool enough_credits;
        uint16_t i;

        DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
                        "events to port %d.\n", events_len, source_port->id);

        dsw_port_bg_process(dsw, source_port);

        /* XXX: For performance (=ring efficiency) reasons, the
         * scheduler relies on internal non-ring buffers instead of
         * immediately sending the event to the destination ring. For
         * a producer that doesn't intend to produce or consume any
         * more events, the scheduler provides a way to flush the
         * buffer, by means of doing an enqueue of zero events. In
         * addition, a port cannot be left "unattended" (e.g. unused)
         * for long periods of time, since that would stall
         * migration. Eventdev API extensions to provide a cleaner way
         * to achieve both of these functions should be
         * considered.
         */
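        /* A minimal application-side sketch of the flush mechanism
         * described above (assuming 'dev_id' and 'port_id' refer to
         * this device and port): a producer that is about to go idle
         * may flush this port's output buffers with a zero-sized
         * enqueue, e.g.:
         *
         *     rte_event_enqueue_burst(dev_id, port_id, NULL, 0);
         */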
        if (unlikely(events_len == 0)) {
                dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
                dsw_port_flush_out_buffers(dsw, source_port);
                return 0;
        }

        dsw_port_note_op(source_port, events_len);

        if (!op_types_known)
                for (i = 0; i < events_len; i++) {
                        switch (events[i].op) {
                        case RTE_EVENT_OP_RELEASE:
                                num_release++;
                                break;
                        case RTE_EVENT_OP_NEW:
                                num_new++;
                                /* Falls through. */
                        default:
                                num_non_release++;
                                break;
                        }
                }

        /* Technically, we could allow the non-new events up to the
         * first new event in the array into the system, but for
         * simplicity reasons, we deny the whole burst if the port is
         * above the water mark.
         */
        if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
                     source_port->new_event_threshold))
                return 0;

        enough_credits = dsw_port_acquire_credits(dsw, source_port,
                                                  num_non_release);
        if (unlikely(!enough_credits))
                return 0;

        source_port->pending_releases -= num_release;

        dsw_port_enqueue_stats(source_port, num_new,
                               num_non_release-num_new, num_release);

        for (i = 0; i < events_len; i++) {
                const struct rte_event *event = &events[i];

                if (likely(num_release == 0 ||
                           event->op != RTE_EVENT_OP_RELEASE))
                        dsw_port_buffer_event(dsw, source_port, event);
                dsw_port_queue_enqueue_stats(source_port, event->queue_id);
        }

        DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
                        "accepted.\n", num_non_release);

        return num_non_release;
}

uint16_t
dsw_event_enqueue_burst(void *port, const struct rte_event events[],
                        uint16_t events_len)
{
        struct dsw_port *source_port = port;

        if (unlikely(events_len > source_port->enqueue_depth))
                events_len = source_port->enqueue_depth;

        return dsw_event_enqueue_burst_generic(source_port, events,
                                               events_len, false, 0, 0, 0);
}

uint16_t
dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
                            uint16_t events_len)
{
        struct dsw_port *source_port = port;

        if (unlikely(events_len > source_port->enqueue_depth))
                events_len = source_port->enqueue_depth;

        return dsw_event_enqueue_burst_generic(source_port, events,
                                               events_len, true, events_len,
                                               0, events_len);
}

uint16_t
dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
                                uint16_t events_len)
{
        struct dsw_port *source_port = port;

        if (unlikely(events_len > source_port->enqueue_depth))
                events_len = source_port->enqueue_depth;

        return dsw_event_enqueue_burst_generic(source_port, events,
                                               events_len, true, 0, 0,
                                               events_len);
}

uint16_t
dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
{
        return dsw_event_dequeue_burst(port, events, 1, wait);
}

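/* Record the (queue_id, flow_hash) pair of each dequeued event in a
 * circular buffer of at most DSW_MAX_EVENTS_RECORDED entries. The
 * recorded pairs are the input to the emigration decision in
 * dsw_port_consider_emigration().
 */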
static void
dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
                            uint16_t num)
{
        uint16_t i;

        dsw_port_dequeue_stats(port, num);

        for (i = 0; i < num; i++) {
                uint16_t l_idx = port->seen_events_idx;
                struct dsw_queue_flow *qf = &port->seen_events[l_idx];
                struct rte_event *event = &events[i];
                qf->queue_id = event->queue_id;
                qf->flow_hash = dsw_flow_id_hash(event->flow_id);

                port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;

                dsw_port_queue_dequeued_stats(port, event->queue_id);
        }

        if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
                port->seen_events_len =
                        RTE_MIN(port->seen_events_len + num,
                                DSW_MAX_EVENTS_RECORDED);
}

#ifdef DSW_SORT_DEQUEUED

#define DSW_EVENT_TO_INT(_event)                                \
        ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))

static inline int
dsw_cmp_event(const void *v_event_a, const void *v_event_b)
{
        const struct rte_event *event_a = v_event_a;
        const struct rte_event *event_b = v_event_b;

        return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
}
#endif

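/* Dequeue up to 'num' events for the port. Events set aside in the
 * port's in_buffer during a previous migration are served first; only
 * when that buffer is empty are events taken from the port's in_ring.
 */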
static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
                       uint16_t num)
{
        if (unlikely(port->in_buffer_len > 0)) {
                uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);

                rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
                           dequeued * sizeof(struct rte_event));

                port->in_buffer_start += dequeued;
                port->in_buffer_len -= dequeued;

                if (port->in_buffer_len == 0)
                        port->in_buffer_start = 0;

                return dequeued;
        }

        return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
}

uint16_t
dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
                        uint64_t wait __rte_unused)
{
        struct dsw_port *source_port = port;
        struct dsw_evdev *dsw = source_port->dsw;
        uint16_t dequeued;

        source_port->pending_releases = 0;

        dsw_port_bg_process(dsw, source_port);

        if (unlikely(num > source_port->dequeue_depth))
                num = source_port->dequeue_depth;

        dequeued = dsw_port_dequeue_burst(source_port, events, num);

        source_port->pending_releases = dequeued;

        dsw_port_load_record(source_port, dequeued);

        dsw_port_note_op(source_port, dequeued);

        if (dequeued > 0) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
                                dequeued);

                dsw_port_return_credits(dsw, source_port, dequeued);

                /* One potential optimization one might think of is to
                 * add a migration state (prior to 'pausing'), and
                 * only record seen events when the port is in this
                 * state (and transition to 'pausing' when enough
                 * events have been gathered). However, that scheme
                 * doesn't seem to improve performance.
                 */
                dsw_port_record_seen_events(port, events, dequeued);
        } else /* Zero-size dequeue means a likely idle port, and thus
                * we can afford trading some efficiency for a slightly
                * reduced event wall-time latency.
                */
                dsw_port_flush_out_buffers(dsw, port);

#ifdef DSW_SORT_DEQUEUED
        dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
#endif

        return dequeued;
}