event/dsw: add load balancing
[dpdk.git] / drivers / event / dsw / dsw_event.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4
5 #include "dsw_evdev.h"
6
7 #include <stdbool.h>
8 #include <string.h>
9
10 #include <rte_atomic.h>
11 #include <rte_cycles.h>
12 #include <rte_memcpy.h>
13 #include <rte_random.h>
14
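/* Credit management: each port keeps a local stash of in-flight
 * credits (port->inflight_credits) from which enqueues are paid. When
 * the stash runs short, at least DSW_PORT_MIN_CREDITS are borrowed
 * from the device-wide pool (dsw->credits_on_loan), which is bounded
 * by dsw->max_inflight.
 */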
15 static bool
16 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
17                          int32_t credits)
18 {
19         int32_t inflight_credits = port->inflight_credits;
20         int32_t missing_credits = credits - inflight_credits;
21         int32_t total_on_loan;
22         int32_t available;
23         int32_t acquired_credits;
24         int32_t new_total_on_loan;
25
26         if (likely(missing_credits <= 0)) {
27                 port->inflight_credits -= credits;
28                 return true;
29         }
30
31         total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
32         available = dsw->max_inflight - total_on_loan;
33         acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
34
35         if (available < acquired_credits)
36                 return false;
37
38         /* This is a race; no locks are involved, and thus some other
39          * thread can allocate tokens in between the check and the
40          * allocation.
41          */
42         new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
43                                                     acquired_credits);
44
45         if (unlikely(new_total_on_loan > dsw->max_inflight)) {
46                 /* Some other port took the last credits */
47                 rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
48                 return false;
49         }
50
51         DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
52                         acquired_credits);
53
54         port->inflight_credits += acquired_credits;
55         port->inflight_credits -= credits;
56
57         return true;
58 }
59
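/* Return credits to the port's local stash. Should the stash grow
 * beyond DSW_PORT_MAX_CREDITS, the surplus (beyond
 * DSW_PORT_MIN_CREDITS) is handed back to the device-wide pool.
 */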
60 static void
61 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
62                         int32_t credits)
63 {
64         port->inflight_credits += credits;
65
66         if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
67                 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
68                 int32_t return_credits =
69                         port->inflight_credits - leave_credits;
70
71                 port->inflight_credits = leave_credits;
72
73                 rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
74
75                 DSW_LOG_DP_PORT(DEBUG, port->id,
76                                 "Returned %d tokens to pool.\n",
77                                 return_credits);
78         }
79 }
80
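/* Track busy periods: a non-empty dequeue starts (or extends) a busy
 * period, and an empty dequeue ends it, accumulating the elapsed
 * cycles into port->busy_cycles.
 */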
81 static void
82 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
83 {
84         if (dequeued > 0 && port->busy_start == 0)
85                 /* work period begins */
86                 port->busy_start = rte_get_timer_cycles();
87         else if (dequeued == 0 && port->busy_start > 0) {
88                 /* work period ends */
89                 uint64_t work_period =
90                         rte_get_timer_cycles() - port->busy_start;
91                 port->busy_cycles += work_period;
92                 port->busy_start = 0;
93         }
94 }
95
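/* Close the current measurement period and return the port's load
 * during it, as the fraction of cycles spent busy, scaled to
 * DSW_MAX_LOAD.
 */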
96 static int16_t
97 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
98 {
99         uint64_t passed = now - port->measurement_start;
100         uint64_t busy_cycles = port->busy_cycles;
101
102         if (port->busy_start > 0) {
103                 busy_cycles += (now - port->busy_start);
104                 port->busy_start = now;
105         }
106
107         int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
108
109         port->measurement_start = now;
110         port->busy_cycles = 0;
111
112         port->total_busy_cycles += busy_cycles;
113
114         return load;
115 }
116
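/* Update the port's published load with a weighted average of the
 * just-closed period's load and the old value, the latter weighted by
 * DSW_OLD_LOAD_WEIGHT.
 */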
117 static void
118 dsw_port_load_update(struct dsw_port *port, uint64_t now)
119 {
120         int16_t old_load;
121         int16_t period_load;
122         int16_t new_load;
123
124         old_load = rte_atomic16_read(&port->load);
125
126         period_load = dsw_port_load_close_period(port, now);
127
128         new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
129                 (DSW_OLD_LOAD_WEIGHT+1);
130
131         rte_atomic16_set(&port->load, new_load);
132 }
133
134 static void
135 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
136 {
137         if (now < port->next_load_update)
138                 return;
139
140         port->next_load_update = now + port->load_update_interval;
141
142         dsw_port_load_update(port, now);
143 }
144
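/* Control messages are passed by value: the small message struct is
 * copied into the pointer-sized ring slot itself (into which it is
 * assumed to fit), so no separate message allocations are needed.
 */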
145 static void
146 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
147 {
148         void *raw_msg;
149
150         memcpy(&raw_msg, msg, sizeof(*msg));
151
152         /* there's always room on the ring */
153         while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
154                 rte_pause();
155 }
156
157 static int
158 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
159 {
160         void *raw_msg;
161         int rc;
162
163         rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
164
165         if (rc == 0)
166                 memcpy(msg, &raw_msg, sizeof(*msg));
167
168         return rc;
169 }
170
171 static void
172 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
173                        uint8_t type, uint8_t queue_id, uint16_t flow_hash)
174 {
175         uint16_t port_id;
176         struct dsw_ctl_msg msg = {
177                 .type = type,
178                 .originating_port_id = source_port->id,
179                 .queue_id = queue_id,
180                 .flow_hash = flow_hash
181         };
182
183         for (port_id = 0; port_id < dsw->num_ports; port_id++)
184                 if (port_id != source_port->id)
185                         dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
186 }
187
188 static bool
189 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
190                         uint16_t flow_hash)
191 {
192         uint16_t i;
193
194         for (i = 0; i < port->paused_flows_len; i++) {
195                 struct dsw_queue_flow *qf = &port->paused_flows[i];
196                 if (qf->queue_id == queue_id &&
197                     qf->flow_hash == flow_hash)
198                         return true;
199         }
200         return false;
201 }
202
203 static void
204 dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
205                          uint16_t paused_flow_hash)
206 {
207         port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
208                 .queue_id = queue_id,
209                 .flow_hash = paused_flow_hash
210         };
211         port->paused_flows_len++;
212 }
213
214 static void
215 dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
216                             uint16_t paused_flow_hash)
217 {
218         uint16_t i;
219
220         for (i = 0; i < port->paused_flows_len; i++) {
221                 struct dsw_queue_flow *qf = &port->paused_flows[i];
222
223                 if (qf->queue_id == queue_id &&
224                     qf->flow_hash == paused_flow_hash) {
225                         uint16_t last_idx = port->paused_flows_len-1;
226                         if (i != last_idx)
227                                 port->paused_flows[i] =
228                                         port->paused_flows[last_idx];
229                         port->paused_flows_len--;
230                         break;
231                 }
232         }
233 }
234
235 static void
236 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
237
238 static void
239 dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
240                            uint8_t originating_port_id, uint8_t queue_id,
241                            uint16_t paused_flow_hash)
242 {
243         struct dsw_ctl_msg cfm = {
244                 .type = DSW_CTL_CFM,
245                 .originating_port_id = port->id,
246                 .queue_id = queue_id,
247                 .flow_hash = paused_flow_hash
248         };
249
250         DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
251                         queue_id, paused_flow_hash);
252
253         /* There might be already-scheduled events belonging to the
254          * paused flow in the output buffers.
255          */
256         dsw_port_flush_out_buffers(dsw, port);
257
258         dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
259
260         /* Make sure any stores to the originating port's in_ring are seen
261          * before the ctl message.
262          */
263         rte_smp_wmb();
264
265         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
266 }
267
268 static void
269 dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
270                           uint8_t exclude_port_id, int16_t *port_loads,
271                           uint8_t *target_port_id, int16_t *target_load)
272 {
273         int16_t candidate_port_id = -1;
274         int16_t candidate_load = DSW_MAX_LOAD;
275         uint16_t i;
276
277         for (i = 0; i < num_port_ids; i++) {
278                 uint8_t port_id = port_ids[i];
279                 if (port_id != exclude_port_id) {
280                         int16_t load = port_loads[port_id];
281                         if (candidate_port_id == -1 ||
282                             load < candidate_load) {
283                                 candidate_port_id = port_id;
284                                 candidate_load = load;
285                         }
286                 }
287         }
288         *target_port_id = candidate_port_id;
289         *target_load = candidate_load;
290 }
291
292 struct dsw_queue_flow_burst {
293         struct dsw_queue_flow queue_flow;
294         uint16_t count;
295 };
296
297 static inline int
298 dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
299 {
300         const struct dsw_queue_flow_burst *burst_a = v_burst_a;
301         const struct dsw_queue_flow_burst *burst_b = v_burst_b;
302
303         int a_count = burst_a->count;
304         int b_count = burst_b->count;
305
306         return a_count - b_count;
307 }
308
309 #define DSW_QF_TO_INT(_qf)                                      \
310         ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
311
312 static inline int
313 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
314 {
315         const struct dsw_queue_flow *qf_a = v_qf_a;
316         const struct dsw_queue_flow *qf_b = v_qf_b;
317
318         return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
319 }
320
321 static uint16_t
322 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
323                        struct dsw_queue_flow_burst *bursts)
324 {
325         uint16_t i;
326         struct dsw_queue_flow_burst *current_burst = NULL;
327         uint16_t num_bursts = 0;
328
329         /* We don't need the stable property, and the list is likely
330          * large enough for qsort() to outperform dsw_stable_sort(),
331          * so we use qsort() here.
332          */
333         qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
334
335         /* arrange the (now-consecutive) events into bursts */
336         for (i = 0; i < qfs_len; i++) {
337                 if (i == 0 ||
338                     dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
339                         current_burst = &bursts[num_bursts];
340                         current_burst->queue_flow = qfs[i];
341                         current_burst->count = 0;
342                         num_bursts++;
343                 }
344                 current_burst->count++;
345         }
346
347         qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
348
349         return num_bursts;
350 }
351
352 static bool
353 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
354                         int16_t load_limit)
355 {
356         bool below_limit = false;
357         uint16_t i;
358
359         for (i = 0; i < dsw->num_ports; i++) {
360                 int16_t load = rte_atomic16_read(&dsw->ports[i].load);
361                 if (load < load_limit)
362                         below_limit = true;
363                 port_loads[i] = load;
364         }
365         return below_limit;
366 }
367
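/* Go through the candidate flows (smallest bursts first) and pick the
 * first one whose queue has a serving port which is both less loaded
 * than the source port and below 'max_load'. Flows already paused on
 * the source port are skipped.
 */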
368 static bool
369 dsw_select_migration_target(struct dsw_evdev *dsw,
370                             struct dsw_port *source_port,
371                             struct dsw_queue_flow_burst *bursts,
372                             uint16_t num_bursts, int16_t *port_loads,
373                             int16_t max_load, struct dsw_queue_flow *target_qf,
374                             uint8_t *target_port_id)
375 {
376         uint16_t source_load = port_loads[source_port->id];
377         uint16_t i;
378
379         for (i = 0; i < num_bursts; i++) {
380                 struct dsw_queue_flow *qf = &bursts[i].queue_flow;
381
382                 if (dsw_port_is_flow_paused(source_port, qf->queue_id,
383                                             qf->flow_hash))
384                         continue;
385
386                 struct dsw_queue *queue = &dsw->queues[qf->queue_id];
387                 int16_t target_load;
388
389                 dsw_find_lowest_load_port(queue->serving_ports,
390                                           queue->num_serving_ports,
391                                           source_port->id, port_loads,
392                                           target_port_id, &target_load);
393
394                 if (target_load < source_load &&
395                     target_load < max_load) {
396                         *target_qf = *qf;
397                         return true;
398                 }
399         }
400
401         DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
402                         "no target port found with load less than %d.\n",
403                         num_bursts, DSW_LOAD_TO_PERCENT(max_load));
404
405         return false;
406 }
407
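/* Map an event's queue_id and flow_hash to the port currently serving
 * that flow, by means of the queue's flow_to_port_map.
 */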
408 static uint8_t
409 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
410 {
411         struct dsw_queue *queue = &dsw->queues[queue_id];
412         uint8_t port_id;
413
414         if (queue->num_serving_ports > 1)
415                 port_id = queue->flow_to_port_map[flow_hash];
416         else
417                 /* A single-link queue, or atomic/ordered/parallel but
418                  * with just a single serving port.
419                  */
420                 port_id = queue->serving_ports[0];
421
422         DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
423                    "to port %d.\n", queue_id, flow_hash, port_id);
424
425         return port_id;
426 }
427
428 static void
429 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
430                            uint8_t dest_port_id)
431 {
432         struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
433         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
434         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
435         uint16_t enqueued = 0;
436
437         if (*buffer_len == 0)
438                 return;
439
440         /* The rings are dimensioned to fit all in-flight events (even
441          * on a single ring), so looping will work.
442          */
443         do {
444                 enqueued +=
445                         rte_event_ring_enqueue_burst(dest_port->in_ring,
446                                                      buffer+enqueued,
447                                                      *buffer_len-enqueued,
448                                                      NULL);
449         } while (unlikely(enqueued != *buffer_len));
450
451         (*buffer_len) = 0;
452 }
453
454 static uint16_t
455 dsw_port_get_parallel_flow_id(struct dsw_port *port)
456 {
457         uint16_t flow_id = port->next_parallel_flow_id;
458
459         port->next_parallel_flow_id =
460                 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
461
462         return flow_id;
463 }
464
465 static void
466 dsw_port_buffer_paused(struct dsw_port *port,
467                        const struct rte_event *paused_event)
468 {
469         port->paused_events[port->paused_events_len] = *paused_event;
470         port->paused_events_len++;
471 }
472
473 static void
474 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
475                            uint8_t dest_port_id, const struct rte_event *event)
476 {
477         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
478         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
479
480         if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
481                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
482
483         buffer[*buffer_len] = *event;
484
485         (*buffer_len)++;
486 }
487
488 #define DSW_FLOW_ID_BITS (24)
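/* Fold the (up to DSW_FLOW_ID_BITS wide) flow id into a flow hash by
 * XORing together DSW_MAX_FLOWS_BITS-sized chunks.
 */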
489 static uint16_t
490 dsw_flow_id_hash(uint32_t flow_id)
491 {
492         uint16_t hash = 0;
493         uint16_t offset = 0;
494
495         do {
496                 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
497                 offset += DSW_MAX_FLOWS_BITS;
498         } while (offset < DSW_FLOW_ID_BITS);
499
500         return hash;
501 }
502
503 static void
504 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
505                          struct rte_event event)
506 {
507         uint8_t dest_port_id;
508
509         event.flow_id = dsw_port_get_parallel_flow_id(source_port);
510
511         dest_port_id = dsw_schedule(dsw, event.queue_id,
512                                     dsw_flow_id_hash(event.flow_id));
513
514         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
515 }
516
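/* Buffer an event for later transmission. Events on parallel queues
 * are assigned a synthetic flow id, events belonging to a paused flow
 * are parked in the port's paused_events array, and all other events
 * are scheduled to a destination port and put in the corresponding
 * output buffer.
 */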
517 static void
518 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
519                       const struct rte_event *event)
520 {
521         uint16_t flow_hash;
522         uint8_t dest_port_id;
523
524         if (unlikely(dsw->queues[event->queue_id].schedule_type ==
525                      RTE_SCHED_TYPE_PARALLEL)) {
526                 dsw_port_buffer_parallel(dsw, source_port, *event);
527                 return;
528         }
529
530         flow_hash = dsw_flow_id_hash(event->flow_id);
531
532         if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
533                                              flow_hash))) {
534                 dsw_port_buffer_paused(source_port, event);
535                 return;
536         }
537
538         dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
539
540         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
541 }
542
543 static void
544 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
545                              struct dsw_port *source_port,
546                              uint8_t queue_id, uint16_t paused_flow_hash)
547 {
548         uint16_t paused_events_len = source_port->paused_events_len;
549         struct rte_event paused_events[paused_events_len];
550         uint8_t dest_port_id;
551         uint16_t i;
552
553         if (paused_events_len == 0)
554                 return;
555
556         if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
557                 return;
558
559         rte_memcpy(paused_events, source_port->paused_events,
560                    paused_events_len * sizeof(struct rte_event));
561
562         source_port->paused_events_len = 0;
563
564         dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
565
566         for (i = 0; i < paused_events_len; i++) {
567                 struct rte_event *event = &paused_events[i];
568                 uint16_t flow_hash;
569
570                 flow_hash = dsw_flow_id_hash(event->flow_id);
571
572                 if (event->queue_id == queue_id &&
573                     flow_hash == paused_flow_hash)
574                         dsw_port_buffer_non_paused(dsw, source_port,
575                                                    dest_port_id, event);
576                 else
577                         dsw_port_buffer_paused(source_port, event);
578         }
579 }
580
581 static void
582 dsw_port_migration_stats(struct dsw_port *port)
583 {
584         uint64_t migration_latency;
585
586         migration_latency = (rte_get_timer_cycles() - port->migration_start);
587         port->migration_latency += migration_latency;
588         port->migrations++;
589 }
590
591 static void
592 dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
593 {
594         uint8_t queue_id = port->migration_target_qf.queue_id;
595         uint16_t flow_hash = port->migration_target_qf.flow_hash;
596
597         port->migration_state = DSW_MIGRATION_STATE_IDLE;
598         port->seen_events_len = 0;
599
600         dsw_port_migration_stats(port);
601
602         if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
603                 dsw_port_remove_paused_flow(port, queue_id, flow_hash);
604                 dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
605         }
606
607         DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
608                         "%d flow_hash %d.\n", queue_id, flow_hash);
609 }
610
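/* Decide whether to attempt migrating one of this port's flows to a
 * less-loaded port. For atomic/ordered queues the migration runs
 * through the states PAUSING (pause requests broadcast to the other
 * ports), FORWARDING (in-flight events for the flow forwarded to the
 * target port) and UNPAUSING (un-pause requests broadcast), after
 * which the port returns to IDLE.
 */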
611 static void
612 dsw_port_consider_migration(struct dsw_evdev *dsw,
613                             struct dsw_port *source_port,
614                             uint64_t now)
615 {
616         bool any_port_below_limit;
617         struct dsw_queue_flow *seen_events = source_port->seen_events;
618         uint16_t seen_events_len = source_port->seen_events_len;
619         struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
620         uint16_t num_bursts;
621         int16_t source_port_load;
622         int16_t port_loads[dsw->num_ports];
623
624         if (now < source_port->next_migration)
625                 return;
626
627         if (dsw->num_ports == 1)
628                 return;
629
630         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
631
632         /* Randomize interval to avoid having all threads considering
633          * migration at the same point in time, which might lead to
634          * all choosing the same target port.
635          */
636         source_port->next_migration = now +
637                 source_port->migration_interval / 2 +
638                 rte_rand() % source_port->migration_interval;
639
640         if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
641                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
642                                 "Migration already in progress.\n");
643                 return;
644         }
645
646         /* For simplicity, avoid migration in the unlikely case there
647          * are still events to consume in the in_buffer (from the last
648          * migration).
649          */
650         if (source_port->in_buffer_len > 0) {
651                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
652                                 "events in the input buffer.\n");
653                 return;
654         }
655
656         source_port_load = rte_atomic16_read(&source_port->load);
657         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
658                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
659                                 "Load %d is below threshold level %d.\n",
660                                 DSW_LOAD_TO_PERCENT(source_port_load),
661                        DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
662                 return;
663         }
664
665         /* Avoid starting any expensive operations (sorting etc.) in
666          * the case where all ports are above the load limit.
667          */
668         any_port_below_limit =
669                 dsw_retrieve_port_loads(dsw, port_loads,
670                                         DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
671         if (!any_port_below_limit) {
672                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
673                                 "Candidate target ports are all too highly "
674                                 "loaded.\n");
675                 return;
676         }
677
678         /* Sort flows into 'bursts' to allow attempting to migrate
679          * small (but still active) flows first - this is to avoid
680          * having large flows moving around the worker cores too much
681          * (to avoid cache misses, among other things). Of course, the
682          * number of recorded events (queue+flow ids) is limited, and
683          * provides only a snapshot, so only so many conclusions can
684          * be drawn from this data.
685          */
686         num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
687                                             bursts);
688         /* For non-big-little systems, there's no point in moving the
689          * only (known) flow.
690          */
691         if (num_bursts < 2) {
692                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
693                                 "queue_id %d flow_hash %d has been seen.\n",
694                                 bursts[0].queue_flow.queue_id,
695                                 bursts[0].queue_flow.flow_hash);
696                 return;
697         }
698
699         /* The strategy is to first try to find a flow to move to a
700          * port with low load (below the migration-attempt
701          * threshold). If that fails, we try to find a port which is
702          * below the max threshold, and also less loaded than this
703          * port is.
704          */
705         if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
706                                          port_loads,
707                                          DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
708                                          &source_port->migration_target_qf,
709                                          &source_port->migration_target_port_id)
710             &&
711             !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
712                                          port_loads,
713                                          DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
714                                          &source_port->migration_target_qf,
715                                        &source_port->migration_target_port_id))
716                 return;
717
718         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
719                         "flow_hash %d from port %d to port %d.\n",
720                         source_port->migration_target_qf.queue_id,
721                         source_port->migration_target_qf.flow_hash,
722                         source_port->id, source_port->migration_target_port_id);
723
724         /* We have a winner. */
725
726         source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
727         source_port->migration_start = rte_get_timer_cycles();
728
729         /* No need to go through the whole pause procedure for
730          * parallel queues, since atomic/ordered semantics need not
731          * be maintained.
732          */
733
734         if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
735             == RTE_SCHED_TYPE_PARALLEL) {
736                 uint8_t queue_id = source_port->migration_target_qf.queue_id;
737                 uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
738                 uint8_t dest_port_id = source_port->migration_target_port_id;
739
740                 /* Single byte-sized stores are always atomic. */
741                 dsw->queues[queue_id].flow_to_port_map[flow_hash] =
742                         dest_port_id;
743                 rte_smp_wmb();
744
745                 dsw_port_end_migration(dsw, source_port);
746
747                 return;
748         }
749
750         /* There might be 'loopback' events already scheduled in the
751          * output buffers.
752          */
753         dsw_port_flush_out_buffers(dsw, source_port);
754
755         dsw_port_add_paused_flow(source_port,
756                                  source_port->migration_target_qf.queue_id,
757                                  source_port->migration_target_qf.flow_hash);
758
759         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
760                                source_port->migration_target_qf.queue_id,
761                                source_port->migration_target_qf.flow_hash);
762         source_port->cfm_cnt = 0;
763 }
764
765 static void
766 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
767                              struct dsw_port *source_port,
768                              uint8_t queue_id, uint16_t paused_flow_hash);
769
770 static void
771 dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
772                              uint8_t originating_port_id, uint8_t queue_id,
773                              uint16_t paused_flow_hash)
774 {
775         struct dsw_ctl_msg cfm = {
776                 .type = DSW_CTL_CFM,
777                 .originating_port_id = port->id,
778                 .queue_id = queue_id,
779                 .flow_hash = paused_flow_hash
780         };
781
782         DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
783                         queue_id, paused_flow_hash);
784
785         dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
786
787         rte_smp_rmb();
788
789         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
790
791         dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
792 }
793
794 #define FORWARD_BURST_SIZE (32)
795
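/* Drain the source port's in_ring, passing events belonging to the
 * migrated flow on to the destination port's ring, and parking all
 * other events in the source port's in_buffer, from which they will
 * later be dequeued by the application.
 */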
796 static void
797 dsw_port_forward_migrated_flow(struct dsw_port *source_port,
798                                struct rte_event_ring *dest_ring,
799                                uint8_t queue_id,
800                                uint16_t flow_hash)
801 {
802         uint16_t events_left;
803
804         /* The control ring message should be seen before the ring count
805          * is read on the port's in_ring.
806          */
807         rte_smp_rmb();
808
809         events_left = rte_event_ring_count(source_port->in_ring);
810
811         while (events_left > 0) {
812                 uint16_t in_burst_size =
813                         RTE_MIN(FORWARD_BURST_SIZE, events_left);
814                 struct rte_event in_burst[in_burst_size];
815                 uint16_t in_len;
816                 uint16_t i;
817
818                 in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
819                                                       in_burst,
820                                                       in_burst_size, NULL);
821                 /* No need to care about bursting forwarded events (to
822                  * the destination port's in_ring), since migration
823                  * doesn't happen very often, and also the majority of
824                  * the dequeued events will likely *not* be forwarded.
825                  */
826                 for (i = 0; i < in_len; i++) {
827                         struct rte_event *e = &in_burst[i];
828                         if (e->queue_id == queue_id &&
829                             dsw_flow_id_hash(e->flow_id) == flow_hash) {
830                                 while (rte_event_ring_enqueue_burst(dest_ring,
831                                                                     e, 1,
832                                                                     NULL) != 1)
833                                         rte_pause();
834                         } else {
835                                 uint16_t last_idx = source_port->in_buffer_len;
836                                 source_port->in_buffer[last_idx] = *e;
837                                 source_port->in_buffer_len++;
838                         }
839                 }
840
841                 events_left -= in_len;
842         }
843 }
844
845 static void
846 dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
847                              struct dsw_port *source_port)
848 {
849         uint8_t queue_id = source_port->migration_target_qf.queue_id;
850         uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
851         uint8_t dest_port_id = source_port->migration_target_port_id;
852         struct dsw_port *dest_port = &dsw->ports[dest_port_id];
853
854         dsw_port_flush_out_buffers(dsw, source_port);
855
856         rte_smp_wmb();
857
858         dsw->queues[queue_id].flow_to_port_map[flow_hash] =
859                 dest_port_id;
860
861         dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
862                                        queue_id, flow_hash);
863
864         /* The flow table update and the enqueues onto the migration
865          * destination port's ring must be seen before the control message.
866          */
867         rte_smp_wmb();
868
869         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
870                                flow_hash);
871         source_port->cfm_cnt = 0;
872         source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
873 }
874
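/* Count confirmations from the other ports. When all have replied,
 * advance the migration state machine: from PAUSING to FORWARDING, or
 * from UNPAUSING to completing the migration.
 */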
875 static void
876 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
877 {
878         port->cfm_cnt++;
879
880         if (port->cfm_cnt == (dsw->num_ports-1)) {
881                 switch (port->migration_state) {
882                 case DSW_MIGRATION_STATE_PAUSING:
883                         DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
884                                         "migration state.\n");
885                         port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
886                         break;
887                 case DSW_MIGRATION_STATE_UNPAUSING:
888                         dsw_port_end_migration(dsw, port);
889                         break;
890                 default:
891                         RTE_ASSERT(0);
892                         break;
893                 }
894         }
895 }
896
897 static void
898 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
899 {
900         struct dsw_ctl_msg msg;
901
902         /* So that any table loads happen before the ring dequeue, in the
903          * case of a 'paus' message.
904          */
905         rte_smp_rmb();
906
907         if (dsw_port_ctl_dequeue(port, &msg) == 0) {
908                 switch (msg.type) {
909                 case DSW_CTL_PAUS_REQ:
910                         dsw_port_handle_pause_flow(dsw, port,
911                                                    msg.originating_port_id,
912                                                    msg.queue_id, msg.flow_hash);
913                         break;
914                 case DSW_CTL_UNPAUS_REQ:
915                         dsw_port_handle_unpause_flow(dsw, port,
916                                                      msg.originating_port_id,
917                                                      msg.queue_id,
918                                                      msg.flow_hash);
919                         break;
920                 case DSW_CTL_CFM:
921                         dsw_port_handle_confirm(dsw, port);
922                         break;
923                 }
924         }
925 }
926
927 static void
928 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
929 {
930         /* To poll the control ring reasonably often on busy ports,
931          * each dequeued/enqueued event is considered an 'op' too.
932          */
933         port->ops_since_bg_task += (num_events+1);
934 }
935
936 static void
937 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
938 {
939         if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
940                      port->pending_releases == 0))
941                 dsw_port_move_migrating_flow(dsw, port);
942
943         /* Polling the control ring is relatively inexpensive, and
944          * polling it often helps bring down migration latency, so
945          * do this for every iteration.
946          */
947         dsw_port_ctl_process(dsw, port);
948
949         /* To avoid considering migration and flushing output buffers
950          * on every dequeue/enqueue call, the scheduler only performs
951          * such 'background' tasks every nth
952          * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
953          */
954         if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
955                 uint64_t now;
956
957                 now = rte_get_timer_cycles();
958
959                 port->last_bg = now;
960
961                 /* Logic to avoid having events linger in the output
962                  * buffer too long.
963                  */
964                 dsw_port_flush_out_buffers(dsw, port);
965
966                 dsw_port_consider_load_update(port, now);
967
968                 dsw_port_consider_migration(dsw, port, now);
969
970                 port->ops_since_bg_task = 0;
971         }
972 }
973
974 static void
975 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
976 {
977         uint16_t dest_port_id;
978
979         for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
980                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
981 }
982
983 uint16_t
984 dsw_event_enqueue(void *port, const struct rte_event *ev)
985 {
986         return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
987 }
988
989 static __rte_always_inline uint16_t
990 dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
991                                 uint16_t events_len, bool op_types_known,
992                                 uint16_t num_new, uint16_t num_release,
993                                 uint16_t num_non_release)
994 {
995         struct dsw_port *source_port = port;
996         struct dsw_evdev *dsw = source_port->dsw;
997         bool enough_credits;
998         uint16_t i;
999
1000         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1001                         "events to port %d.\n", events_len, source_port->id);
1002
1003         dsw_port_bg_process(dsw, source_port);
1004
1005         /* XXX: For performance (=ring efficiency) reasons, the
1006          * scheduler relies on internal non-ring buffers instead of
1007          * immediately sending the event to the destination ring. For
1008          * a producer that doesn't intend to produce or consume any
1009          * more events, the scheduler provides a way to flush the
1010          * buffer, by means of doing an enqueue of zero events. In
1011          * addition, a port cannot be left "unattended" (e.g. unused)
1012          * for long periods of time, since that would stall
1013          * migration. Eventdev API extensions to provide a cleaner way
1014          * to achieve both of these functions should be
1015          * considered.
1016          */
1017         if (unlikely(events_len == 0)) {
1018                 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1019                 return 0;
1020         }
1021
1022         if (unlikely(events_len > source_port->enqueue_depth))
1023                 events_len = source_port->enqueue_depth;
1024
1025         dsw_port_note_op(source_port, events_len);
1026
1027         if (!op_types_known)
1028                 for (i = 0; i < events_len; i++) {
1029                         switch (events[i].op) {
1030                         case RTE_EVENT_OP_RELEASE:
1031                                 num_release++;
1032                                 break;
1033                         case RTE_EVENT_OP_NEW:
1034                                 num_new++;
1035                                 /* Falls through. */
1036                         default:
1037                                 num_non_release++;
1038                                 break;
1039                         }
1040                 }
1041
1042         /* Technically, we could allow the non-new events up to the
1043          * first new event in the array into the system, but for
1044          * simplicity reasons, we deny the whole burst if the port is
1045          * above the water mark.
1046          */
1047         if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
1048                      source_port->new_event_threshold))
1049                 return 0;
1050
1051         enough_credits = dsw_port_acquire_credits(dsw, source_port,
1052                                                   num_non_release);
1053         if (unlikely(!enough_credits))
1054                 return 0;
1055
1056         source_port->pending_releases -= num_release;
1057
1058         for (i = 0; i < events_len; i++) {
1059                 const struct rte_event *event = &events[i];
1060
1061                 if (likely(num_release == 0 ||
1062                            event->op != RTE_EVENT_OP_RELEASE))
1063                         dsw_port_buffer_event(dsw, source_port, event);
1064         }
1065
1066         DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1067                         "accepted.\n", num_non_release);
1068
1069         return num_non_release;
1070 }
1071
1072 uint16_t
1073 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1074                         uint16_t events_len)
1075 {
1076         return dsw_event_enqueue_burst_generic(port, events, events_len, false,
1077                                                0, 0, 0);
1078 }
1079
1080 uint16_t
1081 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1082                             uint16_t events_len)
1083 {
1084         return dsw_event_enqueue_burst_generic(port, events, events_len, true,
1085                                                events_len, 0, events_len);
1086 }
1087
1088 uint16_t
1089 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1090                                 uint16_t events_len)
1091 {
1092         return dsw_event_enqueue_burst_generic(port, events, events_len, true,
1093                                                0, 0, events_len);
1094 }
1095
1096 uint16_t
1097 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1098 {
1099         return dsw_event_dequeue_burst(port, events, 1, wait);
1100 }
1101
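/* Record the queue id and flow hash of recently dequeued events in a
 * small circular buffer. These samples are the migration candidates
 * considered in dsw_port_consider_migration().
 */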
1102 static void
1103 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1104                             uint16_t num)
1105 {
1106         uint16_t i;
1107
1108         for (i = 0; i < num; i++) {
1109                 uint16_t l_idx = port->seen_events_idx;
1110                 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1111                 struct rte_event *event = &events[i];
1112                 qf->queue_id = event->queue_id;
1113                 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1114
1115                 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1116         }
1117
1118         if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1119                 port->seen_events_len =
1120                         RTE_MIN(port->seen_events_len + num,
1121                                 DSW_MAX_EVENTS_RECORDED);
1122 }
1123
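/* Dequeue events for the application, serving any events left in the
 * port's in_buffer (put there during a flow migration) before reading
 * from the in_ring.
 */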
1124 static uint16_t
1125 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1126                        uint16_t num)
1127 {
1128         struct dsw_port *source_port = port;
1129         struct dsw_evdev *dsw = source_port->dsw;
1130
1131         dsw_port_ctl_process(dsw, source_port);
1132
1133         if (unlikely(port->in_buffer_len > 0)) {
1134                 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1135
1136                 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1137                            dequeued * sizeof(struct rte_event));
1138
1139                 port->in_buffer_start += dequeued;
1140                 port->in_buffer_len -= dequeued;
1141
1142                 if (port->in_buffer_len == 0)
1143                         port->in_buffer_start = 0;
1144
1145                 return dequeued;
1146         }
1147
1148         return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1149 }
1150
1151 uint16_t
1152 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1153                         uint64_t wait __rte_unused)
1154 {
1155         struct dsw_port *source_port = port;
1156         struct dsw_evdev *dsw = source_port->dsw;
1157         uint16_t dequeued;
1158
1159         source_port->pending_releases = 0;
1160
1161         dsw_port_bg_process(dsw, source_port);
1162
1163         if (unlikely(num > source_port->dequeue_depth))
1164                 num = source_port->dequeue_depth;
1165
1166         dequeued = dsw_port_dequeue_burst(source_port, events, num);
1167
1168         source_port->pending_releases = dequeued;
1169
1170         dsw_port_load_record(source_port, dequeued);
1171
1172         dsw_port_note_op(source_port, dequeued);
1173
1174         if (dequeued > 0) {
1175                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1176                                 dequeued);
1177
1178                 dsw_port_return_credits(dsw, source_port, dequeued);
1179
1180                 /* A potential optimization would be to add a
1181                  * migration state (prior to 'pausing'), and to
1182                  * only record seen events when the port is in this
1183                  * state (and transition to 'pausing' when enough events
1184                  * have been gathered). However, that scheme doesn't
1185                  * seem to improve performance.
1186                  */
1187                 dsw_port_record_seen_events(port, events, dequeued);
1188         }
1189         /* XXX: Assuming the port can't produce any more work,
1190          *      consider flushing the output buffer, on dequeued ==
1191          *      0.
1192          */
1193
1194         return dequeued;
1195 }