eventdev: improve error handling for Rx adapter queue add/del
authorNikhil Rao <nikhil.rao@intel.com>
Mon, 2 Jul 2018 09:11:11 +0000 (14:41 +0530)
committerJerin Jacob <jerin.jacob@caviumnetworks.com>
Fri, 6 Jul 2018 04:54:49 +0000 (06:54 +0200)
The new WRR sequence applicable after queue add/del is set
up after setting the new queue state, so a memory allocation
failure will leave behind an incorrect state.

This change separates the memory sizing + allocation for the
Rx poll and WRR array from calculation of the WRR sequence.
If there is a memory allocation failure, existing Rx queue
configuration remains unchanged.

Signed-off-by: Nikhil Rao <nikhil.rao@intel.com>
lib/librte_eventdev/rte_event_eth_rx_adapter.c

index 9361d48..926f83a 100644 (file)
@@ -109,10 +109,16 @@ struct eth_device_info {
         * rx_adapter_stop callback needs to be invoked
         */
        uint8_t dev_rx_started;
-       /* If nb_dev_queues > 0, the start callback will
+       /* Number of queues added for this device */
+       uint16_t nb_dev_queues;
+       /* If nb_rx_poll > 0, the start callback will
         * be invoked if not already invoked
         */
-       uint16_t nb_dev_queues;
+       uint16_t nb_rx_poll;
+       /* sum(wrr(q)) for all queues within the device
+        * useful when deleting all device queues
+        */
+       uint32_t wrr_len;
 };
 
 /* Per Rx queue */
@@ -188,13 +194,170 @@ rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
        }
 }
 
-/* Precalculate WRR polling sequence for all queues in rx_adapter */
+static inline int
+rxa_polled_queue(struct eth_device_info *dev_info,
+       int rx_queue_id)
+{
+       struct eth_rx_queue_info *queue_info;
+
+       queue_info = &dev_info->rx_queue[rx_queue_id];
+       return !dev_info->internal_event_port &&
+               dev_info->rx_queue &&
+               queue_info->queue_enabled && queue_info->wt != 0;
+}
+
+/* Calculate size of the eth_rx_poll and wrr_sched arrays
+ * after deleting poll mode rx queues
+ */
+static void
+rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
+                       struct eth_device_info *dev_info,
+                       int rx_queue_id,
+                       uint32_t *nb_rx_poll,
+                       uint32_t *nb_wrr)
+{
+       uint32_t poll_diff;
+       uint32_t wrr_len_diff;
+
+       if (rx_queue_id == -1) {
+               poll_diff = dev_info->nb_rx_poll;
+               wrr_len_diff = dev_info->wrr_len;
+       } else {
+               poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
+               wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
+                                       0;
+       }
+
+       *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
+       *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
+}
+
+/* Calculate nb_rx_* after adding poll mode rx queues
+ */
+static void
+rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
+                       struct eth_device_info *dev_info,
+                       int rx_queue_id,
+                       uint16_t wt,
+                       uint32_t *nb_rx_poll,
+                       uint32_t *nb_wrr)
+{
+       uint32_t poll_diff;
+       uint32_t wrr_len_diff;
+
+       if (rx_queue_id == -1) {
+               poll_diff = dev_info->dev->data->nb_rx_queues -
+                                               dev_info->nb_rx_poll;
+               wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
+                               - dev_info->wrr_len;
+       } else {
+               poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
+               wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
+                               wt - dev_info->rx_queue[rx_queue_id].wt :
+                               wt;
+       }
+
+       *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
+       *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
+}
+
+/* Calculate nb_rx_* after adding rx_queue_id */
+static void
+rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
+               struct eth_device_info *dev_info,
+               int rx_queue_id,
+               uint16_t wt,
+               uint32_t *nb_rx_poll,
+               uint32_t *nb_wrr)
+{
+       rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
+                               wt, nb_rx_poll, nb_wrr);
+}
+
+/* Calculate nb_rx_* after deleting rx_queue_id */
+static void
+rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
+               struct eth_device_info *dev_info,
+               int rx_queue_id,
+               uint32_t *nb_rx_poll,
+               uint32_t *nb_wrr)
+{
+       rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
+                               nb_wrr);
+}
+
+/*
+ * Allocate the rx_poll array
+ */
+static struct eth_rx_poll_entry *
+rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
+       uint32_t num_rx_polled)
+{
+       size_t len;
+
+       len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
+                                                       RTE_CACHE_LINE_SIZE);
+       return  rte_zmalloc_socket(rx_adapter->mem_name,
+                               len,
+                               RTE_CACHE_LINE_SIZE,
+                               rx_adapter->socket_id);
+}
+
+/*
+ * Allocate the WRR array
+ */
+static uint32_t *
+rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
+{
+       size_t len;
+
+       len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
+                       RTE_CACHE_LINE_SIZE);
+       return  rte_zmalloc_socket(rx_adapter->mem_name,
+                               len,
+                               RTE_CACHE_LINE_SIZE,
+                               rx_adapter->socket_id);
+}
+
 static int
-rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter)
+rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
+               uint32_t nb_poll,
+               uint32_t nb_wrr,
+               struct eth_rx_poll_entry **rx_poll,
+               uint32_t **wrr_sched)
+{
+
+       if (nb_poll == 0) {
+               *rx_poll = NULL;
+               *wrr_sched = NULL;
+               return 0;
+       }
+
+       *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
+       if (*rx_poll == NULL) {
+               *wrr_sched = NULL;
+               return -ENOMEM;
+       }
+
+       *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
+       if (*wrr_sched == NULL) {
+               rte_free(*rx_poll);
+               return -ENOMEM;
+       }
+       return 0;
+}
+
+/* Precalculate WRR polling sequence for all queues in rx_adapter */
+static void
+rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
+               struct eth_rx_poll_entry *rx_poll,
+               uint32_t *rx_wrr)
 {
        uint16_t d;
        uint16_t q;
        unsigned int i;
+       int prev = -1;
+       int cw = -1;
 
        /* Initialize variables for calculation of wrr schedule */
        uint16_t max_wrr_pos = 0;
@@ -202,77 +365,48 @@ rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter)
        uint16_t max_wt = 0;
        uint16_t gcd = 0;
 
-       struct eth_rx_poll_entry *rx_poll = NULL;
-       uint32_t *rx_wrr = NULL;
+       if (rx_poll == NULL)
+               return;
 
-       if (rx_adapter->num_rx_polled) {
-               size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
-                               sizeof(*rx_adapter->eth_rx_poll),
-                               RTE_CACHE_LINE_SIZE);
-               rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
-                                            len,
-                                            RTE_CACHE_LINE_SIZE,
-                                            rx_adapter->socket_id);
-               if (rx_poll == NULL)
-                       return -ENOMEM;
+       /* Generate array of all queues to poll, the size of this
+        * array is poll_q
+        */
+       RTE_ETH_FOREACH_DEV(d) {
+               uint16_t nb_rx_queues;
+               struct eth_device_info *dev_info =
+                               &rx_adapter->eth_devices[d];
+               nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+               if (dev_info->rx_queue == NULL)
+                       continue;
+               if (dev_info->internal_event_port)
+                       continue;
+               dev_info->wrr_len = 0;
+               for (q = 0; q < nb_rx_queues; q++) {
+                       struct eth_rx_queue_info *queue_info =
+                               &dev_info->rx_queue[q];
+                       uint16_t wt;
 
-               /* Generate array of all queues to poll, the size of this
-                * array is poll_q
-                */
-               RTE_ETH_FOREACH_DEV(d) {
-                       uint16_t nb_rx_queues;
-                       struct eth_device_info *dev_info =
-                                       &rx_adapter->eth_devices[d];
-                       nb_rx_queues = dev_info->dev->data->nb_rx_queues;
-                       if (dev_info->rx_queue == NULL)
-                               continue;
-                       if (dev_info->internal_event_port)
+                       if (!rxa_polled_queue(dev_info, q))
                                continue;
-                       for (q = 0; q < nb_rx_queues; q++) {
-                               struct eth_rx_queue_info *queue_info =
-                                       &dev_info->rx_queue[q];
-                               if (queue_info->queue_enabled == 0)
-                                       continue;
-
-                               uint16_t wt = queue_info->wt;
-                               rx_poll[poll_q].eth_dev_id = d;
-                               rx_poll[poll_q].eth_rx_qid = q;
-                               max_wrr_pos += wt;
-                               max_wt = RTE_MAX(max_wt, wt);
-                               gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
-                               poll_q++;
-                       }
-               }
-
-               len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
-                               RTE_CACHE_LINE_SIZE);
-               rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
-                                           len,
-                                           RTE_CACHE_LINE_SIZE,
-                                           rx_adapter->socket_id);
-               if (rx_wrr == NULL) {
-                       rte_free(rx_poll);
-                       return -ENOMEM;
-               }
-
-               /* Generate polling sequence based on weights */
-               int prev = -1;
-               int cw = -1;
-               for (i = 0; i < max_wrr_pos; i++) {
-                       rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
-                                            rx_poll, max_wt, gcd, prev);
-                       prev = rx_wrr[i];
+                       wt = queue_info->wt;
+                       rx_poll[poll_q].eth_dev_id = d;
+                       rx_poll[poll_q].eth_rx_qid = q;
+                       max_wrr_pos += wt;
+                       dev_info->wrr_len += wt;
+                       max_wt = RTE_MAX(max_wt, wt);
+                       gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
+                       poll_q++;
                }
        }
 
