event/dsw: add event scheduling and device start/stop
[dpdk.git] drivers/event/dsw/dsw_evdev.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include <stdbool.h>

#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
#include <rte_random.h>

#include "dsw_evdev.h"

#define EVENTDEV_NAME_DSW_PMD event_dsw

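/* Set up an event port: record the application-provided dequeue and
 * enqueue depths and the new event threshold, and create the port's
 * multi-producer/single-consumer input event ring.
 */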
static int
dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
               const struct rte_event_port_conf *conf)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        struct dsw_port *port;
        struct rte_event_ring *in_ring;
        char ring_name[RTE_RING_NAMESIZE];

        port = &dsw->ports[port_id];

        *port = (struct dsw_port) {
                .id = port_id,
                .dsw = dsw,
                .dequeue_depth = conf->dequeue_depth,
                .enqueue_depth = conf->enqueue_depth,
                .new_event_threshold = conf->new_event_threshold
        };

        snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
                 port_id);

        in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
                                        dev->data->socket_id,
                                        RING_F_SC_DEQ|RING_F_EXACT_SZ);

        if (in_ring == NULL)
                return -ENOMEM;

        port->in_ring = in_ring;

        dev->data->ports[port_id] = port;

        return 0;
}

static void
dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
                  uint8_t port_id __rte_unused,
                  struct rte_event_port_conf *port_conf)
{
        *port_conf = (struct rte_event_port_conf) {
                .new_event_threshold = 1024,
                .dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
                .enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
        };
}

static void
dsw_port_release(void *p)
{
        struct dsw_port *port = p;

        rte_event_ring_free(port->in_ring);
}

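/* Set up an event queue. Only atomic, parallel and single link queues
 * are supported; RTE_EVENT_QUEUE_CFG_ALL_TYPES and ordered queues are
 * rejected with -ENOTSUP.
 */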
static int
dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
                const struct rte_event_queue_conf *conf)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        struct dsw_queue *queue = &dsw->queues[queue_id];

        if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
                return -ENOTSUP;

        if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
                return -ENOTSUP;

        /* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
         * avoids the "fake" TYPE_PARALLEL flow_id assignment. Since
         * the queue will only have a single serving port, no
         * migration will ever happen, so the extra TYPE_ATOMIC
         * migration overhead is avoided.
         */
        if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
                queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
        else /* atomic or parallel */
                queue->schedule_type = conf->schedule_type;

        queue->num_serving_ports = 0;

        return 0;
}

static void
dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
                   uint8_t queue_id __rte_unused,
                   struct rte_event_queue_conf *queue_conf)
{
        *queue_conf = (struct rte_event_queue_conf) {
                .nb_atomic_flows = 4096,
                .schedule_type = RTE_SCHED_TYPE_ATOMIC,
                .priority = RTE_EVENT_DEV_PRIORITY_NORMAL
        };
}

static void
dsw_queue_release(struct rte_eventdev *dev __rte_unused,
                  uint8_t queue_id __rte_unused)
{
}

static void
queue_add_port(struct dsw_queue *queue, uint16_t port_id)
{
        queue->serving_ports[queue->num_serving_ports] = port_id;
        queue->num_serving_ports++;
}

static bool
queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
{
        uint16_t i;

        for (i = 0; i < queue->num_serving_ports; i++)
                if (queue->serving_ports[i] == port_id) {
                        uint16_t last_idx = queue->num_serving_ports - 1;
                        if (i != last_idx)
                                queue->serving_ports[i] =
                                        queue->serving_ports[last_idx];
                        queue->num_serving_ports--;
                        return true;
                }
        return false;
}

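/* Add or remove a port from the set of ports serving the given
 * queues. Returns the number of queues actually linked or unlinked.
 */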
static int
dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
                     const uint8_t queues[], uint16_t num, bool link)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        struct dsw_port *p = port;
        uint16_t i;
        uint16_t count = 0;

        for (i = 0; i < num; i++) {
                uint8_t qid = queues[i];
                struct dsw_queue *q = &dsw->queues[qid];
                if (link) {
                        queue_add_port(q, p->id);
                        count++;
                } else {
                        bool removed = queue_remove_port(q, p->id);
                        if (removed)
                                count++;
                }
        }

        return count;
}

static int
dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
              const uint8_t priorities[] __rte_unused, uint16_t num)
{
        return dsw_port_link_unlink(dev, port, queues, num, true);
}

static int
dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
                uint16_t num)
{
        return dsw_port_link_unlink(dev, port, queues, num, false);
}

static void
dsw_info_get(struct rte_eventdev *dev __rte_unused,
             struct rte_event_dev_info *info)
{
        *info = (struct rte_event_dev_info) {
                .driver_name = DSW_PMD_NAME,
                .max_event_queues = DSW_MAX_QUEUES,
                .max_event_queue_flows = DSW_MAX_FLOWS,
                .max_event_queue_priority_levels = 1,
                .max_event_priority_levels = 1,
                .max_event_ports = DSW_MAX_PORTS,
                .max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
                .max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
                .max_num_events = DSW_MAX_EVENTS,
                .event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
                RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED
        };
}

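/* Record the number of ports and queues, and size the in-flight event
 * limit so that it is never smaller than the ports' combined credit
 * pool.
 */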
static int
dsw_configure(const struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        const struct rte_event_dev_config *conf = &dev->data->dev_conf;
        int32_t min_max_in_flight;

        dsw->num_ports = conf->nb_event_ports;
        dsw->num_queues = conf->nb_event_queues;

        /* Avoid a situation where consumer ports hold all the credits
         * without making use of them.
         */
        min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;

        dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);

        return 0;
}

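/* Randomly assign each flow hash bucket of every queue to one of the
 * queue's serving ports.
 */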
static void
initial_flow_to_port_assignment(struct dsw_evdev *dsw)
{
        uint8_t queue_id;
        for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
                struct dsw_queue *queue = &dsw->queues[queue_id];
                uint16_t flow_hash;
                for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
                        uint8_t port_idx =
                                rte_rand() % queue->num_serving_ports;
                        uint8_t port_id =
                                queue->serving_ports[port_idx];
                        dsw->queues[queue_id].flow_to_port_map[flow_hash] =
                                port_id;
                }
        }
}

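/* Start the device: reset the credits-on-loan counter and establish
 * the initial flow-to-port assignment.
 */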
static int
dsw_start(struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);

        rte_atomic32_init(&dsw->credits_on_loan);

        initial_flow_to_port_assignment(dsw);

        return 0;
}

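/* On stop, events may remain in the ports' output buffers and input
 * rings. The helpers below hand any such events to the application's
 * stop-flush callback.
 */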
static void
dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
                   eventdev_stop_flush_t flush, void *flush_arg)
{
        uint16_t i;

        for (i = 0; i < buf_len; i++)
                flush(dev_id, buf[i], flush_arg);
}

static void
dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
                   eventdev_stop_flush_t flush, void *flush_arg)
{
        uint16_t dport_id;

        for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
                if (dport_id != port->id)
                        dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
                                           port->out_buffer_len[dport_id],
                                           flush, flush_arg);
}

static void
dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
                       eventdev_stop_flush_t flush, void *flush_arg)
{
        struct rte_event ev;

        while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
                flush(dev_id, ev, flush_arg);
}

static void
dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
          eventdev_stop_flush_t flush, void *flush_arg)
{
        uint16_t port_id;

        if (flush == NULL)
                return;

        for (port_id = 0; port_id < dsw->num_ports; port_id++) {
                struct dsw_port *port = &dsw->ports[port_id];

                dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
                dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
        }
}

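/* Stop the device, draining all buffered and in-ring events through
 * the dev_stop_flush callback, if the application has registered one.
 */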
static void
dsw_stop(struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        uint8_t dev_id;
        eventdev_stop_flush_t flush;
        void *flush_arg;

        dev_id = dev->data->dev_id;
        flush = dev->dev_ops->dev_stop_flush;
        flush_arg = dev->data->dev_stop_flush_arg;

        dsw_drain(dev_id, dsw, flush, flush_arg);
}

static int
dsw_close(struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);

        dsw->num_ports = 0;
        dsw->num_queues = 0;

        return 0;
}

static struct rte_eventdev_ops dsw_evdev_ops = {
        .port_setup = dsw_port_setup,
        .port_def_conf = dsw_port_def_conf,
        .port_release = dsw_port_release,
        .queue_setup = dsw_queue_setup,
        .queue_def_conf = dsw_queue_def_conf,
        .queue_release = dsw_queue_release,
        .port_link = dsw_port_link,
        .port_unlink = dsw_port_unlink,
        .dev_infos_get = dsw_info_get,
        .dev_configure = dsw_configure,
        .dev_start = dsw_start,
        .dev_stop = dsw_stop,
        .dev_close = dsw_close
};

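/* Create the vdev-based event device and set up the eventdev ops and
 * enqueue/dequeue function pointers. Only the primary process
 * initializes the device's private data.
 */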
static int
dsw_probe(struct rte_vdev_device *vdev)
{
        const char *name;
        struct rte_eventdev *dev;
        struct dsw_evdev *dsw;

        name = rte_vdev_device_name(vdev);

        dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
                                      rte_socket_id());
        if (dev == NULL)
                return -EFAULT;

        dev->dev_ops = &dsw_evdev_ops;
        dev->enqueue = dsw_event_enqueue;
        dev->enqueue_burst = dsw_event_enqueue_burst;
        dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
        dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
        dev->dequeue = dsw_event_dequeue;
        dev->dequeue_burst = dsw_event_dequeue_burst;

        if (rte_eal_process_type() != RTE_PROC_PRIMARY)
                return 0;

        dsw = dev->data->dev_private;
        dsw->data = dev->data;

        return 0;
}

static int
dsw_remove(struct rte_vdev_device *vdev)
{
        const char *name;

        name = rte_vdev_device_name(vdev);
        if (name == NULL)
                return -EINVAL;

        return rte_event_pmd_vdev_uninit(name);
}

static struct rte_vdev_driver evdev_dsw_pmd_drv = {
        .probe = dsw_probe,
        .remove = dsw_remove
};

RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);