net/mlx5: install a socket to exchange a file descriptor
authorXueming Li <xuemingl@mellanox.com>
Fri, 6 Oct 2017 15:45:49 +0000 (23:45 +0800)
committerFerruh Yigit <ferruh.yigit@intel.com>
Thu, 12 Oct 2017 00:36:57 +0000 (01:36 +0100)
Use a unix socket to get back the communication channel with the Kernel
driver from the primary process, this is necessary to remap those pages
in the secondary process memory space and thus use the same Tx queues.

This is only supported from rdma-core (v15).

Signed-off-by: Nelio Laranjeiro <nelio.laranjeiro@6wind.com>
Signed-off-by: Xueming Li <xuemingl@mellanox.com>
drivers/net/mlx5/Makefile
drivers/net/mlx5/mlx5.c
drivers/net/mlx5/mlx5.h
drivers/net/mlx5/mlx5_ethdev.c
drivers/net/mlx5/mlx5_rxtx.h
drivers/net/mlx5/mlx5_socket.c [new file with mode: 0644]
drivers/net/mlx5/mlx5_txq.c

index f75d344..816a9cc 100644 (file)
@@ -52,6 +52,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_MLX5_PMD) += mlx5_rss.c
 SRCS-$(CONFIG_RTE_LIBRTE_MLX5_PMD) += mlx5_fdir.c
 SRCS-$(CONFIG_RTE_LIBRTE_MLX5_PMD) += mlx5_mr.c
 SRCS-$(CONFIG_RTE_LIBRTE_MLX5_PMD) += mlx5_flow.c
+SRCS-$(CONFIG_RTE_LIBRTE_MLX5_PMD) += mlx5_socket.c
 
 # Basic CFLAGS.
 CFLAGS += -O3
@@ -88,6 +89,11 @@ ifdef CONFIG_RTE_LIBRTE_MLX5_TX_MP_CACHE
 CFLAGS += -DMLX5_PMD_TX_MP_CACHE=$(CONFIG_RTE_LIBRTE_MLX5_TX_MP_CACHE)
 endif
 
+# Disable false positive warning
+ifeq ($(CONFIG_RTE_TOOLCHAIN_ICC),y)
+CFLAGS_mlx5_txq.o += -wd3656
+endif
+
 include $(RTE_SDK)/mk/rte.lib.mk
 
 # Generate and clean-up mlx5_autoconf.h.
index a4b7184..f0e1099 100644 (file)
@@ -209,6 +209,7 @@ mlx5_dev_close(struct rte_eth_dev *dev)
        }
        if (priv->reta_idx != NULL)
                rte_free(priv->reta_idx);
+       priv_socket_uninit(priv);
        priv_unlock(priv);
        memset(priv, 0, sizeof(*priv));
 }
@@ -578,6 +579,40 @@ mlx5_pci_probe(struct rte_pci_driver *pci_drv, struct rte_pci_device *pci_dev)
                        .rx_vec_en = MLX5_ARG_UNSET,
                };
 
+               mlx5_dev[idx].ports |= test;
+
+               if (mlx5_is_secondary()) {
+                       /* from rte_ethdev.c */
+                       char name[RTE_ETH_NAME_MAX_LEN];
+
+                       snprintf(name, sizeof(name), "%s port %u",
+                                ibv_get_device_name(ibv_dev), port);
+                       eth_dev = rte_eth_dev_attach_secondary(name);
+                       if (eth_dev == NULL) {
+                               ERROR("can not attach rte ethdev");
+                               err = ENOMEM;
+                               goto error;
+                       }
+                       eth_dev->device = &pci_dev->device;
+                       eth_dev->dev_ops = NULL;
+                       priv = eth_dev->data->dev_private;
+                       /* Receive command fd from primary process */
+                       err = priv_socket_connect(priv);
+                       if (err < 0) {
+                               err = -err;
+                               goto error;
+                       }
+                       /* Remap UAR for Tx queues. */
+                       err = priv_tx_uar_remap(priv, err);
+                       if (err < 0) {
+                               err = -err;
+                               goto error;
+                       }
+                       priv_dev_select_rx_function(priv, eth_dev);
+                       priv_dev_select_tx_function(priv, eth_dev);
+                       continue;
+               }
+
                DEBUG("using port %u (%08" PRIx32 ")", port, test);
 
                ctx = ibv_open_device(ibv_dev);
