/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation.
 */
#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_dev.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>

#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
#include "rte_event_eth_rx_adapter.h"
#define BATCH_SIZE		32
#define BLOCK_CNT_THRESHOLD	10
#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
#define ETH_RX_ADAPTER_MEM_NAME_LEN	32

#define RSS_KEY_SIZE	40
/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint16_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};
/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};
struct rte_event_eth_rx_adapter {
	/* RSS key */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Per ethernet device structure */
	struct eth_device_info *eth_devices;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start timestamp */
	uint64_t rx_enq_block_start_ts;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
	/* Adapter started flag */
	uint8_t rxa_started;
} __rte_cache_aligned;
/* Per eth device */
struct eth_device_info {
	struct rte_eth_dev *dev;
	struct eth_rx_queue_info *rx_queue;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* Number of queues added for this device */
	uint16_t nb_dev_queues;
	/* If nb_rx_poll > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_rx_poll;
	/* sum(wrr(q)) for all queues within the device
	 * useful when deleting all device queues
	 */
	uint32_t wrr_len;
};
/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	uint16_t wt;		/* Polling weight */
	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
	uint8_t sched_type;	/* Sched type for events */
	uint8_t priority;	/* Event priority */
	uint32_t flow_id;	/* App provided flow identifier */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
};
static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
static inline int
rxa_validate_id(uint8_t id)
{
	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}
#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!rxa_validate_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)
static inline int
rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return rx_adapter->num_rx_polled;
}
/* Greatest common divisor */
static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
{
	uint16_t r = a % b;

	return r ? rxa_gcd_u16(b, r) : b;
}
/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
static int
rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
	 unsigned int n, int *cw,
	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
	 uint16_t gcd, int prev)
{
	int i = prev;
	uint16_t w;

	while (1) {
		uint16_t q;
		uint16_t d;

		i = (i + 1) % n;
		if (i == 0) {
			*cw = *cw - gcd;
			if (*cw <= 0)
				*cw = max_wt;
		}

		q = eth_rx_poll[i].eth_rx_qid;
		d = eth_rx_poll[i].eth_dev_id;
		w = rx_adapter->eth_devices[d].rx_queue[q].wt;

		if ((int)w >= *cw)
			return i;
	}
}
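
/*
 * Worked example (illustrative, not part of the library): for three
 * polled queues with weights {4, 2, 1}, max_wt = 4 and gcd = 1,
 * repeated calls to rxa_wrr_next() starting from prev = -1, cw = -1
 * yield the 7 entry sequence
 *
 *	0, 0, 0, 1, 0, 1, 2
 *
 * i.e. each queue is visited in proportion to its weight.
 */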
static inline int
rxa_polled_queue(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	return !dev_info->internal_event_port &&
		dev_info->rx_queue &&
		queue_info->queue_enabled && queue_info->wt != 0;
}
/* Calculate size of the eth_rx_poll and wrr_sched arrays
 * after deleting poll mode rx queues
 */
static void
rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_poll,
			uint32_t *nb_wrr)
{
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		poll_diff = dev_info->nb_rx_poll;
		wrr_len_diff = dev_info->wrr_len;
	} else {
		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
					0;
	}

	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
}
/* Calculate nb_rx_* after adding poll mode rx queues
 */
static void
rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint16_t wt,
			uint32_t *nb_rx_poll,
			uint32_t *nb_wrr)
{
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		poll_diff = dev_info->dev->data->nb_rx_queues -
						dev_info->nb_rx_poll;
		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
				- dev_info->wrr_len;
	} else {
		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
				wt - dev_info->rx_queue[rx_queue_id].wt :
				wt;
	}

	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
}
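
/*
 * Worked example (illustrative): if the adapter currently polls 3
 * queues with weights {4, 2, 1}, then num_rx_polled = 3 and
 * wrr_len = 7. Adding a new, previously unpolled queue with weight 2
 * gives nb_rx_poll = 3 + 1 = 4 and nb_wrr = 7 + 2 = 9; deleting that
 * same queue reverses the arithmetic.
 */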
/* Calculate nb_rx_* after adding rx_queue_id */
static void
rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id,
		uint16_t wt,
		uint32_t *nb_rx_poll,
		uint32_t *nb_wrr)
{
	rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
				wt, nb_rx_poll, nb_wrr);
}
/* Calculate nb_rx_* after deleting rx_queue_id */
static void
rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id,
		uint32_t *nb_rx_poll,
		uint32_t *nb_wrr)
{
	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
				nb_wrr);
}
/*
 * Allocate the rx_poll array
 */
static struct eth_rx_poll_entry *
rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
	uint32_t num_rx_polled)
{
	size_t len;

	len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
			RTE_CACHE_LINE_SIZE);
	return rte_zmalloc_socket(rx_adapter->mem_name,
				len,
				RTE_CACHE_LINE_SIZE,
				rx_adapter->socket_id);
}
/*
 * Allocate the WRR array
 */
static uint32_t *
rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
{
	size_t len;

	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
			RTE_CACHE_LINE_SIZE);
	return rte_zmalloc_socket(rx_adapter->mem_name,
				len,
				RTE_CACHE_LINE_SIZE,
				rx_adapter->socket_id);
}
static int
rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
		uint32_t nb_poll,
		uint32_t nb_wrr,
		struct eth_rx_poll_entry **rx_poll,
		uint32_t **wrr_sched)
{
	if (nb_poll == 0) {
		*rx_poll = NULL;
		*wrr_sched = NULL;
		return 0;
	}

	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
	if (*rx_poll == NULL) {
		*wrr_sched = NULL;
		return -ENOMEM;
	}

	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
	if (*wrr_sched == NULL) {
		rte_free(*rx_poll);
		return -ENOMEM;
	}

	return 0;
}
/* Precalculate WRR polling sequence for all queues in rx_adapter */
static void
rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_rx_poll_entry *rx_poll,
		uint32_t *rx_wrr)
{
	uint16_t d;
	uint16_t q;
	unsigned int i;
	int prev = -1;
	int cw = -1;

	/* Initialize variables for calculation of wrr schedule */
	uint16_t max_wrr_pos = 0;
	unsigned int poll_q = 0;
	uint16_t max_wt = 0;
	uint16_t gcd = 0;

	if (rx_poll == NULL)
		return;

	/* Generate array of all queues to poll, the size of this
	 * array is poll_q
	 */
	RTE_ETH_FOREACH_DEV(d) {
		uint16_t nb_rx_queues;
		struct eth_device_info *dev_info =
				&rx_adapter->eth_devices[d];
		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		if (dev_info->rx_queue == NULL)
			continue;
		if (dev_info->internal_event_port)
			continue;
		dev_info->wrr_len = 0;
		for (q = 0; q < nb_rx_queues; q++) {
			struct eth_rx_queue_info *queue_info =
				&dev_info->rx_queue[q];
			uint16_t wt;

			if (!rxa_polled_queue(dev_info, q))
				continue;
			wt = queue_info->wt;
			rx_poll[poll_q].eth_dev_id = d;
			rx_poll[poll_q].eth_rx_qid = q;
			max_wrr_pos += wt;
			dev_info->wrr_len += wt;
			max_wt = RTE_MAX(max_wt, wt);
			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
			poll_q++;
		}
	}

	/* Generate polling sequence based on weights */
	for (i = 0; i < max_wrr_pos; i++) {
		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
				     rx_poll, max_wt, gcd, prev);
		prev = rx_wrr[i];
	}
}
static inline void
rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
	struct ipv6_hdr **ipv6_hdr)
{
	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
	struct vlan_hdr *vlan_hdr;

	*ipv4_hdr = NULL;
	*ipv6_hdr = NULL;

	switch (eth_hdr->ether_type) {
	case RTE_BE16(ETHER_TYPE_IPv4):
		*ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_IPv6):
		*ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_VLAN):
		vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
		switch (vlan_hdr->eth_proto) {
		case RTE_BE16(ETHER_TYPE_IPv4):
			*ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
			break;
		case RTE_BE16(ETHER_TYPE_IPv6):
			*ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
			break;
		default:
			break;
		}
		break;

	default:
		break;
	}
}
/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
	uint32_t input_len;
	void *tuple;
	struct rte_ipv4_tuple ipv4_tuple;
	struct rte_ipv6_tuple ipv6_tuple;
	struct ipv4_hdr *ipv4_hdr;
	struct ipv6_hdr *ipv6_hdr;

	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);

	if (ipv4_hdr) {
		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
		tuple = &ipv4_tuple;
		input_len = RTE_THASH_V4_L3_LEN;
	} else if (ipv6_hdr) {
		rte_thash_load_v6_addrs(ipv6_hdr,
					(union rte_thash_tuple *)&ipv6_tuple);
		tuple = &ipv6_tuple;
		input_len = RTE_THASH_V6_L3_LEN;
	} else
		return 0;

	return rte_softrss_be(tuple, input_len, rss_key_be);
}
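
/*
 * Illustrative sketch (hypothetical mbuf "m", not part of this file):
 * the soft RSS hash substitutes for a missing PMD computed hash, e.g.
 *
 *	if ((m->ol_flags & PKT_RX_RSS_HASH) == 0)
 *		m->hash.rss = rxa_do_softrss(m, rx_adapter->rss_key_be);
 *
 * rss_key_be must be the byte swapped form of the RSS key, produced by
 * rte_convert_rss_key() in rte_event_eth_rx_adapter_create_ext() below.
 */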
static inline int
rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return !!rx_adapter->enq_block_count;
}
static inline void
rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->rx_enq_block_start_ts)
		return;

	rx_adapter->enq_block_count++;
	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
		return;

	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}
static inline void
rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
		    struct rte_event_eth_rx_adapter_stats *stats)
{
	if (unlikely(!stats->rx_enq_start_ts))
		stats->rx_enq_start_ts = rte_get_tsc_cycles();

	if (likely(!rxa_enq_blocked(rx_adapter)))
		return;

	rx_adapter->enq_block_count = 0;
	if (rx_adapter->rx_enq_block_start_ts) {
		stats->rx_enq_end_ts = rte_get_tsc_cycles();
		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
		    rx_adapter->rx_enq_block_start_ts;
		rx_adapter->rx_enq_block_start_ts = 0;
	}
}
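
/*
 * Example of the accounting above (illustrative numbers): if partial
 * enqueues occur BLOCK_CNT_THRESHOLD (10) times in a row,
 * rx_enq_block_start_ts records the TSC at the 10th failure; once a
 * flush enqueues at least one event again, the cycles elapsed since
 * then are added to stats->rx_enq_block_cycles and the counter resets.
 */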
/* Add event to buffer, free space check is done prior to calling
 * this function
 */
static inline void
rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
		struct rte_event *ev)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
}
/* Enqueue buffered events to event device */
static inline uint16_t
rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
					rx_adapter->event_port_id,
					buf->events,
					buf->count);
	if (n != buf->count) {
		memmove(buf->events,
			&buf->events[n],
			(buf->count - n) * sizeof(struct rte_event));
		stats->rx_enq_retry++;
	}

	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
		rxa_enq_block_start_ts(rx_adapter);

	buf->count -= n;
	stats->rx_enq_count += n;

	return n;
}
static inline void
rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		uint16_t rx_queue_id,
		struct rte_mbuf **mbufs,
		uint16_t num)
{
	uint32_t i;
	struct eth_device_info *eth_device_info =
					&rx_adapter->eth_devices[eth_dev_id];
	struct eth_rx_queue_info *eth_rx_queue_info =
				&eth_device_info->rx_queue[rx_queue_id];

	int32_t qid = eth_rx_queue_info->event_queue_id;
	uint8_t sched_type = eth_rx_queue_info->sched_type;
	uint8_t priority = eth_rx_queue_info->priority;
	uint32_t flow_id;
	struct rte_event events[BATCH_SIZE];
	struct rte_mbuf *m = mbufs[0];
	uint32_t rss_mask;
	uint32_t rss;
	int do_rss;
	uint64_t ts;

	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;

	if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
		ts = rte_get_tsc_cycles();
		for (i = 0; i < num; i++) {
			m = mbufs[i];

			m->timestamp = ts;
			m->ol_flags |= PKT_RX_TIMESTAMP;
		}
	}

	for (i = 0; i < num; i++) {
		m = mbufs[i];
		struct rte_event *ev = &events[i];

		rss = do_rss ?
			rxa_do_softrss(m, rx_adapter->rss_key_be) :
			m->hash.rss;
		flow_id =
		    eth_rx_queue_info->flow_id &
				eth_rx_queue_info->flow_id_mask;
		flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
		ev->flow_id = flow_id;
		ev->op = RTE_EVENT_OP_NEW;
		ev->sched_type = sched_type;
		ev->queue_id = qid;
		ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
		ev->sub_event_type = 0;
		ev->priority = priority;
		ev->mbuf = m;

		rxa_buffer_event(rx_adapter, ev);
	}
}
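
/*
 * Flow ID composition above, worked through (illustrative): the event
 * flow id is
 *
 *	flow_id = (conf_flow_id & mask) | (rss & ~mask)
 *
 * so with an application supplied flow id (flow_id_mask == ~0) the
 * event carries queue_info->flow_id unchanged, and with no override
 * (flow_id_mask == 0) it carries the RSS hash in full.
 */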
/* Enqueue packets from <port, q> to event buffer */
static inline uint32_t
rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
	uint16_t port_id,
	uint16_t queue_id,
	uint32_t rx_count,
	uint32_t max_rx)
{
	struct rte_mbuf *mbufs[BATCH_SIZE];
	struct rte_eth_event_enqueue_buffer *buf =
					&rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats =
					&rx_adapter->stats;
	uint16_t n;
	uint32_t nb_rx = 0;

	/* Don't do a batch dequeue from the rx queue if there isn't
	 * enough space in the enqueue buffer.
	 */
	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter);

		stats->rx_poll_count++;
		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
		if (unlikely(!n))
			break;
		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
		nb_rx += n;
		if (rx_count + nb_rx > max_rx)
			break;
	}

	if (buf->count >= BATCH_SIZE)
		rxa_flush_event_buffer(rx_adapter);

	return nb_rx;
}
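
/*
 * Worked example of the headroom check in rxa_eth_rx() above: with
 * BATCH_SIZE 32 and ETH_EVENT_BUFFER_SIZE 128, polling proceeds only
 * while at least 32 free slots remain in the event buffer, so a full
 * rte_eth_rx_burst() can never overflow it.
 */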
/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * it.
 */
static inline uint32_t
rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t num_queue;
	uint32_t nb_rx = 0;
	struct rte_eth_event_enqueue_buffer *buf;
	uint32_t wrr_pos;
	uint32_t max_nb_rx;
	struct rte_event_eth_rx_adapter_stats *stats;

	wrr_pos = rx_adapter->wrr_pos;
	max_nb_rx = rx_adapter->max_nb_rx;
	buf = &rx_adapter->event_enqueue_buffer;
	stats = &rx_adapter->stats;

	/* Iterate through a WRR sequence */
	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

		/* Don't do a batch dequeue from the rx queue if there isn't
		 * enough space in the enqueue buffer.
		 */
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter);
		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
			rx_adapter->wrr_pos = wrr_pos;
			return nb_rx;
		}

		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);
		if (nb_rx > max_nb_rx) {
			rx_adapter->wrr_pos =
				    (wrr_pos + 1) % rx_adapter->wrr_len;
			break;
		}

		if (++wrr_pos == rx_adapter->wrr_len)
			wrr_pos = 0;
	}

	stats->rx_packets += nb_rx;
	return nb_rx;
}
static int
rxa_service_func(void *args)
{
	struct rte_event_eth_rx_adapter *rx_adapter = args;

	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
		return 0;
	if (!rx_adapter->rxa_started) {
		/* Release the lock before returning, otherwise a stopped
		 * adapter would leave the lock held forever.
		 */
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		return 0;
	}
	rxa_poll(rx_adapter);
	rte_spinlock_unlock(&rx_adapter->rx_lock);
	return 0;
}
static int
rte_event_eth_rx_adapter_init(void)
{
	const char *name = "rte_event_eth_rx_adapter_array";
	const struct rte_memzone *mz;
	unsigned int sz;

	sz = sizeof(*event_eth_rx_adapter) *
	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

	mz = rte_memzone_lookup(name);
	if (mz == NULL) {
		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
						 RTE_CACHE_LINE_SIZE);
		if (mz == NULL) {
			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
					PRId32, rte_errno);
			return -rte_errno;
		}
	}

	event_eth_rx_adapter = mz->addr;
	return 0;
}
static inline struct rte_event_eth_rx_adapter *
rxa_id_to_adapter(uint8_t id)
{
	return event_eth_rx_adapter ?
		event_eth_rx_adapter[id] : NULL;
}
static int
rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
	int ret;
	struct rte_eventdev *dev;
	struct rte_event_dev_config dev_conf;
	int started;
	uint8_t port_id;
	struct rte_event_port_conf *port_conf = arg;
	struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	dev_conf = dev->data->dev_conf;

	started = dev->data->dev_started;
	if (started)
		rte_event_dev_stop(dev_id);
	port_id = dev_conf.nb_event_ports;
	dev_conf.nb_event_ports += 1;
	ret = rte_event_dev_configure(dev_id, &dev_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
						dev_id);
		if (started) {
			if (rte_event_dev_start(dev_id))
				return -EIO;
		}
		return ret;
	}

	ret = rte_event_port_setup(dev_id, port_id, port_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
					port_id);
		return ret;
	}

	conf->event_port_id = port_id;
	conf->max_nb_rx = 128;
	if (started)
		ret = rte_event_dev_start(dev_id);
	rx_adapter->default_cb_arg = 1;
	return ret;
}
static int
rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
	int ret;
	struct rte_service_spec service;
	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

	if (rx_adapter->service_inited)
		return 0;

	memset(&service, 0, sizeof(service));
	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
		"rte_event_eth_rx_adapter_%d", id);
	service.socket_id = rx_adapter->socket_id;
	service.callback = rxa_service_func;
	service.callback_userdata = rx_adapter;
	/* Service function handles locking for queue add/del updates */
	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
	ret = rte_service_component_register(&service, &rx_adapter->service_id);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
			service.name, ret);
		return ret;
	}

	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
		&rx_adapter_conf, rx_adapter->conf_arg);
	if (ret) {
		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
			ret);
		goto err_done;
	}
	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
	rx_adapter->service_inited = 1;
	return 0;

err_done:
	rte_service_component_unregister(rx_adapter->service_id);
	return ret;
}
static void
rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int32_t rx_queue_id,
		uint8_t add)
{
	struct eth_rx_queue_info *queue_info;
	int enabled;
	uint16_t i;

	if (dev_info->rx_queue == NULL)
		return;

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			rxa_update_queue(rx_adapter, dev_info, i, add);
	} else {
		queue_info = &dev_info->rx_queue[rx_queue_id];
		enabled = queue_info->queue_enabled;
		if (add) {
			rx_adapter->nb_queues += !enabled;
			dev_info->nb_dev_queues += !enabled;
		} else {
			rx_adapter->nb_queues -= enabled;
			dev_info->nb_dev_queues -= enabled;
		}
		queue_info->queue_enabled = !!add;
	}
}
static void
rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int32_t rx_queue_id)
{
	int pollq;

	if (rx_adapter->nb_queues == 0)
		return;

	if (rx_queue_id == -1) {
		uint16_t nb_rx_queues;
		uint16_t i;

		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		for (i = 0; i < nb_rx_queues; i++)
			rxa_sw_del(rx_adapter, dev_info, i);
		return;
	}

	pollq = rxa_polled_queue(dev_info, rx_queue_id);
	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
	rx_adapter->num_rx_polled -= pollq;
	dev_info->nb_rx_poll -= pollq;
}
static void
rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int32_t rx_queue_id,
	const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
	struct eth_rx_queue_info *queue_info;
	const struct rte_event *ev = &conf->ev;
	int pollq;

	if (rx_queue_id == -1) {
		uint16_t nb_rx_queues;
		uint16_t i;

		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		for (i = 0; i < nb_rx_queues; i++)
			rxa_add_queue(rx_adapter, dev_info, i, conf);
		return;
	}

	pollq = rxa_polled_queue(dev_info, rx_queue_id);

	queue_info = &dev_info->rx_queue[rx_queue_id];
	queue_info->event_queue_id = ev->queue_id;
	queue_info->sched_type = ev->sched_type;
	queue_info->priority = ev->priority;
	queue_info->wt = conf->servicing_weight;

	if (conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
		queue_info->flow_id = ev->flow_id;
		queue_info->flow_id_mask = ~0;
	}

	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
	if (rxa_polled_queue(dev_info, rx_queue_id)) {
		rx_adapter->num_rx_polled += !pollq;
		dev_info->nb_rx_poll += !pollq;
	}
}
static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		int rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	int ret;
	struct eth_rx_poll_entry *rx_poll;
	struct eth_rx_queue_info *rx_queue;
	uint32_t *rx_wrr;
	uint16_t nb_rx_queues;
	uint32_t nb_rx_poll, nb_wrr;

	if (queue_conf->servicing_weight == 0) {
		struct rte_eth_dev_data *data = dev_info->dev->data;
		if (data->dev_conf.intr_conf.rxq) {
			RTE_EDEV_LOG_ERR("Interrupt driven queues"
					" not supported");
			return -ENOTSUP;
		}
		temp_conf = *queue_conf;

		/* If Rx interrupts are disabled set wt = 1 */
		temp_conf.servicing_weight = 1;
		queue_conf = &temp_conf;
	}

	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
	rx_queue = dev_info->rx_queue;

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
		    rte_zmalloc_socket(rx_adapter->mem_name,
				       nb_rx_queues *
				       sizeof(struct eth_rx_queue_info), 0,
				       rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}

	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
			queue_conf->servicing_weight,
			&nb_rx_poll, &nb_wrr);

	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
				&rx_poll, &rx_wrr);
	if (ret)
		goto err_free_rxqueue;

	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = nb_wrr;
	return 0;

err_free_rxqueue:
	if (rx_queue == NULL) {
		rte_free(dev_info->rx_queue);
		dev_info->rx_queue = NULL;
	}

	return ret;
}
static int
rxa_ctrl(uint8_t id, int start)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int use_service = 0;
	int stop = !start;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];

	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		/* if start check for num dev queues */
		if (start && !dev_info->nb_dev_queues)
			continue;
		/* if stop check if dev has been started */
		if (stop && !dev_info->dev_rx_started)
			continue;
		use_service |= !dev_info->internal_event_port;
		dev_info->dev_rx_started = start;
		if (dev_info->internal_event_port == 0)
			continue;
		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
						&rte_eth_devices[i]) :
			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
						&rte_eth_devices[i]);
	}

	if (use_service) {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		rx_adapter->rxa_started = start;
		rte_service_runstate_set(rx_adapter->service_id, start);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	return 0;
}
int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
				rte_event_eth_rx_adapter_conf_cb conf_cb,
				void *conf_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	int ret;
	int socket_id;
	uint16_t i;
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	const uint8_t default_rss_key[] = {
		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
	};

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	if (conf_cb == NULL)
		return -EINVAL;

	if (event_eth_rx_adapter == NULL) {
		ret = rte_event_eth_rx_adapter_init();
		if (ret)
			return ret;
	}

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter != NULL) {
		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
		return -EEXIST;
	}

	socket_id = rte_event_dev_socket_id(dev_id);
	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
		"rte_event_eth_rx_adapter_%d",
		id);

	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
			RTE_CACHE_LINE_SIZE, socket_id);
	if (rx_adapter == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
		return -ENOMEM;
	}

	rx_adapter->eventdev_id = dev_id;
	rx_adapter->socket_id = socket_id;
	rx_adapter->conf_cb = conf_cb;
	rx_adapter->conf_arg = conf_arg;
	strcpy(rx_adapter->mem_name, mem_name);
	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
					/* FIXME: incompatible with hotplug */
					rte_eth_dev_count_total() *
					sizeof(struct eth_device_info), 0,
					socket_id);
	rte_convert_rss_key((const uint32_t *)default_rss_key,
			(uint32_t *)rx_adapter->rss_key_be,
			    RTE_DIM(default_rss_key));

	if (rx_adapter->eth_devices == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
		rte_free(rx_adapter);
		return -ENOMEM;
	}
	rte_spinlock_init(&rx_adapter->rx_lock);
	RTE_ETH_FOREACH_DEV(i)
		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

	event_eth_rx_adapter[id] = rx_adapter;
	if (conf_cb == rxa_default_conf_cb)
		rx_adapter->default_cb_arg = 1;
	return 0;
}
int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
		struct rte_event_port_conf *port_config)
{
	struct rte_event_port_conf *pc;
	int ret;

	if (port_config == NULL)
		return -EINVAL;
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	pc = rte_malloc(NULL, sizeof(*pc), 0);
	if (pc == NULL)
		return -ENOMEM;
	*pc = *port_config;
	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
					rxa_default_conf_cb,
					pc);
	if (ret)
		rte_free(pc);
	return ret;
}
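
/*
 * Illustrative usage (hypothetical ids and values, error checks
 * omitted): the port conf is copied above, so a stack variable is
 * safe here.
 *
 *	struct rte_event_port_conf pc = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *
 *	rte_event_eth_rx_adapter_create(0, 0, &pc);
 */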
int
rte_event_eth_rx_adapter_free(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	if (rx_adapter->nb_queues) {
		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
				rx_adapter->nb_queues);
		return -EBUSY;
	}

	if (rx_adapter->default_cb_arg)
		rte_free(rx_adapter->conf_arg);
	rte_free(rx_adapter->eth_devices);
	rte_free(rx_adapter);
	event_eth_rx_adapter[id] = NULL;

	return 0;
}
int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
		uint16_t eth_dev_id,
		int32_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	int ret;
	uint32_t cap;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if ((rx_adapter == NULL) || (queue_conf == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			"eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
		&& (queue_conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
		(rx_queue_id != -1)) {
		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
			"event queue, eth port: %" PRIu16 " adapter id: %"
			PRIu8, eth_dev_id, id);
		return -EINVAL;
	}

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
					-ENOTSUP);
		if (dev_info->rx_queue == NULL) {
			dev_info->rx_queue =
			    rte_zmalloc_socket(rx_adapter->mem_name,
					dev_info->dev->data->nb_rx_queues *
					sizeof(struct eth_rx_queue_info), 0,
					rx_adapter->socket_id);
			if (dev_info->rx_queue == NULL)
				return -ENOMEM;
		}

		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
				&rte_eth_devices[eth_dev_id],
				rx_queue_id, queue_conf);
		if (ret == 0) {
			dev_info->internal_event_port = 1;
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					1);
		}
	} else {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		dev_info->internal_event_port = 0;
		ret = rxa_init_service(rx_adapter, id);
		if (ret == 0) {
			uint32_t service_id = rx_adapter->service_id;
			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
					queue_conf);
			rte_service_component_runstate_set(service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
		}
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	return ret;
}
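
/*
 * Illustrative usage (hypothetical ids, error checks omitted): connect
 * all Rx queues of eth port 0 to event queue 2 with atomic scheduling
 * and equal servicing weight:
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qc = {
 *		.servicing_weight = 1,
 *		.ev.queue_id = 2,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *	};
 *
 *	rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qc);
 */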
int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint32_t nb_rx_poll = 0;
	uint32_t nb_wrr = 0;
	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
					-ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
			&nb_rx_poll, &nb_wrr);
		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
			&rx_poll, &rx_wrr);
		if (ret)
			return ret;

		rte_spinlock_lock(&rx_adapter->rx_lock);
		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

		rte_free(rx_adapter->eth_rx_poll);
		rte_free(rx_adapter->wrr_sched);

		rx_adapter->eth_rx_poll = rx_poll;
		rx_adapter->num_rx_polled = nb_rx_poll;
		rx_adapter->wrr_sched = rx_wrr;
		rx_adapter->wrr_len = nb_wrr;

		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		rte_service_component_runstate_set(rx_adapter->service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
	}

	return ret;
}
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	return rxa_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	return rxa_ctrl(id, 0);
}
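
/*
 * Note (informational): for a service based adapter, start/stop only
 * toggles rxa_started and the service run state in rxa_ctrl(); a
 * service core must also be mapped to the service for packets to flow,
 * see the sketch after rte_event_eth_rx_adapter_service_id_get() below.
 * For devices with an internal event port, the PMD's own start/stop
 * callback is invoked instead.
 */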
int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
			       struct rte_event_eth_rx_adapter_stats *stats)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
	struct rte_event_eth_rx_adapter_stats dev_stats;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || stats == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	memset(stats, 0, sizeof(*stats));
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
			continue;
		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
						&rte_eth_devices[i],
						&dev_stats);
		if (ret)
			continue;
		dev_stats_sum.rx_packets += dev_stats.rx_packets;
		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
	}

	if (rx_adapter->service_inited)
		*stats = rx_adapter->stats;

	stats->rx_packets += dev_stats_sum.rx_packets;
	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
	return 0;
}
int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
			continue;
		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
							&rte_eth_devices[i]);
	}

	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
	return 0;
}
int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || service_id == NULL)
		return -EINVAL;

	if (rx_adapter->service_inited)
		*service_id = rx_adapter->service_id;

	return rx_adapter->service_inited ? 0 : -ESRCH;
}
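
/*
 * Illustrative sketch (hypothetical adapter id 0 and lcore 1, error
 * checks omitted): a software adapter runs as an EAL service, so the
 * application maps it to a service core after creating the adapter:
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(0, &service_id) == 0) {
 *		rte_service_lcore_add(1);
 *		rte_service_map_lcore_set(service_id, 1, 1);
 *		rte_service_lcore_start(1);
 *	}
 */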