+static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+ if (a->tv_sec < b->tv_sec)
+ return -1;
+ if (a->tv_sec > b->tv_sec)
+ return 1;
+ if (a->tv_nsec < b->tv_nsec)
+ return -1;
+ if (a->tv_nsec > b->tv_nsec)
+ return 1;
+ return 0;
+}
+
+enum async_action {
+ ACTION_FREE, /**< free the action entry, but don't trigger callback */
+ ACTION_TRIGGER /**< trigger callback, then free action entry */
+};
+
+static enum async_action
+process_async_request(struct pending_request *sr, const struct timespec *now)
+{
+ struct async_request_param *param;
+ struct rte_mp_reply *reply;
+ bool timeout, last_msg;
+
+ param = sr->async.param;
+ reply = ¶m->user_reply;
+
+ /* did we timeout? */
+ timeout = timespec_cmp(¶m->end, now) <= 0;
+
+ /* if we received a response, adjust relevant data and copy mesasge. */
+ if (sr->reply_received == 1 && sr->reply) {
+ struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+ msg = sr->reply;
+ user_msgs = reply->msgs;
+
+ tmp = realloc(user_msgs, sizeof(*msg) *
+ (reply->nb_received + 1));
+ if (!tmp) {
+ RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+ sr->dst, sr->request->name);
+ /* this entry is going to be removed and its message
+ * dropped, but we don't want to leak memory, so
+ * continue.
+ */
+ } else {
+ user_msgs = tmp;
+ reply->msgs = user_msgs;
+ memcpy(&user_msgs[reply->nb_received],
+ msg, sizeof(*msg));
+ reply->nb_received++;
+ }
+
+ /* mark this request as processed */
+ param->n_responses_processed++;
+ } else if (sr->reply_received == -1) {
+ /* we were asked to ignore this process */
+ reply->nb_sent--;
+ } else if (timeout) {
+ /* count it as processed response, but don't increment
+ * nb_received.
+ */
+ param->n_responses_processed++;
+ }
+
+ free(sr->reply);
+
+ last_msg = param->n_responses_processed == reply->nb_sent;
+
+ return last_msg ? ACTION_TRIGGER : ACTION_FREE;
+}
+
+static void
+trigger_async_action(struct pending_request *sr)
+{
+ struct async_request_param *param;
+ struct rte_mp_reply *reply;
+
+ param = sr->async.param;
+ reply = ¶m->user_reply;
+
+ param->clb(sr->request, reply);
+
+ /* clean up */
+ free(sr->async.param->user_reply.msgs);
+ free(sr->async.param);
+ free(sr->request);
+ free(sr);
+}
+
+static struct pending_request *
+async_reply_handle_thread_unsafe(void *arg)
+{
+ struct pending_request *req = (struct pending_request *)arg;
+ enum async_action action;
+ struct timespec ts_now;
+ struct timeval now;
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ goto no_trigger;
+ }
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
+
+ action = process_async_request(req, &ts_now);
+
+ TAILQ_REMOVE(&pending_requests.requests, req, next);
+
+ if (rte_eal_alarm_cancel(async_reply_handle, req) < 0) {
+ /* if we failed to cancel the alarm because it's already in
+ * progress, don't proceed because otherwise we will end up
+ * handling the same message twice.
+ */
+ if (rte_errno == EINPROGRESS) {
+ RTE_LOG(DEBUG, EAL, "Request handling is already in progress\n");
+ goto no_trigger;
+ }
+ RTE_LOG(ERR, EAL, "Failed to cancel alarm\n");
+ }
+
+ if (action == ACTION_TRIGGER)
+ return req;
+no_trigger:
+ free(req);
+ return NULL;
+}
+
+static void
+async_reply_handle(void *arg)
+{
+ struct pending_request *req;
+
+ pthread_mutex_lock(&pending_requests.lock);
+ req = async_reply_handle_thread_unsafe(arg);
+ pthread_mutex_unlock(&pending_requests.lock);
+
+ if (req != NULL)
+ trigger_async_action(req);
+}
+