Skip to content

Commit

Permalink
Avoid polling endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jridgewell committed Jul 18, 2023
1 parent fab2656 commit d85c776
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 106 deletions.
14 changes: 5 additions & 9 deletions test/e2e/cancel-request/app/edge-route/route.ts
@@ -1,9 +1,10 @@
import { Streamable } from '../../streamable'
import { Deferred } from '../../sleep'

export const runtime = 'edge'

let streamable
let requestAborted = false
let requestAborted = new Deferred()

export async function GET(req: Request): Promise<Response> {
// Consume the entire request body.
Expand All @@ -13,18 +14,13 @@ export async function GET(req: Request): Promise<Response> {
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (streamable) {
return new Response(
JSON.stringify({
requestAborted,
i: streamable.i,
streamCleanedUp: streamable.streamCleanedUp,
})
)
await Promise.all([requestAborted, streamable.streamCleanedUp])
return new Response(`${streamable.i}`)
}

streamable = Streamable()
req.signal.onabort = () => {
requestAborted = true
requestAborted.resolve()
}
return new Response(streamable.stream)
}
14 changes: 5 additions & 9 deletions test/e2e/cancel-request/app/node-route/route.ts
@@ -1,11 +1,12 @@
import { Streamable } from '../../streamable'
import { Deferred } from '../../sleep'

export const runtime = 'nodejs'
// Next thinks it can statically compile this route, which breaks the test.
export const dynamic = 'force-dynamic'

let streamable
let requestAborted = false
let requestAborted = new Deferred()

export async function GET(req: Request): Promise<Response> {
// Consume the entire request body.
Expand All @@ -15,18 +16,13 @@ export async function GET(req: Request): Promise<Response> {
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (streamable) {
return new Response(
JSON.stringify({
requestAborted,
i: streamable.i,
streamCleanedUp: streamable.streamCleanedUp,
})
)
await Promise.all([requestAborted, streamable.streamCleanedUp])
return new Response(`${streamable.i}`)
}

streamable = Streamable()
req.signal.onabort = () => {
requestAborted = true
requestAborted.resolve()
}
return new Response(streamable.stream)
}
14 changes: 5 additions & 9 deletions test/e2e/cancel-request/middleware.ts
@@ -1,11 +1,12 @@
import { Streamable } from './streamable'
import { Deferred } from './sleep'

export const config = {
matcher: '/middleware',
}

let streamable
let requestAborted = false
let requestAborted = new Deferred()

export default async function handler(req: Request): Promise<Response> {
// Consume the entire request body.
Expand All @@ -15,18 +16,13 @@ export default async function handler(req: Request): Promise<Response> {
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (streamable) {
return new Response(
JSON.stringify({
requestAborted,
i: streamable.i,
streamCleanedUp: streamable.streamCleanedUp,
})
)
await Promise.all([requestAborted, streamable.streamCleanedUp])
return new Response(`${streamable.i}`)
}

streamable = Streamable()
req.signal.onabort = () => {
requestAborted = true
requestAborted.resolve()
}
return new Response(streamable.stream)
}
14 changes: 5 additions & 9 deletions test/e2e/cancel-request/pages/api/edge-api.ts
@@ -1,11 +1,12 @@
import { Streamable } from '../../streamable'
import { Deferred } from '../../sleep'

export const config = {
runtime: 'edge',
}

let streamable
let requestAborted = false
let requestAborted = new Deferred()

export default async function handler(req: Request): Promise<Response> {
// Consume the entire request body.
Expand All @@ -15,18 +16,13 @@ export default async function handler(req: Request): Promise<Response> {
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (streamable) {
return new Response(
JSON.stringify({
requestAborted,
i: streamable.i,
streamCleanedUp: streamable.streamCleanedUp,
})
)
await Promise.all([requestAborted, streamable.streamCleanedUp])
return new Response(`${streamable.i}`)
}

streamable = Streamable()
req.signal.onabort = () => {
requestAborted = true
requestAborted.resolve()
}
return new Response(streamable.stream)
}
15 changes: 6 additions & 9 deletions test/e2e/cancel-request/pages/api/node-api.ts
@@ -1,13 +1,14 @@
import { IncomingMessage, ServerResponse } from 'http'
import { pipeline } from 'stream'
import { Readable } from '../../readable'
import { Deferred } from '../../sleep'

export const config = {
runtime: 'nodejs',
}

let readable
let requestAborted = false
let requestAborted = new Deferred()

export default function handler(
_req: IncomingMessage,
Expand All @@ -19,19 +20,15 @@ export default function handler(
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
if (readable) {
res.end(
JSON.stringify({
requestAborted,
i: readable.i,
streamCleanedUp: readable.streamCleanedUp,
})
)
Promise.all([requestAborted, readable.streamCleanedUp]).finally(() => {
res.end(`${readable.i}`)
})
return
}

readable = Readable()
res.on('close', () => {
requestAborted = true
requestAborted.resolve()
})
pipeline(readable.stream, res, () => {
res.end()
Expand Down
12 changes: 8 additions & 4 deletions test/e2e/cancel-request/readable.ts
@@ -1,20 +1,24 @@
import * as stream from 'stream'
import { sleep } from './sleep'
import { Deferred, sleep } from './sleep'

export function Readable() {
const encoder = new TextEncoder()
const clean = new Deferred()
const readable = {
i: 0,
streamCleanedUp: false,
streamCleanedUp: clean.promise,
stream: new stream.Readable({
async read() {
await sleep(100)
this.push(encoder.encode(String(readable.i++)))

if (readable.i >= 25) this.push(null)
if (readable.i >= 25) {
clean.reject()
this.push(null)
}
},
destroy() {
readable.streamCleanedUp = true
clean.resolve()
},
}),
}
Expand Down
13 changes: 13 additions & 0 deletions test/e2e/cancel-request/sleep.ts
@@ -1,3 +1,16 @@
export function sleep(ms: number) {
return new Promise((res) => setTimeout(res, ms))
}

export class Deferred<T> {
declare promise: Promise<T>
declare resolve: (v?: T | PromiseLike<T>) => void
declare reject: (r?: any) => void

constructor() {
this.promise = new Promise((res, rej) => {
this.resolve = res
this.reject = rej
})
}
}
68 changes: 15 additions & 53 deletions test/e2e/cancel-request/stream-cancel.test.ts
Expand Up @@ -8,12 +8,6 @@ createNextDescribe(
files: __dirname,
},
({ next }) => {
type CancelState = {
requestAborted: boolean
streamCleanedUp: boolean
i: number
}

function prime(url: string) {
return new Promise<void>((resolve) => {
url = new URL(url, next.url).href
Expand All @@ -36,71 +30,39 @@ createNextDescribe(
})
}

// The disconnect from our prime request to the server isn't instant, and
// there's no good signal on the client end for when it happens. So we just
// fetch multiple times waiting for it to happen.
async function getTillCancelled(url: string) {
let json: CancelState
for (let i = 0; i < 500; i++) {
const res = await next.fetch(url)
json = (await res.json()) as CancelState
if (json.streamCleanedUp && json.requestAborted) {
break
}

await sleep(10)
}
return json!
}

it('Midddleware cancels inner ReadableStream', async () => {
await prime('/middleware')
const json = await getTillCancelled('/middleware')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
const res = await next.fetch('/middleware')
const i = +(await res.text())
expect(i).toBeWithin(0, 5)
})

it('App Route Handler Edge cancels inner ReadableStream', async () => {
await prime('/edge-route')
const json = await getTillCancelled('/edge-route')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
const res = await next.fetch('/edge-route')
const i = +(await res.text())
expect(i).toBeWithin(0, 5)
})

it('App Route Handler NodeJS cancels inner ReadableStream', async () => {
await prime('/node-route')
const json = await getTillCancelled('/node-route')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
const res = await next.fetch('/node-route')
const i = +(await res.text())
expect(i).toBeWithin(0, 5)
})

it('Pages Api Route Edge cancels inner ReadableStream', async () => {
await prime('/api/edge-api')
const json = await getTillCancelled('/api/edge-api')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
const res = await next.fetch('/api/edge-api')
const i = +(await res.text())
expect(i).toBeWithin(0, 5)
})

it('Pages Api Route NodeJS cancels inner ReadableStream', async () => {
await prime('/api/node-api')
const json = await getTillCancelled('/api/node-api')
expect(json).toMatchObject({
requestAborted: true,
streamCleanedUp: true,
i: (expect as any).toBeWithin(0, 5),
})
const res = await next.fetch('/api/node-api')
const i = +(await res.text())
expect(i).toBeWithin(0, 5)
})
}
)
12 changes: 8 additions & 4 deletions test/e2e/cancel-request/streamable.ts
@@ -1,19 +1,23 @@
import { sleep } from './sleep'
import { Deferred, sleep } from './sleep'

export function Streamable() {
const encoder = new TextEncoder()
const clean = new Deferred()
const streamable = {
i: 0,
streamCleanedUp: false,
streamCleanedUp: clean.promise,
stream: new ReadableStream({
async pull(controller) {
await sleep(100)
controller.enqueue(encoder.encode(String(streamable.i++)))

if (streamable.i >= 25) controller.close()
if (streamable.i >= 25) {
clean.reject()
controller.close()
}
},
cancel() {
streamable.streamCleanedUp = true
clean.resolve()
},
}),
}
Expand Down

0 comments on commit d85c776

Please sign in to comment.