net/dpaa: further push mode optimizations
author      Nipun Gupta <nipun.gupta@nxp.com>
            Tue, 23 Jan 2018 12:27:07 +0000 (17:57 +0530)
committer   Thomas Monjalon <thomas@monjalon.net>
            Wed, 31 Jan 2018 12:44:56 +0000 (13:44 +0100)
This patch adds batch processing of multiple packets on the Rx side:
DQRR entries are gathered and prepared one by one, then handed to the
driver in a single pull-mode callback and acknowledged to QMan with a
single consume write (a simplified sketch follows the file list below).

Signed-off-by: Nipun Gupta <nipun.gupta@nxp.com>
Signed-off-by: Hemant Agrawal <hemant.agrawal@nxp.com>
drivers/bus/dpaa/base/qbman/qman.c
drivers/bus/dpaa/include/fsl_qman.h
drivers/net/dpaa/dpaa_ethdev.c
drivers/net/dpaa/dpaa_rxtx.c
drivers/net/dpaa/dpaa_rxtx.h
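
For orientation, a minimal sketch of the two-stage Rx flow introduced
here; all names and types in it (entry, prepare, pull_cb, BATCH_MAX) are
simplified stand-ins for illustration, not the real QMan/DPAA
structures: each available DQRR entry is first "prepared" (its buffer
pointer derived), and only once the whole burst has been gathered is a
single pull callback invoked with the arrays of entries and buffers.

    /* Simplified stand-in types; not the real QMan/DPAA structures. */
    #include <stdio.h>

    #define BATCH_MAX 16

    struct entry { int port; void *frame; };        /* ~ qm_dqrr_entry */

    /* Per-entry prepare step (the role dpaa_rx_cb_prepare plays). */
    static void prepare(struct entry *e, void **buf)
    {
            *buf = e->frame;                /* derive the buffer pointer */
    }

    /* One callback for the whole burst (the role dpaa_rx_cb plays). */
    static void pull_cb(struct entry **dq, void **bufs, int n)
    {
            int i;

            for (i = 0; i < n; i++)
                    printf("port %d -> buf %p\n", dq[i]->port, bufs[i]);
    }

    int main(void)
    {
            struct entry ring[3] = {
                    {1, (void *)0x1000}, {1, (void *)0x2000}, {2, (void *)0x3000}
            };
            struct entry *dq[BATCH_MAX];
            void *bufs[BATCH_MAX];
            int i, n = 0;

            /* Gather and prepare every available entry before calling back. */
            for (i = 0; i < 3 && n < BATCH_MAX; i++) {
                    dq[n] = &ring[i];
                    prepare(dq[n], &bufs[n]);
                    n++;
            }

            if (n)
                    pull_cb(dq, bufs, n);   /* single callback per burst */
            return 0;
    }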

index 4d8bdae..2b97671 100644 (file)
@@ -1055,64 +1055,63 @@ unsigned int qman_portal_poll_rx(unsigned int poll_limit,
                                 void **bufs,
                                 struct qman_portal *p)
 {
-       const struct qm_dqrr_entry *dq;
-       struct qman_fq *fq;
-       enum qman_cb_dqrr_result res;
-       unsigned int limit = 0;
-#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
-       struct qm_dqrr_entry *shadow;
-#endif
-       unsigned int rx_number = 0;
+       struct qm_portal *portal = &p->p;
+       register struct qm_dqrr *dqrr = &portal->dqrr;
+       struct qm_dqrr_entry *dq[QM_DQRR_SIZE], *shadow[QM_DQRR_SIZE];
+       struct qman_fq *fq[QM_DQRR_SIZE];
+       unsigned int limit = 0, rx_number = 0;
+       uint32_t consume = 0;
 
        do {
                qm_dqrr_pvb_update(&p->p);
-               dq = qm_dqrr_current(&p->p);
-               if (unlikely(!dq))
+               if (!dqrr->fill)
                        break;
+
+               dq[rx_number] = dqrr->cursor;
+               dqrr->cursor = DQRR_CARRYCLEAR(dqrr->cursor + 1);
+               /* Prefetch the next DQRR entry */
+               rte_prefetch0(dqrr->cursor);
+
 #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
-       /* If running on an LE system the fields of the
-        * dequeue entry must be swapper.  Because the
-        * QMan HW will ignore writes the DQRR entry is
-        * copied and the index stored within the copy
-        */
-               shadow = &p->shadow_dqrr[DQRR_PTR2IDX(dq)];
-               *shadow = *dq;
-               dq = shadow;
-               shadow->fqid = be32_to_cpu(shadow->fqid);
-               shadow->contextB = be32_to_cpu(shadow->contextB);
-               shadow->seqnum = be16_to_cpu(shadow->seqnum);
-               hw_fd_to_cpu(&shadow->fd);
+               /* If running on an LE system, the fields of the
+                * dequeue entry must be swapped.  Because the
+                * QMan HW will ignore writes, the DQRR entry is
+                * copied and the index stored within the copy.
+                */
+               shadow[rx_number] =
+                       &p->shadow_dqrr[DQRR_PTR2IDX(dq[rx_number])];
+               shadow[rx_number]->fd.opaque_addr =
+                       dq[rx_number]->fd.opaque_addr;
+               shadow[rx_number]->fd.addr =
+                       be40_to_cpu(dq[rx_number]->fd.addr);
+               shadow[rx_number]->fd.opaque =
+                       be32_to_cpu(dq[rx_number]->fd.opaque);
+#else
+               shadow[rx_number] = dq[rx_number];
 #endif
 
                /* SDQCR: context_b points to the FQ */
 #ifdef CONFIG_FSL_QMAN_FQ_LOOKUP
-               fq = get_fq_table_entry(dq->contextB);
+               fq[rx_number] = qman_fq_lookup_table[be32_to_cpu(
+                                               dq[rx_number]->contextB)];
 #else
-               fq = (void *)(uintptr_t)dq->contextB;
+               fq[rx_number] = (void *)(uintptr_t)be32_to_cpu(
+                                               dq[rx_number]->contextB);
 #endif
-               /* Now let the callback do its stuff */
-               res = fq->cb.dqrr_dpdk_cb(NULL, p, fq, dq, &bufs[rx_number]);
+               fq[rx_number]->cb.dqrr_prepare(shadow[rx_number],
+                                                &bufs[rx_number]);
+
+               consume |= (1 << (31 - DQRR_PTR2IDX(shadow[rx_number])));
                rx_number++;
-               /* Interpret 'dq' from a driver perspective. */
-               /*
-                * Parking isn't possible unless HELDACTIVE was set. NB,
-                * FORCEELIGIBLE implies HELDACTIVE, so we only need to
-                * check for HELDACTIVE to cover both.
-                */
-               DPAA_ASSERT((dq->stat & QM_DQRR_STAT_FQ_HELDACTIVE) ||
-                           (res != qman_cb_dqrr_park));
-               qm_dqrr_cdc_consume_1ptr(&p->p, dq, res == qman_cb_dqrr_park);
-               /* Move forward */
-               qm_dqrr_next(&p->p);
-               /*
-                * Entry processed and consumed, increment our counter.  The
-                * callback can request that we exit after consuming the
-                * entry, and we also exit if we reach our processing limit,
-                * so loop back only if neither of these conditions is met.
-                */
-       } while (likely(++limit < poll_limit));
+               --dqrr->fill;
+       } while (++limit < poll_limit);
 
-       return limit;
+       if (rx_number)
+               fq[0]->cb.dqrr_dpdk_pull_cb(fq, shadow, bufs, rx_number);
+
+       /* Consume all the DQRR entries together */
+       qm_out(DQRR_DCAP, (1 << 8) | consume);
+
+       return rx_number;
 }
 
 u32 qman_portal_dequeue(struct rte_event ev[], unsigned int poll_limit,
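
A short aside on the batched acknowledgement in the hunk above: instead
of one qm_dqrr_cdc_consume_1ptr() call per entry, the loop ORs one bit
per DQRR ring index into consume (index i maps to bit 31 - i) and issues
a single DQRR_DCAP write for the whole burst. The stand-alone
illustration below only reproduces that bit layout; the (1 << 8) flag is
copied verbatim from the patch and assumed here to select the bitmask
form of consumption.

    #include <stdio.h>
    #include <stdint.h>

    /* Same bit layout as the patch: DQRR ring index i maps to bit (31 - i). */
    static uint32_t dca_bit(unsigned int dqrr_idx)
    {
            return (uint32_t)1 << (31 - dqrr_idx);
    }

    int main(void)
    {
            uint32_t consume = 0;
            unsigned int idx;

            /* Suppose one burst consumed DQRR ring indices 5, 6 and 7. */
            for (idx = 5; idx <= 7; idx++)
                    consume |= dca_bit(idx);

            /* One register value written in place of three per-entry acks;
             * the (1 << 8) flag is taken as-is from the patch.
             */
            printf("DQRR_DCAP <= 0x%08x\n",
                   (unsigned int)((1u << 8) | consume));     /* 0x07000100 */
            return 0;
    }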
index 99e46e1..e9793f3 100644 (file)
@@ -1131,6 +1131,14 @@ typedef enum qman_cb_dqrr_result (*qman_dpdk_cb_dqrr)(void *event,
                                        const struct qm_dqrr_entry *dqrr,
                                        void **bd);
 
+/* This callback type is used when handling buffers in dpdk pull mode */
+typedef void (*qman_dpdk_pull_cb_dqrr)(struct qman_fq **fq,
+                                       struct qm_dqrr_entry **dqrr,
+                                       void **bufs,
+                                       int num_bufs);
+
+typedef void (*qman_dpdk_cb_prepare)(struct qm_dqrr_entry *dq, void **bufs);
+
 /*
  * This callback type is used when handling ERNs, FQRNs and FQRLs via MR. They
  * are always consumed after the callback returns.
@@ -1191,8 +1199,10 @@ enum qman_fq_state {
 struct qman_fq_cb {
        union { /* for dequeued frames */
                qman_dpdk_cb_dqrr dqrr_dpdk_cb;
+               qman_dpdk_pull_cb_dqrr dqrr_dpdk_pull_cb;
                qman_cb_dqrr dqrr;
        };
+       qman_dpdk_cb_prepare dqrr_prepare;
        qman_cb_mr ern;         /* for s/w ERNs */
        qman_cb_mr fqs;         /* frame-queue state changes*/
 };
index a0978c1..9b69ef4 100644 (file)
@@ -503,7 +503,11 @@ int dpaa_eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx,
                                   QM_FQCTRL_CTXASTASHING |
                                   QM_FQCTRL_PREFERINCACHE;
                opts.fqd.context_a.stashing.exclusive = 0;
-               opts.fqd.context_a.stashing.annotation_cl =
+               /* In multicore scenarios stashing becomes a bottleneck on
+                * LS1046, so do not enable annotation stashing in this case.
+                */
+               if (dpaa_svr_family != SVR_LS1046A_FAMILY)
+                       opts.fqd.context_a.stashing.annotation_cl =
                                                DPAA_IF_RX_ANNOTATION_STASH;
                opts.fqd.context_a.stashing.data_cl = DPAA_IF_RX_DATA_STASH;
                opts.fqd.context_a.stashing.context_cl =
@@ -526,7 +530,8 @@ int dpaa_eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_idx,
                if (ret)
                        DPAA_PMD_ERR("Channel/Queue association failed. fqid %d"
                                     " ret: %d", rxq->fqid, ret);
-               rxq->cb.dqrr_dpdk_cb = dpaa_rx_cb;
+               rxq->cb.dqrr_dpdk_pull_cb = dpaa_rx_cb;
+               rxq->cb.dqrr_prepare = dpaa_rx_cb_prepare;
                rxq->is_static = true;
        }
        dev->data->rx_queues[queue_idx] = rxq;
index f969ccf..0dea8e7 100644 (file)
@@ -399,17 +399,80 @@ dpaa_eth_fd_to_mbuf(const struct qm_fd *fd, uint32_t ifid)
        return mbuf;
 }
 
-enum qman_cb_dqrr_result dpaa_rx_cb(void *event __always_unused,
-                                   struct qman_portal *qm __always_unused,
-                                   struct qman_fq *fq,
-                                   const struct qm_dqrr_entry *dqrr,
-                                   void **bufs)
+void
+dpaa_rx_cb(struct qman_fq **fq, struct qm_dqrr_entry **dqrr,
+          void **bufs, int num_bufs)
 {
-       const struct qm_fd *fd = &dqrr->fd;
+       struct rte_mbuf *mbuf;
+       struct dpaa_bp_info *bp_info;
+       const struct qm_fd *fd;
+       void *ptr;
+       struct dpaa_if *dpaa_intf;
+       uint16_t offset, i;
+       uint32_t length;
+       uint8_t format;
+
+       if (dpaa_svr_family != SVR_LS1046A_FAMILY) {
+               bp_info = DPAA_BPID_TO_POOL_INFO(dqrr[0]->fd.bpid);
+               ptr = rte_dpaa_mem_ptov(qm_fd_addr(&dqrr[0]->fd));
+               rte_prefetch0((void *)((uint8_t *)ptr + DEFAULT_RX_ICEOF));
+               bufs[0] = (struct rte_mbuf *)((char *)ptr -
+                               bp_info->meta_data_size);
+       }
 
-       *bufs = dpaa_eth_fd_to_mbuf(fd,
-                       ((struct dpaa_if *)fq->dpaa_intf)->ifid);
-       return qman_cb_dqrr_consume;
+       for (i = 0; i < num_bufs; i++) {
+               if (dpaa_svr_family != SVR_LS1046A_FAMILY &&
+                   i < num_bufs - 1) {
+                       bp_info = DPAA_BPID_TO_POOL_INFO(dqrr[i + 1]->fd.bpid);
+                       ptr = rte_dpaa_mem_ptov(qm_fd_addr(&dqrr[i + 1]->fd));
+                       rte_prefetch0((void *)((uint8_t *)ptr +
+                                       DEFAULT_RX_ICEOF));
+                       bufs[i + 1] = (struct rte_mbuf *)((char *)ptr -
+                                       bp_info->meta_data_size);
+               }
+
+               fd = &dqrr[i]->fd;
+               dpaa_intf = fq[i]->dpaa_intf;
+
+               format = (fd->opaque & DPAA_FD_FORMAT_MASK) >>
+                               DPAA_FD_FORMAT_SHIFT;
+               if (unlikely(format == qm_fd_sg)) {
+                       bufs[i] = dpaa_eth_sg_to_mbuf(fd, dpaa_intf->ifid);
+                       continue;
+               }
+
+               offset = (fd->opaque & DPAA_FD_OFFSET_MASK) >>
+                               DPAA_FD_OFFSET_SHIFT;
+               length = fd->opaque & DPAA_FD_LENGTH_MASK;
+
+               mbuf = bufs[i];
+               mbuf->data_off = offset;
+               mbuf->data_len = length;
+               mbuf->pkt_len = length;
+               mbuf->port = dpaa_intf->ifid;
+
+               mbuf->nb_segs = 1;
+               mbuf->ol_flags = 0;
+               mbuf->next = NULL;
+               rte_mbuf_refcnt_set(mbuf, 1);
+               dpaa_eth_packet_info(mbuf, (uint64_t)mbuf->buf_addr);
+       }
+}
+
+void dpaa_rx_cb_prepare(struct qm_dqrr_entry *dq, void **bufs)
+{
+       struct dpaa_bp_info *bp_info = DPAA_BPID_TO_POOL_INFO(dq->fd.bpid);
+       void *ptr = rte_dpaa_mem_ptov(qm_fd_addr(&dq->fd));
+
+       /* On LS1046, annotation stashing is disabled because the L2 cache
+        * becomes the bottleneck in multicore scenarios on this platform.
+        * So we prefetch the annotation beforehand, so that it is available
+        * in the cache when accessed.
+        */
+       if (dpaa_svr_family == SVR_LS1046A_FAMILY)
+               rte_prefetch0((void *)((uint8_t *)ptr + DEFAULT_RX_ICEOF));
+
+       *bufs = (struct rte_mbuf *)((char *)ptr - bp_info->meta_data_size);
 }
 
 static uint16_t
index 29d8f95..d3e6351 100644 (file)
@@ -268,9 +268,8 @@ int dpaa_eth_mbuf_to_sg_fd(struct rte_mbuf *mbuf,
                           struct qm_fd *fd,
                           uint32_t bpid);
 
-enum qman_cb_dqrr_result dpaa_rx_cb(void *event,
-                                   struct qman_portal *qm,
-                                   struct qman_fq *fq,
-                                   const struct qm_dqrr_entry *dqrr,
-                                   void **bd);
+void dpaa_rx_cb(struct qman_fq **fq,
+               struct qm_dqrr_entry **dqrr, void **bufs, int num_bufs);
+
+void dpaa_rx_cb_prepare(struct qm_dqrr_entry *dq, void **bufs);
 #endif