Skip to content

Commit

Permalink
feat: support partitioned queries (#1300)
Browse files Browse the repository at this point in the history
Adds support for partitioned queries to the JDBC driver. Partitioned queries can be executed either using SQL statements or specific Cloud Spanner methods that are exposed by the specific Cloud Spanner JDBC interfaces.
  • Loading branch information
olavloite committed Sep 15, 2023
1 parent c3be4d4 commit c50da41
Show file tree
Hide file tree
Showing 12 changed files with 964 additions and 7 deletions.
42 changes: 42 additions & 0 deletions clirr-ignored-differences.xml
Expand Up @@ -16,4 +16,46 @@
<className>com/google/cloud/spanner/jdbc/CloudSpannerJdbcConnection</className>
<method>void setSavepointSupport(com.google.cloud.spanner.connection.SavepointSupport)</method>
</difference>

<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/jdbc/CloudSpannerJdbcConnection</className>
<method>int getMaxPartitionedParallelism()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/jdbc/CloudSpannerJdbcConnection</className>
<method>int getMaxPartitions()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/jdbc/CloudSpannerJdbcConnection</className>
<method>boolean isAutoPartitionMode()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/jdbc/CloudSpannerJdbcConnection</className>
<method>boolean isDataBoostEnabled()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/jdbc/CloudSpannerJdbcConnection</className>
<method>void setAutoPartitionMode(boolean)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/jdbc/CloudSpannerJdbcConnection</className>
<method>void setDataBoostEnabled(boolean)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/jdbc/CloudSpannerJdbcConnection</className>
<method>void setMaxPartitionedParallelism(int)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/jdbc/CloudSpannerJdbcConnection</className>
<method>void setMaxPartitions(int)</method>
</difference>

</differences>
Expand Up @@ -120,6 +120,25 @@ private StatementTimeout(long timeout, TimeUnit unit) {
}
}

/** Functional interface that throws {@link SQLException}. */
interface JdbcFunction<T, R> {
R apply(T t) throws SQLException;
}

/** Runs the given function with the timeout that has been set on this statement. */
protected <T> T runWithStatementTimeout(JdbcFunction<Connection, T> function)
throws SQLException {
checkClosed();
StatementTimeout originalTimeout = setTemporaryStatementTimeout();
try {
return function.apply(getConnection().getSpannerConnection());
} catch (SpannerException spannerException) {
throw JdbcSqlExceptionFactory.of(spannerException);
} finally {
resetStatementTimeout(originalTimeout);
}
}

/**
* Sets the statement timeout of the Spanner {@link Connection} to the query timeout of this JDBC
* {@link Statement} and returns the original timeout of the Spanner {@link Connection} so it can
Expand Down
Expand Up @@ -22,12 +22,14 @@
import com.google.cloud.spanner.CommitStats;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.connection.AutocommitDmlMode;
import com.google.cloud.spanner.connection.SavepointSupport;
import com.google.cloud.spanner.connection.TransactionMode;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Iterator;
Expand Down Expand Up @@ -334,6 +336,73 @@ default Dialect getDialect() {
return Dialect.GOOGLE_STANDARD_SQL;
}

/**
* Enable data boost for partitioned queries. See also {@link
* CloudSpannerJdbcStatement#partitionQuery(String, PartitionOptions, QueryOption...)} and {@link
* CloudSpannerJdbcPreparedStatement#partitionQuery(PartitionOptions, QueryOption...)}.
*/
default void setDataBoostEnabled(boolean dataBoostEnabled) throws SQLException {
throw new UnsupportedOperationException();
}

/**
* Returns whether data boost is enabled for partitioned queries. See also {@link
* CloudSpannerJdbcStatement#partitionQuery(String, PartitionOptions, QueryOption...)} and {@link
* CloudSpannerJdbcPreparedStatement#partitionQuery(PartitionOptions, QueryOption...)}.
*/
default boolean isDataBoostEnabled() throws SQLException {
throw new UnsupportedOperationException();
}

/**
* Sets whether this connection should always use partitioned queries when a query is executed on
* this connection. Setting this flag to <code>true</code> and then executing a query that cannot
* be partitioned, or executing a query in a read/write transaction, will cause an error. Use this
* flag in combination with {@link #setDataBoostEnabled(boolean)} to force all queries on this
* connection to use data boost.
*/
default void setAutoPartitionMode(boolean alwaysUsePartitionedQueries) throws SQLException {
throw new UnsupportedOperationException();
}

/** Returns whether this connection will execute all queries as partitioned queries. */
default boolean isAutoPartitionMode() throws SQLException {
throw new UnsupportedOperationException();
}

/**
* Sets the maximum number of partitions that should be included as a hint to Cloud Spanner when
* partitioning a query on this connection. Note that this is only a hint and Cloud Spanner might
* choose to ignore the hint.
*/
default void setMaxPartitions(int maxPartitions) throws SQLException {
throw new UnsupportedOperationException();
}

