net/mlx5: add reference counter on DPDK Tx queues
authorNélio Laranjeiro <nelio.laranjeiro@6wind.com>
Mon, 9 Oct 2017 14:44:48 +0000 (16:44 +0200)
committerFerruh Yigit <ferruh.yigit@intel.com>
Thu, 12 Oct 2017 00:36:58 +0000 (01:36 +0100)
Use the same design for DPDK queue as for Verbs queue for symmetry, this
also helps in fixing some issues like the DPDK release queue API which
is not expected to fail.  With such design, the queue is released when
the reference counters reaches 0.

Signed-off-by: Nelio Laranjeiro <nelio.laranjeiro@6wind.com>
Acked-by: Yongseok Koh <yskoh@mellanox.com>
drivers/net/mlx5/mlx5.c
drivers/net/mlx5/mlx5.h
drivers/net/mlx5/mlx5_mr.c
drivers/net/mlx5/mlx5_rxtx.h
drivers/net/mlx5/mlx5_trigger.c
drivers/net/mlx5/mlx5_txq.c

index bd160c5..276401d 100644 (file)
@@ -225,17 +225,8 @@ mlx5_dev_close(struct rte_eth_dev *dev)
        if (priv->txqs != NULL) {
                /* XXX race condition if mlx5_tx_burst() is still running. */
                usleep(1000);
-               for (i = 0; (i != priv->txqs_n); ++i) {
-                       struct mlx5_txq_data *txq = (*priv->txqs)[i];
-                       struct mlx5_txq_ctrl *txq_ctrl;
-
-                       if (txq == NULL)
-                               continue;
-                       txq_ctrl = container_of(txq, struct mlx5_txq_ctrl, txq);
-                       (*priv->txqs)[i] = NULL;
-                       mlx5_txq_cleanup(txq_ctrl);
-                       rte_free(txq_ctrl);
-               }
+               for (i = 0; (i != priv->txqs_n); ++i)
+                       mlx5_priv_txq_release(priv, i);
                priv->txqs_n = 0;
                priv->txqs = NULL;
        }
@@ -259,6 +250,9 @@ mlx5_dev_close(struct rte_eth_dev *dev)
        ret = mlx5_priv_txq_ibv_verify(priv);
        if (ret)
                WARN("%p: some Verbs Tx queue still remain", (void *)priv);
+       ret = mlx5_priv_txq_verify(priv);
+       if (ret)
+               WARN("%p: some Tx Queues still remain", (void *)priv);
        ret = priv_flow_verify(priv);
        if (ret)
                WARN("%p: some flows still remain", (void *)priv);
