Skip to content

Commit

Permalink
examples: Adds GRPC client/server retrying example using service config
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderscott committed Jun 10, 2020
1 parent a3a7a5e commit d85e6f2
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 2 deletions.
16 changes: 16 additions & 0 deletions examples/build.gradle
Expand Up @@ -91,6 +91,20 @@ task helloWorldClient(type: CreateStartScripts) {
classpath = startScripts.classpath
}

task retryingHelloWorldServer(type: CreateStartScripts) {
mainClassName = 'io.grpc.examples.retrying.RetryingHelloWorldServer'
applicationName = 'retrying-hello-world-server'
outputDir = new File(project.buildDir, 'tmp')
classpath = startScripts.classpath
}

task retryingHelloWorldClient(type: CreateStartScripts) {
mainClassName = 'io.grpc.examples.retrying.RetryingHelloWorldClient'
applicationName = 'retrying-hello-world-client'
outputDir = new File(project.buildDir, 'tmp')
classpath = startScripts.classpath
}

task hedgingHelloWorldServer(type: CreateStartScripts) {
mainClassName = 'io.grpc.examples.hedging.HedgingHelloWorldServer'
applicationName = 'hedging-hello-world-server'
Expand Down Expand Up @@ -119,6 +133,8 @@ applicationDistribution.into('bin') {
from(helloWorldClient)
from(hedgingHelloWorldClient)
from(hedgingHelloWorldServer)
from(retryingHelloWorldClient)
from(retryingHelloWorldServer)
from(compressingHelloWorldClient)
fileMode = 0755
}
5 changes: 3 additions & 2 deletions examples/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,6 @@
#Tue Jun 02 23:49:09 PDT 2020
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
@@ -0,0 +1,149 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.examples.retrying;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;

import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* A client that requests a greeting from the {@link RetryingHelloWorldServer} with a retrying policy.
*/
public class RetryingHelloWorldClient {
static final String ENV_DISABLE_RETRIES = "DISABLE_RETRIES_IN_RETRIES_EXAMPLE";

private static final Logger logger = Logger.getLogger(RetryingHelloWorldClient.class.getName());

private final boolean enableRetries;
private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;
private final AtomicInteger totalRpcs = new AtomicInteger();
private final AtomicInteger failedRpcs = new AtomicInteger();

protected Map<String, ?> getRetryingServiceConfig() {
return new Gson()
.fromJson(
new JsonReader(
new InputStreamReader(
RetryingHelloWorldClient.class.getResourceAsStream(
"retrying_service_config.json"),
UTF_8)),
Map.class);
}

/**
* Construct client connecting to HelloWorld server at {@code host:port}.
*/
public RetryingHelloWorldClient(String host, int port, boolean enableRetries) {

ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port)
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
// needing certificates.
.usePlaintext();
if (enableRetries) {
Map<String, ?> serviceConfig = getRetryingServiceConfig();
logger.info("Client started with retrying configuration: " + serviceConfig.toString());
channelBuilder.defaultServiceConfig(serviceConfig).enableRetry();
}
channel = channelBuilder.build();
blockingStub = GreeterGrpc.newBlockingStub(channel);
this.enableRetries = enableRetries;
}

public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}

/**
* Say hello to server in a blocking unary call.
*/
public void greet(String name) {
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response = null;
StatusRuntimeException statusRuntimeException = null;
try {
response = blockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
failedRpcs.incrementAndGet();
statusRuntimeException = e;
}

totalRpcs.incrementAndGet();

if (statusRuntimeException == null) {
logger.log(Level.INFO,"Greeting: {0}", new Object[]{response.getMessage()});
} else {
logger.log(Level.INFO,"RPC failed: {0}", new Object[]{statusRuntimeException.getStatus()});
}
}

private void printSummary() {
logger.log(
Level.INFO,
"\n\nTotal RPCs sent: {0}. Total RPCs failed: {1}\n",
new Object[]{
totalRpcs.get(), failedRpcs.get()});

if (enableRetries) {
logger.log(
Level.INFO,
"Retrying enabled. To disable retries, run the client with environment variable {0}=true.",
ENV_DISABLE_RETRIES);
} else {
logger.log(
Level.INFO,
"Retrying disabled. To enable retries, unset environment variable {0} and then run the client.",
ENV_DISABLE_RETRIES);
}
}

public static void main(String[] args) throws Exception {
boolean enableRetries = !Boolean.parseBoolean(System.getenv(ENV_DISABLE_RETRIES));
final RetryingHelloWorldClient client = new RetryingHelloWorldClient("localhost", 50051, enableRetries);
ForkJoinPool executor = new ForkJoinPool();

for (int i = 0; i < 50; i++) {
final String userId = "user" + i;
executor.execute(
new Runnable() {
@Override
public void run() {
client.greet(userId);
}
});
}
executor.awaitQuiescence(100, TimeUnit.SECONDS);
executor.shutdown();
client.printSummary();
client.shutdown();
}
}
@@ -0,0 +1,111 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.examples.retrying;

import io.grpc.*;

import java.text.DecimalFormat;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* A HelloWorld server that responds to requests with UNAVAILABLE with a given percentage.
*/
public class RetryingHelloWorldServer {
private static final Logger logger = Logger.getLogger(RetryingHelloWorldServer.class.getName());
private static final float unavailablePercentage = 0.5F;
private static Random random = new Random();

private Server server;

private void start() throws IOException {
/* The port on which the server should run */
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();
logger.info("Server started, listening on " + port);

DecimalFormat df = new DecimalFormat("#%");
logger.info("Responding as UNAVAILABLE to " + df.format(unavailablePercentage) + " requests");
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
RetryingHelloWorldServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}

private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
final RetryingHelloWorldServer server = new RetryingHelloWorldServer();
server.start();
server.blockUntilShutdown();
}

static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
AtomicInteger retryCounter = new AtomicInteger(0);

@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
int count = retryCounter.incrementAndGet();
if (random.nextFloat() < unavailablePercentage) {
logger.info("Returning stubbed UNAVAILABLE error. count: " + count);
responseObserver.onError(Status.UNAVAILABLE
.withDescription("Greeter temporarily unavailable...").asRuntimeException());
} else {
logger.info("Returning successful Hello response, count: " + count);
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
}
@@ -0,0 +1,22 @@
{
"methodConfig": [
{
"name": [
{
"service": "helloworld.Greeter",
"method": "SayHello"
}
],

"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE"
]
}
}
]
}

0 comments on commit d85e6f2

Please sign in to comment.