+ wrr_pos = rx_adapter->wrr_pos;
+ max_nb_rx = rx_adapter->max_nb_rx;
+ buf = &rx_adapter->event_enqueue_buffer;
+
+ /* Iterate through a WRR sequence */
+ for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
+ unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
+ uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
+ uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
+
+ /* Don't do a batch dequeue from the rx queue if there isn't
+ * enough space in the enqueue buffer.
+ */
+ if (buf->count >= BATCH_SIZE)
+ rxa_flush_event_buffer(rx_adapter);
+ if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
+ rx_adapter->wrr_pos = wrr_pos;
+ return nb_rx;
+ }
+
+ nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
+ NULL);
+ if (nb_rx > max_nb_rx) {
+ rx_adapter->wrr_pos =
+ (wrr_pos + 1) % rx_adapter->wrr_len;
+ break;
+ }
+
+ if (++wrr_pos == rx_adapter->wrr_len)
+ wrr_pos = 0;
+ }
+ return nb_rx;
+}
+
+static int
+rxa_service_func(void *args)
+{
+ struct rte_event_eth_rx_adapter *rx_adapter = args;
+ struct rte_event_eth_rx_adapter_stats *stats;
+
+ if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
+ return 0;
+ if (!rx_adapter->rxa_started) {
+ rte_spinlock_unlock(&rx_adapter->rx_lock);
+ return 0;
+ }
+
+ stats = &rx_adapter->stats;
+ stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
+ stats->rx_packets += rxa_poll(rx_adapter);
+ rte_spinlock_unlock(&rx_adapter->rx_lock);
+ return 0;
+}
+
+static int
+rte_event_eth_rx_adapter_init(void)
+{
+ const char *name = "rte_event_eth_rx_adapter_array";
+ const struct rte_memzone *mz;
+ unsigned int sz;
+
+ sz = sizeof(*event_eth_rx_adapter) *
+ RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
+ sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+
+ mz = rte_memzone_lookup(name);
+ if (mz == NULL) {
+ mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
+ RTE_CACHE_LINE_SIZE);
+ if (mz == NULL) {
+ RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
+ PRId32, rte_errno);
+ return -rte_errno;
+ }
+ }
+
+ event_eth_rx_adapter = mz->addr;
+ return 0;
+}
+
+static inline struct rte_event_eth_rx_adapter *
+rxa_id_to_adapter(uint8_t id)
+{
+ return event_eth_rx_adapter ?
+ event_eth_rx_adapter[id] : NULL;
+}
+
+static int
+rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
+ struct rte_event_eth_rx_adapter_conf *conf, void *arg)
+{
+ int ret;
+ struct rte_eventdev *dev;
+ struct rte_event_dev_config dev_conf;
+ int started;
+ uint8_t port_id;
+ struct rte_event_port_conf *port_conf = arg;
+ struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
+
+ dev = &rte_eventdevs[rx_adapter->eventdev_id];
+ dev_conf = dev->data->dev_conf;
+
+ started = dev->data->dev_started;
+ if (started)
+ rte_event_dev_stop(dev_id);
+ port_id = dev_conf.nb_event_ports;
+ dev_conf.nb_event_ports += 1;
+ ret = rte_event_dev_configure(dev_id, &dev_conf);
+ if (ret) {
+ RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
+ dev_id);
+ if (started) {
+ if (rte_event_dev_start(dev_id))
+ return -EIO;
+ }
+ return ret;
+ }
+
+ ret = rte_event_port_setup(dev_id, port_id, port_conf);
+ if (ret) {
+ RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
+ port_id);
+ return ret;
+ }
+
+ conf->event_port_id = port_id;
+ conf->max_nb_rx = 128;
+ if (started)
+ ret = rte_event_dev_start(dev_id);
+ rx_adapter->default_cb_arg = 1;
+ return ret;
+}
+
+static int
+rxa_epoll_create1(void)
+{
+#if defined(LINUX)
+ int fd;
+ fd = epoll_create1(EPOLL_CLOEXEC);
+ return fd < 0 ? -errno : fd;
+#elif defined(BSD)
+ return -ENOTSUP;
+#endif
+}
+
+static int
+rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+ if (rx_adapter->epd != INIT_FD)
+ return 0;
+
+ rx_adapter->epd = rxa_epoll_create1();
+ if (rx_adapter->epd < 0) {
+ int err = rx_adapter->epd;
+ rx_adapter->epd = INIT_FD;
+ RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
+ return err;
+ }
+
+ return 0;
+}
+
+static int
+rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+ int err;
+ char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+ if (rx_adapter->intr_ring)
+ return 0;
+
+ rx_adapter->intr_ring = rte_ring_create("intr_ring",
+ RTE_EVENT_ETH_INTR_RING_SIZE,
+ rte_socket_id(), 0);
+ if (!rx_adapter->intr_ring)
+ return -ENOMEM;
+
+ rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
+ RTE_EVENT_ETH_INTR_RING_SIZE *
+ sizeof(struct rte_epoll_event),
+ RTE_CACHE_LINE_SIZE,
+ rx_adapter->socket_id);
+ if (!rx_adapter->epoll_events) {
+ err = -ENOMEM;
+ goto error;
+ }
+
+ rte_spinlock_init(&rx_adapter->intr_ring_lock);
+
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
+ "rx-intr-thread-%d", rx_adapter->id);
+
+ err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
+ NULL, rxa_intr_thread, rx_adapter);
+ if (!err) {
+ rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
+ return 0;
+ }
+
+ RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
+error:
+ rte_ring_free(rx_adapter->intr_ring);
+ rx_adapter->intr_ring = NULL;
+ rx_adapter->epoll_events = NULL;
+ return err;
+}
+
+static int
+rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+ int err;
+
+ err = pthread_cancel(rx_adapter->rx_intr_thread);
+ if (err)
+ RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
+ err);
+
+ err = pthread_join(rx_adapter->rx_intr_thread, NULL);
+ if (err)
+ RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
+
+ rte_free(rx_adapter->epoll_events);
+ rte_ring_free(rx_adapter->intr_ring);
+ rx_adapter->intr_ring = NULL;
+ rx_adapter->epoll_events = NULL;
+ return 0;
+}
+
+static int
+rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
+{
+ int ret;
+
+ if (rx_adapter->num_rx_intr == 0)
+ return 0;
+
+ ret = rxa_destroy_intr_thread(rx_adapter);
+ if (ret)
+ return ret;
+
+ close(rx_adapter->epd);
+ rx_adapter->epd = INIT_FD;
+
+ return ret;
+}
+
+static int
+rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_device_info *dev_info,
+ uint16_t rx_queue_id)
+{
+ int err;
+ uint16_t eth_dev_id = dev_info->dev->data->port_id;
+ int sintr = rxa_shared_intr(dev_info, rx_queue_id);
+
+ err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
+ if (err) {
+ RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
+ rx_queue_id);
+ return err;
+ }
+
+ err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
+ rx_adapter->epd,
+ RTE_INTR_EVENT_DEL,
+ 0);
+ if (err)
+ RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
+
+ if (sintr)
+ dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
+ else
+ dev_info->shared_intr_enabled = 0;
+ return err;
+}
+
+static int
+rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_device_info *dev_info,
+ int rx_queue_id)
+{
+ int err;
+ int i;
+ int s;