test mbuf attach
[dpdk.git] / lib / librte_graph / rte_graph_worker.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(C) 2020 Marvell International Ltd.
3  */
4
5 #ifndef _RTE_GRAPH_WORKER_H_
6 #define _RTE_GRAPH_WORKER_H_
7
8 /**
9  * @file rte_graph_worker.h
10  *
11  * @warning
12  * @b EXPERIMENTAL:
13  * All functions in this file may be changed or removed without prior notice.
14  *
15  * This API allows a worker thread to walk over a graph and nodes to create,
16  * process, enqueue and move streams of objects to the next nodes.
17  */
18
19 #include <rte_common.h>
20 #include <rte_cycles.h>
21 #include <rte_prefetch.h>
22 #include <rte_memcpy.h>
23 #include <rte_memory.h>
24
25 #include "rte_graph.h"
26
27 #ifdef __cplusplus
28 extern "C" {
29 #endif
30
31 /**
32  * @internal
33  *
34  * Data structure to hold graph data.
35  */
36 struct rte_graph {
37         uint32_t tail;               /**< Tail of circular buffer. */
38         uint32_t head;               /**< Head of circular buffer. */
39         uint32_t cir_mask;           /**< Circular buffer wrap around mask. */
40         rte_node_t nb_nodes;         /**< Number of nodes in the graph. */
41         rte_graph_off_t *cir_start;  /**< Pointer to circular buffer. */
42         rte_graph_off_t nodes_start; /**< Offset at which node memory starts. */
43         rte_graph_t id; /**< Graph identifier. */
44         int socket;     /**< Socket ID where memory is allocated. */
45         char name[RTE_GRAPH_NAMESIZE];  /**< Name of the graph. */
46         uint64_t fence;                 /**< Fence. */
47 } __rte_cache_aligned;
48
49 /**
50  * @internal
51  *
52  * Data structure to hold node data.
53  */
54 struct rte_node {
55         /* Slow path area  */
56         uint64_t fence;         /**< Fence. */
57         rte_graph_off_t next;   /**< Index to next node. */
58         rte_node_t id;          /**< Node identifier. */
59         rte_node_t parent_id;   /**< Parent Node identifier. */
60         rte_edge_t nb_edges;    /**< Number of edges from this node. */
61         uint32_t realloc_count; /**< Number of times realloced. */
62
63         char parent[RTE_NODE_NAMESIZE]; /**< Parent node name. */
64         char name[RTE_NODE_NAMESIZE];   /**< Name of the node. */
65
66         /* Fast path area  */
67 #define RTE_NODE_CTX_SZ 16
68         uint8_t ctx[RTE_NODE_CTX_SZ] __rte_cache_aligned; /**< Node Context. */
69         uint16_t size;          /**< Total number of objects available. */
70         uint16_t idx;           /**< Number of objects used. */
71         rte_graph_off_t off;    /**< Offset of node in the graph reel. */
72         uint64_t total_cycles;  /**< Cycles spent in this node. */
73         uint64_t total_calls;   /**< Calls done to this node. */
74         uint64_t total_objs;    /**< Objects processed by this node. */
75         RTE_STD_C11
76                 union {
77                         void **objs;       /**< Array of object pointers. */
78                         uint64_t objs_u64;
79                 };
80         RTE_STD_C11
81                 union {
82                         rte_node_process_t process; /**< Process function. */
83                         uint64_t process_u64;
84                 };
85         struct rte_node *nodes[] __rte_cache_min_aligned; /**< Next nodes. */
86 } __rte_cache_aligned;
87
88 /**
89  * @internal
90  *
91  * Allocate a stream of objects.
92  *
93  * If stream already exists then re-allocate it to a larger size.
94  *
95  * @param graph
96  *   Pointer to the graph object.
97  * @param node
98  *   Pointer to the node object.
99  */
100 __rte_experimental
101 void __rte_node_stream_alloc(struct rte_graph *graph, struct rte_node *node);
102
103 /**
104  * @internal
105  *
106  * Allocate a stream with requested number of objects.
107  *
108  * If stream already exists then re-allocate it to a larger size.
109  *
110  * @param graph
111  *   Pointer to the graph object.
112  * @param node
113  *   Pointer to the node object.
114  * @param req_size
115  *   Number of objects to be allocated.
116  */
117 __rte_experimental
118 void __rte_node_stream_alloc_size(struct rte_graph *graph,
119                                   struct rte_node *node, uint16_t req_size);
120
121 /**
122  * Perform graph walk on the circular buffer and invoke the process function
123  * of the nodes and collect the stats.
124  *
125  * @param graph
126  *   Graph pointer returned from rte_graph_lookup function.
127  *
128  * @see rte_graph_lookup()
129  */
130 __rte_experimental
131 static inline void
132 rte_graph_walk(struct rte_graph *graph)
133 {
134         const rte_graph_off_t *cir_start = graph->cir_start;
135         const rte_node_t mask = graph->cir_mask;
136         uint32_t head = graph->head;
137         struct rte_node *node;
138         uint64_t start;
139         uint16_t rc;
140         void **objs;
141
142         /*
143          * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
144          * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
145          * in a circular buffer fashion.
146          *
147          *      +-----+ <= cir_start - head [number of source nodes]
148          *      |     |
149          *      | ... | <= source nodes
150          *      |     |
151          *      +-----+ <= cir_start [head = 0] [tail = 0]
152          *      |     |
153          *      | ... | <= pending streams
154          *      |     |
155          *      +-----+ <= cir_start + mask
156          */
157         while (likely(head != graph->tail)) {
158                 node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
159                 RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
160                 objs = node->objs;
161                 rte_prefetch0(objs);
162
163                 if (rte_graph_has_stats_feature()) {
164                         start = rte_rdtsc();
165                         rc = node->process(graph, node, objs, node->idx);
166                         node->total_cycles += rte_rdtsc() - start;
167                         node->total_calls++;
168                         node->total_objs += rc;
169                 } else {
170                         node->process(graph, node, objs, node->idx);
171                 }
172                 node->idx = 0;
173                 head = likely((int32_t)head > 0) ? head & mask : head;
174         }
175         graph->tail = 0;
176 }
177
178 /* Fast path helper functions */
179
180 /**
181  * @internal
182  *
183  * Enqueue a given node to the tail of the graph reel.
184  *
185  * @param graph
186  *   Pointer Graph object.
187  * @param node
188  *   Pointer to node object to be enqueued.
189  */
190 static __rte_always_inline void
191 __rte_node_enqueue_tail_update(struct rte_graph *graph, struct rte_node *node)
192 {
193         uint32_t tail;
194
195         tail = graph->tail;
196         graph->cir_start[tail++] = node->off;
197         graph->tail = tail & graph->cir_mask;
198 }
199
200 /**
201  * @internal
202  *
203  * Enqueue sequence prologue function.
204  *
205  * Updates the node to tail of graph reel and resizes the number of objects
206  * available in the stream as needed.
207  *
208  * @param graph
209  *   Pointer to the graph object.
210  * @param node
211  *   Pointer to the node object.
212  * @param idx
213  *   Index at which the object enqueue starts from.
214  * @param space
215  *   Space required for the object enqueue.
216  */
217 static __rte_always_inline void
218 __rte_node_enqueue_prologue(struct rte_graph *graph, struct rte_node *node,
219                             const uint16_t idx, const uint16_t space)
220 {
221
222         /* Add to the pending stream list if the node is new */
223         if (idx == 0)
224                 __rte_node_enqueue_tail_update(graph, node);
225
226         if (unlikely(node->size < (idx + space)))
227                 __rte_node_stream_alloc(graph, node);
228 }
229
230 /**
231  * @internal
232  *
233  * Get the node pointer from current node edge id.
234  *
235  * @param node
236  *   Current node pointer.
237  * @param next
238  *   Edge id of the required node.
239  *
240  * @return
241  *   Pointer to the node denoted by the edge id.
242  */
243 static __rte_always_inline struct rte_node *
244 __rte_node_next_node_get(struct rte_node *node, rte_edge_t next)
245 {
246         RTE_ASSERT(next < node->nb_edges);
247         RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
248         node = node->nodes[next];
249         RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
250
251         return node;
252 }
253
254 /**
255  * Enqueue the objs to next node for further processing and set
256  * the next node to pending state in the circular buffer.
257  *
258  * @param graph
259  *   Graph pointer returned from rte_graph_lookup().
260  * @param node
261  *   Current node pointer.
262  * @param next
263  *   Relative next node index to enqueue objs.
264  * @param objs
265  *   Objs to enqueue.
266  * @param nb_objs
267  *   Number of objs to enqueue.
268  */
269 __rte_experimental
270 static inline void
271 rte_node_enqueue(struct rte_graph *graph, struct rte_node *node,
272                  rte_edge_t next, void **objs, uint16_t nb_objs)
273 {
274         node = __rte_node_next_node_get(node, next);
275         const uint16_t idx = node->idx;
276
277         __rte_node_enqueue_prologue(graph, node, idx, nb_objs);
278
279         rte_memcpy(&node->objs[idx], objs, nb_objs * sizeof(void *));
280         node->idx = idx + nb_objs;
281 }
282
283 /**
284  * Enqueue only one obj to next node for further processing and
285  * set the next node to pending state in the circular buffer.
286  *
287  * @param graph
288  *   Graph pointer returned from rte_graph_lookup().
289  * @param node
290  *   Current node pointer.
291  * @param next
292  *   Relative next node index to enqueue objs.
293  * @param obj
294  *   Obj to enqueue.
295  */
296 __rte_experimental
297 static inline void
298 rte_node_enqueue_x1(struct rte_graph *graph, struct rte_node *node,
299                     rte_edge_t next, void *obj)
300 {
301         node = __rte_node_next_node_get(node, next);
302         uint16_t idx = node->idx;
303
304         __rte_node_enqueue_prologue(graph, node, idx, 1);
305
306         node->objs[idx++] = obj;
307         node->idx = idx;
308 }
309
310 /**
311  * Enqueue only two objs to next node for further processing and
312  * set the next node to pending state in the circular buffer.
313  * Same as rte_node_enqueue_x1 but enqueue two objs.
314  *
315  * @param graph
316  *   Graph pointer returned from rte_graph_lookup().
317  * @param node
318  *   Current node pointer.
319  * @param next
320  *   Relative next node index to enqueue objs.
321  * @param obj0
322  *   Obj to enqueue.
323  * @param obj1
324  *   Obj to enqueue.
325  */
326 __rte_experimental
327 static inline void
328 rte_node_enqueue_x2(struct rte_graph *graph, struct rte_node *node,
329                     rte_edge_t next, void *obj0, void *obj1)
330 {
331         node = __rte_node_next_node_get(node, next);
332         uint16_t idx = node->idx;
333
334         __rte_node_enqueue_prologue(graph, node, idx, 2);
335
336         node->objs[idx++] = obj0;
337         node->objs[idx++] = obj1;
338         node->idx = idx;
339 }
340
341 /**
342  * Enqueue only four objs to next node for further processing and
343  * set the next node to pending state in the circular buffer.
344  * Same as rte_node_enqueue_x1 but enqueue four objs.
345  *
346  * @param graph
347  *   Graph pointer returned from rte_graph_lookup().
348  * @param node
349  *   Current node pointer.
350  * @param next
351  *   Relative next node index to enqueue objs.
352  * @param obj0
353  *   1st obj to enqueue.
354  * @param obj1
355  *   2nd obj to enqueue.
356  * @param obj2
357  *   3rd obj to enqueue.
358  * @param obj3
359  *   4th obj to enqueue.
360  */
361 __rte_experimental
362 static inline void
363 rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
364                     rte_edge_t next, void *obj0, void *obj1, void *obj2,
365                     void *obj3)
366 {
367         node = __rte_node_next_node_get(node, next);
368         uint16_t idx = node->idx;
369
370         __rte_node_enqueue_prologue(graph, node, idx, 4);
371
372         node->objs[idx++] = obj0;
373         node->objs[idx++] = obj1;
374         node->objs[idx++] = obj2;
375         node->objs[idx++] = obj3;
376         node->idx = idx;
377 }
378
379 /**
380  * Enqueue objs to multiple next nodes for further processing and
381  * set the next nodes to pending state in the circular buffer.
382  * objs[i] will be enqueued to nexts[i].
383  *
384  * @param graph
385  *   Graph pointer returned from rte_graph_lookup().
386  * @param node
387  *   Current node pointer.
388  * @param nexts
389  *   List of relative next node indices to enqueue objs.
390  * @param objs
391  *   List of objs to enqueue.
392  * @param nb_objs
393  *   Number of objs to enqueue.
394  */
395 __rte_experimental
396 static inline void
397 rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
398                       rte_edge_t *nexts, void **objs, uint16_t nb_objs)
399 {
400         uint16_t i;
401
402         for (i = 0; i < nb_objs; i++)
403                 rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
404 }
405
406 /**
407  * Get the stream of next node to enqueue the objs.
408  * Once done with the updating the objs, needs to call
409  * rte_node_next_stream_put to put the next node to pending state.
410  *
411  * @param graph
412  *   Graph pointer returned from rte_graph_lookup().
413  * @param node
414  *   Current node pointer.
415  * @param next
416  *   Relative next node index to get stream.
417  * @param nb_objs
418  *   Requested free size of the next stream.
419  *
420  * @return
421  *   Valid next stream on success.
422  *
423  * @see rte_node_next_stream_put().
424  */
425 __rte_experimental
426 static inline void **
427 rte_node_next_stream_get(struct rte_graph *graph, struct rte_node *node,
428                          rte_edge_t next, uint16_t nb_objs)
429 {
430         node = __rte_node_next_node_get(node, next);
431         const uint16_t idx = node->idx;
432         uint16_t free_space = node->size - idx;
433
434         if (unlikely(free_space < nb_objs))
435                 __rte_node_stream_alloc_size(graph, node, nb_objs);
436
437         return &node->objs[idx];
438 }
439
440 /**
441  * Put the next stream to pending state in the circular buffer
442  * for further processing. Should be invoked after rte_node_next_stream_get().
443  *
444  * @param graph
445  *   Graph pointer returned from rte_graph_lookup().
446  * @param node
447  *   Current node pointer.
448  * @param next
449  *   Relative next node index..
450  * @param idx
451  *   Number of objs updated in the stream after getting the stream using
452  *   rte_node_next_stream_get.
453  *
454  * @see rte_node_next_stream_get().
455  */
456 __rte_experimental
457 static inline void
458 rte_node_next_stream_put(struct rte_graph *graph, struct rte_node *node,
459                          rte_edge_t next, uint16_t idx)
460 {
461         if (unlikely(!idx))
462                 return;
463
464         node = __rte_node_next_node_get(node, next);
465         if (node->idx == 0)
466                 __rte_node_enqueue_tail_update(graph, node);
467
468         node->idx += idx;
469 }
470
471 /**
472  * Home run scenario, Enqueue all the objs of current node to next
473  * node in optimized way by swapping the streams of both nodes.
474  * Performs good when next node is already not in pending state.
475  * If next node is already in pending state then normal enqueue
476  * will be used.
477  *
478  * @param graph
479  *   Graph pointer returned from rte_graph_lookup().
480  * @param src
481  *   Current node pointer.
482  * @param next
483  *   Relative next node index.
484  */
485 __rte_experimental
486 static inline void
487 rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
488                           rte_edge_t next)
489 {
490         struct rte_node *dst = __rte_node_next_node_get(src, next);
491
492         /* Let swap the pointers if dst don't have valid objs */
493         if (likely(dst->idx == 0)) {
494                 void **dobjs = dst->objs;
495                 uint16_t dsz = dst->size;
496                 dst->objs = src->objs;
497                 dst->size = src->size;
498                 src->objs = dobjs;
499                 src->size = dsz;
500                 dst->idx = src->idx;
501                 __rte_node_enqueue_tail_update(graph, dst);
502         } else { /* Move the objects from src node to dst node */
503                 rte_node_enqueue(graph, src, next, src->objs, src->idx);
504         }
505 }
506
507 #ifdef __cplusplus
508 }
509 #endif
510
511 #endif /* _RTE_GRAPH_WORKER_H_ */