diff --git a/examples/build.gradle b/examples/build.gradle index 9af9adb28b8e..2a064516a237 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -91,6 +91,20 @@ task helloWorldClient(type: CreateStartScripts) { classpath = startScripts.classpath } +task retryingHelloWorldServer(type: CreateStartScripts) { + mainClassName = 'io.grpc.examples.retrying.RetryingHelloWorldServer' + applicationName = 'retrying-hello-world-server' + outputDir = new File(project.buildDir, 'tmp') + classpath = startScripts.classpath +} + +task retryingHelloWorldClient(type: CreateStartScripts) { + mainClassName = 'io.grpc.examples.retrying.RetryingHelloWorldClient' + applicationName = 'retrying-hello-world-client' + outputDir = new File(project.buildDir, 'tmp') + classpath = startScripts.classpath +} + task hedgingHelloWorldServer(type: CreateStartScripts) { mainClassName = 'io.grpc.examples.hedging.HedgingHelloWorldServer' applicationName = 'hedging-hello-world-server' @@ -119,6 +133,8 @@ applicationDistribution.into('bin') { from(helloWorldClient) from(hedgingHelloWorldClient) from(hedgingHelloWorldServer) + from(retryingHelloWorldClient) + from(retryingHelloWorldServer) from(compressingHelloWorldClient) fileMode = 0755 } diff --git a/examples/gradle/wrapper/gradle-wrapper.properties b/examples/gradle/wrapper/gradle-wrapper.properties index 7c4388a92162..21c56efbe47f 100644 --- a/examples/gradle/wrapper/gradle-wrapper.properties +++ b/examples/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ +#Tue Jun 02 23:49:09 PDT 2020 +distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-bin.zip -zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME diff --git a/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldClient.java b/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldClient.java new file mode 100644 index 000000000000..c03728664bfa --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldClient.java @@ -0,0 +1,149 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.retrying; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; + +import java.io.InputStreamReader; +import java.util.Map; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * A client that requests a greeting from the {@link RetryingHelloWorldServer} with a retrying policy. + */ +public class RetryingHelloWorldClient { + static final String ENV_DISABLE_RETRIES = "DISABLE_RETRIES_IN_RETRIES_EXAMPLE"; + + private static final Logger logger = Logger.getLogger(RetryingHelloWorldClient.class.getName()); + + private final boolean enableRetries; + private final ManagedChannel channel; + private final GreeterGrpc.GreeterBlockingStub blockingStub; + private final AtomicInteger totalRpcs = new AtomicInteger(); + private final AtomicInteger failedRpcs = new AtomicInteger(); + + protected Map getRetryingServiceConfig() { + return new Gson() + .fromJson( + new JsonReader( + new InputStreamReader( + RetryingHelloWorldClient.class.getResourceAsStream( + "retrying_service_config.json"), + UTF_8)), + Map.class); + } + + /** + * Construct client connecting to HelloWorld server at {@code host:port}. + */ + public RetryingHelloWorldClient(String host, int port, boolean enableRetries) { + + ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress(host, port) + // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid + // needing certificates. + .usePlaintext(); + if (enableRetries) { + Map serviceConfig = getRetryingServiceConfig(); + logger.info("Client started with retrying configuration: " + serviceConfig.toString()); + channelBuilder.defaultServiceConfig(serviceConfig).enableRetry(); + } + channel = channelBuilder.build(); + blockingStub = GreeterGrpc.newBlockingStub(channel); + this.enableRetries = enableRetries; + } + + public void shutdown() throws InterruptedException { + channel.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + + /** + * Say hello to server in a blocking unary call. + */ + public void greet(String name) { + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response = null; + StatusRuntimeException statusRuntimeException = null; + try { + response = blockingStub.sayHello(request); + } catch (StatusRuntimeException e) { + failedRpcs.incrementAndGet(); + statusRuntimeException = e; + } + + totalRpcs.incrementAndGet(); + + if (statusRuntimeException == null) { + logger.log(Level.INFO,"Greeting: {0}", new Object[]{response.getMessage()}); + } else { + logger.log(Level.INFO,"RPC failed: {0}", new Object[]{statusRuntimeException.getStatus()}); + } + } + + private void printSummary() { + logger.log( + Level.INFO, + "\n\nTotal RPCs sent: {0}. Total RPCs failed: {1}\n", + new Object[]{ + totalRpcs.get(), failedRpcs.get()}); + + if (enableRetries) { + logger.log( + Level.INFO, + "Retrying enabled. To disable retries, run the client with environment variable {0}=true.", + ENV_DISABLE_RETRIES); + } else { + logger.log( + Level.INFO, + "Retrying disabled. To enable retries, unset environment variable {0} and then run the client.", + ENV_DISABLE_RETRIES); + } + } + + public static void main(String[] args) throws Exception { + boolean enableRetries = !Boolean.parseBoolean(System.getenv(ENV_DISABLE_RETRIES)); + final RetryingHelloWorldClient client = new RetryingHelloWorldClient("localhost", 50051, enableRetries); + ForkJoinPool executor = new ForkJoinPool(); + + for (int i = 0; i < 50; i++) { + final String userId = "user" + i; + executor.execute( + new Runnable() { + @Override + public void run() { + client.greet(userId); + } + }); + } + executor.awaitQuiescence(100, TimeUnit.SECONDS); + executor.shutdown(); + client.printSummary(); + client.shutdown(); + } +} diff --git a/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java b/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java new file mode 100644 index 000000000000..51c37a3fdcac --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java @@ -0,0 +1,111 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.retrying; + +import io.grpc.*; + +import java.text.DecimalFormat; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.stub.StreamObserver; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * A HelloWorld server that responds to requests with UNAVAILABLE with a given percentage. + */ +public class RetryingHelloWorldServer { + private static final Logger logger = Logger.getLogger(RetryingHelloWorldServer.class.getName()); + private static final float unavailablePercentage = 0.5F; + private static Random random = new Random(); + + private Server server; + + private void start() throws IOException { + /* The port on which the server should run */ + int port = 50051; + server = ServerBuilder.forPort(port) + .addService(new GreeterImpl()) + .build() + .start(); + logger.info("Server started, listening on " + port); + + DecimalFormat df = new DecimalFormat("#%"); + logger.info("Responding as UNAVAILABLE to " + df.format(unavailablePercentage) + " requests"); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + RetryingHelloWorldServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + } + }); + } + + private void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + final RetryingHelloWorldServer server = new RetryingHelloWorldServer(); + server.start(); + server.blockUntilShutdown(); + } + + static class GreeterImpl extends GreeterGrpc.GreeterImplBase { + AtomicInteger retryCounter = new AtomicInteger(0); + + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + int count = retryCounter.incrementAndGet(); + if (random.nextFloat() < unavailablePercentage) { + logger.info("Returning stubbed UNAVAILABLE error. count: " + count); + responseObserver.onError(Status.UNAVAILABLE + .withDescription("Greeter temporarily unavailable...").asRuntimeException()); + } else { + logger.info("Returning successful Hello response, count: " + count); + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + } + } +} diff --git a/examples/src/main/resources/io/grpc/examples/retrying/retrying_service_config.json b/examples/src/main/resources/io/grpc/examples/retrying/retrying_service_config.json new file mode 100644 index 000000000000..ff115a9ad654 --- /dev/null +++ b/examples/src/main/resources/io/grpc/examples/retrying/retrying_service_config.json @@ -0,0 +1,22 @@ +{ + "methodConfig": [ + { + "name": [ + { + "service": "helloworld.Greeter", + "method": "SayHello" + } + ], + + "retryPolicy": { + "maxAttempts": 5, + "initialBackoff": "0.5s", + "maxBackoff": "30s", + "backoffMultiplier": 2, + "retryableStatusCodes": [ + "UNAVAILABLE" + ] + } + } + ] +}