BlobInputStream improvements #3148

Open, wants to merge 3 commits into master
@@ -76,7 +76,7 @@ public BlobInputStream(LargeObject lo) {
*/

public BlobInputStream(LargeObject lo, int bsize) {
-    this(lo, bsize, Long.MAX_VALUE);
+    this(lo, bsize, -1);
}

/**
@@ -90,8 +90,24 @@ public BlobInputStream(LargeObject lo, int bsize, long limit) {
// The very first read multiplies the last buffer size by two, so we divide by two to get
// the first read to be exactly the initial buffer size
this.lastBufferSize = INITIAL_BUFFER_SIZE / 2;

try {
// initialise current position for mark/reset
this.absolutePosition = lo.tell64();
} catch (SQLException e1) {
try {
// the tell64 function does not exist before PostgreSQL 9.3
this.absolutePosition = lo.tell();
Member: Document what this is doing?

Contributor Author @OrangeDog (Mar 5, 2024): This line specifically, or the whole added section?

Member: seems to me that we are switching from LO to bytea here ?

Member: The whole section. Frankly, the code is non-obvious, so it makes sense to document the considered edge cases.

Member: It is sad we increase the number of roundtrips for a common case of "blob is used only once". Can we avoid adding extra db calls in the constructor?

Contributor Author @OrangeDog (Mar 5, 2024):

> seems to me that we are switching from LO to bytea here ?

No? On old versions of Postgres (which you still test against), large objects only use an int for their position, with a limit of 2GB: https://www.postgresql.org/docs/9.2/lo-intro.html

See also the existing implementation of reset(), which checks whether it can use seek instead of seek64.
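The version-dependent seek pattern described above can be sketched in isolation (an illustrative sketch only; `seek32` and `seek64` are stand-ins for `LargeObject.seek(int, ...)` and `seek64(long, ...)`, not the pgjdbc API itself):

```java
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

// Illustrative sketch: positions that fit in an int use the 32-bit call,
// which every server version supports; only targets beyond Integer.MAX_VALUE
// need lo_lseek64, which requires PostgreSQL 9.3 or later.
public class SeekCompat {
    public static void seek(long target, IntConsumer seek32, LongConsumer seek64) {
        if (target <= Integer.MAX_VALUE) {
            seek32.accept((int) target);  // works on all server versions
        } else {
            seek64.accept(target);        // needs lo_lseek64 (9.3+)
        }
    }
}
```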

Contributor Author @OrangeDog (Mar 5, 2024):

> Can we avoid adding extra db calls in the constructor?

Not really. Someone might call mark() before anything else, and the position needs to be correctly initialised before then.

I suppose you could add extra logic to defer the initialisation to the first call to either read(), mark(), reset(), or skip(). That didn't seem like a good idea to me though. Better to guarantee that it's been done before anything else can be called.

} catch (SQLException e2) {
RuntimeException e3 = new RuntimeException("Failed to create BlobInputStream", e1);
e3.addSuppressed(e2);
throw e3;
}
}

// Treat -1 as no limit for backward compatibility
-    this.limit = limit == -1 ? Long.MAX_VALUE : limit;
+    this.limit = limit == -1 ? Long.MAX_VALUE : limit + this.absolutePosition;
this.markPosition = this.absolutePosition;
}
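The deferred-initialisation alternative raised in the review thread could look roughly like this (a sketch under assumed names, not the PR's code; `tell` stands in for the `lo.tell64()`/`lo.tell()` round-trip):

```java
import java.util.function.LongSupplier;

// Sketch: the database call that fetches the current position is postponed
// until the stream is first used, so the constructor makes no round-trip.
public class LazyPosition {
    private final LongSupplier tell;   // stands in for lo.tell64()/lo.tell()
    private long absolutePosition;
    private boolean initialised;

    public LazyPosition(LongSupplier tell) {
        this.tell = tell;              // no database call here
    }

    private void ensureInitialised() {
        if (!initialised) {
            absolutePosition = tell.getAsLong();  // single deferred round-trip
            initialised = true;
        }
    }

    // Every entry point (read, mark, reset, skip) would have to call
    // ensureInitialised() first, which is the fragility the author notes.
    public long mark() {
        ensureInitialised();
        return absolutePosition;
    }
}
```

The trade-off matches the discussion: one round-trip is saved when the blob is read straight through, at the cost of every public method having to remember the guard call.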

/**
@@ -335,6 +351,59 @@ public boolean markSupported() {
return true;
}

/**
* Skips over and discards {@code n} bytes of data from this input stream.
*
* <p>Due to the "sparse" implementation of Large Objects, this class allows skipping
* past the "end" of the stream. Subsequent reads will continue to return {@code -1}.
*
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped which might be zero.
* @throws IOException if an underlying driver error occurs.
* In particular, this will throw if attempting to skip beyond the maximum length
* of a large object, which by default is 4,398,046,509,056 bytes.
* @see java.io.InputStream#skip(long)
*/
@Override
public long skip(long n) throws IOException {
if (n <= 0) {
// The spec does allow for skipping backwards, but let's not.
return 0;
}

try (ResourceLock ignore = lock.obtain()) {
LargeObject lo = getLo();
long loId = lo.getLongOID();

long targetPosition = absolutePosition + n;
if (targetPosition > limit || targetPosition < 0) {
targetPosition = limit;
}
long currentPosition = absolutePosition;
long skipped = targetPosition - currentPosition;

if (buffer != null && buffer.length - bufferPosition > skipped) {
bufferPosition += (int) skipped;
} else {
try {
if (targetPosition <= Integer.MAX_VALUE) {
lo.seek((int) targetPosition, LargeObject.SEEK_SET);
} else {
lo.seek64(targetPosition, LargeObject.SEEK_SET);
}
} catch (SQLException e) {
throw new IOException(
GT.tr("Can not skip stream for large object {0} by {1} (currently @{2})",
loId, n, currentPosition),
e);
}
buffer = null;
}
absolutePosition = targetPosition;
return skipped;
}
}
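The clamping step at the top of `skip()` is worth isolating: `absolutePosition + n` can overflow a `long`, so a negative sum is treated as past the limit as well. A minimal extraction of that logic:

```java
// Sketch of the target clamping used by skip(): an overflowed (negative)
// sum is clamped to the limit exactly like a target beyond the limit.
public class SkipClamp {
    public static long clampTarget(long position, long n, long limit) {
        long target = position + n;    // may wrap negative on overflow
        if (target > limit || target < 0) {
            target = limit;
        }
        return target;
    }
}
```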

private LargeObject getLo() throws IOException {
if (lo == null) {
throw new IOException("BlobOutputStream is closed");
213 changes: 202 additions & 11 deletions pgjdbc/src/test/java/org/postgresql/test/jdbc2/BlobTest.java
@@ -8,14 +8,19 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import org.postgresql.PGConnection;
import org.postgresql.core.ServerVersion;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
import org.postgresql.test.TestUtil;
import org.postgresql.test.annotations.DisabledIfServerVersionBelow;
import org.postgresql.util.PSQLState;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -25,6 +30,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
@@ -35,6 +41,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

import javax.sql.rowset.serial.SerialBlob;
@@ -290,18 +297,184 @@ void markResetStream() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();

long oid = rs.getLong(1);
-      LargeObject blob = lom.open(oid);
-      InputStream bis = blob.getInputStream();
-
-      assertEquals('<', bis.read());
-      bis.mark(4);
-      assertEquals('?', bis.read());
-      assertEquals('x', bis.read());
-      assertEquals('m', bis.read());
-      assertEquals('l', bis.read());
-      bis.reset();
-      assertEquals('?', bis.read());
+      try (LargeObject blob = lom.open(oid)) {
+        InputStream bis = blob.getInputStream();
+
+        assertEquals('<', bis.read());
+        bis.mark(4);
+        assertEquals('?', bis.read());
+        assertEquals('x', bis.read());
+        assertEquals('m', bis.read());
+        assertEquals('l', bis.read());
+        bis.reset();
+        assertEquals('?', bis.read());
+      }
}
}
}

@Test
void markResetWithInitialOffset() throws Exception {
assertTrue(uploadFile(TEST_FILE, NATIVE_STREAM) > 0);

try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("SELECT lo FROM testblob where id = '/test-file.xml'")) {
assertTrue(rs.next());

LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();

long oid = rs.getLong(1);
try (LargeObject blob = lom.open(oid)) {
blob.seek(4);
InputStream bis = blob.getInputStream();

assertEquals('l', bis.read());
bis.reset();
assertEquals('l', bis.read());
assertEquals(' ', bis.read());
bis.mark(4);
assertEquals('v', bis.read());
assertEquals('e', bis.read());
bis.reset();
assertEquals('v', bis.read());
}
}
}
}

@Test
void skip() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
InputStream bis = blob.getInputStream();
assertEquals(0, bis.read());
assertEquals(1024L, bis.skip(1024));
assertEquals(1, bis.read());
assertEquals(64 * 1024L, bis.skip(64 * 1024));
assertEquals(65, bis.read());
}
}

@Test
void skipBackwards() throws Exception {
assertTrue(uploadFile(TEST_FILE, NATIVE_STREAM) > 0);

try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery("SELECT lo FROM testblob where id = '/test-file.xml'")) {
assertTrue(rs.next());

LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = rs.getLong(1);

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
InputStream bis = blob.getInputStream();
assertEquals('<', bis.read());
assertEquals(0, bis.skip(-1));
assertEquals('?', bis.read());
}
}
}
}

@Test
void skipToEnd() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
InputStream bis = blob.getInputStream();
assertEquals(96 * 1024, bis.skip(96 * 1024));
assertEquals(-1, bis.read());
}
}

@Test
void skipPastEnd() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
InputStream bis = blob.getInputStream();
assertEquals(1024 * 1024, bis.skip(1024 * 1024));
assertEquals(-1, bis.read());
assertEquals(1024, bis.skip(1024));
assertEquals(-1, bis.read());
}
}

@Test
@DisabledIfServerVersionBelow("9.3")
void skipPastMaxAfter9_3() throws Exception {
assumeTrue(TestUtil.haveMinimumServerVersion(con, ServerVersion.v9_3));
Member: This could have been @DisabledIfServerVersionBelow("9.3")

Contributor Author @OrangeDog: Indeed. I didn't know about it. Other tests in this file don't even skip, they just pass.

LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

LargeObject blob = lom.open(loid, LargeObjectManager.READ);
InputStream bis = blob.getInputStream();
assertEquals(0, bis.read());
assertThrows(IOException.class, () -> bis.skip(Long.MAX_VALUE / 2));
assertEquals(0, bis.read());
assertThrows(IOException.class, () -> {
while (bis.read() != -1) {
// consume stream
}
});
assertThrows(IOException.class, bis::close);
SQLException ex = assertThrows(SQLException.class, blob::close);
assertEquals(PSQLState.IN_FAILED_SQL_TRANSACTION.getState(), ex.getSQLState());
}

@Test
void skipPastMaxBefore9_3() throws Exception {
assumeFalse(TestUtil.haveMinimumServerVersion(con, ServerVersion.v9_3));

LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
InputStream bis = blob.getInputStream();
assertEquals(0, bis.read());
assertThrows(IOException.class, () -> bis.skip(Integer.MAX_VALUE));
assertEquals(0, bis.read());
while (bis.read() != -1) {
// consume stream
}
bis.close();
}
}

@Test
void skipWithInitialOffset() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
Member: Can you please use parameterized tests or at least individual test methods with their own names? Currently, it is hard to understand the intention behind the added tests.

blob.seek(1024);

InputStream bis = blob.getInputStream();
assertEquals(1, bis.read());
assertEquals(1024L, bis.skip(1024));
assertEquals(2, bis.read());
assertEquals(64 * 1024L, bis.skip(64 * 1024));
assertEquals(66, bis.read());
}
}

@Test
void skipWithLimit() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = createMediumLargeObject();

try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) {
InputStream bis = blob.getInputStream(65 * 1024);
assertEquals(0, bis.read());
assertEquals(64 * 1024L, bis.skip(64 * 1024));
assertEquals(64, bis.read());
assertEquals(1022L, bis.skip(1024));
assertEquals(-1, bis.read());
}
}

@@ -494,6 +667,24 @@ private long uploadFile(String file, int method) throws Exception {
return oid;
}

/**
* Creates a large object big enough to require multiple buffers when reading.
* @return the OID of the created large object
* @see org.postgresql.largeobject.BlobInputStream#INITIAL_BUFFER_SIZE
*/
private long createMediumLargeObject() throws Exception {
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI();
long loid = lom.createLO();
try (LargeObject blob = lom.open(loid, LargeObjectManager.WRITE)) {
byte[] buf = new byte[1024];
for (byte i = 0; i < 96; i++) {
Arrays.fill(buf, i);
Member: I would prefer random data since 11111 22222 333 might mask bugs in case read accesses a wrong buffer item

Contributor Author @OrangeDog:
The test has to know which value is expected. Random data would be difficult to verify by hand.

As there are only 256 possible byte values, there will always be duplicates in the data, so you cannot prove it wasn't accidentally reading a different byte that happened to have the same value.

blob.write(buf);
}
}
return loid;
}
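One way to reconcile both review positions (a sketch only, not code from this PR): a seeded `java.util.Random` produces non-repeating data that is still fully reproducible, so the test can regenerate the expected byte at any offset instead of verifying by hand.

```java
import java.util.Random;

// Sketch: deterministic pseudo-random test data. The same seed yields the
// same byte sequence on every run, and java.util.Random.nextBytes fills
// prefixes identically, so the expected byte at any index can be recomputed.
public class SeededData {
    public static byte[] generate(long seed, int length) {
        byte[] buf = new byte[length];
        new Random(seed).nextBytes(buf);   // reproducible for a fixed seed
        return buf;
    }

    public static byte expectedByteAt(long seed, int index) {
        // Regenerate just the prefix to look up the value at `index`.
        return generate(seed, index + 1)[index];
    }
}
```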

/*
* Helper - compares the blobs in a table with a local file. Note this uses the postgresql
* specific Large Object API