-
Notifications
You must be signed in to change notification settings - Fork 101
/
MergedResultSet.java
426 lines (369 loc) · 13.1 KB
/
MergedResultSet.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner.connection;
import static com.google.common.base.Preconditions.checkState;
import com.google.cloud.spanner.ForwardingStructReader;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
/**
 * {@link MergedResultSet} is a {@link ResultSet} implementation that combines the results from
 * multiple queries. Each query uses its own {@link RowProducer} that feeds rows into the {@link
 * MergedResultSet}. The order of the records in the {@link MergedResultSet} is not guaranteed.
 */
class MergedResultSet extends ForwardingStructReader implements PartitionedQueryResultSet {

  /**
   * Runs a single partition of a partitioned query on a background thread and pushes every row
   * that it receives into the shared queue that feeds the {@link MergedResultSet}.
   */
  static class PartitionExecutor implements Runnable {
    private final Connection connection;
    private final String partitionId;
    private final LinkedBlockingDeque<PartitionExecutorResult> queue;
    // Set by the consumer (via RowProducerImpl#close()) to ask this worker to stop early.
    private final AtomicBoolean shouldStop = new AtomicBoolean();

    PartitionExecutor(
        Connection connection,
        String partitionId,
        LinkedBlockingDeque<PartitionExecutorResult> queue) {
      this.connection = Preconditions.checkNotNull(connection);
      this.partitionId = Preconditions.checkNotNull(partitionId);
      this.queue = queue;
    }

    @Override
    public void run() {
      try (ResultSet resultSet = connection.runPartition(partitionId)) {
        boolean first = true;
        while (resultSet.next()) {
          Struct row = resultSet.getCurrentRowAsStruct();
          if (first) {
            // The first row also carries the row type and metadata of the result set, so that
            // the MergedResultSet can answer getType()/getMetadata() calls.
            queue.put(
                PartitionExecutorResult.dataAndMetadata(
                    row, resultSet.getType(), resultSet.getMetadata()));
            first = false;
          } else {
            queue.put(PartitionExecutorResult.data(row));
          }
          if (shouldStop.get()) {
            break;
          }
        }
        if (first) {
          // Special case: The result set did not return any rows. Push the metadata to the merged
          // result set.
          queue.put(
              PartitionExecutorResult.typeAndMetadata(
                  resultSet.getType(), resultSet.getMetadata()));
        }
      } catch (Throwable exception) {
        putWithoutInterruptPropagation(PartitionExecutorResult.exception(exception));
      } finally {
        // Emit a special 'finished' result to ensure that the row producer is not blocked on a
        // queue that never receives any more results. This ensures that we can safely block on
        // queue.take(), as we know that we will always receive at least one result from each
        // worker.
        putWithoutInterruptPropagation(PartitionExecutorResult.finished());
      }
    }

