[Spark] Optimize batching / incremental progress #3089

Open · wants to merge 8 commits into base: master

Conversation

Kimahriman (Contributor)

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Resolves #3081

Adds support for splitting an optimize run into batches with a new config, spark.databricks.delta.optimize.batchSize. Batches are created by grouping existing bins together until batchSize is reached. The default behavior remains the same; batching is only enabled if batchSize is configured.
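
For illustration, a minimal usage sketch (the config name comes from this PR; the byte value and table path are placeholders I'm assuming):

  // Assumed example: use a ~100 GB batch size and then run OPTIMIZE as usual
  spark.conf.set("spark.databricks.delta.optimize.batchSize", 100L * 1024 * 1024 * 1024)
  spark.sql("OPTIMIZE delta.`/path/to/table`")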

This applies to all optimization paths. I don't see any reason it shouldn't apply to compaction, Z-ordering, clustering, auto compaction, or reorg/DV rewriting if a user configures it.

The way transactions are handled within the optimize executor had to be updated. Instead of creating a transaction up front, we list all the files in the most recent snapshot and then create a transaction for each batch.
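
A minimal sketch of that flow, assuming hypothetical helpers and parameters (rewriteBatch and the pre-computed batches are placeholders, not the PR's actual code):

  import org.apache.spark.sql.catalyst.expressions.Expression
  import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}
  import org.apache.spark.sql.delta.actions.{Action, AddFile}

  // Sketch only: one transaction per batch, all derived from the file listing done
  // once up front; each commit records incremental progress.
  def optimizeInBatches(
      deltaLog: DeltaLog,
      batches: Seq[Seq[AddFile]],                  // pre-computed batches of candidate files
      partitionPredicate: Seq[Expression],
      rewriteBatch: Seq[AddFile] => Seq[Action],   // hypothetical: produces removes + new adds
      operation: DeltaOperations.Operation): Unit = {
    batches.foreach { batch =>
      val txn = deltaLog.startTransaction()        // a fresh transaction per batch
      txn.trackFilesRead(batch)                    // register only this batch's files
      txn.trackReadPredicates(partitionPredicate)  // keep the partition predicate registered
      txn.commit(rewriteBatch(batch), operation)
    }
  }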

This is especially important for clustering, since there is no way to manually optimize a partial set of the table using partition filtering. Without batching, a lot of execution time and storage space could be wasted if something fails before the entire table finishes optimizing.

How was this patch tested?

A simple new unit test is added. I can add others as well; for now I'm looking for feedback on the approach and suggestions for what other tests to add.

Does this PR introduce any user-facing changes?

Yes, it adds a new optimization capability that is disabled by default.


val filesToProcess = bins.flatMap(_._2)

txn.trackFilesRead(filesToProcess)
Contributor Author

This should be the only difference from the existing default behavior: only the filtered candidates are registered with the transaction, not all matching files.

val filesToProcess = bins.flatMap(_._2)

txn.trackFilesRead(filesToProcess)
txn.trackReadPredicates(partitionPredicate)
Contributor Author

I assume the partition predicate should still be registered even if only a subset of the partition is being processed. This is already what happens anyway, since the candidates are filtered.

Contributor

Yes, this looks correct to me compared with the filtering code that goes through OptimisticTransaction.

Contributor @dabao521 left a comment

Thanks for the change. I left a few comments, and we need more tests for this change.

@@ -61,17 +62,17 @@ abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaComman
*/
def validateZorderByColumns(
spark: SparkSession,
txn: OptimisticTransaction,
Contributor

Can you delete the obsolete comment above for txn and replace it with snapshot?

   * @param txn the [[OptimisticTransaction]] being used to optimize

Contributor Author

Updated

@@ -273,34 +280,17 @@ class OptimizeExecutor(

val maxThreads =
Contributor

Can this variable be removed now?

Contributor Author

Yep, good catch.

bins: Seq[(Map[String, String], Seq[AddFile])],
batchSize: Long)
: Seq[Seq[(Map[String, String], Seq[AddFile])]] = {
val batches = new ArrayBuffer[Seq[(Map[String, String], Seq[AddFile])]]()
Contributor

Now that there are multiple nested containers, can we add named types for Seq[(Map[String, String], Seq[AddFile])] and (Map[String, String], Seq[AddFile]) to make this more readable?

Something like

case class Bin(partitionValue: Map[String, String], files: Seq[AddFile])

case class Batch(bins: Seq[Bin])

or similar

Contributor Author

Yep, I thought these were getting verbose. Added case classes.
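
A rough sketch of what that grouping could look like with the named types (illustrative only; the field names and the size accounting are assumptions, not necessarily the PR's code):

  import scala.collection.mutable.ArrayBuffer
  import org.apache.spark.sql.delta.actions.AddFile

  // Illustrative: accumulate bins into the current batch until batchSize bytes is reached.
  case class Bin(partitionValues: Map[String, String], files: Seq[AddFile])
  case class Batch(bins: Seq[Bin])

  def groupBinsIntoBatches(bins: Seq[Bin], batchSize: Long): Seq[Batch] = {
    val batches = ArrayBuffer.empty[Batch]
    val currentBins = ArrayBuffer.empty[Bin]
    var currentSize = 0L
    bins.foreach { bin =>
      if (currentBins.nonEmpty && currentSize >= batchSize) {
        batches += Batch(currentBins.toSeq)  // close out the current batch
        currentBins.clear()
        currentSize = 0L
      }
      currentBins += bin
      currentSize += bin.files.map(_.size).sum
    }
    if (currentBins.nonEmpty) batches += Batch(currentBins.toSeq)
    batches.toSeq
  }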

@@ -309,10 +299,10 @@ class OptimizeExecutor(
optimizeStats.totalConsideredFiles = candidateFiles.size
optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size
optimizeStats.totalClusterParallelism = sparkSession.sparkContext.defaultParallelism
val numTableColumns = txn.snapshot.metadata.schema.size
val numTableColumns = snapshot.metadata.schema.size
Contributor

Shall we rename optimizeStats.numBatches at line 298 to optimizeStats.numBins, since it no longer represents batches with this change? We probably also want to add optimizeStats.numBins = jobs.size.

Contributor Author

Yeah, that makes sense. I also kept numBatches to actually represent the number of batches; or is that bad because it changes the meaning of the existing stat?

val rows = new OptimizeExecutor(spark, txn, partitionPredicates, Seq(), true, optimizeContext)
.optimize()
val rows = new OptimizeExecutor(spark, deltaLog.update(), catalogTable, partitionPredicates,
Seq(), true, optimizeContext).optimize()
Contributor

nit: I know this is pre-existing, but can you spell out the arguments for Seq() and true for better readability?
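
For illustration, with named arguments that call might read something like the following (the parameter names zOrderByColumns and isAutoCompact are assumptions here, not necessarily the actual ones):

  // Parameter names assumed for illustration only
  val rows = new OptimizeExecutor(spark, deltaLog.update(), catalogTable, partitionPredicates,
    zOrderByColumns = Seq(), isAutoCompact = true, optimizeContext = optimizeContext).optimize()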

.internal()
.doc(
"""
|The size of a batch within an OPTIMIZE JOB. After a batch is complete, it's
Contributor

Suggested change
|The size of a batch within an OPTIMIZE JOB. After a batch is complete, it's
|The size of a batch within an OPTIMIZE JOB. After a batch is complete, its

}
val batchResults = batchSize match {
case Some(size) =>
groupBinsIntoBatches(jobs, size).map(runOptimizeBatch(_, maxFileSize))
Contributor

Correct me if I am wrong. The files are bin-packed in the following steps:

  1. groupFilesIntoBins: bin-pack files according to OptimizeTableStrategy.maxBinSize while respecting partition boundaries.
  2. groupBinsIntoBatches: group multiple bins into one batch; bins from different partitions can end up in the same batch.

So each transaction can contain data from multiple partitions. The ConflictChecker rejects a transaction when two txns write to the same partition. That won't be a problem for the txns within a single OPTIMIZE, since batches are executed in serial order. For concurrent OPTIMIZE commands, we could consider having each batch include data from only a single partition, to minimize the chance of conflicts between concurrent OPTIMIZEs (see the sketch below).
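
A short sketch of that suggestion, building on the Bin/Batch sketch above (hypothetical, not part of this PR): group bins by partition first, so a batch never spans partitions.

  // Hypothetical: batch each partition's bins separately to reduce the chance of
  // conflicts between concurrent OPTIMIZE commands touching different partitions.
  def groupBinsIntoBatchesPerPartition(bins: Seq[Bin], batchSize: Long): Seq[Batch] =
    bins.groupBy(_.partitionValues).values.toSeq.flatMap(groupBinsIntoBatches(_, batchSize))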

Contributor Author

Hmm, I didn't look at the ConflictChecker when working on this. I did have in mind a future improvement to run multiple batches simultaneously/overlapping, to avoid the execution tail at the end of each batch. There's no conceptual reason the commits should conflict, since they are reading specific files and not changing data; if they would conflict, that might just be an improvement to make in the ConflictChecker.

Contributor Author

Though since I'm creating each transaction off the same original snapshot, even running them serially they should invoke the same conflict checking against each other, right? The current simple test doesn't seem to have an issue. I'll see if I encounter anything in further tests.

Contributor Author

Actually, I guess the Optimize command does its own conflict checking to resolve that.

assert(files.values.forall(_.length == 1))
// The last 5 commits in the history should be optimize batches, one for each partition
val commits = deltaLog.history.getHistory(None)
assert(commits.filter(_.operation == "OPTIMIZE").length == optimizeCommits)
Contributor

Can you add a check that the data before and after OPTIMIZE is the same?

Contributor Author

Will do

@@ -536,6 +536,39 @@ trait OptimizeCompactionSuiteBase extends QueryTest
}
}

test("optimize command with batching") {
Contributor

Can you add more tests to cover:

  1. OPTIMIZE WHERE on a partitioned table, to make sure batching works correctly with the filter (a rough sketch follows this list).
  2. ZORDER BY and CLUSTER BY, since this change also impacts them, to validate that batching works as expected for both.
  3. OPTIMIZE on an empty table, to make sure it doesn't trigger any divide-by-zero errors.
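
For the first case, a rough test sketch (assuming the usual Delta test helpers like withTempDir, withSQLConf, and checkAnswer are in scope in the suite; the batch size value is just a placeholder to force many small batches):

  test("optimize with partition filter and batching") {
    withTempDir { dir =>
      val path = dir.getAbsolutePath
      // A tiny batch size (in bytes) forces every bin into its own batch.
      withSQLConf("spark.databricks.delta.optimize.batchSize" -> "1") {
        spark.range(100).selectExpr("id", "id % 5 AS part")
          .repartition(10)
          .write.format("delta").partitionBy("part").save(path)
        val expected = spark.read.format("delta").load(path).collect()
        spark.sql(s"OPTIMIZE delta.`$path` WHERE part = 0")
        // The data must be unchanged, and only partition part=0 should be compacted.
        checkAnswer(spark.read.format("delta").load(path), expected)
      }
    }
  }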

Contributor Author

Thanks, I'll work on those.
