event/dsw: extend xstats
[dpdk.git] drivers/event/dsw/dsw_event.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4
5 #include "dsw_evdev.h"
6
7 #ifdef DSW_SORT_DEQUEUED
8 #include "dsw_sort.h"
9 #endif
10
11 #include <stdbool.h>
12 #include <string.h>
13
14 #include <rte_atomic.h>
15 #include <rte_cycles.h>
16 #include <rte_memcpy.h>
17 #include <rte_random.h>
18
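/* Credit management works on two levels: each port keeps a local cache
 * of credits (inflight_credits), while the device-global credits_on_loan
 * counter tracks how much of the shared pool (bounded by max_inflight)
 * all ports together have taken. When the local cache cannot cover a
 * request, at least DSW_PORT_MIN_CREDITS are acquired from the pool in
 * one go, amortizing the cost of the atomic operation.
 */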
19 static bool
20 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
21                          int32_t credits)
22 {
23         int32_t inflight_credits = port->inflight_credits;
24         int32_t missing_credits = credits - inflight_credits;
25         int32_t total_on_loan;
26         int32_t available;
27         int32_t acquired_credits;
28         int32_t new_total_on_loan;
29
30         if (likely(missing_credits <= 0)) {
31                 port->inflight_credits -= credits;
32                 return true;
33         }
34
35         total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
36         available = dsw->max_inflight - total_on_loan;
37         acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
38
39         if (available < acquired_credits)
40                 return false;
41
42         /* This is a race; no locks are involved, and thus some other
43          * thread can allocate credits between the check and the
44          * allocation.
45          */
46         new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
47                                                     acquired_credits);
48
49         if (unlikely(new_total_on_loan > dsw->max_inflight)) {
50                 /* Some other port took the last credits */
51                 rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
52                 return false;
53         }
54
55         DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
56                         acquired_credits);
57
58         port->inflight_credits += acquired_credits;
59         port->inflight_credits -= credits;
60
61         return true;
62 }
63
64 static void
65 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
66                         int32_t credits)
67 {
68         port->inflight_credits += credits;
69
70         if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
71                 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
72                 int32_t return_credits =
73                         port->inflight_credits - leave_credits;
74
75                 port->inflight_credits = leave_credits;
76
77                 rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
78
79                 DSW_LOG_DP_PORT(DEBUG, port->id,
80                                 "Returned %d tokens to pool.\n",
81                                 return_credits);
82         }
83 }
84
85 static void
86 dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
87                        uint16_t num_forward, uint16_t num_release)
88 {
89         port->new_enqueued += num_new;
90         port->forward_enqueued += num_forward;
91         port->release_enqueued += num_release;
92 }
93
94 static void
95 dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
96 {
97         source_port->queue_enqueued[queue_id]++;
98 }
99
100 static void
101 dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
102 {
103         port->dequeued += num;
104 }
105
106 static void
107 dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
108 {
109         source_port->queue_dequeued[queue_id]++;
110 }
111
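/* Track port busy time: a non-empty dequeue starts a busy period, and
 * an empty dequeue ends it, adding the elapsed cycles to busy_cycles.
 */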
112 static void
113 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
114 {
115         if (dequeued > 0 && port->busy_start == 0)
116                 /* work period begins */
117                 port->busy_start = rte_get_timer_cycles();
118         else if (dequeued == 0 && port->busy_start > 0) {
119                 /* work period ends */
120                 uint64_t work_period =
121                         rte_get_timer_cycles() - port->busy_start;
122                 port->busy_cycles += work_period;
123                 port->busy_start = 0;
124         }
125 }
126
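/* Close the current measurement period and return its load: the share
 * of cycles the port was busy, scaled to DSW_MAX_LOAD. Any still-open
 * busy period is accounted for up to 'now'.
 */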
127 static int16_t
128 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
129 {
130         uint64_t passed = now - port->measurement_start;
131         uint64_t busy_cycles = port->busy_cycles;
132
133         if (port->busy_start > 0) {
134                 busy_cycles += (now - port->busy_start);
135                 port->busy_start = now;
136         }
137
138         int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
139
140         port->measurement_start = now;
141         port->busy_cycles = 0;
142
143         port->total_busy_cycles += busy_cycles;
144
145         return load;
146 }
147
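/* The reported port load is an exponentially weighted moving average
 * of the per-period loads, with the previous value weighted by
 * DSW_OLD_LOAD_WEIGHT.
 */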
148 static void
149 dsw_port_load_update(struct dsw_port *port, uint64_t now)
150 {
151         int16_t old_load;
152         int16_t period_load;
153         int16_t new_load;
154
155         old_load = rte_atomic16_read(&port->load);
156
157         period_load = dsw_port_load_close_period(port, now);
158
159         new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
160                 (DSW_OLD_LOAD_WEIGHT+1);
161
162         rte_atomic16_set(&port->load, new_load);
163 }
164
165 static void
166 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
167 {
168         if (now < port->next_load_update)
169                 return;
170
171         port->next_load_update = now + port->load_update_interval;
172
173         dsw_port_load_update(port, now);
174 }
175
176 static void
177 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
178 {
179         /* there's always room on the ring */
180         while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
181                 rte_pause();
182 }
183
184 static int
185 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
186 {
187         return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
188 }
189
190 static void
191 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
192                        uint8_t type, uint8_t queue_id, uint16_t flow_hash)
193 {
194         uint16_t port_id;
195         struct dsw_ctl_msg msg = {
196                 .type = type,
197                 .originating_port_id = source_port->id,
198                 .queue_id = queue_id,
199                 .flow_hash = flow_hash
200         };
201
202         for (port_id = 0; port_id < dsw->num_ports; port_id++)
203                 if (port_id != source_port->id)
204                         dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
205 }
206
207 static bool
208 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
209                         uint16_t flow_hash)
210 {
211         uint16_t i;
212
213         for (i = 0; i < port->paused_flows_len; i++) {
214                 struct dsw_queue_flow *qf = &port->paused_flows[i];
215                 if (qf->queue_id == queue_id &&
216                     qf->flow_hash == flow_hash)
217                         return true;
218         }
219         return false;
220 }
221
222 static void
223 dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
224                          uint16_t paused_flow_hash)
225 {
226         port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
227                 .queue_id = queue_id,
228                 .flow_hash = paused_flow_hash
229         };
230         port->paused_flows_len++;
231 }
232
233 static void
234 dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
235                             uint16_t paused_flow_hash)
236 {
237         uint16_t i;
238
239         for (i = 0; i < port->paused_flows_len; i++) {
240                 struct dsw_queue_flow *qf = &port->paused_flows[i];
241
242                 if (qf->queue_id == queue_id &&
243                     qf->flow_hash == paused_flow_hash) {
244                         uint16_t last_idx = port->paused_flows_len-1;
245                         if (i != last_idx)
246                                 port->paused_flows[i] =
247                                         port->paused_flows[last_idx];
248                         port->paused_flows_len--;
249                         break;
250                 }
251         }
252 }
253
254 static void
255 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
256
257 static void
258 dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
259                            uint8_t originating_port_id, uint8_t queue_id,
260                            uint16_t paused_flow_hash)
261 {
262         struct dsw_ctl_msg cfm = {
263                 .type = DSW_CTL_CFM,
264                 .originating_port_id = port->id,
265                 .queue_id = queue_id,
266                 .flow_hash = paused_flow_hash
267         };
268
269         DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
270                         queue_id, paused_flow_hash);
271
272         /* There might be already-scheduled events belonging to the
273          * paused flow in the output buffers.
274          */
275         dsw_port_flush_out_buffers(dsw, port);
276
277         dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
278
279         /* Make sure any stores to the original port's in_ring are seen
280          * before the ctl message.
281          */
282         rte_smp_wmb();
283
284         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
285 }
286
287 static void
288 dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
289                           uint8_t exclude_port_id, int16_t *port_loads,
290                           uint8_t *target_port_id, int16_t *target_load)
291 {
292         int16_t candidate_port_id = -1;
293         int16_t candidate_load = DSW_MAX_LOAD;
294         uint16_t i;
295
296         for (i = 0; i < num_port_ids; i++) {
297                 uint8_t port_id = port_ids[i];
298                 if (port_id != exclude_port_id) {
299                         int16_t load = port_loads[port_id];
300                         if (candidate_port_id == -1 ||
301                             load < candidate_load) {
302                                 candidate_port_id = port_id;
303                                 candidate_load = load;
304                         }
305                 }
306         }
307         *target_port_id = candidate_port_id;
308         *target_load = candidate_load;
309 }
310
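/* A burst is a (queue_id, flow_hash) pair together with the number of
 * times it occurred among the port's recorded events.
 */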
311 struct dsw_queue_flow_burst {
312         struct dsw_queue_flow queue_flow;
313         uint16_t count;
314 };
315
316 static inline int
317 dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
318 {
319         const struct dsw_queue_flow_burst *burst_a = v_burst_a;
320         const struct dsw_queue_flow_burst *burst_b = v_burst_b;
321
322         int a_count = burst_a->count;
323         int b_count = burst_b->count;
324
325         return a_count - b_count;
326 }
327
328 #define DSW_QF_TO_INT(_qf)                                      \
329         ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
330
331 static inline int
332 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
333 {
334         const struct dsw_queue_flow *qf_a = v_qf_a;
335         const struct dsw_queue_flow *qf_b = v_qf_b;
336
337         return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
338 }
339
340 static uint16_t
341 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
342                        struct dsw_queue_flow_burst *bursts)
343 {
344         uint16_t i;
345         struct dsw_queue_flow_burst *current_burst = NULL;
346         uint16_t num_bursts = 0;
347
348         /* We don't need the stable property, and the list is likely
349          * large enough for qsort() to outperform dsw_stable_sort(),
350          * so we use qsort() here.
351          */
352         qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
353
354         /* arrange the (now-consecutive) events into bursts */
355         for (i = 0; i < qfs_len; i++) {
356                 if (i == 0 ||
357                     dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
358                         current_burst = &bursts[num_bursts];
359                         current_burst->queue_flow = qfs[i];
360                         current_burst->count = 0;
361                         num_bursts++;
362                 }
363                 current_burst->count++;
364         }
365
366         qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
367
368         return num_bursts;
369 }
370
371 static bool
372 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
373                         int16_t load_limit)
374 {
375         bool below_limit = false;
376         uint16_t i;
377
378         for (i = 0; i < dsw->num_ports; i++) {
379                 int16_t load = rte_atomic16_read(&dsw->ports[i].load);
380                 if (load < load_limit)
381                         below_limit = true;
382                 port_loads[i] = load;
383         }
384         return below_limit;
385 }
386
387 static bool
388 dsw_select_emigration_target(struct dsw_evdev *dsw,
389                              struct dsw_port *source_port,
390                              struct dsw_queue_flow_burst *bursts,
391                              uint16_t num_bursts, int16_t *port_loads,
392                              int16_t max_load, struct dsw_queue_flow *target_qf,
393                              uint8_t *target_port_id)
394 {
395         uint16_t source_load = port_loads[source_port->id];
396         uint16_t i;
397
398         for (i = 0; i < num_bursts; i++) {
399                 struct dsw_queue_flow *qf = &bursts[i].queue_flow;
400
401                 if (dsw_port_is_flow_paused(source_port, qf->queue_id,
402                                             qf->flow_hash))
403                         continue;
404
405                 struct dsw_queue *queue = &dsw->queues[qf->queue_id];
406                 int16_t target_load;
407
408                 dsw_find_lowest_load_port(queue->serving_ports,
409                                           queue->num_serving_ports,
410                                           source_port->id, port_loads,
411                                           target_port_id, &target_load);
412
413                 if (target_load < source_load &&
414                     target_load < max_load) {
415                         *target_qf = *qf;
416                         return true;
417                 }
418         }
419
420         DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
421                         "no target port found with load less than %d.\n",
422                         num_bursts, DSW_LOAD_TO_PERCENT(max_load));
423
424         return false;
425 }
426
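/* Map a (queue_id, flow_hash) pair to the port currently serving that
 * flow, using the queue's flow_to_port_map when more than one port
 * serves the queue.
 */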
427 static uint8_t
428 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
429 {
430         struct dsw_queue *queue = &dsw->queues[queue_id];
431         uint8_t port_id;
432
433         if (queue->num_serving_ports > 1)
434                 port_id = queue->flow_to_port_map[flow_hash];
435         else
436                 /* A single-link queue, or atomic/ordered/parallel but
437                  * with just a single serving port.
438                  */
439                 port_id = queue->serving_ports[0];
440
441         DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
442                    "to port %d.\n", queue_id, flow_hash, port_id);
443
444         return port_id;
445 }
446
447 static void
448 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
449                            uint8_t dest_port_id)
450 {
451         struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
452         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
453         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
454         uint16_t enqueued = 0;
455
456         if (*buffer_len == 0)
457                 return;
458
459         /* The rings are dimensioned to fit all in-flight events (even
460          * on a single ring), so looping will work.
461          */
462         do {
463                 enqueued +=
464                         rte_event_ring_enqueue_burst(dest_port->in_ring,
465                                                      buffer+enqueued,
466                                                      *buffer_len-enqueued,
467                                                      NULL);
468         } while (unlikely(enqueued != *buffer_len));
469
470         (*buffer_len) = 0;
471 }
472
473 static uint16_t
474 dsw_port_get_parallel_flow_id(struct dsw_port *port)
475 {
476         uint16_t flow_id = port->next_parallel_flow_id;
477
478         port->next_parallel_flow_id =
479                 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
480
481         return flow_id;
482 }
483
484 static void
485 dsw_port_buffer_paused(struct dsw_port *port,
486                        const struct rte_event *paused_event)
487 {
488         port->paused_events[port->paused_events_len] = *paused_event;
489         port->paused_events_len++;
490 }
491
492 static void
493 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
494                            uint8_t dest_port_id, const struct rte_event *event)
495 {
496         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
497         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
498
499         if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
500                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
501
502         buffer[*buffer_len] = *event;
503
504         (*buffer_len)++;
505 }
506
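/* Fold the 24-bit event flow_id into a DSW_MAX_FLOWS_BITS-wide hash by
 * XORing DSW_MAX_FLOWS_BITS-sized chunks of the id.
 */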
507 #define DSW_FLOW_ID_BITS (24)
508 static uint16_t
509 dsw_flow_id_hash(uint32_t flow_id)
510 {
511         uint16_t hash = 0;
512         uint16_t offset = 0;
513
514         do {
515                 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
516                 offset += DSW_MAX_FLOWS_BITS;
517         } while (offset < DSW_FLOW_ID_BITS);
518
519         return hash;
520 }
521
522 static void
523 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
524                          struct rte_event event)
525 {
526         uint8_t dest_port_id;
527
528         event.flow_id = dsw_port_get_parallel_flow_id(source_port);
529
530         dest_port_id = dsw_schedule(dsw, event.queue_id,
531                                     dsw_flow_id_hash(event.flow_id));
532
533         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
534 }
535
536 static void
537 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
538                       const struct rte_event *event)
539 {
540         uint16_t flow_hash;
541         uint8_t dest_port_id;
542
543         if (unlikely(dsw->queues[event->queue_id].schedule_type ==
544                      RTE_SCHED_TYPE_PARALLEL)) {
545                 dsw_port_buffer_parallel(dsw, source_port, *event);
546                 return;
547         }
548
549         flow_hash = dsw_flow_id_hash(event->flow_id);
550
551         if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
552                                              flow_hash))) {
553                 dsw_port_buffer_paused(source_port, event);
554                 return;
555         }
556
557         dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
558
559         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
560 }
561
562 static void
563 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
564                              struct dsw_port *source_port,
565                              uint8_t queue_id, uint16_t paused_flow_hash)
566 {
567         uint16_t paused_events_len = source_port->paused_events_len;
568         struct rte_event paused_events[paused_events_len];
569         uint8_t dest_port_id;
570         uint16_t i;
571
572         if (paused_events_len == 0)
573                 return;
574
575         if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
576                 return;
577
578         rte_memcpy(paused_events, source_port->paused_events,
579                    paused_events_len * sizeof(struct rte_event));
580
581         source_port->paused_events_len = 0;
582
583         dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
584
585         for (i = 0; i < paused_events_len; i++) {
586                 struct rte_event *event = &paused_events[i];
587                 uint16_t flow_hash;
588
589                 flow_hash = dsw_flow_id_hash(event->flow_id);
590
591                 if (event->queue_id == queue_id &&
592                     flow_hash == paused_flow_hash)
593                         dsw_port_buffer_non_paused(dsw, source_port,
594                                                    dest_port_id, event);
595                 else
596                         dsw_port_buffer_paused(source_port, event);
597         }
598 }
599
600 static void
601 dsw_port_emigration_stats(struct dsw_port *port)
602 {
603         uint64_t emigration_latency;
604
605         emigration_latency = (rte_get_timer_cycles() - port->emigration_start);
606         port->emigration_latency += emigration_latency;
607         port->emigrations++;
608 }
609
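/* Conclude an emigration: reset the migration state and the seen-events
 * log, record latency statistics, and, for non-parallel queues, unpause
 * the migrated flow and flush any events buffered for it.
 */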
610 static void
611 dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port)
612 {
613         uint8_t queue_id = port->emigration_target_qf.queue_id;
614         uint16_t flow_hash = port->emigration_target_qf.flow_hash;
615
616         port->migration_state = DSW_MIGRATION_STATE_IDLE;
617         port->seen_events_len = 0;
618
619         dsw_port_emigration_stats(port);
620
621         if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
622                 dsw_port_remove_paused_flow(port, queue_id, flow_hash);
623                 dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
624         }
625
626         DSW_LOG_DP_PORT(DEBUG, port->id, "Emigration completed for queue_id "
627                         "%d flow_hash %d.\n", queue_id, flow_hash);
628 }
629
630 static void
631 dsw_port_consider_emigration(struct dsw_evdev *dsw,
632                              struct dsw_port *source_port,
633                              uint64_t now)
634 {
635         bool any_port_below_limit;
636         struct dsw_queue_flow *seen_events = source_port->seen_events;
637         uint16_t seen_events_len = source_port->seen_events_len;
638         struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
639         uint16_t num_bursts;
640         int16_t source_port_load;
641         int16_t port_loads[dsw->num_ports];
642
643         if (now < source_port->next_emigration)
644                 return;
645
646         if (dsw->num_ports == 1)
647                 return;
648
649         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
650
651         /* Randomize interval to avoid having all threads considering
652          * emigration at the same point in time, which might lead
653          * to all choosing the same target port.
654          */
655         source_port->next_emigration = now +
656                 source_port->migration_interval / 2 +
657                 rte_rand() % source_port->migration_interval;
658
659         if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
660                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
661                                 "Emigration already in progress.\n");
662                 return;
663         }
664
665         /* For simplicity, avoid migration in the unlikely case there
666          * are still events to consume in the in_buffer (from the last
667          * emigration).
668          */
669         if (source_port->in_buffer_len > 0) {
670                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
671                                 "events in the input buffer.\n");
672                 return;
673         }
674
675         source_port_load = rte_atomic16_read(&source_port->load);
676         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
677                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
678                                 "Load %d is below threshold level %d.\n",
679                                 DSW_LOAD_TO_PERCENT(source_port_load),
680                        DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
681                 return;
682         }
683
684         /* Avoid starting any expensive operations (sorting, etc.) in
685          * case all ports are above the load limit.
686          */
687         any_port_below_limit =
688                 dsw_retrieve_port_loads(dsw, port_loads,
689                                         DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
690         if (!any_port_below_limit) {
691                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
692                                 "Candidate target ports are all too highly "
693                                 "loaded.\n");
694                 return;
695         }
696
697         /* Sort flows into 'bursts' to allow attempting to migrate
698          * small (but still active) flows first - this is to avoid
699          * having large flows moving around the worker cores too much
700          * (to avoid cache misses, among other things). Of course, the
701          * number of recorded events (queue+flow ids) is limited and
702          * provides only a snapshot, so only so many conclusions can
703          * be drawn from this data.
704          */
705         num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
706                                             bursts);
707         /* For non-big-little systems, there's no point in moving the
708          * only (known) flow.
709          */
710         if (num_bursts < 2) {
711                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
712                                 "queue_id %d flow_hash %d has been seen.\n",
713                                 bursts[0].queue_flow.queue_id,
714                                 bursts[0].queue_flow.flow_hash);
715                 return;
716         }
717
718         /* The strategy is to first try to find a flow to move to a
719          * port with low load (below the emigration-attempt
720          * threshold). If that fails, we try to find a port which is
721          * below the max threshold, and also less loaded than this
722          * port is.
723          */
724         if (!dsw_select_emigration_target(dsw, source_port, bursts, num_bursts,
725                                       port_loads,
726                                       DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
727                                       &source_port->emigration_target_qf,
728                                       &source_port->emigration_target_port_id)
729             &&
730             !dsw_select_emigration_target(dsw, source_port, bursts, num_bursts,
731                                       port_loads,
732                                       DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
733                                       &source_port->emigration_target_qf,
734                                       &source_port->emigration_target_port_id))
735                 return;
736
737         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
738                         "flow_hash %d from port %d to port %d.\n",
739                         source_port->emigration_target_qf.queue_id,
740                         source_port->emigration_target_qf.flow_hash,
741                         source_port->id,
742                         source_port->emigration_target_port_id);
743
744         /* We have a winner. */
745
746         source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
747         source_port->emigration_start = rte_get_timer_cycles();
748
749         /* No need to go through the whole pause procedure for
750          * parallel queues, since atomic/ordered semantics need not
751          * be maintained.
752          */
753
754         if (dsw->queues[source_port->emigration_target_qf.queue_id].
755             schedule_type == RTE_SCHED_TYPE_PARALLEL) {
756                 uint8_t queue_id =
757                         source_port->emigration_target_qf.queue_id;
758                 uint16_t flow_hash =
759                         source_port->emigration_target_qf.flow_hash;
760                 uint8_t dest_port_id =
761                         source_port->emigration_target_port_id;
762
763                 /* Single byte-sized stores are always atomic. */
764                 dsw->queues[queue_id].flow_to_port_map[flow_hash] =
765                         dest_port_id;
766                 rte_smp_wmb();
767
768                 dsw_port_end_emigration(dsw, source_port);
769
770                 return;
771         }
772
773         /* There might be 'loopback' events already scheduled in the
774          * output buffers.
775          */
776         dsw_port_flush_out_buffers(dsw, source_port);
777
778         dsw_port_add_paused_flow(source_port,
779                                  source_port->emigration_target_qf.queue_id,
780                                  source_port->emigration_target_qf.flow_hash);
781
782         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
783                                source_port->emigration_target_qf.queue_id,
784                                source_port->emigration_target_qf.flow_hash);
785         source_port->cfm_cnt = 0;
786 }
787
788 static void
789 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
790                              struct dsw_port *source_port,
791                              uint8_t queue_id, uint16_t paused_flow_hash);
792
793 static void
794 dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
795                              uint8_t originating_port_id, uint8_t queue_id,
796                              uint16_t paused_flow_hash)
797 {
798         struct dsw_ctl_msg cfm = {
799                 .type = DSW_CTL_CFM,
800                 .originating_port_id = port->id,
801                 .queue_id = queue_id,
802                 .flow_hash = paused_flow_hash
803         };
804
805         DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
806                         queue_id, paused_flow_hash);
807
808         dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
809
810         rte_smp_rmb();
811
812         if (dsw_schedule(dsw, queue_id, paused_flow_hash) == port->id)
813                 port->immigrations++;
814
815         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
816
817         dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
818 }
819
820 #define FORWARD_BURST_SIZE (32)
821
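/* Drain the source port's in_ring: events belonging to the migrated
 * flow are forwarded to the destination port's ring, while all other
 * events are parked in the source port's in_buffer, to be served by
 * later local dequeues.
 */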
822 static void
823 dsw_port_forward_emigrated_flow(struct dsw_port *source_port,
824                                 struct rte_event_ring *dest_ring,
825                                 uint8_t queue_id,
826                                 uint16_t flow_hash)
827 {
828         uint16_t events_left;
829
830         /* The control ring message should be seen before the ring count
831          * is read on the port's in_ring.
832          */
833         rte_smp_rmb();
834
835         events_left = rte_event_ring_count(source_port->in_ring);
836
837         while (events_left > 0) {
838                 uint16_t in_burst_size =
839                         RTE_MIN(FORWARD_BURST_SIZE, events_left);
840                 struct rte_event in_burst[in_burst_size];
841                 uint16_t in_len;
842                 uint16_t i;
843
844                 in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
845                                                       in_burst,
846                                                       in_burst_size, NULL);
847                 /* No need to care about bursting forwarded events (to
848                  * the destination port's in_ring), since migration
849                  * doesn't happen very often, and also the majority of
850                  * the dequeued events will likely *not* be forwarded.
851                  */
852                 for (i = 0; i < in_len; i++) {
853                         struct rte_event *e = &in_burst[i];
854                         if (e->queue_id == queue_id &&
855                             dsw_flow_id_hash(e->flow_id) == flow_hash) {
856                                 while (rte_event_ring_enqueue_burst(dest_ring,
857                                                                     e, 1,
858                                                                     NULL) != 1)
859                                         rte_pause();
860                         } else {
861                                 uint16_t last_idx = source_port->in_buffer_len;
862                                 source_port->in_buffer[last_idx] = *e;
863                                 source_port->in_buffer_len++;
864                         }
865                 }
866
867                 events_left -= in_len;
868         }
869 }
870
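/* Forwarding step of a migration: update the flow-to-port map, move any
 * in-flight events for the flow over to the destination port, and then
 * ask the other ports to unpause the flow.
 */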
871 static void
872 dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
873                              struct dsw_port *source_port)
874 {
875         uint8_t queue_id = source_port->emigration_target_qf.queue_id;
876         uint16_t flow_hash = source_port->emigration_target_qf.flow_hash;
877         uint8_t dest_port_id = source_port->emigration_target_port_id;
878         struct dsw_port *dest_port = &dsw->ports[dest_port_id];
879
880         dsw_port_flush_out_buffers(dsw, source_port);
881
882         rte_smp_wmb();
883
884         dsw->queues[queue_id].flow_to_port_map[flow_hash] =
885                 dest_port_id;
886
887         dsw_port_forward_emigrated_flow(source_port, dest_port->in_ring,
888                                         queue_id, flow_hash);
889
890         /* Flow table update and migration destination port's enqueues
891          * must be seen before the control message.
892          */
893         rte_smp_wmb();
894
895         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
896                                flow_hash);
897         source_port->cfm_cnt = 0;
898         source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
899 }
900
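/* A migration step completes when all other ports have confirmed the
 * pause or unpause request: PAUSING then advances to FORWARDING, and
 * UNPAUSING ends the emigration.
 */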
901 static void
902 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
903 {
904         port->cfm_cnt++;
905
906         if (port->cfm_cnt == (dsw->num_ports-1)) {
907                 switch (port->migration_state) {
908                 case DSW_MIGRATION_STATE_PAUSING:
909                         DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
910                                         "migration state.\n");
911                         port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
912                         break;
913                 case DSW_MIGRATION_STATE_UNPAUSING:
914                         dsw_port_end_emigration(dsw, port);
915                         break;
916                 default:
917                         RTE_ASSERT(0);
918                         break;
919                 }
920         }
921 }
922
923 static void
924 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
925 {
926         struct dsw_ctl_msg msg;
927
928         /* So any table loads happen before the ring dequeue, in the
929          * case of a 'paus' message.
930          */
931         rte_smp_rmb();
932
933         if (dsw_port_ctl_dequeue(port, &msg) == 0) {
934                 switch (msg.type) {
935                 case DSW_CTL_PAUS_REQ:
936                         dsw_port_handle_pause_flow(dsw, port,
937                                                    msg.originating_port_id,
938                                                    msg.queue_id, msg.flow_hash);
939                         break;
940                 case DSW_CTL_UNPAUS_REQ:
941                         dsw_port_handle_unpause_flow(dsw, port,
942                                                      msg.originating_port_id,
943                                                      msg.queue_id,
944                                                      msg.flow_hash);
945                         break;
946                 case DSW_CTL_CFM:
947                         dsw_port_handle_confirm(dsw, port);
948                         break;
949                 }
950         }
951 }
952
953 static void
954 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
955 {
956         /* To poll the control ring reasonably often on busy ports,
957          * each dequeued/enqueued event is considered an 'op' too.
958          */
959         port->ops_since_bg_task += (num_events+1);
960 }
961
962 static void
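/* Per-call background work: finish any pending flow migration, poll the
 * control ring, and - every DSW_MAX_PORT_OPS_PER_BG_TASK operations -
 * flush the output buffers, update the port load and consider
 * emigration.
 */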
963 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
964 {
965         if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
966                      port->pending_releases == 0))
967                 dsw_port_move_migrating_flow(dsw, port);
968
969         /* Polling the control ring is relatively inexpensive, and
970          * polling it often helps bring down migration latency, so
971          * do this for every iteration.
972          */
973         dsw_port_ctl_process(dsw, port);
974
975         /* To avoid considering migration and flushing output buffers
976          * on every dequeue/enqueue call, the scheduler only performs
977          * such 'background' tasks every nth
978          * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
979          */
980         if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
981                 uint64_t now;
982
983                 now = rte_get_timer_cycles();
984
985                 port->last_bg = now;
986
987                 /* Logic to avoid having events linger in the output
988                  * buffer too long.
989                  */
990                 dsw_port_flush_out_buffers(dsw, port);
991
992                 dsw_port_consider_load_update(port, now);
993
994                 dsw_port_consider_emigration(dsw, port, now);
995
996                 port->ops_since_bg_task = 0;
997         }
998 }
999
1000 static void
1001 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1002 {
1003         uint16_t dest_port_id;
1004
1005         for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
1006                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1007 }
1008
1009 uint16_t
1010 dsw_event_enqueue(void *port, const struct rte_event *ev)
1011 {
1012         return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1013 }
1014
1015 static __rte_always_inline uint16_t
1016 dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1017                                 const struct rte_event events[],
1018                                 uint16_t events_len, bool op_types_known,
1019                                 uint16_t num_new, uint16_t num_release,
1020                                 uint16_t num_non_release)
1021 {
1022         struct dsw_evdev *dsw = source_port->dsw;
1023         bool enough_credits;
1024         uint16_t i;
1025
1026         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1027                         "events to port %d.\n", events_len, source_port->id);
1028
1029         dsw_port_bg_process(dsw, source_port);
1030
1031         /* XXX: For performance (=ring efficiency) reasons, the
1032          * scheduler relies on internal non-ring buffers instead of
1033          * immediately sending the event to the destination ring. For
1034          * a producer that doesn't intend to produce or consume any
1035          * more events, the scheduler provides a way to flush the
1036          * buffer, by means of doing an enqueue of zero events. In
1037          * addition, a port cannot be left "unattended" (e.g. unused)
1038          * for long periods of time, since that would stall
1039          * migration. Eventdev API extensions to provide a cleaner way
1040          * to achieve both of these functions should be
1041          * considered.
1042          */
1043         if (unlikely(events_len == 0)) {
1044                 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1045                 dsw_port_flush_out_buffers(dsw, source_port);
1046                 return 0;
1047         }
1048
1049         dsw_port_note_op(source_port, events_len);
1050
1051         if (!op_types_known)
1052                 for (i = 0; i < events_len; i++) {
1053                         switch (events[i].op) {
1054                         case RTE_EVENT_OP_RELEASE:
1055                                 num_release++;
1056                                 break;
1057                         case RTE_EVENT_OP_NEW:
1058                                 num_new++;
1059                                 /* Falls through. */
1060                         default:
1061                                 num_non_release++;
1062                                 break;
1063                         }
1064                 }
1065
1066         /* Technically, we could allow the non-new events up to the
1067          * first new event in the array into the system, but for
1068          * simplicity reasons, we deny the whole burst if the port is
1069          * above the water mark.
1070          */
1071         if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
1072                      source_port->new_event_threshold))
1073                 return 0;
1074
1075         enough_credits = dsw_port_acquire_credits(dsw, source_port,
1076                                                   num_non_release);
1077         if (unlikely(!enough_credits))
1078                 return 0;
1079
1080         source_port->pending_releases -= num_release;
1081
1082         dsw_port_enqueue_stats(source_port, num_new,
1083                                num_non_release-num_new, num_release);
1084
1085         for (i = 0; i < events_len; i++) {
1086                 const struct rte_event *event = &events[i];
1087
1088                 if (likely(num_release == 0 ||
1089                            event->op != RTE_EVENT_OP_RELEASE))
1090                         dsw_port_buffer_event(dsw, source_port, event);
1091                 dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1092         }
1093
1094         DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1095                         "accepted.\n", num_non_release);
1096
1097         return num_non_release;
1098 }
1099
1100 uint16_t
1101 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1102                         uint16_t events_len)
1103 {
1104         struct dsw_port *source_port = port;
1105
1106         if (unlikely(events_len > source_port->enqueue_depth))
1107                 events_len = source_port->enqueue_depth;
1108
1109         return dsw_event_enqueue_burst_generic(source_port, events,
1110                                                events_len, false, 0, 0, 0);
1111 }
1112
1113 uint16_t
1114 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1115                             uint16_t events_len)
1116 {
1117         struct dsw_port *source_port = port;
1118
1119         if (unlikely(events_len > source_port->enqueue_depth))
1120                 events_len = source_port->enqueue_depth;
1121
1122         return dsw_event_enqueue_burst_generic(source_port, events,
1123                                                events_len, true, events_len,
1124                                                0, events_len);
1125 }
1126
1127 uint16_t
1128 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1129                                 uint16_t events_len)
1130 {
1131         struct dsw_port *source_port = port;
1132
1133         if (unlikely(events_len > source_port->enqueue_depth))
1134                 events_len = source_port->enqueue_depth;
1135
1136         return dsw_event_enqueue_burst_generic(source_port, events,
1137                                                events_len, true, 0, 0,
1138                                                events_len);
1139 }
1140
1141 uint16_t
1142 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1143 {
1144         return dsw_event_dequeue_burst(port, events, 1, wait);
1145 }
1146
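/* Record the (queue_id, flow_hash) of dequeued events in a circular log
 * of DSW_MAX_EVENTS_RECORDED entries, used by the emigration logic to
 * pick candidate flows to move.
 */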
1147 static void
1148 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1149                             uint16_t num)
1150 {
1151         uint16_t i;
1152
1153         dsw_port_dequeue_stats(port, num);
1154
1155         for (i = 0; i < num; i++) {
1156                 uint16_t l_idx = port->seen_events_idx;
1157                 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1158                 struct rte_event *event = &events[i];
1159                 qf->queue_id = event->queue_id;
1160                 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1161
1162                 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1163
1164                 dsw_port_queue_dequeued_stats(port, event->queue_id);
1165         }
1166
1167         if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1168                 port->seen_events_len =
1169                         RTE_MIN(port->seen_events_len + num,
1170                                 DSW_MAX_EVENTS_RECORDED);
1171 }
1172
1173 #ifdef DSW_SORT_DEQUEUED
1174
1175 #define DSW_EVENT_TO_INT(_event)                                \
1176         ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1177
1178 static inline int
1179 dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1180 {
1181         const struct rte_event *event_a = v_event_a;
1182         const struct rte_event *event_b = v_event_b;
1183
1184         return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1185 }
1186 #endif
1187
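/* Events parked in the in_buffer during a migration are served before
 * any new events are dequeued from the port's in_ring.
 */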
1188 static uint16_t
1189 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1190                        uint16_t num)
1191 {
1192         struct dsw_port *source_port = port;
1193         struct dsw_evdev *dsw = source_port->dsw;
1194
1195         dsw_port_ctl_process(dsw, source_port);
1196
1197         if (unlikely(port->in_buffer_len > 0)) {
1198                 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1199
1200                 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1201                            dequeued * sizeof(struct rte_event));
1202
1203                 port->in_buffer_start += dequeued;
1204                 port->in_buffer_len -= dequeued;
1205
1206                 if (port->in_buffer_len == 0)
1207                         port->in_buffer_start = 0;
1208
1209                 return dequeued;
1210         }
1211
1212         return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1213 }
1214
1215 uint16_t
1216 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1217                         uint64_t wait __rte_unused)
1218 {
1219         struct dsw_port *source_port = port;
1220         struct dsw_evdev *dsw = source_port->dsw;
1221         uint16_t dequeued;
1222
1223         source_port->pending_releases = 0;
1224
1225         dsw_port_bg_process(dsw, source_port);
1226
1227         if (unlikely(num > source_port->dequeue_depth))
1228                 num = source_port->dequeue_depth;
1229
1230         dequeued = dsw_port_dequeue_burst(source_port, events, num);
1231
1232         source_port->pending_releases = dequeued;
1233
1234         dsw_port_load_record(source_port, dequeued);
1235
1236         dsw_port_note_op(source_port, dequeued);
1237
1238         if (dequeued > 0) {
1239                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1240                                 dequeued);
1241
1242                 dsw_port_return_credits(dsw, source_port, dequeued);
1243
1244                 /* One potential optimization one might think of is to
1245                  * add a migration state (prior to 'pausing'), and
1246                  * only record seen events when the port is in this
1247                  * state (and transition to 'pausing' when enough events
1248                  * have been gathered). However, that scheme doesn't
1249                  * seem to improve performance.
1250                  */
1251                 dsw_port_record_seen_events(port, events, dequeued);
1252         } else /* Zero-size dequeue means a likely idle port, and thus
1253                 * we can afford trading some efficiency for a slightly
1254                 * reduced event wall-time latency.
1255                 */
1256                 dsw_port_flush_out_buffers(dsw, port);
1257
1258 #ifdef DSW_SORT_DEQUEUED
1259         dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1260 #endif
1261
1262         return dequeued;
1263 }