+
+static int
+memif_tx_one_zc(struct pmd_process_private *proc_private, struct memif_queue *mq,
+ memif_ring_t *ring, struct rte_mbuf *mbuf, const uint16_t mask,
+ uint16_t slot, uint16_t n_free)
+{
+ memif_desc_t *d0;
+ int used_slots = 1;
+
+next_in_chain:
+ /* store pointer to mbuf to free it later */
+ mq->buffers[slot & mask] = mbuf;
+ /* Increment refcnt to make sure the buffer is not freed before master
+ * receives it. (current segment)
+ */
+ rte_mbuf_refcnt_update(mbuf, 1);
+ /* populate descriptor */
+ d0 = &ring->desc[slot & mask];
+ d0->length = rte_pktmbuf_data_len(mbuf);
+ /* FIXME: get region index */
+ d0->region = 1;
+ d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
+ (uint8_t *)proc_private->regions[d0->region]->addr;
+ d0->flags = 0;
+
+ /* check if buffer is chained */
+ if (rte_pktmbuf_is_contiguous(mbuf) == 0) {
+ if (n_free < 2)
+ return 0;
+ /* mark buffer as chained */
+ d0->flags |= MEMIF_DESC_FLAG_NEXT;
+ /* advance mbuf */
+ mbuf = mbuf->next;
+ /* update counters */
+ used_slots++;
+ slot++;
+ n_free--;
+ goto next_in_chain;
+ }
+ return used_slots;
+}
+
+static uint16_t
+eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+ struct memif_queue *mq = queue;
+ struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
+ struct pmd_process_private *proc_private =
+ rte_eth_devices[mq->in_port].process_private;
+ memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
+ uint16_t slot, n_free, ring_size, mask, n_tx_pkts = 0;
+ memif_ring_type_t type = mq->type;
+ struct rte_eth_link link;
+
+ if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
+ return 0;
+ if (unlikely(ring == NULL)) {
+ /* Secondary process will attempt to request regions. */
+ rte_eth_link_get(mq->in_port, &link);
+ return 0;
+ }
+
+ ring_size = 1 << mq->log2_ring_size;
+ mask = ring_size - 1;
+
+ /* free mbufs received by master */
+ memif_free_stored_mbufs(proc_private, mq);
+
+ /* ring type always MEMIF_RING_S2M */
+ slot = ring->head;
+ n_free = ring_size - ring->head + mq->last_tail;
+
+ int used_slots;
+
+ while (n_free && (n_tx_pkts < nb_pkts)) {
+ while ((n_free > 4) && ((nb_pkts - n_tx_pkts) > 4)) {
+ if ((nb_pkts - n_tx_pkts) > 8) {
+ rte_prefetch0(*bufs + 4);
+ rte_prefetch0(*bufs + 5);
+ rte_prefetch0(*bufs + 6);
+ rte_prefetch0(*bufs + 7);
+ }
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+ }
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+ }
+
+no_free_slots:
+ rte_mb();
+ /* update ring pointers */
+ if (type == MEMIF_RING_S2M)
+ ring->head = slot;
+ else
+ ring->tail = slot;
+
+ /* Send interrupt, if enabled. */
+ if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
+ uint64_t a = 1;
+ ssize_t size = write(mq->intr_handle.fd, &a, sizeof(a));
+ if (unlikely(size < 0)) {
+ MIF_LOG(WARNING,
+ "Failed to send interrupt. %s", strerror(errno));
+ }
+ }
+
+ /* increment queue counters */
+ mq->n_pkts += n_tx_pkts;
+
+ return n_tx_pkts;
+}
+