Skip to content

Commit

Permalink
feat: reverse scans public preview (#1711)
Browse files Browse the repository at this point in the history
This adds a reversed boolean to Query, which will allow end users to stream rows in reverse order.

Example:
```java
Query query = Query.create("alphabet").range("a", "z").limit(3).reversed(true);
ServerStream<Row> results = client.readRows(query);

for (Row row : results) {
  System.out.println(row.getKey().toStringUtf8());
}
// Prints z, y, x
```
  • Loading branch information
igorbernstein2 committed Jun 27, 2023
1 parent f4f2e2e commit 176360f
Show file tree
Hide file tree
Showing 17 changed files with 337 additions and 86 deletions.
11 changes: 11 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Expand Up @@ -134,4 +134,15 @@
<method>*</method>
<to>*</to>
</difference>
<!-- Removed methods in an internal class -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigtable/data/v2/internal/RowSetUtil</className>
<method>*</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger</className>
<method>*</method>
</difference>
</differences>
Expand Up @@ -30,7 +30,7 @@ public class RowMergerUtil implements AutoCloseable {

public RowMergerUtil() {
RowBuilder<Row> rowBuilder = new DefaultRowAdapter().createRowBuilder();
merger = new RowMerger<>(rowBuilder);
merger = new RowMerger<>(rowBuilder, false);
}

@Override
Expand Down
Expand Up @@ -50,80 +50,79 @@ public final class RowSetUtil {
private RowSetUtil() {}

/**
* Splits the provided {@link RowSet} along the provided splitPoint into 2 segments. The right
* segment will contain all keys that are strictly greater than the splitPoint and all {@link
* RowRange}s truncated to start right after the splitPoint. The primary usecase is to resume a
* broken ReadRows stream.
* Removes all the keys and range parts that fall on or before the splitPoint.
*
* <p>The direction of before is determined by fromStart: for forward scans fromStart is true and
* will remove all the keys and range segments that would've been read prior to the splitPoint
* (i.e. all of the keys that sort lexicographically at or before the split point). For reverse
* scans, fromStart is false and all segments that sort lexicographically at or after the split
* point are removed. The primary use case is to resume a broken ReadRows stream.
*/
@Nonnull
public static Split split(@Nonnull RowSet rowSet, @Nonnull ByteString splitPoint) {
// Edgecase: splitPoint is the leftmost key ("")
if (splitPoint.isEmpty()) {
return Split.of(null, rowSet);
}
public static RowSet erase(RowSet rowSet, ByteString splitPoint, boolean fromStart) {
RowSet.Builder newRowSet = RowSet.newBuilder();

// An empty RowSet represents a full table scan. Make that explicit so that there is a RowRange
// to split.
if (rowSet.getRowKeysList().isEmpty() && rowSet.getRowRangesList().isEmpty()) {
rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
}

RowSet.Builder leftBuilder = RowSet.newBuilder();
boolean leftIsEmpty = true;
RowSet.Builder rightBuilder = RowSet.newBuilder();
boolean rightIsEmpty = true;

// Handle point lookups
for (ByteString key : rowSet.getRowKeysList()) {
if (ByteStringComparator.INSTANCE.compare(key, splitPoint) <= 0) {
leftBuilder.addRowKeys(key);
leftIsEmpty = false;
if (fromStart) {
// key is right of the split
if (ByteStringComparator.INSTANCE.compare(key, splitPoint) > 0) {
newRowSet.addRowKeys(key);
}
} else {
rightBuilder.addRowKeys(key);
rightIsEmpty = false;
// key is left of the split
if (ByteStringComparator.INSTANCE.compare(key, splitPoint) < 0) {
newRowSet.addRowKeys(key);
}
}
}

for (RowRange range : rowSet.getRowRangesList()) {
StartPoint startPoint = StartPoint.extract(range);
int startCmp =
ComparisonChain.start()
.compare(startPoint.value, splitPoint, ByteStringComparator.INSTANCE)
// when value lies on the split point, only closed start points are on the left
.compareTrueFirst(startPoint.isClosed, true)
.result();

// Range is fully on the right side
if (startCmp > 0) {
rightBuilder.addRowRanges(range);
rightIsEmpty = false;
continue;
// Handle ranges
for (RowRange rowRange : rowSet.getRowRangesList()) {
RowRange newRange = truncateRange(rowRange, splitPoint, fromStart);
if (newRange != null) {
newRowSet.addRowRanges(newRange);
}
}

EndPoint endPoint = EndPoint.extract(range);
int endCmp =
ComparisonChain.start()
// empty (true) end key means rightmost regardless of the split point
.compareFalseFirst(endPoint.value.isEmpty(), false)
.compare(endPoint.value, splitPoint, ByteStringComparator.INSTANCE)
// don't care if the endpoint is open/closed: both will be on the left if the value is
// <=
.result();

if (endCmp <= 0) {
// Range is fully on the left
leftBuilder.addRowRanges(range);
leftIsEmpty = false;
} else {
// Range is split
leftBuilder.addRowRanges(range.toBuilder().setEndKeyClosed(splitPoint));
leftIsEmpty = false;
rightBuilder.addRowRanges(range.toBuilder().setStartKeyOpen(splitPoint));
rightIsEmpty = false;
// Return the new rowset if there is anything left to read
RowSet result = newRowSet.build();
if (result.getRowKeysList().isEmpty() && result.getRowRangesList().isEmpty()) {
return null;
}
return result;
}

private static RowRange truncateRange(RowRange range, ByteString split, boolean fromStart) {
if (fromStart) {
// range end is on or left of the split: skip
if (EndPoint.extract(range).compareTo(new EndPoint(split, true)) <= 0) {
return null;
}
} else {
// range is on or right of the split
if (StartPoint.extract(range).compareTo(new StartPoint(split, true)) >= 0) {
return null;
}
}
RowRange.Builder newRange = range.toBuilder();

if (fromStart) {
// range start is on or left of the split
if (StartPoint.extract(range).compareTo(new StartPoint(split, true)) <= 0) {
newRange.setStartKeyOpen(split);
}
} else {
// range end is on or right of the split
if (EndPoint.extract(range).compareTo(new EndPoint(split, true)) >= 0) {
newRange.setEndKeyOpen(split);
}
}

return Split.of(
leftIsEmpty ? null : leftBuilder.build(), rightIsEmpty ? null : rightBuilder.build());
return newRange.build();
}

/**
Expand Down
Expand Up @@ -184,6 +184,26 @@ public Query limit(long limit) {
return this;
}

/**
* Sets whether rows are returned in reverse order.
*
* <p>Rows will be streamed in reverse lexicographic order of their keys. The row key ranges are
* still expected to be oriented the same way as for forward scans, i.e. {@code [a, c]} where
* {@code a <= c}. The contents of each row keep the same ordering as in forward scans. This is
* particularly useful to get the last N records before a key:
*
* <pre>{@code
* query
* .range(ByteStringRange.unbounded().endOpen("key"))
* .limit(10)
* .reversed(true)
* }</pre>
*
* @param enable {@code true} to request a reverse scan, {@code false} for a forward scan.
* @return this query, to allow call chaining.
*/
public Query reversed(boolean enable) {
builder.setReversed(enable);
return this;
}

/**
* Split this query into multiple queries that can be evenly distributed across Bigtable nodes and
* be run in parallel. This method takes the results from {@link
Expand Down Expand Up @@ -379,11 +399,12 @@ public boolean advance(@Nonnull ByteString lastSeenRowKey) {

// Erase the row ranges / row keys that were already read. Return false if there's nothing
// left to read past the split point in the direction of the scan.
RowSetUtil.Split split = RowSetUtil.split(query.builder.getRows(), lastSeenRowKey);
if (split.getRight() == null) {
RowSet remaining =
RowSetUtil.erase(query.builder.getRows(), lastSeenRowKey, !query.builder.getReversed());
if (remaining == null) {
return false;
}
query.builder.setRows(split.getRight());
query.builder.setRows(remaining);
return true;
}
}
Expand Down
Expand Up @@ -732,7 +732,7 @@ private Builder() {
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
.build());

featureFlags = FeatureFlags.newBuilder();
featureFlags = FeatureFlags.newBuilder().setReverseScans(true);
}

private Builder(EnhancedBigtableStubSettings settings) {
Expand Down
Expand Up @@ -85,7 +85,8 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest originalRequest) {
return originalRequest;
}

RowSet remaining = RowSetUtil.split(originalRequest.getRows(), lastKey).getRight();
RowSet remaining =
RowSetUtil.erase(originalRequest.getRows(), lastKey, !originalRequest.getReversed());

// Edge case: retrying a fulfilled request.
// A fulfilled request is one that has had all of its row keys and ranges fulfilled, or if it
Expand Down
Expand Up @@ -61,8 +61,8 @@ public class RowMerger<RowT> implements Reframer<RowT, ReadRowsResponse> {
private final StateMachine<RowT> stateMachine;
private Queue<RowT> mergedRows;

public RowMerger(RowBuilder<RowT> rowBuilder) {
stateMachine = new StateMachine<>(rowBuilder);
public RowMerger(RowBuilder<RowT> rowBuilder, boolean reversed) {
stateMachine = new StateMachine<>(rowBuilder, reversed);
mergedRows = new ArrayDeque<>();
}

Expand Down
Expand Up @@ -49,7 +49,7 @@ public RowMergingCallable(
public void call(
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
RowBuilder<RowT> rowBuilder = rowAdapter.createRowBuilder();
RowMerger<RowT> merger = new RowMerger<>(rowBuilder);
RowMerger<RowT> merger = new RowMerger<>(rowBuilder, request.getReversed());
ReframingResponseObserver<ReadRowsResponse, RowT> innerObserver =
new ReframingResponseObserver<>(responseObserver, merger);
inner.call(request, innerObserver, context);
Expand Down
Expand Up @@ -76,6 +76,7 @@
*/
final class StateMachine<RowT> {
private final RowBuilder<RowT> adapter;
private boolean reversed;
private State currentState;
private ByteString lastCompleteRowKey;

Expand All @@ -102,9 +103,11 @@ final class StateMachine<RowT> {
* Initialize a new state machine that's ready for a new row.
*
* @param adapter The adapter that will build the final row.
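* @param reversed whether row keys are expected to be strictly decreasing (reverse scan) instead
* of strictly increasing.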
*/
StateMachine(RowBuilder<RowT> adapter) {
StateMachine(RowBuilder<RowT> adapter, boolean reversed) {
this.adapter = adapter;
this.reversed = reversed;
reset();
}

Expand Down Expand Up @@ -261,9 +264,15 @@ State handleChunk(CellChunk chunk) {
validate(chunk.hasFamilyName(), "AWAITING_NEW_ROW: family missing");
validate(chunk.hasQualifier(), "AWAITING_NEW_ROW: qualifier missing");
if (lastCompleteRowKey != null) {
validate(
ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, chunk.getRowKey()) < 0,
"AWAITING_NEW_ROW: key must be strictly increasing");

int cmp = ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, chunk.getRowKey());
String direction = "increasing";
if (reversed) {
cmp *= -1;
direction = "decreasing";
}

validate(cmp < 0, "AWAITING_NEW_ROW: key must be strictly " + direction);
}

rowKey = chunk.getRowKey();
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.WatchdogProvider;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.FeatureFlags;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
Expand All @@ -36,8 +37,14 @@
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
Expand Down Expand Up @@ -78,12 +85,24 @@ public class BigtableDataClientFactoryTest {

private final BlockingQueue<Attributes> setUpAttributes = new LinkedBlockingDeque<>();
private final BlockingQueue<Attributes> terminateAttributes = new LinkedBlockingDeque<>();
private final BlockingQueue<Metadata> requestMetadata = new LinkedBlockingDeque<>();

@Before
public void setUp() throws IOException {
service = new FakeBigtableService();
server =
FakeServiceBuilder.create(service)
.intercept(
new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
requestMetadata.add(headers);
return next.startCall(call, headers);
}
})
.addTransportFilter(
new ServerTransportFilter() {
@Override
Expand Down Expand Up @@ -276,6 +295,24 @@ public void testCreateWithRefreshingChannel() throws Exception {
assertThat(terminateAttributes).hasSize(poolSize);
}

/**
* Verifies that a request sent by a default client carries a {@code bigtable-features} header
* whose decoded {@link FeatureFlags} has reverse scans enabled.
*/
@Test
public void testFeatureFlags() throws Exception {
try (BigtableDataClientFactory factory = BigtableDataClientFactory.create(defaultSettings);
BigtableDataClient client = factory.createDefault()) {

// Discard headers recorded so far; presumably so that the entry read below comes from the
// mutateRow call rather than from client setup traffic.
requestMetadata.clear();
client.mutateRow(RowMutation.create("some-table", "some-key").deleteRow());
}

// Blocks until the server interceptor has recorded the headers of a request.
Metadata metadata = requestMetadata.take();
String encodedValue =
metadata.get(Metadata.Key.of("bigtable-features", Metadata.ASCII_STRING_MARSHALLER));
// The header value is decoded as URL-safe base64 and parsed as a serialized FeatureFlags message.
FeatureFlags featureFlags =
FeatureFlags.parseFrom(BaseEncoding.base64Url().decode(encodedValue));

assertThat(featureFlags.getReverseScans()).isTrue();
}

@Test
public void testBulkMutationFlowControllerConfigured() throws Exception {
BigtableDataSettings settings =
Expand Down Expand Up @@ -306,6 +343,7 @@ private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase {
volatile MutateRowRequest lastRequest;
BlockingQueue<ReadRowsRequest> readRowsRequests = new LinkedBlockingDeque<>();
BlockingQueue<PingAndWarmRequest> pingAndWarmRequests = new LinkedBlockingDeque<>();

private ApiFunction<ReadRowsRequest, ReadRowsResponse> readRowsCallback =
new ApiFunction<ReadRowsRequest, ReadRowsResponse>() {
@Override
Expand Down

0 comments on commit 176360f

Please sign in to comment.