index 78b27ed..1ce02e8 100644 (file)
@@ -151,6 +151,8 @@ struct priv {
        uint32_t link_speed_capa; /* Link speed capabilities. */
        struct mlx5_xstats_ctrl xstats_ctrl; /* Extended stats control. */
        rte_spinlock_t lock; /* Lock for control functions. */
+       int primary_socket; /* Unix socket for primary process. */
+       struct rte_intr_handle intr_handle_socket; /* Interrupt handler. */
 };
 
 /**
@@ -299,4 +301,11 @@ int priv_flow_start(struct priv *);
 void priv_flow_stop(struct priv *);
 int priv_flow_rxq_in_use(struct priv *, struct rxq *);
 
+/* mlx5_socket.c */
+
+int priv_socket_init(struct priv *priv);
+int priv_socket_uninit(struct priv *priv);
+void priv_socket_handle(struct priv *priv);
+int priv_socket_connect(struct priv *priv);
+
 #endif /* RTE_PMD_MLX5_H_ */
index 831d920..adcde9c 100644 (file)
@@ -31,6 +31,8 @@
  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#define _GNU_SOURCE
+
 #include <stddef.h>
 #include <assert.h>
 #include <unistd.h>
@@ -50,6 +52,7 @@
 #include <linux/version.h>
 #include <fcntl.h>
 #include <stdalign.h>
+#include <sys/un.h>
 
 #include <rte_atomic.h>
 #include <rte_ethdev.h>
@@ -1236,6 +1239,23 @@ mlx5_dev_interrupt_handler(void *cb_arg)
                                              NULL);
 }
 
+/**
+ * Handle interrupts from the socket.
+ *
+ * @param cb_arg
+ *   Callback argument.
+ */
+static void
+mlx5_dev_handler_socket(void *cb_arg)
+{
+       struct rte_eth_dev *dev = cb_arg;
+       struct priv *priv = dev->data->dev_private;
+
+       priv_lock(priv);
+       priv_socket_handle(priv);
+       priv_unlock(priv);
+}
+
 /**
  * Uninstall interrupt handler.
  *
@@ -1247,17 +1267,20 @@ mlx5_dev_interrupt_handler(void *cb_arg)
 void
 priv_dev_interrupt_handler_uninstall(struct priv *priv, struct rte_eth_dev *dev)
 {
-       if (!dev->data->dev_conf.intr_conf.lsc &&
-               !dev->data->dev_conf.intr_conf.rmv)
-               return;
-       rte_intr_callback_unregister(&priv->intr_handle,
-                                    mlx5_dev_interrupt_handler,
-                                    dev);
+       if (dev->data->dev_conf.intr_conf.lsc ||
+           dev->data->dev_conf.intr_conf.rmv)
+               rte_intr_callback_unregister(&priv->intr_handle,
+                                            mlx5_dev_interrupt_handler, dev);
+       if (priv->primary_socket)
+               rte_intr_callback_unregister(&priv->intr_handle_socket,
+                                            mlx5_dev_handler_socket, dev);
        if (priv->pending_alarm)
                rte_eal_alarm_cancel(mlx5_dev_link_status_handler, dev);
        priv->pending_alarm = 0;
        priv->intr_handle.fd = 0;
        priv->intr_handle.type = RTE_INTR_HANDLE_UNKNOWN;
+       priv->intr_handle_socket.fd = 0;
+       priv->intr_handle_socket.type = RTE_INTR_HANDLE_UNKNOWN;
 }
 
 /**
@@ -1273,9 +1296,7 @@ priv_dev_interrupt_handler_install(struct priv *priv, struct rte_eth_dev *dev)
 {
        int rc, flags;
 
-       if (!dev->data->dev_conf.intr_conf.lsc &&
-               !dev->data->dev_conf.intr_conf.rmv)
-               return;
+       assert(!mlx5_is_secondary());
        assert(priv->ctx->async_fd > 0);
        flags = fcntl(priv->ctx->async_fd, F_GETFL);
        rc = fcntl(priv->ctx->async_fd, F_SETFL, flags | O_NONBLOCK);
@@ -1283,12 +1304,21 @@ priv_dev_interrupt_handler_install(struct priv *priv, struct rte_eth_dev *dev)
                INFO("failed to change file descriptor async event queue");
                dev->data->dev_conf.intr_conf.lsc = 0;
                dev->data->dev_conf.intr_conf.rmv = 0;
-       } else {
+       }
+       if (dev->data->dev_conf.intr_conf.lsc ||
+           dev->data->dev_conf.intr_conf.rmv) {
                priv->intr_handle.fd = priv->ctx->async_fd;
                priv->intr_handle.type = RTE_INTR_HANDLE_EXT;
                rte_intr_callback_register(&priv->intr_handle,
-                                          mlx5_dev_interrupt_handler,
-                                          dev);
+                                          mlx5_dev_interrupt_handler, dev);
+       }
+
+       rc = priv_socket_init(priv);
+       if (!rc && priv->primary_socket) {
+               priv->intr_handle_socket.fd = priv->primary_socket;
+               priv->intr_handle_socket.type = RTE_INTR_HANDLE_EXT;
+               rte_intr_callback_register(&priv->intr_handle_socket,
+                                          mlx5_dev_handler_socket, dev);
        }
 }
 
index d4c29cc..bd1d601 100644 (file)
@@ -286,6 +286,7 @@ struct txq_ctrl {
        struct ibv_qp *qp; /* Queue Pair. */
        unsigned int socket; /* CPU socket ID for allocations. */
        struct txq txq; /* Data path structure. */
