Read-write Mutex #94

Open
fvasco opened this issue Jul 31, 2017 · 34 comments
@fvasco
Contributor

fvasco commented Jul 31, 2017

Is it in the roadmap?

@elizarov
Contributor

elizarov commented Jul 31, 2017

It is on my to-do list, but not for the short term. It is really up for grabs. This is a great feature for someone who wants to figure out how to write low-level lock-free implementations for kotlinx.coroutines primitives: go ahead, study the Mutex implementation, and figure out how to abstract (reuse) parts of it to implement ReadWriteMutex. It would be a pity if I wrote it myself and deprived someone else of this great learning opportunity.

@bobvawter

Here's a basic implementation of the 80% case for ReadWriteMutex built on top of Mutex where you just want to execute coroutines as readers or writers.

https://gist.github.com/bobvawter/4ff642d5996dfccb228425909f303306

@fvasco
Contributor Author

fvasco commented Sep 2, 2017

The proposed implementation lacks support for the "select" clause.

@elizarov
Contributor

elizarov commented Sep 2, 2017

Adding "select" is not a critical problem. I have a different concern here: the stateLock approach does not seem efficient to me for a core synchronization primitive. The act of acquiring read lock should be fully lock-free for scalability purposes, so that it scales to multi-core machines where a lot of threads can be acquiring and releasing the read lock concurrently.

@bobvawter

@elizarov If you agree with the general design of that ReadWriteMutex API, I can work on making the implementation more like Mutex (using an atomic queue of operations), adding additional state-management methods (e.g. tryReadLock()), and then put together a PR in the next couple of weeks.

Two questions before I start:

  • Should state-change functions like readLock() accept an owner argument?
  • If a user were to pass the same object twice to readLock(owner:Any?), should that call succeed?

I can see arguments for owner representing a specific actor (possibly reentrant or concurrent), or for owner representing a specific read operation. The former would look like some internal structure similar to ConcurrentMap<Any, Int> and the latter just a ConcurrentSet<Any>.
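To make the two owner semantics concrete, here is a minimal bookkeeping sketch. The class names `ReentrantReadOwners` and `UniqueReadOperations` are hypothetical and exist only for this illustration; neither class suspends or blocks anything, they only show what each interpretation would have to track.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// "Owner is an actor": the same owner may hold several read locks, so count them.
class ReentrantReadOwners {
    private val holds = ConcurrentHashMap<Any, Int>()

    fun acquired(owner: Any) {
        holds.merge(owner, 1) { old, one -> old + one }
    }

    fun released(owner: Any) {
        holds.compute(owner) { _, count ->
            val held = checkNotNull(count) { "$owner does not hold a read lock" }
            // Returning null from the remapping function removes the entry.
            if (held == 1) null else held - 1
        }
    }

    fun holdCount(owner: Any): Int = holds[owner] ?: 0
}

// "Owner is one read operation": passing the same object twice is an error.
class UniqueReadOperations {
    private val active = ConcurrentHashMap.newKeySet<Any>()

    fun acquired(operation: Any) =
        check(active.add(operation)) { "$operation already holds a read lock" }

    fun released(operation: Any) =
        check(active.remove(operation)) { "$operation does not hold a read lock" }
}
```

With the first class a second `acquired(owner)` for the same owner succeeds and bumps a counter; with the second it fails. Either way, every read lock now pays for a concurrent map or set update.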

@jcornaz
Contributor

jcornaz commented Feb 13, 2018

@elizarov, I don't understand your statement:

The act of acquiring read lock should be fully lock-free

How is it possible to guarantee that a write lock and a read lock are never held at the same time without locking something when we acquire a read lock?

I am not aware of any algorithms other than these two: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Implementation

And neither of them allows acquiring a read lock in a lock-free manner.

Here is my current read-write mutex implementation: https://gist.github.com/jcornaz/e94ee6a3a139ddd33c20554b0380d30f

@jcornaz
Contributor

jcornaz commented Feb 13, 2018

On second thought, I think I now understand how acquiring a read lock can be lock-free (if no write lock is held).
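A minimal sketch of that idea, covering only the non-suspending fast path: the whole lock state lives in one atomic integer and readers enter with a compare-and-set loop. The class `RwState` and its encoding (-1 for a writer, n >= 0 for n readers) are assumptions made for this illustration, not the linked draft; queueing suspended waiters, which is the hard part, is left out.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical state holder: -1 means write-locked, n >= 0 means n active readers.
class RwState {
    private val state = AtomicInteger(0)

    // Lock-free: the loop retries only when another thread changed the state concurrently.
    fun tryAcquireRead(): Boolean {
        while (true) {
            val s = state.get()
            if (s < 0) return false // a writer holds the lock
            if (state.compareAndSet(s, s + 1)) return true
        }
    }

    fun releaseRead() {
        while (true) {
            val s = state.get()
            check(s > 0) { "releaseRead() without a matching tryAcquireRead()" }
            if (state.compareAndSet(s, s - 1)) return
        }
    }

    // A writer succeeds only when there are no readers and no other writer.
    fun tryAcquireWrite(): Boolean = state.compareAndSet(0, -1)

    fun releaseWrite() {
        check(state.compareAndSet(-1, 0)) { "releaseWrite() without a matching tryAcquireWrite()" }
    }

    // Number of active readers; 0 while a writer holds the lock.
    val readers: Int get() = state.get().coerceAtLeast(0)
}
```

Readers never wait on each other here: a failed compare-and-set means some other thread made progress, which is what makes the read path lock-free.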

I drafted a first implementation here: https://github.com/jcornaz/kotlinx.coroutines/blob/c2f6831c3d8aa81c3b6c9e9da753873153e2b8a9/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/ReadWriteMutex.kt
(EDIT: This implementation suffers from a critical issue: it allows acquiring a read lock while a write lock is active.)

This implementation still has some problems that I'll address before creating a PR.

But, could you tell me what interface you prefer?

Choice 1:

interface ReadWriteMutex {
  val isReadLocked: Boolean
  val isWriteLocked: Boolean

  fun tryLockRead(): Boolean
  fun tryLockWrite(): Boolean

  suspend fun lockRead()
  fun unlockRead()

  suspend fun lockWrite()
  fun unlockWrite()
}

Choice 2:

interface Lock {
   fun unlock()
}

interface ReadWriteMutex {

  val isReadLocked: Boolean
  val isWriteLocked: Boolean

  fun tryLockRead(): Lock?
  fun tryLockWrite(): Lock?

  suspend fun lockRead(): Lock
  suspend fun lockWrite(): Lock
}

The first interface looks more like the Mutex interface. But it's less safe, because one could call unlockRead() without calling lockRead() first. The second interface makes it possible to address this issue, but would be less consistent with the Mutex interface.

Note 1: I know there is no select clause yet. I will try to add one.
Note 2: It is not yet possible to specify an owner. Should I address that too?

@jcornaz
Contributor

jcornaz commented Feb 14, 2018

Here is my implementation using the second interface:
https://github.com/jcornaz/kotlinx.coroutines/blob/feature/readwrite-mutex/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/ReadWriteMutex.kt

If you agree with the concept, I'll try to implement select clauses, add comments and prepare a pull request.

@fvasco
Contributor Author

fvasco commented Feb 14, 2018

Regardless of the final choice, I think it is better to unify the interfaces.

I like solution 2, but I suspect it requires some extra allocation.
Moreover, using a Mutex forces me to store both the Mutex and the Lock instances locally.

I suggest a "Choice 3" opposite to the first.

Choice 3:

interface ReadWriteMutex {
    val read: Mutex
    val write: Mutex
}

comparison:

rwMutex.lockRead()
rwMutex.read.lock()

Plus: choice 3 is compatible with regular mutex.
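A short sketch of what that compatibility buys: because `read` and `write` are plain `Mutex` instances, the existing `withLock` extension works on both without any new API. `ExclusiveReadWriteMutex` is a placeholder assumed only for this example. It backs both views with one exclusive `Mutex`, so it is correct but never lets readers run concurrently. `Counter` and `counterDemo` are likewise hypothetical names.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

interface ReadWriteMutex {
    val read: Mutex
    val write: Mutex
}

// Placeholder implementation (assumption): both views share one exclusive Mutex.
class ExclusiveReadWriteMutex : ReadWriteMutex {
    private val mutex = Mutex()
    override val read: Mutex get() = mutex
    override val write: Mutex get() = mutex
}

class Counter(private val lock: ReadWriteMutex) {
    private var value = 0

    // The regular Mutex.withLock extension applies to both sides.
    suspend fun get(): Int = lock.read.withLock { value }

    suspend fun increment() {
        lock.write.withLock { value += 1 }
    }
}

fun counterDemo(): Int = runBlocking {
    val counter = Counter(ExclusiveReadWriteMutex())
    repeat(3) { counter.increment() }
    counter.get()
}
```

Swapping in a real read-write implementation later would not change `Counter` at all, which is the main appeal of this interface shape.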

@jcornaz
Contributor

jcornaz commented Feb 14, 2018

Thanks @fvasco, this is a very good proposal and would be my preferred choice too.

I'll try to do it that way.

@elizarov
Contributor

I like @fvasco's API proposal, too. Please send a PR when you are done.

As a side note, we are considering moving the "stack overflow prevention" logic that currently complicates the implementation of a regular (exclusive) Mutex into the Unconfined dispatcher. It should make the Mutex (and ReadWriteMutex) implementation simpler and faster for the typical case of a default dispatcher, while still being free from the stack overflow problem under the Unconfined dispatcher. See #80 for details.

@Dmitry-Borodin
Contributor

@fvasco please clarify whether you are working on it. This issue still has the "up for grabs" label, so it is not clear whether it makes sense for someone else to work on it.

@fvasco
Contributor Author

fvasco commented Apr 9, 2018

Hi @Dmitry-Borodin,
sorry for the misunderstanding.
I consider this job assigned to @jcornaz

I'll try to do it that way.

Any news?

@jcornaz
Contributor

jcornaz commented Apr 9, 2018

Well hmm... I'd say don't wait for me. The implementation is harder than I thought.

Even if I am actually still trying to do it (at least for my own practice and learning of the subject), I think it's better if you don't wait for my implementation.

@Dmitry-Borodin, feel free to work on it, if you're interested.

@Dmitry-Borodin
Contributor

Dmitry-Borodin commented Apr 9, 2018

Thank you.
I'll take a look at what I can do, and I will write here if I deliver it.

@elizarov
Contributor

Let me clarify for people googling this thread. We are not focused on an implementation of an RW mutex right now. We are more focused on CSP/actor-based programming paradigms. However, if there is a PR with a high-quality/high-performance RW-mutex implementation, then we'll accept it into the library.

@kobe2000

kobe2000 commented Jun 15, 2018

We are more focused on CSP/actor-based programming paradigms

Hi, do you mean that actor programming can eliminate the need for an RW mutex?

@elizarov
Contributor

Actor-style programming eliminates the need for all kinds of synchronization primitives, including (but not limited to) locks and RW locks. The whole idea of actor-style programming is that mutable state is never shared, but is encapsulated into an actor.
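As a rough illustration of that encapsulation, the sketch below confines a map to a single coroutine and exposes it only through messages. The message types `Put` and `Get` and the functions `storeActor` and `actorDemo` are hypothetical names invented for this example; it uses a plain `Channel` plus `launch` rather than any dedicated actor builder.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical message protocol for a key-value store owned by one coroutine.
sealed interface StoreMsg
class Put(val key: String, val value: Int) : StoreMsg
class Get(val key: String, val reply: CompletableDeferred<Int?>) : StoreMsg

// The map is only ever touched by the launched coroutine, so no lock is needed.
fun CoroutineScope.storeActor(): SendChannel<StoreMsg> {
    val inbox = Channel<StoreMsg>(Channel.BUFFERED)
    launch {
        val data = HashMap<String, Int>()
        for (msg in inbox) {
            when (msg) {
                is Put -> data[msg.key] = msg.value
                is Get -> msg.reply.complete(data[msg.key])
            }
        }
    }
    return inbox
}

fun actorDemo(): Int? = runBlocking {
    val store = storeActor()
    store.send(Put("answer", 42))
    val reply = CompletableDeferred<Int?>()
    store.send(Get("answer", reply))
    val result = reply.await()
    store.close() // lets the actor coroutine finish so runBlocking can return
    result
}
```

Note the trade-off: all requests, reads included, are processed one at a time by the owning coroutine, so this removes locks but does not by itself give parallel reads.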

@kobe2000

kobe2000 commented Jun 15, 2018

Could you please give me a hint on how to use actor-style programming to avoid an RW lock in this situation:
I keep a large collection in memory (GBs) that needs to support CRUD operations from multiple threads, and I cannot figure out how to code it without an RW lock and still get good performance.
Thank you!

@btwilk

btwilk commented Jun 24, 2018

Here are efficient read-write lock and semaphore implementations that I would like to contribute.

https://github.com/btwilk/kotlinx.coroutines/tree/new-sync/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync

I've been using them for a while without any problems but I would appreciate review for correctness.

Let me know what needs to be fixed/cleaned up/tested for a successful PR.

@fvasco
Contributor Author

fvasco commented Jun 24, 2018

Hi @btwilk,
have you considered exposing the proposed public interface?

interface ReadWriteMutex {
    val read: Mutex
    val write: Mutex
}

In that case the implementation can be private.

@btwilk

btwilk commented Jun 25, 2018

I think tracking multiple owners is too much overhead for a low-level primitive, which is the main reason I didn't consider it.

Also, the Mutex interface is pretty heavy. I haven't thought about:

  • onLock: support for select
  • lock: the cancellation behavior documented in Mutex

I hope there is a way forward that allows for incremental progress.

@btwilk

btwilk commented Jun 25, 2018

I just updated my branch with the proposed interface, except using a lighter-weight Lock interface instead of Mutex.

@qwwdfsad qwwdfsad removed this from the 1.2.1 milestone Apr 23, 2019
@aleksclark

Any input on the new PR for this?

@elizarov
Contributor

Sorry. We're currently concentrating on flows, so the mutex stuff is on the back burner. However, we are also in the process of introducing an efficient implementation of a semaphore (see #1101; a paper on the algorithm is to be published, too) and plan to reimplement the regular mutex on top of this efficient semaphore. The architecture of this semaphore actually scales to other synchronization primitives like R/W mutexes.

@alllex

alllex commented Mar 2, 2020

The "up for grabs" tag was removed, but there seems to be no progress on the issue. Does someone actively work on this?

@elizarov
Contributor

elizarov commented Mar 3, 2020

It is not an "easy" issue that you can just go and fix, and locks are not as useful with coroutines as they are with threads, so there are no short-term plans to fix it.

@alllex

alllex commented Mar 3, 2020

Do I understand correctly, that the new efficient Semaphore has already been implemented and merged? Is it all that is supposedly required for an efficient implementation of the R/W mutex or there are more infrastructural changes on the way?

@elizarov
Contributor

elizarov commented Mar 3, 2020

Do I understand correctly, that the new efficient Semaphore has already been implemented and merged? Is it all that is supposedly required for an efficient implementation of the R/W mutex or there are more infrastructural changes on the way?

Yes. Semaphore is implemented and merged, yet an R/W mutex is quite a non-trivial extension of that algorithm.

@LouisCAD
Contributor

Quoting @elizarov from over 4 years ago:

It would be a pity if I wrote it myself and deprived someone else of this great learning opportunity.

Looks like there are not a lot of folks that feel good enough to take up that learning opportunity 😅

I personally wanted to make one a few months ago as I finally had the use case, and even with all the interesting discussion here and several links, I didn't succeed nor make encouraging progress in the one day time box I set myself, so I ended up giving up and using a catch-all Mutex instead.

@qwwdfsad
Member

qwwdfsad commented Sep 13, 2021

Writing your own RW-mutex or, even more, extending our Semaphore algorithm may be quite a hard task.
But when you already have an efficient semaphore implementation, you don't have to!

Semaphore is quite a versatile primitive, and the rest of the synchronization primitives can be built on top of it.

  • The Little Book of Semaphores has a step-by-step implementation of an RW-lock on top of a few semaphores, with a detailed explanation of why things work (or don't). The target chapter is 4.2.

  • A more efficient implementation (the lightswitch is replaced with a CAS loop), but in C++, can be found here. This particular C++ code is pretty straightforward to port to Kotlin.

  • The most straightforward way, though not supported in our API, is to have a Semaphore(Int.MAX_VALUE) where issuing a read lock is acquire(1) and issuing a write lock is acquire(Int.MAX_VALUE).

NB: these implementations are good enough for any project-specific uses, but not for kotlinx.coroutines itself. We have plenty of open questions regarding what the public RW API should look like and, in addition, we would like to extend the internal Semaphore algorithm in the most efficient way.
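A minimal sketch of the last option, adapted to what the current `Semaphore` API does offer. Since `acquire()` takes one permit at a time, the writer collects all permits in a loop, and an extra `Mutex` keeps two writers from each grabbing part of the pool. The class name `SemaphoreReadWriteLock`, the `maxReaders` bound and `semaphoreDemo` are assumptions made for this illustration, not library API.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit

// Sketch: a reader takes one permit, a writer takes all of them.
// maxReaders is an assumed bound on concurrent readers.
class SemaphoreReadWriteLock(private val maxReaders: Int = 64) {
    private val permits = Semaphore(maxReaders)

    // Serializes writers so two of them cannot each hold part of the permits.
    private val writerGate = Mutex()

    suspend fun <T> read(block: suspend () -> T): T = permits.withPermit { block() }

    suspend fun <T> write(block: suspend () -> T): T = writerGate.withLock {
        var acquired = 0
        try {
            // Collect every permit; this suspends until active readers release theirs.
            repeat(maxReaders) {
                permits.acquire()
                acquired++
            }
            block()
        } finally {
            // Also runs on cancellation, returning only the permits actually taken.
            repeat(acquired) { permits.release() }
        }
    }
}

fun semaphoreDemo(): Int = runBlocking {
    val lock = SemaphoreReadWriteLock(maxReaders = 4)
    var shared = 0
    lock.write { shared = 41 }
    lock.write { shared += 1 }
    lock.read { shared }
}
```

The cost of a write grows with `maxReaders`, so a small bound is preferable; this is a project-level workaround in the spirit of the comment above, not a tuned primitive.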

@youcef-debbah

Without any redirections, below is my full implementation of a read/write mutex, based on the RWMutex implementation from Go.
Check out the RWMutex source HERE.
THIS article will help you understand the implementation.
Disclaimer: I wrote this in a hurry in a single workday. More tests and proper documentation are certainly needed, but for now this is what I have. I hope this saves someone some time and effort in the future!

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.SelectClause2
import kotlinx.coroutines.sync.Mutex
import java.util.concurrent.atomic.AtomicInteger
import javax.naming.OperationNotSupportedException

interface ReadWriteMutex {
    val read: Mutex
    val write: Mutex
    val state: LockState

    enum class LockState {

        // at least one read lock is acquired (returned from lock fun)
        READ_LOCKED,

        // write lock is acquired; in this state new readers/writers will be
        // suspended inside lock() until the current writer releases its lock.
        // Keep in mind that the writer itself may have already returned from
        // lock(), OR it may still be suspended inside lock(), waiting for
        // old readers to release their locks
        WRITE_LOCKED,

        // no lock is acquired
        UNLOCKED,
    }

    fun ensure(targetState: LockState)
    fun ensureWriteLocked()
    fun ensureReadLocked()
    fun ensureUnlocked()
}

fun ReadWriteMutex(): ReadWriteMutex {
    val mutex = Mutex()
    // Unlimited capacity so that trySend() never drops a wake-up signal when the
    // waiting coroutine has not reached receive() yet (a rendezvous channel would).
    val writePermissions = Channel<Unit>(Channel.UNLIMITED)
    val readPermissions = Channel<Unit>(Channel.UNLIMITED)
    val pendingCount = AtomicInteger()
    val readersDeparting = AtomicInteger()
    return SimpleReadWriteMutex(
        pendingCount,
        ReaderMutex(mutex, writePermissions, readPermissions, pendingCount, readersDeparting),
        WriterMutex(mutex, writePermissions, readPermissions, pendingCount, readersDeparting),
    )
}

// simple and efficient non-reentrant read-write mutex without timeout shenanigans
// ReaderMutex does not track any lock owners; WriterMutex tracks only the current writer's owner
internal class SimpleReadWriteMutex(
    private val pendingCount: AtomicInteger,
    override val read: ReaderMutex,
    override val write: WriterMutex,
) : ReadWriteMutex {
    override val state: ReadWriteMutex.LockState
        get() {
            val state = pendingCount.get()
            return if (state > 0)
                ReadWriteMutex.LockState.READ_LOCKED
            else if (state == 0)
                ReadWriteMutex.LockState.UNLOCKED
            else
                ReadWriteMutex.LockState.WRITE_LOCKED
        }

    override fun ensure(targetState: ReadWriteMutex.LockState) {
        val currentState = state
        if (currentState != targetState)
            throw IllegalStateException("the ReadWriteMutex was expected to be $targetState but was $currentState")
    }

    override fun ensureWriteLocked() {
        ensure(ReadWriteMutex.LockState.WRITE_LOCKED)
    }

    override fun ensureReadLocked() {
        ensure(ReadWriteMutex.LockState.READ_LOCKED)
    }

    override fun ensureUnlocked() {
        ensure(ReadWriteMutex.LockState.UNLOCKED)
    }

    override fun toString(): String = "SimpleReadWriteMutex($state)"
}

internal sealed class AbstractReadOrWriteMutex(
    protected val mutex: Mutex,
    protected val writePermissions: Channel<Unit>,
    protected val readPermissions: Channel<Unit>,
    protected val pendingCount: AtomicInteger,
    protected val readersDeparting: AtomicInteger,
) : Mutex {

    companion object {
        const val MAX_READERS = 1 shl 30
    }

    @Deprecated(
        message = "Mutex.onLock deprecated without replacement. For additional details please refer to #2794",
        level = DeprecationLevel.WARNING,
        replaceWith = ReplaceWith(""),
    )
    override val onLock: SelectClause2<Any?, Mutex>
        get() = throw UnsupportedOperationException("onLock is not supported")

    override fun toString(): String {
        val lockState = if (isLocked) "locked" else "unlocked"
        val className = this::class.simpleName ?: "Mutex"
        return "$className($lockState)"
    }

    protected fun overUnlockedException() =
        IllegalStateException("cannot unlock because the mutex is already unlocked!")

    fun debug() = toString() + "{readersDeparting=$readersDeparting,pendingCount=$pendingCount}"
}

internal class ReaderMutex(
    mutex: Mutex,
    writePermissions: Channel<Unit>,
    readPermissions: Channel<Unit>,
    pendingCount: AtomicInteger,
    readersDeparting: AtomicInteger,
) : AbstractReadOrWriteMutex(mutex, writePermissions, readPermissions, pendingCount, readersDeparting) {

    override val isLocked: Boolean
        get() = pendingCount.get() > 0

    override suspend fun lock(owner: Any?) {
        if (owner != null) throw UnsupportedOperationException()
        if (pendingCount.incrementAndGet() < 0) {
            readPermissions.receive()
        }
    }

    override fun unlock(owner: Any?) {
        if (owner != null) throw UnsupportedOperationException()
        val readersCount = pendingCount.decrementAndGet()
        if (readersCount < 0) {
            if (readersCount == -1 || readersCount == -MAX_READERS - 1) {
                throw overUnlockedException()
            }
            if (readersDeparting.decrementAndGet() == 0) {
                writePermissions.trySend(Unit)
            }
        }
    }

    override fun tryLock(owner: Any?): Boolean {
        if (owner != null) throw UnsupportedOperationException()
        while (true) {
            val readersCount = pendingCount.get()
            if (readersCount < 0) {
                return false
            }
            if (pendingCount.compareAndSet(readersCount, readersCount + 1)) {
                return true
            }
        }
    }

    override fun holdsLock(owner: Any): Boolean {
        throw UnsupportedOperationException()
    }
}

internal class WriterMutex(
    mutex: Mutex,
    writePermissions: Channel<Unit>,
    readPermissions: Channel<Unit>,
    pendingCount: AtomicInteger,
    readersDeparting: AtomicInteger,
) : AbstractReadOrWriteMutex(mutex, writePermissions, readPermissions, pendingCount, readersDeparting) {

    override val isLocked: Boolean
        get() = pendingCount.get() < 0

    override suspend fun lock(owner: Any?) {
        mutex.lock(owner)
        val oldReadersCount = pendingCount.getAndAdd(-MAX_READERS)
        if (oldReadersCount != 0 && readersDeparting.addAndGet(oldReadersCount) != 0) {
            writePermissions.receive()
        }
    }

    override fun unlock(owner: Any?) {
        val newReadersCount = pendingCount.addAndGet(MAX_READERS)
        if (newReadersCount >= MAX_READERS) {
            throw overUnlockedException()
        }
        repeat(newReadersCount) {
            readPermissions.trySend(Unit)
        }
        mutex.unlock(owner)
    }

    override fun tryLock(owner: Any?): Boolean {
        if (!mutex.tryLock(owner)) {
            return false
        }
        if (!pendingCount.compareAndSet(0, -MAX_READERS)) {
            mutex.unlock(owner)
            return false
        }
        return true
    }

    override fun holdsLock(owner: Any): Boolean = mutex.holdsLock(owner)
}

// not used, but may need it again to test some old school concurrent algorithms later
internal class Condition(val mutex: Mutex = Mutex()) {
    private val signals = Channel<Unit>()

    fun signal(): Boolean = signals.trySend(Unit).isSuccess

    suspend fun waitSignalUnlocked(owner: Any? = null) {
        mutex.unlock(owner)
        signals.receive()
        mutex.lock(owner)
    }

    suspend fun waitSignal() {
        signals.receive()
    }
}

@UnderMybrella

I've been working on a semaphore-backed implementation of this, it's not super clean at the moment but I think I've got a basic iteration working over here. I'm gonna do some polishing before submitting a PR though, make sure it's working as intended.

@qwwdfsad
Member

We still haven't decided whether we are going to provide our own RW-mutex (though we have a working prototype here: #2045), and we are not ready to accept a PR with that either. Writing your own might be a tricky task, so for your own use I suggest using a wrapper over Semaphore, using any of the techniques described above.
