Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40557: [C++] Use PutObject request for S3 in OutputStream when only uploading small data #41564

Open
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

OliLay
Copy link

@OliLay OliLay commented May 7, 2024

Rationale for this change

See #40557. The previous implementation would always issue multi part uploads which come with 3x RTT to S3 instead of just 1x RTT with a PutObject request.

What changes are included in this PR?

Implement logic in the S3 OutputStream to use a PutObject request if data is below a certain threshold (5 MB) and the output stream is closed. If more data is written, a multi part upload is triggered. Note: Previously, opening the output stream was already expensive because the CreateMultipartUpload request was triggered then. With this change opening the output stream becomes cheap, as we rather wait until some data is written to decide which upload method to use. This required some more state-keeping in the output stream class.

Are these changes tested?

No new tests were added, as there are already tests for very small writes and very large writes, which will trigger both ways of uploading. Everything should therefore be covered by existing tests.

Are there any user-facing changes?

  • Previously, we would fail when opening the output stream if the bucket doesn't exist. We inferred that by sending the CreateMultipartUpload request, which we now do not send anymore upon opening the stream. We now rather fail at closing, or at writing (when >5MB have accumulated). Replicating the old behavior is not possible without sending another request which defeats the purpose of this performance optimization. I hope this is fine.

Copy link

github-actions bot commented May 7, 2024

⚠️ GitHub issue #40557 has been automatically assigned in GitHub to PR creator.

@OliLay
Copy link
Author

OliLay commented May 13, 2024

From the log output, it seems like the failing CI jobs are not related to this change. Correct me if I am wrong though. Should I rebase (in case the flaky tests are already fixed on main)?

@orf
Copy link

orf commented May 13, 2024

I think it’s worth rebasing to see?

@OliLay
Copy link
Author

OliLay commented May 14, 2024

Rebased to current main, now waiting for the CI approval again :)

@mapleFU mapleFU requested review from pitrou and felipecrv May 14, 2024 12:16
cpp/src/arrow/filesystem/s3fs.cc Outdated Show resolved Hide resolved
cpp/src/arrow/filesystem/s3fs_test.cc Show resolved Hide resolved
@github-actions github-actions bot added awaiting review Awaiting review awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels May 14, 2024
@pitrou
Copy link
Member

pitrou commented May 14, 2024

Previously, we would fail when opening the output stream if the bucket doesn't exist. We inferred that by sending the CreateMultipartUpload request, which we now do not send anymore upon opening the stream. We now rather fail at closing, or at writing (when >5MB have accumulated).

Hmm, I'm not sure that is ok. Usually, when opening a file for writing, you expect the initial open to fail if the path cannot be written to. I have no idea how much code relies on that, but that's a common expectation due to how filesystems usually work (e.g. when accessing local storage).

@orf
Copy link

orf commented May 14, 2024

Previously, we would fail when opening the output stream if the bucket doesn't exist. We inferred that by sending the CreateMultipartUpload request, which we now do not send anymore upon opening the stream. We now rather fail at closing, or at writing (when >5MB have accumulated).

Hmm, I'm not sure that is ok. Usually, when opening a file for writing, you expect the initial open to fail if the path cannot be written to. I have no idea how much code relies on that, but that's a common expectation due to how filesystems usually work (e.g. when accessing local storage).

This isn’t guaranteed with the current implementation though? Putting a part, or completing a multipart upload, can fail in various ways? An obvious one would be a checksum failure.

@pitrou
Copy link
Member

pitrou commented May 14, 2024

My point is that if the path cannot be written to, the error happens when opening the file, not later on.

@OliLay
Copy link
Author

OliLay commented May 14, 2024

My point is that if the path cannot be written to, the error happens when opening the file, not later on.

