Flink: refactor sink shuffling statistics collection #10331

Open. stevenzwu wants to merge 6 commits into main from refactor-stats.

Conversation

stevenzwu (Contributor):

To support sketch statistics and automatic migration from Map stats to a reservoir sampling sketch when high cardinality is detected.

@stevenzwu force-pushed the refactor-stats branch 2 times, most recently from b97bf41 to 0882cb8 on May 14, 2024 00:08
stevenzwu (Contributor Author) commented May 14, 2024:

Moved DataStatistics away from generics and added a type to distinguish between Map and Sketch statistics, for a couple of reasons:

  • The generics were getting a bit too complicated.
  • It supports automatic migration/promotion of Map stats to Sketch if the cardinality is detected to be high. The default statistics type should be Auto, but if Auto doesn't work well in some cases, users can set the type to Map or Sketch explicitly.

Will add the sketch range partitioner in a separate PR following this one.
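
As a rough illustration of the shape this describes (illustrative only: type() and result() mirror accessors visible in the diff hunks below, while everything else here is an assumption rather than the PR's actual declarations):

enum StatisticsType {
  // exact key-frequency map; accurate, but memory grows with key cardinality
  Map,
  // reservoir sampling sketch; bounded memory for high-cardinality keys
  Sketch,
  // config-level default: start with Map stats and promote to Sketch when cardinality looks high
  Auto
}

interface DataStatistics {
  // which representation this statistics object currently holds (Map or Sketch)
  StatisticsType type();

  // the underlying payload: a Map<SortKey, Long> for Map stats, or the sketch for Sketch stats
  Object result();
}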

@stevenzwu requested a review from pvary on May 14, 2024 00:28
this.dataStatistics = statisticsSerializer.createInstance();
}
private final StatisticsType type;
private final Map<SortKey, Long> keyFrequency;
stevenzwu (Contributor Author):

Combining both Map and Sketch stats in the same aggregated statistics object allows a run-time switch from Map stats to Sketch.

if (record.type() == StatisticsType.Map) {
keyFrequencySerializer.serialize(record.keyFrequency(), target);
} else {
rangeBoundsSerializer.serialize(Arrays.asList(record.rangeBounds()), target);
stevenzwu (Contributor Author):

Reused the list serializer from Flink, paying a small penalty for the array-to-list conversion.

}
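
As a hedged sketch of that round trip (the helper class and the sortKeySerializer parameter are assumptions; only the Arrays.asList conversion comes from the diff above):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.iceberg.SortKey;

// Illustrative helper showing the array <-> list conversions that reusing Flink's
// ListSerializer for the SortKey[] range bounds implies.
class RangeBoundsSerde {
  private final ListSerializer<SortKey> rangeBoundsSerializer;

  RangeBoundsSerde(TypeSerializer<SortKey> sortKeySerializer) {
    this.rangeBoundsSerializer = new ListSerializer<>(sortKeySerializer);
  }

  void write(SortKey[] rangeBounds, DataOutputView target) throws IOException {
    // the small penalty mentioned above: wrap the array as a List on the write path
    rangeBoundsSerializer.serialize(Arrays.asList(rangeBounds), target);
  }

  SortKey[] read(DataInputView source) throws IOException {
    // and convert the List back to an array on the read path
    List<SortKey> bounds = rangeBoundsSerializer.deserialize(source);
    return bounds.toArray(new SortKey[0]);
  }
}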

@SuppressWarnings("unchecked")
private void merge(DataStatistics taskStatistics) {
stevenzwu (Contributor Author):

This method shows the stats type migration from Map to Sketch.


if (localStatistics.type() == StatisticsType.Map) {
Map<SortKey, Long> mapStatistics = (Map<SortKey, Long>) localStatistics.result();
if (statisticsType == StatisticsType.Auto
stevenzwu (Contributor Author):

This is the stats migration (Map -> Sketch) on the operator side during the collection phase.
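
A minimal sketch of that promotion path, assuming Apache DataSketches' ReservoirItemsSketch and a size threshold (the convertMapToSketch shape mirrors the SketchUtil call quoted later on this page; the rest is illustrative, not the PR's actual code):

import java.util.Map;
import java.util.function.Consumer;
import org.apache.datasketches.sampling.ReservoirItemsSketch;
import org.apache.iceberg.SortKey;

class OperatorStatsMigration {
  // replay each key `count` times into the sketch consumer
  static void convertMapToSketch(Map<SortKey, Long> mapStats, Consumer<SortKey> sketchConsumer) {
    mapStats.forEach(
        (sortKey, count) -> {
          for (long i = 0; i < count; ++i) {
            sketchConsumer.accept(sortKey);
          }
        });
  }

  // With Auto configured, once the local Map grows past the threshold, promote it to a
  // reservoir sketch and keep collecting into the sketch from then on.
  static ReservoirItemsSketch<SortKey> promoteIfNeeded(
      Map<SortKey, Long> keyFrequency, int switchThreshold, int reservoirSize) {
    if (keyFrequency.size() <= switchThreshold) {
      return null; // stay on Map stats
    }

    ReservoirItemsSketch<SortKey> sketch = ReservoirItemsSketch.newInstance(reservoirSize);
    convertMapToSketch(keyFrequency, sketch::update);
    return sketch;
  }
}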

Contributor:

With AUTO, if any task or the coordinator decides to move to sketch, it might be a good idea for everyone to move to sketch to save memory and transformations.
Do we want an extra message in this case, or at least switch when a global statistics message arrives that has already switched to sketch?

stevenzwu (Contributor Author):

Good question.

  • Each operator makes an independent decision on switching from Map to Sketch during the local collection phase.
  • When operators receive the global statistics from the coordinator, they should also check whether a type switch is needed. It looks like I missed this logic; will add (a rough sketch of the idea is below).
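
Roughly, the missing check could look like this (purely a sketch of the idea; the diff appears to route this through StatisticsUtil.collectType, whose actual behavior may differ, and the nested enum is a stand-in for the PR's StatisticsType):

class CollectTypeDecision {
  enum StatisticsType { Map, Sketch, Auto }

  // Decide which type an operator should collect locally: with Auto, follow whatever type
  // the coordinator's global statistics already use; otherwise honor the explicit config.
  static StatisticsType collectType(StatisticsType configured, StatisticsType globalStatsType) {
    if (configured == StatisticsType.Auto) {
      // before any global statistics arrive, Auto starts with Map
      return globalStatsType != null ? globalStatsType : StatisticsType.Map;
    }

    return configured;
  }
}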

import org.apache.iceberg.relocated.com.google.common.collect.Maps;

@Internal
class DataStatisticsSerializer extends TypeSerializer<DataStatistics> {
stevenzwu (Contributor Author):

This single serializer can handle both the Map and Sketch stats types.

* DataStatisticsUtil is the utility to serialize and deserialize {@link DataStatistics} and {@link
* AggregatedStatistics}
*/
class StatisticsUtil {
stevenzwu (Contributor Author):

Renamed from DataStatisticsUtil.

Contributor:

Please update the comment too.

Contributor:

DataStatisticsUtil -> StatisticsUtil

@@ -66,6 +66,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'org.slf4j'
}

implementation libs.datasketches
stevenzwu (Contributor Author):

The jar file is about 1 MB, so it's not too big to include.

Comment on lines +230 to +231
SketchUtil.convertMapToSketch(taskMapStats, taskSketch::update);
coordinatorSketchStatistics.update(taskSketch);
Contributor:

I'm wondering which is better:

  1. Getting a map from the task -> converting the task map to a sketch -> merging the coordinator sketch and the task sketch
  2. Updating the coordinator sketch by adding the values from the map directly

Which one performs better? Which results in a better approximation in the resulting sketch?

If we consciously use the 1st solution, then we probably want to send a message to the tasks when we switch to sketch, so they don't bother sending the whole map but just the sketch (it might be a smaller message).

stevenzwu (Contributor Author):

Yes, once the coordinator switches to sketch, all operators will switch too upon receiving the global statistics.
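
To make the two options concrete (illustrative only: this assumes the coordinator aggregates into a DataSketches ReservoirItemsUnion, as the coordinatorSketchStatistics.update(taskSketch) call above suggests; names and the reservoir size are placeholders):

import java.util.Map;
import org.apache.datasketches.sampling.ReservoirItemsSketch;
import org.apache.datasketches.sampling.ReservoirItemsUnion;
import org.apache.iceberg.SortKey;

class CoordinatorMergeOptions {
  // Option 1: build a per-task sketch from the task's Map stats, then merge sketches.
  static void mergeViaTaskSketch(
      ReservoirItemsUnion<SortKey> coordinatorSketch,
      Map<SortKey, Long> taskMapStats,
      int reservoirSize) {
    ReservoirItemsSketch<SortKey> taskSketch = ReservoirItemsSketch.newInstance(reservoirSize);
    taskMapStats.forEach(
        (key, count) -> {
          for (long i = 0; i < count; ++i) {
            taskSketch.update(key);
          }
        });
    coordinatorSketch.update(taskSketch);
  }

  // Option 2: feed the task's keys straight into the coordinator-side reservoir.
  static void mergeDirectly(
      ReservoirItemsUnion<SortKey> coordinatorSketch, Map<SortKey, Long> taskMapStats) {
    taskMapStats.forEach(
        (key, count) -> {
          for (long i = 0; i < count; ++i) {
            coordinatorSketch.update(key);
          }
        });
  }
}

Both variants replay each key count times; the difference is whether a per-task reservoir is built first.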

Comment on lines +110 to +113
/**
* This assumes the sort keys have equal weight, which is usually the case for high-cardinality
* scenarios (like device_id, user_id, uuid etc.).
*/
Contributor:

How do we handle cases where there is a high cardinality value and many small cardinality values? Especially for cases where the high cardinality value is high enough that multiple buckets should be used to write those?

stevenzwu (Contributor Author):

Great question. I assume you mean the high-cardinality case where key weights are not evenly distributed (like a hot key, or some other non-even distribution such as a normal distribution).

Let me explain the implementation here versus the Spark Partitioner. Here, the sample array is sorted first, then the samples are split evenly into range bounds. Keys with higher weight contribute more samples, so their ranges come out smaller. The Spark Partitioner calculates the weights of the sampled keys and takes them into account when computing the range bounds, which is similar to how the MapRangePartitioner works. I didn't want to convert the array of samples (which could be large) into a map with a weight for every sampled key. I plan to run some simulations to verify how well the current approach works; if it is bad, we can evaluate switching to the map-based weight calculation.
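
A simplified sketch of that equal-weight approach (illustrative only: class and method names are assumptions, and edge cases such as an empty sketch or fewer samples than partitions are elided):

import java.util.Arrays;
import java.util.Comparator;
import org.apache.datasketches.sampling.ReservoirItemsSketch;
import org.apache.iceberg.SortKey;

class RangeBoundsFromSamples {
  // Sort the reservoir samples and take evenly spaced cut points as range bounds,
  // i.e. every sample is treated as carrying equal weight.
  static SortKey[] determineBounds(
      ReservoirItemsSketch<SortKey> sketch, int numPartitions, Comparator<SortKey> comparator) {
    SortKey[] samples = sketch.getSamples();
    Arrays.sort(samples, comparator);

    // numPartitions ranges need numPartitions - 1 boundary keys
    SortKey[] bounds = new SortKey[numPartitions - 1];
    int step = samples.length / numPartitions;
    for (int i = 1; i < numPartitions; ++i) {
      bounds[i - 1] = samples[i * step];
    }

    return bounds;
  }
}

Keys that appear more often occupy more sample slots after sorting, so the ranges around them come out narrower; that is the behavior the planned simulations are meant to validate.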

Contributor:

Ok, I missed the sampling part.
How do we make sure that our samples are representative?

Contributor:

Oh... I see, the sketch implementation has a getSamples method. Let's run some tests. I understand your reasoning here now.

stevenzwu (Contributor Author):

I plan to add the simulation tests later in a separate PR, since this PR is very big already

*
* <ul>
* <li>Pro: relatively low memory footprint for high-cardinality sort keys.
* <li>Con: memory footprint can be large if the key cardinality is high.
Contributor:

nit: fix the comment pls.

stevenzwu (Contributor Author):

Can you clarify what to fix here?

Contributor:

I assume you wanted to mention lower accuracy as a con for the sketch type.

stevenzwu (Contributor Author):

Thanks for catching it. I misread the code context earlier.

this.checkpointId = checkpointId;
this.type = type;
this.keyFrequency = keyFrequency;
this.rangeBounds = rangeBounds;
Contributor:

Do we want to add a check here to make sure keyFrequency and rangeBounds won't both have a value at the same time?
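
For reference, the suggested guard could be as small as this (a sketch using Iceberg's relocated Guava Preconditions; placement and wording are assumptions):

import java.util.Map;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class AggregatedStatisticsGuard {
  // reject instances where both representations are populated at the same time
  static void validateOneRepresentation(Map<SortKey, Long> keyFrequency, SortKey[] rangeBounds) {
    Preconditions.checkArgument(
        keyFrequency == null || rangeBounds == null,
        "keyFrequency and rangeBounds must not both be set");
  }
}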

private final int parallelism;
private final TypeSerializer<DataStatistics> statisticsSerializer;
private final int downstreamParallelism;
private final StatisticsType statisticsType;
Contributor:

This field is not being used.

Map<SortKey, Long> taskMapStats = (Map<SortKey, Long>) taskStatistics.result();
if (coordinatorStatisticsType == StatisticsType.Map) {
taskMapStats.forEach((key, count) -> coordinatorMapStatistics.merge(key, count, Long::sum));
if (coordinatorMapStatistics.size() > switchToSketchThreshold) {
Contributor:

So for the coordinator, unlike the operator, which needs to check whether StatisticsType is Auto, we will convert it from map to sketch once the size reaches the threshold?

StatisticsEvent statisticsEvent =
StatisticsEvent.createAggregatedStatisticsEvent(
checkpointId, globalStatistics, aggregatedStatisticsSerializer);
for (int i = 0; i < context.currentParallelism(); ++i) {
Contributor:

We have a function #parallelism at line 187 to get the current parallelism. Do we want to remove that function?

}

this.taskStatisticsType = StatisticsUtil.collectType(statisticsType, globalStatistics);
Contributor:

Does the Iceberg repo follow the style of always using this. to refer to class variables? If so, let's update globalStatistics to this.globalStatistics, like what we do on line 113.

Contributor:

Same comment for lines 124 and 125.

}

/**
* To understand how range bounds are used in range partitioning, heere is an example for human
Contributor:

Typo: heere -> here.

* <li>Target size is "coordinator reservoir size * over sampling ratio (10) / operator
* parallelism"
* <li>Min is 1K to achieve good accuracy while memory footprint is still relatively small
* <li>Max is 100K to cap the memory footprint on coordinator
Contributor:

In the current implementation, the operator reservoir size depends entirely on the coordinator reservoir size. Do we check the operator reservoir against the min/max values?
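
For reference, the arithmetic from the javadoc above with the min/max applied as a clamp (constants are taken from that javadoc; the class and method names are placeholders, not necessarily the PR's implementation):

class OperatorReservoirSize {
  private static final int OVER_SAMPLING_RATIO = 10;
  private static final int MIN_RESERVOIR_SIZE = 1_000;
  private static final int MAX_RESERVOIR_SIZE = 100_000;

  static int determine(int coordinatorReservoirSize, int operatorParallelism) {
    // target = coordinator reservoir size * over-sampling ratio / operator parallelism
    int target = coordinatorReservoirSize * OVER_SAMPLING_RATIO / operatorParallelism;
    // clamp to the documented [1K, 100K] window: large enough for accuracy,
    // small enough to bound per-operator memory
    return Math.max(MIN_RESERVOIR_SIZE, Math.min(MAX_RESERVOIR_SIZE, target));
  }
}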

taskMapStats.forEach(
(sortKey, count) -> {
for (int i = 0; i < count; ++i) {
sketchConsumer.accept(sortKey);
Contributor:

Do we consider executing sketchConsumer.accept in parallel to make it faster?

AggregatedStatistics<D, S> completedStatistics = null;
if (inProgressStatistics != null && inProgressStatistics.checkpointId() < checkpointId) {
AggregatedStatistics completedStatistics = null;
if (inProgress() && inProgressCheckpointId < checkpointId) {
pvary (Contributor), May 29, 2024:

I am not entirely comfortable with these checks.
If I understand correctly, we only aggregate the statistics for the latest checkpoint. We either emit partial statistics for the old one, or just throw it away.

Is it really such a rare edge case for the statistics to arrive out of order? Even with unaligned checkpoints?

* @param operatorParallelism data statistics operator parallelism
* @return reservoir size
*/
static int determineOperatorReservoirSize(int operatorParallelism, int numPartitions) {
Contributor:

How will the writers behave in case of an autoscaler restart?

I understand that we store the statistics in the state, and when the autoscaler restarts the job, we restore the previous statistics. Let's say we have increased our parallelism from 5 to 8. How will we distribute the records when we are still using the old statistics?

Contributor:

Do we call determineBounds with the new parallelism and get slightly worse results, compared to the next checkpoint where stats are collected with the new parallelism? Or will we use the already-calculated bounds and use fewer writers to write?