/**
* Gets the maximum number of partitions that should be included as a hint to Cloud Spanner when
* partitioning a query on this connection. Note that this is only a hint and Cloud Spanner might
* choose to ignore the hint.
*/
default int getMaxPartitions() throws SQLException {
throw new UnsupportedOperationException();
}

/**
* Sets the maximum degree of parallelism that is used when executing a partitioned query. A
* partitioned query will use up to <code>maxThreads</code> to execute and retrieve the results
* from Cloud Spanner. Set this value to <code>0</code>> to use the number of available processors
* as returned by {@link Runtime#availableProcessors()}.
*/
default void setMaxPartitionedParallelism(int maxThreads) throws SQLException {
throw new UnsupportedOperationException();
}

/** Returns the maximum degree of parallelism that is used for partitioned queries. */
default int getMaxPartitionedParallelism() throws SQLException {
throw new UnsupportedOperationException();
}

/**
* @see
* com.google.cloud.spanner.connection.Connection#addTransactionRetryListener(com.google.cloud.spanner.connection.TransactionRetryListener)
Expand Down
@@ -0,0 +1,33 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner.jdbc;

import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.PartitionOptions;
import java.sql.ResultSet;

/**
* Result set that is returned for partitioned queries, e.g. for 'run partitioned query select ...'
* or for {@link CloudSpannerJdbcPreparedStatement#runPartitionedQuery(PartitionOptions,
* QueryOption...)}.
*/
public interface CloudSpannerJdbcPartitionedQueryResultSet extends ResultSet {
/** Returns the number of partitions that this result set contains. */
int getNumPartitions();

/** Returns the degree of parallelism that this result set uses. */
int getParallelism();
}
@@ -0,0 +1,61 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.jdbc;

import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.PartitionOptions;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
* This interface is implemented by {@link PreparedStatement}s that are created on Cloud Spanner
* JDBC connections.
*/
public interface CloudSpannerJdbcPreparedStatement extends PreparedStatement {

/**
* Partitions this query, so it can be executed in parallel. This method returns a {@link
* ResultSet} with a string-representation of the partitions that were created. These strings can
* be used to execute a partition either on this connection or an any other connection (on this
* host or an any other host) by calling the method {@link #runPartition()}. This method will
* automatically enable data boost for the query if {@link
* CloudSpannerJdbcConnection#isDataBoostEnabled()} returns true.
*/
ResultSet partitionQuery(PartitionOptions partitionOptions, QueryOption... options)
throws SQLException;

/**
* Executes the given partition of a query. The partition that should be executed must be set as a
* string parameter on this {@link PreparedStatement} using {@link #setString(int, String)}. The
* value should be a string that was returned by {@link #partitionQuery(PartitionOptions,
* QueryOption...)}.
*/
ResultSet runPartition() throws SQLException;

/**
* Executes the given query as a partitioned query. The query will first be partitioned using the
* {@link #partitionQuery(PartitionOptions, QueryOption...)} method. Each of the partitions will
* then be executed in the background, and the results will be merged into a single result set.
*
* <p>This method will use {@link CloudSpannerJdbcConnection#getMaxPartitionedParallelism()}
* threads to execute the partitioned query. Set this variable to a higher/lower value to
* increase/decrease the degree of parallelism used for execution.
*/
CloudSpannerJdbcPartitionedQueryResultSet runPartitionedQuery(
PartitionOptions partitionOptions, QueryOption... options) throws SQLException;
}
@@ -0,0 +1,60 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.jdbc;

import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.PartitionOptions;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
* This interface is implemented by {@link Statement}s that are created on Cloud Spanner JDBC
* connections.
*/
public interface CloudSpannerJdbcStatement extends Statement {

/**
* Partitions the given query, so it can be executed in parallel. This method returns a {@link
* ResultSet} with a string-representation of the partitions that were created. These strings can
* be used to execute a partition either on this connection or an any other connection (on this
* host or an any other host) by calling the method {@link #runPartition(String)}. This method
* will automatically enable data boost for the query if {@link
* CloudSpannerJdbcConnection#isDataBoostEnabled()} returns true.
*/
ResultSet partitionQuery(String query, PartitionOptions partitionOptions, QueryOption... options)
throws SQLException;

/**
* Executes the given partition of a query. The encodedPartitionId should be a string that was
* returned by {@link #partitionQuery(String, PartitionOptions, QueryOption...)}.
*/
ResultSet runPartition(String encodedPartitionId) throws SQLException;

/**
* Executes the given query as a partitioned query. The query will first be partitioned using the
* {@link #partitionQuery(String, PartitionOptions, QueryOption...)} method. Each of the
* partitions will then be executed in the background, and the results will be merged into a
* single result set.
*
* <p>This method will use {@link CloudSpannerJdbcConnection#getMaxPartitionedParallelism()}
* threads to execute the partitioned query. Set this variable to a higher/lower value to
* increase/decrease the degree of parallelism used for execution.
*/
CloudSpannerJdbcPartitionedQueryResultSet runPartitionedQuery(
String query, PartitionOptions partitionOptions, QueryOption... options) throws SQLException;
}

0 comments on commit c50da41

Please sign in to comment.