Skip to content

Commit

Permalink
core, netty: server builders extend a public API class
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiitk authored and dfawley committed Jan 15, 2021
1 parent 68498ff commit b8e2273
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 77 deletions.
41 changes: 33 additions & 8 deletions core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@
import com.google.common.base.Preconditions;
import io.grpc.Deadline;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerBuilder;
import io.grpc.Internal;
import io.grpc.ServerBuilder;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerImplBuilder;
import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder;
import io.grpc.internal.SharedResourcePool;
import java.io.File;
import java.util.Collections;
Expand Down Expand Up @@ -67,8 +72,7 @@
* </pre>
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1783")
public final class InProcessServerBuilder
extends AbstractServerImplBuilder<InProcessServerBuilder> {
public final class InProcessServerBuilder extends ForwardingServerBuilder<InProcessServerBuilder> {
/**
* Create a server builder that will bind with the given name.
*
Expand All @@ -93,22 +97,40 @@ public static String generateName() {
return UUID.randomUUID().toString();
}

private final ServerImplBuilder serverImplBuilder;
final String name;
int maxInboundMetadataSize = Integer.MAX_VALUE;
ObjectPool<ScheduledExecutorService> schedulerPool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);

private InProcessServerBuilder(String name) {
this.name = Preconditions.checkNotNull(name, "name");

final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder {
@Override
public List<? extends InternalServer> buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
return buildTransportServers(streamTracerFactories);
}
}

serverImplBuilder = new ServerImplBuilder(new InProcessClientTransportServersBuilder());

// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
setStatsRecordStartedRpcs(false);
setStatsRecordFinishedRpcs(false);
serverImplBuilder.setStatsRecordStartedRpcs(false);
serverImplBuilder.setStatsRecordFinishedRpcs(false);
// Disable handshake timeout because it is unnecessary, and can trigger Thread creation that can
// break some environments (like tests).
handshakeTimeout(Long.MAX_VALUE, TimeUnit.SECONDS);
}

@Internal
@Override
protected ServerBuilder<?> delegate() {
return serverImplBuilder;
}

/**
* Provides a custom scheduled executor service.
*
Expand Down Expand Up @@ -140,7 +162,7 @@ public InProcessServerBuilder scheduledExecutorService(
* @since 1.24.0
*/
public InProcessServerBuilder deadlineTicker(Deadline.Ticker ticker) {
setDeadlineTicker(ticker);
serverImplBuilder.setDeadlineTicker(ticker);
return this;
}

Expand All @@ -164,8 +186,7 @@ public InProcessServerBuilder maxInboundMetadataSize(int bytes) {
return this;
}

@Override
protected List<InProcessServer> buildTransportServers(
List<InProcessServer> buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
return Collections.singletonList(new InProcessServer(this, streamTracerFactories));
}
Expand All @@ -174,4 +195,8 @@ protected List<InProcessServer> buildTransportServers(
public InProcessServerBuilder useTransportSecurity(File certChain, File privateKey) {
throw new UnsupportedOperationException("TLS not supported in InProcessServer");
}

void setStatsEnabled(boolean value) {
this.serverImplBuilder.setStatsEnabled(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.inprocess;

import io.grpc.Internal;

/**
* Internal {@link InProcessServerBuilder} accessor. This is intended for usage internal to
* the gRPC team. If you *really* think you need to use this, contact the gRPC team first.
*/
@Internal
public class InternalInProcessServerBuilder {
public static void setStatsEnabled(InProcessServerBuilder builder, boolean value) {
builder.setStatsEnabled(value);
}

private InternalInProcessServerBuilder() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import io.grpc.auth.MoreCallCredentials;
import io.grpc.census.InternalCensusStatsAccessor;
import io.grpc.census.internal.DeprecatedCensusConstants;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.testing.StatsTestUtils;
import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
Expand Down Expand Up @@ -171,7 +170,6 @@ public abstract class AbstractInteropTest {

private ScheduledExecutorService testServiceExecutor;
private Server server;
private boolean customCensusModulePresent;

private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers =
new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -245,21 +243,7 @@ private void startServer() {
new TestServiceImpl(testServiceExecutor),
allInterceptors))
.addStreamTracerFactory(serverStreamTracerFactory);
if (builder instanceof AbstractServerImplBuilder) {
customCensusModulePresent = true;
ServerStreamTracer.Factory censusTracerFactory =
InternalCensusStatsAccessor
.getServerStreamTracerFactory(
tagger, tagContextBinarySerializer, serverStatsRecorder,
GrpcUtil.STOPWATCH_SUPPLIER,
true, true, true, false /* real-time metrics */);
AbstractServerImplBuilder<?> sb = (AbstractServerImplBuilder<?>) builder;
io.grpc.internal.TestingAccessor.setStatsEnabled(sb, false);
sb.addStreamTracerFactory(censusTracerFactory);
}
if (metricsExpected()) {
assertThat(builder).isInstanceOf(AbstractServerImplBuilder.class);
}

try {
server = builder.build().start();
} catch (IOException ex) {
Expand Down Expand Up @@ -373,6 +357,20 @@ protected final ClientInterceptor createCensusStatsClientInterceptor() {
true, true, true, false /* real-time metrics */);
}

protected final ServerStreamTracer.Factory createCustomCensusTracerFactory() {
return InternalCensusStatsAccessor.getServerStreamTracerFactory(
tagger, tagContextBinarySerializer, serverStatsRecorder,
GrpcUtil.STOPWATCH_SUPPLIER,
true, true, true, false /* real-time metrics */);
}

/**
* Override this when custom census module presence is different from {@link #metricsExpected()}.
*/
protected boolean customCensusModulePresent() {
return metricsExpected();
}

/**
* Return true if exact metric values should be checked.
*/
Expand Down Expand Up @@ -1510,7 +1508,7 @@ public void customMetadata() throws Exception {
@Test(timeout = 10000)
public void censusContextsPropagated() {
Assume.assumeTrue("Skip the test because server is not in the same process.", server != null);
Assume.assumeTrue(customCensusModulePresent);
Assume.assumeTrue(customCensusModulePresent());
Span clientParentSpan = Tracing.getTracer().spanBuilder("Test.interopTest").startSpan();
// A valid ID is guaranteed to be unique, so we can verify it is actually propagated.
assertTrue(clientParentSpan.getContext().getTraceId().isValid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package io.grpc.testing.integration;

import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.ServerBuilder;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
Expand All @@ -28,9 +29,12 @@
public class AutoWindowSizingOnTest extends AbstractInteropTest {

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder.forPort(0)
protected ServerBuilder<?> getServerBuilder() {
NettyServerBuilder builder = NettyServerBuilder.forPort(0)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
// Disable the default census stats tracer, use testing tracer instead.
InternalNettyServerBuilder.setStatsEnabled(builder, false);
return builder.addStreamTracerFactory(createCustomCensusTracerFactory());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package io.grpc.testing.integration;

import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.ServerBuilder;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
Expand All @@ -38,14 +39,17 @@ public class Http2NettyLocalChannelTest extends AbstractInteropTest {
private DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup();

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder
protected ServerBuilder<?> getServerBuilder() {
NettyServerBuilder builder = NettyServerBuilder
.forAddress(new LocalAddress("in-process-1"))
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.channelType(LocalServerChannel.class)
.workerEventLoopGroup(eventLoopGroup)
.bossEventLoopGroup(eventLoopGroup);
// Disable the default census stats tracer, use testing tracer instead.
InternalNettyServerBuilder.setStatsEnabled(builder, false);
return builder.addStreamTracerFactory(createCustomCensusTracerFactory());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.ServerBuilder;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.ClientAuth;
Expand All @@ -41,10 +42,10 @@
public class Http2NettyTest extends AbstractInteropTest {

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
protected ServerBuilder<?> getServerBuilder() {
// Starts the server with HTTPS.
try {
return NettyServerBuilder.forPort(0)
NettyServerBuilder builder = NettyServerBuilder.forPort(0)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.sslContext(GrpcSslContexts
Expand All @@ -53,6 +54,9 @@ protected AbstractServerImplBuilder<?> getServerBuilder() {
.trustManager(TestUtils.loadCert("ca.pem"))
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
.build());
// Disable the default census stats tracer, use testing tracer instead.
InternalNettyServerBuilder.setStatsEnabled(builder, false);
return builder.addStreamTracerFactory(createCustomCensusTracerFactory());
} catch (IOException ex) {
throw new RuntimeException(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import com.google.common.base.Throwables;
import com.squareup.okhttp.ConnectionSpec;
import io.grpc.ManagedChannel;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.ServerBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.okhttp.InternalOkHttpChannelBuilder;
import io.grpc.okhttp.OkHttpChannelBuilder;
Expand Down Expand Up @@ -64,7 +65,7 @@ public static void loadConscrypt() throws Exception {
}

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
protected ServerBuilder<?> getServerBuilder() {
// Starts the server with HTTPS.
try {
SslProvider sslProvider = SslContext.defaultServerProvider();
Expand All @@ -77,10 +78,13 @@ protected AbstractServerImplBuilder<?> getServerBuilder() {
.forServer(TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key"));
GrpcSslContexts.configure(contextBuilder, sslProvider);
contextBuilder.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE);
return NettyServerBuilder.forPort(0)
NettyServerBuilder builder = NettyServerBuilder.forPort(0)
.flowControlWindow(AbstractInteropTest.TEST_FLOW_CONTROL_WINDOW)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.sslContext(contextBuilder.build());
// Disable the default census stats tracer, use testing tracer instead.
InternalNettyServerBuilder.setStatsEnabled(builder, false);
return builder.addStreamTracerFactory(createCustomCensusTracerFactory());
} catch (IOException ex) {
throw new RuntimeException(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

package io.grpc.testing.integration;

import io.grpc.ServerBuilder;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.inprocess.InternalInProcessChannelBuilder;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.inprocess.InternalInProcessServerBuilder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

Expand All @@ -30,9 +31,12 @@ public class InProcessTest extends AbstractInteropTest {
private static final String SERVER_NAME = "test";

@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
protected ServerBuilder<?> getServerBuilder() {
// Starts the in-process server.
return InProcessServerBuilder.forName(SERVER_NAME);
InProcessServerBuilder builder = InProcessServerBuilder.forName(SERVER_NAME);
// Disable the default census stats tracer, use testing tracer instead.
InternalInProcessServerBuilder.setStatsEnabled(builder, false);
return builder.addStreamTracerFactory(createCustomCensusTracerFactory());
}

@Override
Expand All @@ -43,6 +47,12 @@ protected InProcessChannelBuilder createChannelBuilder() {
return builder.intercept(createCensusStatsClientInterceptor());
}

@Override
protected boolean customCensusModulePresent() {
// Metrics values are not expected, but custom census module is still used.
return true;
}

@Override
protected boolean metricsExpected() {
// TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizes are
Expand Down

0 comments on commit b8e2273

Please sign in to comment.