net/netvsc: fix crash during Tx
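
Rework transmit descriptor and send (chimney) buffer handling:

 - allocate Tx descriptors from a per-queue mempool sized by nb_desc
   instead of a single per-device pool tied to the chimney buffer count
 - carve each descriptor's RNDIS packet header out of one per-queue
   tx_rndis block instead of a separate rte_malloc() per descriptor
 - track chimney buffer slots in a spinlock-protected bitmap
   (hn_chim_alloc/hn_chim_free); a descriptor holds a slot only while
   it is actually using the chimney buffer, otherwise its chim_index is
   NVS_CHIM_IDX_INVALID
 - take vf_lock around the VF transmit and receive paths and make the
   rxbuf_outstanding accounting atomic
 - detach external mbufs before freeing them on receive error paths
 - add Rx/Tx queue info, descriptor status and Rx queue count
   callbacks, and free all queues through hn_dev_free_queues()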
diff --git a/drivers/net/netvsc/hn_rxtx.c b/drivers/net/netvsc/hn_rxtx.c
index 622a839..0428c58 100644
--- a/drivers/net/netvsc/hn_rxtx.c
+++ b/drivers/net/netvsc/hn_rxtx.c
@@ -18,6 +18,7 @@
 #include <rte_memzone.h>
 #include <rte_malloc.h>
 #include <rte_atomic.h>
+#include <rte_bitmap.h>
 #include <rte_branch_prediction.h>
 #include <rte_ether.h>
 #include <rte_common.h>
@@ -83,7 +84,7 @@ struct hn_txdesc {
        struct rte_mbuf *m;
 
        uint16_t        queue_id;
-       uint16_t        chim_index;
+       uint32_t        chim_index;
        uint32_t        chim_size;
        uint32_t        data_size;
        uint32_t        packets;
@@ -98,17 +99,19 @@ struct hn_txdesc {
         RNDIS_PKTINFO_SIZE(NDIS_LSO2_INFO_SIZE) +      \
         RNDIS_PKTINFO_SIZE(NDIS_TXCSUM_INFO_SIZE))
 
+#define HN_RNDIS_PKT_ALIGNED   RTE_ALIGN(HN_RNDIS_PKT_LEN, RTE_CACHE_LINE_SIZE)
+
 /* Minimum space required for a packet */
 #define HN_PKTSIZE_MIN(align) \
-       RTE_ALIGN(ETHER_MIN_LEN + HN_RNDIS_PKT_LEN, align)
+       RTE_ALIGN(RTE_ETHER_MIN_LEN + HN_RNDIS_PKT_LEN, align)
 
-#define DEFAULT_TX_FREE_THRESH 32U
+#define DEFAULT_TX_FREE_THRESH 32
 
 static void
 hn_update_packet_stats(struct hn_stats *stats, const struct rte_mbuf *m)
 {
        uint32_t s = m->pkt_len;
-       const struct ether_addr *ea;
+       const struct rte_ether_addr *ea;
 
        if (s == 64) {
                stats->size_bins[1]++;
@@ -123,13 +126,13 @@ hn_update_packet_stats(struct hn_stats *stats, const struct rte_mbuf *m)
                        stats->size_bins[0]++;
                else if (s < 1519)
                        stats->size_bins[6]++;
-               else if (s >= 1519)
+               else
                        stats->size_bins[7]++;
        }
 
-       ea = rte_pktmbuf_mtod(m, const struct ether_addr *);
-       if (is_multicast_ether_addr(ea)) {
-               if (is_broadcast_ether_addr(ea))
+       ea = rte_pktmbuf_mtod(m, const struct rte_ether_addr *);
+       if (rte_is_multicast_ether_addr(ea)) {
+               if (rte_is_broadcast_ether_addr(ea))
                        stats->broadcast++;
                else
                        stats->multicast++;
@@ -150,55 +153,80 @@ hn_rndis_pktmsg_offset(uint32_t ofs)
 static void hn_txd_init(struct rte_mempool *mp __rte_unused,
                        void *opaque, void *obj, unsigned int idx)
 {
+       struct hn_tx_queue *txq = opaque;
        struct hn_txdesc *txd = obj;
-       struct rte_eth_dev *dev = opaque;
-       struct rndis_packet_msg *pkt;
 
        memset(txd, 0, sizeof(*txd));
-       txd->chim_index = idx;
-
-       pkt = rte_malloc_socket("RNDIS_TX", HN_RNDIS_PKT_LEN,
-                               rte_align32pow2(HN_RNDIS_PKT_LEN),
-                               dev->device->numa_node);
-       if (!pkt)
-               rte_exit(EXIT_FAILURE, "can not allocate RNDIS header");
 
-       txd->rndis_pkt = pkt;
+       txd->queue_id = txq->queue_id;
+       txd->chim_index = NVS_CHIM_IDX_INVALID;
+       txd->rndis_pkt = (struct rndis_packet_msg *)((char *)txq->tx_rndis
+               + idx * HN_RNDIS_PKT_ALIGNED);
 }
 
-/*
- * Unlike Linux and FreeBSD, this driver uses a mempool
- * to limit outstanding transmits and reserve buffers
- */
 int
-hn_tx_pool_init(struct rte_eth_dev *dev)
+hn_chim_init(struct rte_eth_dev *dev)
 {
        struct hn_data *hv = dev->data->dev_private;
-       char name[RTE_MEMPOOL_NAMESIZE];
-       struct rte_mempool *mp;
+       uint32_t i, chim_bmp_size;
+
+       rte_spinlock_init(&hv->chim_lock);
+       chim_bmp_size = rte_bitmap_get_memory_footprint(hv->chim_cnt);
+       hv->chim_bmem = rte_zmalloc("hn_chim_bitmap", chim_bmp_size,
+                                   RTE_CACHE_LINE_SIZE);
+       if (hv->chim_bmem == NULL) {
+               PMD_INIT_LOG(ERR, "failed to allocate bitmap size %u",
+                            chim_bmp_size);
+               return -1;
+       }
 
-       snprintf(name, sizeof(name),
-                "hn_txd_%u", dev->data->port_id);
-
-       PMD_INIT_LOG(DEBUG, "create a TX send pool %s n=%u size=%zu socket=%d",
-                    name, hv->chim_cnt, sizeof(struct hn_txdesc),
-                    dev->device->numa_node);
-
-       mp = rte_mempool_create(name, hv->chim_cnt, sizeof(struct hn_txdesc),
-                               HN_TXD_CACHE_SIZE, 0,
-                               NULL, NULL,
-                               hn_txd_init, dev,
-                               dev->device->numa_node, 0);
-       if (!mp) {
-               PMD_DRV_LOG(ERR,
-                           "mempool %s create failed: %d", name, rte_errno);
-               return -rte_errno;
+       hv->chim_bmap = rte_bitmap_init(hv->chim_cnt,
+                                       hv->chim_bmem, chim_bmp_size);
+       if (hv->chim_bmap == NULL) {
+               PMD_INIT_LOG(ERR, "failed to init chim bitmap");
+               return -1;
        }
 
-       hv->tx_pool = mp;
+       for (i = 0; i < hv->chim_cnt; i++)
+               rte_bitmap_set(hv->chim_bmap, i);
+
        return 0;
 }
 
+void
+hn_chim_uninit(struct rte_eth_dev *dev)
+{
+       struct hn_data *hv = dev->data->dev_private;
+
+       rte_bitmap_free(hv->chim_bmap);
+       rte_free(hv->chim_bmem);
+       hv->chim_bmem = NULL;
+}
+
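+/*
+ * Grab a free chimney (send buffer) slot from the bitmap.
+ * Returns NVS_CHIM_IDX_INVALID if all slots are in use.
+ */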
+static uint32_t hn_chim_alloc(struct hn_data *hv)
+{
+       uint32_t index = NVS_CHIM_IDX_INVALID;
+       uint64_t slab;
+
+       rte_spinlock_lock(&hv->chim_lock);
+       if (rte_bitmap_scan(hv->chim_bmap, &index, &slab))
+               rte_bitmap_clear(hv->chim_bmap, index);
+       rte_spinlock_unlock(&hv->chim_lock);
+
+       return index;
+}
+
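+/* Return a chimney slot to the bitmap so it can be reused */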
+static void hn_chim_free(struct hn_data *hv, uint32_t chim_idx)
+{
+       if (chim_idx >= hv->chim_cnt) {
+               PMD_DRV_LOG(ERR, "Invalid chimney index %u", chim_idx);
+       } else {
+               rte_spinlock_lock(&hv->chim_lock);
+               rte_bitmap_set(hv->chim_bmap, chim_idx);
+               rte_spinlock_unlock(&hv->chim_lock);
+       }
+}
+
 static void hn_reset_txagg(struct hn_tx_queue *txq)
 {
        txq->agg_szleft = txq->agg_szmax;
@@ -209,15 +237,16 @@ static void hn_reset_txagg(struct hn_tx_queue *txq)
 
 int
 hn_dev_tx_queue_setup(struct rte_eth_dev *dev,
-                     uint16_t queue_idx, uint16_t nb_desc __rte_unused,
+                     uint16_t queue_idx, uint16_t nb_desc,
                      unsigned int socket_id,
                      const struct rte_eth_txconf *tx_conf)
 
 {
        struct hn_data *hv = dev->data->dev_private;
        struct hn_tx_queue *txq;
+       char name[RTE_MEMPOOL_NAMESIZE];
        uint32_t tx_free_thresh;
-       int err;
+       int err = -ENOMEM;
 
        PMD_INIT_FUNC_TRACE();
 
@@ -233,14 +262,42 @@ hn_dev_tx_queue_setup(struct rte_eth_dev *dev,
 
        tx_free_thresh = tx_conf->tx_free_thresh;
        if (tx_free_thresh == 0)
-               tx_free_thresh = RTE_MIN(hv->chim_cnt / 4,
+               tx_free_thresh = RTE_MIN(nb_desc / 4,
                                         DEFAULT_TX_FREE_THRESH);
 
-       if (tx_free_thresh >= hv->chim_cnt - 3)
-               tx_free_thresh = hv->chim_cnt - 3;
+       if (tx_free_thresh + 3 >= nb_desc) {
+               PMD_INIT_LOG(ERR,
+                            "tx_free_thresh must be less than the number of TX entries minus 3(%u)."
+                            " (tx_free_thresh=%u port=%u queue=%u)\n",
+                            nb_desc - 3,
+                            tx_free_thresh, dev->data->port_id, queue_idx);
+               return -EINVAL;
+       }
 
        txq->free_thresh = tx_free_thresh;
 
+       snprintf(name, sizeof(name),
+                "hn_txd_%u_%u", dev->data->port_id, queue_idx);
+
+       PMD_INIT_LOG(DEBUG, "TX descriptor pool %s n=%u size=%zu",
+                    name, nb_desc, sizeof(struct hn_txdesc));
+
+       txq->tx_rndis = rte_calloc("hn_txq_rndis", nb_desc,
+                                  HN_RNDIS_PKT_ALIGNED, RTE_CACHE_LINE_SIZE);
+       if (txq->tx_rndis == NULL)
+               goto error;
+
+       txq->txdesc_pool = rte_mempool_create(name, nb_desc,
+                                             sizeof(struct hn_txdesc),
+                                             0, 0, NULL, NULL,
+                                             hn_txd_init, txq,
+                                             dev->device->numa_node, 0);
+       if (txq->txdesc_pool == NULL) {
+               PMD_DRV_LOG(ERR,
+                           "mempool %s create failed: %d", name, rte_errno);
+               goto error;
+       }
+
        txq->agg_szmax  = RTE_MIN(hv->chim_szmax, hv->rndis_agg_size);
        txq->agg_pktmax = hv->rndis_agg_pkts;
        txq->agg_align  = hv->rndis_agg_align;
@@ -249,38 +306,97 @@ hn_dev_tx_queue_setup(struct rte_eth_dev *dev,
 
        err = hn_vf_tx_queue_setup(dev, queue_idx, nb_desc,
                                     socket_id, tx_conf);
-       if (err) {
-               rte_free(txq);
-               return err;
+       if (err == 0) {
+               dev->data->tx_queues[queue_idx] = txq;
+               return 0;
        }
 
-       dev->data->tx_queues[queue_idx] = txq;
-       return 0;
+error:
+       if (txq->txdesc_pool)
+               rte_mempool_free(txq->txdesc_pool);
+       rte_free(txq->tx_rndis);
+       rte_free(txq);
+       return err;
+}
+
+void
+hn_dev_tx_queue_info(struct rte_eth_dev *dev, uint16_t queue_id,
+                    struct rte_eth_txq_info *qinfo)
+{
+       struct hn_tx_queue *txq = dev->data->tx_queues[queue_id];
+
+       qinfo->nb_desc = txq->txdesc_pool->size;
+       qinfo->conf.offloads = dev->data->dev_conf.txmode.offloads;
+}
+
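+/* Get a transmit descriptor from the per-queue descriptor pool */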
+static struct hn_txdesc *hn_txd_get(struct hn_tx_queue *txq)
+{
+       struct hn_txdesc *txd;
+
+       if (rte_mempool_get(txq->txdesc_pool, (void **)&txd)) {
+               ++txq->stats.ring_full;
+               PMD_TX_LOG(DEBUG, "tx pool exhausted!");
+               return NULL;
+       }
+
+       txd->m = NULL;
+       txd->packets = 0;
+       txd->data_size = 0;
+       txd->chim_size = 0;
+
+       return txd;
+}
+
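+/* Return a transmit descriptor to the per-queue descriptor pool */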
+static void hn_txd_put(struct hn_tx_queue *txq, struct hn_txdesc *txd)
+{
+       rte_mempool_put(txq->txdesc_pool, txd);
 }
 
 void
 hn_dev_tx_queue_release(void *arg)
 {
        struct hn_tx_queue *txq = arg;
-       struct hn_txdesc *txd;
 
        PMD_INIT_FUNC_TRACE();
 
        if (!txq)
                return;
 
-       /* If any pending data is still present just drop it */
-       txd = txq->agg_txd;
-       if (txd)
-               rte_mempool_put(txq->hv->tx_pool, txd);
+       if (txq->txdesc_pool)
+               rte_mempool_free(txq->txdesc_pool);
 
+       rte_free(txq->tx_rndis);
        rte_free(txq);
 }
 
+/*
+ * Check the status of a Tx descriptor in the queue.
+ *
+ * returns:
+ *  - -EINVAL              - offset outside of tx_descriptor pool.
+ *  - RTE_ETH_TX_DESC_FULL - descriptor is not acknowledged by host.
+ *  - RTE_ETH_TX_DESC_DONE - descriptor is available.
+ */
+int hn_dev_tx_descriptor_status(void *arg, uint16_t offset)
+{
+       const struct hn_tx_queue *txq = arg;
+
+       hn_process_events(txq->hv, txq->queue_id, 0);
+
+       if (offset >= rte_mempool_avail_count(txq->txdesc_pool))
+               return -EINVAL;
+
+       if (offset < rte_mempool_in_use_count(txq->txdesc_pool))
+               return RTE_ETH_TX_DESC_FULL;
+       else
+               return RTE_ETH_TX_DESC_DONE;
+}
+
 static void
 hn_nvs_send_completed(struct rte_eth_dev *dev, uint16_t queue_id,
                      unsigned long xactid, const struct hn_nvs_rndis_ack *ack)
 {
+       struct hn_data *hv = dev->data->dev_private;
        struct hn_txdesc *txd = (struct hn_txdesc *)xactid;
        struct hn_tx_queue *txq;
 
@@ -296,14 +412,16 @@ hn_nvs_send_completed(struct rte_eth_dev *dev, uint16_t queue_id,
                txq->stats.bytes += txd->data_size;
                txq->stats.packets += txd->packets;
        } else {
-               PMD_TX_LOG(NOTICE, "port %u:%u complete tx %u failed status %u",
-                          txq->port_id, txq->queue_id, txd->chim_index, ack->status);
+               PMD_DRV_LOG(NOTICE, "port %u:%u complete tx %u failed status %u",
+                           txq->port_id, txq->queue_id, txd->chim_index, ack->status);
                ++txq->stats.errors;
        }
 
-       rte_pktmbuf_free(txd->m);
+       if (txd->chim_index != NVS_CHIM_IDX_INVALID)
+               hn_chim_free(hv, txd->chim_index);
 
-       rte_mempool_put(txq->hv->tx_pool, txd);
+       rte_pktmbuf_free(txd->m);
+       hn_txd_put(txq, txd);
 }
 
 /* Handle transmit completion events */
@@ -320,8 +438,7 @@ hn_nvs_handle_comp(struct rte_eth_dev *dev, uint16_t queue_id,
                break;
 
        default:
-               PMD_TX_LOG(NOTICE,
-                          "unexpected send completion type %u",
+               PMD_DRV_LOG(NOTICE, "unexpected send completion type %u",
                           hdr->type);
        }
 }
@@ -402,24 +519,13 @@ next:
        return 0;
 }
 
-/*
- * Ack the consumed RXBUF associated w/ this channel packet,
- * so that this RXBUF can be recycled by the hypervisor.
- */
-static void hn_rx_buf_release(struct hn_rx_bufinfo *rxb)
+static void hn_rx_buf_free_cb(void *buf __rte_unused, void *opaque)
 {
-       struct rte_mbuf_ext_shared_info *shinfo = &rxb->shinfo;
+       struct hn_rx_bufinfo *rxb = opaque;
        struct hn_data *hv = rxb->hv;
 
-       if (rte_mbuf_ext_refcnt_update(shinfo, -1) == 0) {
-               hn_nvs_ack_rxbuf(rxb->chan, rxb->xactid);
-               --hv->rxbuf_outstanding;
-       }
-}
-
-static void hn_rx_buf_free_cb(void *buf __rte_unused, void *opaque)
-{
-       hn_rx_buf_release(opaque);
+       rte_atomic32_dec(&hv->rxbuf_outstanding);
+       hn_nvs_ack_rxbuf(rxb->chan, rxb->xactid);
 }
 
 static struct hn_rx_bufinfo *hn_rx_buf_init(const struct hn_rx_queue *rxq,
@@ -444,6 +550,7 @@ static void hn_rxpkt(struct hn_rx_queue *rxq, struct hn_rx_bufinfo *rxb,
 {
        struct hn_data *hv = rxq->hv;
        struct rte_mbuf *m;
+       bool use_extbuf = false;
 
        m = rte_pktmbuf_alloc(rxq->mb_pool);
        if (unlikely(!m)) {
@@ -459,7 +566,8 @@ static void hn_rxpkt(struct hn_rx_queue *rxq, struct hn_rx_bufinfo *rxb,
         * some space available in receive area for later packets.
         */
        if (dlen >= HN_RXCOPY_THRESHOLD &&
-           hv->rxbuf_outstanding < hv->rxbuf_section_cnt / 2) {
+           (uint32_t)rte_atomic32_read(&hv->rxbuf_outstanding) <
+                       hv->rxbuf_section_cnt / 2) {
                struct rte_mbuf_ext_shared_info *shinfo;
                const void *rxbuf;
                rte_iova_t iova;
@@ -473,12 +581,14 @@ static void hn_rxpkt(struct hn_rx_queue *rxq, struct hn_rx_bufinfo *rxb,
                iova = rte_mem_virt2iova(rxbuf) + RTE_PTR_DIFF(data, rxbuf);
                shinfo = &rxb->shinfo;
 
-               if (rte_mbuf_ext_refcnt_update(shinfo, 1) == 1)
-                       ++hv->rxbuf_outstanding;
+               /* shinfo is already set to 1 by the caller */
+               if (rte_mbuf_ext_refcnt_update(shinfo, 1) == 2)
+                       rte_atomic32_inc(&hv->rxbuf_outstanding);
 
                rte_pktmbuf_attach_extbuf(m, data, iova,
                                          dlen + headroom, shinfo);
                m->data_off = headroom;
+               use_extbuf = true;
        } else {
                /* Mbuf's in pool must be large enough to hold small packets */
                if (unlikely(rte_pktmbuf_tailroom(m) < dlen)) {
@@ -506,6 +616,8 @@ static void hn_rxpkt(struct hn_rx_queue *rxq, struct hn_rx_bufinfo *rxb,
                if (!hv->vlan_strip && rte_vlan_insert(&m)) {
                        PMD_DRV_LOG(DEBUG, "vlan insert failed");
                        ++rxq->stats.errors;
+                       if (use_extbuf)
+                               rte_pktmbuf_detach_extbuf(m);
                        rte_pktmbuf_free(m);
                        return;
                }
@@ -539,6 +651,9 @@ static void hn_rxpkt(struct hn_rx_queue *rxq, struct hn_rx_bufinfo *rxb,
 
        if (unlikely(rte_ring_sp_enqueue(rxq->rx_ring, m) != 0)) {
                ++rxq->stats.ring_full;
+               PMD_RX_LOG(DEBUG, "rx ring full");
+               if (use_extbuf)
+                       rte_pktmbuf_detach_extbuf(m);
                rte_pktmbuf_free(m);
        }
 }
@@ -595,7 +710,7 @@ static void hn_rndis_rx_data(struct hn_rx_queue *rxq,
        if (unlikely(data_off + data_len > pkt->len))
                goto error;
 
-       if (unlikely(data_len < ETHER_HDR_LEN))
+       if (unlikely(data_len < RTE_ETHER_HDR_LEN))
                goto error;
 
        hn_rxpkt(rxq, rxb, data, data_off, data_len, &info);
@@ -714,7 +829,8 @@ hn_nvs_handle_rxbuf(struct rte_eth_dev *dev,
        }
 
        /* Send ACK now if external mbuf not used */
-       hn_rx_buf_release(rxb);
+       if (rte_mbuf_ext_refcnt_update(&rxb->shinfo, -1) == 0)
+               hn_nvs_ack_rxbuf(rxb->chan, rxb->xactid);
 }
 
 /*
@@ -773,6 +889,17 @@ struct hn_rx_queue *hn_rx_queue_alloc(struct hn_data *hv,
        return rxq;
 }
 
+void
+hn_dev_rx_queue_info(struct rte_eth_dev *dev, uint16_t queue_id,
+                    struct rte_eth_rxq_info *qinfo)
+{
+       struct hn_rx_queue *rxq = dev->data->rx_queues[queue_id];
+
+       qinfo->mp = rxq->mb_pool;
+       qinfo->nb_desc = rxq->rx_ring->size;
+       qinfo->conf.offloads = dev->data->dev_conf.rxmode.offloads;
+}
+
 int
 hn_dev_rx_queue_setup(struct rte_eth_dev *dev,
                      uint16_t queue_idx, uint16_t nb_desc,
@@ -829,12 +956,9 @@ fail:
        return error;
 }
 
-void
-hn_dev_rx_queue_release(void *arg)
+static void
+hn_rx_queue_free(struct hn_rx_queue *rxq, bool keep_primary)
 {
-       struct hn_rx_queue *rxq = arg;
-
-       PMD_INIT_FUNC_TRACE();
 
        if (!rxq)
                return;
@@ -846,10 +970,55 @@ hn_dev_rx_queue_release(void *arg)
        hn_vf_rx_queue_release(rxq->hv, rxq->queue_id);
 
        /* Keep primary queue to allow for control operations */
-       if (rxq != rxq->hv->primary) {
-               rte_free(rxq->event_buf);
-               rte_free(rxq);
-       }
+       if (keep_primary && rxq == rxq->hv->primary)
+               return;
+
+       rte_free(rxq->event_buf);
+       rte_free(rxq);
+}
+
+void
+hn_dev_rx_queue_release(void *arg)
+{
+       struct hn_rx_queue *rxq = arg;
+
+       PMD_INIT_FUNC_TRACE();
+
+       hn_rx_queue_free(rxq, true);
+}
+
+/*
+ * Get the number of used descriptors in an Rx queue.
+ * For this device that means how many packets are pending in the ring.
+ */
+uint32_t
+hn_dev_rx_queue_count(struct rte_eth_dev *dev, uint16_t queue_id)
+{
+       struct hn_rx_queue *rxq = dev->data->rx_queues[queue_id];
+
+       return rte_ring_count(rxq->rx_ring);
+}
+
+/*
+ * Check the status of an Rx descriptor in the queue.
+ *
+ * returns:
+ *  - -EINVAL               - offset outside of ring
+ *  - RTE_ETH_RX_DESC_AVAIL - no data available yet
+ *  - RTE_ETH_RX_DESC_DONE  - data is waiting in the staging ring
+ */
+int hn_dev_rx_queue_status(void *arg, uint16_t offset)
+{
+       const struct hn_rx_queue *rxq = arg;
+
+       hn_process_events(rxq->hv, rxq->queue_id, 0);
+       if (offset >= rxq->rx_ring->capacity)
+               return -EINVAL;
+
+       if (offset < rte_ring_count(rxq->rx_ring))
+               return RTE_ETH_RX_DESC_DONE;
+       else
+               return RTE_ETH_RX_DESC_AVAIL;
 }
 
 int
@@ -875,10 +1044,6 @@ uint32_t hn_process_events(struct hn_data *hv, uint16_t queue_id,
 
        rxq = queue_id == 0 ? hv->primary : dev->data->rx_queues[queue_id];
 
-       /* If no pending data then nothing to do */
-       if (rte_vmbus_chan_rx_empty(rxq->chan))
-               return 0;
-
        /*
         * Since channel is shared between Rx and TX queue need to have a lock
         * since DPDK does not force same CPU to be used for Rx/Tx.
@@ -942,9 +1107,6 @@ retry:
 
                if (tx_limit && tx_done >= tx_limit)
                        break;
-
-               if (rxq->rx_ring && rte_ring_full(rxq->rx_ring))
-                       break;
        }
 
        if (bytes_read > 0)
@@ -1010,35 +1172,28 @@ static int hn_flush_txagg(struct hn_tx_queue *txq, bool *need_sig)
 
        if (likely(ret == 0))
                hn_reset_txagg(txq);
-       else
-               PMD_TX_LOG(NOTICE, "port %u:%u send failed: %d",
-                          txq->port_id, txq->queue_id, ret);
-
-       return ret;
-}
-
-static struct hn_txdesc *hn_new_txd(struct hn_data *hv,
-                                   struct hn_tx_queue *txq)
-{
-       struct hn_txdesc *txd;
+       else if (ret == -EAGAIN) {
+               PMD_TX_LOG(DEBUG, "port %u:%u channel full",
+                          txq->port_id, txq->queue_id);
+               ++txq->stats.channel_full;
+       } else {
+               ++txq->stats.errors;
 
-       if (rte_mempool_get(hv->tx_pool, (void **)&txd)) {
-               ++txq->stats.ring_full;
-               PMD_TX_LOG(DEBUG, "tx pool exhausted!");
-               return NULL;
+               PMD_DRV_LOG(NOTICE, "port %u:%u send failed: %d",
+                          txq->port_id, txq->queue_id, ret);
        }
-
-       txd->m = NULL;
-       txd->queue_id = txq->queue_id;
-       txd->packets = 0;
-       txd->data_size = 0;
-       txd->chim_size = 0;
-
-       return txd;
+       return ret;
 }
 
+/*
+ * Try to find a place in the send chimney buffer to put
+ * the small packet. If space is available, this routine
+ * returns a pointer to where the data should be placed.
+ * If there is no space, the caller should try direct transmit.
+ */
 static void *
-hn_try_txagg(struct hn_data *hv, struct hn_tx_queue *txq, uint32_t pktsize)
+hn_try_txagg(struct hn_data *hv, struct hn_tx_queue *txq,
+            struct hn_txdesc *txd, uint32_t pktsize)
 {
        struct hn_txdesc *agg_txd = txq->agg_txd;
        struct rndis_packet_msg *pkt;
@@ -1066,7 +1221,7 @@ hn_try_txagg(struct hn_data *hv, struct hn_tx_queue *txq, uint32_t pktsize)
                }
 
                chim = (uint8_t *)pkt + pkt->len;
-
+               txq->agg_prevpkt = chim;
                txq->agg_pktleft--;
                txq->agg_szleft -= pktsize;
                if (txq->agg_szleft < HN_PKTSIZE_MIN(txq->agg_align)) {
@@ -1076,18 +1231,21 @@ hn_try_txagg(struct hn_data *hv, struct hn_tx_queue *txq, uint32_t pktsize)
                         */
                        txq->agg_pktleft = 0;
                }
-       } else {
-               agg_txd = hn_new_txd(hv, txq);
-               if (!agg_txd)
-                       return NULL;
-
-               chim = (uint8_t *)hv->chim_res->addr
-                       + agg_txd->chim_index * hv->chim_szmax;
 
-               txq->agg_txd = agg_txd;
-               txq->agg_pktleft = txq->agg_pktmax - 1;
-               txq->agg_szleft = txq->agg_szmax - pktsize;
+               hn_txd_put(txq, txd);
+               return chim;
        }
+
+       txd->chim_index = hn_chim_alloc(hv);
+       if (txd->chim_index == NVS_CHIM_IDX_INVALID)
+               return NULL;
+
+       chim = (uint8_t *)hv->chim_res->addr
+                       + txd->chim_index * hv->chim_szmax;
+
+       txq->agg_txd = txd;
+       txq->agg_pktleft = txq->agg_pktmax - 1;
+       txq->agg_szleft = txq->agg_szmax - pktsize;
        txq->agg_prevpkt = chim;
 
        return chim;
@@ -1263,11 +1421,12 @@ static int hn_xmit_sg(struct hn_tx_queue *txq,
        hn_rndis_dump(txd->rndis_pkt);
 
        /* pass IOVA of rndis header in first segment */
-       addr = rte_malloc_virt2iova(txd->rndis_pkt);
+       addr = rte_malloc_virt2iova(txq->tx_rndis);
        if (unlikely(addr == RTE_BAD_IOVA)) {
                PMD_DRV_LOG(ERR, "RNDIS transmit can not get iova");
                return -EINVAL;
        }
+       addr = addr + ((char *)txd->rndis_pkt - (char *)txq->tx_rndis);
 
        sg[0].page = addr / PAGE_SIZE;
        sg[0].ofs = addr & PAGE_MASK;
@@ -1295,28 +1454,42 @@ hn_xmit_pkts(void *ptxq, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
        struct hn_data *hv = txq->hv;
        struct rte_eth_dev *vf_dev;
        bool need_sig = false;
-       uint16_t nb_tx;
+       uint16_t nb_tx, tx_thresh;
        int ret;
 
        if (unlikely(hv->closed))
                return 0;
 
+       /*
+        * Always check for events on the primary channel
+        * because that is where hotplug notifications occur.
+        */
+       tx_thresh = RTE_MAX(txq->free_thresh, nb_pkts);
+       if (txq->queue_id == 0 ||
+           rte_mempool_avail_count(txq->txdesc_pool) < tx_thresh)
+               hn_process_events(hv, txq->queue_id, 0);
+
        /* Transmit over VF if present and up */
-       vf_dev = hv->vf_dev;
-       rte_compiler_barrier();
+       rte_rwlock_read_lock(&hv->vf_lock);
+       vf_dev = hn_get_vf_dev(hv);
        if (vf_dev && vf_dev->data->dev_started) {
                void *sub_q = vf_dev->data->tx_queues[queue_id];
 
-               return (*vf_dev->tx_pkt_burst)(sub_q, tx_pkts, nb_pkts);
+               nb_tx = (*vf_dev->tx_pkt_burst)(sub_q, tx_pkts, nb_pkts);
+               rte_rwlock_read_unlock(&hv->vf_lock);
+               return nb_tx;
        }
-
-       if (rte_mempool_avail_count(hv->tx_pool) <= txq->free_thresh)
-               hn_process_events(hv, txq->queue_id, 0);
+       rte_rwlock_read_unlock(&hv->vf_lock);
 
        for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
                struct rte_mbuf *m = tx_pkts[nb_tx];
                uint32_t pkt_size = m->pkt_len + HN_RNDIS_PKT_LEN;
                struct rndis_packet_msg *pkt;
+               struct hn_txdesc *txd;
+
+               txd = hn_txd_get(txq);
+               if (txd == NULL)
+                       break;
 
                /* For small packets aggregate them in chimney buffer */
                if (m->pkt_len < HN_TXCOPY_THRESHOLD && pkt_size <= txq->agg_szmax) {
@@ -1327,7 +1500,8 @@ hn_xmit_pkts(void *ptxq, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
                                        goto fail;
                        }
 
-                       pkt = hn_try_txagg(hv, txq, pkt_size);
+
+                       pkt = hn_try_txagg(hv, txq, txd, pkt_size);
                        if (unlikely(!pkt))
                                break;
 
@@ -1341,30 +1515,27 @@ hn_xmit_pkts(void *ptxq, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
                            hn_flush_txagg(txq, &need_sig))
                                goto fail;
                } else {
-                       struct hn_txdesc *txd;
-
-                       /* can send chimney data and large packet at once */
-                       txd = txq->agg_txd;
-                       if (txd) {
-                               hn_reset_txagg(txq);
-                       } else {
-                               txd = hn_new_txd(hv, txq);
-                               if (unlikely(!txd))
-                                       break;
-                       }
+                       /* Send any outstanding packets in buffer */
+                       if (txq->agg_txd && hn_flush_txagg(txq, &need_sig))
+                               goto fail;
 
                        pkt = txd->rndis_pkt;
                        txd->m = m;
-                       txd->data_size += m->pkt_len;
+                       txd->data_size = m->pkt_len;
                        ++txd->packets;
 
                        hn_encap(pkt, queue_id, m);
 
                        ret = hn_xmit_sg(txq, txd, m, &need_sig);
                        if (unlikely(ret != 0)) {
-                               PMD_TX_LOG(NOTICE, "sg send failed: %d", ret);
-                               ++txq->stats.errors;
-                               rte_mempool_put(hv->tx_pool, txd);
+                               if (ret == -EAGAIN) {
+                                       PMD_TX_LOG(DEBUG, "sg channel full");
+                                       ++txq->stats.channel_full;
+                               } else {
+                                       PMD_DRV_LOG(NOTICE, "sg send failed: %d", ret);
+                                       ++txq->stats.errors;
+                               }
+                               hn_txd_put(txq, txd);
                                goto fail;
                        }
                }
@@ -1382,6 +1553,24 @@ fail:
        return nb_tx;
 }
 
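+/*
+ * Receive a burst of packets from the VF device and relabel them
+ * with the synthetic device's port id.
+ */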
+static uint16_t
+hn_recv_vf(uint16_t vf_port, const struct hn_rx_queue *rxq,
+          struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
+{
+       uint16_t i, n;
+
+       if (unlikely(nb_pkts == 0))
+               return 0;
+
+       n = rte_eth_rx_burst(vf_port, rxq->queue_id, rx_pkts, nb_pkts);
+
+       /* relabel the received mbufs */
+       for (i = 0; i < n; i++)
+               rx_pkts[i]->port = rxq->port_id;
+
+       return n;
+}
+
 uint16_t
 hn_recv_pkts(void *prxq, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
 {
@@ -1393,30 +1582,41 @@ hn_recv_pkts(void *prxq, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
        if (unlikely(hv->closed))
                return 0;
 
-       vf_dev = hv->vf_dev;
-       rte_compiler_barrier();
-
-       if (vf_dev && vf_dev->data->dev_started) {
-               /* Normally, with SR-IOV the ring buffer will be empty */
+       /* Check for new completions (and hotplug) */
+       if (likely(rte_ring_count(rxq->rx_ring) < nb_pkts))
                hn_process_events(hv, rxq->queue_id, 0);
 
-               /* Get mbufs some bufs off of staging ring */
-               nb_rcv = rte_ring_sc_dequeue_burst(rxq->rx_ring,
-                                                  (void **)rx_pkts,
-                                                  nb_pkts / 2, NULL);
-               /* And rest off of VF */
-               nb_rcv += rte_eth_rx_burst(vf_dev->data->port_id,
-                                          rxq->queue_id,
-                                          rx_pkts + nb_rcv, nb_pkts - nb_rcv);
-       } else {
-               /* If receive ring is not full then get more */
-               if (rte_ring_count(rxq->rx_ring) < nb_pkts)
-                       hn_process_events(hv, rxq->queue_id, 0);
+       /* Always check the vmbus path for multicast and new flows */
+       nb_rcv = rte_ring_sc_dequeue_burst(rxq->rx_ring,
+                                          (void **)rx_pkts, nb_pkts, NULL);
 
-               nb_rcv = rte_ring_sc_dequeue_burst(rxq->rx_ring,
-                                                  (void **)rx_pkts,
-                                                  nb_pkts, NULL);
-       }
+       /* If VF is available, check that as well */
+       rte_rwlock_read_lock(&hv->vf_lock);
+       vf_dev = hn_get_vf_dev(hv);
+       if (vf_dev && vf_dev->data->dev_started)
+               nb_rcv += hn_recv_vf(vf_dev->data->port_id, rxq,
+                                    rx_pkts + nb_rcv, nb_pkts - nb_rcv);
 
+       rte_rwlock_read_unlock(&hv->vf_lock);
        return nb_rcv;
 }
+
+void
+hn_dev_free_queues(struct rte_eth_dev *dev)
+{
+       unsigned int i;
+
+       for (i = 0; i < dev->data->nb_rx_queues; i++) {
+               struct hn_rx_queue *rxq = dev->data->rx_queues[i];
+
+               hn_rx_queue_free(rxq, false);
+               dev->data->rx_queues[i] = NULL;
+       }
+       dev->data->nb_rx_queues = 0;
+
+       for (i = 0; i < dev->data->nb_tx_queues; i++) {
+               hn_dev_tx_queue_release(dev->data->tx_queues[i]);
+               dev->data->tx_queues[i] = NULL;
+       }
+       dev->data->nb_tx_queues = 0;
+}