+       off_t uar_mmap_offset; /* UAR mmap offset for non-primary process. */
 };
 
 /* mlx5_rxq.c */
@@ -319,6 +320,7 @@ int txq_ctrl_setup(struct rte_eth_dev *, struct txq_ctrl *, uint16_t,
 int mlx5_tx_queue_setup(struct rte_eth_dev *, uint16_t, uint16_t, unsigned int,
                        const struct rte_eth_txconf *);
 void mlx5_tx_queue_release(void *);
+int priv_tx_uar_remap(struct priv *priv, int fd);
 
 /* mlx5_rxtx.c */
 
diff --git a/drivers/net/mlx5/mlx5_socket.c b/drivers/net/mlx5/mlx5_socket.c
new file mode 100644 (file)
index 0000000..78b4138
--- /dev/null
@@ -0,0 +1,294 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright 2016 6WIND S.A.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of 6WIND S.A. nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#define _GNU_SOURCE
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "mlx5.h"
+#include "mlx5_utils.h"
+
+/**
+ * Initialise the socket to communicate with the secondary process
+ *
+ * @param[in] priv
+ *   Pointer to private structure.
+ *
+ * @return
+ *   0 on success, errno value on failure.
+ */
+int
+priv_socket_init(struct priv *priv)
+{
+       struct sockaddr_un sun = {
+               .sun_family = AF_UNIX,
+       };
+       int ret;
+       int flags;
+       struct stat file_stat;
+
+       /*
+        * Initialise the socket to communicate with the secondary
+        * process.
+        */
+       ret = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (ret < 0) {
+               WARN("secondary process not supported: %s", strerror(errno));
+               return ret;
+       }
+       priv->primary_socket = ret;
+       flags = fcntl(priv->primary_socket, F_GETFL, 0);
+       if (flags == -1)
+               goto out;
+       ret = fcntl(priv->primary_socket, F_SETFL, flags | O_NONBLOCK);
+       if (ret < 0)
+               goto out;
+       snprintf(sun.sun_path, sizeof(sun.sun_path), "/var/tmp/%s_%d",
+                MLX5_DRIVER_NAME, priv->primary_socket);
+       ret = stat(sun.sun_path, &file_stat);
+       if (!ret)
+               claim_zero(remove(sun.sun_path));
+       ret = bind(priv->primary_socket, (const struct sockaddr *)&sun,
+                  sizeof(sun));
+       if (ret < 0) {
+               WARN("cannot bind socket, secondary process not supported: %s",
+                    strerror(errno));
+               goto close;
+       }
+       ret = listen(priv->primary_socket, 0);
+       if (ret < 0) {
+               WARN("Secondary process not supported: %s", strerror(errno));
+               goto close;
+       }
+       return ret;
+close:
+       remove(sun.sun_path);
+out:
+       claim_zero(close(priv->primary_socket));
+       priv->primary_socket = 0;
+       return -(ret);
+}
+
+/**
+ * Un-Initialise the socket to communicate with the secondary process
+ *
+ * @param[in] priv
+ *   Pointer to private structure.
+ *
+ * @return
+ *   0 on success, errno value on failure.
+ */
+int
+priv_socket_uninit(struct priv *priv)
+{
+       MKSTR(path, "/var/tmp/%s_%d", MLX5_DRIVER_NAME, priv->primary_socket);
+       claim_zero(close(priv->primary_socket));
+       priv->primary_socket = 0;
+       claim_zero(remove(path));
+       return 0;
+}
+
+/**
+ * Handle socket interrupts.
+ *
+ * @param priv
+ *   Pointer to private structure.
+ */
+void
+priv_socket_handle(struct priv *priv)
+{
+       int conn_sock;
+       int ret = 0;
+       struct cmsghdr *cmsg = NULL;
+       struct ucred *cred = NULL;
+       char buf[CMSG_SPACE(sizeof(struct ucred))] = { 0 };
+       char vbuf[1024] = { 0 };
+       struct iovec io = {
+               .iov_base = vbuf,
+               .iov_len = sizeof(*vbuf),
+       };
+       struct msghdr msg = {
+               .msg_iov = &io,
+               .msg_iovlen = 1,
+               .msg_control = buf,
+               .msg_controllen = sizeof(buf),
+       };
+       int *fd;
+
+       /* Accept the connection from the client. */
+       conn_sock = accept(priv->primary_socket, NULL, NULL);
+       if (conn_sock < 0) {
+               WARN("connection failed: %s", strerror(errno));
+               return;
+       }
+       ret = setsockopt(conn_sock, SOL_SOCKET, SO_PASSCRED, &(int){1},
+                                        sizeof(int));
+       if (ret < 0) {
+               WARN("cannot change socket options");
+               goto out;
+       }
+       ret = recvmsg(conn_sock, &msg, MSG_WAITALL);
+       if (ret < 0) {
+               WARN("received an empty message: %s", strerror(errno));
+               goto out;
+       }
+       /* Expect to receive credentials only. */
+       cmsg = CMSG_FIRSTHDR(&msg);
+       if (cmsg == NULL) {
+               WARN("no message");
+               goto out;
+       }
+       if ((cmsg->cmsg_type == SCM_CREDENTIALS) &&
+               (cmsg->cmsg_len >= sizeof(*cred))) {
+               cred = (struct ucred *)CMSG_DATA(cmsg);
+               assert(cred != NULL);
+       }
+       cmsg = CMSG_NXTHDR(&msg, cmsg);
+       if (cmsg != NULL) {
+               WARN("Message wrongly formated");
+               goto out;
+       }
+       /* Make sure all the ancillary data was received and valid. */
+       if ((cred == NULL) || (cred->uid != getuid()) ||
+           (cred->gid != getgid())) {
+               WARN("wrong credentials");
+               goto out;
+       }
+       /* Set-up the ancillary data. */
+       cmsg = CMSG_FIRSTHDR(&msg);
+       assert(cmsg != NULL);
+       cmsg->cmsg_level = SOL_SOCKET;
+       cmsg->cmsg_type = SCM_RIGHTS;
+       cmsg->cmsg_len = CMSG_LEN(sizeof(priv->ctx->cmd_fd));
+       fd = (int *)CMSG_DATA(cmsg);
+       *fd = priv->ctx->cmd_fd;
+       ret = sendmsg(conn_sock, &msg, 0);
+       if (ret < 0)
+               WARN("cannot send response");
+out:
+       close(conn_sock);
+}
+
+/**
+ * Connect to the primary process.
+ *
+ * @param[in] priv
+ *   Pointer to private structure.
+ *
+ * @return
+ *   fd on success, negative errno value on failure.
+ */
+int
+priv_socket_connect(struct priv *priv)
+{
+       struct sockaddr_un sun = {
+               .sun_family = AF_UNIX,
+       };
+       int socket_fd;
+       int *fd = NULL;
+       int ret;
+       struct ucred *cred;
+       char buf[CMSG_SPACE(sizeof(*cred))] = { 0 };
+       char vbuf[1024] = { 0 };
+       struct iovec io = {
+               .iov_base = vbuf,
+               .iov_len = sizeof(*vbuf),
+       };
+       struct msghdr msg = {
+               .msg_control = buf,
+               .msg_controllen = sizeof(buf),
+               .msg_iov = &io,
+               .msg_iovlen = 1,
+       };
+       struct cmsghdr *cmsg;
+
+       ret = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (ret < 0) {
+               WARN("cannot connect to primary");
+               return ret;
+       }
+       socket_fd = ret;
+       snprintf(sun.sun_path, sizeof(sun.sun_path), "/var/tmp/%s_%d",
+                MLX5_DRIVER_NAME, priv->primary_socket);
+       ret = connect(socket_fd, (const struct sockaddr *)&sun, sizeof(sun));
+       if (ret < 0) {
+               WARN("cannot connect to primary");
+               goto out;
+       }
+       cmsg = CMSG_FIRSTHDR(&msg);
+       if (cmsg == NULL) {
+               DEBUG("cannot get first message");
+               goto out;
+       }
+       cmsg->cmsg_level = SOL_SOCKET;
+       cmsg->cmsg_type = SCM_CREDENTIALS;
+       cmsg->cmsg_len = CMSG_LEN(sizeof(*cred));
+       cred = (struct ucred *)CMSG_DATA(cmsg);
+       if (cred == NULL) {
+               DEBUG("no credentials received");
+               goto out;
+       }
+       cred->pid = getpid();
+       cred->uid = getuid();
+       cred->gid = getgid();
+       ret = sendmsg(socket_fd, &msg, MSG_DONTWAIT);
+       if (ret < 0) {
+               WARN("cannot send credentials to primary: %s",
+                    strerror(errno));
+               goto out;
+       }
+       ret = recvmsg(socket_fd, &msg, MSG_WAITALL);
+       if (ret <= 0) {
+               WARN("no message from primary: %s", strerror(errno));
+               goto out;
+       }
+       cmsg = CMSG_FIRSTHDR(&msg);
+       if (cmsg == NULL) {
+               WARN("No file descriptor received");
+               goto out;
+       }
+       fd = (int *)CMSG_DATA(cmsg);
+       if (*fd <= 0) {
+               WARN("no file descriptor received: %s", strerror(errno));
+               ret = *fd;
+               goto out;
+       }
+       ret = *fd;
+out:
+       close(socket_fd);
+       return ret;
+}
index 39a38c1..1b45b4a 100644 (file)
@@ -36,6 +36,8 @@
 #include <errno.h>
 #include <string.h>
 #include <stdint.h>
+#include <unistd.h>
+#include <sys/mman.h>
 
 /* Verbs header. */
 /* ISO C doesn't support unnamed structs/unions, disabling -pedantic. */
@@ -168,6 +170,7 @@ txq_setup(struct txq_ctrl *tmpl, struct txq_ctrl *txq_ctrl)
        struct mlx5dv_obj obj;
        int ret = 0;
 
+       qp.comp_mask = MLX5DV_QP_MASK_UAR_MMAP_OFFSET;
        obj.cq.in = ibcq;
        obj.cq.out = &cq_info;
        obj.qp.in = tmpl->qp;
@@ -194,6 +197,13 @@ txq_setup(struct txq_ctrl *tmpl, struct txq_ctrl *txq_ctrl)
        tmpl->txq.elts =
                (struct rte_mbuf *(*)[1 << tmpl->txq.elts_n])
                ((uintptr_t)txq_ctrl + sizeof(*txq_ctrl));
+       if (qp.comp_mask | MLX5DV_QP_MASK_UAR_MMAP_OFFSET) {
+               tmpl->uar_mmap_offset = qp.uar_mmap_offset;
+       } else {
+               ERROR("Failed to retrieve UAR info, invalid libmlx5.so version");
+               return EINVAL;
+       }
+
        return 0;
 }
 
