X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_eventdev%2Frte_event_timer_adapter.c;h=459bc476d417cf66e369f44e984c5166b03c4472;hb=cc7b73ea9e3bdbe2b3ef7bf92a7596231eae4afb;hp=575da041ba221d7e975d488b0f81785944180ea4;hpb=d69d085883140fce1e356ccb69adfae4f0ab6869;p=dpdk.git diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c index 575da041ba..459bc476d4 100644 --- a/lib/librte_eventdev/rte_event_timer_adapter.c +++ b/lib/librte_eventdev/rte_event_timer_adapter.c @@ -34,7 +34,7 @@ static int evtim_buffer_logtype; static struct rte_event_timer_adapter adapters[RTE_EVENT_TIMER_ADAPTER_NUM_MAX]; -static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops; +static const struct rte_event_timer_adapter_ops swtim_ops; #define EVTIM_LOG(level, logtype, ...) \ rte_log(RTE_LOG_ ## level, logtype, \ @@ -117,14 +117,14 @@ default_port_conf_cb(uint16_t id, uint8_t event_dev_id, uint8_t *event_port_id, return ret; } -struct rte_event_timer_adapter * __rte_experimental +struct rte_event_timer_adapter * rte_event_timer_adapter_create(const struct rte_event_timer_adapter_conf *conf) { return rte_event_timer_adapter_create_ext(conf, default_port_conf_cb, NULL); } -struct rte_event_timer_adapter * __rte_experimental +struct rte_event_timer_adapter * rte_event_timer_adapter_create_ext( const struct rte_event_timer_adapter_conf *conf, rte_event_timer_adapter_port_conf_cb_t conf_cb, @@ -211,7 +211,7 @@ rte_event_timer_adapter_create_ext( * implementation. */ if (adapter->ops == NULL) - adapter->ops = &sw_event_adapter_timer_ops; + adapter->ops = &swtim_ops; /* Allow driver to do some setup */ FUNC_PTR_OR_NULL_RET_WITH_ERRNO(adapter->ops->init, -ENOTSUP); @@ -235,7 +235,7 @@ free_memzone: return NULL; } -int __rte_experimental +int rte_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter, struct rte_event_timer_adapter_info *adapter_info) { @@ -253,7 +253,7 @@ rte_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter, return 0; } -int __rte_experimental +int rte_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter) { int ret; @@ -276,7 +276,7 @@ rte_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter) return 0; } -int __rte_experimental +int rte_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter) { int ret; @@ -299,7 +299,7 @@ rte_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter) return 0; } -struct rte_event_timer_adapter * __rte_experimental +struct rte_event_timer_adapter * rte_event_timer_adapter_lookup(uint16_t adapter_id) { char name[DATA_MZ_NAME_MAX_LEN]; @@ -340,7 +340,7 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id) * implementation. */ if (adapter->ops == NULL) - adapter->ops = &sw_event_adapter_timer_ops; + adapter->ops = &swtim_ops; /* Set fast-path function pointers */ adapter->arm_burst = adapter->ops->arm_burst; @@ -352,7 +352,7 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id) return adapter; } -int __rte_experimental +int rte_event_timer_adapter_free(struct rte_event_timer_adapter *adapter) { int ret; @@ -382,7 +382,7 @@ rte_event_timer_adapter_free(struct rte_event_timer_adapter *adapter) return 0; } -int __rte_experimental +int rte_event_timer_adapter_service_id_get(struct rte_event_timer_adapter *adapter, uint32_t *service_id) { @@ -394,7 +394,7 @@ rte_event_timer_adapter_service_id_get(struct rte_event_timer_adapter *adapter, return adapter->data->service_inited ? 0 : -ESRCH; } -int __rte_experimental +int rte_event_timer_adapter_stats_get(struct rte_event_timer_adapter *adapter, struct rte_event_timer_adapter_stats *stats) { @@ -406,7 +406,7 @@ rte_event_timer_adapter_stats_get(struct rte_event_timer_adapter *adapter, return adapter->ops->stats_get(adapter, stats); } -int __rte_experimental +int rte_event_timer_adapter_stats_reset(struct rte_event_timer_adapter *adapter) { ADAPTER_VALID_OR_ERR_RET(adapter, -EINVAL); @@ -427,9 +427,11 @@ rte_event_timer_adapter_stats_reset(struct rte_event_timer_adapter *adapter) #define EVENT_BUFFER_BATCHSZ 32 #define EVENT_BUFFER_MASK (EVENT_BUFFER_SZ - 1) +#define EXP_TIM_BUF_SZ 128 + struct event_buffer { - uint16_t head; - uint16_t tail; + size_t head; + size_t tail; struct rte_event events[EVENT_BUFFER_SZ]; } __rte_cache_aligned; @@ -455,7 +457,7 @@ event_buffer_init(struct event_buffer *bufp) static int event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp) { - uint16_t head_idx; + size_t head_idx; struct rte_event *buf_eventp; if (event_buffer_full(bufp)) @@ -477,13 +479,16 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id, uint16_t *nb_events_flushed, uint16_t *nb_events_inv) { - uint16_t head_idx, tail_idx, n = 0; struct rte_event *events = bufp->events; + size_t head_idx, tail_idx; + uint16_t n = 0; /* Instead of modulus, bitwise AND with mask to get index. */ head_idx = bufp->head & EVENT_BUFFER_MASK; tail_idx = bufp->tail & EVENT_BUFFER_MASK; + RTE_ASSERT(head_idx < EVENT_BUFFER_SZ && tail_idx < EVENT_BUFFER_SZ); + /* Determine the largest contigous run we can attempt to enqueue to the * event device. */ @@ -491,150 +496,165 @@ event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id, n = head_idx - tail_idx; else if (head_idx < tail_idx) n = EVENT_BUFFER_SZ - tail_idx; + else if (event_buffer_full(bufp)) + n = EVENT_BUFFER_SZ - tail_idx; else { *nb_events_flushed = 0; return; } + n = RTE_MIN(EVENT_BUFFER_BATCHSZ, n); *nb_events_inv = 0; + *nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id, &events[tail_idx], n); - if (*nb_events_flushed != n && rte_errno == -EINVAL) { - EVTIM_LOG_ERR("failed to enqueue invalid event - dropping it"); - (*nb_events_inv)++; + if (*nb_events_flushed != n) { + if (rte_errno == -EINVAL) { + EVTIM_LOG_ERR("failed to enqueue invalid event - " + "dropping it"); + (*nb_events_inv)++; + } else if (rte_errno == -ENOSPC) + rte_pause(); } + if (*nb_events_flushed > 0) + EVTIM_BUF_LOG_DBG("enqueued %"PRIu16" timer events to event " + "device", *nb_events_flushed); + bufp->tail = bufp->tail + *nb_events_flushed + *nb_events_inv; } /* * Software event timer adapter implementation */ - -struct rte_event_timer_adapter_sw_data { - /* List of messages for outstanding timers */ - TAILQ_HEAD(, msg) msgs_tailq_head; - /* Lock to guard tailq and armed count */ - rte_spinlock_t msgs_tailq_sl; +struct swtim { /* Identifier of service executing timer management logic. */ uint32_t service_id; /* The cycle count at which the adapter should next tick */ uint64_t next_tick_cycles; - /* Incremented as the service moves through phases of an iteration */ - volatile int service_phase; /* The tick resolution used by adapter instance. May have been * adjusted from what user requested */ uint64_t timer_tick_ns; /* Maximum timeout in nanoseconds allowed by adapter instance. */ uint64_t max_tmo_ns; - /* Ring containing messages to arm or cancel event timers */ - struct rte_ring *msg_ring; - /* Mempool containing msg objects */ - struct rte_mempool *msg_pool; /* Buffered timer expiry events to be enqueued to an event device. */ struct event_buffer buffer; /* Statistics */ struct rte_event_timer_adapter_stats stats; - /* The number of threads currently adding to the message ring */ - rte_atomic16_t message_producer_count; + /* Mempool of timer objects */ + struct rte_mempool *tim_pool; + /* Back pointer for convenience */ + struct rte_event_timer_adapter *adapter; + /* Identifier of timer data instance */ + uint32_t timer_data_id; + /* Track which cores have actually armed a timer */ + struct { + rte_atomic16_t v; + } __rte_cache_aligned in_use[RTE_MAX_LCORE]; + /* Track which cores' timer lists should be polled */ + unsigned int poll_lcores[RTE_MAX_LCORE]; + /* The number of lists that should be polled */ + int n_poll_lcores; + /* Timers which have expired and can be returned to a mempool */ + struct rte_timer *expired_timers[EXP_TIM_BUF_SZ]; + /* The number of timers that can be returned to a mempool */ + size_t n_expired_timers; }; -enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL}; - -struct msg { - enum msg_type type; - struct rte_event_timer *evtim; - struct rte_timer tim; - TAILQ_ENTRY(msg) msgs; -}; +static inline struct swtim * +swtim_pmd_priv(const struct rte_event_timer_adapter *adapter) +{ + return adapter->data->adapter_priv; +} static void -sw_event_timer_cb(struct rte_timer *tim, void *arg) +swtim_callback(struct rte_timer *tim) { - int ret; + struct rte_event_timer *evtim = tim->arg; + struct rte_event_timer_adapter *adapter; + unsigned int lcore = rte_lcore_id(); + struct swtim *sw; uint16_t nb_evs_flushed = 0; uint16_t nb_evs_invalid = 0; uint64_t opaque; - struct rte_event_timer *evtim; - struct rte_event_timer_adapter *adapter; - struct rte_event_timer_adapter_sw_data *sw_data; + int ret; - evtim = arg; opaque = evtim->impl_opaque[1]; adapter = (struct rte_event_timer_adapter *)(uintptr_t)opaque; - sw_data = adapter->data->adapter_priv; + sw = swtim_pmd_priv(adapter); - ret = event_buffer_add(&sw_data->buffer, &evtim->ev); + ret = event_buffer_add(&sw->buffer, &evtim->ev); if (ret < 0) { /* If event buffer is full, put timer back in list with * immediate expiry value, so that we process it again on the * next iteration. */ - rte_timer_reset_sync(tim, 0, SINGLE, rte_lcore_id(), - sw_event_timer_cb, evtim); + ret = rte_timer_alt_reset(sw->timer_data_id, tim, 0, SINGLE, + lcore, NULL, evtim); + if (ret < 0) { + EVTIM_LOG_DBG("event buffer full, failed to reset " + "timer with immediate expiry value"); + } else { + sw->stats.evtim_retry_count++; + EVTIM_LOG_DBG("event buffer full, resetting rte_timer " + "with immediate expiry value"); + } - sw_data->stats.evtim_retry_count++; - EVTIM_LOG_DBG("event buffer full, resetting rte_timer with " - "immediate expiry value"); + if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore].v))) + sw->poll_lcores[sw->n_poll_lcores++] = lcore; } else { - struct msg *m = container_of(tim, struct msg, tim); - TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs); EVTIM_BUF_LOG_DBG("buffered an event timer expiry event"); - evtim->state = RTE_EVENT_TIMER_NOT_ARMED; - /* Free the msg object containing the rte_timer now that - * we've buffered its event successfully. + /* Empty the buffer here, if necessary, to free older expired + * timers only */ - rte_mempool_put(sw_data->msg_pool, m); + if (unlikely(sw->n_expired_timers == EXP_TIM_BUF_SZ)) { + rte_mempool_put_bulk(sw->tim_pool, + (void **)sw->expired_timers, + sw->n_expired_timers); + sw->n_expired_timers = 0; + } - /* Bump the count when we successfully add an expiry event to - * the buffer. - */ - sw_data->stats.evtim_exp_count++; + sw->expired_timers[sw->n_expired_timers++] = tim; + sw->stats.evtim_exp_count++; + + evtim->state = RTE_EVENT_TIMER_NOT_ARMED; } - if (event_buffer_batch_ready(&sw_data->buffer)) { - event_buffer_flush(&sw_data->buffer, + if (event_buffer_batch_ready(&sw->buffer)) { + event_buffer_flush(&sw->buffer, adapter->data->event_dev_id, adapter->data->event_port_id, &nb_evs_flushed, &nb_evs_invalid); - sw_data->stats.ev_enq_count += nb_evs_flushed; - sw_data->stats.ev_inv_count += nb_evs_invalid; + sw->stats.ev_enq_count += nb_evs_flushed; + sw->stats.ev_inv_count += nb_evs_invalid; } } static __rte_always_inline uint64_t get_timeout_cycles(struct rte_event_timer *evtim, - struct rte_event_timer_adapter *adapter) + const struct rte_event_timer_adapter *adapter) { - uint64_t timeout_ns; - struct rte_event_timer_adapter_sw_data *sw_data; - - sw_data = adapter->data->adapter_priv; - timeout_ns = evtim->timeout_ticks * sw_data->timer_tick_ns; + struct swtim *sw = swtim_pmd_priv(adapter); + uint64_t timeout_ns = evtim->timeout_ticks * sw->timer_tick_ns; return timeout_ns * rte_get_timer_hz() / NSECPERSEC; - } /* This function returns true if one or more (adapter) ticks have occurred since * the last time it was called. */ static inline bool -adapter_did_tick(struct rte_event_timer_adapter *adapter) +swtim_did_tick(struct swtim *sw) { uint64_t cycles_per_adapter_tick, start_cycles; uint64_t *next_tick_cyclesp; - struct rte_event_timer_adapter_sw_data *sw_data; - - sw_data = adapter->data->adapter_priv; - next_tick_cyclesp = &sw_data->next_tick_cycles; - cycles_per_adapter_tick = sw_data->timer_tick_ns * + next_tick_cyclesp = &sw->next_tick_cycles; + cycles_per_adapter_tick = sw->timer_tick_ns * (rte_get_timer_hz() / NSECPERSEC); - start_cycles = rte_get_timer_cycles(); /* Note: initially, *next_tick_cyclesp == 0, so the clause below will @@ -646,7 +666,6 @@ adapter_did_tick(struct rte_event_timer_adapter *adapter) * boundary. */ start_cycles -= start_cycles % cycles_per_adapter_tick; - *next_tick_cyclesp = start_cycles + cycles_per_adapter_tick; return true; @@ -661,15 +680,12 @@ check_timeout(struct rte_event_timer *evtim, const struct rte_event_timer_adapter *adapter) { uint64_t tmo_nsec; - struct rte_event_timer_adapter_sw_data *sw_data; + struct swtim *sw = swtim_pmd_priv(adapter); - sw_data = adapter->data->adapter_priv; - tmo_nsec = evtim->timeout_ticks * sw_data->timer_tick_ns; - - if (tmo_nsec > sw_data->max_tmo_ns) + tmo_nsec = evtim->timeout_ticks * sw->timer_tick_ns; + if (tmo_nsec > sw->max_tmo_ns) return -1; - - if (tmo_nsec < sw_data->timer_tick_ns) + if (tmo_nsec < sw->timer_tick_ns) return -2; return 0; @@ -697,110 +713,36 @@ check_destination_event_queue(struct rte_event_timer *evtim, return 0; } -#define NB_OBJS 32 static int -sw_event_timer_adapter_service_func(void *arg) +swtim_service_func(void *arg) { - int i, num_msgs; - uint64_t cycles, opaque; + struct rte_event_timer_adapter *adapter = arg; + struct swtim *sw = swtim_pmd_priv(adapter); uint16_t nb_evs_flushed = 0; uint16_t nb_evs_invalid = 0; - struct rte_event_timer_adapter *adapter; - struct rte_event_timer_adapter_sw_data *sw_data; - struct rte_event_timer *evtim = NULL; - struct rte_timer *tim = NULL; - struct msg *msg, *msgs[NB_OBJS]; - - adapter = arg; - sw_data = adapter->data->adapter_priv; - - sw_data->service_phase = 1; - rte_smp_wmb(); - - while (rte_atomic16_read(&sw_data->message_producer_count) > 0 || - !rte_ring_empty(sw_data->msg_ring)) { - - num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring, - (void **)msgs, NB_OBJS, NULL); - - for (i = 0; i < num_msgs; i++) { - int ret = 0; - - RTE_SET_USED(ret); - - msg = msgs[i]; - evtim = msg->evtim; - - switch (msg->type) { - case MSG_TYPE_ARM: - EVTIM_SVC_LOG_DBG("dequeued ARM message from " - "ring"); - tim = &msg->tim; - rte_timer_init(tim); - cycles = get_timeout_cycles(evtim, - adapter); - ret = rte_timer_reset(tim, cycles, SINGLE, - rte_lcore_id(), - sw_event_timer_cb, - evtim); - RTE_ASSERT(ret == 0); - - evtim->impl_opaque[0] = (uintptr_t)tim; - evtim->impl_opaque[1] = (uintptr_t)adapter; - - TAILQ_INSERT_TAIL(&sw_data->msgs_tailq_head, - msg, - msgs); - break; - case MSG_TYPE_CANCEL: - EVTIM_SVC_LOG_DBG("dequeued CANCEL message " - "from ring"); - opaque = evtim->impl_opaque[0]; - tim = (struct rte_timer *)(uintptr_t)opaque; - RTE_ASSERT(tim != NULL); - - ret = rte_timer_stop(tim); - RTE_ASSERT(ret == 0); - - /* Free the msg object for the original arm - * request. - */ - struct msg *m; - m = container_of(tim, struct msg, tim); - TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, - msgs); - rte_mempool_put(sw_data->msg_pool, m); - - /* Free the msg object for the current msg */ - rte_mempool_put(sw_data->msg_pool, msg); - - evtim->impl_opaque[0] = 0; - evtim->impl_opaque[1] = 0; - - break; - } - } - } - sw_data->service_phase = 2; - rte_smp_wmb(); + if (swtim_did_tick(sw)) { + rte_timer_alt_manage(sw->timer_data_id, + sw->poll_lcores, + sw->n_poll_lcores, + swtim_callback); - if (adapter_did_tick(adapter)) { - rte_timer_manage(); + /* Return expired timer objects back to mempool */ + rte_mempool_put_bulk(sw->tim_pool, (void **)sw->expired_timers, + sw->n_expired_timers); + sw->n_expired_timers = 0; - event_buffer_flush(&sw_data->buffer, + event_buffer_flush(&sw->buffer, adapter->data->event_dev_id, adapter->data->event_port_id, - &nb_evs_flushed, &nb_evs_invalid); + &nb_evs_flushed, + &nb_evs_invalid); - sw_data->stats.ev_enq_count += nb_evs_flushed; - sw_data->stats.ev_inv_count += nb_evs_invalid; - sw_data->stats.adapter_tick_count++; + sw->stats.ev_enq_count += nb_evs_flushed; + sw->stats.ev_inv_count += nb_evs_invalid; + sw->stats.adapter_tick_count++; } - sw_data->service_phase = 0; - rte_smp_wmb(); - return 0; } @@ -820,7 +762,7 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual) int size; int cache_size = 0; - for (i = 0; ; i++) { + for (i = 0;; i++) { size = 1 << i; if (RTE_MAX_LCORE * size < (int)(nb_actual - nb_requested) && @@ -834,168 +776,144 @@ compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual) return cache_size; } -#define SW_MIN_INTERVAL 1E5 - static int -sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter) +swtim_init(struct rte_event_timer_adapter *adapter) { - int ret; - struct rte_event_timer_adapter_sw_data *sw_data; - uint64_t nb_timers; + int i, ret; + struct swtim *sw; unsigned int flags; struct rte_service_spec service; - static bool timer_subsystem_inited; // static initialized to false - /* Allocate storage for SW implementation data */ - char priv_data_name[RTE_RING_NAMESIZE]; - snprintf(priv_data_name, RTE_RING_NAMESIZE, "sw_evtim_adap_priv_%"PRIu8, - adapter->data->id); - adapter->data->adapter_priv = rte_zmalloc_socket( - priv_data_name, - sizeof(struct rte_event_timer_adapter_sw_data), - RTE_CACHE_LINE_SIZE, - adapter->data->socket_id); - if (adapter->data->adapter_priv == NULL) { + /* Allocate storage for private data area */ +#define SWTIM_NAMESIZE 32 + char swtim_name[SWTIM_NAMESIZE]; + snprintf(swtim_name, SWTIM_NAMESIZE, "swtim_%"PRIu8, + adapter->data->id); + sw = rte_zmalloc_socket(swtim_name, sizeof(*sw), RTE_CACHE_LINE_SIZE, + adapter->data->socket_id); + if (sw == NULL) { EVTIM_LOG_ERR("failed to allocate space for private data"); rte_errno = ENOMEM; return -1; } - if (adapter->data->conf.timer_tick_ns < SW_MIN_INTERVAL) { - EVTIM_LOG_ERR("failed to create adapter with requested tick " - "interval"); - rte_errno = EINVAL; - return -1; - } - - sw_data = adapter->data->adapter_priv; + /* Connect storage to adapter instance */ + adapter->data->adapter_priv = sw; + sw->adapter = adapter; - sw_data->timer_tick_ns = adapter->data->conf.timer_tick_ns; - sw_data->max_tmo_ns = adapter->data->conf.max_tmo_ns; + sw->timer_tick_ns = adapter->data->conf.timer_tick_ns; + sw->max_tmo_ns = adapter->data->conf.max_tmo_ns; - TAILQ_INIT(&sw_data->msgs_tailq_head); - rte_spinlock_init(&sw_data->msgs_tailq_sl); - rte_atomic16_init(&sw_data->message_producer_count); - - /* Rings require power of 2, so round up to next such value */ - nb_timers = rte_align64pow2(adapter->data->conf.nb_timers); - - char msg_ring_name[RTE_RING_NAMESIZE]; - snprintf(msg_ring_name, RTE_RING_NAMESIZE, - "sw_evtim_adap_msg_ring_%"PRIu8, adapter->data->id); - flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ? - RING_F_SP_ENQ | RING_F_SC_DEQ : - RING_F_SC_DEQ; - sw_data->msg_ring = rte_ring_create(msg_ring_name, nb_timers, - adapter->data->socket_id, flags); - if (sw_data->msg_ring == NULL) { - EVTIM_LOG_ERR("failed to create message ring"); - rte_errno = ENOMEM; - goto free_priv_data; - } - - char pool_name[RTE_RING_NAMESIZE]; - snprintf(pool_name, RTE_RING_NAMESIZE, "sw_evtim_adap_msg_pool_%"PRIu8, + /* Create a timer pool */ + char pool_name[SWTIM_NAMESIZE]; + snprintf(pool_name, SWTIM_NAMESIZE, "swtim_pool_%"PRIu8, adapter->data->id); - - /* Both the arming/canceling thread and the service thread will do puts - * to the mempool, but if the SP_PUT flag is enabled, we can specify - * single-consumer get for the mempool. - */ - flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ? - MEMPOOL_F_SC_GET : 0; - - /* The usable size of a ring is count - 1, so subtract one here to - * make the counts agree. - */ + /* Optimal mempool size is a power of 2 minus one */ + uint64_t nb_timers = rte_align64pow2(adapter->data->conf.nb_timers); int pool_size = nb_timers - 1; int cache_size = compute_msg_mempool_cache_size( adapter->data->conf.nb_timers, nb_timers); - sw_data->msg_pool = rte_mempool_create(pool_name, pool_size, - sizeof(struct msg), cache_size, - 0, NULL, NULL, NULL, NULL, - adapter->data->socket_id, flags); - if (sw_data->msg_pool == NULL) { - EVTIM_LOG_ERR("failed to create message object mempool"); + flags = 0; /* pool is multi-producer, multi-consumer */ + sw->tim_pool = rte_mempool_create(pool_name, pool_size, + sizeof(struct rte_timer), cache_size, 0, NULL, NULL, + NULL, NULL, adapter->data->socket_id, flags); + if (sw->tim_pool == NULL) { + EVTIM_LOG_ERR("failed to create timer object mempool"); rte_errno = ENOMEM; - goto free_msg_ring; + goto free_alloc; + } + + /* Initialize the variables that track in-use timer lists */ + for (i = 0; i < RTE_MAX_LCORE; i++) + rte_atomic16_init(&sw->in_use[i].v); + + /* Initialize the timer subsystem and allocate timer data instance */ + ret = rte_timer_subsystem_init(); + if (ret < 0) { + if (ret != -EALREADY) { + EVTIM_LOG_ERR("failed to initialize timer subsystem"); + rte_errno = ret; + goto free_mempool; + } + } + + ret = rte_timer_data_alloc(&sw->timer_data_id); + if (ret < 0) { + EVTIM_LOG_ERR("failed to allocate timer data instance"); + rte_errno = ret; + goto free_mempool; } - event_buffer_init(&sw_data->buffer); + /* Initialize timer event buffer */ + event_buffer_init(&sw->buffer); + + sw->adapter = adapter; /* Register a service component to run adapter logic */ memset(&service, 0, sizeof(service)); snprintf(service.name, RTE_SERVICE_NAME_MAX, - "sw_evimer_adap_svc_%"PRIu8, adapter->data->id); + "swtim_svc_%"PRIu8, adapter->data->id); service.socket_id = adapter->data->socket_id; - service.callback = sw_event_timer_adapter_service_func; + service.callback = swtim_service_func; service.callback_userdata = adapter; service.capabilities &= ~(RTE_SERVICE_CAP_MT_SAFE); - ret = rte_service_component_register(&service, &sw_data->service_id); + ret = rte_service_component_register(&service, &sw->service_id); if (ret < 0) { EVTIM_LOG_ERR("failed to register service %s with id %"PRIu32 - ": err = %d", service.name, sw_data->service_id, + ": err = %d", service.name, sw->service_id, ret); rte_errno = ENOSPC; - goto free_msg_pool; + goto free_mempool; } EVTIM_LOG_DBG("registered service %s with id %"PRIu32, service.name, - sw_data->service_id); + sw->service_id); - adapter->data->service_id = sw_data->service_id; + adapter->data->service_id = sw->service_id; adapter->data->service_inited = 1; - if (!timer_subsystem_inited) { - rte_timer_subsystem_init(); - timer_subsystem_inited = true; - } - return 0; - -free_msg_pool: - rte_mempool_free(sw_data->msg_pool); -free_msg_ring: - rte_ring_free(sw_data->msg_ring); -free_priv_data: - rte_free(sw_data); +free_mempool: + rte_mempool_free(sw->tim_pool); +free_alloc: + rte_free(sw); return -1; } -static int -sw_event_timer_adapter_uninit(struct rte_event_timer_adapter *adapter) +static void +swtim_free_tim(struct rte_timer *tim, void *arg) { - int ret; - struct msg *m1, *m2; - struct rte_event_timer_adapter_sw_data *sw_data = - adapter->data->adapter_priv; - - rte_spinlock_lock(&sw_data->msgs_tailq_sl); + struct swtim *sw = arg; - /* Cancel outstanding rte_timers and free msg objects */ - m1 = TAILQ_FIRST(&sw_data->msgs_tailq_head); - while (m1 != NULL) { - EVTIM_LOG_DBG("freeing outstanding timer"); - m2 = TAILQ_NEXT(m1, msgs); - - rte_timer_stop_sync(&m1->tim); - rte_mempool_put(sw_data->msg_pool, m1); + rte_mempool_put(sw->tim_pool, tim); +} - m1 = m2; - } +/* Traverse the list of outstanding timers and put them back in the mempool + * before freeing the adapter to avoid leaking the memory. + */ +static int +swtim_uninit(struct rte_event_timer_adapter *adapter) +{ + int ret; + struct swtim *sw = swtim_pmd_priv(adapter); - rte_spinlock_unlock(&sw_data->msgs_tailq_sl); + /* Free outstanding timers */ + rte_timer_stop_all(sw->timer_data_id, + sw->poll_lcores, + sw->n_poll_lcores, + swtim_free_tim, + sw); - ret = rte_service_component_unregister(sw_data->service_id); + ret = rte_service_component_unregister(sw->service_id); if (ret < 0) { EVTIM_LOG_ERR("failed to unregister service component"); return ret; } - rte_ring_free(sw_data->msg_ring); - rte_mempool_free(sw_data->msg_pool); - rte_free(adapter->data->adapter_priv); + rte_mempool_free(sw->tim_pool); + rte_free(sw); + adapter->data->adapter_priv = NULL; return 0; } @@ -1016,88 +934,79 @@ get_mapped_count_for_service(uint32_t service_id) } static int -sw_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter) +swtim_start(const struct rte_event_timer_adapter *adapter) { int mapped_count; - struct rte_event_timer_adapter_sw_data *sw_data; - - sw_data = adapter->data->adapter_priv; + struct swtim *sw = swtim_pmd_priv(adapter); /* Mapping the service to more than one service core can introduce * delays while one thread is waiting to acquire a lock, so only allow * one core to be mapped to the service. + * + * Note: the service could be modified such that it spreads cores to + * poll over multiple service instances. */ - mapped_count = get_mapped_count_for_service(sw_data->service_id); + mapped_count = get_mapped_count_for_service(sw->service_id); - if (mapped_count == 1) - return rte_service_component_runstate_set(sw_data->service_id, - 1); + if (mapped_count != 1) + return mapped_count < 1 ? -ENOENT : -ENOTSUP; - return mapped_count < 1 ? -ENOENT : -ENOTSUP; + return rte_service_component_runstate_set(sw->service_id, 1); } static int -sw_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter) +swtim_stop(const struct rte_event_timer_adapter *adapter) { int ret; - struct rte_event_timer_adapter_sw_data *sw_data = - adapter->data->adapter_priv; + struct swtim *sw = swtim_pmd_priv(adapter); - ret = rte_service_component_runstate_set(sw_data->service_id, 0); + ret = rte_service_component_runstate_set(sw->service_id, 0); if (ret < 0) return ret; - /* Wait for the service to complete its final iteration before - * stopping. - */ - while (sw_data->service_phase != 0) + /* Wait for the service to complete its final iteration */ + while (rte_service_may_be_active(sw->service_id)) rte_pause(); - rte_smp_rmb(); - return 0; } static void -sw_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter, +swtim_get_info(const struct rte_event_timer_adapter *adapter, struct rte_event_timer_adapter_info *adapter_info) { - struct rte_event_timer_adapter_sw_data *sw_data; - sw_data = adapter->data->adapter_priv; - - adapter_info->min_resolution_ns = sw_data->timer_tick_ns; - adapter_info->max_tmo_ns = sw_data->max_tmo_ns; + struct swtim *sw = swtim_pmd_priv(adapter); + adapter_info->min_resolution_ns = sw->timer_tick_ns; + adapter_info->max_tmo_ns = sw->max_tmo_ns; } static int -sw_event_timer_adapter_stats_get(const struct rte_event_timer_adapter *adapter, - struct rte_event_timer_adapter_stats *stats) +swtim_stats_get(const struct rte_event_timer_adapter *adapter, + struct rte_event_timer_adapter_stats *stats) { - struct rte_event_timer_adapter_sw_data *sw_data; - sw_data = adapter->data->adapter_priv; - *stats = sw_data->stats; + struct swtim *sw = swtim_pmd_priv(adapter); + *stats = sw->stats; /* structure copy */ return 0; } static int -sw_event_timer_adapter_stats_reset( - const struct rte_event_timer_adapter *adapter) +swtim_stats_reset(const struct rte_event_timer_adapter *adapter) { - struct rte_event_timer_adapter_sw_data *sw_data; - sw_data = adapter->data->adapter_priv; - memset(&sw_data->stats, 0, sizeof(sw_data->stats)); + struct swtim *sw = swtim_pmd_priv(adapter); + memset(&sw->stats, 0, sizeof(sw->stats)); return 0; } -static __rte_always_inline uint16_t -__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter, - struct rte_event_timer **evtims, - uint16_t nb_evtims) +static uint16_t +__swtim_arm_burst(const struct rte_event_timer_adapter *adapter, + struct rte_event_timer **evtims, + uint16_t nb_evtims) { - uint16_t i; - int ret; - struct rte_event_timer_adapter_sw_data *sw_data; - struct msg *msgs[nb_evtims]; + int i, ret; + struct swtim *sw = swtim_pmd_priv(adapter); + uint32_t lcore_id = rte_lcore_id(); + struct rte_timer *tim, *tims[nb_evtims]; + uint64_t cycles; #ifdef RTE_LIBRTE_EVENTDEV_DEBUG /* Check that the service is running. */ @@ -1107,101 +1016,103 @@ __sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter, } #endif - sw_data = adapter->data->adapter_priv; + /* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of + * the highest lcore to insert such timers into + */ + if (lcore_id == LCORE_ID_ANY) + lcore_id = RTE_MAX_LCORE - 1; + + /* If this is the first time we're arming an event timer on this lcore, + * mark this lcore as "in use"; this will cause the service + * function to process the timer list that corresponds to this lcore. + */ + if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) { + EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll", + lcore_id); + sw->poll_lcores[sw->n_poll_lcores] = lcore_id; + ++sw->n_poll_lcores; + } - ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims); + ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims, + nb_evtims); if (ret < 0) { rte_errno = ENOSPC; return 0; } - /* Let the service know we're producing messages for it to process */ - rte_atomic16_inc(&sw_data->message_producer_count); - - /* If the service is managing timers, wait for it to finish */ - while (sw_data->service_phase == 2) - rte_pause(); - - rte_smp_rmb(); - for (i = 0; i < nb_evtims; i++) { /* Don't modify the event timer state in these cases */ if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) { rte_errno = EALREADY; break; } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED || - evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) { + evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) { rte_errno = EINVAL; break; } ret = check_timeout(evtims[i], adapter); - if (ret == -1) { + if (unlikely(ret == -1)) { evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE; rte_errno = EINVAL; break; - } - if (ret == -2) { + } else if (unlikely(ret == -2)) { evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY; rte_errno = EINVAL; break; } - if (check_destination_event_queue(evtims[i], adapter) < 0) { + if (unlikely(check_destination_event_queue(evtims[i], + adapter) < 0)) { evtims[i]->state = RTE_EVENT_TIMER_ERROR; rte_errno = EINVAL; break; } - /* Checks passed, set up a message to enqueue */ - msgs[i]->type = MSG_TYPE_ARM; - msgs[i]->evtim = evtims[i]; + tim = tims[i]; + rte_timer_init(tim); - /* Set the payload pointer if not set. */ - if (evtims[i]->ev.event_ptr == NULL) - evtims[i]->ev.event_ptr = evtims[i]; + evtims[i]->impl_opaque[0] = (uintptr_t)tim; + evtims[i]->impl_opaque[1] = (uintptr_t)adapter; - /* msg objects that get enqueued successfully will be freed - * either by a future cancel operation or by the timer - * expiration callback. - */ - if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) { - rte_errno = ENOSPC; + cycles = get_timeout_cycles(evtims[i], adapter); + ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles, + SINGLE, lcore_id, NULL, evtims[i]); + if (ret < 0) { + /* tim was in RUNNING or CONFIG state */ + evtims[i]->state = RTE_EVENT_TIMER_ERROR; break; } - EVTIM_LOG_DBG("enqueued ARM message to ring"); - + rte_smp_wmb(); + EVTIM_LOG_DBG("armed an event timer"); evtims[i]->state = RTE_EVENT_TIMER_ARMED; } - /* Let the service know we're done producing messages */ - rte_atomic16_dec(&sw_data->message_producer_count); - if (i < nb_evtims) - rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i], - nb_evtims - i); + rte_mempool_put_bulk(sw->tim_pool, + (void **)&tims[i], nb_evtims - i); return i; } static uint16_t -sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter, - struct rte_event_timer **evtims, - uint16_t nb_evtims) +swtim_arm_burst(const struct rte_event_timer_adapter *adapter, + struct rte_event_timer **evtims, + uint16_t nb_evtims) { - return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims); + return __swtim_arm_burst(adapter, evtims, nb_evtims); } static uint16_t -sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter, - struct rte_event_timer **evtims, - uint16_t nb_evtims) +swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, + struct rte_event_timer **evtims, + uint16_t nb_evtims) { - uint16_t i; - int ret; - struct rte_event_timer_adapter_sw_data *sw_data; - struct msg *msgs[nb_evtims]; + int i, ret; + struct rte_timer *timp; + uint64_t opaque; + struct swtim *sw = swtim_pmd_priv(adapter); #ifdef RTE_LIBRTE_EVENTDEV_DEBUG /* Check that the service is running. */ @@ -1211,23 +1122,6 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter, } #endif - sw_data = adapter->data->adapter_priv; - - ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims); - if (ret < 0) { - rte_errno = ENOSPC; - return 0; - } - - /* Let the service know we're producing messages for it to process */ - rte_atomic16_inc(&sw_data->message_producer_count); - - /* If the service could be modifying event timer states, wait */ - while (sw_data->service_phase == 2) - rte_pause(); - - rte_smp_rmb(); - for (i = 0; i < nb_evtims; i++) { /* Don't modify the event timer state in these cases */ if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) { @@ -1238,54 +1132,56 @@ sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter, break; } - msgs[i]->type = MSG_TYPE_CANCEL; - msgs[i]->evtim = evtims[i]; + rte_smp_rmb(); + + opaque = evtims[i]->impl_opaque[0]; + timp = (struct rte_timer *)(uintptr_t)opaque; + RTE_ASSERT(timp != NULL); - if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) { - rte_errno = ENOSPC; + ret = rte_timer_alt_stop(sw->timer_data_id, timp); + if (ret < 0) { + /* Timer is running or being configured */ + rte_errno = EAGAIN; break; } - EVTIM_LOG_DBG("enqueued CANCEL message to ring"); + rte_mempool_put(sw->tim_pool, (void **)timp); evtims[i]->state = RTE_EVENT_TIMER_CANCELED; - } + evtims[i]->impl_opaque[0] = 0; + evtims[i]->impl_opaque[1] = 0; - /* Let the service know we're done producing messages */ - rte_atomic16_dec(&sw_data->message_producer_count); - - if (i < nb_evtims) - rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i], - nb_evtims - i); + rte_smp_wmb(); + } return i; } static uint16_t -sw_event_timer_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter, - struct rte_event_timer **evtims, - uint64_t timeout_ticks, - uint16_t nb_evtims) +swtim_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter, + struct rte_event_timer **evtims, + uint64_t timeout_ticks, + uint16_t nb_evtims) { int i; for (i = 0; i < nb_evtims; i++) evtims[i]->timeout_ticks = timeout_ticks; - return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims); + return __swtim_arm_burst(adapter, evtims, nb_evtims); } -static const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops = { - .init = sw_event_timer_adapter_init, - .uninit = sw_event_timer_adapter_uninit, - .start = sw_event_timer_adapter_start, - .stop = sw_event_timer_adapter_stop, - .get_info = sw_event_timer_adapter_get_info, - .stats_get = sw_event_timer_adapter_stats_get, - .stats_reset = sw_event_timer_adapter_stats_reset, - .arm_burst = sw_event_timer_arm_burst, - .arm_tmo_tick_burst = sw_event_timer_arm_tmo_tick_burst, - .cancel_burst = sw_event_timer_cancel_burst, +static const struct rte_event_timer_adapter_ops swtim_ops = { + .init = swtim_init, + .uninit = swtim_uninit, + .start = swtim_start, + .stop = swtim_stop, + .get_info = swtim_get_info, + .stats_get = swtim_stats_get, + .stats_reset = swtim_stats_reset, + .arm_burst = swtim_arm_burst, + .arm_tmo_tick_burst = swtim_arm_tmo_tick_burst, + .cancel_burst = swtim_cancel_burst, }; RTE_INIT(event_timer_adapter_init_log)