-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: Maintenance - MonitorSource #10308
Conversation
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Outdated
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
Show resolved
Hide resolved
Please link the design doc in the PR description |
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Outdated
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Outdated
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Outdated
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Outdated
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
Show resolved
Hide resolved
.../v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java
Outdated
Show resolved
Hide resolved
.../v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java
Show resolved
Hide resolved
.../v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java
Outdated
Show resolved
Hide resolved
...ink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java
Outdated
Show resolved
Hide resolved
...ink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkStreamingTestUtils.java
Show resolved
Hide resolved
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
Outdated
Show resolved
Hide resolved
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
Outdated
Show resolved
Hide resolved
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/ManualSource.java
Outdated
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
Outdated
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some minor comments, otherwise LGTM
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Outdated
Show resolved
Hide resolved
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Outdated
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Outdated
Show resolved
Hide resolved
...k/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/MonitorSource.java
Show resolved
Hide resolved
...rc/main/java/org/apache/iceberg/flink/maintenance/operator/SingleThreadedIteratorSource.java
Outdated
Show resolved
Hide resolved
.../v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java
Show resolved
Hide resolved
.../v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CollectingSink.java
Outdated
Show resolved
Hide resolved
|
||
// Add temporary dir as a warehouse location | ||
try { | ||
this.warehouse = Files.createTempDirectory("warehouse"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to pass in a @TempDir
that can auto clean up files/dirs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Junit5 mixing extensions with @TempDir
is not supported.
See: junit-team/junit5#1786
I found the proposed solution much more ugly than creating a temp dir using the java api
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My concern is that the temp dir (and files under it) wasn't cleaned up at the end of the test. at least, we should consider deleteOnExit()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is taken care in the afterEach
method:
Files.walk(warehouse).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah. I missed the afterEach
part. it might still make sense to add deleteOnExit
as fallback, in case afterEach
method throws an exception before the file cleanup step.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleteOnExit
will only work if the directory is empty. See: https://docs.oracle.com/javase/7/docs/api/java/io/File.html#delete%28%29
I don't think it would help much adding this to the code.
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/FlinkSqlExtension.java
Outdated
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
Outdated
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
Outdated
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
Outdated
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
Outdated
Show resolved
Hide resolved
....19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestMonitorSource.java
Outdated
Show resolved
Hide resolved
c1c188d
to
41d1b29
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. have one minor comment and will leave the decision to you on that one
Merged to main. |
Implements a monitor source which emits TableChange events based on the new commits to the Iceberg table.
This will be used for Maintenance Task scheduling as described in the design doc. The Scheduling paragraph describes the require parameters of the TableChange object. Also the Monitor paragraph adds some more details.
Implements #10300