event/dlb2: add delayed token pop logic
authorTimothy McDaniel <timothy.mcdaniel@intel.com>
Wed, 11 Nov 2020 20:26:59 +0000 (14:26 -0600)
committerJerin Jacob <jerinj@marvell.com>
Thu, 12 Nov 2020 17:40:22 +0000 (18:40 +0100)
The code contained in this commit was inadvertently omitted
when dissecting the dlb2 code base into discrete patches for
upstream.

Signed-off-by: Timothy McDaniel <timothy.mcdaniel@intel.com>
Reviewed-by: Mike Ximing Chen <mike.ximing.chen@intel.com>
drivers/event/dlb2/dlb2.c
drivers/event/dlb2/dlb2_selftest.c

index d42e48b..8672486 100644 (file)
@@ -1082,6 +1082,25 @@ error_exit:
        return ret;
 }
 
+static inline uint16_t
+dlb2_event_enqueue_delayed(void *event_port,
+                          const struct rte_event events[]);
+
+static inline uint16_t
+dlb2_event_enqueue_burst_delayed(void *event_port,
+                                const struct rte_event events[],
+                                uint16_t num);
+
+static inline uint16_t
+dlb2_event_enqueue_new_burst_delayed(void *event_port,
+                                    const struct rte_event events[],
+                                    uint16_t num);
+
+static inline uint16_t
+dlb2_event_enqueue_forward_burst_delayed(void *event_port,
+                                        const struct rte_event events[],
+                                        uint16_t num);
+
 static int
 dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
                        struct dlb2_eventdev_port *ev_port,
@@ -1198,6 +1217,20 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
 
        qm_port->dequeue_depth = dequeue_depth;
        qm_port->token_pop_thresh = dequeue_depth;
+
+       /* The default enqueue functions do not include delayed-pop support for
+        * performance reasons.
+        */
+       if (qm_port->token_pop_mode == DELAYED_POP) {
+               dlb2->event_dev->enqueue = dlb2_event_enqueue_delayed;
+               dlb2->event_dev->enqueue_burst =
+                       dlb2_event_enqueue_burst_delayed;
+               dlb2->event_dev->enqueue_new_burst =
+                       dlb2_event_enqueue_new_burst_delayed;
+               dlb2->event_dev->enqueue_forward_burst =
+                       dlb2_event_enqueue_forward_burst_delayed;
+       }
+
        qm_port->owed_tokens = 0;
        qm_port->issued_releases = 0;
 
@@ -2427,11 +2460,6 @@ dlb2_event_build_hcws(struct dlb2_port *qm_port,
        case 3:
        case 2:
        case 1:
-               /* At least one QE will be valid, so only zero out three */
-               qe[1].cmd_byte = 0;
-               qe[2].cmd_byte = 0;
-               qe[3].cmd_byte = 0;
-
                for (i = 0; i < num; i++) {
                        qe[i].cmd_byte =
                                cmd_byte_map[qm_port->is_directed][ev[i].op];
@@ -2452,6 +2480,8 @@ dlb2_event_build_hcws(struct dlb2_port *qm_port,
                        qe[i].u.event_type.sub = ev[i].sub_event_type;
                }
                break;
+       case 0:
+               break;
        }
 }
 
@@ -2578,29 +2608,57 @@ op_check:
 }
 
 static inline uint16_t
