
Spark Action to Analyze table #10288

Open · wants to merge 3 commits into main
Conversation

karuppayya (Contributor):

This change adds a Spark action to analyze tables.
As part of the analysis, the action generates an Apache DataSketches theta sketch for NDV stats and writes it as a Puffin file.
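For context, a hypothetical invocation of the proposed action might look like the sketch below. The analyzeTable(...) entry point and the blob-type string passed to stats(...) are inferred from the interface snippets quoted later in this review and are assumptions, not necessarily the PR's final API.

```java
import java.util.Set;

import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

// Hypothetical usage sketch of the proposed AnalyzeTable action (names are assumptions):
// compute NDV theta sketches for the table and persist them as a Puffin file
// registered against the table's current snapshot.
public class AnalyzeExample {
  static void analyze(SparkSession spark, Table table) {
    SparkActions.get(spark)
        .analyzeTable(table)                           // entry point added by this PR (assumed name)
        .stats(Set.of("apache-datasketches-theta-v1")) // stats identified by blob type, per the review below
        .execute();
  }
}
```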

@karuppayya (Contributor Author):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Computes the statistic of the given columns and stores it as Puffin files. */
Member:

AnalyzeTableSparkAction is a generic name, and I see that in the future we want to compute partition stats too, which may not be written as Puffin files.

Either we can rename this to computeNDVSketches, or make it generic enough that any kind of stats can be computed from it.

Member:

Thinking more on this, I think we should just call it computeNDVSketches and not mix it with partition stats.

karuppayya (Contributor Author):

I tried to follow the model of RDBMSs and engines like Trino, which use ANALYZE TABLE <tblName> to collect all table-level stats.
With a procedure-per-stat model, the user has to invoke a procedure/action for every stat, and whenever a new stat is added, the user needs to update their code to call the new procedure/action.

> not mix it with partition stats.

I think we could have partition stats as a separate action, since those are per partition, whereas this procedure can collect top-level table stats.

Comment:

@karuppayya
I can see the tests in TestAnalyzeTableAction, and they work fine.
But have we tested in Spark whether it works with a query like
"ANALYZE TABLE table1 COMPUTE STATISTICS"?

Because generally that gives the error
"[NOT_SUPPORTED_COMMAND_FOR_V2_TABLE] ANALYZE TABLE is not supported for v2 tables."

karuppayya (Contributor Author):

Spark does not have the grammar for analyzing these tables.
This PR introduces a Spark action. In a subsequent PR, I plan to introduce an Iceberg procedure to invoke the Spark action.

@rice668 (Contributor), May 30, 2024:

> I see that in the future we want to compute partition stats too, which may not be written as Puffin files.

Hi @ajantha-bhat, I agree with you; otherwise the queries would have a lot of limitations, such as being applicable only for calculating the NDV over the entire table.

For example, Trino might want to read the NDV values written by Spark to respond to queries. However, if the query has partition filter conditions, then Trino would not be able to use the pre-computed NDV information from Spark. So, what do you think?

@jeesou, May 31, 2024:

Hi @karuppayya, as the discussion above suggests, there can be multiple engines like Spark, Presto, Trino, etc. that might want to query the same data. In such a scenario, the sketches generated by Spark or, say, Presto must be readable by the other engines.

This question comes up because I ran an ANALYZE query on Presto, and the Puffin file it created looks like this:

{"blobs":[{"type":"apache-datasketches-theta-v1","fields":[2],"snapshot-id":7724902347602477706,
"sequence-number":1,"offset":44,"length":40,"properties":{"ndv":"3"}}],"properties":{"created-by":"presto-testversion"}}

whereas the one created by Iceberg through the changes in this PR looks like this:

{"blobs":[{"type":"apache-datasketches-theta-v1","fields":[3],
"snapshot-id":5334747061548805461,"sequence-number":1,"offset":4,"length":32}],"properties"

Looking closely, the {"ndv":"3"} portion is missing in the Iceberg output.

So can we make any modifications, or do you have any suggestions?
Because, as I understand it, the sketch file should be universal across all engines.

karuppayya (Contributor Author):

@jeesou
Yes, agreed that the sketch needs to be compatible across all engines.
This PR takes care of using the same library (Apache DataSketches) as Trino does; that was the major concern here.
Do we need to add the ndv property? Shouldn't engines be reading the value from the sketch?

Contributor:

Hm, this discussion makes me wonder if we're under-specified in this regard. According to the spec:

https://iceberg.apache.org/puffin-spec/#blob-types

The blob metadata for this blob may include following properties:
    ndv: estimate of number of distinct values, derived from the sketch.

It really seems like we should take a stance. Either it must be in the sketch or it must be in the properties. "may include" seems a little too loose.
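For illustration, here is a minimal sketch of how the derived ndv estimate could be attached as a blob property so other engines see the same metadata Presto writes. It assumes the Puffin Blob constructor variant that accepts a compression codec and a properties map, as used elsewhere in this PR; the helper name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

import org.apache.datasketches.theta.CompactSketch;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.StandardBlobTypes;

// Hypothetical helper: wrap a compacted theta sketch into a Puffin blob that also carries
// the "ndv" estimate derived from the sketch, as the Puffin spec allows.
public class NdvBlobs {
  static Blob ndvBlob(CompactSketch sketch, int fieldId, long snapshotId, long sequenceNumber) {
    return new Blob(
        StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
        List.of(fieldId),
        snapshotId,
        sequenceNumber,
        ByteBuffer.wrap(sketch.toByteArray()),
        null, // uncompressed, as discussed later in this review
        Map.of("ndv", String.valueOf((long) sketch.getEstimate())));
  }
}
```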

spark(), table, columnsToBeAnalyzed.toArray(new String[0]));
table
.updateStatistics()
.setStatistics(table.currentSnapshot().snapshotId(), statisticsFile)
Member:

What if the table's current snapshot has been modified concurrently by another client between line 117 and line 120?
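One way to avoid that race, sketched under the assumption that the commit API stays as shown above (setStatistics(snapshotId, statisticsFile)): resolve the snapshot ID once at the start of the action and commit against that same ID, rather than re-reading table.currentSnapshot() at commit time. The writeNDVPuffin helper below is a placeholder for the PR's Puffin-writing step, not its actual method.

```java
import java.util.List;

import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;

// Sketch only: pin the snapshot once so a concurrent commit cannot change the target
// snapshot between computing the sketches and registering the statistics file.
public class PinnedSnapshotStats {
  static void computeAndCommit(Table table, List<String> columns) {
    long snapshotId = table.currentSnapshot().snapshotId();
    StatisticsFile statisticsFile = writeNDVPuffin(table, snapshotId, columns);
    table.updateStatistics()
        .setStatistics(snapshotId, statisticsFile)
        .commit();
  }

  // Placeholder for the sketch-computation and Puffin-writing step in this PR.
  static StatisticsFile writeNDVPuffin(Table table, long snapshotId, List<String> columns) {
    throw new UnsupportedOperationException("illustrative placeholder");
  }
}
```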


public static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches(
SparkSession spark, String tableName, String... columns) {
String sql = String.format("select %s from %s", String.join(",", columns), tableName);
Member:

I think we should also think about incremental updates, i.e. updating the sketches from a previous checkpoint. Querying the whole table may not be efficient.

karuppayya (Contributor Author):

Yes, incremental updates need to be wired into the write paths.
This procedure could exist in parallel and compute stats for the whole table on demand.
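As a rough illustration of the incremental idea: theta sketches are mergeable, so a sketch persisted for an earlier snapshot could be unioned with a sketch built over only the newly added data instead of rescanning the whole table. This sketch assumes a recent datasketches-java where Union.union(Sketch) is available; the method is illustrative and not part of this PR.

```java
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.Union;

// Merge a previously persisted sketch (e.g. read back from a Puffin blob) with a sketch
// computed over only the data appended since the last checkpoint.
public class SketchMerge {
  static CompactSketch merge(byte[] previousSketchBytes, Sketch incrementalSketch) {
    Union union = SetOperation.builder().buildUnion();
    union.union(Sketches.wrapSketch(Memory.wrap(previousSketchBytes)));
    union.union(incrementalSketch);
    return union.getResult();
  }
}
```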

assumeTrue(catalogName.equals("spark_catalog"));
sql(
"CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+ "('format-version'='2')",
Member:

The default format version is v2 now, so specifying it again is redundant.

String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
OutputFile outputFile = fileIO.newOutputFile(path);
try (PuffinWriter writer =
Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) {
Member:

I like this name instead of "analyze table procedure".

@ajantha-bhat (Member):

There was an old PR on the same: #6582

@huaxingao (Contributor):

> There was an old PR on the same: #6582

I don't have time to work on this, so karuppayya will take over. Thanks a lot @karuppayya for continuing the work.

@amogh-jahagirdar (Contributor) left a comment:

Thanks @karuppayya @huaxingao @szehon-ho, this is awesome to see! I left a review of the API/implementation; I still have yet to review the tests, which look to be a WIP.

* @param statsToBeCollected set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);
Contributor:

Should these stats be a Set<StandardBlobType> instead of arbitrary Strings? I feel like the API becomes better defined in that case.

Contributor:

Oh I see, StandardBlobType defines string constants, not enums.

Comment on lines +89 to +98
private void validateColumns() {
validateEmptyColumns();
validateTypes();
}

private void validateEmptyColumns() {
if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.isEmpty()) {
throw new ValidationException("No columns to analyze for the table", table.name());
}
}
Contributor:

Nit: I think this validation should just happen at the time of setting these on the action rather than at execution time.

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Contributor:

I also think this interface should have a snapshot API to allow users to pass in a snapshot to generate the statistics for. If it's not specified, we can generate the statistics for the latest snapshot.
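One possible shape for that suggestion (hypothetical, not what the PR currently has): add a snapshot(long) method alongside stats(...), defaulting to the current snapshot when it is not called.

```java
import java.util.Set;

// Hypothetical extension of the proposed interface; method names are assumptions.
interface AnalyzeTable {
  /** Statistics to collect, identified by blob type. */
  AnalyzeTable stats(Set<String> statsToBeCollected);

  /** Snapshot to analyze; defaults to the table's current snapshot if not set. */
  AnalyzeTable snapshot(long snapshotId);
}
```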

Comment on lines +104 to +106
if (field == null) {
throw new ValidationException("No column with %s name in the table", columnName);
}
Contributor:

Style nit: new line after the if

SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
throws IOException {
Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
NDVSketchGenerator.computeNDVSketches(spark, table.name(), snapshotId, columnsToBeAnalyzed);
Contributor:

Does computeNDVSketches need to be public? It seems like it could just be package-private. Also, nit: either way, I don't think you need the fully qualified method name.

import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;

public class ThetaSketchJavaSerializable implements Serializable {
Contributor:

Does this need to be public?

Comment on lines +46 to +51
if (sketch == null) {
return null;
}
if (sketch instanceof UpdateSketch) {
return sketch.compact();
}
Contributor:

Style nit: new line after if

null,
ImmutableMap.of()));
}
writer.finish();
Contributor:

Nit: I don't think you need the writer.finish(), because the try-with-resources will close the writer, and close already finishes it.

table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()),
null,
Contributor:

null means that the file will be uncompressed. I think it makes sense not to compress these files by default since the sketch will be a single long per column, so it'll be quite small already and not worth paying the price of compression/decompression.

Comment on lines +157 to +165
if (sketch1.getSketch() == null && sketch2.getSketch() == null) {
return emptySketchWrapped;
}
if (sketch1.getSketch() == null) {
return sketch2;
}
if (sketch2.getSketch() == null) {
return sketch1;
}
Contributor:

Style nit: new line after if
