Up till now pipeline was bound to thread selected in the initial config.
This patch allows binding pipeline to other threads at runtime using CLI
commands.
Signed-off-by: Maciej Gajdzica <maciejx.t.gajdzica@intel.com>
Signed-off-by: Piotr Azarewicz <piotrx.t.azarewicz@intel.com>
Acked-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += config_check.c
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += init.c
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread.c
+SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += thread_fe.c
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += cpu_core_map.c
SRCS-$(CONFIG_RTE_LIBRTE_PIPELINE) += pipeline_common_be.c
void *be;
void *fe;
uint64_t timer_period;
+ uint32_t enabled;
};
struct app_thread_pipeline_data {
+ uint32_t pipeline_id;
void *be;
pipeline_be_op_run f_run;
pipeline_be_op_timer f_timer;
#define APP_MAX_THREAD_PIPELINES 16
#endif
+#ifndef APP_THREAD_TIMER_PERIOD
+#define APP_THREAD_TIMER_PERIOD 1
+#endif
+
struct app_thread_data {
struct app_thread_pipeline_data regular[APP_MAX_THREAD_PIPELINES];
struct app_thread_pipeline_data custom[APP_MAX_THREAD_PIPELINES];
uint32_t n_regular;
uint32_t n_custom;
+ uint64_t timer_period;
+ uint64_t thread_req_deadline;
+
uint64_t deadline;
+
+ struct rte_ring *msgq_in;
+ struct rte_ring *msgq_out;
};
struct app_eal_params {
return 0;
}
-static int
+int
parse_pipeline_core(uint32_t *socket,
uint32_t *core,
uint32_t *ht,
#include "pipeline_firewall.h"
#include "pipeline_flow_classification.h"
#include "pipeline_routing.h"
+#include "thread_fe.h"
#define APP_NAME_SIZE 32
t = &app->thread_data[lcore_id];
+ t->timer_period = (rte_get_tsc_hz() * APP_THREAD_TIMER_PERIOD) / 1000;
+ t->thread_req_deadline = time + t->timer_period;
+
+ t->msgq_in = app_thread_msgq_in_get(app,
+ params->socket_id,
+ params->core_id,
+ params->hyper_th_id);
+ if (t->msgq_in == NULL)
+ rte_panic("Init error: Cannot find MSGQ_IN for thread %" PRId32,
+ lcore_id);
+
+ t->msgq_out = app_thread_msgq_out_get(app,
+ params->socket_id,
+ params->core_id,
+ params->hyper_th_id);
+ if (t->msgq_out == NULL)
+ rte_panic("Init error: Cannot find MSGQ_OUT for thread %" PRId32,
+ lcore_id);
+
ptype = app_pipeline_type_find(app, params->type);
if (ptype == NULL)
rte_panic("Init error: Unknown pipeline "
&t->regular[t->n_regular] :
&t->custom[t->n_custom];
+ p->pipeline_id = p_id;
p->be = data->be;
p->f_run = ptype->be_ops->f_run;
p->f_timer = ptype->be_ops->f_timer;
p->timer_period = data->timer_period;
p->deadline = time + data->timer_period;
+ data->enabled = 1;
+
if (ptype->be_ops->f_run == NULL)
t->n_regular++;
else
app_init_msgq(app);
app_pipeline_common_cmd_push(app);
+ app_pipeline_thread_cmd_push(app);
app_pipeline_type_register(app, &pipeline_master);
app_pipeline_type_register(app, &pipeline_passthrough);
app_pipeline_type_register(app, &pipeline_flow_classification);
return n_cmds;
}
+int
+parse_pipeline_core(uint32_t *socket,
+ uint32_t *core,
+ uint32_t *ht,
+ const char *entry);
+
#endif
if (pipeline_data == NULL)
return NULL;
+ if (pipeline_data->enabled == 0)
+ return NULL;
+
return pipeline_data->fe;
}
#include "pipeline_common_be.h"
#include "app.h"
+#include "thread.h"
-int app_thread(void *arg)
+static inline void *
+thread_msg_recv(struct rte_ring *r)
+{
+ void *msg;
+ int status = rte_ring_sc_dequeue(r, &msg);
+
+ if (status != 0)
+ return NULL;
+
+ return msg;
+}
+
+static inline void
+thread_msg_send(struct rte_ring *r,
+ void *msg)
+{
+ int status;
+
+ do {
+ status = rte_ring_sp_enqueue(r, msg);
+ } while (status == -ENOBUFS);
+}
+
+static int
+thread_pipeline_enable(struct app_thread_data *t,
+ struct thread_pipeline_enable_msg_req *req)
+{
+ struct app_thread_pipeline_data *p;
+
+ if (req->f_run == NULL) {
+ if (t->n_regular >= APP_MAX_THREAD_PIPELINES)
+ return -1;
+ } else {
+ if (t->n_custom >= APP_MAX_THREAD_PIPELINES)
+ return -1;
+ }
+
+ p = (req->f_run == NULL) ?
+ &t->regular[t->n_regular] :
+ &t->custom[t->n_custom];
+
+ p->pipeline_id = req->pipeline_id;
+ p->be = req->be;
+ p->f_run = req->f_run;
+ p->f_timer = req->f_timer;
+ p->timer_period = req->timer_period;
+ p->deadline = 0;
+
+ if (req->f_run == NULL)
+ t->n_regular++;
+ else
+ t->n_custom++;
+
+ return 0;
+}
+
+static int
+thread_pipeline_disable(struct app_thread_data *t,
+ struct thread_pipeline_disable_msg_req *req)
+{
+ uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular));
+ uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom));
+ uint32_t i;
+
+ /* search regular pipelines of current thread */
+ for (i = 0; i < n_regular; i++) {
+ if (t->regular[i].pipeline_id != req->pipeline_id)
+ continue;
+
+ if (i < n_regular - 1)
+ memcpy(&t->regular[i],
+ &t->regular[i+1],
+ (n_regular - 1 - i) * sizeof(struct app_thread_pipeline_data));
+
+ n_regular--;
+ t->n_regular = n_regular;
+
+ return 0;
+ }
+
+ /* search custom pipelines of current thread */
+ for (i = 0; i < n_custom; i++) {
+ if (t->custom[i].pipeline_id != req->pipeline_id)
+ continue;
+
+ if (i < n_custom - 1)
+ memcpy(&t->custom[i],
+ &t->custom[i+1],
+ (n_custom - 1 - i) * sizeof(struct app_thread_pipeline_data));
+
+ n_custom--;
+ t->n_custom = n_custom;
+
+ return 0;
+ }
+
+ /* return if pipeline not found */
+ return -1;
+}
+
+static int
+thread_msg_req_handle(struct app_thread_data *t)
+{
+ void *msg_ptr;
+ struct thread_msg_req *req;
+ struct thread_msg_rsp *rsp;
+
+ msg_ptr = thread_msg_recv(t->msgq_in);
+ req = msg_ptr;
+ rsp = msg_ptr;
+
+ if (req != NULL)
+ switch (req->type) {
+ case THREAD_MSG_REQ_PIPELINE_ENABLE: {
+ rsp->status = thread_pipeline_enable(t,
+ (struct thread_pipeline_enable_msg_req *) req);
+ thread_msg_send(t->msgq_out, rsp);
+ break;
+ }
+
+ case THREAD_MSG_REQ_PIPELINE_DISABLE: {
+ rsp->status = thread_pipeline_disable(t,
+ (struct thread_pipeline_disable_msg_req *) req);
+ thread_msg_send(t->msgq_out, rsp);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+int
+app_thread(void *arg)
{
struct app_params *app = (struct app_params *) arg;
uint32_t core_id = rte_lcore_id(), i, j;
struct app_thread_data *t = &app->thread_data[core_id];
- uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular));
- uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom));
for (i = 0; ; i++) {
+ uint32_t n_regular = RTE_MIN(t->n_regular, RTE_DIM(t->regular));
+ uint32_t n_custom = RTE_MIN(t->n_custom, RTE_DIM(t->custom));
+
/* Run regular pipelines */
for (j = 0; j < n_regular; j++) {
struct app_thread_pipeline_data *data = &t->regular[j];
t_deadline = p_deadline;
}
+ /* Timer for thread message request */
+ {
+ uint64_t deadline = t->thread_req_deadline;
+
+ if (deadline <= time) {
+ thread_msg_req_handle(t);
+ deadline = time + t->timer_period;
+ t->thread_req_deadline = deadline;
+ }
+
+ if (deadline < t_deadline)
+ t_deadline = deadline;
+ }
+
t->deadline = t_deadline;
}
}
--- /dev/null
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef THREAD_H_
+#define THREAD_H_
+
+#include "app.h"
+#include "pipeline_be.h"
+
+enum thread_msg_req_type {
+ THREAD_MSG_REQ_PIPELINE_ENABLE = 0,
+ THREAD_MSG_REQ_PIPELINE_DISABLE,
+ THREAD_MSG_REQS
+};
+
+struct thread_msg_req {
+ enum thread_msg_req_type type;
+};
+
+struct thread_msg_rsp {
+ int status;
+};
+
+/*
+ * PIPELINE ENABLE
+ */
+struct thread_pipeline_enable_msg_req {
+ enum thread_msg_req_type type;
+
+ uint32_t pipeline_id;
+ void *be;
+ pipeline_be_op_run f_run;
+ pipeline_be_op_timer f_timer;
+ uint64_t timer_period;
+};
+
+struct thread_pipeline_enable_msg_rsp {
+ int status;
+};
+
+/*
+ * PIPELINE DISABLE
+ */
+struct thread_pipeline_disable_msg_req {
+ enum thread_msg_req_type type;
+
+ uint32_t pipeline_id;
+};
+
+struct thread_pipeline_disable_msg_rsp {
+ int status;
+};
+
+#endif /* THREAD_H_ */
--- /dev/null
+#include <rte_common.h>
+#include <rte_ring.h>
+#include <rte_malloc.h>
+#include <cmdline_rdline.h>
+#include <cmdline_parse.h>
+#include <cmdline_parse_num.h>
+#include <cmdline_parse_string.h>
+#include <cmdline_parse_ipaddr.h>
+#include <cmdline_parse_etheraddr.h>
+#include <cmdline_socket.h>
+#include <cmdline.h>
+
+#include "thread.h"
+#include "thread_fe.h"
+#include "pipeline.h"
+#include "pipeline_common_fe.h"
+#include "app.h"
+
+static inline void *
+thread_msg_send_recv(struct app_params *app,
+ uint32_t socket_id, uint32_t core_id, uint32_t ht_id,
+ void *msg,
+ uint32_t timeout_ms)
+{
+ struct rte_ring *r_req = app_thread_msgq_in_get(app,
+ socket_id, core_id, ht_id);
+ struct rte_ring *r_rsp = app_thread_msgq_out_get(app,
+ socket_id, core_id, ht_id);
+ uint64_t hz = rte_get_tsc_hz();
+ void *msg_recv;
+ uint64_t deadline;
+ int status;
+
+ /* send */
+ do {
+ status = rte_ring_sp_enqueue(r_req, (void *) msg);
+ } while (status == -ENOBUFS);
+
+ /* recv */
+ deadline = (timeout_ms) ?
+ (rte_rdtsc() + ((hz * timeout_ms) / 1000)) :
+ UINT64_MAX;
+
+ do {
+ if (rte_rdtsc() > deadline)
+ return NULL;
+
+ status = rte_ring_sc_dequeue(r_rsp, &msg_recv);
+ } while (status != 0);
+
+ return msg_recv;
+}
+
+int
+app_pipeline_enable(struct app_params *app,
+ uint32_t socket_id,
+ uint32_t core_id,
+ uint32_t hyper_th_id,
+ uint32_t pipeline_id)
+{
+ struct thread_pipeline_enable_msg_req *req;
+ struct thread_pipeline_enable_msg_rsp *rsp;
+ int thread_id;
+ struct app_pipeline_data *p;
+ struct app_pipeline_params *p_params;
+ struct pipeline_type *p_type;
+ int status;
+
+ if (app == NULL)
+ return -1;
+
+ thread_id = cpu_core_map_get_lcore_id(app->core_map,
+ socket_id,
+ core_id,
+ hyper_th_id);
+
+ if ((thread_id < 0) ||
+ ((app->core_mask & (1LLU << thread_id)) == 0))
+ return -1;
+
+ if (app_pipeline_data(app, pipeline_id) == NULL)
+ return -1;
+
+ p = &app->pipeline_data[pipeline_id];
+ p_params = &app->pipeline_params[pipeline_id];
+ p_type = app_pipeline_type_find(app, p_params->type);
+
+ if (p->enabled == 1)
+ return -1;
+
+ req = app_msg_alloc(app);
+ if (req == NULL)
+ return -1;
+
+ req->type = THREAD_MSG_REQ_PIPELINE_ENABLE;
+ req->pipeline_id = pipeline_id;
+ req->be = p->be;
+ req->f_run = p_type->be_ops->f_run;
+ req->f_timer = p_type->be_ops->f_timer;
+ req->timer_period = p->timer_period;
+
+ rsp = thread_msg_send_recv(app,
+ socket_id, core_id, hyper_th_id, req, MSG_TIMEOUT_DEFAULT);
+ if (rsp == NULL)
+ return -1;
+
+ status = rsp->status;
+ app_msg_free(app, rsp);
+
+ if (status != 0)
+ return -1;
+
+ p->enabled = 1;
+ return 0;
+}
+
+int
+app_pipeline_disable(struct app_params *app,
+ uint32_t socket_id,
+ uint32_t core_id,
+ uint32_t hyper_th_id,
+ uint32_t pipeline_id)
+{
+ struct thread_pipeline_disable_msg_req *req;
+ struct thread_pipeline_disable_msg_rsp *rsp;
+ int thread_id;
+ struct app_pipeline_data *p;
+ int status;
+
+ if (app == NULL)
+ return -1;
+
+ thread_id = cpu_core_map_get_lcore_id(app->core_map,
+ socket_id,
+ core_id,
+ hyper_th_id);
+
+ if ((thread_id < 0) ||
+ ((app->core_mask & (1LLU << thread_id)) == 0))
+ return -1;
+
+ if (app_pipeline_data(app, pipeline_id) == NULL)
+ return -1;
+
+ p = &app->pipeline_data[pipeline_id];
+
+ if (p->enabled == 0)
+ return -1;
+
+ req = app_msg_alloc(app);
+ if (req == NULL)
+ return -1;
+
+ req->type = THREAD_MSG_REQ_PIPELINE_DISABLE;
+ req->pipeline_id = pipeline_id;
+
+ rsp = thread_msg_send_recv(app,
+ socket_id, core_id, hyper_th_id, req, MSG_TIMEOUT_DEFAULT);
+
+ if (rsp == NULL)
+ return -1;
+
+ status = rsp->status;
+ app_msg_free(app, rsp);
+
+ if (status != 0)
+ return -1;
+
+ p->enabled = 0;
+ return 0;
+}
+
+/*
+ * pipeline enable
+ */
+
+struct cmd_pipeline_enable_result {
+ cmdline_fixed_string_t t_string;
+ cmdline_fixed_string_t t_id_string;
+ cmdline_fixed_string_t pipeline_string;
+ uint32_t pipeline_id;
+ cmdline_fixed_string_t enable_string;
+};
+
+static void
+cmd_pipeline_enable_parsed(
+ void *parsed_result,
+ __rte_unused struct cmdline *cl,
+ void *data)
+{
+ struct cmd_pipeline_enable_result *params = parsed_result;
+ struct app_params *app = data;
+ int status;
+ uint32_t core_id, socket_id, hyper_th_id;
+
+ if (parse_pipeline_core(&socket_id,
+ &core_id,
+ &hyper_th_id,
+ params->t_id_string) != 0) {
+ printf("Command failed\n");
+ return;
+ }
+
+ status = app_pipeline_enable(app,
+ socket_id,
+ core_id,
+ hyper_th_id,
+ params->pipeline_id);
+
+ if (status != 0)
+ printf("Command failed\n");
+}
+
+cmdline_parse_token_string_t cmd_pipeline_enable_t_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_string, "t");
+
+cmdline_parse_token_string_t cmd_pipeline_enable_t_id_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, t_id_string,
+ NULL);
+
+cmdline_parse_token_string_t cmd_pipeline_enable_pipeline_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_string,
+ "pipeline");
+
+cmdline_parse_token_num_t cmd_pipeline_enable_pipeline_id =
+ TOKEN_NUM_INITIALIZER(struct cmd_pipeline_enable_result, pipeline_id,
+ UINT32);
+
+cmdline_parse_token_string_t cmd_pipeline_enable_enable_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_enable_result, enable_string,
+ "enable");
+
+cmdline_parse_inst_t cmd_pipeline_enable = {
+ .f = cmd_pipeline_enable_parsed,
+ .data = NULL,
+ .help_str = "Enable pipeline on specified core",
+ .tokens = {
+ (void *)&cmd_pipeline_enable_t_string,
+ (void *)&cmd_pipeline_enable_t_id_string,
+ (void *)&cmd_pipeline_enable_pipeline_string,
+ (void *)&cmd_pipeline_enable_pipeline_id,
+ (void *)&cmd_pipeline_enable_enable_string,
+ NULL,
+ },
+};
+
+/*
+ * pipeline disable
+ */
+
+struct cmd_pipeline_disable_result {
+ cmdline_fixed_string_t t_string;
+ cmdline_fixed_string_t t_id_string;
+ cmdline_fixed_string_t pipeline_string;
+ uint32_t pipeline_id;
+ cmdline_fixed_string_t disable_string;
+};
+
+static void
+cmd_pipeline_disable_parsed(
+ void *parsed_result,
+ __rte_unused struct cmdline *cl,
+ void *data)
+{
+ struct cmd_pipeline_disable_result *params = parsed_result;
+ struct app_params *app = data;
+ int status;
+ uint32_t core_id, socket_id, hyper_th_id;
+
+ if (parse_pipeline_core(&socket_id,
+ &core_id,
+ &hyper_th_id,
+ params->t_id_string) != 0) {
+ printf("Command failed\n");
+ return;
+ }
+
+ status = app_pipeline_disable(app,
+ socket_id,
+ core_id,
+ hyper_th_id,
+ params->pipeline_id);
+
+ if (status != 0)
+ printf("Command failed\n");
+}
+
+cmdline_parse_token_string_t cmd_pipeline_disable_t_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_string, "t");
+
+cmdline_parse_token_string_t cmd_pipeline_disable_t_id_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, t_id_string,
+ NULL);
+
+cmdline_parse_token_string_t cmd_pipeline_disable_pipeline_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result,
+ pipeline_string, "pipeline");
+
+cmdline_parse_token_num_t cmd_pipeline_disable_pipeline_id =
+ TOKEN_NUM_INITIALIZER(struct cmd_pipeline_disable_result, pipeline_id,
+ UINT32);
+
+cmdline_parse_token_string_t cmd_pipeline_disable_disable_string =
+ TOKEN_STRING_INITIALIZER(struct cmd_pipeline_disable_result, disable_string,
+ "disable");
+
+cmdline_parse_inst_t cmd_pipeline_disable = {
+ .f = cmd_pipeline_disable_parsed,
+ .data = NULL,
+ .help_str = "Disable pipeline on specified core",
+ .tokens = {
+ (void *)&cmd_pipeline_disable_t_string,
+ (void *)&cmd_pipeline_disable_t_id_string,
+ (void *)&cmd_pipeline_disable_pipeline_string,
+ (void *)&cmd_pipeline_disable_pipeline_id,
+ (void *)&cmd_pipeline_disable_disable_string,
+ NULL,
+ },
+};
+
+static cmdline_parse_ctx_t thread_cmds[] = {
+ (cmdline_parse_inst_t *) &cmd_pipeline_enable,
+ (cmdline_parse_inst_t *) &cmd_pipeline_disable,
+ NULL,
+};
+
+int
+app_pipeline_thread_cmd_push(struct app_params *app)
+{
+ uint32_t n_cmds, i;
+
+ /* Check for available slots in the application commands array */
+ n_cmds = RTE_DIM(thread_cmds) - 1;
+ if (n_cmds > APP_MAX_CMDS - app->n_cmds)
+ return -ENOMEM;
+
+ /* Push thread commands into the application */
+ memcpy(&app->cmds[app->n_cmds],
+ thread_cmds,
+ n_cmds * sizeof(cmdline_parse_ctx_t *));
+
+ for (i = 0; i < n_cmds; i++)
+ app->cmds[app->n_cmds + i]->data = app;
+
+ app->n_cmds += n_cmds;
+ app->cmds[app->n_cmds] = NULL;
+
+ return 0;
+}
--- /dev/null
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2015 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef THREAD_FE_H_
+#define THREAD_FE_H_
+
+static inline struct rte_ring *
+app_thread_msgq_in_get(struct app_params *app,
+ uint32_t socket_id, uint32_t core_id, uint32_t ht_id)
+{
+ char msgq_name[32];
+ ssize_t param_idx;
+
+ snprintf(msgq_name, sizeof(msgq_name),
+ "MSGQ-REQ-CORE-s%" PRIu32 "c%" PRIu32 "%s",
+ socket_id,
+ core_id,
+ (ht_id) ? "h" : "");
+ param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name);
+
+ if (param_idx < 0)
+ return NULL;
+
+ return app->msgq[param_idx];
+}
+
+static inline struct rte_ring *
+app_thread_msgq_out_get(struct app_params *app,
+ uint32_t socket_id, uint32_t core_id, uint32_t ht_id)
+{
+ char msgq_name[32];
+ ssize_t param_idx;
+
+ snprintf(msgq_name, sizeof(msgq_name),
+ "MSGQ-RSP-CORE-s%" PRIu32 "c%" PRIu32 "%s",
+ socket_id,
+ core_id,
+ (ht_id) ? "h" : "");
+ param_idx = APP_PARAM_FIND(app->msgq_params, msgq_name);
+
+ if (param_idx < 0)
+ return NULL;
+
+ return app->msgq[param_idx];
+
+}
+
+int
+app_pipeline_thread_cmd_push(struct app_params *app);
+
+int
+app_pipeline_enable(struct app_params *app,
+ uint32_t core_id,
+ uint32_t socket_id,
+ uint32_t hyper_th_id,
+ uint32_t pipeline_id);
+
+int
+app_pipeline_disable(struct app_params *app,
+ uint32_t core_id,
+ uint32_t socket_id,
+ uint32_t hyper_th_id,
+ uint32_t pipeline_id);
+
+#endif /* THREAD_FE_H_ */