drivers/event/dsw/dsw_event.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include "dsw_evdev.h"

#include <stdbool.h>

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_random.h>

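/* Ensure that the port holds at least 'credits' in-flight credits,
 * borrowing a chunk from the shared pool if the port-local cache is
 * insufficient. Returns false if the pool is exhausted.
 */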
static bool
dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
                         int32_t credits)
{
        int32_t inflight_credits = port->inflight_credits;
        int32_t missing_credits = credits - inflight_credits;
        int32_t total_on_loan;
        int32_t available;
        int32_t acquired_credits;
        int32_t new_total_on_loan;

        if (likely(missing_credits <= 0)) {
                port->inflight_credits -= credits;
                return true;
        }

        total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
        available = dsw->max_inflight - total_on_loan;
        acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);

        if (available < acquired_credits)
                return false;

        /* This check is racy; no lock is held, so some other thread
         * may acquire credits from the pool between the check above
         * and the allocation below. The post-allocation check catches
         * that case.
         */
        new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
                                                    acquired_credits);

        if (unlikely(new_total_on_loan > dsw->max_inflight)) {
                /* Some other port took the last credits. */
                rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
                return false;
        }

        DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
                        acquired_credits);

        port->inflight_credits += acquired_credits;
        port->inflight_credits -= credits;

        return true;
}

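/* Return credits to the port's local cache, and hand any excess above
 * DSW_PORT_MAX_CREDITS back to the shared pool, keeping only
 * DSW_PORT_MIN_CREDITS cached on the port.
 */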
static void
dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
                        int32_t credits)
{
        port->inflight_credits += credits;

        if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
                int32_t leave_credits = DSW_PORT_MIN_CREDITS;
                int32_t return_credits =
                        port->inflight_credits - leave_credits;

                port->inflight_credits = leave_credits;

                rte_atomic32_sub(&dsw->credits_on_loan, return_credits);

                DSW_LOG_DP_PORT(DEBUG, port->id,
                                "Returned %d tokens to pool.\n",
                                return_credits);
        }
}

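/* Track how long the port has been busy. A non-empty dequeue starts a
 * work period; an empty dequeue ends it and adds the elapsed cycles to
 * the port's busy-cycle counter.
 */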
static void
dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
{
        if (dequeued > 0 && port->busy_start == 0)
                /* work period begins */
                port->busy_start = rte_get_timer_cycles();
        else if (dequeued == 0 && port->busy_start > 0) {
                /* work period ends */
                uint64_t work_period =
                        rte_get_timer_cycles() - port->busy_start;
                port->busy_cycles += work_period;
                port->busy_start = 0;
        }
}

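/* Close the current measurement period and return the port load during
 * that period, expressed as a fraction of DSW_MAX_LOAD.
 */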
static int16_t
dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
{
        uint64_t passed = now - port->measurement_start;
        uint64_t busy_cycles = port->busy_cycles;

        if (port->busy_start > 0) {
                busy_cycles += (now - port->busy_start);
                port->busy_start = now;
        }

        int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;

        port->measurement_start = now;
        port->busy_cycles = 0;

        port->total_busy_cycles += busy_cycles;

        return load;
}

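/* Fold the just-finished measurement period into the port's load
 * figure, using an exponentially weighted moving average in which the
 * previous load value has weight DSW_OLD_LOAD_WEIGHT.
 */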
static void
dsw_port_load_update(struct dsw_port *port, uint64_t now)
{
        int16_t old_load;
        int16_t period_load;
        int16_t new_load;

        old_load = rte_atomic16_read(&port->load);

        period_load = dsw_port_load_close_period(port, now);

        new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
                (DSW_OLD_LOAD_WEIGHT+1);

        rte_atomic16_set(&port->load, new_load);
}

static void
dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
{
        if (now < port->next_load_update)
                return;

        port->next_load_update = now + port->load_update_interval;

        dsw_port_load_update(port, now);
}

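/* Map an event, identified by its queue id and flow hash, to the port
 * currently serving that flow.
 */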
static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
        struct dsw_queue *queue = &dsw->queues[queue_id];
        uint8_t port_id;

        if (queue->num_serving_ports > 1)
                port_id = queue->flow_to_port_map[flow_hash];
        else
                /* A single-link queue, or atomic/ordered/parallel but
                 * with just a single serving port.
                 */
                port_id = queue->serving_ports[0];

        DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
                   "to port %d.\n", queue_id, flow_hash, port_id);

        return port_id;
}

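/* Flush the source port's output buffer for the given destination
 * port, enqueuing the buffered events onto the destination port's
 * input ring.
 */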
static void
dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
                           uint8_t dest_port_id)
{
        struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
        uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
        struct rte_event *buffer = source_port->out_buffer[dest_port_id];
        uint16_t enqueued = 0;

        if (*buffer_len == 0)
                return;

        /* The rings are dimensioned to fit all in-flight events (even
         * on a single ring), so looping will work.
         */
        do {
                enqueued +=
                        rte_event_ring_enqueue_burst(dest_port->in_ring,
                                                     buffer+enqueued,
                                                     *buffer_len-enqueued,
                                                     NULL);
        } while (unlikely(enqueued != *buffer_len));

        (*buffer_len) = 0;
}

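/* Events on parallel queues carry no ordering requirement, so the port
 * assigns them artificial flow ids in a round-robin fashion, spreading
 * them over the serving ports.
 */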
static uint16_t
dsw_port_get_parallel_flow_id(struct dsw_port *port)
{
        uint16_t flow_id = port->next_parallel_flow_id;

        port->next_parallel_flow_id =
                (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;

        return flow_id;
}

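/* Append an event to the output buffer for the destination port,
 * flushing the buffer first if it is already full.
 */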
static void
dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
                           uint8_t dest_port_id, const struct rte_event *event)
{
        struct rte_event *buffer = source_port->out_buffer[dest_port_id];
        uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];

        if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
                dsw_port_transmit_buffered(dsw, source_port, dest_port_id);

        buffer[*buffer_len] = *event;

        (*buffer_len)++;
}

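/* Fold the 24-bit event flow id down to a DSW_MAX_FLOWS_BITS-wide hash
 * by XORing DSW_MAX_FLOWS_BITS-sized chunks of the id.
 */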
#define DSW_FLOW_ID_BITS (24)
static uint16_t
dsw_flow_id_hash(uint32_t flow_id)
{
        uint16_t hash = 0;
        uint16_t offset = 0;

        do {
                hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
                offset += DSW_MAX_FLOWS_BITS;
        } while (offset < DSW_FLOW_ID_BITS);

        return hash;
}

static void
dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
                         struct rte_event event)
{
        uint8_t dest_port_id;

        event.flow_id = dsw_port_get_parallel_flow_id(source_port);

        dest_port_id = dsw_schedule(dsw, event.queue_id,
                                    dsw_flow_id_hash(event.flow_id));

        dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
}

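/* Buffer an event for later transmission to whichever port serves its
 * flow. Parallel-queue events take the artificial-flow-id path; all
 * others are hashed on their flow id.
 */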
static void
dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
                      const struct rte_event *event)
{
        uint16_t flow_hash;
        uint8_t dest_port_id;

        if (unlikely(dsw->queues[event->queue_id].schedule_type ==
                     RTE_SCHED_TYPE_PARALLEL)) {
                dsw_port_buffer_parallel(dsw, source_port, *event);
                return;
        }

        flow_hash = dsw_flow_id_hash(event->flow_id);

        dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);

        dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}

static void
dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
{
        /* To pull the control ring reasonably often on busy ports,
         * each dequeued/enqueued event is considered an 'op' too.
         */
        port->ops_since_bg_task += (num_events+1);
}

static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);

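/* Background work run from the enqueue and dequeue fast paths: flush
 * the output buffers and update the port load estimate, once enough
 * 'ops' have accumulated since the last run.
 */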
static void
dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
        if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
                uint64_t now;

                now = rte_get_timer_cycles();

                port->last_bg = now;

                /* Logic to avoid having events linger in the output
                 * buffer too long.
                 */
                dsw_port_flush_out_buffers(dsw, port);

                dsw_port_consider_load_update(port, now);

                port->ops_since_bg_task = 0;
        }
}

static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
{
        uint16_t dest_port_id;

        for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
                dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
}

uint16_t
dsw_event_enqueue(void *port, const struct rte_event *ev)
{
        return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
}

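/* Common enqueue path. When the caller (e.g., the new- or
 * forward-burst entry points) already knows the op type distribution,
 * the per-event classification loop is skipped.
 */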
static __rte_always_inline uint16_t
dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
                                uint16_t events_len, bool op_types_known,
                                uint16_t num_new, uint16_t num_release,
                                uint16_t num_non_release)
{
        struct dsw_port *source_port = port;
        struct dsw_evdev *dsw = source_port->dsw;
        bool enough_credits;
        uint16_t i;

        DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
                        "events to port %d.\n", events_len, source_port->id);

        dsw_port_bg_process(dsw, source_port);

        /* XXX: For performance (=ring efficiency) reasons, the
         * scheduler relies on internal non-ring buffers instead of
         * immediately sending the event to the destination ring. For
         * a producer that doesn't intend to produce or consume any
         * more events, the scheduler provides a way to flush the
         * buffer, by means of doing an enqueue of zero events. In
         * addition, a port cannot be left "unattended" (e.g. unused)
         * for long periods of time, since that would stall
         * migration. Eventdev API extensions to provide a cleaner way
         * to achieve both of these functions should be
         * considered.
         */
        if (unlikely(events_len == 0)) {
                dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
                dsw_port_flush_out_buffers(dsw, source_port);
                return 0;
        }

        if (unlikely(events_len > source_port->enqueue_depth))
                events_len = source_port->enqueue_depth;

        dsw_port_note_op(source_port, events_len);

        if (!op_types_known)
                for (i = 0; i < events_len; i++) {
                        switch (events[i].op) {
                        case RTE_EVENT_OP_RELEASE:
                                num_release++;
                                break;
                        case RTE_EVENT_OP_NEW:
                                num_new++;
                                /* Falls through. */
                        default:
                                num_non_release++;
                                break;
                        }
                }

        /* Technically, we could admit the non-new events preceding
         * the first new event in the array, but for simplicity we
         * deny the whole burst if the port is above its
         * new_event_threshold water mark.
         */
        if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
                     source_port->new_event_threshold))
                return 0;

        enough_credits = dsw_port_acquire_credits(dsw, source_port,
                                                  num_non_release);
        if (unlikely(!enough_credits))
                return 0;

        source_port->pending_releases -= num_release;

        for (i = 0; i < events_len; i++) {
                const struct rte_event *event = &events[i];

                if (likely(num_release == 0 ||
                           event->op != RTE_EVENT_OP_RELEASE))
                        dsw_port_buffer_event(dsw, source_port, event);
        }

        DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
                        "accepted.\n", num_non_release);

        return num_non_release;
}

uint16_t
dsw_event_enqueue_burst(void *port, const struct rte_event events[],
                        uint16_t events_len)
{
        return dsw_event_enqueue_burst_generic(port, events, events_len, false,
                                               0, 0, 0);
}

uint16_t
dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
                            uint16_t events_len)
{
        return dsw_event_enqueue_burst_generic(port, events, events_len, true,
                                               events_len, 0, events_len);
}

uint16_t
dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
                                uint16_t events_len)
{
        return dsw_event_enqueue_burst_generic(port, events, events_len, true,
                                               0, 0, events_len);
}

uint16_t
dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
{
        return dsw_event_dequeue_burst(port, events, 1, wait);
}

static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
                       uint16_t num)
{
        return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
}

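/* Dequeue up to 'num' events from the port's input ring. The number of
 * dequeued events also becomes the number of implicit releases pending
 * on the port, and the corresponding credits are returned to the pool.
 */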
uint16_t
dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
                        uint64_t wait __rte_unused)
{
        struct dsw_port *source_port = port;
        struct dsw_evdev *dsw = source_port->dsw;
        uint16_t dequeued;

        source_port->pending_releases = 0;

        dsw_port_bg_process(dsw, source_port);

        if (unlikely(num > source_port->dequeue_depth))
                num = source_port->dequeue_depth;

        dequeued = dsw_port_dequeue_burst(source_port, events, num);

        source_port->pending_releases = dequeued;

        dsw_port_load_record(source_port, dequeued);

        dsw_port_note_op(source_port, dequeued);

        if (dequeued > 0) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
                                dequeued);

                dsw_port_return_credits(dsw, source_port, dequeued);
        }
        /* XXX: Assuming the port can't produce any more work,
         *      consider flushing the output buffer, on dequeued == 0.
         */

        return dequeued;
}