cryptodev: support device independent sessions
[dpdk.git] / drivers/crypto/scheduler/scheduler_multicore.c
/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2017 Intel Corporation. All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <unistd.h>

#include <rte_cryptodev.h>
#include <rte_malloc.h>

#include "rte_cryptodev_scheduler_operations.h"
#include "scheduler_pmd_private.h"

#define MC_SCHED_ENQ_RING_NAME_PREFIX   "MCS_ENQR_"
#define MC_SCHED_DEQ_RING_NAME_PREFIX   "MCS_DEQR_"

#define MC_SCHED_BUFFER_SIZE 32

/** multi-core scheduler context */
struct mc_scheduler_ctx {
        uint32_t num_workers;             /**< Number of workers polling */
        uint32_t stop_signal;             /**< Set to signal worker lcores to stop */

        struct rte_ring *sched_enq_ring[RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKER_CORES];
        struct rte_ring *sched_deq_ring[RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKER_CORES];
};

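/** multi-core scheduler queue pair context */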
struct mc_scheduler_qp_ctx {
        struct scheduler_slave slaves[RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES];
        uint32_t nb_slaves;

        uint32_t last_enq_worker_idx;
        uint32_t last_deq_worker_idx;

        struct mc_scheduler_ctx *mc_private_ctx;
};

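/**
 * Enqueue a burst of crypto ops, distributing them round-robin across
 * the per-worker enqueue rings starting from the last used worker.
 */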
static uint16_t
schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
        struct mc_scheduler_qp_ctx *mc_qp_ctx =
                        ((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
        struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
        uint32_t worker_idx = mc_qp_ctx->last_enq_worker_idx;
        uint16_t i, processed_ops = 0;

        if (unlikely(nb_ops == 0))
                return 0;

        for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
                struct rte_ring *enq_ring = mc_ctx->sched_enq_ring[worker_idx];
                uint16_t nb_queue_ops = rte_ring_enqueue_burst(enq_ring,
                        (void *)(&ops[processed_ops]), nb_ops, NULL);

                nb_ops -= nb_queue_ops;
                processed_ops += nb_queue_ops;

                if (++worker_idx == mc_ctx->num_workers)
                        worker_idx = 0;
        }
        mc_qp_ctx->last_enq_worker_idx = worker_idx;

        return processed_ops;
}

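/**
 * Enqueue with reordering enabled: cap the burst to the space left in the
 * order ring, then record the enqueued ops so they can be drained in order.
 */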
static uint16_t
schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
                uint16_t nb_ops)
{
        struct rte_ring *order_ring =
                        ((struct scheduler_qp_ctx *)qp)->order_ring;
        uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
                        nb_ops);
        uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
                        nb_ops_to_enq);

        scheduler_order_insert(order_ring, ops, nb_ops_enqd);

        return nb_ops_enqd;
}

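/**
 * Dequeue completed crypto ops, collecting them round-robin from the
 * per-worker dequeue rings starting from the last used worker.
 */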
static uint16_t
schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops)
{
        struct mc_scheduler_qp_ctx *mc_qp_ctx =
                        ((struct scheduler_qp_ctx *)qp)->private_qp_ctx;
        struct mc_scheduler_ctx *mc_ctx = mc_qp_ctx->mc_private_ctx;
        uint32_t worker_idx = mc_qp_ctx->last_deq_worker_idx;
        uint16_t i, processed_ops = 0;

        for (i = 0; i < mc_ctx->num_workers && nb_ops != 0; i++) {
                struct rte_ring *deq_ring = mc_ctx->sched_deq_ring[worker_idx];
                uint16_t nb_deq_ops = rte_ring_dequeue_burst(deq_ring,
                        (void *)(&ops[processed_ops]), nb_ops, NULL);

                nb_ops -= nb_deq_ops;
                processed_ops += nb_deq_ops;
                if (++worker_idx == mc_ctx->num_workers)
                        worker_idx = 0;
        }

        mc_qp_ctx->last_deq_worker_idx = worker_idx;

        return processed_ops;
}

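/**
 * Dequeue with reordering enabled: drain processed ops from the order ring
 * in their original enqueue order.
 */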
static uint16_t
schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
                uint16_t nb_ops)
{
        struct rte_ring *order_ring =
                        ((struct scheduler_qp_ctx *)qp)->order_ring;

        return scheduler_order_drain(order_ring, ops, nb_ops);
}

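/* No per-slave state is kept in this mode, so attach/detach are no-ops. */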
static int
slave_attach(__rte_unused struct rte_cryptodev *dev,
                __rte_unused uint8_t slave_id)
{
        return 0;
}

static int
slave_detach(__rte_unused struct rte_cryptodev *dev,
                __rte_unused uint8_t slave_id)
{
        return 0;
}

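/**
 * Main loop run on each worker lcore: pull ops from this worker's enqueue
 * ring, submit them to the assigned slave device (retrying any leftover ops
 * first), and move completed ops into the dequeue ring until the stop
 * signal is raised.
 */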
static int
mc_scheduler_worker(struct rte_cryptodev *dev)
{
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
        struct rte_ring *enq_ring;
        struct rte_ring *deq_ring;
        uint32_t core_id = rte_lcore_id();
        int i, worker_idx = -1;
        struct scheduler_slave *slave;
        struct rte_crypto_op *enq_ops[MC_SCHED_BUFFER_SIZE];
        struct rte_crypto_op *deq_ops[MC_SCHED_BUFFER_SIZE];
        uint16_t processed_ops;
        uint16_t left_op = 0;
        uint16_t left_op_idx = 0;
        uint16_t inflight_ops = 0;

        /* find this lcore's index in the configured worker core pool */
        for (i = 0; i < (int)sched_ctx->nb_wc; i++) {
                if (sched_ctx->wc_pool[i] == core_id) {
                        worker_idx = i;
                        break;
                }
        }
        if (worker_idx == -1) {
                CS_LOG_ERR("worker on core %u: cannot find worker index!\n",
                                core_id);
                return -1;
        }

        slave = &sched_ctx->slaves[worker_idx];
        enq_ring = mc_ctx->sched_enq_ring[worker_idx];
        deq_ring = mc_ctx->sched_deq_ring[worker_idx];

        while (!mc_ctx->stop_signal) {
                if (left_op) {
                        /* retry the ops the slave could not accept last round */
                        processed_ops =
                                rte_cryptodev_enqueue_burst(slave->dev_id,
                                                slave->qp_id,
                                                &enq_ops[left_op_idx], left_op);

                        left_op -= processed_ops;
                        left_op_idx += processed_ops;
                } else {
                        /* pull a fresh burst from this worker's enqueue ring */
                        uint16_t nb_deq_ops = rte_ring_dequeue_burst(enq_ring,
                                (void *)enq_ops, MC_SCHED_BUFFER_SIZE, NULL);
                        if (nb_deq_ops) {
                                processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id,
                                                slave->qp_id, enq_ops, nb_deq_ops);

                                if (unlikely(processed_ops < nb_deq_ops)) {
                                        left_op = nb_deq_ops - processed_ops;
                                        left_op_idx = processed_ops;
                                }

                                inflight_ops += processed_ops;
                        }
                }

                if (inflight_ops > 0) {
                        /* move completed ops from the slave to the dequeue ring */
                        processed_ops = rte_cryptodev_dequeue_burst(slave->dev_id,
                                        slave->qp_id, deq_ops, MC_SCHED_BUFFER_SIZE);
                        if (processed_ops) {
                                uint16_t nb_enq_ops = rte_ring_enqueue_burst(deq_ring,
                                        (void *)deq_ops, processed_ops, NULL);
                                inflight_ops -= nb_enq_ops;
                        }
                }

                rte_pause();
        }

        return 0;
}

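/**
 * Start the scheduler: launch one worker per configured lcore, pick the
 * enqueue/dequeue functions according to the reordering setting, and
 * initialise the slave list of every queue pair.
 */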
static int
scheduler_start(struct rte_cryptodev *dev)
{
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;
        uint16_t i;

        mc_ctx->stop_signal = 0;

        for (i = 0; i < sched_ctx->nb_wc; i++)
                rte_eal_remote_launch(
                        (lcore_function_t *)mc_scheduler_worker, dev,
                                        sched_ctx->wc_pool[i]);

        if (sched_ctx->reordering_enabled) {
                dev->enqueue_burst = &schedule_enqueue_ordering;
                dev->dequeue_burst = &schedule_dequeue_ordering;
        } else {
                dev->enqueue_burst = &schedule_enqueue;
                dev->dequeue_burst = &schedule_dequeue;
        }

        for (i = 0; i < dev->data->nb_queue_pairs; i++) {
                struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];
                struct mc_scheduler_qp_ctx *mc_qp_ctx =
                                qp_ctx->private_qp_ctx;
                uint32_t j;

                memset(mc_qp_ctx->slaves, 0,
                                RTE_CRYPTODEV_SCHEDULER_MAX_NB_SLAVES *
                                sizeof(struct scheduler_slave));
                for (j = 0; j < sched_ctx->nb_slaves; j++) {
                        mc_qp_ctx->slaves[j].dev_id =
                                        sched_ctx->slaves[j].dev_id;
                        mc_qp_ctx->slaves[j].qp_id = i;
                }

                mc_qp_ctx->nb_slaves = sched_ctx->nb_slaves;

                mc_qp_ctx->last_enq_worker_idx = 0;
                mc_qp_ctx->last_deq_worker_idx = 0;
        }

        return 0;
}

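/** Stop the scheduler: raise the stop signal and wait for all worker lcores. */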
static int
scheduler_stop(struct rte_cryptodev *dev)
{
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;

        mc_ctx->stop_signal = 1;

        for (uint16_t i = 0; i < sched_ctx->nb_wc; i++)
                rte_eal_wait_lcore(sched_ctx->wc_pool[i]);

        return 0;
}

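/**
 * Configure a queue pair: allocate its private context and link it to the
 * shared multi-core scheduler context.
 */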
static int
scheduler_config_qp(struct rte_cryptodev *dev, uint16_t qp_id)
{
        struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];
        struct mc_scheduler_qp_ctx *mc_qp_ctx;
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx = sched_ctx->private_ctx;

        mc_qp_ctx = rte_zmalloc_socket(NULL, sizeof(*mc_qp_ctx), 0,
                        rte_socket_id());
        if (!mc_qp_ctx) {
                CS_LOG_ERR("failed to allocate memory for private queue pair");
                return -ENOMEM;
        }

        mc_qp_ctx->mc_private_ctx = mc_ctx;
        qp_ctx->private_qp_ctx = (void *)mc_qp_ctx;

        return 0;
}

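/**
 * Create the multi-core scheduler private context and one enqueue ring plus
 * one dequeue ring per configured worker lcore.
 */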
static int
scheduler_create_private_ctx(struct rte_cryptodev *dev)
{
        struct scheduler_ctx *sched_ctx = dev->data->dev_private;
        struct mc_scheduler_ctx *mc_ctx;

        if (sched_ctx->private_ctx)
                rte_free(sched_ctx->private_ctx);

        mc_ctx = rte_zmalloc_socket(NULL, sizeof(struct mc_scheduler_ctx), 0,
                        rte_socket_id());
        if (!mc_ctx) {
                CS_LOG_ERR("failed to allocate memory");
                return -ENOMEM;
        }

        mc_ctx->num_workers = sched_ctx->nb_wc;
        for (uint16_t i = 0; i < sched_ctx->nb_wc; i++) {
                char r_name[16];

                snprintf(r_name, sizeof(r_name), MC_SCHED_ENQ_RING_NAME_PREFIX "%u", i);
                mc_ctx->sched_enq_ring[i] = rte_ring_create(r_name, PER_SLAVE_BUFF_SIZE,
                                        rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
                if (!mc_ctx->sched_enq_ring[i]) {
                        CS_LOG_ERR("Cannot create ring for worker %u", i);
                        return -1;
                }

                snprintf(r_name, sizeof(r_name), MC_SCHED_DEQ_RING_NAME_PREFIX "%u", i);
                mc_ctx->sched_deq_ring[i] = rte_ring_create(r_name, PER_SLAVE_BUFF_SIZE,
                                        rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
                if (!mc_ctx->sched_deq_ring[i]) {
                        CS_LOG_ERR("Cannot create ring for worker %u", i);
                        return -1;
                }
        }

        sched_ctx->private_ctx = (void *)mc_ctx;

        return 0;
}

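/** Operation callbacks exposed to the scheduler PMD for multi-core mode */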
struct rte_cryptodev_scheduler_ops scheduler_mc_ops = {
        slave_attach,
        slave_detach,
        scheduler_start,
        scheduler_stop,
        scheduler_config_qp,
        scheduler_create_private_ctx,
        NULL,   /* option_set */
        NULL    /* option_get */
};

struct rte_cryptodev_scheduler mc_scheduler = {
                .name = "multicore-scheduler",
                .description = "scheduler which runs bursts across multiple CPU cores",
                .mode = CDEV_SCHED_MODE_MULTICORE,
                .ops = &scheduler_mc_ops
};

struct rte_cryptodev_scheduler *multicore_scheduler = &mc_scheduler;