X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=examples%2Feventdev_pipeline%2Fpipeline_worker_tx.c;h=a82e064c1c1837ec1562cd5c7be16c0b19d17db3;hb=e53ed84acbbb853396bfd959815da7e141756ad2;hp=a0f40c27c6cee27723e00d1db9e1ea383b4a7578;hpb=b21302a1072118c2c2950639133720b3bff86984;p=dpdk.git

diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c
index a0f40c27c6..a82e064c1c 100644
--- a/examples/eventdev_pipeline/pipeline_worker_tx.c
+++ b/examples/eventdev_pipeline/pipeline_worker_tx.c
@@ -18,21 +18,22 @@ static __rte_always_inline void
 worker_event_enqueue(const uint8_t dev, const uint8_t port,
 		struct rte_event *ev)
 {
-	while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+	while (!rte_event_enqueue_burst(dev, port, ev, 1) && !fdata->done)
 		rte_pause();
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint16_t
 worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
-		struct rte_event *ev, const uint16_t nb_rx)
+			   struct rte_event *ev, const uint16_t nb_rx)
 {
 	uint16_t enq;
 
 	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-	while (enq < nb_rx) {
+	while (enq < nb_rx && !fdata->done)
 		enq += rte_event_enqueue_burst(dev, port,
 						ev + enq, nb_rx - enq);
-	}
+
+	return enq;
 }
 
 static __rte_always_inline void
@@ -40,7 +41,8 @@ worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
 {
 	exchange_mac(ev->mbuf);
 	rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
-	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
+	while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0) &&
+			!fdata->done)
 		rte_pause();
 }
 
@@ -76,6 +78,11 @@ worker_do_tx_single(void *arg)
 		}
 	}
 
+	if (ev.u64) {
+		ev.op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev, port, &ev, 1);
+	}
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -111,6 +118,11 @@ worker_do_tx_single_atq(void *arg)
 		}
 	}
 
+	if (ev.u64) {
+		ev.op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev, port, &ev, 1);
+	}
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -126,11 +138,10 @@ worker_do_tx_single_burst(void *arg)
 	const uint8_t dev = data->dev_id;
 	const uint8_t port = data->port_id;
 	size_t fwd = 0, received = 0, tx = 0;
+	uint16_t nb_tx = 0, nb_rx = 0, i;
 
 	while (!fdata->done) {
-		uint16_t i;
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BATCH_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -153,10 +164,12 @@ worker_do_tx_single_burst(void *arg)
 			work();
 		}
 
-		worker_event_enqueue_burst(dev, port, ev, nb_rx);
-		fwd += nb_rx;
+		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+		fwd += nb_tx;
 	}
 
+	worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -172,11 +185,10 @@ worker_do_tx_single_burst_atq(void *arg)
 	const uint8_t dev = data->dev_id;
 	const uint8_t port = data->port_id;
 	size_t fwd = 0, received = 0, tx = 0;
+	uint16_t i, nb_rx = 0, nb_tx = 0;
 
 	while (!fdata->done) {
-		uint16_t i;
-		uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-				BATCH_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
 		if (!nb_rx) {
 			rte_pause();
@@ -197,10 +209,12 @@ worker_do_tx_single_burst_atq(void *arg)
 			work();
 		}
 
-		worker_event_enqueue_burst(dev, port, ev, nb_rx);
-		fwd += nb_rx;
+		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+		fwd += nb_tx;
 	}
 
+	worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -251,6 +265,11 @@ worker_do_tx(void *arg)
 		fwd++;
 	}
 
+	if (ev.u64) {
+		ev.op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev, port, &ev, 1);
+	}
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -297,6 +316,11 @@ worker_do_tx_atq(void *arg)
 		fwd++;
 	}
 
+	if (ev.u64) {
+		ev.op = RTE_EVENT_OP_RELEASE;
+		rte_event_enqueue_burst(dev, port, &ev, 1);
+	}
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -314,11 +338,10 @@ worker_do_tx_burst(void *arg)
 	uint8_t port = data->port_id;
 	uint8_t lst_qid = cdata.num_stages - 1;
 	size_t fwd = 0, received = 0, tx = 0;
+	uint16_t i, nb_rx = 0, nb_tx = 0;
 
 	while (!fdata->done) {
-		uint16_t i;
-		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-				ev, BATCH_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
 		if (nb_rx == 0) {
 			rte_pause();
@@ -347,11 +370,13 @@ worker_do_tx_burst(void *arg)
 			}
 			work();
 		}
-		worker_event_enqueue_burst(dev, port, ev, nb_rx);
-		fwd += nb_rx;
+		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+		fwd += nb_tx;
 	}
 
+	worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -369,12 +394,10 @@ worker_do_tx_burst_atq(void *arg)
 	uint8_t port = data->port_id;
 	uint8_t lst_qid = cdata.num_stages - 1;
 	size_t fwd = 0, received = 0, tx = 0;
+	uint16_t i, nb_rx = 0, nb_tx = 0;
 
 	while (!fdata->done) {
-		uint16_t i;
-
-		const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-				ev, BATCH_SIZE, 0);
+		nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
 		if (nb_rx == 0) {
 			rte_pause();
@@ -402,10 +425,12 @@ worker_do_tx_burst_atq(void *arg)
 			work();
 		}
 
-		worker_event_enqueue_burst(dev, port, ev, nb_rx);
-		fwd += nb_rx;
+		nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+		fwd += nb_tx;
 	}
 
+	worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
 	if (!cdata.quiet)
 		printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
 				rte_lcore_id(), received, fwd, tx);
@@ -436,6 +461,7 @@ setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
 	struct rte_event_dev_config config = {
 			.nb_event_queues = nb_queues,
 			.nb_event_ports = nb_ports,
+			.nb_single_link_event_port_queues = 0,
 			.nb_events_limit = 4096,
 			.nb_event_queue_flows = 1024,
 			.nb_event_port_dequeue_depth = 128,
@@ -445,6 +471,7 @@ setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
 			.dequeue_depth = cdata.worker_cq_depth,
 			.enqueue_depth = 64,
 			.new_event_threshold = 4096,
+			.event_port_cfg = RTE_EVENT_PORT_CFG_HINT_WORKER,
 	};
 	struct rte_event_queue_conf wkr_q_conf = {
 			.schedule_type = cdata.queue_type,
@@ -603,6 +630,129 @@ service_rx_adapter(void *arg)
 	return 0;
 }
 
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+	struct rte_eth_rxconf rx_conf;
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = {
+			.mq_mode = RTE_ETH_MQ_RX_RSS,
+		},
+		.rx_adv_conf = {
+			.rss_conf = {
+				.rss_hf = RTE_ETH_RSS_IP |
+					  RTE_ETH_RSS_TCP |
+					  RTE_ETH_RSS_UDP,
+			}
+		}
+	};
+	const uint16_t rx_rings = 1, tx_rings = 1;
+	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+	struct rte_eth_conf port_conf = port_conf_default;
+	int retval;
+	uint16_t q;
+	struct rte_eth_dev_info dev_info;
+	struct rte_eth_txconf txconf;
+
+	if (!rte_eth_dev_is_valid_port(port))
+		return -1;
+
+	retval = rte_eth_dev_info_get(port, &dev_info);
+	if (retval != 0) {
+		printf("Error during getting device (port %u) info: %s\n",
+				port, strerror(-retval));
+		return retval;
+	}
+
+	if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
+		port_conf.txmode.offloads |=
+			RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
+	rx_conf = dev_info.default_rxconf;
+	rx_conf.offloads = port_conf.rxmode.offloads;
+
+	port_conf.rx_adv_conf.rss_conf.rss_hf &=
+		dev_info.flow_type_rss_offloads;
+	if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
+			port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
+		printf("Port %u modified RSS hash function based on hardware support,"
+				"requested:%#"PRIx64" configured:%#"PRIx64"\n",
+				port,
+				port_conf_default.rx_adv_conf.rss_conf.rss_hf,
+				port_conf.rx_adv_conf.rss_conf.rss_hf);
+	}
+
+	/* Configure the Ethernet device. */
+	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	/* Allocate and set up 1 RX queue per Ethernet port. */
+	for (q = 0; q < rx_rings; q++) {
+		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+				rte_eth_dev_socket_id(port), &rx_conf,
+				mbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	txconf = dev_info.default_txconf;
+	txconf.offloads = port_conf_default.txmode.offloads;
+	/* Allocate and set up 1 TX queue per Ethernet port. */
+	for (q = 0; q < tx_rings; q++) {
+		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+				rte_eth_dev_socket_id(port), &txconf);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Display the port MAC address. */
+	struct rte_ether_addr addr;
+	retval = rte_eth_macaddr_get(port, &addr);
+	if (retval != 0) {
+		printf("Failed to get MAC address (port %u): %s\n",
+				port, rte_strerror(-retval));
+		return retval;
+	}
+
+	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+			" %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+			(unsigned int)port, RTE_ETHER_ADDR_BYTES(&addr));
+
+	/* Enable RX in promiscuous mode for the Ethernet device. */
+	retval = rte_eth_promiscuous_enable(port);
+	if (retval != 0)
+		return retval;
+
+	return 0;
+}
+
+static int
+init_ports(uint16_t num_ports)
+{
+	uint16_t portid;
+
+	if (!cdata.num_mbuf)
+		cdata.num_mbuf = 16384 * num_ports;
+
+	struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
+			/* mbufs */ cdata.num_mbuf,
+			/* cache_size */ 512,
+			/* priv_size*/ 0,
+			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+			rte_socket_id());
+
+	RTE_ETH_FOREACH_DEV(portid)
+		if (port_init(portid, mp) != 0)
+			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
+					portid);
+
+	return 0;
+}
+
 static void
 init_adapters(uint16_t nb_ports)
 {
@@ -619,8 +769,10 @@ init_adapters(uint16_t nb_ports)
 			.dequeue_depth = cdata.worker_cq_depth,
 			.enqueue_depth = 64,
 			.new_event_threshold = 4096,
+			.event_port_cfg = RTE_EVENT_PORT_CFG_HINT_PRODUCER,
 	};
 
+	init_ports(nb_ports);
 	if (adptr_p_conf.new_event_threshold > dev_info.max_num_events)
 		adptr_p_conf.new_event_threshold = dev_info.max_num_events;
 	if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
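
---

Note on worker_cleanup(): the calls added to the burst workers above reference
a helper that is not defined in this file (the commit adds it to
pipeline_common.h, which is outside this diff). Below is a minimal,
illustrative sketch of what such a helper has to do, assuming the signature
used at the call sites (nb_enq = events successfully re-enqueued on the last
iteration, nb_deq = events dequeued on the last iteration); the body actually
merged upstream may differ in detail:

	#include <rte_eventdev.h>
	#include <rte_mbuf.h>

	/*
	 * Flush events that were dequeued but never re-enqueued before the
	 * worker observed fdata->done. RTE_EVENT_OP_RELEASE only returns the
	 * scheduling context to the event device; the mbufs must be freed
	 * separately or they leak from the mempool.
	 */
	static inline void
	worker_cleanup(uint8_t dev_id, uint8_t port_id, struct rte_event ev[],
			uint16_t nb_enq, uint16_t nb_deq)
	{
		uint16_t i;

		if (nb_deq == nb_enq)
			return;

		for (i = nb_enq; i < nb_deq; i++) {
			if (ev[i].op != RTE_EVENT_OP_RELEASE)
				rte_pktmbuf_free(ev[i].mbuf);
			ev[i].op = RTE_EVENT_OP_RELEASE;
		}

		rte_event_enqueue_burst(dev_id, port_id, ev + nb_enq,
				nb_deq - nb_enq);
	}

This is also why nb_rx and nb_tx are hoisted out of the while loop in the
burst workers: once fdata->done stops the loop (and lets
worker_event_enqueue_burst() return early with a partial enqueue count), the
counts from the final iteration remain visible, and nb_deq - nb_enq is exactly
the tail of ev[] that still holds scheduler state. The single-event workers
achieve the same effect inline with the added "if (ev.u64)" release blocks.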