That is true. I guess the question is if arrow's OutputStream API makes an explicit guarantee that Open should throw if the target does not exist. My guess would be that you shouldn't built code upon this assumption if it isn't explicitly stated in arrow's API/docs (which it is not), but of course real-world usage deviates from that (Hyrum's Law).
But checking if the bucket exists would at least come with another 1x RTT to S3 and the goal of the PR was to reduce the amount of blocking calls to S3 to reduce overall latency. If we add another check here, we'll have a total 2x RTT to S3 for small uploads, which is better than the initial 3x RTT without this change, but still not optimal from a performance-view. (and we would probably have 4x RTT for multipart uploads)

@pitrou
Copy link
Member

pitrou commented May 14, 2024

That is true. I guess the question is if arrow's OutputStream API makes an explicit guarantee that Open should throw if the target does not exist. My guess would be that you shouldn't built code upon this assumption if it isn't explicitly stated in arrow's API/docs (which it is not), but of course real-world usage deviates from that (Hyrum's Law).

The API docs generally do not go into that level of detail. However, it is a general assumption that a filesystem "looks like" a local filesystem API-wise.

It is also much more convenient to get an error early, than after you have already "written" multiple megabytes of data to the file.

A compromise would be to add a dedicated option in S3Options, but of course the optimization would only benefit those users that enable the option.

@OliLay
Copy link
Author

OliLay commented May 14, 2024

A compromise would be to add a dedicated option in S3Options, but of course the optimization would only benefit those users that enable the option.

We can do that. I would propose that if the optimization is disabled, we directly use multi-part uploads (basically replicating the old behavior). I don't think it makes sense to explicitly issue a HeadBucket request because that will lead to minimum 4 requests with multi-part uploads then. (although we would only have 2 requests for small writes without the optimization compared to current main)
What do you think?

@pitrou
Copy link
Member

pitrou commented May 14, 2024

We can do that. I would propose that if the optimization is disabled, we directly use multi-part uploads (basically replicating the old behavior).

That sounds reasonable to me.

@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review awaiting committer review Awaiting committer review labels May 14, 2024
@orf
Copy link

orf commented May 14, 2024

Just to note, issuing HeadBucket doesn't guarantee that a write will succeed - there isn't really a good way to check without actually writing. A HeadObject on the key and failing on any 403 is probably ok though? However there are valid cases where you'd want to write to a key that your principal is not able to read from. HeadBucket also requires full s3:ListBucket permissions, policies that restrict listing to specific prefixes would need to be updated.

I think an optimization flag is appropriate as the behaviour is technically changing. Does it make sense to make the flag a somewhat generic one, rather than specific to this case?

There are a few other optimizations that might also fall into the "more performant, slightly different semantics" category. If I was to contribute a PR to improve one of the linked areas, would we want to add a new specific flag for this case or bundle it under a single "optimized" flag?

The upside would be that it becomes more configurable, whereas the downside is that the testing and support matrix explodes. Perhaps it's better to just have a single optimized=True flag, vs receiving bug reports when specifically optimize_put_object=True, optimize_delete_dir=False, optimize_move=True, optimize_delete=False, optimize_ensure_parents_exist=True, optimize_foobar=True are set?

Edit: i guess this is only relevant for higher-level Python bindings, we'd still want internal flags for individual features.

@OliLay
Copy link
Author

OliLay commented May 15, 2024

I added a sanitize_bucket_on_open_ flag to the S3Options, adjusted the logic and also instantiated tests with this flag enabled.
I guess the Python bindings can be tackled in a separate PR, right?

// So we instead default to application/octet-stream which is less misleading
if (!req.ContentTypeHasBeenSet()) {
req.SetContentType("application/octet-stream");
if (metadata == nullptr ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (metadata == nullptr ||
if (!metadata ||

Comment on lines 1602 to 1610
if (metadata == nullptr ||
!metadata->Contains(ObjectMetadataSetter<ObjectRequest>::CONTENT_TYPE_KEY)) {
// If we do not set anything then the SDK will default to application/xml
// which confuses some tools (https://github.com/apache/arrow/issues/11934)
// So we instead default to application/octet-stream which is less misleading
request->SetContentType("application/octet-stream");
} else {
RETURN_NOT_OK(SetObjectMetadata(metadata, request));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about swapping these clauses for easy to read?

if (metadata && metadata->Contains(ObjectMetadataSetter<ObjectRequest>::CONTENT_TYPE_KEY)) {
  RETURN_NOT_OK(SetObjectMetadata(metadata, request));
} else {
  request->SetContentType("application/octet-stream");
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting committer review Awaiting committer review labels May 15, 2024
@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels May 16, 2024
/// for latency-sensitive applications, at the cost of the OutputStream may throwing an
/// exception at a later stage (i.e. at writing or closing) if e.g. the bucket does not
/// exist.
bool sanitize_bucket_on_open = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should make this more general, to open up other potential optimizations?

  /// Whether to allow file-open methods to return before the actual open
  ///
  /// Enabling this true may reduce the latency of `OpenInputStream`, `OpenOutpuStream`,
  /// and similar methods, by reducing the number of roundtrips necessary. It may also
  /// allow usage of more efficient S3 APIs for small files.
  /// The downside is that failure conditions such as attempting to open a file in a
  /// non-existing bucket will only be reported when actual I/O is done (at worse,
  /// when attempting to close the file).
  bool allow_delayed_open = false;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I like this much more 👍

@@ -1197,6 +1199,19 @@ TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
TestOpenOutputStream();
}

TEST_F(TestS3FS, OpenOutputStreamNoBucketSanitizationSyncWrites) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding more test methods for every combinatorial expansion, perhaps we should instead use a for loop on the various tested parameter values?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a struct which abstracts the parameter combinations. 7748824

@@ -1293,12 +1295,14 @@ std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& re

template <typename ObjectRequest>
struct ObjectMetadataSetter {
static constexpr std::string_view CONTENT_TYPE_KEY = "Content-Type";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: naming conventions for constants

Suggested change
static constexpr std::string_view CONTENT_TYPE_KEY = "Content-Type";
static constexpr std::string_view kContentTypeKey = "Content-Type";

}

return Status::OK();
}

Status CleanupAfterFlush() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be named CleanupAfterClose?

@@ -1734,7 +1812,7 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}

// Upload current buffer
// Upload current buffer if we are above threshold for multi-part upload
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is misleading: this always uploads the current buffer, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, CommitCurrentPart starts by calling CreateMultipartUpload.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I just wanted to make the point that we won't call this when we haven't accumulated enough data. I've clarified the comment. 7748824

if (current_part_ == nullptr) {
// In case the stream is closed directly after it has been opened without writing
// anything, we'll have to create an empty buffer.
buf = Buffer::FromVector<uint8_t>({});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, but std::make_shared<Buffer>("", 0) looks simpler to me.

Comment on lines 1885 to 1887
template <typename RequestType, typename OutcomeType>
static Result<OutcomeType> TriggerUploadRequest(
const RequestType& request, const std::shared_ptr<S3ClientHolder>& holder);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this declaration is actually necessary? i.e. TriggerUploadRequest doesn't need to be a template method, it can be a regular overloaded method.

Comment on lines 1880 to 1883
template <typename RequestType, typename OutcomeType>
using UploadResultCallbackFunction =
std::function<Status(const RequestType& request, std::shared_ptr<UploadState>,
int32_t, OutcomeType outcome)>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this signature more informative

Suggested change
template <typename RequestType, typename OutcomeType>
using UploadResultCallbackFunction =
std::function<Status(const RequestType& request, std::shared_ptr<UploadState>,
int32_t, OutcomeType outcome)>;
template <typename RequestType, typename OutcomeType>
using UploadResultCallbackFunction =
std::function<Status(const RequestType& request, std::shared_ptr<UploadState>,
int32_t part_number, OutcomeType outcome)>;

Comment on lines 1990 to 1992
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
std::move(req), sync_result_callback, async_result_callback, data, nbytes,
owned_buffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nits: move more arguments

Suggested change
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
std::move(req), sync_result_callback, async_result_callback, data, nbytes,
owned_buffer);
return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
std::move(req), std::move(sync_result_callback), std::move(async_result_callback),
data, nbytes, std::move(owned_buffer));

Comment on lines 2038 to 2040
return Upload<Aws::S3::Model::UploadPartRequest, Aws::S3::Model::UploadPartOutcome>(
std::move(req), sync_result_callback, async_result_callback, data, nbytes,
owned_buffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: more arguments can be moved.

@OliLay OliLay requested review from kou, pitrou and mapleFU May 17, 2024 07:39
@mapleFU
Copy link
Member

mapleFU commented May 24, 2024

Sorry for delaying review! Would merge after other committers approve

Copy link
Member

@pitrou pitrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this @OliLay ! I have some questions and suggestions below.

@@ -1293,12 +1295,14 @@ std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& re

template <typename ObjectRequest>
struct ObjectMetadataSetter {
static constexpr std::string_view kContentTypeKey = "Content-Type";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Content-Type" is still used as a literal above. Should we move this at the top-level and use it everywhere?
Or, conversely, just undo this, since the "Content-Type" literal is unlikely to change value...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted the change to move it to a constant and rather have it inline now everywhere again.

// If we do not set anything then the SDK will default to application/xml
// which confuses some tools (https://github.com/apache/arrow/issues/11934)
// So we instead default to application/octet-stream which is less misleading
request->SetContentType("application/octet-stream");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So metadata is ignored if it doesn't contain a "Content-Type" key? Or am I missing something here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I think this was an issue, I fixed it and made the code also a bit clearer.

return Status::OK();
}

// OutputStream interface

bool ShouldBeMultipartUpload() const { return pos_ > kMultiPartUploadThresholdSize; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not instead

Suggested change
bool ShouldBeMultipartUpload() const { return pos_ > kMultiPartUploadThresholdSize; }
bool ShouldBeMultipartUpload() const {
return pos_ > kMultiPartUploadThresholdSize || !allow_delayed_open_;
}

bool ShouldBeMultipartUpload() const { return pos_ > kMultiPartUploadThresholdSize; }

bool IsMultipartUpload() const {
return ShouldBeMultipartUpload() || is_multipart_created_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add is_multipart_created_ here? Is there any situation where is_multipart_created_ would be true but ShouldBeMultipartUpload() would be false?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is probably no reason, I think it evolved to be like this during the PR review and also before we had the feature flag. I streamlined the handling now to use the upload id instead of this additional boolean.

static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
int part_number, const S3Model::UploadPartRequest& req,
const Result<S3Model::UploadPartOutcome>& result) {
static Status UploadError(const Aws::S3::Model::PutObjectRequest& request,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps call this UploadUsingSingleRequestError?

if (--state->parts_in_progress == 0) {
state->pending_parts_completed.MarkFinished(state->status);
if (--state->uploads_in_progress == 0) {
state->pending_uploads_completed.MarkFinished(state->status);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.


Aws::String upload_id_;
bool closed_ = true;
bool is_multipart_created_ = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, is_multipart_created_ == true iff !upload_id_.empty(), right? Perhaps this can be consolidated and we can rename upload_id_ to something more explicit, such as multipart_upload_id.

Comment on lines 1198 to 1201
void apply_to_s3_options(S3Options& options) const {
options.background_writes = background_writes;
options.allow_delayed_open = allow_delayed_open;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coding style nit: 1) use CamelCase for function names, 2) avoid mutable refs; therefore:

Suggested change
void apply_to_s3_options(S3Options& options) const {
options.background_writes = background_writes;
options.allow_delayed_open = allow_delayed_open;
}
void ApplyToS3Options(S3Options* options) const {
options->background_writes = background_writes;
options->allow_delayed_open = allow_delayed_open;
}

TEST_F(TestS3FS, OpenOutputStream) {
for (const auto& combination : S3OptionsTestParameters::GetCartesianProduct()) {
combination.apply_to_s3_options(options_);
MakeFileSystem();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is deleting the files currently on the filesystem, which means the tests might succeed even if the write path doesn't work for some options.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I implemented cleanup (i.e. emptying the test bucket and restoring the test files again) after each test run.

TestOpenOutputStreamDestructor();
TEST_F(TestS3FS, OpenOutputStream) {
for (const auto& combination : S3OptionsTestParameters::GetCartesianProduct()) {
combination.apply_to_s3_options(options_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would also leave a trace in the test log to make diagnosing failures easier (see the ARROW_SCOPED_TRACE macro somewhere).

@OliLay OliLay requested a review from pitrou June 4, 2024 11:50
@OliLay
Copy link
Author

OliLay commented Jun 5, 2024

I also merged main into this branch due to a conflict. Should be free of conflicts now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants