/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include <stdbool.h>

#include <rte_cycles.h>
#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
#include <rte_random.h>

#include "dsw_evdev.h"

#define EVENTDEV_NAME_DSW_PMD event_dsw

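/* Set up an eventdev port: initialize the dsw_port state from the
 * application-provided configuration, create the port's input event
 * ring and convert the load update interval into timer cycles.
 */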
static int
dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
               const struct rte_event_port_conf *conf)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        struct dsw_port *port;
        struct rte_event_ring *in_ring;
        char ring_name[RTE_RING_NAMESIZE];

        port = &dsw->ports[port_id];

        *port = (struct dsw_port) {
                .id = port_id,
                .dsw = dsw,
                .dequeue_depth = conf->dequeue_depth,
                .enqueue_depth = conf->enqueue_depth,
                .new_event_threshold = conf->new_event_threshold
        };

        snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
                 port_id);

        in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
                                        dev->data->socket_id,
                                        RING_F_SC_DEQ|RING_F_EXACT_SZ);

        if (in_ring == NULL)
                return -ENOMEM;

        port->in_ring = in_ring;

        rte_atomic16_init(&port->load);

        port->load_update_interval =
                (DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;

        dev->data->ports[port_id] = port;

        return 0;
}

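/* Report the default port configuration: a 1024-event new event
 * threshold, and dequeue and enqueue depths at a quarter of the
 * respective port maximums.
 */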
static void
dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
                  uint8_t port_id __rte_unused,
                  struct rte_event_port_conf *port_conf)
{
        *port_conf = (struct rte_event_port_conf) {
                .new_event_threshold = 1024,
                .dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
                .enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
        };
}

static void
dsw_port_release(void *p)
{
        struct dsw_port *port = p;

        rte_event_ring_free(port->in_ring);
}

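/* Set up an eventdev queue. All-types queues and ordered scheduling
 * are not supported. Single-link queues are handled as atomic (see
 * the comment below); otherwise the requested atomic or parallel
 * schedule type is recorded as-is.
 */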
static int
dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
                const struct rte_event_queue_conf *conf)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        struct dsw_queue *queue = &dsw->queues[queue_id];

        if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
                return -ENOTSUP;

        if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
                return -ENOTSUP;

        /* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
         * avoids the "fake" TYPE_PARALLEL flow_id assignment. Since
         * the queue will only have a single serving port, no
         * migration will ever happen, so the extra TYPE_ATOMIC
         * migration overhead is avoided.
         */
        if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
                queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
        else /* atomic or parallel */
                queue->schedule_type = conf->schedule_type;

        queue->num_serving_ports = 0;

        return 0;
}

static void
dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
                   uint8_t queue_id __rte_unused,
                   struct rte_event_queue_conf *queue_conf)
{
        *queue_conf = (struct rte_event_queue_conf) {
                .nb_atomic_flows = 4096,
                .schedule_type = RTE_SCHED_TYPE_ATOMIC,
                .priority = RTE_EVENT_DEV_PRIORITY_NORMAL
        };
}

static void
dsw_queue_release(struct rte_eventdev *dev __rte_unused,
                  uint8_t queue_id __rte_unused)
{
}

static void
queue_add_port(struct dsw_queue *queue, uint16_t port_id)
{
        queue->serving_ports[queue->num_serving_ports] = port_id;
        queue->num_serving_ports++;
}

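/* Remove a port from the queue's serving port array by overwriting
 * its slot with the last entry and shrinking the array. Returns
 * false if the port was not serving the queue.
 */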
static bool
queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
{
        uint16_t i;

        for (i = 0; i < queue->num_serving_ports; i++)
                if (queue->serving_ports[i] == port_id) {
                        uint16_t last_idx = queue->num_serving_ports - 1;
                        if (i != last_idx)
                                queue->serving_ports[i] =
                                        queue->serving_ports[last_idx];
                        queue->num_serving_ports--;
                        return true;
                }
        return false;
}

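/* Common worker for port_link and port_unlink: add or remove this
 * port as a serving port for each of the given queues, and return
 * the number of queues linked or successfully unlinked.
 */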
static int
dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
                     const uint8_t queues[], uint16_t num, bool link)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        struct dsw_port *p = port;
        uint16_t i;
        uint16_t count = 0;

        for (i = 0; i < num; i++) {
                uint8_t qid = queues[i];
                struct dsw_queue *q = &dsw->queues[qid];
                if (link) {
                        queue_add_port(q, p->id);
                        count++;
                } else {
                        bool removed = queue_remove_port(q, p->id);
                        if (removed)
                                count++;
                }
        }

        return count;
}

static int
dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
              const uint8_t priorities[] __rte_unused, uint16_t num)
{
        return dsw_port_link_unlink(dev, port, queues, num, true);
}

static int
dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
                uint16_t num)
{
        return dsw_port_link_unlink(dev, port, queues, num, false);
}

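/* Report device capabilities and limits: a burst-capable,
 * distributed-scheduling event device with a single priority level
 * for both queues and events.
 */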
static void
dsw_info_get(struct rte_eventdev *dev __rte_unused,
             struct rte_event_dev_info *info)
{
        *info = (struct rte_event_dev_info) {
                .driver_name = DSW_PMD_NAME,
                .max_event_queues = DSW_MAX_QUEUES,
                .max_event_queue_flows = DSW_MAX_FLOWS,
                .max_event_queue_priority_levels = 1,
                .max_event_priority_levels = 1,
                .max_event_ports = DSW_MAX_PORTS,
                .max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
                .max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
                .max_num_events = DSW_MAX_EVENTS,
                .event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
                RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED
        };
}

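/* Record the number of ports and queues and compute the in-flight
 * event limit. The limit is never allowed below
 * nb_event_ports * DSW_PORT_MAX_CREDITS, so that ports holding their
 * full credit allowance cannot starve the rest of the system.
 */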
static int
dsw_configure(const struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        const struct rte_event_dev_config *conf = &dev->data->dev_conf;
        int32_t min_max_in_flight;

        dsw->num_ports = conf->nb_event_ports;
        dsw->num_queues = conf->nb_event_queues;

        /* Avoid a situation where consumer ports are holding all the
         * credits, without making use of them.
         */
        min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;

        dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);

        return 0;
}


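/* Randomly assign an initial serving port to every flow hash of
 * every queue. This only seeds the flow-to-port map; the migration
 * machinery referred to above may reassign flows at run time.
 */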
static void
initial_flow_to_port_assignment(struct dsw_evdev *dsw)
{
        uint8_t queue_id;
        for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
                struct dsw_queue *queue = &dsw->queues[queue_id];
                uint16_t flow_hash;
                for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
                        uint8_t port_idx =
                                rte_rand() % queue->num_serving_ports;
                        uint8_t port_id =
                                queue->serving_ports[port_idx];
                        dsw->queues[queue_id].flow_to_port_map[flow_hash] =
                                port_id;
                }
        }
}

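/* Start the device: reset the credit counter, seed the flow-to-port
 * maps and initialize each port's load measurement timestamps to the
 * current timer cycle count.
 */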
static int
dsw_start(struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        uint16_t i;
        uint64_t now;

        rte_atomic32_init(&dsw->credits_on_loan);

        initial_flow_to_port_assignment(dsw);

        now = rte_get_timer_cycles();
        for (i = 0; i < dsw->num_ports; i++) {
                dsw->ports[i].measurement_start = now;
                dsw->ports[i].busy_start = now;
        }

        return 0;
}

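/* Helpers for flushing events still held by the device at stop time:
 * first each port's per-destination output buffers, then its input
 * ring.
 */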
static void
dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
                   eventdev_stop_flush_t flush, void *flush_arg)
{
        uint16_t i;

        for (i = 0; i < buf_len; i++)
                flush(dev_id, buf[i], flush_arg);
}

static void
dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
                   eventdev_stop_flush_t flush, void *flush_arg)
{
        uint16_t dport_id;

        for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
                if (dport_id != port->id)
                        dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
                                           port->out_buffer_len[dport_id],
                                           flush, flush_arg);
}

static void
dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
                       eventdev_stop_flush_t flush, void *flush_arg)
{
        struct rte_event ev;

        while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
                flush(dev_id, ev, flush_arg);
}

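/* Run the application's dev_stop_flush callback, if one is
 * registered, on every event still buffered or queued in the device.
 */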
static void
dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
          eventdev_stop_flush_t flush, void *flush_arg)
{
        uint16_t port_id;

        if (flush == NULL)
                return;

        for (port_id = 0; port_id < dsw->num_ports; port_id++) {
                struct dsw_port *port = &dsw->ports[port_id];

                dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
                dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
        }
}

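/* Stop the device and hand any remaining events to the
 * application-registered stop-flush callback.
 */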
static void
dsw_stop(struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        uint8_t dev_id;
        eventdev_stop_flush_t flush;
        void *flush_arg;

        dev_id = dev->data->dev_id;
        flush = dev->dev_ops->dev_stop_flush;
        flush_arg = dev->data->dev_stop_flush_arg;

        dsw_drain(dev_id, dsw, flush, flush_arg);
}

static int
dsw_close(struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);

        dsw->num_ports = 0;
        dsw->num_queues = 0;

        return 0;
}

static struct rte_eventdev_ops dsw_evdev_ops = {
        .port_setup = dsw_port_setup,
        .port_def_conf = dsw_port_def_conf,
        .port_release = dsw_port_release,
        .queue_setup = dsw_queue_setup,
        .queue_def_conf = dsw_queue_def_conf,
        .queue_release = dsw_queue_release,
        .port_link = dsw_port_link,
        .port_unlink = dsw_port_unlink,
        .dev_infos_get = dsw_info_get,
        .dev_configure = dsw_configure,
        .dev_start = dsw_start,
        .dev_stop = dsw_stop,
        .dev_close = dsw_close
};

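/* Probe callback for the virtual device: allocate the eventdev,
 * install the ops table and fast-path enqueue/dequeue functions, and
 * (in the primary process only) wire up the private device state.
 */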
static int
dsw_probe(struct rte_vdev_device *vdev)
{
        const char *name;
        struct rte_eventdev *dev;
        struct dsw_evdev *dsw;

        name = rte_vdev_device_name(vdev);

        dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
                                      rte_socket_id());
        if (dev == NULL)
                return -EFAULT;

        dev->dev_ops = &dsw_evdev_ops;
        dev->enqueue = dsw_event_enqueue;
        dev->enqueue_burst = dsw_event_enqueue_burst;
        dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
        dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
        dev->dequeue = dsw_event_dequeue;
        dev->dequeue_burst = dsw_event_dequeue_burst;

        if (rte_eal_process_type() != RTE_PROC_PRIMARY)
                return 0;

        dsw = dev->data->dev_private;
        dsw->data = dev->data;

        return 0;
}

static int
dsw_remove(struct rte_vdev_device *vdev)
{
        const char *name;

        name = rte_vdev_device_name(vdev);
        if (name == NULL)
                return -EINVAL;

        return rte_event_pmd_vdev_uninit(name);
}

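/* Register the PMD under the "event_dsw" vdev name. An instance
 * would typically be created via the EAL vdev option, e.g. something
 * like --vdev=event_dsw0 (example invocation, not part of this
 * file).
 */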
static struct rte_vdev_driver evdev_dsw_pmd_drv = {
        .probe = dsw_probe,
        .remove = dsw_remove
};

RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);