telemetry: add client feature and sockets
authorCiara Power <ciara.power@intel.com>
Sat, 27 Oct 2018 09:17:43 +0000 (10:17 +0100)
committerThomas Monjalon <thomas@monjalon.net>
Sat, 27 Oct 2018 13:18:23 +0000 (15:18 +0200)
This patch introduces clients to the telemetry API.

When a client makes a connection through the initial telemetry
socket, they can send a message through the socket to be
parsed. Register messages are expected through this socket, to
enable clients to register and have a client socket setup for
future communications.

A TAILQ is used to store all clients information. Using this, the
client sockets are polled for messages, which will later be parsed
and dealt with accordingly.

Functionality that make use of the client sockets were introduced
in this patch also, such as writing to client sockets, and sending
error responses.

Signed-off-by: Ciara Power <ciara.power@intel.com>
Signed-off-by: Brian Archbold <brian.archbold@intel.com>
Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
devtools/test-build.sh
lib/librte_telemetry/Makefile
lib/librte_telemetry/meson.build
lib/librte_telemetry/rte_telemetry.c
lib/librte_telemetry/rte_telemetry_internal.h
mk/rte.app.mk

index d26a9f3..42f4ad0 100755 (executable)
@@ -10,6 +10,7 @@ default_path=$PATH
 # - DPDK_DEP_ARCHIVE
 # - DPDK_DEP_CFLAGS
 # - DPDK_DEP_ISAL (y/[n])
+# - DPDK_DEP_JSON (y/[n])
 # - DPDK_DEP_LDFLAGS
 # - DPDK_DEP_MLX (y/[n])
 # - DPDK_DEP_NUMA ([y]/n)
@@ -96,6 +97,7 @@ reset_env ()
        unset DPDK_DEP_ARCHIVE
        unset DPDK_DEP_CFLAGS
        unset DPDK_DEP_ISAL
+       unset DPDK_DEP_JSON
        unset DPDK_DEP_LDFLAGS
        unset DPDK_DEP_MLX
        unset DPDK_DEP_NUMA
@@ -184,6 +186,8 @@ config () # <directory> <target> <options>
                sed -ri          's,(MVPP2_PMD=)n,\1y,' $1/.config
                test -z "$LIBMUSDK_PATH" || \
                sed -ri         's,(MVNETA_PMD=)n,\1y,' $1/.config
+               test -z "$DPDK_DEP_JSON" || \
+               sed -ri          's,(TELEMETRY=)n,\1y,' $1/.config
                build_config_hook $1 $2 $3
 
                # Explicit enabler/disabler (uppercase)
index a2d4ff1..0d61361 100644 (file)
@@ -13,6 +13,7 @@ CFLAGS += -DALLOW_EXPERIMENTAL_API
 LDLIBS += -lrte_eal -lrte_ethdev
 LDLIBS += -lrte_metrics
 LDLIBS += -lpthread
+LDLIBS += -ljansson
 
 EXPORT_MAP := rte_telemetry_version.map
 
index 7716076..7b93980 100644 (file)
@@ -5,3 +5,10 @@ sources = files('rte_telemetry.c')
 headers = files('rte_telemetry.h', 'rte_telemetry_internal.h')
 deps += ['metrics', 'ethdev']
 cflags += '-DALLOW_EXPERIMENTAL_API'
+
+jansson = cc.find_library('jansson', required: false)
+if jansson.found()
+       ext_deps += jansson
+else
+       build = false
+endif
index 2dabd3e..f29c2ae 100644 (file)
@@ -7,6 +7,7 @@
 #include <pthread.h>
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <jansson.h>
 
 #include <rte_eal.h>
 #include <rte_ethdev.h>
@@ -18,6 +19,7 @@
 #include "rte_telemetry_internal.h"
 
 #define BUF_SIZE 1024
+#define ACTION_POST 1
 #define SLEEP_TIME 10
 
 static telemetry_impl *static_telemetry;
@@ -39,6 +41,91 @@ rte_telemetry_is_port_active(int port_id)
 
        TELEMETRY_LOG_ERR("port_id: %d is invalid, not active",
                port_id);
