event/dsw: sort events on dequeue
[dpdk.git] drivers/event/dsw/dsw_event.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2018 Ericsson AB
3  */
4
5 #include "dsw_evdev.h"
6
7 #ifdef DSW_SORT_DEQUEUED
8 #include "dsw_sort.h"
9 #endif
10
11 #include <stdbool.h>
12 #include <string.h>
13
14 #include <rte_atomic.h>
15 #include <rte_cycles.h>
16 #include <rte_memcpy.h>
17 #include <rte_random.h>
18
19 static bool
20 dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
21                          int32_t credits)
22 {
23         int32_t inflight_credits = port->inflight_credits;
24         int32_t missing_credits = credits - inflight_credits;
25         int32_t total_on_loan;
26         int32_t available;
27         int32_t acquired_credits;
28         int32_t new_total_on_loan;
29
30         if (likely(missing_credits <= 0)) {
31                 port->inflight_credits -= credits;
32                 return true;
33         }
34
35         total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
36         available = dsw->max_inflight - total_on_loan;
37         acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
38
39         if (available < acquired_credits)
40                 return false;
41
42         /* This is a race; no locks are involved, and thus some other
43          * thread can allocate tokens in between the check and the
44          * allocation.
45          */
46         new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
47                                                     acquired_credits);
48
49         if (unlikely(new_total_on_loan > dsw->max_inflight)) {
50                 /* Some other port took the last credits */
51                 rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
52                 return false;
53         }
54
55         DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
56                         acquired_credits);
57
58         port->inflight_credits += acquired_credits;
59         port->inflight_credits -= credits;
60
61         return true;
62 }
63
64 static void
65 dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
66                         int32_t credits)
67 {
68         port->inflight_credits += credits;
69
70         if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
71                 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
72                 int32_t return_credits =
73                         port->inflight_credits - leave_credits;
74
75                 port->inflight_credits = leave_credits;
76
77                 rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
78
79                 DSW_LOG_DP_PORT(DEBUG, port->id,
80                                 "Returned %d tokens to pool.\n",
81                                 return_credits);
82         }
83 }
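/* A worked sketch of the credit scheme above (the actual
 * DSW_PORT_MIN_CREDITS and DSW_PORT_MAX_CREDITS values are defined in
 * dsw_evdev.h): with inflight_credits == 0, a request for 4 credits
 * gives missing_credits == 4, and the port grabs
 * RTE_MAX(4, DSW_PORT_MIN_CREDITS) credits from the shared
 * credits_on_loan pool in a single atomic add, keeping the surplus as
 * a local cache. Later small requests are then served from
 * inflight_credits without touching the shared counter, and
 * dsw_port_return_credits() only hands credits back to the pool once
 * the local cache exceeds DSW_PORT_MAX_CREDITS.
 */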
84
85 static void
86 dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
87 {
88         if (dequeued > 0 && port->busy_start == 0)
89                 /* work period begins */
90                 port->busy_start = rte_get_timer_cycles();
91         else if (dequeued == 0 && port->busy_start > 0) {
92                 /* work period ends */
93                 uint64_t work_period =
94                         rte_get_timer_cycles() - port->busy_start;
95                 port->busy_cycles += work_period;
96                 port->busy_start = 0;
97         }
98 }
99
100 static int16_t
101 dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
102 {
103         uint64_t passed = now - port->measurement_start;
104         uint64_t busy_cycles = port->busy_cycles;
105
106         if (port->busy_start > 0) {
107                 busy_cycles += (now - port->busy_start);
108                 port->busy_start = now;
109         }
110
111         int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
112
113         port->measurement_start = now;
114         port->busy_cycles = 0;
115
116         port->total_busy_cycles += busy_cycles;
117
118         return load;
119 }
120
121 static void
122 dsw_port_load_update(struct dsw_port *port, uint64_t now)
123 {
124         int16_t old_load;
125         int16_t period_load;
126         int16_t new_load;
127
128         old_load = rte_atomic16_read(&port->load);
129
130         period_load = dsw_port_load_close_period(port, now);
131
132         new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
133                 (DSW_OLD_LOAD_WEIGHT+1);
134
135         rte_atomic16_set(&port->load, new_load);
136 }
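/* In effect, the port load is an exponentially weighted moving
 * average:
 *
 *   new_load = (period_load + old_load * DSW_OLD_LOAD_WEIGHT) /
 *              (DSW_OLD_LOAD_WEIGHT + 1)
 *
 * where period_load is the fraction of busy cycles during the last
 * measurement interval, scaled to DSW_MAX_LOAD. As a sketch (the
 * actual weight is defined in dsw_evdev.h), a weight of 1 would make
 * the new estimate the mean of the old estimate and the last
 * period's load.
 */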
137
138 static void
139 dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
140 {
141         if (now < port->next_load_update)
142                 return;
143
144         port->next_load_update = now + port->load_update_interval;
145
146         dsw_port_load_update(port, now);
147 }
148
149 static void
150 dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
151 {
152         void *raw_msg;
153
154         memcpy(&raw_msg, msg, sizeof(*msg));
155
156         /* there's always room on the ring, so the loop should not spin */
157         while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
158                 rte_pause();
159 }
160
161 static int
162 dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
163 {
164         void *raw_msg;
165         int rc;
166
167         rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
168
169         if (rc == 0)
170                 memcpy(msg, &raw_msg, sizeof(*msg));
171
172         return rc;
173 }
174
175 static void
176 dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
177                        uint8_t type, uint8_t queue_id, uint16_t flow_hash)
178 {
179         uint16_t port_id;
180         struct dsw_ctl_msg msg = {
181                 .type = type,
182                 .originating_port_id = source_port->id,
183                 .queue_id = queue_id,
184                 .flow_hash = flow_hash
185         };
186
187         for (port_id = 0; port_id < dsw->num_ports; port_id++)
188                 if (port_id != source_port->id)
189                         dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
190 }
191
192 static bool
193 dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
194                         uint16_t flow_hash)
195 {
196         uint16_t i;
197
198         for (i = 0; i < port->paused_flows_len; i++) {
199                 struct dsw_queue_flow *qf = &port->paused_flows[i];
200                 if (qf->queue_id == queue_id &&
201                     qf->flow_hash == flow_hash)
202                         return true;
203         }
204         return false;
205 }
206
207 static void
208 dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
209                          uint16_t paused_flow_hash)
210 {
211         port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
212                 .queue_id = queue_id,
213                 .flow_hash = paused_flow_hash
214         };
215         port->paused_flows_len++;
216 }
217
218 static void
219 dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
220                             uint16_t paused_flow_hash)
221 {
222         uint16_t i;
223
224         for (i = 0; i < port->paused_flows_len; i++) {
225                 struct dsw_queue_flow *qf = &port->paused_flows[i];
226
227                 if (qf->queue_id == queue_id &&
228                     qf->flow_hash == paused_flow_hash) {
229                         uint16_t last_idx = port->paused_flows_len-1;
230                         if (i != last_idx)
231                                 port->paused_flows[i] =
232                                         port->paused_flows[last_idx];
233                         port->paused_flows_len--;
234                         break;
235                 }
236         }
237 }
238
239 static void
240 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
241
242 static void
243 dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
244                            uint8_t originating_port_id, uint8_t queue_id,
245                            uint16_t paused_flow_hash)
246 {
247         struct dsw_ctl_msg cfm = {
248                 .type = DSW_CTL_CFM,
249                 .originating_port_id = port->id,
250                 .queue_id = queue_id,
251                 .flow_hash = paused_flow_hash
252         };
253
254         DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
255                         queue_id, paused_flow_hash);
256
257         /* There might be already-scheduled events belonging to the
258          * paused flow in the output buffers.
259          */
260         dsw_port_flush_out_buffers(dsw, port);
261
262         dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
263
264         /* Make sure any stores to the original port's in_ring are seen
265          * before the ctl message.
266          */
267         rte_smp_wmb();
268
269         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
270 }
271
272 static void
273 dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
274                           uint8_t exclude_port_id, int16_t *port_loads,
275                           uint8_t *target_port_id, int16_t *target_load)
276 {
277         int16_t candidate_port_id = -1;
278         int16_t candidate_load = DSW_MAX_LOAD;
279         uint16_t i;
280
281         for (i = 0; i < num_port_ids; i++) {
282                 uint8_t port_id = port_ids[i];
283                 if (port_id != exclude_port_id) {
284                         int16_t load = port_loads[port_id];
285                         if (candidate_port_id == -1 ||
286                             load < candidate_load) {
287                                 candidate_port_id = port_id;
288                                 candidate_load = load;
289                         }
290                 }
291         }
292         *target_port_id = candidate_port_id;
293         *target_load = candidate_load;
294 }
295
296 struct dsw_queue_flow_burst {
297         struct dsw_queue_flow queue_flow;
298         uint16_t count;
299 };
300
301 static inline int
302 dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
303 {
304         const struct dsw_queue_flow_burst *burst_a = v_burst_a;
305         const struct dsw_queue_flow_burst *burst_b = v_burst_b;
306
307         int a_count = burst_a->count;
308         int b_count = burst_b->count;
309
310         return a_count - b_count;
311 }
312
313 #define DSW_QF_TO_INT(_qf)                                      \
314         ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
315
316 static inline int
317 dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
318 {
319         const struct dsw_queue_flow *qf_a = v_qf_a;
320         const struct dsw_queue_flow *qf_b = v_qf_b;
321
322         return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
323 }
324
325 static uint16_t
326 dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
327                        struct dsw_queue_flow_burst *bursts)
328 {
329         uint16_t i;
330         struct dsw_queue_flow_burst *current_burst = NULL;
331         uint16_t num_bursts = 0;
332
333         /* We don't need the stable property, and the list is likely
334          * large enough for qsort() to outperform dsw_stable_sort(),
335          * so we use qsort() here.
336          */
337         qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
338
339         /* arrange the (now-consecutive) events into bursts */
340         for (i = 0; i < qfs_len; i++) {
341                 if (i == 0 ||
342                     dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
343                         current_burst = &bursts[num_bursts];
344                         current_burst->queue_flow = qfs[i];
345                         current_burst->count = 0;
346                         num_bursts++;
347                 }
348                 current_burst->count++;
349         }
350
351         qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
352
353         return num_bursts;
354 }
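/* An illustration of dsw_sort_qfs_to_bursts(), using a hypothetical
 * input: for seen (queue_id, flow_hash) pairs
 * [(0,7), (1,3), (0,7), (0,7), (1,3)], the qfs array is first sorted
 * so that equal pairs become consecutive, then collapsed into bursts
 * [{(0,7), count=3}, {(1,3), count=2}], and finally the bursts are
 * sorted by count in ascending order, yielding
 * [{(1,3), 2}, {(0,7), 3}]. The least active flows thus end up first,
 * and are the first to be considered for migration.
 */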
355
356 static bool
357 dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
358                         int16_t load_limit)
359 {
360         bool below_limit = false;
361         uint16_t i;
362
363         for (i = 0; i < dsw->num_ports; i++) {
364                 int16_t load = rte_atomic16_read(&dsw->ports[i].load);
365                 if (load < load_limit)
366                         below_limit = true;
367                 port_loads[i] = load;
368         }
369         return below_limit;
370 }
371
372 static bool
373 dsw_select_migration_target(struct dsw_evdev *dsw,
374                             struct dsw_port *source_port,
375                             struct dsw_queue_flow_burst *bursts,
376                             uint16_t num_bursts, int16_t *port_loads,
377                             int16_t max_load, struct dsw_queue_flow *target_qf,
378                             uint8_t *target_port_id)
379 {
380         uint16_t source_load = port_loads[source_port->id];
381         uint16_t i;
382
383         for (i = 0; i < num_bursts; i++) {
384                 struct dsw_queue_flow *qf = &bursts[i].queue_flow;
385
386                 if (dsw_port_is_flow_paused(source_port, qf->queue_id,
387                                             qf->flow_hash))
388                         continue;
389
390                 struct dsw_queue *queue = &dsw->queues[qf->queue_id];
391                 int16_t target_load;
392
393                 dsw_find_lowest_load_port(queue->serving_ports,
394                                           queue->num_serving_ports,
395                                           source_port->id, port_loads,
396                                           target_port_id, &target_load);
397
398                 if (target_load < source_load &&
399                     target_load < max_load) {
400                         *target_qf = *qf;
401                         return true;
402                 }
403         }
404
405         DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
406                         "no target port found with load less than %d.\n",
407                         num_bursts, DSW_LOAD_TO_PERCENT(max_load));
408
409         return false;
410 }
411
412 static uint8_t
413 dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
414 {
415         struct dsw_queue *queue = &dsw->queues[queue_id];
416         uint8_t port_id;
417
418         if (queue->num_serving_ports > 1)
419                 port_id = queue->flow_to_port_map[flow_hash];
420         else
421                 /* A single-link queue, or atomic/ordered/parallel but
422                  * with just a single serving port.
423                  */
424                 port_id = queue->serving_ports[0];
425
426         DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
427                    "to port %d.\n", queue_id, flow_hash, port_id);
428
429         return port_id;
430 }
431
432 static void
433 dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
434                            uint8_t dest_port_id)
435 {
436         struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
437         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
438         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
439         uint16_t enqueued = 0;
440
441         if (*buffer_len == 0)
442                 return;
443
444         /* The rings are dimensioned to fit all in-flight events (even
445          * on a single ring), so looping will work.
446          */
447         do {
448                 enqueued +=
449                         rte_event_ring_enqueue_burst(dest_port->in_ring,
450                                                      buffer+enqueued,
451                                                      *buffer_len-enqueued,
452                                                      NULL);
453         } while (unlikely(enqueued != *buffer_len));
454
455         (*buffer_len) = 0;
456 }
457
458 static uint16_t
459 dsw_port_get_parallel_flow_id(struct dsw_port *port)
460 {
461         uint16_t flow_id = port->next_parallel_flow_id;
462
463         port->next_parallel_flow_id =
464                 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
465
466         return flow_id;
467 }
468
469 static void
470 dsw_port_buffer_paused(struct dsw_port *port,
471                        const struct rte_event *paused_event)
472 {
473         port->paused_events[port->paused_events_len] = *paused_event;
474         port->paused_events_len++;
475 }
476
477 static void
478 dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
479                            uint8_t dest_port_id, const struct rte_event *event)
480 {
481         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
482         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
483
484         if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
485                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
486
487         buffer[*buffer_len] = *event;
488
489         (*buffer_len)++;
490 }
491
492 #define DSW_FLOW_ID_BITS (24)
493 static uint16_t
494 dsw_flow_id_hash(uint32_t flow_id)
495 {
496         uint16_t hash = 0;
497         uint16_t offset = 0;
498
499         do {
500                 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
501                 offset += DSW_MAX_FLOWS_BITS;
502         } while (offset < DSW_FLOW_ID_BITS);
503
504         return hash;
505 }
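/* A sketch of the folding above: with, for example,
 * DSW_MAX_FLOWS_BITS == 15 (see dsw_evdev.h for the actual value),
 * the hash computed is
 *
 *   (flow_id & 0x7fff) ^ ((flow_id >> 15) & 0x7fff)
 *
 * i.e., the flow id is folded down into the flow hash range by
 * XORing DSW_MAX_FLOWS_BITS-sized chunks.
 */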
506
507 static void
508 dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
509                          struct rte_event event)
510 {
511         uint8_t dest_port_id;
512
513         event.flow_id = dsw_port_get_parallel_flow_id(source_port);
514
515         dest_port_id = dsw_schedule(dsw, event.queue_id,
516                                     dsw_flow_id_hash(event.flow_id));
517
518         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
519 }
520
521 static void
522 dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
523                       const struct rte_event *event)
524 {
525         uint16_t flow_hash;
526         uint8_t dest_port_id;
527
528         if (unlikely(dsw->queues[event->queue_id].schedule_type ==
529                      RTE_SCHED_TYPE_PARALLEL)) {
530                 dsw_port_buffer_parallel(dsw, source_port, *event);
531                 return;
532         }
533
534         flow_hash = dsw_flow_id_hash(event->flow_id);
535
536         if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
537                                              flow_hash))) {
538                 dsw_port_buffer_paused(source_port, event);
539                 return;
540         }
541
542         dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
543
544         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
545 }
546
547 static void
548 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
549                              struct dsw_port *source_port,
550                              uint8_t queue_id, uint16_t paused_flow_hash)
551 {
552         uint16_t paused_events_len = source_port->paused_events_len;
553         struct rte_event paused_events[paused_events_len];
554         uint8_t dest_port_id;
555         uint16_t i;
556
557         if (paused_events_len == 0)
558                 return;
559
560         if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
561                 return;
562
563         rte_memcpy(paused_events, source_port->paused_events,
564                    paused_events_len * sizeof(struct rte_event));
565
566         source_port->paused_events_len = 0;
567
568         dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
569
570         for (i = 0; i < paused_events_len; i++) {
571                 struct rte_event *event = &paused_events[i];
572                 uint16_t flow_hash;
573
574                 flow_hash = dsw_flow_id_hash(event->flow_id);
575
576                 if (event->queue_id == queue_id &&
577                     flow_hash == paused_flow_hash)
578                         dsw_port_buffer_non_paused(dsw, source_port,
579                                                    dest_port_id, event);
580                 else
581                         dsw_port_buffer_paused(source_port, event);
582         }
583 }
584
585 static void
586 dsw_port_migration_stats(struct dsw_port *port)
587 {
588         uint64_t migration_latency;
589
590         migration_latency = (rte_get_timer_cycles() - port->migration_start);
591         port->migration_latency += migration_latency;
592         port->migrations++;
593 }
594
595 static void
596 dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
597 {
598         uint8_t queue_id = port->migration_target_qf.queue_id;
599         uint16_t flow_hash = port->migration_target_qf.flow_hash;
600
601         port->migration_state = DSW_MIGRATION_STATE_IDLE;
602         port->seen_events_len = 0;
603
604         dsw_port_migration_stats(port);
605
606         if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
607                 dsw_port_remove_paused_flow(port, queue_id, flow_hash);
608                 dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
609         }
610
611         DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
612                         "%d flow_hash %d.\n", queue_id, flow_hash);
613 }
614
615 static void
616 dsw_port_consider_migration(struct dsw_evdev *dsw,
617                             struct dsw_port *source_port,
618                             uint64_t now)
619 {
620         bool any_port_below_limit;
621         struct dsw_queue_flow *seen_events = source_port->seen_events;
622         uint16_t seen_events_len = source_port->seen_events_len;
623         struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
624         uint16_t num_bursts;
625         int16_t source_port_load;
626         int16_t port_loads[dsw->num_ports];
627
628         if (now < source_port->next_migration)
629                 return;
630
631         if (dsw->num_ports == 1)
632                 return;
633
634         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
635
636         /* Randomize interval to avoid having all threads considering
637          * migration at the same point in time, which might lead to
638          * all choosing the same target port.
639          */
640         source_port->next_migration = now +
641                 source_port->migration_interval / 2 +
642                 rte_rand() % source_port->migration_interval;
643
644         if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
645                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
646                                 "Migration already in progress.\n");
647                 return;
648         }
649
650         /* For simplicity, avoid migration in the unlikely case there
651          * are still events to consume in the in_buffer (from the last
652          * migration).
653          */
654         if (source_port->in_buffer_len > 0) {
655                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
656                                 "events in the input buffer.\n");
657                 return;
658         }
659
660         source_port_load = rte_atomic16_read(&source_port->load);
661         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
662                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
663                                 "Load %d is below threshold level %d.\n",
664                                 DSW_LOAD_TO_PERCENT(source_port_load),
665                        DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
666                 return;
667         }
668
669         /* Avoid starting any expensive operations (sorting, etc.) in
670          * case all ports are above the load limit.
671          */
672         any_port_below_limit =
673                 dsw_retrieve_port_loads(dsw, port_loads,
674                                         DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
675         if (!any_port_below_limit) {
676                 DSW_LOG_DP_PORT(DEBUG, source_port->id,
677                                 "Candidate target ports are all too highly "
678                                 "loaded.\n");
679                 return;
680         }
681
682         /* Sort flows into 'bursts' to allow attempting to migrate
683          * small (but still active) flows first - this is to avoid
684          * having large flows moving around the worker cores too much
685          * (to avoid cache misses, among other things). Of course, the
686          * number of recorded events (queue+flow ids) is limited and
687          * provides only a snapshot, so only so many conclusions can
688          * be drawn from this data.
689          */
690         num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
691                                             bursts);
692         /* On homogeneous (i.e., non-big.LITTLE) systems, there's no
693          * point in moving the only (known) flow.
694          */
695         if (num_bursts < 2) {
696                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
697                                 "queue_id %d flow_hash %d has been seen.\n",
698                                 bursts[0].queue_flow.queue_id,
699                                 bursts[0].queue_flow.flow_hash);
700                 return;
701         }
702
703         /* The strategy is to first try to find a flow to move to a
704          * port with low load (below the migration-attempt
705          * threshold). If that fails, we try to find a port which is
706          * below the max threshold, and also less loaded than this
707          * port is.
708          */
709         if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
710                                          port_loads,
711                                          DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
712                                          &source_port->migration_target_qf,
713                                          &source_port->migration_target_port_id)
714             &&
715             !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
716                                          port_loads,
717                                          DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
718                                          &source_port->migration_target_qf,
719                                        &source_port->migration_target_port_id))
720                 return;
721
722         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
723                         "flow_hash %d from port %d to port %d.\n",
724                         source_port->migration_target_qf.queue_id,
725                         source_port->migration_target_qf.flow_hash,
726                         source_port->id, source_port->migration_target_port_id);
727
728         /* We have a winner. */
729
730         source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
731         source_port->migration_start = rte_get_timer_cycles();
732
733         /* No need to go through the whole pause procedure for
734          * parallel queues, since atomic/ordered semantics need not
735          * be maintained.
736          */
737
738         if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
739             == RTE_SCHED_TYPE_PARALLEL) {
740                 uint8_t queue_id = source_port->migration_target_qf.queue_id;
741                 uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
742                 uint8_t dest_port_id = source_port->migration_target_port_id;
743
744                 /* Single byte-sized stores are always atomic. */
745                 dsw->queues[queue_id].flow_to_port_map[flow_hash] =
746                         dest_port_id;
747                 rte_smp_wmb();
748
749                 dsw_port_end_migration(dsw, source_port);
750
751                 return;
752         }
753
754         /* There might be 'loopback' events already scheduled in the
755          * output buffers.
756          */
757         dsw_port_flush_out_buffers(dsw, source_port);
758
759         dsw_port_add_paused_flow(source_port,
760                                  source_port->migration_target_qf.queue_id,
761                                  source_port->migration_target_qf.flow_hash);
762
763         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
764                                source_port->migration_target_qf.queue_id,
765                                source_port->migration_target_qf.flow_hash);
766         source_port->cfm_cnt = 0;
767 }
768
769 static void
770 dsw_port_flush_paused_events(struct dsw_evdev *dsw,
771                              struct dsw_port *source_port,
772                              uint8_t queue_id, uint16_t paused_flow_hash);
773
774 static void
775 dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
776                              uint8_t originating_port_id, uint8_t queue_id,
777                              uint16_t paused_flow_hash)
778 {
779         struct dsw_ctl_msg cfm = {
780                 .type = DSW_CTL_CFM,
781                 .originating_port_id = port->id,
782                 .queue_id = queue_id,
783                 .flow_hash = paused_flow_hash
784         };
785
786         DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
787                         queue_id, paused_flow_hash);
788
789         dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
790
791         rte_smp_rmb();
792
793         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
794
795         dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
796 }
797
798 #define FORWARD_BURST_SIZE (32)
799
800 static void
801 dsw_port_forward_migrated_flow(struct dsw_port *source_port,
802                                struct rte_event_ring *dest_ring,
803                                uint8_t queue_id,
804                                uint16_t flow_hash)
805 {
806         uint16_t events_left;
807
808         /* The control ring message should be seen before the ring count
809          * is read on the port's in_ring.
810          */
811         rte_smp_rmb();
812
813         events_left = rte_event_ring_count(source_port->in_ring);
814
815         while (events_left > 0) {
816                 uint16_t in_burst_size =
817                         RTE_MIN(FORWARD_BURST_SIZE, events_left);
818                 struct rte_event in_burst[in_burst_size];
819                 uint16_t in_len;
820                 uint16_t i;
821
822                 in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
823                                                       in_burst,
824                                                       in_burst_size, NULL);
825                 /* No need to care about bursting forwarded events (to
826                  * the destination port's in_ring), since migration
827                  * doesn't happen very often, and also the majority of
828                  * the dequeued events will likely *not* be forwarded.
829                  */
830                 for (i = 0; i < in_len; i++) {
831                         struct rte_event *e = &in_burst[i];
832                         if (e->queue_id == queue_id &&
833                             dsw_flow_id_hash(e->flow_id) == flow_hash) {
834                                 while (rte_event_ring_enqueue_burst(dest_ring,
835                                                                     e, 1,
836                                                                     NULL) != 1)
837                                         rte_pause();
838                         } else {
839                                 uint16_t last_idx = source_port->in_buffer_len;
840                                 source_port->in_buffer[last_idx] = *e;
841                                 source_port->in_buffer_len++;
842                         }
843                 }
844
845                 events_left -= in_len;
846         }
847 }
848
849 static void
850 dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
851                              struct dsw_port *source_port)
852 {
853         uint8_t queue_id = source_port->migration_target_qf.queue_id;
854         uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
855         uint8_t dest_port_id = source_port->migration_target_port_id;
856         struct dsw_port *dest_port = &dsw->ports[dest_port_id];
857
858         dsw_port_flush_out_buffers(dsw, source_port);
859
860         rte_smp_wmb();
861
862         dsw->queues[queue_id].flow_to_port_map[flow_hash] =
863                 dest_port_id;
864
865         dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
866                                        queue_id, flow_hash);
867
868         /* Flow table update and migration destination port's enqueues
869          * must be seen before the control message.
870          */
871         rte_smp_wmb();
872
873         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
874                                flow_hash);
875         source_port->cfm_cnt = 0;
876         source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
877 }
878
879 static void
880 dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
881 {
882         port->cfm_cnt++;
883
884         if (port->cfm_cnt == (dsw->num_ports-1)) {
885                 switch (port->migration_state) {
886                 case DSW_MIGRATION_STATE_PAUSING:
887                         DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
888                                         "migration state.\n");
889                         port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
890                         break;
891                 case DSW_MIGRATION_STATE_UNPAUSING:
892                         dsw_port_end_migration(dsw, port);
893                         break;
894                 default:
895                         RTE_ASSERT(0);
896                         break;
897                 }
898         }
899 }
900
901 static void
902 dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
903 {
904         struct dsw_ctl_msg msg;
905
906         /* So any table loads happen before the ring dequeue, in the
907          * case of a pause request (DSW_CTL_PAUS_REQ) message.
908          */
909         rte_smp_rmb();
910
911         if (dsw_port_ctl_dequeue(port, &msg) == 0) {
912                 switch (msg.type) {
913                 case DSW_CTL_PAUS_REQ:
914                         dsw_port_handle_pause_flow(dsw, port,
915                                                    msg.originating_port_id,
916                                                    msg.queue_id, msg.flow_hash);
917                         break;
918                 case DSW_CTL_UNPAUS_REQ:
919                         dsw_port_handle_unpause_flow(dsw, port,
920                                                      msg.originating_port_id,
921                                                      msg.queue_id,
922                                                      msg.flow_hash);
923                         break;
924                 case DSW_CTL_CFM:
925                         dsw_port_handle_confirm(dsw, port);
926                         break;
927                 }
928         }
929 }
930
931 static void
932 dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
933 {
934         /* To poll the control ring reasonably often on busy ports,
935          * each dequeued/enqueued event is considered an 'op' too.
936          */
937         port->ops_since_bg_task += (num_events+1);
938 }
939
940 static void
941 dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
942 {
943         if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
944                      port->pending_releases == 0))
945                 dsw_port_move_migrating_flow(dsw, port);
946
947         /* Polling the control ring is relatively inexpensive, and
948          * polling it often helps bring down migration latency, so
949          * do this for every iteration.
950          */
951         dsw_port_ctl_process(dsw, port);
952
953         /* To avoid considering migration and flushing output buffers
954          * on every dequeue/enqueue call, the scheduler only performs
955          * such 'background' tasks every nth
956          * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
957          */
958         if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
959                 uint64_t now;
960
961                 now = rte_get_timer_cycles();
962
963                 port->last_bg = now;
964
965                 /* Logic to avoid having events linger in the output
966                  * buffer too long.
967                  */
968                 dsw_port_flush_out_buffers(dsw, port);
969
970                 dsw_port_consider_load_update(port, now);
971
972                 dsw_port_consider_migration(dsw, port, now);
973
974                 port->ops_since_bg_task = 0;
975         }
976 }
977
978 static void
979 dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
980 {
981         uint16_t dest_port_id;
982
983         for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
984                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
985 }
986
987 uint16_t
988 dsw_event_enqueue(void *port, const struct rte_event *ev)
989 {
990         return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
991 }
992
993 static __rte_always_inline uint16_t
994 dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
995                                 uint16_t events_len, bool op_types_known,
996                                 uint16_t num_new, uint16_t num_release,
997                                 uint16_t num_non_release)
998 {
999         struct dsw_port *source_port = port;
1000         struct dsw_evdev *dsw = source_port->dsw;
1001         bool enough_credits;
1002         uint16_t i;
1003
1004         DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1005                         "events to port %d.\n", events_len, source_port->id);
1006
1007         dsw_port_bg_process(dsw, source_port);
1008
1009         /* XXX: For performance (=ring efficiency) reasons, the
1010          * scheduler relies on internal non-ring buffers instead of
1011          * immediately sending the event to the destination ring. For
1012          * a producer that doesn't intend to produce or consume any
1013          * more events, the scheduler provides a way to flush the
1014          * buffer, by means of doing an enqueue of zero events. In
1015          * addition, a port cannot be left "unattended" (e.g. unused)
1016          * for long periods of time, since that would stall
1017          * migration. Eventdev API extensions to provide a cleaner way
1018          * to achieve both of these functions should be
1019          * considered.
1020          */
1021         if (unlikely(events_len == 0)) {
1022                 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1023                 return 0;
1024         }
1025
1026         if (unlikely(events_len > source_port->enqueue_depth))
1027                 events_len = source_port->enqueue_depth;
1028
1029         dsw_port_note_op(source_port, events_len);
1030
1031         if (!op_types_known)
1032                 for (i = 0; i < events_len; i++) {
1033                         switch (events[i].op) {
1034                         case RTE_EVENT_OP_RELEASE:
1035                                 num_release++;
1036                                 break;
1037                         case RTE_EVENT_OP_NEW:
1038                                 num_new++;
1039                                 /* Falls through. */
1040                         default:
1041                                 num_non_release++;
1042                                 break;
1043                         }
1044                 }
1045
1046         /* Technically, we could allow the non-new events up to the
1047          * first new event in the array into the system, but for
1048          * simplicity reasons, we deny the whole burst if the port is
1049          * above the water mark.
1050          */
1051         if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
1052                      source_port->new_event_threshold))
1053                 return 0;
1054
1055         enough_credits = dsw_port_acquire_credits(dsw, source_port,
1056                                                   num_non_release);
1057         if (unlikely(!enough_credits))
1058                 return 0;
1059
1060         source_port->pending_releases -= num_release;
1061
1062         for (i = 0; i < events_len; i++) {
1063                 const struct rte_event *event = &events[i];
1064
1065                 if (likely(num_release == 0 ||
1066                            event->op != RTE_EVENT_OP_RELEASE))
1067                         dsw_port_buffer_event(dsw, source_port, event);
1068         }
1069
1070         DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1071                         "accepted.\n", num_non_release);
1072
1073         return num_non_release;
1074 }
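/* A hypothetical application-side sketch of the zero-sized enqueue
 * flush idiom mentioned in the XXX comment above; dev_id and port_id
 * are assumed to identify an event device instance of this driver:
 *
 *   // The producer is done for now; push out any events still
 *   // sitting in the DSW port's internal output buffers.
 *   rte_event_enqueue_burst(dev_id, port_id, NULL, 0);
 */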
1075
1076 uint16_t
1077 dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1078                         uint16_t events_len)
1079 {
1080         return dsw_event_enqueue_burst_generic(port, events, events_len, false,
1081                                                0, 0, 0);
1082 }
1083
1084 uint16_t
1085 dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1086                             uint16_t events_len)
1087 {
1088         return dsw_event_enqueue_burst_generic(port, events, events_len, true,
1089                                                events_len, 0, events_len);
1090 }
1091
1092 uint16_t
1093 dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1094                                 uint16_t events_len)
1095 {
1096         return dsw_event_enqueue_burst_generic(port, events, events_len, true,
1097                                                0, 0, events_len);
1098 }
1099
1100 uint16_t
1101 dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1102 {
1103         return dsw_event_dequeue_burst(port, events, 1, wait);
1104 }
1105
1106 static void
1107 dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1108                             uint16_t num)
1109 {
1110         uint16_t i;
1111
1112         for (i = 0; i < num; i++) {
1113                 uint16_t l_idx = port->seen_events_idx;
1114                 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1115                 struct rte_event *event = &events[i];
1116                 qf->queue_id = event->queue_id;
1117                 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1118
1119                 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1120         }
1121
1122         if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1123                 port->seen_events_len =
1124                         RTE_MIN(port->seen_events_len + num,
1125                                 DSW_MAX_EVENTS_RECORDED);
1126 }
1127
1128 #ifdef DSW_SORT_DEQUEUED
1129
1130 #define DSW_EVENT_TO_INT(_event)                                \
1131         ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1132
1133 static inline int
1134 dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1135 {
1136         const struct rte_event *event_a = v_event_a;
1137         const struct rte_event *event_b = v_event_b;
1138
1139         return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1140 }
1141 #endif
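/* Note: when DSW_SORT_DEQUEUED is defined (see dsw_sort.h), the burst
 * returned by dsw_event_dequeue_burst() below is sorted with the
 * comparison function above, so that events with equal queue_id and
 * flow_id become adjacent. As a sketch, a dequeued burst
 * [(q1,f9), (q0,f2), (q1,f9)] would be handed to the application as
 * [(q0,f2), (q1,f9), (q1,f9)], which may let the application process
 * same-flow events back-to-back, at the cost of the sort itself.
 */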
1142
1143 static uint16_t
1144 dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1145                        uint16_t num)
1146 {
1147         struct dsw_port *source_port = port;
1148         struct dsw_evdev *dsw = source_port->dsw;
1149
1150         dsw_port_ctl_process(dsw, source_port);
1151
1152         if (unlikely(port->in_buffer_len > 0)) {
1153                 uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1154
1155                 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1156                            dequeued * sizeof(struct rte_event));
1157
1158                 port->in_buffer_start += dequeued;
1159                 port->in_buffer_len -= dequeued;
1160
1161                 if (port->in_buffer_len == 0)
1162                         port->in_buffer_start = 0;
1163
1164                 return dequeued;
1165         }
1166
1167         return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
1168 }
1169
1170 uint16_t
1171 dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1172                         uint64_t wait __rte_unused)
1173 {
1174         struct dsw_port *source_port = port;
1175         struct dsw_evdev *dsw = source_port->dsw;
1176         uint16_t dequeued;
1177
1178         source_port->pending_releases = 0;
1179
1180         dsw_port_bg_process(dsw, source_port);
1181
1182         if (unlikely(num > source_port->dequeue_depth))
1183                 num = source_port->dequeue_depth;
1184
1185         dequeued = dsw_port_dequeue_burst(source_port, events, num);
1186
1187         source_port->pending_releases = dequeued;
1188
1189         dsw_port_load_record(source_port, dequeued);
1190
1191         dsw_port_note_op(source_port, dequeued);
1192
1193         if (dequeued > 0) {
1194                 DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1195                                 dequeued);
1196
1197                 dsw_port_return_credits(dsw, source_port, dequeued);
1198
1199                 /* One potential optimization would be to add a
1200                  * migration state (prior to 'pausing'), and only record
1201                  * seen events when the port is in this state (and
1202                  * transition to 'pausing' when enough events have been
1203                  * gathered). However, that scheme doesn't
1204                  * seem to improve performance.
1205                  */
1206                 dsw_port_record_seen_events(port, events, dequeued);
1207         }
1208         /* XXX: Assuming the port can't produce any more work,
1209          *      consider flushing the output buffer, on dequeued ==
1210          *      0.
1211          */
1212
1213 #ifdef DSW_SORT_DEQUEUED
1214         dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1215 #endif
1216
1217         return dequeued;
1218 }