Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column #3285

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

yuxiqian
Copy link
Contributor

@yuxiqian yuxiqian commented Apr 30, 2024

This closes FLINK-35272.

Currently, pipeline jobs with transform (including projection and filtering) are constructed with the following topology:

SchemaTransformOp --> DataTransformOp --> SchemaOp

where schema projections are applied in SchemaTransformOp and data projection & filtering are applied in DataTransformOp. The idea is SchemaTransformOp might be embedded in Sources in the future to reduce payload data size transferred in Flink Job.

However, current implementation has a known defect that omits unused columns too early, causing some downstream-relied columns got removed after they arrived in DataTransformOp. See a example as follows:

# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, upper(name) as newname
    filter: age > 18

Such transformation rules will fail since name and age columns are removed in SchemaTransformOp, and those data rows could not be retrieved in DataTransformOp, where the actual expression evaluation and filtering comes into effect.

This PR introduces a new design, renaming the transform topology as follows:

PreTransformOp --> PostTransformOp --> SchemaOp

where the PreTransformOp filters out columns, but only if:

  • The column is not present in projection rules
  • The column is not indirectly referenced by calculation and filtering expressions

Referenced columns will be generated with exact same order as in the original schema. All schema and data events about those temporarily-referenced columns will be omitted after PostTransformOp. For example, given the following transform rule:

# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, age + 4 as newage
    filter: age > 4

PreTransformOp will yield an intermediate schema (ID INT NOT NULL, AGE INT) and corresponding trimmed data records to downstream. Calculated columns (newage here) will not be created then since they haven't been evaluated here; Unused columns (name here) will be removed as early as possible.

If a column is explicity written down, it will be passed to downstream as-is. But for referenced columns, a special prefix will be added to their names. In the example above, a schema like [id, newname, __PREFIX__name, __PREFIX__age] will be generated to downstream. Notice that the expression evaluation and filtering will not come into effect for now, so a DataChangeEvent would be like [1, null, 'Alice', 19].

Adding prefix is meant to deal with such cases:

# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, upper(name) as name

Here we need to distinguish the calculated column (new) name and the referenced original column (old) name. So after the name mangling process the schema would be like: [id, name, __PREFIX__name].

Also, the filtering process is still done in PostTransformOp since user could write down a filter expression that references calculated column, but their value won't be available until PostTransformOp's evaluation. It also means in the following somewhat ambigious case:

# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, age * 2 as age
    filter: age > 18

The filtering expression is applied to the calculated age column (doubled!) instead of the original one.

Now, any calculated column referenced in filtering column will be rewritten as its original definition. For example, the following transform rule:

transform:
  - source-table: employee
    projection: id, age * 2 as newage
    filter: newage > 18

...will be rewritten as follows:

transform:
  - source-table: employee
    projection: id, age * 2 as newage
    filter: age * 2 > 18

Hence, no calculated columns need to be evaluated before filtering process.

@yuxiqian
Copy link
Contributor Author

This PR is still in very early progress, looking for @aiwenmo & @lvyanquan's comments.

@yuxiqian yuxiqian force-pushed the FLINK-35272 branch 2 times, most recently from c448e89 to 31a7c1d Compare May 6, 2024 06:06
@yuxiqian
Copy link
Contributor Author

yuxiqian commented May 6, 2024

Updated based on previous comments, cc @aiwenmo

@yuxiqian yuxiqian marked this pull request as ready for review May 6, 2024 07:19
@yuxiqian
Copy link
Contributor Author

yuxiqian commented May 7, 2024

Thanks for @aiwenmo's kindly review, addressed comments above.

@yuxiqian yuxiqian requested a review from aiwenmo May 10, 2024 07:44
…EN statement & Remove unused `containFilteredComputedColumn` field
@yuxiqian
Copy link
Contributor Author

yuxiqian commented May 11, 2024

Thanks @aiwenmo for reviewing, I've addressed your comments.

Copy link
Contributor

@aiwenmo aiwenmo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@yuxiqian
Copy link
Contributor Author

cc @PatrickRen @lvyanquan

@yuxiqian yuxiqian changed the title [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column May 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants