Skip to content

Commit

Permalink
observability: implement client interceptor for logging (grpc#8956)
Browse files Browse the repository at this point in the history
  • Loading branch information
DNVindhya authored and temawi committed Apr 8, 2022
1 parent 3bf48e4 commit 6718fcc
Show file tree
Hide file tree
Showing 12 changed files with 1,995 additions and 9 deletions.
Expand Up @@ -20,11 +20,14 @@
import io.grpc.ManagedChannelProvider.ProviderNotFoundException;
import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;

/** The main class for gRPC Observability features. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class Observability {
private static boolean initialized = false;
private static final String PROJECT_ID = "PROJECT";

/**
* Initialize grpc-observability.
Expand All @@ -35,13 +38,17 @@ public static synchronized void grpcInit() {
if (initialized) {
throw new IllegalStateException("Observability already initialized!");
}
LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl());
// TODO(dnvindhya): PROJECT_ID to be replaced with configured destinationProjectId
Sink sink = new GcpLogSink(PROJECT_ID);
LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(sink));
LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl());
// TODO(sanjaypujare): initialize customTags map
initialized = true;
}

/** Un-initialize or finish grpc-observability. */
// TODO(sanjaypujare): Once Observability is made into Singleton object,
// close() on sink will be called as part of grpcFinish()
public static synchronized void grpcFinish() {
if (!initialized) {
throw new IllegalStateException("Observability not initialized!");
Expand Down
@@ -0,0 +1,94 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.observability.interceptors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.primitives.Ints;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.util.Arrays;

// This is copied from guava 20.0 because it is a @Beta api
final class InetAddressUtil {
private static final int IPV6_PART_COUNT = 8;

public static String toAddrString(InetAddress ip) {
checkNotNull(ip);
if (ip instanceof Inet4Address) {
// For IPv4, Java's formatting is good enough.
return ip.getHostAddress();
}
checkArgument(ip instanceof Inet6Address);
byte[] bytes = ip.getAddress();
int[] hextets = new int[IPV6_PART_COUNT];
for (int i = 0; i < hextets.length; i++) {
hextets[i] = Ints.fromBytes((byte) 0, (byte) 0, bytes[2 * i], bytes[2 * i + 1]);
}
compressLongestRunOfZeroes(hextets);
return hextetsToIPv6String(hextets);
}

private static void compressLongestRunOfZeroes(int[] hextets) {
int bestRunStart = -1;
int bestRunLength = -1;
int runStart = -1;
for (int i = 0; i < hextets.length + 1; i++) {
if (i < hextets.length && hextets[i] == 0) {
if (runStart < 0) {
runStart = i;
}
} else if (runStart >= 0) {
int runLength = i - runStart;
if (runLength > bestRunLength) {
bestRunStart = runStart;
bestRunLength = runLength;
}
runStart = -1;
}
}
if (bestRunLength >= 2) {
Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1);
}
}

private static String hextetsToIPv6String(int[] hextets) {
// While scanning the array, handle these state transitions:
// start->num => "num" start->gap => "::"
// num->num => ":num" num->gap => "::"
// gap->num => "num" gap->gap => ""
StringBuilder buf = new StringBuilder(39);
boolean lastWasNumber = false;
for (int i = 0; i < hextets.length; i++) {
boolean thisIsNumber = hextets[i] >= 0;
if (thisIsNumber) {
if (lastWasNumber) {
buf.append(':');
}
buf.append(Integer.toHexString(hextets[i]));
} else {
if (i == 0 || lastWasNumber) {
buf.append("::");
}
}
lastWasNumber = thisIsNumber;
}
return buf.toString();
}
}
Expand Up @@ -16,33 +16,235 @@

package io.grpc.observability.interceptors;

import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Internal;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/** A logging interceptor for {@code LoggingChannelProvider}. */
/**
* A logging interceptor for {@code LoggingChannelProvider}.
*/
@Internal
public final class InternalLoggingChannelInterceptor implements ClientInterceptor {
private static final Logger logger = Logger
.getLogger(InternalLoggingChannelInterceptor.class.getName());

private final LogHelper helper;

public interface Factory {
ClientInterceptor create();
}

public static class FactoryImpl implements Factory {
private final Sink sink;
private final LogHelper helper;

static LogHelper createLogHelper(Sink sink, TimeProvider provider) {
return new LogHelper(sink, provider);
}

public FactoryImpl(Sink sink) {
this.sink = sink;
this.helper = createLogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
}

@Override
public ClientInterceptor create() {
return new InternalLoggingChannelInterceptor();
return new InternalLoggingChannelInterceptor(helper);
}

/**
* Closes the sink instance.
*/
public void close() {
if (sink != null) {
sink.close();
}
}
}

private InternalLoggingChannelInterceptor(LogHelper helper) {
this.helper = helper;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// TODO(dnvindhya) implement the interceptor
return null;

final AtomicLong seq = new AtomicLong(1);
final String rpcId = UUID.randomUUID().toString();
final String authority = next.authority();
final String serviceName = method.getServiceName();
final String methodName = method.getBareMethodName();
// Get the stricter deadline to calculate the timeout once the call starts
final Deadline deadline = LogHelper.min(callOptions.getDeadline(),
Context.current().getDeadline());

// TODO (dnvindhya): implement isMethodToBeLogged() to check for methods to be logged
// according to config. Until then always return true.
if (!helper.isMethodToBeLogged(method.getFullMethodName())) {
return next.newCall(method, callOptions);
}

return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// Event: EventType.GRPC_CALL_REQUEST_HEADER
// The timeout should reflect the time remaining when the call is started, so compute
// remaining time here.
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));

try {
helper.logRequestHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
timeout,
headers,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
// exceptions to callers which will lead to RPC getting aborted.
// Expected exceptions to be caught:
// 1. IllegalArgumentException
// 2. NullPointerException
logger.log(Level.SEVERE, "Unable to log request header", e);
}

Listener<RespT> observabilityListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
// Event: EventType.GRPC_CALL_RESPONSE_MESSAGE
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
message,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
super.onMessage(message);
}

@Override
public void onHeaders(Metadata headers) {
// Event: EventType.GRPC_CALL_RESPONSE_HEADER
try {
helper.logResponseHeader(
seq.getAndIncrement(),
serviceName,
methodName,
headers,
EventLogger.LOGGER_CLIENT,
rpcId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
super.onHeaders(headers);
}

@Override
public void onClose(Status status, Metadata trailers) {
// Event: EventType.GRPC_CALL_TRAILER
try {
helper.logTrailer(
seq.getAndIncrement(),
serviceName,
methodName,
status,
trailers,
EventLogger.LOGGER_CLIENT,
rpcId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
super.onClose(status, trailers);
}
};
super.start(observabilityListener, headers);
}

@Override
public void sendMessage(ReqT message) {
// Event: EventType.GRPC_CALL_REQUEST_MESSAGE
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
message,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
super.sendMessage(message);
}

@Override
public void halfClose() {
// Event: EventType.GRPC_CALL_HALF_CLOSE
try {
helper.logHalfClose(
seq.getAndIncrement(),
serviceName,
methodName,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
super.halfClose();
}

@Override
public void cancel(String message, Throwable cause) {
// Event: EventType.GRPC_CALL_CANCEL
try {
helper.logCancel(
seq.getAndIncrement(),
serviceName,
methodName,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
super.cancel(message, cause);
}
};
}
}

0 comments on commit 6718fcc

Please sign in to comment.