#include <rte_alarm.h>
#include <rte_tailq.h>
#include <rte_ring_elem.h>
+#include <rte_ring_peek.h>
#include <mlx5_common.h>
#include "mlx5_vdpa_utils.h"
#include "mlx5_vdpa.h"
+static inline uint32_t
+mlx5_vdpa_c_thrd_ring_dequeue_bulk(struct rte_ring *r,
+		void **obj, uint32_t n, uint32_t *avail)
+{
+	uint32_t m;
+
+	m = rte_ring_dequeue_bulk_elem_start(r, obj,
+		sizeof(struct mlx5_vdpa_task), n, avail);
+	n = (m == n) ? n : 0;
+	rte_ring_dequeue_elem_finish(r, n);
+	return n;
+}
+
+static inline uint32_t
+mlx5_vdpa_c_thrd_ring_enqueue_bulk(struct rte_ring *r,
+		void * const *obj, uint32_t n, uint32_t *free)
+{
+	uint32_t m;
+
+	m = rte_ring_enqueue_bulk_elem_start(r, n, free);
+	n = (m == n) ? n : 0;
+	rte_ring_enqueue_elem_finish(r, obj,
+		sizeof(struct mlx5_vdpa_task), n);
+	return n;
+}
+
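+/*
+ * Queue num tasks to the ring of the given conf thread and wake it up.
+ * Returns 0 on success, -1 if the ring cannot hold all the tasks.
+ */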
+int
+mlx5_vdpa_task_add(struct mlx5_vdpa_priv *priv,
+		uint32_t thrd_idx,
+		uint32_t num)
+{
+	struct rte_ring *rng = conf_thread_mng.cthrd[thrd_idx].rng;
+	struct mlx5_vdpa_task task[MLX5_VDPA_TASKS_PER_DEV];
+	uint32_t i;
+
+	MLX5_ASSERT(num <= MLX5_VDPA_TASKS_PER_DEV);
+	for (i = 0; i < num; i++) {
+		task[i].priv = priv;
+		task[i].remaining_cnt = NULL;
+		/* More task fields to be added later. */
+	}
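+	/* The bulk enqueue is all-or-nothing: on failure no task is queued. */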
+	if (!mlx5_vdpa_c_thrd_ring_enqueue_bulk(rng, (void **)&task, num, NULL))
+		return -1;
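+	/* Bump the caller's remaining counter for each queued task, if set. */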
+	for (i = 0; i < num; i++)
+		if (task[i].remaining_cnt)
+			__atomic_fetch_add(task[i].remaining_cnt, 1,
+				__ATOMIC_RELAXED);
+	/* Wake up the conf thread. */
+	pthread_mutex_lock(&conf_thread_mng.cthrd_lock);
+	pthread_cond_signal(&conf_thread_mng.cthrd[thrd_idx].c_cond);
+	pthread_mutex_unlock(&conf_thread_mng.cthrd_lock);
+	return 0;
+}
+
static void *
mlx5_vdpa_c_thread_handle(void *arg)
{
-	/* To be added later. */
-	return arg;
+	struct mlx5_vdpa_conf_thread_mng *multhrd = arg;
+	pthread_t thread_id = pthread_self();
+	struct mlx5_vdpa_priv *priv;
+	struct mlx5_vdpa_task task;
+	struct rte_ring *rng;
+	uint32_t thrd_idx;
+	uint32_t task_num;
+
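+	/* Locate this thread's slot and task ring by matching pthread IDs. */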
+	for (thrd_idx = 0; thrd_idx < multhrd->max_thrds;
+		thrd_idx++)
+		if (multhrd->cthrd[thrd_idx].tid == thread_id)
+			break;
+	if (thrd_idx >= multhrd->max_thrds) {
+		DRV_LOG(ERR, "Failed to get the conf thread index.");
+		return NULL;
+	}
+	rng = multhrd->cthrd[thrd_idx].rng;
+	while (1) {
+		task_num = mlx5_vdpa_c_thrd_ring_dequeue_bulk(rng,
+			(void **)&task, 1, NULL);
+		if (!task_num) {
+			/* No task in the ring; wait for a wakeup signal. */
+			pthread_mutex_lock(&multhrd->cthrd_lock);
+			pthread_cond_wait(
+				&multhrd->cthrd[thrd_idx].c_cond,
+				&multhrd->cthrd_lock);
+			pthread_mutex_unlock(&multhrd->cthrd_lock);
+			/* Re-check the ring: task holds no valid data here. */
+			continue;
+		}
+		priv = task.priv;
+		if (priv == NULL)
+			continue;
+		if (task.remaining_cnt)
+			__atomic_fetch_sub(task.remaining_cnt,
+				1, __ATOMIC_RELAXED);
+		/* Task handling to be added later. */
+	}
+	return NULL;
}
static void
if (need_unlock)
pthread_mutex_init(&conf_thread_mng.cthrd_lock, NULL);
}
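+	/* Also release this thread's task ring, if it was created. */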
+	if (conf_thread_mng.cthrd[thrd_idx].rng) {
+		rte_ring_free(conf_thread_mng.cthrd[thrd_idx].rng);
+		conf_thread_mng.cthrd[thrd_idx].rng = NULL;
+	}
}
static int
rte_cpuset_t cpuset;
pthread_attr_t attr;
uint32_t thrd_idx;
+	uint32_t ring_num;
char name[32];
int ret;
DRV_LOG(ERR, "Failed to set thread priority.");
goto c_thread_err;
}
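+	/* Size each per-thread task ring as an equal share of the task budget. */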
+	ring_num = MLX5_VDPA_MAX_TASKS_PER_THRD / conf_thread_mng.max_thrds;
+	if (!ring_num) {
+		DRV_LOG(ERR, "Invalid per-thread task ring size.");
+		goto c_thread_err;
+	}
for (thrd_idx = 0; thrd_idx < conf_thread_mng.max_thrds;
thrd_idx++) {
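+		/*
+		 * The HTS sync modes are required by the ring peek API used in
+		 * the enqueue/dequeue wrappers above; RING_F_EXACT_SZ allows a
+		 * non-power-of-two size with a usable capacity of exactly
+		 * ring_num elements.
+		 */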
+		snprintf(name, sizeof(name), "vDPA-mthread-ring-%u",
+			thrd_idx);
+		conf_thread_mng.cthrd[thrd_idx].rng = rte_ring_create_elem(name,
+			sizeof(struct mlx5_vdpa_task), ring_num,
+			rte_socket_id(),
+			RING_F_MP_HTS_ENQ | RING_F_MC_HTS_DEQ |
+			RING_F_EXACT_SZ);
+		if (!conf_thread_mng.cthrd[thrd_idx].rng) {
+			DRV_LOG(ERR,
+				"Failed to create the task ring for conf thread %u.",
+				thrd_idx);
+			goto c_thread_err;
+		}
+		/* Initialize the wakeup condvar before the thread can wait on it. */
+		pthread_cond_init(&conf_thread_mng.cthrd[thrd_idx].c_cond,
+			NULL);
ret = pthread_create(&conf_thread_mng.cthrd[thrd_idx].tid,
&attr, mlx5_vdpa_c_thread_handle,
(void *)&conf_thread_mng);
name);
else
DRV_LOG(DEBUG, "Thread name: %s.", name);
}
pthread_mutex_unlock(&conf_thread_mng.cthrd_lock);
return 0;