diff --git a/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java b/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java index 1c29471..8d26d44 100644 --- a/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java +++ b/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java @@ -32,6 +32,7 @@ import org.springframework.util.ReflectionUtils; import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import javax.management.MBeanServer; @@ -143,7 +144,7 @@ void shouldConsiderInitialSize() { when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.just(connectionMock).doOnNext(it -> creations.incrementAndGet())); when(connectionMock.validate(any())).thenReturn(Mono.empty()); - ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).build(); + ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).allocatorSubscribeOn(Schedulers.immediate()).build(); ConnectionPool pool = new ConnectionPool(configuration); pool.create().as(StepVerifier::create).consumeNextWith(actual -> { @@ -188,7 +189,7 @@ void shouldConsiderCustomizer() { when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.just(connectionMock).doOnNext(it -> creations.incrementAndGet())); when(connectionMock.validate(any())).thenReturn(Mono.empty()); - ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).customizer(connectionPoolBuilder -> connectionPoolBuilder.sizeBetween(2, 10)).build(); + ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).allocatorSubscribeOn(Schedulers.immediate()).customizer(connectionPoolBuilder -> connectionPoolBuilder.sizeBetween(2, 10)).build(); ConnectionPool pool = new ConnectionPool(configuration); pool.create().as(StepVerifier::create).consumeNextWith(actual -> { @@ -212,7 +213,7 @@ void shouldReusePooledConnection() { AtomicLong createCounter = new AtomicLong(); when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.just(connectionMock).doOnSubscribe(ignore -> createCounter.incrementAndGet())); - ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).initialSize(0).build(); + ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).allocatorSubscribeOn(Schedulers.immediate()).initialSize(0).build(); ConnectionPool pool = new ConnectionPool(configuration); pool.create().as(StepVerifier::create).assertNext(actual -> { @@ -237,7 +238,7 @@ void shouldCreateMultipleConnections() { when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.just(connectionMock).doOnSubscribe(ignore -> createCounter.incrementAndGet())); when(connectionMock.validate(any())).thenReturn(Mono.empty()); - ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).initialSize(0).build(); + ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).allocatorSubscribeOn(Schedulers.immediate()).initialSize(0).build(); ConnectionPool pool = new ConnectionPool(configuration); pool.create().as(StepVerifier::create).expectNextCount(1).verifyComplete(); @@ -411,6 +412,7 @@ void shouldReusePooledConnectionAfterTimeout() { when(connectionFactoryMock.create()).thenReturn((Publisher) connectionPublisher); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) + .allocatorSubscribeOn(Schedulers.immediate()) .acquireRetry(0) .initialSize(0) .maxAcquireTime(Duration.ofMillis(70)) @@ -850,6 +852,7 @@ void shouldDropConnectionOnFailedValidation() { when(connectionMock.validate(ValidationDepth.LOCAL)).thenReturn(Mono.just(false), Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) + .allocatorSubscribeOn(Schedulers.immediate()) .acquireRetry(0) .initialSize(0) .maxSize(2) @@ -876,6 +879,7 @@ void shouldDropConnectionOnFailedValidationWithRetry() { when(connectionMock.validate(ValidationDepth.LOCAL)).thenReturn(Mono.just(false), Mono.just(false), Mono.empty()); ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock) + .allocatorSubscribeOn(Schedulers.immediate()) .acquireRetry(1) .initialSize(0) .maxSize(2)