bus/vmbus: avoid signalling host on read
author Stephen Hemminger <sthemmin@microsoft.com>
Tue, 24 Jul 2018 21:08:53 +0000 (14:08 -0700)
committer Thomas Monjalon <thomas@monjalon.net>
Sun, 5 Aug 2018 09:03:18 +0000 (11:03 +0200)
Don't signal the host that the receive ring has been read until all
events have been processed. This reduces the number of guest exits and
therefore improves performance.

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
drivers/bus/vmbus/rte_bus_vmbus.h
drivers/bus/vmbus/rte_bus_vmbus_version.map
drivers/bus/vmbus/vmbus_bufring.c
drivers/bus/vmbus/vmbus_channel.c
drivers/net/netvsc/hn_rxtx.c
drivers/net/netvsc/hn_var.h

index 0100f80..4a2c1f6 100644 (file)
@@ -337,12 +337,23 @@ int rte_vmbus_chan_recv(struct vmbus_channel *chan,
  * @param len
  *     Pointer to size of receive buffer (in/out)
  * @return
- *   On success, returns 0
+ *   On success, returns number of bytes read.
  *   On failure, returns negative errno.
  */
 int rte_vmbus_chan_recv_raw(struct vmbus_channel *chan,
                            void *data, uint32_t *len);
 
+/**
+ * Notify host of bytes read (after recv_raw)
+ * Signals host if required.
+ *
+ * @param chan
+ *     Pointer to vmbus_channel structure.
+ * @param bytes_read
+ *     Number of bytes read since last signal
+ */
+void rte_vmbus_chan_signal_read(struct vmbus_channel *chan, uint32_t bytes_read);
+
 /**
  * Determine sub channel index of the given channel
  *
index 5324fef..dabb920 100644 (file)
@@ -10,6 +10,7 @@ DPDK_18.08 {
        rte_vmbus_chan_rx_empty;
        rte_vmbus_chan_send;
        rte_vmbus_chan_send_sglist;
+       rte_vmbus_chan_signal_read;
        rte_vmbus_chan_signal_tx;
        rte_vmbus_irq_mask;
        rte_vmbus_irq_read;
index c2d7d8c..c880016 100644 (file)
@@ -221,6 +221,9 @@ vmbus_rxbr_read(struct vmbus_br *rbr, void *data, size_t dlen, size_t skip)
        if (vmbus_br_availread(rbr) < dlen + skip + sizeof(uint64_t))
                return -EAGAIN;
 
+       /* Record where host was when we started read (for debug) */
+       rbr->windex = rbr->vbr->windex;
+
        /*
         * Copy channel packet from RX bufring.
         */
index f9feada..cc5f3e8 100644 (file)
@@ -176,49 +176,37 @@ bool rte_vmbus_chan_rx_empty(const struct vmbus_channel *channel)
        return br->vbr->rindex == br->vbr->windex;
 }
 
-static int vmbus_read_and_signal(struct vmbus_channel *chan,
-                                void *data, size_t dlen, size_t skip)
+/* Signal host after reading N bytes */
+void rte_vmbus_chan_signal_read(struct vmbus_channel *chan, uint32_t bytes_read)
 {
        struct vmbus_br *rbr = &chan->rxbr;
-       uint32_t write_sz, pending_sz, bytes_read;
-       int error;
-
-       /* Record where host was when we started read (for debug) */
-       rbr->windex = rbr->vbr->windex;
-
-       /* Read data and skip packet header */
-       error = vmbus_rxbr_read(rbr, data, dlen, skip);
-       if (error)
-               return error;
+       uint32_t write_sz, pending_sz;
 
        /* No need for signaling on older versions */
        if (!rbr->vbr->feature_bits.feat_pending_send_sz)
-               return 0;
+               return;
 
        /* Make sure reading of pending happens after new read index */
        rte_mb();
 
        pending_sz = rbr->vbr->pending_send;
        if (!pending_sz)
-               return 0;
+               return;
 
        rte_smp_rmb();
        write_sz = vmbus_br_availwrite(rbr, rbr->vbr->windex);
-       bytes_read = dlen + skip + sizeof(uint64_t);
 
        /* If there was space before then host was not blocked */
        if (write_sz - bytes_read > pending_sz)
-               return 0;
+               return;
 
        /* If pending write will not fit */
        if (write_sz <= pending_sz)
-               return 0;
+               return;
 
        vmbus_set_event(chan->device, chan);
-       return 0;
 }
 
