.allmulticast_disable = hn_dev_allmulticast_disable,
.tx_queue_setup = hn_dev_tx_queue_setup,
.tx_queue_release = hn_dev_tx_queue_release,
+ .tx_done_cleanup = hn_dev_tx_done_cleanup,
.rx_queue_setup = hn_dev_rx_queue_setup,
.rx_queue_release = hn_dev_rx_queue_release,
.link_update = hn_dev_link_update,
};
+int
+hn_dev_tx_done_cleanup(void *arg, uint32_t free_cnt)
+{
+ struct hn_tx_queue *txq = arg;
+
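+	/* Tx completions arrive on the same VMBus channel as Rx packets,
+	 * so reuse the channel event loop, capped at free_cnt completions.
+	 */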
+ return hn_process_events(txq->hv, txq->queue_id, free_cnt);
+}
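
For orientation, a minimal caller-side sketch (not part of the patch) of how this callback is reached through the generic ethdev API; the helper name and the 32-packet budget in the usage comment are illustrative assumptions.

#include <rte_ethdev.h>

/* Hypothetical helper: ask the PMD to reclaim up to 'free_cnt' completed
 * Tx mbufs on one queue. For netvsc, rte_eth_tx_done_cleanup() dispatches
 * to hn_dev_tx_done_cleanup() through the .tx_done_cleanup op added above.
 */
static int
reclaim_tx_completions(uint16_t port_id, uint16_t queue_id, uint32_t free_cnt)
{
	/* >= 0: number of completions processed; < 0: errno, e.g. -ENOTSUP
	 * when the driver does not implement the op.
	 */
	return rte_eth_tx_done_cleanup(port_id, queue_id, free_cnt);
}

/* Example usage: reclaim_tx_completions(port_id, 0, 32); */
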
+
void
hn_dev_rx_queue_info(struct rte_eth_dev *dev, uint16_t queue_idx,
struct rte_eth_rxq_info *qinfo)
* Process pending events on the channel.
* Called from both Rx queue poll and Tx cleanup
*/
-void hn_process_events(struct hn_data *hv, uint16_t queue_id)
+uint32_t hn_process_events(struct hn_data *hv, uint16_t queue_id,
+ uint32_t tx_limit)
{
struct rte_eth_dev *dev = &rte_eth_devices[hv->port_id];
struct hn_rx_queue *rxq;
uint32_t bytes_read = 0;
+ uint32_t tx_done = 0;
int ret = 0;
rxq = queue_id == 0 ? hv->primary : dev->data->rx_queues[queue_id];
/* If no pending data then nothing to do */
if (rte_vmbus_chan_rx_empty(rxq->chan))
- return;
+ return 0;
/*
	 * The channel is shared between the Rx and Tx queues, so a lock is
	 * needed: DPDK does not force Rx and Tx onto the same CPU.
*/
if (unlikely(!rte_spinlock_trylock(&rxq->ring_lock)))
- return;
+ return 0;
for (;;) {
const struct vmbus_chanpkt_hdr *pkt;
switch (pkt->type) {
case VMBUS_CHANPKT_TYPE_COMP:
+ ++tx_done;
hn_nvs_handle_comp(dev, queue_id, pkt, data);
break;
}
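+		/* Stop once the requested number of Tx completions has been
+		 * processed; a tx_limit of 0 means no limit.
+		 */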
+ if (tx_limit && tx_done >= tx_limit)
+ break;
+
if (rxq->rx_ring && rte_ring_full(rxq->rx_ring))
break;
}
rte_vmbus_chan_signal_read(rxq->chan, bytes_read);
rte_spinlock_unlock(&rxq->ring_lock);
+
+ return tx_done;
}
static void hn_append_to_chim(struct hn_tx_queue *txq,
return 0;
if (rte_mempool_avail_count(hv->tx_pool) <= txq->free_thresh)
- hn_process_events(hv, txq->queue_id);
+ hn_process_events(hv, txq->queue_id, 0);
for (nb_tx = 0; nb_tx < nb_pkts; nb_tx++) {
struct rte_mbuf *m = tx_pkts[nb_tx];
/* If ring is empty then process more */
if (rte_ring_count(rxq->rx_ring) < nb_pkts)
- hn_process_events(hv, rxq->queue_id);
+ hn_process_events(hv, rxq->queue_id, 0);
/* Get mbufs off staging ring */
return rte_ring_sc_dequeue_burst(rxq->rx_ring, (void **)rx_pkts,
return hv->channels[0];
}
-void hn_process_events(struct hn_data *hv, uint16_t queue_id);
+uint32_t hn_process_events(struct hn_data *hv, uint16_t queue_id,
+ uint32_t tx_limit);
uint16_t hn_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
uint16_t nb_pkts);
void hn_dev_tx_queue_release(void *arg);
void hn_dev_tx_queue_info(struct rte_eth_dev *dev, uint16_t queue_idx,
struct rte_eth_txq_info *qinfo);
+int hn_dev_tx_done_cleanup(void *arg, uint32_t free_cnt);
struct hn_rx_queue *hn_rx_queue_alloc(struct hn_data *hv,
uint16_t queue_id,