[CORE-2794]: cst: decouple chunk download from consumers #18278
base: dev
Conversation
Force-pushed from 6b256bf to 9343667
new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f521e-aef6-4323-9b5e-5b09982230fd:
new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f521e-aef8-43b7-8c71-890570e65fbc:
new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f521e-aefa-4a5a-b019-0c6763d417a0:
new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f5226-69fc-4f64-b39e-abb593882d2d:
new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f5226-69ff-43b1-b891-910b09e05453:
new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f5226-69fa-4b74-827c-055186b505cc:
new failures in https://buildkite.com/redpanda/redpanda/builds/48831#018f590c-83eb-41a6-82f3-0db3549a8532:
new failures in https://buildkite.com/redpanda/redpanda/builds/48831#018f5920-b310-4f1d-a946-a24e1b60eb34:
new failures in https://buildkite.com/redpanda/redpanda/builds/48831#018f590c-83e9-401c-885f-181ed3d14382:
ducktape was retried in:
- https://buildkite.com/redpanda/redpanda/builds/48776#018f521e-aef6-4323-9b5e-5b09982230fd
- https://buildkite.com/redpanda/redpanda/builds/48776#018f5226-69fc-4f64-b39e-abb593882d2d
- https://buildkite.com/redpanda/redpanda/builds/48776#018f5226-69ff-43b1-b891-910b09e05453
- https://buildkite.com/redpanda/redpanda/builds/48927#018f62af-da13-43cf-88f7-e2286f13c467
- https://buildkite.com/redpanda/redpanda/builds/48937#018f634e-bb99-4938-80f7-16266b950213
- https://buildkite.com/redpanda/redpanda/builds/48989#018f70a3-44ee-4d8d-a710-11251ac79545
- https://buildkite.com/redpanda/redpanda/builds/49416#018f9fd9-22c8-4dc0-896b-76fc91460e3f
- https://buildkite.com/redpanda/redpanda/builds/49416#018f9fd9-22c3-4686-b8c0-59c6f8489623
Force-pushed from d9a9d3c to d33851b
Force-pushed from e8e1b47 to ca691f4
Force-pushed from ca691f4 to 4014ef8
/ci-repeat
Force-pushed from 4014ef8 to fc99549
/ci-repeat
/cdt
deferring full review to someone with more context
  _chunk_waiters.size());
for (auto& w : _chunk_waiters) {
    w.promise.set_exception(ss::gate_closed_exception{});
}
This seems incorrect. Imagine a queued task T1 that is preparing to append to _chunk_waiters. The gate closes; on the next iteration the while condition evaluates to false, and we reach this code and execute it. T1 then gets scheduled and adds a new entry to _chunk_waiters.
Even if T1 held the gate, this code is still incorrect, because is_closed() evaluates to true as soon as gate::close is called. This code must run only after the promise returned by gate::close is fulfilled.
This should also reset _chunk_waiters here, since those waiters have now been "served". Or, why not just call set_waiter_errors?

void remote_segment::set_waiter_errors(const std::exception_ptr& err) {
I think it makes sense to handle any pending waiters after the gate-close future resolves. I added something similar in the chunk API in this PR:
redpanda/src/v/cloud_storage/segment_chunk_api.cc
Lines 103 to 105 in fc99549
co_await _gate.close();
resolve_prefetch_futures();
vlog(_ctxlog.debug, "stopped segment_chunks");
I will look into adding a similar method to remote segment.
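For illustration, a minimal sketch (assumed member names, not the actual remote_segment code) of the pattern being discussed, where pending waiters are failed only after the gate-close future resolves, so no new waiter can be registered afterwards:

ss::future<> remote_segment::stop() {
    _bg_cvar.broken(); // wake the background loop so it can exit
    co_await _gate.close(); // resolves only once all holders have left the gate
    // Safe now: nothing can enter the gate and append to _chunk_waiters.
    set_waiter_errors(std::make_exception_ptr(ss::gate_closed_exception{}));
}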
Just a thought: would it make sense to call set_waiter_errors() in remote_segment::stop() to handle any stragglers? Or would that end up hanging because the in-flight hydrations wouldn't complete?
Or maybe it would help if we had an abort source to block the addition of new work (i.e. check an abort source before adding to _chunk_waiters or _wait_list)?
Just a thought: would it make sense to call set_waiter_errors() in remote_segment::stop() to handle any stragglers? Or would that end up hanging because the in-flight hydrations wouldn't complete?
I tried this but it doesn't work; as you said, the gate will never close because the task which added to the chunk waiters holds it open.
I have been going through the code paths leading to this section and I don't think the code needs to change; it is correct as it is.
Consider the code flow related to chunk-waiter modification (the code at the end of cloud_storage::remote_segment::run_hydrate_bg which cleans up waiters is referred to as cleanup):
- Chunk waiters are added by either cloud_storage::remote_segment::service_chunk_requests or cloud_storage::remote_segment::download_chunk.
- The potential bug is that cleanup runs but does not handle all chunk waiters, i.e. some are added after cleanup runs, or even concurrently. cleanup runs when the gate is closed, i.e. when the call to gate close is initiated, not when the future returned by gate::close resolves.
cloud_storage::remote_segment::service_chunk_requests: at the point when cleanup runs, we cannot have a concurrent execution of this method. service_chunk_requests is part of the loop we just exited, and we co_await its completion there as the last thing in the iteration. These two code sections run serially.
cloud_storage::remote_segment::download_chunk: the first thing we do is try to enter the gate. At this point, if the gate is closed (i.e. just the initial call to gate::close, which sets the optional inside the gate, not the future being resolved), we will not be able to enter and add the waiter. If we do enter, which means the gate was open, there is no async call between the gate entry and the point where we add the waiter, so it is not possible that we enter the gate but somehow get pre-empted before adding the waiter and add it later, after cleanup runs.
So it looks like we cannot add new waiters after or during cleanup; this code should handle all pending waiters.
@nvartolomei what do you think?
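For reference, a sketch of the download_chunk invariant described above (member names approximate, not the actual code): there is no scheduling point between entering the gate and appending the waiter, so a waiter is either registered before cleanup runs or not at all.

ss::future<> remote_segment::download_chunk(chunk_start_offset_t offset) {
    auto holder = _gate.hold(); // throws gate_closed_exception once close() has been called
    chunk_waiter w; // hypothetical waiter type holding a promise
    w.offset = offset;
    auto fut = w.promise.get_future();
    _chunk_waiters.push_back(std::move(w)); // no co_await since hold(), cannot race with cleanup
    _bg_cvar.signal(); // wake the hydration loop
    co_await std::move(fut);
}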
Stared at this a bit and I think I agree that it's currently safe.
I think the general concern is around service_chunk_requests(), which has a scheduling point between the gate hold and the addition to _chunk_waiters. If the sequence were something like:
- T1: begin call to service_chunk_requests(); start holding the gate
- T1: swap out _chunk_waiters into requests for processing
- T2: call stop on the segment, close the gate
- TN: background loop exits, collects the remainder of _chunk_waiters
- T1: process requests; some fail and are added back to _chunk_waiters
perhaps that is what Nicolae's concerned about? In reality, TN is always going to come after the last step in T1, since they run from the same background fiber. Maybe that's useful context to include somewhere, or make the interface clearer, e.g. by returning the failed chunk waiters from service_chunk_requests() and making the background loop explicitly handle that case.
I've implemented the change to return failed requests from service_chunk_requests and append them in the bg loop.
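A hedged sketch of what that interface change could look like (the types and helpers here are assumptions): the method returns the waiters it could not serve, and the background loop appends them back, keeping all mutation of _chunk_waiters in one fiber.

ss::future<chunk_waiter_list> remote_segment::service_chunk_requests(
  chunk_waiter_list requests) {
    chunk_waiter_list failed;
    for (auto& r : requests) {
        try {
            co_await hydrate_chunk(r.offset); // hypothetical download helper
            r.promise.set_value();
        } catch (...) {
            failed.push_back(std::move(r));
        }
    }
    co_return failed;
}

The caller in run_hydrate_bg would then do something like:

auto failed = co_await service_chunk_requests(std::move(requests));
std::move(failed.begin(), failed.end(), std::back_inserter(_chunk_waiters));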
co_await _bg_cvar.wait(
  [this] { return downloads_in_progress() || _gate.is_closed(); });
// TODO prioritize "real" requests over prefetches
co_await ss::max_concurrent_for_each(
Doesn't this cause a lot of (too much) throwaway work?
Do you mean because of the max_concurrent_for_each? Or because we do not filter out chunks with empty waiter lists before entering the method to hydrate?
Because we always spawn N tasks, where N is the number of chunks. I see that they do early returns, but ideally we would spawn just one (two, with prefetch?). Why enqueue extra work to the scheduler? Am I missing something?
The chunks are 16MB? Not so many per segment, so it might not be that big of a deal.
Yeah, I guess it wouldn't be tough to get only the chunks that are download_in_progress and then do this max_concurrent call? I suppose the condition variable's condition means there will definitely be at least one waiter, but it seems like an easy win to filter up front.
Added a filter
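For context, a sketch of what such an up-front filter can look like, mirroring the shape of the change that appears later in the diff; has_waiters and the hydration callback are assumed names:

namespace views = std::views;
auto has_waiters = [](const auto& kv) { return !kv.second.waiters.empty(); };
co_await ss::max_concurrent_for_each(
  _chunks | views::filter(has_waiters) | views::keys,
  _segment.concurrency(),
  [this](chunk_start_offset_t offset) { return hydrate_chunk_at(offset); });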
auto& chunk = _chunks[start_offset];
auto& waiters = chunk.waiters;
if (waiters.empty()) {
Should this skip already hydrated chunks?
When we serve requests in segment_chunks::hydrate_chunk, which are eventually routed to this code path, we do an early return of the chunk file handle for hydrated chunks, and we only progress to enqueue the request when the chunk is not yet hydrated:
redpanda/src/v/cloud_storage/segment_chunk_api.cc
Lines 131 to 137 in a9d0662
if (curr_state == chunk_state::hydrated) {
    vassert(
      chunk.handle,
      "chunk state is hydrated without data file for id {}",
      chunk_start);
    co_return chunk.handle.value();
}
There is a vassert before setting the chunk handle that the chunk should not already have a file handle associated with it; maybe there could be an added vassert that the chunk state is download_in_progress.
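Sketched out, the suggested assertion could look like this (field names approximate), placed where the downloaded file handle is attached to the chunk:

vassert(
  chunk.current_state == chunk_state::download_in_progress,
  "unexpected chunk state while setting file handle for id {}",
  chunk_start);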
/// If prefetch chunks are already hydrated or download is in progress, then
/// we skip the download.
void
schedule_prefetches(chunk_start_offset_t start_offset, size_t prefetch);
nit: may be clearer renamed to num_prefetches or some such?
changed
Force-pushed from fc99549 to 606a55e
Force-pushed from 606a55e to abc960c
I think the changes here are good, but maybe we need to align on what (if anything) to do about the background-fiber confusion Nicolae points out.
_chunks | views::filter(has_waiters) | views::keys,
_segment.concurrency(),
Just calling out that this is an asynchronous iteration over a member container. What guarantees its safety? Should we build a smaller container of chunk offsets and pass it to this max_concurrent_for_each call?
Looking around, I think it's probably fine if we never erase or clear the _chunks container, but it can be a bit of a footgun if we don't explicitly think about it. Is that the case? Maybe worth adding a comment about why it's safe if so.
The _chunks map is almost const (other than not being initialized in the c-tor): it is initialized once in start and then never changes for the lifetime of the container. I will add a comment to clarify this.
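For illustration, a comment along these lines (my wording, not necessarily what landed in the PR) would capture the invariant:

// NOTE: _chunks is populated once in start() and is never inserted into or
// erased from afterwards, so iterating it across the scheduling points inside
// max_concurrent_for_each is safe: iterators and references stay valid for
// the lifetime of this object.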
added comment
I'm not seeing this comment?
Chunk waiters are stored in a list which is swapped out before processing. Because chunk downloads take time, new waiters may be registered while earlier downloads are still being processed. The remote segment may receive a stop request during this interval. If this happens, the hydration loop shuts down without processing the remaining waiters, and those waiters prevent the remote segment's gate from closing, causing a hang. The workaround cancels pending downloads while exiting the background loop.
The chunk API is now structured around a background loop similar to the remote segment's. When a download request for a chunk arrives, it is added to the wait list for that chunk. A condvar is then triggered, which wakes up the bg loop, and a request is issued to the remote segment to download the chunk. Once the chunk is downloaded, all waiters are notified; they are likewise notified when there is an error during download.
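A minimal sketch of that loop structure, assembled from the snippets quoted in the review (do_hydrate and has_waiters are assumed names, and the real implementation may differ):

ss::future<> segment_chunks::run_background_loop() {
    auto holder = _gate.hold();
    while (!_gate.is_closed()) {
        // Sleep until some chunk has waiters, or until we are stopping.
        co_await _bg_cvar.wait(
          [this] { return downloads_in_progress() || _gate.is_closed(); });
        if (_gate.is_closed()) {
            break;
        }
        auto has_waiters = [](const auto& kv) { return !kv.second.waiters.empty(); };
        // Download every chunk that has waiters, bounded by the per-segment
        // concurrency; waiter promises are resolved inside do_hydrate.
        co_await ss::max_concurrent_for_each(
          _chunks | std::views::filter(has_waiters) | std::views::keys,
          _segment.concurrency(),
          [this](chunk_start_offset_t offset) { return do_hydrate(offset); });
    }
}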
The tests related to chunk downloads are moved to a separate unit and cleaned up. This involves removing repetitive code, using ranges, and not spinning up a new chunk API instance, instead using the one inside the remote segment for tests.
The prefetch logic is changed. Instead of downloading a single byte range for chunk + prefetch and splitting it while writing to disk, we now schedule separate API calls per chunk. This lets the consumer get unblocked as soon as the first chunk is downloaded.
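A sketch of the per-chunk scheduling (the signature matches the header shown earlier in the review; the chunk-state handling is an assumption):

void segment_chunks::schedule_prefetches(
  chunk_start_offset_t start_offset, size_t num_prefetches) {
    auto it = _chunks.find(start_offset);
    // Queue the requested chunk plus up to num_prefetches chunks after it.
    for (size_t i = 0; i <= num_prefetches && it != _chunks.end(); ++i, ++it) {
        auto& chunk = it->second;
        // Already hydrated or already being downloaded: skip, per the API docs.
        if (chunk.current_state != chunk_state::not_available) {
            continue;
        }
        chunk.current_state = chunk_state::download_in_progress;
    }
    _bg_cvar.signal(); // the background loop issues the actual downloads
}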
A new test is added in which some chunk downloads fail, and the assertion checks that the results are correctly reflected. The supporting tooling to fail requests is also added to the s3 imposter.
Three new tests are added which exercise the chunk hydration process under disruptive actions. Because the bg loop structure does not lend itself to unit testing easily, we use disruptive actions to test code paths within the download loop. The following scenarios are added:
- Abruptly stop the remote segment during pending downloads, to ensure there are no hangs and exceptions are reported correctly.
- Delete random chunk files while a large number of random chunk hydrations are in progress. This emulates cache eviction and forces re-hydration.
- Randomly fail HTTP requests with either retryable or non-retryable errors.
Even if the test code fails, the background thread must be stopped to avoid a hung test.
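One way to guarantee that, assuming the test body runs inside a seastar thread so get() is allowed, is a deferred stop that runs on scope exit whether or not an assertion throws:

#include <seastar/util/defer.hh>

auto stop_guard = ss::defer([&segment]() noexcept {
    segment.stop().get(); // always stop the background loop, even on test failure
});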
Force-pushed from abc960c to d087ecd
/// materialized. Any chunks which are downloaded successfully but fail to
/// materialize are added back to the waiter list.
nit: the comment needs an update w.r.t. chunks being added back to the waiter list?
The chunk API is changed to use a background loop similar to the remote segment's. Now all chunk download requests are queued up, and the loop issues requests to the remote segment to download each chunk, setting the download result into the promise in the wait list.
Additionally, the prefetch code is changed to perform actual prefetches by scheduling chunk downloads up to the prefetch count. This enables the consumer to be unblocked faster, as soon as the first chunk download finishes.
Since the new structure is harder to unit test, new tests are added that introduce disruptions, such as chunk deletion and HTTP errors, during the hydration process.
The next step will be to link a consumer's abort source with a chunk hydration attempt.