Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NullPointerException in TaskRestartCallback #1676

Open
rfkm opened this issue Nov 30, 2022 · 0 comments
Open

NullPointerException in TaskRestartCallback #1676

rfkm opened this issue Nov 30, 2022 · 0 comments

Comments

@rfkm
Copy link
Contributor

rfkm commented Nov 30, 2022

Hello. We are encountering the problem of some tasks getting stuck on extremely rare occasions. The thread dump did not reveal any obviously blocked threads in our code base. However, we found that an uncaught error reporter reported the following NPE:

java.lang.NullPointerException
        at monix.eval.internal.TaskRestartCallback.run(TaskRestartCallback.scala:65)
        at monix.execution.internal.Trampoline.monix$execution$internal$Trampoline$$immediateLoop(Trampoline.scala:66)
        at monix.execution.internal.Trampoline$ResumeRun$1.run(Trampoline.scala:51)
        at monix.execution.schedulers.TracingRunnable.run(TracingRunnable.scala:33)
        at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

I'm not sure but I suspect that the scheduled TaskRestartCallback was executed multiple times.

final def run(): Unit = {
val fn = this.register
this.register = null
fn(context, this)
}

I have not yet been able to create a minimal reproduction environment, but there is a place in our code where we are using scala.concurrent.blocking and it may have something to do with it.

protected final def forkTheRest(ec: ExecutionContext): Unit = {
final class ResumeRun(head: Runnable, rest: ChunkedArrayQueue[Runnable]) extends Runnable {
def run(): Unit = {
immediateQueue.enqueueAll(rest)
immediateLoop(head, ec)
}
}
val head = immediateQueue.dequeue()
if (head ne null) {
val rest = immediateQueue
immediateQueue = makeQueue()
ec.execute(new ResumeRun(head, rest))
}
}

The blocking context may cause forkTheRest to be invoked. Is this a thread-safe implementation?
Since ChunkedArrayQueue does not appear to be thread-safe, isn't it dangerous to potentially reference it from other threads?
I am not very familiar with the code inside monix, so it is very possible that I am misunderstanding something, but in any case, I am sure that something strange is going on and I would appreciate your help in finding out the cause.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant