eventdev: add Rx adapter callback and optimize event buffering
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index 42dd7f8..95dd478 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -147,6 +147,10 @@ struct rte_event_eth_rx_adapter {
 struct eth_device_info {
        struct rte_eth_dev *dev;
        struct eth_rx_queue_info *rx_queue;
+       /* Rx callback */
+       rte_event_eth_rx_adapter_cb_fn cb_fn;
+       /* Rx callback argument */
+       void *cb_arg;
        /* Set if ethdev->eventdev packet transfer uses a
         * hardware mechanism
         */
@@ -194,11 +198,8 @@ struct eth_rx_queue_info {
        int queue_enabled;      /* True if added */
        int intr_enabled;
        uint16_t wt;            /* Polling weight */
-       uint8_t event_queue_id; /* Event queue to enqueue packets to */
-       uint8_t sched_type;     /* Sched type for events */
-       uint8_t priority;       /* Event priority */
-       uint32_t flow_id;       /* App provided flow identifier */
        uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
+       uint64_t event;         /* Prebuilt rte_event word; see rxa_add_queue() */
 };
 
 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
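The four per-queue metadata fields (event_queue_id, sched_type, priority,
flow_id) collapse into a single prebuilt 64-bit event word. This relies on
the layout of struct rte_event, abridged here from rte_eventdev.h:

    struct rte_event {
        union {
            uint64_t event;          /* one word covers all metadata */
            struct {
                uint32_t flow_id:20;
                uint32_t sub_event_type:8;
                uint32_t event_type:4;
                uint8_t op:2;
                uint8_t rsvd:3;
                uint8_t sched_type:2;
                uint8_t queue_id;
                uint8_t priority;
                uint8_t impl_opaque;
            };
        };
        union {
            uint64_t u64;
            void *event_ptr;
            struct rte_mbuf *mbuf;
        };
    };

With the constant fields prebuilt at queue-add time, the Rx fast path can
initialize everything except the flow id and the mbuf pointer with a
single store per packet.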
@@ -607,32 +608,33 @@ rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
 }
 
 static inline void
-rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
-       struct ipv6_hdr **ipv6_hdr)
+rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
+       struct rte_ipv6_hdr **ipv6_hdr)
 {
-       struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
-       struct vlan_hdr *vlan_hdr;
+       struct rte_ether_hdr *eth_hdr =
+               rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
+       struct rte_vlan_hdr *vlan_hdr;
 
        *ipv4_hdr = NULL;
        *ipv6_hdr = NULL;
 
        switch (eth_hdr->ether_type) {
-       case RTE_BE16(ETHER_TYPE_IPv4):
-               *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
+       case RTE_BE16(RTE_ETHER_TYPE_IPV4):
+               *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
                break;
 
-       case RTE_BE16(ETHER_TYPE_IPv6):
-               *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
+       case RTE_BE16(RTE_ETHER_TYPE_IPV6):
+               *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
                break;
 
-       case RTE_BE16(ETHER_TYPE_VLAN):
-               vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
+       case RTE_BE16(RTE_ETHER_TYPE_VLAN):
+               vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
                switch (vlan_hdr->eth_proto) {
-               case RTE_BE16(ETHER_TYPE_IPv4):
-                       *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
+               case RTE_BE16(RTE_ETHER_TYPE_IPV4):
+                       *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
                        break;
-               case RTE_BE16(ETHER_TYPE_IPv6):
-                       *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
+               case RTE_BE16(RTE_ETHER_TYPE_IPV6):
+                       *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
                        break;
                default:
                        break;
@@ -652,8 +654,8 @@ rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
        void *tuple;
        struct rte_ipv4_tuple ipv4_tuple;
        struct rte_ipv6_tuple ipv6_tuple;
-       struct ipv4_hdr *ipv4_hdr;
-       struct ipv6_hdr *ipv6_hdr;
+       struct rte_ipv4_hdr *ipv4_hdr;
+       struct rte_ipv6_hdr *ipv6_hdr;
 
        rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
 
@@ -711,18 +713,6 @@ rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
        }
 }
 
-/* Add event to buffer, free space check is done prior to calling
- * this function
- */
-static inline void
-rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
-               struct rte_event *ev)
-{
-       struct rte_eth_event_enqueue_buffer *buf =
-           &rx_adapter->event_enqueue_buffer;
-       rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
-}
-
 /* Enqueue buffered events to event device */
 static inline uint16_t
 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
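With events now composed directly in the enqueue buffer, the intermediate
stack array and the per-event rte_memcpy() are gone. In sketch form, where
fill_event() is a hypothetical stand-in for the per-event field
assignments:

    /* Before: build each event on the stack, then copy it over. */
    struct rte_event tmp;
    fill_event(&tmp, m);
    rte_memcpy(&buf->events[buf->count++], &tmp, sizeof(tmp));

    /* After: write in place; bump the count once per burst. */
    struct rte_event *ev = &buf->events[buf->count];
    for (i = 0; i < num; i++)
        fill_event(ev++, mbufs[i]);
    buf->count += num;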
@@ -759,21 +749,22 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
                uint16_t num)
 {
        uint32_t i;
-       struct eth_device_info *eth_device_info =
+       struct eth_device_info *dev_info =
                                        &rx_adapter->eth_devices[eth_dev_id];
        struct eth_rx_queue_info *eth_rx_queue_info =
-                                       &eth_device_info->rx_queue[rx_queue_id];
-
-       int32_t qid = eth_rx_queue_info->event_queue_id;
-       uint8_t sched_type = eth_rx_queue_info->sched_type;
-       uint8_t priority = eth_rx_queue_info->priority;
-       uint32_t flow_id;
-       struct rte_event events[BATCH_SIZE];
+                                       &dev_info->rx_queue[rx_queue_id];
+       struct rte_eth_event_enqueue_buffer *buf =
+                                       &rx_adapter->event_enqueue_buffer;
+       struct rte_event *ev = &buf->events[buf->count];
+       uint64_t event = eth_rx_queue_info->event;
+       uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
        struct rte_mbuf *m = mbufs[0];
        uint32_t rss_mask;
        uint32_t rss;
        int do_rss;
        uint64_t ts;
+       uint16_t nb_cb;
+       uint16_t dropped;
 
        /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
        rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
@@ -791,26 +782,34 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 
        for (i = 0; i < num; i++) {
                m = mbufs[i];
-               struct rte_event *ev = &events[i];
 
                rss = do_rss ?
                        rxa_do_softrss(m, rx_adapter->rss_key_be) :
                        m->hash.rss;
-               flow_id =
-                   eth_rx_queue_info->flow_id &
-                               eth_rx_queue_info->flow_id_mask;
-               flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
-               ev->flow_id = flow_id;
-               ev->op = RTE_EVENT_OP_NEW;
-               ev->sched_type = sched_type;
-               ev->queue_id = qid;
-               ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
-               ev->sub_event_type = 0;
-               ev->priority = priority;
+               ev->event = event;
+               ev->flow_id = (rss & ~flow_id_mask) |
+                               (ev->flow_id & flow_id_mask);
                ev->mbuf = m;
+               ev++;
+       }
+
+       if (dev_info->cb_fn) {
 
-               rxa_buffer_event(rx_adapter, ev);
+               dropped = 0;
+               nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
+                                       ETH_EVENT_BUFFER_SIZE, buf->count,
+                                       &buf->events[buf->count], num,
+                                       dev_info->cb_arg, &dropped);
+               if (unlikely(nb_cb > num))
+                       RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
+                               nb_cb, num);
+               else
+                       num = nb_cb;
+               if (dropped)
+                       rx_adapter->stats.rx_dropped += dropped;
        }
+
+       buf->count += num;
 }
 
 /* Enqueue packets from  <port, q>  to event buffer */
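The flow id selection above is branch-free: flow_id_mask picks the
application-provided id when one was configured at queue add, and the
packet's RSS hash otherwise. A worked example with hypothetical values:

    /* mask == ~0: app flow id 0x12 is baked into the event word.
     *   (0xabcd & ~mask) | (0x12 & mask) == 0x0 | 0x12 == 0x12
     * mask == 0: no app flow id; qi_ev->flow_id was zeroed at queue add.
     *   (0xabcd & ~mask) | (0x0 & mask) == 0xabcd | 0x0 == 0xabcd
     */
    ev->flow_id = (rss & ~flow_id_mask) | (ev->flow_id & flow_id_mask);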
@@ -852,7 +850,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
                        break;
        }
 
-       if (buf->count >= BATCH_SIZE)
+       if (buf->count > 0)
                rxa_flush_event_buffer(rx_adapter);
 
        return nb_rx;
@@ -892,7 +890,7 @@ rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
                 */
                if (err)
                        RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
-                               " to ring: %s", strerror(err));
+                               " to ring: %s", strerror(-err));
                else
                        rte_eth_dev_rx_intr_disable(port_id, queue);
        }
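The rte_ring enqueue functions return 0 on success or a negative errno
such as -ENOBUFS, so the value must be negated before it reaches
strerror(). A generic illustration:

    int err = rte_ring_enqueue(ring, obj);  /* 0 or a negative errno */
    if (err < 0)
        RTE_EDEV_LOG_ERR("enqueue failed: %s", strerror(-err));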
@@ -1105,7 +1103,6 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
        wrr_pos = rx_adapter->wrr_pos;
        max_nb_rx = rx_adapter->max_nb_rx;
        buf = &rx_adapter->event_enqueue_buffer;
-       stats = &rx_adapter->stats;
 
        /* Iterate through a WRR sequence */
        for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
@@ -1146,8 +1143,8 @@ rxa_service_func(void *args)
        if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
                return 0;
        if (!rx_adapter->rxa_started) {
-               return 0;
                rte_spinlock_unlock(&rx_adapter->rx_lock);
+               return 0;
        }
 
        stats = &rx_adapter->stats;
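Note that the !rxa_started early exit must release rx_lock before
returning: with the unlock placed after the return it is unreachable, the
lock is never dropped, and every subsequent service-function invocation
fails the trylock, so the adapter stops polling entirely.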
@@ -1698,6 +1695,7 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
        int pollq;
        int intrq;
        int sintrq;
+       struct rte_event *qi_ev;
 
        if (rx_queue_id == -1) {
                uint16_t nb_rx_queues;
@@ -1714,16 +1712,19 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
        sintrq = rxa_shared_intr(dev_info, rx_queue_id);
 
        queue_info = &dev_info->rx_queue[rx_queue_id];
-       queue_info->event_queue_id = ev->queue_id;
-       queue_info->sched_type = ev->sched_type;
-       queue_info->priority = ev->priority;
        queue_info->wt = conf->servicing_weight;
 
+       qi_ev = (struct rte_event *)&queue_info->event;
+       qi_ev->event = ev->event;
+       qi_ev->op = RTE_EVENT_OP_NEW;
+       qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
+       qi_ev->sub_event_type = 0;
+
        if (conf->rx_queue_flags &
                        RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
-               queue_info->flow_id = ev->flow_id;
                queue_info->flow_id_mask = ~0;
-       }
+       } else
+               qi_ev->flow_id = 0;
 
        rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
        if (rxa_polled_queue(dev_info, rx_queue_id)) {
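When RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID is set, the
application's flow id arrives already embedded in ev->event and the ~0
mask preserves it on the fast path; otherwise qi_ev->flow_id is cleared
here so that the fast-path OR contributes only the RSS hash.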
@@ -1978,8 +1979,7 @@ rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
        rx_adapter->id = id;
        strcpy(rx_adapter->mem_name, mem_name);
        rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
-                                       /* FIXME: incompatible with hotplug */
-                                       rte_eth_dev_count_total() *
+                                       RTE_MAX_ETHPORTS *
                                        sizeof(struct eth_device_info), 0,
                                        socket_id);
        rte_convert_rss_key((const uint32_t *)default_rss_key,
@@ -1992,7 +1992,7 @@ rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
                return -ENOMEM;
        }
        rte_spinlock_init(&rx_adapter->rx_lock);
-       RTE_ETH_FOREACH_DEV(i)
+       for (i = 0; i < RTE_MAX_ETHPORTS; i++)
                rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
 
        event_eth_rx_adapter[id] = rx_adapter;
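Sizing eth_devices[] by RTE_MAX_ETHPORTS rather than
rte_eth_dev_count_total() resolves the hotplug FIXME: the count taken at
create time does not cover ports attached later, whose ids could index
past the allocation, while RTE_MAX_ETHPORTS is the compile-time upper
bound on any port id.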
@@ -2364,3 +2364,48 @@ rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
 
        return rx_adapter->service_inited ? 0 : -ESRCH;
 }
+
+int
+rte_event_eth_rx_adapter_cb_register(uint8_t id,
+                                       uint16_t eth_dev_id,
+                                       rte_event_eth_rx_adapter_cb_fn cb_fn,
+                                       void *cb_arg)
+{
+       struct rte_event_eth_rx_adapter *rx_adapter;
+       struct eth_device_info *dev_info;
+       uint32_t cap;
+       int ret;
+
+       RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
+       RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
+
+       rx_adapter = rxa_id_to_adapter(id);
+       if (rx_adapter == NULL)
+               return -EINVAL;
+
+       dev_info = &rx_adapter->eth_devices[eth_dev_id];
+       if (dev_info->rx_queue == NULL)
+               return -EINVAL;
+
+       ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
+                                               eth_dev_id,
+                                               &cap);
+       if (ret) {
+               RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
+                       "eth port %" PRIu16, id, eth_dev_id);
+               return ret;
+       }
+
+       if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
+               RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
+                               PRIu16, eth_dev_id);
+               return -EINVAL;
+       }
+
+       rte_spinlock_lock(&rx_adapter->rx_lock);
+       dev_info->cb_fn = cb_fn;
+       dev_info->cb_arg = cb_arg;
+       rte_spinlock_unlock(&rx_adapter->rx_lock);
+
+       return 0;
+}
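For context, a minimal sketch of how an application might use this API,
assuming the callback signature implied by the call site in
rxa_buffer_mbufs() (enqueue buffer size, current fill level, pointer to
the newly buffered events, their count, the user argument, and an
out-parameter for the number of dropped events); rx_filter_cb and the
runt-frame check are hypothetical:

    #include <rte_common.h>
    #include <rte_mbuf.h>
    #include <rte_eventdev.h>
    #include <rte_event_eth_rx_adapter.h>

    /* Hypothetical filter: drop runt packets before they reach the event
     * device; return how many events are left for the adapter to enqueue
     * and report the rest via nb_dropped.
     */
    static uint16_t
    rx_filter_cb(uint16_t eth_dev_id, uint16_t rx_queue_id,
                 uint32_t enq_buf_size, uint32_t enq_buf_count,
                 struct rte_event *ev, uint16_t nb_event,
                 void *cb_arg, uint16_t *nb_dropped)
    {
        uint16_t kept = 0;
        uint16_t i;

        RTE_SET_USED(eth_dev_id);
        RTE_SET_USED(rx_queue_id);
        RTE_SET_USED(enq_buf_size);
        RTE_SET_USED(enq_buf_count);
        RTE_SET_USED(cb_arg);

        for (i = 0; i < nb_event; i++) {
            if (ev[i].mbuf->pkt_len >= 64)  /* hypothetical check */
                ev[kept++] = ev[i];
            else
                rte_pktmbuf_free(ev[i].mbuf);
        }
        *nb_dropped = nb_event - kept;
        return kept;
    }

    /* Registration, e.g. after the Rx queues have been added:
     *   rte_event_eth_rx_adapter_cb_register(id, eth_dev_id,
     *                                        rx_filter_cb, NULL);
     */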