+#define RECV_FILE_LEN 30
+static int
+start_polling_recv_file(void *param)
+{
+ struct rte_rawdev_buf *pkts_recv[NTB_MAX_PKT_BURST];
+ struct ntb_fwd_lcore_conf *conf = param;
+ struct rte_mbuf *mbuf;
+ char filepath[RECV_FILE_LEN];
+ uint64_t val, size, file_len;
+ uint16_t nb_rx, i, file_no;
+ size_t queue_id = 0;
+ FILE *file;
+ int ret;
+
+ for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+ pkts_recv[i] = (struct rte_rawdev_buf *)
+ malloc(sizeof(struct rte_rawdev_buf));
+
+ file_no = 0;
+ while (!conf->stopped) {
+ snprintf(filepath, RECV_FILE_LEN, "ntb_recv_file%d", file_no);
+ file = fopen(filepath, "w");
+ if (file == NULL) {
+ printf("Fail to open the file.\n");
+ return -EINVAL;
+ }
+
+ rte_rawdev_get_attr(dev_id, "spad_user_0", &val);
+ size = val << 32;
+ rte_rawdev_get_attr(dev_id, "spad_user_1", &val);
+ size |= val;
+
+ if (!size) {
+ fclose(file);
+ continue;
+ }
+
+ file_len = 0;
+ nb_rx = NTB_MAX_PKT_BURST;
+ while (file_len < size && !conf->stopped) {
+ ret = rte_rawdev_dequeue_buffers(dev_id, pkts_recv,
+ pkt_burst, (void *)queue_id);
+ if (ret < 0) {
+ printf("Dequeue failed with err %d\n", ret);
+ fclose(file);
+ goto clean;
+ }
+ nb_rx = ret;
+ ntb_port_stats[0].rx += nb_rx;
+ for (i = 0; i < nb_rx; i++) {
+ mbuf = pkts_recv[i]->buf_addr;
+ fwrite(rte_pktmbuf_mtod(mbuf, void *), 1,
+ mbuf->data_len, file);
+ file_len += mbuf->data_len;
+ rte_pktmbuf_free(mbuf);
+ pkts_recv[i]->buf_addr = NULL;
+ }
+ }
+
+ printf("Received file (size: %" PRIu64 ") from peer to %s.\n",
+ size, filepath);
+ fclose(file);
+ file_no++;
+ }
+
+clean:
+ for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+ free(pkts_recv[i]);
+ return 0;
+}
+
+static int
+start_iofwd_per_lcore(void *param)
+{
+ struct rte_rawdev_buf *ntb_buf[NTB_MAX_PKT_BURST];
+ struct rte_mbuf *pkts_burst[NTB_MAX_PKT_BURST];
+ struct ntb_fwd_lcore_conf *conf = param;
+ struct ntb_fwd_stream fs;
+ uint16_t nb_rx, nb_tx;
+ int i, j, ret;
+
+ for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+ ntb_buf[i] = (struct rte_rawdev_buf *)
+ malloc(sizeof(struct rte_rawdev_buf));
+
+ while (!conf->stopped) {
+ for (i = 0; i < conf->nb_stream; i++) {
+ fs = fwd_streams[conf->stream_id + i];
+ if (fs.tx_ntb) {
+ nb_rx = rte_eth_rx_burst(fs.rx_port,
+ fs.qp_id, pkts_burst,
+ pkt_burst);
+ if (unlikely(nb_rx == 0))
+ continue;
+ for (j = 0; j < nb_rx; j++)
+ ntb_buf[j]->buf_addr = pkts_burst[j];
+ ret = rte_rawdev_enqueue_buffers(fs.tx_port,
+ ntb_buf, nb_rx,
+ (void *)(size_t)fs.qp_id);
+ if (ret < 0) {
+ printf("Enqueue failed with err %d\n",
+ ret);
+ for (j = 0; j < nb_rx; j++)
+ rte_pktmbuf_free(pkts_burst[j]);
+ goto clean;
+ }
+ nb_tx = ret;
+ ntb_port_stats[0].tx += nb_tx;
+ ntb_port_stats[1].rx += nb_rx;
+ } else {
+ ret = rte_rawdev_dequeue_buffers(fs.rx_port,
+ ntb_buf, pkt_burst,
+ (void *)(size_t)fs.qp_id);
+ if (ret < 0) {
+ printf("Dequeue failed with err %d\n",
+ ret);
+ goto clean;
+ }
+ nb_rx = ret;
+ if (unlikely(nb_rx == 0))
+ continue;
+ for (j = 0; j < nb_rx; j++)
+ pkts_burst[j] = ntb_buf[j]->buf_addr;
+ nb_tx = rte_eth_tx_burst(fs.tx_port,
+ fs.qp_id, pkts_burst, nb_rx);
+ ntb_port_stats[1].tx += nb_tx;
+ ntb_port_stats[0].rx += nb_rx;
+ }
+ if (unlikely(nb_tx < nb_rx)) {
+ do {
+ rte_pktmbuf_free(pkts_burst[nb_tx]);
+ } while (++nb_tx < nb_rx);
+ }
+ }
+ }
+
+clean:
+ for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+ free(ntb_buf[i]);
+
+ return 0;
+}
+
+static int
+start_rxonly_per_lcore(void *param)
+{
+ struct rte_rawdev_buf *ntb_buf[NTB_MAX_PKT_BURST];
+ struct ntb_fwd_lcore_conf *conf = param;
+ struct ntb_fwd_stream fs;
+ uint16_t nb_rx;
+ int i, j, ret;
+
+ for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+ ntb_buf[i] = (struct rte_rawdev_buf *)
+ malloc(sizeof(struct rte_rawdev_buf));
+
+ while (!conf->stopped) {
+ for (i = 0; i < conf->nb_stream; i++) {
+ fs = fwd_streams[conf->stream_id + i];
+ ret = rte_rawdev_dequeue_buffers(fs.rx_port,
+ ntb_buf, pkt_burst, (void *)(size_t)fs.qp_id);
+ if (ret < 0) {
+ printf("Dequeue failed with err %d\n", ret);
+ goto clean;
+ }
+ nb_rx = ret;
+ if (unlikely(nb_rx == 0))
+ continue;
+ ntb_port_stats[0].rx += nb_rx;
+
+ for (j = 0; j < nb_rx; j++)
+ rte_pktmbuf_free(ntb_buf[j]->buf_addr);
+ }
+ }
+
+clean:
+ for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+ free(ntb_buf[i]);
+
+ return 0;
+}
+
+
+static int
+start_txonly_per_lcore(void *param)
+{
+ struct rte_rawdev_buf *ntb_buf[NTB_MAX_PKT_BURST];
+ struct rte_mbuf *pkts_burst[NTB_MAX_PKT_BURST];
+ struct ntb_fwd_lcore_conf *conf = param;
+ struct ntb_fwd_stream fs;
+ uint16_t nb_pkt, nb_tx;
+ int i, j, ret;
+
+ for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+ ntb_buf[i] = (struct rte_rawdev_buf *)
+ malloc(sizeof(struct rte_rawdev_buf));
+
+ while (!conf->stopped) {
+ for (i = 0; i < conf->nb_stream; i++) {
+ fs = fwd_streams[conf->stream_id + i];
+ if (rte_mempool_get_bulk(mbuf_pool, (void **)pkts_burst,
+ pkt_burst) == 0) {
+ for (nb_pkt = 0; nb_pkt < pkt_burst; nb_pkt++) {
+ pkts_burst[nb_pkt]->port = dev_id;
+ pkts_burst[nb_pkt]->data_len =
+ pkts_burst[nb_pkt]->buf_len -
+ RTE_PKTMBUF_HEADROOM;
+ pkts_burst[nb_pkt]->pkt_len =
+ pkts_burst[nb_pkt]->data_len;
+ ntb_buf[nb_pkt]->buf_addr =
+ pkts_burst[nb_pkt];
+ }
+ } else {
+ for (nb_pkt = 0; nb_pkt < pkt_burst; nb_pkt++) {
+ pkts_burst[nb_pkt] =
+ rte_pktmbuf_alloc(mbuf_pool);
+ if (pkts_burst[nb_pkt] == NULL)
+ break;
+ pkts_burst[nb_pkt]->port = dev_id;
+ pkts_burst[nb_pkt]->data_len =
+ pkts_burst[nb_pkt]->buf_len -
+ RTE_PKTMBUF_HEADROOM;
+ pkts_burst[nb_pkt]->pkt_len =
+ pkts_burst[nb_pkt]->data_len;
+ ntb_buf[nb_pkt]->buf_addr =
+ pkts_burst[nb_pkt];
+ }
+ }
+ ret = rte_rawdev_enqueue_buffers(fs.tx_port, ntb_buf,
+ nb_pkt, (void *)(size_t)fs.qp_id);
+ if (ret < 0) {
+ printf("Enqueue failed with err %d\n", ret);
+ for (j = 0; j < nb_pkt; j++)
+ rte_pktmbuf_free(pkts_burst[j]);
+ goto clean;
+ }
+ nb_tx = ret;
+ ntb_port_stats[0].tx += nb_tx;
+ if (unlikely(nb_tx < nb_pkt)) {
+ do {
+ rte_pktmbuf_free(pkts_burst[nb_tx]);
+ } while (++nb_tx < nb_pkt);
+ }
+ }
+ }
+
+clean:
+ for (i = 0; i < NTB_MAX_PKT_BURST; i++)
+ free(ntb_buf[i]);
+
+ return 0;
+}
+
+static int
+ntb_fwd_config_setup(void)
+{
+ uint16_t i;
+
+ /* Make sure iofwd has valid ethdev. */
+ if (fwd_mode == IOFWD && eth_port_id >= RTE_MAX_ETHPORTS) {
+ printf("No ethdev, cannot be in iofwd mode.");
+ return -EINVAL;
+ }
+
+ if (fwd_mode == IOFWD) {
+ fwd_streams = rte_zmalloc("ntb_fwd: fwd_streams",
+ sizeof(struct ntb_fwd_stream) * num_queues * 2,
+ RTE_CACHE_LINE_SIZE);
+ for (i = 0; i < num_queues; i++) {
+ fwd_streams[i * 2].qp_id = i;
+ fwd_streams[i * 2].tx_port = dev_id;
+ fwd_streams[i * 2].rx_port = eth_port_id;
+ fwd_streams[i * 2].tx_ntb = 1;
+
+ fwd_streams[i * 2 + 1].qp_id = i;
+ fwd_streams[i * 2 + 1].tx_port = eth_port_id;
+ fwd_streams[i * 2 + 1].rx_port = dev_id;
+ fwd_streams[i * 2 + 1].tx_ntb = 0;
+ }
+ return 0;
+ }
+
+ if (fwd_mode == RXONLY || fwd_mode == FILE_TRANS) {
+ /* Only support 1 queue in file-trans for in order. */
+ if (fwd_mode == FILE_TRANS)
+ num_queues = 1;
+
+ fwd_streams = rte_zmalloc("ntb_fwd: fwd_streams",
+ sizeof(struct ntb_fwd_stream) * num_queues,
+ RTE_CACHE_LINE_SIZE);
+ for (i = 0; i < num_queues; i++) {
+ fwd_streams[i].qp_id = i;
+ fwd_streams[i].tx_port = RTE_MAX_ETHPORTS;
+ fwd_streams[i].rx_port = dev_id;
+ fwd_streams[i].tx_ntb = 0;
+ }
+ return 0;
+ }
+
+ if (fwd_mode == TXONLY) {
+ fwd_streams = rte_zmalloc("ntb_fwd: fwd_streams",
+ sizeof(struct ntb_fwd_stream) * num_queues,
+ RTE_CACHE_LINE_SIZE);
+ for (i = 0; i < num_queues; i++) {
+ fwd_streams[i].qp_id = i;
+ fwd_streams[i].tx_port = dev_id;
+ fwd_streams[i].rx_port = RTE_MAX_ETHPORTS;
+ fwd_streams[i].tx_ntb = 1;
+ }
+ }
+ return 0;
+}