Skip to content

Commit

Permalink
Fix query snapshot readTime logic
Browse files Browse the repository at this point in the history
  • Loading branch information
brettwillis committed Apr 23, 2024
1 parent 2af8fab commit 633a8e5
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 117 deletions.
4 changes: 2 additions & 2 deletions dev/src/document-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
async get(
requestTag: string
): Promise<Array<DocumentSnapshot<AppModelType, DbModelType>>> {
const {result} = await this.getResponse(requestTag);
const {result} = await this._get(requestTag);
return result;
}

Expand All @@ -91,7 +91,7 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
*
* @param requestTag A unique client-assigned identifier for this request.
*/
async getResponse(
async _get(
requestTag: string
): Promise<BatchGetResponse<AppModelType, DbModelType>> {
await this.fetchDocuments(requestTag);
Expand Down
183 changes: 72 additions & 111 deletions dev/src/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,7 @@ export class VectorQuerySnapshot<
}
}

interface QueryStreamResponse<
interface QueryStreamElement<
AppModelType = firestore.DocumentData,
DbModelType extends firestore.DocumentData = firestore.DocumentData,
> {
Expand All @@ -1470,19 +1470,12 @@ interface QueryStreamResponse<

interface QueryResponse<TSnapshot> {
transaction?: Uint8Array;
readTime?: Timestamp;
explainMetrics?: ExplainMetrics;
result: TSnapshot;
result?: TSnapshot;
}

interface AggregateQueryResponse<
AggregateSpecType extends AggregateSpec,
AppModelType = firestore.DocumentData,
DbModelType extends firestore.DocumentData = firestore.DocumentData,
> {
transaction?: Uint8Array;
explainMetrics?: ExplainMetrics;
result: AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>;
interface QuerySnapshotResponse<TSnapshot> extends QueryResponse<TSnapshot> {
result: TSnapshot;
}

/** Internal representation of a query cursor before serialization. */
Expand Down Expand Up @@ -1718,19 +1711,6 @@ class QueryUtil<
readonly _serializer: Serializer
) {}

async _get(
query: Template,
transactionIdOrReadTime?: Uint8Array | Timestamp,
retryWithCursor = true
): Promise<ReturnType<Template['_createSnapshot']>> {
const {result} = await this._getResponse(
query,
transactionIdOrReadTime,
retryWithCursor
);
return result;
}

_getResponse(
query: Template,
transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions,
Expand All @@ -1742,7 +1722,9 @@ class QueryUtil<

return new Promise((resolve, reject) => {
const docs: Array<QueryDocumentSnapshot<AppModelType, DbModelType>> = [];
const output: Omit<QueryResponse<never>, 'result'> = {};
const output: Omit<QueryResponse<never>, 'result'> & {
readTime?: Timestamp;
} = {};

this._stream(
query,
Expand All @@ -1753,7 +1735,7 @@ class QueryUtil<
.on('error', err => {
reject(wrapError(err, stack));
})
.on('data', (data: QueryStreamResponse<AppModelType, DbModelType>) => {
.on('data', (data: QueryStreamElement<AppModelType, DbModelType>) => {
if (data.transaction) {
output.transaction = data.transaction;
}
Expand All @@ -1775,31 +1757,28 @@ class QueryUtil<
docs.reverse();
}

// TODO(ehsannas): Typings of QuerySnapshot suggest that readTime is always available
// however there are in the scenario of query plan with no results there will be no read time sent
// so the typings should be updated to make readTime optional?
// For now we keep the null assertion behaviour which has been the case historically
// if (!output.readTime) {
// reject(wrapError(Error('No read time'), stack));
// return;
// }

const result = query._createSnapshot(
// TODO(ehsannas): null assertion is incorrect in some scenarios, see above comment
output.readTime!,
docs.length,
() => docs,
() => {
const changes: Array<DocumentChange<AppModelType, DbModelType>> =
[];
for (let i = 0; i < docs.length; ++i) {
changes.push(new DocumentChange('added', docs[i], -1, i));
}
return changes;
}
) as ReturnType<Template['_createSnapshot']>;
// Only return a snapshot when we have a readTime
// explain queries with analyze !== true will return no documents and no read time
const result = output.readTime
? (query._createSnapshot(
output.readTime,
docs.length,
() => docs,
() => {
const changes: Array<
DocumentChange<AppModelType, DbModelType>
> = [];
for (let i = 0; i < docs.length; ++i) {
changes.push(new DocumentChange('added', docs[i], -1, i));
}
return changes;
}
) as ReturnType<Template['_createSnapshot']>)
: undefined;

resolve({
...output,
transaction: output.transaction,
explainMetrics: output.explainMetrics,
result,
});
});
Expand Down Expand Up @@ -1869,7 +1848,7 @@ class QueryUtil<
return;
}

const output: QueryStreamResponse<AppModelType, DbModelType> = {};
const output: QueryStreamElement<AppModelType, DbModelType> = {};

// Proto comes with zero-length buffer by default
if (proto.transaction?.length) {
Expand Down Expand Up @@ -3036,8 +3015,9 @@ export class Query<
* });
* ```
*/
get(): Promise<QuerySnapshot<AppModelType, DbModelType>> {
return this._get();
async get(): Promise<QuerySnapshot<AppModelType, DbModelType>> {
const {result} = await this._get();
return result;
}

/**
Expand All @@ -3061,37 +3041,30 @@ export class Query<
if (!explainMetrics) {
throw new Error('No explain results');
}
return new ExplainResults(
explainMetrics,
result as QuerySnapshot<AppModelType, DbModelType>
);
return new ExplainResults(explainMetrics, result || null);
}

/**
* Internal get() method that accepts an optional transaction id.
* Internal get() method that accepts an optional transaction options, and
* returns a query snapshot with transaction and explain metadata.
*
* @private
* @internal
* @param transactionOrReadTime A transaction ID, options to start a new
* transaction, or timestamp to use as read time.
*/
async _get(
transactionIdOrReadTime?: Uint8Array | Timestamp
): Promise<QuerySnapshot<AppModelType, DbModelType>> {
return this._queryUtil._get(this, transactionIdOrReadTime) as Promise<
transactionIdOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions
): Promise<QuerySnapshotResponse<QuerySnapshot<AppModelType, DbModelType>>> {
const result = await this._getResponse(transactionIdOrReadTime);
if (!result.result) {
throw new Error('No QuerySnapshot result');
}
return result as QuerySnapshotResponse<
QuerySnapshot<AppModelType, DbModelType>
>;
}

/**
* Internal get() method that accepts an optional transaction id, and returns
* transaction metadata.
*
* @private
* @internal
* @param transactionOrReadTime A transaction ID, options to start a new
* transaction, or timestamp to use as read time.
*/
_getResponse(
transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions,
explainOptions?: firestore.ExplainOptions
Expand Down Expand Up @@ -3174,7 +3147,7 @@ export class Query<
const transform = new Transform({
objectMode: true,
transform(
chunk: QueryStreamResponse<AppModelType, DbModelType>,
chunk: QueryStreamElement<AppModelType, DbModelType>,
encoding,
callback
) {
Expand Down Expand Up @@ -3966,51 +3939,35 @@ export class AggregateQuery<
*
* @return A promise that will be resolved with the results of the query.
*/
get(): Promise<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
> {
return this._get();
}

/**
* Internal get() method that accepts an optional transaction id.
*
* @private
* @internal
* @param transactionOrReadTime A transaction ID, options to start a new
* transaction, or timestamp to use as read time.
*/
async _get(
transactionIdOrReadTime?: Uint8Array | Timestamp
): Promise<
async get(): Promise<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
> {
const {result} = await this._getResponse(transactionIdOrReadTime);
const {result} = await this._get();
return result;
}

/**
* Internal get() method that accepts an optional transaction id, and returns
* transaction metadata.
* Internal get() method that accepts an optional transaction options and
* returns a snapshot with transaction and explain metadata.
*
* @private
* @internal
* @param transactionOrReadTime A transaction ID, options to start a new
* transaction, or timestamp to use as read time.
*/
async _getResponse(
async _get(
transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions
): Promise<
AggregateQueryResponse<AggregateSpecType, AppModelType, DbModelType>
QuerySnapshotResponse<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
>
> {
const response = await this._getResponseOrExplain(transactionOrReadTime);
const response = await this._getResponse(transactionOrReadTime);
if (!response.result) {
throw new Error('No AggregateQuery results');
}
return response as AggregateQueryResponse<
AggregateSpecType,
AppModelType,
DbModelType
return response as QuerySnapshotResponse<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
>;
}

Expand All @@ -4023,20 +3980,20 @@ export class AggregateQuery<
* @param transactionOrReadTime A transaction ID, options to start a new
* transaction, or timestamp to use as read time.
*/
_getResponseOrExplain(
_getResponse(
transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions,
explainOptions?: firestore.ExplainOptions
): Promise<
Partial<
AggregateQueryResponse<AggregateSpecType, AppModelType, DbModelType>
QueryResponse<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
>
> {
// Capture the error stack to preserve stack tracing across async calls.
const stack = Error().stack!;

return new Promise((resolve, reject) => {
const output: Partial<
AggregateQueryResponse<AggregateSpecType, AppModelType, DbModelType>
const output: QueryResponse<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
> = {};

const stream = this._stream(transactionOrReadTime, explainOptions);
Expand All @@ -4046,8 +4003,8 @@ export class AggregateQuery<
stream.on(
'data',
(
data: Partial<
AggregateQueryResponse<AggregateSpecType, AppModelType, DbModelType>
data: QueryResponse<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
>
) => {
if (data.transaction) {
Expand Down Expand Up @@ -4091,8 +4048,8 @@ export class AggregateQuery<
const stream: Transform = new Transform({
objectMode: true,
transform: (proto: api.IRunAggregationQueryResponse, enc, callback) => {
const output: Partial<
AggregateQueryResponse<AggregateSpecType, AppModelType, DbModelType>
const output: QueryResponse<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
> = {};

// Proto comes with zero-length buffer by default
Expand Down Expand Up @@ -4281,7 +4238,7 @@ export class AggregateQuery<
AggregateQuerySnapshot<AggregateSpecType, AppModelType, DbModelType>
>
> {
const {result, explainMetrics} = await this._getResponseOrExplain(
const {result, explainMetrics} = await this._getResponse(
undefined,
options
);
Expand Down Expand Up @@ -4473,13 +4430,17 @@ export class VectorQuery<
*
* @returns A promise that will be resolved with the results of the query.
*/
get(): Promise<VectorQuerySnapshot<AppModelType, DbModelType>> {
return this._queryUtil._get(
async get(): Promise<VectorQuerySnapshot<AppModelType, DbModelType>> {
const {result} = await this._queryUtil._getResponse(
this,
/*transactionId*/ undefined,
// VectorQuery cannot be retried with cursors as they do not support cursors yet.
/*retryWithCursor*/ false
) as Promise<VectorQuerySnapshot<AppModelType, DbModelType>>;
);
if (!result) {
throw new Error('No VectorQuerySnapshot result');
}
return result;
}

/**
Expand Down
8 changes: 4 additions & 4 deletions dev/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ export class Transaction implements firestore.Transaction {
const {
transaction,
result: [result],
} = await documentReader.getResponse(this._requestTag);
} = await documentReader._get(this._requestTag);
return {transaction, result};
}

Expand All @@ -722,7 +722,7 @@ export class Transaction implements firestore.Transaction {
fieldMask,
opts
);
return documentReader.getResponse(this._requestTag);
return documentReader._get(this._requestTag);
}

private async getQueryFn<
Expand All @@ -733,9 +733,9 @@ export class Transaction implements firestore.Transaction {
opts: Uint8Array | api.ITransactionOptions | Timestamp
): Promise<{
transaction?: Uint8Array;
result: Awaited<ReturnType<TQuery['_getResponse']>>['result'];
result: Awaited<ReturnType<TQuery['_get']>>['result'];
}> {
return query._getResponse(opts);
return query._get(opts);
}
}

Expand Down

0 comments on commit 633a8e5

Please sign in to comment.