/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */

#ifndef _RTE_GRAPH_WORKER_H_
#define _RTE_GRAPH_WORKER_H_

/**
 * @file rte_graph_worker.h
 *
 * All functions in this file may be changed or removed without prior notice.
 *
 * This API allows a worker thread to walk over a graph and its nodes in order
 * to create, process, enqueue and move streams of objects to the next nodes.
 */
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_prefetch.h>
#include <rte_memcpy.h>
#include <rte_memory.h>

#include "rte_graph.h"
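
/*
 * Example usage (an illustrative sketch, not part of this API): a worker
 * thread typically looks up its graph by name and then calls rte_graph_walk()
 * in its main loop. The graph name "worker0" and the "force_quit" flag are
 * hypothetical application-side names.
 *
 *    struct rte_graph *graph = rte_graph_lookup("worker0");
 *
 *    while (!force_quit)
 *        rte_graph_walk(graph);
 */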
/**
 * Data structure to hold graph data.
 */
struct rte_graph {
        uint32_t tail;               /**< Tail of circular buffer. */
        uint32_t head;               /**< Head of circular buffer. */
        uint32_t cir_mask;           /**< Circular buffer wrap around mask. */
        rte_node_t nb_nodes;         /**< Number of nodes in the graph. */
        rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
        rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
        rte_graph_t id;              /**< Graph identifier. */
        int socket;                  /**< Socket ID where memory is allocated. */
        char name[RTE_GRAPH_NAMESIZE]; /**< Name of the graph. */
        uint64_t fence;              /**< Fence. */
} __rte_cache_aligned;
/**
 * Data structure to hold node data.
 */
struct rte_node {
        uint64_t fence;         /**< Fence. */
        rte_graph_off_t next;   /**< Index to next node. */
        rte_node_t id;          /**< Node identifier. */
        rte_node_t parent_id;   /**< Parent Node identifier. */
        rte_edge_t nb_edges;    /**< Number of edges from this node. */
        uint32_t realloc_count; /**< Number of times reallocated. */

        char parent[RTE_NODE_NAMESIZE]; /**< Parent node name. */
        char name[RTE_NODE_NAMESIZE];   /**< Name of the node. */

#define RTE_NODE_CTX_SZ 16
        uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
        uint16_t size;          /**< Total number of objects available. */
        uint16_t idx;           /**< Number of objects used. */
        rte_graph_off_t off;    /**< Offset of node in the graph reel. */
        uint64_t total_cycles;  /**< Cycles spent in this node. */
        uint64_t total_calls;   /**< Calls done to this node. */
        uint64_t total_objs;    /**< Objects processed by this node. */

        void **objs;            /**< Array of object pointers. */

        rte_node_process_t process; /**< Process function. */

        struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
} __rte_cache_aligned;
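
/*
 * Example (an illustrative sketch): a node implementation may treat the
 * 16-byte ctx[] area as private per-node state inside its process callback.
 * The struct and field names below are hypothetical.
 *
 *    struct my_node_ctx {
 *        uint16_t last_next; // cached next edge index
 *    };
 *
 *    // Inside the node's process callback:
 *    struct my_node_ctx *ctx = (struct my_node_ctx *)node->ctx;
 *    rte_edge_t next = ctx->last_next;
 */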
/**
 * Allocate a stream of objects.
 *
 * If the stream already exists, re-allocate it to a larger size.
 *
 *   Pointer to the graph object.
 *   Pointer to the node object.
 */
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);
/**
 * Allocate a stream with the requested number of objects.
 *
 * If the stream already exists, re-allocate it to a larger size.
 *
 *   Pointer to the graph object.
 *   Pointer to the node object.
 *   Number of objects to be allocated.
 */
void __rte_node_stream_alloc_size(struct rte_graph *graph,
                                  struct rte_node *node, uint16_t req_size);
/**
 * Perform a graph walk on the circular buffer, invoking the process function
 * of the nodes and collecting the stats.
 *
 *   Graph pointer returned from rte_graph_lookup().
 *
 * @see rte_graph_lookup()
 */
rte_graph_walk(struct rte_graph *graph)
        const rte_graph_off_t *cir_start = graph->cir_start;
        const rte_node_t mask = graph->cir_mask;
        uint32_t head = graph->head;
        struct rte_node *node;
        uint64_t start;
        uint16_t rc;
        void **objs;

        /*
         * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
         * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
         * in a circular buffer fashion.
         *
         *      +-----+ <= cir_start - head [number of source nodes]
         *      | ... | <= source nodes
         *      +-----+ <= cir_start [head = 0] [tail = 0]
         *      | ... | <= pending streams
         *      +-----+ <= cir_start + mask
         */
        while (likely(head != graph->tail)) {
                node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
                RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
                objs = node->objs;
                rte_prefetch0(objs);

                if (rte_graph_has_stats_feature()) {
                        start = rte_rdtsc();
                        rc = node->process(graph, node, objs, node->idx);
                        node->total_cycles += rte_rdtsc() - start;
                        node->total_calls++;
                        node->total_objs += rc;
                } else {
                        node->process(graph, node, objs, node->idx);
                }
                node->idx = 0;
                head = likely((int32_t)head > 0) ? head & mask : head;
        }
/* Fast path helper functions */

/**
 * Enqueue a given node to the tail of the graph reel.
 *
 *   Pointer to the graph object.
 *   Pointer to the node object to be enqueued.
 */
static __rte_always_inline void
__rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
        uint32_t tail = graph->tail;

        graph->cir_start[tail++] = node->off;
        graph->tail = tail & graph->cir_mask;
/**
 * Enqueue sequence prologue function.
 *
 * Adds the node to the tail of the graph reel and resizes the stream to hold
 * the requested number of objects, if needed.
 *
 *   Pointer to the graph object.
 *   Pointer to the node object.
 *   Index at which the object enqueue starts.
 *   Space required for the object enqueue.
 */
static __rte_always_inline void
__rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
                            const uint16_t idx, const uint16_t space)
        /* Add to the pending stream list if the node is new */
        if (idx == 0)
                __rte_node_enqueue_tail_update(graph, node);

        if (unlikely(node->size < (idx + space)))
                __rte_node_stream_alloc(graph, node);
/**
 * Get the next node pointer from the current node's edge id.
 *
 *   Current node pointer.
 *   Edge id of the required node.
 *
 *   Pointer to the node denoted by the edge id.
 */
static __rte_always_inline struct rte_node *
__rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
        RTE_ASSERT(next < node->nb_edges);
        RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
        node = node->nodes[next];
        RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);

        return node;
/**
 * Enqueue the objs to next node for further processing and set
 * the next node to pending state in the circular buffer.
 *
 *   Graph pointer returned from rte_graph_lookup().
 *   Current node pointer.
 *   Relative next node index to enqueue objs.
 *   Number of objs to enqueue.
 */
rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
                 rte_edge_t next, void **objs, uint16_t nb_objs)
        node = __rte_node_next_node_get(node, next);
        const uint16_t idx = node->idx;

        __rte_node_enqueue_prologue(graph, node, idx, nb_objs);

        rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
        node->idx = idx + nb_objs;
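
/*
 * Example usage (an illustrative sketch): forwarding a whole burst from inside
 * a node's process callback. "my_node_process" and the edge index
 * "MY_NODE_NEXT_OUTPUT" are hypothetical names.
 *
 *    static uint16_t
 *    my_node_process(struct rte_graph *graph, struct rte_node *node,
 *                    void **objs, uint16_t nb_objs)
 *    {
 *        // Hand the whole burst to the node connected via this edge.
 *        rte_node_enqueue(graph, node, MY_NODE_NEXT_OUTPUT, objs, nb_objs);
 *        return nb_objs;
 *    }
 */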
/**
 * Enqueue only one obj to next node for further processing and
 * set the next node to pending state in the circular buffer.
 *
 *   Graph pointer returned from rte_graph_lookup().
 *   Current node pointer.
 *   Relative next node index to enqueue objs.
 */
rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
                    rte_edge_t next, void *obj)
        node = __rte_node_next_node_get(node, next);
        uint16_t idx = node->idx;

        __rte_node_enqueue_prologue(graph, node, idx, 1);

        node->objs[idx++] = obj;
/**
 * Enqueue only two objs to next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueue two objs.
 *
 *   Graph pointer returned from rte_graph_lookup().
 *   Current node pointer.
 *   Relative next node index to enqueue objs.
 */
rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
                    rte_edge_t next, void *obj0, void *obj1)
        node = __rte_node_next_node_get(node, next);
        uint16_t idx = node->idx;

        __rte_node_enqueue_prologue(graph, node, idx, 2);

        node->objs[idx++] = obj0;
        node->objs[idx++] = obj1;
/**
 * Enqueue only four objs to next node for further processing and
 * set the next node to pending state in the circular buffer.
 * Same as rte_node_enqueue_x1 but enqueue four objs.
 *
 *   Graph pointer returned from rte_graph_lookup().
 *   Current node pointer.
 *   Relative next node index to enqueue objs.
 *   1st obj to enqueue.
 *   2nd obj to enqueue.
 *   3rd obj to enqueue.
 *   4th obj to enqueue.
 */
rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
                    rte_edge_t next, void *obj0, void *obj1, void *obj2,
                    void *obj3)
        node = __rte_node_next_node_get(node, next);
        uint16_t idx = node->idx;

        __rte_node_enqueue_prologue(graph, node, idx, 4);

        node->objs[idx++] = obj0;
        node->objs[idx++] = obj1;
        node->objs[idx++] = obj2;
        node->objs[idx++] = obj3;
/**
 * Enqueue objs to multiple next nodes for further processing and
 * set the next nodes to pending state in the circular buffer.
 * objs[i] will be enqueued to nexts[i].
 *
 *   Graph pointer returned from rte_graph_lookup().
 *   Current node pointer.
 *   List of relative next node indices to enqueue objs.
 *   List of objs to enqueue.
 *   Number of objs to enqueue.
 */
rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
                      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
        rte_edge_t i;

        for (i = 0; i < nb_objs; i++)
                rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
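
/*
 * Example usage (an illustrative sketch): classifying each obj to its own next
 * edge before enqueuing the whole burst in one call. The "classify_obj()"
 * helper, "MAX_BURST" and the edge values it returns are hypothetical.
 *
 *    rte_edge_t nexts[MAX_BURST];
 *    uint16_t i;
 *
 *    for (i = 0; i < nb_objs; i++)
 *        nexts[i] = classify_obj(objs[i]); // e.g. a lookup or drop edge
 *
 *    rte_node_enqueue_next(graph, node, nexts, objs, nb_objs);
 */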
/**
 * Get the stream of the next node to enqueue the objs.
 * Once done updating the objs, call rte_node_next_stream_put() to move the
 * next node to pending state.
 *
 *   Graph pointer returned from rte_graph_lookup().
 *   Current node pointer.
 *   Relative next node index to get stream.
 *   Requested free size of the next stream.
 *
 *   Valid next stream on success.
 *
 * @see rte_node_next_stream_put().
 */
static inline void **
rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
                         rte_edge_t next, uint16_t nb_objs)
        node = __rte_node_next_node_get(node, next);
        const uint16_t idx = node->idx;
        uint16_t free_space = node->size - idx;

        if (unlikely(free_space < nb_objs))
                __rte_node_stream_alloc_size(graph, node, nb_objs);

        return &node->objs[idx];
/**
 * Put the next stream to pending state in the circular buffer
 * for further processing. Should be invoked after rte_node_next_stream_get().
 *
 *   Graph pointer returned from rte_graph_lookup().
 *   Current node pointer.
 *   Relative next node index.
 *   Number of objs updated in the stream after getting the stream using
 *   rte_node_next_stream_get().
 *
 * @see rte_node_next_stream_get().
 */
rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
                         rte_edge_t next, uint16_t idx)
        node = __rte_node_next_node_get(node, next);

        __rte_node_enqueue_tail_update(graph, node);
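
/*
 * Example usage (an illustrative sketch): reserving slots in the next node's
 * stream, filling them in place and then marking that node pending. The edge
 * index "MY_NODE_NEXT_TX" and the count "n" are hypothetical.
 *
 *    void **to_next = rte_node_next_stream_get(graph, node, MY_NODE_NEXT_TX, n);
 *    uint16_t i;
 *
 *    for (i = 0; i < n; i++)
 *        to_next[i] = objs[i]; // fill the reserved slots
 *
 *    rte_node_next_stream_put(graph, node, MY_NODE_NEXT_TX, n);
 */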
/**
 * Home run scenario: enqueue all the objs of the current node to the next
 * node in an optimized way by swapping the streams of both nodes.
 * Performs well when the next node is not already in pending state.
 * If the next node is already in pending state, the normal enqueue
 * path is used instead.
 *
 *   Graph pointer returned from rte_graph_lookup().
 *   Current node pointer.
 *   Relative next node index.
 */
rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
                          rte_edge_t next)
        struct rte_node *dst = __rte_node_next_node_get(src, next);

        /* Swap the stream pointers if dst doesn't hold any valid objs */
        if (likely(dst->idx == 0)) {
                void **dobjs = dst->objs;
                uint16_t dsz = dst->size;
                dst->objs = src->objs;
                dst->size = src->size;
                src->objs = dobjs;
                src->size = dsz;
                dst->idx = src->idx;
                __rte_node_enqueue_tail_update(graph, dst);
        } else { /* Move the objects from src node to dst node */
                rte_node_enqueue(graph, src, next, src->objs, src->idx);
        }
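
/*
 * Example usage (an illustrative sketch): a node that forwards its entire
 * burst to a single next node can hand over its whole stream in one call,
 * avoiding a per-object copy when the destination stream is empty. The edge
 * index "MY_NODE_NEXT_OUTPUT" is hypothetical.
 *
 *    // Inside the node's process callback:
 *    rte_node_next_stream_move(graph, node, MY_NODE_NEXT_OUTPUT);
 *    return nb_objs;
 */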
#endif /* _RTE_GRAPH_WORKER_H_ */