power: support callbacks for multiple Rx queues
diff --git a/lib/power/rte_power_pmd_mgmt.c b/lib/power/rte_power_pmd_mgmt.c
index 9b95cf1..3077279 100644
--- a/lib/power/rte_power_pmd_mgmt.c
+++ b/lib/power/rte_power_pmd_mgmt.c
@@ -33,18 +33,98 @@ enum pmd_mgmt_state {
        PMD_MGMT_ENABLED
 };
 
-struct pmd_queue_cfg {
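+/* port and queue ids packed into one 32-bit value for cheap compare/copy */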
+union queue {
+       uint32_t val;
+       struct {
+               uint16_t portid;
+               uint16_t qid;
+       };
+};
+
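+/* per-queue state tracked by the Rx callbacks, one entry per polled queue */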
+struct queue_list_entry {
+       TAILQ_ENTRY(queue_list_entry) next;
+       union queue queue;
+       uint64_t n_empty_polls;
+       uint64_t n_sleeps;
+       const struct rte_eth_rxtx_callback *cb;
+};
+
+struct pmd_core_cfg {
+       TAILQ_HEAD(queue_list_head, queue_list_entry) head;
+       /**< List of queues associated with this lcore */
+       size_t n_queues;
+       /**< Number of queues in the list */
-       volatile enum pmd_mgmt_state pwr_mgmt_state;
-       /**< State of power management for this queue */
-       enum rte_power_pmd_mgmt_type cb_mode;
-       /**< Callback mode for this queue */
-       const struct rte_eth_rxtx_callback *cur_cb;
-       /**< Callback instance */
-       uint64_t empty_poll_stats;
-       /**< Number of empty polls */
+       volatile enum pmd_mgmt_state pwr_mgmt_state;
+       /**< State of power management for this lcore */
+       enum rte_power_pmd_mgmt_type cb_mode;
+       /**< Callback mode for this lcore */
+       uint64_t n_queues_ready_to_sleep;
+       /**< Number of queues ready to enter power optimized state */
+       uint64_t sleep_target;
+       /**< Prevent a queue from triggering sleep multiple times */
 } __rte_cache_aligned;
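+
+/* per-lcore power management state, indexed by lcore id */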
+static struct pmd_core_cfg lcore_cfgs[RTE_MAX_LCORE];
+
+static inline bool
+queue_equal(const union queue *l, const union queue *r)
+{
+       return l->val == r->val;
+}
+
+static inline void
+queue_copy(union queue *dst, const union queue *src)
+{
+       dst->val = src->val;
+}
+
+static struct queue_list_entry *
+queue_list_find(const struct pmd_core_cfg *cfg, const union queue *q)
+{
+       struct queue_list_entry *cur;
+
+       TAILQ_FOREACH(cur, &cfg->head, next) {
+               if (queue_equal(&cur->queue, q))
+                       return cur;
+       }
+       return NULL;
+}
+
+static int
+queue_list_add(struct pmd_core_cfg *cfg, const union queue *q)
+{
+       struct queue_list_entry *qle;
+
+       /* is it already in the list? */
+       if (queue_list_find(cfg, q) != NULL)
+               return -EEXIST;
+
+       qle = calloc(1, sizeof(*qle));
+       if (qle == NULL)
+               return -ENOMEM;
+
+       queue_copy(&qle->queue, q);
+       TAILQ_INSERT_TAIL(&cfg->head, qle, next);
+       cfg->n_queues++;
+
+       return 0;
+}
 
-static struct pmd_queue_cfg port_cfg[RTE_MAX_ETHPORTS][RTE_MAX_QUEUES_PER_PORT];
+static struct queue_list_entry *
+queue_list_take(struct pmd_core_cfg *cfg, const union queue *q)
+{
+       struct queue_list_entry *found;
+
+       found = queue_list_find(cfg, q);
+       if (found == NULL)
+               return NULL;
+
+       TAILQ_REMOVE(&cfg->head, found, next);
+       cfg->n_queues--;
+
+       /* freeing is the responsibility of the caller */
+       return found;
+}
 
 static void
 calc_tsc(void)
@@ -74,21 +154,79 @@ calc_tsc(void)
        }
 }
 
-static uint16_t
-clb_umwait(uint16_t port_id, uint16_t qidx, struct rte_mbuf **pkts __rte_unused,
-               uint16_t nb_rx, uint16_t max_pkts __rte_unused,
-               void *addr __rte_unused)
+static inline void
+queue_reset(struct pmd_core_cfg *cfg, struct queue_list_entry *qcfg)
 {
+       const bool is_ready_to_sleep = qcfg->n_sleeps == cfg->sleep_target;
+
+       /* reset empty poll counter for this queue */
+       qcfg->n_empty_polls = 0;
+       /* reset the queue sleep counter as well */
+       qcfg->n_sleeps = 0;
+       /* remove the queue from list of queues ready to sleep */
+       if (is_ready_to_sleep)
+               cfg->n_queues_ready_to_sleep--;
+       /*
+        * no need to change the lcore sleep target counter, because this queue
+        * will catch up to it again anyway, and the other queues are already
+        * accounted for, so there's nothing else to do.
+        */
+}
+
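+/*
+ * Sleep coordination: each queue counts its own empty polls and, once past
+ * the threshold, marks itself ready by syncing its n_sleeps to the lcore-wide
+ * sleep_target and bumping n_queues_ready_to_sleep. Once all queues on the
+ * lcore are marked ready, the lcore sleeps and advances sleep_target, so each
+ * queue has to re-mark itself (cheap, as its empty poll count stays above the
+ * threshold) before the next sleep can be triggered.
+ */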
+static inline bool
+queue_can_sleep(struct pmd_core_cfg *cfg, struct queue_list_entry *qcfg)
+{
+       /* this function is only called on an empty poll, so count it */
+       qcfg->n_empty_polls++;
+
+       /* if we haven't reached threshold for empty polls, we can't sleep */
+       if (qcfg->n_empty_polls <= EMPTYPOLL_MAX)
+               return false;
+
+       /*
+        * we've reached a point where we are able to sleep, but we still need
+        * to check if this queue has already been marked for sleeping.
+        */
+       if (qcfg->n_sleeps == cfg->sleep_target)
+               return true;
 
-       struct pmd_queue_cfg *q_conf;
+       /* mark this queue as ready for sleep */
+       qcfg->n_sleeps = cfg->sleep_target;
+       cfg->n_queues_ready_to_sleep++;
 
-       q_conf = &port_cfg[port_id][qidx];
+       return true;
+}
+
+static inline bool
+lcore_can_sleep(struct pmd_core_cfg *cfg)
+{
+       /* are all queues ready to sleep? */
+       if (cfg->n_queues_ready_to_sleep != cfg->n_queues)
+               return false;
+
+       /* we can sleep: reset the ready counter and advance the sleep target */
+       cfg->n_queues_ready_to_sleep = 0;
+       cfg->sleep_target++;
+       /*
+        * we do not reset any individual queue empty poll counters, because
+        * we want to keep sleeping on every poll until we actually get traffic.
+        */
+
+       return true;
+}
 
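+/*
+ * Note: UMWAIT-based sleep can only monitor a single address, so this
+ * callback keeps the single-queue behavior and does not use the queue list.
+ */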
+static uint16_t
+clb_umwait(uint16_t port_id, uint16_t qidx, struct rte_mbuf **pkts __rte_unused,
+               uint16_t nb_rx, uint16_t max_pkts __rte_unused, void *arg)
+{
+       struct queue_list_entry *queue_conf = arg;
+
+       /* this callback supports only one queue, so multi-queue logic is omitted */
        if (unlikely(nb_rx == 0)) {
-               q_conf->empty_poll_stats++;
-               if (unlikely(q_conf->empty_poll_stats > EMPTYPOLL_MAX)) {
+               queue_conf->n_empty_polls++;
+               if (unlikely(queue_conf->n_empty_polls > EMPTYPOLL_MAX)) {
                        struct rte_power_monitor_cond pmc;
-                       uint16_t ret;
+                       int ret;
 
                        /* use monitoring condition to sleep */
                        ret = rte_eth_get_monitor_addr(port_id, qidx,
@@ -97,60 +235,77 @@ clb_umwait(uint16_t port_id, uint16_t qidx, struct rte_mbuf **pkts __rte_unused,
                                rte_power_monitor(&pmc, UINT64_MAX);
                }
        } else
-               q_conf->empty_poll_stats = 0;
+               queue_conf->n_empty_polls = 0;
 
        return nb_rx;
 }
 
 static uint16_t
-clb_pause(uint16_t port_id, uint16_t qidx, struct rte_mbuf **pkts __rte_unused,
-               uint16_t nb_rx, uint16_t max_pkts __rte_unused,
-               void *addr __rte_unused)
+clb_pause(uint16_t port_id __rte_unused, uint16_t qidx __rte_unused,
+               struct rte_mbuf **pkts __rte_unused, uint16_t nb_rx,
+               uint16_t max_pkts __rte_unused, void *arg)
 {
-       struct pmd_queue_cfg *q_conf;
-
-       q_conf = &port_cfg[port_id][qidx];
-
-       if (unlikely(nb_rx == 0)) {
-               q_conf->empty_poll_stats++;
-               /* sleep for 1 microsecond */
-               if (unlikely(q_conf->empty_poll_stats > EMPTYPOLL_MAX)) {
-                       /* use tpause if we have it */
-                       if (global_data.intrinsics_support.power_pause) {
-                               const uint64_t cur = rte_rdtsc();
-                               const uint64_t wait_tsc =
-                                               cur + global_data.tsc_per_us;
-                               rte_power_pause(wait_tsc);
-                       } else {
-                               uint64_t i;
-                               for (i = 0; i < global_data.pause_per_us; i++)
-                                       rte_pause();
-                       }
+       const unsigned int lcore = rte_lcore_id();
+       struct queue_list_entry *queue_conf = arg;
+       struct pmd_core_cfg *lcore_conf;
+       const bool empty = nb_rx == 0;
+
+       lcore_conf = &lcore_cfgs[lcore];
+
+       if (likely(!empty)) {
+               /* fast path: we have traffic, reset this queue's counters */
+               queue_reset(lcore_conf, queue_conf);
+       } else {
+               /* can this queue sleep? */
+               if (!queue_can_sleep(lcore_conf, queue_conf))
+                       return nb_rx;
+
+               /* can this lcore sleep? */
+               if (!lcore_can_sleep(lcore_conf))
+                       return nb_rx;
+
+               /* sleep for 1 microsecond, use tpause if we have it */
+               if (global_data.intrinsics_support.power_pause) {
+                       const uint64_t cur = rte_rdtsc();
+                       const uint64_t wait_tsc =
+                                       cur + global_data.tsc_per_us;
+                       rte_power_pause(wait_tsc);
+               } else {
+                       uint64_t i;
+                       for (i = 0; i < global_data.pause_per_us; i++)
+                               rte_pause();
                }
-       } else
-               q_conf->empty_poll_stats = 0;
+       }
 
        return nb_rx;
 }
 
 static uint16_t
-clb_scale_freq(uint16_t port_id, uint16_t qidx,
+clb_scale_freq(uint16_t port_id __rte_unused, uint16_t qidx __rte_unused,
                struct rte_mbuf **pkts __rte_unused, uint16_t nb_rx,
-               uint16_t max_pkts __rte_unused, void *_  __rte_unused)
+               uint16_t max_pkts __rte_unused, void *arg)
 {
-       struct pmd_queue_cfg *q_conf;
+       const unsigned int lcore = rte_lcore_id();
+       const bool empty = nb_rx == 0;
+       struct pmd_core_cfg *lcore_conf = &lcore_cfgs[lcore];
+       struct queue_list_entry *queue_conf = arg;
 
-       q_conf = &port_cfg[port_id][qidx];
+       if (likely(!empty)) {
+               /* fast path: we have traffic, reset this queue's counters */
+               queue_reset(lcore_conf, queue_conf);
 
-       if (unlikely(nb_rx == 0)) {
-               q_conf->empty_poll_stats++;
-               if (unlikely(q_conf->empty_poll_stats > EMPTYPOLL_MAX))
-                       /* scale down freq */
-                       rte_power_freq_min(rte_lcore_id());
-       } else {
-               q_conf->empty_poll_stats = 0;
-               /* scale up freq */
+               /* scale up freq immediately */
                rte_power_freq_max(rte_lcore_id());
+       } else {
+               /* can this queue sleep? */
+               if (!queue_can_sleep(lcore_conf, queue_conf))
+                       return nb_rx;
+
+               /* can this lcore sleep? */
+               if (!lcore_can_sleep(lcore_conf))
+                       return nb_rx;
+
+               rte_power_freq_min(rte_lcore_id());
        }
 
        return nb_rx;
@@ -167,11 +322,80 @@ queue_stopped(const uint16_t port_id, const uint16_t queue_id)
        return qinfo.queue_state == RTE_ETH_QUEUE_STATE_STOPPED;
 }
 
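+/*
+ * Returns 1 if all queues in the list are stopped, 0 if any queue is still
+ * running, or a negative error code if querying the queue state failed.
+ */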
+static int
+cfg_queues_stopped(struct pmd_core_cfg *queue_cfg)
+{
+       const struct queue_list_entry *entry;
+
+       TAILQ_FOREACH(entry, &queue_cfg->head, next) {
+               const union queue *q = &entry->queue;
+               int ret = queue_stopped(q->portid, q->qid);
+               if (ret != 1)
+                       return ret;
+       }
+       return 1;
+}
+
+static int
+check_scale(unsigned int lcore)
+{
+       enum power_management_env env;
+
+       /* only PSTATE and ACPI modes are supported */
+       if (!rte_power_check_env_supported(PM_ENV_ACPI_CPUFREQ) &&
+           !rte_power_check_env_supported(PM_ENV_PSTATE_CPUFREQ)) {
+               RTE_LOG(DEBUG, POWER, "Neither ACPI nor PSTATE modes are supported\n");
+               return -ENOTSUP;
+       }
+       /* ensure we could initialize the power library */
+       if (rte_power_init(lcore))
+               return -EINVAL;
+
+       /* ensure we initialized the correct env */
+       env = rte_power_get_env();
+       if (env != PM_ENV_ACPI_CPUFREQ && env != PM_ENV_PSTATE_CPUFREQ) {
+               RTE_LOG(DEBUG, POWER, "Neither ACPI nor PSTATE modes were initialized\n");
+               return -ENOTSUP;
+       }
+
+       /* we're done */
+       return 0;
+}
+
+static int
+check_monitor(struct pmd_core_cfg *cfg, const union queue *qdata)
+{
+       struct rte_power_monitor_cond dummy;
+
+       /* check if rte_power_monitor is supported */
+       if (!global_data.intrinsics_support.power_monitor) {
+               RTE_LOG(DEBUG, POWER, "Monitoring intrinsics are not supported\n");
+               return -ENOTSUP;
+       }
+
+       if (cfg->n_queues > 0) {
+               RTE_LOG(DEBUG, POWER, "Monitoring multiple queues is not supported\n");
+               return -ENOTSUP;
+       }
+
+       /* check if the device supports the necessary PMD API */
+       if (rte_eth_get_monitor_addr(qdata->portid, qdata->qid,
+                       &dummy) == -ENOTSUP) {
+               RTE_LOG(DEBUG, POWER, "The device does not support rte_eth_get_monitor_addr\n");
+               return -ENOTSUP;
+       }
+
+       /* we're done */
+       return 0;
+}
+
 int
 rte_power_ethdev_pmgmt_queue_enable(unsigned int lcore_id, uint16_t port_id,
                uint16_t queue_id, enum rte_power_pmd_mgmt_type mode)
 {
-       struct pmd_queue_cfg *queue_cfg;
+       const union queue qdata = {.portid = port_id, .qid = queue_id};
+       struct pmd_core_cfg *lcore_cfg;
+       struct queue_list_entry *queue_cfg;
        struct rte_eth_dev_info info;
        rte_rx_callback_fn clb;
        int ret;
@@ -202,9 +426,19 @@ rte_power_ethdev_pmgmt_queue_enable(unsigned int lcore_id, uint16_t port_id,
                goto end;
        }
 
-       queue_cfg = &port_cfg[port_id][queue_id];
+       lcore_cfg = &lcore_cfgs[lcore_id];
+
+       /* check if other queues are stopped as well */
+       ret = cfg_queues_stopped(lcore_cfg);
+       if (ret != 1) {
+               /* error means invalid queue, 0 means queue wasn't stopped */
+               ret = ret < 0 ? -EINVAL : -EBUSY;
+               goto end;
+       }
 
-       if (queue_cfg->pwr_mgmt_state != PMD_MGMT_DISABLED) {
+       /* if callbacks are already enabled, the requested mode must match */
+       if (lcore_cfg->pwr_mgmt_state != PMD_MGMT_DISABLED &&
+                       lcore_cfg->cb_mode != mode) {
                ret = -EINVAL;
                goto end;
        }
@@ -214,53 +448,20 @@ rte_power_ethdev_pmgmt_queue_enable(unsigned int lcore_id, uint16_t port_id,
 
        switch (mode) {
        case RTE_POWER_MGMT_TYPE_MONITOR:
-       {
-               struct rte_power_monitor_cond dummy;
-
-               /* check if rte_power_monitor is supported */
-               if (!global_data.intrinsics_support.power_monitor) {
-                       RTE_LOG(DEBUG, POWER, "Monitoring intrinsics are not supported\n");
-                       ret = -ENOTSUP;
+               /* check if we can add a new queue */
+               ret = check_monitor(lcore_cfg, &qdata);
+               if (ret < 0)
                        goto end;
-               }
 
-               /* check if the device supports the necessary PMD API */
-               if (rte_eth_get_monitor_addr(port_id, queue_id,
-                               &dummy) == -ENOTSUP) {
-                       RTE_LOG(DEBUG, POWER, "The device does not support rte_eth_get_monitor_addr\n");
-                       ret = -ENOTSUP;
-                       goto end;
-               }
                clb = clb_umwait;
                break;
-       }
        case RTE_POWER_MGMT_TYPE_SCALE:
-       {
-               enum power_management_env env;
-               /* only PSTATE and ACPI modes are supported */
-               if (!rte_power_check_env_supported(PM_ENV_ACPI_CPUFREQ) &&
-                               !rte_power_check_env_supported(
-                                       PM_ENV_PSTATE_CPUFREQ)) {
-                       RTE_LOG(DEBUG, POWER, "Neither ACPI nor PSTATE modes are supported\n");
-                       ret = -ENOTSUP;
-                       goto end;
-               }
-               /* ensure we could initialize the power library */
-               if (rte_power_init(lcore_id)) {
-                       ret = -EINVAL;
-                       goto end;
-               }
-               /* ensure we initialized the correct env */
-               env = rte_power_get_env();
-               if (env != PM_ENV_ACPI_CPUFREQ &&
-                               env != PM_ENV_PSTATE_CPUFREQ) {
-                       RTE_LOG(DEBUG, POWER, "Neither ACPI nor PSTATE modes were initialized\n");
-                       ret = -ENOTSUP;
+               /* check if frequency scaling is available on this lcore */
+               ret = check_scale(lcore_id);
+               if (ret < 0)
                        goto end;
-               }
                clb = clb_scale_freq;
                break;
-       }
        case RTE_POWER_MGMT_TYPE_PAUSE:
                /* figure out various time-to-tsc conversions */
                if (global_data.tsc_per_us == 0)
@@ -273,13 +474,27 @@ rte_power_ethdev_pmgmt_queue_enable(unsigned int lcore_id, uint16_t port_id,
                ret = -EINVAL;
                goto end;
        }
+       /* add this queue to the list */
+       ret = queue_list_add(lcore_cfg, &qdata);
+       if (ret < 0) {
+               RTE_LOG(DEBUG, POWER, "Failed to add queue to list: %s\n",
+                               strerror(-ret));
+               goto end;
+       }
+       /* new queue is always added last */
+       queue_cfg = TAILQ_LAST(&lcore_cfg->head, queue_list_head);
+
+       /* when enabling first queue, ensure sleep target is not 0 */
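+       /*
+        * (a sleep target of 0 would match a fresh queue's n_sleeps of 0, so
+        * the queue would look already marked and never be counted as ready)
+        */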
+       if (lcore_cfg->n_queues == 1 && lcore_cfg->sleep_target == 0)
+               lcore_cfg->sleep_target = 1;
 
        /* initialize data before enabling the callback */
-       queue_cfg->empty_poll_stats = 0;
-       queue_cfg->cb_mode = mode;
-       queue_cfg->pwr_mgmt_state = PMD_MGMT_ENABLED;
-       queue_cfg->cur_cb = rte_eth_add_rx_callback(port_id, queue_id,
-                       clb, NULL);
+       if (lcore_cfg->n_queues == 1) {
+               lcore_cfg->cb_mode = mode;
+               lcore_cfg->pwr_mgmt_state = PMD_MGMT_ENABLED;
+       }
+       queue_cfg->cb = rte_eth_add_rx_callback(port_id, queue_id,
+                       clb, queue_cfg);
 
        ret = 0;
 end:
@@ -290,7 +505,9 @@ int
 rte_power_ethdev_pmgmt_queue_disable(unsigned int lcore_id,
                uint16_t port_id, uint16_t queue_id)
 {
-       struct pmd_queue_cfg *queue_cfg;
+       const union queue qdata = {.portid = port_id, .qid = queue_id};
+       struct pmd_core_cfg *lcore_cfg;
+       struct queue_list_entry *queue_cfg;
        int ret;
 
        RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
@@ -306,24 +523,40 @@ rte_power_ethdev_pmgmt_queue_disable(unsigned int lcore_id,
        }
 
        /* no need to check queue id as wrong queue id would not be enabled */
-       queue_cfg = &port_cfg[port_id][queue_id];
+       lcore_cfg = &lcore_cfgs[lcore_id];
+
+       /* check if other queues are stopped as well */
+       ret = cfg_queues_stopped(lcore_cfg);
+       if (ret != 1) {
+               /* error means invalid queue, 0 means queue wasn't stopped */
+               return ret < 0 ? -EINVAL : -EBUSY;
+       }
 
-       if (queue_cfg->pwr_mgmt_state != PMD_MGMT_ENABLED)
+       if (lcore_cfg->pwr_mgmt_state != PMD_MGMT_ENABLED)
                return -EINVAL;
 
-       /* stop any callbacks from progressing */
-       queue_cfg->pwr_mgmt_state = PMD_MGMT_DISABLED;
+       /*
+        * There is no good/easy way to do this without race conditions, so we
+        * are just going to throw our hands in the air and hope that the user
+        * has read the documentation and has ensured that ports are stopped at
+        * the time we enter the API functions.
+        */
+       queue_cfg = queue_list_take(lcore_cfg, &qdata);
+       if (queue_cfg == NULL)
+               return -ENOENT;
 
-       switch (queue_cfg->cb_mode) {
+       /* if we've removed all queues from the list, set state to disabled */
+       if (lcore_cfg->n_queues == 0)
+               lcore_cfg->pwr_mgmt_state = PMD_MGMT_DISABLED;
+
+       switch (lcore_cfg->cb_mode) {
        case RTE_POWER_MGMT_TYPE_MONITOR: /* fall-through */
        case RTE_POWER_MGMT_TYPE_PAUSE:
-               rte_eth_remove_rx_callback(port_id, queue_id,
-                               queue_cfg->cur_cb);
+               rte_eth_remove_rx_callback(port_id, queue_id, queue_cfg->cb);
                break;
        case RTE_POWER_MGMT_TYPE_SCALE:
                rte_power_freq_max(lcore_id);
-               rte_eth_remove_rx_callback(port_id, queue_id,
-                               queue_cfg->cur_cb);
+               rte_eth_remove_rx_callback(port_id, queue_id, queue_cfg->cb);
                rte_power_exit(lcore_id);
                break;
        }
@@ -332,7 +565,18 @@ rte_power_ethdev_pmgmt_queue_disable(unsigned int lcore_id,
         * ports before calling any of these API's, so we can assume that the
         * callbacks can be freed. we're intentionally casting away const-ness.
         */
-       rte_free((void *)queue_cfg->cur_cb);
+       rte_free((void *)queue_cfg->cb);
+       free(queue_cfg);
 
        return 0;
 }
+
+RTE_INIT(rte_power_ethdev_pmgmt_init) {
+       size_t i;
+
+       /* initialize all tailqs */
+       for (i = 0; i < RTE_DIM(lcore_cfgs); i++) {
+               struct pmd_core_cfg *cfg = &lcore_cfgs[i];
+               TAILQ_INIT(&cfg->head);
+       }
+}