Skip to content

Commit

Permalink
Resharding: producer can toggle numShards for Object types in the cou…
Browse files Browse the repository at this point in the history
…rse of a delta chain
  • Loading branch information
Sunjeet committed Jan 16, 2024
1 parent c2920b7 commit e789579
Show file tree
Hide file tree
Showing 19 changed files with 1,126 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.hollow.api.producer;

import static com.netflix.hollow.api.producer.ProducerListenerSupport.ProducerListeners;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_TYPE_RESHARDING_INVOKED;
import static java.lang.System.currentTimeMillis;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -92,14 +93,19 @@ abstract class AbstractHollowProducer {

boolean isInitialized;

private final long targetMaxTypeShardSize;
private final boolean allowTypeResharding;
private final boolean focusHoleFillInFewestShards;


@Deprecated
public AbstractHollowProducer(
HollowProducer.Publisher publisher,
HollowProducer.Announcer announcer) {
this(new HollowFilesystemBlobStager(), publisher, announcer,
Collections.emptyList(),
new VersionMinterWithCounter(), null, 0,
DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE, false, null,
DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE, false, false, null,
new DummyBlobStorageCleaner(), new BasicSingleProducerEnforcer(),
null, true);
}
Expand All @@ -111,7 +117,7 @@ public AbstractHollowProducer(
this(b.stager, b.publisher, b.announcer,
b.eventListeners,
b.versionMinter, b.snapshotPublishExecutor,
b.numStatesBetweenSnapshots, b.targetMaxTypeShardSize, b.focusHoleFillInFewestShards,
b.numStatesBetweenSnapshots, b.targetMaxTypeShardSize, b.focusHoleFillInFewestShards, b.allowTypeResharding,
b.metricsCollector, b.blobStorageCleaner, b.singleProducerEnforcer,
b.hashCodeFinder, b.doIntegrityCheck);
}
Expand All @@ -126,6 +132,7 @@ private AbstractHollowProducer(
int numStatesBetweenSnapshots,
long targetMaxTypeShardSize,
boolean focusHoleFillInFewestShards,
boolean allowTypeResharding,
HollowMetricsCollector<HollowProducerMetrics> metricsCollector,
HollowProducer.BlobStorageCleaner blobStorageCleaner,
SingleProducerEnforcer singleProducerEnforcer,
Expand All @@ -140,11 +147,15 @@ private AbstractHollowProducer(
this.numStatesBetweenSnapshots = numStatesBetweenSnapshots;
this.hashCodeFinder = hashCodeFinder;
this.doIntegrityCheck = doIntegrityCheck;
this.targetMaxTypeShardSize = targetMaxTypeShardSize;
this.allowTypeResharding = allowTypeResharding;
this.focusHoleFillInFewestShards = focusHoleFillInFewestShards;

HollowWriteStateEngine writeEngine = hashCodeFinder == null
? new HollowWriteStateEngine()
: new HollowWriteStateEngine(hashCodeFinder);
writeEngine.setTargetMaxTypeShardSize(targetMaxTypeShardSize);
writeEngine.allowTypeResharding(allowTypeResharding);
writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);

this.objectMapper = new HollowObjectMapper(writeEngine);
Expand Down Expand Up @@ -283,6 +294,9 @@ private HollowProducer.ReadState restore(
HollowWriteStateEngine writeEngine = hashCodeFinder == null
? new HollowWriteStateEngine()
: new HollowWriteStateEngine(hashCodeFinder);
writeEngine.setTargetMaxTypeShardSize(targetMaxTypeShardSize);
writeEngine.allowTypeResharding(allowTypeResharding);
writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);
HollowWriteStateCreator.populateStateEngineWithTypeWriteStates(writeEngine, schemas);
HollowObjectMapper newObjectMapper = new HollowObjectMapper(writeEngine);
if (hashCodeFinder != null) {
Expand Down Expand Up @@ -380,15 +394,9 @@ long runCycle(

// 3. Produce a new state if there's work to do
if (writeEngine.hasChangedSinceLastCycle()) {
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_HASH, new HollowSchemaHash(writeEngine.getSchemas()).getHash());
boolean schemaChangedFromPriorVersion = readStates.hasCurrent() &&
!writeEngine.hasIdenticalSchemas(readStates.current().getStateEngine());
if (schemaChangedFromPriorVersion) {
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE, Boolean.TRUE.toString());
} else {
writeEngine.getHeaderTags().remove(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE);
}
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion));
updateHeaderTags(writeEngine, toVersion, schemaChangedFromPriorVersion);

// 3a. Publish, run checks & validation, then announce new state consumers
publish(listeners, toVersion, artifacts);
Expand Down Expand Up @@ -507,6 +515,17 @@ public void removeListener(HollowProducerEventListener listener) {
listeners.removeListener(listener);
}

private void updateHeaderTags(HollowWriteStateEngine writeEngine, long toVersion, boolean schemaChangedFromPriorVersion) {
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_HASH, new HollowSchemaHash(writeEngine.getSchemas()).getHash());
if (schemaChangedFromPriorVersion) {
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE, Boolean.TRUE.toString());
} else {
writeEngine.getHeaderTags().remove(HollowStateEngine.HEADER_TAG_SCHEMA_CHANGE);
}
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(toVersion));
writeEngine.getHeaderTags().remove(HEADER_TAG_TYPE_RESHARDING_INVOKED);
}

