/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include "dsw_evdev.h"

#include <stdbool.h>

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_random.h>

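/* Make sure the port has 'credits' worth of inflight credits available.
 * Each port keeps a small local stash (inflight_credits) and only dips
 * into the shared, device-wide pool (credits_on_loan) when the stash
 * runs out, keeping the common case free of shared-counter updates.
 */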
static bool
dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
			 int32_t credits)
{
	int32_t inflight_credits = port->inflight_credits;
	int32_t missing_credits = credits - inflight_credits;
	int32_t total_on_loan;
	int32_t available;
	int32_t acquired_credits;
	int32_t new_total_on_loan;

	if (likely(missing_credits <= 0)) {
		port->inflight_credits -= credits;
		return true;
	}

	total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
	available = dsw->max_inflight - total_on_loan;
	acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);

	if (available < acquired_credits)
		return false;

	/* This is a race, no locks are involved, and thus some other
	 * thread can allocate tokens in between the check and the
	 * allocation.
	 */
	new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
						    acquired_credits);

	if (unlikely(new_total_on_loan > dsw->max_inflight)) {
		/* Some other port took the last credits */
		rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
		return false;
	}

	DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
			acquired_credits);

	port->inflight_credits += acquired_credits;
	port->inflight_credits -= credits;

	return true;
}

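/* Give credits back to the port's local stash; if the stash exceeds
 * DSW_PORT_MAX_CREDITS, return the surplus to the shared pool.
 */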
static void
dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
			int32_t credits)
{
	port->inflight_credits += credits;

	if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
		int32_t leave_credits = DSW_PORT_MIN_CREDITS;
		int32_t return_credits =
			port->inflight_credits - leave_credits;

		port->inflight_credits = leave_credits;

		rte_atomic32_sub(&dsw->credits_on_loan, return_credits);

		DSW_LOG_DP_PORT(DEBUG, port->id,
				"Returned %d tokens to pool.\n",
				return_credits);
	}
}

static void
dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
{
	if (dequeued > 0 && port->busy_start == 0)
		/* work period begins */
		port->busy_start = rte_get_timer_cycles();
	else if (dequeued == 0 && port->busy_start > 0) {
		/* work period ends */
		uint64_t work_period =
			rte_get_timer_cycles() - port->busy_start;
		port->busy_cycles += work_period;
		port->busy_start = 0;
	}
}

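/* Close the current measurement period and turn the busy cycles
 * accumulated during it into a load value in the [0, DSW_MAX_LOAD]
 * range.
 */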
static int16_t
dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
{
	uint64_t passed = now - port->measurement_start;
	uint64_t busy_cycles = port->busy_cycles;

	if (port->busy_start > 0) {
		busy_cycles += (now - port->busy_start);
		port->busy_start = now;
	}

	int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;

	port->measurement_start = now;
	port->busy_cycles = 0;

	port->total_busy_cycles += busy_cycles;

	return load;
}

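/* Update the atomically published port load with an exponentially
 * weighted moving average, where DSW_OLD_LOAD_WEIGHT is the weight
 * given to the previous value.
 */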
static void
dsw_port_load_update(struct dsw_port *port, uint64_t now)
{
	int16_t old_load;
	int16_t period_load;
	int16_t new_load;

	old_load = rte_atomic16_read(&port->load);

	period_load = dsw_port_load_close_period(port, now);

	new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
		(DSW_OLD_LOAD_WEIGHT+1);

	rte_atomic16_set(&port->load, new_load);
}

static void
dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
{
	if (now < port->next_load_update)
		return;

	port->next_load_update = now + port->load_update_interval;

	dsw_port_load_update(port, now);
}

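/* Map a queue id and flow hash to the port currently serving that
 * flow. Queues with a single serving port skip the flow-to-port
 * lookup.
 */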
static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
	struct dsw_queue *queue = &dsw->queues[queue_id];
	uint8_t port_id;

	if (queue->num_serving_ports > 1)
		port_id = queue->flow_to_port_map[flow_hash];
	else
		/* A single-link queue, or atomic/ordered/parallel but
		 * with just a single serving port.
		 */
		port_id = queue->serving_ports[0];

	DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
		   "to port %d.\n", queue_id, flow_hash, port_id);

	return port_id;
}

static void
dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
			   uint8_t dest_port_id)
{
	struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
	uint16_t enqueued = 0;

	if (*buffer_len == 0)
		return;

	/* The rings are dimensioned to fit all in-flight events (even
	 * on a single ring), so looping will work.
	 */
	do {
		enqueued +=
			rte_event_ring_enqueue_burst(dest_port->in_ring,
						     buffer+enqueued,
						     *buffer_len-enqueued,
						     NULL);
	} while (unlikely(enqueued != *buffer_len));

	(*buffer_len) = 0;
}

static uint16_t
dsw_port_get_parallel_flow_id(struct dsw_port *port)
{
	uint16_t flow_id = port->next_parallel_flow_id;

	port->next_parallel_flow_id =
		(port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;

	return flow_id;
}

static void
dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
			   uint8_t dest_port_id, const struct rte_event *event)
{
	struct rte_event *buffer = source_port->out_buffer[dest_port_id];
	uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];

	if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);

	buffer[*buffer_len] = *event;
	(*buffer_len)++;
}

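/* Fold the flow id down to a DSW_MAX_FLOWS_BITS-wide hash by XOR-ing
 * it together in DSW_MAX_FLOWS_BITS-sized chunks.
 */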
#define DSW_FLOW_ID_BITS (24)
static uint16_t
dsw_flow_id_hash(uint32_t flow_id)
{
	uint16_t hash = 0;
	uint16_t offset = 0;

	do {
		hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
		offset += DSW_MAX_FLOWS_BITS;
	} while (offset < DSW_FLOW_ID_BITS);

	return hash;
}

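/* Events on parallel queues carry no ordering requirements, so the
 * flow id is replaced by an artificial per-port round-robin flow to
 * spread them over the serving ports.
 */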
static void
dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
			 struct rte_event event)
{
	uint8_t dest_port_id;

	event.flow_id = dsw_port_get_parallel_flow_id(source_port);

	dest_port_id = dsw_schedule(dsw, event.queue_id,
				    dsw_flow_id_hash(event.flow_id));

	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
}

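/* Buffer an event toward whichever port dsw_schedule() picks for the
 * event's queue and flow.
 */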
static void
dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
		      const struct rte_event *event)
{
	uint16_t flow_hash;
	uint8_t dest_port_id;

	if (unlikely(dsw->queues[event->queue_id].schedule_type ==
		     RTE_SCHED_TYPE_PARALLEL)) {
		dsw_port_buffer_parallel(dsw, source_port, *event);
		return;
	}

	flow_hash = dsw_flow_id_hash(event->flow_id);

	dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);

	dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}

static void
dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
{
	/* To pull the control ring reasonably often on busy ports,
	 * each dequeued/enqueued event is considered an 'op' too.
	 */
	port->ops_since_bg_task += (num_events+1);
}

static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);

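/* Background work, run roughly every DSW_MAX_PORT_OPS_PER_BG_TASK
 * enqueue/dequeue operations: flush the output buffers and, when due,
 * update the port load estimate.
 */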
static void
dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
	if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
		uint64_t now;

		now = rte_get_timer_cycles();

		/* Logic to avoid having events linger in the output
		 * buffer too long.
		 */
		dsw_port_flush_out_buffers(dsw, port);

		dsw_port_consider_load_update(port, now);

		port->ops_since_bg_task = 0;
	}
}

static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
{
	uint16_t dest_port_id;

	for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
		dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
}

uint16_t
dsw_event_enqueue(void *port, const struct rte_event *ev)
{
	return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
}

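/* Common enqueue path. When op_types_known is true (the new/forward
 * burst variants), the per-op-type event counts are supplied by the
 * caller; otherwise they are computed by scanning the burst.
 */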
static __rte_always_inline uint16_t
dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
				uint16_t events_len, bool op_types_known,
				uint16_t num_new, uint16_t num_release,
				uint16_t num_non_release)
{
	struct dsw_port *source_port = port;
	struct dsw_evdev *dsw = source_port->dsw;
	bool enough_credits;
	uint16_t i;

	DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
			"events to port %d.\n", events_len, source_port->id);

	dsw_port_bg_process(dsw, source_port);

	/* XXX: For performance (=ring efficiency) reasons, the
	 * scheduler relies on internal non-ring buffers instead of
	 * immediately sending the event to the destination ring. For
	 * a producer that doesn't intend to produce or consume any
	 * more events, the scheduler provides a way to flush the
	 * buffer, by means of doing an enqueue of zero events. In
	 * addition, a port cannot be left "unattended" (e.g. unused)
	 * for long periods of time, since that would stall
	 * migration. Eventdev API extensions to provide a cleaner way
	 * to achieve both of these functions should be
	 * considered.
	 */
	if (unlikely(events_len == 0)) {
		dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
		dsw_port_flush_out_buffers(dsw, source_port);
		return 0;
	}

	if (unlikely(events_len > source_port->enqueue_depth))
		events_len = source_port->enqueue_depth;

	dsw_port_note_op(source_port, events_len);

	if (!op_types_known)
		for (i = 0; i < events_len; i++) {
			switch (events[i].op) {
			case RTE_EVENT_OP_RELEASE:
				num_release++;
				break;
			case RTE_EVENT_OP_NEW:
				num_new++;
				/* Falls through. */
			default:
				num_non_release++;
				break;
			}
		}

	/* Technically, we could allow the non-new events up to the
	 * first new event in the array into the system, but for
	 * simplicity reasons, we deny the whole burst if the port is
	 * above the water mark.
	 */
	if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
		     source_port->new_event_threshold))
		return 0;

	enough_credits = dsw_port_acquire_credits(dsw, source_port,
						  num_non_release);
	if (unlikely(!enough_credits))
		return 0;

	source_port->pending_releases -= num_release;

	for (i = 0; i < events_len; i++) {
		const struct rte_event *event = &events[i];

		if (likely(num_release == 0 ||
			   event->op != RTE_EVENT_OP_RELEASE))
			dsw_port_buffer_event(dsw, source_port, event);
	}

	DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
			"accepted.\n", num_non_release);

	return num_non_release;
}

uint16_t
dsw_event_enqueue_burst(void *port, const struct rte_event events[],
			uint16_t events_len)
{
	return dsw_event_enqueue_burst_generic(port, events, events_len, false,
					       0, 0, 0);
}

uint16_t
dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
			    uint16_t events_len)
{
	return dsw_event_enqueue_burst_generic(port, events, events_len, true,
					       events_len, 0, events_len);
}

uint16_t
dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
				uint16_t events_len)
{
	return dsw_event_enqueue_burst_generic(port, events, events_len, true,
					       0, 0, events_len);
}

uint16_t
dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
{
	return dsw_event_dequeue_burst(port, events, 1, wait);
}

static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
		       uint16_t num)
{
	return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
}

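/* Dequeue path: run the background tasks, pull events from the port's
 * input ring, record load, and hand back the credits held by the
 * dequeued events.
 */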
uint16_t
dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
			uint64_t wait __rte_unused)
{
	struct dsw_port *source_port = port;
	struct dsw_evdev *dsw = source_port->dsw;
	uint16_t dequeued;

	source_port->pending_releases = 0;

	dsw_port_bg_process(dsw, source_port);

	if (unlikely(num > source_port->dequeue_depth))
		num = source_port->dequeue_depth;

	dequeued = dsw_port_dequeue_burst(source_port, events, num);

	source_port->pending_releases = dequeued;

	dsw_port_load_record(source_port, dequeued);

	dsw_port_note_op(source_port, dequeued);

	if (dequeued > 0) {
		DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
				dequeued);

		dsw_port_return_credits(dsw, source_port, dequeued);
	}

	/* XXX: Assuming the port can't produce any more work,
	 * consider flushing the output buffer, on dequeued == 0.
	 */

	return dequeued;
}