New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink #3233
base: master
Are you sure you want to change the base?
Conversation
…e-write topology if not TwoPhaseCommittingSink
@PatrickRen , CC |
Good catch @loserwang1024! Could you please add a test case to prevent later code changes to revert this fix? |
I'd like to, but it seems no pipeline sink which is WithPreWriteTopology but not TwoPhaseCommittingSink now unless i mock one. |
Could you define your own one in the test itself? |
Done it. |
...omposer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
Outdated
Show resolved
Hide resolved
DataStreamSource<Event> inputStream = env.fromCollection(mockEvents); | ||
DataSinkTranslator translator = new DataSinkTranslator(); | ||
|
||
String uid = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Maybe some more descriptive content, like
String uid = "Uid set by the addPreWriteTopology topology";
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried it before, but it shows that :
java.lang.IllegalArgumentException: Node hash must be a 32 character String that describes a hex code.
...ser/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java
Show resolved
Hide resolved
@PatrickRen , CC, Would you like to help me review this PR? |
Current , when sink is not instanceof TwoPhaseCommittingSink, use input.transform rather than stream. It means that pre-write topology will be ignored.
(ps: the modify of StarRocksUtils just apply spotless)