Skip to content

Commit

Permalink
Java trace test demo (#148)
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanli-ml committed Oct 12, 2022
1 parent 311dd97 commit 14cb9e8
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 43 deletions.
9 changes: 9 additions & 0 deletions end2end-test-examples/echo-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ Receive server-streaming responses with 10 seconds interval. Re-create the strea
./gradlew run --args="--stream=true --numRpcs=10 --interval=10000 --host=grpc-cloudapi.googleapis.com"
```

Use DirectPath Traffic Director and enable trace
```sh
./gradlew run --args="--numRpcs=1 --reqSize=100 --resSize=100 --host=grpc-cloudapi-directpath-esf-sidecar.sandbox.googleapis.com --useTd=true --enableTrace=true"
```

Example results:

```sh
Expand Down Expand Up @@ -91,6 +96,10 @@ Per sec Payload = 0.07 MB (exact amount of KB = 10000)

`--metricProbeName`: Additional label for metrics.

`--useTd`: Use DirectPath Traffic Director. The peer IP will also be printed.

`--enableTrace`: Enable trace, which will be exported to StackDriver.

## Deployment

Build the jar using maven (`mvn clean package`) or docker:
Expand Down
5 changes: 4 additions & 1 deletion end2end-test-examples/echo-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ version '2.3-SNAPSHOT'
sourceCompatibility = 1.8
targetCompatibility = 1.8

def grpcVersion = '1.34.1'
def grpcVersion = '1.49.0'
def protobufVersion = '3.9.0'
def protocVersion = protobufVersion
def opencensusVersion = '0.31.0'
Expand All @@ -25,13 +25,16 @@ dependencies {
implementation "io.grpc:grpc-testing:${grpcVersion}"
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
implementation "io.grpc:grpc-census:${grpcVersion}"
implementation "io.grpc:grpc-googleapis:${grpcVersion}"
implementation "net.sourceforge.argparse4j:argparse4j:0.9.0"
implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}"
implementation "org.hdrhistogram:HdrHistogram:2.1.12"
implementation "io.opencensus:opencensus-api:${opencensusVersion}"
implementation "io.opencensus:opencensus-impl:${opencensusVersion}"
implementation "io.opencensus:opencensus-exporter-stats-stackdriver:${opencensusVersion}"
implementation "io.opencensus:opencensus-exporter-trace-stackdriver:${opencensusVersion}"
implementation 'io.opencensus:opencensus-contrib-grpc-metrics:${opencensus.version}'
implementation "com.google.http-client:google-http-client-jackson2:1.41.7"

implementation "org.apache.commons:commons-math3:3.6.1"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ public class Args {
private static final int PORT = 443;

final int numRpcs;
final boolean enableTracer;
final String cookie;
final boolean header;
final int warmup;
Expand Down Expand Up @@ -46,6 +45,8 @@ public class Args {
final String metricProbeName;
final int numMsgs;
final int msgsInterval;
final boolean useTd;
final boolean enableTrace;

Args(String[] args) throws ArgumentParserException {
ArgumentParser parser =
Expand All @@ -55,7 +56,6 @@ public class Args {
.description("Echo client java binary");

parser.addArgument("--numRpcs").type(Integer.class).setDefault(1);
parser.addArgument("--tracer").type(Boolean.class).setDefault(false);
parser.addArgument("--cookie").type(String.class).setDefault("");
parser.addArgument("--header").type(Boolean.class).setDefault(false);
parser.addArgument("--warmup").type(Integer.class).setDefault(0);
Expand Down Expand Up @@ -89,12 +89,13 @@ public class Args {
parser.addArgument("--metricProbeName").type(String.class).setDefault("");
parser.addArgument("--numMsgs").type(Integer.class).setDefault(1);
parser.addArgument("--msgsInterval").type(Integer.class).setDefault(0);
parser.addArgument("--useTd").type(Boolean.class).setDefault(false);
parser.addArgument("--enableTrace").type(Boolean.class).setDefault(false);

Namespace ns = parser.parseArgs(args);

// Read args
numRpcs = ns.getInt("numRpcs");
enableTracer = ns.getBoolean("tracer");
cookie = ns.getString("cookie");
header = ns.getBoolean("header");
warmup = ns.getInt("warmup");
Expand Down Expand Up @@ -128,6 +129,8 @@ public class Args {
metricProbeName = ns.getString("metricProbeName");
numMsgs = ns.getInt("numMsgs");
msgsInterval = ns.getInt("msgsInterval");
useTd = ns.getBoolean("useTd");
enableTrace = ns.getBoolean("enableTrace");

distrib = (qps > 0) ? new PoissonDistribution(1000 / qps) : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.google.api.MonitoredResource;
import io.grpc.*;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.alts.ComputeEngineChannelBuilder;
import io.grpc.echo.Echo.BatchEchoRequest;
import io.grpc.echo.Echo.BatchEchoResponse;
import io.grpc.echo.Echo.EchoResponse;
Expand All @@ -16,10 +19,18 @@
import io.opencensus.contrib.grpc.metrics.RpcViews;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration;
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter;
import io.opencensus.metrics.*;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.config.TraceConfig;
import io.opencensus.trace.samplers.Samplers;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.sql.Timestamp;
import java.util.*;
Expand All @@ -31,6 +42,7 @@
import java.util.logging.Logger;
import javax.net.ssl.SSLException;
import org.HdrHistogram.Histogram;
import io.opencensus.common.Scope;

public class EchoClient {
private static final int STREAMING_MIN_INTERVAL = 1000;
Expand All @@ -51,10 +63,43 @@ public class EchoClient {
private Map<String, Map<Boolean, AtomicLong>> errorCounts = new ConcurrentHashMap<>();
private final String OTHER_STATUS = "OTHER";

private static final Tracer tracer = Tracing.getTracer();

private final ClientInterceptor peerCheckInterceptor =
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final ClientCall<ReqT, RespT> clientCall = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
// Check peer IP after connection is established.
SocketAddress remoteAddr =
clientCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
String peerIP =
((InetSocketAddress) remoteAddr).getAddress().getHostAddress();
System.out.println("Peer IP = " + peerIP);
super.onHeaders(headers);
}
},
headers);
}
};
}
};

public EchoClient(Args args) throws IOException {
this.args = args;

setUpMetrics();
setupMetrics();
if (args.enableTrace) {
EchoClient.setupOpencensusTrace();
}

channels = new ManagedChannel[args.numChannels];
asyncStubs = new GrpcCloudapiStub[args.numChannels];
Expand Down Expand Up @@ -88,7 +133,7 @@ public EchoClient(Args args) throws IOException {
}
}

private void setUpMetrics() throws IOException {
private void setupMetrics() throws IOException {
// Configure standard gRPC client metrics
RpcViews.registerClientGrpcViews();

Expand Down Expand Up @@ -183,13 +228,30 @@ private void setUpMetrics() throws IOException {
}
}

private NettyChannelBuilder getChannelBuilder() throws SSLException {
NettyChannelBuilder builder =
NettyChannelBuilder.forTarget(args.host + ":" + args.port)
.sslContext(
GrpcSslContexts.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build());
private static void setupOpencensusTrace() throws IOException {
// Always sample
TraceConfig traceConfig = Tracing.getTraceConfig();
traceConfig.updateActiveTraceParams(
traceConfig.getActiveTraceParams().toBuilder().setSampler(Samplers.alwaysSample()).build());
// Export to Stackdriver
StackdriverTraceExporter.createAndRegister(
StackdriverTraceConfiguration.builder()
.setProjectId("directpath-prod-manual-testing")
.build());
}

private ManagedChannelBuilder getChannelBuilder() throws SSLException {
ManagedChannelBuilder<?> builder;
if (args.useTd) {
builder = ComputeEngineChannelBuilder.forTarget("google-c2p-experimental:///" + args.host);
} else {
builder =
NettyChannelBuilder.forTarget(args.host + ":" + args.port)
.sslContext(
GrpcSslContexts.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build());
}
if (!args.overrideService.isEmpty()) {
builder.overrideAuthority(args.overrideService);
}
Expand Down Expand Up @@ -227,6 +289,9 @@ private Channel createChannel(int i) throws SSLException {
ClientInterceptor interceptor = new MetricsClientInterceptor(this);
channel = ClientInterceptors.intercept(channel, interceptor);
}
if (args.useTd) {
channel = ClientInterceptors.intercept(channel, peerCheckInterceptor);
}
if (i == 0) {
blockingChannelCreated = System.currentTimeMillis();
}
Expand Down Expand Up @@ -328,6 +393,8 @@ void streamingEcho() {
}

void blockingEcho(Histogram histogram) throws SSLException {
String traceID = "Java-E2E-traceID-" + String.valueOf((int)(Math.random() * 1000.));
Scope ss = EchoClient.tracer.spanBuilder(traceID).startScopedSpan();
long start = 0;
try {
if (args.resType == 0) {
Expand Down Expand Up @@ -368,6 +435,8 @@ void blockingEcho(Histogram histogram) throws SSLException {
long elapsed = System.currentTimeMillis() - start;
logger.warning(String.format("RPC failed after %d ms: %s", elapsed, e.getStatus()));
e.printStackTrace();
} finally {
ss.close();
}
}

Expand Down

0 comments on commit 14cb9e8

Please sign in to comment.