+
+       return 0;
+}
+
+int32_t
+rte_telemetry_write_to_socket(struct telemetry_impl *telemetry,
+       const char *json_string)
+{
+       int ret;
+
+       if (telemetry == NULL) {
+               TELEMETRY_LOG_ERR("Could not initialise TELEMETRY_API");
+               return -1;
+       }
+
+       if (telemetry->request_client == NULL) {
+               TELEMETRY_LOG_ERR("No client has been chosen to write to");
+               return -1;
+       }
+
+       if (json_string == NULL) {
+               TELEMETRY_LOG_ERR("Invalid JSON string!");
+               return -1;
+       }
+
+       ret = send(telemetry->request_client->fd,
+                       json_string, strlen(json_string), 0);
+       if (ret < 0) {
+               TELEMETRY_LOG_ERR("Failed to write to socket for client: %s",
+                               telemetry->request_client->file_path);
+               return -1;
+       }
+
+       return 0;
+}
+
+int32_t
+rte_telemetry_send_error_response(struct telemetry_impl *telemetry,
+       int error_type)
+{
+       int ret;
+       const char *status_code, *json_buffer;
+       json_t *root;
+
+       if (error_type == -EPERM)
+               status_code = "Status Error: Unknown";
+       else if (error_type == -EINVAL)
+               status_code = "Status Error: Invalid Argument 404";
+       else if (error_type == -ENOMEM)
+               status_code = "Status Error: Memory Allocation Error";
+       else {
+               TELEMETRY_LOG_ERR("Invalid error type");
+               return -EINVAL;
+       }
+
+       root = json_object();
+
+       if (root == NULL) {
+               TELEMETRY_LOG_ERR("Could not create root JSON object");
+               return -EPERM;
+       }
+
+       ret = json_object_set_new(root, "status_code", json_string(status_code));
+       if (ret < 0) {
+               TELEMETRY_LOG_ERR("Status code field cannot be set");
+               json_decref(root);
+               return -EPERM;
+       }
+
+       ret = json_object_set_new(root, "data", json_null());
+       if (ret < 0) {
+               TELEMETRY_LOG_ERR("Data field cannot be set");
+               json_decref(root);
+               return -EPERM;
+       }
+
+       json_buffer = json_dumps(root, JSON_INDENT(2));
+       json_decref(root);
+
+       ret = rte_telemetry_write_to_socket(telemetry, json_buffer);
+       if (ret < 0) {
+               TELEMETRY_LOG_ERR("Could not write to socket");
+               return -EPERM;
+       }
+
        return 0;
 }
 
@@ -115,8 +202,7 @@ rte_telemetry_initial_accept(struct telemetry_impl *telemetry)
        uint16_t pid;
 
        RTE_ETH_FOREACH_DEV(pid) {
-               telemetry->reg_index =
-                       rte_telemetry_reg_ethdev_to_metrics(pid);
+               telemetry->reg_index = rte_telemetry_reg_ethdev_to_metrics(pid);
                break;
        }
 
@@ -130,6 +216,38 @@ rte_telemetry_initial_accept(struct telemetry_impl *telemetry)
        return 0;
 }
 
+static int32_t
+rte_telemetry_read_client(struct telemetry_impl *telemetry)
+{
+       char buf[BUF_SIZE];
+       int ret, buffer_read;
+
+       buffer_read = read(telemetry->accept_fd, buf, BUF_SIZE-1);
+
+       if (buffer_read == -1) {
+               TELEMETRY_LOG_ERR("Read error");
+               return -1;
+       } else if (buffer_read == 0) {
+               goto close_socket;
+       } else {
+               buf[buffer_read] = '\0';
+               ret = rte_telemetry_parse_client_message(telemetry, buf);
+               if (ret < 0)
+                       TELEMETRY_LOG_WARN("Parse message failed");
+               goto close_socket;
+       }
+
+close_socket:
+       if (close(telemetry->accept_fd) < 0) {
+               TELEMETRY_LOG_ERR("Close TELEMETRY socket failed");
+               free(telemetry);
+               return -EPERM;
+       }
+       telemetry->accept_fd = 0;
+
+       return 0;
+}
+
 static int32_t
 rte_telemetry_accept_new_client(struct telemetry_impl *telemetry)
 {
@@ -141,8 +259,8 @@ rte_telemetry_accept_new_client(struct telemetry_impl *telemetry)
                        TELEMETRY_LOG_ERR("Listening error with server fd");
                        return -1;
                }
