Skip to content

Commit

Permalink
Rewrite waitForBatch to reject early on non-batch errors
Browse files Browse the repository at this point in the history
  • Loading branch information
SevInf committed Nov 14, 2022
1 parent 29b6d5b commit 827d37a
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 27 deletions.
10 changes: 4 additions & 6 deletions packages/client/src/runtime/getPrismaClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ import { MetricsClient } from './core/metrics/MetricsClient'
import { applyModelsAndClientExtensions } from './core/model/applyModelsAndClientExtensions'
import { UserArgs } from './core/model/UserArgs'
import { createPrismaPromise } from './core/request/createPrismaPromise'
import type {
InteractiveTransactionOptions,
PrismaPromise,
PrismaPromiseTransaction,
} from './core/request/PrismaPromise'
import { InteractiveTransactionOptions, PrismaPromise, PrismaPromiseTransaction } from './core/request/PrismaPromise'
import { getLockCountPromise } from './core/transaction/utils/createLockCountPromise'
import { BaseDMMFHelper, DMMFHelper } from './dmmf'
import type { DMMF } from './dmmf-types'
Expand Down Expand Up @@ -955,7 +951,9 @@ new PrismaClient({
)
}

return request.requestTransaction?.({ id: txId, index, isolationLevel: options?.isolationLevel }, lock)
return (
request.requestTransaction?.({ id: txId, index, isolationLevel: options?.isolationLevel }, lock) ?? request
)
})

return waitForBatch(requests)
Expand Down
68 changes: 68 additions & 0 deletions packages/client/src/runtime/utils/waitForBatch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { PrismaClientKnownRequestError } from '@prisma/engine-core'

import { waitForBatch } from './waitForBatch'

test('resolves when all promises succesfully resolve', async () => {
const result = await waitForBatch([Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)])

expect(result).toEqual([1, 2, 3])
})

test('returns results in order even if they resolve out of order', async () => {
const result = await waitForBatch([
Promise.resolve(1),
new Promise((resolve) => setTimeout(() => resolve(2), 200)),
Promise.resolve(3),
])

expect(result).toEqual([1, 2, 3])
})

test('rejects with error, which batchRequestIdx matches its position in batch', async () => {
const err1 = new PrismaClientKnownRequestError('one', { code: 'P1', clientVersion: '0.0.0', batchRequestIdx: 2 })
const err2 = new PrismaClientKnownRequestError('two', { code: 'P1', clientVersion: '0.0.0', batchRequestIdx: 1 })
const err3 = new PrismaClientKnownRequestError('three', { code: 'P1', clientVersion: '0.0.0', batchRequestIdx: 0 })
const result = waitForBatch([Promise.reject(err1), Promise.reject(err2), Promise.reject(err3)])

await expect(result).rejects.toBe(err2)
})

test('rejects with error, which batchRequestIdx matches its position in batch, even if it rejects later', async () => {
const err1 = new PrismaClientKnownRequestError('one', { code: 'P1', clientVersion: '0.0.0', batchRequestIdx: 2 })
const err2 = new PrismaClientKnownRequestError('two', { code: 'P1', clientVersion: '0.0.0', batchRequestIdx: 1 })
const err3 = new PrismaClientKnownRequestError('three', { code: 'P1', clientVersion: '0.0.0', batchRequestIdx: 0 })
const result = waitForBatch([
Promise.reject(err1),
new Promise((_, reject) => setTimeout(() => reject(err2), 200)),
Promise.reject(err3),
])

await expect(result).rejects.toBe(err2)
})

test('rejects with the first error if no batchRequestIdxes match position', async () => {
const err1 = new PrismaClientKnownRequestError('one', { code: 'P1', clientVersion: '0.0.0', batchRequestIdx: 1 })
const err2 = new PrismaClientKnownRequestError('two', { code: 'P1', clientVersion: '0.0.0', batchRequestIdx: 2 })
const err3 = new PrismaClientKnownRequestError('three', { code: 'P1', clientVersion: '0.0.0', batchRequestIdx: 0 })
const result = waitForBatch([
Promise.reject(err1),
new Promise((_, reject) => setTimeout(() => reject(err2), 200)),
Promise.reject(err3),
])

await expect(result).rejects.toBe(err1)
})

test('rejects with error if one of the promises rejects with non-batch error', async () => {
const error = new Error('nope')
const result = waitForBatch([Promise.resolve(1), Promise.reject(error), Promise.resolve(3)])

await expect(result).rejects.toBe(error)
})

test('rejects with error if one of the promises rejects with non-batch error, even if other promsies never resolve', async () => {
const error = new Error('nope')
const result = waitForBatch([new Promise(() => {}), Promise.reject(error)])

await expect(result).rejects.toBe(error)
})
78 changes: 57 additions & 21 deletions packages/client/src/runtime/utils/waitForBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,69 @@ import { hasBatchIndex } from '@prisma/engine-core'
* Waits for result of batch $transaction and picks the best possible error to report if any
* of the request fails. Best error is determined as follows:
*
* - if engine have reported and index of failed request in the batch, the best error is the one
* who's index matches that
* - otherwise, first error is used (like Promise.all)
* - if engine have reported an error without batch request index, then the batch is immediately rejected
* with this error without waiting for other promises
* - if engine have reported and index of failed request in the batch and that index matches the position of the
* particular request in the batch, batch is rejected with that error
* - if batch request index is reported and it does not match current request position, wait for other requests. If no indices
* match request positions, reject with the earliest error in the batch
*
* @param promises
* @returns
*/
export async function waitForBatch<T extends unknown[]>(promises: T): Promise<{ [K in keyof T]: Awaited<T[K]> }> {
const results = await Promise.allSettled(promises)
export function waitForBatch<T extends PromiseLike<unknown>[]>(
promises: T,
): Promise<{ [K in keyof T]: Awaited<T[K]> }> {
return new Promise((resolve, reject) => {
const successfulResults = new Array(promises.length) as { [K in keyof T]: Awaited<T[K]> }
let bestError: unknown = null
let done = false
let settledPromisesCount = 0

let bestError: unknown = null
const successfulResults = [] as { [K in keyof T]: Awaited<T[K]> }
for (const [i, result] of results.entries()) {
if (result.status === 'rejected') {
const reason = result.reason
if (hasBatchIndex(reason) && reason.batchRequestIdx === i) {
bestError = reason
} else if (!bestError) {
bestError = reason
const settleOnePromise = () => {
if (done) {
return
}
settledPromisesCount++
if (settledPromisesCount === promises.length) {
done = true
if (bestError) {
reject(bestError)
} else {
resolve(successfulResults)
}
}
}

const immediatelyReject = (error: unknown) => {
if (!done) {
done = true
reject(error)
}
} else {
successfulResults.push(result.value)
}
}

if (bestError) {
throw bestError
}
return successfulResults
for (let i = 0; i < promises.length; i++) {
promises[i].then(
(result) => {
successfulResults[i] = result
settleOnePromise()
},
(error) => {
if (!hasBatchIndex(error)) {
immediatelyReject(error)
return
}

if (error.batchRequestIdx === i) {
immediatelyReject(error)
} else {
if (!bestError) {
bestError = error
}
settleOnePromise()
}
},
)
}
})
}

0 comments on commit 827d37a

Please sign in to comment.