-       rte_free(rx_adapter->eth_rx_poll);
-       rte_free(rx_adapter->wrr_sched);
-
-       rx_adapter->eth_rx_poll = rx_poll;
-       rx_adapter->wrr_sched = rx_wrr;
-       rx_adapter->wrr_len = max_wrr_pos;
-
-       return 0;
+       /* Generate polling sequence based on weights */
+       prev = -1;
+       cw = -1;
+       for (i = 0; i < max_wrr_pos; i++) {
+               rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
+                                    rx_poll, max_wt, gcd, prev);
+               prev = rx_wrr[i];
+       }
 }
 
 static inline void
@@ -719,31 +853,53 @@ rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
        }
 }
 
-static int
+static void
 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
        struct eth_device_info *dev_info,
-       uint16_t rx_queue_id)
+       int32_t rx_queue_id)
 {
-       struct eth_rx_queue_info *queue_info;
+       int pollq;
 
        if (rx_adapter->nb_queues == 0)
-               return 0;
+               return;
 
-       queue_info = &dev_info->rx_queue[rx_queue_id];
-       rx_adapter->num_rx_polled -= queue_info->queue_enabled;
+       if (rx_queue_id == -1) {
+               uint16_t nb_rx_queues;
+               uint16_t i;
+
+               nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+               for (i = 0; i < nb_rx_queues; i++)
+                       rxa_sw_del(rx_adapter, dev_info, i);
+               return;
+       }
+
+       pollq = rxa_polled_queue(dev_info, rx_queue_id);
        rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
-       return 0;
+       rx_adapter->num_rx_polled -= pollq;
+       dev_info->nb_rx_poll -= pollq;
 }
 
 static void
 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
        struct eth_device_info *dev_info,
-       uint16_t rx_queue_id,
+       int32_t rx_queue_id,
        const struct rte_event_eth_rx_adapter_queue_conf *conf)
-
 {
        struct eth_rx_queue_info *queue_info;
        const struct rte_event *ev = &conf->ev;
+       int pollq;
+
+       if (rx_queue_id == -1) {
+               uint16_t nb_rx_queues;
+               uint16_t i;
+
+               nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+               for (i = 0; i < nb_rx_queues; i++)
+                       rxa_add_queue(rx_adapter, dev_info, i, conf);
+               return;
+       }
+
+       pollq = rxa_polled_queue(dev_info, rx_queue_id);
 
        queue_info = &dev_info->rx_queue[rx_queue_id];
        queue_info->event_queue_id = ev->queue_id;
@@ -757,9 +913,11 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
                queue_info->flow_id_mask = ~0;
        }
 
-       /* The same queue can be added more than once */
-       rx_adapter->num_rx_polled += !queue_info->queue_enabled;
        rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
+       if (rxa_polled_queue(dev_info, rx_queue_id)) {
+               rx_adapter->num_rx_polled += !pollq;
+               dev_info->nb_rx_poll += !pollq;
+       }
 }
 
 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
@@ -769,8 +927,12 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
 {
        struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
        struct rte_event_eth_rx_adapter_queue_conf temp_conf;
-       uint32_t i;
        int ret;
+       struct eth_rx_poll_entry *rx_poll;
+       struct eth_rx_queue_info *rx_queue;
+       uint32_t *rx_wrr;
+       uint16_t nb_rx_queues;
+       uint32_t nb_rx_poll, nb_wrr;
 
        if (queue_conf->servicing_weight == 0) {
 
@@ -787,31 +949,51 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
                queue_conf = &temp_conf;
        }
 
+       nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+       rx_queue = dev_info->rx_queue;
+
        if (dev_info->rx_queue == NULL) {
                dev_info->rx_queue =
                    rte_zmalloc_socket(rx_adapter->mem_name,
-                                      dev_info->dev->data->nb_rx_queues *
+                                      nb_rx_queues *
                                       sizeof(struct eth_rx_queue_info), 0,
                                       rx_adapter->socket_id);
                if (dev_info->rx_queue == NULL)
                        return -ENOMEM;
        }
+       rx_wrr = NULL;
+       rx_poll = NULL;
 
-       if (rx_queue_id == -1) {
-               for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-                       rxa_add_queue(rx_adapter, dev_info, i, queue_conf);
-       } else {
-               rxa_add_queue(rx_adapter, dev_info, (uint16_t)rx_queue_id,
-                       queue_conf);
-       }
+       rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
+                       queue_conf->servicing_weight,
+                       &nb_rx_poll, &nb_wrr);
 
