Skip to content

Commit

Permalink
feat: support UNRECOGNIZED types + decode BYTES columns lazily (#2219)
Browse files Browse the repository at this point in the history
* perf: decode BYTES columns lazily

BYTES columns are encoded as Base64 strings. Decoding these is relatively CPU-heavy,
especially for large values. Decoding them is not always necessary if the user only
needs the Base64 string. Also, the internally used Guava decoder is less efficient
than JDK implementations that are available from Java 8 and onwards. This change
therefore delays the decoding of BYTES columns until it is actually necessary, and
then uses the JDK implementation instead of the Guava version. The JDK implementation
in OpenJDK 17 uses approx 1/3 of the CPU cycles of the Guava version.

* feat: support unrecognized types

* feat: add support for lazy bytes arrays

* chore: cleanup

* fix: unrecognized types with array element type should be considered unrecognized

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* chore: address review comments

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
olavloite and gcf-owl-bot[bot] committed Feb 2, 2023
1 parent d44e468 commit fc721c4
Show file tree
Hide file tree
Showing 14 changed files with 1,341 additions and 57 deletions.
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.4.0')
implementation platform('com.google.cloud:libraries-bom:26.5.0')
implementation 'com.google.cloud:google-cloud-spanner'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-spanner:6.35.1'
implementation 'com.google.cloud:google-cloud-spanner:6.35.2'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.35.1"
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.35.2"
```

## Authentication
Expand Down
Expand Up @@ -32,12 +32,14 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import com.google.protobuf.ListValue;
import com.google.protobuf.NullValue;
import com.google.protobuf.Value.KindCase;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSetMetadata;
Expand All @@ -55,23 +57,29 @@
import java.math.BigDecimal;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Base64;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** Implementation of {@link ResultSet}. */
abstract class AbstractResultSet<R> extends AbstractStructReader implements ResultSet {
private static final Tracer tracer = Tracing.getTracer();
private static final com.google.protobuf.Value NULL_VALUE =
com.google.protobuf.Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();

interface Listener {
/**
Expand Down Expand Up @@ -353,6 +361,79 @@ private boolean isMergeable(KindCase kind) {
}
}

/**
 * Holder for a BYTES value that keeps the Base64 wire representation and defers the
 * comparatively expensive Base64 decoding until the raw bytes are actually requested.
 * Equality, hashing, {@link #toString()} and serialization are all defined in terms of
 * the Base64 string, so no decoding is ever triggered by those operations.
 */
static final class LazyByteArray implements Serializable {
  private static final Base64.Encoder ENCODER = Base64.getEncoder();
  private static final Base64.Decoder DECODER = Base64.getDecoder();

  // Canonical representation of the value; this is also what gets serialized.
  private final String base64String;
  // Decodes base64String at most once, on first access. Transient: rebuilt in readObject.
  private transient AbstractLazyInitializer<ByteArray> byteArray;

  /** Creates an instance from the Base64 wire representation without decoding it. */
  LazyByteArray(@Nonnull String base64String) {
    this.base64String = Preconditions.checkNotNull(base64String);
    this.byteArray = defaultInitializer();
  }

  /** Creates an instance from already-decoded bytes; only the encoding step is eager. */
  LazyByteArray(@Nonnull ByteArray byteArray) {
    this.base64String = ENCODER.encodeToString(Preconditions.checkNotNull(byteArray).toByteArray());
    // The decoded value is already at hand, so the initializer just returns it.
    this.byteArray =
        new AbstractLazyInitializer<ByteArray>() {
          @Override
          protected ByteArray initialize() {
            return byteArray;
          }
        };
  }

  /** Returns an initializer that Base64-decodes {@link #base64String} on first use. */
  private AbstractLazyInitializer<ByteArray> defaultInitializer() {
    return new AbstractLazyInitializer<ByteArray>() {
      @Override
      protected ByteArray initialize() {
        return ByteArray.copyFrom(DECODER.decode(base64String));
      }
    };
  }

  // The lazy initializer is transient, so restore it after default deserialization.
  private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    byteArray = defaultInitializer();
  }

  /** Decodes the value if that has not happened yet and returns the raw bytes. */
  ByteArray getByteArray() {
    try {
      return byteArray.get();
    } catch (Throwable t) {
      throw SpannerExceptionFactory.asSpannerException(t);
    }
  }

  /** Returns the Base64 representation without triggering any decoding. */
  String getBase64String() {
    return base64String;
  }

  @Override
  public String toString() {
    return getBase64String();
  }

  @Override
  public int hashCode() {
    return base64String.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof LazyByteArray)) {
      return false;
    }
    // Comparing the Base64 strings compares the byte contents without decoding either side.
    return Objects.equals(getBase64String(), ((LazyByteArray) o).getBase64String());
  }
}

static class GrpcStruct extends Struct implements Serializable {
private final Type type;
private final List<Object> rowData;
Expand Down Expand Up @@ -395,7 +476,11 @@ private Object writeReplace() {
builder.set(fieldName).to(Value.pgJsonb((String) value));
break;
case BYTES:
builder.set(fieldName).to((ByteArray) value);
builder
.set(fieldName)
.to(
Value.bytesFromBase64(
value == null ? null : ((LazyByteArray) value).getBase64String()));
break;
case TIMESTAMP:
builder.set(fieldName).to((Timestamp) value);
Expand Down Expand Up @@ -431,7 +516,17 @@ private Object writeReplace() {
builder.set(fieldName).toPgJsonbArray((Iterable<String>) value);
break;
case BYTES:
builder.set(fieldName).toBytesArray((Iterable<ByteArray>) value);
builder
.set(fieldName)
.toBytesArrayFromBase64(
value == null
? null
: ((List<LazyByteArray>) value)
.stream()
.map(
element ->
element == null ? null : element.getBase64String())
.collect(Collectors.toList()));
break;
case TIMESTAMP:
builder.set(fieldName).toTimestampArray((Iterable<Timestamp>) value);
Expand Down Expand Up @@ -511,7 +606,7 @@ private static Object decodeValue(Type fieldType, com.google.protobuf.Value prot
return proto.getStringValue();
case BYTES:
checkType(fieldType, proto, KindCase.STRING_VALUE);
return ByteArray.fromBase64(proto.getStringValue());
return new LazyByteArray(proto.getStringValue());
case TIMESTAMP:
checkType(fieldType, proto, KindCase.STRING_VALUE);
return Timestamp.parseTimestamp(proto.getStringValue());
Expand All @@ -526,6 +621,8 @@ private static Object decodeValue(Type fieldType, com.google.protobuf.Value prot
checkType(fieldType, proto, KindCase.LIST_VALUE);
ListValue structValue = proto.getListValue();
return decodeStructValue(fieldType, structValue);
case UNRECOGNIZED:
return proto;
default:
throw new AssertionError("Unhandled type code: " + fieldType.getCode());
}
Expand Down Expand Up @@ -634,7 +731,11 @@ protected String getPgJsonbInternal(int columnIndex) {

/**
 * Returns the BYTES value of the given column, triggering the (at most one) lazy
 * Base64 decode via {@link LazyByteArray}.
 */
@Override
protected ByteArray getBytesInternal(int columnIndex) {
  // NOTE(review): the scraped diff retained the pre-change statement
  // "return (ByteArray) rowData.get(columnIndex);" above this line as an
  // unreachable duplicate; only the post-change statement is kept here.
  return getLazyBytesInternal(columnIndex).getByteArray();
}

/** Returns the undecoded lazy wrapper for the BYTES value of the given column. */
LazyByteArray getLazyBytesInternal(int columnIndex) {
  return (LazyByteArray) rowData.get(columnIndex);
}

@Override
Expand All @@ -647,6 +748,10 @@ protected Date getDateInternal(int columnIndex) {
return (Date) rowData.get(columnIndex);
}

/**
 * Returns the raw protobuf value stored for the given column. Used for columns whose
 * type code is UNRECOGNIZED, where the value is kept as the undecoded proto (see the
 * UNRECOGNIZED branch in getValueInternal).
 */
protected com.google.protobuf.Value getProtoValueInternal(int columnIndex) {
return (com.google.protobuf.Value) rowData.get(columnIndex);
}

@Override
protected Value getValueInternal(int columnIndex) {
final List<Type.StructField> structFields = getType().getStructFields();
Expand All @@ -671,13 +776,16 @@ protected Value getValueInternal(int columnIndex) {
case PG_JSONB:
return Value.pgJsonb(isNull ? null : getPgJsonbInternal(columnIndex));
case BYTES:
return Value.bytes(isNull ? null : getBytesInternal(columnIndex));
return Value.internalBytes(isNull ? null : getLazyBytesInternal(columnIndex));
case TIMESTAMP:
return Value.timestamp(isNull ? null : getTimestampInternal(columnIndex));
case DATE:
return Value.date(isNull ? null : getDateInternal(columnIndex));
case STRUCT:
return Value.struct(isNull ? null : getStructInternal(columnIndex));
case UNRECOGNIZED:
return Value.unrecognized(
isNull ? NULL_VALUE : getProtoValueInternal(columnIndex), columnType);
case ARRAY:
final Type elementType = columnType.getArrayElementType();
switch (elementType.getCode()) {
Expand Down Expand Up @@ -785,9 +893,10 @@ protected List<String> getPgJsonbListInternal(int columnIndex) {
}

/**
 * Returns the ARRAY&lt;BYTES&gt; value of the given column as a lazily transformed view:
 * each element is Base64-decoded only when it is actually read from the returned list.
 * Null elements are preserved as null.
 */
@Override
@SuppressWarnings("unchecked") // We know ARRAY<BYTES> produces a List<LazyByteArray>.
protected List<ByteArray> getBytesListInternal(int columnIndex) {
  // NOTE(review): the scraped diff retained the pre-change statement
  // "return Collections.unmodifiableList(...)" and the old @SuppressWarnings comment
  // as stale duplicates in this span; only the post-change code is kept here.
  return Lists.transform(
      (List<LazyByteArray>) rowData.get(columnIndex), l -> l == null ? null : l.getByteArray());
}

@Override
Expand Down
Expand Up @@ -251,7 +251,6 @@ public Date getDate(String columnName) {

/**
 * Returns the value of the given column as a {@link Value}. Null handling is delegated
 * to getValueInternal (the commit removed the former checkNonNull precondition here,
 * since UNRECOGNIZED columns must be readable even when null).
 */
@Override
public Value getValue(int columnIndex) {
  // NOTE(review): the scraped diff retained the removed line
  // "checkNonNull(columnIndex, columnIndex);" in this span; it was deleted by the
  // commit and is intentionally absent here.
  return getValueInternal(columnIndex);
}

Expand Down

0 comments on commit fc721c4

Please sign in to comment.