Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Addresses to have a max calls per connection #8386

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
13 changes: 11 additions & 2 deletions okhttp/api/okhttp.api
Original file line number Diff line number Diff line change
Expand Up @@ -384,16 +384,25 @@ public final class okhttp3/ConnectionPool {
public final fun connectionCount ()I
public final fun evictAll ()V
public final fun idleConnectionCount ()I
public final fun setDefaultPolicy (Lokhttp3/ConnectionPool$ConnectionPoolPolicy;)V
public final fun setPolicy (Lokhttp3/Address;Lokhttp3/ConnectionPool$AddressPolicy;)V
}

public final class okhttp3/ConnectionPool$AddressPolicy {
public final field backoffDelayMillis J
public final field backoffJitterMillis I
public final field maximumConcurrentCallsPerConnection Ljava/lang/Integer;
public final field minimumConcurrentCalls I
public fun <init> ()V
public fun <init> (IJI)V
public synthetic fun <init> (IJIILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IJILjava/lang/Integer;)V
public synthetic fun <init> (IJILjava/lang/Integer;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class okhttp3/ConnectionPool$ConnectionPoolPolicy {
public final field maximumConcurrentCallsPerConnection I
public fun <init> ()V
public fun <init> (I)V
public synthetic fun <init> (IILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class okhttp3/ConnectionSpec {
Expand Down
32 changes: 32 additions & 0 deletions okhttp/src/main/kotlin/okhttp3/ConnectionPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ class ConnectionPool internal constructor(
delegate.setPolicy(address, policy)
}

/**
* Sets a default AddressPolicy that applies to all addresses.
* Overwrites any existing default policy.
*/
@ExperimentalOkHttpApi
fun setDefaultPolicy(policy: ConnectionPoolPolicy) {
delegate.setDefaultPolicy(policy)
}

/**
* A policy for how the pool should treat a specific address.
*/
Expand All @@ -152,5 +161,28 @@ class ConnectionPool internal constructor(
@JvmField val backoffDelayMillis: Long = 60 * 1000,
/** How much jitter to introduce in connection retry backoff delays */
@JvmField val backoffJitterMillis: Int = 100,
/**
* The maximum number of concurrent calls per connection for a given address.
* Set this value to 1 to disable HTTP/2 connection coalescing
*/
@JvmField val maximumConcurrentCallsPerConnection: Int? = null,
)

/**
* A default policy for all connections. Overridable by [AddressPolicy]
* for specific addresses.
*/
class ConnectionPoolPolicy(
/**
* The maximum number of concurrent calls per connection.
* Set this value to 1 to disable HTTP/2 connection coalescing
*/
@JvmField val maximumConcurrentCallsPerConnection: Int = Int.MAX_VALUE,
) {
init {
require(maximumConcurrentCallsPerConnection > 0) {
"maximumConcurrentCallsPerConnection is not allowed to be less than one (1)."
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.locks.ReentrantLock
import javax.net.ssl.SSLPeerUnverifiedException
import javax.net.ssl.SSLSocket
import kotlin.concurrent.withLock
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer need when the references from lock.withLock were changed to this.withLock

  • though that extension function still forwards to lock.withLock so same concurrent method used one step further.

import kotlin.math.min
import okhttp3.Address
import okhttp3.Connection
import okhttp3.ConnectionListener
Expand Down Expand Up @@ -119,6 +119,8 @@ class RealConnection(
internal var allocationLimit = 1
private set

private var lastMaxConcurrentStreamsFromSettings: Int? = null

/** Current calls carried by this connection. */
val calls = mutableListOf<Reference<RealCall>>()

Expand Down Expand Up @@ -176,7 +178,8 @@ class RealConnection(
.flowControlListener(flowControlListener)
.build()
this.http2Connection = http2Connection
this.allocationLimit = Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams()
this.lastMaxConcurrentStreamsFromSettings = Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams()
recalculateAllocationLimit()
http2Connection.start()
}

Expand Down Expand Up @@ -335,7 +338,7 @@ class RealConnection(
return http2Connection.isHealthy(nowNs)
}

val idleDurationNs = lock.withLock { nowNs - idleAtNs }
val idleDurationNs = this.withLock { nowNs - idleAtNs }
if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) {
return socket.isHealthy(source)
}
Expand All @@ -354,9 +357,21 @@ class RealConnection(
connection: Http2Connection,
settings: Settings,
) {
lock.withLock {
this.withLock {
this.lastMaxConcurrentStreamsFromSettings = settings.getMaxConcurrentStreams()
recalculateAllocationLimit()
}
}

/**
* Resets the [allocationLimit] field based on any settings which may have been applied
* Needed to allow for policy changes to adjust the limit, similarly to the change
* made during settings changes
*/
internal fun recalculateAllocationLimit() {
this.withLock {
val oldLimit = allocationLimit
allocationLimit = settings.getMaxConcurrentStreams()
allocationLimit = getMaximumAllocationLimit()

if (allocationLimit < oldLimit) {
// We might need new connections to keep policies satisfied
Expand All @@ -368,6 +383,17 @@ class RealConnection(
}
}

private fun getMaximumAllocationLimit(): Int {
// if we have not negotiated a max per streams yet, don't check for the policy override
val negotiatedMaxCurrentStreams = lastMaxConcurrentStreamsFromSettings ?: return 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this just defaulted to 1 for Http/1 and Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams() for Http/2?

should it be a lateinit and start sets to one or the other?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe?

private var lastMaxConcurrentStreamsFromSettings = 1 would avoid the early return, but then we're checking against the policy.

Would it be better to have the null and early return or check the policy every time, even if we're not in HTTP/2 land?


val maxPolicyValue =
connectionPool.getMaximumCallsPerConnection(route.address)
?: Int.MAX_VALUE

return min(maxPolicyValue, negotiatedMaxCurrentStreams)
}

override fun handshake(): Handshake? = handshake

/** Track a bad route in the route database. Other routes will be attempted first. */
Expand Down Expand Up @@ -398,7 +424,7 @@ class RealConnection(
e: IOException?,
) {
var noNewExchangesEvent = false
lock.withLock {
this.withLock {
if (e is StreamResetException) {
when {
e.errorCode == ErrorCode.REFUSED_STREAM -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class RealConnectionPool(
@Volatile
private var addressStates: Map<Address, AddressState> = mapOf()

@Volatile
private var defaultPolicy: ConnectionPool.ConnectionPoolPolicy? = null

private val cleanupQueue: TaskQueue = taskRunner.newQueue()
private val cleanupTask =
object : Task("$okHttpName ConnectionPool connection closer") {
Expand Down Expand Up @@ -97,7 +100,7 @@ class RealConnectionPool(
* This confirms the returned connection is healthy before returning it. If this encounters any
* unhealthy connections in its search, this will clean them up.
*
* If [routes] is non-null these are the resolved routes (ie. IP addresses) for the connection.
* If [routes] is non-null these are the resolved routes (i.e. IP addresses) for the connection.
* This is used to coalesce related domains to the same HTTP/2 connection, such as `square.com`
* and `square.ca`.
*/
Expand Down Expand Up @@ -389,12 +392,60 @@ class RealConnectionPool(
}
}

for (connection in connections) {
if (connection.route.address != address) {
continue
}
// This method takes a lock in order to recalculate the limit
// This will also change the maximum connections (if needed) for us
connection.recalculateAllocationLimit()
}

// change the minimum connections (if needed)
when {
newConnectionsNeeded > 0 -> state.scheduleOpener()
newConnectionsNeeded < 0 -> scheduleCloser()
}
}

/**
* Fetches the maximum number of calls allowed for a connection, for a given address
*/
fun getMaximumCallsPerConnection(address: Address): Int? {
val specificMaximum = this.addressStates[address]?.policy?.maximumConcurrentCallsPerConnection
val globalMaximum = this.defaultPolicy?.maximumConcurrentCallsPerConnection

return specificMaximum ?: globalMaximum
}

fun setDefaultPolicy(policy: ConnectionPool.ConnectionPoolPolicy) {
val referencePolicy = this.defaultPolicy

while (true) {
val oldPolicy = this.defaultPolicy
if (defaultPolicyUpdater.compareAndSet(this, oldPolicy, policy)) {
break
}
}

if (referencePolicy?.maximumConcurrentCallsPerConnection == policy.maximumConcurrentCallsPerConnection) {
return
}

for (connection in connections) {
val existingPolicy = addressStates[connection.route.address]

// skip any address policy that already contains a max
if (existingPolicy?.policy?.maximumConcurrentCallsPerConnection != null) {
continue
}

// This method takes a lock in order to recalculate the limit
// This will also change the maximum connections (if needed) for us
connection.recalculateAllocationLimit()
}
}

/** Open connections to [address], if required by the address policy. */
fun scheduleOpener(address: Address) {
addressStates[address]?.scheduleOpener()
Expand All @@ -405,7 +456,7 @@ class RealConnectionPool(
}

/**
* Ensure enough connections open to [address] to satisfy its [ConnectionPool.AddressPolicy].
* Ensure enough connections open to [AddressState.address] to satisfy its [ConnectionPool.AddressPolicy].
* If there are already enough connections, we're done.
* If not, we create one and then schedule the task to run again immediately.
*/
Expand Down Expand Up @@ -466,5 +517,12 @@ class RealConnectionPool(
Map::class.java,
"addressStates",
)

private var defaultPolicyUpdater =
AtomicReferenceFieldUpdater.newUpdater(
RealConnectionPool::class.java,
ConnectionPool.ConnectionPoolPolicy::class.java,
"defaultPolicy",
)
}
}