-/* TODO: replace this with inplace ring buffer (no copy) */
 int rte_vmbus_chan_recv(struct vmbus_channel *chan, void *data, uint32_t *len,
                        uint64_t *request_id)
 {
@@ -256,10 +244,16 @@ int rte_vmbus_chan_recv(struct vmbus_channel *chan, void *data, uint32_t *len,
        if (request_id)
                *request_id = pkt.xactid;
 
-       /* Read data and skip the header */
-       return vmbus_read_and_signal(chan, data, dlen, hlen);
+       /* Read data and skip packet header */
+       error = vmbus_rxbr_read(&chan->rxbr, data, dlen, hlen);
+       if (error)
+               return error;
+
+       rte_vmbus_chan_signal_read(chan, dlen + hlen + sizeof(uint64_t));
+       return 0;
 }
 
+/* TODO: replace this with inplace ring buffer (no copy) */
 int rte_vmbus_chan_recv_raw(struct vmbus_channel *chan,
                            void *data, uint32_t *len)
 {
@@ -291,8 +285,13 @@ int rte_vmbus_chan_recv_raw(struct vmbus_channel *chan,
        if (unlikely(dlen > bufferlen))
                return -ENOBUFS;
 
-       /* Put packet header in data buffer */
-       return vmbus_read_and_signal(chan, data, dlen, 0);
+       /* Read packet into data buffer, header included (skip is 0) */
+       error = vmbus_rxbr_read(&chan->rxbr, data, dlen, 0);
+       if (error)
+               return error;
+
+       /* Return the number of bytes read */
+       return dlen + sizeof(uint64_t);
 }
 
 int vmbus_chan_create(const struct rte_vmbus_device *device,
index ec133d4..cc8a534 100644 (file)
@@ -40,7 +40,7 @@
 #define HN_TXCOPY_THRESHOLD    512
 
 #define HN_RXCOPY_THRESHOLD    256
-#define HN_RXQ_EVENT_DEFAULT   1024
+#define HN_RXQ_EVENT_DEFAULT   2048
 
 struct hn_rxinfo {
        uint32_t        vlan_info;
@@ -709,7 +709,8 @@ struct hn_rx_queue *hn_rx_queue_alloc(struct hn_data *hv,
 {
        struct hn_rx_queue *rxq;
 
-       rxq = rte_zmalloc_socket("HN_RXQ", sizeof(*rxq),
+       rxq = rte_zmalloc_socket("HN_RXQ",
+                                sizeof(*rxq) + HN_RXQ_EVENT_DEFAULT,
                                 RTE_CACHE_LINE_SIZE, socket_id);
        if (rxq) {
                rxq->hv = hv;
@@ -717,16 +718,6 @@ struct hn_rx_queue *hn_rx_queue_alloc(struct hn_data *hv,
                rte_spinlock_init(&rxq->ring_lock);
                rxq->port_id = hv->port_id;
                rxq->queue_id = queue_id;
-
-               rxq->event_sz = HN_RXQ_EVENT_DEFAULT;
-               rxq->event_buf = rte_malloc_socket("RX_EVENTS",
-                                                  rxq->event_sz,
-                                                  RTE_CACHE_LINE_SIZE,
-                                                  socket_id);
-               if (!rxq->event_buf) {
-                       rte_free(rxq);
-                       rxq = NULL;
-               }
        }
        return rxq;
 }
@@ -835,6 +826,7 @@ void hn_process_events(struct hn_data *hv, uint16_t queue_id)
 {
        struct rte_eth_dev *dev = &rte_eth_devices[hv->port_id];
        struct hn_rx_queue *rxq;
+       uint32_t bytes_read = 0;
        int ret = 0;
 
        rxq = queue_id == 0 ? hv->primary : dev->data->rx_queues[queue_id];
@@ -852,34 +844,21 @@ void hn_process_events(struct hn_data *hv, uint16_t queue_id)
 
        for (;;) {
                const struct vmbus_chanpkt_hdr *pkt;
-               uint32_t len = rxq->event_sz;
+               uint32_t len = HN_RXQ_EVENT_DEFAULT;
                const void *data;
 
                ret = rte_vmbus_chan_recv_raw(rxq->chan, rxq->event_buf, &len);
                if (ret == -EAGAIN)
                        break;  /* ring is empty */
 
-               if (ret == -ENOBUFS) {
-                       /* expanded buffer needed */
-                       len = rte_align32pow2(len);
-                       PMD_DRV_LOG(DEBUG, "expand event buf to %u", len);
-
-                       rxq->event_buf = rte_realloc(rxq->event_buf,
-                                                    len, RTE_CACHE_LINE_SIZE);
-                       if (rxq->event_buf) {
-                               rxq->event_sz = len;
-                               continue;
-                       }
-
-                       rte_exit(EXIT_FAILURE, "can not expand event buf!\n");
-                       break;
-               }
-
-               if (ret != 0) {
-                       PMD_DRV_LOG(ERR, "vmbus ring buffer error: %d", ret);
-                       break;
-               }
+               else if (ret == -ENOBUFS)
+                       rte_exit(EXIT_FAILURE, "event buffer not big enough (%u < %u)",
+                                HN_RXQ_EVENT_DEFAULT, len);
+               else if (ret <= 0)
+                       rte_exit(EXIT_FAILURE,
+                                "vmbus ring buffer error: %d", ret);
 
+               bytes_read += ret;
                pkt = (const struct vmbus_chanpkt_hdr *)rxq->event_buf;
                data = (char *)rxq->event_buf + vmbus_chanpkt_getlen(pkt->hlen);
 
@@ -904,6 +883,10 @@ void hn_process_events(struct hn_data *hv, uint16_t queue_id)
                if (rxq->rx_ring && rte_ring_full(rxq->rx_ring))
                        break;
        }
+
+       if (bytes_read > 0)
+               rte_vmbus_chan_signal_read(rxq->chan, bytes_read);
+
        rte_spinlock_unlock(&rxq->ring_lock);
 }
 
index 3f3b442..f7ff858 100644 (file)
@@ -69,7 +69,6 @@ struct hn_rx_queue {
        struct vmbus_channel *chan;
        struct rte_mempool *mb_pool;
        struct rte_ring *rx_ring;
-       void    *event_buf;
 
        rte_spinlock_t ring_lock;
        uint32_t event_sz;
@@ -77,6 +76,8 @@ struct hn_rx_queue {
        uint16_t queue_id;
        struct hn_stats stats;
        uint64_t ring_full;
+
+       uint8_t event_buf[];
 };