+void
+mlx5_mprq_buf_free_cb(void *addr __rte_unused, void *opaque)
+{
+ struct mlx5_mprq_buf *buf = opaque;
+
+ if (rte_atomic16_read(&buf->refcnt) == 1) {
+ rte_mempool_put(buf->mp, buf);
+ } else if (rte_atomic16_add_return(&buf->refcnt, -1) == 0) {
+ rte_atomic16_set(&buf->refcnt, 1);
+ rte_mempool_put(buf->mp, buf);
+ }
+}
+
+void
+mlx5_mprq_buf_free(struct mlx5_mprq_buf *buf)
+{
+ mlx5_mprq_buf_free_cb(NULL, buf);
+}
+
+static inline void
+mprq_buf_replace(struct mlx5_rxq_data *rxq, uint16_t rq_idx)
+{
+ struct mlx5_mprq_buf *rep = rxq->mprq_repl;
+ volatile struct mlx5_wqe_data_seg *wqe =
+ &((volatile struct mlx5_wqe_mprq *)rxq->wqes)[rq_idx].dseg;
+ void *addr;
+
+ assert(rep != NULL);
+ /* Replace MPRQ buf. */
+ (*rxq->mprq_bufs)[rq_idx] = rep;
+ /* Replace WQE. */
+ addr = mlx5_mprq_buf_addr(rep);
+ wqe->addr = rte_cpu_to_be_64((uintptr_t)addr);
+ /* If there's only one MR, no need to replace LKey in WQE. */
+ if (unlikely(mlx5_mr_btree_len(&rxq->mr_ctrl.cache_bh) > 1))
+ wqe->lkey = mlx5_rx_addr2mr(rxq, (uintptr_t)addr);
+ /* Stash a mbuf for next replacement. */
+ if (likely(!rte_mempool_get(rxq->mprq_mp, (void **)&rep)))
+ rxq->mprq_repl = rep;
+ else
+ rxq->mprq_repl = NULL;
+}
+
+/**
+ * DPDK callback for RX with Multi-Packet RQ support.
+ *
+ * @param dpdk_rxq
+ * Generic pointer to RX queue structure.
+ * @param[out] pkts
+ * Array to store received packets.
+ * @param pkts_n
+ * Maximum number of packets in array.
+ *
+ * @return
+ * Number of packets successfully received (<= pkts_n).
+ */
+uint16_t
+mlx5_rx_burst_mprq(void *dpdk_rxq, struct rte_mbuf **pkts, uint16_t pkts_n)
+{
+ struct mlx5_rxq_data *rxq = dpdk_rxq;
+ const unsigned int strd_n = 1 << rxq->strd_num_n;
+ const unsigned int strd_sz = 1 << rxq->strd_sz_n;
+ const unsigned int strd_shift =
+ MLX5_MPRQ_STRIDE_SHIFT_BYTE * rxq->strd_shift_en;
+ const unsigned int cq_mask = (1 << rxq->cqe_n) - 1;
+ const unsigned int wq_mask = (1 << rxq->elts_n) - 1;
+ volatile struct mlx5_cqe *cqe = &(*rxq->cqes)[rxq->cq_ci & cq_mask];
+ unsigned int i = 0;
+ uint16_t rq_ci = rxq->rq_ci;
+ uint16_t strd_idx = rxq->strd_ci;
+ struct mlx5_mprq_buf *buf = (*rxq->mprq_bufs)[rq_ci & wq_mask];
+
+ while (i < pkts_n) {
+ struct rte_mbuf *pkt;
+ void *addr;
+ int ret;
+ unsigned int len;
+ uint16_t consumed_strd;
+ uint32_t offset;
+ uint32_t byte_cnt;
+ uint32_t rss_hash_res = 0;
+
+ if (strd_idx == strd_n) {
+ /* Replace WQE only if the buffer is still in use. */
+ if (rte_atomic16_read(&buf->refcnt) > 1) {
+ mprq_buf_replace(rxq, rq_ci & wq_mask);
+ /* Release the old buffer. */
+ mlx5_mprq_buf_free(buf);
+ } else if (unlikely(rxq->mprq_repl == NULL)) {
+ struct mlx5_mprq_buf *rep;
+
+ /*
+ * Currently, the MPRQ mempool is out of buffer
+ * and doing memcpy regardless of the size of Rx
+ * packet. Retry allocation to get back to
+ * normal.
+ */
+ if (!rte_mempool_get(rxq->mprq_mp,
+ (void **)&rep))
+ rxq->mprq_repl = rep;
+ }
+ /* Advance to the next WQE. */
+ strd_idx = 0;
+ ++rq_ci;
+ buf = (*rxq->mprq_bufs)[rq_ci & wq_mask];
+ }
+ cqe = &(*rxq->cqes)[rxq->cq_ci & cq_mask];
+ ret = mlx5_rx_poll_len(rxq, cqe, cq_mask, &rss_hash_res);
+ if (!ret)
+ break;
+ if (unlikely(ret == -1)) {
+ /* RX error, packet is likely too large. */
+ ++rxq->stats.idropped;
+ continue;
+ }
+ byte_cnt = ret;
+ consumed_strd = (byte_cnt & MLX5_MPRQ_STRIDE_NUM_MASK) >>
+ MLX5_MPRQ_STRIDE_NUM_SHIFT;
+ assert(consumed_strd);
+ /* Calculate offset before adding up stride index. */
+ offset = strd_idx * strd_sz + strd_shift;
+ strd_idx += consumed_strd;
+ if (byte_cnt & MLX5_MPRQ_FILLER_MASK)
+ continue;
+ /*
+ * Currently configured to receive a packet per a stride. But if
+ * MTU is adjusted through kernel interface, device could
+ * consume multiple strides without raising an error. In this
+ * case, the packet should be dropped because it is bigger than
+ * the max_rx_pkt_len.
+ */
+ if (unlikely(consumed_strd > 1)) {
+ ++rxq->stats.idropped;
+ continue;
+ }
+ pkt = rte_pktmbuf_alloc(rxq->mp);
+ if (unlikely(pkt == NULL)) {
+ ++rxq->stats.rx_nombuf;
+ break;
+ }
+ len = (byte_cnt & MLX5_MPRQ_LEN_MASK) >> MLX5_MPRQ_LEN_SHIFT;
+ assert((int)len >= (rxq->crc_present << 2));
+ if (rxq->crc_present)
+ len -= ETHER_CRC_LEN;
+ addr = RTE_PTR_ADD(mlx5_mprq_buf_addr(buf), offset);
+ /* Initialize the offload flag. */
+ pkt->ol_flags = 0;
+ /*
+ * Memcpy packets to the target mbuf if:
+ * - The size of packet is smaller than mprq_max_memcpy_len.
+ * - Out of buffer in the Mempool for Multi-Packet RQ.
+ */
+ if (len <= rxq->mprq_max_memcpy_len || rxq->mprq_repl == NULL) {
+ /*
+ * When memcpy'ing packet due to out-of-buffer, the
+ * packet must be smaller than the target mbuf.
+ */
+ if (unlikely(rte_pktmbuf_tailroom(pkt) < len)) {
+ rte_pktmbuf_free_seg(pkt);
+ ++rxq->stats.idropped;
+ continue;
+ }
+ rte_memcpy(rte_pktmbuf_mtod(pkt, void *), addr, len);
+ } else {
+ rte_iova_t buf_iova;
+ struct rte_mbuf_ext_shared_info *shinfo;
+ uint16_t buf_len = consumed_strd * strd_sz;
+
+ /* Increment the refcnt of the whole chunk. */
+ rte_atomic16_add_return(&buf->refcnt, 1);
+ assert((uint16_t)rte_atomic16_read(&buf->refcnt) <=
+ strd_n + 1);
+ addr = RTE_PTR_SUB(addr, RTE_PKTMBUF_HEADROOM);
+ /*
+ * MLX5 device doesn't use iova but it is necessary in a
+ * case where the Rx packet is transmitted via a
+ * different PMD.
+ */
+ buf_iova = rte_mempool_virt2iova(buf) +
+ RTE_PTR_DIFF(addr, buf);
+ shinfo = rte_pktmbuf_ext_shinfo_init_helper(addr,
+ &buf_len, mlx5_mprq_buf_free_cb, buf);
+ /*
+ * EXT_ATTACHED_MBUF will be set to pkt->ol_flags when
+ * attaching the stride to mbuf and more offload flags
+ * will be added below by calling rxq_cq_to_mbuf().
+ * Other fields will be overwritten.
+ */
+ rte_pktmbuf_attach_extbuf(pkt, addr, buf_iova, buf_len,
+ shinfo);
+ rte_pktmbuf_reset_headroom(pkt);
+ assert(pkt->ol_flags == EXT_ATTACHED_MBUF);
+ /*
+ * Prevent potential overflow due to MTU change through
+ * kernel interface.
+ */
+ if (unlikely(rte_pktmbuf_tailroom(pkt) < len)) {
+ rte_pktmbuf_free_seg(pkt);
+ ++rxq->stats.idropped;
+ continue;
+ }
+ }
+ rxq_cq_to_mbuf(rxq, pkt, cqe, rss_hash_res);
+ PKT_LEN(pkt) = len;
+ DATA_LEN(pkt) = len;
+ PORT(pkt) = rxq->port_id;
+#ifdef MLX5_PMD_SOFT_COUNTERS
+ /* Increment bytes counter. */
+ rxq->stats.ibytes += PKT_LEN(pkt);
+#endif
+ /* Return packet. */
+ *(pkts++) = pkt;
+ ++i;
+ }
+ /* Update the consumer indexes. */
+ rxq->strd_ci = strd_idx;
+ rte_io_wmb();
+ *rxq->cq_db = rte_cpu_to_be_32(rxq->cq_ci);
+ if (rq_ci != rxq->rq_ci) {
+ rxq->rq_ci = rq_ci;
+ rte_io_wmb();
+ *rxq->rq_db = rte_cpu_to_be_32(rxq->rq_ci);
+ }
+#ifdef MLX5_PMD_SOFT_COUNTERS
+ /* Increment packets counter. */
+ rxq->stats.ipackets += i;
+#endif
+ return i;
+}
+