diff --git a/services/BUILD.bazel b/services/BUILD.bazel index 97b5120ccd3..4418862b0fa 100644 --- a/services/BUILD.bazel +++ b/services/BUILD.bazel @@ -5,11 +5,17 @@ package(default_visibility = ["//visibility:public"]) java_library( name = "binarylog", srcs = [ + "src/main/java/io/grpc/protobuf/services/BinaryLogProvider.java", "src/main/java/io/grpc/services/BinaryLogProvider.java", + "src/main/java/io/grpc/protobuf/services/BinaryLogProviderImpl.java", "src/main/java/io/grpc/services/BinaryLogProviderImpl.java", + "src/main/java/io/grpc/protobuf/services/BinaryLogSink.java", "src/main/java/io/grpc/services/BinaryLogSink.java", + "src/main/java/io/grpc/protobuf/services/BinlogHelper.java", "src/main/java/io/grpc/services/BinlogHelper.java", + "src/main/java/io/grpc/protobuf/services/InetAddressUtil.java", "src/main/java/io/grpc/services/InetAddressUtil.java", + "src/main/java/io/grpc/protobuf/services/TempFileSink.java", "src/main/java/io/grpc/services/TempFileSink.java", ], deps = [ @@ -26,7 +32,9 @@ java_library( java_library( name = "channelz", srcs = [ + "src/main/java/io/grpc/protobuf/services/ChannelzProtoUtil.java", "src/main/java/io/grpc/services/ChannelzProtoUtil.java", + "src/main/java/io/grpc/protobuf/services/ChannelzService.java", "src/main/java/io/grpc/services/ChannelzService.java", ], deps = [ @@ -46,6 +54,7 @@ java_library( name = "reflection", srcs = [ "src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java", + "src/main/java/io/grpc/services/ProtoReflectionService.java", ], deps = [ ":_reflection_java_grpc", @@ -66,7 +75,8 @@ java_library( java_library( name = "health", srcs = [ - "src/main/java/io/grpc/services/HealthServiceImpl.java", + "src/main/java/io/grpc/protobuf/services/HealthServiceImpl.java", + "src/main/java/io/grpc/protobuf/services/HealthStatusManager.java", "src/main/java/io/grpc/services/HealthStatusManager.java", ], deps = [ diff --git a/services/src/main/java/io/grpc/services/BinaryLogProvider.java b/services/src/main/java/io/grpc/services/BinaryLogProvider.java new file mode 100644 index 00000000000..3ca5de65558 --- /dev/null +++ b/services/src/main/java/io/grpc/services/BinaryLogProvider.java @@ -0,0 +1,194 @@ +/* + * Copyright 2017 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.grpc.BinaryLog; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.Internal; +import io.grpc.InternalClientInterceptors; +import io.grpc.InternalServerInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.Marshaller; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerMethodDefinition; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import javax.annotation.Nullable; + +// TODO(zpencer): rename class to AbstractBinaryLog +@Internal +public abstract class BinaryLogProvider extends BinaryLog { + @VisibleForTesting + public static final Marshaller BYTEARRAY_MARSHALLER = new ByteArrayMarshaller(); + + private final ClientInterceptor binaryLogShim = new BinaryLogShim(); + + /** + * Wraps a channel to provide binary logging on {@link ClientCall}s as needed. + */ + @Override + public final Channel wrapChannel(Channel channel) { + return ClientInterceptors.intercept(channel, binaryLogShim); + } + + private static MethodDescriptor toByteBufferMethod( + MethodDescriptor method) { + return method.toBuilder(BYTEARRAY_MARSHALLER, BYTEARRAY_MARSHALLER).build(); + } + + /** + * Wraps a {@link ServerMethodDefinition} such that it performs binary logging if needed. + */ + @Override + public final ServerMethodDefinition wrapMethodDefinition( + ServerMethodDefinition oMethodDef) { + ServerInterceptor binlogInterceptor = + getServerInterceptor(oMethodDef.getMethodDescriptor().getFullMethodName()); + if (binlogInterceptor == null) { + return oMethodDef; + } + MethodDescriptor binMethod = + BinaryLogProvider.toByteBufferMethod(oMethodDef.getMethodDescriptor()); + ServerMethodDefinition binDef = + InternalServerInterceptors.wrapMethod(oMethodDef, binMethod); + ServerCallHandler binlogHandler = + InternalServerInterceptors.interceptCallHandlerCreate( + binlogInterceptor, binDef.getServerCallHandler()); + return ServerMethodDefinition.create(binMethod, binlogHandler); + } + + /** + * Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor, + * so the interceptor must be reusable across calls. At runtime, the request and response + * marshallers are always {@code Marshaller}. + * Returns {@code null} if this method is not binary logged. + */ + // TODO(zpencer): ensure the interceptor properly handles retries and hedging + @Nullable + protected abstract ServerInterceptor getServerInterceptor(String fullMethodName); + + /** + * Returns a {@link ClientInterceptor} for binary logging. gRPC is free to cache the interceptor, + * so the interceptor must be reusable across calls. At runtime, the request and response + * marshallers are always {@code Marshaller}. + * Returns {@code null} if this method is not binary logged. + */ + // TODO(zpencer): ensure the interceptor properly handles retries and hedging + @Nullable + protected abstract ClientInterceptor getClientInterceptor( + String fullMethodName, CallOptions callOptions); + + @Override + public void close() throws IOException { + // default impl: noop + // TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there + } + + // Creating a named class makes debugging easier + private static final class ByteArrayMarshaller implements Marshaller { + @Override + public InputStream stream(byte[] value) { + return new ByteArrayInputStream(value); + } + + @Override + public byte[] parse(InputStream stream) { + try { + return parseHelper(stream); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private byte[] parseHelper(InputStream stream) throws IOException { + try { + return IoUtils.toByteArray(stream); + } finally { + stream.close(); + } + } + } + + /** + * The pipeline of interceptors is hard coded when the {@link ManagedChannel} is created. + * This shim interceptor should always be installed as a placeholder. When a call starts, + * this interceptor checks with the {@link BinaryLogProvider} to see if logging should happen + * for this particular {@link ClientCall}'s method. + */ + private final class BinaryLogShim implements ClientInterceptor { + @Override + public ClientCall interceptCall( + MethodDescriptor method, + CallOptions callOptions, + Channel next) { + ClientInterceptor binlogInterceptor = getClientInterceptor( + method.getFullMethodName(), callOptions); + if (binlogInterceptor == null) { + return next.newCall(method, callOptions); + } else { + return InternalClientInterceptors + .wrapClientInterceptor( + binlogInterceptor, + BYTEARRAY_MARSHALLER, + BYTEARRAY_MARSHALLER) + .interceptCall(method, callOptions, next); + } + } + } + + // Copied from internal + private static final class IoUtils { + /** maximum buffer to be read is 16 KB. */ + private static final int MAX_BUFFER_LENGTH = 16384; + + /** Returns the byte array. */ + public static byte[] toByteArray(InputStream in) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + copy(in, out); + return out.toByteArray(); + } + + /** Copies the data from input stream to output stream. */ + public static long copy(InputStream from, OutputStream to) throws IOException { + // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta) + Preconditions.checkNotNull(from); + Preconditions.checkNotNull(to); + byte[] buf = new byte[MAX_BUFFER_LENGTH]; + long total = 0; + while (true) { + int r = from.read(buf); + if (r == -1) { + break; + } + to.write(buf, 0, r); + total += r; + } + return total; + } + } +} diff --git a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java index 5c09f8a3ddd..dee13a924c6 100644 --- a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java +++ b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java @@ -20,7 +20,6 @@ import io.grpc.CallOptions; import io.grpc.ClientInterceptor; import io.grpc.ServerInterceptor; -import io.grpc.protobuf.services.BinaryLogProvider; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable;