ethdev: fix memory ordering for callback functions
authorHonnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Tue, 13 Oct 2020 16:25:37 +0000 (11:25 -0500)
committerFerruh Yigit <ferruh.yigit@intel.com>
Fri, 16 Oct 2020 17:48:18 +0000 (19:48 +0200)
Call back functions are registered on the control plane. They
are accessed from the data plane. Hence, correct memory orderings
should be used to avoid race conditions.

Fixes: 4dc294158cac ("ethdev: support optional Rx and Tx callbacks")
Fixes: c8231c63ddcb ("ethdev: insert Rx callback as head of list")
Cc: stable@dpdk.org
Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
Reviewed-by: Phil Yang <phil.yang@arm.com>
Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
lib/librte_ethdev/rte_ethdev.c
lib/librte_ethdev/rte_ethdev.h

index 2198548..7eaeccc 100644 (file)
@@ -4610,12 +4610,20 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
                rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
 
        if (!tail) {
-               rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+               /* Stores to cb->fn and cb->param should complete before
+                * cb is visible to data plane.
+                */
+               __atomic_store_n(
+                       &rte_eth_devices[port_id].post_rx_burst_cbs[queue_id],
+                       cb, __ATOMIC_RELEASE);
 
        } else {
                while (tail->next)
                        tail = tail->next;
-               tail->next = cb;
+               /* Stores to cb->fn and cb->param should complete before
+                * cb is visible to data plane.
+                */
+               __atomic_store_n(&tail->next, cb, __ATOMIC_RELEASE);
        }
        rte_spinlock_unlock(&rte_eth_rx_cb_lock);
 
@@ -4700,12 +4708,20 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
                rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id];
 
        if (!tail) {
-               rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id] = cb;
+               /* Stores to cb->fn and cb->param should complete before
+                * cb is visible to data plane.
+                */
+               __atomic_store_n(
+                       &rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id],
+                       cb, __ATOMIC_RELEASE);
 
        } else {
                while (tail->next)
                        tail = tail->next;
-               tail->next = cb;
+               /* Stores to cb->fn and cb->param should complete before
+                * cb is visible to data plane.
+                */
+               __atomic_store_n(&tail->next, cb, __ATOMIC_RELEASE);
        }
        rte_spinlock_unlock(&rte_eth_tx_cb_lock);
 
@@ -4736,7 +4752,7 @@ rte_eth_remove_rx_callback(uint16_t port_id, uint16_t queue_id,
                cb = *prev_cb;
                if (cb == user_cb) {
                        /* Remove the user cb from the callback list. */
-                       *prev_cb = cb->next;
+                       __atomic_store_n(prev_cb, cb->next, __ATOMIC_RELAXED);
                        ret = 0;
                        break;
                }
@@ -4770,7 +4786,7 @@ rte_eth_remove_tx_callback(uint16_t port_id, uint16_t queue_id,
                cb = *prev_cb;
                if (cb == user_cb) {
                        /* Remove the user cb from the callback list. */
-                       *prev_cb = cb->next;
+                       __atomic_store_n(prev_cb, cb->next, __ATOMIC_RELAXED);
                        ret = 0;
                        break;
                }
index f07dbc4..c37b3a1 100644 (file)
@@ -3911,7 +3911,8 @@ struct rte_eth_rxtx_callback;
  *   The callback function
  * @param user_param
  *   A generic pointer parameter which will be passed to each invocation of the
- *   callback function on this port and queue.
+ *   callback function on this port and queue. Inter-thread synchronization
+ *   of any user data changes is the responsibility of the user.
  *
  * @return
  *   NULL on error.
@@ -3940,7 +3941,8 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t queue_id,
  *   The callback function
  * @param user_param
  *   A generic pointer parameter which will be passed to each invocation of the
- *   callback function on this port and queue.
+ *   callback function on this port and queue. Inter-thread synchronization
+ *   of any user data changes is the responsibility of the user.
  *
  * @return
  *   NULL on error.
@@ -3968,7 +3970,8 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t queue_id,
  *   The callback function
  * @param user_param
  *   A generic pointer parameter which will be passed to each invocation of the
- *   callback function on this port and queue.
+ *   callback function on this port and queue. Inter-thread synchronization
+ *   of any user data changes is the responsibility of the user.
  *
  * @return
  *   NULL on error.
@@ -3993,7 +3996,9 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t queue_id,
  *   on that queue.
  *
  * - After a short delay - where the delay is sufficient to allow any
- *   in-flight callbacks to complete.
+ *   in-flight callbacks to complete. Alternately, the RCU mechanism can be
+ *   used to detect when data plane threads have ceased referencing the
+ *   callback memory.
  *
  * @param port_id
  *   The port identifier of the Ethernet device.
@@ -4026,7 +4031,9 @@ int rte_eth_remove_rx_callback(uint16_t port_id, uint16_t queue_id,
  *   on that queue.
  *
  * - After a short delay - where the delay is sufficient to allow any
- *   in-flight callbacks to complete.
+ *   in-flight callbacks to complete. Alternately, the RCU mechanism can be
+ *   used to detect when data plane threads have ceased referencing the
+ *   callback memory.
  *
  * @param port_id
  *   The port identifier of the Ethernet device.
@@ -4687,10 +4694,18 @@ rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
                                     rx_pkts, nb_pkts);
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-       if (unlikely(dev->post_rx_burst_cbs[queue_id] != NULL)) {
-               struct rte_eth_rxtx_callback *cb =
-                               dev->post_rx_burst_cbs[queue_id];
+       struct rte_eth_rxtx_callback *cb;
 
+       /* __ATOMIC_RELEASE memory order was used when the
+        * call back was inserted into the list.
+        * Since there is a clear dependency between loading
+        * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
+        * not required.
+        */
+       cb = __atomic_load_n(&dev->post_rx_burst_cbs[queue_id],
+                               __ATOMIC_RELAXED);
+
+       if (unlikely(cb != NULL)) {
                do {
                        nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
                                                nb_pkts, cb->param);
@@ -4953,7 +4968,16 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
 #endif
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-       struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
+       struct rte_eth_rxtx_callback *cb;
+
+       /* __ATOMIC_RELEASE memory order was used when the
+        * call back was inserted into the list.
+        * Since there is a clear dependency between loading
+        * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
+        * not required.
+        */
+       cb = __atomic_load_n(&dev->pre_tx_burst_cbs[queue_id],
+                               __ATOMIC_RELAXED);
 
        if (unlikely(cb != NULL)) {
                do {