Skip to content

Commit

Permalink
feat(useAsyncQueue): add options.signal parameter (#3033)
Browse files Browse the repository at this point in the history
  • Loading branch information
cross-origin committed May 15, 2023
1 parent 8855f24 commit 7da7c4e
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 19 deletions.
32 changes: 32 additions & 0 deletions packages/core/useAsyncQueue/index.test.ts
Expand Up @@ -104,4 +104,36 @@ describe('useAsyncQueue', () => {
expect(finalTaskSpy).toHaveBeenCalledOnce()
})
})

it('should cancel the tasks', async () => {
const controller = new AbortController()
const { activeIndex, result } = useAsyncQueue([p1], {
signal: controller.signal,
})
controller.abort()
await retry(() => {
expect(activeIndex.value).toBe(0)
expect(result).toHaveLength(1)
expect(result[activeIndex.value]).toMatchInlineSnapshot(`
{
"data": [Error: aborted],
"state": "aborted",
}
`)
})
})

it('should abort the tasks when AbortSignal.abort is triggered', async () => {
const controller = new AbortController()
const abort = () => controller.abort()
const finalTaskSpy = vi.fn(() => Promise.resolve('data'))
const { activeIndex, result } = useAsyncQueue([p1, abort, finalTaskSpy], {
signal: controller.signal,
})
await retry(() => {
expect(activeIndex.value).toBe(2)
expect(result).toHaveLength(3)
expect(finalTaskSpy).not.toHaveBeenCalled()
})
})
})
81 changes: 62 additions & 19 deletions packages/core/useAsyncQueue/index.ts
@@ -1,11 +1,11 @@
import { noop } from '@vueuse/shared'
import type { Ref } from 'vue-demi'
import { reactive, ref } from 'vue-demi'
import { noop } from '@vueuse/shared'

export type UseAsyncQueueTask<T> = (...args: any[]) => T | Promise<T>

export interface UseAsyncQueueResult<T> {
state: 'pending' | 'fulfilled' | 'rejected'
state: 'aborted' | 'fulfilled' | 'pending' | 'rejected'
data: T | null
}

Expand Down Expand Up @@ -33,6 +33,11 @@ export interface UseAsyncQueueOptions {
*
*/
onFinished?: () => void

/**
* A AbortSignal that can be used to abort the task.
*/
signal?: AbortSignal
}

/**
Expand All @@ -53,14 +58,21 @@ export function useAsyncQueue<T = any>(tasks: UseAsyncQueueTask<any>[], options:
interrupt = true,
onError = noop,
onFinished = noop,
signal,
} = options

const promiseState: Record<UseAsyncQueueResult<T>['state'], UseAsyncQueueResult<T>['state']> = {
const promiseState: Record<
UseAsyncQueueResult<T>['state'],
UseAsyncQueueResult<T>['state']
> = {
aborted: 'aborted',
fulfilled: 'fulfilled',
pending: 'pending',
rejected: 'rejected',
fulfilled: 'fulfilled',
}

const initialResult = Array.from(new Array(tasks.length), () => ({ state: promiseState.pending, data: null }))

const result = reactive(initialResult) as UseAsyncQueueResult<T>[]

const activeIndex = ref<number>(-1)
Expand All @@ -80,26 +92,57 @@ export function useAsyncQueue<T = any>(tasks: UseAsyncQueueTask<any>[], options:
}

tasks.reduce((prev, curr) => {
return prev.then((prevRes) => {
if (result[activeIndex.value]?.state === promiseState.rejected && interrupt) {
onFinished()
return
}

return curr(prevRes).then((currentRes: any) => {
updateResult(promiseState.fulfilled, currentRes)
activeIndex.value === tasks.length - 1 && onFinished()
return currentRes
return prev
.then((prevRes) => {
if (signal?.aborted) {
updateResult(promiseState.aborted, new Error('aborted'))
return
}

if (
result[activeIndex.value]?.state === promiseState.rejected
&& interrupt
) {
onFinished()
return
}

const done = curr(prevRes).then((currentRes: any) => {
updateResult(promiseState.fulfilled, currentRes)
activeIndex.value === tasks.length - 1 && onFinished()
return currentRes
})

if (!signal)
return done

return Promise.race([done, whenAborted(signal)])
})
.catch((e) => {
if (signal?.aborted) {
updateResult(promiseState.aborted, e)
return e
}

updateResult(promiseState.rejected, e)
onError()
return e
})
}).catch((e) => {
updateResult(promiseState.rejected, e)
onError()
return e
})
}, Promise.resolve())

return {
activeIndex,
result,
}
}

function whenAborted(signal: AbortSignal): Promise<never> {
return new Promise((resolve, reject) => {
const error = new Error('aborted')

if (signal.aborted)
reject(error)
else
signal.addEventListener('abort', () => reject(error), { once: true })
})
}

0 comments on commit 7da7c4e

Please sign in to comment.