Skip to content

Commit

Permalink
AVRO-3871: Support nested lists/maps in BlockingDirectBinaryEncoder (#2732)
Browse files Browse the repository at this point in the history

* Support nested lists/maps

* Add some tests
  • Loading branch information
Fokko committed May 5, 2024
1 parent e962bc4 commit 9f9023c
Show file tree
Hide file tree
Showing 5 changed files with 544 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;

/**
* An {@link Encoder} for Avro's binary encoding that does not buffer output.
Expand All @@ -46,13 +48,13 @@
* @see Decoder
*/
public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder {
private final BufferOutputStream buffer;
private final ArrayList<BufferOutputStream> buffers;

private OutputStream originalStream;
private final ArrayDeque<OutputStream> stashedBuffers;

private boolean inBlock = false;
private int depth = 0;

private long blockItemCount;
private final ArrayDeque<Long> blockItemCounts;

/**
* Create a writer that sends its output to the underlying stream
Expand All @@ -62,24 +64,30 @@ public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder {
*/
public BlockingDirectBinaryEncoder(OutputStream out) {
super(out);
this.buffer = new BufferOutputStream();
this.buffers = new ArrayList<>();
this.stashedBuffers = new ArrayDeque<>();
this.blockItemCounts = new ArrayDeque<>();
}

private void startBlock() {
if (inBlock) {
throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder");
stashedBuffers.push(out);
if (this.buffers.size() <= depth) {
this.buffers.add(new BufferOutputStream());
}
originalStream = out;
buffer.reset();
out = buffer;
inBlock = true;
BufferOutputStream buf = buffers.get(depth);
buf.reset();
this.depth += 1;
this.out = buf;
}

private void endBlock() {
if (!inBlock) {
if (depth == 0) {
throw new RuntimeException("Called endBlock, while not buffering a block");
}
out = originalStream;
this.depth -= 1;
out = stashedBuffers.pop();
BufferOutputStream buffer = this.buffers.get(depth);
long blockItemCount = blockItemCounts.pop();
if (blockItemCount > 0) {
try {
// Make it negative, so the reader knows that the number of bytes is coming
Expand All @@ -90,13 +98,11 @@ private void endBlock() {
throw new RuntimeException(e);
}
}
inBlock = false;
buffer.reset();
}

@Override
public void setItemCount(long itemCount) throws IOException {
blockItemCount = itemCount;
blockItemCounts.push(itemCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class DirectBinaryEncoder extends BinaryEncoder {
* Create a writer that sends its output to the underlying stream
* <code>out</code>.
**/
public DirectBinaryEncoder(OutputStream out) {
protected DirectBinaryEncoder(OutputStream out) {
configure(out);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.specific.TestRecordWithMapsAndArrays;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
Expand All @@ -31,13 +32,35 @@
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class TestBlockingDirectBinaryEncoder {

/**
 * Emits {@code numbers} through {@code encoder} as a single array, writing each
 * value as its decimal string representation.
 *
 * @param encoder the encoder that receives the array
 * @param numbers the values to write, in iteration order
 * @throws IOException if any of the encoder calls fails
 */
private void writeToArray(BinaryEncoder encoder, int[] numbers) throws IOException {
  encoder.writeArrayStart();
  final int count = numbers.length;
  // Announce the number of items before the first startItem() call.
  encoder.setItemCount(count);
  for (int idx = 0; idx < count; idx++) {
    encoder.startItem();
    final String text = String.valueOf(numbers[idx]);
    encoder.writeString(text);
  }
  encoder.writeArrayEnd();
}

/**
 * Emits {@code numbers} through {@code encoder} as a single map in which every
 * entry's key is the decimal string of the value and the entry's value is the
 * number itself.
 *
 * @param encoder the encoder that receives the map
 * @param numbers the values to write, in iteration order
 * @throws IOException if any of the encoder calls fails
 */
private void writeToMap(BinaryEncoder encoder, long[] numbers) throws IOException {
  encoder.writeMapStart();
  final int count = numbers.length;
  // Announce the number of entries before the first startItem() call.
  encoder.setItemCount(count);
  for (int idx = 0; idx < count; idx++) {
    final long value = numbers[idx];
    encoder.startItem();
    final String key = String.valueOf(value);
    encoder.writeString(key);
    encoder.writeLong(value);
  }
  encoder.writeMapEnd();
}

@Test
void blockingDirectBinaryEncoder() throws IOException, NoSuchAlgorithmException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand All @@ -49,35 +72,57 @@ void blockingDirectBinaryEncoder() throws IOException, NoSuchAlgorithmException
encoder.writeFixed(new byte[] { (byte) 0xC3, (byte) 0x01 });
encoder.writeFixed(SchemaNormalization.parsingFingerprint("CRC-64-AVRO", TestRecordWithMapsAndArrays.SCHEMA$));

int len = 5;
// Array
this.writeToArray(encoder, new int[] { 1, 2, 3, 4, 5 });

// Map
writeToMap(encoder, new long[] { 1L, 2L, 3L, 4L, 5L });

// Nested Array

encoder.writeArrayStart();
encoder.setItemCount(len);
for (int i = 0; i < len; i++) {
encoder.startItem();
encoder.writeString(Integer.toString(i));
}
encoder.setItemCount(2);
this.writeToArray(encoder, new int[] { 1, 2 });
this.writeToArray(encoder, new int[] { 3, 4, 5 });
encoder.writeArrayEnd();

// Nested Map

encoder.writeMapStart();
encoder.setItemCount(len);
for (long i = 0; i < len; i++) {
encoder.startItem();
encoder.writeString(Long.toString(i));
encoder.writeLong(i);
}
encoder.setItemCount(2);
encoder.writeString("first");
this.writeToMap(encoder, new long[] { 1L, 2L });
encoder.writeString("second");
this.writeToMap(encoder, new long[] { 3L, 4L, 5L });
encoder.writeMapEnd();

// Read

encoder.flush();

BinaryMessageDecoder<TestRecordWithMapsAndArrays> decoder = TestRecordWithMapsAndArrays.getDecoder();
TestRecordWithMapsAndArrays r = decoder.decode(baos.toByteArray());

assertThat(r.getArr(), is(Arrays.asList("0", "1", "2", "3", "4")));
assertThat(r.getArr(), is(Arrays.asList("1", "2", "3", "4", "5")));
Map<String, Long> map = r.getMap();
assertThat(map.size(), is(5));
for (long i = 0; i < len; i++) {
for (long i = 1; i <= 5; i++) {
assertThat(map.get(Long.toString(i)), is(i));
}

assertThat(r.getNestedArr(), is(Arrays.asList(Arrays.asList("1", "2"), Arrays.asList("3", "4", "5"))));

Map<String, Map<String, Long>> nestedMap = r.getNestedMap();
assertThat(nestedMap.size(), is(2));

assertThat(nestedMap.get("first").size(), is(2));
assertThat(nestedMap.get("first").get("1"), is(1L));
assertThat(nestedMap.get("first").get("2"), is(2L));

assertThat(nestedMap.get("second").size(), is(3));
assertThat(nestedMap.get("second").get("3"), is(3L));
assertThat(nestedMap.get("second").get("4"), is(4L));
assertThat(nestedMap.get("second").get("5"), is(5L));
}

@Test
Expand All @@ -93,8 +138,8 @@ void testSkippingUsingBlocks() throws IOException, NoSuchAlgorithmException {
in.read(null, mockDecoder);
}

verify(mockDecoder, times(1)).skipMap();
verify(mockDecoder, times(1)).skipArray();
verify(mockDecoder, times(2)).skipMap();
verify(mockDecoder, times(2)).skipArray();
verify(mockDecoder, times(0)).readString();
verify(mockDecoder, times(0)).readLong();
}
Expand Down

0 comments on commit 9f9023c

Please sign in to comment.