Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent tests and avoid testing HTTP/2 Frames with Log Handler. #8257

Draft
wants to merge 17 commits into
base: master
Choose a base branch
from
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ subprojects {

systemProperty("okhttp.platform", platform)
systemProperty("junit.jupiter.extensions.autodetection.enabled", "true")
systemProperty("junit.jupiter.execution.parallel.enabled", "true")
systemProperty("junit.jupiter.execution.parallel.mode.default", "concurrent")
systemProperty("junit.jupiter.execution.parallel.config.strategy", "dynamic")
systemProperty("junit.jupiter.execution.parallel.config.dynamic.factor", "1.0")
}

// https://publicobject.com/2023/04/16/read-a-project-file-in-a-kotlin-multiplatform-test/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")

package mockwebserver3.internal.http2

import java.io.File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class EventSourceRecorder : EventSourceListener() {
}

private fun nextEvent(): Any {
return events.poll(10, TimeUnit.SECONDS)
return events.poll(25, TimeUnit.SECONDS)
?: throw AssertionError("Timed out waiting for event.")
}

Expand All @@ -96,7 +96,8 @@ class EventSourceRecorder : EventSourceListener() {
}

fun assertOpen(): EventSource {
val event = nextEvent() as Open
val event = nextEvent()
check(event is Open) { "Expected Open but was $event" }
return event.eventSource
}

Expand All @@ -105,7 +106,8 @@ class EventSourceRecorder : EventSourceListener() {
}

