#define BATCH_SIZE 32
#define BLOCK_CNT_THRESHOLD 10
#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
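+/* Size (in mbufs) and timeout (in ns) limits for SW event vectorization */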
+#define MAX_VECTOR_SIZE 1024
+#define MIN_VECTOR_SIZE 4
+#define MAX_VECTOR_NS 1E9
+#define MIN_VECTOR_NS 1E5
#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
#define ETH_RX_ADAPTER_MEM_NAME_LEN 32
uint16_t eth_rx_qid;
};
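+/* Per Rx queue event vector build state; tracked on the adapter's expiry list */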
+struct eth_rx_vector_data {
+ TAILQ_ENTRY(eth_rx_vector_data) next;
+ uint16_t port;
+ uint16_t queue;
+ uint16_t max_vector_count;
+ uint64_t event;
+ uint64_t ts;
+ uint64_t vector_timeout_ticks;
+ struct rte_mempool *vector_pool;
+ struct rte_event_vector *vector_ev;
+} __rte_cache_aligned;
+
+TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
+
/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
/* Count of events in this buffer */
uint32_t wrr_pos;
/* Event burst buffer */
struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
+ /* Vector enable flag */
+ uint8_t ena_vector;
+ /* Timestamp of previous vector expiry list traversal */
+ uint64_t prev_expiry_ts;
+ /* Minimum ticks to wait before traversing expiry list */
+ uint64_t vector_tmo_ticks;
+ /* List of partially filled event vectors tracked for timeout expiry */
+ struct eth_rx_vector_data_list vector_list;
/* Per adapter stats */
struct rte_event_eth_rx_adapter_stats stats;
/* Block count, counts up to BLOCK_CNT_THRESHOLD */
struct eth_rx_queue_info {
int queue_enabled; /* True if added */
int intr_enabled;
+ uint8_t ena_vector;
uint16_t wt; /* Polling weight */
uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
uint64_t event;
+ struct eth_rx_vector_data vector_data;
};
static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
&rx_adapter->event_enqueue_buffer;
struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+ if (!buf->count)
+ return 0;
+
uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
rx_adapter->event_port_id,
buf->events,
return n;
}
+static inline void
+rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_rx_vector_data *vec)
+{
+ vec->vector_ev->nb_elem = 0;
+ vec->vector_ev->port = vec->port;
+ vec->vector_ev->queue = vec->queue;
+ vec->vector_ev->attr_valid = true;
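+	/* Track the vector on the adapter's list so it can be expired on timeout */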
+ TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+}
+
+static inline uint16_t
+rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
+ struct eth_rx_queue_info *queue_info,
+ struct rte_eth_event_enqueue_buffer *buf,
+ struct rte_mbuf **mbufs, uint16_t num)
+{
+ struct rte_event *ev = &buf->events[buf->count];
+ struct eth_rx_vector_data *vec;
+ uint16_t filled, space, sz;
+
+ filled = 0;
+ vec = &queue_info->vector_data;
+
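+	/* Start a new vector from the pool if this queue has none in progress */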
+ if (vec->vector_ev == NULL) {
+ if (rte_mempool_get(vec->vector_pool,
+ (void **)&vec->vector_ev) < 0) {
+ rte_pktmbuf_free_bulk(mbufs, num);
+ return 0;
+ }
+ rxa_init_vector(rx_adapter, vec);
+ }
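+	/* Copy mbufs into the vector, emitting an event each time it fills up */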
+ while (num) {
+ if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+ /* Event ready. */
+ ev->event = vec->event;
+ ev->vec = vec->vector_ev;
+ ev++;
+ filled++;
+ vec->vector_ev = NULL;
+ TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+ if (rte_mempool_get(vec->vector_pool,
+ (void **)&vec->vector_ev) < 0) {
+ rte_pktmbuf_free_bulk(mbufs, num);
+ return 0;
+ }
+ rxa_init_vector(rx_adapter, vec);
+ }
+
+ space = vec->max_vector_count - vec->vector_ev->nb_elem;
+ sz = num > space ? space : num;
+ memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
+ sizeof(void *) * sz);
+ vec->vector_ev->nb_elem += sz;
+ num -= sz;
+ mbufs += sz;
+ vec->ts = rte_rdtsc();
+ }
+
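+	/* Emit the vector as an event if the last copy filled it exactly */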
+ if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+ ev->event = vec->event;
+ ev->vec = vec->vector_ev;
+ ev++;
+ filled++;
+ vec->vector_ev = NULL;
+ TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+ }
+
+ return filled;
+}
+
static inline void
rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
uint16_t eth_dev_id,
uint16_t nb_cb;
uint16_t dropped;
- /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
- rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
- do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
-
- for (i = 0; i < num; i++) {
- m = mbufs[i];
-
- rss = do_rss ?
- rxa_do_softrss(m, rx_adapter->rss_key_be) :
- m->hash.rss;
- ev->event = event;
- ev->flow_id = (rss & ~flow_id_mask) |
- (ev->flow_id & flow_id_mask);
- ev->mbuf = m;
- ev++;
+ if (!eth_rx_queue_info->ena_vector) {
+ /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
+ rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
+ do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
+ for (i = 0; i < num; i++) {
+ m = mbufs[i];
+
+ rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
+ : m->hash.rss;
+ ev->event = event;
+ ev->flow_id = (rss & ~flow_id_mask) |
+ (ev->flow_id & flow_id_mask);
+ ev->mbuf = m;
+ ev++;
+ }
+ } else {
+ num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
+ buf, mbufs, num);
}
- if (dev_info->cb_fn) {
+ if (num && dev_info->cb_fn) {
dropped = 0;
nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
- ETH_EVENT_BUFFER_SIZE, buf->count, ev,
- num, dev_info->cb_arg, &dropped);
+ ETH_EVENT_BUFFER_SIZE, buf->count,
+ &buf->events[buf->count], num,
+ dev_info->cb_arg, &dropped);
if (unlikely(nb_cb > num))
RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
nb_cb, num);
return nb_rx;
}
+static void
+rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
+{
+ struct rte_event_eth_rx_adapter *rx_adapter = arg;
+ struct rte_eth_event_enqueue_buffer *buf =
+ &rx_adapter->event_enqueue_buffer;
+ struct rte_event *ev;
+
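+	/* Drain buffered events first to make room for the expiring vector */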
+ if (buf->count)
+ rxa_flush_event_buffer(rx_adapter);
+
+ if (vec->vector_ev->nb_elem == 0)
+ return;
+ ev = &buf->events[buf->count];
+
+ /* Event ready. */
+ ev->event = vec->event;
+ ev->vec = vec->vector_ev;
+ buf->count++;
+
+ vec->vector_ev = NULL;
+ vec->ts = 0;
+}
+
static int
rxa_service_func(void *args)
{
return 0;
}
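+	/* Enqueue partially filled vectors whose timeout has elapsed */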
+ if (rx_adapter->ena_vector) {
+ if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
+ rx_adapter->vector_tmo_ticks) {
+ struct eth_rx_vector_data *vec;
+
+ TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
+ uint64_t elapsed_time = rte_rdtsc() - vec->ts;
+
+ if (elapsed_time >= vec->vector_timeout_ticks) {
+ rxa_vector_expire(vec, rx_adapter);
+ TAILQ_REMOVE(&rx_adapter->vector_list,
+ vec, next);
+ }
+ }
+ rx_adapter->prev_expiry_ts = rte_rdtsc();
+ }
+ }
+
stats = &rx_adapter->stats;
stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
stats->rx_packets += rxa_poll(rx_adapter);
}
}
+static void
+rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
+ uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
+ uint16_t port_id)
+{
+#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
+ struct eth_rx_vector_data *vector_data;
+ uint32_t flow_id;
+
+ vector_data = &queue_info->vector_data;
+ vector_data->max_vector_count = vector_count;
+ vector_data->port = port_id;
+ vector_data->queue = qid;
+ vector_data->vector_pool = mp;
+ vector_data->vector_timeout_ticks =
+ NSEC2TICK(vector_ns, rte_get_timer_hz());
+ vector_data->ts = 0;
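+	/* Derive a flow id from queue and port when the app did not provide one */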
+ flow_id = queue_info->event & 0xFFFFF;
+ flow_id =
+ flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
+ vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
+}
+
static void
rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
struct eth_device_info *dev_info,
int32_t rx_queue_id)
{
+ struct eth_rx_vector_data *vec;
int pollq;
int intrq;
int sintrq;
return;
}
+ /* Push all the partial event vectors to event device. */
+ TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
+ if (vec->queue != rx_queue_id)
+ continue;
+ rxa_vector_expire(vec, rx_adapter);
+ TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+ }
+
pollq = rxa_polled_queue(dev_info, rx_queue_id);
intrq = rxa_intr_queue(dev_info, rx_queue_id);
sintrq = rxa_shared_intr(dev_info, rx_queue_id);
}
}
+static void
+rxa_sw_event_vector_configure(
+ struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
+ int rx_queue_id,
+ const struct rte_event_eth_rx_adapter_event_vector_config *config)
+{
+ struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
+ struct eth_rx_queue_info *queue_info;
+ struct rte_event *qi_ev;
+
+ if (rx_queue_id == -1) {
+ uint16_t nb_rx_queues;
+ uint16_t i;
+
+ nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+ for (i = 0; i < nb_rx_queues; i++)
+ rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
+ config);
+ return;
+ }
+
+ queue_info = &dev_info->rx_queue[rx_queue_id];
+ qi_ev = (struct rte_event *)&queue_info->event;
+ queue_info->ena_vector = 1;
+ qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
+ rxa_set_vector_data(queue_info, config->vector_sz,
+ config->vector_timeout_ns, config->vector_mp,
+ rx_queue_id, dev_info->dev->data->port_id);
+ rx_adapter->ena_vector = 1;
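+	/* Expiry list is scanned at half of the smallest configured timeout */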
+ rx_adapter->vector_tmo_ticks =
+ rx_adapter->vector_tmo_ticks ?
+ RTE_MIN(config->vector_timeout_ns >> 1,
+ rx_adapter->vector_tmo_ticks) :
+ config->vector_timeout_ns >> 1;
+}
+
static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
uint16_t eth_dev_id,
int rx_queue_id,
rx_adapter->conf_cb = conf_cb;
rx_adapter->conf_arg = conf_arg;
rx_adapter->id = id;
+ TAILQ_INIT(&rx_adapter->vector_list);
strcpy(rx_adapter->mem_name, mem_name);
rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
RTE_MAX_ETHPORTS *
return -EINVAL;
}
+ if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
+ (queue_conf->rx_queue_flags &
+ RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
+ RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
+ " eth port: %" PRIu16 " adapter id: %" PRIu8,
+ eth_dev_id, id);
+ return -EINVAL;
+ }
+
if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
(rx_queue_id != -1)) {
RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
return 0;
}
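+/* Vector limits advertised by the SW (service core) implementation */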
+static int
+rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
+{
+ limits->max_sz = MAX_VECTOR_SIZE;
+ limits->min_sz = MIN_VECTOR_SIZE;
+ limits->max_timeout_ns = MAX_VECTOR_NS;
+ limits->min_timeout_ns = MIN_VECTOR_NS;
+
+ return 0;
+}
+
int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
int32_t rx_queue_id)
ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
} else {
- ret = -ENOTSUP;
+ rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
+ rx_queue_id, config);
}
return ret;
ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
dev, &rte_eth_devices[eth_port_id], limits);
} else {
- ret = -ENOTSUP;
+ ret = rxa_sw_vector_limits(limits);
}
return ret;