index 67d2edb..b20c39c 100644 (file)
@@ -148,6 +148,7 @@ struct priv {
        TAILQ_HEAD(mlx5_flows, rte_flow) flows; /* RTE Flow rules. */
        LIST_HEAD(mr, mlx5_mr) mr; /* Memory region. */
        LIST_HEAD(rxqibv, mlx5_rxq_ibv) rxqsibv; /* Verbs Rx queues. */
+       LIST_HEAD(txq, mlx5_txq_ctrl) txqsctrl; /* DPDK Tx queues. */
        LIST_HEAD(txqibv, mlx5_txq_ibv) txqsibv; /* Verbs Tx queues. */
        uint32_t link_speed_capa; /* Link speed capabilities. */
        struct mlx5_xstats_ctrl xstats_ctrl; /* Extended stats control. */
index 54fdc16..6b29eed 100644 (file)
@@ -117,6 +117,8 @@ static int mlx5_check_mempool(struct rte_mempool *mp, uintptr_t *start,
  *
  * This function should only be called by txq_mp2mr().
  *
+ * @param priv
+ *   Pointer to private structure.
  * @param txq
  *   Pointer to TX queue structure.
  * @param[in] mp
@@ -128,8 +130,8 @@ static int mlx5_check_mempool(struct rte_mempool *mp, uintptr_t *start,
  *   mr on success, NULL on failure.
  */
 struct mlx5_mr*
-mlx5_txq_mp2mr_reg(struct mlx5_txq_data *txq, struct rte_mempool *mp,
-                  unsigned int idx)
+priv_txq_mp2mr_reg(struct priv *priv, struct mlx5_txq_data *txq,
+                  struct rte_mempool *mp, unsigned int idx)
 {
        struct mlx5_txq_ctrl *txq_ctrl =
                container_of(txq, struct mlx5_txq_ctrl, txq);
@@ -138,9 +140,9 @@ mlx5_txq_mp2mr_reg(struct mlx5_txq_data *txq, struct rte_mempool *mp,
        /* Add a new entry, register MR first. */
        DEBUG("%p: discovered new memory pool \"%s\" (%p)",
              (void *)txq_ctrl, mp->name, (void *)mp);
-       mr = priv_mr_get(txq_ctrl->priv, mp);
+       mr = priv_mr_get(priv, mp);
        if (mr == NULL)
-               mr = priv_mr_new(txq_ctrl->priv, mp);
+               mr = priv_mr_new(priv, mp);
        if (unlikely(mr == NULL)) {
                DEBUG("%p: unable to configure MR, ibv_reg_mr() failed.",
                      (void *)txq_ctrl);
@@ -151,7 +153,7 @@ mlx5_txq_mp2mr_reg(struct mlx5_txq_data *txq, struct rte_mempool *mp,
                DEBUG("%p: MR <-> MP table full, dropping oldest entry.",
                      (void *)txq_ctrl);
                --idx;
-               priv_mr_release(txq_ctrl->priv, txq->mp2mr[0]);
+               priv_mr_release(priv, txq->mp2mr[0]);
                memmove(&txq->mp2mr[0], &txq->mp2mr[1],
                        (sizeof(txq->mp2mr) - sizeof(txq->mp2mr[0])));
        }
@@ -163,7 +165,37 @@ mlx5_txq_mp2mr_reg(struct mlx5_txq_data *txq, struct rte_mempool *mp,
        return mr;
 }
 
-struct txq_mp2mr_mbuf_check_data {
+/**
+ * Register a Memory Region (MR) <-> Memory Pool (MP) association in
+ * txq->mp2mr[]. If mp2mr[] is full, remove an entry first.
+ *
+ * This function should only be called by txq_mp2mr().
+ *
+ * @param txq
+ *   Pointer to TX queue structure.
+ * @param[in] mp
+ *   Memory Pool for which a Memory Region lkey must be returned.
+ * @param idx
+ *   Index of the next available entry.
+ *
+ * @return
+ *   mr on success, NULL on failure.
+ */
+struct mlx5_mr*
+mlx5_txq_mp2mr_reg(struct mlx5_txq_data *txq, struct rte_mempool *mp,
+                  unsigned int idx)
+{
+       struct mlx5_txq_ctrl *txq_ctrl =
+               container_of(txq, struct mlx5_txq_ctrl, txq);
+       struct mlx5_mr *mr;
+
+       priv_lock(txq_ctrl->priv);
+       mr = priv_txq_mp2mr_reg(txq_ctrl->priv, txq, mp, idx);
+       priv_unlock(txq_ctrl->priv);
+       return mr;
+}
+
+struct mlx5_mp2mr_mbuf_check_data {
        int ret;
 };
 
@@ -185,7 +217,7 @@ static void
 txq_mp2mr_mbuf_check(struct rte_mempool *mp, void *arg, void *obj,
        uint32_t index __rte_unused)
 {
-       struct txq_mp2mr_mbuf_check_data *data = arg;
+       struct mlx5_mp2mr_mbuf_check_data *data = arg;
        struct rte_mbuf *buf = obj;
 
        /*
@@ -206,35 +238,24 @@ txq_mp2mr_mbuf_check(struct rte_mempool *mp, void *arg, void *obj,
  *   Pointer to TX queue structure.
  */
 void
-mlx5_txq_mp2mr_iter(struct rte_mempool *mp, void *arg)
+mlx5_mp2mr_iter(struct rte_mempool *mp, void *arg)
 {
-       struct mlx5_txq_ctrl *txq_ctrl = arg;
-       struct txq_mp2mr_mbuf_check_data data = {
+       struct priv *priv = (struct priv *)arg;
+       struct mlx5_mp2mr_mbuf_check_data data = {
                .ret = 0,
        };
-       uintptr_t start;
-       uintptr_t end;
-       unsigned int i;
+       struct mlx5_mr *mr;
 
        /* Register mempool only if the first element looks like a mbuf. */
        if (rte_mempool_obj_iter(mp, txq_mp2mr_mbuf_check, &data) == 0 ||
                        data.ret == -1)
                return;
-       if (mlx5_check_mempool(mp, &start, &end) != 0) {
-               ERROR("mempool %p: not virtually contiguous",
-                     (void *)mp);
+       mr = priv_mr_get(priv, mp);
+       if (mr) {
+               priv_mr_release(priv, mr);
                return;
        }
-       for (i = 0; (i != RTE_DIM(txq_ctrl->txq.mp2mr)); ++i) {
-               if (unlikely(txq_ctrl->txq.mp2mr[i] == NULL)) {
-                       /* Unknown MP, add a new MR for it. */
-                       break;
-               }
-               if (start >= (uintptr_t)txq_ctrl->txq.mp2mr[i]->start &&
-                   end <= (uintptr_t)txq_ctrl->txq.mp2mr[i]->end)
-                       return;
-       }
-       mlx5_txq_mp2mr_reg(&txq_ctrl->txq, mp, i);
+       priv_mr_new(priv, mp);
 }
 
 /**
index 30ad363..69344f6 100644 (file)
@@ -297,6 +297,8 @@ struct mlx5_txq_ibv {
 
 /* TX queue control descriptor. */
 struct mlx5_txq_ctrl {
+       LIST_ENTRY(mlx5_txq_ctrl) next; /* Pointer to the next element. */
+       rte_atomic32_t refcnt; /* Reference counter. */
        struct priv *priv; /* Back pointer to private data. */
        unsigned int socket; /* CPU socket ID for allocations. */
        unsigned int max_inline_data; /* Max inline data. */
@@ -336,9 +338,6 @@ int mlx5_priv_rxq_ibv_verify(struct priv *);
 
 /* mlx5_txq.c */
 
-void mlx5_txq_cleanup(struct mlx5_txq_ctrl *);
-int mlx5_txq_ctrl_setup(struct rte_eth_dev *, struct mlx5_txq_ctrl *, uint16_t,
-                       unsigned int, const struct rte_eth_txconf *);
 int mlx5_tx_queue_setup(struct rte_eth_dev *, uint16_t, uint16_t, unsigned int,
                        const struct rte_eth_txconf *);
 void mlx5_tx_queue_release(void *);
@@ -348,6 +347,14 @@ struct mlx5_txq_ibv *mlx5_priv_txq_ibv_get(struct priv *, uint16_t);
 int mlx5_priv_txq_ibv_release(struct priv *, struct mlx5_txq_ibv *);
 int mlx5_priv_txq_ibv_releasable(struct priv *, struct mlx5_txq_ibv *);
 int mlx5_priv_txq_ibv_verify(struct priv *);
+struct mlx5_txq_ctrl *mlx5_priv_txq_new(struct priv *, uint16_t,
+                                       uint16_t, unsigned int,
+                                       const struct rte_eth_txconf *);
+struct mlx5_txq_ctrl *mlx5_priv_txq_get(struct priv *, uint16_t);
+int mlx5_priv_txq_release(struct priv *, uint16_t);
+int mlx5_priv_txq_releasable(struct priv *, uint16_t);
+int mlx5_priv_txq_verify(struct priv *);
+void txq_alloc_elts(struct mlx5_txq_ctrl *);
 
 /* mlx5_rxtx.c */
 
@@ -375,7 +382,9 @@ uint16_t mlx5_rx_burst_vec(void *, struct rte_mbuf **, uint16_t);
 
 /* mlx5_mr.c */
 
-void mlx5_txq_mp2mr_iter(struct rte_mempool *, void *);
+void mlx5_mp2mr_iter(struct rte_mempool *, void *);
+struct mlx5_mr *priv_txq_mp2mr_reg(struct priv *priv, struct mlx5_txq_data *,
+                                  struct rte_mempool *, unsigned int);
 struct mlx5_mr *mlx5_txq_mp2mr_reg(struct mlx5_txq_data *, struct rte_mempool *,
                                   unsigned int);
 
index eeb9585..7a12768 100644 (file)
 #include "mlx5_rxtx.h"
 #include "mlx5_utils.h"
 
+static void
+priv_txq_stop(struct priv *priv)
+{
+       unsigned int i;
+
+       for (i = 0; i != priv->txqs_n; ++i)
+               mlx5_priv_txq_release(priv, i);
+}
+
+static int
+priv_txq_start(struct priv *priv)
+{
+       unsigned int i;
+       int ret = 0;
+
+       /* Add memory regions to Tx queues. */
+       for (i = 0; i != priv->txqs_n; ++i) {
+               unsigned int idx = 0;
+               struct mlx5_mr *mr;
+               struct mlx5_txq_ctrl *txq_ctrl = mlx5_priv_txq_get(priv, i);
+
+               if (!txq_ctrl)
+                       continue;
+               LIST_FOREACH(mr, &priv->mr, next)
+                       priv_txq_mp2mr_reg(priv, &txq_ctrl->txq, mr->mp, idx++);
+               txq_alloc_elts(txq_ctrl);
+               txq_ctrl->ibv = mlx5_priv_txq_ibv_new(priv, i);
+               if (!txq_ctrl->ibv) {
+                       ret = ENOMEM;
+                       goto error;
+               }
+       }
+       return -ret;
+error:
+       priv_txq_stop(priv);
+       return -ret;
+}
+
 /**
  * DPDK callback to start the device.
  *
@@ -56,6 +94,7 @@ int
 mlx5_dev_start(struct rte_eth_dev *dev)
 {
        struct priv *priv = dev->data->dev_private;
+       struct mlx5_mr *mr = NULL;
        int err;
 
        if (mlx5_is_secondary())
@@ -63,9 +102,17 @@ mlx5_dev_start(struct rte_eth_dev *dev)
 
        priv_lock(priv);
        /* Update Rx/Tx callback. */
-       priv_dev_select_tx_function(priv, dev);
        priv_dev_select_rx_function(priv, dev);
        DEBUG("%p: allocating and configuring hash RX queues", (void *)dev);
+       rte_mempool_walk(mlx5_mp2mr_iter, priv);
+       err = priv_txq_start(priv);
+       if (err) {
+               ERROR("%p: TXQ allocation failed: %s",
+                     (void *)dev, strerror(err));
+               goto error;
+       }
+       /* Update send callback. */
+       priv_dev_select_tx_function(priv, dev);
        err = priv_create_hash_rxqs(priv);
        if (!err)
                err = priv_rehash_flows(priv);
@@ -94,10 +141,13 @@ mlx5_dev_start(struct rte_eth_dev *dev)
        return 0;
 error:
        /* Rollback. */
+       LIST_FOREACH(mr, &priv->mr, next)
+               priv_mr_release(priv, mr);
        priv_special_flow_disable_all(priv);
        priv_mac_addrs_disable(priv);
        priv_destroy_hash_rxqs(priv);
        priv_flow_stop(priv);
+       priv_txq_stop(priv);
        priv_unlock(priv);
        return -err;
 }
@@ -114,6 +164,7 @@ void
 mlx5_dev_stop(struct rte_eth_dev *dev)
 {
        struct priv *priv = dev->data->dev_private;
+       struct mlx5_mr *mr;
 
        if (mlx5_is_secondary())
                return;
@@ -131,6 +182,10 @@ mlx5_dev_stop(struct rte_eth_dev *dev)
        priv_destroy_hash_rxqs(priv);
        priv_flow_stop(priv);
        priv_rx_intr_vec_disable(priv);
+       priv_txq_stop(priv);
+       LIST_FOREACH(mr, &priv->mr, next) {
+               priv_mr_release(priv, mr);
+       }
        priv_dev_interrupt_handler_uninstall(priv, dev);
        priv_unlock(priv);
 }
index 3a6ef39..9deaa7e 100644 (file)
  *
  * @param txq_ctrl
  *   Pointer to TX queue structure.
- * @param elts_n
- *   Number of elements to allocate.
  */
-static void
-txq_alloc_elts(struct mlx5_txq_ctrl *txq_ctrl, unsigned int elts_n)
+void
+txq_alloc_elts(struct mlx5_txq_ctrl *txq_ctrl)
 {
+       const unsigned int elts_n = 1 << txq_ctrl->txq.elts_n;
        unsigned int i;
 
        for (i = 0; (i != elts_n); ++i)
@@ -116,152 +115,6 @@ txq_free_elts(struct mlx5_txq_ctrl *txq_ctrl)
        }
 }
 
-/**
- * Clean up a TX queue.
- *
- * Destroy objects, free allocated memory and reset the structure for reuse.
- *
- * @param txq_ctrl
- *   Pointer to TX queue structure.
- */
-void
-mlx5_txq_cleanup(struct mlx5_txq_ctrl *txq_ctrl)
-{
-       size_t i;
-
-       DEBUG("cleaning up %p", (void *)txq_ctrl);
-       txq_free_elts(txq_ctrl);
-       for (i = 0; (i != RTE_DIM(txq_ctrl->txq.mp2mr)); ++i)
-               if (txq_ctrl->txq.mp2mr[i])
-                       priv_mr_release(txq_ctrl->priv, txq_ctrl->txq.mp2mr[i]);
-       if (txq_ctrl->ibv)
-               mlx5_priv_txq_ibv_release(txq_ctrl->priv, txq_ctrl->ibv);
-       memset(txq_ctrl, 0, sizeof(*txq_ctrl));
-}
-
-/**
- * Configure a TX queue.
- *
- * @param dev
- *   Pointer to Ethernet device structure.
- * @param txq_ctrl
- *   Pointer to TX queue structure.
- * @param desc
- *   Number of descriptors to configure in queue.
- * @param socket
- *   NUMA socket on which memory must be allocated.
- * @param[in] conf
- *   Thresholds parameters.
- *
- * @return
- *   0 on success, errno value on failure.
- */
-int
-mlx5_txq_ctrl_setup(struct rte_eth_dev *dev, struct mlx5_txq_ctrl *txq_ctrl,
-                   uint16_t desc, unsigned int socket,
-                   const struct rte_eth_txconf *conf)
-{
-       struct priv *priv = mlx5_get_priv(dev);
-       struct mlx5_txq_ctrl tmpl = {
-               .priv = priv,
-               .socket = socket,
-       };
-       const unsigned int max_tso_inline = ((MLX5_MAX_TSO_HEADER +
-                                            (RTE_CACHE_LINE_SIZE - 1)) /
-                                             RTE_CACHE_LINE_SIZE);
-
-       if (mlx5_getenv_int("MLX5_ENABLE_CQE_COMPRESSION")) {
-               ERROR("MLX5_ENABLE_CQE_COMPRESSION must never be set");
-               return ENOTSUP;
-       }
-       tmpl.txq.flags = conf->txq_flags;
-       assert(desc > MLX5_TX_COMP_THRESH);
-       tmpl.txq.elts_n = log2above(desc);
-       if (priv->mps == MLX5_MPW_ENHANCED)
-               tmpl.txq.mpw_hdr_dseg = priv->mpw_hdr_dseg;
-       /* MRs will be registered in mp2mr[] later. */
-       DEBUG("priv->device_attr.max_qp_wr is %d",
-             priv->device_attr.orig_attr.max_qp_wr);
-       DEBUG("priv->device_attr.max_sge is %d",
-             priv->device_attr.orig_attr.max_sge);
-       if (priv->txq_inline && (priv->txqs_n >= priv->txqs_inline)) {
-               unsigned int ds_cnt;
-
-               tmpl.txq.max_inline =
-                       ((priv->txq_inline + (RTE_CACHE_LINE_SIZE - 1)) /
-                        RTE_CACHE_LINE_SIZE);
-               tmpl.txq.inline_en = 1;
-               /* TSO and MPS can't be enabled concurrently. */
-               assert(!priv->tso || !priv->mps);
-               if (priv->mps == MLX5_MPW_ENHANCED) {
-                       tmpl.txq.inline_max_packet_sz =
-                               priv->inline_max_packet_sz;
-                       /* To minimize the size of data set, avoid requesting
-                        * too large WQ.
-                        */
-                       tmpl.max_inline_data =
-                               ((RTE_MIN(priv->txq_inline,
-                                         priv->inline_max_packet_sz) +
-                                 (RTE_CACHE_LINE_SIZE - 1)) /
-                                RTE_CACHE_LINE_SIZE) * RTE_CACHE_LINE_SIZE;
-               } else if (priv->tso) {
-                       int inline_diff = tmpl.txq.max_inline - max_tso_inline;
-
-                       /*
-                        * Adjust inline value as Verbs aggregates
-                        * tso_inline and txq_inline fields.
-                        */
-                       tmpl.max_inline_data = inline_diff > 0 ?
-                                              inline_diff *
-                                              RTE_CACHE_LINE_SIZE :
-                                              0;
-               } else {
-                       tmpl.max_inline_data =
-                               tmpl.txq.max_inline * RTE_CACHE_LINE_SIZE;
-               }
-               /*
-                * Check if the inline size is too large in a way which
-                * can make the WQE DS to overflow.
-                * Considering in calculation:
-                *      WQE CTRL (1 DS)
-                *      WQE ETH  (1 DS)
-                *      Inline part (N DS)
-                */
-               ds_cnt = 2 + (tmpl.max_inline_data / MLX5_WQE_DWORD_SIZE);
-               if (ds_cnt > MLX5_DSEG_MAX) {
-                       unsigned int max_inline = (MLX5_DSEG_MAX - 2) *
-                                                  MLX5_WQE_DWORD_SIZE;
-
-                       max_inline = max_inline - (max_inline %
-                                                  RTE_CACHE_LINE_SIZE);
-                       WARN("txq inline is too large (%d) setting it to "
-                            "the maximum possible: %d\n",
-                            priv->txq_inline, max_inline);
-                       tmpl.txq.max_inline = max_inline / RTE_CACHE_LINE_SIZE;
-               }
-       }
-       if (priv->tso) {
-               tmpl.max_tso_header = max_tso_inline * RTE_CACHE_LINE_SIZE;
-               tmpl.txq.max_inline = RTE_MAX(tmpl.txq.max_inline,
-                                             max_tso_inline);
-               tmpl.txq.tso_en = 1;
-       }
-       if (priv->tunnel_en)
-               tmpl.txq.tunnel_en = 1;
-       tmpl.txq.elts =
-               (struct rte_mbuf *(*)[1 << tmpl.txq.elts_n])
-               ((uintptr_t)txq_ctrl + sizeof(*txq_ctrl));
-       txq_alloc_elts(&tmpl, desc);
-       /* Clean up txq in case we're reinitializing it. */
-       DEBUG("%p: cleaning-up old txq just in case", (void *)txq_ctrl);
-       mlx5_txq_cleanup(txq_ctrl);
-       *txq_ctrl = tmpl;
-       DEBUG("%p: txq updated with %p", (void *)txq_ctrl, (void *)&tmpl);
-       /* Pre-register known mempools. */
-       rte_mempool_walk(mlx5_txq_mp2mr_iter, txq_ctrl);
-       return 0;
-}
-
 /**
  * DPDK callback to configure a TX queue.
  *
@@ -287,7 +140,7 @@ mlx5_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
        struct mlx5_txq_data *txq = (*priv->txqs)[idx];
        struct mlx5_txq_ctrl *txq_ctrl =
                container_of(txq, struct mlx5_txq_ctrl, txq);
-       int ret;
+       int ret = 0;
 
        if (mlx5_is_secondary())
                return -E_RTE_SECONDARY;
@@ -314,57 +167,23 @@ mlx5_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
                priv_unlock(priv);
                return -EOVERFLOW;
        }
-       if (txq != NULL) {
-               DEBUG("%p: reusing already allocated queue index %u (%p)",
-                     (void *)dev, idx, (void *)txq);
-               if (dev->data->dev_started) {
-                       priv_unlock(priv);
-                       return -EEXIST;
-               }
-               (*priv->txqs)[idx] = NULL;
-               mlx5_txq_cleanup(txq_ctrl);
-               /* Resize if txq size is changed. */
-               if (txq_ctrl->txq.elts_n != log2above(desc)) {
-                       txq_ctrl = rte_realloc(txq_ctrl,
-                                              sizeof(*txq_ctrl) +
-                                              desc * sizeof(struct rte_mbuf *),
-                                              RTE_CACHE_LINE_SIZE);
-                       if (!txq_ctrl) {
-                               ERROR("%p: unable to reallocate queue index %u",
-                                       (void *)dev, idx);
-                               priv_unlock(priv);
-                               return -ENOMEM;
-                       }
-               }
-       } else {
-               txq_ctrl =
-                       rte_calloc_socket("TXQ", 1,
-                                         sizeof(*txq_ctrl) +
-                                         desc * sizeof(struct rte_mbuf *),
-                                         0, socket);
-               if (txq_ctrl == NULL) {
-                       ERROR("%p: unable to allocate queue index %u",
-                             (void *)dev, idx);
-                       priv_unlock(priv);
-                       return -ENOMEM;
-               }
+       if (!mlx5_priv_txq_releasable(priv, idx)) {
+               ret = EBUSY;
+               ERROR("%p: unable to release queue index %u",
+                     (void *)dev, idx);
+               goto out;
        }
-       ret = mlx5_txq_ctrl_setup(dev, txq_ctrl, desc, socket, conf);
-       if (ret) {
-               rte_free(txq_ctrl);
+       mlx5_priv_txq_release(priv, idx);
+       txq_ctrl = mlx5_priv_txq_new(priv, idx, desc, socket, conf);
+       if (!txq_ctrl) {
+               ERROR("%p: unable to allocate queue index %u",
+                     (void *)dev, idx);
+               ret = ENOMEM;
                goto out;
        }
-       txq_ctrl->txq.stats.idx = idx;
        DEBUG("%p: adding TX queue %p to list",
              (void *)dev, (void *)txq_ctrl);
        (*priv->txqs)[idx] = &txq_ctrl->txq;
-       txq_ctrl->ibv = mlx5_priv_txq_ibv_new(priv, idx);
-       if (!txq_ctrl->ibv) {
-               ret = EAGAIN;
-               goto out;
-       }
-       /* Update send callback. */
-       priv_dev_select_tx_function(priv, priv->dev);
 out:
        priv_unlock(priv);
        return -ret;
@@ -396,11 +215,9 @@ mlx5_tx_queue_release(void *dpdk_txq)
                if ((*priv->txqs)[i] == txq) {
                        DEBUG("%p: removing TX queue %p from list",
                              (void *)priv->dev, (void *)txq_ctrl);
-                       (*priv->txqs)[i] = NULL;
+                       mlx5_priv_txq_release(priv, i);
                        break;
                }
-       mlx5_txq_cleanup(txq_ctrl);
-       rte_free(txq_ctrl);
        priv_unlock(priv);
 }
 
@@ -719,3 +536,248 @@ mlx5_priv_txq_ibv_verify(struct priv *priv)
        }
        return ret;
 }
+
+/**
+ * Create a DPDK Tx queue.
+ *
+ * @param priv
+ *   Pointer to private structure.
+ * @param idx
+ *   TX queue index.
+ * @param desc
+ *   Number of descriptors to configure in queue.
+ * @param socket
+ *   NUMA socket on which memory must be allocated.
+ * @param[in] conf
+ *  Thresholds parameters.
+ *
+ * @return
+ *   A DPDK queue object on success.
+ */
+struct mlx5_txq_ctrl*
+mlx5_priv_txq_new(struct priv *priv, uint16_t idx, uint16_t desc,
+                 unsigned int socket,
+                 const struct rte_eth_txconf *conf)
+{
+       const unsigned int max_tso_inline =
+               ((MLX5_MAX_TSO_HEADER + (RTE_CACHE_LINE_SIZE - 1)) /
+                RTE_CACHE_LINE_SIZE);
+       struct mlx5_txq_ctrl *tmpl;
+
+       tmpl = rte_calloc_socket("TXQ", 1,
+                                sizeof(*tmpl) +
+                                desc * sizeof(struct rte_mbuf *),
+                                0, socket);
+       if (!tmpl)
+               return NULL;
+       assert(desc > MLX5_TX_COMP_THRESH);
+       tmpl->txq.flags = conf->txq_flags;
+       tmpl->priv = priv;
+       tmpl->txq.elts_n = log2above(desc);
+       if (priv->mps == MLX5_MPW_ENHANCED)
+               tmpl->txq.mpw_hdr_dseg = priv->mpw_hdr_dseg;
+       /* MRs will be registered in mp2mr[] later. */
+       DEBUG("priv->device_attr.max_qp_wr is %d",
+             priv->device_attr.orig_attr.max_qp_wr);
+       DEBUG("priv->device_attr.max_sge is %d",
+             priv->device_attr.orig_attr.max_sge);
+       if (priv->txq_inline && (priv->txqs_n >= priv->txqs_inline)) {
+               unsigned int ds_cnt;
+
+               tmpl->txq.max_inline =
+                       ((priv->txq_inline + (RTE_CACHE_LINE_SIZE - 1)) /
+                        RTE_CACHE_LINE_SIZE);
+               tmpl->txq.inline_en = 1;
+               /* TSO and MPS can't be enabled concurrently. */
+               assert(!priv->tso || !priv->mps);
+               if (priv->mps == MLX5_MPW_ENHANCED) {
+                       tmpl->txq.inline_max_packet_sz =
+                               priv->inline_max_packet_sz;
+                       /* To minimize the size of data set, avoid requesting
+                        * too large WQ.
+                        */
+                       tmpl->max_inline_data =
+                               ((RTE_MIN(priv->txq_inline,
+                                         priv->inline_max_packet_sz) +
+                                 (RTE_CACHE_LINE_SIZE - 1)) /
+                                RTE_CACHE_LINE_SIZE) * RTE_CACHE_LINE_SIZE;
+               } else if (priv->tso) {
+                       int inline_diff = tmpl->txq.max_inline - max_tso_inline;
+
+                       /*
+                        * Adjust inline value as Verbs aggregates
+                        * tso_inline and txq_inline fields.
+                        */
+                       tmpl->max_inline_data = inline_diff > 0 ?
+                                              inline_diff *
+                                              RTE_CACHE_LINE_SIZE :
+                                              0;
+               } else {
+                       tmpl->max_inline_data =
+                               tmpl->txq.max_inline * RTE_CACHE_LINE_SIZE;
+               }
+               /*
+                * Check if the inline size is too large in a way which
+                * can make the WQE DS to overflow.
+                * Considering in calculation:
+                *      WQE CTRL (1 DS)
+                *      WQE ETH  (1 DS)
+                *      Inline part (N DS)
+                */
+               ds_cnt = 2 + (tmpl->txq.max_inline / MLX5_WQE_DWORD_SIZE);
+               if (ds_cnt > MLX5_DSEG_MAX) {
+                       unsigned int max_inline = (MLX5_DSEG_MAX - 2) *
+                                                 MLX5_WQE_DWORD_SIZE;
+
+                       max_inline = max_inline - (max_inline %
+                                                  RTE_CACHE_LINE_SIZE);
+                       WARN("txq inline is too large (%d) setting it to "
+                            "the maximum possible: %d\n",
+                            priv->txq_inline, max_inline);
+                       tmpl->txq.max_inline = max_inline / RTE_CACHE_LINE_SIZE;
+               }
+       }
+       if (priv->tso) {
+               tmpl->max_tso_header = max_tso_inline * RTE_CACHE_LINE_SIZE;
+               tmpl->txq.max_inline = RTE_MAX(tmpl->txq.max_inline,
+                                              max_tso_inline);
+               tmpl->txq.tso_en = 1;
+       }
+       if (priv->tunnel_en)
+               tmpl->txq.tunnel_en = 1;
+       tmpl->txq.elts =
+               (struct rte_mbuf *(*)[1 << tmpl->txq.elts_n])(tmpl + 1);
+       tmpl->txq.stats.idx = idx;
+       rte_atomic32_inc(&tmpl->refcnt);
+       DEBUG("%p: Tx queue %p: refcnt %d", (void *)priv,
+             (void *)tmpl, rte_atomic32_read(&tmpl->refcnt));
+       LIST_INSERT_HEAD(&priv->txqsctrl, tmpl, next);
+       return tmpl;
+}
+
+/**
+ * Get a Tx queue.
+ *
+ * @param priv
+ *   Pointer to private structure.
+ * @param idx
+ *   TX queue index.
+ *
+ * @return
+ *   A pointer to the queue if it exists.
+ */
+struct mlx5_txq_ctrl*
+mlx5_priv_txq_get(struct priv *priv, uint16_t idx)
+{
+       struct mlx5_txq_ctrl *ctrl = NULL;
+
+       if ((*priv->txqs)[idx]) {
+               ctrl = container_of((*priv->txqs)[idx], struct mlx5_txq_ctrl,
+                                   txq);
+               unsigned int i;
+
+               mlx5_priv_txq_ibv_get(priv, idx);
+               for (i = 0; i != MLX5_PMD_TX_MP_CACHE; ++i) {
+                       struct mlx5_mr *mr = NULL;
+
+                       (void)mr;
+                       if (ctrl->txq.mp2mr[i]) {
+                               mr = priv_mr_get(priv, ctrl->txq.mp2mr[i]->mp);
+                               assert(mr);
+                       }
+               }
+               rte_atomic32_inc(&ctrl->refcnt);
+               DEBUG("%p: Tx queue %p: refcnt %d", (void *)priv,
+                     (void *)ctrl, rte_atomic32_read(&ctrl->refcnt));
+       }
+       return ctrl;
+}
+
+/**
+ * Release a Tx queue.
+ *
+ * @param priv
+ *   Pointer to private structure.
+ * @param idx
+ *   TX queue index.
+ *
+ * @return
+ *   0 on success, errno on failure.
+ */
+int
+mlx5_priv_txq_release(struct priv *priv, uint16_t idx)
+{
+       unsigned int i;
+       struct mlx5_txq_ctrl *txq;
+
+       if (!(*priv->txqs)[idx])
+               return 0;
+       txq = container_of((*priv->txqs)[idx], struct mlx5_txq_ctrl, txq);
+       DEBUG("%p: Tx queue %p: refcnt %d", (void *)priv,
+             (void *)txq, rte_atomic32_read(&txq->refcnt));
+       if (txq->ibv) {
+               int ret;
+
+               ret = mlx5_priv_txq_ibv_release(priv, txq->ibv);
+               if (!ret)
+                       txq->ibv = NULL;
+       }
+       for (i = 0; i != MLX5_PMD_TX_MP_CACHE; ++i) {
+               if (txq->txq.mp2mr[i]) {
+                       priv_mr_release(priv, txq->txq.mp2mr[i]);
+                       txq->txq.mp2mr[i] = NULL;
+               }
+       }
+       if (rte_atomic32_dec_and_test(&txq->refcnt)) {
+               txq_free_elts(txq);
+               LIST_REMOVE(txq, next);
+               rte_free(txq);
+               (*priv->txqs)[idx] = NULL;
+               return 0;
+       }
+       return EBUSY;
+}
+
+/**
+ * Verify if the queue can be released.
+ *
+ * @param priv
+ *   Pointer to private structure.
+ * @param idx
+ *   TX queue index.
+ *
+ * @return
+ *   1 if the queue can be released.
+ */
+int
+mlx5_priv_txq_releasable(struct priv *priv, uint16_t idx)
+{
+       struct mlx5_txq_ctrl *txq;
+
+       if (!(*priv->txqs)[idx])
+               return -1;
+       txq = container_of((*priv->txqs)[idx], struct mlx5_txq_ctrl, txq);
+       return (rte_atomic32_read(&txq->refcnt) == 1);
+}
+
+/**
+ * Verify the Tx Queue list is empty
+ *
+ * @param priv
+ *  Pointer to private structure.
+ *
+ * @return the number of object not released.
+ */
+int
+mlx5_priv_txq_verify(struct priv *priv)
+{
+       struct mlx5_txq_ctrl *txq;
+       int ret = 0;
+
+       LIST_FOREACH(txq, &priv->txqsctrl, next) {
+               DEBUG("%p: Tx Queue %p still referenced", (void *)priv,
+                     (void *)txq);
+               ++ret;
+       }
+       return ret;
+}