Skip to content

Commit

Permalink
Modifie the Impl methods in ConnectionImpl to return java.sql.ResultS…
Browse files Browse the repository at this point in the history
…et, commenting the queryJob logic for the time being as it currently doesn't return the java.sql.ResultSet
  • Loading branch information
prash-mi committed Sep 9, 2021
1 parent 22af3f3 commit eb824b4
Showing 1 changed file with 11 additions and 9 deletions.
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.sql.ResultSet;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
Expand Down Expand Up @@ -60,7 +61,7 @@ public BigQueryDryRunResult dryRun(String sql) throws BigQuerySQLException {
}

@Override
public BigQueryResultSet executeSelect(String sql) throws BigQuerySQLException {
public ResultSet executeSelect(String sql) throws BigQuerySQLException {
// use jobs.query if all the properties of connectionSettings are supported
if (isFastQuerySupported()) {
String projectId = bigQueryOptions.getProjectId();
Expand All @@ -70,11 +71,11 @@ public BigQueryResultSet executeSelect(String sql) throws BigQuerySQLException {
// use jobs.insert otherwise
com.google.api.services.bigquery.model.Job queryJob =
createQueryJob(sql, connectionSettings, null, null);
return getQueryResultsRpc(JobId.fromPb(queryJob.getJobReference()));
return null; // TODO getQueryResultsRpc(JobId.fromPb(queryJob.getJobReference()));
}

@Override
public BigQueryResultSet executeSelect(
public ResultSet executeSelect(
String sql, List<QueryParameter> parameters, Map<String, String> labels)
throws BigQuerySQLException {
// use jobs.query if possible
Expand All @@ -87,10 +88,10 @@ public BigQueryResultSet executeSelect(
// use jobs.insert otherwise
com.google.api.services.bigquery.model.Job queryJob =
createQueryJob(sql, connectionSettings, parameters, labels);
return getQueryResultsRpc(JobId.fromPb(queryJob.getJobReference()));
return null; // TODO getQueryResultsRpc(JobId.fromPb(queryJob.getJobReference()));
}

private BigQueryResultSet queryRpc(final String projectId, final QueryRequest queryRequest) {
private ResultSet queryRpc(final String projectId, final QueryRequest queryRequest) {
com.google.api.services.bigquery.model.QueryResponse results;
try {
results =
Expand Down Expand Up @@ -122,7 +123,7 @@ private BigQueryResultSet queryRpc(final String projectId, final QueryRequest qu
long totalRows = results.getTotalRows().longValue();
long pageRows = results.getRows().size();
JobId jobId = JobId.fromPb(results.getJobReference());
return getQueryResultsWithJobId(totalRows, pageRows, null, jobId);
return null; // TODO getQueryResultsWithJobId(totalRows, pageRows, null, jobId);
}
}

Expand All @@ -136,15 +137,16 @@ private Map<String, String> getFieldNameType(Schema schema) {
return nameType;
}

private BigQueryResultSet processQueryResponseResults(
private ResultSet processQueryResponseResults(
com.google.api.services.bigquery.model.QueryResponse results) {
Schema schema;
long numRows;
schema = Schema.fromPb(results.getSchema());
numRows = results.getTotalRows().longValue();

// Producer thread for populating the buffer row by row
BlockingQueue<TableRow> buffer = new LinkedBlockingDeque<>(1000); // TODO: Update the capacity
BlockingQueue<TableRow> buffer =
new LinkedBlockingDeque<>(1000); // TODO: Update the capacity. Prefetch limit
Runnable populateBufferRunnable =
() -> { // producer thread populating the buffer
List<TableRow> tableRows = results.getRows();
Expand All @@ -166,7 +168,7 @@ private BigQueryResultSet processQueryResponseResults(
long totalRows = results.getTotalRows().longValue();
long pageRows = results.getRows().size();
JobId jobId = JobId.fromPb(results.getJobReference());
return getQueryResultsWithJobId(totalRows, pageRows, schema, jobId);
return null; // TODO - Implement getQueryResultsWithJobId(totalRows, pageRows, schema, jobId);
}

/* Returns query results using either tabledata.list or the high throughput Read API */
Expand Down

0 comments on commit eb824b4

Please sign in to comment.