event/dsw: add event scheduling and device start/stop
drivers/event/dsw/dsw_event.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include "dsw_evdev.h"

#include <stdbool.h>

#include <rte_atomic.h>
#include <rte_random.h>

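/* Make sure the port holds at least "credits" in-flight credits. If
 * the port-local pool runs short, a chunk of at least
 * DSW_PORT_MIN_CREDITS is taken from the device-wide credits_on_loan
 * counter, bounded by max_inflight. Returns false if the device-wide
 * pool is exhausted.
 */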
static bool
dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
                         int32_t credits)
{
        int32_t inflight_credits = port->inflight_credits;
        int32_t missing_credits = credits - inflight_credits;
        int32_t total_on_loan;
        int32_t available;
        int32_t acquired_credits;
        int32_t new_total_on_loan;

        if (likely(missing_credits <= 0)) {
                port->inflight_credits -= credits;
                return true;
        }

        total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
        available = dsw->max_inflight - total_on_loan;
        acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);

        if (available < acquired_credits)
                return false;

        /* This is a race; no locks are involved, and thus some other
         * thread may allocate credits between the check and the
         * allocation.
         */
        new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
                                                    acquired_credits);

        if (unlikely(new_total_on_loan > dsw->max_inflight)) {
                /* Some other port took the last credits */
                rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
                return false;
        }

        DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
                        acquired_credits);

        port->inflight_credits += acquired_credits;
        port->inflight_credits -= credits;

        return true;
}

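/* Hand back "credits" to the port-local pool. If the local pool grows
 * beyond DSW_PORT_MAX_CREDITS, the surplus above DSW_PORT_MIN_CREDITS
 * is returned to the device-wide credits_on_loan counter.
 */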
static void
dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
                        int32_t credits)
{
        port->inflight_credits += credits;

        if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
                int32_t leave_credits = DSW_PORT_MIN_CREDITS;
                int32_t return_credits =
                        port->inflight_credits - leave_credits;

                port->inflight_credits = leave_credits;

                rte_atomic32_sub(&dsw->credits_on_loan, return_credits);

                DSW_LOG_DP_PORT(DEBUG, port->id,
                                "Returned %d tokens to pool.\n",
                                return_credits);
        }
}

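/* Map an event to a destination port. Queues with multiple serving
 * ports use the per-queue flow_to_port_map, indexed by the flow hash;
 * queues with a single serving port (including single-link queues)
 * always resolve to that port.
 */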
static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
        struct dsw_queue *queue = &dsw->queues[queue_id];
        uint8_t port_id;

        if (queue->num_serving_ports > 1)
                port_id = queue->flow_to_port_map[flow_hash];
        else
                /* A single-link queue, or atomic/ordered/parallel but
                 * with just a single serving port.
                 */
                port_id = queue->serving_ports[0];

        DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
                   "to port %d.\n", queue_id, flow_hash, port_id);

        return port_id;
}

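/* Drain the source port's output buffer for the given destination
 * port into the destination's input event ring. The enqueue loop may
 * spin, but cannot get stuck, since the rings are sized to hold all
 * in-flight events.
 */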
static void
dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
                           uint8_t dest_port_id)
{
        struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
        uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
        struct rte_event *buffer = source_port->out_buffer[dest_port_id];
        uint16_t enqueued = 0;

        if (*buffer_len == 0)
                return;

        /* The rings are dimensioned to fit all in-flight events (even
         * on a single ring), so looping will work.
         */
        do {
                enqueued +=
                        rte_event_ring_enqueue_burst(dest_port->in_ring,
                                                     buffer+enqueued,
                                                     *buffer_len-enqueued,
                                                     NULL);
        } while (unlikely(enqueued != *buffer_len));

        (*buffer_len) = 0;
}

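/* Pick a flow id for a parallel-queue event by cycling through the
 * DSW_PARALLEL_FLOWS ids, so that consecutive parallel events are
 * spread over the queue's serving ports.
 */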
static uint16_t
dsw_port_get_parallel_flow_id(struct dsw_port *port)
{
        uint16_t flow_id = port->next_parallel_flow_id;

        port->next_parallel_flow_id =
                (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;

        return flow_id;
}

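/* Append an event to the source port's output buffer for the given
 * destination port, flushing the buffer first if it is full.
 */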
static void
dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
                           uint8_t dest_port_id, const struct rte_event *event)
{
        struct rte_event *buffer = source_port->out_buffer[dest_port_id];
        uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];

        if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
                dsw_port_transmit_buffered(dsw, source_port, dest_port_id);

        buffer[*buffer_len] = *event;

        (*buffer_len)++;
}

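/* Fold the event flow id down to a DSW_MAX_FLOWS_BITS-wide hash by
 * XORing together DSW_MAX_FLOWS_BITS-sized chunks of the id, covering
 * the lowest DSW_FLOW_ID_BITS bits.
 */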
#define DSW_FLOW_ID_BITS (24)
static uint16_t
dsw_flow_id_hash(uint32_t flow_id)
{
        uint16_t hash = 0;
        uint16_t offset = 0;

        do {
                hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
                offset += DSW_MAX_FLOWS_BITS;
        } while (offset < DSW_FLOW_ID_BITS);

        return hash;
}

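/* Parallel queues carry no per-flow ordering or atomicity
 * requirements, so the event is given a driver-generated flow id
 * before being scheduled and buffered.
 */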
static void
dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
                         struct rte_event event)
{
        uint8_t dest_port_id;

        event.flow_id = dsw_port_get_parallel_flow_id(source_port);

        dest_port_id = dsw_schedule(dsw, event.queue_id,
                                    dsw_flow_id_hash(event.flow_id));

        dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
}

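/* Route an event to the proper destination port's output buffer:
 * parallel-queue events get a driver-generated flow id, while all
 * other events are scheduled on a hash of their application-supplied
 * flow id.
 */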
static void
dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
                      const struct rte_event *event)
{
        uint16_t flow_hash;
        uint8_t dest_port_id;

        if (unlikely(dsw->queues[event->queue_id].schedule_type ==
                     RTE_SCHED_TYPE_PARALLEL)) {
                dsw_port_buffer_parallel(dsw, source_port, *event);
                return;
        }

        flow_hash = dsw_flow_id_hash(event->flow_id);

        dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);

        dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}

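/* Flush the source port's output buffers for all destination ports. */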
static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
{
        uint16_t dest_port_id;

        for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
                dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
}

uint16_t
dsw_event_enqueue(void *port, const struct rte_event *ev)
{
        return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
}

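/* Common enqueue path. The op_types_known flag lets the new/forward
 * burst variants skip the per-event op scan, since their
 * new/release/non-release counts are already known at the call site.
 */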
static __rte_always_inline uint16_t
dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
                                uint16_t events_len, bool op_types_known,
                                uint16_t num_new, uint16_t num_release,
                                uint16_t num_non_release)
{
        struct dsw_port *source_port = port;
        struct dsw_evdev *dsw = source_port->dsw;
        bool enough_credits;
        uint16_t i;

        DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
                        "events to port %d.\n", events_len, source_port->id);

        /* XXX: For performance (i.e., ring efficiency) reasons, the
         * scheduler relies on internal non-ring buffers instead of
         * immediately sending the event to the destination ring. For
         * a producer that doesn't intend to produce or consume any
         * more events, the scheduler provides a way to flush the
         * buffer, by means of doing an enqueue of zero events. In
         * addition, a port cannot be left "unattended" (e.g. unused)
         * for long periods of time, since that would stall
         * migration. Eventdev API extensions to provide a cleaner way
         * to achieve both of these functions should be
         * considered.
         */
        if (unlikely(events_len == 0)) {
                dsw_port_flush_out_buffers(dsw, source_port);
                return 0;
        }

        if (unlikely(events_len > source_port->enqueue_depth))
                events_len = source_port->enqueue_depth;

        if (!op_types_known)
                for (i = 0; i < events_len; i++) {
                        switch (events[i].op) {
                        case RTE_EVENT_OP_RELEASE:
                                num_release++;
                                break;
                        case RTE_EVENT_OP_NEW:
                                num_new++;
                                /* Falls through. */
                        default:
                                num_non_release++;
                                break;
                        }
                }

        /* Technically, we could allow the non-new events up to the
         * first new event in the array into the system, but for
         * simplicity, the whole burst is denied if the port is above
         * the new_event_threshold water mark.
         */
        if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
                     source_port->new_event_threshold))
                return 0;

        enough_credits = dsw_port_acquire_credits(dsw, source_port,
                                                  num_non_release);
        if (unlikely(!enough_credits))
                return 0;

        source_port->pending_releases -= num_release;

        for (i = 0; i < events_len; i++) {
                const struct rte_event *event = &events[i];

                if (likely(num_release == 0 ||
                           event->op != RTE_EVENT_OP_RELEASE))
                        dsw_port_buffer_event(dsw, source_port, event);
        }

        DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
                        "accepted.\n", num_non_release);

        return num_non_release;
}

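/* Public enqueue entry points. The new and forward burst variants
 * pass their op type counts directly, while the generic burst variant
 * lets dsw_event_enqueue_burst_generic() classify each event.
 */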
uint16_t
dsw_event_enqueue_burst(void *port, const struct rte_event events[],
                        uint16_t events_len)
{
        return dsw_event_enqueue_burst_generic(port, events, events_len, false,
                                               0, 0, 0);
}

uint16_t
dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
                            uint16_t events_len)
{
        return dsw_event_enqueue_burst_generic(port, events, events_len, true,
                                               events_len, 0, events_len);
}

uint16_t
dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
                                uint16_t events_len)
{
        return dsw_event_enqueue_burst_generic(port, events, events_len, true,
                                               0, 0, events_len);
}

uint16_t
dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
{
        return dsw_event_dequeue_burst(port, events, 1, wait);
}

static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
                       uint16_t num)
{
        return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
}

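/* Dequeue up to "num" events (bounded by the configured dequeue
 * depth) from the port's input ring. Credits for the dequeued events
 * are returned to the port's pool, and pending_releases is reset to
 * the number of newly dequeued events.
 */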
uint16_t
dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
                        uint64_t wait __rte_unused)
{
        struct dsw_port *source_port = port;
        struct dsw_evdev *dsw = source_port->dsw;
        uint16_t dequeued;

        source_port->pending_releases = 0;

        if (unlikely(num > source_port->dequeue_depth))
                num = source_port->dequeue_depth;

        dequeued = dsw_port_dequeue_burst(source_port, events, num);

        source_port->pending_releases = dequeued;

        if (dequeued > 0) {
                DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
                                dequeued);

                dsw_port_return_credits(dsw, source_port, dequeued);
        }
        /* XXX: Assuming the port can't produce any more work,
         *      consider flushing the output buffer when dequeued ==
         *      0.
         */

        return dequeued;
}