Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Oct 4, 2023
1 parent 93531d5 commit d7d69e6
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void onComplete() {
.takeWhile(lbEvent ->
// Don't complete until we get a LoadBalancerReadyEvent that is ready.
!(lbEvent instanceof LoadBalancerReadyEvent) ||
!((LoadBalancerReadyEvent) lbEvent).isReady())
!((LoadBalancerReadyEvent) lbEvent).isReady())
.ignoreElements();
return sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.netty.RetryingHttpRequesterFilter.BackOffPolicy;
import io.servicetalk.http.netty.StreamObserverTest.MulticastTransportEventsStreamingHttpConnectionFilter;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;

Expand Down Expand Up @@ -135,9 +134,9 @@ protected void channelRead0(final ChannelHandlerContext ctx, final Http2HeadersF
});
}
}
}, parentPipeline -> {
serverParentChannel.set(parentPipeline.channel());
}, h2Builder -> {
},
parentPipeline -> serverParentChannel.set(parentPipeline.channel()),
h2Builder -> {
h2Builder.initialSettings().maxConcurrentStreams(maxConcurrentStreams);
return h2Builder;
});
Expand All @@ -162,7 +161,6 @@ void noMaxActiveStreamsViolatedErrorAfterCancel() throws Exception {
assert serverAddress != null;
try (HttpClient client = newClientBuilder(serverAddress, CLIENT_CTX, HTTP_2)
.appendClientFilter(disableAutoRetries()) // All exceptions should be propagated
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
.appendConnectionFilter(connection -> new StreamingHttpConnectionFilter(connection) {
@Override
public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
Expand Down Expand Up @@ -252,7 +250,6 @@ private void noMaxActiveStreamsViolatedErrorWhenLimitChanges(boolean increase,
assert serverAddress != null;
try (HttpClient client = newClientBuilder(serverAddress, CLIENT_CTX, HTTP_2)
.executionStrategy(strategy)
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
// Don't allow more than 1 connection:
.appendConnectionFactoryFilter(LimitingConnectionFactoryFilter.withMax(1))
// Retry all ConnectionLimitReachedException(s), don't retry RetryableException(s):
Expand Down Expand Up @@ -324,7 +321,6 @@ void maxActiveStreamsOutsideIntRange() throws Exception {
assert serverAddress != null;
assertThat(MAX_UNSIGNED_INT, is(greaterThan((long) Integer.MAX_VALUE)));
try (HttpClient client = newClientBuilder(serverAddress, CLIENT_CTX, HTTP_2)
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
.protocols(HTTP_2.config)
.build()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ private ContextAwareRetryingHttpClientFilter newFilter(final RetryingHttpRequest
final ContextAwareRetryingHttpClientFilter f =
(ContextAwareRetryingHttpClientFilter) filter.create(client);
Publisher<Object> replayLBEvents = lbEvents.replay(1);
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal.
replayLBEvents.ignoreElements().subscribe();
f.inject(replayLBEvents, sdStatus);
return f;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConsumableEvent;
import io.servicetalk.client.api.TransportObserverConnectionFactoryFilter;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.Http2Exception;
import io.servicetalk.http.api.HttpClient;
import io.servicetalk.http.api.HttpConnection;
import io.servicetalk.http.api.HttpEventKey;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.StreamingHttpConnectionFilter;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ConnectionObserver.DataObserver;
Expand Down Expand Up @@ -54,7 +47,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
import static io.servicetalk.http.netty.AbstractStreamingHttpConnection.MAX_CONCURRENCY_NO_OFFLOADING;
import static io.servicetalk.http.netty.H2PriorKnowledgeFeatureParityTest.bindH2Server;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h2;
Expand Down Expand Up @@ -121,7 +113,6 @@ protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
});
client = HttpClients.forSingleAddress(HostAndPort.of((InetSocketAddress) serverAcceptorChannel.localAddress()))
.protocols(h2().enableFrameLogging("servicetalk-tests-h2-frame-logger", TRACE, () -> true).build())
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
.appendConnectionFactoryFilter(new TransportObserverConnectionFactoryFilter<>(clientTransportObserver))
.build();
}
Expand Down Expand Up @@ -183,44 +174,4 @@ void maxActiveStreamsViolationError() throws Exception {
verifyNoMoreInteractions(clientTransportObserver, clientMultiplexedObserver, clientStreamObserver,
clientDataObserver);
}

/**
* Filter that allows tests to subscribe to
* {@link FilterableStreamingHttpConnection#transportEventStream(HttpEventKey)} without loosing already delivered
* events.
*/
static final class MulticastTransportEventsStreamingHttpConnectionFilter
implements StreamingHttpConnectionFilterFactory {

static final StreamingHttpConnectionFilterFactory INSTANCE =
new MulticastTransportEventsStreamingHttpConnectionFilter();

private MulticastTransportEventsStreamingHttpConnectionFilter() {
// Singleton.
}

@Override
public StreamingHttpConnectionFilter create(FilterableStreamingHttpConnection connection) {
Publisher<? extends ConsumableEvent<Integer>> maxConcurrency = connection
.transportEventStream(MAX_CONCURRENCY_NO_OFFLOADING).multicast(2);
return new StreamingHttpConnectionFilter(connection) {
@Override
@SuppressWarnings("unchecked")
public <T> Publisher<? extends T> transportEventStream(final HttpEventKey<T> eventKey) {
return eventKey == MAX_CONCURRENCY_NO_OFFLOADING ? (Publisher<? extends T>) maxConcurrency :
delegate().transportEventStream(eventKey);
}

@Override
public String toString() {
return delegate().toString();
}
};
}

@Override
public HttpExecutionStrategy requiredOffloads() {
return offloadNone();
}
}
}

0 comments on commit d7d69e6

Please sign in to comment.