feat: implement efficient skipping in BlobInputStream
Using seek is more efficient than reading and discarding the bytes.
The only behavioural change is that it now allows skipping past the natural end of the stream, but the method contract already allows for this. It should not break anything.

Closes pgjdbc#3144
OrangeDog committed May 13, 2024
1 parent 2447cc9 commit b044578
Showing 2 changed files with 141 additions and 1 deletion.
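The gain the commit message claims is easiest to see side by side: without an override, BlobInputStream inherits java.io.InputStream.skip(long), which burns through the skipped range by reading it into a scratch buffer, so every skipped byte still crosses the wire; a seek-based skip is a single repositioning call. A minimal sketch of the two strategies (seekTo() is an illustrative stand-in for LargeObject's seek()/seek64(), not a real API):

import java.io.IOException;
import java.io.InputStream;

abstract class SkipStrategies extends InputStream {

  // Roughly what the inherited InputStream.skip(long) does: read the
  // skipped range into a throwaway buffer until n bytes are consumed
  // or the stream ends. Cost grows with n.
  long skipByReading(long n) throws IOException {
    byte[] scratch = new byte[2048];
    long remaining = n;
    while (remaining > 0) {
      int read = read(scratch, 0, (int) Math.min(scratch.length, remaining));
      if (read < 0) {
        break; // end of stream reached before n bytes were skipped
      }
      remaining -= read;
    }
    return n - remaining;
  }

  // What this commit does instead: one repositioning call, no data
  // transfer, cost independent of n.
  long skipBySeeking(long position, long n) throws IOException {
    seekTo(position + n);
    return n;
  }

  // Illustrative stand-in for lo.seek(int, SEEK_SET) / lo.seek64(long, SEEK_SET).
  abstract void seekTo(long position) throws IOException;
}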
61 changes: 60 additions & 1 deletion pgjdbc/src/main/java/org/postgresql/largeobject/BlobInputStream.java
@@ -18,6 +18,8 @@
* This is an implementation of an InputStream from a large object.
*/
public class BlobInputStream extends InputStream {
static final int DEFAULT_BLOCK_SIZE = 8 * 1024;
static final int LO_BLOCK_SIZE = DEFAULT_BLOCK_SIZE / 4;
static final int DEFAULT_MAX_BUFFER_SIZE = 512 * 1024;
static final int INITIAL_BUFFER_SIZE = 64 * 1024;

@@ -91,13 +93,19 @@ public BlobInputStream(LargeObject lo, int bsize, long limit) {
// the first read to be exactly the initial buffer size
this.lastBufferSize = INITIAL_BUFFER_SIZE / 2;

// Trying to seek past the max size throws an exception, which skip() should avoid
long maxBlobSize;
try {
// initialise current position for mark/reset
this.absolutePosition = lo.tell64();
// https://github.com/postgres/postgres/blob/REL9_3_0/src/include/storage/large_object.h#L76
maxBlobSize = (long) Integer.MAX_VALUE * LO_BLOCK_SIZE;
} catch (SQLException e1) {
try {
// the tell64 function does not exist before PostgreSQL 9.3
this.absolutePosition = lo.tell();
// 2GiB is the limit for these older versions
maxBlobSize = Integer.MAX_VALUE;
} catch (SQLException e2) {
RuntimeException e3 = new RuntimeException("Failed to create BlobInputStream", e1);
e3.addSuppressed(e2);
@@ -106,7 +114,7 @@
}

// Treat -1 as no limit for backward compatibility
this.limit = limit == -1 ? Long.MAX_VALUE : limit + this.absolutePosition;
this.limit = limit == -1 ? maxBlobSize : limit + this.absolutePosition;
this.markPosition = this.absolutePosition;
}

@@ -351,6 +359,57 @@ public boolean markSupported() {
return true;
}

/**
* Skips over and discards {@code n} bytes of data from this input stream.
*
* <p>Due to the "sparse" implementation of Large Objects, this class allows skipping
* past the "end" of the stream. Subsequent reads will continue to return {@code -1}.
*
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped which might be zero.
* @throws IOException if an I/O error occurs.
* @see java.io.InputStream#skip(long)
*/
@Override
public long skip(long n) throws IOException {
if (n <= 0) {
// The spec does allow for skipping backwards, but let's not.
return 0;
}

try (ResourceLock ignore = lock.obtain()) {
LargeObject lo = getLo();
long loId = lo.getLongOID();

long targetPosition = absolutePosition + n;
if (targetPosition > limit || targetPosition < 0) {
targetPosition = limit;
}
long currentPosition = absolutePosition;
long skipped = targetPosition - currentPosition;
absolutePosition = targetPosition;

// If the target still lies within the current read-ahead buffer, just
// advance inside it; otherwise drop the buffer and reposition server-side.
if (buffer != null && buffer.length - bufferPosition > skipped) {
bufferPosition += (int) skipped;
} else {
buffer = null;
try {
if (targetPosition <= Integer.MAX_VALUE) {
lo.seek((int) targetPosition, LargeObject.SEEK_SET);
} else {
lo.seek64(targetPosition, LargeObject.SEEK_SET);
}
} catch (SQLException e) {
throw new IOException(
GT.tr("Can not skip stream for large object {0} by {1} (currently @{2})",
loId, n, currentPosition),
e);
}
}
return skipped;
}
}

private LargeObject getLo() throws IOException {
if (lo == null) {
throw new IOException("BlobOutputStream is closed");
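A quick check of the maxBlobSize arithmetic above: per the large_object.h header linked in the code, large objects are stored in pages of LOBLKSIZE = BLCKSZ / 4 bytes (2048 with the default 8 KiB block size), addressed by a signed 32-bit page number. This standalone snippet (not part of the driver) reproduces the two bounds:

public class MaxBlobSizeCheck {
  public static void main(String[] args) {
    int defaultBlockSize = 8 * 1024;        // mirrors BLCKSZ / DEFAULT_BLOCK_SIZE
    int loBlockSize = defaultBlockSize / 4; // LOBLKSIZE = 2048
    // 9.3+: seek64 can address (2^31 - 1) pages of 2 KiB each.
    System.out.println((long) Integer.MAX_VALUE * loBlockSize); // 4398046509056 (~4 TiB)
    // Before 9.3 there is no seek64/tell64, so offsets stop at 2 GiB.
    System.out.println((long) Integer.MAX_VALUE);               // 2147483647
  }
}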
81 changes: 81 additions & 0 deletions pgjdbc/src/test/java/org/postgresql/test/jdbc2/BlobTest.java
@@ -8,6 +8,7 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@@ -25,6 +26,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
@@ -35,6 +37,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

import javax.sql.rowset.serial.SerialBlob;
@@ -335,6 +338,66 @@ void markResetWithInitialOffset() throws Exception {
}
}

@Test
void skip() throws Exception {
long expectedBigSkip;
if (TestUtil.haveMinimumServerVersion(con, ServerVersion.v9_3)) {
expectedBigSkip = 4398046442492L;
} else {
expectedBigSkip = 2147417083;
}

LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
InputStream bis = blob.getInputStream();
assertEquals(0, bis.read());
assertEquals(1024L, bis.skip(1024));
assertEquals(1, bis.read());
assertEquals(0L, bis.skip(-1));
assertEquals(1, bis.read());
assertEquals(64 * 1024L, bis.skip(64 * 1024));
assertEquals(65, bis.read());
assertEquals(expectedBigSkip, bis.skip(Long.MAX_VALUE));
assertEquals(-1, bis.read());
bis.close();
assertThrows(IOException.class, () -> bis.skip(1));
}
}

@Test
void skipWithInitialOffset() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
blob.seek(1024);

InputStream bis = blob.getInputStream();
assertEquals(1, bis.read());
assertEquals(1023L, bis.skip(1023));
assertEquals(2, bis.read());
assertEquals(64 * 1024L, bis.skip(64 * 1024));
assertEquals(66, bis.read());
}
}

@Test
void skipWithLimit() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
InputStream bis = blob.getInputStream(65 * 1024);
assertEquals(0, bis.read());
assertEquals(64 * 1024L, bis.skip(64 * 1024));
assertEquals(64, bis.read());
assertEquals(1022L, bis.skip(1024));
assertEquals(-1, bis.read());
}
}

@Test
void getBytesOffset() throws Exception {
assertTrue(uploadFile(TEST_FILE, NATIVE_STREAM) > 0);
@@ -524,6 +587,24 @@ private long uploadFile(String file, int method) throws Exception {
return oid;
}

/**
* Creates a large object big enough to require multiple buffers when reading.
* @return the OID of the created large object
* @see org.postgresql.largeobject.BlobInputStream#INITIAL_BUFFER_SIZE
*/
private long createMediumLargeObject() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = lom.createLO();
try (LargeObject blob = lom.open(loid, LargeObjectManager.WRITE)) {
byte[] buf = new byte[1024];
for (byte i = 0; i < 96; i++) {
Arrays.fill(buf, i);
blob.write(buf);
}
}
return loid;
}

/*
* Helper - compares the blobs in a table with a local file. Note this uses the postgresql
* specific Large Object API
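For the record, the expectedBigSkip constants in the skip() test fall straight out of the limit clamping in the constructor. createMediumLargeObject() writes 96 KiB where the byte at offset p equals p / 1024 (which is what the read() assertions check), and by the time skip(Long.MAX_VALUE) runs the stream has consumed 4 + 1024 + 64 * 1024 = 66564 bytes, so the clamped skip is maxBlobSize minus that position. A standalone check of the numbers (not part of the test suite):

public class ExpectedBigSkipCheck {
  public static void main(String[] args) {
    // Position before skip(Long.MAX_VALUE): four single-byte reads plus
    // skips of 1024 and 64 * 1024; the skip(-1) call moves nothing.
    long position = 4 + 1024 + 64 * 1024; // 66564

    long maxBlobSize = (long) Integer.MAX_VALUE * 2048; // 9.3+ (seek64)
    System.out.println(maxBlobSize - position);         // 4398046442492

    long maxBlobSizePre93 = Integer.MAX_VALUE;          // 2 GiB cap
    System.out.println(maxBlobSizePre93 - position);    // 2147417083
  }
}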
