[CORE-2794]: cst: decouple chunk download from consumers #18278

Open

abhijat wants to merge 9 commits into dev from core-2794-decouple-dl-from-clients

Conversation

@abhijat (Contributor) commented May 7, 2024

The chunk API is changed to use a background loop similar to the one in remote segment. All chunk download requests are now queued up, and the loop issues requests to the remote segment to download each chunk, setting the download result into the corresponding promise in the wait list.

Additionally, the prefetch code is changed to perform actual prefetches by scheduling a separate chunk download for each chunk in the prefetch count. This unblocks the consumer faster, as soon as the first chunk download finishes.

Since the new structure is harder to unit test, new tests are added which introduce disruptions such as chunk deletion and HTTP errors during the hydration process.

The next step will be to link a consumer's abort source with a chunk hydration attempt.
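
For illustration, the consumer-facing shape described above might look roughly like the following minimal Seastar-style sketch. The names here (chunk_downloader, chunk_waiter, _waiters, run_bg_loop) are hypothetical stand-ins, not the PR's actual types; the real implementation lives in remote_segment and segment_chunks.

#include <seastar/core/condition-variable.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>

#include <cstdint>
#include <deque>
#include <utility>

namespace ss = seastar;

// Hypothetical entry in the wait list: one promise per queued request.
struct chunk_waiter {
    uint64_t chunk_start;
    ss::promise<> promise; // resolved (or failed) by the background loop
};

class chunk_downloader {
public:
    // Consumer-facing call: queue a waiter, wake the loop, await the result.
    // The consumer no longer talks to the remote store directly.
    ss::future<> download_chunk(uint64_t chunk_start) {
        auto holder = _gate.hold();  // rejects new work once shutdown begins
        _waiters.push_back(chunk_waiter{chunk_start, {}});
        auto fut = _waiters.back().promise.get_future();
        _bg_cvar.signal();           // wake the background download loop
        co_await std::move(fut);     // resolved once the chunk is hydrated
    }

private:
    // Runs in the background: drains _waiters, asks the remote segment to
    // download each chunk, and sets the corresponding promises (elided).
    ss::future<> run_bg_loop();

    ss::gate _gate;
    ss::condition_variable _bg_cvar;
    std::deque<chunk_waiter> _waiters;
};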

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.1.x
  • v23.3.x
  • v23.2.x

Release Notes

  • none

@abhijat abhijat force-pushed the core-2794-decouple-dl-from-clients branch from 6b256bf to 9343667 Compare May 7, 2024 07:07
@vbotbuildovich (Collaborator) commented May 7, 2024

new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f521e-aef6-4323-9b5e-5b09982230fd:

"rptest.tests.e2e_iam_role_test.ShortLivedCredentialsTests.test_short_lived_credentials"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTestWithDisruptions.test_write_with_node_failures.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_prefetch_chunks.prefetch=0"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_read_when_cache_smaller_than_segment_size"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_topic_recovery_test.EndToEndTopicRecovery.test_restore_with_config_batches.num_messages=2.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.timequery_test.TimeQueryTest.test_timequery.cloud_storage=True.batch_cache=False.spillover=False"
"rptest.tests.timequery_test.TimeQueryTest.test_timequery_with_local_gc"
"rptest.tests.shadow_indexing_firewall_test.ShadowIndexingFirewallTest.test_consume_from_blocked_s3.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f521e-aef8-43b7-8c71-890570e65fbc:

"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_write.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_prefetch_chunks.prefetch=3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_from_cloud.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndSpilloverTest.test_spillover.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.tiered_storage_model_test.TieredStorageTest.test_tiered_storage.cloud_storage_type=CloudStorageType.S3.test_case=.TS_Read==True.TS_ChunkedRead==True"
"rptest.tests.timequery_test.TimeQueryTest.test_timequery.cloud_storage=True.batch_cache=False.spillover=True"
"rptest.tests.delete_records_test.DeleteRecordsTest.test_delete_records_segment_deletion.cloud_storage_enabled=True.truncate_point=at_segment_boundary"

new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f521e-aefa-4a5a-b019-0c6763d417a0:

"rptest.tests.read_replica_e2e_test.TestReadReplicaService.test_identical_hwms.partition_count=5.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_prefetch_chunks.prefetch=5"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_read_when_segment_size_smaller_than_chunk_size"
"rptest.tests.e2e_iam_role_test.AWSRoleFetchTests.test_write"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_from_cloud.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_write.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.follower_fetching_test.FollowerFetchingTest.test_basic_follower_fetching.read_from_object_store=True"
"rptest.tests.e2e_shadow_indexing_test.EndToEndSpilloverTest.test_spillover.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.full_disk_test.LogStorageMaxSizeSI.test_stay_below_target_size.log_segment_size=1048576.cleanup_policy=compact"
"rptest.tests.upgrade_test.UpgradeFromPriorFeatureVersionCloudStorageTest.test_rolling_upgrade.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f5226-69fc-4f64-b39e-abb593882d2d:

"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=node_add.test_mode=TestMode.FAST_MOVES.cleanup_policy=compact.delete"
"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=node_add.test_mode=TestMode.TIRED_STORAGE.cleanup_policy=compact.delete"
"rptest.tests.cloud_retention_test.CloudRetentionTest.test_cloud_retention.max_consume_rate_mb=None.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_prefetch_chunks.prefetch=3"
"rptest.tests.e2e_shadow_indexing_test.ShadowIndexingWhileBusyTest.test_create_or_delete_topics_while_busy.short_retention=True.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.random_node_operations_test.RandomNodeOperationsTest.test_node_operations.enable_failures=False.num_to_upgrade=0.with_tiered_storage=True"
"rptest.tests.read_replica_e2e_test.TestReadReplicaService.test_identical_hwms.partition_count=5.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.random_node_operations_test.RandomNodeOperationsTest.test_node_operations.enable_failures=True.num_to_upgrade=0.with_tiered_storage=True"
"rptest.tests.e2e_topic_recovery_test.EndToEndTopicRecovery.test_restore.message_size=5000.num_messages=100000.recovery_overrides=.retention.local.target.bytes.1024.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.cloud_storage_timing_stress_test.CloudStorageTimingStressTest.test_cloud_storage.cleanup_policy=compact.delete"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTestWithDisruptions.test_write_with_node_failures.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTestCompactedTopic.test_compacting_during_leadership_transfer.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.full_disk_test.LogStorageMaxSizeSI.test_stay_below_target_size.log_segment_size=1048576.cleanup_policy=delete"
"rptest.tests.e2e_topic_recovery_test.EndToEndTopicRecovery.test_restore_with_aborted_tx.recovery_overrides=.retention.local.target.bytes.1024.redpanda.remote.write.True.redpanda.remote.read.True.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.shadow_indexing_tx_test.ShadowIndexingTxTest.test_txless_segments.cloud_storage_type=CloudStorageType.ABS"
"rptest.tests.tiered_storage_model_test.TieredStorageTest.test_tiered_storage.cloud_storage_type=CloudStorageType.ABS.test_case=.TS_Read==True.TS_ChunkedRead==True"
"rptest.tests.delete_records_test.DeleteRecordsTest.test_delete_records_segment_deletion.cloud_storage_enabled=True.truncate_point=at_segment_boundary"
"rptest.tests.shadow_indexing_firewall_test.ShadowIndexingFirewallTest.test_consume_from_blocked_s3.cloud_storage_type=CloudStorageType.ABS"

new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f5226-69ff-43b1-b891-910b09e05453:

"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=off.test_mode=TestMode.FAST_MOVES.cleanup_policy=compact"
"rptest.tests.e2e_shadow_indexing_test.ShadowIndexingWhileBusyTest.test_create_or_delete_topics_while_busy.short_retention=True.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTestCompactedTopic.test_compacting_during_leadership_transfer.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_iam_role_test.AWSRoleFetchTests.test_write"
"rptest.tests.cloud_storage_timing_stress_test.CloudStorageTimingStressTest.test_cloud_storage.cleanup_policy=delete"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTestWithDisruptions.test_write_with_node_failures.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_topic_recovery_test.EndToEndTopicRecovery.test_restore_with_aborted_tx.recovery_overrides=.retention.local.target.bytes.1024.redpanda.remote.write.True.redpanda.remote.read.True.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.follower_fetching_test.FollowerFetchingTest.test_basic_follower_fetching.read_from_object_store=True"
"rptest.tests.cloud_retention_test.CloudRetentionTest.test_cloud_retention.max_consume_rate_mb=None.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_prefetch_chunks.prefetch=5"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_read_when_segment_size_smaller_than_chunk_size"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_spillover.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_topic_recovery_test.EndToEndTopicRecovery.test_restore.message_size=5000.num_messages=100000.recovery_overrides=.retention.local.target.bytes.1024.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.shadow_indexing_tx_test.ShadowIndexingTxTest.test_txless_segments.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.timequery_test.TimeQueryTest.test_timequery.cloud_storage=True.batch_cache=False.spillover=False"
"rptest.tests.timequery_test.TimeQueryTest.test_timequery_with_local_gc"
"rptest.tests.upgrade_test.UpgradeFromPriorFeatureVersionCloudStorageTest.test_rolling_upgrade.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.shadow_indexing_firewall_test.ShadowIndexingFirewallTest.test_consume_from_blocked_s3.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.workload_upgrade_runner_test.RedpandaUpgradeTest.test_workloads_through_releases.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/48776#018f5226-69fa-4b74-827c-055186b505cc:

"rptest.tests.node_pool_migration_test.NodePoolMigrationTest.test_migrating_redpanda_nodes_to_new_pool.balancing_mode=node_add.test_mode=TestMode.FAST_MOVES.cleanup_policy=compact"
"rptest.tests.cloud_retention_test.CloudRetentionTest.test_cloud_retention.max_consume_rate_mb=20.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.ShadowIndexingWhileBusyTest.test_create_or_delete_topics_while_busy.short_retention=False.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_prefetch_chunks.prefetch=0"
"rptest.tests.cloud_storage_chunk_read_path_test.CloudStorageChunkReadTest.test_read_when_cache_smaller_than_segment_size"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_write.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.cloud_storage_timing_stress_test.CloudStorageTimingStressTest.test_cloud_storage_with_partition_moves.cleanup_policy=delete"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTestCompactedTopic.test_write.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_topic_recovery_test.EndToEndTopicRecovery.test_restore_with_aborted_tx.recovery_overrides=.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndShadowIndexingTest.test_reset_from_cloud.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_shadow_indexing_test.EndToEndSpilloverTest.test_spillover.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.e2e_topic_recovery_test.EndToEndTopicRecovery.test_restore.message_size=5000.num_messages=100000.recovery_overrides=.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.full_disk_test.LogStorageMaxSizeSI.test_stay_below_target_size.log_segment_size=1048576.cleanup_policy=compact"
"rptest.tests.e2e_topic_recovery_test.EndToEndTopicRecovery.test_restore_with_config_batches.num_messages=2.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.shadow_indexing_tx_test.ShadowIndexingTxTest.test_shadow_indexing_aborted_txs.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.test_si_cache_space_leak.ShadowIndexingCacheSpaceLeakTest.test_si_cache.message_size=10000.num_messages=100000.concurrency=2"
"rptest.tests.tiered_storage_model_test.TieredStorageTest.test_tiered_storage.cloud_storage_type=CloudStorageType.S3.test_case=.TS_Read==True.TS_Timequery==True"

new failures in https://buildkite.com/redpanda/redpanda/builds/48831#018f590c-83eb-41a6-82f3-0db3549a8532:

"rptest.tests.e2e_shadow_indexing_test.EndToEndThrottlingTest.test_throttling.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/48831#018f5920-b310-4f1d-a946-a24e1b60eb34:

"rptest.tests.e2e_shadow_indexing_test.EndToEndThrottlingTest.test_throttling.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/48831#018f590c-83e9-401c-885f-181ed3d14382:

"rptest.tests.e2e_shadow_indexing_test.EndToEndThrottlingTest.test_throttling.cloud_storage_type=CloudStorageType.ABS"

@abhijat abhijat force-pushed the core-2794-decouple-dl-from-clients branch 9 times, most recently from d9a9d3c to d33851b Compare May 9, 2024 14:24
@abhijat abhijat marked this pull request as draft May 9, 2024 14:49
@abhijat abhijat force-pushed the core-2794-decouple-dl-from-clients branch 4 times, most recently from e8e1b47 to ca691f4 Compare May 10, 2024 12:13
@abhijat abhijat marked this pull request as ready for review May 10, 2024 12:16
@abhijat abhijat force-pushed the core-2794-decouple-dl-from-clients branch from ca691f4 to 4014ef8 Compare May 10, 2024 15:07
@abhijat (Contributor Author) commented May 13, 2024

/ci-repeat

@abhijat abhijat force-pushed the core-2794-decouple-dl-from-clients branch from 4014ef8 to fc99549 Compare May 13, 2024 15:08
@abhijat (Contributor Author) commented May 14, 2024

/ci-repeat

@abhijat abhijat requested review from andrwng, dotnwat and Lazin May 14, 2024 13:15
@abhijat (Contributor Author) commented May 14, 2024

/cdt

@nvartolomei (Contributor) left a comment

Deferring full review to someone with more context.

_chunk_waiters.size());
for (auto& w : _chunk_waiters) {
w.promise.set_exception(ss::gate_closed_exception{});
}
Contributor

This seems incorrect. Imagine a queued task T1 that is preparing to append to _chunk_waiters. The gate closes; on the next iteration the while condition evaluates to false and we reach and execute this code. T1 then gets scheduled and adds a new entry to _chunk_waiters.

Even if T1 held the gate, this code is still incorrect, as is_closed() evaluates to true as soon as gate::close is called. This code must run only after the future returned by gate::close is fulfilled.

This should also reset _chunk_waiters here, since they have now been "served". Or why not just call set_waiter_errors:

void remote_segment::set_waiter_errors(const std::exception_ptr& err) {
which seems to handle all this?
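
For reference, the behavior being asked for ("set exceptions on the waiters and reset the list") can be summarized by a small free-standing sketch. The helper name fail_and_clear and the waiter type are made up here; this is not the actual body of set_waiter_errors.

#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>

#include <deque>
#include <exception>

namespace ss = seastar;

struct waiter {
    ss::promise<> promise;
};

// Illustrative only: propagate one error to every queued waiter, then clear
// the list so the waiters count as "served" and nothing is left dangling.
void fail_and_clear(std::deque<waiter>& waiters, const std::exception_ptr& err) {
    for (auto& w : waiters) {
        w.promise.set_exception(err);
    }
    waiters.clear();
}

// e.g. fail_and_clear(chunk_waiters,
//                     std::make_exception_ptr(ss::gate_closed_exception{}));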

@abhijat (Contributor Author) May 14, 2024

I think it makes sense to handle any pending waiters after the gate close future resolves; I added something similar to the chunk API in this PR:

co_await _gate.close();
resolve_prefetch_futures();
vlog(_ctxlog.debug, "stopped segment_chunks");

I will look into adding a similar method to remote segment.

Contributor

Just a thought, would it make sense to call set_waiter_errors() in remote_segment::stop() to handle any stragglers? Or would that end up hanging because the in-flight hydrations wouldn't complete?

Or maybe it'd help if we had an abort source to block the addition of new work (i.e. check an abort source before adding to _chunk_waiters or _wait_list)?

Contributor Author

Just a thought, would it make sense to call set_waiter_errors() in remote_segment::stop() to handle any stragglers? Or would that end up hanging because the in-flight hydrations wouldn't complete?

I tried this but it doesn't work; as you said, the gate will never close because the task which added to the chunk waiters holds it open.

I have been going through the code paths leading to this section and I don't think the code needs to change; it is correct as it is.

Consider the code flow related to chunk waiter modification (the code at the end of cloud_storage::remote_segment::run_hydrate_bg which cleans up waiters is referred to as cleanup below):

  1. Chunk waiters are added by either cloud_storage::remote_segment::service_chunk_requests or cloud_storage::remote_segment::download_chunk.
  2. The potential bug is that cleanup runs but does not handle all chunk waiters, i.e. some are added after cleanup runs, or even concurrently with it.
  3. Cleanup runs when the gate is closed, i.e. when the call to gate::close is initiated, not when the future returned by gate::close resolves.

cloud_storage::remote_segment::service_chunk_requests: at the point when cleanup runs, we cannot have a concurrent execution of this method. It is part of the loop we just exited, and we co_await the completion of service_chunk_requests there as the last thing in the iteration, so these two code sections run serially.

cloud_storage::remote_segment::download_chunk: the first thing we do is try to enter the gate. If the gate is closed at this point (meaning the initial call to gate::close, which sets the optional inside the gate, has happened, not that its future has resolved), we will not be able to enter and add the waiter. If we do enter, which means the gate was open, there is no async call between gate entry and the point where we add the waiter, so it is not possible that we enter the gate, get preempted before adding the waiter, and then add the waiter after cleanup has run.

So it looks like we cannot add new waiters during or after cleanup; this code should handle all pending waiters.
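
A simplified sketch of the shape this argument is about, with stand-in members (waiter, segment_like and run_hydrate_bg_like are made-up names; service_chunk_requests is elided). It only illustrates the ordering: cleanup is reached strictly after the last awaited service_chunk_requests, and download_chunk has no suspension point between entering the gate and pushing its waiter.

#include <seastar/core/condition-variable.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>

#include <cstdint>
#include <deque>
#include <utility>

namespace ss = seastar;

struct waiter {
    uint64_t chunk_start;
    ss::promise<> promise;
};

struct segment_like {
    ss::gate _gate;
    ss::condition_variable _bg_cvar;
    std::deque<waiter> _chunk_waiters;

    ss::future<> service_chunk_requests(); // downloads queued chunks (elided)

    ss::future<> download_chunk(uint64_t chunk_start) {
        // Throws gate_closed_exception as soon as gate::close() has been
        // *called*, i.e. before the close future resolves.
        auto holder = _gate.hold();
        // No co_await between entering the gate and pushing the waiter, so
        // this fiber cannot be suspended here and cannot slip a waiter in
        // after the cleanup below has run.
        _chunk_waiters.push_back(waiter{chunk_start, {}});
        auto fut = _chunk_waiters.back().promise.get_future();
        _bg_cvar.signal();
        co_await std::move(fut); // first suspension point
    }

    ss::future<> run_hydrate_bg_like() {
        while (!_gate.is_closed()) {
            // stop() is assumed to initiate _gate.close() and then signal
            // _bg_cvar so this wait wakes up and observes the closed gate.
            co_await _bg_cvar.wait(
              [this] { return !_chunk_waiters.empty() || _gate.is_closed(); });
            if (_gate.is_closed()) {
                break;
            }
            // Last statement of the iteration and fully awaited, so it can
            // never run concurrently with the cleanup below.
            co_await service_chunk_requests();
        }
        // Cleanup: reached only after the final awaited
        // service_chunk_requests has returned, and no new waiter can be
        // added once gate::close has been called (see download_chunk).
        for (auto& w : _chunk_waiters) {
            w.promise.set_exception(ss::gate_closed_exception{});
        }
        _chunk_waiters.clear();
    }
};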

@nvartolomei what do you think?

Contributor

Stared at this a bit and I think I agree that it's currently safe.

I think the general concern is around service_chunk_requests(), which has a scheduling point between the gate hold and the addition to _chunk_waiters. If the sequence was something like:

  • T1: begin call to service_chunk_requests(). Start holding the gate
  • T1: swap out _chunk_waiters into requests for processing
  • T2: call stop on the segment, close the gate
  • TN: background loop exits, collects the remainder of _chunk_waiters
  • T1: process requests, some fail and are added to _chunk_waiters

Perhaps that is what Nicolae is concerned about? In reality, TN is always going to come after the last step in T1, since they run from the same background fiber. Maybe that's useful context to include somewhere, or the interface could be made clearer, e.g. by returning the failed chunk waiters from service_chunk_requests() and making the background loop explicitly handle that case.

Contributor Author

I've implemented the change to return failed requests from service_chunk_requests and append them in the bg loop.
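
A rough sketch of what that interface change could look like; the signatures here (a service_chunk_requests taking and returning a deque of waiters, and background_iteration) are hypothetical and only illustrate "return the failures, let the loop re-queue them", not the actual code.

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

#include <deque>
#include <utility>

namespace ss = seastar;

struct waiter {
    ss::promise<> promise;
};

// Hypothetical: process the currently queued requests and hand back any that
// failed and still need servicing, instead of pushing them into the member
// list from inside the helper.
ss::future<std::deque<waiter>> service_chunk_requests(std::deque<waiter> requests);

// One iteration of the background loop: swap out the pending waiters,
// service them, and let the loop itself re-queue the failures. The loop is
// then the only writer at this point, which keeps the shutdown cleanup easy
// to reason about.
ss::future<> background_iteration(std::deque<waiter>& chunk_waiters) {
    auto failed = co_await service_chunk_requests(
      std::exchange(chunk_waiters, {}));
    for (auto& w : failed) {
        chunk_waiters.push_back(std::move(w));
    }
}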

co_await _bg_cvar.wait(
[this] { return downloads_in_progress() || _gate.is_closed(); });
// TODO prioritize "real" requests over prefetches
co_await ss::max_concurrent_for_each(
Contributor

Doesn't this cause a lot of (too much) throw-away work?

Contributor Author

Do you mean because of the max_concurrent_for_each? Or because we do not filter out chunks with empty waiter lists before entering the method to hydrate?

Contributor

Because we always spawn N tasks, where N is the number of chunks. I see that they do early returns, but ideally we would spawn just one (or two, with prefetch?). Why enqueue extra work to the scheduler? Am I missing something?

The chunks are 16 MB? Not so many per segment, so it might not be that big of a deal.

Contributor

Yeah, I guess it wouldn't be tough to get only the chunks that are download_in_progress and then do this max_concurrent call? I suppose the condition variable predicate means there will definitely be at least one waiter, but it seems like an easy win to filter up front.

Contributor Author

Added a filter
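
Roughly the shape visible later in the diff (_chunks | views::filter(has_waiters) | views::keys), shown here as a standalone illustration with stand-in types (chunk_info and hydrate_chunk_at are made up):

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>

#include <cstddef>
#include <cstdint>
#include <map>
#include <ranges>

namespace ss = seastar;

// Stand-in for the per-chunk bookkeeping; only the waiter count matters here.
struct chunk_info {
    int waiters = 0;
};

ss::future<> hydrate_chunk_at(uint64_t start_offset); // download one chunk (elided)

// Only chunks that actually have waiters are handed to the concurrent loop,
// so no task is spawned for chunks nobody asked for.
ss::future<> hydrate_requested_chunks(
  const std::map<uint64_t, chunk_info>& chunks, size_t concurrency) {
    auto has_waiters = [](const auto& kv) { return kv.second.waiters > 0; };
    co_await ss::max_concurrent_for_each(
      chunks | std::views::filter(has_waiters) | std::views::keys,
      concurrency,
      [](uint64_t start_offset) { return hydrate_chunk_at(start_offset); });
}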


auto& chunk = _chunks[start_offset];
auto& waiters = chunk.waiters;
if (waiters.empty()) {
Contributor

Should this skip already hydrated chunks?

Contributor Author

When we serve requests in segment_chunks::hydrate_chunk (which are eventually routed to this code path), we return the chunk file handle early for hydrated chunks, and we only proceed to enqueue the request when the chunk is not yet hydrated:

if (curr_state == chunk_state::hydrated) {
    vassert(
      chunk.handle,
      "chunk state is hydrated without data file for id {}",
      chunk_start);
    co_return chunk.handle.value();
}

There is a vassert before setting the chunk handle that the chunk should not already have a file handle associated with it; maybe an additional vassert could check that the chunk state is download_in_progress.
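
The suggested extra assertion might look something like the following fragment, placed next to the existing handle vassert (hypothetical wording, reusing curr_state and chunk_start from the snippet above):

// Possible additional check: the chunk being materialized should currently
// be marked as download_in_progress.
vassert(
  curr_state == chunk_state::download_in_progress,
  "unexpected chunk state for chunk at offset {}",
  chunk_start);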

/// If prefetch chunks are already hydrated or download is in progress, then
/// we skip the download.
void
schedule_prefetches(chunk_start_offset_t start_offset, size_t prefetch);
Contributor

nit: may be clearer renamed to num_prefetches or some such?

Contributor Author

changed
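
After the rename, the declaration would presumably read along these lines (sketch only, based on the snippet quoted above):

/// Schedules downloads for up to num_prefetches chunks following
/// start_offset. If prefetch chunks are already hydrated or a download is
/// in progress, the download is skipped.
void schedule_prefetches(
  chunk_start_offset_t start_offset, size_t num_prefetches);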

src/v/cloud_storage/segment_chunk_api.cc (two resolved review threads, not shown)
@abhijat abhijat force-pushed the core-2794-decouple-dl-from-clients branch from fc99549 to 606a55e Compare May 17, 2024 08:03
@abhijat abhijat force-pushed the core-2794-decouple-dl-from-clients branch from 606a55e to abc960c Compare May 17, 2024 08:07
@andrwng (Contributor) left a comment

I think the changes here are good, but maybe we need to align on what (if anything) to do about the background fiber confusion Nicolae points out.

Comment on lines +198 to +258
_chunks | views::filter(has_waiters) | views::keys,
_segment.concurrency(),
Contributor

Just calling out that this is an asynchronous iteration over a member container. What guarantees its safety? Should we build a smaller container of chunk offsets and pass it to this max_concurrent_for_each call?

Looking around, I think it's probably fine if we never erase or clear the _chunks container, but it can be a bit of a footgun if we don't explicitly think about it. Is that the case? Maybe worth adding a comment about why it's safe if so.

Contributor Author

The _chunks map is almost const (other than not being initialized in the c-tor): it is initialized once in start and then never changes for the lifetime of the container. I will add a comment to clarify this.
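
Such a comment could read roughly as follows (illustrative wording, not the committed text):

// NOTE: _chunks is populated once in start() and is never inserted into,
// erased from, or cleared afterwards; only the per-chunk state and handles
// change. This is what makes the asynchronous iteration over it in the
// background loop (e.g. via max_concurrent_for_each) safe.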

Contributor Author

added comment

Contributor

I'm not seeing this comment?

Chunk waiters are stored in a list which is swapped before processing.
Because chunk downloads take time, it is possible that new waiters may
be registered while some downloads are being processed. The remote
segment may receive a stop request during this interval. If this
happens, the hydration loop will shut down without processing the
remaining waiters, and the waiters will prevent gate closure of the
remote segment, causing a hang.

The workaround cancels pending downloads while exiting the background
loop.

The chunk API is now structured around a background loop similar to the
remote segment. When a download request for a chunk arrives, it is
added to the wait list for the chunk. Then a condvar is signalled,
which wakes up the bg loop, and a request is issued to the remote
segment to download the chunk.

Once the chunk is downloaded, all waiters are notified; they are also
notified if there is an error during the download.

The tests related to chunk downloads are moved to a separate unit and
cleaned up. This involves removing repetitive code, using ranges, and
using the chunk API instance inside the remote segment instead of
spinning up a new one for the tests.

The prefetch logic is changed. Instead of downloading a single byte
range covering the chunk plus the prefetch and splitting it while
writing to disk, we now schedule a separate API call per chunk. This
lets the consumer be unblocked as soon as the first chunk is
downloaded.
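
A simplified sketch of the per-chunk scheduling described here; chunk_info, enqueue_download and schedule_prefetches_like are stand-in names, not the actual segment_chunks code.

#include <cstddef>
#include <cstdint>
#include <map>

enum class chunk_state { not_available, download_in_progress, hydrated };

struct chunk_info {
    chunk_state state = chunk_state::not_available;
};

// Enqueue a download for one chunk without waiting for it (elided).
void enqueue_download(uint64_t chunk_start);

// Schedule separate downloads for up to num_prefetches chunks after
// start_offset, skipping chunks that are already hydrated or already being
// downloaded. Nothing is awaited here, so the consumer's own chunk can
// complete and unblock the consumer while the prefetched chunks are still
// in flight.
void schedule_prefetches_like(
  std::map<uint64_t, chunk_info>& chunks,
  uint64_t start_offset,
  size_t num_prefetches) {
    auto it = chunks.upper_bound(start_offset);
    for (size_t i = 0; i < num_prefetches && it != chunks.end(); ++i, ++it) {
        auto& [offset, info] = *it;
        if (info.state == chunk_state::hydrated
            || info.state == chunk_state::download_in_progress) {
            continue;
        }
        info.state = chunk_state::download_in_progress;
        enqueue_download(offset);
    }
}
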
A new test is added where some chunk downloads fail, and the assertions
check that the results are correctly reflected. The supporting tooling
to fail requests is also added to the S3 imposter.

Three new tests are added which exercise the chunk hydration process
while under disruptive actions. Because the bg loop structure does not
lend itself easily to unit testing, we use disruptive actions to
exercise code paths within the download loop. The following scenarios
are added:

* Abruptly stop the remote segment during pending downloads to ensure
  there are no hangs and exceptions are reported correctly.
* Delete random chunk files while a large number of random chunk
  hydrations are in progress. This emulates cache eviction and forces
  re-hydration.
* Randomly fail HTTP requests with either retryable or non-retryable
  errors.

Even if the test code fails, the background thread must be stopped to
avoid a hung test.
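
One way to guarantee that in a coroutine-style test is to stop the component on every exit path, for example as below (illustrative; chunk_api_like and its start/stop are stand-ins for whatever the test drives, not the actual fixture):

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

#include <exception>

namespace ss = seastar;

// Stand-in for the component that owns the background hydration loop.
struct chunk_api_like {
    ss::future<> start();
    ss::future<> stop(); // joins the background loop
};

ss::future<> run_disruption_test(chunk_api_like& api) {
    co_await api.start();
    std::exception_ptr error;
    try {
        // ... issue chunk hydrations, inject disruptions, check results ...
    } catch (...) {
        error = std::current_exception();
    }
    // Always stop the background loop, even when the body above threw, so a
    // failing assertion cannot leave the loop running and hang the test.
    co_await api.stop();
    if (error) {
        std::rethrow_exception(error);
    }
}
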
@abhijat abhijat force-pushed the core-2794-decouple-dl-from-clients branch from abc960c to d087ecd Compare May 22, 2024 09:07
@abhijat abhijat requested a review from andrwng May 22, 2024 13:32
Comment on lines +331 to +332
/// materialized. Any chunks which are downloaded successfully but fail to
/// materialize are added back to the waiter list.
Contributor

nit: the comment needs an update w.r.t. being added back to the waiter list?
