Skip to content

Commit

Permalink
iox-eclipse-iceoryx#2177 Reverse early return and relaxed memory order
Browse files Browse the repository at this point in the history
  • Loading branch information
albtam committed May 1, 2024
1 parent f955e76 commit 04837bb
Showing 1 changed file with 44 additions and 39 deletions.
83 changes: 44 additions & 39 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.inl
Original file line number Diff line number Diff line change
Expand Up @@ -99,54 +99,59 @@ inline bool SpscSofi<ValueType, CapacityValue>::empty() const noexcept
template <class ValueType, uint64_t CapacityValue>
inline bool SpscSofi<ValueType, CapacityValue>::pop(ValueType& valueOut) noexcept
{
// We need 'm_readPosition.load(std::memory_order_acquire)' to happen before
// 'm_writePosition.load(std::memory_order_acquire)' to avoid the following scenario where the
// CPU reorders the load of m_readPosition and m_writePosition:
// 0. Initial situation (the queue is full)
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 1. The consumer thread loads m_writePosition => 3
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 2. The producer thread pushes two times
// |--D--|--E--|--C--|
// ^ ^
// r=3 w=5
// 3. The consumer thread loads m_readPosition => 3. The pop method returns false
// => Whereas the queue was full, pop returned false giving the impression that the queue if empty
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_acquire);
uint64_t nextReadPosition{0};
bool popWasSuccessful{true};
// Memory order relaxed is enough since:
// - no synchronization needed for m_readPosition
// - if m_writePosition is loaded before m_readPosition and m_readPosition changed, it will be detected by the
// compare_exchange loop
uint64_t currentReadPosition = m_readPosition.load(std::memory_order_relaxed);

do
{
// SYNC POINT READ: m_data
// See explanation of the corresponding synchronization point in push()
if (currentReadPosition == m_writePosition.load(std::memory_order_acquire))
{
return false;
// We don't need to check if read has changed, as it is enough to know that the empty
// state was valid in the past. The same race can also happen after the while loop and
// before the return operation
nextReadPosition = currentReadPosition;
popWasSuccessful = false;
// We cannot just return false (i.e. we need to continue the loop) to avoid the following situation:
// 0. Initial situation (the queue is full)
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 1. The consumer thread loads m_writePosition => 3
// |----|--B--|--C--|
// ^ ^
// w=3 r=1
// 2. The producer thread pushes two times
// |--D--|--E--|-----|
// ^ ^
// r=3 w=5
// 3. The consumer thread loads m_readPosition => 3 The pop method returns false
// => Whereas the queue was full, pop returned false giving the impression that the queue if empty
}
else
{
// we use memcpy here, to ensure that there is no logic in copying the data
std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType));
nextReadPosition = currentReadPosition + 1U;
popWasSuccessful = true;

// we use memcpy here, to ensure that there is no logic in copying the data
std::memcpy(&valueOut, &m_data[currentReadPosition % m_size], sizeof(ValueType));


// We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted
// =============================================
// While memory synchronization is not needed for m_readPosition, we need to ensure that the
// 'memcpy' happens before updating m_readPosition.
// Corresponding m_readPosition load/acquire is in the push method
// =============================================
// ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz
// while this thread is blocked, we would still need more than 500 years to overflow
// m_readPosition and encounter the ABA problem
// We need to check if m_readPosition hasn't changed otherwise valueOut might be corrupted
// =============================================
// While memory synchronization is not needed for m_readPosition, we need to ensure that the
// 'memcpy' happens before updating m_readPosition.
// Corresponding m_readPosition load/acquire is in the CAS loop of push method
// =============================================
// ABA problem: m_readPosition is an uint64_t. Assuming a thread is pushing at a rate of 1 GHz
// while this thread is blocked, we would still need more than 500 years to overflow
// m_readPosition and encounter the ABA problem
}
} while (!m_readPosition.compare_exchange_weak(
currentReadPosition, currentReadPosition + 1U, std::memory_order_acq_rel, std::memory_order_acquire));
currentReadPosition, nextReadPosition, std::memory_order_acq_rel, std::memory_order_acquire));

return true;
return popWasSuccessful;
}

template <class ValueType, uint64_t CapacityValue>
Expand Down Expand Up @@ -206,8 +211,8 @@ inline bool SpscSofi<ValueType, CapacityValue>::push(const ValueType& valueIn, V
// Another issue might be that two consecutive pushes (not concurrent) happen on different CPU
// cores without synchronization, then the memory also needs to be synchronized for the overflow
// case.
// Memory order failure is memory_order_relaxed since there is no further synchronization
// needed if there is no overflow
// Memory order failure needs to be memory_order_acquire to match the corresponding m_readPosition store/release in
// the CAS loop of the pop method
// ======================================
// ABA problem: m_readPosition is an uint64_t. Assuming a thread is popping at a rate of 1 GHz while
// this thread is blocked, we would still need more than 500 years to overflow m_readPosition and
Expand Down

0 comments on commit 04837bb

Please sign in to comment.