event/dsw: flag adapters capabilities
[dpdk.git] / drivers / event / dsw / dsw_event.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4
5 #include "dsw_evdev.h"
6
7 #ifdef DSW_SORT_DEQUEUED
8 #include "dsw_sort.h"
9 #endif
10
11 #include <stdbool.h>
12 #include <string.h>
13
14 #include <rte_cycles.h>
15 #include <rte_memcpy.h>
16 #include <rte_random.h>
17
18 static bool
19 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
20                          int32_t credits)
21 {
22         int32_t inflight_credits = port->inflight_credits;
23         int32_t missing_credits = credits - inflight_credits;
24         int32_t total_on_loan;
25         int32_t available;
26         int32_t acquired_credits;
27         int32_t new_total_on_loan;
28
29         if (likely(missing_credits <= 0)) {
30                 port->inflight_credits -= credits;
31                 return true;
32         }
33
34         total_on_loan =
35                 __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED);
36         available = dsw->max_inflight - total_on_loan;
37         acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
38
39         if (available < acquired_credits)
40                 return false;
41
42         /* This is a race, no locks are involved, and thus some other
43          * thread can allocate tokens in between the check and the
44          * allocation.
45          */
46         new_total_on_loan =
47             __atomic_add_fetch(&dsw->credits_on_loan, acquired_credits,
48                                __ATOMIC_RELAXED);
49
50         if (unlikely(new_total_on_loan > dsw->max_inflight)) {
51                 /* Some other port took the last credits */
52                 __atomic_sub_fetch(&dsw->credits_on_loan, acquired_credits,
53                                    __ATOMIC_RELAXED);
54                 return false;
55         }
56
57         DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
58                         acquired_credits);
59
60         port->inflight_credits += acquired_credits;
61         port->inflight_credits -= credits;
62
63         return true;
64 }
65
66 static void
67 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
68                         int32_t credits)
69 {
70         port->inflight_credits += credits;
71
72         if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
73                 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
74                 int32_t return_credits =
75                         port->inflight_credits - leave_credits;
76
77                 port->inflight_credits = leave_credits;
78
79                 __atomic_sub_fetch(&dsw->credits_on_loan, return_credits,
80                                    __ATOMIC_RELAXED);
81
82                 DSW_LOG_DP_PORT(DEBUG, port->id,
83                                 "Returned %d tokens to pool.\n",
84                                 return_credits);
85         }
86 }
87
88 static void
89 dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
90                        uint16_t num_forward, uint16_t num_release)
91 {
92         port->new_enqueued += num_new;
93         port->forward_enqueued += num_forward;
94         port->release_enqueued += num_release;
95 }
96
97 static void
98 dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
99 {
100         source_port->queue_enqueued[queue_id]++;
101 }
102
103 static void
104 dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
105 {
106         port->dequeued += num;
107 }
108
109 static void
110 dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
111 {
112         source_port->queue_dequeued[queue_id]++;
113 }
114
115 static void
116 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
117 {
118         if (dequeued > 0 && port->busy_start == 0)
119                 /* work period begins */
120                 port->busy_start = rte_get_timer_cycles();
121         else if (dequeued == 0 && port->busy_start > 0) {
122                 /* work period ends */
123                 uint64_t work_period =
124                         rte_get_timer_cycles() - port->busy_start;
125                 port->busy_cycles += work_period;
126                 port->busy_start = 0;
127         }
128 }
129
130 static int16_t
131 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
132 {
133         uint64_t passed = now - port->measurement_start;
134         uint64_t busy_cycles = port->busy_cycles;
135
136         if (port->busy_start > 0) {
137                 busy_cycles += (now - port->busy_start);
138                 port->busy_start = now;
139         }
140
141         int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
142
143         port->measurement_start = now;
144         port->busy_cycles = 0;
145
146         port->total_busy_cycles += busy_cycles;
147
148         return load;
149 }
150
151 static void
152 dsw_port_load_update(struct dsw_port *port, uint64_t now)
153 {
154         int16_t old_load;
155         int16_t period_load;
156         int16_t new_load;
157
158         old_load = __atomic_load_n(&port->load, __ATOMIC_RELAXED);
159
160         period_load = dsw_port_load_close_period(port, now);
161
162         new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
163                 (DSW_OLD_LOAD_WEIGHT+1);
164
165         __atomic_store_n(&port->load, new_load, __ATOMIC_RELAXED);
166
167         /* The load of the recently immigrated flows should hopefully
168          * be reflected the load estimate by now.
169          */
170         __atomic_store_n(&port->immigration_load, 0, __ATOMIC_RELAXED);
171 }
172
173 static void
174 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
175 {
176         if (now < port->next_load_update)
177                 return;
178
179         port->next_load_update = now + port->load_update_interval;
180
181         dsw_port_load_update(port, now);
182 }
183
184 static void
185 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
186 {
187         /* there's always room on the ring */
188         while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
189                 rte_pause();
190 }
191
192 static int
193 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
194 {
195         return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
196 }
197
198 static void
199 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
200                        uint8_t type, struct dsw_queue_flow *qfs,
201                        uint8_t qfs_len)
202 {
203         uint16_t port_id;
204         struct dsw_ctl_msg msg = {
205                 .type = type,
206                 .originating_port_id = source_port->id,
207                 .qfs_len = qfs_len
208         };
209
210         memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
211
212         for (port_id = 0; port_id < dsw->num_ports; port_id++)
213                 if (port_id != source_port->id)
214                         dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
215 }
216
217 static __rte_always_inline bool
218 dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
219                          uint8_t queue_id, uint16_t flow_hash)
220 {
221         uint16_t i;
222
223         for (i = 0; i < qfs_len; i++)
224                 if (qfs[i].queue_id == queue_id &&
225                     qfs[i].flow_hash == flow_hash)
226                         return true;
227
228         return false;
229 }
230
231 static __rte_always_inline bool
232 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
233                         uint16_t flow_hash)
234 {
235         return dsw_is_queue_flow_in_ary(port->paused_flows,
236                                         port->paused_flows_len,
237                                         queue_id, flow_hash);
238 }
239
240 static void
241 dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
242                           uint8_t qfs_len)
243 {
244         uint8_t i;
245
246         for (i = 0; i < qfs_len; i++) {
247                 struct dsw_queue_flow *qf = &qfs[i];
248
249                 DSW_LOG_DP_PORT(DEBUG, port->id,
250                                 "Pausing queue_id %d flow_hash %d.\n",
251                                 qf->queue_id, qf->flow_hash);
252
253                 port->paused_flows[port->paused_flows_len] = *qf;
254                 port->paused_flows_len++;
255         };
256 }
257
258 static void
259 dsw_port_remove_paused_flow(struct dsw_port *port,
260                             struct dsw_queue_flow *target_qf)
261 {
262         uint16_t i;
263
264         for (i = 0; i < port->paused_flows_len; i++) {
265                 struct dsw_queue_flow *qf = &port->paused_flows[i];
266
267                 if (qf->queue_id == target_qf->queue_id &&
268                     qf->flow_hash == target_qf->flow_hash) {
269                         uint16_t last_idx = port->paused_flows_len-1;
270                         if (i != last_idx)
271                                 port->paused_flows[i] =
272                                         port->paused_flows[last_idx];
273                         port->paused_flows_len--;
274                         break;
275                 }
276         }
277 }
278
279 static void
280 dsw_port_remove_paused_flows(struct dsw_port *port,
281                              struct dsw_queue_flow *qfs, uint8_t qfs_len)
282 {
283         uint8_t i;
284
285         for (i = 0; i < qfs_len; i++)
286                 dsw_port_remove_paused_flow(port, &qfs[i]);
287
288 }
289
290 static void
291 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
292
293 static void
294 dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
295                             uint8_t originating_port_id,
296                             struct dsw_queue_flow *paused_qfs,
297                             uint8_t qfs_len)
298 {
299         struct dsw_ctl_msg cfm = {
300                 .type = DSW_CTL_CFM,
301                 .originating_port_id = port->id
302         };
303
304         /* There might be already-scheduled events belonging to the
305          * paused flow in the output buffers.
306          */
307         dsw_port_flush_out_buffers(dsw, port);
308
309         dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
310
311         /* Make sure any stores to the original port's in_ring is seen
312          * before the ctl message.
313          */
314         rte_smp_wmb();
315
316         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
317 }
318
319 struct dsw_queue_flow_burst {
320         struct dsw_queue_flow queue_flow;
321         uint16_t count;
322 };
323
324 #define DSW_QF_TO_INT(_qf)                                      \
325         ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
326
327 static inline int
328 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
329 {
330         const struct dsw_queue_flow *qf_a = v_qf_a;
331         const struct dsw_queue_flow *qf_b = v_qf_b;
332
333         return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
334 }
335
336 static uint16_t
337 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
338                        struct dsw_queue_flow_burst *bursts)
339 {
340         uint16_t i;
341         struct dsw_queue_flow_burst *current_burst = NULL;
342         uint16_t num_bursts = 0;
343
344         /* We don't need the stable property, and the list is likely
345          * large enough for qsort() to outperform dsw_stable_sort(),
346          * so we use qsort() here.
347          */
348         qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
349
350         /* arrange the (now-consecutive) events into bursts */
351         for (i = 0; i < qfs_len; i++) {
352                 if (i == 0 ||
353                     dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
354                         current_burst = &bursts[num_bursts];
355                         current_burst->queue_flow = qfs[i];
356                         current_burst->count = 0;
357                         num_bursts++;
358                 }
359                 current_burst->count++;
360         }
361
362         return num_bursts;
363 }
364
365 static bool
366 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
367                         int16_t load_limit)
368 {
369         bool below_limit = false;
370         uint16_t i;
371
372         for (i = 0; i < dsw->num_ports; i++) {
373                 int16_t measured_load =
374                         __atomic_load_n(&dsw->ports[i].load, __ATOMIC_RELAXED);
375                 int32_t immigration_load =
376                         __atomic_load_n(&dsw->ports[i].immigration_load,
377                                         __ATOMIC_RELAXED);
378                 int32_t load = measured_load + immigration_load;
379
380                 load = RTE_MIN(load, DSW_MAX_LOAD);
381
382                 if (load < load_limit)
383                         below_limit = true;
384                 port_loads[i] = load;
385         }
386         return below_limit;
387 }
388
389 static int16_t
390 dsw_flow_load(uint16_t num_events, int16_t port_load)
391 {
392         return ((int32_t)port_load * (int32_t)num_events) /
393                 DSW_MAX_EVENTS_RECORDED;
394 }
395
396 static int16_t
397 dsw_evaluate_migration(int16_t source_load, int16_t target_load,
398                        int16_t flow_load)
399 {
400         int32_t res_target_load;
401         int32_t imbalance;
402
403         if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
404                 return -1;
405
406         imbalance = source_load - target_load;
407
408         if (imbalance < DSW_REBALANCE_THRESHOLD)
409                 return -1;
410
411         res_target_load = target_load + flow_load;
412
413         /* If the estimated load of the target port will be higher
414          * than the source port's load, it doesn't make sense to move
415          * the flow.
416          */
417         if (res_target_load > source_load)
418                 return -1;
419
420         /* The more idle the target will be, the better. This will
421          * make migration prefer moving smaller flows, and flows to
422          * lightly loaded ports.
423          */
424         return DSW_MAX_LOAD - res_target_load;
425 }
426
427 static bool
428 dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
429 {
430         struct dsw_queue *queue = &dsw->queues[queue_id];
431         uint16_t i;
432
433         for (i = 0; i < queue->num_serving_ports; i++)
434                 if (queue->serving_ports[i] == port_id)
435                         return true;
436
437         return false;
438 }
439
440 static bool
441 dsw_select_emigration_target(struct dsw_evdev *dsw,
442                             struct dsw_queue_flow_burst *bursts,
443                             uint16_t num_bursts, uint8_t source_port_id,
444                             int16_t *port_loads, uint16_t num_ports,
445                             uint8_t *target_port_ids,
446                             struct dsw_queue_flow *target_qfs,
447                             uint8_t *targets_len)
448 {
449         int16_t source_port_load = port_loads[source_port_id];
450         struct dsw_queue_flow *candidate_qf = NULL;
451         uint8_t candidate_port_id = 0;
452         int16_t candidate_weight = -1;
453         int16_t candidate_flow_load = -1;
454         uint16_t i;
455
456         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
457                 return false;
458
459         for (i = 0; i < num_bursts; i++) {
460                 struct dsw_queue_flow_burst *burst = &bursts[i];
461                 struct dsw_queue_flow *qf = &burst->queue_flow;
462                 int16_t flow_load;
463                 uint16_t port_id;
464
465                 if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
466                                              qf->queue_id, qf->flow_hash))
467                         continue;
468
469                 flow_load = dsw_flow_load(burst->count, source_port_load);
470
471                 for (port_id = 0; port_id < num_ports; port_id++) {
472                         int16_t weight;
473
474                         if (port_id == source_port_id)
475                                 continue;
476
477                         if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
478                                 continue;
479
480                         weight = dsw_evaluate_migration(source_port_load,
481                                                         port_loads[port_id],
482                                                         flow_load);
483
484                         if (weight > candidate_weight) {
485                                 candidate_qf = qf;
486                                 candidate_port_id = port_id;
487                                 candidate_weight = weight;
488                                 candidate_flow_load = flow_load;
489                         }
490                 }
491         }
492
493         if (candidate_weight < 0)
494                 return false;
495
496         DSW_LOG_DP_PORT(DEBUG, source_port_id, "Selected queue_id %d "
497                         "flow_hash %d (with flow load %d) for migration "
498                         "to port %d.\n", candidate_qf->queue_id,
499                         candidate_qf->flow_hash,
500                         DSW_LOAD_TO_PERCENT(candidate_flow_load),
501                         candidate_port_id);
502
503         port_loads[candidate_port_id] += candidate_flow_load;
504         port_loads[source_port_id] -= candidate_flow_load;
505
506         target_port_ids[*targets_len] = candidate_port_id;
507         target_qfs[*targets_len] = *candidate_qf;
508         (*targets_len)++;
509
510         __atomic_add_fetch(&dsw->ports[candidate_port_id].immigration_load,
511                            candidate_flow_load, __ATOMIC_RELAXED);
512
513         return true;
514 }
515
516 static void
517 dsw_select_emigration_targets(struct dsw_evdev *dsw,
518                               struct dsw_port *source_port,
519                               struct dsw_queue_flow_burst *bursts,
520                               uint16_t num_bursts, int16_t *port_loads)
521 {
522         struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
523         uint8_t *target_port_ids = source_port->emigration_target_port_ids;
524         uint8_t *targets_len = &source_port->emigration_targets_len;
525         uint16_t i;
526
527         for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
528                 bool found;
529
530                 found = dsw_select_emigration_target(dsw, bursts, num_bursts,
531                                                      source_port->id,
532                                                      port_loads, dsw->num_ports,
533                                                      target_port_ids,
534                                                      target_qfs,
535                                                      targets_len);
536                 if (!found)
537                         break;
538         }
539
540         if (*targets_len == 0)
541                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
542                                 "For the %d flows considered, no target port "
543                                 "was found.\n", num_bursts);
544 }
545
546 static uint8_t
547 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
548 {
549         struct dsw_queue *queue = &dsw->queues[queue_id];
550         uint8_t port_id;
551
552         if (queue->num_serving_ports > 1)
553                 port_id = queue->flow_to_port_map[flow_hash];
554         else
555                 /* A single-link queue, or atomic/ordered/parallel but
556                  * with just a single serving port.
557                  */
558                 port_id = queue->serving_ports[0];
559
560         DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
561                    "to port %d.\n", queue_id, flow_hash, port_id);
562
563         return port_id;
564 }
565
566 static void
567 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
568                            uint8_t dest_port_id)
569 {
570         struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
571         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
572         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
573         uint16_t enqueued = 0;
574
575         if (*buffer_len == 0)
576                 return;
577
578         /* The rings are dimensioned to fit all in-flight events (even
579          * on a single ring), so looping will work.
580          */
581         do {
582                 enqueued +=
583                         rte_event_ring_enqueue_burst(dest_port->in_ring,
584                                                      buffer+enqueued,
585                                                      *buffer_len-enqueued,
586                                                      NULL);
587         } while (unlikely(enqueued != *buffer_len));
588
589         (*buffer_len) = 0;
590 }
591
592 static uint16_t
593 dsw_port_get_parallel_flow_id(struct dsw_port *port)
594 {
595         uint16_t flow_id = port->next_parallel_flow_id;
596
597         port->next_parallel_flow_id =
598                 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
599
600         return flow_id;
601 }
602
603 static void
604 dsw_port_buffer_paused(struct dsw_port *port,
605                        const struct rte_event *paused_event)
606 {
607         port->paused_events[port->paused_events_len] = *paused_event;
608         port->paused_events_len++;
609 }
610
611 static void
612 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
613                            uint8_t dest_port_id, const struct rte_event *event)
614 {
615         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
616         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
617
618         if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
619                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
620
621         buffer[*buffer_len] = *event;
622
623         (*buffer_len)++;
624 }
625
626 #define DSW_FLOW_ID_BITS (24)
627 static uint16_t
628 dsw_flow_id_hash(uint32_t flow_id)
629 {
630         uint16_t hash = 0;
631         uint16_t offset = 0;
632
633         do {
634                 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
635                 offset += DSW_MAX_FLOWS_BITS;
636         } while (offset < DSW_FLOW_ID_BITS);
637
638         return hash;
639 }
640
641 static void
642 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
643                          struct rte_event event)
644 {
645         uint8_t dest_port_id;
646
647         event.flow_id = dsw_port_get_parallel_flow_id(source_port);
648
649         dest_port_id = dsw_schedule(dsw, event.queue_id,
650                                     dsw_flow_id_hash(event.flow_id));
651
652         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
653 }
654
655 static void
656 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
657                       const struct rte_event *event)
658 {
659         uint16_t flow_hash;
660         uint8_t dest_port_id;
661
662         if (unlikely(dsw->queues[event->queue_id].schedule_type ==
663                      RTE_SCHED_TYPE_PARALLEL)) {
664                 dsw_port_buffer_parallel(dsw, source_port, *event);
665                 return;
666         }
667
668         flow_hash = dsw_flow_id_hash(event->flow_id);
669
670         if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
671                                              flow_hash))) {
672                 dsw_port_buffer_paused(source_port, event);
673                 return;
674         }
675
676         dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
677
678         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
679 }
680
681 static void
682 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
683                              struct dsw_port *source_port,
684                              const struct dsw_queue_flow *qf)
685 {
686         uint16_t paused_events_len = source_port->paused_events_len;
687         struct rte_event paused_events[paused_events_len];
688         uint8_t dest_port_id;
689         uint16_t i;
690
691         if (paused_events_len == 0)
692                 return;
693
694         if (dsw_port_is_flow_paused(source_port, qf->queue_id, qf->flow_hash))
695                 return;
696
697         rte_memcpy(paused_events, source_port->paused_events,
698                    paused_events_len * sizeof(struct rte_event));
699
700         source_port->paused_events_len = 0;
701
702         dest_port_id = dsw_schedule(dsw, qf->queue_id, qf->flow_hash);
703
704         for (i = 0; i < paused_events_len; i++) {
705                 struct rte_event *event = &paused_events[i];
706                 uint16_t flow_hash;
707
708                 flow_hash = dsw_flow_id_hash(event->flow_id);
709
710                 if (event->queue_id == qf->queue_id &&
711                     flow_hash == qf->flow_hash)
712                         dsw_port_buffer_non_paused(dsw, source_port,
713                                                    dest_port_id, event);
714                 else
715                         dsw_port_buffer_paused(source_port, event);
716         }
717 }
718
719 static void
720 dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
721 {
722         uint64_t flow_migration_latency;
723
724         flow_migration_latency =
725                 (rte_get_timer_cycles() - port->emigration_start);
726         port->emigration_latency += (flow_migration_latency * finished);
727         port->emigrations += finished;
728 }
729
730 static void
731 dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
732                         uint8_t schedule_type)
733 {
734         uint8_t i;
735         struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
736         uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
737         uint8_t left_qfs_len = 0;
738         uint8_t finished;
739
740         for (i = 0; i < port->emigration_targets_len; i++) {
741                 struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
742                 uint8_t queue_id = qf->queue_id;
743                 uint8_t queue_schedule_type =
744                         dsw->queues[queue_id].schedule_type;
745                 uint16_t flow_hash = qf->flow_hash;
746
747                 if (queue_schedule_type != schedule_type) {
748                         left_port_ids[left_qfs_len] =
749                                 port->emigration_target_port_ids[i];
750                         left_qfs[left_qfs_len] = *qf;
751                         left_qfs_len++;
752                         continue;
753                 }
754
755                 DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
756                                 "queue_id %d flow_hash %d.\n", queue_id,
757                                 flow_hash);
758
759                 if (queue_schedule_type == RTE_SCHED_TYPE_ATOMIC) {
760                         dsw_port_remove_paused_flow(port, qf);
761                         dsw_port_flush_paused_events(dsw, port, qf);
762                 }
763         }
764
765         finished = port->emigration_targets_len - left_qfs_len;
766
767         if (finished > 0)
768                 dsw_port_emigration_stats(port, finished);
769
770         for (i = 0; i < left_qfs_len; i++) {
771                 port->emigration_target_port_ids[i] = left_port_ids[i];
772                 port->emigration_target_qfs[i] = left_qfs[i];
773         }
774         port->emigration_targets_len = left_qfs_len;
775
776         if (port->emigration_targets_len == 0) {
777                 port->migration_state = DSW_MIGRATION_STATE_IDLE;
778                 port->seen_events_len = 0;
779         }
780 }
781
782 static void
783 dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
784                              struct dsw_port *source_port)
785 {
786         uint8_t i;
787
788         for (i = 0; i < source_port->emigration_targets_len; i++) {
789                 struct dsw_queue_flow *qf =
790                         &source_port->emigration_target_qfs[i];
791                 uint8_t queue_id = qf->queue_id;
792
793                 if (dsw->queues[queue_id].schedule_type ==
794                     RTE_SCHED_TYPE_PARALLEL) {
795                         uint8_t dest_port_id =
796                                 source_port->emigration_target_port_ids[i];
797                         uint16_t flow_hash = qf->flow_hash;
798
799                         /* Single byte-sized stores are always atomic. */
800                         dsw->queues[queue_id].flow_to_port_map[flow_hash] =
801                                 dest_port_id;
802                 }
803         }
804
805         rte_smp_wmb();
806
807         dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
808 }
809
810 static void
811 dsw_port_consider_emigration(struct dsw_evdev *dsw,
812                              struct dsw_port *source_port,
813                              uint64_t now)
814 {
815         bool any_port_below_limit;
816         struct dsw_queue_flow *seen_events = source_port->seen_events;
817         uint16_t seen_events_len = source_port->seen_events_len;
818         struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
819         uint16_t num_bursts;
820         int16_t source_port_load;
821         int16_t port_loads[dsw->num_ports];
822
823         if (now < source_port->next_emigration)
824                 return;
825
826         if (dsw->num_ports == 1)
827                 return;
828
829         if (seen_events_len < DSW_MAX_EVENTS_RECORDED)
830                 return;
831
832         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
833
834         /* Randomize interval to avoid having all threads considering
835          * emigration at the same in point in time, which might lead
836          * to all choosing the same target port.
837          */
838         source_port->next_emigration = now +
839                 source_port->migration_interval / 2 +
840                 rte_rand() % source_port->migration_interval;
841
842         if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
843                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
844                                 "Emigration already in progress.\n");
845                 return;
846         }
847
848         /* For simplicity, avoid migration in the unlikely case there
849          * is still events to consume in the in_buffer (from the last
850          * emigration).
851          */
852         if (source_port->in_buffer_len > 0) {
853                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
854                                 "events in the input buffer.\n");
855                 return;
856         }
857
858         source_port_load =
859                 __atomic_load_n(&source_port->load, __ATOMIC_RELAXED);
860         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
861                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
862                       "Load %d is below threshold level %d.\n",
863                       DSW_LOAD_TO_PERCENT(source_port_load),
864                       DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
865                 return;
866         }
867
868         /* Avoid starting any expensive operations (sorting etc), in
869          * case of a scenario with all ports above the load limit.
870          */
871         any_port_below_limit =
872                 dsw_retrieve_port_loads(dsw, port_loads,
873                                         DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
874         if (!any_port_below_limit) {
875                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
876                                 "Candidate target ports are all too highly "
877                                 "loaded.\n");
878                 return;
879         }
880
881         num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
882                                             bursts);
883
884         /* For non-big-little systems, there's no point in moving the
885          * only (known) flow.
886          */
887         if (num_bursts < 2) {
888                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
889                                 "queue_id %d flow_hash %d has been seen.\n",
890                                 bursts[0].queue_flow.queue_id,
891                                 bursts[0].queue_flow.flow_hash);
892                 return;
893         }
894
895         dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
896                                       port_loads);
897
898         if (source_port->emigration_targets_len == 0)
899                 return;
900
901         source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
902         source_port->emigration_start = rte_get_timer_cycles();
903
904         /* No need to go through the whole pause procedure for
905          * parallel queues, since atomic/ordered semantics need not to
906          * be maintained.
907          */
908         dsw_port_move_parallel_flows(dsw, source_port);
909
910         /* All flows were on PARALLEL queues. */
911         if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
912                 return;
913
914         /* There might be 'loopback' events already scheduled in the
915          * output buffers.
916          */
917         dsw_port_flush_out_buffers(dsw, source_port);
918
919         dsw_port_add_paused_flows(source_port,
920                                   source_port->emigration_target_qfs,
921                                   source_port->emigration_targets_len);
922
923         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
924                                source_port->emigration_target_qfs,
925                                source_port->emigration_targets_len);
926         source_port->cfm_cnt = 0;
927 }
928
929 static void
930 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
931                              struct dsw_port *source_port,
932                              const struct dsw_queue_flow *qf);
933
934 static void
935 dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
936                               uint8_t originating_port_id,
937                               struct dsw_queue_flow *paused_qfs,
938                               uint8_t qfs_len)
939 {
940         uint16_t i;
941         struct dsw_ctl_msg cfm = {
942                 .type = DSW_CTL_CFM,
943                 .originating_port_id = port->id
944         };
945
946         dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
947
948         rte_smp_rmb();
949
950         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
951
952         for (i = 0; i < qfs_len; i++) {
953                 struct dsw_queue_flow *qf = &paused_qfs[i];
954
955                 if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
956                         port->immigrations++;
957
958                 dsw_port_flush_paused_events(dsw, port, qf);
959         }
960 }
961
962 #define FORWARD_BURST_SIZE (32)
963
964 static void
965 dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
966                                 struct rte_event_ring *dest_ring,
967                                 uint8_t queue_id,
968                                 uint16_t flow_hash)
969 {
970         uint16_t events_left;
971
972         /* Control ring message should been seen before the ring count
973          * is read on the port's in_ring.
974          */
975         rte_smp_rmb();
976
977         events_left = rte_event_ring_count(source_port->in_ring);
978
979         while (events_left > 0) {
980                 uint16_t in_burst_size =
981                         RTE_MIN(FORWARD_BURST_SIZE, events_left);
982                 struct rte_event in_burst[in_burst_size];
983                 uint16_t in_len;
984                 uint16_t i;
985
986                 in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
987                                                       in_burst,
988                                                       in_burst_size, NULL);
989                 /* No need to care about bursting forwarded events (to
990                  * the destination port's in_ring), since migration
991                  * doesn't happen very often, and also the majority of
992                  * the dequeued events will likely *not* be forwarded.
993                  */
994                 for (i = 0; i < in_len; i++) {
995                         struct rte_event *e = &in_burst[i];
996                         if (e->queue_id == queue_id &&
997                             dsw_flow_id_hash(e->flow_id) == flow_hash) {
998                                 while (rte_event_ring_enqueue_burst(dest_ring,
999                                                                     e, 1,
1000                                                                     NULL) != 1)
1001                                         rte_pause();
1002                         } else {
1003                                 uint16_t last_idx = source_port->in_buffer_len;
1004                                 source_port->in_buffer[last_idx] = *e;
1005                                 source_port->in_buffer_len++;
1006                         }
1007                 }
1008
1009                 events_left -= in_len;
1010         }
1011 }
1012
1013 static void
1014 dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
1015                                struct dsw_port *source_port)
1016 {
1017         uint8_t i;
1018
1019         dsw_port_flush_out_buffers(dsw, source_port);
1020
1021         rte_smp_wmb();
1022
1023         for (i = 0; i < source_port->emigration_targets_len; i++) {
1024                 struct dsw_queue_flow *qf =
1025                         &source_port->emigration_target_qfs[i];
1026                 uint8_t dest_port_id =
1027                         source_port->emigration_target_port_ids[i];
1028                 struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1029
1030                 dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1031                         dest_port_id;
1032
1033                 dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
1034                                                 qf->queue_id, qf->flow_hash);
1035         }
1036
1037         /* Flow table update and migration destination port's enqueues
1038          * must be seen before the control message.
1039          */
1040         rte_smp_wmb();
1041
1042         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
1043                                source_port->emigration_target_qfs,
1044                                source_port->emigration_targets_len);
1045         source_port->cfm_cnt = 0;
1046         source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1047 }
1048
1049 static void
1050 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1051 {
1052         port->cfm_cnt++;
1053
1054         if (port->cfm_cnt == (dsw->num_ports-1)) {
1055                 switch (port->migration_state) {
1056                 case DSW_MIGRATION_STATE_PAUSING:
1057                         DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
1058                                         "migration state.\n");
1059                         port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
1060                         break;
1061                 case DSW_MIGRATION_STATE_UNPAUSING:
1062                         dsw_port_end_emigration(dsw, port,
1063                                                 RTE_SCHED_TYPE_ATOMIC);
1064                         break;
1065                 default:
1066                         RTE_ASSERT(0);
1067                         break;
1068                 }
1069         }
1070 }
1071
1072 static void
1073 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1074 {
1075         struct dsw_ctl_msg msg;
1076
1077         if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1078                 switch (msg.type) {
1079                 case DSW_CTL_PAUS_REQ:
1080                         dsw_port_handle_pause_flows(dsw, port,
1081                                                     msg.originating_port_id,
1082                                                     msg.qfs, msg.qfs_len);
1083                         break;
1084                 case DSW_CTL_UNPAUS_REQ:
1085                         dsw_port_handle_unpause_flows(dsw, port,
1086                                                       msg.originating_port_id,
1087                                                       msg.qfs, msg.qfs_len);
1088                         break;
1089                 case DSW_CTL_CFM:
1090                         dsw_port_handle_confirm(dsw, port);
1091                         break;
1092                 }
1093         }
1094 }
1095
1096 static void
1097 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1098 {
1099         /* To pull the control ring reasonbly often on busy ports,
1100          * each dequeued/enqueued event is considered an 'op' too.
1101          */
1102         port->ops_since_bg_task += (num_events+1);
1103 }
1104
1105 static void
1106 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1107 {
1108         if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
1109                      port->pending_releases == 0))
1110                 dsw_port_move_emigrating_flows(dsw, port);
1111
1112         /* Polling the control ring is relatively inexpensive, and
1113          * polling it often helps bringing down migration latency, so
1114          * do this for every iteration.
1115          */
1116         dsw_port_ctl_process(dsw, port);
1117
1118         /* To avoid considering migration and flushing output buffers
1119          * on every dequeue/enqueue call, the scheduler only performs
1120          * such 'background' tasks every nth
1121          * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
1122          */
1123         if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1124                 uint64_t now;
1125
1126                 now = rte_get_timer_cycles();
1127
1128                 port->last_bg = now;
1129
1130                 /* Logic to avoid having events linger in the output
1131                  * buffer too long.
1132                  */
1133                 dsw_port_flush_out_buffers(dsw, port);
1134
1135                 dsw_port_consider_load_update(port, now);
1136
1137                 dsw_port_consider_emigration(dsw, port, now);
1138
1139                 port->ops_since_bg_task = 0;
1140         }
1141 }
1142
1143 static void
1144 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1145 {
1146         uint16_t dest_port_id;
1147
1148         for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
1149                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1150 }
1151
1152 uint16_t
1153 dsw_event_enqueue(void *port, const struct rte_event *ev)
1154 {
1155         return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1156 }
1157
1158 static __rte_always_inline uint16_t
1159 dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1160                                 const struct rte_event events[],
1161                                 uint16_t events_len, bool op_types_known,
1162                                 uint16_t num_new, uint16_t num_release,
1163                                 uint16_t num_non_release)
1164 {
1165         struct dsw_evdev *dsw = source_port->dsw;
1166         bool enough_credits;
1167         uint16_t i;
1168
1169         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1170                         "events to port %d.\n", events_len, source_port->id);
1171
1172         dsw_port_bg_process(dsw, source_port);
1173
1174         /* XXX: For performance (=ring efficiency) reasons, the
1175          * scheduler relies on internal non-ring buffers instead of
1176          * immediately sending the event to the destination ring. For
1177          * a producer that doesn't intend to produce or consume any
1178          * more events, the scheduler provides a way to flush the
1179          * buffer, by means of doing an enqueue of zero events. In
1180          * addition, a port cannot be left "unattended" (e.g. unused)
1181          * for long periods of time, since that would stall
1182          * migration. Eventdev API extensions to provide a cleaner way
1183          * to archieve both of these functions should be
1184          * considered.
1185          */
1186         if (unlikely(events_len == 0)) {
1187                 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1188                 dsw_port_flush_out_buffers(dsw, source_port);
1189                 return 0;
1190         }
1191
1192         dsw_port_note_op(source_port, events_len);
1193
1194         if (!op_types_known)
1195                 for (i = 0; i < events_len; i++) {
1196                         switch (events[i].op) {
1197                         case RTE_EVENT_OP_RELEASE:
1198                                 num_release++;
1199                                 break;
1200                         case RTE_EVENT_OP_NEW:
1201                                 num_new++;
1202                                 /* Falls through. */
1203                         default:
1204                                 num_non_release++;
1205                                 break;
1206                         }
1207                 }
1208
1209         /* Technically, we could allow the non-new events up to the
1210          * first new event in the array into the system, but for
1211          * simplicity reasons, we deny the whole burst if the port is
1212          * above the water mark.
1213          */
1214         if (unlikely(num_new > 0 &&
1215                      __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED) >
1216                      source_port->new_event_threshold))
1217                 return 0;
1218
1219         enough_credits = dsw_port_acquire_credits(dsw, source_port,
1220                                                   num_non_release);
1221         if (unlikely(!enough_credits))
1222                 return 0;
1223
1224         source_port->pending_releases -= num_release;
1225
1226         dsw_port_enqueue_stats(source_port, num_new,
1227                                num_non_release-num_new, num_release);
1228
1229         for (i = 0; i < events_len; i++) {
1230                 const struct rte_event *event = &events[i];
1231
1232                 if (likely(num_release == 0 ||
1233                            event->op != RTE_EVENT_OP_RELEASE))
1234                         dsw_port_buffer_event(dsw, source_port, event);
1235                 dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1236         }
1237
1238         DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1239                         "accepted.\n", num_non_release);
1240
1241         return (num_non_release + num_release);
1242 }
1243
1244 uint16_t
1245 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1246                         uint16_t events_len)
1247 {
1248         struct dsw_port *source_port = port;
1249
1250         if (unlikely(events_len > source_port->enqueue_depth))
1251                 events_len = source_port->enqueue_depth;
1252
1253         return dsw_event_enqueue_burst_generic(source_port, events,
1254                                                events_len, false, 0, 0, 0);
1255 }
1256
1257 uint16_t
1258 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1259                             uint16_t events_len)
1260 {
1261         struct dsw_port *source_port = port;
1262
1263         if (unlikely(events_len > source_port->enqueue_depth))
1264                 events_len = source_port->enqueue_depth;
1265
1266         return dsw_event_enqueue_burst_generic(source_port, events,
1267                                                events_len, true, events_len,
1268                                                0, events_len);
1269 }
1270
1271 uint16_t
1272 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1273                                 uint16_t events_len)
1274 {
1275         struct dsw_port *source_port = port;
1276
1277         if (unlikely(events_len > source_port->enqueue_depth))
1278                 events_len = source_port->enqueue_depth;
1279
1280         return dsw_event_enqueue_burst_generic(source_port, events,
1281                                                events_len, true, 0, 0,
1282                                                events_len);
1283 }
1284
1285 uint16_t
1286 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1287 {
1288         return dsw_event_dequeue_burst(port, events, 1, wait);
1289 }
1290
1291 static void
1292 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1293                             uint16_t num)
1294 {
1295         uint16_t i;
1296
1297         dsw_port_dequeue_stats(port, num);
1298
1299         for (i = 0; i < num; i++) {
1300                 uint16_t l_idx = port->seen_events_idx;
1301                 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1302                 struct rte_event *event = &events[i];
1303                 qf->queue_id = event->queue_id;
1304                 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1305
1306                 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1307
1308                 dsw_port_queue_dequeued_stats(port, event->queue_id);
1309         }
1310
1311         if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1312                 port->seen_events_len =
1313                         RTE_MIN(port->seen_events_len + num,
1314                                 DSW_MAX_EVENTS_RECORDED);
1315 }
1316
1317 #ifdef DSW_SORT_DEQUEUED
1318
1319 #define DSW_EVENT_TO_INT(_event)                                \
1320         ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1321
1322 static inline int
1323 dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1324 {
1325         const struct rte_event *event_a = v_event_a;
1326         const struct rte_event *event_b = v_event_b;
1327
1328         return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1329 }
1330 #endif
1331
1332 static uint16_t
1333 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1334                        uint16_t num)
1335 {
1336         if (unlikely(port->in_buffer_len > 0)) {
1337                 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1338
1339                 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1340                            dequeued * sizeof(struct rte_event));
1341
1342                 port->in_buffer_start += dequeued;
1343                 port->in_buffer_len -= dequeued;
1344
1345                 if (port->in_buffer_len == 0)
1346                         port->in_buffer_start = 0;
1347
1348                 return dequeued;
1349         }
1350
1351         return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1352 }
1353
1354 uint16_t
1355 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1356                         uint64_t wait __rte_unused)
1357 {
1358         struct dsw_port *source_port = port;
1359         struct dsw_evdev *dsw = source_port->dsw;
1360         uint16_t dequeued;
1361
1362         source_port->pending_releases = 0;
1363
1364         dsw_port_bg_process(dsw, source_port);
1365
1366         if (unlikely(num > source_port->dequeue_depth))
1367                 num = source_port->dequeue_depth;
1368
1369         dequeued = dsw_port_dequeue_burst(source_port, events, num);
1370
1371         source_port->pending_releases = dequeued;
1372
1373         dsw_port_load_record(source_port, dequeued);
1374
1375         dsw_port_note_op(source_port, dequeued);
1376
1377         if (dequeued > 0) {
1378                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1379                                 dequeued);
1380
1381                 dsw_port_return_credits(dsw, source_port, dequeued);
1382
1383                 /* One potential optimization one might think of is to
1384                  * add a migration state (prior to 'pausing'), and
1385                  * only record seen events when the port is in this
1386                  * state (and transit to 'pausing' when enough events
1387                  * have been gathered). However, that schema doesn't
1388                  * seem to improve performance.
1389                  */
1390                 dsw_port_record_seen_events(port, events, dequeued);
1391         } else /* Zero-size dequeue means a likely idle port, and thus
1392                 * we can afford trading some efficiency for a slightly
1393                 * reduced event wall-time latency.
1394                 */
1395                 dsw_port_flush_out_buffers(dsw, port);
1396
1397 #ifdef DSW_SORT_DEQUEUED
1398         dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1399 #endif
1400
1401         return dequeued;
1402 }