Skip to content

Commit

Permalink
AVRO-2663: Record inside of Union is not resolved properly (#752)
Browse files: browse the repository at this point in the history
* [AVRO-2663] Bug with nested record

* Compare the names as well

* Apply stylecheck

* Fix some docs

* Make spotless happy

* Remove unused statement

* Fix Spotless error
  • Loading branch information
Fokko authored and RyanSkraba committed Feb 6, 2020
1 parent bc23abe commit dd62091
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 51 deletions.
2 changes: 1 addition & 1 deletion doc/src/content/mddocs/refactoring-resolution.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ writer's schema into the corresponding subtree of the reader's.

* `RecordAdjust` -- resolution involves recursively resolving the
schemas for each field, and dealing with reordering and removal
of fields. An `EnumAdjust` object contains the information
of fields. A `RecordAdjust` object contains the information
needed to do so.

* `SkipAction` -- only generated as a sub-action of a
Expand Down
91 changes: 48 additions & 43 deletions lang/java/avro/src/main/java/org/apache/avro/Resolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public class Resolver {
*
* This method walks the reader's and writer's schemas together, generating an
* appropriate subclass of {@link Action} to encapsulate the information needed
* to resolve the corresponding parts of each schema tree. For convience, every
* {@link Action} object has a pointer to the corresponding parts of the
* to resolve the corresponding parts of each schema tree. For convenience,
* every {@link Action} object has a pointer to the corresponding parts of the
* reader's and writer's trees being resolved by the action. Each subclass of
* {@link Action} has additional information needed for different types of
* schema, e.g., the {@link EnumAdjust} subclass has information about
Expand Down Expand Up @@ -183,7 +183,7 @@ public DoNothing(Schema w, Schema r, GenericData d) {
* found in the data being read), then it's safe to ignore it.
*/
public static class ErrorAction extends Action {
public static enum ErrorType {
public enum ErrorType {
/**
* Use when Schema types don't match and can't be converted. For example,
* resolving "int" and "enum".
Expand Down Expand Up @@ -231,8 +231,8 @@ public String toString() {
return "Found " + writer.getFullName() + ", expecting " + reader.getFullName();

case MISSING_REQUIRED_FIELD: {
List<Field> wfields = writer.getFields();
List<Field> rfields = reader.getFields();
final List<Field> wfields = writer.getFields();
final List<Field> rfields = reader.getFields();
String fname = "<oops>";
for (Field rf : rfields) {
if (writer.getField(rf.name()) == null && rf.defaultValue() == null) {
Expand Down Expand Up @@ -580,15 +580,15 @@ private WriterUnion(Schema w, Schema r, GenericData d, boolean ue, Action[] a) {
}

public static Action resolve(Schema writeSchema, Schema readSchema, GenericData data, Map<SeenPair, Action> seen) {
boolean ueqv = unionEquiv(writeSchema, readSchema, new HashMap<>());
final List<Schema> wb = writeSchema.getTypes();
final List<Schema> rb = (ueqv ? readSchema.getTypes() : null);
int sz = wb.size();
final Action[] actions = new Action[sz];
for (int i = 0; i < sz; i++) {
actions[i] = Resolver.resolve(wb.get(i), (ueqv ? rb.get(i) : readSchema), data, seen);
boolean unionEquivalent = unionEquiv(writeSchema, readSchema, new HashMap<>());
final List<Schema> writeTypes = writeSchema.getTypes();
final List<Schema> readTypes = (unionEquivalent ? readSchema.getTypes() : null);
int writeTypeLength = writeTypes.size();
final Action[] actions = new Action[writeTypeLength];
for (int i = 0; i < writeTypeLength; i++) {
actions[i] = Resolver.resolve(writeTypes.get(i), (unionEquivalent ? readTypes.get(i) : readSchema), data, seen);
}
return new WriterUnion(writeSchema, readSchema, data, ueqv, actions);
return new WriterUnion(writeSchema, readSchema, data, unionEquivalent, actions);
}
}

Expand Down Expand Up @@ -639,22 +639,22 @@ public static Action resolve(Schema w, Schema r, GenericData d, Map<SeenPair, Ac
// the
// interest of "bug-for-bug" compatibility, we imported the old algorithm.
private static int firstMatchingBranch(Schema w, Schema r, GenericData d, Map<SeenPair, Action> seen) {
Schema.Type vt = w.getType();
final Schema.Type vt = w.getType();
// first scan for exact match
int j = 0;
int structureMatch = -1;
for (Schema b : r.getTypes()) {
if (vt == b.getType()) {
if (vt == Schema.Type.RECORD || vt == Schema.Type.ENUM || vt == Schema.Type.FIXED) {
String vname = w.getFullName();
String bname = b.getFullName();
final String vname = w.getFullName();
final String bname = b.getFullName();
// return immediately if the name matches exactly according to spec
if (vname != null && vname.equals(bname))
return j;

if (vt == Schema.Type.RECORD && !hasMatchError(RecordAdjust.resolve(w, b, d, seen))) {
String vShortName = w.getName();
String bShortName = b.getName();
final String vShortName = w.getName();
final String bShortName = b.getName();
// use the first structure match or one where the name matches
if ((structureMatch < 0) || (vShortName != null && vShortName.equals(bShortName))) {
structureMatch = j;
Expand Down Expand Up @@ -728,20 +728,21 @@ private static boolean hasMatchError(Action action) {
}
}

private static boolean unionEquiv(Schema w, Schema r, Map<SeenPair, Boolean> seen) {
Schema.Type wt = w.getType();
if (wt != r.getType())
private static boolean unionEquiv(Schema write, Schema read, Map<SeenPair, Boolean> seen) {
final Schema.Type wt = write.getType();
if (wt != read.getType()) {
return false;
}

// Previously, the spec was somewhat ambiguous as to whether getFullName or
// getName should be used here. Using name rather than fully qualified name
// maintains backwards compatibility.
if ((wt == Schema.Type.RECORD || wt == Schema.Type.FIXED || wt == Schema.Type.ENUM)
&& !(w.getName() == null || w.getName().equals(r.getName()))) {
&& !(write.getName() == null || write.getName().equals(read.getName()))) {
return false;
}

switch (w.getType()) {
switch (wt) {
case NULL:
case BOOLEAN:
case INT:
Expand All @@ -753,20 +754,21 @@ private static boolean unionEquiv(Schema w, Schema r, Map<SeenPair, Boolean> see
return true;

case ARRAY:
return unionEquiv(w.getElementType(), r.getElementType(), seen);
return unionEquiv(write.getElementType(), read.getElementType(), seen);
case MAP:
return unionEquiv(w.getValueType(), r.getValueType(), seen);
return unionEquiv(write.getValueType(), read.getValueType(), seen);

case FIXED:
return w.getFixedSize() == r.getFixedSize();
return write.getFixedSize() == read.getFixedSize();

case ENUM: {
List<String> ws = w.getEnumSymbols();
List<String> rs = r.getEnumSymbols();
if (ws.size() != rs.size())
final List<String> ws = write.getEnumSymbols();
final List<String> rs = read.getEnumSymbols();
if (ws.size() != rs.size()) {
return false;
int i;
for (i = 0; i < ws.size(); i++) {
}
int i = 0;
for (; i < ws.size(); i++) {
if (!ws.get(i).equals(rs.get(i))) {
break;
}
Expand All @@ -775,12 +777,13 @@ private static boolean unionEquiv(Schema w, Schema r, Map<SeenPair, Boolean> see
}

case UNION: {
List<Schema> wb = w.getTypes();
List<Schema> rb = r.getTypes();
if (wb.size() != rb.size())
final List<Schema> wb = write.getTypes();
final List<Schema> rb = read.getTypes();
if (wb.size() != rb.size()) {
return false;
int i;
for (i = 0; i < wb.size(); i++) {
}
int i = 0;
for (; i < wb.size(); i++) {
if (!unionEquiv(wb.get(i), rb.get(i), seen)) {
break;
}
Expand All @@ -789,17 +792,19 @@ private static boolean unionEquiv(Schema w, Schema r, Map<SeenPair, Boolean> see
}

case RECORD: {
SeenPair wsc = new SeenPair(w, r);
final SeenPair wsc = new SeenPair(write, read);
if (!seen.containsKey(wsc)) {
seen.put(wsc, true); // Be optimistic, but we may change our minds
List<Field> wb = w.getFields();
List<Field> rb = r.getFields();
final List<Field> wb = write.getFields();
final List<Field> rb = read.getFields();
if (wb.size() != rb.size()) {
seen.put(wsc, false);
} else {
int i;
for (i = 0; i < wb.size(); i++) {
if (!unionEquiv(wb.get(i).schema(), rb.get(i).schema(), seen)) {
int i = 0;
for (; i < wb.size(); i++) {
// Loop through each of the elements, and check if they are equal
if (!wb.get(i).name().equals(rb.get(i).name())
|| !unionEquiv(wb.get(i).schema(), rb.get(i).schema(), seen)) {
break;
}
}
Expand All @@ -809,7 +814,7 @@ private static boolean unionEquiv(Schema w, Schema r, Map<SeenPair, Boolean> see
return seen.get(wsc);
}
default:
throw new IllegalArgumentException("Unknown schema type: " + w.getType());
throw new IllegalArgumentException("Unknown schema type: " + write.getType());
}
}
}
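7 changes: 0 additions & 7 deletions lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
Original file line number Diff line number Diff line change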
Expand Up @@ -24,7 +24,6 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.AvroRuntimeException;
Expand Down Expand Up @@ -236,7 +235,6 @@ protected Object convert(Object datum, Schema schema, LogicalType type, Conversi
protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException {
final Object record = data.newRecord(old, expected);
final Object state = data.getRecordState(record, expected);
final List<Field> expectedFields = expected.getFields();

for (Field field : in.readFieldOrder()) {
int pos = field.pos();
Expand All @@ -247,11 +245,6 @@ protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) th
}

readField(record, field, oldDatum, in, state);

// In case the expected field isn't in the read field
if (!expectedFields.get(pos).equals(field)) {
data.setField(record, field.name(), field.pos(), data.getDefaultValue(expectedFields.get(pos)));
}
}

return record;
Expand Down

0 comments on commit dd62091

Please sign in to comment.