net/liquidio: add Rx data path
author     Shijith Thotton <shijith.thotton@caviumnetworks.com>
           Sat, 25 Mar 2017 06:24:33 +0000 (11:54 +0530)
committer  Ferruh Yigit <ferruh.yigit@intel.com>
           Tue, 4 Apr 2017 16:59:48 +0000 (18:59 +0200)
Add APIs to receive packets and refill ring buffers.
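
Once the burst handler is registered in lio_eth_dev_init(), applications
receive packets through the standard ethdev API. A minimal polling sketch
(illustrative only, not part of this patch; assumes port 0 / queue 0 are
configured and started):

    #include <rte_ethdev.h>
    #include <rte_mbuf.h>

    #define BURST_SZ 32 /* illustrative burst size */

    static void
    rx_poll_once(void)
    {
            struct rte_mbuf *bufs[BURST_SZ];
            uint16_t nb_rx, i;

            /* dispatches to lio_dev_recv_pkts() via dev->rx_pkt_burst */
            nb_rx = rte_eth_rx_burst(0, 0, bufs, BURST_SZ);

            for (i = 0; i < nb_rx; i++)
                    rte_pktmbuf_free(bufs[i]);
    }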

Signed-off-by: Shijith Thotton <shijith.thotton@caviumnetworks.com>
Signed-off-by: Jerin Jacob <jerin.jacob@caviumnetworks.com>
Signed-off-by: Derek Chickles <derek.chickles@caviumnetworks.com>
Signed-off-by: Venkat Koppula <venkat.koppula@caviumnetworks.com>
Signed-off-by: Srisivasubramanian S <ssrinivasan@caviumnetworks.com>
Signed-off-by: Mallesham Jatharakonda <mjatharakonda@oneconvergence.com>
doc/guides/nics/features/liquidio.ini
drivers/net/liquidio/base/lio_hw_defs.h
drivers/net/liquidio/lio_ethdev.c
drivers/net/liquidio/lio_rxtx.c
drivers/net/liquidio/lio_rxtx.h

diff --git a/doc/guides/nics/features/liquidio.ini b/doc/guides/nics/features/liquidio.ini
index 6c5d8d1..554d921 100644
@@ -4,6 +4,10 @@
 ; Refer to default.ini for the full list of available PMD features.
 ;
 [Features]
+Scattered Rx         = Y
+CRC offload          = Y
+L3 checksum offload  = Y
+L4 checksum offload  = Y
 Multiprocess aware   = Y
 Linux UIO            = Y
 Linux VFIO           = Y
diff --git a/drivers/net/liquidio/base/lio_hw_defs.h b/drivers/net/liquidio/base/lio_hw_defs.h
index 4271730..2db7085 100644
@@ -112,12 +112,24 @@ enum octeon_tag_type {
 /* used for NIC operations */
 #define LIO_OPCODE     1
 
+/* Subcodes are used by host driver/apps to identify the sub-operation
+ * for the core. They only need to be unique for a given subsystem.
+ */
+#define LIO_OPCODE_SUBCODE(op, sub)            \
+               ((((op) & 0x0f) << 8) | ((sub) & 0x7f))
+
 /** LIO_OPCODE subcodes */
 /* This subcode is sent by core PCI driver to indicate cores are ready. */
+#define LIO_OPCODE_NW_DATA             0x02 /* network packet data */
 #define LIO_OPCODE_IF_CFG              0x09
 
 #define LIO_MAX_RX_PKTLEN              (64 * 1024)
 
+/* RX (packets coming from the wire) checksum verification flags */
+/* TCP/UDP csum */
+#define LIO_L4_CSUM_VERIFIED           0x1
+#define LIO_IP_CSUM_VERIFIED           0x2
+
 /* Interface flags communicated between host driver and core app. */
 enum lio_ifflags {
        LIO_IFFLAG_UNICAST      = 0x10
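
For reference, a worked expansion of the subcode macro added above (a
sketch, not part of the patch): the tuple used by the Rx data path
resolves to 0x0102.

    /* LIO_OPCODE_SUBCODE(LIO_OPCODE, LIO_OPCODE_NW_DATA)
     *   = (((1) & 0x0f) << 8) | ((0x02) & 0x7f)
     *   = 0x0100 | 0x0002
     *   = 0x0102
     */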
diff --git a/drivers/net/liquidio/lio_ethdev.c b/drivers/net/liquidio/lio_ethdev.c
index a93fa4a..ebfdf7a 100644
@@ -404,6 +404,8 @@ lio_eth_dev_uninit(struct rte_eth_dev *eth_dev)
        rte_free(eth_dev->data->mac_addrs);
        eth_dev->data->mac_addrs = NULL;
 
+       eth_dev->rx_pkt_burst = NULL;
+
        return 0;
 }
 
@@ -415,6 +417,8 @@ lio_eth_dev_init(struct rte_eth_dev *eth_dev)
 
        PMD_INIT_FUNC_TRACE();
 
+       eth_dev->rx_pkt_burst = &lio_dev_recv_pkts;
+
        /* Primary does the initialization. */
        if (rte_eal_process_type() != RTE_PROC_PRIMARY)
                return 0;
@@ -448,6 +452,7 @@ lio_eth_dev_init(struct rte_eth_dev *eth_dev)
                lio_dev_err(lio_dev,
                            "MAC addresses memory allocation failed\n");
                eth_dev->dev_ops = NULL;
+               eth_dev->rx_pkt_burst = NULL;
                return -ENOMEM;
        }
 
diff --git a/drivers/net/liquidio/lio_rxtx.c b/drivers/net/liquidio/lio_rxtx.c
index 9948023..9e4da3a 100644
@@ -326,6 +326,386 @@ lio_setup_droq(struct lio_device *lio_dev, int oq_no, int num_descs,
        return 0;
 }
 
+static inline uint32_t
+lio_droq_get_bufcount(uint32_t buf_size, uint32_t total_len)
+{
+       uint32_t buf_cnt = 0;
+
+       while (total_len > (buf_size * buf_cnt))
+               buf_cnt++;
+
+       return buf_cnt;
+}
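
For buf_size > 0, the loop above is a ceiling division; an equivalent
closed form (an identity sketch, not part of the patch):

    buf_cnt = (total_len + buf_size - 1) / buf_size;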
+
+/* If we were not able to refill all buffers, try to move around
+ * the buffers that were not dispatched.
+ */
+static inline uint32_t
+lio_droq_refill_pullup_descs(struct lio_droq *droq,
+                            struct lio_droq_desc *desc_ring)
+{
+       uint32_t refill_index = droq->refill_idx;
+       uint32_t desc_refilled = 0;
+
+       while (refill_index != droq->read_idx) {
+               if (droq->recv_buf_list[refill_index].buffer) {
+                       droq->recv_buf_list[droq->refill_idx].buffer =
+                               droq->recv_buf_list[refill_index].buffer;
+                       desc_ring[droq->refill_idx].buffer_ptr =
+                               desc_ring[refill_index].buffer_ptr;
+                       droq->recv_buf_list[refill_index].buffer = NULL;
+                       desc_ring[refill_index].buffer_ptr = 0;
+                       do {
+                               droq->refill_idx = lio_incr_index(
+                                                       droq->refill_idx, 1,
+                                                       droq->max_count);
+                               desc_refilled++;
+                               droq->refill_count--;
+                       } while (droq->recv_buf_list[droq->refill_idx].buffer);
+               }
+               refill_index = lio_incr_index(refill_index, 1,
+                                             droq->max_count);
+       }       /* while */
+
+       return desc_refilled;
+}
+
+/* lio_droq_refill
+ *
+ * @param lio_dev      - pointer to the lio device structure
+ * @param droq         - droq in which descriptors require new buffers.
+ *
+ * Description:
+ *  Called during normal DROQ processing in interrupt mode or by the poll
+ *  thread to refill the descriptors from which buffers were dispatched
+ *  to upper layers. Attempts to allocate new buffers. If that fails, moves
+ *  up buffers (that were not dispatched) to form a contiguous ring.
+ *
+ * Returns:
+ *  Number of descriptors refilled.
+ *
+ * Locks:
+ * This routine is called with droq->lock held.
+ */
+static uint32_t
+lio_droq_refill(struct lio_device *lio_dev, struct lio_droq *droq)
+{
+       struct lio_droq_desc *desc_ring;
+       uint32_t desc_refilled = 0;
+       void *buf = NULL;
+
+       desc_ring = droq->desc_ring;
+
+       while (droq->refill_count && (desc_refilled < droq->max_count)) {
+               /* If a valid buffer exists (happens if there is no dispatch),
+                * reuse the buffer, else allocate.
+                */
+               if (droq->recv_buf_list[droq->refill_idx].buffer == NULL) {
+                       buf = lio_recv_buffer_alloc(lio_dev, droq->q_no);
+                       /* If a buffer could not be allocated, no point in
+                        * continuing
+                        */
+                       if (buf == NULL)
+                               break;
+
+                       droq->recv_buf_list[droq->refill_idx].buffer = buf;
+               }
+
+               desc_ring[droq->refill_idx].buffer_ptr =
+                   lio_map_ring(droq->recv_buf_list[droq->refill_idx].buffer);
+               /* Reset any previous values in the length field. */
+               droq->info_list[droq->refill_idx].length = 0;
+
+               droq->refill_idx = lio_incr_index(droq->refill_idx, 1,
+                                                 droq->max_count);
+               desc_refilled++;
+               droq->refill_count--;
+       }
+
+       if (droq->refill_count)
+               desc_refilled += lio_droq_refill_pullup_descs(droq, desc_ring);
+
+       /* If droq->refill_count is still non-zero here: the refill count does
+        * not change in the second pass. We only moved buffers to close the
+        * gap in the ring, so the same number of buffers still needs to be
+        * refilled.
+        */
+       return desc_refilled;
+}
+
+static int
+lio_droq_fast_process_packet(struct lio_device *lio_dev,
+                            struct lio_droq *droq,
+                            struct rte_mbuf **rx_pkts)
+{
+       struct rte_mbuf *nicbuf = NULL;
+       struct lio_droq_info *info;
+       uint32_t total_len = 0;
+       int data_total_len = 0;
+       uint32_t pkt_len = 0;
+       union octeon_rh *rh;
+       int data_pkts = 0;
+
+       info = &droq->info_list[droq->read_idx];
+       lio_swap_8B_data((uint64_t *)info, 2);
+
+       if (!info->length)
+               return -1;
+
+       /* Len of the resp hdr is included in the received data len. */
+       info->length -= OCTEON_RH_SIZE;
+       rh = &info->rh;
+
+       total_len += (uint32_t)info->length;
+
+       if (lio_opcode_slow_path(rh)) {
+               uint32_t buf_cnt;
+
+               buf_cnt = lio_droq_get_bufcount(droq->buffer_size,
+                                               (uint32_t)info->length);
+               droq->read_idx = lio_incr_index(droq->read_idx, buf_cnt,
+                                               droq->max_count);
+               droq->refill_count += buf_cnt;
+       } else {
+               if (info->length <= droq->buffer_size) {
+                       if (rh->r_dh.has_hash)
+                               pkt_len = (uint32_t)(info->length - 8);
+                       else
+                               pkt_len = (uint32_t)info->length;
+
+                       nicbuf = droq->recv_buf_list[droq->read_idx].buffer;
+                       droq->recv_buf_list[droq->read_idx].buffer = NULL;
+                       droq->read_idx = lio_incr_index(
+                                               droq->read_idx, 1,
+                                               droq->max_count);
+                       droq->refill_count++;
+
+                       if (likely(nicbuf != NULL)) {
+                               nicbuf->data_off = RTE_PKTMBUF_HEADROOM;
+                               nicbuf->nb_segs = 1;
+                               nicbuf->next = NULL;
+                               /* We don't have a way to pass flags yet */
+                               nicbuf->ol_flags = 0;
+                               if (rh->r_dh.has_hash) {
+                                       uint64_t *hash_ptr;
+
+                                       nicbuf->ol_flags |= PKT_RX_RSS_HASH;
+                                       hash_ptr = rte_pktmbuf_mtod(nicbuf,
+                                                                   uint64_t *);
+                                       lio_swap_8B_data(hash_ptr, 1);
+                                       nicbuf->hash.rss = (uint32_t)*hash_ptr;
+                                       nicbuf->data_off += 8;
+                               }
+
+                               nicbuf->pkt_len = pkt_len;
+                               nicbuf->data_len = pkt_len;
+                               nicbuf->port = lio_dev->port_id;
+                               /* Store the mbuf */
+                               rx_pkts[data_pkts++] = nicbuf;
+                               data_total_len += pkt_len;
+                       }
+
+                       /* Prefetch buffer pointers when on a cache line
+                        * boundary
+                        */
+                       if ((droq->read_idx & 3) == 0) {
+                               rte_prefetch0(
+                                   &droq->recv_buf_list[droq->read_idx]);
+                               rte_prefetch0(
+                                   &droq->info_list[droq->read_idx]);
+                       }
+               } else {
+                       struct rte_mbuf *first_buf = NULL;
+                       struct rte_mbuf *last_buf = NULL;
+
+                       while (pkt_len < info->length) {
+                               int cpy_len = 0;
+
+                               cpy_len = ((pkt_len + droq->buffer_size) >
+                                               info->length)
+                                               ? ((uint32_t)info->length -
+                                                       pkt_len)
+                                               : droq->buffer_size;
+
+                               nicbuf =
+                                   droq->recv_buf_list[droq->read_idx].buffer;
+                               droq->recv_buf_list[droq->read_idx].buffer =
+                                   NULL;
+
+                               if (likely(nicbuf != NULL)) {
+                                       /* Note the first seg */
+                                       if (!pkt_len)
+                                               first_buf = nicbuf;
+
+                                       nicbuf->data_off = RTE_PKTMBUF_HEADROOM;
+                                       nicbuf->nb_segs = 1;
+                                       nicbuf->next = NULL;
+                                       nicbuf->port = lio_dev->port_id;
+                                       /* We don't have a way to pass
+                                        * flags yet
+                                        */
+                                       nicbuf->ol_flags = 0;
+                                       if ((!pkt_len) && (rh->r_dh.has_hash)) {
+                                               uint64_t *hash_ptr;
+
+                                               nicbuf->ol_flags |=
+                                                   PKT_RX_RSS_HASH;
+                                               hash_ptr = rte_pktmbuf_mtod(
+                                                   nicbuf, uint64_t *);
+                                               lio_swap_8B_data(hash_ptr, 1);
+                                               nicbuf->hash.rss =
+                                                   (uint32_t)*hash_ptr;
+                                               nicbuf->data_off += 8;
+                                               nicbuf->pkt_len = cpy_len - 8;
+                                               nicbuf->data_len = cpy_len - 8;
+                                       } else {
+                                               nicbuf->pkt_len = cpy_len;
+                                               nicbuf->data_len = cpy_len;
+                                       }
+
+                                       if (pkt_len)
+                                               first_buf->nb_segs++;
+
+                                       if (last_buf)
+                                               last_buf->next = nicbuf;
+
+                                       last_buf = nicbuf;
+                               } else {
+                                       PMD_RX_LOG(lio_dev, ERR, "no buf\n");
+                               }
+
+                               pkt_len += cpy_len;
+                               droq->read_idx = lio_incr_index(
+                                                       droq->read_idx,
+                                                       1, droq->max_count);
+                               droq->refill_count++;
+
+                               /* Prefetch buffer pointers when on a
+                                * cache line boundary
+                                */
+                               if ((droq->read_idx & 3) == 0) {
+                                       rte_prefetch0(&droq->recv_buf_list
+                                                             [droq->read_idx]);
+
+                                       rte_prefetch0(
+                                           &droq->info_list[droq->read_idx]);
+                               }
+                       }
+                       rx_pkts[data_pkts++] = first_buf;
+                       if (rh->r_dh.has_hash)
+                               data_total_len += (pkt_len - 8);
+                       else
+                               data_total_len += pkt_len;
+               }
+
+               /* Inform upper layer about packet checksum verification */
+               struct rte_mbuf *m = rx_pkts[data_pkts - 1];
+
+               if (rh->r_dh.csum_verified & LIO_IP_CSUM_VERIFIED)
+                       m->ol_flags |= PKT_RX_IP_CKSUM_GOOD;
+
+               if (rh->r_dh.csum_verified & LIO_L4_CSUM_VERIFIED)
+                       m->ol_flags |= PKT_RX_L4_CKSUM_GOOD;
+       }
+
+       if (droq->refill_count >= droq->refill_threshold) {
+               int desc_refilled = lio_droq_refill(lio_dev, droq);
+
+               /* Flush the droq descriptor data to memory to be sure
+                * that when we update the credits the data in memory is
+                * accurate.
+                */
+               rte_wmb();
+               rte_write32(desc_refilled, droq->pkts_credit_reg);
+               /* make sure mmio write completes */
+               rte_wmb();
+       }
+
+       info->length = 0;
+       info->rh.rh64 = 0;
+
+       return data_pkts;
+}
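
In the scattered branch above, segments are linked through mbuf "next"
pointers and first_buf->nb_segs counts them. A small helper sketch
(hypothetical, not part of the patch) that walks such a chain and sums the
per-segment lengths:

    static uint32_t
    mbuf_chain_len(const struct rte_mbuf *m)
    {
            uint32_t len = 0;

            for (; m != NULL; m = m->next)
                    len += m->data_len;

            return len;
    }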
+
+static uint32_t
+lio_droq_fast_process_packets(struct lio_device *lio_dev,
+                             struct lio_droq *droq,
+                             struct rte_mbuf **rx_pkts,
+                             uint32_t pkts_to_process)
+{
+       int ret, data_pkts = 0;
+       uint32_t pkt;
+
+       for (pkt = 0; pkt < pkts_to_process; pkt++) {
+               ret = lio_droq_fast_process_packet(lio_dev, droq,
+                                                  &rx_pkts[data_pkts]);
+               if (ret < 0) {
+                       lio_dev_err(lio_dev, "Port[%d] DROQ[%d] idx: %d len:0, pkt_cnt: %d\n",
+                                   lio_dev->port_id, droq->q_no,
+                                   droq->read_idx, pkts_to_process);
+                       break;
+               }
+               data_pkts += ret;
+       }
+
+       rte_atomic64_sub(&droq->pkts_pending, pkt);
+
+       return data_pkts;
+}
+
+static inline uint32_t
+lio_droq_check_hw_for_pkts(struct lio_droq *droq)
+{
+       uint32_t last_count;
+       uint32_t pkt_count;
+
+       pkt_count = rte_read32(droq->pkts_sent_reg);
+
+       last_count = pkt_count - droq->pkt_count;
+       droq->pkt_count = pkt_count;
+
+       if (last_count)
+               rte_atomic64_add(&droq->pkts_pending, last_count);
+
+       return last_count;
+}
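
pkts_sent_reg is treated as a free-running 32-bit counter, so the unsigned
subtraction above stays correct across wrap-around. A worked example (a
sketch):

    uint32_t prev = 0xfffffffeU; /* droq->pkt_count from the last poll */
    uint32_t curr = 0x00000003U; /* fresh read of pkts_sent_reg */
    uint32_t newly = curr - prev; /* == 5, by modulo-2^32 arithmetic */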
+
+uint16_t
+lio_dev_recv_pkts(void *rx_queue,
+                 struct rte_mbuf **rx_pkts,
+                 uint16_t budget)
+{
+       struct lio_droq *droq = rx_queue;
+       struct lio_device *lio_dev = droq->lio_dev;
+       uint32_t pkts_processed = 0;
+       uint32_t pkt_count = 0;
+
+       lio_droq_check_hw_for_pkts(droq);
+
+       pkt_count = rte_atomic64_read(&droq->pkts_pending);
+       if (!pkt_count)
+               return 0;
+
+       if (pkt_count > budget)
+               pkt_count = budget;
+
+       /* Grab the lock */
+       rte_spinlock_lock(&droq->lock);
+       pkts_processed = lio_droq_fast_process_packets(lio_dev,
+                                                      droq, rx_pkts,
+                                                      pkt_count);
+
+       if (droq->pkt_count) {
+               rte_write32(droq->pkt_count, droq->pkts_sent_reg);
+               droq->pkt_count = 0;
+       }
+
+       /* Release the spin lock */
+       rte_spinlock_unlock(&droq->lock);
+
+       return pkts_processed;
+}
+
 /**
  *  lio_init_instr_queue()
  *  @param lio_dev     - pointer to the lio device structure.
diff --git a/drivers/net/liquidio/lio_rxtx.h b/drivers/net/liquidio/lio_rxtx.h
index fc623ad..420b893 100644
@@ -515,6 +515,17 @@ lio_map_ring_info(struct lio_droq *droq, uint32_t i)
        return (uint64_t)dma_addr;
 }
 
+static inline int
+lio_opcode_slow_path(union octeon_rh *rh)
+{
+       uint16_t subcode1, subcode2;
+
+       subcode1 = LIO_OPCODE_SUBCODE(rh->r.opcode, rh->r.subcode);
+       subcode2 = LIO_OPCODE_SUBCODE(LIO_OPCODE, LIO_OPCODE_NW_DATA);
+
+       return subcode2 != subcode1;
+}
+
 /* Macro to increment index.
  * Index is incremented by count; if the sum exceeds
  * max, index is wrapped-around to the start.
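
Given the wrap-around behavior described above, a worked example of
lio_incr_index() (a sketch; assumes count <= max, so at most one wrap):

    /* with max == 512:
     *   lio_incr_index(500, 4, 512) -> 504
     *   lio_incr_index(510, 4, 512) -> 2    (510 + 4 - 512)
     */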
@@ -533,6 +544,8 @@ lio_incr_index(uint32_t index, uint32_t count, uint32_t max)
 int lio_setup_droq(struct lio_device *lio_dev, int q_no, int num_descs,
                   int desc_size, struct rte_mempool *mpool,
                   unsigned int socket_id);
+uint16_t lio_dev_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
+                          uint16_t budget);
 
 /** Setup instruction queue zero for the device
  *  @param lio_dev which lio device to setup