@@ -557,3 +567,59 @@ mlx5_tx_queue_release(void *dpdk_txq)
        rte_free(txq_ctrl);
        priv_unlock(priv);
 }
+
+
+/**
+ * Map locally UAR used in Tx queues for BlueFlame doorbell.
+ *
+ * @param[in] priv
+ *   Pointer to private structure.
+ * @param fd
+ *   Verbs file descriptor to map UAR pages.
+ *
+ * @return
+ *   0 on success, errno value on failure.
+ */
+int
+priv_tx_uar_remap(struct priv *priv, int fd)
+{
+       unsigned int i, j;
+       uintptr_t pages[priv->txqs_n];
+       unsigned int pages_n = 0;
+       uintptr_t uar_va;
+       void *addr;
+       struct txq *txq;
+       struct txq_ctrl *txq_ctrl;
+       int already_mapped;
+       size_t page_size = sysconf(_SC_PAGESIZE);
+
+       /*
+        * As rdma-core, UARs are mapped in size of OS page size.
+        * Use aligned address to avoid duplicate mmap.
+        * Ref to libmlx5 function: mlx5_init_context()
+        */
+       for (i = 0; i != priv->txqs_n; ++i) {
+               txq = (*priv->txqs)[i];
+               txq_ctrl = container_of(txq, struct txq_ctrl, txq);
+               uar_va = (uintptr_t)txq_ctrl->txq.bf_reg;
+               uar_va = RTE_ALIGN_FLOOR(uar_va, page_size);
+               already_mapped = 0;
+               for (j = 0; j != pages_n; ++j) {
+                       if (pages[j] == uar_va) {
+                               already_mapped = 1;
+                               break;
+                       }
+               }
+               if (already_mapped)
+                       continue;
+               pages[pages_n++] = uar_va;
+               addr = mmap((void *)uar_va, page_size,
+                           PROT_WRITE, MAP_FIXED | MAP_SHARED, fd,
+                           txq_ctrl->uar_mmap_offset);
+               if (addr != (void *)uar_va) {
+                       ERROR("call to mmap failed on UAR for txq %d\n", i);
+                       return -1;
+               }
+       }
+       return 0;
+}