examples/vhost_blk: replace SMP barrier with thread fence
[dpdk.git] / examples / vhost_blk / vhost_blk.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2019 Intel Corporation
3  */
4
5 #ifndef _GNU_SOURCE
6 #define _GNU_SOURCE
7 #endif
8 #include <pthread.h>
9 #include <sched.h>
10
11 #include <stdint.h>
12 #include <unistd.h>
13 #include <stdbool.h>
14 #include <signal.h>
15 #include <assert.h>
16 #include <semaphore.h>
17 #include <linux/virtio_blk.h>
18 #include <linux/virtio_ring.h>
19
20 #include <rte_atomic.h>
21 #include <rte_cycles.h>
22 #include <rte_log.h>
23 #include <rte_malloc.h>
24 #include <rte_vhost.h>
25
26 #include "vhost_blk.h"
27 #include "blk_spec.h"
28
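/*
 * Packed virtqueue descriptor flag bits (virtio 1.1): AVAIL is bit 7 and
 * USED is bit 15 of desc->flags.  The driver sets AVAIL to its avail wrap
 * counter when posting a descriptor chain; the device marks a chain used by
 * making both bits equal to its used wrap counter.
 */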
29 #define VIRTQ_DESC_F_NEXT       1
30 #define VIRTQ_DESC_F_AVAIL      (1 << 7)
31 #define VIRTQ_DESC_F_USED       (1 << 15)
32
33 #define MAX_TASK                12
34
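/*
 * Features offered to the guest:
 *  - VIRTIO_F_RING_PACKED: allow the packed virtqueue layout
 *  - VIRTIO_F_VERSION_1: modern (virtio 1.0+) device
 *  - VIRTIO_F_NOTIFY_ON_EMPTY: the device may notify the guest when the
 *    avail ring is emptied even if notifications are suppressed
 *  - VHOST_USER_F_PROTOCOL_FEATURES: enable vhost-user protocol feature
 *    negotiation, which the inflight I/O tracking below relies on
 */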
35 #define VHOST_BLK_FEATURES ((1ULL << VIRTIO_F_RING_PACKED) | \
36                             (1ULL << VIRTIO_F_VERSION_1) |\
37                             (1ULL << VIRTIO_F_NOTIFY_ON_EMPTY) | \
38                             (1ULL << VHOST_USER_F_PROTOCOL_FEATURES))
39 #define CTRLR_NAME              "vhost.socket"
40
41 enum CTRLR_WORKER_STATUS {
42         WORKER_STATE_START = 0,
43         WORKER_STATE_STOP,
44 };
45
46 struct vhost_blk_ctrlr *g_vhost_ctrlr;
47
48 /* Path of the vhost-user socket file; built from the current working directory. */
49 static char dev_pathname[PATH_MAX] = "";
50 static sem_t exit_sem;
51 static enum CTRLR_WORKER_STATUS worker_thread_status;
52
53 struct vhost_blk_ctrlr *
54 vhost_blk_ctrlr_find(const char *ctrlr_name)
55 {
56         if (ctrlr_name == NULL)
57                 return NULL;
58
59         /* currently we only support 1 socket file fd */
60         return g_vhost_ctrlr;
61 }
62
63 static uint64_t
64 gpa_to_vva(struct vhost_blk_ctrlr *ctrlr, uint64_t gpa, uint64_t *len)
65 {
66         assert(ctrlr->mem != NULL);
67
68         return rte_vhost_va_from_guest_pa(ctrlr->mem, gpa, len);
69 }
70
71 static void
72 enqueue_task(struct vhost_blk_task *task)
73 {
74         struct vhost_blk_queue *vq = task->vq;
75         struct vring_used *used = vq->vring.used;
76
77         rte_vhost_set_last_inflight_io_split(task->ctrlr->vid,
78                 vq->id, task->req_idx);
79
80         /* Fill out the next entry in the "used" ring.  id = the
81          * index of the descriptor that contained the blk request.
82          * len = the total amount of data transferred for the blk
83          * request.  We must report the correct len because the
84          * device may return less data than the guest VM allocated
85          * for the request.
86          */
87         used->ring[used->idx & (vq->vring.size - 1)].id = task->req_idx;
88         used->ring[used->idx & (vq->vring.size - 1)].len = task->data_len;
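        /* The first fence orders the used ring entry before the index
         * update below; the second orders the index update before the
         * inflight entry is cleared and the guest is notified.
         */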
89         rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
90         used->idx++;
91         rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
92
93         rte_vhost_clr_inflight_desc_split(task->ctrlr->vid,
94                 vq->id, used->idx, task->req_idx);
95
96         /* Send an interrupt back to the guest VM so that it knows
97          * a completion is ready to be processed.
98          */
99         rte_vhost_vring_call(task->ctrlr->vid, vq->id);
100 }
101
102 static void
103 enqueue_task_packed(struct vhost_blk_task *task)
104 {
105         struct vhost_blk_queue *vq = task->vq;
106         struct vring_packed_desc *desc;
107
108         rte_vhost_set_last_inflight_io_packed(task->ctrlr->vid, vq->id,
109                                             task->inflight_idx);
110
111         desc = &vq->vring.desc_packed[vq->last_used_idx];
112         desc->id = task->buffer_id;
113         desc->addr = 0;
114
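        /* A packed descriptor is handed back to the driver by making its
         * AVAIL and USED flag bits both equal to the device's used wrap
         * counter; the fences order the id/addr stores before the flags
         * store, and the flags store before the inflight clear below.
         */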
115         rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
116         if (vq->used_wrap_counter)
117                 desc->flags |= VIRTQ_DESC_F_AVAIL | VIRTQ_DESC_F_USED;
118         else
119                 desc->flags &= ~(VIRTQ_DESC_F_AVAIL | VIRTQ_DESC_F_USED);
120         rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
121
122         rte_vhost_clr_inflight_desc_packed(task->ctrlr->vid, vq->id,
123                                            task->inflight_idx);
124
125         vq->last_used_idx += task->chain_num;
126         if (vq->last_used_idx >= vq->vring.size) {
127                 vq->last_used_idx -= vq->vring.size;
128                 vq->used_wrap_counter = !vq->used_wrap_counter;
129         }
130
131         /* Send an interrupt back to the guest VM so that it knows
132          * a completion is ready to be processed.
133          */
134         rte_vhost_vring_call(task->ctrlr->vid, vq->id);
135 }
136
137 static bool
138 descriptor_has_next_packed(struct vring_packed_desc *cur_desc)
139 {
140         return !!(cur_desc->flags & VRING_DESC_F_NEXT);
141 }
142
143 static bool
144 descriptor_has_next_split(struct vring_desc *cur_desc)
145 {
146         return !!(cur_desc->flags & VRING_DESC_F_NEXT);
147 }
148
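/*
 * A buffer that is contiguous in guest physical memory may span several host
 * mappings.  rte_vhost_va_from_guest_pa() shrinks *len to the largest
 * contiguous mapping, so a single descriptor can fan out into several iovec
 * entries.
 */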
149 static int
150 desc_payload_to_iovs(struct vhost_blk_ctrlr *ctrlr, struct iovec *iovs,
151                      uint32_t *iov_index, uintptr_t payload, uint64_t remaining)
152 {
153         void *vva;
154         uint64_t len;
155
156         do {
157                 if (*iov_index >= VHOST_BLK_MAX_IOVS) {
158                         fprintf(stderr, "VHOST_BLK_MAX_IOVS reached\n");
159                         return -1;
160                 }
161                 len = remaining;
162                 vva = (void *)(uintptr_t)gpa_to_vva(ctrlr,
163                                  payload, &len);
164                 if (!vva || !len) {
165                         fprintf(stderr, "failed to translate desc address.\n");
166                         return -1;
167                 }
168
169                 iovs[*iov_index].iov_base = vva;
170                 iovs[*iov_index].iov_len = len;
171                 payload += len;
172                 remaining -= len;
173                 (*iov_index)++;
174         } while (remaining);
175
176         return 0;
177 }
178
179 static struct vring_desc *
180 vring_get_next_desc(struct vhost_blk_queue *vq, struct vring_desc *desc)
181 {
182         if (descriptor_has_next_split(desc))
183                 return &vq->vring.desc[desc->next];
184
185         return NULL;
186 }
187
188 static struct vring_packed_desc *
189 vring_get_next_desc_packed(struct vhost_blk_queue *vq, uint16_t *req_idx)
190 {
191         if (descriptor_has_next_packed(&vq->vring.desc_packed[*req_idx])) {
192                 *req_idx = (*req_idx + 1) % vq->vring.size;
193                 return &vq->vring.desc_packed[*req_idx];
194         }
195
196         return NULL;
197 }
198
199 static struct rte_vhost_inflight_desc_packed *
200 vring_get_next_inflight_desc(struct vhost_blk_queue *vq,
201                         struct rte_vhost_inflight_desc_packed *desc)
202 {
203         if (!!(desc->flags & VRING_DESC_F_NEXT))
204                 return &vq->inflight_ring.inflight_packed->desc[desc->next];
205
206         return NULL;
207 }
208
209 static int
210 setup_iovs_from_descs_split(struct vhost_blk_ctrlr *ctrlr,
211                             struct vhost_blk_queue *vq, uint16_t req_idx,
212                             struct iovec *iovs, uint32_t *iovs_idx,
213                             uint32_t *payload)
214 {
215         struct vring_desc *desc = &vq->vring.desc[req_idx];
216
217         do {
218                 /* does not support indirect descriptors */
219                 assert((desc->flags & VRING_DESC_F_INDIRECT) == 0);
220
221                 if (*iovs_idx >= VHOST_BLK_MAX_IOVS) {
222                         fprintf(stderr, "Reached VHOST_BLK_MAX_IOVS\n");
223                         return -1;
224                 }
225
226                 if (desc_payload_to_iovs(ctrlr, iovs, iovs_idx,
227                         desc->addr, desc->len) != 0) {
228                         fprintf(stderr, "Failed to convert desc payload to iovs\n");
229                         return -1;
230                 }
231
232                 *payload += desc->len;
233
234                 desc = vring_get_next_desc(vq, desc);
235         } while (desc != NULL);
236
237         return 0;
238 }
239
240 static int
241 setup_iovs_from_descs_packed(struct vhost_blk_ctrlr *ctrlr,
242                              struct vhost_blk_queue *vq, uint16_t req_idx,
243                              struct iovec *iovs, uint32_t *iovs_idx,
244                              uint32_t *payload)
245 {
246         struct vring_packed_desc *desc = &vq->vring.desc_packed[req_idx];
247
248         do {
249                 /* does not support indirect descriptors */
250                 assert((desc->flags & VRING_DESC_F_INDIRECT) == 0);
251
252                 if (*iovs_idx >= VHOST_BLK_MAX_IOVS) {
253                         fprintf(stderr, "Reached VHOST_BLK_MAX_IOVS\n");
254                         return -1;
255                 }
256
257                 if (desc_payload_to_iovs(ctrlr, iovs, iovs_idx,
258                         desc->addr, desc->len) != 0) {
259                         fprintf(stderr, "Failed to convert desc payload to iovs\n");
260                         return -1;
261                 }
262
263                 *payload += desc->len;
264
265                 desc = vring_get_next_desc_packed(vq, &req_idx);
266         } while (desc != NULL);
267
268         return 0;
269 }
270
271 static int
272 setup_iovs_from_inflight_desc(struct vhost_blk_ctrlr *ctrlr,
273                               struct vhost_blk_queue *vq, uint16_t req_idx,
274                               struct iovec *iovs, uint32_t *iovs_idx,
275                               uint32_t *payload)
276 {
277         struct rte_vhost_ring_inflight *inflight_vq;
278         struct rte_vhost_inflight_desc_packed *desc;
279
280         inflight_vq = &vq->inflight_ring;
281         desc = &inflight_vq->inflight_packed->desc[req_idx];
282
283         do {
284                 /* does not support indirect descriptors */
285                 assert((desc->flags & VRING_DESC_F_INDIRECT) == 0);
286
287                 if (*iovs_idx >= VHOST_BLK_MAX_IOVS) {
288                         fprintf(stderr, "Reached VHOST_BLK_MAX_IOVS\n");
289                         return -1;
290                 }
291
292                 if (desc_payload_to_iovs(ctrlr, iovs, iovs_idx,
293                         desc->addr, desc->len) != 0) {
294                         fprintf(stderr, "Failed to convert desc payload to iovs\n");
295                         return -1;
296                 }
297
298                 *payload += desc->len;
299
300                 desc = vring_get_next_inflight_desc(vq, desc);
301         } while (desc != NULL);
302
303         return 0;
304 }
305
306 static void
307 process_blk_task(struct vhost_blk_task *task)
308 {
309         uint32_t payload = 0;
310
311         if (task->vq->packed_ring) {
312                 struct rte_vhost_ring_inflight *inflight_ring;
313                 struct rte_vhost_resubmit_info *resubmit_inflight;
314
315                 inflight_ring = &task->vq->inflight_ring;
316                 resubmit_inflight = inflight_ring->resubmit_inflight;
317
318                 if (resubmit_inflight != NULL &&
319                     resubmit_inflight->resubmit_list != NULL) {
320                         if (setup_iovs_from_inflight_desc(task->ctrlr, task->vq,
321                                 task->req_idx, task->iovs, &task->iovs_cnt,
322                                 &payload)) {
323                                 fprintf(stderr, "Failed to setup iovs\n");
324                                 return;
325                         }
326                 } else {
327                         if (setup_iovs_from_descs_packed(task->ctrlr, task->vq,
328                                 task->req_idx, task->iovs, &task->iovs_cnt,
329                                 &payload)) {
330                                 fprintf(stderr, "Failed to setup iovs\n");
331                                 return;
332                         }
333                 }
334         } else {
335                 if (setup_iovs_from_descs_split(task->ctrlr, task->vq,
336                         task->req_idx, task->iovs, &task->iovs_cnt, &payload)) {
337                         fprintf(stderr, "Failed to setup iovs\n");
338                         return;
339                 }
340         }
341
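        /* A virtio-blk request chain is laid out as:
         *   struct virtio_blk_outhdr { type, ioprio, sector }   (guest -> device)
         *   ... zero or more data buffers ...
         *   one status byte                                      (device -> guest)
         */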
342         /* First IOV must be the req head. */
343         task->req = (struct virtio_blk_outhdr *)task->iovs[0].iov_base;
344         assert(sizeof(*task->req) == task->iovs[0].iov_len);
345
346         /* Last IOV must be the status tail. */
347         task->status = (uint8_t *)task->iovs[task->iovs_cnt - 1].iov_base;
348         assert(sizeof(*task->status) == task->iovs[task->iovs_cnt - 1].iov_len);
349
350         /* Payload length, excluding the request header and the status byte */
351         task->data_len = payload - task->iovs[0].iov_len -
352                 task->iovs[task->iovs_cnt - 1].iov_len;
353
354         if (vhost_bdev_process_blk_commands(task->ctrlr->bdev, task))
355                 /* command failed: report an I/O error */
356                 *task->status = VIRTIO_BLK_S_IOERR;
357         else
358                 /* command succeeded */
359                 *task->status = VIRTIO_BLK_S_OK;
360
361         if (task->vq->packed_ring)
362                 enqueue_task_packed(task);
363         else
364                 enqueue_task(task);
365 }
366
367 static void
368 blk_task_init(struct vhost_blk_task *task)
369 {
370         task->iovs_cnt = 0;
371         task->data_len = 0;
372         task->req = NULL;
373         task->status = NULL;
374 }
375
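/*
 * Replay the requests that were still in flight when the previous vhost-user
 * connection went away.  The inflight state lives in a shared memory region
 * negotiated through the vhost-user inflight protocol feature, so it survives
 * a backend restart.
 */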
376 static void
377 submit_inflight_vq(struct vhost_blk_queue *vq)
378 {
379         struct rte_vhost_ring_inflight *inflight_ring;
380         struct rte_vhost_resubmit_info *resubmit_inflight;
381         struct vhost_blk_task *task;
382
383         inflight_ring = &vq->inflight_ring;
384         resubmit_inflight = inflight_ring->resubmit_inflight;
385
386         if (resubmit_inflight == NULL ||
387             resubmit_inflight->resubmit_num == 0)
388                 return;
389
390         fprintf(stdout, "Resubmit inflight num is %d\n",
391                 resubmit_inflight->resubmit_num);
392
393         while (resubmit_inflight->resubmit_num-- > 0) {
394                 uint16_t desc_idx;
395
396                 desc_idx = resubmit_inflight->resubmit_list[
397                                         resubmit_inflight->resubmit_num].index;
398
399                 if (vq->packed_ring) {
400                         uint16_t task_idx;
401                         struct rte_vhost_inflight_desc_packed *desc;
402
403                         desc = inflight_ring->inflight_packed->desc;
404                         task_idx = desc[desc[desc_idx].last].id;
405                         task = &vq->tasks[task_idx];
406
407                         task->req_idx = desc_idx;
408                         task->chain_num = desc[desc_idx].num;
409                         task->buffer_id = task_idx;
410                         task->inflight_idx = desc_idx;
411
412                         vq->last_avail_idx += desc[desc_idx].num;
413                         if (vq->last_avail_idx >= vq->vring.size) {
414                                 vq->last_avail_idx -= vq->vring.size;
415                                 vq->avail_wrap_counter =
416                                         !vq->avail_wrap_counter;
417                         }
418                 } else
419                         /* In split ring, the desc_idx is the req_id
420                          * which was initialized when allocated the task pool.
421                          */
422                         task = &vq->tasks[desc_idx];
423
424                 blk_task_init(task);
425                 process_blk_task(task);
426         }
427
428         free(resubmit_inflight->resubmit_list);
429         resubmit_inflight->resubmit_list = NULL;
430 }
431
432 /* Use the buffer_id as the task_idx */
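/* In a packed ring the buffer id is carried by the last descriptor of the
 * chain, so walk the whole chain and return the id found there.
 */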
433 static uint16_t
434 vhost_blk_vq_get_desc_chain_buffer_id(struct vhost_blk_queue *vq,
435                                       uint16_t *req_head, uint16_t *num)
436 {
437         struct vring_packed_desc *desc = &vq->vring.desc_packed[
438                                                 vq->last_avail_idx];
439
440         *req_head = vq->last_avail_idx;
441         *num = 1;
442
443         while (descriptor_has_next_packed(desc)) {
444                 vq->last_avail_idx = (vq->last_avail_idx + 1) % vq->vring.size;
445                 desc = &vq->vring.desc_packed[vq->last_avail_idx];
446                 *num += 1;
447         }
448
449         /* Point to next desc */
450         vq->last_avail_idx = (vq->last_avail_idx + 1) % vq->vring.size;
451         if (vq->last_avail_idx < *req_head)
452                 vq->avail_wrap_counter = !vq->avail_wrap_counter;
453
454         return desc->id;
455 }
456
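/* Split ring: pop the next head descriptor index from the avail ring.  The
 * ring size is a power of two, so masking the free-running last_avail_idx
 * with (size - 1) wraps it correctly.
 */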
457 static uint16_t
458 vq_get_desc_idx(struct vhost_blk_queue *vq)
459 {
460         uint16_t desc_idx;
461         uint16_t last_avail_idx;
462
463         last_avail_idx = vq->last_avail_idx & (vq->vring.size - 1);
464         desc_idx = vq->vring.avail->ring[last_avail_idx];
465         vq->last_avail_idx++;
466
467         return desc_idx;
468 }
469
470 static int
471 vhost_blk_vq_is_avail(struct vhost_blk_queue *vq)
472 {
473         if (vq->packed_ring) {
474                 uint16_t flags = vq->vring.desc_packed[
475                                         vq->last_avail_idx].flags;
476                 bool avail_wrap_counter = vq->avail_wrap_counter;
477
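                /* A packed descriptor is available when its AVAIL bit
                 * matches the driver's wrap counter and its USED bit does
                 * not.
                 */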
478                 return (!!(flags & VIRTQ_DESC_F_AVAIL) == avail_wrap_counter &&
479                         !!(flags & VIRTQ_DESC_F_USED) != avail_wrap_counter);
480         } else {
481                 if (vq->vring.avail->idx != vq->last_avail_idx)
482                         return 1;
483
484                 return 0;
485         }
486 }
487
488 static void
489 process_vq(struct vhost_blk_queue *vq)
490 {
491         struct vhost_blk_task *task;
492
493         if (vq->packed_ring) {
494                 while (vhost_blk_vq_is_avail(vq)) {
495                         uint16_t task_idx, req_idx, last_idx, chain_num;
496
497                         task_idx = vhost_blk_vq_get_desc_chain_buffer_id(vq,
498                                         &req_idx, &chain_num);
499                         task = &vq->tasks[task_idx];
500
501                         blk_task_init(task);
502                         task->req_idx = req_idx;
503                         task->chain_num = chain_num;
504                         task->buffer_id = task_idx;
505                         last_idx = (req_idx + chain_num - 1) % vq->vring.size;
506
507                         rte_vhost_set_inflight_desc_packed(task->ctrlr->vid,
508                                                            vq->id,
509                                                            task->req_idx,
510                                                            last_idx,
511                                                            &task->inflight_idx);
512
513                         process_blk_task(task);
514                 }
515         } else {
516                 while (vhost_blk_vq_is_avail(vq)) {
517                         uint16_t desc_idx;
518
519                         desc_idx = vq_get_desc_idx(vq);
520                         task = &vq->tasks[desc_idx];
521
522                         blk_task_init(task);
523                         rte_vhost_set_inflight_desc_split(task->ctrlr->vid,
524                                                           vq->id,
525                                                           task->req_idx);
526                         process_blk_task(task);
527                 }
528         }
529 }
530
531 static void *
532 ctrlr_worker(void *arg)
533 {
534         struct vhost_blk_ctrlr *ctrlr = (struct vhost_blk_ctrlr *)arg;
535         cpu_set_t cpuset;
536         pthread_t thread;
537         int i;
538
539         fprintf(stdout, "Ctrlr Worker Thread start\n");
540
541         if (ctrlr == NULL || ctrlr->bdev == NULL) {
542                 fprintf(stderr,
543                         "%s: Error, invalid argument passed to worker thread\n",
544                         __func__);
545                 exit(1);
546         }
547
548         thread = pthread_self();
549         CPU_ZERO(&cpuset);
550         CPU_SET(0, &cpuset);
551         pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
552
553         for (i = 0; i < NUM_OF_BLK_QUEUES; i++)
554                 submit_inflight_vq(&ctrlr->queues[i]);
555
556         while (worker_thread_status != WORKER_STATE_STOP)
557                 for (i = 0; i < NUM_OF_BLK_QUEUES; i++)
558                         process_vq(&ctrlr->queues[i]);
559
560         fprintf(stdout, "Ctrlr Worker Thread Exiting\n");
561         sem_post(&exit_sem);
562         return NULL;
563 }
564
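/* Pre-allocate one task per descriptor slot in every queue; a request is then
 * mapped to its task directly by descriptor index (split ring) or buffer id
 * (packed ring).
 */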
565 static int
566 alloc_task_pool(struct vhost_blk_ctrlr *ctrlr)
567 {
568         struct vhost_blk_queue *vq;
569         int i, j;
570
571         for (i = 0; i < NUM_OF_BLK_QUEUES; i++) {
572                 vq = &ctrlr->queues[i];
573
574                 vq->tasks = rte_zmalloc(NULL,
575                         sizeof(struct vhost_blk_task) * vq->vring.size, 0);
576                 if (!vq->tasks) {
577                         fprintf(stderr, "Failed to allocate task memory\n");
578                         return -1;
579                 }
580
581                 for (j = 0; j < vq->vring.size; j++) {
582                         vq->tasks[j].req_idx = j;
583                         vq->tasks[j].ctrlr = ctrlr;
584                         vq->tasks[j].vq = vq;
585                 }
586         }
587
588         return 0;
589 }
590
591 static void
592 free_task_pool(struct vhost_blk_ctrlr *ctrlr)
593 {
594         int i;
595
596         for (i = 0; i < NUM_OF_BLK_QUEUES; i++)
597                 rte_free(ctrlr->queues[i].tasks);
598 }
599
600 static int
601 new_device(int vid)
602 {
603         struct vhost_blk_ctrlr *ctrlr;
604         struct vhost_blk_queue *vq;
605         char path[PATH_MAX];
606         uint64_t features;
607         pthread_t tid;
608         int i, ret;
609         bool packed_ring;
610
611         ret = rte_vhost_get_ifname(vid, path, PATH_MAX);
612         if (ret) {
613                 fprintf(stderr, "Failed to get the socket path\n");
614                 return -1;
615         }
616
617         ctrlr = vhost_blk_ctrlr_find(path);
618         if (!ctrlr) {
619                 fprintf(stderr, "Failed to find controller\n");
620                 return -1;
621         }
622
623         if (ctrlr->started)
624                 return 0;
625
626         ctrlr->vid = vid;
627         ret = rte_vhost_get_negotiated_features(vid, &features);
628         if (ret) {
629                 fprintf(stderr, "Failed to get the negotiated features\n");
630                 return -1;
631         }
632         packed_ring = !!(features & (1ULL << VIRTIO_F_RING_PACKED));
633
634         /* Disable Notifications and init last idx */
635         for (i = 0; i < NUM_OF_BLK_QUEUES; i++) {
636                 vq = &ctrlr->queues[i];
637                 vq->id = i;
638
639                 assert(rte_vhost_get_vhost_vring(ctrlr->vid, i,
640                                                  &vq->vring) == 0);
641                 assert(rte_vhost_get_vring_base(ctrlr->vid, i,
642                                                &vq->last_avail_idx,
643                                                &vq->last_used_idx) == 0);
644                 assert(rte_vhost_get_vhost_ring_inflight(ctrlr->vid, i,
645                                                 &vq->inflight_ring) == 0);
646
647                 if (packed_ring) {
648                         /* for the reconnection */
649                         assert(rte_vhost_get_vring_base_from_inflight(
650                                 ctrlr->vid, i,
651                                 &vq->last_avail_idx,
652                                 &vq->last_used_idx) == 0);
653
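                        /* The recovered indexes carry the wrap counter in
                         * bit 15; split it from the 15-bit ring index.
                         */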
654                         vq->avail_wrap_counter = vq->last_avail_idx &
655                                 (1 << 15);
656                         vq->last_avail_idx = vq->last_avail_idx &
657                                 0x7fff;
658                         vq->used_wrap_counter = vq->last_used_idx &
659                                 (1 << 15);
660                         vq->last_used_idx = vq->last_used_idx &
661                                 0x7fff;
662                 }
663
664                 vq->packed_ring = packed_ring;
665                 rte_vhost_enable_guest_notification(vid, i, 0);
666         }
667
668         assert(rte_vhost_get_mem_table(vid, &ctrlr->mem) == 0);
669         assert(ctrlr->mem != NULL);
670         assert(alloc_task_pool(ctrlr) == 0);
671
672         /* start polling vring */
673         worker_thread_status = WORKER_STATE_START;
674         fprintf(stdout, "New Device %s, Device ID %d\n", path, vid);
675         if (pthread_create(&tid, NULL, &ctrlr_worker, ctrlr) < 0) {
676                 fprintf(stderr, "Failed to start worker thread\n");
677                 return -1;
678         }
679
680         /* device has been started */
681         ctrlr->started = 1;
682         pthread_detach(tid);
683         return 0;
684 }
685
686 static void
687 destroy_device(int vid)
688 {
689         char path[PATH_MAX];
690         struct vhost_blk_ctrlr *ctrlr;
691         struct vhost_blk_queue *vq;
692         int i, ret;
693
694         ret = rte_vhost_get_ifname(vid, path, PATH_MAX);
695         if (ret) {
696                 fprintf(stderr, "Destroy Ctrlr Failed\n");
697                 return;
698         }
699
700         fprintf(stdout, "Destroy %s Device ID %d\n", path, vid);
701         ctrlr = vhost_blk_ctrlr_find(path);
702         if (!ctrlr) {
703                 fprintf(stderr, "Destroy Ctrlr Failed\n");
704                 return;
705         }
706
707         if (!ctrlr->started)
708                 return;
709
710         worker_thread_status = WORKER_STATE_STOP;
711         sem_wait(&exit_sem);
712
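        /* Save the ring state so the guest can resume after a reconnect; for
         * packed rings the wrap counters are packed back into bit 15 of the
         * indexes.
         */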
713         for (i = 0; i < NUM_OF_BLK_QUEUES; i++) {
714                 vq = &ctrlr->queues[i];
715                 if (vq->packed_ring) {
716                         vq->last_avail_idx |= (vq->avail_wrap_counter <<
717                                 15);
718                         vq->last_used_idx |= (vq->used_wrap_counter <<
719                                 15);
720                 }
721
722                 rte_vhost_set_vring_base(ctrlr->vid, i,
723                                          vq->last_avail_idx,
724                                          vq->last_used_idx);
725         }
726
727         free_task_pool(ctrlr);
728         free(ctrlr->mem);
729
730         ctrlr->started = 0;
731 }
732
733 static int
734 new_connection(int vid)
735 {
736         /* extend the proper features for block device */
737         vhost_session_install_rte_compat_hooks(vid);
738
739         return 0;
740 }
741
742 struct vhost_device_ops vhost_blk_device_ops = {
743         .new_device =  new_device,
744         .destroy_device = destroy_device,
745         .new_connection = new_connection,
746 };
747
748 static struct vhost_block_dev *
749 vhost_blk_bdev_construct(const char *bdev_name,
750         const char *bdev_serial, uint32_t blk_size, uint64_t blk_cnt,
751         bool wce_enable)
752 {
753         struct vhost_block_dev *bdev;
754
755         bdev = rte_zmalloc(NULL, sizeof(*bdev), RTE_CACHE_LINE_SIZE);
756         if (!bdev)
757                 return NULL;
758
759         snprintf(bdev->name, sizeof(bdev->name), "%s", bdev_name);
760         snprintf(bdev->product_name, sizeof(bdev->product_name), "%s",
761                  bdev_serial);
762         bdev->blocklen = blk_size;
763         bdev->blockcnt = blk_cnt;
764         bdev->write_cache = wce_enable;
765
766         fprintf(stdout, "Blocklen=%u, blockcnt=%"PRIu64"\n", bdev->blocklen,
767                 bdev->blockcnt);
768
769         /* use memory as disk storage space */
770         bdev->data = rte_zmalloc(NULL, blk_cnt * blk_size, 0);
771         if (!bdev->data) {
772                 fprintf(stderr, "Not enough reserved huge memory for disk\n");
773                 rte_free(bdev);
774                 return NULL;
775         }
776
777         return bdev;
778 }
779
780 static struct vhost_blk_ctrlr *
781 vhost_blk_ctrlr_construct(const char *ctrlr_name)
782 {
783         int ret;
784         struct vhost_blk_ctrlr *ctrlr;
785         char *path;
786         char cwd[PATH_MAX];
787
788         /* always use current directory */
789         path = getcwd(cwd, PATH_MAX);
790         if (!path) {
791                 fprintf(stderr, "Cannot get current working directory\n");
792                 return NULL;
793         }
794         snprintf(dev_pathname, sizeof(dev_pathname), "%s/%s", path, ctrlr_name);
795
796         unlink(dev_pathname);
797
798         if (rte_vhost_driver_register(dev_pathname, 0) != 0) {
799                 fprintf(stderr, "Failed to register vhost driver for %s\n", dev_pathname);
800                 return NULL;
801         }
802
803         ret = rte_vhost_driver_set_features(dev_pathname, VHOST_BLK_FEATURES);
804         if (ret != 0) {
805                 fprintf(stderr, "Set vhost driver features failed\n");
806                 rte_vhost_driver_unregister(dev_pathname);
807                 return NULL;
808         }
809
810         /* set vhost user protocol features */
811         vhost_dev_install_rte_compat_hooks(dev_pathname);
812
813         ctrlr = rte_zmalloc(NULL, sizeof(*ctrlr), RTE_CACHE_LINE_SIZE);
814         if (!ctrlr) {
815                 rte_vhost_driver_unregister(dev_pathname);
816                 return NULL;
817         }
818
819         /* hardcoded block device: 32768 blocks of 4096 bytes (128 MiB) */
820         ctrlr->bdev = vhost_blk_bdev_construct("malloc0", "vhost_blk_malloc0",
821                                                 4096, 32768, 0);
822         if (!ctrlr->bdev) {
823                 rte_free(ctrlr);
824                 rte_vhost_driver_unregister(dev_pathname);
825                 return NULL;
826         }
827
828         rte_vhost_driver_callback_register(dev_pathname,
829                                            &vhost_blk_device_ops);
830
831         return ctrlr;
832 }
833
834 static void
835 vhost_blk_ctrlr_destroy(struct vhost_blk_ctrlr *ctrlr)
836 {
837         if (ctrlr->bdev != NULL) {
838                 if (ctrlr->bdev->data != NULL)
839                         rte_free(ctrlr->bdev->data);
840
841                 rte_free(ctrlr->bdev);
842         }
843         rte_free(ctrlr);
844
845         rte_vhost_driver_unregister(dev_pathname);
846 }
847
848 static void
849 signal_handler(__rte_unused int signum)
850 {
851         struct vhost_blk_ctrlr *ctrlr;
852
853         ctrlr = vhost_blk_ctrlr_find(dev_pathname);
854         if (ctrlr == NULL)
855                 return;
856
857         if (ctrlr->started)
858                 destroy_device(ctrlr->vid);
859
860         vhost_blk_ctrlr_destroy(ctrlr);
861         exit(0);
862 }
863
864 int main(int argc, char *argv[])
865 {
866         int ret;
867
868         /* init EAL */
869         ret = rte_eal_init(argc, argv);
870         if (ret < 0)
871                 rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
872
873         g_vhost_ctrlr = vhost_blk_ctrlr_construct(CTRLR_NAME);
874         if (g_vhost_ctrlr == NULL) {
875                 fprintf(stderr, "Failed to construct vhost blk controller\n");
876                 return -1;
877         }
878
879         if (sem_init(&exit_sem, 0, 0) < 0) {
880                 fprintf(stderr, "Failed to initialize exit_sem\n");
881                 return -1;
882         }
883
884         signal(SIGINT, signal_handler);
885
886         ret = rte_vhost_driver_start(dev_pathname);
887         if (ret < 0) {
888                 fprintf(stderr, "Failed to start vhost driver.\n");
889                 return -1;
890         }
891
892         /* idle loop; the application exits via the SIGINT handler */
893         while (1)
894                 sleep(1);
895
896         return 0;
897 }