#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_dev.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>

#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
#include "rte_event_eth_rx_adapter.h"

#define BATCH_SIZE		32
#define BLOCK_CNT_THRESHOLD	10
#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
#define ETH_RX_ADAPTER_MEM_NAME_LEN	32

#define RSS_KEY_SIZE	40
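
/* Sizing note: the enqueue buffer holds 4x BATCH_SIZE events so that a
 * full rte_eth_rx_burst() of BATCH_SIZE mbufs can always be buffered
 * whenever eth_rx_poll() decides to dequeue; see the free-space check
 * before rte_eth_rx_burst() below.
 */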

/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint8_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};

/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};

struct rte_event_eth_rx_adapter {
	/* RSS key */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Per ethernet device structure */
	struct eth_device_info *eth_devices;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start ts */
	uint64_t rx_enq_block_start_ts;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
} __rte_cache_aligned;

/* Per eth device */
struct eth_device_info {
	struct rte_eth_dev *dev;
	struct eth_rx_queue_info *rx_queue;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* If nb_dev_queues > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_dev_queues;
};

/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	uint16_t wt;		/* Polling weight */
	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
	uint8_t sched_type;	/* Sched type for events */
	uint8_t priority;	/* Event priority */
	uint32_t flow_id;	/* App provided flow identifier */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
};

static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;

static inline int
valid_id(uint8_t id)
{
	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}

#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!valid_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)

static inline int
sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return rx_adapter->num_rx_polled;
}

/* Greatest common divisor */
static uint16_t gcd_u16(uint16_t a, uint16_t b)
{
	uint16_t r = a % b;

	return r ? gcd_u16(b, r) : b;
}

/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
static int
wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
	 unsigned int n, int *cw,
	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
	 uint16_t gcd, int prev)
{
	int i = prev;
	uint16_t w;

	while (1) {
		uint16_t q;
		uint8_t d;

		i = (i + 1) % n;
		if (i == 0) {
			*cw = *cw - gcd;
			if (*cw <= 0)
				*cw = max_wt;
		}

		q = eth_rx_poll[i].eth_rx_qid;
		d = eth_rx_poll[i].eth_dev_id;
		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
		if ((int)w >= *cw)
			return i;
	}
}
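
/* Worked example (illustrative, not from the original source): for three
 * polled queues A, B and C with servicing weights 4, 2 and 1 we get
 * max_wt = 4 and gcd = 1. Repeated wrr_next() calls starting from
 * cw = -1, prev = -1 yield the sequence A A A B A B C, i.e. within one
 * WRR cycle each queue is polled as many times as its weight, with the
 * heavier queues spread toward the front of the cycle.
 */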

/* Precalculate WRR polling sequence for all queues in rx_adapter */
static int
eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint8_t d;
	uint16_t q;
	unsigned int i;

	/* Initialize variables for calculation of wrr schedule */
	uint16_t max_wrr_pos = 0;
	unsigned int poll_q = 0;
	uint16_t max_wt = 0;
	uint16_t gcd = 0;

	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;

	if (rx_adapter->num_rx_polled) {
		size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
				sizeof(*rx_adapter->eth_rx_poll),
				RTE_CACHE_LINE_SIZE);
		rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
					     len,
					     RTE_CACHE_LINE_SIZE,
					     rx_adapter->socket_id);
		if (rx_poll == NULL)
			return -ENOMEM;

		/* Generate array of all queues to poll, the size of this
		 * array is poll_q
		 */
		for (d = 0; d < rte_eth_dev_count(); d++) {
			uint16_t nb_rx_queues;
			struct eth_device_info *dev_info =
					&rx_adapter->eth_devices[d];
			nb_rx_queues = dev_info->dev->data->nb_rx_queues;
			if (dev_info->rx_queue == NULL)
				continue;
			for (q = 0; q < nb_rx_queues; q++) {
				struct eth_rx_queue_info *queue_info =
					&dev_info->rx_queue[q];
				if (queue_info->queue_enabled == 0)
					continue;

				uint16_t wt = queue_info->wt;
				rx_poll[poll_q].eth_dev_id = d;
				rx_poll[poll_q].eth_rx_qid = q;
				max_wrr_pos += wt;
				max_wt = RTE_MAX(max_wt, wt);
				gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
				poll_q++;
			}
		}

		len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
				RTE_CACHE_LINE_SIZE);
		rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
					    len,
					    RTE_CACHE_LINE_SIZE,
					    rx_adapter->socket_id);
		if (rx_wrr == NULL) {
			rte_free(rx_poll);
			return -ENOMEM;
		}

		/* Generate polling sequence based on weights */
		int prev = -1;
		int cw = -1;
		for (i = 0; i < max_wrr_pos; i++) {
			rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
					     rx_poll, max_wt, gcd, prev);
			prev = rx_wrr[i];
		}
	}

	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = max_wrr_pos;

	return 0;
}
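
/* Note: the precomputed schedule length equals the sum of all queue
 * weights (max_wrr_pos), so very large servicing weights directly
 * increase the memory allocated for rx_adapter->wrr_sched.
 */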

static inline void
mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
	struct ipv6_hdr **ipv6_hdr)
{
	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
	struct vlan_hdr *vlan_hdr;

	*ipv4_hdr = NULL;
	*ipv6_hdr = NULL;

	switch (eth_hdr->ether_type) {
	case RTE_BE16(ETHER_TYPE_IPv4):
		*ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_IPv6):
		*ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_VLAN):
		vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
		switch (vlan_hdr->eth_proto) {
		case RTE_BE16(ETHER_TYPE_IPv4):
			*ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
			break;
		case RTE_BE16(ETHER_TYPE_IPv6):
			*ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
			break;
		default:
			break;
		}
		break;

	default:
		break;
	}
}
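
/* Only a single VLAN tag is parsed above; QinQ frames and other
 * ethertypes leave both header pointers NULL, in which case
 * do_softrss() below falls through and returns 0 as the hash.
 */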

/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
	uint32_t input_len;
	void *tuple;
	struct rte_ipv4_tuple ipv4_tuple;
	struct rte_ipv6_tuple ipv6_tuple;
	struct ipv4_hdr *ipv4_hdr;
	struct ipv6_hdr *ipv6_hdr;

	mtoip(m, &ipv4_hdr, &ipv6_hdr);

	if (ipv4_hdr) {
		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
		tuple = &ipv4_tuple;
		input_len = RTE_THASH_V4_L3_LEN;
	} else if (ipv6_hdr) {
		rte_thash_load_v6_addrs(ipv6_hdr,
					(union rte_thash_tuple *)&ipv6_tuple);
		tuple = &ipv6_tuple;
		input_len = RTE_THASH_V6_L3_LEN;
	} else
		return 0;

	return rte_softrss_be(tuple, input_len, rss_key_be);
}

static inline int
rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return !!rx_adapter->enq_block_count;
}

static inline void
rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->rx_enq_block_start_ts)
		return;

	rx_adapter->enq_block_count++;
	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
		return;

	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}

static inline void
rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
		    struct rte_event_eth_rx_adapter_stats *stats)
{
	if (unlikely(!stats->rx_enq_start_ts))
		stats->rx_enq_start_ts = rte_get_tsc_cycles();

	if (likely(!rx_enq_blocked(rx_adapter)))
		return;

	rx_adapter->enq_block_count = 0;
	if (rx_adapter->rx_enq_block_start_ts) {
		stats->rx_enq_end_ts = rte_get_tsc_cycles();
		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
		    rx_adapter->rx_enq_block_start_ts;
		rx_adapter->rx_enq_block_start_ts = 0;
	}
}
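
/* Together these helpers measure back pressure from the event device:
 * after BLOCK_CNT_THRESHOLD consecutive zero-enqueue invocations the TSC
 * is sampled, and when an enqueue finally succeeds the blocked interval
 * is accumulated into stats->rx_enq_block_cycles.
 */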

/* Add event to buffer, free space check is done prior to calling
 * this function
 */
static inline void
buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
		  struct rte_event *ev)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
}

/* Enqueue buffered events to event device */
static inline uint16_t
flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
					rx_adapter->event_port_id,
					buf->events,
					buf->count);
	if (n != buf->count) {
		memmove(buf->events,
			&buf->events[n],
			(buf->count - n) * sizeof(struct rte_event));
		stats->rx_enq_retry++;
	}

	n ? rx_enq_block_end_ts(rx_adapter, stats) :
		rx_enq_block_start_ts(rx_adapter);

	buf->count -= n;
	stats->rx_enq_count += n;

	return n;
}
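
/* Events not accepted by rte_event_enqueue_new_burst() are moved to the
 * front of the buffer and retried on the next flush; each partial
 * enqueue is counted in stats->rx_enq_retry.
 */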

static inline void
fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
	uint8_t dev_id,
	uint16_t rx_queue_id,
	struct rte_mbuf **mbufs,
	uint16_t num)
{
	uint32_t i;
	struct eth_device_info *eth_device_info =
					&rx_adapter->eth_devices[dev_id];
	struct eth_rx_queue_info *eth_rx_queue_info =
				&eth_device_info->rx_queue[rx_queue_id];

	int32_t qid = eth_rx_queue_info->event_queue_id;
	uint8_t sched_type = eth_rx_queue_info->sched_type;
	uint8_t priority = eth_rx_queue_info->priority;
	uint32_t flow_id;
	struct rte_event events[BATCH_SIZE];
	struct rte_mbuf *m = mbufs[0];
	uint32_t rss_mask;
	uint32_t rss;
	int do_rss;

	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;

	for (i = 0; i < num; i++) {
		m = mbufs[i];
		struct rte_event *ev = &events[i];

		rss = do_rss ?
			do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
		flow_id =
		    eth_rx_queue_info->flow_id &
				eth_rx_queue_info->flow_id_mask;
		flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;

		ev->flow_id = flow_id;
		ev->op = RTE_EVENT_OP_NEW;
		ev->sched_type = sched_type;
		ev->queue_id = qid;
		ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
		ev->sub_event_type = 0;
		ev->priority = priority;
		ev->mbuf = m;

		buf_event_enqueue(rx_adapter, ev);
	}
}
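
/* Flow id resolution above, worked through: if the application set
 * RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID, flow_id_mask is ~0 and
 * the app-provided flow id is used unchanged; otherwise the mask is 0
 * and the flow id is taken from the mbuf RSS hash, computed in software
 * via do_softrss() when the PMD did not set PKT_RX_RSS_HASH.
 */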

/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * it.
 */
static inline uint32_t
eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t num_queue;
	uint16_t n;
	uint32_t nb_rx = 0;
	struct rte_mbuf *mbufs[BATCH_SIZE];
	struct rte_eth_event_enqueue_buffer *buf;
	uint32_t wrr_pos;
	uint32_t max_nb_rx;

	wrr_pos = rx_adapter->wrr_pos;
	max_nb_rx = rx_adapter->max_nb_rx;
	buf = &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	/* Iterate through a WRR sequence */
	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
		uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

		/* Don't do a batch dequeue from the rx queue if there isn't
		 * enough space in the enqueue buffer.
		 */
		if (buf->count >= BATCH_SIZE)
			flush_event_buffer(rx_adapter);
		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count))
			break;

		stats->rx_poll_count++;
		n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);

		if (n) {
			stats->rx_packets += n;
			/* The check before rte_eth_rx_burst() ensures that
			 * all n mbufs can be buffered
			 */
			fill_event_buffer(rx_adapter, d, qid, mbufs, n);
			nb_rx += n;
			if (nb_rx > max_nb_rx) {
				rx_adapter->wrr_pos =
				    (wrr_pos + 1) % rx_adapter->wrr_len;
				return nb_rx;
			}
		}

		if (++wrr_pos == rx_adapter->wrr_len)
			wrr_pos = 0;
	}

	return nb_rx;
}

static int
event_eth_rx_adapter_service_func(void *args)
{
	struct rte_event_eth_rx_adapter *rx_adapter = args;
	struct rte_eth_event_enqueue_buffer *buf;

	buf = &rx_adapter->event_enqueue_buffer;
	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
		return 0;
	if (eth_rx_poll(rx_adapter) == 0 && buf->count)
		flush_event_buffer(rx_adapter);
	rte_spinlock_unlock(&rx_adapter->rx_lock);
	return 0;
}
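
/* The trylock above means a service invocation that races with a queue
 * add/del simply skips this cycle instead of blocking; the configuration
 * path holds rx_lock while it rebuilds the poll arrays.
 */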

static int
rte_event_eth_rx_adapter_init(void)
{
	const char *name = "rte_event_eth_rx_adapter_array";
	const struct rte_memzone *mz;
	unsigned int sz;

	sz = sizeof(*event_eth_rx_adapter) *
	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

	mz = rte_memzone_lookup(name);
	if (mz == NULL) {
		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
						 RTE_CACHE_LINE_SIZE);
		if (mz == NULL) {
			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
					PRId32, rte_errno);
			return -rte_errno;
		}
	}

	event_eth_rx_adapter = mz->addr;
	return 0;
}

static inline struct rte_event_eth_rx_adapter *
id_to_rx_adapter(uint8_t id)
{
	return event_eth_rx_adapter ?
		event_eth_rx_adapter[id] : NULL;
}

static int
default_conf_cb(uint8_t id, uint8_t dev_id,
		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
	int ret;
	struct rte_eventdev *dev;
	struct rte_event_dev_config dev_conf;
	int started;
	uint8_t port_id;
	struct rte_event_port_conf *port_conf = arg;
	struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	dev_conf = dev->data->dev_conf;

	started = dev->data->dev_started;
	if (started)
		rte_event_dev_stop(dev_id);
	port_id = dev_conf.nb_event_ports;
	dev_conf.nb_event_ports += 1;
	ret = rte_event_dev_configure(dev_id, &dev_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
						dev_id);
		if (started)
			if (rte_event_dev_start(dev_id))
				return -EIO;
		return ret;
	}

	ret = rte_event_port_setup(dev_id, port_id, port_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
					port_id);
		return ret;
	}

	conf->event_port_id = port_id;
	conf->max_nb_rx = 128;
	if (started)
		ret = rte_event_dev_start(dev_id);
	rx_adapter->default_cb_arg = 1;
	return ret;
}
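
/* Note: the default callback grows the event device port count by one,
 * which requires stopping and reconfiguring a running device. An
 * application-supplied conf_cb can instead hand back a pre-configured
 * event port and avoid the stop/start cycle.
 */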

static int
init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
	int ret;
	struct rte_service_spec service;
	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

	if (rx_adapter->service_inited)
		return 0;

	memset(&service, 0, sizeof(service));
	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
		"rte_event_eth_rx_adapter_%d", id);
	service.socket_id = rx_adapter->socket_id;
	service.callback = event_eth_rx_adapter_service_func;
	service.callback_userdata = rx_adapter;
	/* Service function handles locking for queue add/del updates */
	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
	ret = rte_service_component_register(&service,
					&rx_adapter->service_id);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to register service %s err = %"
			PRId32, service.name, ret);
		return ret;
	}

	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
		&rx_adapter_conf, rx_adapter->conf_arg);
	if (ret) {
		RTE_EDEV_LOG_ERR("configuration callback failed err = %"
			PRId32, ret);
		goto err_done;
	}
	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
	rx_adapter->service_inited = 1;
	return 0;

err_done:
	rte_service_component_unregister(rx_adapter->service_id);
	return ret;
}

static void
update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int32_t rx_queue_id,
		uint8_t add)
{
	struct eth_rx_queue_info *queue_info;
	int enabled;
	uint16_t i;

	if (dev_info->rx_queue == NULL)
		return;

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			update_queue_info(rx_adapter, dev_info, i, add);
	} else {
		queue_info = &dev_info->rx_queue[rx_queue_id];
		enabled = queue_info->queue_enabled;
		if (add) {
			rx_adapter->nb_queues += !enabled;
			dev_info->nb_dev_queues += !enabled;
		} else {
			rx_adapter->nb_queues -= enabled;
			dev_info->nb_dev_queues -= enabled;
		}
		queue_info->queue_enabled = !!add;
	}
}

static int
event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
			    struct eth_device_info *dev_info,
			    uint16_t rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	if (rx_adapter->nb_queues == 0)
		return 0;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
	update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
	return 0;
}

static void
event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		uint16_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
	struct eth_rx_queue_info *queue_info;
	const struct rte_event *ev = &conf->ev;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	queue_info->event_queue_id = ev->queue_id;
	queue_info->sched_type = ev->sched_type;
	queue_info->priority = ev->priority;
	queue_info->wt = conf->servicing_weight;

	if (conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
		queue_info->flow_id = ev->flow_id;
		queue_info->flow_id_mask = ~0;
	}

	/* The same queue can be added more than once */
	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
	update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
}

static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		uint8_t eth_dev_id,
		int rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info =
				&rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	uint32_t i;
	int ret;

	if (queue_conf->servicing_weight == 0) {

		struct rte_eth_dev_data *data = dev_info->dev->data;
		if (data->dev_conf.intr_conf.rxq) {
			RTE_EDEV_LOG_ERR("Interrupt driven queues"
					" not supported");
			return -ENOTSUP;
		}
		temp_conf = *queue_conf;

		/* If Rx interrupts are disabled set wt = 1 */
		temp_conf.servicing_weight = 1;
		queue_conf = &temp_conf;
	}

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
		    rte_zmalloc_socket(rx_adapter->mem_name,
				       dev_info->dev->data->nb_rx_queues *
				       sizeof(struct eth_rx_queue_info), 0,
				       rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			event_eth_rx_adapter_queue_add(rx_adapter,
						dev_info, i,
						queue_conf);
	} else {
		event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
					  (uint16_t)rx_queue_id,
					  queue_conf);
	}

	ret = eth_poll_wrr_calc(rx_adapter);
	if (ret) {
		event_eth_rx_adapter_queue_del(rx_adapter,
					dev_info, rx_queue_id);
		return ret;
	}

	return ret;
}

static int
rx_adapter_ctrl(uint8_t id, int start)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int use_service = 0;
	int stop = !start;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];

	for (i = 0; i < rte_eth_dev_count(); i++) {
		dev_info = &rx_adapter->eth_devices[i];
		/* if start check for num dev queues */
		if (start && !dev_info->nb_dev_queues)
			continue;
		/* if stop check if dev has been started */
		if (stop && !dev_info->dev_rx_started)
			continue;
		use_service |= !dev_info->internal_event_port;
		dev_info->dev_rx_started = start;
		if (dev_info->internal_event_port == 0)
			continue;
		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
						&rte_eth_devices[i]) :
			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
						&rte_eth_devices[i]);
	}

	if (use_service)
		rte_service_runstate_set(rx_adapter->service_id, start);

	return 0;
}

int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
				rte_event_eth_rx_adapter_conf_cb conf_cb,
				void *conf_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	int ret;
	int socket_id;
	uint8_t i;
	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
	const uint8_t default_rss_key[] = {
		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
	};

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	if (conf_cb == NULL)
		return -EINVAL;

	if (event_eth_rx_adapter == NULL) {
		ret = rte_event_eth_rx_adapter_init();
		if (ret)
			return ret;
	}

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter != NULL) {
		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
		return -EEXIST;
	}

	socket_id = rte_event_dev_socket_id(dev_id);
	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
		"rte_event_eth_rx_adapter_%d",
		id);

	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
			RTE_CACHE_LINE_SIZE, socket_id);
	if (rx_adapter == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
		return -ENOMEM;
	}

	rx_adapter->eventdev_id = dev_id;
	rx_adapter->socket_id = socket_id;
	rx_adapter->conf_cb = conf_cb;
	rx_adapter->conf_arg = conf_arg;
	strcpy(rx_adapter->mem_name, mem_name);
	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
					rte_eth_dev_count() *
					sizeof(struct eth_device_info), 0,
					socket_id);
	rte_convert_rss_key((const uint32_t *)default_rss_key,
			(uint32_t *)rx_adapter->rss_key_be,
			    RTE_DIM(default_rss_key));

	if (rx_adapter->eth_devices == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
		rte_free(rx_adapter);
		return -ENOMEM;
	}
	rte_spinlock_init(&rx_adapter->rx_lock);
	for (i = 0; i < rte_eth_dev_count(); i++)
		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

	event_eth_rx_adapter[id] = rx_adapter;
	if (conf_cb == default_conf_cb)
		rx_adapter->default_cb_arg = 1;
	return 0;
}

int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
		struct rte_event_port_conf *port_config)
{
	struct rte_event_port_conf *pc;
	int ret;

	if (port_config == NULL)
		return -EINVAL;
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	pc = rte_malloc(NULL, sizeof(*pc), 0);
	if (pc == NULL)
		return -ENOMEM;
	*pc = *port_config;
	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
					default_conf_cb,
					pc);
	if (ret)
		rte_free(pc);
	return ret;
}
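
/* Usage sketch (illustrative, not part of this file): creating adapter 0
 * with the default configuration callback. The dev_id variable is
 * assumed to be a configured event device in the caller.
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	int err = rte_event_eth_rx_adapter_create(0, dev_id, &pconf);
 */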

int
rte_event_eth_rx_adapter_free(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	if (rx_adapter->nb_queues) {
		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
				rx_adapter->nb_queues);
		return -EBUSY;
	}

	if (rx_adapter->default_cb_arg)
		rte_free(rx_adapter->conf_arg);
	rte_free(rx_adapter->eth_devices);
	rte_free(rx_adapter);
	event_eth_rx_adapter[id] = NULL;

	return 0;
}

int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
		uint8_t eth_dev_id,
		int32_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	int ret;
	uint32_t cap;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	int start_service;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if ((rx_adapter == NULL) || (queue_conf == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			"eth port %" PRIu8, id, eth_dev_id);
		return ret;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
		&& (queue_conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
				" eth port: %" PRIu8 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
		(rx_queue_id != -1)) {
		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
			"event queue id %u eth port %u", id, eth_dev_id);
		return -EINVAL;
	}

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	start_service = 0;
	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
					-ENOTSUP);
		if (dev_info->rx_queue == NULL) {
			dev_info->rx_queue =
			    rte_zmalloc_socket(rx_adapter->mem_name,
					dev_info->dev->data->nb_rx_queues *
					sizeof(struct eth_rx_queue_info), 0,
					rx_adapter->socket_id);
			if (dev_info->rx_queue == NULL)
				return -ENOMEM;
		}

		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
				&rte_eth_devices[eth_dev_id],
				rx_queue_id, queue_conf);
		if (ret == 0) {
			update_queue_info(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					1);
		}
	} else {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		ret = init_service(rx_adapter, id);
		if (ret == 0)
			ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
					queue_conf);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		if (ret == 0)
			start_service =
				!!sw_rx_adapter_queue_count(rx_adapter);
	}

	if (ret)
		return ret;

	if (start_service)
		rte_service_component_runstate_set(rx_adapter->service_id, 1);

	return 0;
}
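
/* Usage sketch (illustrative, not part of this file): connecting all Rx
 * queues of eth port 0 to adapter 0, letting the adapter derive the flow
 * id from the RSS hash (flags left at 0). The ev_queue_id variable is an
 * assumed application-side event queue id.
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.rx_queue_flags = 0,
 *		.servicing_weight = 1,
 *		.ev = {
 *			.queue_id = ev_queue_id,
 *			.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		},
 *	};
 *	int err = rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qconf);
 */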

int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint16_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
				 -ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			update_queue_info(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		int rc;
		rte_spinlock_lock(&rx_adapter->rx_lock);
		if (rx_queue_id == -1) {
			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
				event_eth_rx_adapter_queue_del(rx_adapter,
							dev_info,
							i);
		} else {
			event_eth_rx_adapter_queue_del(rx_adapter,
						dev_info,
						(uint16_t)rx_queue_id);
		}

		rc = eth_poll_wrr_calc(rx_adapter);
		if (rc)
			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
					rc);

		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}

		rte_spinlock_unlock(&rx_adapter->rx_lock);
		rte_service_component_runstate_set(rx_adapter->service_id,
				sw_rx_adapter_queue_count(rx_adapter));
	}

	return ret;
}

int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	return rx_adapter_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	return rx_adapter_ctrl(id, 0);
}

int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
			       struct rte_event_eth_rx_adapter_stats *stats)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
	struct rte_event_eth_rx_adapter_stats dev_stats;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL || stats == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	memset(stats, 0, sizeof(*stats));
	for (i = 0; i < rte_eth_dev_count(); i++) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
			continue;
		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
						&rte_eth_devices[i],
						&dev_stats);
		if (ret)
			continue;
		dev_stats_sum.rx_packets += dev_stats.rx_packets;
		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
	}

	if (rx_adapter->service_inited)
		*stats = rx_adapter->stats;

	stats->rx_packets += dev_stats_sum.rx_packets;
	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
	return 0;
}

int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	for (i = 0; i < rte_eth_dev_count(); i++) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
			continue;
		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
							&rte_eth_devices[i]);
	}

	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
	return 0;
}

int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL || service_id == NULL)
		return -EINVAL;

	if (rx_adapter->service_inited)
		*service_id = rx_adapter->service_id;

	return rx_adapter->service_inited ? 0 : -ESRCH;
}
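
/* Usage sketch (illustrative, not part of this file): an application
 * that created adapter 0 can run its service function on a dedicated
 * service lcore. SERVICE_LCORE is an assumed valid lcore id.
 *
 *	uint32_t service_id;
 *	if (rte_event_eth_rx_adapter_service_id_get(0, &service_id) == 0) {
 *		rte_service_lcore_add(SERVICE_LCORE);
 *		rte_service_map_lcore_set(service_id, SERVICE_LCORE, 1);
 *		rte_service_lcore_start(SERVICE_LCORE);
 *	}
 */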