[FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column #3285
Open: yuxiqian wants to merge 9 commits into apache:master from yuxiqian:FLINK-35272
This PR is still at a very early stage; looking for comments from @aiwenmo and @lvyanquan.
yuxiqian force-pushed the FLINK-35272 branch 2 times, most recently from c448e89 to 31a7c1d (May 6, 2024 06:06)
Updated based on previous comments, cc @aiwenmo
aiwenmo reviewed May 7, 2024
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java (outdated)
...src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java (outdated)
Thanks @aiwenmo for the kind review; I've addressed the comments above.
aiwenmo reviewed May 11, 2024
...me/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java (outdated)
...me/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java (outdated)
...runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java (outdated)
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java (outdated)
…EN statement & Remove unused `containFilteredComputedColumn` field
Thanks @aiwenmo for reviewing; I've addressed your comments.
aiwenmo approved these changes May 12, 2024
LGTM
yuxiqian changed the title from "[FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul" to "[FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column" May 24, 2024
This closes FLINK-35272.
Currently, pipeline jobs with transform (including projection and filtering) are constructed with the following topology:
where schema projections are applied in `SchemaTransformOp`, and data projection and filtering are applied in `DataTransformOp`. The idea is that `SchemaTransformOp` might be embedded in `Sources` in the future to reduce the payload size transferred within the Flink job.
However, the current implementation has a known defect: it omits unused columns too early, so some columns that downstream operators rely on have already been removed by the time records arrive in `DataTransformOp`. See an example as follows:
Such transformation rules will fail since the `name` and `age` columns are removed in `SchemaTransformOp`, and those data rows cannot be retrieved in `DataTransformOp`, where the actual expression evaluation and filtering take effect.
This PR introduces a new design, renaming the transform topology as follows:
where the `PreTransformOp` filters out columns, but only if a column is neither present in the projection rules nor referenced by any calculation or filtering expression.
Referenced columns are emitted in exactly the same order as in the original schema. All schema and data events about those temporarily referenced columns are omitted after `PostTransformOp`. For example, given the following transform rule:
`PreTransformOp` will yield an intermediate schema `(ID INT NOT NULL, AGE INT)` and the corresponding trimmed data records to downstream. Calculated columns (`newage` here) are not created at this stage since they have not been evaluated yet; unused columns (`name` here) are removed as early as possible.
If a column is explicitly written down, it is passed downstream as-is. Referenced columns, however, get a special prefix added to their names. In the example above, a schema like `[id, newname, __PREFIX__name, __PREFIX__age]` will be sent downstream. Note that expression evaluation and filtering have not taken effect at this point, so a `DataChangeEvent` would look like `[1, null, 'Alice', 19]`.
Adding the prefix is meant to deal with such cases:
Here we need to distinguish the calculated column `name` (new) from the referenced original column `name` (old). So after the name-mangling process the schema would be like `[id, name, __PREFIX__name]`.
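The name-mangling step described above can be sketched roughly as follows. This is only an illustration, not the code in this PR: the class and method names are hypothetical, `__PREFIX__` is the placeholder used in the description rather than a confirmed constant, and the two input rules in `main` are assumed, chosen so that they reproduce the two schemas quoted above.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; not the actual Flink CDC implementation. */
class PrefixManglingSketch {

    // Placeholder prefix as written in the PR description; the real value is an assumption.
    static final String PREFIX = "__PREFIX__";

    /**
     * Builds the intermediate column list: explicitly written columns keep their
     * names and order, then every referenced source column is appended with the prefix.
     */
    static List<String> intermediateColumns(List<String> explicit, List<String> referenced) {
        List<String> result = new ArrayList<>(explicit);
        for (String column : referenced) {
            result.add(PREFIX + column);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed rule: "id" and a calculated "newname" are projected; "name" and "age" are referenced.
        System.out.println(intermediateColumns(List.of("id", "newname"), List.of("name", "age")));
        // Assumed rule: the calculated column reuses the name of the source column it reads.
        System.out.println(intermediateColumns(List.of("id", "name"), List.of("name")));
    }
}
```

With the prefix in place, the calculated `name` and the original `name` occupy separate slots, so the downstream operator can read the old value while writing the new one.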
Also, the filtering process is still done in `PostTransformOp`, since users could write a filter expression that references calculated columns, whose values are not available until `PostTransformOp` evaluates them. It also means that in the following somewhat ambiguous case:
The filtering expression is applied to the calculated `age` column (doubled!) instead of the original one.
Now, any calculated column referenced in a filtering expression will be rewritten as its original definition. For example, the following transform rule:
...will be rewritten as follows:
Hence, no calculated columns need to be evaluated before the filtering process.
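The rewriting idea can be illustrated with a small sketch. It is an assumption-laden simplification: the class and method names are hypothetical, the sample rules (`newage` defined as `age * 2`, and a calculated `age` that doubles the original) are made up for illustration, and plain whole-word text substitution stands in for whatever SQL-tree rewriting the parser really performs. It would mishandle column names that appear inside string literals.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Illustrative sketch only; a real implementation would rewrite a parsed SQL tree. */
class FilterRewriteSketch {

    /**
     * Replaces every whole-word occurrence of a calculated column name in the filter
     * with its parenthesized definition. All names are substituted in a single pass,
     * so column names inside an inserted definition still refer to original columns.
     */
    static String rewriteFilter(String filter, Map<String, String> calculatedColumns) {
        if (calculatedColumns.isEmpty()) {
            return filter;
        }
        String alternation = calculatedColumns.keySet().stream()
                .map(Pattern::quote)
                .collect(Collectors.joining("|"));
        Pattern names = Pattern.compile("\\b(?:" + alternation + ")\\b");
        return names.matcher(filter).replaceAll(
                match -> Matcher.quoteReplacement("(" + calculatedColumns.get(match.group()) + ")"));
    }

    public static void main(String[] args) {
        Map<String, String> calculated = new LinkedHashMap<>();
        calculated.put("newage", "age * 2"); // hypothetical projection: age * 2 AS newage
        System.out.println(rewriteFilter("newage > 18", calculated));
    }
}
```

After the substitution the filter mentions only original columns, which is why it can run before any calculated column has been evaluated.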