-               telemetry->accept_fd = accept(telemetry->server_fd, NULL, NULL);
 
+               telemetry->accept_fd = accept(telemetry->server_fd, NULL, NULL);
                if (telemetry->accept_fd >= 0 &&
                        telemetry->metrics_register_done == 0) {
                        ret = rte_telemetry_initial_accept(telemetry);
@@ -151,6 +269,31 @@ rte_telemetry_accept_new_client(struct telemetry_impl *telemetry)
                                return -1;
                        }
                }
+       } else {
+               ret = rte_telemetry_read_client(telemetry);
+               if (ret < 0) {
+                       TELEMETRY_LOG_ERR("Failed to read socket buffer");
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+static int32_t
+rte_telemetry_read_client_sockets(struct telemetry_impl *telemetry)
+{
+       telemetry_client *client;
+       char client_buf[BUF_SIZE];
+       int bytes;
+
+       TAILQ_FOREACH(client, &telemetry->client_list_head, client_list) {
+               bytes = read(client->fd, client_buf, BUF_SIZE-1);
+
+               if (bytes > 0) {
+                       client_buf[bytes] = '\0';
+                       telemetry->request_client = client;
+               }
        }
 
        return 0;
@@ -173,6 +316,12 @@ rte_telemetry_run(void *userdata)
                return -1;
        }
 
+       ret = rte_telemetry_read_client_sockets(telemetry);
+       if (ret < 0) {
+               TELEMETRY_LOG_ERR("Client socket read failed");
+               return -1;
+       }
+
        return 0;
 }
 
@@ -291,6 +440,7 @@ rte_telemetry_init()
                        TELEMETRY_LOG_ERR("TELEMETRY cleanup failed");
                return -EPERM;
        }
+       TAILQ_INIT(&static_telemetry->client_list_head);
 
        ret = rte_ctrl_thread_create(&static_telemetry->thread_id,
                telemetry_ctrl_thread, &attr, rte_telemetry_run_thread_func,
@@ -307,11 +457,39 @@ rte_telemetry_init()
        return 0;
 }
 
