+
+
+static int
+lcore_distributor(struct lcore_params *p)
+{
+ struct rte_ring *in_r = p->rx_dist_ring;
+ struct rte_ring *out_r = p->dist_tx_ring;
+ struct rte_mbuf *bufs[BURST_SIZE * 4];
+ struct rte_distributor *d = p->d;
+
+ printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
+ while (!quit_signal_dist) {
+ const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
+ (void *)bufs, BURST_SIZE*1, NULL);
+ if (nb_rx) {
+ app_stats.dist.in_pkts += nb_rx;
+
+ /* Distribute the packets */
+ rte_distributor_process(d, bufs, nb_rx);
+ /* Handle Returns */
+ const uint16_t nb_ret =
+ rte_distributor_returned_pkts(d,
+ bufs, BURST_SIZE*2);
+
+ if (unlikely(nb_ret == 0))
+ continue;
+ app_stats.dist.ret_pkts += nb_ret;
+
+ uint16_t sent = rte_ring_enqueue_burst(out_r,
+ (void *)bufs, nb_ret, NULL);
+ app_stats.dist.sent_pkts += sent;
+ if (unlikely(sent < nb_ret)) {
+ app_stats.dist.enqdrop_pkts += nb_ret - sent;
+ RTE_LOG(DEBUG, DISTRAPP,
+ "%s:Packet loss due to full out ring\n",
+ __func__);
+ while (sent < nb_ret)
+ rte_pktmbuf_free(bufs[sent++]);
+ }
+ }
+ }
+ printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
+ quit_signal_work = 1;
+
+ rte_distributor_flush(d);
+ /* Unblock any returns so workers can exit */
+ rte_distributor_clear_returns(d);
+ quit_signal_rx = 1;
+ return 0;
+}
+
+