Skip to content

Commit

Permalink
Prevent NPE when Incremental producer attempts to remove records from…
Browse files Browse the repository at this point in the history
… types that no longer exist
  • Loading branch information
dvehar committed Jan 10, 2024
1 parent d0e88ce commit c2920b7
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@ private BitSet markTypeRecordsToRemove(HollowReadStateEngine priorStateEngine, f
private void removeRecordsFromNewState(HollowProducer.WriteState newState, Map<String, BitSet> recordsToRemove) {
for(Map.Entry<String, BitSet> removalEntry : recordsToRemove.entrySet()) {
HollowTypeWriteState writeState = newState.getStateEngine().getTypeState(removalEntry.getKey());
BitSet typeRecordsToRemove = removalEntry.getValue();
if (writeState == null) continue;

BitSet typeRecordsToRemove = removalEntry.getValue();
int ordinalToRemove = typeRecordsToRemove.nextSetBit(0);
while(ordinalToRemove != -1) {
writeState.removeOrdinalFromThisCycle(ordinalToRemove);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ public void publishAndLoadASnapshot() {
idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");
Assert.assertFalse(idx.containsDuplicates());

assertTypeB(idx, 1, "1");
assertTypeB(idx, 2, null);
assertTypeB(idx, 3, null);
assertTypeB(idx, 4, "4");
assertTypeB(idx, 5, "6");
assertTypeBOrE(idx, 1, "1");
assertTypeBOrE(idx, 2, null);
assertTypeBOrE(idx, 3, null);
assertTypeBOrE(idx, 4, "4");
assertTypeBOrE(idx, 5, "6");

consumer.triggerRefreshTo(finalVersion);

Expand Down Expand Up @@ -271,11 +271,11 @@ public void publishAndLoadASnapshotDirectly() {
Assert.assertFalse(idx.containsDuplicates());

// backing producer was never initialized, so only records added to the incremental producer are here
assertTypeB(idx, 1, null);
assertTypeB(idx, 2, null);
assertTypeB(idx, 3, null);
assertTypeB(idx, 4, null);
assertTypeB(idx, 5, "6");
assertTypeBOrE(idx, 1, null);
assertTypeBOrE(idx, 2, null);
assertTypeBOrE(idx, 3, null);
assertTypeBOrE(idx, 4, null);
assertTypeBOrE(idx, 5, "6");

consumer.triggerRefreshTo(finalVersion);

Expand Down Expand Up @@ -330,12 +330,12 @@ public void publishDirectlyAndRestore() {
Assert.assertFalse(idx.containsDuplicates());

// backing producer was never initialized, so only records added to the incremental producer are here
assertTypeB(idx, 1, null);
assertTypeB(idx, 2, null);
assertTypeB(idx, 3, null);
assertTypeB(idx, 4, null);
assertTypeB(idx, 5, "5");
assertTypeB(idx, 6, "6");
assertTypeBOrE(idx, 1, null);
assertTypeBOrE(idx, 2, null);
assertTypeBOrE(idx, 3, null);
assertTypeBOrE(idx, 4, null);
assertTypeBOrE(idx, 5, "5");
assertTypeBOrE(idx, 6, "6");

// Create NEW incremental producer which will restore from the state left by the previous incremental producer
/// adding a new type this time (TypeB).
Expand Down Expand Up @@ -375,12 +375,12 @@ public void publishDirectlyAndRestore() {
Assert.assertFalse(idx.containsDuplicates());

// backing producer was never initialized, so only records added to the incremental producer are here
assertTypeB(idx, 1, "1");
assertTypeB(idx, 2, "2");
assertTypeB(idx, 3, "3");
assertTypeB(idx, 4, "4");
assertTypeB(idx, 5, null);
assertTypeB(idx, 6, "6");
assertTypeBOrE(idx, 1, "1");
assertTypeBOrE(idx, 2, "2");
assertTypeBOrE(idx, 3, "3");
assertTypeBOrE(idx, 4, "4");
assertTypeBOrE(idx, 5, null);
assertTypeBOrE(idx, 6, "6");

}

Expand Down Expand Up @@ -431,7 +431,7 @@ public void populate(WriteState state) throws Exception {
idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");
Assert.assertFalse(idx.containsDuplicates());

assertTypeB(idx, 3, "three");
assertTypeBOrE(idx, 3, "three");
}

@Test
Expand All @@ -442,6 +442,7 @@ public void canRemoveAndModifyNewTypesFromRestoredState() {
long originalVersion = genesisProducer.runCycle(new Populator() {
public void populate(WriteState state) throws Exception {
state.add(new TypeA(1, "one", 1));
state.add(new TypeB(2, "two"));
}
});

Expand All @@ -452,22 +453,22 @@ public void populate(WriteState state) throws Exception {
.noIntegrityCheck()
.build();

/// adding a new type this time (TypeB).
backingProducer.initializeDataModel(TypeA.class, TypeB.class);
/// adding a new type this time (TypeE) and removing TypeB
backingProducer.initializeDataModel(TypeA.class, TypeE.class);

/// now create our HollowIncrementalProducer
HollowIncrementalProducer incrementalProducer = new HollowIncrementalProducer(backingProducer);
incrementalProducer.restore(originalVersion, blobStore);

incrementalProducer.addOrModify(new TypeA(1, "one", 2));
incrementalProducer.addOrModify(new TypeA(2, "two", 2));
incrementalProducer.addOrModify(new TypeB(3, "three"));
incrementalProducer.addOrModify(new TypeB(4, "four"));
incrementalProducer.addOrModify(new TypeA(3, "three", 3));
incrementalProducer.addOrModify(new TypeE(4, "four"));
incrementalProducer.addOrModify(new TypeE(5, "five"));

incrementalProducer.runCycle();

incrementalProducer.delete(new RecordPrimaryKey("TypeB", new Object[] { 3 }));
incrementalProducer.addOrModify(new TypeB(4, "four!"));
incrementalProducer.delete(new RecordPrimaryKey("TypeE", new Object[] { 4 }));
incrementalProducer.addOrModify(new TypeE(5, "five!"));

long version2 = incrementalProducer.runCycle();

Expand All @@ -479,17 +480,21 @@ public void populate(WriteState state) throws Exception {
Assert.assertFalse(idx.containsDuplicates());

assertTypeA(idx, 1, "one", 2L);
assertTypeA(idx, 2, "two", 2L);
assertTypeA(idx, 3, "three", 3L);

/// consumers with established data models don't have visibility into new types.
consumer = HollowConsumer.withBlobRetriever(blobStore).build();
consumer.triggerRefreshTo(version2);

idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");
Assert.assertNotNull(consumer.getStateEngine().getTypeState("TypeA"));
Assert.assertNull(consumer.getStateEngine().getTypeState("TypeB"));
Assert.assertNotNull(consumer.getStateEngine().getTypeState("TypeE"));

idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeE", "id");
Assert.assertFalse(idx.containsDuplicates());

assertEquals(-1, idx.getMatchingOrdinal(3));
assertTypeB(idx, 4, "four!");
assertEquals(-1, idx.getMatchingOrdinal(4));
assertTypeBOrE(idx, 5, "five!");
}

@Test
Expand Down Expand Up @@ -541,7 +546,7 @@ public void populate(WriteState state) throws Exception {
idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");
Assert.assertFalse(idx.containsDuplicates());

assertTypeB(idx, 3, "three");
assertTypeBOrE(idx, 3, "three");
}

@Test(expected = RuntimeException.class)
Expand Down Expand Up @@ -627,11 +632,11 @@ public void publishUsingThreadConfig() {
idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");
Assert.assertFalse(idx.containsDuplicates());

assertTypeB(idx, 1, "1");
assertTypeB(idx, 2, null);
assertTypeB(idx, 3, null);
assertTypeB(idx, 4, "4");
assertTypeB(idx, 5, "6");
assertTypeBOrE(idx, 1, "1");
assertTypeBOrE(idx, 2, null);
assertTypeBOrE(idx, 3, null);
assertTypeBOrE(idx, 4, "4");
assertTypeBOrE(idx, 5, "6");

consumer.triggerRefreshTo(finalVersion);

Expand Down Expand Up @@ -664,7 +669,7 @@ public void discardChanges() {

HollowPrimaryKeyIndex idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");

assertTypeB(idx, 1, "one");
assertTypeBOrE(idx, 1, "one");

incrementalProducer.delete(new TypeB(1, "one"));

Expand All @@ -680,7 +685,7 @@ public void discardChanges() {
consumer = HollowConsumer.withBlobRetriever(blobStore).build();
consumer.triggerRefreshTo(version);

assertTypeB(idx, 1, "one");
assertTypeBOrE(idx, 1, "one");

incrementalProducer.delete(new TypeB(1, "one"));

Expand All @@ -696,7 +701,7 @@ public void discardChanges() {
consumer = HollowConsumer.withBlobRetriever(blobStore).build();
consumer.triggerRefreshTo(finalVersion);

assertTypeB(idx, 1, "one");
assertTypeBOrE(idx, 1, "one");
}

@Test
Expand Down Expand Up @@ -780,11 +785,11 @@ public void addAndDeleteCollections() {
idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");
Assert.assertFalse(idx.containsDuplicates());

assertTypeB(idx, 1, "1");
assertTypeB(idx, 2, null);
assertTypeB(idx, 3, "3");
assertTypeB(idx, 4, "4");
assertTypeB(idx, 5, null);
assertTypeBOrE(idx, 1, "1");
assertTypeBOrE(idx, 2, null);
assertTypeBOrE(idx, 3, "3");
assertTypeBOrE(idx, 4, "4");
assertTypeBOrE(idx, 5, null);

consumer.triggerRefreshTo(finalVersion);

Expand Down Expand Up @@ -849,11 +854,11 @@ public void addAndDeleteCollectionsInParallel() {
idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");
Assert.assertFalse(idx.containsDuplicates());

assertTypeB(idx, 1, "1");
assertTypeB(idx, 2, null);
assertTypeB(idx, 3, "3");
assertTypeB(idx, 4, "4");
assertTypeB(idx, 5, null);
assertTypeBOrE(idx, 1, "1");
assertTypeBOrE(idx, 2, null);
assertTypeBOrE(idx, 3, "3");
assertTypeBOrE(idx, 4, "4");
assertTypeBOrE(idx, 5, null);

consumer.triggerRefreshTo(finalVersion);

Expand Down Expand Up @@ -919,11 +924,11 @@ public void discardCollections() {
idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");
Assert.assertFalse(idx.containsDuplicates());

assertTypeB(idx, 1, "1");
assertTypeB(idx, 2, "2");
assertTypeB(idx, 3, "3");
assertTypeB(idx, 4, "4");
assertTypeB(idx, 5, null);
assertTypeBOrE(idx, 1, "1");
assertTypeBOrE(idx, 2, "2");
assertTypeBOrE(idx, 3, "3");
assertTypeBOrE(idx, 4, "4");
assertTypeBOrE(idx, 5, null);
}

@Test
Expand Down Expand Up @@ -977,11 +982,11 @@ public void discardCollectionsInParallel() {
idx = new HollowPrimaryKeyIndex(consumer.getStateEngine(), "TypeB", "id");
Assert.assertFalse(idx.containsDuplicates());

assertTypeB(idx, 1, "1");
assertTypeB(idx, 2, "2");
assertTypeB(idx, 3, "3");
assertTypeB(idx, 4, "4");
assertTypeB(idx, 5, null);
assertTypeBOrE(idx, 1, "1");
assertTypeBOrE(idx, 2, "2");
assertTypeBOrE(idx, 3, "3");
assertTypeBOrE(idx, 4, "4");
assertTypeBOrE(idx, 5, null);
}

@Test
Expand Down Expand Up @@ -1478,16 +1483,16 @@ private void assertTypeA(HollowPrimaryKeyIndex typeAIdx, int id1,
}
}

private void assertTypeB(HollowPrimaryKeyIndex typeBIdx, int id1,
String expectedValue) {
int ordinal = typeBIdx.getMatchingOrdinal(id1);
private void assertTypeBOrE(HollowPrimaryKeyIndex typeIdx, int id1,
String expectedValue) {
int ordinal = typeIdx.getMatchingOrdinal(id1);

if (expectedValue == null) {
Assert.assertEquals(-1, ordinal);
} else {
Assert.assertNotEquals(-1, ordinal);
GenericHollowObject obj = new GenericHollowObject(
typeBIdx.getTypeState(), ordinal);
typeIdx.getTypeState(), ordinal);
Assert.assertEquals(expectedValue, obj.getObject("value")
.getString("value"));
}
Expand Down Expand Up @@ -1543,6 +1548,18 @@ public TypeD(int id, String name) {
}
}

@HollowPrimaryKey(fields = "id")
private static class TypeE {
int id;
@HollowTypeName(name = "TypeEValue")
String value;

public TypeE(int id, String value) {
this.id = id;
this.value = value;
}
}

private Collection<HollowObject> getAllHollowObjects(HollowConsumer hollowConsumer, final String type) {
final HollowReadStateEngine readStateEngine = hollowConsumer.getStateEngine();
final HollowTypeDataAccess typeDataAccess = readStateEngine.getTypeDataAccess(type);
Expand Down

0 comments on commit c2920b7

Please sign in to comment.