+static int32_t
+rte_telemetry_client_cleanup(struct telemetry_client *client)
+{
+       int ret;
+
+       ret = close(client->fd);
+       free(client->file_path);
+       free(client);
+
+       if (ret < 0) {
+               TELEMETRY_LOG_ERR("Close client socket failed");
+               return -EPERM;
+       }
+
+       return 0;
+}
+
 int32_t __rte_experimental
 rte_telemetry_cleanup(void)
 {
        int ret;
        struct telemetry_impl *telemetry = static_telemetry;
+       telemetry_client *client, *temp_client;
+
+       TAILQ_FOREACH_SAFE(client, &telemetry->client_list_head, client_list,
+               temp_client) {
+               TAILQ_REMOVE(&telemetry->client_list_head, client, client_list);
+               ret = rte_telemetry_client_cleanup(client);
+               if (ret < 0) {
+                       TELEMETRY_LOG_ERR("Client cleanup failed");
+                       return -EPERM;
+               }
+       }
 
        ret = close(telemetry->server_fd);
        if (ret < 0) {
@@ -328,6 +506,192 @@ rte_telemetry_cleanup(void)
        return 0;
 }
 
+int32_t
+rte_telemetry_unregister_client(struct telemetry_impl *telemetry,
+       const char *client_path)
+{
+       int ret;
+       telemetry_client *client, *temp_client;
+
+       if (telemetry == NULL) {
+               TELEMETRY_LOG_WARN("TELEMETRY is not initialised");
+               return -ENODEV;
+       }
+
+       if (client_path == NULL) {
+               TELEMETRY_LOG_ERR("Invalid client path");
+               goto einval_fail;
+       }
+
+       if (TAILQ_EMPTY(&telemetry->client_list_head)) {
+               TELEMETRY_LOG_ERR("There are no clients currently registered");
+               return -EPERM;
+       }
+
+       TAILQ_FOREACH_SAFE(client, &telemetry->client_list_head, client_list,
+                       temp_client) {
+               if (strcmp(client_path, client->file_path) == 0) {
+                       TAILQ_REMOVE(&telemetry->client_list_head, client,
+                               client_list);
+                       ret = rte_telemetry_client_cleanup(client);
+
+                       if (ret < 0) {
+                               TELEMETRY_LOG_ERR("Client cleanup failed");
+                               return -EPERM;
+                       }
+
+                       return 0;
+               }
+       }
+
+       TELEMETRY_LOG_WARN("Couldn't find client, possibly not registered yet.");
+       return -1;
+
+einval_fail:
+       ret = rte_telemetry_send_error_response(telemetry, -EINVAL);
+       if (ret < 0)
+               TELEMETRY_LOG_ERR("Could not send error");
+       return -EINVAL;
+}
+
+int32_t
+rte_telemetry_register_client(struct telemetry_impl *telemetry,
+       const char *client_path)
+{
+       int ret, fd;
+       struct sockaddr_un addrs;
+
+       if (telemetry == NULL) {
+               TELEMETRY_LOG_ERR("Could not initialize TELEMETRY API");
+               return -ENODEV;
+       }
+
+       if (client_path == NULL) {
+               TELEMETRY_LOG_ERR("Invalid client path");
+               return -EINVAL;
+       }
+
+       telemetry_client *client;
+       TAILQ_FOREACH(client, &telemetry->client_list_head, client_list) {
+               if (strcmp(client_path, client->file_path) == 0) {
+                       TELEMETRY_LOG_WARN("'%s' already registered",
+                                       client_path);
+                       return -EINVAL;
+               }
+       }
+
+       fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+       if (fd == -1) {
+               TELEMETRY_LOG_ERR("Client socket error");
+               return -EACCES;
+       }
+
+       ret = rte_telemetry_set_socket_nonblock(fd);
+       if (ret < 0) {
+               TELEMETRY_LOG_ERR("Could not set socket to NONBLOCK");
+               return -EPERM;
+       }
+
+       addrs.sun_family = AF_UNIX;
+       strlcpy(addrs.sun_path, client_path, sizeof(addrs.sun_path));
+       telemetry_client *new_client = malloc(sizeof(telemetry_client));
+       new_client->file_path = strdup(client_path);
+       new_client->fd = fd;
+
+       if (connect(fd, (struct sockaddr *)&addrs, sizeof(addrs)) == -1) {
+               TELEMETRY_LOG_ERR("TELEMETRY client connect to %s didn't work",
+                               client_path);
+               ret = rte_telemetry_client_cleanup(new_client);
+               if (ret < 0) {
+                       TELEMETRY_LOG_ERR("Client cleanup failed");
+                       return -EPERM;
+               }
+               return -EINVAL;
+       }
+
+       TAILQ_INSERT_HEAD(&telemetry->client_list_head, new_client, client_list);
+
+       return 0;
+}
+
+int32_t
+rte_telemetry_parse_client_message(struct telemetry_impl *telemetry, char *buf)
+{
+       int ret, action_int;
+       json_error_t error;
+       json_t *root = json_loads(buf, 0, &error);
+
+       if (root == NULL) {
+               TELEMETRY_LOG_WARN("Could not load JSON object from data passed in : %s",
+                               error.text);
+               goto fail;
+       } else if (!json_is_object(root)) {
+               TELEMETRY_LOG_WARN("JSON Request is not a JSON object");
+               goto fail;
+       }
+
+       json_t *action = json_object_get(root, "action");
+       if (action == NULL) {
+               TELEMETRY_LOG_WARN("Request does not have action field");
+               goto fail;
+       } else if (!json_is_integer(action)) {
+               TELEMETRY_LOG_WARN("Action value is not an integer");
+               goto fail;
+       }
+
+       json_t *command = json_object_get(root, "command");
+       if (command == NULL) {
+               TELEMETRY_LOG_WARN("Request does not have command field");
+               goto fail;
+       } else if (!json_is_string(command)) {
+               TELEMETRY_LOG_WARN("Command value is not a string");
+               goto fail;
+       }
+
+       action_int = json_integer_value(action);
+       if (action_int != ACTION_POST) {
+               TELEMETRY_LOG_WARN("Invalid action code");
+               goto fail;
+       }
+
+       if (strcmp(json_string_value(command), "clients") != 0) {
+               TELEMETRY_LOG_WARN("Invalid command");
+               goto fail;
+       }
+
+       json_t *data = json_object_get(root, "data");
+       if (data == NULL) {
+               TELEMETRY_LOG_WARN("Request does not have data field");
+               goto fail;
+       }
+
+       json_t *client_path = json_object_get(data, "client_path");
+       if (client_path == NULL) {
+               TELEMETRY_LOG_WARN("Request does not have client_path field");
+               goto fail;
+       }
+
+       if (!json_is_string(client_path)) {
+               TELEMETRY_LOG_WARN("Client_path value is not a string");
+               goto fail;
+       }
+
+       ret = rte_telemetry_register_client(telemetry,
+                       json_string_value(client_path));
+       if (ret < 0) {
+               TELEMETRY_LOG_ERR("Could not register client");
+               telemetry->register_fail_count++;
+               goto fail;
+       }
+
+       return 0;
+
+fail:
+       TELEMETRY_LOG_WARN("Client attempted to register with invalid message");
+       json_decref(root);
+       return -1;
+}
+
 int telemetry_log_level;
 
 static struct rte_option option = {
index 569d56a..e3292cf 100644 (file)
@@ -3,6 +3,7 @@
  */
 
 #include <rte_log.h>
+#include <rte_tailq.h>
 
 #ifndef _RTE_TELEMETRY_INTERNAL_H_
 #define _RTE_TELEMETRY_INTERNAL_H_
@@ -23,6 +24,12 @@ extern int telemetry_log_level;
 #define TELEMETRY_LOG_INFO(fmt, args...) \
        TELEMETRY_LOG(INFO, fmt, ## args)
 
+typedef struct telemetry_client {
+       char *file_path;
+       int fd;
+       TAILQ_ENTRY(telemetry_client) client_list;
+} telemetry_client;
+
 typedef struct telemetry_impl {
        int accept_fd;
        int server_fd;
@@ -31,6 +38,24 @@ typedef struct telemetry_impl {
        uint32_t socket_id;
        int reg_index;
        int metrics_register_done;
+       TAILQ_HEAD(, telemetry_client) client_list_head;
+       struct telemetry_client *request_client;
+       int register_fail_count;
 } telemetry_impl;
 
+int32_t
+rte_telemetry_parse_client_message(struct telemetry_impl *telemetry, char *buf);
+
+int32_t
+rte_telemetry_send_error_response(struct telemetry_impl *telemetry,
+       int error_type);
+
+int32_t
+rte_telemetry_register_client(struct telemetry_impl *telemetry,
+       const char *client_path);
+
+int32_t
+rte_telemetry_unregister_client(struct telemetry_impl *telemetry,
+       const char *client_path);
+
 #endif
index 39969f4..3ebc4e6 100644 (file)
@@ -51,7 +51,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_ACL)            += --whole-archive
 _LDLIBS-$(CONFIG_RTE_LIBRTE_ACL)            += -lrte_acl
 _LDLIBS-$(CONFIG_RTE_LIBRTE_ACL)            += --no-whole-archive
 _LDLIBS-$(CONFIG_RTE_LIBRTE_TELEMETRY)      += --whole-archive
-_LDLIBS-$(CONFIG_RTE_LIBRTE_TELEMETRY)      += -lrte_telemetry
+_LDLIBS-$(CONFIG_RTE_LIBRTE_TELEMETRY)      += -lrte_telemetry -ljansson
 _LDLIBS-$(CONFIG_RTE_LIBRTE_TELEMETRY)      += --no-whole-archive
 _LDLIBS-$(CONFIG_RTE_LIBRTE_JOBSTATS)       += -lrte_jobstats
 _LDLIBS-$(CONFIG_RTE_LIBRTE_METRICS)        += -lrte_metrics