Add the ring input/output port type for the SWX pipeline.
Signed-off-by: Cristian Dumitrescu <cristian.dumitrescu@intel.com>
* SWX port:
[port] (@ref rte_swx_port.h),
[ethdev] (@ref rte_swx_port_ethdev.h),
+ [ring] (@ref rte_swx_port_ring.h),
[src/sink] (@ref rte_swx_port_source_sink.h)
* SWX table:
[table] (@ref rte_swx_table.h),
#include <rte_common.h>
#include <rte_ethdev.h>
#include <rte_swx_port_ethdev.h>
+#include <rte_swx_port_ring.h>
#include <rte_swx_port_source_sink.h>
#include <rte_swx_pipeline.h>
#include <rte_swx_ctl.h>
}
}
+static const char cmd_ring_help[] =
+"ring <ring_name> size <size> numa <numa_node>\n";
+
+static void
+cmd_ring(char **tokens,
+ uint32_t n_tokens,
+ char *out,
+ size_t out_size,
+ void *obj)
+{
+ struct ring_params p;
+ char *name;
+ struct ring *ring;
+
+ if (n_tokens != 6) {
+ snprintf(out, out_size, MSG_ARG_MISMATCH, tokens[0]);
+ return;
+ }
+
+ name = tokens[1];
+
+ if (strcmp(tokens[2], "size") != 0) {
+ snprintf(out, out_size, MSG_ARG_NOT_FOUND, "size");
+ return;
+ }
+
+ if (parser_read_uint32(&p.size, tokens[3]) != 0) {
+ snprintf(out, out_size, MSG_ARG_INVALID, "size");
+ return;
+ }
+
+ if (strcmp(tokens[4], "numa") != 0) {
+ snprintf(out, out_size, MSG_ARG_NOT_FOUND, "numa");
+ return;
+ }
+
+ if (parser_read_uint32(&p.numa_node, tokens[5]) != 0) {
+ snprintf(out, out_size, MSG_ARG_INVALID, "numa_node");
+ return;
+ }
+
+ ring = ring_create(obj, name, &p);
+ if (!ring) {
+ snprintf(out, out_size, MSG_CMD_FAIL, tokens[0]);
+ return;
+ }
+}
+
static const char cmd_pipeline_create_help[] =
"pipeline <pipeline_name> create <numa_node>\n";
static const char cmd_pipeline_port_in_help[] =
"pipeline <pipeline_name> port in <port_id>\n"
" link <link_name> rxq <queue_id> bsz <burst_size>\n"
+" ring <ring_name> bsz <burst_size>\n"
" | source <mempool_name> <file_name>\n";
static void
port_id,
"ethdev",
¶ms);
+ } else if (strcmp(tokens[t0], "ring") == 0) {
+ struct rte_swx_port_ring_reader_params params;
+ struct ring *ring;
+
+ if (n_tokens < t0 + 4) {
+ snprintf(out, out_size, MSG_ARG_MISMATCH,
+ "pipeline port in ring");
+ return;
+ }
+
+ ring = ring_find(obj, tokens[t0 + 1]);
+ if (!ring) {
+ snprintf(out, out_size, MSG_ARG_INVALID,
+ "ring_name");
+ return;
+ }
+ params.name = ring->name;
+
+ if (strcmp(tokens[t0 + 2], "bsz") != 0) {
+ snprintf(out, out_size, MSG_ARG_NOT_FOUND, "bsz");
+ return;
+ }
+
+ if (parser_read_uint32(¶ms.burst_size, tokens[t0 + 3])) {
+ snprintf(out, out_size, MSG_ARG_INVALID,
+ "burst_size");
+ return;
+ }
+
+ t0 += 4;
+
+ status = rte_swx_pipeline_port_in_config(p->p,
+ port_id,
+ "ring",
+ ¶ms);
} else if (strcmp(tokens[t0], "source") == 0) {
struct rte_swx_port_source_params params;
struct mempool *mp;
static const char cmd_pipeline_port_out_help[] =
"pipeline <pipeline_name> port out <port_id>\n"
" link <link_name> txq <txq_id> bsz <burst_size>\n"
+" ring <ring_name> bsz <burst_size>\n"
" | sink <file_name> | none\n";
static void
port_id,
"ethdev",
¶ms);
+ } else if (strcmp(tokens[t0], "ring") == 0) {
+ struct rte_swx_port_ring_writer_params params;
+ struct ring *ring;
+
+ if (n_tokens < t0 + 4) {
+ snprintf(out, out_size, MSG_ARG_MISMATCH,
+ "pipeline port out link");
+ return;
+ }
+
+ ring = ring_find(obj, tokens[t0 + 1]);
+ if (!ring) {
+ snprintf(out, out_size, MSG_ARG_INVALID,
+ "ring_name");
+ return;
+ }
+ params.name = ring->name;
+
+ if (strcmp(tokens[t0 + 2], "bsz") != 0) {
+ snprintf(out, out_size, MSG_ARG_NOT_FOUND, "bsz");
+ return;
+ }
+
+ if (parser_read_uint32(¶ms.burst_size, tokens[t0 + 3])) {
+ snprintf(out, out_size, MSG_ARG_INVALID,
+ "burst_size");
+ return;
+ }
+
+ t0 += 4;
+
+ status = rte_swx_pipeline_port_out_config(p->p,
+ port_id,
+ "ring",
+ ¶ms);
} else if (strcmp(tokens[t0], "sink") == 0) {
struct rte_swx_port_sink_params params;
return;
}
+ if (strcmp(tokens[0], "ring") == 0) {
+ snprintf(out, out_size, "\n%s\n", cmd_ring_help);
+ return;
+ }
+
if ((strcmp(tokens[0], "pipeline") == 0) &&
(n_tokens == 2) && (strcmp(tokens[1], "create") == 0)) {
snprintf(out, out_size, "\n%s\n", cmd_pipeline_create_help);
return;
}
+ if (strcmp(tokens[0], "ring") == 0) {
+ cmd_ring(tokens, n_tokens, out, out_size, obj);
+ return;
+ }
+
if (strcmp(tokens[0], "pipeline") == 0) {
if ((n_tokens >= 3) &&
(strcmp(tokens[2], "create") == 0)) {
#include <rte_mbuf.h>
#include <rte_ethdev.h>
#include <rte_swx_port_ethdev.h>
+#include <rte_swx_port_ring.h>
#include <rte_swx_port_source_sink.h>
#include <rte_swx_table_em.h>
#include <rte_swx_pipeline.h>
*/
TAILQ_HEAD(link_list, link);
+/*
+ * ring
+ */
+TAILQ_HEAD(ring_list, ring);
+
/*
* pipeline
*/
struct obj {
struct mempool_list mempool_list;
struct link_list link_list;
+ struct ring_list ring_list;
struct pipeline_list pipeline_list;
};
TAILQ_FIRST(&obj->link_list) : TAILQ_NEXT(link, node);
}
+/*
+ * ring
+ */
+struct ring *
+ring_create(struct obj *obj, const char *name, struct ring_params *params)
+{
+ struct ring *ring;
+ struct rte_ring *r;
+ unsigned int flags = RING_F_SP_ENQ | RING_F_SC_DEQ;
+
+ /* Check input params */
+ if (!name || ring_find(obj, name) || !params || !params->size)
+ return NULL;
+
+ /**
+ * Resource create
+ */
+ r = rte_ring_create(
+ name,
+ params->size,
+ params->numa_node,
+ flags);
+ if (!r)
+ return NULL;
+
+ /* Node allocation */
+ ring = calloc(1, sizeof(struct ring));
+ if (!ring) {
+ rte_ring_free(r);
+ return NULL;
+ }
+
+ /* Node fill in */
+ strlcpy(ring->name, name, sizeof(ring->name));
+
+ /* Node add to list */
+ TAILQ_INSERT_TAIL(&obj->ring_list, ring, node);
+
+ return ring;
+}
+
+struct ring *
+ring_find(struct obj *obj, const char *name)
+{
+ struct ring *ring;
+
+ if (!obj || !name)
+ return NULL;
+
+ TAILQ_FOREACH(ring, &obj->ring_list, node)
+ if (strcmp(ring->name, name) == 0)
+ return ring;
+
+ return NULL;
+}
+
/*
* pipeline
*/
if (status)
goto error;
+ status = rte_swx_pipeline_port_in_type_register(p,
+ "ring",
+ &rte_swx_port_ring_reader_ops);
+ if (status)
+ goto error;
+
+ status = rte_swx_pipeline_port_out_type_register(p,
+ "ring",
+ &rte_swx_port_ring_writer_ops);
+ if (status)
+ goto error;
+
#ifdef RTE_PORT_PCAP
status = rte_swx_pipeline_port_in_type_register(p,
"source",
TAILQ_INIT(&obj->mempool_list);
TAILQ_INIT(&obj->link_list);
+ TAILQ_INIT(&obj->ring_list);
TAILQ_INIT(&obj->pipeline_list);
return obj;
struct link *
link_next(struct obj *obj, struct link *link);
+/*
+ * ring
+ */
+struct ring_params {
+ uint32_t size;
+ uint32_t numa_node;
+};
+
+struct ring {
+ TAILQ_ENTRY(ring) node;
+ char name[NAME_SIZE];
+};
+
+struct ring *
+ring_create(struct obj *obj,
+ const char *name,
+ struct ring_params *params);
+
+struct ring *
+ring_find(struct obj *obj, const char *name);
+
/*
* pipeline
*/
'rte_port_sym_crypto.c',
'rte_port_eventdev.c',
'rte_swx_port_ethdev.c',
- 'rte_swx_port_source_sink.c',)
+ 'rte_swx_port_ring.c',
+ 'rte_swx_port_source_sink.c',
+ )
headers = files(
'rte_port_ethdev.h',
'rte_port_fd.h',
'rte_port_eventdev.h',
'rte_swx_port.h',
'rte_swx_port_ethdev.h',
- 'rte_swx_port_source_sink.h',)
+ 'rte_swx_port_ring.h',
+ 'rte_swx_port_source_sink.h',
+ )
deps += ['ethdev', 'sched', 'ip_frag', 'cryptodev', 'eventdev']
if dpdk_conf.has('RTE_PORT_PCAP')
--- /dev/null
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Intel Corporation
+ */
+#include <string.h>
+#include <stdint.h>
+
+#include <rte_mbuf.h>
+#include <rte_ring.h>
+#include <rte_hexdump.h>
+
+#include "rte_swx_port_ring.h"
+
+#ifndef TRACE_LEVEL
+#define TRACE_LEVEL 0
+#endif
+
+#if TRACE_LEVEL
+#define TRACE(...) printf(__VA_ARGS__)
+#else
+#define TRACE(...)
+#endif
+
+/*
+ * Reader
+ */
+struct reader {
+ struct {
+ struct rte_ring *ring;
+ char *name;
+ uint32_t burst_size;
+ } params;
+ struct rte_swx_port_in_stats stats;
+ struct rte_mbuf **pkts;
+ int n_pkts;
+ int pos;
+};
+
+static void *
+reader_create(void *args)
+{
+ struct rte_swx_port_ring_reader_params *params = args;
+ struct rte_ring *ring;
+ struct reader *p = NULL;
+
+ /* Check input parameters. */
+ if (!params || !params->name || !params->burst_size)
+ goto error;
+
+ ring = rte_ring_lookup(params->name);
+ if (!ring)
+ goto error;
+
+ /* Memory allocation. */
+ p = calloc(1, sizeof(struct reader));
+ if (!p)
+ goto error;
+
+ p->params.name = strdup(params->name);
+ if (!p->params.name)
+ goto error;
+
+ p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
+ if (!p->pkts)
+ goto error;
+
+ /* Initialization. */
+ p->params.ring = ring;
+ p->params.burst_size = params->burst_size;
+
+ return p;
+
+error:
+ if (!p)
+ return NULL;
+
+ free(p->pkts);
+ free(p->params.name);
+ free(p);
+ return NULL;
+}
+
+static int
+reader_pkt_rx(void *port, struct rte_swx_pkt *pkt)
+{
+ struct reader *p = port;
+ struct rte_mbuf *m;
+
+ if (p->pos == p->n_pkts) {
+ int n_pkts;
+
+ n_pkts = rte_ring_sc_dequeue_burst(p->params.ring,
+ (void **) p->pkts,
+ p->params.burst_size,
+ NULL);
+ if (!n_pkts) {
+ p->stats.n_empty++;
+ return 0;
+ }
+
+ TRACE("[Ring %s] %d packets in\n",
+ p->params.name,
+ n_pkts);
+
+ p->n_pkts = n_pkts;
+ p->pos = 0;
+ }
+
+ m = p->pkts[p->pos++];
+ pkt->handle = m;
+ pkt->pkt = m->buf_addr;
+ pkt->offset = m->data_off;
+ pkt->length = m->pkt_len;
+
+ TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
+ (uint32_t)p->params.name,
+ p->pos - 1,
+ pkt->length,
+ pkt->offset);
+ if (TRACE_LEVEL)
+ rte_hexdump(stdout,
+ NULL,
+ &((uint8_t *)m->buf_addr)[m->data_off],
+ m->data_len);
+
+ p->stats.n_pkts++;
+ p->stats.n_bytes += pkt->length;
+
+ return 1;
+}
+
+static void
+reader_free(void *port)
+{
+ struct reader *p = port;
+ int i;
+
+ if (!p)
+ return;
+
+ for (i = 0; i < p->n_pkts; i++) {
+ struct rte_mbuf *pkt = p->pkts[i];
+
+ rte_pktmbuf_free(pkt);
+ }
+
+ free(p->pkts);
+ free(p->params.name);
+ free(p);
+}
+
+static void
+reader_stats_read(void *port, struct rte_swx_port_in_stats *stats)
+{
+ struct reader *p = port;
+
+ if (!stats)
+ return;
+
+ memcpy(stats, &p->stats, sizeof(p->stats));
+}
+
+/*
+ * Writer
+ */
+struct writer {
+ struct {
+ struct rte_ring *ring;
+ char *name;
+ uint32_t burst_size;
+ } params;
+ struct rte_swx_port_out_stats stats;
+
+ struct rte_mbuf **pkts;
+ int n_pkts;
+};
+
+static void *
+writer_create(void *args)
+{
+ struct rte_swx_port_ring_writer_params *params = args;
+ struct rte_ring *ring;
+ struct writer *p = NULL;
+
+ /* Check input parameters. */
+ if (!params || !params->name || !params->burst_size)
+ goto error;
+
+ ring = rte_ring_lookup(params->name);
+ if (!ring)
+ goto error;
+
+ /* Memory allocation. */
+ p = calloc(1, sizeof(struct writer));
+ if (!p)
+ goto error;
+
+ p->params.name = strdup(params->name);
+ if (!p)
+ goto error;
+
+ p->pkts = calloc(params->burst_size, sizeof(struct rte_mbuf *));
+ if (!p->pkts)
+ goto error;
+
+ /* Initialization. */
+ p->params.ring = ring;
+ p->params.burst_size = params->burst_size;
+
+ return p;
+
+error:
+ if (!p)
+ return NULL;
+
+ free(p->params.name);
+ free(p->pkts);
+ free(p);
+ return NULL;
+}
+
+static void
+__writer_flush(struct writer *p)
+{
+ int n_pkts;
+
+ for (n_pkts = 0; ; ) {
+ n_pkts += rte_ring_sp_enqueue_burst(p->params.ring,
+ (void **)p->pkts + n_pkts,
+ p->n_pkts - n_pkts,
+ NULL);
+
+ TRACE("[Ring %s] %d packets out\n", p->params.name, n_pkts);
+
+ if (n_pkts == p->n_pkts)
+ break;
+ }
+
+ p->n_pkts = 0;
+}
+
+static void
+writer_pkt_tx(void *port, struct rte_swx_pkt *pkt)
+{
+ struct writer *p = port;
+ struct rte_mbuf *m = pkt->handle;
+
+ TRACE("[Ring %s] Pkt %d (%u bytes at offset %u)\n",
+ p->params.name,
+ p->n_pkts - 1,
+ pkt->length,
+ pkt->offset);
+ if (TRACE_LEVEL)
+ rte_hexdump(stdout, NULL, &pkt->pkt[pkt->offset], pkt->length);
+
+ m->pkt_len = pkt->length;
+ m->data_len = (uint16_t)pkt->length;
+ m->data_off = (uint16_t)pkt->offset;
+
+ p->stats.n_pkts++;
+ p->stats.n_bytes += pkt->length;
+
+ p->pkts[p->n_pkts++] = m;
+ if (p->n_pkts == (int)p->params.burst_size)
+ __writer_flush(p);
+}
+
+static void
+writer_flush(void *port)
+{
+ struct writer *p = port;
+
+ if (p->n_pkts)
+ __writer_flush(p);
+}
+
+static void
+writer_free(void *port)
+{
+ struct writer *p = port;
+
+ if (!p)
+ return;
+
+ writer_flush(p);
+ free(p->pkts);
+ free(p->params.name);
+ free(port);
+}
+
+static void
+writer_stats_read(void *port, struct rte_swx_port_out_stats *stats)
+{
+ struct writer *p = port;
+
+ if (!stats)
+ return;
+
+ memcpy(stats, &p->stats, sizeof(p->stats));
+}
+
+/*
+ * Summary of port operations
+ */
+struct rte_swx_port_in_ops rte_swx_port_ring_reader_ops = {
+ .create = reader_create,
+ .free = reader_free,
+ .pkt_rx = reader_pkt_rx,
+ .stats_read = reader_stats_read,
+};
+
+struct rte_swx_port_out_ops rte_swx_port_ring_writer_ops = {
+ .create = writer_create,
+ .free = writer_free,
+ .pkt_tx = writer_pkt_tx,
+ .flush = writer_flush,
+ .stats_read = writer_stats_read,
+};
--- /dev/null
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2021 Intel Corporation
+ */
+
+#ifndef __INCLUDE_RTE_SWX_PORT_RING_H__
+#define __INCLUDE_RTE_SWX_PORT_RING_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ * RTE SWX Ring Input and Output Ports
+ ***/
+
+#include <stdint.h>
+
+#include <rte_ring.h>
+
+#include "rte_swx_port.h"
+
+/** Ring input port (reader) creation parameters. */
+struct rte_swx_port_ring_reader_params {
+ /** Name of valid RTE ring. */
+ const char *name;
+
+ /** Read burst size. */
+ uint32_t burst_size;
+};
+
+/** Ring_reader operations. */
+extern struct rte_swx_port_in_ops rte_swx_port_ring_reader_ops;
+
+/** Ring output port (writer) creation parameters. */
+struct rte_swx_port_ring_writer_params {
+ /** Name of valid RTE ring. */
+ const char *name;
+
+ /** Read burst size. */
+ uint32_t burst_size;
+};
+
+/** Ring writer operations. */
+extern struct rte_swx_port_out_ops rte_swx_port_ring_writer_ops;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __INCLUDE_RTE_SWX_PORT_RING_H__ */
rte_swx_port_ethdev_writer_ops;
rte_swx_port_sink_ops;
rte_swx_port_source_ops;
+
+ # added in 21.05
+ rte_swx_port_ring_reader_ops;
+ rte_swx_port_ring_writer_ops;
};