--- /dev/null
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2017 Intel Corporation
+ */
+
+#ifndef _IQ_CHUNK_H_
+#define _IQ_CHUNK_H_
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <rte_eventdev.h>
+
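+/* Size of the name buffer used when creating a QID's reorder-buffer ring */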
+#define IQ_ROB_NAMESIZE 12
+
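+/* Events are stored in chunks: fixed-size, cache-aligned blocks linked
+ * together to form each IQ's FIFO. Free chunks are kept on a device-wide
+ * singly-linked list.
+ */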
+struct sw_queue_chunk {
+ struct rte_event events[SW_EVS_PER_Q_CHUNK];
+ struct sw_queue_chunk *next;
+} __rte_cache_aligned;
+
+static __rte_always_inline bool
+iq_empty(struct sw_iq *iq)
+{
+ return (iq->count == 0);
+}
+
+static __rte_always_inline uint16_t
+iq_count(const struct sw_iq *iq)
+{
+ return iq->count;
+}
+
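+/* Pop a chunk from the device-wide free list. The pool is sized at
+ * configure time so that the list can never be empty here.
+ */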
+static __rte_always_inline struct sw_queue_chunk *
+iq_alloc_chunk(struct sw_evdev *sw)
+{
+ struct sw_queue_chunk *chunk = sw->chunk_list_head;
+ sw->chunk_list_head = chunk->next;
+ chunk->next = NULL;
+ return chunk;
+}
+
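+/* Push a chunk back onto the device-wide free list */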
+static __rte_always_inline void
+iq_free_chunk(struct sw_evdev *sw, struct sw_queue_chunk *chunk)
+{
+ chunk->next = sw->chunk_list_head;
+ sw->chunk_list_head = chunk;
+}
+
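+/* Give an IQ its first chunk; an IQ always owns at least one chunk, even
+ * when empty.
+ */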
+static __rte_always_inline void
+iq_init(struct sw_evdev *sw, struct sw_iq *iq)
+{
+ iq->head = iq_alloc_chunk(sw);
+ iq->tail = iq->head;
+ iq->head_idx = 0;
+ iq->tail_idx = 0;
+ iq->count = 0;
+}
+
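+/* Append one event at the tail of the IQ. When the tail chunk fills up, a
+ * fresh chunk is linked in so the next enqueue has somewhere to go.
+ */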
+static __rte_always_inline void
+iq_enqueue(struct sw_evdev *sw, struct sw_iq *iq, const struct rte_event *ev)
+{
+ iq->tail->events[iq->tail_idx++] = *ev;
+ iq->count++;
+
+ if (unlikely(iq->tail_idx == SW_EVS_PER_Q_CHUNK)) {
+ /* The number of chunks is defined in relation to the total
+ * number of inflight events and the number of IQs such that
+ * allocation will always succeed.
+ */
+ struct sw_queue_chunk *chunk = iq_alloc_chunk(sw);
+ iq->tail->next = chunk;
+ iq->tail = chunk;
+ iq->tail_idx = 0;
+ }
+}
+
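+/* Discard the event at the head of the IQ (e.g. after it has been read via
+ * iq_peek()). A fully-consumed head chunk is returned to the free list.
+ */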
+static __rte_always_inline void
+iq_pop(struct sw_evdev *sw, struct sw_iq *iq)
+{
+ iq->head_idx++;
+ iq->count--;
+
+ if (unlikely(iq->head_idx == SW_EVS_PER_Q_CHUNK)) {
+ struct sw_queue_chunk *next = iq->head->next;
+ iq_free_chunk(sw, iq->head);
+ iq->head = next;
+ iq->head_idx = 0;
+ }
+}
+
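+/* Return a pointer to the event at the head of the IQ without removing it.
+ * Only valid if the IQ is non-empty.
+ */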
+static __rte_always_inline const struct rte_event *
+iq_peek(struct sw_iq *iq)
+{
+ return &iq->head->events[iq->head_idx];
+}
+
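+/* Dequeue up to count events into ev[]; chunks that are drained along the
+ * way are returned to the free list.
+ */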
+/* Note: the caller must ensure that count is nonzero and that
+ * count <= iq_count()
+ */
+static __rte_always_inline uint16_t
+iq_dequeue_burst(struct sw_evdev *sw,
+ struct sw_iq *iq,
+ struct rte_event *ev,
+ uint16_t count)
+{
+ struct sw_queue_chunk *current;
+ uint16_t total, index;
+
+ count = RTE_MIN(count, iq_count(iq));
+
+ current = iq->head;
+ index = iq->head_idx;
+ total = 0;
+
+ /* Loop over the chunks */
+ while (1) {
+ struct sw_queue_chunk *next;
+ for (; index < SW_EVS_PER_Q_CHUNK;) {
+ ev[total++] = current->events[index++];
+
+ if (unlikely(total == count))
+ goto done;
+ }
+
+ /* Move to the next chunk */
+ next = current->next;
+ iq_free_chunk(sw, current);
+ current = next;
+ index = 0;
+ }
+
+done:
+ if (unlikely(index == SW_EVS_PER_Q_CHUNK)) {
+ struct sw_queue_chunk *next = current->next;
+ iq_free_chunk(sw, current);
+ iq->head = next;
+ iq->head_idx = 0;
+ } else {
+ iq->head = current;
+ iq->head_idx = index;
+ }
+
+ iq->count -= total;
+
+ return total;
+}
+
+static __rte_always_inline void
+iq_put_back(struct sw_evdev *sw,
+ struct sw_iq *iq,
+ struct rte_event *ev,
+ unsigned int count)
+{
+ /* Put back events that fit in the current head chunk. If necessary,
+ * put back events in a new head chunk. The caller must ensure that
+ * count <= SW_EVS_PER_Q_CHUNK, to ensure that at most one new head is
+ * needed.
+ */
+ uint16_t avail_space = iq->head_idx;
+
+ if (avail_space >= count) {
+ const uint16_t idx = avail_space - count;
+ uint16_t i;
+
+ for (i = 0; i < count; i++)
+ iq->head->events[idx + i] = ev[i];
+
+ iq->head_idx = idx;
+ } else {
+ const uint16_t remaining = count - avail_space;
+ struct sw_queue_chunk *new_head;
+ uint16_t i;
+
+ for (i = 0; i < avail_space; i++)
+ iq->head->events[i] = ev[remaining + i];
+
+ new_head = iq_alloc_chunk(sw);
+ new_head->next = iq->head;
+ iq->head = new_head;
+ iq->head_idx = SW_EVS_PER_Q_CHUNK - remaining;
+
+ for (i = 0; i < remaining; i++)
+ iq->head->events[iq->head_idx + i] = ev[i];
+ }
+
+ iq->count += count;
+}
+
+#endif /* _IQ_CHUNK_H_ */
+++ /dev/null
-/* SPDX-License-Identifier: BSD-3-Clause
- * Copyright(c) 2016-2017 Intel Corporation
- */
-
-/*
- * Ring structure definitions used for the internal ring buffers of the
- * SW eventdev implementation. These are designed for single-core use only.
- */
-#ifndef _IQ_RING_
-#define _IQ_RING_
-
-#include <stdint.h>
-
-#include <rte_common.h>
-#include <rte_memory.h>
-#include <rte_malloc.h>
-#include <rte_eventdev.h>
-
-#define IQ_RING_NAMESIZE 12
-#define QID_IQ_DEPTH 512
-#define QID_IQ_MASK (uint16_t)(QID_IQ_DEPTH - 1)
-
-struct iq_ring {
- char name[IQ_RING_NAMESIZE] __rte_cache_aligned;
- uint16_t write_idx;
- uint16_t read_idx;
-
- struct rte_event ring[QID_IQ_DEPTH];
-};
-
-static inline struct iq_ring *
-iq_ring_create(const char *name, unsigned int socket_id)
-{
- struct iq_ring *retval;
-
- retval = rte_malloc_socket(NULL, sizeof(*retval), 0, socket_id);
- if (retval == NULL)
- goto end;
-
- snprintf(retval->name, sizeof(retval->name), "%s", name);
- retval->write_idx = retval->read_idx = 0;
-end:
- return retval;
-}
-
-static inline void
-iq_ring_destroy(struct iq_ring *r)
-{
- rte_free(r);
-}
-
-static __rte_always_inline uint16_t
-iq_ring_count(const struct iq_ring *r)
-{
- return r->write_idx - r->read_idx;
-}
-
-static __rte_always_inline uint16_t
-iq_ring_free_count(const struct iq_ring *r)
-{
- return QID_IQ_MASK - iq_ring_count(r);
-}
-
-static __rte_always_inline uint16_t
-iq_ring_enqueue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
-{
- const uint16_t read = r->read_idx;
- uint16_t write = r->write_idx;
- const uint16_t space = read + QID_IQ_MASK - write;
- uint16_t i;
-
- if (space < nb_qes)
- nb_qes = space;
-
- for (i = 0; i < nb_qes; i++, write++)
- r->ring[write & QID_IQ_MASK] = qes[i];
-
- r->write_idx = write;
-
- return nb_qes;
-}
-
-static __rte_always_inline uint16_t
-iq_ring_dequeue_burst(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
-{
- uint16_t read = r->read_idx;
- const uint16_t write = r->write_idx;
- const uint16_t items = write - read;
- uint16_t i;
-
- for (i = 0; i < nb_qes; i++, read++)
- qes[i] = r->ring[read & QID_IQ_MASK];
-
- if (items < nb_qes)
- nb_qes = items;
-
- r->read_idx += nb_qes;
-
- return nb_qes;
-}
-
-/* assumes there is space, from a previous dequeue_burst */
-static __rte_always_inline uint16_t
-iq_ring_put_back(struct iq_ring *r, struct rte_event *qes, uint16_t nb_qes)
-{
- uint16_t i, read = r->read_idx;
-
- for (i = nb_qes; i-- > 0; )
- r->ring[--read & QID_IQ_MASK] = qes[i];
-
- r->read_idx = read;
- return nb_qes;
-}
-
-static __rte_always_inline const struct rte_event *
-iq_ring_peek(const struct iq_ring *r)
-{
- return &r->ring[r->read_idx & QID_IQ_MASK];
-}
-
-static __rte_always_inline void
-iq_ring_pop(struct iq_ring *r)
-{
- r->read_idx++;
-}
-
-static __rte_always_inline int
-iq_ring_enqueue(struct iq_ring *r, const struct rte_event *qe)
-{
- const uint16_t read = r->read_idx;
- const uint16_t write = r->write_idx;
- const uint16_t space = read + QID_IQ_MASK - write;
-
- if (space == 0)
- return -1;
-
- r->ring[write & QID_IQ_MASK] = *qe;
-
- r->write_idx = write + 1;
-
- return 0;
-}
-
-#endif
#include <rte_service_component.h>
#include "sw_evdev.h"
-#include "iq_ring.h"
+#include "iq_chunk.h"
#define EVENTDEV_NAME_SW_PMD event_sw
#define NUMA_NODE_ARG "numa_node"
unsigned int i;
int dev_id = sw->data->dev_id;
int socket_id = sw->data->socket_id;
- char buf[IQ_RING_NAMESIZE];
+ char buf[IQ_ROB_NAMESIZE];
struct sw_qid *qid = &sw->qids[idx];
- for (i = 0; i < SW_IQS_MAX; i++) {
- snprintf(buf, sizeof(buf), "q_%u_iq_%d", idx, i);
- qid->iq[i] = iq_ring_create(buf, socket_id);
- if (!qid->iq[i]) {
- SW_LOG_DBG("ring create failed");
- goto cleanup;
- }
- }
+ for (i = 0; i < SW_IQS_MAX; i++)
+ iq_init(sw, &qid->iq[i]);
/* Initialize the FID structures to no pinning (-1), and zero packets */
const struct sw_fid_t fid = {.cq = -1, .pcount = 0};
cleanup:
for (i = 0; i < SW_IQS_MAX; i++) {
- if (qid->iq[i])
- iq_ring_destroy(qid->iq[i]);
+ if (qid->iq[i].head)
+ iq_free_chunk(sw, qid->iq[i].head);
}
if (qid->reorder_buffer) {
struct sw_qid *qid = &sw->qids[id];
uint32_t i;
- for (i = 0; i < SW_IQS_MAX; i++)
- iq_ring_destroy(qid->iq[i]);
+ for (i = 0; i < SW_IQS_MAX; i++) {
+ if (!qid->iq[i].head)
+ continue;
+ iq_free_chunk(sw, qid->iq[i].head);
+ }
if (qid->type == RTE_SCHED_TYPE_ORDERED) {
rte_free(qid->reorder_buffer);
struct sw_evdev *sw = sw_pmd_priv(dev);
const struct rte_eventdev_data *data = dev->data;
const struct rte_event_dev_config *conf = &data->dev_conf;
+ int num_chunks, i;
sw->qid_count = conf->nb_event_queues;
sw->port_count = conf->nb_event_ports;
sw->nb_events_limit = conf->nb_events_limit;
rte_atomic32_set(&sw->inflights, 0);
+ /* Size the chunk pool for the worst case: enough fully-packed chunks
+ * for every in-flight event, plus a partially-filled head and tail
+ * chunk per IQ, so that iq_alloc_chunk() can never fail.
+ */
+ num_chunks = ((SW_INFLIGHT_EVENTS_TOTAL/SW_EVS_PER_Q_CHUNK)+1) +
+ sw->qid_count*SW_IQS_MAX*2;
+
+ /* If this is a reconfiguration, free the previous IQ allocation */
+ rte_free(sw->chunks);
+
+ sw->chunks = rte_malloc_socket(NULL,
+ sizeof(struct sw_queue_chunk) *
+ num_chunks,
+ 0,
+ sw->data->socket_id);
+ if (!sw->chunks)
+ return -ENOMEM;
+
+ sw->chunk_list_head = NULL;
+ for (i = 0; i < num_chunks; i++)
+ iq_free_chunk(sw, &sw->chunks[i]);
+
if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
return -ENOTSUP;
uint32_t iq;
uint32_t iq_printed = 0;
for (iq = 0; iq < SW_IQS_MAX; iq++) {
- if (!qid->iq[iq]) {
+ if (!qid->iq[iq].head) {
fprintf(f, "\tiq %d is not initialized.\n", iq);
iq_printed = 1;
continue;
}
- uint32_t used = iq_ring_count(qid->iq[iq]);
- uint32_t free = iq_ring_free_count(qid->iq[iq]);
- const char *col = (free == 0) ? COL_RED : COL_RESET;
+ uint32_t used = iq_count(&qid->iq[iq]);
+ const char *col = COL_RESET;
if (used > 0) {
- fprintf(f, "\t%siq %d: Used %d\tFree %d"
- COL_RESET"\n", col, iq, used, free);
+ fprintf(f, "\t%siq %d: Used %d"
+ COL_RESET"\n", col, iq, used);
iq_printed = 1;
}
}
/* check all queues are configured and mapped to ports*/
for (i = 0; i < sw->qid_count; i++)
- if (sw->qids[i].iq[0] == NULL ||
+ if (sw->qids[i].iq[0].head == NULL ||
sw->qids[i].cq_num_mapped_cqs == 0) {
SW_LOG_ERR("Queue %d not configured\n", i);
return -ENOLINK;
#define MAX_SW_PROD_Q_DEPTH 4096
#define SW_FRAGMENTS_MAX 16
+/* Should be power-of-two minus one, to leave room for the next pointer */
+#define SW_EVS_PER_Q_CHUNK 255
+#define SW_Q_CHUNK_SIZE ((SW_EVS_PER_Q_CHUNK + 1) * sizeof(struct rte_event))
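+/* With the 16-byte struct rte_event this works out to a 4 KB chunk: 255
+ * events plus the next pointer, padded up to SW_Q_CHUNK_SIZE by the cache
+ * alignment of struct sw_queue_chunk.
+ */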
+
/* report dequeue burst sizes in buckets */
#define SW_DEQ_STAT_BUCKET_SHIFT 2
/* how many packets pulled from port by sched */
struct rte_event fragments[SW_FRAGMENTS_MAX];
};
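+/* An IQ is a FIFO of events stored as a linked list of chunks */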
+struct sw_iq {
+ struct sw_queue_chunk *head;
+ struct sw_queue_chunk *tail;
+ uint16_t head_idx;
+ uint16_t tail_idx;
+ uint16_t count;
+};
+
struct sw_qid {
/* set when the QID has been initialized */
uint8_t initialized;
struct sw_point_stats stats;
/* Internal priority rings for packets */
- struct iq_ring *iq[SW_IQS_MAX];
+ struct sw_iq iq[SW_IQS_MAX];
uint32_t iq_pkt_mask; /* A mask to indicate packets in an IQ */
uint64_t iq_pkt_count[SW_IQS_MAX];
/* Internal queues - one per logical queue */
struct sw_qid qids[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned;
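+ /* Free-chunk list head and the chunk array backing it */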
+ struct sw_queue_chunk *chunk_list_head;
+ struct sw_queue_chunk *chunks;
/* Cache how many packets are in each cq */
uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned;
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
-#include "iq_ring.h"
+#include "iq_chunk.h"
#define SW_IQS_MASK (SW_IQS_MAX-1)
*/
uint32_t qid_id = qid->id;
- iq_ring_dequeue_burst(qid->iq[iq_num], qes, count);
+ iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
for (i = 0; i < count; i++) {
const struct rte_event *qe = &qes[i];
const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
p->cq_buf_count = 0;
}
}
- iq_ring_put_back(qid->iq[iq_num], blocked_qes, nb_blocked);
+ iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);
return count - nb_blocked;
}
rte_ring_count(qid->reorder_buffer_freelist));
for (i = 0; i < count; i++) {
- const struct rte_event *qe = iq_ring_peek(qid->iq[iq_num]);
+ const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
uint32_t cq_check_count = 0;
uint32_t cq;
(void *)&p->hist_list[head].rob_entry);
sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
- iq_ring_pop(qid->iq[iq_num]);
+ iq_pop(sw, &qid->iq[iq_num]);
rte_compiler_barrier();
p->inflights++;
return 0;
/* burst dequeue from the QID IQ ring */
- struct iq_ring *ring = qid->iq[iq_num];
- uint32_t ret = iq_ring_dequeue_burst(ring,
+ struct sw_iq *iq = &qid->iq[iq_num];
+ uint32_t ret = iq_dequeue_burst(sw, iq,
&port->cq_buf[port->cq_buf_count], count_free);
port->cq_buf_count += ret;
continue;
uint32_t pkts_done = 0;
- uint32_t count = iq_ring_count(qid->iq[iq_num]);
+ uint32_t count = iq_count(&qid->iq[iq_num]);
if (count > 0) {
if (type == SW_SCHED_TYPE_DIRECT)
continue;
}
- struct sw_qid *dest_qid_ptr =
- &sw->qids[dest_qid];
- const struct iq_ring *dest_iq_ptr =
- dest_qid_ptr->iq[dest_iq];
- if (iq_ring_free_count(dest_iq_ptr) == 0)
- break;
-
pkts_iter++;
struct sw_qid *q = &sw->qids[dest_qid];
- struct iq_ring *r = q->iq[dest_iq];
+ struct sw_iq *iq = &q->iq[dest_iq];
/* we checked for space above, so enqueue must
* succeed
*/
- iq_ring_enqueue(r, qe);
+ iq_enqueue(sw, iq, qe);
q->iq_pkt_mask |= (1 << (dest_iq));
q->iq_pkt_count[dest_iq]++;
q->stats.rx_pkts++;
uint32_t iq_num = PRIO_TO_IQ(qe->priority);
struct sw_qid *qid = &sw->qids[qe->queue_id];
- if ((flags & QE_FLAG_VALID) &&
- iq_ring_free_count(qid->iq[iq_num]) == 0)
- break;
-
/* now process based on flags. Note that for directed
* queues, the enqueue_flush masks off all but the
* valid flag. This makes FWD and PARTIAL enqueues just
*/
qid->iq_pkt_mask |= (1 << (iq_num));
- iq_ring_enqueue(qid->iq[iq_num], qe);
+ iq_enqueue(sw, &qid->iq[iq_num], qe);
qid->iq_pkt_count[iq_num]++;
qid->stats.rx_pkts++;
pkts_iter++;
uint32_t iq_num = PRIO_TO_IQ(qe->priority);
struct sw_qid *qid = &sw->qids[qe->queue_id];
- struct iq_ring *iq_ring = qid->iq[iq_num];
-
- if (iq_ring_free_count(iq_ring) == 0)
- break; /* move to next port */
+ struct sw_iq *iq = &qid->iq[iq_num];
port->stats.rx_pkts++;
* into the qid at the right priority
*/
qid->iq_pkt_mask |= (1 << (iq_num));
- iq_ring_enqueue(iq_ring, qe);
+ iq_enqueue(sw, iq, qe);
qid->iq_pkt_count[iq_num]++;
qid->stats.rx_pkts++;
pkts_iter++;
#include <rte_event_ring.h>
#include "sw_evdev.h"
-#include "iq_ring.h"
+#include "iq_chunk.h"
enum xstats_type {
/* common stats */
pkt_cycles,
poll_return, /* for zero-count and used also for port bucket loop */
/* qid_specific */
- iq_size,
iq_used,
/* qid port mapping specific */
pinned,
return infl;
} while (0);
break;
- case iq_size: return RTE_DIM(qid->iq[0]->ring);
default: return -1;
}
}
const int iq_idx = extra_arg;
switch (type) {
- case iq_used: return iq_ring_count(qid->iq[iq_idx]);
+ case iq_used: return iq_count(&qid->iq[iq_idx]);
default: return -1;
}
}
/* all bucket dequeues are allowed to be reset, handled in loop below */
static const char * const qid_stats[] = {"rx", "tx", "drop",
- "inflight", "iq_size"
+ "inflight"
};
static const enum xstats_type qid_types[] = { rx, tx, dropped,
- inflight, iq_size
+ inflight
};
static const uint8_t qid_reset_allowed[] = {1, 1, 1,
- 0, 0
+ 0
};
static const char * const qid_iq_stats[] = { "used" };
ret = rte_event_dev_xstats_names_get(evdev,
RTE_EVENT_DEV_XSTATS_QUEUE,
0, xstats_names, ids, XSTATS_MAX);
- if (ret != 17) {
- printf("%d: expected 17 stats, got return %d\n", __LINE__, ret);
+ if (ret != 16) {
+ printf("%d: expected 16 stats, got return %d\n", __LINE__, ret);
return -1;
}
ret = rte_event_dev_xstats_get(evdev,
RTE_EVENT_DEV_XSTATS_QUEUE,
0, ids, values, ret);
- if (ret != 17) {
- printf("%d: expected 17 stats, got return %d\n", __LINE__, ret);
+ if (ret != 16) {
+ printf("%d: expected 16 stats, got return %d\n", __LINE__, ret);
return -1;
}
3 /* tx */,
0 /* drop */,
3 /* inflights */,
- 512 /* iq size */,
0, 0, 0, 0, /* iq 0, 1, 2, 3 used */
/* QID-to-Port: pinned_flows, packets */
0, 0,
0 /* tx */,
0 /* drop */,
3 /* inflight */,
- 512 /* iq size */,
0, 0, 0, 0, /* 4 iq used */
/* QID-to-Port: pinned_flows, packets */
0, 0,
goto fail;
/* num queue stats */
-#define NUM_Q_STATS 17
+#define NUM_Q_STATS 16
/* queue offset from start of the devices whole xstats.
* This will break every time we add a statistic to a device/port/queue
*/
"qid_0_tx",
"qid_0_drop",
"qid_0_inflight",
- "qid_0_iq_size",
"qid_0_iq_0_used",
"qid_0_iq_1_used",
"qid_0_iq_2_used",
7, /* tx */
0, /* drop */
7, /* inflight */
- 512, /* iq size */
0, /* iq 0 used */
0, /* iq 1 used */
0, /* iq 2 used */
0, /* tx */
0, /* drop */
7, /* inflight */
- 512, /* iq size */
0, /* iq 0 used */
0, /* iq 1 used */
0, /* iq 2 used */