-
Notifications
You must be signed in to change notification settings - Fork 819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BlobInputStream improvements #3148
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,14 +8,19 @@ | |
import static org.junit.jupiter.api.Assertions.assertArrayEquals; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
import static org.junit.jupiter.api.Assertions.assertThrows; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
import static org.junit.jupiter.api.Assertions.fail; | ||
import static org.junit.jupiter.api.Assumptions.assumeFalse; | ||
import static org.junit.jupiter.api.Assumptions.assumeTrue; | ||
|
||
import org.postgresql.PGConnection; | ||
import org.postgresql.core.ServerVersion; | ||
import org.postgresql.largeobject.LargeObject; | ||
import org.postgresql.largeobject.LargeObjectManager; | ||
import org.postgresql.test.TestUtil; | ||
import org.postgresql.test.annotations.DisabledIfServerVersionBelow; | ||
import org.postgresql.util.PSQLState; | ||
|
||
import org.junit.jupiter.api.AfterAll; | ||
import org.junit.jupiter.api.AfterEach; | ||
|
@@ -25,6 +30,7 @@ | |
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.ValueSource; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
import java.sql.Blob; | ||
|
@@ -35,6 +41,7 @@ | |
import java.sql.SQLException; | ||
import java.sql.Statement; | ||
import java.sql.Types; | ||
import java.util.Arrays; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
|
||
import javax.sql.rowset.serial.SerialBlob; | ||
|
@@ -290,18 +297,184 @@ void markResetStream() throws Exception { | |
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
|
||
long oid = rs.getLong(1); | ||
LargeObject blob = lom.open(oid); | ||
InputStream bis = blob.getInputStream(); | ||
|
||
assertEquals('<', bis.read()); | ||
bis.mark(4); | ||
assertEquals('?', bis.read()); | ||
assertEquals('x', bis.read()); | ||
assertEquals('m', bis.read()); | ||
assertEquals('l', bis.read()); | ||
bis.reset(); | ||
assertEquals('?', bis.read()); | ||
try (LargeObject blob = lom.open(oid)) { | ||
InputStream bis = blob.getInputStream(); | ||
|
||
assertEquals('<', bis.read()); | ||
bis.mark(4); | ||
assertEquals('?', bis.read()); | ||
assertEquals('x', bis.read()); | ||
assertEquals('m', bis.read()); | ||
assertEquals('l', bis.read()); | ||
bis.reset(); | ||
assertEquals('?', bis.read()); | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Test | ||
void markResetWithInitialOffset() throws Exception { | ||
assertTrue(uploadFile(TEST_FILE, NATIVE_STREAM) > 0); | ||
|
||
try (Statement stmt = con.createStatement()) { | ||
try (ResultSet rs = stmt.executeQuery("SELECT lo FROM testblob where id = '/test-file.xml'")) { | ||
assertTrue(rs.next()); | ||
|
||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
|
||
long oid = rs.getLong(1); | ||
try (LargeObject blob = lom.open(oid)) { | ||
blob.seek(4); | ||
InputStream bis = blob.getInputStream(); | ||
|
||
assertEquals('l', bis.read()); | ||
bis.reset(); | ||
assertEquals('l', bis.read()); | ||
assertEquals(' ', bis.read()); | ||
bis.mark(4); | ||
assertEquals('v', bis.read()); | ||
assertEquals('e', bis.read()); | ||
bis.reset(); | ||
assertEquals('v', bis.read()); | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Test | ||
void skip() throws Exception { | ||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
long loid = createMediumLargeObject(); | ||
|
||
try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) { | ||
InputStream bis = blob.getInputStream(); | ||
assertEquals(0, bis.read()); | ||
assertEquals(1024L, bis.skip(1024)); | ||
assertEquals(1, bis.read()); | ||
assertEquals(64 * 1024L, bis.skip(64 * 1024)); | ||
assertEquals(65, bis.read()); | ||
} | ||
} | ||
|
||
@Test | ||
void skipBackwards() throws Exception { | ||
assertTrue(uploadFile(TEST_FILE, NATIVE_STREAM) > 0); | ||
|
||
try (Statement stmt = con.createStatement()) { | ||
try (ResultSet rs = stmt.executeQuery("SELECT lo FROM testblob where id = '/test-file.xml'")) { | ||
assertTrue(rs.next()); | ||
|
||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
long loid = rs.getLong(1); | ||
|
||
try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) { | ||
InputStream bis = blob.getInputStream(); | ||
assertEquals('<', bis.read()); | ||
assertEquals(0, bis.skip(-1)); | ||
assertEquals('?', bis.read()); | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Test | ||
void skipToEnd() throws Exception { | ||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
long loid = createMediumLargeObject(); | ||
|
||
try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) { | ||
InputStream bis = blob.getInputStream(); | ||
assertEquals(96 * 1024, bis.skip(96 * 1024)); | ||
assertEquals(-1, bis.read()); | ||
} | ||
} | ||
|
||
@Test | ||
void skipPastEnd() throws Exception { | ||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
long loid = createMediumLargeObject(); | ||
|
||
try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) { | ||
InputStream bis = blob.getInputStream(); | ||
assertEquals(1024 * 1024, bis.skip(1024 * 1024)); | ||
assertEquals(-1, bis.read()); | ||
assertEquals(1024, bis.skip(1024)); | ||
assertEquals(-1, bis.read()); | ||
} | ||
} | ||
|
||
@Test | ||
@DisabledIfServerVersionBelow("9.3") | ||
void skipPastMaxAfter9_3() throws Exception { | ||
assumeTrue(TestUtil.haveMinimumServerVersion(con, ServerVersion.v9_3)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could have been There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed. I didn't know about it. Other tests in this file don't even skip, they just pass. |
||
|
||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
long loid = createMediumLargeObject(); | ||
|
||
LargeObject blob = lom.open(loid, LargeObjectManager.READ); | ||
InputStream bis = blob.getInputStream(); | ||
assertEquals(0, bis.read()); | ||
assertThrows(IOException.class, () -> bis.skip(Long.MAX_VALUE / 2)); | ||
assertEquals(0, bis.read()); | ||
assertThrows(IOException.class, () -> { | ||
while (bis.read() != -1) { | ||
// consume stream | ||
} | ||
}); | ||
assertThrows(IOException.class, bis::close); | ||
SQLException ex = assertThrows(SQLException.class, blob::close); | ||
assertEquals(PSQLState.IN_FAILED_SQL_TRANSACTION.getState(), ex.getSQLState()); | ||
} | ||
|
||
@Test | ||
void skipPastMaxBefore9_3() throws Exception { | ||
assumeFalse(TestUtil.haveMinimumServerVersion(con, ServerVersion.v9_3)); | ||
|
||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
long loid = createMediumLargeObject(); | ||
|
||
try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) { | ||
InputStream bis = blob.getInputStream(); | ||
assertEquals(0, bis.read()); | ||
assertThrows(IOException.class, () -> bis.skip(Integer.MAX_VALUE)); | ||
assertEquals(0, bis.read()); | ||
while (bis.read() != -1) { | ||
// consume stream | ||
} | ||
bis.close(); | ||
} | ||
} | ||
|
||
@Test | ||
void skipWithInitialOffset() throws Exception { | ||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
long loid = createMediumLargeObject(); | ||
|
||
try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please use parameterized tests or at least individual test methods with their own names? Currently, it is hard to understand the intention behind the added tests |
||
blob.seek(1024); | ||
|
||
InputStream bis = blob.getInputStream(); | ||
assertEquals(1, bis.read()); | ||
assertEquals(1024L, bis.skip(1024)); | ||
assertEquals(2, bis.read()); | ||
assertEquals(64 * 1024L, bis.skip(64 * 1024)); | ||
assertEquals(66, bis.read()); | ||
} | ||
} | ||
|
||
@Test | ||
void skipWithLimit() throws Exception { | ||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
long loid = createMediumLargeObject(); | ||
|
||
try (LargeObject blob = lom.open(loid, LargeObjectManager.READ)) { | ||
InputStream bis = blob.getInputStream(65 * 1024); | ||
assertEquals(0, bis.read()); | ||
assertEquals(64 * 1024L, bis.skip(64 * 1024)); | ||
assertEquals(64, bis.read()); | ||
assertEquals(1022L, bis.skip(1024)); | ||
assertEquals(-1, bis.read()); | ||
} | ||
} | ||
|
||
|
@@ -494,6 +667,24 @@ private long uploadFile(String file, int method) throws Exception { | |
return oid; | ||
} | ||
|
||
/** | ||
* Creates a large object big enough to require multiple buffers when reading. | ||
* @return the OID of the created large object | ||
* @see org.postgresql.largeobject.BlobInputStream#INITIAL_BUFFER_SIZE | ||
*/ | ||
private long createMediumLargeObject() throws Exception { | ||
LargeObjectManager lom = ((PGConnection) con).getLargeObjectAPI(); | ||
long loid = lom.createLO(); | ||
try (LargeObject blob = lom.open(loid, LargeObjectManager.WRITE)) { | ||
byte[] buf = new byte[1024]; | ||
for (byte i = 0; i < 96; i++) { | ||
Arrays.fill(buf, i); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer random data since 11111 22222 333 might mask bugs in case read accesses a wrong buffer item There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test has to know which value is expected. Random data would be difficult to verify by hand. As there are only 256 possible byte values, there will always be duplicates in the data, so you cannot prove it wasn't accidentally reading a different byte that happened to have the same value. |
||
blob.write(buf); | ||
} | ||
} | ||
return loid; | ||
} | ||
|
||
/* | ||
* Helper - compares the blobs in a table with a local file. Note this uses the postgresql | ||
* specific Large Object API | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document what this is doing ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line specifically, or the whole added section?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems to me that we are switching from LO to bytea here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole section. Frankly, the code is non obvious, so it makes sense to document the considered edge cases
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is sad we increase the number of roundtrips for a common case of "blob is used only once". Can we avoid adding extra db calls in the constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No? On old versions of Postgres (which you still test against), large objects only use an int for their position, with a limit of 2GB: https://www.postgresql.org/docs/9.2/lo-intro.html
See also the existing implementation of
reset()
, which checks whether it can useseek
instead ofseek64
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really. Someone might call
mark()
before anything else, and the position needs to be correctly initialised before then.I suppose you could add extra logic to defer the initialisation to the first call to either
read()
,mark()
,reset()
, orskip()
. That didn't seem like a good idea to me though. Better to guarantee that it's been done before anything else can be called.