/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(C) 2020 Marvell International Ltd.
 */
5 #ifndef _RTE_GRAPH_WORKER_H_
6 #define _RTE_GRAPH_WORKER_H_
/**
 * @file rte_graph_worker.h
 *
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * This API allows a worker thread to walk over a graph and nodes to create,
 * process, enqueue and move streams of objects to the next nodes.
 */
18 #include <rte_common.h>
19 #include <rte_cycles.h>
20 #include <rte_prefetch.h>
21 #include <rte_memcpy.h>
22 #include <rte_memory.h>
24 #include "rte_graph.h"
33 * Data structure to hold graph data.
36 uint32_t tail; /**< Tail of circular buffer. */
37 uint32_t head; /**< Head of circular buffer. */
38 uint32_t cir_mask; /**< Circular buffer wrap around mask. */
39 rte_node_t nb_nodes; /**< Number of nodes in the graph. */
40 rte_graph_off_t *cir_start; /**< Pointer to circular buffer. */
41 rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
42 rte_graph_t id; /**< Graph identifier. */
43 int socket; /**< Socket ID where memory is allocated. */
44 char name[RTE_GRAPH_NAMESIZE]; /**< Name of the graph. */
45 uint64_t fence; /**< Fence. */
46 } __rte_cache_aligned;
51 * Data structure to hold node data.
55 uint64_t fence; /**< Fence. */
56 rte_graph_off_t next; /**< Index to next node. */
57 rte_node_t id; /**< Node identifier. */
58 rte_node_t parent_id; /**< Parent Node identifier. */
59 rte_edge_t nb_edges; /**< Number of edges from this node. */
60 uint32_t realloc_count; /**< Number of times realloced. */
62 char parent[RTE_NODE_NAMESIZE]; /**< Parent node name. */
63 char name[RTE_NODE_NAMESIZE]; /**< Name of the node. */
66 #define RTE_NODE_CTX_SZ 16
67 uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
68 uint16_t size; /**< Total number of objects available. */
69 uint16_t idx; /**< Number of objects used. */
70 rte_graph_off_t off; /**< Offset of node in the graph reel. */
71 uint64_t total_cycles; /**< Cycles spent in this node. */
72 uint64_t total_calls; /**< Calls done to this node. */
73 uint64_t total_objs; /**< Objects processed by this node. */
76 void **objs; /**< Array of object pointers. */
81 rte_node_process_t process; /**< Process function. */
84 struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
85 } __rte_cache_aligned;
/**
 * @internal
 *
 * Allocate a stream of objects.
 *
 * If stream already exists then re-allocate it to a larger size.
 *
 * @param graph
 *   Pointer to the graph object.
 * @param node
 *   Pointer to the node object.
 */
