Skip to content

Commit

Permalink
observability: integrate globalTags and configuration into rest of ob…
Browse files Browse the repository at this point in the history
…servability (#9000)

* observability: integrate globalTags and configuration into rest of observability
wire observabilityConfig and globalTags into Observability and make
these available to the channel and server interceptors and specifically
to the LogHelper. Also separate globalTags into custom-tags and location-tags as required by the log-helper
  • Loading branch information
sanjaypujare committed Mar 21, 2022
1 parent c772eb0 commit 2d7302d
Show file tree
Hide file tree
Showing 17 changed files with 286 additions and 184 deletions.
Expand Up @@ -39,16 +39,23 @@ final class GlobalLoggingTags {
private static final Logger logger = Logger.getLogger(GlobalLoggingTags.class.getName());

private static final String ENV_KEY_PREFIX = "GRPC_OBSERVABILITY_";
private final Map<String, String> tags;
private final Map<String, String> locationTags;
private final Map<String, String> customTags;

GlobalLoggingTags() {
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
populate(builder);
tags = builder.build();
ImmutableMap.Builder<String, String> locationTagsBuilder = ImmutableMap.builder();
ImmutableMap.Builder<String, String> customTagsBuilder = ImmutableMap.builder();
populate(locationTagsBuilder, customTagsBuilder);
locationTags = locationTagsBuilder.build();
customTags = customTagsBuilder.build();
}

Map<String, String> getTags() {
return tags;
Map<String, String> getLocationTags() {
return locationTags;
}

Map<String, String> getCustomTags() {
return customTags;
}

@VisibleForTesting
Expand Down Expand Up @@ -139,10 +146,11 @@ static void populateFromMap(Map<String, String> map,
});
}

static void populate(ImmutableMap.Builder<String, String> customTags) {
static void populate(ImmutableMap.Builder<String, String> locationTags,
ImmutableMap.Builder<String, String> customTags) {
populateFromEnvironmentVars(customTags);
populateFromMetadataServer(customTags);
populateFromKubernetesValues(customTags,
populateFromMetadataServer(locationTags);
populateFromKubernetesValues(locationTags,
"/var/run/secrets/kubernetes.io/serviceaccount/namespace",
"/etc/hostname", "/proc/self/cgroup");
}
Expand Down
Expand Up @@ -45,7 +45,7 @@ static synchronized void init(InternalLoggingChannelInterceptor.Factory factory)
ManagedChannelRegistry.getDefaultRegistry().register(instance);
}

static synchronized void finish() {
static synchronized void shutdown() {
if (instance == null) {
throw new IllegalStateException("LoggingChannelProvider not initialized!");
}
Expand Down
Expand Up @@ -45,7 +45,7 @@ static synchronized void init(InternalLoggingServerInterceptor.Factory factory)
ServerRegistry.getDefaultRegistry().register(instance);
}

static synchronized void finish() {
static synchronized void shutdown() {
if (instance == null) {
throw new IllegalStateException("LoggingServerProvider not initialized!");
}
Expand Down
Expand Up @@ -16,49 +16,71 @@

package io.grpc.observability;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannelProvider.ProviderNotFoundException;
import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import java.io.IOException;

/** The main class for gRPC Observability features. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class Observability {
private static boolean initialized = false;
private static final String PROJECT_ID = "PROJECT";
private static Observability instance = null;
private final Sink sink;

/**
* Initialize grpc-observability.
*
* @throws ProviderNotFoundException if no underlying channel/server provider is available.
*/
public static synchronized void grpcInit() {
if (initialized) {
throw new IllegalStateException("Observability already initialized!");
public static synchronized Observability grpcInit() throws IOException {
if (instance == null) {
GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags();
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId());
instance = grpcInit(sink,
new InternalLoggingChannelInterceptor.FactoryImpl(sink,
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(),
observabilityConfig),
new InternalLoggingServerInterceptor.FactoryImpl(sink,
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(),
observabilityConfig));
}
return instance;
}

@VisibleForTesting static Observability grpcInit(Sink sink,
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
if (instance == null) {
instance = new Observability(sink, channelInterceptorFactory, serverInterceptorFactory);
}
// TODO(dnvindhya): PROJECT_ID to be replaced with configured destinationProjectId
Sink sink = new GcpLogSink(PROJECT_ID);
LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(sink));
LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl(sink));
// TODO(sanjaypujare): initialize customTags map
initialized = true;
return instance;
}

/** Un-initialize or finish grpc-observability. */
// TODO(sanjaypujare): Once Observability is made into Singleton object,
// close() on sink will be called as part of grpcFinish()
public static synchronized void grpcFinish() {
if (!initialized) {
throw new IllegalStateException("Observability not initialized!");
/** Un-initialize/shutdown grpc-observability. */
public void grpcShutdown() {
synchronized (Observability.class) {
if (instance == null) {
throw new IllegalStateException("Observability already shutdown!");
}
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
sink.close();
instance = null;
}
LoggingChannelProvider.finish();
LoggingServerProvider.finish();
// TODO(sanjaypujare): finish customTags map
initialized = false;
}

private Observability() {
private Observability(Sink sink,
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
this.sink = checkNotNull(sink);
LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory));
LoggingServerProvider.init(checkNotNull(serverInterceptorFactory));
}
}
Expand Up @@ -16,31 +16,29 @@

package io.grpc.observability;

import static com.google.common.base.Preconditions.checkArgument;

import io.grpc.internal.JsonParser;
import io.grpc.internal.JsonUtil;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.io.IOException;
import java.util.List;
import java.util.Map;

/** gRPC Observability configuration processor. */
final class ObservabilityConfig {
private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY";
public interface ObservabilityConfig {
/** Is Cloud Logging enabled. */
boolean isEnableCloudLogging();

/** Get destination project ID - where logs will go. */
String getDestinationProjectId();

/** Get filters set for logging. */
LogFilter[] getLogFilters();

private boolean enableCloudLogging = true;
private String destinationProjectId = null;
private LogFilter[] logFilters;
private GrpcLogRecord.EventType[] eventTypes;
/** Get event types to log. */
EventType[] getEventTypes();

/** POJO for representing a filter used in configuration. */
static class LogFilter {
/**
* POJO for representing a filter used in configuration.
*/
public static class LogFilter {
/** Pattern indicating which service/method to log. */
public final String pattern;

/** Number of bytes of each header to log. */
/** Number of bytes of each header to log. */
public final Integer headerBytes;

/** Number of bytes of each header to log. */
Expand All @@ -52,88 +50,4 @@ static class LogFilter {
this.messageBytes = messageBytes;
}
}

static ObservabilityConfig getInstance() throws IOException {
ObservabilityConfig config = new ObservabilityConfig();
config.parse(System.getenv(CONFIG_ENV_VAR_NAME));
return config;
}

@SuppressWarnings("unchecked")
void parse(String config) throws IOException {
checkArgument(config != null, CONFIG_ENV_VAR_NAME + " value is null!");
parseLoggingConfig(
JsonUtil.getObject((Map<String, ?>) JsonParser.parse(config), "logging_config"));
}

private void parseLoggingConfig(Map<String,?> loggingConfig) {
if (loggingConfig != null) {
Boolean value = JsonUtil.getBoolean(loggingConfig, "enable_cloud_logging");
if (value != null) {
enableCloudLogging = value;
}
destinationProjectId = JsonUtil.getString(loggingConfig, "destination_project_id");
List<?> rawList = JsonUtil.getList(loggingConfig, "log_filters");
if (rawList != null) {
List<Map<String, ?>> jsonLogFilters = JsonUtil.checkObjectList(rawList);
this.logFilters = new LogFilter[jsonLogFilters.size()];
for (int i = 0; i < jsonLogFilters.size(); i++) {
this.logFilters[i] = parseJsonLogFilter(jsonLogFilters.get(i));
}
}
rawList = JsonUtil.getList(loggingConfig, "event_types");
if (rawList != null) {
List<String> jsonEventTypes = JsonUtil.checkStringList(rawList);
this.eventTypes = new GrpcLogRecord.EventType[jsonEventTypes.size()];
for (int i = 0; i < jsonEventTypes.size(); i++) {
this.eventTypes[i] = convertEventType(jsonEventTypes.get(i));
}
}
}
}

private GrpcLogRecord.EventType convertEventType(String val) {
switch (val) {
case "GRPC_CALL_UNKNOWN":
return GrpcLogRecord.EventType.GRPC_CALL_UNKNOWN;
case "GRPC_CALL_REQUEST_HEADER":
return GrpcLogRecord.EventType.GRPC_CALL_REQUEST_HEADER;
case "GRPC_CALL_RESPONSE_HEADER":
return GrpcLogRecord.EventType.GRPC_CALL_RESPONSE_HEADER;
case"GRPC_CALL_REQUEST_MESSAGE":
return GrpcLogRecord.EventType.GRPC_CALL_REQUEST_MESSAGE;
case "GRPC_CALL_RESPONSE_MESSAGE":
return GrpcLogRecord.EventType.GRPC_CALL_RESPONSE_MESSAGE;
case "GRPC_CALL_TRAILER":
return GrpcLogRecord.EventType.GRPC_CALL_TRAILER;
case "GRPC_CALL_HALF_CLOSE":
return GrpcLogRecord.EventType.GRPC_CALL_HALF_CLOSE;
case "GRPC_CALL_CANCEL":
return GrpcLogRecord.EventType.GRPC_CALL_CANCEL;
default:
throw new IllegalArgumentException("Unknown event type value:" + val);
}
}

private LogFilter parseJsonLogFilter(Map<String,?> logFilterMap) {
return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"),
JsonUtil.getNumberAsInteger(logFilterMap, "header_bytes"),
JsonUtil.getNumberAsInteger(logFilterMap, "message_bytes"));
}

public boolean isEnableCloudLogging() {
return enableCloudLogging;
}

public String getDestinationProjectId() {
return destinationProjectId;
}

public LogFilter[] getLogFilters() {
return logFilters;
}

public EventType[] getEventTypes() {
return eventTypes;
}
}

0 comments on commit 2d7302d

Please sign in to comment.