Skip to content

Commit

Permalink
gcp-observability: Update logging fields for GA and use custom Batchi…
Browse files Browse the repository at this point in the history
…ngSettings (grpc#9959)

This commit updates the following in gcp observability logging schema
* `payload.status_code` will be of type `google.rpc.Code` instead of `uint32`.
*  names in enum `Address.TYPE`

Use custom batching settings for [LoggingOptions](https://javadoc.io/doc/com.google.cloud/google-cloud-logging/latest/com/google/cloud/logging/LoggingOptions.html)

Note: Upgraded `com.google.cloud:google-cloud-logging` from `3.6.1` to `3.14.5`.
  • Loading branch information
DNVindhya committed Mar 21, 2023
1 parent 051e397 commit 0ee55ea
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 20 deletions.
16 changes: 8 additions & 8 deletions gcp-observability/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ tasks.named("compileJava").configure {
}

dependencies {
def cloudLoggingVersion = '3.6.1'
def cloudLoggingVersion = '3.14.5'

annotationProcessor libraries.auto.value
api project(':grpc-api')


// TODO(dnvindhya): Prefer using our own libraries, update the dependencies
// in gradle/libs.versions instead
implementation project(':grpc-protobuf'),
project(':grpc-stub'),
project(':grpc-alts'),
Expand All @@ -35,12 +37,10 @@ dependencies {
libraries.opencensus.exporter.trace.stackdriver,
project(':grpc-xds'), // Align grpc versions
project(':grpc-services'), // Align grpc versions
libraries.animalsniffer.annotations, // Prefer our version
libraries.google.auth.credentials, // Prefer our version
libraries.protobuf.java.util, // Prefer our version
libraries.gson, // Prefer our version
libraries.perfmark.api, // Prefer our version
libraries.re2j, // Prefer our version
('com.google.protobuf:protobuf-java:3.21.12'),
('com.google.api.grpc:proto-google-common-protos:2.14.2'),
('com.google.auth:google-auth-library-oauth2-http:1.16.0'),
('io.opencensus:opencensus-api:0.31.1'),
('com.google.guava:guava:31.1-jre')

runtimeOnly libraries.opencensus.impl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Joiner;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.rpc.Code;
import io.grpc.Attributes;
import io.grpc.Deadline;
import io.grpc.Grpc;
Expand Down Expand Up @@ -182,7 +183,7 @@ void logTrailer(

PayloadBuilderHelper<Payload.Builder> pair =
createMetadataProto(metadata, maxHeaderBytes);
pair.payloadBuilder.setStatusCode(status.getCode().value());
pair.payloadBuilder.setStatusCode(Code.forNumber(status.getCode().value()));
String statusDescription = status.getDescription();
if (statusDescription != null) {
pair.payloadBuilder.setStatusMessage(statusDescription);
Expand Down Expand Up @@ -404,10 +405,10 @@ static Address socketAddressToProto(SocketAddress address) {
if (address instanceof InetSocketAddress) {
InetAddress inetAddress = ((InetSocketAddress) address).getAddress();
if (inetAddress instanceof Inet4Address) {
builder.setType(Address.Type.TYPE_IPV4)
builder.setType(Address.Type.IPV4)
.setAddress(InetAddressUtil.toAddrString(inetAddress));
} else if (inetAddress instanceof Inet6Address) {
builder.setType(Address.Type.TYPE_IPV6)
builder.setType(Address.Type.IPV6)
.setAddress(InetAddressUtil.toAddrString(inetAddress));
} else {
logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address);
Expand All @@ -417,7 +418,7 @@ static Address socketAddressToProto(SocketAddress address) {
} else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) {
// To avoid a compiled time dependency on grpc-netty, we check against the
// runtime class name.
builder.setType(Address.Type.TYPE_UNIX)
builder.setType(Address.Type.UNIX)
.setAddress(address.toString());
} else {
builder.setType(Address.Type.TYPE_UNKNOWN).setAddress(address.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowController;
import com.google.cloud.MonitoredResource;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
import com.google.cloud.logging.LoggingOptions;
import com.google.cloud.logging.Payload.JsonPayload;
import com.google.cloud.logging.Severity;
import com.google.cloud.logging.v2.stub.LoggingServiceV2StubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
Expand All @@ -41,6 +44,7 @@
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;

/**
* Sink for Google Cloud Logging.
Expand Down Expand Up @@ -102,6 +106,7 @@ public void write(GrpcLogRecord logProto) {
if (servicesToExclude.contains(logProto.getServiceName())) {
return;
}
LogEntry grpcLogEntry = null;
try {
GrpcLogRecord.EventType eventType = logProto.getType();
// TODO(DNVindhya): make sure all (int, long) values are not displayed as double
Expand All @@ -117,11 +122,18 @@ public void write(GrpcLogRecord logProto) {
if (!customTags.isEmpty()) {
grpcLogEntryBuilder.setLabels(customTags);
}
LogEntry grpcLogEntry = grpcLogEntryBuilder.build();
grpcLogEntry = grpcLogEntryBuilder.build();
synchronized (this) {
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", eventType);
gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
}
} catch (FlowController.FlowControlRuntimeException e) {
String grpcLogEntryString = null;
if (grpcLogEntry != null) {
grpcLogEntryString = grpcLogEntry.toStructuredJsonString();
}
logger.log(Level.SEVERE, "Limit exceeded while writing log entry to cloud logging");
logger.log(Level.SEVERE, "Log entry = ", grpcLogEntryString);
} catch (Exception e) {
logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e);
}
Expand All @@ -132,6 +144,16 @@ Logging createLoggingClient() {
if (!Strings.isNullOrEmpty(projectId)) {
builder.setProjectId(projectId);
}
BatchingSettings loggingDefaultBatchingSettings = LoggingServiceV2StubSettings.newBuilder()
.writeLogEntriesSettings().getBatchingSettings();
// Custom batching settings
BatchingSettings grpcLoggingVBatchingSettings = loggingDefaultBatchingSettings.toBuilder()
.setDelayThreshold(Duration.ofSeconds(1L)).setFlowControlSettings(
loggingDefaultBatchingSettings.getFlowControlSettings().toBuilder()
.setMaxOutstandingRequestBytes(52428800L) //50 MiB
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build()).build();
builder.setBatchingSettings(grpcLoggingVBatchingSettings);
return builder.build().getService();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package grpc.observabilitylog.v1;

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/rpc/code.proto";

option java_multiple_files = true;
option java_package = "io.grpc.observabilitylog.v1";
Expand Down Expand Up @@ -97,7 +98,7 @@ message Payload {
// the RPC timeout value
google.protobuf.Duration timeout = 2;
// The gRPC status code
uint32 status_code = 3;
google.rpc.Code status_code = 3;
// The gRPC status message
string status_message = 4;
// The value of the grpc-status-details-bin metadata key, if any.
Expand All @@ -115,9 +116,9 @@ message Payload {
message Address {
enum Type {
TYPE_UNKNOWN = 0;
TYPE_IPV4 = 1; // in 1.2.3.4 form
TYPE_IPV6 = 2; // IPv6 canonical form (RFC5952 section 4)
TYPE_UNIX = 3; // UDS string
IPV4 = 1; // in 1.2.3.4 form
IPV6 = 2; // IPv6 canonical form (RFC5952 section 4)
UNIX = 3; // UDS string
}
Type type = 1;
string address = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import com.google.rpc.Code;
import io.grpc.Attributes;
import io.grpc.Grpc;
import io.grpc.Metadata;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void socketToProto_ipv4() throws Exception {
assertThat(LogHelper.socketAddressToProto(socketAddress))
.isEqualTo(Address
.newBuilder()
.setType(Address.Type.TYPE_IPV4)
.setType(Address.Type.IPV4)
.setAddress("127.0.0.1")
.setIpPort(12345)
.build());
Expand All @@ -109,7 +110,7 @@ public void socketToProto_ipv6() throws Exception {
assertThat(LogHelper.socketAddressToProto(socketAddress))
.isEqualTo(Address
.newBuilder()
.setType(Address.Type.TYPE_IPV6)
.setType(Address.Type.IPV6)
.setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required
.setIpPort(12345)
.build());
Expand Down Expand Up @@ -454,7 +455,7 @@ public void logTrailer() throws Exception {
builder.setPeer(LogHelper.socketAddressToProto(peer));
builder.setPayload(
builder.getPayload().toBuilder()
.setStatusCode(Status.INTERNAL.getCode().value())
.setStatusCode(Code.forNumber(Status.INTERNAL.getCode().value()))
.setStatusMessage("test description")
.build());
GrpcLogRecord base = builder.build();
Expand Down

0 comments on commit 0ee55ea

Please sign in to comment.