Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Addresses to have a max calls per connection #8386

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
5 changes: 3 additions & 2 deletions okhttp/api/okhttp.api
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,11 @@ public final class okhttp3/ConnectionPool {
public final class okhttp3/ConnectionPool$AddressPolicy {
public final field backoffDelayMillis J
public final field backoffJitterMillis I
public final field maximumConcurrentCallsPerConnection I
public final field minimumConcurrentCalls I
public fun <init> ()V
public fun <init> (IJI)V
public synthetic fun <init> (IJIILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IJII)V
public synthetic fun <init> (IJIIILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class okhttp3/ConnectionSpec {
Expand Down
5 changes: 5 additions & 0 deletions okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,10 @@ class ConnectionPool internal constructor(
@JvmField val backoffDelayMillis: Long = 60 * 1000,
/** How much jitter to introduce in connection retry backoff delays */
@JvmField val backoffJitterMillis: Int = 100,
/**
* The maximum number of concurrent calls per connection.
* Set this value to 1 to disable HTTP/2 connection coalescing
*/
@JvmField val maximumConcurrentCallsPerConnection: Int = Int.MAX_VALUE,
peckb1 marked this conversation as resolved.
Show resolved Hide resolved
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.SSLPeerUnverifiedException
import javax.net.ssl.SSLSocket
import kotlin.concurrent.withLock
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer need when the references from lock.withLock were changed to this.withLock

  • though that extension function still forwards to lock.withLock so same concurrent method used one step further.

import kotlin.math.min
import okhttp3.Address
import okhttp3.Connection
import okhttp3.ConnectionListener
Expand Down Expand Up @@ -119,6 +119,8 @@ class RealConnection(
internal var allocationLimit = 1
private set

private var lastMaxConcurrentStreamsFromSettings: Int? = null

/** Current calls carried by this connection. */
val calls = mutableListOf<Reference<RealCall>>()

Expand Down Expand Up @@ -176,7 +178,8 @@ class RealConnection(
.flowControlListener(flowControlListener)
.build()
this.http2Connection = http2Connection
this.allocationLimit = Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams()
this.lastMaxConcurrentStreamsFromSettings = Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams()
recalculateAllocationLimit()
http2Connection.start()
}

Expand Down Expand Up @@ -335,7 +338,7 @@ class RealConnection(
return http2Connection.isHealthy(nowNs)
}

val idleDurationNs = lock.withLock { nowNs - idleAtNs }
val idleDurationNs = this.withLock { nowNs - idleAtNs }
if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) {
return socket.isHealthy(source)
}
Expand All @@ -354,9 +357,21 @@ class RealConnection(
connection: Http2Connection,
settings: Settings,
) {
lock.withLock {
this.withLock {
this.lastMaxConcurrentStreamsFromSettings = settings.getMaxConcurrentStreams()
recalculateAllocationLimit()
}
}

/**
* Resets the [allocationLimit] field based on any settings which may have been applied
* Needed to allow for policy changes to adjust the limit, similarly to the change
* made during settings changes
*/
internal fun recalculateAllocationLimit() {
this.withLock {
val oldLimit = allocationLimit
allocationLimit = settings.getMaxConcurrentStreams()
allocationLimit = getMaximumAllocationLimit()

if (allocationLimit < oldLimit) {
// We might need new connections to keep policies satisfied
Expand All @@ -368,6 +383,18 @@ class RealConnection(
}
}

private fun getMaximumAllocationLimit(): Int {
// if we have not negotiated a max per streams yet, don't check for the policy override
val negotiatedMaxCurrentStreams = lastMaxConcurrentStreamsFromSettings ?: return 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this just defaulted to 1 for Http/1 and Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams() for Http/2?

should it be a lateinit and start sets to one or the other?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe?

private var lastMaxConcurrentStreamsFromSettings = 1 would avoid the early return, but then we're checking against the policy.

Would it be better to have the null and early return or check the policy every time, even if we're not in HTTP/2 land?


val maxPolicyValue =
connectionPool.getPolicy(route.address)
?.maximumConcurrentCallsPerConnection
?: Int.MAX_VALUE

return min(maxPolicyValue, negotiatedMaxCurrentStreams)
}

override fun handshake(): Handshake? = handshake

/** Track a bad route in the route database. Other routes will be attempted first. */
Expand Down Expand Up @@ -398,7 +425,7 @@ class RealConnection(
e: IOException?,
) {
var noNewExchangesEvent = false
lock.withLock {
this.withLock {
if (e is StreamResetException) {
when {
e.errorCode == ErrorCode.REFUSED_STREAM -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class RealConnectionPool(
* This confirms the returned connection is healthy before returning it. If this encounters any
* unhealthy connections in its search, this will clean them up.
*
* If [routes] is non-null these are the resolved routes (ie. IP addresses) for the connection.
* If [routes] is non-null these are the resolved routes (i.e. IP addresses) for the connection.
* This is used to coalesce related domains to the same HTTP/2 connection, such as `square.com`
* and `square.ca`.
*/
Expand Down Expand Up @@ -389,12 +389,30 @@ class RealConnectionPool(
}
}

for (connection in connections) {
if (connection.route.address != address) {
continue
}
// This method takes a lock in order to recalculate the limit
// This will also change the maximum connections (if needed) for us
connection.recalculateAllocationLimit()
}

// change the minimum connections (if needed)
when {
newConnectionsNeeded > 0 -> state.scheduleOpener()
newConnectionsNeeded < 0 -> scheduleCloser()
}
}

/**
* Fetches a stored polity for a given [address]
* Returns null if no policy was set for that address.
*/
fun getPolicy(address: Address): ConnectionPool.AddressPolicy? {
return this.addressStates[address]?.policy
}

/** Open connections to [address], if required by the address policy. */
fun scheduleOpener(address: Address) {
addressStates[address]?.scheduleOpener()
Expand All @@ -405,7 +423,7 @@ class RealConnectionPool(
}

/**
* Ensure enough connections open to [address] to satisfy its [ConnectionPool.AddressPolicy].
* Ensure enough connections open to [AddressState.address] to satisfy its [ConnectionPool.AddressPolicy].
* If there are already enough connections, we're done.
* If not, we create one and then schedule the task to run again immediately.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ class ConnectionPoolTest {
assertThat(pool.connectionCount()).isEqualTo(2)
forceConnectionsToExpire(pool, expireTime)
assertThat(pool.connectionCount()).isEqualTo(1)

setPolicy(pool, address, ConnectionPool.AddressPolicy(3))
assertThat(pool.connectionCount()).isEqualTo(3)
}

@Test fun connectionPreWarmingHttp2() {
Expand Down Expand Up @@ -263,6 +266,73 @@ class ConnectionPoolTest {
assertThat(pool.connectionCount()).isEqualTo(1)
}

@Test fun testSettingMaxConcurrentOnAddressPolicyHttp2() {
taskFaker.advanceUntil(System.nanoTime())
val expireSooner = taskFaker.nanoTime + 1_000_000_000_000
val expireLater = taskFaker.nanoTime + 2_000_000_000_000

routePlanner.autoGeneratePlans = true
val address = routePlanner.address
val pool = routePlanner.pool

// Add a connection to the pool that won't expire for a while
routePlanner.defaultConnectionIdleAtNanos = expireLater
setPolicy(pool, address, ConnectionPool.AddressPolicy(minimumConcurrentCalls = 1))
assertThat(pool.connectionCount()).isEqualTo(1)

// All other connections created will expire sooner
routePlanner.defaultConnectionIdleAtNanos = expireSooner

// Turn it into an http/2 connection that supports 5 concurrent streams
// which can satisfy a larger policy
val connection = routePlanner.plans.first().connection
val http2Connection = connectHttp2(peer, connection, 5)
setPolicy(pool, address, ConnectionPool.AddressPolicy(minimumConcurrentCalls = 5))
assertThat(pool.connectionCount()).isEqualTo(1)

// Decrease the policy max connections, and check that new connections are created
setPolicy(pool, address, ConnectionPool.AddressPolicy(minimumConcurrentCalls = 5, maximumConcurrentCallsPerConnection = 1))
// fills up the first connect and then adds single connections
// 5 = 1 + 1 + 1 + 1 + 1 (five unique connections)
assertThat(pool.connectionCount()).isEqualTo(5)

// increase the policy max connections, and check that new connections are created
setPolicy(pool, address, ConnectionPool.AddressPolicy(minimumConcurrentCalls = 5, maximumConcurrentCallsPerConnection = 2))
forceConnectionsToExpire(pool, expireSooner)
// fills up the first connect and then adds single connections
// 5 = 2 + 1 + 1 + 1 (four unique connections)
assertThat(pool.connectionCount()).isEqualTo(4)

// increase the policy max connections, and check that new connections are created
setPolicy(pool, address, ConnectionPool.AddressPolicy(minimumConcurrentCalls = 5, maximumConcurrentCallsPerConnection = 4))
forceConnectionsToExpire(pool, expireSooner)
// fills up the first connect and then adds single connections
// 5 = 4 + 1 (two unique connections)
assertThat(pool.connectionCount()).isEqualTo(2)

// Decrease the policy max connections, and check that new connections are created
setPolicy(pool, address, ConnectionPool.AddressPolicy(minimumConcurrentCalls = 5, maximumConcurrentCallsPerConnection = 3))
// fills up the first connect and then removes an unused after
// 5 = 3 + 1 + 1 (three unique connections)
assertThat(pool.connectionCount()).isEqualTo(3)

// If you update the settings to something smaller than the current
// set policy it should be adhered too
updateMaxConcurrentStreams(http2Connection, 2)
forceConnectionsToExpire(pool, expireSooner)
// fills up the first connect and then adds single connections
// 5 = 2 + 1 + 1 + 1 (four unique connections)
assertThat(pool.connectionCount()).isEqualTo(4)

// If you update the settings to something more than the current
// set policy it should not go past the max in the policy
updateMaxConcurrentStreams(http2Connection, 5)
forceConnectionsToExpire(pool, expireSooner)
// fills up the first connect and then adds single connections
// 5 = 3 + 1 + 1 (three unique connections)
assertThat(pool.connectionCount()).isEqualTo(3)
}

private fun setPolicy(
pool: RealConnectionPool,
address: Address,
Expand Down