event/dsw: add event scheduling and device start/stop
[dpdk.git] / drivers / event / dsw / dsw_evdev.c
index 5dccc23..40a7435 100644 (file)
@@ -6,6 +6,7 @@
 
 #include <rte_eventdev_pmd.h>
 #include <rte_eventdev_pmd_vdev.h>
+#include <rte_random.h>
 
 #include "dsw_evdev.h"
 
@@ -201,10 +202,125 @@ dsw_configure(const struct rte_eventdev *dev)
 {
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        const struct rte_event_dev_config *conf = &dev->data->dev_conf;
+       int32_t min_max_in_flight;
 
        dsw->num_ports = conf->nb_event_ports;
        dsw->num_queues = conf->nb_event_queues;
 
+       /* Avoid a situation where consumer ports are holding all the
+        * credits, without making use of them.
+        */
+       min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;
+
+       dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);
+
+       return 0;
+}
+
+
+static void
+initial_flow_to_port_assignment(struct dsw_evdev *dsw)
+{
+       uint8_t queue_id;
+       for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
+               struct dsw_queue *queue = &dsw->queues[queue_id];
+               uint16_t flow_hash;
+               for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
+                       uint8_t port_idx =
+                               rte_rand() % queue->num_serving_ports;
+                       uint8_t port_id =
+                               queue->serving_ports[port_idx];
+                       dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+                               port_id;
+               }
+       }
+}
+
+static int
+dsw_start(struct rte_eventdev *dev)
+{
+       struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+       rte_atomic32_init(&dsw->credits_on_loan);
+
+       initial_flow_to_port_assignment(dsw);
+
+       return 0;
+}
+
+static void
+dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
+                  eventdev_stop_flush_t flush, void *flush_arg)
+{
+       uint16_t i;
+
+       for (i = 0; i < buf_len; i++)
+               flush(dev_id, buf[i], flush_arg);
+}
+
+static void
+dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
+                  eventdev_stop_flush_t flush, void *flush_arg)
+{
+       uint16_t dport_id;
+
+       for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
+               if (dport_id != port->id)
+                       dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
+                                          port->out_buffer_len[dport_id],
+                                          flush, flush_arg);
+}
+
+static void
+dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
+                      eventdev_stop_flush_t flush, void *flush_arg)
+{
+       struct rte_event ev;
+
+       while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
+               flush(dev_id, ev, flush_arg);
+}
+
+static void
+dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
+         eventdev_stop_flush_t flush, void *flush_arg)
+{
+       uint16_t port_id;
+
+       if (flush == NULL)
+               return;
+
+       for (port_id = 0; port_id < dsw->num_ports; port_id++) {
+               struct dsw_port *port = &dsw->ports[port_id];
+
+               dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
+               dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
+       }
+}
+
+static void
+dsw_stop(struct rte_eventdev *dev)
+{
+       struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+       uint8_t dev_id;
+       eventdev_stop_flush_t flush;
+       void *flush_arg;
+
+       dev_id = dev->data->dev_id;
+       flush = dev->dev_ops->dev_stop_flush;
+       flush_arg = dev->data->dev_stop_flush_arg;
+
+       dsw_drain(dev_id, dsw, flush, flush_arg);
+}
+
+static int
+dsw_close(struct rte_eventdev *dev)
+{
+       struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+       dsw->num_ports = 0;
+       dsw->num_queues = 0;
+
        return 0;
 }
 
@@ -219,6 +335,9 @@ static struct rte_eventdev_ops dsw_evdev_ops = {
        .port_unlink = dsw_port_unlink,
        .dev_infos_get = dsw_info_get,
        .dev_configure = dsw_configure,
+       .dev_start = dsw_start,
+       .dev_stop = dsw_stop,
+       .dev_close = dsw_close
 };
 
 static int
@@ -236,6 +355,12 @@ dsw_probe(struct rte_vdev_device *vdev)
                return -EFAULT;
 
        dev->dev_ops = &dsw_evdev_ops;
+       dev->enqueue = dsw_event_enqueue;
+       dev->enqueue_burst = dsw_event_enqueue_burst;
+       dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
+       dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
+       dev->dequeue = dsw_event_dequeue;
+       dev->dequeue_burst = dsw_event_dequeue_burst;
 
        if (rte_eal_process_type() != RTE_PROC_PRIMARY)
                return 0;