    /** Puts a result on the queue, restoring the interrupt flag instead of propagating it. */
    private void putWithoutInterruptPropagation(PartitionExecutorResult result) {
      try {
        queue.put(result);
      } catch (InterruptedException interruptedException) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * A single element produced by a {@link PartitionExecutor}: a data row (optionally with type and
   * metadata on the first row), an exception, or the special 'finished' marker where all four
   * fields are null.
   */
  static class PartitionExecutorResult {
    private final Struct data;
    private final Throwable exception;
    private final Type type;
    private final ResultSetMetadata metadata;

    /** Creates a result that only carries a data row. */
    static PartitionExecutorResult data(@Nonnull Struct data) {
      return new PartitionExecutorResult(Preconditions.checkNotNull(data), null, null, null);
    }

    /** Creates a result that only carries type and metadata (used for empty result sets). */
    static PartitionExecutorResult typeAndMetadata(
        @Nonnull Type type, @Nonnull ResultSetMetadata metadata) {
      return new PartitionExecutorResult(
          null, Preconditions.checkNotNull(type), Preconditions.checkNotNull(metadata), null);
    }

    /** Creates a result that carries the first row of a partition plus type and metadata. */
    static PartitionExecutorResult dataAndMetadata(
        @Nonnull Struct data, @Nonnull Type type, @Nonnull ResultSetMetadata metadata) {
      return new PartitionExecutorResult(
          Preconditions.checkNotNull(data),
          Preconditions.checkNotNull(type),
          Preconditions.checkNotNull(metadata),
          null);
    }

    /** Creates a result that carries an exception thrown by a partition. */
    static PartitionExecutorResult exception(@Nonnull Throwable exception) {
      return new PartitionExecutorResult(null, null, null, Preconditions.checkNotNull(exception));
    }

    /** Creates the 'finished' marker that each worker emits exactly once when it is done. */
    static PartitionExecutorResult finished() {
      return new PartitionExecutorResult(null, null, null, null);
    }

    private PartitionExecutorResult(
        Struct data, Type type, ResultSetMetadata metadata, Throwable exception) {
      this.data = data;
      this.type = type;
      this.metadata = metadata;
      this.exception = exception;
    }

    /** Returns true if this result carries a data row. */
    boolean hasData() {
      return this.data != null;
    }

    /** Returns true if this is the 'finished' marker (all fields null). */
    boolean isFinished() {
      return this.data == null
          && this.type == null
          && this.metadata == null
          && this.exception == null;
    }
  }

  /** Supplies rows to the {@link MergedResultSet}; {@link #get()} returns the current row. */
  interface RowProducer extends Supplier<Struct> {
    /** Advances to the next row. Returns false when all partitions are exhausted. */
    boolean nextRow() throws Throwable;

    /** Releases all resources held by this producer. */
    void close();

    /** Returns the row type of the combined result set. */
    Type getType();

    /** Returns the metadata of the combined result set. */
    ResultSetMetadata getMetadata();

    /** Returns the number of partitions that this producer reads from. */
    int getNumPartitions();

    /** Returns the number of worker threads used by this producer. */
    int getParallelism();
  }

  /** {@link RowProducer} used when there are no partitions at all; never produces a row. */
  static class EmptyRowProducer implements RowProducer {
    @Override
    public Struct get() {
      return Struct.newBuilder().build();
    }

    @Override
    public boolean nextRow() {
      return false;
    }

    @Override
    public Type getType() {
      return Type.struct();
    }

    @Override
    public ResultSetMetadata getMetadata() {
      return ResultSetMetadata.getDefaultInstance();
    }

    @Override
    public int getNumPartitions() {
      return 0;
    }

    @Override
    public int getParallelism() {
      return 0;
    }

    @Override
    public void close() {}
  }

  /**
   * Default {@link RowProducer} implementation: runs each partition on a bounded thread pool and
   * merges all rows through a single bounded queue. Intended to be consumed by a single thread.
   */
  private static class RowProducerImpl implements RowProducer {
    /** The maximum number of rows that we will cache per thread that is fetching rows. */
    private static final int QUEUE_SIZE_PER_WORKER = 32;

    private final ExecutorService executor;
    private final int parallelism;
    private final List<PartitionExecutor> partitionExecutors;
    // Counts down from the number of partitions; reaches zero when every worker has emitted its
    // 'finished' marker.
    private final AtomicInteger finishedCounter;
    private final LinkedBlockingDeque<PartitionExecutorResult> queue;
    private ResultSetMetadata metadata;
    private Type type;
    private Struct currentRow;
    // The first exception received from any worker; rethrown on every subsequent nextRow() call.
    private Throwable exception;

    RowProducerImpl(Connection connection, List<String> partitions, int maxParallelism) {
      Preconditions.checkArgument(maxParallelism >= 0, "maxParallelism must be >= 0");
      Preconditions.checkArgument(
          !Preconditions.checkNotNull(partitions).isEmpty(), "partitions must not be empty");
      if (maxParallelism == 0) {
        // Dynamically determine parallelism.
        this.parallelism = Math.min(partitions.size(), Runtime.getRuntime().availableProcessors());
      } else {
        this.parallelism = Math.min(partitions.size(), maxParallelism);
      }
      this.executor =
          Executors.newFixedThreadPool(
              this.parallelism,
              runnable -> {
                Thread thread = new Thread(runnable);
                thread.setName("partitioned-query-row-producer");
                // Daemon threads so these workers never prevent JVM shutdown.
                thread.setDaemon(true);
                return thread;
              });
      this.queue = new LinkedBlockingDeque<>(QUEUE_SIZE_PER_WORKER * this.parallelism);
      this.partitionExecutors = new ArrayList<>(partitions.size());
      this.finishedCounter = new AtomicInteger(partitions.size());
      for (String partition : partitions) {
        PartitionExecutor partitionExecutor =
            new PartitionExecutor(connection, partition, this.queue);
        this.partitionExecutors.add(partitionExecutor);
        this.executor.submit(partitionExecutor);
      }
      // Pre-emptively shutdown the executor. This does not terminate any running tasks, but it
      // stops the executor from accepting any new tasks and guarantees that the executor will
      // always be shutdown, regardless whether the user calls ResultSet#close().
      this.executor.shutdown();
    }

    @Override
    public void close() {
      this.partitionExecutors.forEach(partitionExecutor -> partitionExecutor.shouldStop.set(true));
      // shutdownNow will interrupt any running tasks and then shut down directly.
      // This will also cancel any queries that might be running.
      this.executor.shutdownNow();
    }

    @Override
    public boolean nextRow() throws Throwable {
      if (this.exception != null) {
        throw this.exception;
      }
      while (true) {
        PartitionExecutorResult next;
        if ((next = queue.peek()) != null && !next.isFinished()) {
          // There's a valid result available. Return this quickly.
          if (setNextRow(queue.remove())) {
            return true;
          }
        }
        // Block until the next row is available.
        next = queue.take();
        if (next.isFinished()) {
          // Use the value returned by decrementAndGet() directly instead of a separate get()
          // call, so the zero-check is based on a single atomic operation.
          if (finishedCounter.decrementAndGet() == 0) {
            return false;
          }
        } else {
          if (setNextRow(next)) {
            return true;
          }
        }
      }
    }

    /**
     * Registers the given result as the current state of this producer. Returns true if the
     * result carried a data row, and throws if it carried an exception.
     */
    boolean setNextRow(PartitionExecutorResult next) throws Throwable {
      if (next.exception != null) {
        this.exception = next.exception;
        throw next.exception;
      }
      currentRow = next.data;
      if (this.metadata == null && next.metadata != null) {
        this.metadata = next.metadata;
      }
      if (this.type == null && next.type != null) {
        this.type = next.type;
      }
      return next.hasData();
    }

    @Override
    public Struct get() {
      checkState(currentRow != null, "next() call required");
      return currentRow;
    }

    @Override
    public ResultSetMetadata getMetadata() {
      checkState(metadata != null, "next() call required");
      return metadata;
    }

    @Override
    public int getNumPartitions() {
      return partitionExecutors.size();
    }

    @Override
    public int getParallelism() {
      return parallelism;
    }

    @Override
    public Type getType() {
      checkState(type != null, "next() call required");
      return type;
    }
  }

  private final RowProducer rowProducer;
  private boolean closed;

  MergedResultSet(Connection connection, List<String> partitions, int maxParallelism) {
    this(
        Preconditions.checkNotNull(partitions).isEmpty()
            ? new EmptyRowProducer()
            : new RowProducerImpl(connection, partitions, maxParallelism));
  }

  private MergedResultSet(RowProducer rowProducer) {
    super(rowProducer);
    this.rowProducer = rowProducer;
  }

  @Override
  protected void checkValidState() {
    Preconditions.checkState(!closed, "This result set has been closed");
  }

  @Override
  public boolean next() throws SpannerException {
    checkValidState();
    try {
      return rowProducer.nextRow();
    } catch (InterruptedException interruptedException) {
      throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
    } catch (Throwable throwable) {
      throw SpannerExceptionFactory.asSpannerException(throwable);
    }
  }

  @Override
  public Struct getCurrentRowAsStruct() {
    checkValidState();
    return rowProducer.get();
  }

  @Override
  public void close() {
    this.closed = true;
    rowProducer.close();
  }

  @Override
  public ResultSetStats getStats() {
    throw new UnsupportedOperationException(
        "ResultSetStats are available only for results returned from analyzeQuery() calls");
  }

  @Override
  public ResultSetMetadata getMetadata() {
    checkValidState();
    return rowProducer.getMetadata();
  }

  @Override
  public Type getType() {
    checkValidState();
    return rowProducer.getType();
  }

  @Override
  public int getColumnCount() {
    return getType().getStructFields().size();
  }

  @Override
  public int getColumnIndex(String columnName) {
    return getType().getFieldIndex(columnName);
  }

  @Override
  public Type getColumnType(int columnIndex) {
    return getType().getStructFields().get(columnIndex).getType();
  }

  @Override
  public Type getColumnType(String columnName) {
    return getType().getStructFields().get(getColumnIndex(columnName)).getType();
  }

  @Override
  public int getNumPartitions() {
    return rowProducer.getNumPartitions();
  }

  @Override
  public int getParallelism() {
    return rowProducer.getParallelism();
  }
}