From baad33135cb41685d08d480e043191c957f6012b Mon Sep 17 00:00:00 2001 From: David Karnok Date: Tue, 17 Jan 2023 09:23:43 +0100 Subject: [PATCH] 3.x: Fix Single.timeout race condition (#7515) --- .../operators/single/SingleTimeout.java | 6 +---- .../operators/single/SingleTimeoutTest.java | 22 ++++++++++++++++++- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeout.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeout.java index 609747d1d8..cc5b923727 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeout.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeout.java @@ -113,11 +113,7 @@ public void onError(Throwable e) { @Override public void run() { - Disposable d = get(); - if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) { - if (d != null) { - d.dispose(); - } + if (DisposableHelper.dispose(this)) { SingleSource other = this.other; if (other == null) { downstream.onError(new TimeoutException(timeoutMessage(timeout, unit))); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeoutTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeoutTest.java index dcc526c4f6..e54fc11207 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeoutTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeoutTest.java @@ -28,7 +28,7 @@ import io.reactivex.rxjava3.functions.Action; import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; -import io.reactivex.rxjava3.schedulers.TestScheduler; +import io.reactivex.rxjava3.schedulers.*; import io.reactivex.rxjava3.subjects.*; import io.reactivex.rxjava3.testsupport.TestHelper; @@ -255,4 +255,24 @@ protected void subscribeActual(@NonNull SingleObserver assertTrue(d.isDisposed()); } + + @Test + public void timeoutWithZero() throws InterruptedException { + int n = 10_000; + Scheduler sch = Schedulers.single(); + for (int i = 0; i < n; i++) { + final int y = i; + final CountDownLatch latch = new CountDownLatch(1); + Disposable d = Single.never() + .timeout(0, TimeUnit.NANOSECONDS, sch) + .subscribe(v -> {}, e -> { + //System.out.println("timeout " + y); + latch.countDown(); + }); + if (!latch.await(2, TimeUnit.SECONDS)) { + System.out.println(d + " " + sch); + throw new IllegalStateException("Timeout did not work at y = " + y); + } + } + } }