Skip to content

Commit

Permalink
AVRO-3983: Allow setting a custom encoder in DataFileWriter (#2874)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed May 5, 2024
1 parent 76991e9 commit e962bc4
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -73,6 +74,8 @@ public class DataFileWriter<D> implements Closeable, Flushable {

private byte[] sync; // 16 random bytes
private int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL;
private Function<OutputStream, BinaryEncoder> initEncoder = out -> new EncoderFactory().directBinaryEncoder(out,
null);

private boolean isOpen;
private Codec codec;
Expand Down Expand Up @@ -130,6 +133,17 @@ public DataFileWriter<D> setSyncInterval(int syncInterval) {
return this;
}

/**
 * Allows setting a different encoder than the default DirectBinaryEncoder.
 *
 * @param initEncoderFunc Function that creates a binary encoder writing to the
 *                        supplied output stream; must not be null. It is
 *                        invoked once per file in init() to create the block
 *                        buffer encoder.
 * @return this DataFileWriter
 * @throws NullPointerException if initEncoderFunc is null
 */
public DataFileWriter<D> setEncoder(Function<OutputStream, BinaryEncoder> initEncoderFunc) {
  if (initEncoderFunc == null) {
    // Fail fast here instead of with an obscure NPE later, when init()
    // calls initEncoder.apply(buffer).
    throw new NullPointerException("initEncoderFunc must not be null");
  }
  this.initEncoder = initEncoderFunc;
  return this;
}

/** Open a new file for data matching a schema with a random sync. */
public DataFileWriter<D> create(Schema schema, File file) throws IOException {
SyncableFileOutputStream sfos = new SyncableFileOutputStream(file);
Expand Down Expand Up @@ -242,7 +256,7 @@ private void init(OutputStream outs) throws IOException {
this.vout = efactory.directBinaryEncoder(out, null);
dout.setSchema(schema);
buffer = new NonCopyingByteArrayOutputStream(Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1));
this.bufOut = efactory.directBinaryEncoder(buffer, null);
this.bufOut = this.initEncoder.apply(buffer);
if (this.codec == null) {
this.codec = CodecFactory.nullCodec().createInstance();
}
Expand Down
45 changes: 30 additions & 15 deletions lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.avro.file.CodecFactory;
Expand All @@ -40,7 +42,9 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.RandomData;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -93,22 +97,32 @@ private File makeFile(CodecFactory codec) {
/**
 * Runs the full suite of read/write tests for the given codec, once for each
 * supported binary encoder (direct and blocking-direct).
 */
@ParameterizedTest
@MethodSource("codecs")
public void runTestsInOrder(CodecFactory codec) throws Exception {
  // Run for both encoders, but the MethodSource didn't really like it,
  // so it is just a loop within the test
  List<Function<OutputStream, BinaryEncoder>> encoders = new ArrayList<>();
  // Use the shared default factory (EncoderFactory.get()) rather than
  // allocating a fresh EncoderFactory for every encoder creation.
  encoders.add(b -> EncoderFactory.get().directBinaryEncoder(b, null));
  encoders.add(b -> EncoderFactory.get().blockingDirectBinaryEncoder(b, null));

  for (Function<OutputStream, BinaryEncoder> encoder : encoders) {
    LOG.info("Running with codec: {}", codec);
    testGenericWrite(codec, encoder);
    testGenericRead(codec);
    testSplits(codec);
    testSyncDiscovery(codec);
    testGenericAppend(codec, encoder);
    testReadWithHeader(codec);
    testFSync(codec, encoder, false);
    testFSync(codec, encoder, true);
  }
}

private void testGenericWrite(CodecFactory codec) throws IOException {
private void testGenericWrite(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
throws IOException {
DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).setSyncInterval(100);
if (codec != null) {
writer.setCodec(codec);
}
writer.setEncoder(encoderFunc);
writer.create(SCHEMA, makeFile(codec));
try {
int count = 0;
Expand Down Expand Up @@ -210,10 +224,12 @@ private void testSyncDiscovery(CodecFactory codec) throws IOException {
}
}

private void testGenericAppend(CodecFactory codec) throws IOException {
private void testGenericAppend(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc)
throws IOException {
File file = makeFile(codec);
long start = file.length();
try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>()).appendTo(file)) {
writer.setEncoder(encoderFunc);
for (Object datum : new RandomData(SCHEMA, COUNT, SEED + 1)) {
writer.append(datum);
}
Expand Down Expand Up @@ -254,11 +270,8 @@ private void testReadWithHeader(CodecFactory codec) throws IOException {
assertEquals(validPos, sin.tell(), "Should not move from sync point on reopen");
assertNotNull(readerFalse.next(), "Should be able to reopen at sync point");
}

}

}

}

@Test
Expand Down Expand Up @@ -306,8 +319,10 @@ public void flushCount() throws IOException {
assertTrue(out.flushCount < currentCount && out.flushCount >= flushCounter);
}

private void testFSync(CodecFactory codec, boolean useFile) throws IOException {
private void testFSync(CodecFactory codec, Function<OutputStream, BinaryEncoder> encoderFunc, boolean useFile)
throws IOException {
try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.setEncoder(encoderFunc);
writer.setFlushOnEveryBlock(false);
TestingByteArrayOutputStream out = new TestingByteArrayOutputStream();
if (useFile) {
Expand Down

0 comments on commit e962bc4

Please sign in to comment.