Skip to content

Commit

Permalink
observability: add configurable value for flush from configuration (g…
Browse files Browse the repository at this point in the history
  • Loading branch information
DNVindhya committed Mar 31, 2022
1 parent b20ce17 commit 86e3362
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 245 deletions.
Expand Up @@ -46,7 +46,8 @@ public static synchronized GcpObservability grpcInit() throws IOException {
GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags();
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), 10);
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(),
observabilityConfig.getFlushMessageCount());
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink,
Expand Down
Expand Up @@ -26,6 +26,9 @@ public interface ObservabilityConfig {
/** Get destination project ID - where logs will go. */
String getDestinationProjectId();

/** Get message count threshold to flush - flush once message count is reached. */
Long getFlushMessageCount();

/** Get filters set for logging. */
List<LogFilter> getLogFilters();

Expand Down
Expand Up @@ -34,6 +34,7 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {

private boolean enableCloudLogging = true;
private String destinationProjectId = null;
private Long flushMessageCount = null;
private List<LogFilter> logFilters;
private List<EventType> eventTypes;

Expand All @@ -56,6 +57,7 @@ private void parseLoggingConfig(Map<String, ?> loggingConfig) {
enableCloudLogging = value;
}
destinationProjectId = JsonUtil.getString(loggingConfig, "destination_project_id");
flushMessageCount = JsonUtil.getNumberAsLong(loggingConfig, "flush_message_count");
List<?> rawList = JsonUtil.getList(loggingConfig, "log_filters");
if (rawList != null) {
List<Map<String, ?>> jsonLogFilters = JsonUtil.checkObjectList(rawList);
Expand Down Expand Up @@ -116,6 +118,11 @@ public String getDestinationProjectId() {
return destinationProjectId;
}

@Override
public Long getFlushMessageCount() {
return flushMessageCount;
}

@Override
public List<LogFilter> getLogFilters() {
return logFilters;
Expand Down
Expand Up @@ -210,12 +210,6 @@ public boolean isEventToBeLogged(EventType event) {
if (logEventTypeSet == null) {
return true;
}
boolean logEvent;
if (logEventTypeSet.isEmpty()) {
logEvent = false;
} else {
logEvent = logEventTypeSet.contains(event);
}
return logEvent;
return logEventTypeSet.contains(event);
}
}

This file was deleted.

Expand Up @@ -49,12 +49,12 @@ public class GcpLogSink implements Sink {
private static final Set<String> kubernetesResourceLabelSet
= ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name",
"pod_name", "container_name");
private static final int FALLBACK_FLUSH_LIMIT = 100;
private static final long FALLBACK_FLUSH_LIMIT = 100L;
private final Map<String, String> customTags;
private final Logging gcpLoggingClient;
private final MonitoredResource kubernetesResource;
private final int flushLimit;
private int flushCounter;
private final Long flushLimit;
private long flushCounter;

private static Logging createLoggingClient(String projectId) {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
Expand All @@ -70,19 +70,19 @@ private static Logging createLoggingClient(String projectId) {
* @param destinationProjectId cloud project id to write logs
*/
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, int flushLimit) {
Map<String, String> customTags, Long flushLimit) {
this(createLoggingClient(destinationProjectId), locationTags, customTags, flushLimit);

}

@VisibleForTesting
GcpLogSink(Logging client, Map<String, String> locationTags, Map<String, String> customTags,
int flushLimit) {
Long flushLimit) {
this.gcpLoggingClient = client;
this.customTags = customTags != null ? customTags : new HashMap<>();
this.kubernetesResource = getResource(locationTags);
this.flushLimit = flushLimit != 0 ? flushLimit : FALLBACK_FLUSH_LIMIT;
this.flushCounter = 0;
this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT;
this.flushCounter = 0L;
}

/**
Expand Down Expand Up @@ -116,10 +116,10 @@ public void write(GrpcLogRecord logProto) {
synchronized (this) {
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", event);
gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
flushCounter += 1;
flushCounter = ++flushCounter;
if (flushCounter >= flushLimit) {
gcpLoggingClient.flush();
flushCounter = 0;
flushCounter = 0L;
}
}
} catch (Exception e) {
Expand Down

This file was deleted.

Expand Up @@ -65,7 +65,7 @@ public class LoggingTest {
private static final Map<String, String> customTags = ImmutableMap.of(
"KEY1", "Value1",
"KEY2", "VALUE2");
private static final int flushLimit = 100;
private static final long flushLimit = 100L;

/**
* Cloud logging test using LoggingChannelProvider and LoggingServerProvider.
Expand Down

0 comments on commit 86e3362

Please sign in to comment.