
Make sources report their partitioning to Spark #176

Draft · EnricoMi wants to merge 20 commits into spark-3.2 from branch-report-partitioning

Conversation

@EnricoMi (Collaborator) commented on Mar 23, 2022

Having the sources report their partitioning to Spark allows Spark to exploit the existing partitioning and avoid shuffling the data for operations that require exactly that partitioning.

For instance, reading triples with predicate partitioning produces a Dataset that is already partitioned by the "predicate" column, so a groupBy("predicate") does not need to shuffle the data at all:

import uk.co.gresearch.spark.dgraph.connector._

val target = "localhost:9080"

// spark is the active SparkSession
spark.read
  .option(PartitionerOption, PredicatePartitionerOption)
  .dgraph.triples(target)
  .groupBy("predicate")
  .count()

The groupBy("predicate") will not shuffle the graph data after reading from Dgraph.

The Spark plan for this Dataset is:

*(1) HashAggregate(keys=[predicate#3377], functions=[count(1)], output=[predicate#3377, count#3425L])
+- *(1) HashAggregate(keys=[predicate#3377], functions=[partial_count(1)], output=[predicate#3377, count#3440L])
   +- *(1) Project [predicate#3377]
      +- BatchScan[subject#3376L, predicate#3377, objectUid#3378L, objectString#3379, objectLong#3380L, objectDouble#3381, objectTimestamp#3382, objectBoolean#3383, objectGeo#3384, objectPassword#3385, objectType#3386] class uk.co.gresearch.spark.dgraph.connector.TripleScan

Without reporting the existing partitioning, the plan would look like:

*(2) HashAggregate(keys=[predicate#3100], functions=[count(1)], output=[predicate#3100, count#3148L])
+- Exchange hashpartitioning(predicate#3100, 2), true, [id=#1300]
   +- *(1) HashAggregate(keys=[predicate#3100], functions=[partial_count(1)], output=[predicate#3100, count#3163L])
      +- *(1) Project [predicate#3100]
         +- BatchScan[subject#3099L, predicate#3100, objectUid#3101L, objectString#3102, objectLong#3103L, objectDouble#3104, objectTimestamp#3105, objectBoolean#3106, objectGeo#3107, objectPassword#3108, objectType#3109] class uk.co.gresearch.spark.dgraph.connector.TripleScan

By reporting the partitioning, Spark removes the Exchange hashpartitioning(predicate#3100, 2), true, [id=#1300] step, as it becomes redundant.
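
To verify this yourself, print the physical plan and look for an Exchange node (a quick sketch, reusing target from above; spark is the active SparkSession):

val counts = spark.read
  .option(PartitionerOption, PredicatePartitionerOption)
  .dgraph.triples(target)
  .groupBy("predicate")
  .count()

// with the partitioning reported, the printed plan contains
// no "Exchange hashpartitioning" step
counts.explain()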

This refactors `SingletonPartitioner` to extend `PredicatePartitioner` but with a single partition (all predicates in one partition). This allows `NodeSource` in wide node mode to reject any predicate-partitioned partitioner while relying on `SingletonPartitioner` to provide the same behaviour that `PredicatePartitioner` with a single partition provided so far.
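
For context, this is roughly the mechanism in the Spark 3.2 DataSource V2 API that the PR builds on (a minimal sketch; class and column names are illustrative, not the PR's actual code): a Scan implements SupportsReportPartitioning and returns a Partitioning that can satisfy a ClusteredDistribution.

import org.apache.spark.sql.connector.read.partitioning.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.connector.read.{Scan, SupportsReportPartitioning}

// Declares that the data is clustered by the given columns
// across `partitions` partitions.
class ColumnClusteredPartitioning(columns: Seq[String], partitions: Int) extends Partitioning {
  override def numPartitions: Int = partitions

  // Spark asks whether this partitioning satisfies a required distribution.
  // A clustered distribution is satisfied when it clusters on (at least) all
  // of our partitioning columns: rows with equal values in those columns then
  // already share a partition, so the planner can drop the Exchange.
  override def satisfy(distribution: Distribution): Boolean = distribution match {
    case c: ClusteredDistribution => columns.forall(c.clusteredColumns.contains)
    case _ => false
  }
}

// A scan opts in by implementing SupportsReportPartitioning, e.g. reporting
// the "predicate" column when reading with the predicate partitioner.
trait ReportsPredicatePartitioning extends Scan with SupportsReportPartitioning {
  def numPartitions: Int
  override def outputPartitioning(): Partitioning =
    new ColumnClusteredPartitioning(Seq("predicate"), numPartitions)
}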

github-actions bot commented Mar 23, 2022

Unit Test Results

   832 files   +26        832 suites   +26      34m 14s ⏱️ +6m 8s
   513 tests   +93        513 ✔️ +93            0 💤 ±0    0 ❌ ±0
13 338 runs  +2 418    13 338 ✔️ +2 418         0 💤 ±0    0 ❌ ±0

Results for commit 40599a5. ± Comparison against base commit 6c4b463.

This pull request removes 88 and adds 181 tests. Note that renamed tests count towards both.
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should encode Edge
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should fail without target
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should load as a predicate partitions
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should load as a single partition
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should load edges in chunks
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should load edges via implicit dgraph target
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should load edges via implicit dgraph targets
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should load edges via path
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should load edges via paths
uk.co.gresearch.spark.dgraph.connector.sources.TestEdgeSource ‑ EdgeDataSource should load edges via target option
…
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2,col3] should not satisfy clustered distribution with unknown column: [col0]
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2,col3] should not satisfy partially overlapping clustered distribution: [col1,col2,col3,col0]
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2,col3] should not satisfy unknown distribution
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2,col3] should satisfy clustered distribution with more columns: [col1,col2,col3,col0]
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2,col3] should satisfy identical clustered distribution: col1,col2,col3
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2] should not satisfy clustered distribution with unknown column: [col0]
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2] should not satisfy partially overlapping clustered distribution: [col1,col2,col0]
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2] should not satisfy unknown distribution
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2] should satisfy clustered distribution with more columns: [col1,col2,col0]
uk.co.gresearch.spark.dgraph.connector.TestTripleScan ‑ TripleScan with [col1,col2] should satisfy identical clustered distribution: col1,col2
…

♻️ This comment has been updated with latest results.

@EnricoMi force-pushed the branch-report-partitioning branch 2 times, most recently from 8a53242 to 7c3cb43 on March 27, 2022 14:43
@EnricoMi (Collaborator, Author) commented:

With Spark 3.3, having the partitioning reported by `SupportsReportPartitioning` considered during planning requires `spark.sql.sources.v2.bucketing.enabled` to be true, which defaults to false:

apache/spark@20ffbf7#diff-13c5b65678b327277c68d17910ae93629801af00117a0e3da007afd95b6c6764R1337-R1341
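
On Spark 3.3 the flag can be enabled per session (a sketch; spark is the active SparkSession):

// assumption based on the linked commit: without this flag, Spark 3.3 ignores
// the partitioning reported by SupportsReportPartitioning during planning
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")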

github-actions bot commented Mar 1, 2023

Test Results

   992 files   +31        992 suites   +31      43m 30s ⏱️ +12m 44s
   513 tests   +93        513 ✔️ +93            0 💤 ±0    0 ❌ ±0
15 903 runs  +2 883    15 903 ✔️ +2 883         0 💤 ±0    0 ❌ ±0

Results for commit 9771957. ± Comparison against base commit 1429b4f.

This pull request removes 88 and adds 181 tests, the same tests as listed in the earlier results comment above. Note that renamed tests count towards both.

@EnricoMi marked this pull request as draft on May 3, 2023 18:55