+
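+/*
+ * Enqueue one (possibly multi-segment) mbuf without copying: each
+ * segment is published as a single ring descriptor pointing into the
+ * shared memory region. Returns the number of slots consumed, or 0 if
+ * the ring cannot hold the rest of the chain.
+ */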
+static int
+memif_tx_one_zc(struct pmd_process_private *proc_private, struct memif_queue *mq,
+ memif_ring_t *ring, struct rte_mbuf *mbuf, const uint16_t mask,
+ uint16_t slot, uint16_t n_free)
+{
+ memif_desc_t *d0;
+ int used_slots = 1;
+
+next_in_chain:
+ /* store pointer to mbuf to free it later */
+ mq->buffers[slot & mask] = mbuf;
+ /* Bump the refcnt of the current segment so the buffer is not
+ * freed before the server is done with it.
+ */
+ rte_mbuf_refcnt_update(mbuf, 1);
+ /* populate descriptor */
+ d0 = &ring->desc[slot & mask];
+ d0->length = rte_pktmbuf_data_len(mbuf);
+ mq->n_bytes += rte_pktmbuf_data_len(mbuf);
+ /* FIXME: get region index */
+ d0->region = 1;
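+ /* the offset is relative to the region base address, so the server
+ * can resolve it in its own mapping of the shared memory
+ */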
+ d0->offset = rte_pktmbuf_mtod(mbuf, uint8_t *) -
+ (uint8_t *)proc_private->regions[d0->region]->addr;
+ d0->flags = 0;
+
+ /* check if more segments follow; mbuf->next is valid for every
+ * segment, whereas nb_segs is only meaningful in the first one
+ */
+ if (mbuf->next != NULL) {
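+ /* ring full: report failure; head is never advanced, so the
+ * partially built chain is not exposed to the server
+ */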
+ if (n_free < 2)
+ return 0;
+ /* mark buffer as chained */
+ d0->flags |= MEMIF_DESC_FLAG_NEXT;
+ /* advance mbuf */
+ mbuf = mbuf->next;
+ /* update counters */
+ used_slots++;
+ slot++;
+ n_free--;
+ goto next_in_chain;
+ }
+ return used_slots;
+}
+
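+/*
+ * Zero-copy burst transmit. Publishes up to nb_pkts mbufs on the C2S
+ * ring and returns the number of packets enqueued. Transmitted mbufs
+ * are kept in mq->buffers and freed only after the server returns
+ * them (see memif_free_stored_mbufs()).
+ */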
+static uint16_t
+eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+ struct memif_queue *mq = queue;
+ struct pmd_internals *pmd = rte_eth_devices[mq->in_port].data->dev_private;
+ struct pmd_process_private *proc_private =
+ rte_eth_devices[mq->in_port].process_private;
+ memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
+ uint16_t slot, n_free, ring_size, mask, n_tx_pkts = 0;
+ int used_slots;
+ struct rte_eth_link link;
+
+ if (unlikely((pmd->flags & ETH_MEMIF_FLAG_CONNECTED) == 0))
+ return 0;
+ if (unlikely(ring == NULL)) {
+ /* A secondary process may not have the regions mapped yet; the
+ * link-status request makes it query the primary for them.
+ */
+ rte_eth_link_get(mq->in_port, &link);
+ return 0;
+ }
+
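+ /* ring size is a power of two; slot indices wrap around via mask */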
+ ring_size = 1 << mq->log2_ring_size;
+ mask = ring_size - 1;
+
+ /* free mbufs that the server has already consumed */
+ memif_free_stored_mbufs(proc_private, mq);
+
+ /* ring type always MEMIF_RING_C2S */
+ /* For C2S queues ring->head is updated by the sender and
+ * this function is called in the context of sending thread.
+ * The loads in the sender do not need to synchronize with
+ * its own stores. Hence, the following load can be a
+ * relaxed load.
+ */
+ slot = __atomic_load_n(&ring->head, __ATOMIC_RELAXED);
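+ /* slots between last_tail and head are still in flight; the rest
+ * of the ring is free (uint16_t arithmetic handles wraparound)
+ */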
+ n_free = ring_size - slot + mq->last_tail;
+
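+ /* fast path: four packets per iteration, prefetching the mbuf
+ * headers of the next four while at least eight packets remain
+ */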
+ while (n_free && (n_tx_pkts < nb_pkts)) {
+ while ((n_free > 4) && ((nb_pkts - n_tx_pkts) > 4)) {
+ if ((nb_pkts - n_tx_pkts) > 8) {
+ rte_prefetch0(*(bufs + 4));
+ rte_prefetch0(*(bufs + 5));
+ rte_prefetch0(*(bufs + 6));
+ rte_prefetch0(*(bufs + 7));
+ }
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+ }
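+ /* leftover packets, one per iteration */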
+ used_slots = memif_tx_one_zc(proc_private, mq, ring, *bufs++,
+ mask, slot, n_free);
+ if (unlikely(used_slots < 1))
+ goto no_free_slots;
+ n_tx_pkts++;
+ slot += used_slots;
+ n_free -= used_slots;
+ }
+
+no_free_slots:
+ /* ring type always MEMIF_RING_C2S */
+ /* ring->head acts as a guard variable between the Tx and Rx
+ * threads: this store-release pairs with the load-acquire in
+ * eth_memif_rx() for C2S rings.
+ */
+ __atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
+
+ /* Kick the server via the interrupt eventfd, unless interrupts
+ * are masked for this ring.
+ */
+ if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
+ uint64_t a = 1;
+ ssize_t size = write(mq->intr_handle.fd, &a, sizeof(a));
+ if (unlikely(size < 0)) {
+ MIF_LOG(WARNING,
+ "Failed to send interrupt. %s", strerror(errno));
+ }
+ }
+
+ /* increment queue counters */
+ mq->n_pkts += n_tx_pkts;
+
+ return n_tx_pkts;
+}
+