Skip to content

Commit

Permalink
binder: Invoke onTransportReady() in a round-robin fashion. (#8835)
Browse files Browse the repository at this point in the history
Also call onTransportReady() only if isReady() still holds by the time
we get to a given Inbound. This dramatically reduces timeouts and
improves throughput when flow control has kicked in.

This approach is still not completely fair since each ongoing call might
consume a different amount of window on its turn, but because of the way
Outbound#writeMessageData() and BlockPool already work, everyone gets to
send at least 16kb.
  • Loading branch information
jdcormie committed Jan 14, 2022
1 parent e279479 commit 25531d6
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions binder/src/main/java/io/grpc/binder/internal/BinderTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TimeProvider;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -186,6 +188,9 @@ protected enum TransportState {

protected final ConcurrentHashMap<Integer, Inbound<?>> ongoingCalls;

@GuardedBy("this")
private final LinkedHashSet<Integer> callIdsToNotifyWhenReady = new LinkedHashSet<>();

@GuardedBy("this")
protected Attributes attributes;

Expand Down Expand Up @@ -529,9 +534,18 @@ final void handleAcknowledgedBytes(long numBytes) {
logger.log(
Level.FINE,
"handleAcknowledgedBytes: Transmit Window No-Longer Full. Unblock calls: " + this);
// We're ready again, and need to poke any waiting transactions.
for (Inbound<?> inbound : ongoingCalls.values()) {
inbound.onTransportReady();

// The LinkedHashSet contract guarantees that an id already present in this collection will
// not lose its priority if we re-insert it here.
callIdsToNotifyWhenReady.addAll(ongoingCalls.keySet());

Iterator<Integer> i = callIdsToNotifyWhenReady.iterator();
while (isReady() && i.hasNext()) {
Inbound<?> inbound = ongoingCalls.get(i.next());
i.remove();
if (inbound != null) { // Calls can be removed out from under us.
inbound.onTransportReady();
}
}
}
}
Expand Down

0 comments on commit 25531d6

Please sign in to comment.