Skip to content

Commit

Permalink
feat: Define binary transfer for custom types dynamically/automatical…
Browse files Browse the repository at this point in the history
…ly (Issue #2554) (#2556)

The driver would automatically register types for binary transfer when calling
con.unwrap(PGConnection.class).addDataType(String, Class)

Fixes #2554
  • Loading branch information
sebasbaumh committed Jan 31, 2023
1 parent f51c68e commit cb78d0e
Show file tree
Hide file tree
Showing 7 changed files with 504 additions and 18 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ij_any_spaces_around_relational_operators = true
ij_any_spaces_around_shift_operators = true
ij_continuation_indent_size = 4
ij_java_align_multiline_parameters = false
ij_java_binary_operation_sign_on_next_line = true
ij_java_if_brace_force = always
ij_java_indent_case_from_switch = false
ij_java_line_comment_add_space = true
Expand Down
50 changes: 50 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/QueryExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,63 @@ Object createQueryKey(String sql, boolean escapeProcessing, boolean isParameteri
*/
int getProtocolVersion();

/**
* Adds a single oid that should be received using binary encoding.
*
* @param oid The oid to request with binary encoding.
*/
void addBinaryReceiveOid(int oid);

/**
* Remove given oid from the list of oids for binary receive encoding.
* <p>Note: the binary receive for the oid can be re-activated later.</p>
*
* @param oid The oid to request with binary encoding.
*/
void removeBinaryReceiveOid(int oid);

/**
* Gets the oids that should be received using binary encoding.
* <p>Note: this returns an unmodifiable set, and its contents might not reflect the current state.</p>
*
* @return The oids to request with binary encoding.
* @deprecated the method returns a copy of the set, so it is not efficient. Use {@link #useBinaryForReceive(int)}
*/
@Deprecated
Set<? extends Integer> getBinaryReceiveOids();

/**
* Sets the oids that should be received using binary encoding.
*
* @param useBinaryForOids The oids to request with binary encoding.
*/
void setBinaryReceiveOids(Set<Integer> useBinaryForOids);

/**
* Adds a single oid that should be sent using binary encoding.
*
* @param oid The oid to send with binary encoding.
*/
void addBinarySendOid(int oid);

/**
* Remove given oid from the list of oids for binary send encoding.
* <p>Note: the binary send for the oid can be re-activated later.</p>
*
* @param oid The oid to send with binary encoding.
*/
void removeBinarySendOid(int oid);

/**
* Gets the oids that should be sent using binary encoding.
* <p>Note: this returns an unmodifiable set, and its contents might not reflect the current state.</p>
*
* @return useBinaryForOids The oids to send with binary encoding.
* @deprecated the method returns a copy of the set, so it is not efficient. Use {@link #useBinaryForSend(int)}
*/
@Deprecated
Set<? extends Integer> getBinarySendOids();

/**
* Sets the oids that should be sent using binary encoding.
*
Expand Down
64 changes: 58 additions & 6 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2912,26 +2912,78 @@ public ReplicationProtocol getReplicationProtocol() {
return replicationProtocol;
}

@Override
public void addBinaryReceiveOid(int oid) {
synchronized (useBinaryReceiveForOids) {
useBinaryReceiveForOids.add(oid);
}
}

@Override
public void removeBinaryReceiveOid(int oid) {
synchronized (useBinaryReceiveForOids) {
useBinaryReceiveForOids.remove(oid);
}
}

@Override
public Set<? extends Integer> getBinaryReceiveOids() {
// copy the values to prevent ConcurrentModificationException when reader accesses the elements
synchronized (useBinaryReceiveForOids) {
return new HashSet<>(useBinaryReceiveForOids);
}
}

@Override
public boolean useBinaryForReceive(int oid) {
return useBinaryReceiveForOids.contains(oid);
synchronized (useBinaryReceiveForOids) {
return useBinaryReceiveForOids.contains(oid);
}
}

@Override
public void setBinaryReceiveOids(Set<Integer> oids) {
useBinaryReceiveForOids.clear();
useBinaryReceiveForOids.addAll(oids);
synchronized (useBinaryReceiveForOids) {
useBinaryReceiveForOids.clear();
useBinaryReceiveForOids.addAll(oids);
}
}

@Override
public void addBinarySendOid(int oid) {
synchronized (useBinarySendForOids) {
useBinarySendForOids.add(oid);
}
}

@Override
public void removeBinarySendOid(int oid) {
synchronized (useBinarySendForOids) {
useBinarySendForOids.remove(oid);
}
}

@Override
public Set<? extends Integer> getBinarySendOids() {
// copy the values to prevent ConcurrentModificationException when reader accesses the elements
synchronized (useBinarySendForOids) {
return new HashSet<>(useBinarySendForOids);
}
}

@Override
public boolean useBinaryForSend(int oid) {
return useBinarySendForOids.contains(oid);
synchronized (useBinarySendForOids) {
return useBinarySendForOids.contains(oid);
}
}

@Override
public void setBinarySendOids(Set<Integer> oids) {
useBinarySendForOids.clear();
useBinarySendForOids.addAll(oids);
synchronized (useBinarySendForOids) {
useBinarySendForOids.clear();
useBinarySendForOids.addAll(oids);
}
}

private void setIntegerDateTimes(boolean state) {
Expand Down
72 changes: 60 additions & 12 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.sql.Struct;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -160,6 +161,11 @@ private enum ReadOnlyBehavior {
// Default forcebinary option.
protected boolean forcebinary = false;

/**
* Oids for which binary transfer should be disabled.
*/
private final Set<? extends Integer> binaryDisabledOids;

private int rsHoldability = ResultSet.CLOSE_CURSORS_AT_COMMIT;
private int savepointId = 0;
// Connection's autocommit state.
Expand Down Expand Up @@ -261,7 +267,14 @@ public PgConnection(HostSpec[] hostSpecs,

this.hideUnprivilegedObjects = PGProperty.HIDE_UNPRIVILEGED_OBJECTS.getBoolean(info);

Set<Integer> binaryOids = getBinaryOids(info);
// get oids that support binary transfer
Set<Integer> binaryOids = getBinaryEnabledOids(info);
// get oids that should be disabled from transfer
binaryDisabledOids = getBinaryDisabledOids(info);
// if there are any, remove them from the enabled ones
if (!binaryDisabledOids.isEmpty()) {
binaryOids.removeAll(binaryDisabledOids);
}

// split for receive and send for better control
Set<Integer> useBinarySendForOids = new HashSet<Integer>(binaryOids);
Expand Down Expand Up @@ -388,28 +401,51 @@ private static Set<Integer> getSupportedBinaryOids() {
Oid.UUID));
}

private static Set<Integer> getBinaryOids(Properties info) throws PSQLException {
/**
* Gets all oids for which binary transfer can be enabled.
*
* @param info properties
* @return oids for which binary transfer can be enabled
* @throws PSQLException if any oid is not valid
*/
private static Set<Integer> getBinaryEnabledOids(Properties info) throws PSQLException {
// check if binary transfer should be enabled for built-in types
boolean binaryTransfer = PGProperty.BINARY_TRANSFER.getBoolean(info);
// Formats that currently have binary protocol support
// get formats that currently have binary protocol support
Set<Integer> binaryOids = new HashSet<Integer>(32);
if (binaryTransfer) {
binaryOids.addAll(SUPPORTED_BINARY_OIDS);
}

// add all oids which are enabled for binary transfer by the creator of the connection
String oids = PGProperty.BINARY_TRANSFER_ENABLE.getOrDefault(info);
if (oids != null) {
binaryOids.addAll(getOidSet(oids));
}
oids = PGProperty.BINARY_TRANSFER_DISABLE.getOrDefault(info);
if (oids != null) {
binaryOids.removeAll(getOidSet(oids));
}

return binaryOids;
}

private static Set<Integer> getOidSet(String oidList) throws PSQLException {
Set<Integer> oids = new HashSet<Integer>();
/**
* Gets all oids for which binary transfer should be disabled.
*
* @param info properties
* @return oids for which binary transfer should be disabled
* @throws PSQLException if any oid is not valid
*/
private static Set<? extends Integer> getBinaryDisabledOids(Properties info)
throws PSQLException {
// check for oids that should explicitly be disabled
String oids = PGProperty.BINARY_TRANSFER_DISABLE.getOrDefault(info);
if (oids == null) {
return Collections.emptySet();
}
return getOidSet(oids);
}

private static Set<? extends Integer> getOidSet(String oidList) throws PSQLException {
if (oidList.isEmpty()) {
return Collections.emptySet();
}
Set<Integer> oids = new HashSet<>();
StringTokenizer tokenizer = new StringTokenizer(oidList, ",");
while (tokenizer.hasMoreTokens()) {
String oid = tokenizer.nextToken();
Expand Down Expand Up @@ -707,14 +743,26 @@ public void addDataType(String type, String name) {
try {
addDataType(type, Class.forName(name).asSubclass(PGobject.class));
} catch (Exception e) {
throw new RuntimeException("Cannot register new type: " + e);
throw new RuntimeException("Cannot register new type " + type, e);
}
}

@Override
public void addDataType(String type, Class<? extends PGobject> klass) throws SQLException {
checkClosed();
// first add the data type to the type cache
typeCache.addDataType(type, klass);
// then check if this type supports binary transfer
if (PGBinaryObject.class.isAssignableFrom(klass) && getPreferQueryMode() != PreferQueryMode.SIMPLE) {
// try to get an oid for this type (will return 0 if the type does not exist in the database)
int oid = typeCache.getPGType(type);

This comment has been minimized.

Copy link
@olavloite

olavloite Feb 2, 2023

Contributor

This will cause a query roundtrip to the database for each new connection that is created to check whether the type box exists. The reason for this is that:

  1. box is a PGBinaryObject.
  2. box is added to the set of object types here:
    addDataType("box", org.postgresql.geometric.PGbox.class);
  3. box is not part of the standard type cache here:
    {"point", Oid.POINT, Types.OTHER, "org.postgresql.geometric.PGpoint", Oid.POINT_ARRAY}

Is that intentional / WAI?

The query that is executed for each connection is

SELECT pg_type.oid, typname   FROM pg_catalog.pg_type   LEFT   JOIN (select ns.oid as nspoid, ns.nspname, r.r           from pg_namespace as ns           join ( select s.r, (current_schemas(false))[s.r] as nspname                    from generate_series(1, array_upper(current_schemas(false), 1)) as s(r) ) as r          using ( nspname )        ) as sp     ON sp.nspoid = typnamespace  WHERE typname = ?  ORDER BY sp.r, pg_type.oid DESC LIMIT 1

This comment has been minimized.

Copy link
@vlsi

vlsi Feb 2, 2023

Member

Thanks for the heads-up.
I guess we need to fix it.

This comment has been minimized.

Copy link
@olavloite

olavloite Feb 2, 2023

Contributor

Thanks. I just added #2746, as I wasn't sure this would ping you.

// check if oid is there and if it is not disabled for binary transfer
if (oid > 0 && !binaryDisabledOids.contains(oid)) {
// allow using binary transfer for receiving and sending of this type
queryExecutor.addBinaryReceiveOid(oid);
queryExecutor.addBinarySendOid(oid);
}
}
}

// This initialises the objectTypes hash map
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2023, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.test.core;

import org.postgresql.core.BaseConnection;
import org.postgresql.core.QueryExecutor;
import org.postgresql.test.jdbc2.BaseTest4;

import org.junit.Test;

import java.sql.SQLException;
import java.util.Set;

/**
* TestCase to test handling of binary types.
*/
public class QueryExecutorTest extends BaseTest4 {
/**
* Make sure the functions for adding binary transfer OIDs for custom types are correct.
*
* @throws SQLException if a database error occurs
*/
@Test
public void testBinaryTransferOids() throws SQLException {
QueryExecutor queryExecutor = con.unwrap(BaseConnection.class).getQueryExecutor();
// get current OIDs (make a copy of them)
@SuppressWarnings("deprecation")
Set<? extends Integer> oidsReceive = queryExecutor.getBinaryReceiveOids();
@SuppressWarnings("deprecation")
Set<? extends Integer> oidsSend = queryExecutor.getBinarySendOids();
// add a new OID to be transferred as binary data
int customTypeOid = 91716;
assertBinaryForReceive(customTypeOid, false,
() -> "Custom type OID should not be binary for receive by default");
// first for receiving
queryExecutor.addBinaryReceiveOid(customTypeOid);
// Verify
assertBinaryForReceive(customTypeOid, true,
() -> "Just added oid via addBinaryReceiveOid");
assertBinaryForSend(customTypeOid, false,
() -> "Just added oid via addBinaryReceiveOid");
for (int oid : oidsReceive) {
assertBinaryForReceive(oid, true,
() -> "Previously registered BinaryReceiveOids should be intact after "
+ "addBinaryReceiveOid(" + customTypeOid + ")");
}
for (int oid : oidsSend) {
assertBinaryForSend(oid, true,
() -> "Previously registered BinarySendOids should be intact after "
+ "addBinaryReceiveOid(" + customTypeOid + ")");
}
// then for sending
queryExecutor.addBinarySendOid(customTypeOid);
// check new OID
assertBinaryForReceive(customTypeOid, true, () -> "added oid via addBinaryReceiveOid and "
+ "addBinarySendOid");
assertBinaryForSend(customTypeOid, true, () -> "added oid via addBinaryReceiveOid and "
+ "addBinarySendOid");
for (int oid : oidsReceive) {
assertBinaryForReceive(oid, true, () -> "Previously registered BinaryReceiveOids should be "
+ "intact after addBinaryReceiveOid(" + customTypeOid + ") and addBinarySendOid(" + customTypeOid + ")");
}
for (int oid : oidsSend) {
assertBinaryForSend(oid, true, () -> "Previously registered BinarySendOids should be intact"
+ " after addBinaryReceiveOid(" + customTypeOid + ")");
}
}
}
13 changes: 13 additions & 0 deletions pgjdbc/src/test/java/org/postgresql/test/jdbc2/BaseTest4.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

package org.postgresql.test.jdbc2;

import static org.junit.Assert.assertEquals;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.Oid;
import org.postgresql.core.Version;
import org.postgresql.jdbc.PreferQueryMode;
Expand All @@ -20,6 +23,7 @@
import java.sql.SQLException;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Supplier;

public class BaseTest4 {

Expand Down Expand Up @@ -136,4 +140,13 @@ public void assumeMinimumServerVersion(Version version) throws SQLException {
Assume.assumeTrue(TestUtil.haveMinimumServerVersion(con, version));
}

protected void assertBinaryForReceive(int oid, boolean expected, Supplier<String> message) throws SQLException {
assertEquals(message.get() + ", useBinaryForReceive(oid=" + oid + ")", expected,
con.unwrap(BaseConnection.class).getQueryExecutor().useBinaryForReceive(oid));
}

protected void assertBinaryForSend(int oid, boolean expected, Supplier<String> message) throws SQLException {
assertEquals(message.get() + ", useBinaryForSend(oid=" + oid + ")", expected,
con.unwrap(BaseConnection.class).getQueryExecutor().useBinaryForSend(oid));
}
}

0 comments on commit cb78d0e

Please sign in to comment.