blacklistedMethods = new HashSet<>();
+ if (configurationString != null && configurationString.length() > 0) {
+ for (String configuration : Splitter.on(',').split(configurationString)) {
+ int leftCurly = configuration.indexOf('{');
+ // '*' for global, 'service/*' for service glob, or 'service/method' for fully qualified
+ String methodOrSvc;
+ // An expression originally wrapped in curly braces; like {m:256,h:256}, {m:256}, {h:256}
+ String binlogOptionStr;
+ if (leftCurly == -1) {
+ methodOrSvc = configuration;
+ binlogOptionStr = null;
+ } else {
+ int rightCurly = configuration.indexOf('}', leftCurly);
+ if (rightCurly != configuration.length() - 1) {
+ throw new IllegalArgumentException("Illegal log config pattern: " + configuration);
+ }
+ methodOrSvc = configuration.substring(0, leftCurly);
+ // option without the curly braces
+ binlogOptionStr = configuration.substring(leftCurly + 1, configuration.length() - 1);
+ }
+ if (methodOrSvc.isEmpty()) {
+ throw new IllegalArgumentException("Illegal log config pattern: " + configuration);
+ }
+ if (methodOrSvc.equals("*")) {
+ // parse config for "*"
+ checkState(
+ globalLog == null,
+ "Duplicate entry, this is fatal: " + configuration);
+ globalLog = createBinaryLog(sink, binlogOptionStr);
+ logger.log(Level.INFO, "Global binlog: {0}", binlogOptionStr);
+ } else if (isServiceGlob(methodOrSvc)) {
+ // parse config for a service, e.g. "service/*"
+ String service = MethodDescriptor.extractFullServiceName(methodOrSvc);
+ checkState(
+ !perServiceLogs.containsKey(service),
+ "Duplicate entry, this is fatal: " + configuration);
+ perServiceLogs.put(service, createBinaryLog(sink, binlogOptionStr));
+ logger.log(
+ Level.INFO,
+ "Service binlog: service={0} config={1}",
+ new Object[] {service, binlogOptionStr});
+ } else if (methodOrSvc.startsWith("-")) {
+ // parse config for a method, e.g. "-service/method"
+ String blacklistedMethod = methodOrSvc.substring(1);
+ if (blacklistedMethod.length() == 0) {
+ continue;
+ }
+ checkState(
+ !blacklistedMethods.contains(blacklistedMethod),
+ "Duplicate entry, this is fatal: " + configuration);
+ checkState(
+ !perMethodLogs.containsKey(blacklistedMethod),
+ "Duplicate entry, this is fatal: " + configuration);
+ blacklistedMethods.add(blacklistedMethod);
+ } else {
+ // parse config for a fully qualified method, e.g "serice/method"
+ checkState(
+ !perMethodLogs.containsKey(methodOrSvc),
+ "Duplicate entry, this is fatal: " + configuration);
+ checkState(
+ !blacklistedMethods.contains(methodOrSvc),
+ "Duplicate entry, this method was blacklisted: " + configuration);
+ perMethodLogs.put(methodOrSvc, createBinaryLog(sink, binlogOptionStr));
+ logger.log(
+ Level.INFO,
+ "Method binlog: method={0} config={1}",
+ new Object[] {methodOrSvc, binlogOptionStr});
+ }
+ }
+ }
+ this.globalLog = globalLog;
+ this.perServiceLogs = Collections.unmodifiableMap(perServiceLogs);
+ this.perMethodLogs = Collections.unmodifiableMap(perMethodLogs);
+ this.blacklistedMethods = Collections.unmodifiableSet(blacklistedMethods);
+ }
+
+ /**
+ * Accepts a full method name and returns the log that should be used.
+ */
+ @Override
+ public BinlogHelper getLog(String fullMethodName) {
+ if (blacklistedMethods.contains(fullMethodName)) {
+ return null;
+ }
+ BinlogHelper methodLog = perMethodLogs.get(fullMethodName);
+ if (methodLog != null) {
+ return methodLog;
+ }
+ BinlogHelper serviceLog = perServiceLogs.get(
+ MethodDescriptor.extractFullServiceName(fullMethodName));
+ if (serviceLog != null) {
+ return serviceLog;
+ }
+ return globalLog;
+ }
+
+ /**
+ * Returns a binlog with the correct header and message limits or {@code null} if the input
+ * is malformed. The input should be a string that is in one of these forms:
+ *
+ * {@code {h(:\d+)?}, {m(:\d+)?}, {h(:\d+)?,m(:\d+)?}}
+ *
+ *
If the {@code logConfig} is null, the returned binlog will have a limit of
+ * Integer.MAX_VALUE.
+ */
+ @VisibleForTesting
+ @Nullable
+ static BinlogHelper createBinaryLog(BinaryLogSink sink, @Nullable String logConfig) {
+ if (logConfig == null) {
+ return new BinlogHelper(
+ new SinkWriterImpl(
+ sink, TimeProvider.SYSTEM_TIME_PROVIDER, Integer.MAX_VALUE, Integer.MAX_VALUE));
+ }
+ try {
+ final int maxHeaderBytes;
+ final int maxMsgBytes;
+ String[] parts = logConfig.split(";", 2);
+ if (parts.length == 2) {
+ if (!(parts[0].startsWith("h") && parts[1].startsWith("m"))) {
+ throw new IllegalArgumentException("Illegal log config pattern");
+ }
+ maxHeaderBytes = optionalInt(parts[0].substring(1));
+ maxMsgBytes = optionalInt(parts[1].substring(1));
+ } else if (parts[0].startsWith("h")) {
+ maxHeaderBytes = optionalInt(parts[0].substring(1));
+ maxMsgBytes = 0;
+ } else if (parts[0].startsWith("m")) {
+ maxHeaderBytes = 0;
+ maxMsgBytes = optionalInt(parts[0].substring(1));
+ } else {
+ throw new IllegalArgumentException("Illegal log config pattern");
+ }
+ return new BinlogHelper(
+ new SinkWriterImpl(
+ sink, TimeProvider.SYSTEM_TIME_PROVIDER, maxHeaderBytes, maxMsgBytes));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Illegal log config pattern");
+ }
+ }
+
+ /** Returns {@code s}, after verifying it contains only digits. */
+ static String checkDigits(String s) {
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c < '0' || '9' < c) {
+ throw new IllegalArgumentException("Illegal log config pattern");
+ }
+ }
+ return s;
+ }
+
+ /** Parses the optional int of the form "" (max int) or ":123" (123). */
+ static int optionalInt(String s) {
+ if (s.isEmpty()) {
+ return Integer.MAX_VALUE;
+ }
+ if (!s.startsWith(":")) {
+ throw new IllegalArgumentException("Illegal log config pattern");
+ }
+ s = checkDigits(s.substring(1));
+ return Integer.parseInt(s);
+ }
+
+ /**
+ * Returns true if the input string is a glob of the form: {@code /*}.
+ */
+ static boolean isServiceGlob(String input) {
+ return input.endsWith("/*");
+ }
+ }
+
+ @VisibleForTesting
+ static Address socketToProto(SocketAddress address) {
+ checkNotNull(address, "address");
+
+ Address.Builder builder = Address.newBuilder();
+ if (address instanceof InetSocketAddress) {
+ InetAddress inetAddress = ((InetSocketAddress) address).getAddress();
+ if (inetAddress instanceof Inet4Address) {
+ builder.setType(Address.Type.TYPE_IPV4)
+ .setAddress(InetAddressUtil.toAddrString(inetAddress));
+ } else if (inetAddress instanceof Inet6Address) {
+ builder.setType(Address.Type.TYPE_IPV6)
+ .setAddress(InetAddressUtil.toAddrString(inetAddress));
+ } else {
+ logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address);
+ builder.setAddress(address.toString());
+ }
+ builder.setIpPort(((InetSocketAddress) address).getPort());
+ } else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) {
+ // To avoid a compile time dependency on grpc-netty, we check against the runtime class name.
+ builder.setType(Address.Type.TYPE_UNIX)
+ .setAddress(address.toString());
+ } else {
+ builder.setType(Address.Type.TYPE_UNKNOWN).setAddress(address.toString());
+ }
+ return builder.build();
+ }
+
+ private static final Set NEVER_INCLUDED_METADATA = new HashSet<>(
+ Collections.singletonList(
+ // grpc-status-details-bin is already logged in a field of the binlog proto
+ STATUS_DETAILS_KEY.name()));
+ private static final Set ALWAYS_INCLUDED_METADATA = new HashSet<>(
+ Collections.singletonList(
+ "grpc-trace-bin"));
+
+ static final class MaybeTruncated {
+ T proto;
+ boolean truncated;
+
+ private MaybeTruncated(T proto, boolean truncated) {
+ this.proto = proto;
+ this.truncated = truncated;
+ }
+ }
+
+ @VisibleForTesting
+ static MaybeTruncated createMetadataProto(
+ Metadata metadata, int maxHeaderBytes) {
+ checkNotNull(metadata, "metadata");
+ checkArgument(maxHeaderBytes >= 0, "maxHeaderBytes must be non negative");
+ io.grpc.binarylog.v1.Metadata.Builder metaBuilder = io.grpc.binarylog.v1.Metadata.newBuilder();
+ // This code is tightly coupled with Metadata's implementation
+ byte[][] serialized = InternalMetadata.serialize(metadata);
+ boolean truncated = false;
+ if (serialized != null) {
+ int curBytes = 0;
+ for (int i = 0; i < serialized.length; i += 2) {
+ String key = new String(serialized[i], Charsets.UTF_8);
+ byte[] value = serialized[i + 1];
+ if (NEVER_INCLUDED_METADATA.contains(key)) {
+ continue;
+ }
+ boolean forceInclude = ALWAYS_INCLUDED_METADATA.contains(key);
+ int bytesAfterAdd = curBytes + key.length() + value.length;
+ if (!forceInclude && bytesAfterAdd > maxHeaderBytes) {
+ truncated = true;
+ continue;
+ }
+ metaBuilder.addEntryBuilder()
+ .setKey(key)
+ .setValue(ByteString.copyFrom(value));
+ if (!forceInclude) {
+ // force included keys do not count towards the size limit
+ curBytes = bytesAfterAdd;
+ }
+ }
+ }
+ return new MaybeTruncated<>(metaBuilder, truncated);
+ }
+
+ @VisibleForTesting
+ static MaybeTruncated createMessageProto(
+ byte[] message, int maxMessageBytes) {
+ checkNotNull(message, "message");
+ checkArgument(maxMessageBytes >= 0, "maxMessageBytes must be non negative");
+ Message.Builder msgBuilder = Message
+ .newBuilder()
+ .setLength(message.length);
+ if (maxMessageBytes > 0) {
+ int desiredBytes = Math.min(maxMessageBytes, message.length);
+ msgBuilder.setData(ByteString.copyFrom(message, 0, desiredBytes));
+ }
+ return new MaybeTruncated<>(msgBuilder, maxMessageBytes < message.length);
+ }
+}
diff --git a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java
new file mode 100644
index 00000000000..349995d9f80
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java
@@ -0,0 +1,471 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.util.Durations;
+import com.google.protobuf.util.Timestamps;
+import io.grpc.ConnectivityState;
+import io.grpc.InternalChannelz;
+import io.grpc.InternalChannelz.ChannelStats;
+import io.grpc.InternalChannelz.ChannelTrace.Event;
+import io.grpc.InternalChannelz.RootChannelList;
+import io.grpc.InternalChannelz.ServerList;
+import io.grpc.InternalChannelz.ServerSocketsList;
+import io.grpc.InternalChannelz.ServerStats;
+import io.grpc.InternalChannelz.SocketStats;
+import io.grpc.InternalChannelz.TransportStats;
+import io.grpc.InternalInstrumented;
+import io.grpc.InternalWithLogId;
+import io.grpc.Status;
+import io.grpc.channelz.v1.Address;
+import io.grpc.channelz.v1.Address.OtherAddress;
+import io.grpc.channelz.v1.Address.TcpIpAddress;
+import io.grpc.channelz.v1.Address.UdsAddress;
+import io.grpc.channelz.v1.Channel;
+import io.grpc.channelz.v1.ChannelConnectivityState;
+import io.grpc.channelz.v1.ChannelConnectivityState.State;
+import io.grpc.channelz.v1.ChannelData;
+import io.grpc.channelz.v1.ChannelRef;
+import io.grpc.channelz.v1.ChannelTrace;
+import io.grpc.channelz.v1.ChannelTraceEvent;
+import io.grpc.channelz.v1.ChannelTraceEvent.Severity;
+import io.grpc.channelz.v1.GetServerSocketsResponse;
+import io.grpc.channelz.v1.GetServersResponse;
+import io.grpc.channelz.v1.GetTopChannelsResponse;
+import io.grpc.channelz.v1.Security;
+import io.grpc.channelz.v1.Security.OtherSecurity;
+import io.grpc.channelz.v1.Security.Tls;
+import io.grpc.channelz.v1.Server;
+import io.grpc.channelz.v1.ServerData;
+import io.grpc.channelz.v1.ServerRef;
+import io.grpc.channelz.v1.Socket;
+import io.grpc.channelz.v1.SocketData;
+import io.grpc.channelz.v1.SocketOption;
+import io.grpc.channelz.v1.SocketOptionLinger;
+import io.grpc.channelz.v1.SocketOptionTcpInfo;
+import io.grpc.channelz.v1.SocketOptionTimeout;
+import io.grpc.channelz.v1.SocketRef;
+import io.grpc.channelz.v1.Subchannel;
+import io.grpc.channelz.v1.SubchannelRef;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.cert.CertificateEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A static utility class for turning internal data structures into protos.
+ */
+final class ChannelzProtoUtil {
+ private static final Logger logger = Logger.getLogger(ChannelzProtoUtil.class.getName());
+
+ private ChannelzProtoUtil() {
+ // do not instantiate.
+ }
+
+ static ChannelRef toChannelRef(InternalWithLogId obj) {
+ return ChannelRef
+ .newBuilder()
+ .setChannelId(obj.getLogId().getId())
+ .setName(obj.toString())
+ .build();
+ }
+
+ static SubchannelRef toSubchannelRef(InternalWithLogId obj) {
+ return SubchannelRef
+ .newBuilder()
+ .setSubchannelId(obj.getLogId().getId())
+ .setName(obj.toString())
+ .build();
+ }
+
+ static ServerRef toServerRef(InternalWithLogId obj) {
+ return ServerRef
+ .newBuilder()
+ .setServerId(obj.getLogId().getId())
+ .setName(obj.toString())
+ .build();
+ }
+
+ static SocketRef toSocketRef(InternalWithLogId obj) {
+ return SocketRef
+ .newBuilder()
+ .setSocketId(obj.getLogId().getId())
+ .setName(obj.toString())
+ .build();
+ }
+
+ static Server toServer(InternalInstrumented obj) {
+ ServerStats stats = getFuture(obj.getStats());
+ Server.Builder builder = Server
+ .newBuilder()
+ .setRef(toServerRef(obj))
+ .setData(toServerData(stats));
+ for (InternalInstrumented listenSocket : stats.listenSockets) {
+ builder.addListenSocket(toSocketRef(listenSocket));
+ }
+ return builder.build();
+ }
+
+ static ServerData toServerData(ServerStats stats) {
+ return ServerData
+ .newBuilder()
+ .setCallsStarted(stats.callsStarted)
+ .setCallsSucceeded(stats.callsSucceeded)
+ .setCallsFailed(stats.callsFailed)
+ .setLastCallStartedTimestamp(Timestamps.fromNanos(stats.lastCallStartedNanos))
+ .build();
+ }
+
+ static Security toSecurity(InternalChannelz.Security security) {
+ Preconditions.checkNotNull(security);
+ Preconditions.checkState(
+ security.tls != null ^ security.other != null,
+ "one of tls or othersecurity must be non null");
+ if (security.tls != null) {
+ Tls.Builder tlsBuilder
+ = Tls.newBuilder().setStandardName(security.tls.cipherSuiteStandardName);
+ try {
+ if (security.tls.localCert != null) {
+ tlsBuilder.setLocalCertificate(ByteString.copyFrom(
+ security.tls.localCert.getEncoded()));
+ }
+ if (security.tls.remoteCert != null) {
+ tlsBuilder.setRemoteCertificate(ByteString.copyFrom(
+ security.tls.remoteCert.getEncoded()));
+ }
+ } catch (CertificateEncodingException e) {
+ logger.log(Level.FINE, "Caught exception", e);
+ }
+ return Security.newBuilder().setTls(tlsBuilder).build();
+ } else {
+ OtherSecurity.Builder builder = OtherSecurity.newBuilder().setName(security.other.name);
+ if (security.other.any != null) {
+ builder.setValue((Any) security.other.any);
+ }
+ return Security.newBuilder().setOther(builder).build();
+ }
+ }
+
+ static Socket toSocket(InternalInstrumented obj) {
+ SocketStats socketStats = getFuture(obj.getStats());
+ Socket.Builder builder = Socket.newBuilder()
+ .setRef(toSocketRef(obj))
+ .setLocal(toAddress(socketStats.local));
+ if (socketStats.security != null) {
+ builder.setSecurity(toSecurity(socketStats.security));
+ }
+ // listen sockets do not have remote nor data
+ if (socketStats.remote != null) {
+ builder.setRemote(toAddress(socketStats.remote));
+ }
+ builder.setData(extractSocketData(socketStats));
+ return builder.build();
+ }
+
+ static Address toAddress(SocketAddress address) {
+ Preconditions.checkNotNull(address);
+ Address.Builder builder = Address.newBuilder();
+ if (address instanceof InetSocketAddress) {
+ InetSocketAddress inetAddress = (InetSocketAddress) address;
+ builder.setTcpipAddress(
+ TcpIpAddress
+ .newBuilder()
+ .setIpAddress(
+ ByteString.copyFrom(inetAddress.getAddress().getAddress()))
+ .setPort(inetAddress.getPort())
+ .build());
+ } else if (address.getClass().getName().endsWith("io.netty.channel.unix.DomainSocketAddress")) {
+ builder.setUdsAddress(
+ UdsAddress
+ .newBuilder()
+ .setFilename(address.toString()) // DomainSocketAddress.toString returns filename
+ .build());
+ } else {
+ builder.setOtherAddress(OtherAddress.newBuilder().setName(address.toString()).build());
+ }
+ return builder.build();
+ }
+
+ static SocketData extractSocketData(SocketStats socketStats) {
+ SocketData.Builder builder = SocketData.newBuilder();
+ if (socketStats.data != null) {
+ TransportStats s = socketStats.data;
+ builder
+ .setStreamsStarted(s.streamsStarted)
+ .setStreamsSucceeded(s.streamsSucceeded)
+ .setStreamsFailed(s.streamsFailed)
+ .setMessagesSent(s.messagesSent)
+ .setMessagesReceived(s.messagesReceived)
+ .setKeepAlivesSent(s.keepAlivesSent)
+ .setLastLocalStreamCreatedTimestamp(
+ Timestamps.fromNanos(s.lastLocalStreamCreatedTimeNanos))
+ .setLastRemoteStreamCreatedTimestamp(
+ Timestamps.fromNanos(s.lastRemoteStreamCreatedTimeNanos))
+ .setLastMessageSentTimestamp(
+ Timestamps.fromNanos(s.lastMessageSentTimeNanos))
+ .setLastMessageReceivedTimestamp(
+ Timestamps.fromNanos(s.lastMessageReceivedTimeNanos))
+ .setLocalFlowControlWindow(
+ Int64Value.of(s.localFlowControlWindow))
+ .setRemoteFlowControlWindow(
+ Int64Value.of(s.remoteFlowControlWindow));
+ }
+ builder.addAllOption(toSocketOptionsList(socketStats.socketOptions));
+ return builder.build();
+ }
+
+ public static final String SO_LINGER = "SO_LINGER";
+ public static final String SO_TIMEOUT = "SO_TIMEOUT";
+ public static final String TCP_INFO = "TCP_INFO";
+
+ static SocketOption toSocketOptionLinger(int lingerSeconds) {
+ final SocketOptionLinger lingerOpt;
+ if (lingerSeconds >= 0) {
+ lingerOpt = SocketOptionLinger
+ .newBuilder()
+ .setActive(true)
+ .setDuration(Durations.fromSeconds(lingerSeconds))
+ .build();
+ } else {
+ lingerOpt = SocketOptionLinger.getDefaultInstance();
+ }
+ return SocketOption
+ .newBuilder()
+ .setName(SO_LINGER)
+ .setAdditional(Any.pack(lingerOpt))
+ .build();
+ }
+
+ static SocketOption toSocketOptionTimeout(String name, int timeoutMillis) {
+ Preconditions.checkNotNull(name);
+ return SocketOption
+ .newBuilder()
+ .setName(name)
+ .setAdditional(
+ Any.pack(
+ SocketOptionTimeout
+ .newBuilder()
+ .setDuration(Durations.fromMillis(timeoutMillis))
+ .build()))
+ .build();
+ }
+
+ static SocketOption toSocketOptionTcpInfo(InternalChannelz.TcpInfo i) {
+ SocketOptionTcpInfo tcpInfo = SocketOptionTcpInfo.newBuilder()
+ .setTcpiState(i.state)
+ .setTcpiCaState(i.caState)
+ .setTcpiRetransmits(i.retransmits)
+ .setTcpiProbes(i.probes)
+ .setTcpiBackoff(i.backoff)
+ .setTcpiOptions(i.options)
+ .setTcpiSndWscale(i.sndWscale)
+ .setTcpiRcvWscale(i.rcvWscale)
+ .setTcpiRto(i.rto)
+ .setTcpiAto(i.ato)
+ .setTcpiSndMss(i.sndMss)
+ .setTcpiRcvMss(i.rcvMss)
+ .setTcpiUnacked(i.unacked)
+ .setTcpiSacked(i.sacked)
+ .setTcpiLost(i.lost)
+ .setTcpiRetrans(i.retrans)
+ .setTcpiFackets(i.fackets)
+ .setTcpiLastDataSent(i.lastDataSent)
+ .setTcpiLastAckSent(i.lastAckSent)
+ .setTcpiLastDataRecv(i.lastDataRecv)
+ .setTcpiLastAckRecv(i.lastAckRecv)
+ .setTcpiPmtu(i.pmtu)
+ .setTcpiRcvSsthresh(i.rcvSsthresh)
+ .setTcpiRtt(i.rtt)
+ .setTcpiRttvar(i.rttvar)
+ .setTcpiSndSsthresh(i.sndSsthresh)
+ .setTcpiSndCwnd(i.sndCwnd)
+ .setTcpiAdvmss(i.advmss)
+ .setTcpiReordering(i.reordering)
+ .build();
+ return SocketOption
+ .newBuilder()
+ .setName(TCP_INFO)
+ .setAdditional(Any.pack(tcpInfo))
+ .build();
+ }
+
+ static SocketOption toSocketOptionAdditional(String name, String value) {
+ Preconditions.checkNotNull(name);
+ Preconditions.checkNotNull(value);
+ return SocketOption.newBuilder().setName(name).setValue(value).build();
+ }
+
+ static List toSocketOptionsList(InternalChannelz.SocketOptions options) {
+ Preconditions.checkNotNull(options);
+ List ret = new ArrayList<>();
+ if (options.lingerSeconds != null) {
+ ret.add(toSocketOptionLinger(options.lingerSeconds));
+ }
+ if (options.soTimeoutMillis != null) {
+ ret.add(toSocketOptionTimeout(SO_TIMEOUT, options.soTimeoutMillis));
+ }
+ if (options.tcpInfo != null) {
+ ret.add(toSocketOptionTcpInfo(options.tcpInfo));
+ }
+ for (Map.Entry entry : options.others.entrySet()) {
+ ret.add(toSocketOptionAdditional(entry.getKey(), entry.getValue()));
+ }
+ return ret;
+ }
+
+ static Channel toChannel(InternalInstrumented channel) {
+ ChannelStats stats = getFuture(channel.getStats());
+ Channel.Builder channelBuilder = Channel
+ .newBuilder()
+ .setRef(toChannelRef(channel))
+ .setData(extractChannelData(stats));
+ for (InternalWithLogId subchannel : stats.subchannels) {
+ channelBuilder.addSubchannelRef(toSubchannelRef(subchannel));
+ }
+
+ return channelBuilder.build();
+ }
+
+ static ChannelData extractChannelData(InternalChannelz.ChannelStats stats) {
+ ChannelData.Builder builder = ChannelData.newBuilder();
+ builder.setTarget(stats.target)
+ .setState(toChannelConnectivityState(stats.state))
+ .setCallsStarted(stats.callsStarted)
+ .setCallsSucceeded(stats.callsSucceeded)
+ .setCallsFailed(stats.callsFailed)
+ .setLastCallStartedTimestamp(Timestamps.fromNanos(stats.lastCallStartedNanos));
+ if (stats.channelTrace != null) {
+ builder.setTrace(toChannelTrace(stats.channelTrace));
+ }
+ return builder.build();
+ }
+
+ static ChannelConnectivityState toChannelConnectivityState(ConnectivityState s) {
+ return ChannelConnectivityState.newBuilder().setState(toState(s)).build();
+ }
+
+ private static ChannelTrace toChannelTrace(InternalChannelz.ChannelTrace channelTrace) {
+ return ChannelTrace.newBuilder()
+ .setNumEventsLogged(channelTrace.numEventsLogged)
+ .setCreationTimestamp(Timestamps.fromNanos(channelTrace.creationTimeNanos))
+ .addAllEvents(toChannelTraceEvents(channelTrace.events))
+ .build();
+ }
+
+ private static List toChannelTraceEvents(List events) {
+ List channelTraceEvents = new ArrayList<>();
+ for (Event event : events) {
+ ChannelTraceEvent.Builder builder = ChannelTraceEvent.newBuilder()
+ .setDescription(event.description)
+ .setSeverity(Severity.valueOf(event.severity.name()))
+ .setTimestamp(Timestamps.fromNanos(event.timestampNanos));
+ if (event.channelRef != null) {
+ builder.setChannelRef(toChannelRef(event.channelRef));
+ }
+ if (event.subchannelRef != null) {
+ builder.setSubchannelRef(toSubchannelRef(event.subchannelRef));
+ }
+ channelTraceEvents.add(builder.build());
+ }
+ return Collections.unmodifiableList(channelTraceEvents);
+ }
+
+ static State toState(ConnectivityState state) {
+ if (state == null) {
+ return State.UNKNOWN;
+ }
+ try {
+ return Enum.valueOf(State.class, state.name());
+ } catch (IllegalArgumentException e) {
+ return State.UNKNOWN;
+ }
+ }
+
+ static Subchannel toSubchannel(InternalInstrumented subchannel) {
+ ChannelStats stats = getFuture(subchannel.getStats());
+ Subchannel.Builder subchannelBuilder = Subchannel
+ .newBuilder()
+ .setRef(toSubchannelRef(subchannel))
+ .setData(extractChannelData(stats));
+ Preconditions.checkState(stats.sockets.isEmpty() || stats.subchannels.isEmpty());
+ for (InternalWithLogId childSocket : stats.sockets) {
+ subchannelBuilder.addSocketRef(toSocketRef(childSocket));
+ }
+ for (InternalWithLogId childSubchannel : stats.subchannels) {
+ subchannelBuilder.addSubchannelRef(toSubchannelRef(childSubchannel));
+ }
+ return subchannelBuilder.build();
+ }
+
+ static GetTopChannelsResponse toGetTopChannelResponse(RootChannelList rootChannels) {
+ GetTopChannelsResponse.Builder responseBuilder = GetTopChannelsResponse
+ .newBuilder()
+ .setEnd(rootChannels.end);
+ for (InternalInstrumented c : rootChannels.channels) {
+ responseBuilder.addChannel(ChannelzProtoUtil.toChannel(c));
+ }
+ return responseBuilder.build();
+ }
+
+ static GetServersResponse toGetServersResponse(ServerList servers) {
+ GetServersResponse.Builder responseBuilder = GetServersResponse
+ .newBuilder()
+ .setEnd(servers.end);
+ for (InternalInstrumented s : servers.servers) {
+ responseBuilder.addServer(ChannelzProtoUtil.toServer(s));
+ }
+ return responseBuilder.build();
+ }
+
+ static GetServerSocketsResponse toGetServerSocketsResponse(ServerSocketsList serverSockets) {
+ GetServerSocketsResponse.Builder responseBuilder = GetServerSocketsResponse
+ .newBuilder()
+ .setEnd(serverSockets.end);
+ for (InternalWithLogId s : serverSockets.sockets) {
+ responseBuilder.addSocketRef(ChannelzProtoUtil.toSocketRef(s));
+ }
+ return responseBuilder.build();
+ }
+
+ private static T getFuture(ListenableFuture future) {
+ try {
+ T ret = future.get();
+ if (ret == null) {
+ throw Status.UNIMPLEMENTED
+ .withDescription("The entity's stats can not be retrieved. "
+ + "If this is an InProcessTransport this is expected.")
+ .asRuntimeException();
+ }
+ return ret;
+ } catch (InterruptedException e) {
+ throw Status.INTERNAL.withCause(e).asRuntimeException();
+ } catch (ExecutionException e) {
+ throw Status.INTERNAL.withCause(e).asRuntimeException();
+ }
+ }
+}
diff --git a/services/src/main/java/io/grpc/services/ChannelzService.java b/services/src/main/java/io/grpc/services/ChannelzService.java
new file mode 100644
index 00000000000..07e9e7ad035
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/ChannelzService.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.ExperimentalApi;
+import io.grpc.InternalChannelz;
+import io.grpc.InternalChannelz.ChannelStats;
+import io.grpc.InternalChannelz.ServerList;
+import io.grpc.InternalChannelz.ServerSocketsList;
+import io.grpc.InternalChannelz.ServerStats;
+import io.grpc.InternalChannelz.SocketStats;
+import io.grpc.InternalInstrumented;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.channelz.v1.ChannelzGrpc;
+import io.grpc.channelz.v1.GetChannelRequest;
+import io.grpc.channelz.v1.GetChannelResponse;
+import io.grpc.channelz.v1.GetServerRequest;
+import io.grpc.channelz.v1.GetServerResponse;
+import io.grpc.channelz.v1.GetServerSocketsRequest;
+import io.grpc.channelz.v1.GetServerSocketsResponse;
+import io.grpc.channelz.v1.GetServersRequest;
+import io.grpc.channelz.v1.GetServersResponse;
+import io.grpc.channelz.v1.GetSocketRequest;
+import io.grpc.channelz.v1.GetSocketResponse;
+import io.grpc.channelz.v1.GetSubchannelRequest;
+import io.grpc.channelz.v1.GetSubchannelResponse;
+import io.grpc.channelz.v1.GetTopChannelsRequest;
+import io.grpc.channelz.v1.GetTopChannelsResponse;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * The channelz service provides stats about a running gRPC process.
+ *
+ * @deprecated Use {@link io.grpc.protobuf.services.ChannelzService} instead.
+ */
+@Deprecated
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4206")
+public final class ChannelzService extends ChannelzGrpc.ChannelzImplBase {
+ private final InternalChannelz channelz;
+ private final int maxPageSize;
+
+ /**
+ * Creates an instance.
+ */
+ public static ChannelzService newInstance(int maxPageSize) {
+ return new ChannelzService(InternalChannelz.instance(), maxPageSize);
+ }
+
+ @VisibleForTesting
+ ChannelzService(InternalChannelz channelz, int maxPageSize) {
+ this.channelz = channelz;
+ this.maxPageSize = maxPageSize;
+ }
+
+ /** Returns top level channel aka {@link io.grpc.ManagedChannel}. */
+ @Override
+ public void getTopChannels(
+ GetTopChannelsRequest request, StreamObserver responseObserver) {
+ InternalChannelz.RootChannelList rootChannels
+ = channelz.getRootChannels(request.getStartChannelId(), maxPageSize);
+
+ GetTopChannelsResponse resp;
+ try {
+ resp = ChannelzProtoUtil.toGetTopChannelResponse(rootChannels);
+ } catch (StatusRuntimeException e) {
+ responseObserver.onError(e);
+ return;
+ }
+
+ responseObserver.onNext(resp);
+ responseObserver.onCompleted();
+ }
+
+ /** Returns a top level channel aka {@link io.grpc.ManagedChannel}. */
+ @Override
+ public void getChannel(
+ GetChannelRequest request, StreamObserver responseObserver) {
+ InternalInstrumented s = channelz.getRootChannel(request.getChannelId());
+ if (s == null) {
+ responseObserver.onError(
+ Status.NOT_FOUND.withDescription("Can't find channel " + request.getChannelId())
+ .asRuntimeException());
+ return;
+ }
+
+ GetChannelResponse resp;
+ try {
+ resp = GetChannelResponse
+ .newBuilder()
+ .setChannel(ChannelzProtoUtil.toChannel(s))
+ .build();
+ } catch (StatusRuntimeException e) {
+ responseObserver.onError(e);
+ return;
+ }
+
+ responseObserver.onNext(resp);
+ responseObserver.onCompleted();
+ }
+
+ /** Returns servers. */
+ @Override
+ public void getServers(
+ GetServersRequest request, StreamObserver responseObserver) {
+ ServerList servers = channelz.getServers(request.getStartServerId(), maxPageSize);
+
+ GetServersResponse resp;
+ try {
+ resp = ChannelzProtoUtil.toGetServersResponse(servers);
+ } catch (StatusRuntimeException e) {
+ responseObserver.onError(e);
+ return;
+ }
+
+ responseObserver.onNext(resp);
+ responseObserver.onCompleted();
+ }
+
+ /** Returns a server. */
+ @Override
+ public void getServer(
+ GetServerRequest request, StreamObserver responseObserver) {
+ InternalInstrumented s = channelz.getServer(request.getServerId());
+ if (s == null) {
+ responseObserver.onError(
+ Status.NOT_FOUND.withDescription("Can't find server " + request.getServerId())
+ .asRuntimeException());
+ return;
+ }
+
+ GetServerResponse resp;
+ try {
+ resp = GetServerResponse
+ .newBuilder()
+ .setServer(ChannelzProtoUtil.toServer(s))
+ .build();
+ } catch (StatusRuntimeException e) {
+ responseObserver.onError(e);
+ return;
+ }
+
+ responseObserver.onNext(resp);
+ responseObserver.onCompleted();
+ }
+
+ /** Returns a subchannel. */
+ @Override
+ public void getSubchannel(
+ GetSubchannelRequest request, StreamObserver responseObserver) {
+ InternalInstrumented s = channelz.getSubchannel(request.getSubchannelId());
+ if (s == null) {
+ responseObserver.onError(
+ Status.NOT_FOUND.withDescription("Can't find subchannel " + request.getSubchannelId())
+ .asRuntimeException());
+ return;
+ }
+
+ GetSubchannelResponse resp;
+ try {
+ resp = GetSubchannelResponse
+ .newBuilder()
+ .setSubchannel(ChannelzProtoUtil.toSubchannel(s))
+ .build();
+ } catch (StatusRuntimeException e) {
+ responseObserver.onError(e);
+ return;
+ }
+
+ responseObserver.onNext(resp);
+ responseObserver.onCompleted();
+ }
+
+ /** Returns a socket. */
+ @Override
+ public void getSocket(
+ GetSocketRequest request, StreamObserver responseObserver) {
+ InternalInstrumented s = channelz.getSocket(request.getSocketId());
+ if (s == null) {
+ responseObserver.onError(
+ Status.NOT_FOUND.withDescription("Can't find socket " + request.getSocketId())
+ .asRuntimeException());
+ return;
+ }
+
+ GetSocketResponse resp;
+ try {
+ resp =
+ GetSocketResponse.newBuilder().setSocket(ChannelzProtoUtil.toSocket(s)).build();
+ } catch (StatusRuntimeException e) {
+ responseObserver.onError(e);
+ return;
+ }
+
+ responseObserver.onNext(resp);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void getServerSockets(
+ GetServerSocketsRequest request, StreamObserver responseObserver) {
+ ServerSocketsList serverSockets
+ = channelz.getServerSockets(request.getServerId(), request.getStartSocketId(), maxPageSize);
+ if (serverSockets == null) {
+ responseObserver.onError(
+ Status.NOT_FOUND.withDescription("Can't find server " + request.getServerId())
+ .asRuntimeException());
+ return;
+ }
+
+ GetServerSocketsResponse resp;
+ try {
+ resp = ChannelzProtoUtil.toGetServerSocketsResponse(serverSockets);
+ } catch (StatusRuntimeException e) {
+ responseObserver.onError(e);
+ return;
+ }
+
+ responseObserver.onNext(resp);
+ responseObserver.onCompleted();
+ }
+}
diff --git a/services/src/main/java/io/grpc/services/InetAddressUtil.java b/services/src/main/java/io/grpc/services/InetAddressUtil.java
new file mode 100644
index 00000000000..057a8ccb5e6
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/InetAddressUtil.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.primitives.Ints;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.util.Arrays;
+
+// This is copied from guava 20.0 because it is a @Beta api
+final class InetAddressUtil {
+ private static final int IPV6_PART_COUNT = 8;
+
+ public static String toAddrString(InetAddress ip) {
+ checkNotNull(ip);
+ if (ip instanceof Inet4Address) {
+ // For IPv4, Java's formatting is good enough.
+ return ip.getHostAddress();
+ }
+ checkArgument(ip instanceof Inet6Address);
+ byte[] bytes = ip.getAddress();
+ int[] hextets = new int[IPV6_PART_COUNT];
+ for (int i = 0; i < hextets.length; i++) {
+ hextets[i] = Ints.fromBytes((byte) 0, (byte) 0, bytes[2 * i], bytes[2 * i + 1]);
+ }
+ compressLongestRunOfZeroes(hextets);
+ return hextetsToIPv6String(hextets);
+ }
+
+ private static void compressLongestRunOfZeroes(int[] hextets) {
+ int bestRunStart = -1;
+ int bestRunLength = -1;
+ int runStart = -1;
+ for (int i = 0; i < hextets.length + 1; i++) {
+ if (i < hextets.length && hextets[i] == 0) {
+ if (runStart < 0) {
+ runStart = i;
+ }
+ } else if (runStart >= 0) {
+ int runLength = i - runStart;
+ if (runLength > bestRunLength) {
+ bestRunStart = runStart;
+ bestRunLength = runLength;
+ }
+ runStart = -1;
+ }
+ }
+ if (bestRunLength >= 2) {
+ Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1);
+ }
+ }
+
+ private static String hextetsToIPv6String(int[] hextets) {
+ // While scanning the array, handle these state transitions:
+ // start->num => "num" start->gap => "::"
+ // num->num => ":num" num->gap => "::"
+ // gap->num => "num" gap->gap => ""
+ StringBuilder buf = new StringBuilder(39);
+ boolean lastWasNumber = false;
+ for (int i = 0; i < hextets.length; i++) {
+ boolean thisIsNumber = hextets[i] >= 0;
+ if (thisIsNumber) {
+ if (lastWasNumber) {
+ buf.append(':');
+ }
+ buf.append(Integer.toHexString(hextets[i]));
+ } else {
+ if (i == 0 || lastWasNumber) {
+ buf.append("::");
+ }
+ }
+ lastWasNumber = thisIsNumber;
+ }
+ return buf.toString();
+ }
+}
\ No newline at end of file
diff --git a/services/src/main/java/io/grpc/services/TempFileSink.java b/services/src/main/java/io/grpc/services/TempFileSink.java
new file mode 100644
index 00000000000..c28339d1bd6
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/TempFileSink.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import com.google.protobuf.MessageLite;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The output file goes to the JVM's temp dir with a prefix of BINARY_INFO. The proto messages
+ * are written serially using {@link MessageLite#writeDelimitedTo(OutputStream)}.
+ */
+class TempFileSink implements BinaryLogSink {
+ private static final Logger logger = Logger.getLogger(TempFileSink.class.getName());
+
+ private final String outPath;
+ private final OutputStream out;
+ private boolean closed;
+
+ TempFileSink() throws IOException {
+ File outFile = File.createTempFile("BINARY_INFO.", "");
+ outPath = outFile.getPath();
+ logger.log(Level.INFO, "Writing binary logs to to {0}", outFile.getAbsolutePath());
+ out = new BufferedOutputStream(new FileOutputStream(outFile));
+ }
+
+ String getPath() {
+ return this.outPath;
+ }
+
+ @Override
+ public synchronized void write(MessageLite message) {
+ if (closed) {
+ logger.log(Level.FINEST, "Attempt to write after TempFileSink is closed.");
+ return;
+ }
+ try {
+ message.writeDelimitedTo(out);
+ } catch (IOException e) {
+ logger.log(Level.SEVERE, "Caught exception while writing", e);
+ closeQuietly();
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ out.flush();
+ } finally {
+ out.close();
+ }
+ }
+
+ private synchronized void closeQuietly() {
+ try {
+ close();
+ } catch (IOException e) {
+ logger.log(Level.SEVERE, "Caught exception while closing", e);
+ }
+ }
+}