Skip to content

Commit

Permalink
fix: Logic for retrying specifiied internal errors (#1822)
Browse files Browse the repository at this point in the history
Fixes #1808
  • Loading branch information
surbhigarg92 committed Mar 17, 2023
1 parent b914981 commit f915bd1
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
11 changes: 1 addition & 10 deletions src/partial-result-stream.ts
Expand Up @@ -23,6 +23,7 @@ import {common as p} from 'protobufjs';
import {Readable, Transform} from 'stream';
import * as streamEvents from 'stream-events';
import {grpc} from 'google-gax';
import {isRetryableInternalError} from './transaction-runner';

import {codec, JSONOptions, Json, Field, Value} from './codec';
import {google} from '../protos/protos';
Expand Down Expand Up @@ -552,13 +553,3 @@ export function partialResultStream(
function _hasResumeToken(chunk: google.spanner.v1.PartialResultSet): boolean {
return is.defined(chunk.resumeToken) && chunk.resumeToken.length > 0;
}

function isRetryableInternalError(err: grpc.ServiceError): boolean {
return (
err.code === grpc.status.INTERNAL &&
(err.message.includes(
'Received unexpected EOS on DATA frame from server'
) ||
err.message.includes('Received RST_STREAM'))
);
}
29 changes: 27 additions & 2 deletions src/transaction-runner.ts
Expand Up @@ -175,9 +175,14 @@ export abstract class Runner<T> {
Math.floor(Math.random() * 1000)
);
}

/** Returns whether the given error should cause a transaction retry. */
shouldRetry(err: grpc.ServiceError): boolean {
return RETRYABLE.includes(err.code!) || isSessionNotFoundError(err);
return (
RETRYABLE.includes(err.code!) ||
isSessionNotFoundError(err) ||
isRetryableInternalError(err)
);
}
/**
* Retrieves a transaction to run against.
Expand Down Expand Up @@ -233,7 +238,10 @@ export abstract class Runner<T> {
// Note that if the error is a 'Session not found' error, it will be
// thrown here. We do this to bubble this error up to the caller who is
// responsible for retrying the transaction on a different session.
if (!RETRYABLE.includes(lastError.code!)) {
if (
!RETRYABLE.includes(lastError.code!) &&
!isRetryableInternalError(lastError)
) {
throw lastError;
}

Expand Down Expand Up @@ -367,3 +375,20 @@ export class AsyncTransactionRunner<T> extends Runner<T> {
return this.runFn(transaction);
}
}

/**
* Checks whether the given error is a retryable internal error.
* @param error the error to check
* @return true if the error is a retryable internal error, and otherwise false.
*/
export function isRetryableInternalError(err: grpc.ServiceError): boolean {
return (
err.code === grpc.status.INTERNAL &&
(err.message.includes(
'Received unexpected EOS on DATA frame from server'
) ||
err.message.includes('RST_STREAM') ||
err.message.includes('HTTP/2 error code: INTERNAL_ERROR') ||
err.message.includes('Connection closed with unknown cause'))
);
}
28 changes: 28 additions & 0 deletions test/spanner.ts
Expand Up @@ -2937,6 +2937,34 @@ describe('Spanner with mock server', () => {
await database.close();
});

it('should retry on internal error', async () => {
let attempts = 0;
const database = newTestDatabase();

const [updated] = await database.runTransactionAsync(
(transaction): Promise<number[]> => {
transaction.begin();
return transaction.runUpdate(insertSql).then(updateCount => {
if (!attempts) {
spannerMock.setExecutionTime(
spannerMock.commit,
SimulatedExecutionTime.ofError({
code: grpc.status.INTERNAL,
message: 'Received RST_STREAM',
} as MockError)
);
}
attempts++;
return transaction.commit().then(() => updateCount);
});
}
);
assert.strictEqual(updated, 1);
assert.strictEqual(attempts, 2);

await database.close();
});

describe('batch-readonly-transaction', () => {
it('should use session from pool', async () => {
const database = newTestDatabase({min: 0, incStep: 1});
Expand Down

0 comments on commit f915bd1

Please sign in to comment.