fun assertFailure(message: String?) {
val event = nextEvent() as Failure
val event = nextEvent()
check(event is Failure) { "Expected Failure but was $event" }
if (message != null) {
assertThat(event.t!!.message).isEqualTo(message)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package okhttp3

import android.annotation.SuppressLint
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.logging.Handler
Expand All @@ -29,7 +30,6 @@ import kotlin.concurrent.withLock
import okhttp3.internal.buildConnectionPool
import okhttp3.internal.concurrent.TaskRunner
import okhttp3.internal.connection.RealConnectionPool
import okhttp3.internal.http2.Http2
import okhttp3.internal.taskRunnerInternal
import okhttp3.testing.Flaky
import okhttp3.testing.PlatformRule.Companion.LOOM_PROPERTY
Expand All @@ -55,6 +55,7 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler? = null
private var taskQueuesWereIdle: Boolean = false
val connectionListener = RecordingConnectionListener()
val taskLogger = RecordingTaskLogger()

var logger: Logger? = null

Expand Down Expand Up @@ -88,8 +89,6 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
override fun publish(record: LogRecord) {
val recorded =
when (record.loggerName) {
TaskRunner::class.java.name -> recordTaskRunner
Http2::class.java.name -> recordFrames
"javax.net.ssl" -> recordSslDebug && !sslExcludeFilter.matches(record.message)
else -> false
}
Expand Down Expand Up @@ -121,8 +120,6 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
private fun applyLogger(fn: Logger.() -> Unit) {
Logger.getLogger(OkHttpClient::class.java.`package`.name).fn()
Logger.getLogger(OkHttpClient::class.java.name).fn()
Logger.getLogger(Http2::class.java.name).fn()
Logger.getLogger(TaskRunner::class.java.name).fn()
Logger.getLogger("javax.net.ssl").fn()
}

Expand Down Expand Up @@ -158,7 +155,7 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
private fun initialClientBuilder(): OkHttpClient.Builder =
if (isLoom()) {
val backend = TaskRunner.RealBackend(loomThreadFactory())
val taskRunner = TaskRunner(backend)
val taskRunner = TaskRunner(backend, logger = taskLogger)

OkHttpClient.Builder()
.connectionPool(
Expand All @@ -170,8 +167,12 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
.dispatcher(Dispatcher(backend.executor))
.taskRunnerInternal(taskRunner)
} else {
val backend = TaskRunner.RealBackend(SharedExecutor)
val taskRunner = TaskRunner(backend, logger = taskLogger)

OkHttpClient.Builder()
.connectionPool(ConnectionPool(connectionListener = connectionListener))
.taskRunnerInternal(taskRunner)
}

private fun loomThreadFactory(): ThreadFactory {
Expand Down Expand Up @@ -322,7 +323,17 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
return connectionListener.recordedEventTypes()
}

fun takeFrameLogs(): List<String> {
return connectionListener.takeFrameLogs()
}

fun takeFrameLog(): String {
return connectionListener.takeFrameLog()
}

companion object {
val SharedExecutor by lazy { Executors.newCachedThreadPool() }

/**
* A network that resolves only one IP address per host. Use this when testing route selection
* fallbacks to prevent the host machine's various IP addresses from interfering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "CANNOT_OVERRIDE_INVISIBLE_MEMBER")

package okhttp3

import assertk.assertThat
Expand All @@ -22,9 +24,11 @@ import assertk.assertions.isInstanceOf
import assertk.assertions.matchesPredicate
import java.util.Deque
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import okhttp3.ConnectionEvent.NoNewExchanges
import okhttp3.internal.connection.RealConnection
import okhttp3.internal.http2.FrameLogger
import okio.IOException
import org.junit.jupiter.api.Assertions

Expand All @@ -34,8 +38,9 @@ open class RecordingConnectionListener(
* EventListeners added by Interceptors will not see all events.
*/
private val enforceOrder: Boolean = true,
) : ConnectionListener() {
) : ConnectionListener(), FrameLogger {
val eventSequence: Deque<ConnectionEvent> = ConcurrentLinkedDeque()
private val frameLogs = LinkedBlockingQueue<String>()

private val forbiddenLocks = mutableSetOf<Any>()

Expand All @@ -47,6 +52,22 @@ open class RecordingConnectionListener(
forbiddenLocks.add(lock)
}

override fun logFrame(frameLog: () -> String) {
frameLogs.add(frameLog())
}

fun takeFrameLog(): String {
return frameLogs.take()
}

fun takeFrameLogs(): List<String> {
synchronized(frameLogs) {
return frameLogs.toList().also {
frameLogs.clear()
}
}
}

/**
* Removes recorded events up to (and including) an event is found whose class equals [eventClass]
* and returns it.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2024 Block, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "CANNOT_OVERRIDE_INVISIBLE_MEMBER")

package okhttp3

import java.util.concurrent.LinkedBlockingQueue
import okhttp3.internal.concurrent.Task
import okhttp3.internal.concurrent.TaskLogger
import okhttp3.internal.concurrent.TaskLogger.Logging.logString
import okhttp3.internal.concurrent.TaskQueue

class RecordingTaskLogger : TaskLogger {
private val logs = LinkedBlockingQueue<String>()

override fun taskLog(
task: Task,
queue: TaskQueue,
messageBlock: () -> String,
) {
logs.add(logString(queue, messageBlock(), task))
}

fun takeAllLogs(): List<String> {
synchronized(logs) {
return logs.toList().also {
logs.clear()
}
}
}

override val loggingEnabled: Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")

package okhttp3.internal.concurrent

import assertk.assertThat
Expand All @@ -24,9 +26,9 @@ import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Logger
import kotlin.concurrent.withLock
import okhttp3.OkHttpClient
import okhttp3.RecordingTaskLogger

/**
* Runs a [TaskRunner] in a controlled environment so that everything is sequential and
Expand Down Expand Up @@ -61,8 +63,6 @@ class TaskFaker : Closeable {
}
}

val logger = Logger.getLogger("TaskFaker." + instance++)

/** Though this executor service may hold many threads, they are not executed concurrently. */
private val tasksExecutor = Executors.newCachedThreadPool()

Expand Down Expand Up @@ -97,6 +97,8 @@ class TaskFaker : Closeable {
/** True if new tasks should run immediately without stalling. Guarded by [taskRunner]. */
private var isRunningAllTasks = false

private val taskLogger = RecordingTaskLogger()

/** A task runner that posts tasks to this fake. Tasks won't be executed until requested. */
val taskRunner: TaskRunner =
TaskRunner(
Expand Down Expand Up @@ -164,7 +166,7 @@ class TaskFaker : Closeable {

override fun <T> decorate(queue: BlockingQueue<T>) = TaskFakerBlockingQueue(queue)
},
logger = logger,
logger = taskLogger,
)

/** Wait for the test thread to proceed. */
Expand Down Expand Up @@ -344,6 +346,10 @@ class TaskFaker : Closeable {
tasksExecutor.shutdownNow()
}

fun takeAllLogs(): List<String> {
return taskLogger.takeAllLogs()
}

companion object {
var instance = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,13 @@ open class PlatformRule
}
}

fun assumeNotWindows() {
assumeFalse(isWindows, "This test fails on Windows.")
}

val isWindows: Boolean
get() = System.getProperty("os.name", "?").startsWith("Windows")

val isAndroid: Boolean
get() = Platform.Companion.isAndroid

Expand Down