-
Notifications
You must be signed in to change notification settings - Fork 2
/
SpannerDatabaseConnector.kt
103 lines (90 loc) · 3.29 KB
/
SpannerDatabaseConnector.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright 2020 The Cross-Media Measurement Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.wfanet.measurement.gcloud.spanner
import com.google.cloud.spanner.DatabaseClient
import com.google.cloud.spanner.DatabaseId
import com.google.cloud.spanner.Spanner
import io.grpc.LoadBalancerRegistry
import io.grpc.grpclb.GrpclbLoadBalancerProvider
import java.time.Duration
import java.util.logging.Logger
import kotlinx.coroutines.TimeoutCancellationException
/**
* Wraps a connection to a Spanner database for convenient access to an [AsyncDatabaseClient], the
* [DatabaseId], waiting for the connection to be ready, etc.
*/
class SpannerDatabaseConnector(
projectName: String,
instanceName: String,
databaseName: String,
private val readyTimeout: Duration,
emulatorHost: String?
) : AutoCloseable {
private val spanner: Spanner = buildSpanner(projectName, emulatorHost)
val databaseId: DatabaseId = DatabaseId.of(projectName, instanceName, databaseName)
private val internalDatabaseClient: DatabaseClient by lazy {
spanner.getDatabaseClient(databaseId)
}
val databaseClient: AsyncDatabaseClient
get() = internalDatabaseClient.asAsync()
/**
* Suspends until [databaseClient] is ready, throwing a
* [kotlinx.coroutines.TimeoutCancellationException] if [readyTimeout] is reached.
*/
suspend fun waitUntilReady() {
databaseClient.waitUntilReady(readyTimeout)
}
override fun close() {
spanner.close()
}
/**
* Executes [block] with this [SpannerDatabaseConnector] once it's ready, ensuring that this is
* closed when done.
*/
suspend fun <R> usingSpanner(block: suspend (spanner: SpannerDatabaseConnector) -> R): R {
use { spanner ->
try {
spanner.waitUntilReady()
} catch (e: TimeoutCancellationException) {
// Closing Spanner can take a long time (e.g. 1 minute) and delay the
// exception being surfaced, so we log here to give immediate feedback.
logger.severe { "Timed out waiting for Spanner to be ready" }
throw e
}
return block(spanner)
}
}
companion object {
private val logger = Logger.getLogger(this::class.java.name)
}
}
/** Builds a [SpannerDatabaseConnector] from these flags. */
private fun SpannerFlags.toSpannerDatabaseConnector(): SpannerDatabaseConnector {
return SpannerDatabaseConnector(
projectName = projectName,
instanceName = instanceName,
databaseName = databaseName,
readyTimeout = readyTimeout,
emulatorHost = emulatorHost
)
}
/**
* Executes [block] with a [SpannerDatabaseConnector] resource once it's ready, ensuring that the
* resource is closed.
*/
suspend fun <R> SpannerFlags.usingSpanner(
block: suspend (spanner: SpannerDatabaseConnector) -> R
): R {
return toSpannerDatabaseConnector().usingSpanner(block)
}