From 392549cf92683baebb3b0146cadc609ba394bbb3 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Wed, 13 Mar 2024 23:52:15 +0800 Subject: [PATCH] [improve][pip] PIP-339: Introducing the --log-topic Option for Pulsar Sinks and Sources (#22185) --- .../apache/pulsar/common/io/SinkConfig.java | 1 + .../apache/pulsar/common/io/SourceConfig.java | 1 + .../org/apache/pulsar/admin/cli/CmdSinks.java | 5 +++++ .../apache/pulsar/admin/cli/CmdSources.java | 5 +++++ .../functions/utils/SinkConfigUtils.java | 15 +++++++++++++++ .../functions/utils/SourceConfigUtils.java | 15 +++++++++++++++ .../functions/utils/SinkConfigUtilsTest.java | 18 ++++++++++++++++++ .../functions/utils/SourceConfigUtilsTest.java | 17 +++++++++++++++++ 8 files changed, 77 insertions(+) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java index 09b98249a4df1..57e67c0bcee0d 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java @@ -94,4 +94,5 @@ public class SinkConfig { private String transformFunction; private String transformFunctionClassName; private String transformFunctionConfig; + private String logTopic; } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index 251e0bf810b81..1991957045752 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -72,4 +72,5 @@ public class SourceConfig { private BatchSourceConfig batchSourceConfig; // batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED private String batchBuilder; + private String logTopic; } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 35dec57654144..66b2816e77705 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -410,6 +410,8 @@ abstract class SinkDetailsCommand extends BaseCommand { @Parameter(names = "--transform-function-config", description = "Configuration of the transform function " + "applied before the Sink") protected String transformFunctionConfig; + @Parameter(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced") + protected String logTopic; protected SinkConfig sinkConfig; @@ -605,6 +607,9 @@ void processArguments() throws Exception { if (transformFunctionConfig != null) { sinkConfig.setTransformFunctionConfig(transformFunctionConfig); } + if (null != logTopic) { + sinkConfig.setLogTopic(logTopic); + } // check if configs are valid validateSinkConfigs(sinkConfig); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index ac6ff5e68453d..c94fd49d71748 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -365,6 +365,8 @@ abstract class SourceDetailsCommand extends BaseCommand { @Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates " + "how the secret is fetched by the underlying secrets provider") protected String secretsString; + @Parameter(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced") + protected String logTopic; protected SourceConfig sourceConfig; @@ -500,6 +502,9 @@ void processArguments() throws Exception { } sourceConfig.setSecrets(secretsMap); } + if (null != logTopic) { + sourceConfig.setLogTopic(logTopic); + } // check if source configs are valid validateSourceConfigs(sourceConfig); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index d93676a106d9a..6631c053fac49 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -87,6 +87,9 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail if (sinkConfig.getName() != null) { functionDetailsBuilder.setName(sinkConfig.getName()); } + if (sinkConfig.getLogTopic() != null) { + functionDetailsBuilder.setLogTopic(sinkConfig.getLogTopic()); + } functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); if (sinkConfig.getParallelism() != null) { functionDetailsBuilder.setParallelism(sinkConfig.getParallelism()); @@ -321,6 +324,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) { sinkConfig.setRetainOrdering(false); sinkConfig.setRetainKeyOrdering(false); } + if (!isEmpty(functionDetails.getLogTopic())) { + sinkConfig.setLogTopic(functionDetails.getLogTopic()); + } sinkConfig.setProcessingGuarantees(convertProcessingGuarantee(functionDetails.getProcessingGuarantees())); @@ -426,6 +432,12 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf throw new IllegalArgumentException(String.format("Input topic %s is invalid", topic)); } } + if (!isEmpty(sinkConfig.getLogTopic())) { + if (!TopicName.isValid(sinkConfig.getLogTopic())) { + throw new IllegalArgumentException( + String.format("LogTopic topic %s is invalid", sinkConfig.getLogTopic())); + } + } if (sinkConfig.getParallelism() != null && sinkConfig.getParallelism() <= 0) { throw new IllegalArgumentException("Sink parallelism must be a positive number"); @@ -613,6 +625,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne if (mergedConfig.getInputSpecs() == null) { mergedConfig.setInputSpecs(new HashMap<>()); } + if (!StringUtils.isEmpty(newConfig.getLogTopic())) { + mergedConfig.setLogTopic(newConfig.getLogTopic()); + } if (newConfig.getInputs() != null) { newConfig.getInputs().forEach((topicName -> { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index a6430bbea4585..692d7459268dd 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -81,6 +81,9 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource if (sourceConfig.getName() != null) { functionDetailsBuilder.setName(sourceConfig.getName()); } + if (sourceConfig.getLogTopic() != null) { + functionDetailsBuilder.setLogTopic(sourceConfig.getLogTopic()); + } functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); if (sourceConfig.getParallelism() != null) { functionDetailsBuilder.setParallelism(sourceConfig.getParallelism()); @@ -274,6 +277,9 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) { producerConfig.setCompressionType(convertFromFunctionDetailsCompressionType(spec.getCompressionType())); sourceConfig.setProducerConfig(producerConfig); } + if (!isEmpty(functionDetails.getLogTopic())) { + sourceConfig.setLogTopic(functionDetails.getLogTopic()); + } if (functionDetails.hasResources()) { Resources resources = new Resources(); resources.setCpu(functionDetails.getResources().getCpu()); @@ -308,6 +314,12 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour if (!isEmpty(sourceConfig.getTopicName()) && !TopicName.isValid(sourceConfig.getTopicName())) { throw new IllegalArgumentException("Topic name is invalid"); } + if (!isEmpty(sourceConfig.getLogTopic())) { + if (!TopicName.isValid(sourceConfig.getLogTopic())) { + throw new IllegalArgumentException( + String.format("LogTopic topic %s is invalid", sourceConfig.getLogTopic())); + } + } if (sourceConfig.getParallelism() != null && sourceConfig.getParallelism() <= 0) { throw new IllegalArgumentException("Source parallelism must be a positive number"); } @@ -434,6 +446,9 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon if (newConfig.getSecrets() != null) { mergedConfig.setSecrets(newConfig.getSecrets()); } + if (!StringUtils.isEmpty(newConfig.getLogTopic())) { + mergedConfig.setLogTopic(newConfig.getLogTopic()); + } if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees() .equals(existingConfig.getProcessingGuarantees())) { throw new IllegalArgumentException("Processing Guarantees cannot be altered"); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 14cd77f60ff95..5c2b6d92b9366 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -136,6 +136,7 @@ public void testConvertBackFidelity() throws IOException { sinkConfig.setTransformFunction("builtin://transform"); sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}"); + sinkConfig.setLogTopic("log-topic"); Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null)); assertEquals(Function.SubscriptionType.SHARED, functionDetails.getSource().getSubscriptionType()); @@ -522,6 +523,22 @@ public void testMergeDifferentTransformFunctionConfig() { ); } + @Test + public void testMergeDifferentLogTopic() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newSinkConfig = createUpdatedSinkConfig("logTopic", "Different"); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals( + mergedConfig.getLogTopic(), + "Different" + ); + mergedConfig.setLogTopic(sinkConfig.getLogTopic()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + @Test public void testValidateConfig() { SinkConfig sinkConfig = createSinkConfig(); @@ -559,6 +576,7 @@ private SinkConfig createSinkConfig() { sinkConfig.setTransformFunction("builtin://transform"); sinkConfig.setTransformFunctionClassName("Transform"); sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}"); + sinkConfig.setLogTopic("log-topic"); return sinkConfig; } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index a4da4203d9641..bcf399b6da736 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -309,6 +309,22 @@ public void testMergeDifferentProducerConfig() { ); } + @Test + public void testMergeDifferentLogTopic() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newSourceConfig = createUpdatedSourceConfig("logTopic", "Different"); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig); + assertEquals( + mergedConfig.getLogTopic(), + "Different" + ); + mergedConfig.setLogTopic(sourceConfig.getLogTopic()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + @Test public void testValidateConfig() { SourceConfig sourceConfig = createSourceConfig(); @@ -399,6 +415,7 @@ private SourceConfig createSourceConfig() { sourceConfig.setProducerConfig(producerConfig); sourceConfig.setConfigs(configs); + sourceConfig.setLogTopic("log-topic"); return sourceConfig; }