From a54317483096da255ee744a49bd635b23c8ac4ba Mon Sep 17 00:00:00 2001 From: Alex Ehrnschwender Date: Tue, 16 Jun 2020 12:18:42 -0700 Subject: [PATCH] examples: Adds client/server retrying example via service config --- examples/README.md | 30 ++++ examples/build.gradle | 16 ++ .../retrying/RetryingHelloWorldClient.java | 148 ++++++++++++++++++ .../retrying/RetryingHelloWorldServer.java | 111 +++++++++++++ .../retrying/retrying_service_config.json | 22 +++ 5 files changed, 327 insertions(+) create mode 100644 examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldClient.java create mode 100644 examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java create mode 100644 examples/src/main/resources/io/grpc/examples/retrying/retrying_service_config.json diff --git a/examples/README.md b/examples/README.md index 9f74895a45c..1f26076137b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -87,6 +87,36 @@ before trying out the examples. +-
+ Retrying + + The [retrying example](src/main/java/io/grpc/examples/retrying) provides a HelloWorld GRPC client & + server which demos the effect of client retry policy configured on the [ManagedChannel]( + ../api/src/main/java/io/grpc/ManagedChannel.java) via [GRPC ServiceConfig]( + https://github.com/grpc/grpc/blob/master/doc/service_config.md). Retry policy implementation & + configuration details are outlined in the [proposal](https://github.com/grpc/proposal/blob/master/A6-client-retries.md). + + This retrying example is very similar to the [hedging example](src/main/java/io/grpc/examples/hedging) in its setup. + The [RetryingHelloWorldServer](src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java) responds with + a status UNAVAILABLE error response to a specified percentage of requests to simulate server resource exhaustion and + general flakiness. The [RetryingHelloWorldClient](src/main/java/io/grpc/examples/retrying/RetryingHelloWorldClient.java) makes + a number of sequential requests to the server, several of which will be retried depending on the configured policy in + [retrying_service_config.json](src/main/resources/io/grpc/examples/retrying/retrying_service_config.json). Although + the requests are blocking unary calls for simplicity, these could easily be changed to future unary calls in order to + test the result of request concurrency with retry policy enabled. + + One can experiment with the [RetryingHelloWorldServer](src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java) + failure conditions to simulate server throttling, as well as alter policy values in the [retrying_service_config.json]( + src/main/resources/io/grpc/examples/retrying/retrying_service_config.json) to see their effects. To disable retrying + entirely, set environment variable `DISABLE_RETRYING_IN_RETRYING_EXAMPLE=true` before running the client. + Disabling the retry policy should produce many more failed GRPC calls as seen in the output log. + + See [the section below](#to-build-the-examples) for how to build and run the example. The + executables for the server and the client are `retrying-hello-world-server` and + `retrying-hello-world-client`. + +
+ ### To build the examples 1. **[Install gRPC Java library SNAPSHOT locally, including code generation plugin](../COMPILING.md) (Only need this step for non-released versions, e.g. master HEAD).** diff --git a/examples/build.gradle b/examples/build.gradle index 9af9adb28b8..2a064516a23 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -91,6 +91,20 @@ task helloWorldClient(type: CreateStartScripts) { classpath = startScripts.classpath } +task retryingHelloWorldServer(type: CreateStartScripts) { + mainClassName = 'io.grpc.examples.retrying.RetryingHelloWorldServer' + applicationName = 'retrying-hello-world-server' + outputDir = new File(project.buildDir, 'tmp') + classpath = startScripts.classpath +} + +task retryingHelloWorldClient(type: CreateStartScripts) { + mainClassName = 'io.grpc.examples.retrying.RetryingHelloWorldClient' + applicationName = 'retrying-hello-world-client' + outputDir = new File(project.buildDir, 'tmp') + classpath = startScripts.classpath +} + task hedgingHelloWorldServer(type: CreateStartScripts) { mainClassName = 'io.grpc.examples.hedging.HedgingHelloWorldServer' applicationName = 'hedging-hello-world-server' @@ -119,6 +133,8 @@ applicationDistribution.into('bin') { from(helloWorldClient) from(hedgingHelloWorldClient) from(hedgingHelloWorldServer) + from(retryingHelloWorldClient) + from(retryingHelloWorldServer) from(compressingHelloWorldClient) fileMode = 0755 } diff --git a/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldClient.java b/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldClient.java new file mode 100644 index 00000000000..73f5d69802d --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldClient.java @@ -0,0 +1,148 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.retrying; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import java.io.InputStreamReader; +import java.util.Map; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A client that requests a greeting from the {@link RetryingHelloWorldServer} with a retrying policy. + */ +public class RetryingHelloWorldClient { + static final String ENV_DISABLE_RETRYING = "DISABLE_RETRYING_IN_RETRYING_EXAMPLE"; + + private static final Logger logger = Logger.getLogger(RetryingHelloWorldClient.class.getName()); + + private final boolean enableRetries; + private final ManagedChannel channel; + private final GreeterGrpc.GreeterBlockingStub blockingStub; + private final AtomicInteger totalRpcs = new AtomicInteger(); + private final AtomicInteger failedRpcs = new AtomicInteger(); + + protected Map getRetryingServiceConfig() { + return new Gson() + .fromJson( + new JsonReader( + new InputStreamReader( + RetryingHelloWorldClient.class.getResourceAsStream( + "retrying_service_config.json"), + UTF_8)), + Map.class); + } + + /** + * Construct client connecting to HelloWorld server at {@code host:port}. + */ + public RetryingHelloWorldClient(String host, int port, boolean enableRetries) { + + ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress(host, port) + // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid + // needing certificates. + .usePlaintext(); + if (enableRetries) { + Map serviceConfig = getRetryingServiceConfig(); + logger.info("Client started with retrying configuration: " + serviceConfig.toString()); + channelBuilder.defaultServiceConfig(serviceConfig).enableRetry(); + } + channel = channelBuilder.build(); + blockingStub = GreeterGrpc.newBlockingStub(channel); + this.enableRetries = enableRetries; + } + + public void shutdown() throws InterruptedException { + channel.shutdown().awaitTermination(60, TimeUnit.SECONDS); + } + + /** + * Say hello to server in a blocking unary call. + */ + public void greet(String name) { + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response = null; + StatusRuntimeException statusRuntimeException = null; + try { + response = blockingStub.sayHello(request); + } catch (StatusRuntimeException e) { + failedRpcs.incrementAndGet(); + statusRuntimeException = e; + } + + totalRpcs.incrementAndGet(); + + if (statusRuntimeException == null) { + logger.log(Level.INFO,"Greeting: {0}", new Object[]{response.getMessage()}); + } else { + logger.log(Level.INFO,"RPC failed: {0}", new Object[]{statusRuntimeException.getStatus()}); + } + } + + private void printSummary() { + logger.log( + Level.INFO, + "\n\nTotal RPCs sent: {0}. Total RPCs failed: {1}\n", + new Object[]{ + totalRpcs.get(), failedRpcs.get()}); + + if (enableRetries) { + logger.log( + Level.INFO, + "Retrying enabled. To disable retries, run the client with environment variable {0}=true.", + ENV_DISABLE_RETRYING); + } else { + logger.log( + Level.INFO, + "Retrying disabled. To enable retries, unset environment variable {0} and then run the client.", + ENV_DISABLE_RETRYING); + } + } + + public static void main(String[] args) throws Exception { + boolean enableRetries = !Boolean.parseBoolean(System.getenv(ENV_DISABLE_RETRYING)); + final RetryingHelloWorldClient client = new RetryingHelloWorldClient("localhost", 50051, enableRetries); + ForkJoinPool executor = new ForkJoinPool(); + + for (int i = 0; i < 50; i++) { + final String userId = "user" + i; + executor.execute( + new Runnable() { + @Override + public void run() { + client.greet(userId); + } + }); + } + executor.awaitQuiescence(100, TimeUnit.SECONDS); + executor.shutdown(); + client.printSummary(); + client.shutdown(); + } +} diff --git a/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java b/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java new file mode 100644 index 00000000000..37c48e0d9b0 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java @@ -0,0 +1,111 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.retrying; + +import java.text.DecimalFormat; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * A HelloWorld server that responds to requests with UNAVAILABLE with a given percentage. + */ +public class RetryingHelloWorldServer { + private static final Logger logger = Logger.getLogger(RetryingHelloWorldServer.class.getName()); + private static final float unavailablePercentage = 0.5F; + private static Random random = new Random(); + + private Server server; + + private void start() throws IOException { + /* The port on which the server should run */ + int port = 50051; + server = ServerBuilder.forPort(port) + .addService(new GreeterImpl()) + .build() + .start(); + logger.info("Server started, listening on " + port); + + DecimalFormat df = new DecimalFormat("#%"); + logger.info("Responding as UNAVAILABLE to " + df.format(unavailablePercentage) + " requests"); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + RetryingHelloWorldServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + } + }); + } + + private void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + final RetryingHelloWorldServer server = new RetryingHelloWorldServer(); + server.start(); + server.blockUntilShutdown(); + } + + static class GreeterImpl extends GreeterGrpc.GreeterImplBase { + AtomicInteger retryCounter = new AtomicInteger(0); + + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + int count = retryCounter.incrementAndGet(); + if (random.nextFloat() < unavailablePercentage) { + logger.info("Returning stubbed UNAVAILABLE error. count: " + count); + responseObserver.onError(Status.UNAVAILABLE + .withDescription("Greeter temporarily unavailable...").asRuntimeException()); + } else { + logger.info("Returning successful Hello response, count: " + count); + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + } + } +} diff --git a/examples/src/main/resources/io/grpc/examples/retrying/retrying_service_config.json b/examples/src/main/resources/io/grpc/examples/retrying/retrying_service_config.json new file mode 100644 index 00000000000..ff115a9ad65 --- /dev/null +++ b/examples/src/main/resources/io/grpc/examples/retrying/retrying_service_config.json @@ -0,0 +1,22 @@ +{ + "methodConfig": [ + { + "name": [ + { + "service": "helloworld.Greeter", + "method": "SayHello" + } + ], + + "retryPolicy": { + "maxAttempts": 5, + "initialBackoff": "0.5s", + "maxBackoff": "30s", + "backoffMultiplier": 2, + "retryableStatusCodes": [ + "UNAVAILABLE" + ] + } + } + ] +}