-       ret = rxa_calc_wrr_sequence(rx_adapter);
-       if (ret) {
-               rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
-               return ret;
+       ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
+                               &rx_poll, &rx_wrr);
+       if (ret)
+               goto err_free_rxqueue;
+
+       rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
+       rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
+
+       rte_free(rx_adapter->eth_rx_poll);
+       rte_free(rx_adapter->wrr_sched);
+
+       rx_adapter->eth_rx_poll = rx_poll;
+       rx_adapter->wrr_sched = rx_wrr;
+       rx_adapter->wrr_len = nb_wrr;
+       return 0;
+
+err_free_rxqueue:
+       if (rx_queue == NULL) {
+               rte_free(dev_info->rx_queue);
+               dev_info->rx_queue = NULL;
        }
 
-       return ret;
+       rte_free(rx_poll);
+       rte_free(rx_wrr);
+
+       return 0;
 }
 
 static int
@@ -995,7 +1177,6 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct rte_eventdev *dev;
        struct eth_device_info *dev_info;
-       int start_service;
 
        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1038,7 +1219,6 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
                return -EINVAL;
        }
 
-       start_service = 0;
        dev_info = &rx_adapter->eth_devices[eth_dev_id];
 
        if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
@@ -1068,21 +1248,19 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
                rte_spinlock_lock(&rx_adapter->rx_lock);
                dev_info->internal_event_port = 0;
                ret = rxa_init_service(rx_adapter, id);
-               if (ret == 0)
+               if (ret == 0) {
+                       uint32_t service_id = rx_adapter->service_id;
                        ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
                                        queue_conf);
+                       rte_service_component_runstate_set(service_id,
+                               rxa_sw_adapter_queue_count(rx_adapter));
+               }
                rte_spinlock_unlock(&rx_adapter->rx_lock);
-               if (ret == 0)
-                       start_service =
-                               !!rxa_sw_adapter_queue_count(rx_adapter);
        }
 
        if (ret)
                return ret;
 
-       if (start_service)
-               rte_service_component_runstate_set(rx_adapter->service_id, 1);
-
        return 0;
 }
 
@@ -1095,7 +1273,10 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct eth_device_info *dev_info;
        uint32_t cap;
-       uint16_t i;
+       uint32_t nb_rx_poll = 0;
+       uint32_t nb_wrr = 0;
+       struct eth_rx_poll_entry *rx_poll = NULL;
+       uint32_t *rx_wrr = NULL;
 
        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
@@ -1137,26 +1318,31 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
                        }
                }
        } else {
-               int rc;
+               rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
+                       &nb_rx_poll, &nb_wrr);
+               ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
+                       &rx_poll, &rx_wrr);
+               if (ret)
+                       return ret;
+
                rte_spinlock_lock(&rx_adapter->rx_lock);
-               if (rx_queue_id == -1) {
-                       for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
-                               rxa_sw_del(rx_adapter, dev_info, i);
-               } else {
-                       rxa_sw_del(rx_adapter, dev_info, (uint16_t)rx_queue_id);
-               }
+               rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
+               rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
+
+               rte_free(rx_adapter->eth_rx_poll);
+               rte_free(rx_adapter->wrr_sched);
 
-               rc = rxa_calc_wrr_sequence(rx_adapter);
-               if (rc)
-                       RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
-                                       rc);
+               rx_adapter->eth_rx_poll = rx_poll;
+               rx_adapter->num_rx_polled = nb_rx_poll;
+               rx_adapter->wrr_sched = rx_wrr;
+               rx_adapter->wrr_len = nb_wrr;
 
                if (dev_info->nb_dev_queues == 0) {
                        rte_free(dev_info->rx_queue);
                        dev_info->rx_queue = NULL;
                }
-
                rte_spinlock_unlock(&rx_adapter->rx_lock);
+
                rte_service_component_runstate_set(rx_adapter->service_id,
                                rxa_sw_adapter_queue_count(rx_adapter));
        }