
Add support for Zstd coders #5321

Merged
merged 26 commits into from
May 20, 2024
Conversation

kellen (Contributor) commented Apr 2, 2024

Adds

  • saveAsZstdDictionary to train a Zstd dictionary on an arbitrary SCollection[T]. It estimates the average size of the T elements, collects n elements based on a target training-set size, then trains and saves the Zstd dictionary.
  • A Scala ZstdCoder object with transform Coders for a simple T or for each side of a (K, V).
  • A command-line argument mapping a type to a dictionary, so that instances of MyClass get Zstd compression automagically (this probably fails if the type is parameterized): --zstdDictionary=com.spotify.scio.MyClass:gs://bucket/path/dict.bin

codecov bot commented Apr 2, 2024

Codecov Report

Attention: Patch coverage is 58.82353%, with 28 lines in your changes missing coverage. Please review.

Project coverage is 61.08%. Comparing base (6ebf9c4) to head (60afcc4).
Report is 52 commits behind head on main.

Files Patch % Lines
...rc/main/scala/com/spotify/scio/io/ZstdDictIO.scala 0.00% 26 Missing ⚠️
...la/com/spotify/scio/coders/CoderMaterializer.scala 92.30% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5321      +/-   ##
==========================================
- Coverage   62.69%   61.08%   -1.61%     
==========================================
  Files         301      306       +5     
  Lines       10848    10993     +145     
  Branches      773      774       +1     
==========================================
- Hits         6801     6715      -86     
- Misses       4047     4278     +231     

☔ View full report in Codecov by Sentry.

className.replaceAll("\\$", ".") -> path
kellen (Contributor, Author) commented Apr 2, 2024

Maybe missing some other cases here?

The current version exists because Class.forName requires the binary name (OuterClass$InnerClass), while the type name in the coders uses the canonical form (OuterClass.InnerClass).
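The binary-vs-canonical name mismatch can be reproduced with plain JDK classes, no Scio needed; a minimal sketch:

```scala
// Class.forName wants the *binary* name, where nested classes use '$'.
val entry = Class.forName("java.util.Map$Entry") // succeeds

// The canonical (source-code) form is not resolvable:
val canonicalFails =
  try { Class.forName("java.util.Map.Entry"); false }
  catch { case _: ClassNotFoundException => true }

// Converting binary -> canonical, as done for the coder type names:
val canonical = "java.util.Map$Entry".replaceAll("\\$", ".")

assert(entry.isInterface)
assert(canonicalFails)
assert(canonical == "java.util.Map.Entry")
```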

kellen (Contributor, Author) commented:

A blacklist based on package name may also make sense

kellen (Contributor, Author) commented:

Added blacklist

Comment on lines 59 to 60
s.split(":", 2).toList match {
  case className :: path :: Nil =>
A reviewer (Contributor) commented:

The default case should catch inputs with more than one : separator.

Suggested change
s.split(":", 2).toList match {
  case className :: path :: Nil =>
s.split(":") match {
  case Array(className, path) =>

kellen (Contributor, Author) commented:

The path part can contain a :, a la gs://.

I suppose we could use a variety of other characters, e.g. ,, but : makes the most sense as a 'mapping' separator IMO.
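The limit argument is exactly what keeps the scheme separator intact; a minimal sketch of the two variants:

```scala
val arg = "com.spotify.scio.MyClass:gs://bucket/path/dict.bin"

// split with a limit of 2: only the first ':' separates; the path keeps its ':'
val limited = arg.split(":", 2).toList
assert(limited == List("com.spotify.scio.MyClass", "gs://bucket/path/dict.bin"))

// the suggested unlimited split tears the gs:// URI apart
val unlimited = arg.split(":").toList
assert(unlimited == List("com.spotify.scio.MyClass", "gs", "//bucket/path/dict.bin"))
```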

Comment on lines +38 to +44
private def unwrapZstd[T](options: CoderOptions, coder: BCoder[T]): BCoder[T] =
  coder match {
    case c: BZstdCoder[T] =>
      val underlying = c.getCoderArguments.get(0).asInstanceOf[BCoder[T]]
      unwrap(options, underlying)
    case _ => coder
  }
A reviewer (Contributor) commented:

Why not factor this into unwrap, since it is always chained?

kellen (Contributor, Author) commented:

In e.g. getTupleCoders, we unwrap only the top-level Zstd coder for Tuple2[K, V] but want to retain any Zstd coder on K. Putting the Zstd unwrapping in unwrap leads to the dict being discarded on many or every unwrap call.
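The selective-unwrap behavior can be illustrated with a toy model (the Toy, Plain, Zstd, and Tup types below are hypothetical stand-ins for the Beam coder classes, not the real API):

```scala
// Toy stand-ins for BCoder / BZstdCoder / Tuple2Coder.
sealed trait Toy
case class Plain(name: String) extends Toy
case class Zstd(underlying: Toy) extends Toy
case class Tup(k: Toy, v: Toy) extends Toy

// Mirrors unwrapZstd: strip Zstd wrappers at the top level only,
// without descending into a tuple's component coders.
def unwrapZstd(c: Toy): Toy = c match {
  case Zstd(u) => unwrapZstd(u)
  case other   => other
}

// A Zstd-wrapped tuple coder whose key coder is itself Zstd-wrapped:
val coder = Zstd(Tup(Zstd(Plain("K")), Plain("V")))

// Top-level wrapper is removed; the key's dictionary-bearing wrapper survives.
assert(unwrapZstd(coder) == Tup(Zstd(Plain("K")), Plain("V")))
```

Recursing into the children here would also strip Zstd(Plain("K")), which is exactly the dict-discarding behavior the comment describes.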

Comment on lines +30 to +31
keyDict: Array[Byte] = null,
valueDict: Array[Byte] = null
A reviewer (Contributor) commented:

I'm not a fan of null defaults.
Also, we used kv for beam.KV in many places, not tuples. This signature is a bit confusing.

Do we actually need this API, since this is a regular Tuple2Coder with implicit (un)compressed key and value coders?

kellen (Contributor, Author) commented:

This is for user ergonomics if setting the coder manually.

sc.parallelize[(String, String)](
  List(...)
)(ZstdCoder.kv(valueDict = dictBytes))

Not providing this means users need to manually lift their dict into a Beam coder and then into a Scio coder. And using optionals here makes the API unpleasant IMO, even though I generally agree with you.

Is ZstdCoder.tuple2 less annoying/confusing?

A reviewer (Contributor) commented:

I'm lightly in favor of tuple2 (unfortunately) -- I think kv is already semantically "taken" by Beam's KV class.

// training bytes may not exceed 2GiB a.k.a. the max value of an Int
val trainingBytesTargetActual: Int = Option(trainingBytesTarget).getOrElse {
  val computed =
    Try(Math.multiplyExact(zstdDictSizeBytes, 100)).toOption.getOrElse(Int.MaxValue)
A reviewer (Contributor) commented:

Do we want to catch this or let it fail?

kellen (Contributor, Author) commented:

Hmm, actually I am unsure what happens in this case, i.e. whether the training size relates to the number of elements added or to the dict size itself. 2 GiB / 100 implies that the max Zstd dictionary size should be about 20 MB, so maybe we should throw on that.

A reviewer (Contributor) commented:

+1 on throwing an exception

kellen (Contributor, Author) commented:

Re-reading, ~100x is just a recommendation, so using fewer items would probably decrease the dictionary's effectiveness but wouldn't be catastrophic. Going to throw anyway, since the recommendations argue against large dictionaries.
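The overflow guard in the quoted snippet can be exercised in isolation; a small sketch of the boundary (sizes in bytes, as Ints):

```scala
import scala.util.Try

// Mirrors the quoted logic: target ~100x the dictionary size,
// capped at Int.MaxValue when the multiplication overflows.
def trainingTarget(dictSizeBytes: Int): Int =
  Try(Math.multiplyExact(dictSizeBytes, 100)).toOption.getOrElse(Int.MaxValue)

// A 1 MiB dictionary -> 100 MiB of training data, well within Int range.
assert(trainingTarget(1 << 20) == 100 * (1 << 20))

// Just past Int.MaxValue / 100, multiplyExact throws and we cap at Int.MaxValue.
assert(trainingTarget(Int.MaxValue / 100 + 1) == Int.MaxValue)
```

This also shows why ~21 MB is the practical ceiling for zstdDictSizeBytes under the 100x rule, which motivates throwing on larger requests.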

Comment on lines +81 to +108
// estimate the sample rate we need by examining numElementsForSizeEstimation elements
val streamsCntSI = scoll.count.asSingletonSideInput(0L)
val sampleRateSI = scoll
  .take(numElementsForSizeEstimation)
  .map(v => toBytes(v).length)
  .sum
  .withSideInputs(streamsCntSI)
  .map { case (totalSize, ctx) =>
    val avgSize = totalSize / numElementsForSizeEstimation
    val targetNumElements = trainingBytesTargetActual / avgSize
    val sampleRate = targetNumElements.toDouble / ctx(streamsCntSI)
    logger.info(s"Computed sample rate for Zstd dictionary: ${sampleRate}")
    sampleRate
  }
  .toSCollection
  .asSingletonSideInput

scoll
  .withSideInputs(sampleRateSI)
  .flatMap {
    case (s, ctx) if new Random().nextDouble() <= ctx(sampleRateSI) =>
      Some(toBytes(s))
    case _ => None
  }
  .toSCollection
  .keyBy(_ => ())
  .groupByKey
  .map { case (_, elements) =>
A reviewer (Contributor) commented:

Why not use sample with sampleSize, since the result should fit in memory anyway?
We'd just have to use an Int for numElementsForSizeEstimation.

Suggested change
// estimate the sample rate we need by examining numElementsForSizeEstimation elements
val streamsCntSI = scoll.count.asSingletonSideInput(0L)
val sampleRateSI = scoll
  .take(numElementsForSizeEstimation)
  .map(v => toBytes(v).length)
  .sum
  .withSideInputs(streamsCntSI)
  .map { case (totalSize, ctx) =>
    val avgSize = totalSize / numElementsForSizeEstimation
    val targetNumElements = trainingBytesTargetActual / avgSize
    val sampleRate = targetNumElements.toDouble / ctx(streamsCntSI)
    logger.info(s"Computed sample rate for Zstd dictionary: ${sampleRate}")
    sampleRate
  }
  .toSCollection
  .asSingletonSideInput
scoll
  .withSideInputs(sampleRateSI)
  .flatMap {
    case (s, ctx) if new Random().nextDouble() <= ctx(sampleRateSI) =>
      Some(toBytes(s))
    case _ => None
  }
  .toSCollection
  .keyBy(_ => ())
  .groupByKey
  .map { case (_, elements) =>
scoll
  .sample(numElementsForSizeEstimation)
  .map { elements =>

kellen (Contributor, Author) commented:

This is so users don't need to know the average size of elements in the pipeline; we estimate it based on numElementsForSizeEstimation. Taking only those elements is not useful, since you may need many more elements to actually train.
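The estimation arithmetic can be sketched in isolation (the numbers below are hypothetical, mirroring the avgSize, targetNumElements, and sampleRate steps in the quoted snippet):

```scala
// Hypothetical pipeline stats
val numElementsForSizeEstimation = 100L
val totalSizeOfSample            = 100L * 1024      // 100 sampled elements, 1 KiB each
val totalElementCount            = 1000000L         // full SCollection size
val trainingBytesTarget          = 10 * 1024 * 1024 // 10 MiB of training data

val avgSize           = totalSizeOfSample / numElementsForSizeEstimation
val targetNumElements = trainingBytesTarget / avgSize
val sampleRate        = targetNumElements.toDouble / totalElementCount

assert(avgSize == 1024L)             // ~1 KiB per element
assert(targetNumElements == 10240L)  // far more than the 100 sized elements
assert(sampleRate == 0.01024)        // probability applied to every element
```

This illustrates why taking only the numElementsForSizeEstimation elements would not suffice: hitting the training-bytes target here requires sampling roughly 10k elements, two orders of magnitude more than were sized.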

A reviewer (Contributor) commented:

Maybe something like this can help: #5352?

kellen (Contributor, Author) commented May 3, 2024

Yeah. I imagine doing a bunch of priority-queue merges is somewhat less efficient (though more accurate), but that perhaps doesn't matter in this case, since we have small amounts of data in the end.

@kellen kellen merged commit fe93831 into main May 20, 2024
12 checks passed
@kellen kellen deleted the kellen/shufflezip branch May 20, 2024 19:48
clairemcginty pushed a commit that referenced this pull request May 30, 2024
3 participants