HADOOP-18679. Add API for bulk/paged delete of files #6726
Conversation
A more minimal design that is easier to use and implement. The caller creates a BulkOperation, queries its page size, and then submits batches of paths to delete, each no larger than that size. The outcome of each call contains a list of failures. The S3A implementation shows how straightforward it is. Even with a single-entry page size, it is still more efficient to use this, as it doesn't try to recreate a parent dir or perform any probes to see if it is a directory: it maps straight to a DELETE call. Change-Id: Ibe8737e7933fe03d39070e7138a710b50c3d60c2
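The shape of that API can be sketched as follows. This is an illustrative, self-contained sketch only: the `pageSize()`/`bulkDelete()` method names follow the design described above, while `InMemoryStore` is a hypothetical stand-in for a real object-store client, not the actual S3A code.

```java
import java.util.*;

// Sketch of the bulk delete API shape: callers query the page size, then
// submit batches no larger than it; the result lists only the failures.
// InMemoryStore is a hypothetical stand-in for a real store client.
public class BulkDeleteSketch {

    interface BulkDelete {
        int pageSize();
        List<Map.Entry<String, String>> bulkDelete(Collection<String> paths);
    }

    static class InMemoryStore implements BulkDelete {
        final Set<String> objects = new HashSet<>();
        private final int pageSize;

        InMemoryStore(int pageSize, Collection<String> initial) {
            this.pageSize = pageSize;
            objects.addAll(initial);
        }

        @Override public int pageSize() { return pageSize; }

        @Override
        public List<Map.Entry<String, String>> bulkDelete(Collection<String> paths) {
            if (paths.size() > pageSize) {
                throw new IllegalArgumentException("batch larger than page size " + pageSize);
            }
            List<Map.Entry<String, String>> failures = new ArrayList<>();
            for (String p : paths) {
                // like an object-store DELETE: removing a nonexistent object
                // is not an error, and there are no directory probes
                objects.remove(p);
            }
            return failures;
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore(2, Arrays.asList("a", "b", "c"));
        List<Map.Entry<String, String>> failures =
            store.bulkDelete(Arrays.asList("a", "b"));
        if (!failures.isEmpty()) throw new AssertionError("expected no failures");
        if (store.objects.size() != 1) throw new AssertionError("expected one object left");
        System.out.println("remaining=" + store.objects);
    }
}
```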
Add methods in FileUtil to take an FS, cast it to a BulkDeleteSource, then perform the pageSize/bulkDelete operations. This is to make reflection-based access straightforward: no new interfaces or types to work with, just two new methods with type-erased lists. Change-Id: I2d7b1bf8198422de635253fc087ca551a8bc6609
Change-Id: Ib098c07cc1f7747ed1a3131b252656c96c520a75
Using this PR to start with the initial design, implementation and services offered by having lower-level interaction with S3 pushed down into an S3AStore class, with an interface/impl split. The bulk delete callbacks now talk to the store, *not* S3AFS, with some minor changes in behaviour (IllegalArgumentException is raised if root paths / are to be deleted). Mock tests are failing; I expected that: they are always brittle. What next? Get this in and then move lower-level FS ops over, a method calling s3client at a time, or in groups, as appropriate. The metrics of success are:
* all callback classes created in S3A FS can work through the store
* no s3client direct invocation in S3AFS
Change-Id: Ib5bc58991533fd5d13e78426e88b884b6ae5205c
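The interface/impl split described above can be sketched in a self-contained way. Here `ObjectStore` and `RecordingStore` are hypothetical names, not the real S3AStore API; the sketch shows the two behaviours mentioned: callbacks go through a narrow store interface rather than the filesystem, and root paths are rejected with IllegalArgumentException.

```java
import java.util.*;

// Hypothetical sketch of the store split: callers depend only on a narrow
// store interface, and deleting a root path raises IllegalArgumentException.
public class StoreSplitSketch {

    interface ObjectStore {
        void deleteObjects(Collection<String> keys);
    }

    static class RecordingStore implements ObjectStore {
        final List<String> deleted = new ArrayList<>();

        @Override
        public void deleteObjects(Collection<String> keys) {
            for (String key : keys) {
                if (key.isEmpty() || key.equals("/")) {
                    // root paths must never be bulk deleted
                    throw new IllegalArgumentException("root path cannot be deleted: " + key);
                }
            }
            deleted.addAll(keys);
        }
    }

    public static void main(String[] args) {
        RecordingStore store = new RecordingStore();
        store.deleteObjects(Arrays.asList("dir/a", "dir/b"));
        if (store.deleted.size() != 2) throw new AssertionError("expected 2 deletions");
        boolean rejected = false;
        try {
            store.deleteObjects(Collections.singletonList("/"));
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        if (!rejected) throw new AssertionError("root delete should be rejected");
        System.out.println("deleted=" + store.deleted);
    }
}
```

A stubbed `RecordingStore` like this is also what makes unit tests simpler than mocking the whole filesystem, which is the motivation given for the split.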
Changing results of method calls, using Tuples.pair() to return Map.Entry() instances as immutable tuples. Change-Id: Ibdd5a5b11fe0a57b293b9cb623272e272c8bab69
and some minor prod changes.
Commits 4bbb3b7 to a61f139
Commented. I've also done PR #6738, which tunes the API to work with Iceberg, having just written a PoC of the Iceberg binding. Can you cherry-pick my PR onto your branch and then address the review comments? After that, please do not rebase your PR; that makes it easier for me to keep my own branch in sync with your changes. Thanks. The PoC of the Iceberg integration is based on their S3FileIO one. The Iceberg API passes in a collection of paths, which may span multiple filesystems. To handle this,
We are going to need a default FS impl which just invokes delete(path, false) and maps any IOE to a failure. Change-Id: If56bca7cb8529ccbfbb1dfa29cedc8287ec980d4
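A minimal sketch of that default fallback, assuming a page size of 1. `SimpleFs` is a hypothetical stand-in for the Hadoop `FileSystem.delete(path, recursive)` call; the point is that an IOException becomes a (path, error) failure entry instead of propagating.

```java
import java.io.IOException;
import java.util.*;

// Sketch of the default single-path fallback: page size 1, each path goes
// through delete(path, false), and any IOException is mapped to a failure
// entry. SimpleFs is a hypothetical stand-in for the real FileSystem.
public class DefaultBulkDeleteSketch {

    interface SimpleFs {
        boolean delete(String path, boolean recursive) throws IOException;
    }

    static List<Map.Entry<String, String>> bulkDelete(SimpleFs fs, Collection<String> paths) {
        if (paths.size() > 1) {          // default page size is 1
            throw new IllegalArgumentException("too many paths for page size 1");
        }
        List<Map.Entry<String, String>> failures = new ArrayList<>();
        for (String path : paths) {
            try {
                fs.delete(path, false);
            } catch (IOException e) {
                // map the IOE to a failure entry rather than rethrowing
                failures.add(new AbstractMap.SimpleImmutableEntry<>(path, e.toString()));
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        SimpleFs failing = (p, r) -> { throw new IOException("no permission: " + p); };
        List<Map.Entry<String, String>> failures =
            bulkDelete(failing, Collections.singletonList("/tmp/x"));
        if (failures.size() != 1) throw new AssertionError("expected one failure entry");
        System.out.println(failures.get(0).getKey() + " -> " + failures.get(0).getValue());
    }
}
```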
 */
public class DefaultBulkDeleteOperation implements BulkDelete {

    private final int pageSize;
this is always 1, isn't it? so much can be simplified here
- no need for the field
- no need to pass it in the constructor
- pageSize() to return 1
    validateBulkDeletePaths(paths, pageSize, basePath);
    List<Map.Entry<Path, String>> result = new ArrayList<>();
    // this for loop doesn't make sense as pageSize must be 1.
    for (Path path : paths) {
there's only ever going to be 1 entry here
        try {
            fs.delete(path, false);
            // What to do if this returns false?
            // I think we should add the path to the result list with value "Not Deleted".
Good question. I'd say yes, or actually do a getFileStatus() call and see what is there:
- file doesn't exist (not an error)
- path is a directory: add to result
The key is that file-not-found isn't escalated.
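The suggested handling can be sketched as follows. `Probe` is a hypothetical stand-in for the outcome of a `getFileStatus()` call, not a real Hadoop type; the sketch just encodes the rule above: a missing file is not an error, while a directory becomes a failure entry.

```java
import java.util.*;

// Sketch of the suggested handling when delete() returns false: probe the
// path; a missing file is success, a directory is reported as a failure.
public class DeleteFalseHandlingSketch {

    // hypothetical result of a getFileStatus() probe
    enum Probe { MISSING, FILE, DIRECTORY }

    /** Returns an optional (path, error) entry; empty means "not an error". */
    static Optional<Map.Entry<String, String>> onDeleteReturnedFalse(String path, Probe probe) {
        switch (probe) {
            case MISSING:
                return Optional.empty();   // file not found: not escalated
            case DIRECTORY:
                return Optional.of(
                    new AbstractMap.SimpleImmutableEntry<>(path, "path is a directory"));
            default:
                return Optional.of(
                    new AbstractMap.SimpleImmutableEntry<>(path, "not deleted"));
        }
    }

    public static void main(String[] args) {
        if (onDeleteReturnedFalse("/a", Probe.MISSING).isPresent())
            throw new AssertionError("missing path should not be an error");
        if (!onDeleteReturnedFalse("/d", Probe.DIRECTORY).isPresent())
            throw new AssertionError("directory should be reported");
        System.out.println("ok");
    }
}
```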
    bindReadOnlyRolePolicy(assumedRoleConfig, readOnlyDir);
    roleFS = (S3AFileSystem) destDir.getFileSystem(assumedRoleConfig);

    int range = 10;
on a store where bulk delete page size == 1, use that as the range
💔 -1 overall
This message was automatically generated.
Add bulk delete path capability true for all FS
I'm happy with this; I just want to make sure we are happy with the fallback logic of delete() returning false. It's such a vague result anyway.
We should get reviews from people like the Iceberg devs.
@@ -85,6 +88,9 @@ public ITestS3AContractBulkDelete(boolean enableMultiObjectDelete) {
  protected Configuration createConfiguration() {
      Configuration conf = super.createConfiguration();
      S3ATestUtils.disableFilesystemCaching(conf);
      conf = propagateBucketOptions(conf, getTestBucketName(conf));
      skipIfNotEnabled(conf, Constants.ENABLE_MULTI_DELETE,
This should actually be guarded with `if (enableMultiObjectDelete)` so that at least the single-entry delete ops are tested against GCS. This matters because GCS returns 404 on a DELETE of a nonexistent path; we need to make sure that doesn't trigger a failure.
Nice catch. Tested with a GCS bucket as well.
For the Iceberg support to work, all filesystems MUST implement the API, or we have to modify that PoC to handle the case where they don't. I'd rather the spec say that any FS which supports delete() MUST support this, since it comes for free.
Iceberg PoC PR: apache/iceberg#10233
💔 -1 overall
This message was automatically generated.
Commented.
- I'd prefer that BulkDeleteSource be in FileSystem, with the base implementation returning a DefaultBulkDeleteOperation instance.
- And so we should have one of the contract tests doing it directly through the API on the getFileSystem() instance.
- Other than that, though, looking really good.
Commented. Now, one more thing, based on my changes in #6686: it adds many more methods to this class, so I'm giving them the name of the class/interface + "_" + method name. Can you do the same here? Some style checker will complain, but it will help us to separate the methods in the new class. Other than that, all good.
@@ -4980,4 +4982,17 @@ public MultipartUploaderBuilder createMultipartUploader(Path basePath)
    methodNotSupported();
    return null;
  }

  /**
   * Create a default bulk delete operation to be used for any FileSystem.
This doesn't hold for the subclasses. Better to say:
Create a bulk delete operation.
The default implementation returns an instance of {@link DefaultBulkDeleteOperation}.
I don't understand what to do here.
@mukund-thakur I tried to clarify what I mean: we separate the class/interface from the method/operation with a "_" character. It makes a lot more sense when you look at my PR, which brings a lot more methods into the same class. Why all in the same class? Less reflection code.
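The benefit of the flat `ClassOrInterface_method` naming for reflection clients can be illustrated with a self-contained sketch. `WrappedIOStub` here is a hypothetical stand-in for the real `org.apache.hadoop.io.wrappedio.WrappedIO`: a caller can locate the operation by its flat name without any shared interface on the classpath.

```java
import java.lang.reflect.Method;

// Why "bulkDelete_pageSize" style names help reflection-based callers:
// the flat name encodes both the interface and the operation, so lookup
// needs only the method name, not shared types. WrappedIOStub is a
// stand-in for the real WrappedIO class.
public class ReflectionNamingSketch {

    public static class WrappedIOStub {
        public static int bulkDelete_pageSize(Object fs, String path) {
            return 1;   // the default implementation's page size
        }
    }

    public static void main(String[] args) throws Exception {
        Method m = WrappedIOStub.class.getMethod(
            "bulkDelete_pageSize", Object.class, String.class);
        int pageSize = (int) m.invoke(null, null, "/tmp");
        if (pageSize != 1) throw new AssertionError("expected page size 1");
        System.out.println("bulkDelete_pageSize -> " + pageSize);
    }
}
```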
 * @throws IllegalArgumentException path not valid.
 * @throws IOException problems resolving paths
 */
public static int bulkDeletePageSize(FileSystem fs, Path path) throws IOException {
rename bulkDelete_pageSize
 * @throws IOException IO problems including networking, authentication and more.
 * @throws IllegalArgumentException if a path argument is invalid.
 */
public static List<Map.Entry<Path, String>> bulkDelete(FileSystem fs,
rename bulkDelete_delete
💔 -1 overall
This message was automatically generated.
Mukund, if you can do those naming changes then I'm +1.
+1. I was about to merge, then I realised that Yetus wasn't ready. Here is my draft commit message:
Applications can create a BulkDelete implementation from a BulkDeleteSource. This is optimized for object stores with bulk delete APIs; even with a page size of 1, the S3A implementation is more efficient than delete(path). The interface BulkDeleteSource is implemented by all FileSystem implementations. To aid use through reflection APIs, the class org.apache.hadoop.io.wrappedio.WrappedIO offers reflection-friendly methods. Contributed by Mukund Thakur and Steve Loughran
Merged to trunk. Mukund, can you do the backport to branch-3.4.1, while we think about what to do for 3.3.9? Speaking of which, we should think about that...
FYI, #6686 adds dynamic loading for the wrapped methods, and calls them through the tests.
While backporting this to branch-3.4 I see this failure. Will check if this is happening on trunk as well.
Yeah, just seen those too. The mocked S3 client isn't getting the delete-one-object calls via the store object. I've got a small PR up to help debug it, which is not a fix.
The problem is that the store is null in the innermost S3AFS, which triggers an NPE in deleteObject() before the AWS client has its delete operation called, so the list of deleted paths is not updated. The easiest immediate fix is to mock deleteObject(); longer term we should actually be stubbing the entire store, as that's what the interface/impl split is meant to assist.
Fix this with Mockito magic somehow: https://github.com/apache/hadoop/pull/6843/files
I tried to do it yesterday too. I think my solution would have pulled createStore() out into a method and overridden it, or added an S3AInternals.setStore() call.
Update: #5081 does successfully show the staging UT failure (good!), but also some failures in hadoop-common due to the new interface... we need to modify the tests to indicate that it is safe not to override the base implementation.
Applications can create a BulkDelete instance from a BulkDeleteSource; the BulkDelete interface provides the pageSize(): the maximum number of entries which can be deleted, and a bulkDelete(Collection paths) method which can take a collection up to pageSize() long.

This is optimized for object stores with bulk delete APIs; the S3A connector will offer the page size of fs.s3a.bulk.delete.page.size unless bulk delete has been disabled.

Even with a page size of 1, the S3A implementation is more efficient than delete(path) as there are no safety checks for the path being a directory or probes for the need to recreate directories.

The interface BulkDeleteSource is implemented by all FileSystem implementations, with a page size of 1 and mapped to delete(pathToDelete, false). This means that callers do not need to have special case handling for object stores versus classic filesystems.

To aid use through reflection APIs, the class org.apache.hadoop.io.wrappedio.WrappedIO has been created with "reflection friendly" methods.

Contributed by Mukund Thakur and Steve Loughran

Conflicts:
	hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
	hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
Adding tests on top of #6494