Skip to content

Commit

Permalink
iox-eclipse-iceoryx#2177 Update SPSC SoFi
Browse files Browse the repository at this point in the history
- add comments to clarify concurrent behavior
- fix memory orders
  • Loading branch information
albtam committed Mar 20, 2024
1 parent b2cd72b commit db57a3e
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 215 deletions.
169 changes: 90 additions & 79 deletions iceoryx_hoofs/concurrent/buffer/include/iox/detail/spsc_sofi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,36 @@
#include <atomic>
#include <cstdint>
#include <cstring>
#include <utility>

namespace iox
{
namespace concurrent
{
/// @brief
/// Thread safe producer and consumer queue with a safe overflowing behavior.
/// SpscSofi is designed in a FIFO Manner but prevents data loss when pushing into
/// a full SpscSofi. When SpscSofi is full and a Sender tries to push, the data at the
/// current read position will be returned. SpscSofi is threadsafe without using
/// locks. When the buffer is filled, new data is written starting at the
/// beginning of the buffer and overwriting the old.The SpscSofi is especially
/// designed to provide fixed capacity storage. When its capacity is exhausted,
/// newly inserted elements will cause elements either at the beginning
/// to be overwritten.The SpscSofi only allocates memory when
/// created , capacity can be is adjusted explicitly.
///
/// @brief Thread safe (without locks) single producer and single consumer queue with a safe
/// overflowing behavior
/// @note When SpscSoFi is full and a sender tries to push, the data at the current read pos will be
/// returned. This behavior mimics a FiFo queue but prevents resource leaks when pushing into
/// a full SpscSoFi.
/// SpscSoFi is especially designed to provide fixed capacity storage.
/// SpscSoFi only allocates memory when created, capacity can be adjusted explicitly.
/// @example
/// It's an expected behavior that when push/pop are called concurrently and SpscSoFi is full, as
/// many elements as specified with 'CapacityValue' can be removed
/// 0: Initial situation:
/// |--A--|--B--|
/// 1. Thread 1 pushes a new element. Since it is an overflowing situation, the overwritten value is
/// removed and returned to the caller
/// |--A--|--B--|
/// 2. Right before push() returns, pop() detects that an element is about to be removed, and remove
/// the next element
/// |--C--|----|
/// @param[in] ValueType DataType to be stored, must be trivially copyable
/// @param[in] CapacityValue Capacity of the SpscSofi
template <class ValueType, uint64_t CapacityValue>
class SpscSofi
{
// We need to make sure that the copy operation doesn't have any logic
static_assert(std::is_trivially_copyable<ValueType>::value,
"SpscSofi can handle only trivially copyable data types");
/// @brief Check if Atomic integer is lockfree on platform
Expand All @@ -55,87 +63,89 @@ class SpscSofi
/// ATOMIC_INT_LOCK_FREE = 0 - is never lockfree
static_assert(2 <= ATOMIC_INT_LOCK_FREE, "SpscSofi is not able to run lock free on this data type");

/// @brief Internal size needs to be bigger than the size desirred by the user
/// This is because of buffer empty detection and overflow handling
static constexpr uint32_t INTERNAL_SIZE_ADD_ON = 1;

/// @brief This is the resulting internal size on creation
static constexpr uint32_t INTERNAL_SPSC_SOFI_SIZE = CapacityValue + INTERNAL_SIZE_ADD_ON;
// To ensure a consumer gets at least the amount of capacity of data when a queue is full, an additional free
// slot (add-on) is required.
// ========================================================================
// Consider the following scenario when there is no "capacity add-on":
// 1. CapacityValue = 2
// |--A--|--B--|
// ^
// w=2, r=0
// 2. We want to push a new element
// 3. Advance the read position (this effectively reduces the capacity and is the reason the internal capacity
// needs to be larger; the consumer cannot pop out CAPACITY amount of samples even though the queue is full if
// the push thread is suspended right after this operation)
// |--A--|--B--|
// ^ ^
// w=2 r=1
// 4. Take overflow data
// |-----|--B--|
// ^ ^
// w=2 r=1
// 5. Write new data
// |--C--|--B--|
// ^ ^
// w=2 r=1
// 6. Advance next write position
// |--C--|--B--|
// ^
// w=3, r=1
// ========================================================================
// With "capacity add-on"
// 1. CapacityValue = 2, InternalCapacity = 3
// |--A--|--B--|----|
// ^ ^
// r=0 w=2
// 2. We want to push a new element
// 3. We first write at index 2 % capacity
// |--A--|--B--|--C--|
// ^
// w=3, r=0,
// 2. We want to push a new element:
// 4. We detect that the queue if full so we retrieve the value pointed by the read pointer: the value A is
// returned
// |-(A)-|--B--|--C--|
// ^ ^
// w=3 r=1
// ========================================================================
static constexpr uint32_t INTERNAL_CAPACITY_ADDON = 1;

/// @brief Internal capacity of the queue at creation
static constexpr uint32_t INTERNAL_SPSC_SOFI_CAPACITY = CapacityValue + INTERNAL_CAPACITY_ADDON;

public:
/// @brief default constructor which constructs an empty sofi
SpscSofi() noexcept = default;

/// @brief pushs an element into SpscSofi. if SpscSofi is full the oldest data will be
/// @brief push an element into sofi. if sofi is full the oldest data will be
/// returned and the pushed element is stored in its place instead.
/// @param[in] valueIn value which should be stored
/// @param[out] valueOut if SpscSofi is overflowing the value of the overridden value
/// @param[in] value_in value which should be stored
/// @param[out] value_out if sofi is overflowing the value of the overridden value
/// is stored here
/// @concurrent restricted thread safe: single pop, single push no
/// push calls from multiple contexts
/// @return return true if push was sucessfull else false.
/// @note restricted thread safe: can only be called from one thread. The authorization to push into the
/// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used.
/// @return return true if push was successful else false.
/// @code
/// (initial situation, SpscSofi is FULL)
/// Start|-----A-------|
/// |-----B-------|
/// |-----C-------|
/// |-----D-------|
///
///
/// (calling push with data ’E’)
/// Start|-----E-------|
/// |-----A-------|
/// |-----B-------|
/// |-----C-------|
/// (’D’ is returned as valueOut)
///
/// ###################################################################
///
/// (if SpscSofi is not FULL , calling push() add new data)
/// Start|-------------|
/// |-------------| ( Initial SpscSofi )
/// (push() Called two times)
///
/// |-------------|
/// (New Data)
/// |-------------|
/// (New Data)
/// @endcode
/// 1. sofi is empty |-----|-----|
/// 2. push an element |--A--|-----|
/// 3. push an element |--A--|--B--|
/// 5. sofi is full
/// 6. push an element |--C--|--B--| -> value_out is set to 'A'
bool push(const ValueType& valueIn, ValueType& valueOut) noexcept;

/// @brief pop the oldest element
/// @param[out] valueOut storage of the pop'ed value
/// @concurrent restricted thread safe: single pop, single push no
/// pop or popIf calls from multiple contexts
/// @concurrent restricted thread safe: can only be called from one thread. The authorization to pop from the
/// SpscSofi can be transferred to another thread if appropriate synchronization mechanisms are used.
/// @return false if SpscSofi is empty, otherwise true
bool pop(ValueType& valueOut) noexcept;

/// @brief conditional pop call to provide an alternative for a peek
/// and pop approach. If the verificator returns true the
/// peeked element is returned.
/// @param[out] valueOut storage of the pop'ed value
/// @param[in] verificator callable of type bool(const ValueType& peekValue)
/// which takes the value which would be pop'ed as argument and returns
/// true if it should be pop'ed, otherwise false
/// @code
/// int limit = 7128;
/// mysofi.popIf(value, [=](const ValueType & peek)
/// {
/// return peek < limit; // pop only when peek is smaller than limit
/// }
/// ); // pop's a value only if it is smaller than 9012
/// @endcode
/// @concurrent restricted thread safe: single pop, single push no
/// pop or popIf calls from multiple contexts
/// @return false if SpscSofi is empty or when verificator returns false, otherwise true
template <typename Verificator_T>
bool popIf(ValueType& valueOut, const Verificator_T& verificator) noexcept;

/// @brief returns true if SpscSofi is empty, otherwise false
/// @note the use of this function is limited in the concurrency case. if you
/// call this and in another thread pop is called the result can be out
/// of date as soon as you require it
/// @concurrent unrestricted thread safe
/// @concurrent unrestricted thread safe (the result might already be outdated when used). Expected to be called
/// from either the producer or the consumer thread but not from a third thread
bool empty() const noexcept;

/// @brief resizes SpscSofi
Expand All @@ -150,15 +160,16 @@ class SpscSofi
uint64_t capacity() const noexcept;

/// @brief returns the current size of SpscSofi
/// @concurrent unrestricted thread safe
/// @concurrent unrestricted thread safe (the result might already be outdated when used). Expected to be called
/// from either the producer or the consumer thread but not from a third thread
uint64_t size() const noexcept;

private:
UninitializedArray<ValueType, INTERNAL_SPSC_SOFI_SIZE> m_data;
uint64_t m_size = INTERNAL_SPSC_SOFI_SIZE;
std::pair<uint64_t, uint64_t> getReadWritePositions() const noexcept;

/// @brief the write/read pointers are "atomic pointers" so that they are not
/// reordered (read or written too late)
private:
UninitializedArray<ValueType, INTERNAL_SPSC_SOFI_CAPACITY> m_data;
uint64_t m_size = INTERNAL_SPSC_SOFI_CAPACITY;
std::atomic<uint64_t> m_readPosition{0};
std::atomic<uint64_t> m_writePosition{0};
};
Expand Down

0 comments on commit db57a3e

Please sign in to comment.