crypto/scheduler: fix build with old gcc
[dpdk.git] / drivers/crypto/scheduler/scheduler_multicore.c
/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2017 Intel Corporation. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <unistd.h>

#include <rte_cryptodev.h>
#include <rte_malloc.h>

#include "rte_cryptodev_scheduler_operations.h"
#include "scheduler_pmd_private.h"

#define MC_SCHED_ENQ_RING_NAME_PREFIX   "MCS_ENQR_"
#define MC_SCHED_DEQ_RING_NAME_PREFIX   "MCS_DEQR_"

#define MC_SCHED_BUFFER_SIZE 32

/** multi-core scheduler context */
struct mc_scheduler_ctx {
        uint32_t num_workers;             /**< Number of workers polling */
        uint32_t stop_signal;

        struct rte_ring *sched_enq_ring[RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKER_CORES];
        struct rte_ring *sched_deq_ring[RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKER_CORES];
};

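/** per queue pair context of the multi-core scheduler */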
struct mc_scheduler_qp_ctx {
        struct scheduler_slave slaves[RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES];
        uint32_t nb_slaves;

        uint32_t last_enq_worker_idx;
        uint32_t last_deq_worker_idx;

        struct mc_scheduler_ctx *mc_private_ctx;
};

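/**
 * Distribute a burst of ops round-robin across the workers' enqueue rings,
 * resuming from the worker used on the previous call.
 */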
static uint16_t
schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
        struct mc_scheduler_qp_ctx *mc_qp_ctx =
                        ((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
        struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
        uint32_t worker_idx = mc_qp_ctx->last_enq_worker_idx;
        uint16_t i, processed_ops = 0;

        if (unlikely(nb_ops == 0))
                return 0;

        for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
                struct rte_ring *enq_ring = mc_ctx->sched_enq_ring[worker_idx];
                uint16_t nb_queue_ops = rte_ring_enqueue_burst(enq_ring,
                        (void *)(&ops[processed_ops]), nb_ops, NULL);

                nb_ops -= nb_queue_ops;
                processed_ops += nb_queue_ops;

                if (++worker_idx == mc_ctx->num_workers)
                        worker_idx = 0;
        }
        mc_qp_ctx->last_enq_worker_idx = worker_idx;

        return processed_ops;
}

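/**
 * Ordered enqueue: limit the burst to the free space in the order ring,
 * distribute the ops to the workers, then record them in the order ring
 * so the original submission order can be restored on dequeue.
 */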
static uint16_t
schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
                uint16_t nb_ops)
{
        struct rte_ring *order_ring =
                        ((struct scheduler_qp_ctx *)qp)->order_ring;
        uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
                        nb_ops);
        uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
                        nb_ops_to_enq);

        scheduler_order_insert(order_ring, ops, nb_ops_enqd);

        return nb_ops_enqd;
}

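/**
 * Collect processed ops round-robin from the workers' dequeue rings,
 * resuming from the worker used on the previous call.
 */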
static uint16_t
schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
        struct mc_scheduler_qp_ctx *mc_qp_ctx =
                        ((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
        struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
        uint32_t worker_idx = mc_qp_ctx->last_deq_worker_idx;
        uint16_t i, processed_ops = 0;

        for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
                struct rte_ring *deq_ring = mc_ctx->sched_deq_ring[worker_idx];
                uint16_t nb_deq_ops = rte_ring_dequeue_burst(deq_ring,
                        (void *)(&ops[processed_ops]), nb_ops, NULL);

                nb_ops -= nb_deq_ops;
                processed_ops += nb_deq_ops;
                if (++worker_idx == mc_ctx->num_workers)
                        worker_idx = 0;
        }

        mc_qp_ctx->last_deq_worker_idx = worker_idx;

        return processed_ops;
}

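/**
 * Ordered dequeue: drain completed ops from the order ring in their
 * original submission order.
 */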
static uint16_t
schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
                uint16_t nb_ops)
{
        struct rte_ring *order_ring =
                        ((struct scheduler_qp_ctx *)qp)->order_ring;

        return scheduler_order_drain(order_ring, ops, nb_ops);
}

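/* Attaching or detaching a slave needs no extra work in multi-core mode. */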
static int
slave_attach(__rte_unused struct rte_cryptodev *dev,
                __rte_unused uint8_t slave_id)
{
        return 0;
}

static int
slave_detach(__rte_unused struct rte_cryptodev *dev,
                __rte_unused uint8_t slave_id)
{
        return 0;
}

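/**
 * Per-lcore worker loop: pull ops from this worker's enqueue ring, submit
 * them to the assigned slave device, and move completed ops to the dequeue
 * ring until the scheduler signals a stop.
 */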
static int
mc_scheduler_worker(struct rte_cryptodev *dev)
{
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
        struct rte_ring *enq_ring;
        struct rte_ring *deq_ring;
        uint32_t core_id = rte_lcore_id();
        int i, worker_idx = -1;
        struct scheduler_slave *slave;
        struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE];
        struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE];
        uint16_t processed_ops;
        uint16_t left_op = 0;
        uint16_t left_op_idx = 0;
        uint16_t inflight_ops = 0;

        /* Find this lcore's index in the configured worker core pool. */
        for (i = 0; i < (int)sched_ctx->nb_wc; i++) {
                if (sched_ctx->wc_pool[i] == core_id) {
                        worker_idx = i;
                        break;
                }
        }
        if (worker_idx == -1) {
                CS_LOG_ERR("worker on core %u: cannot find worker index!",
                                core_id);
                return -1;
        }

        slave = &sched_ctx->slaves[worker_idx];
        enq_ring = mc_ctx->sched_enq_ring[worker_idx];
        deq_ring = mc_ctx->sched_deq_ring[worker_idx];

        while (!mc_ctx->stop_signal) {
                if (left_op) {
                        /* Retry ops the slave could not accept last time. */
                        processed_ops =
                                rte_cryptodev_enqueue_burst(slave->dev_id,
                                                slave->qp_id,
                                                &enq_ops[left_op_idx], left_op);

                        left_op -= processed_ops;
                        left_op_idx += processed_ops;
                } else {
                        uint16_t nb_deq_ops = rte_ring_dequeue_burst(enq_ring,
                                (void *)enq_ops, MC_SCHED_BUFFER_SIZE, NULL);
                        if (nb_deq_ops) {
                                processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id,
                                                slave->qp_id, enq_ops, nb_deq_ops);

                                if (unlikely(processed_ops < nb_deq_ops)) {
                                        left_op = nb_deq_ops - processed_ops;
                                        left_op_idx = processed_ops;
                                }

                                inflight_ops += processed_ops;
                        }
                }

                if (inflight_ops > 0) {
                        /* Hand processed ops back through the dequeue ring. */
                        processed_ops = rte_cryptodev_dequeue_burst(slave->dev_id,
                                        slave->qp_id, deq_ops, MC_SCHED_BUFFER_SIZE);
                        if (processed_ops) {
                                uint16_t nb_enq_ops = rte_ring_enqueue_burst(deq_ring,
                                        (void *)deq_ops, processed_ops, NULL);
                                inflight_ops -= nb_enq_ops;
                        }
                }

                rte_pause();
        }

        return 0;
}

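/*
 * Launch a worker on each configured lcore, select the datapath functions
 * and initialize the per queue pair slave mappings.
 */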
static int
scheduler_start(struct rte_cryptodev *dev)
{
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
        uint16_t i;

        mc_ctx->stop_signal = 0;

        for (i = 0; i < sched_ctx->nb_wc; i++)
                rte_eal_remote_launch(
                        (lcore_function_t *)mc_scheduler_worker, dev,
                        sched_ctx->wc_pool[i]);

        if (sched_ctx->reordering_enabled) {
                dev->enqueue_burst = &schedule_enqueue_ordering;
                dev->dequeue_burst = &schedule_dequeue_ordering;
        } else {
                dev->enqueue_burst = &schedule_enqueue;
                dev->dequeue_burst = &schedule_dequeue;
        }

        for (i = 0; i < dev->data->nb_queue_pairs; i++) {
                struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
                struct mc_scheduler_qp_ctx *mc_qp_ctx =
                                qp_ctx->private_qp_ctx;
                uint32_t j;

                memset(mc_qp_ctx->slaves, 0,
                                RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES *
                                sizeof(struct scheduler_slave));
                for (j = 0; j < sched_ctx->nb_slaves; j++) {
                        mc_qp_ctx->slaves[j].dev_id =
                                        sched_ctx->slaves[j].dev_id;
                        mc_qp_ctx->slaves[j].qp_id = i;
                }

                mc_qp_ctx->nb_slaves = sched_ctx->nb_slaves;

                mc_qp_ctx->last_enq_worker_idx = 0;
                mc_qp_ctx->last_deq_worker_idx = 0;
        }

        return 0;
}

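/* Signal all workers to stop and wait for their lcores to finish. */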
static int
scheduler_stop(struct rte_cryptodev *dev)
{
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
        uint16_t i;

        mc_ctx->stop_signal = 1;

        for (i = 0; i < sched_ctx->nb_wc; i++)
                rte_eal_wait_lcore(sched_ctx->wc_pool[i]);

        return 0;
}

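/* Allocate and attach the per queue pair private context. */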
static int
scheduler_config_qp(struct rte_cryptodev *dev, uint16_t qp_id)
{
        struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
        struct mc_scheduler_qp_ctx *mc_qp_ctx;
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;

        mc_qp_ctx = rte_zmalloc_socket(NULL, sizeof(*mc_qp_ctx), 0,
                        rte_socket_id());
        if (!mc_qp_ctx) {
                CS_LOG_ERR("failed to allocate memory for private queue pair");
                return -ENOMEM;
        }

        mc_qp_ctx->mc_private_ctx = mc_ctx;
        qp_ctx->private_qp_ctx = (void *)mc_qp_ctx;

        return 0;
}

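/*
 * Allocate the scheduler private context and create one enqueue and one
 * dequeue ring per worker core.
 */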
static int
scheduler_create_private_ctx(struct rte_cryptodev *dev)
{
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx;
        uint16_t i;

        if (sched_ctx->private_ctx)
                rte_free(sched_ctx->private_ctx);

        mc_ctx = rte_zmalloc_socket(NULL, sizeof(struct mc_scheduler_ctx), 0,
                        rte_socket_id());
        if (!mc_ctx) {
                CS_LOG_ERR("failed to allocate memory");
                return -ENOMEM;
        }

        mc_ctx->num_workers = sched_ctx->nb_wc;
        for (i = 0; i < sched_ctx->nb_wc; i++) {
                char r_name[16];

                snprintf(r_name, sizeof(r_name),
                                MC_SCHED_ENQ_RING_NAME_PREFIX "%u", i);
                mc_ctx->sched_enq_ring[i] = rte_ring_create(r_name,
                                PER_SLAVE_BUFF_SIZE, rte_socket_id(),
                                RING_F_SC_DEQ | RING_F_SP_ENQ);
                if (!mc_ctx->sched_enq_ring[i]) {
                        CS_LOG_ERR("Cannot create ring for worker %u", i);
                        return -1;
                }
                snprintf(r_name, sizeof(r_name),
                                MC_SCHED_DEQ_RING_NAME_PREFIX "%u", i);
                mc_ctx->sched_deq_ring[i] = rte_ring_create(r_name,
                                PER_SLAVE_BUFF_SIZE, rte_socket_id(),
                                RING_F_SC_DEQ | RING_F_SP_ENQ);
                if (!mc_ctx->sched_deq_ring[i]) {
                        CS_LOG_ERR("Cannot create ring for worker %u", i);
                        return -1;
                }
        }

        sched_ctx->private_ctx = (void *)mc_ctx;

        return 0;
}

struct rte_cryptodev_scheduler_ops scheduler_mc_ops = {
        slave_attach,
        slave_detach,
        scheduler_start,
        scheduler_stop,
        scheduler_config_qp,
        scheduler_create_private_ctx,
        NULL,   /* option_set */
        NULL    /* option_get */
};

struct rte_cryptodev_scheduler mc_scheduler = {
                .name = "multicore-scheduler",
                .description = "scheduler which will run burst across multiple cpu cores",
                .mode = CDEV_SCHED_MODE_MULTICORE,
                .ops = &scheduler_mc_ops
};

struct rte_cryptodev_scheduler *multicore_scheduler = &mc_scheduler;