void populate(
ProducerListeners listeners,
HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,7 @@ public static class Builder<B extends HollowProducer.Builder<B>> {
Executor snapshotPublishExecutor = null;
int numStatesBetweenSnapshots = 0;
boolean focusHoleFillInFewestShards = false;
boolean allowTypeResharding = false;
long targetMaxTypeShardSize = DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE;
HollowMetricsCollector<HollowProducerMetrics> metricsCollector;
BlobStorageCleaner blobStorageCleaner = new DummyBlobStorageCleaner();
Expand Down Expand Up @@ -850,11 +851,31 @@ public B withTargetMaxTypeShardSize(long targetMaxTypeShardSize) {
return (B) this;
}

/**
* Experimental: Setting this will focus the holes returned by the FreeOrdinalTracker for each state into as few shards as possible.
*
* This can be used by the consumers to reduce the work necessary to apply a delta, by skipping recreation of shards where no records are added.
*/
public B withFocusHoleFillInFewestShards(boolean focusHoleFillInFewestShards) {
this.focusHoleFillInFewestShards = focusHoleFillInFewestShards;
return (B) this;
}

/**
* Experimental: Setting this will allow producer to adjust number of shards per type in the course of a delta chain.
*
* Consumer-side delta transitions work by making a copy of one shard at a time, so the ability to accommodate more
* data in a type by growing the number of shards instead of the size of shards leads to means consumers can apply
* delta transitions with a memory overhead (equal to the configured max shard size).
*
* Requires integrity check to be enabled, and honors numShards pinned using annotation in data model.
* Also requires consumers to be on a recent Hollow library version that supports re-sharding at the time of delta application.
*/
public B withTypeResharding(boolean allowTypeResharding) {
this.allowTypeResharding = allowTypeResharding;
return (B) this;
}

public B withMetricsCollector(HollowMetricsCollector<HollowProducerMetrics> metricsCollector) {
this.metricsCollector = metricsCollector;
return (B) this;
Expand Down Expand Up @@ -887,6 +908,14 @@ public B noIntegrityCheck() {
}

protected void checkArguments() {
if (allowTypeResharding == true && doIntegrityCheck == false) { // type resharding feature rollout
throw new IllegalArgumentException("Enabling type re-sharding requires integrity check to also be enabled");
}
if (allowTypeResharding == true && focusHoleFillInFewestShards == true) { // type re-sharding feature rollout
// More thorough testing required before enabling these features to work in tandem
// simple test case for when features are allowed to work together passes, see {@code testReshardingWithFocusHoleFillInFewestShards}
throw new IllegalArgumentException("Producer does not yet support using both re-sharding and focusHoleFillInFewestShards features in tandem");
}
if (stager != null && compressor != null) {
throw new IllegalArgumentException(
"Both a custom BlobStager and BlobCompressor were specified -- please specify only one of these.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.hollow.api.producer.HollowProducer;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import java.time.Duration;
import java.util.Map;
import java.util.OptionalLong;

/**
Expand Down Expand Up @@ -87,9 +88,13 @@ public void onAnnouncementComplete(com.netflix.hollow.api.producer.Status status

HollowReadStateEngine stateEngine = readState.getStateEngine();
dataSizeBytes = stateEngine.calcApproxDataSize();
Map<String, Integer> numShardsPerType = stateEngine.numShardsPerType();
Map<String, Long> shardSizePerType = stateEngine.calcApproxShardSizePerType();

announcementMetricsBuilder
.setDataSizeBytes(dataSizeBytes)
.setNumShardsPerType(numShardsPerType)
.setShardSizePerType(shardSizePerType)
.setIsAnnouncementSuccess(isAnnouncementSuccess)
.setAnnouncementDurationMillis(elapsed.toMillis());
lastAnnouncementSuccessTimeNanoOptional.ifPresent(announcementMetricsBuilder::setLastAnnouncementSuccessTimeNano);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*/
package com.netflix.hollow.api.producer.metrics;

import java.util.Map;
import java.util.OptionalLong;

public class AnnouncementMetrics {

private long dataSizeBytes; // Heap footprint of announced blob in bytes
private Map<String, Integer> numShardsPerType;
private Map<String, Long> shardSizePerType;
private long announcementDurationMillis; // Announcement duration in ms, only applicable to completed cycles (skipped cycles dont announce)
private boolean isAnnouncementSuccess; // true if announcement was successful, false if announcement failed
private OptionalLong lastAnnouncementSuccessTimeNano; // monotonic time of last successful announcement (no relation to wall clock), N/A until first successful announcement
Expand All @@ -29,6 +32,12 @@ public class AnnouncementMetrics {
public long getDataSizeBytes() {
return dataSizeBytes;
}
public Map<String, Integer> getNumShardsPerType() {
return numShardsPerType;
}
public Map<String, Long> getShardSizePerType() {
return shardSizePerType;
}
public long getAnnouncementDurationMillis() {
return announcementDurationMillis;
}
Expand All @@ -41,6 +50,8 @@ public OptionalLong getLastAnnouncementSuccessTimeNano() {

private AnnouncementMetrics(Builder builder) {
this.dataSizeBytes = builder.dataSizeBytes;
this.numShardsPerType = builder.numShardsPerType;
this.shardSizePerType = builder.shardSizePerType;
this.announcementDurationMillis = builder.announcementDurationMillis;
this.isAnnouncementSuccess = builder.isAnnouncementSuccess;
this.lastAnnouncementSuccessTimeNano = builder.lastAnnouncementSuccessTimeNano;
Expand All @@ -51,6 +62,8 @@ public static final class Builder {
private long announcementDurationMillis;
private boolean isAnnouncementSuccess;
private OptionalLong lastAnnouncementSuccessTimeNano;
private Map<String, Integer> numShardsPerType;
private Map<String, Long> shardSizePerType;

public Builder() {
lastAnnouncementSuccessTimeNano = OptionalLong.empty();
Expand All @@ -60,6 +73,14 @@ public Builder setDataSizeBytes(long dataSizeBytes) {
this.dataSizeBytes = dataSizeBytes;
return this;
}
public Builder setNumShardsPerType(Map<String, Integer> numShardsPerType) {
this.numShardsPerType = numShardsPerType;
return this;
}
public Builder setShardSizePerType(Map<String, Long> shardSizePerType) {
this.shardSizePerType = shardSizePerType;
return this;
}
public Builder setAnnouncementDurationMillis(long announcementDurationMillis) {
this.announcementDurationMillis = announcementDurationMillis;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public interface HollowStateEngine extends HollowDataset {
*/
String HEADER_TAG_SCHEMA_CHANGE = "hollow.schema.changedFromPriorVersion";

/**
* A header tag indicating that num shards for a type has changed since the prior version. Its value encodes
* the type(s) that were re-sharded along with the before and after num shards in the fwd delta direction.
* For e.g. Movie:(2,4) Actor:(8,4)
*/
String HEADER_TAG_TYPE_RESHARDING_INVOKED = "hollow.type.resharding.invoked";

/**
* A header tag containing the hash of serialized hollow schema.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,30 @@ public long calcApproxDataSize() {
.sum();
}

/**
* @return the no. of shards for each type in the read state
*/
public Map<String, Integer> numShardsPerType() {
Map<String, Integer> typeShards = new HashMap<>();
for (String type : this.getAllTypes()) {
HollowTypeReadState typeState = this.getTypeState(type);
typeShards.put(type, typeState.numShards());
}
return typeShards;
}

/**
* @return the approx heap footprint of a single shard in bytes, for each type in the read state
*/
public Map<String, Long> calcApproxShardSizePerType() {
Map<String, Long> typeShardSizes = new HashMap<>();
for (String type : this.getAllTypes()) {
HollowTypeReadState typeState = this.getTypeState(type);
typeShardSizes.put(type, typeState.getApproximateShardSizeInBytes());
}
return typeShardSizes;
}

@Override
public HollowTypeDataAccess getTypeDataAccess(String type) {
return typeStates.get(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,14 @@ public HollowTypeReadState getTypeState() {
* @return an approximate accounting of the current cost of the "ordinal holes" in this type state.
*/
public abstract long getApproximateHoleCostInBytes();


/**
* @return an approximate accounting of the current heap footprint occupied by each shard of this type state.
*/
public long getApproximateShardSizeInBytes() {
return getApproximateHeapFootprintInBytes() / numShards();
}

/**
* @return The number of shards into which this type is split. Sharding is transparent, so this has no effect on normal usage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ public void run() {
partStreams.flush();
}


/**
* Serialize the changes necessary to transition a consumer from the previous state
* to the current state as a delta blob.
Expand Down Expand Up @@ -238,7 +237,7 @@ public void run() {
HollowSchema schema = typeState.getSchema();
schema.writeTo(partStream);

writeNumShards(partStream, typeState.getNumShards());
writeNumShards(partStream, typeState.getRevNumShards());

typeState.writeReverseDelta(partStream);
}
Expand Down

0 comments on commit e789579

Please sign in to comment.