-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-40557: [C++] Use PutObject
request for S3 in OutputStream when only uploading small data
#41564
base: main
Are you sure you want to change the base?
Conversation
|
From the log output, it seems like the failing CI jobs are not related to this change. Correct me if I am wrong though. Should I rebase (in case the flaky tests are already fixed on main)? |
I think it’s worth rebasing to see? |
Rebased to current main, now waiting for the CI approval again :) |
Hmm, I'm not sure that is ok. Usually, when opening a file for writing, you expect the initial open to fail if the path cannot be written to. I have no idea how much code relies on that, but that's a common expectation due to how filesystems usually work (e.g. when accessing local storage). |
This isn’t guaranteed with the current implementation though? Putting a part, or completing a multipart upload, can fail in various ways? An obvious one would be a checksum failure. |
My point is that if the path cannot be written to, the error happens when opening the file, not later on. |
That is true. I guess the question is if |
The API docs generally do not go into that level of detail. However, it is a general assumption that a filesystem "looks like" a local filesystem API-wise. It is also much more convenient to get an error early, than after you have already "written" multiple megabytes of data to the file. A compromise would be to add a dedicated option in |
We can do that. I would propose that if the optimization is disabled, we directly use multi-part uploads (basically replicating the old behavior). I don't think it makes sense to explicitly issue a HeadBucket request because that will lead to minimum 4 requests with multi-part uploads then. (although we would only have 2 requests for small writes without the optimization compared to current |
That sounds reasonable to me. |
Just to note, issuing I think an optimization flag is appropriate as the behaviour is technically changing. Does it make sense to make the flag a somewhat generic one, rather than specific to this case? There are a few other optimizations that might also fall into the "more performant, slightly different semantics" category. If I was to contribute a PR to improve one of the linked areas, would we want to add a new specific flag for this case or bundle it under a single "optimized" flag? The upside would be that it becomes more configurable, whereas the downside is that the testing and support matrix explodes. Perhaps it's better to just have a single Edit: i guess this is only relevant for higher-level Python bindings, we'd still want internal flags for individual features. |
I added a |
cpp/src/arrow/filesystem/s3fs.cc
Outdated
// So we instead default to application/octet-stream which is less misleading | ||
if (!req.ContentTypeHasBeenSet()) { | ||
req.SetContentType("application/octet-stream"); | ||
if (metadata == nullptr || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (metadata == nullptr || | |
if (!metadata || |
cpp/src/arrow/filesystem/s3fs.cc
Outdated
if (metadata == nullptr || | ||
!metadata->Contains(ObjectMetadataSetter<ObjectRequest>::CONTENT_TYPE_KEY)) { | ||
// If we do not set anything then the SDK will default to application/xml | ||
// which confuses some tools (https://github.com/apache/arrow/issues/11934) | ||
// So we instead default to application/octet-stream which is less misleading | ||
request->SetContentType("application/octet-stream"); | ||
} else { | ||
RETURN_NOT_OK(SetObjectMetadata(metadata, request)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about swapping these clauses for easy to read?
if (metadata && metadata->Contains(ObjectMetadataSetter<ObjectRequest>::CONTENT_TYPE_KEY)) {
RETURN_NOT_OK(SetObjectMetadata(metadata, request));
} else {
request->SetContentType("application/octet-stream");
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cpp/src/arrow/filesystem/s3fs.h
Outdated
/// for latency-sensitive applications, at the cost of the OutputStream may throwing an | ||
/// exception at a later stage (i.e. at writing or closing) if e.g. the bucket does not | ||
/// exist. | ||
bool sanitize_bucket_on_open = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should make this more general, to open up other potential optimizations?
/// Whether to allow file-open methods to return before the actual open
///
/// Enabling this true may reduce the latency of `OpenInputStream`, `OpenOutpuStream`,
/// and similar methods, by reducing the number of roundtrips necessary. It may also
/// allow usage of more efficient S3 APIs for small files.
/// The downside is that failure conditions such as attempting to open a file in a
/// non-existing bucket will only be reported when actual I/O is done (at worse,
/// when attempting to close the file).
bool allow_delayed_open = false;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I like this much more 👍
@@ -1197,6 +1199,19 @@ TEST_F(TestS3FS, OpenOutputStreamSyncWrites) { | |||
TestOpenOutputStream(); | |||
} | |||
|
|||
TEST_F(TestS3FS, OpenOutputStreamNoBucketSanitizationSyncWrites) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of adding more test methods for every combinatorial expansion, perhaps we should instead use a for
loop on the various tested parameter values?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a struct which abstracts the parameter combinations. 7748824
cpp/src/arrow/filesystem/s3fs.cc
Outdated
@@ -1293,12 +1295,14 @@ std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& re | |||
|
|||
template <typename ObjectRequest> | |||
struct ObjectMetadataSetter { | |||
static constexpr std::string_view CONTENT_TYPE_KEY = "Content-Type"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: naming conventions for constants
static constexpr std::string_view CONTENT_TYPE_KEY = "Content-Type"; | |
static constexpr std::string_view kContentTypeKey = "Content-Type"; |
cpp/src/arrow/filesystem/s3fs.cc
Outdated
} | ||
|
||
return Status::OK(); | ||
} | ||
|
||
Status CleanupAfterFlush() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be named CleanupAfterClose
?
cpp/src/arrow/filesystem/s3fs.cc
Outdated
@@ -1734,7 +1812,7 @@ class ObjectOutputStream final : public io::OutputStream { | |||
return Status::OK(); | |||
} | |||
|
|||
// Upload current buffer | |||
// Upload current buffer if we are above threshold for multi-part upload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is misleading: this always uploads the current buffer, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, CommitCurrentPart
starts by calling CreateMultipartUpload
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, I just wanted to make the point that we won't call this when we haven't accumulated enough data. I've clarified the comment. 7748824
cpp/src/arrow/filesystem/s3fs.cc
Outdated
if (current_part_ == nullptr) { | ||
// In case the stream is closed directly after it has been opened without writing | ||
// anything, we'll have to create an empty buffer. | ||
buf = Buffer::FromVector<uint8_t>({}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, but std::make_shared<Buffer>("", 0)
looks simpler to me.
cpp/src/arrow/filesystem/s3fs.cc
Outdated
template <typename RequestType, typename OutcomeType> | ||
static Result<OutcomeType> TriggerUploadRequest( | ||
const RequestType& request, const std::shared_ptr<S3ClientHolder>& holder); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this declaration is actually necessary? i.e. TriggerUploadRequest
doesn't need to be a template method, it can be a regular overloaded method.
cpp/src/arrow/filesystem/s3fs.cc
Outdated
template <typename RequestType, typename OutcomeType> | ||
using UploadResultCallbackFunction = | ||
std::function<Status(const RequestType& request, std::shared_ptr<UploadState>, | ||
int32_t, OutcomeType outcome)>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this signature more informative
template <typename RequestType, typename OutcomeType> | |
using UploadResultCallbackFunction = | |
std::function<Status(const RequestType& request, std::shared_ptr<UploadState>, | |
int32_t, OutcomeType outcome)>; | |
template <typename RequestType, typename OutcomeType> | |
using UploadResultCallbackFunction = | |
std::function<Status(const RequestType& request, std::shared_ptr<UploadState>, | |
int32_t part_number, OutcomeType outcome)>; |
cpp/src/arrow/filesystem/s3fs.cc
Outdated
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>( | ||
std::move(req), sync_result_callback, async_result_callback, data, nbytes, | ||
owned_buffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nits: move more arguments
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>( | |
std::move(req), sync_result_callback, async_result_callback, data, nbytes, | |
owned_buffer); | |
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>( | |
std::move(req), std::move(sync_result_callback), std::move(async_result_callback), | |
data, nbytes, std::move(owned_buffer)); |
cpp/src/arrow/filesystem/s3fs.cc
Outdated
return Upload<Aws::S3::Model::UploadPartRequest, Aws::S3::Model::UploadPartOutcome>( | ||
std::move(req), sync_result_callback, async_result_callback, data, nbytes, | ||
owned_buffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here: more arguments can be moved.
Sorry for delaying review! Would merge after other committers approve |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this @OliLay ! I have some questions and suggestions below.
cpp/src/arrow/filesystem/s3fs.cc
Outdated
@@ -1293,12 +1295,14 @@ std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& re | |||
|
|||
template <typename ObjectRequest> | |||
struct ObjectMetadataSetter { | |||
static constexpr std::string_view kContentTypeKey = "Content-Type"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Content-Type" is still used as a literal above. Should we move this at the top-level and use it everywhere?
Or, conversely, just undo this, since the "Content-Type" literal is unlikely to change value...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reverted the change to move it to a constant and rather have it inline now everywhere again.
// If we do not set anything then the SDK will default to application/xml | ||
// which confuses some tools (https://github.com/apache/arrow/issues/11934) | ||
// So we instead default to application/octet-stream which is less misleading | ||
request->SetContentType("application/octet-stream"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So metadata
is ignored if it doesn't contain a "Content-Type" key? Or am I missing something here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I think this was an issue, I fixed it and made the code also a bit clearer.
cpp/src/arrow/filesystem/s3fs.cc
Outdated
return Status::OK(); | ||
} | ||
|
||
// OutputStream interface | ||
|
||
bool ShouldBeMultipartUpload() const { return pos_ > kMultiPartUploadThresholdSize; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not instead
bool ShouldBeMultipartUpload() const { return pos_ > kMultiPartUploadThresholdSize; } | |
bool ShouldBeMultipartUpload() const { | |
return pos_ > kMultiPartUploadThresholdSize || !allow_delayed_open_; | |
} |
cpp/src/arrow/filesystem/s3fs.cc
Outdated
bool ShouldBeMultipartUpload() const { return pos_ > kMultiPartUploadThresholdSize; } | ||
|
||
bool IsMultipartUpload() const { | ||
return ShouldBeMultipartUpload() || is_multipart_created_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add is_multipart_created_
here? Is there any situation where is_multipart_created_
would be true but ShouldBeMultipartUpload()
would be false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is probably no reason, I think it evolved to be like this during the PR review and also before we had the feature flag. I streamlined the handling now to use the upload id instead of this additional boolean.
cpp/src/arrow/filesystem/s3fs.cc
Outdated
static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state, | ||
int part_number, const S3Model::UploadPartRequest& req, | ||
const Result<S3Model::UploadPartOutcome>& result) { | ||
static Status UploadError(const Aws::S3::Model::PutObjectRequest& request, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps call this UploadUsingSingleRequestError
?
if (--state->parts_in_progress == 0) { | ||
state->pending_parts_completed.MarkFinished(state->status); | ||
if (--state->uploads_in_progress == 0) { | ||
state->pending_uploads_completed.MarkFinished(state->status); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
cpp/src/arrow/filesystem/s3fs.cc
Outdated
|
||
Aws::String upload_id_; | ||
bool closed_ = true; | ||
bool is_multipart_created_ = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the record, is_multipart_created_ == true
iff !upload_id_.empty()
, right? Perhaps this can be consolidated and we can rename upload_id_
to something more explicit, such as multipart_upload_id
.
void apply_to_s3_options(S3Options& options) const { | ||
options.background_writes = background_writes; | ||
options.allow_delayed_open = allow_delayed_open; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Coding style nit: 1) use CamelCase
for function names, 2) avoid mutable refs; therefore:
void apply_to_s3_options(S3Options& options) const { | |
options.background_writes = background_writes; | |
options.allow_delayed_open = allow_delayed_open; | |
} | |
void ApplyToS3Options(S3Options* options) const { | |
options->background_writes = background_writes; | |
options->allow_delayed_open = allow_delayed_open; | |
} |
TEST_F(TestS3FS, OpenOutputStream) { | ||
for (const auto& combination : S3OptionsTestParameters::GetCartesianProduct()) { | ||
combination.apply_to_s3_options(options_); | ||
MakeFileSystem(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is deleting the files currently on the filesystem, which means the tests might succeed even if the write path doesn't work for some options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I implemented cleanup (i.e. emptying the test bucket and restoring the test files again) after each test run.
TestOpenOutputStreamDestructor(); | ||
TEST_F(TestS3FS, OpenOutputStream) { | ||
for (const auto& combination : S3OptionsTestParameters::GetCartesianProduct()) { | ||
combination.apply_to_s3_options(options_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we would also leave a trace in the test log to make diagnosing failures easier (see the ARROW_SCOPED_TRACE
macro somewhere).
I also merged main into this branch due to a conflict. Should be free of conflicts now. |
Rationale for this change
See #40557. The previous implementation would always issue multi part uploads which come with 3x RTT to S3 instead of just 1x RTT with a
PutObject
request.What changes are included in this PR?
Implement logic in the S3
OutputStream
to use aPutObject
request if data is below a certain threshold (5 MB) and the output stream is closed. If more data is written, a multi part upload is triggered. Note: Previously, opening the output stream was already expensive because theCreateMultipartUpload
request was triggered then. With this change opening the output stream becomes cheap, as we rather wait until some data is written to decide which upload method to use. This required some more state-keeping in the output stream class.Are these changes tested?
No new tests were added, as there are already tests for very small writes and very large writes, which will trigger both ways of uploading. Everything should therefore be covered by existing tests.
Are there any user-facing changes?
CreateMultipartUpload
request, which we now do not send anymore upon opening the stream. We now rather fail at closing, or at writing (when >5MB have accumulated). Replicating the old behavior is not possible without sending another request which defeats the purpose of this performance optimization. I hope this is fine.