Skip to content

Commit

Permalink
[improve][pip] PIP-339: Introducing the --log-topic Option for Pulsar…
Browse files Browse the repository at this point in the history
… Sinks and Sources (apache#22185)
  • Loading branch information
jiangpengcheng committed Mar 13, 2024
1 parent 46c9ce9 commit 392549c
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 0 deletions.
Expand Up @@ -94,4 +94,5 @@ public class SinkConfig {
private String transformFunction;
private String transformFunctionClassName;
private String transformFunctionConfig;
private String logTopic;
}
Expand Up @@ -72,4 +72,5 @@ public class SourceConfig {
private BatchSourceConfig batchSourceConfig;
// batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED
private String batchBuilder;
private String logTopic;
}
Expand Up @@ -410,6 +410,8 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--transform-function-config", description = "Configuration of the transform function "
+ "applied before the Sink")
protected String transformFunctionConfig;
@Parameter(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced")
protected String logTopic;

protected SinkConfig sinkConfig;

Expand Down Expand Up @@ -605,6 +607,9 @@ void processArguments() throws Exception {
if (transformFunctionConfig != null) {
sinkConfig.setTransformFunctionConfig(transformFunctionConfig);
}
if (null != logTopic) {
sinkConfig.setLogTopic(logTopic);
}

// check if configs are valid
validateSinkConfigs(sinkConfig);
Expand Down
Expand Up @@ -365,6 +365,8 @@ abstract class SourceDetailsCommand extends BaseCommand {
@Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates "
+ "how the secret is fetched by the underlying secrets provider")
protected String secretsString;
@Parameter(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced")
protected String logTopic;

protected SourceConfig sourceConfig;

Expand Down Expand Up @@ -500,6 +502,9 @@ void processArguments() throws Exception {
}
sourceConfig.setSecrets(secretsMap);
}
if (null != logTopic) {
sourceConfig.setLogTopic(logTopic);
}

// check if source configs are valid
validateSourceConfigs(sourceConfig);
Expand Down
Expand Up @@ -87,6 +87,9 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
if (sinkConfig.getName() != null) {
functionDetailsBuilder.setName(sinkConfig.getName());
}
if (sinkConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(sinkConfig.getLogTopic());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
if (sinkConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
Expand Down Expand Up @@ -321,6 +324,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
sinkConfig.setRetainOrdering(false);
sinkConfig.setRetainKeyOrdering(false);
}
if (!isEmpty(functionDetails.getLogTopic())) {
sinkConfig.setLogTopic(functionDetails.getLogTopic());
}

sinkConfig.setProcessingGuarantees(convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));

Expand Down Expand Up @@ -426,6 +432,12 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
throw new IllegalArgumentException(String.format("Input topic %s is invalid", topic));
}
}
if (!isEmpty(sinkConfig.getLogTopic())) {
if (!TopicName.isValid(sinkConfig.getLogTopic())) {
throw new IllegalArgumentException(
String.format("LogTopic topic %s is invalid", sinkConfig.getLogTopic()));
}
}

if (sinkConfig.getParallelism() != null && sinkConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Sink parallelism must be a positive number");
Expand Down Expand Up @@ -613,6 +625,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (mergedConfig.getInputSpecs() == null) {
mergedConfig.setInputSpecs(new HashMap<>());
}
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}

if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
Expand Down
Expand Up @@ -81,6 +81,9 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource
if (sourceConfig.getName() != null) {
functionDetailsBuilder.setName(sourceConfig.getName());
}
if (sourceConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(sourceConfig.getLogTopic());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
if (sourceConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
Expand Down Expand Up @@ -274,6 +277,9 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
producerConfig.setCompressionType(convertFromFunctionDetailsCompressionType(spec.getCompressionType()));
sourceConfig.setProducerConfig(producerConfig);
}
if (!isEmpty(functionDetails.getLogTopic())) {
sourceConfig.setLogTopic(functionDetails.getLogTopic());
}
if (functionDetails.hasResources()) {
Resources resources = new Resources();
resources.setCpu(functionDetails.getResources().getCpu());
Expand Down Expand Up @@ -308,6 +314,12 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
if (!isEmpty(sourceConfig.getTopicName()) && !TopicName.isValid(sourceConfig.getTopicName())) {
throw new IllegalArgumentException("Topic name is invalid");
}
if (!isEmpty(sourceConfig.getLogTopic())) {
if (!TopicName.isValid(sourceConfig.getLogTopic())) {
throw new IllegalArgumentException(
String.format("LogTopic topic %s is invalid", sourceConfig.getLogTopic()));
}
}
if (sourceConfig.getParallelism() != null && sourceConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Source parallelism must be a positive number");
}
Expand Down Expand Up @@ -434,6 +446,9 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon
if (newConfig.getSecrets() != null) {
mergedConfig.setSecrets(newConfig.getSecrets());
}
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees()
.equals(existingConfig.getProcessingGuarantees())) {
throw new IllegalArgumentException("Processing Guarantees cannot be altered");
Expand Down
Expand Up @@ -136,6 +136,7 @@ public void testConvertBackFidelity() throws IOException {

sinkConfig.setTransformFunction("builtin://transform");
sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}");
sinkConfig.setLogTopic("log-topic");

Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertEquals(Function.SubscriptionType.SHARED, functionDetails.getSource().getSubscriptionType());
Expand Down Expand Up @@ -522,6 +523,22 @@ public void testMergeDifferentTransformFunctionConfig() {
);
}

@Test
public void testMergeDifferentLogTopic() {
SinkConfig sinkConfig = createSinkConfig();
SinkConfig newSinkConfig = createUpdatedSinkConfig("logTopic", "Different");
SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
assertEquals(
mergedConfig.getLogTopic(),
"Different"
);
mergedConfig.setLogTopic(sinkConfig.getLogTopic());
assertEquals(
new Gson().toJson(sinkConfig),
new Gson().toJson(mergedConfig)
);
}

@Test
public void testValidateConfig() {
SinkConfig sinkConfig = createSinkConfig();
Expand Down Expand Up @@ -559,6 +576,7 @@ private SinkConfig createSinkConfig() {
sinkConfig.setTransformFunction("builtin://transform");
sinkConfig.setTransformFunctionClassName("Transform");
sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}");
sinkConfig.setLogTopic("log-topic");
return sinkConfig;
}

Expand Down
Expand Up @@ -309,6 +309,22 @@ public void testMergeDifferentProducerConfig() {
);
}

@Test
public void testMergeDifferentLogTopic() {
SourceConfig sourceConfig = createSourceConfig();
SourceConfig newSourceConfig = createUpdatedSourceConfig("logTopic", "Different");
SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
assertEquals(
mergedConfig.getLogTopic(),
"Different"
);
mergedConfig.setLogTopic(sourceConfig.getLogTopic());
assertEquals(
new Gson().toJson(sourceConfig),
new Gson().toJson(mergedConfig)
);
}

@Test
public void testValidateConfig() {
SourceConfig sourceConfig = createSourceConfig();
Expand Down Expand Up @@ -399,6 +415,7 @@ private SourceConfig createSourceConfig() {
sourceConfig.setProducerConfig(producerConfig);

sourceConfig.setConfigs(configs);
sourceConfig.setLogTopic("log-topic");
return sourceConfig;
}

Expand Down

0 comments on commit 392549c

Please sign in to comment.