Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subquery cache & friends #21888

Open
wants to merge 15 commits into
base: master
Choose a base branch
from

Conversation

sopel39
Copy link
Member

@sopel39 sopel39 commented May 9, 2024

Implement subquery cache for Hive/Iceberg/Delta

Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cache
results are never stalled since data is cached per split. Dedicated
"cache splits ids" include create time and change set
(in case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then common subplan is extracted.
2. Query plan is rewritten using caching plan alternatives
   (fallback to original subquery, cache data, load from cache)
3. SPI PlanSignatures are computed for each cached subquery
4. Splits are scheduled deterministically on nodes based on (PlanSignature, SplitId) pair
5. On the worker cache plugin (currently only memory based) will determine
   if cached data is available for a given split

@cla-bot cla-bot bot added the cla-signed label May 9, 2024
@github-actions github-actions bot added iceberg Iceberg connector delta-lake Delta Lake connector hive Hive connector labels May 9, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 2 times, most recently from 9f4aa11 to ad12339 Compare May 9, 2024 16:10

List<InternalNode> filteredNodes = filterNodes(nodeMap, includeCoordinator, ImmutableSet.of());
ResettableRandomizedIterator<InternalNode> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
Set<InternalNode> schedulableNodes = new HashSet<>(filteredNodes);

// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove optimizedLocalScheduling from NodeSchedulerConfig ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make a separate PR out of the last commit ? It's unrelated to everything else and can be landed quickly

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can also need to change implementation of hive.force-local-scheduling. With current changes we're effectively forcing local scheduling by default in hive (unless node went down). That's probably not the right default as it could create hot spots in the cluster. Instead of flipping isRemotelyAccessible, that flag should now change whether or not host addresses are returned by hive connector and by default we shouldn't provide addresses from HDFS.

@sopel39 sopel39 force-pushed the ks/subquery_cache branch 5 times, most recently from c270355 to a72b5df Compare May 15, 2024 12:16
@github-actions github-actions bot added the ui Web UI label May 21, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 4 times, most recently from f771ae0 to a0ab7fc Compare May 22, 2024 08:25
@sopel39 sopel39 marked this pull request as ready for review May 22, 2024 08:25
@sopel39 sopel39 changed the title WIP: Subquery cache & friends Subquery cache & friends May 22, 2024
@sopel39 sopel39 force-pushed the ks/subquery_cache branch 5 times, most recently from 865c615 to 74302ec Compare May 23, 2024 12:38
@deigote
Copy link
Member

deigote commented May 27, 2024

Hi 👋🏽 maybe a dumb question, but from subquery cache for Hive/Iceberg/Delta I'm not clear if this is about a subquery cache for the Hive/Iceberg/Delta connectors, or rather a cache for any connector that uses Hive/Iceberg/Delta under the hood. I'm hoping the latter but it'd be great if you could clarify 🙏🏽 !

@sopel39
Copy link
Member Author

sopel39 commented May 27, 2024

or rather a cache for any connector that uses Hive/Iceberg/Delta under the hood.

@deigote I'm not sure what you mean by any connector that uses Hive/Iceberg/Delta under the hood. However, this PR makes subquery cache a 1st class citizen, where source of data can be from any connector as long as connector implements getCacheTableId, getCacheColumnId, getCacheSplitId

@deigote
Copy link
Member

deigote commented May 27, 2024

Thanks! That's what I was hoping for. I got confused by the PR's description saying Implement subquery cache for Hive/Iceberg/Delta which seemed to imply the PR was only about the Hive / Iceberg / Delta connectors. But what I get is with this PR:

  • Trino offers any connector the ability to leverage subquery caching
  • The Hive / Iceberg / Delta already use said ability to implement subquery caching

lukasz-stec and others added 7 commits May 27, 2024 22:36
ChooseAlternativeNode defines alternative sub-plans that can be used
to execute given part of the query.
The actual sub-plan is then chosen per split during task execution.
Alternative sub-plans cannot span multiple stages and are only supported
for source stages.

Co-authored-by: Assaf Bern <assaf.bern@starburstdata.com>
These methods are required by subquery cache to describe
split data for cache key purpose.

ConnectorPageSourceProvider#getUnenforcedPredicate
is used to describe what unenforced predicate will be
applied on split data.

ConnectorPageSourceProvider#prunePredicate is used
to simplify filter predicates on per split bases
(e.g. removing paritioning predicates that fully
contain split data)

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Dynamic row filtering performs fine-grained filtering of rows
at table scan level thus greatly improving performance of some queries.
With the new contract scheduler (pipeline or FTE) will schedule
remote accessible splits on selected nodes if such nodes are available
and only fallback to other nodes is nodes are no longer part of cluster.
Connector might have stalled node information while creating splits
which could result in selecting nodes which are now offline. Additionally,
in FTE mode nodes can go down so split addresses could no longer be valid
then task is restarted.

Additionally, this commit simplifies UniformNodeSelector optimizedLocalScheduling
which was hard to reason about and was not taking advantages of
recent improvements like adaptive split queue length.

Co-authored-by: Karol Sobczak <napewnotrafi@gmail.com>
sopel39 and others added 8 commits May 28, 2024 15:07
CacheManager is a set of SPI classes for implementing
split level cache storage.

MemoryCacheManager is a high-performance implementation of
CacheManager that keeps cached data in revocable memory.
Cache table id together with split id and column id represent
rows produced by ConnectorPageSource for a given split.

Cache ids can also be used to canonicalise query plans
for the purpouse of comparison or cache key generation.

This commit implements cache ids for Hive, Iceberg, Delta and TPCH
connectors.

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Co-authored-by: lukasz-stec <lukasz.stec@starburstdata.com>
Cache hit rate depend on deterministic split generation.
Hive connector has a concept of "initial splits" which
are smaller and there is a limited of them.
Therefore, if deterministic splits are
required, then initial splits must be disabled because
Hive split generation doesn't have guaranteed ordering.
Dynamic filter id might be registered by both local join
and as coming from coordinator.
CanonicalSubplanExtractor creates a canonical
representation of a subplan using cache ids
provided by the connector. Canonical subplans
are used to compare plans against each other
and enable extracting of common subplans.

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Subquery cache is a lightweight mechanism for caching
source stage computations. It works across queries, but
also within a query if similar subqueries are identified.

Subquery cache works with both streaming and FTE mode. Cache
results are never stalled since data is cached per split. Dedicated
"cache splits ids" include create time and change set
(in case of Delta/Iceberg).

Subquery cache works as follows:
1. During planning, subqueries eligible for caching
   are identified. If there are similar subqueries within
   a query, then common subplan is extracted.
2. Query plan is rewritten using caching plan alternatives
   (fallback to original subquery, cache data, load from cache)
3. SPI PlanSignatures are computed for each cached subquery
4. Splits are scheduled deterministically on nodes based on (PlanSignature, SplitId) pair
5. On the worker cache plugin (currently only memory based) will determine
   if cached data is available for a given split

Co-authored-by: Kamil Endruszkiewicz <kamil.endruszkiewicz@starburstdata.com>
Co-authored-by: radek <radoslaw.kondziolka@starburstdata.com>
Co-authored-by: lukasz-stec <lukasz.stec@starburstdata.com>
Co-authored-by: Raunaq Morarka <raunaqmorarka@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed delta-lake Delta Lake connector hive Hive connector iceberg Iceberg connector performance ui Web UI
Development

Successfully merging this pull request may close these issues.

None yet

6 participants