void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);
105 * Allocate a stream with requested number of objects.
107 * If stream already exists then re-allocate it to a larger size.
110 * Pointer to the graph object.
112 * Pointer to the node object.
114 * Number of objects to be allocated.
117 void __rte_node_stream_alloc_size(struct rte_graph *graph,
118 struct rte_node *node, uint16_t req_size);
121 * Perform graph walk on the circular buffer and invoke the process function
122 * of the nodes and collect the stats.
125 * Graph pointer returned from rte_graph_lookup function.
127 * @see rte_graph_lookup()
131 rte_graph_walk(struct rte_graph *graph)
133 const rte_graph_off_t *cir_start = graph->cir_start;
134 const rte_node_t mask = graph->cir_mask;
135 uint32_t head = graph->head;
136 struct rte_node *node;
142 * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
143 * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
144 * in a circular buffer fashion.
146 * +-----+ <= cir_start - head [number of source nodes]
148 * | ... | <= source nodes
150 * +-----+ <= cir_start [head = 0] [tail = 0]
152 * | ... | <= pending streams
154 * +-----+ <= cir_start + mask
156 while (likely(head != graph->tail)) {
157 node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
158 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
162 if (rte_graph_has_stats_feature()) {
164 rc = node->process(graph, node, objs, node->idx);
165 node->total_cycles += rte_rdtsc() - start;
167 node->total_objs += rc;
169 node->process(graph, node, objs, node->idx);
172 head = likely((int32_t)head > 0) ? head & mask : head;
177 /* Fast path helper functions */
182 * Enqueue a given node to the tail of the graph reel.
185 * Pointer Graph object.
187 * Pointer to node object to be enqueued.
189 static __rte_always_inline void
190 __rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
195 graph->cir_start[tail++] = node->off;
196 graph->tail = tail & graph->cir_mask;
202 * Enqueue sequence prologue function.
204 * Updates the node to tail of graph reel and resizes the number of objects
205 * available in the stream as needed.
208 * Pointer to the graph object.
210 * Pointer to the node object.
212 * Index at which the object enqueue starts from.
214 * Space required for the object enqueue.
216 static __rte_always_inline void
217 __rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
218 const uint16_t idx, const uint16_t space)
221 /* Add to the pending stream list if the node is new */
223 __rte_node_enqueue_tail_update(graph, node);
225 if (unlikely(node->size < (idx + space)))
226 __rte_node_stream_alloc(graph, node);
232 * Get the node pointer from current node edge id.
235 * Current node pointer.
237 * Edge id of the required node.
240 * Pointer to the node denoted by the edge id.
242 static __rte_always_inline struct rte_node *
243 __rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
245 RTE_ASSERT(next < node->nb_edges);
246 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
247 node = node->nodes[next];
248 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
254 * Enqueue the objs to next node for further processing and set
255 * the next node to pending state in the circular buffer.
258 * Graph pointer returned from rte_graph_lookup().
260 * Current node pointer.
262 * Relative next node index to enqueue objs.
266 * Number of objs to enqueue.
270 rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
271 rte_edge_t next, void **objs, uint16_t nb_objs)
273 node = __rte_node_next_node_get(node, next);
274 const uint16_t idx = node->idx;
276 __rte_node_enqueue_prologue(graph, node, idx, nb_objs);
278 rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
279 node->idx = idx + nb_objs;
283 * Enqueue only one obj to next node for further processing and
284 * set the next node to pending state in the circular buffer.
287 * Graph pointer returned from rte_graph_lookup().
289 * Current node pointer.
291 * Relative next node index to enqueue objs.
297 rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
298 rte_edge_t next, void *obj)
300 node = __rte_node_next_node_get(node, next);
301 uint16_t idx = node->idx;
303 __rte_node_enqueue_prologue(graph, node, idx, 1);
305 node->objs[idx++] = obj;
310 * Enqueue only two objs to next node for further processing and
311 * set the next node to pending state in the circular buffer.
312 * Same as rte_node_enqueue_x1 but enqueue two objs.
315 * Graph pointer returned from rte_graph_lookup().
317 * Current node pointer.
319 * Relative next node index to enqueue objs.
327 rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
328 rte_edge_t next, void *obj0, void *obj1)
330 node = __rte_node_next_node_get(node, next);
331 uint16_t idx = node->idx;
333 __rte_node_enqueue_prologue(graph, node, idx, 2);
335 node->objs[idx++] = obj0;
336 node->objs[idx++] = obj1;
341 * Enqueue only four objs to next node for further processing and
342 * set the next node to pending state in the circular buffer.
343 * Same as rte_node_enqueue_x1 but enqueue four objs.
346 * Graph pointer returned from rte_graph_lookup().
348 * Current node pointer.
350 * Relative next node index to enqueue objs.
352 * 1st obj to enqueue.
354 * 2nd obj to enqueue.
356 * 3rd obj to enqueue.
358 * 4th obj to enqueue.
362 rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
363 rte_edge_t next, void *obj0, void *obj1, void *obj2,
366 node = __rte_node_next_node_get(node, next);
367 uint16_t idx = node->idx;
369 __rte_node_enqueue_prologue(graph, node, idx, 4);
371 node->objs[idx++] = obj0;
372 node->objs[idx++] = obj1;
373 node->objs[idx++] = obj2;
374 node->objs[idx++] = obj3;
379 * Enqueue objs to multiple next nodes for further processing and
380 * set the next nodes to pending state in the circular buffer.
381 * objs[i] will be enqueued to nexts[i].
384 * Graph pointer returned from rte_graph_lookup().
386 * Current node pointer.
388 * List of relative next node indices to enqueue objs.
390 * List of objs to enqueue.
392 * Number of objs to enqueue.
396 rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
397 rte_edge_t *nexts, void **objs, uint16_t nb_objs)
401 for (i = 0; i < nb_objs; i++)
402 rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
406 * Get the stream of next node to enqueue the objs.
407 * Once done with the updating the objs, needs to call
408 * rte_node_next_stream_put to put the next node to pending state.
411 * Graph pointer returned from rte_graph_lookup().
413 * Current node pointer.
415 * Relative next node index to get stream.
417 * Requested free size of the next stream.
420 * Valid next stream on success.
422 * @see rte_node_next_stream_put().
425 static inline void **
426 rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
427 rte_edge_t next, uint16_t nb_objs)
429 node = __rte_node_next_node_get(node, next);
430 const uint16_t idx = node->idx;
431 uint16_t free_space = node->size - idx;
433 if (unlikely(free_space < nb_objs))
434 __rte_node_stream_alloc_size(graph, node, nb_objs);
436 return &node->objs[idx];
440 * Put the next stream to pending state in the circular buffer
441 * for further processing. Should be invoked after rte_node_next_stream_get().
444 * Graph pointer returned from rte_graph_lookup().
446 * Current node pointer.
448 * Relative next node index..
450 * Number of objs updated in the stream after getting the stream using
451 * rte_node_next_stream_get.
453 * @see rte_node_next_stream_get().
457 rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
458 rte_edge_t next, uint16_t idx)
463 node = __rte_node_next_node_get(node, next);
465 __rte_node_enqueue_tail_update(graph, node);
471 * Home run scenario, Enqueue all the objs of current node to next
472 * node in optimized way by swapping the streams of both nodes.
473 * Performs good when next node is already not in pending state.
474 * If next node is already in pending state then normal enqueue
478 * Graph pointer returned from rte_graph_lookup().
480 * Current node pointer.
482 * Relative next node index.
486 rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
489 struct rte_node *dst = __rte_node_next_node_get(src, next);
491 /* Let swap the pointers if dst don't have valid objs */
492 if (likely(dst->idx == 0)) {
493 void **dobjs = dst->objs;
494 uint16_t dsz = dst->size;
495 dst->objs = src->objs;
496 dst->size = src->size;
500 __rte_node_enqueue_tail_update(graph, dst);
501 } else { /* Move the objects from src node to dst node */
502 rte_node_enqueue(graph, src, next, src->objs, src->idx);
510 #endif /* _RTE_GRAPH_WORKER_H_ */