+ return 0;
+}
+
+static int32_t
+rte_telemetry_read_client_sockets(struct telemetry_impl *telemetry)
+{
+ int ret;
+ telemetry_client *client;
+ char client_buf[BUF_SIZE];
+ int bytes;
+
+ TAILQ_FOREACH(client, &telemetry->client_list_head, client_list) {
+ bytes = read(client->fd, client_buf, BUF_SIZE-1);
+
+ if (bytes > 0) {
+ client_buf[bytes] = '\0';
+ telemetry->request_client = client;
+ ret = rte_telemetry_parse(telemetry, client_buf);
+ if (ret < 0) {
+ TELEMETRY_LOG_WARN("Parse socket input failed: %i",
+ ret);
+ return -1;
+ }
+ }
+ }
+
+ return 0;
+}
+
+static int32_t
+rte_telemetry_run(void *userdata)
+{
+ int ret;
+ struct telemetry_impl *telemetry = userdata;
+
+ if (telemetry == NULL) {
+ TELEMETRY_LOG_WARN("TELEMETRY could not be initialised");
+ return -1;
+ }
+
+ ret = rte_telemetry_accept_new_client(telemetry);
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Accept and read new client failed");
+ return -1;
+ }
+
+ ret = rte_telemetry_read_client_sockets(telemetry);
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Client socket read failed");
+ return -1;
+ }
+
+ return 0;
+}
+
+static void
+*rte_telemetry_run_thread_func(void *userdata)
+{
+ int ret;
+ struct telemetry_impl *telemetry = userdata;
+
+ if (telemetry == NULL) {
+ TELEMETRY_LOG_ERR("%s passed a NULL instance", __func__);
+ pthread_exit(0);
+ }
+
+ while (telemetry->thread_status) {
+ rte_telemetry_run(telemetry);
+ ret = usleep(SLEEP_TIME);
+ if (ret < 0)
+ TELEMETRY_LOG_ERR("Calling thread could not be put to sleep");
+ }
+ pthread_exit(0);
+}
+
+static int32_t
+rte_telemetry_set_socket_nonblock(int fd)
+{
+ int flags;
+
+ if (fd < 0) {
+ TELEMETRY_LOG_ERR("Invalid fd provided");
+ return -1;
+ }
+
+ flags = fcntl(fd, F_GETFL, 0);
+ if (flags < 0)
+ flags = 0;
+
+ return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+}
+
+static int32_t
+rte_telemetry_create_socket(struct telemetry_impl *telemetry)
+{
+ int ret;
+ struct sockaddr_un addr;
+ char socket_path[BUF_SIZE];
+
+ if (telemetry == NULL)
+ return -1;
+
+ telemetry->server_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ if (telemetry->server_fd == -1) {
+ TELEMETRY_LOG_ERR("Failed to open socket");
+ return -1;
+ }
+
+ ret = rte_telemetry_set_socket_nonblock(telemetry->server_fd);
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Could not set socket to NONBLOCK");
+ goto close_socket;
+ }
+
+ addr.sun_family = AF_UNIX;
+ rte_telemetry_get_runtime_dir(socket_path, sizeof(socket_path));
+ strlcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
+ unlink(socket_path);
+
+ if (bind(telemetry->server_fd, (struct sockaddr *)&addr,
+ sizeof(addr)) < 0) {
+ TELEMETRY_LOG_ERR("Socket binding error");
+ goto close_socket;
+ }
+
+ return 0;
+
+close_socket:
+ if (close(telemetry->server_fd) < 0) {
+ TELEMETRY_LOG_ERR("Close TELEMETRY socket failed");
+ return -EPERM;
+ }
+
+ return -1;
+}
+
+int32_t
+rte_telemetry_init(void)
+{
+ int ret;
+ pthread_attr_t attr;
+ const char *telemetry_ctrl_thread = "telemetry";
+
+ if (static_telemetry) {
+ TELEMETRY_LOG_WARN("TELEMETRY structure already initialised");
+ return -EALREADY;
+ }
+
+ static_telemetry = calloc(1, sizeof(struct telemetry_impl));
+ if (static_telemetry == NULL) {
+ TELEMETRY_LOG_ERR("Memory could not be allocated");
+ return -ENOMEM;
+ }
+
+ static_telemetry->socket_id = rte_socket_id();
+ rte_metrics_init(static_telemetry->socket_id);
+
+ ret = pthread_attr_init(&attr);
+ if (ret != 0) {
+ TELEMETRY_LOG_ERR("Pthread attribute init failed");
+ return -EPERM;
+ }
+
+ ret = rte_telemetry_create_socket(static_telemetry);
+ if (ret < 0) {
+ ret = rte_telemetry_cleanup();
+ if (ret < 0)
+ TELEMETRY_LOG_ERR("TELEMETRY cleanup failed");
+ return -EPERM;
+ }
+ TAILQ_INIT(&static_telemetry->client_list_head);
+
+ ret = rte_ctrl_thread_create(&static_telemetry->thread_id,
+ telemetry_ctrl_thread, &attr, rte_telemetry_run_thread_func,
+ (void *)static_telemetry);
+ static_telemetry->thread_status = 1;
+
+ if (ret < 0) {
+ ret = rte_telemetry_cleanup();
+ if (ret < 0)
+ TELEMETRY_LOG_ERR("TELEMETRY cleanup failed");
+ return -EPERM;
+ }
+
+ return 0;
+}
+
+static int32_t
+rte_telemetry_client_cleanup(struct telemetry_client *client)
+{
+ int ret;
+
+ ret = close(client->fd);
+ free(client->file_path);
+ free(client);
+
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Close client socket failed");
+ return -EPERM;
+ }
+
+ return 0;
+}
+
+int32_t
+rte_telemetry_cleanup(void)
+{
+ int ret;
+ struct telemetry_impl *telemetry = static_telemetry;
+ telemetry_client *client, *temp_client;
+
+ TAILQ_FOREACH_SAFE(client, &telemetry->client_list_head, client_list,
+ temp_client) {
+ TAILQ_REMOVE(&telemetry->client_list_head, client, client_list);
+ ret = rte_telemetry_client_cleanup(client);
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Client cleanup failed");
+ return -EPERM;
+ }
+ }
+
+ ret = close(telemetry->server_fd);
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Close TELEMETRY socket failed");
+ free(telemetry);
+ return -EPERM;
+ }
+
+ telemetry->thread_status = 0;
+ pthread_join(telemetry->thread_id, NULL);
+ free(telemetry);
+ static_telemetry = NULL;
+
+ return 0;
+}
+
+int32_t
+rte_telemetry_unregister_client(struct telemetry_impl *telemetry,
+ const char *client_path)
+{
+ int ret;
+ telemetry_client *client, *temp_client;
+
+ if (telemetry == NULL) {
+ TELEMETRY_LOG_WARN("TELEMETRY is not initialised");
+ return -ENODEV;
+ }
+
+ if (client_path == NULL) {
+ TELEMETRY_LOG_ERR("Invalid client path");
+ goto einval_fail;
+ }
+
+ if (TAILQ_EMPTY(&telemetry->client_list_head)) {
+ TELEMETRY_LOG_ERR("There are no clients currently registered");
+ return -EPERM;
+ }
+
+ TAILQ_FOREACH_SAFE(client, &telemetry->client_list_head, client_list,
+ temp_client) {
+ if (strcmp(client_path, client->file_path) == 0) {
+ TAILQ_REMOVE(&telemetry->client_list_head, client,
+ client_list);
+ ret = rte_telemetry_client_cleanup(client);
+
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Client cleanup failed");
+ return -EPERM;
+ }
+
+ return 0;
+ }
+ }
+
+ TELEMETRY_LOG_WARN("Couldn't find client, possibly not registered yet.");
+ return -1;
+
+einval_fail:
+ ret = rte_telemetry_send_error_response(telemetry, -EINVAL);
+ if (ret < 0)
+ TELEMETRY_LOG_ERR("Could not send error");
+ return -EINVAL;
+}
+
+int32_t
+rte_telemetry_register_client(struct telemetry_impl *telemetry,
+ const char *client_path)
+{
+ int ret, fd;
+ struct sockaddr_un addrs;
+
+ if (telemetry == NULL) {
+ TELEMETRY_LOG_ERR("Could not initialize TELEMETRY API");
+ return -ENODEV;
+ }
+
+ if (client_path == NULL) {
+ TELEMETRY_LOG_ERR("Invalid client path");
+ return -EINVAL;
+ }
+
+ telemetry_client *client;
+ TAILQ_FOREACH(client, &telemetry->client_list_head, client_list) {
+ if (strcmp(client_path, client->file_path) == 0) {
+ TELEMETRY_LOG_WARN("'%s' already registered",
+ client_path);
+ return -EINVAL;
+ }
+ }
+
+ fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ if (fd == -1) {
+ TELEMETRY_LOG_ERR("Client socket error");
+ return -EACCES;
+ }
+
+ ret = rte_telemetry_set_socket_nonblock(fd);
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Could not set socket to NONBLOCK");
+ return -EPERM;
+ }
+
+ addrs.sun_family = AF_UNIX;
+ strlcpy(addrs.sun_path, client_path, sizeof(addrs.sun_path));
+ telemetry_client *new_client = malloc(sizeof(telemetry_client));
+ new_client->file_path = strdup(client_path);
+ new_client->fd = fd;
+
+ if (connect(fd, (struct sockaddr *)&addrs, sizeof(addrs)) == -1) {
+ TELEMETRY_LOG_ERR("TELEMETRY client connect to %s didn't work",
+ client_path);
+ ret = rte_telemetry_client_cleanup(new_client);
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Client cleanup failed");
+ return -EPERM;
+ }
+ return -EINVAL;
+ }
+
+ TAILQ_INSERT_HEAD(&telemetry->client_list_head, new_client, client_list);
+
+ return 0;
+}
+
+int32_t
+rte_telemetry_parse_client_message(struct telemetry_impl *telemetry, char *buf)
+{
+ int ret, action_int;
+ json_error_t error;
+ json_t *root = json_loads(buf, 0, &error);
+
+ if (root == NULL) {
+ TELEMETRY_LOG_WARN("Could not load JSON object from data passed in : %s",
+ error.text);
+ goto fail;
+ } else if (!json_is_object(root)) {
+ TELEMETRY_LOG_WARN("JSON Request is not a JSON object");
+ goto fail;
+ }
+
+ json_t *action = json_object_get(root, "action");
+ if (action == NULL) {
+ TELEMETRY_LOG_WARN("Request does not have action field");
+ goto fail;
+ } else if (!json_is_integer(action)) {
+ TELEMETRY_LOG_WARN("Action value is not an integer");
+ goto fail;
+ }
+
+ json_t *command = json_object_get(root, "command");
+ if (command == NULL) {
+ TELEMETRY_LOG_WARN("Request does not have command field");
+ goto fail;
+ } else if (!json_is_string(command)) {
+ TELEMETRY_LOG_WARN("Command value is not a string");
+ goto fail;
+ }
+
+ action_int = json_integer_value(action);
+ if (action_int != ACTION_POST) {
+ TELEMETRY_LOG_WARN("Invalid action code");
+ goto fail;
+ }
+
+ if (strcmp(json_string_value(command), "clients") != 0) {
+ TELEMETRY_LOG_WARN("Invalid command");
+ goto fail;
+ }
+
+ json_t *data = json_object_get(root, "data");
+ if (data == NULL) {
+ TELEMETRY_LOG_WARN("Request does not have data field");
+ goto fail;
+ }
+
+ json_t *client_path = json_object_get(data, "client_path");
+ if (client_path == NULL) {
+ TELEMETRY_LOG_WARN("Request does not have client_path field");
+ goto fail;
+ }
+
+ if (!json_is_string(client_path)) {
+ TELEMETRY_LOG_WARN("Client_path value is not a string");
+ goto fail;
+ }
+
+ ret = rte_telemetry_register_client(telemetry,
+ json_string_value(client_path));
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Could not register client");
+ telemetry->register_fail_count++;
+ goto fail;
+ }
+
+ return 0;
+
+fail:
+ TELEMETRY_LOG_WARN("Client attempted to register with invalid message");
+ json_decref(root);
+ return -1;
+}
+
+static int32_t
+rte_telemetry_dummy_client_socket(const char *valid_client_path)
+{
+ int sockfd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ struct sockaddr_un addr = {0};
+
+ if (sockfd < 0) {
+ TELEMETRY_LOG_ERR("Test socket creation failure");
+ return -1;
+ }
+
+ addr.sun_family = AF_UNIX;
+ strlcpy(addr.sun_path, valid_client_path, sizeof(addr.sun_path));
+ unlink(valid_client_path);
+
+ if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+ TELEMETRY_LOG_ERR("Test socket binding failure");
+ return -1;
+ }
+
+ if (listen(sockfd, 1) < 0) {
+ TELEMETRY_LOG_ERR("Listen failure");
+ return -1;
+ }
+
+ return sockfd;
+}
+
+int32_t
+rte_telemetry_selftest(void)
+{
+ const char *invalid_client_path = SELFTEST_INVALID_CLIENT;
+ const char *valid_client_path = SELFTEST_VALID_CLIENT;
+ int ret, sockfd;
+
+ TELEMETRY_LOG_INFO("Selftest");
+
+ ret = rte_telemetry_init();
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Valid initialisation test failed");
+ return -1;
+ }
+
+ TELEMETRY_LOG_INFO("Success - Valid initialisation test passed");
+
+ ret = rte_telemetry_init();
+ if (ret != -EALREADY) {
+ TELEMETRY_LOG_ERR("Invalid initialisation test failed");
+ return -1;
+ }
+
+ TELEMETRY_LOG_INFO("Success - Invalid initialisation test passed");
+
+ ret = rte_telemetry_unregister_client(static_telemetry,
+ invalid_client_path);
+ if (ret != -EPERM) {
+ TELEMETRY_LOG_ERR("Invalid unregister test failed");
+ return -1;
+ }
+
+ TELEMETRY_LOG_INFO("Success - Invalid unregister test passed");
+
+ sockfd = rte_telemetry_dummy_client_socket(valid_client_path);
+ if (sockfd < 0) {
+ TELEMETRY_LOG_ERR("Test socket creation failed");
+ return -1;
+ }
+
+ ret = rte_telemetry_register_client(static_telemetry, valid_client_path);
+ if (ret != 0) {
+ TELEMETRY_LOG_ERR("Valid register test failed: %i", ret);
+ return -1;
+ }
+
+ accept(sockfd, NULL, NULL);
+ TELEMETRY_LOG_INFO("Success - Valid register test passed");
+
+ ret = rte_telemetry_register_client(static_telemetry, valid_client_path);
+ if (ret != -EINVAL) {
+ TELEMETRY_LOG_ERR("Invalid register test failed: %i", ret);
+ return -1;
+ }
+
+ TELEMETRY_LOG_INFO("Success - Invalid register test passed");
+
+ ret = rte_telemetry_unregister_client(static_telemetry,
+ invalid_client_path);
+ if (ret != -1) {
+ TELEMETRY_LOG_ERR("Invalid unregister test failed: %i", ret);
+ return -1;
+ }
+
+ TELEMETRY_LOG_INFO("Success - Invalid unregister test passed");
+
+ ret = rte_telemetry_unregister_client(static_telemetry, valid_client_path);
+ if (ret != 0) {
+ TELEMETRY_LOG_ERR("Valid unregister test failed: %i", ret);
+ return -1;
+ }
+
+ TELEMETRY_LOG_INFO("Success - Valid unregister test passed");
+
+ ret = rte_telemetry_cleanup();
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Cleanup test failed");
+ return -1;
+ }
+
+ TELEMETRY_LOG_INFO("Success - Valid cleanup test passed");
+
+ return 0;
+}
+
+int32_t
+rte_telemetry_socket_messaging_testing(int index, int socket)
+{
+ struct telemetry_impl *telemetry = calloc(1, sizeof(telemetry_impl));
+ int fd, bad_send_fd, send_fd, bad_fd, bad_recv_fd, recv_fd, ret;
+
+ if (telemetry == NULL) {
+ TELEMETRY_LOG_ERR("Could not initialize Telemetry API");
+ return -1;
+ }
+
+ telemetry->server_fd = socket;
+ telemetry->reg_index[0] = index;
+ TELEMETRY_LOG_INFO("Beginning Telemetry socket message Selftest");
+ rte_telemetry_socket_test_setup(telemetry, &send_fd, &recv_fd);
+ TELEMETRY_LOG_INFO("Register valid client test");
+
+ ret = rte_telemetry_socket_register_test(telemetry, &fd, send_fd,
+ recv_fd);
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Register valid client test failed!");
+ free(telemetry);
+ return -1;
+ }
+
+ TELEMETRY_LOG_INFO("Success - Register valid client test passed!");
+
+ TELEMETRY_LOG_INFO("Register invalid/same client test");
+ ret = rte_telemetry_socket_test_setup(telemetry, &bad_send_fd,
+ &bad_recv_fd);
+ ret = rte_telemetry_socket_register_test(telemetry, &bad_fd,
+ bad_send_fd, bad_recv_fd);
+ if (!ret) {
+ TELEMETRY_LOG_ERR("Register invalid/same client test failed!");
+ free(telemetry);
+ return -1;
+ }
+
+ TELEMETRY_LOG_INFO("Success - Register invalid/same client test passed!");
+
+ ret = rte_telemetry_json_socket_message_test(telemetry, fd);
+ if (ret < 0) {
+ free(telemetry);
+ return -1;
+ }
+
+ free(telemetry);
+ return 0;
+}
+
+int32_t
+rte_telemetry_socket_register_test(struct telemetry_impl *telemetry, int *fd,
+ int send_fd, int recv_fd)
+{
+ int ret;
+ char good_req_string[BUF_SIZE];
+
+ snprintf(good_req_string, sizeof(good_req_string),
+ "{\"action\":1,\"command\":\"clients\",\"data\":{\"client_path\""
+ ":\"%s\"}}", SOCKET_TEST_CLIENT_PATH);
+
+ listen(recv_fd, 1);
+
+ ret = send(send_fd, good_req_string, strlen(good_req_string), 0);
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Could not send message over socket");
+ return -1;
+ }
+
+ rte_telemetry_run(telemetry);
+
+ if (telemetry->register_fail_count != 0)
+ return -1;
+
+ *fd = accept(recv_fd, NULL, NULL);
+
+ return 0;
+}
+
+int32_t
+rte_telemetry_socket_test_setup(struct telemetry_impl *telemetry, int *send_fd,
+ int *recv_fd)
+{
+ int ret;
+ const char *client_path = SOCKET_TEST_CLIENT_PATH;
+ char socket_path[BUF_SIZE];
+ struct sockaddr_un addr = {0};
+ struct sockaddr_un addrs = {0};
+ *send_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+ *recv_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+
+ listen(telemetry->server_fd, 5);
+ addr.sun_family = AF_UNIX;
+ rte_telemetry_get_runtime_dir(socket_path, sizeof(socket_path));
+ strlcpy(addr.sun_path, socket_path, sizeof(addr.sun_path));
+
+ ret = connect(*send_fd, (struct sockaddr *) &addr, sizeof(addr));
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Could not connect socket");
+ return -1;
+ }
+
+ telemetry->accept_fd = accept(telemetry->server_fd, NULL, NULL);
+
+ addrs.sun_family = AF_UNIX;
+ strlcpy(addrs.sun_path, client_path, sizeof(addrs.sun_path));
+ unlink(client_path);
+
+ ret = bind(*recv_fd, (struct sockaddr *)&addrs, sizeof(addrs));
+ if (ret < 0) {
+ TELEMETRY_LOG_ERR("Could not bind socket");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int32_t
+rte_telemetry_stat_parse(char *buf, struct json_data *json_data_struct)
+{
+ json_error_t error;
+ json_t *root = json_loads(buf, 0, &error);
+ int arraylen, i;
+ json_t *status, *dataArray, *port, *stats, *name, *value, *dataArrayObj,
+ *statsArrayObj;
+
+ stats = NULL;
+ port = NULL;
+ name = NULL;
+
+ if (buf == NULL) {
+ TELEMETRY_LOG_ERR("JSON message is NULL");
+ return -EINVAL;
+ }
+
+ if (root == NULL) {
+ TELEMETRY_LOG_ERR("Could not load JSON object from data passed in : %s",
+ error.text);
+ return -EPERM;
+ } else if (!json_is_object(root)) {
+ TELEMETRY_LOG_ERR("JSON Request is not a JSON object");
+ json_decref(root);
+ return -EINVAL;
+ }
+
+ status = json_object_get(root, "status_code");
+ if (!status) {
+ TELEMETRY_LOG_ERR("Request does not have status field");
+ return -EINVAL;
+ } else if (!json_is_string(status)) {
+ TELEMETRY_LOG_ERR("Status value is not a string");
+ return -EINVAL;
+ }
+
+ json_data_struct->status_code = strdup(json_string_value(status));
+
+ dataArray = json_object_get(root, "data");
+ if (dataArray == NULL) {
+ TELEMETRY_LOG_ERR("Request does not have data field");
+ return -EINVAL;
+ }
+
+ arraylen = json_array_size(dataArray);
+ if (arraylen == 0) {
+ json_data_struct->data = "null";
+ return -EINVAL;
+ }
+
+ for (i = 0; i < arraylen; i++) {
+ dataArrayObj = json_array_get(dataArray, i);
+ port = json_object_get(dataArrayObj, "port");
+ stats = json_object_get(dataArrayObj, "stats");
+ }
+
+ if (port == NULL) {
+ TELEMETRY_LOG_ERR("Request does not have port field");
+ return -EINVAL;
+ }
+
+ if (!json_is_integer(port)) {
+ TELEMETRY_LOG_ERR("Port value is not an integer");
+ return -EINVAL;
+ }
+
+ json_data_struct->port = json_integer_value(port);
+
+ if (stats == NULL) {
+ TELEMETRY_LOG_ERR("Request does not have stats field");
+ return -EINVAL;
+ }
+
+ arraylen = json_array_size(stats);
+ for (i = 0; i < arraylen; i++) {
+ statsArrayObj = json_array_get(stats, i);
+ name = json_object_get(statsArrayObj, "name");
+ value = json_object_get(statsArrayObj, "value");
+ }
+
+ if (name == NULL) {
+ TELEMETRY_LOG_ERR("Request does not have name field");
+ return -EINVAL;
+ }
+
+ if (!json_is_string(name)) {
+ TELEMETRY_LOG_ERR("Stat name value is not a string");
+ return -EINVAL;