-dlb2_event_enqueue_burst(void *event_port,
-                        const struct rte_event events[],
-                        uint16_t num)
+__dlb2_event_enqueue_burst(void *event_port,
+                          const struct rte_event events[],
+                          uint16_t num,
+                          bool use_delayed)
 {
        struct dlb2_eventdev_port *ev_port = event_port;
        struct dlb2_port *qm_port = &ev_port->qm_port;
        struct process_local_port_data *port_data;
-       int i, cnt;
+       int i;
 
        RTE_ASSERT(ev_port->enq_configured);
        RTE_ASSERT(events != NULL);
 
-       cnt = 0;
+       i = 0;
 
        port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
 
-       for (i = 0; i < num; i += DLB2_NUM_QES_PER_CACHE_LINE) {
+       while (i < num) {
                uint8_t sched_types[DLB2_NUM_QES_PER_CACHE_LINE];
                uint8_t queue_ids[DLB2_NUM_QES_PER_CACHE_LINE];
+               int pop_offs = 0;
                int j = 0;
 
+               memset(qm_port->qe4,
+                      0,
+                      DLB2_NUM_QES_PER_CACHE_LINE *
+                      sizeof(struct dlb2_enqueue_qe));
+
                for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
                        const struct rte_event *ev = &events[i + j];
+                       int16_t thresh = qm_port->token_pop_thresh;
+
+                       if (use_delayed &&
+                           qm_port->token_pop_mode == DELAYED_POP &&
+                           (ev->op == RTE_EVENT_OP_FORWARD ||
+                            ev->op == RTE_EVENT_OP_RELEASE) &&
+                           qm_port->issued_releases >= thresh - 1) {
+                               /* Insert the token pop QE and break out. This
+                                * may result in a partial HCW, but that is
+                                * simpler than supporting arbitrary QE
+                                * insertion.
+                                */
+                               dlb2_construct_token_pop_qe(qm_port, j);
+
+                               /* Reset the releases for the next QE batch */
+                               qm_port->issued_releases -= thresh;
+
+                               pop_offs = 1;
+                               j++;
+                               break;
+                       }
 
                        if (dlb2_event_enqueue_prep(ev_port, qm_port, ev,
                                                    &sched_types[j],
@@ -2611,38 +2669,52 @@ dlb2_event_enqueue_burst(void *event_port,
                if (j == 0)
                        break;
 
-               dlb2_event_build_hcws(qm_port, &events[i], j,
+               dlb2_event_build_hcws(qm_port, &events[i], j - pop_offs,
                                      sched_types, queue_ids);
 
-               if (qm_port->token_pop_mode == DELAYED_POP && j < 4 &&
-                   qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
-                       dlb2_construct_token_pop_qe(qm_port, j);
-
-                       /* Reset the releases counter for the next QE batch */
-                       qm_port->issued_releases -= qm_port->token_pop_thresh;
-               }
-
                dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
 
-               cnt += j;
+               /* Don't include the token pop QE in the enqueue count */
+               i += j - pop_offs;
 
-               if (j < DLB2_NUM_QES_PER_CACHE_LINE)
+               /* Don't interpret j < DLB2_NUM_QES_PER_CACHE_LINE as
+                * out-of-credits if pop_offs != 0
+                */
+               if (j < DLB2_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
                        break;
        }
 
-       if (qm_port->token_pop_mode == DELAYED_POP &&
-           qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
-               dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
-               qm_port->issued_releases -= qm_port->token_pop_thresh;
-       }
-       return cnt;
+       return i;
+}
+
+static uint16_t
+dlb2_event_enqueue_burst(void *event_port,
+                            const struct rte_event events[],
+                            uint16_t num)
+{
+       return __dlb2_event_enqueue_burst(event_port, events, num, false);
+}
+
+static uint16_t
+dlb2_event_enqueue_burst_delayed(void *event_port,
+                                    const struct rte_event events[],
+                                    uint16_t num)
+{
+       return __dlb2_event_enqueue_burst(event_port, events, num, true);
 }
 
 static inline uint16_t
 dlb2_event_enqueue(void *event_port,
                   const struct rte_event events[])
 {
-       return dlb2_event_enqueue_burst(event_port, events, 1);
+       return __dlb2_event_enqueue_burst(event_port, events, 1, false);
+}
+
+static inline uint16_t
+dlb2_event_enqueue_delayed(void *event_port,
+                          const struct rte_event events[])
+{
+       return __dlb2_event_enqueue_burst(event_port, events, 1, true);
 }
 
 static uint16_t
@@ -2650,7 +2722,15 @@ dlb2_event_enqueue_new_burst(void *event_port,
                             const struct rte_event events[],
                             uint16_t num)
 {
-       return dlb2_event_enqueue_burst(event_port, events, num);
+       return __dlb2_event_enqueue_burst(event_port, events, num, false);
+}
+
+static uint16_t
+dlb2_event_enqueue_new_burst_delayed(void *event_port,
+                                    const struct rte_event events[],
+                                    uint16_t num)
+{
+       return __dlb2_event_enqueue_burst(event_port, events, num, true);
 }
 
 static uint16_t
@@ -2658,7 +2738,93 @@ dlb2_event_enqueue_forward_burst(void *event_port,
                                 const struct rte_event events[],
                                 uint16_t num)
 {
-       return dlb2_event_enqueue_burst(event_port, events, num);
+       return __dlb2_event_enqueue_burst(event_port, events, num, false);
+}
+
+static uint16_t
+dlb2_event_enqueue_forward_burst_delayed(void *event_port,
+                                        const struct rte_event events[],
+                                        uint16_t num)
+{
+       return __dlb2_event_enqueue_burst(event_port, events, num, true);
+}
+
+static void
+dlb2_event_release(struct dlb2_eventdev *dlb2,
+                  uint8_t port_id,
+                  int n)
+{
+       struct process_local_port_data *port_data;
+       struct dlb2_eventdev_port *ev_port;
+       struct dlb2_port *qm_port;
+       int i;
+
+       if (port_id > dlb2->num_ports) {
+               DLB2_LOG_ERR("Invalid port id %d in dlb2-event_release\n",
+                            port_id);
+               rte_errno = -EINVAL;
+               return;
+       }
+
+       ev_port = &dlb2->ev_ports[port_id];
+       qm_port = &ev_port->qm_port;
+       port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
+
+       i = 0;
+
+       if (qm_port->is_directed) {
+               i = n;
+               goto sw_credit_update;
+       }
+
+       while (i < n) {
+               int pop_offs = 0;
+               int j = 0;
+
+               /* Zero-out QEs */
+               qm_port->qe4[0].cmd_byte = 0;
+               qm_port->qe4[1].cmd_byte = 0;
+               qm_port->qe4[2].cmd_byte = 0;
+               qm_port->qe4[3].cmd_byte = 0;
+
+               for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
+                       int16_t thresh = qm_port->token_pop_thresh;
+
+                       if (qm_port->token_pop_mode == DELAYED_POP &&
+                           qm_port->issued_releases >= thresh - 1) {
+                               /* Insert the token pop QE */
+                               dlb2_construct_token_pop_qe(qm_port, j);
+
+                               /* Reset the releases for the next QE batch */
+                               qm_port->issued_releases -= thresh;
+
+                               pop_offs = 1;
+                               j++;
+                               break;
+                       }
+
+                       qm_port->qe4[j].cmd_byte = DLB2_COMP_CMD_BYTE;
+                       qm_port->issued_releases++;
+               }
+
+               dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
+
+               /* Don't include the token pop QE in the release count */
+               i += j - pop_offs;
+       }
+
+sw_credit_update:
+       /* each release returns one credit */
+       if (!ev_port->outstanding_releases) {
+               DLB2_LOG_ERR("%s: Outstanding releases underflowed.\n",
+                            __func__);
+               return;
+       }
+       ev_port->outstanding_releases -= i;
+       ev_port->inflight_credits += i;
+
+       /* Replenish s/w credits if enough releases are performed */
+       dlb2_replenish_sw_credits(dlb2, ev_port);
 }
 
 static inline void
@@ -3067,86 +3233,6 @@ dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
        qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
 }
 
-static int
-dlb2_event_release(struct dlb2_eventdev *dlb2,
-                  uint8_t port_id,
-                  int n)
-{
-       struct process_local_port_data *port_data;
-       struct dlb2_eventdev_port *ev_port;
-       struct dlb2_port *qm_port;
-       int i, cnt;
-
-       if (port_id > dlb2->num_ports) {
-               DLB2_LOG_ERR("Invalid port id %d in dlb2-event_release\n",
-                            port_id);
-               rte_errno = -EINVAL;
-               return rte_errno;
-       }
-
-       ev_port = &dlb2->ev_ports[port_id];
-       qm_port = &ev_port->qm_port;
-       port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
-
-       cnt = 0;
-
-       if (qm_port->is_directed) {
-               cnt = n;
-               goto sw_credit_update;
-       }
-
-       for (i = 0; i < n; i += DLB2_NUM_QES_PER_CACHE_LINE) {
-               int j;
-
-               /* Zero-out QEs */
-               qm_port->qe4[0].cmd_byte = 0;
-               qm_port->qe4[1].cmd_byte = 0;
-               qm_port->qe4[2].cmd_byte = 0;
-               qm_port->qe4[3].cmd_byte = 0;
-
-               for (j = 0; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++)
-                       qm_port->qe4[j].cmd_byte = DLB2_COMP_CMD_BYTE;
-
-               qm_port->issued_releases += j;
-
-               if (j == 0)
-                       break;
-
-               if (qm_port->token_pop_mode == DELAYED_POP && j < 4 &&
-                   qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
-                       dlb2_construct_token_pop_qe(qm_port, j);
-
-                       /* Reset the releases counter for the next QE batch */
-                       qm_port->issued_releases -= qm_port->token_pop_thresh;
-               }
-
-               dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
-
-               cnt += j;
-       }
-
-       if (qm_port->token_pop_mode == DELAYED_POP &&
-           qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
-               dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
-               qm_port->issued_releases -= qm_port->token_pop_thresh;
-       }
-
-sw_credit_update:
-       /* each release returns one credit */
-       if (!ev_port->outstanding_releases) {
-               DLB2_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
-               rte_errno = -ENOTRECOVERABLE;
-               return rte_errno;
-       }
-
-       ev_port->outstanding_releases -= cnt;
-       ev_port->inflight_credits += cnt;
-
-       /* Replenish s/w credits if enough releases are performed */
-       dlb2_replenish_sw_credits(dlb2, ev_port);
-       return 0;
-}
-
 static inline int16_t
 dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
                       struct dlb2_eventdev_port *ev_port,
@@ -3367,8 +3453,7 @@ dlb2_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
        if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
                uint16_t out_rels = ev_port->outstanding_releases;
 
-               if (dlb2_event_release(dlb2, ev_port->id, out_rels))
-                       return 0; /* rte_errno is set */
+               dlb2_event_release(dlb2, ev_port->id, out_rels);
 
                DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
        }
@@ -3405,8 +3490,7 @@ dlb2_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
        if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
                uint16_t out_rels = ev_port->outstanding_releases;
 
-               if (dlb2_event_release(dlb2, ev_port->id, out_rels))
-                       return 0; /* rte_errno is set */
+               dlb2_event_release(dlb2, ev_port->id, out_rels);
 
                DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
        }
index f433654..5cf66c5 100644 (file)
@@ -1320,7 +1320,7 @@ test_delayed_pop(void)
                }
        }
 
-       /* Dequeue dequeue_depth events but only release dequeue_depth - 2.
+       /* Dequeue dequeue_depth events but only release dequeue_depth - 1.
         * Delayed pop won't perform the pop and no more events will be
         * scheduled.
         */
@@ -1336,7 +1336,7 @@ test_delayed_pop(void)
 
        ev.op = RTE_EVENT_OP_RELEASE;
 
-       for (i = 0; i < port_conf.dequeue_depth - 2; i++) {
+       for (i = 0; i < port_conf.dequeue_depth - 1; i++) {
                if (rte_event_enqueue_burst(evdev, 0, &ev, 1) != 1) {
                        printf("%d: RELEASE enqueue expected to succeed\n",
                               __LINE__);