Skip to content

Commit

Permalink
added Stream.fromEventListener (#2513)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim <hello@timsmart.co>
  • Loading branch information
jessekelly881 and tim-smart committed Apr 30, 2024
1 parent 5cd63d2 commit a1c7ab8
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .changeset/green-lions-jog.md
@@ -0,0 +1,6 @@
---
"effect": minor
"@effect/platform-browser": patch
---

added Stream.fromEventListener, and BrowserStream.{fromEventListenerWindow, fromEventListenerDocument} for constructing a stream from addEventListener
10 changes: 10 additions & 0 deletions packages/effect/src/Stream.ts
Expand Up @@ -4558,3 +4558,13 @@ export const decodeText: {
* @category encoding
*/
export const encodeText: <E, R>(self: Stream<string, E, R>) => Stream<Uint8Array, E, R> = internal.encodeText

/**
* Creates a `Stream` using addEventListener.
* @since 3.1.0
*/
export const fromEventListener: <A = Event>(
target: EventTarget,
type: string,
options?: boolean | Omit<AddEventListenerOptions, "signal">
) => Stream<A> = internal.fromEventListener
25 changes: 25 additions & 0 deletions packages/effect/src/internal/stream.ts
Expand Up @@ -8048,3 +8048,28 @@ export const encodeText = <E, R>(self: Stream.Stream<string, E, R>): Stream.Stre
const encoder = new TextEncoder()
return map(self, (s) => encoder.encode(s))
})

/** @internal */
export const fromEventListener = <A = Event>(
target: EventTarget,
type: string,
options?: boolean | Omit<AddEventListenerOptions, "signal">
): Stream.Stream<A> =>
_async<A>((emit) => {
let batch: Array<A> = []
let taskRunning = false
function cb(e: A) {
batch.push(e)
if (!taskRunning) {
taskRunning = true
queueMicrotask(() => {
const events = batch
batch = []
taskRunning = false
emit.chunk(Chunk.unsafeFromArray(events))
})
}
}
target.addEventListener(type, cb as any, options)
return Effect.sync(() => target.removeEventListener(type, cb as any, options))
})
23 changes: 23 additions & 0 deletions packages/effect/test/Stream/fromEventListener.test.ts
@@ -0,0 +1,23 @@
import { Effect, Stream } from "effect"
import * as it from "effect-test/utils/extend"
import { describe } from "vitest"

class TestTarget extends EventTarget {
emit() {
this.dispatchEvent(new Event("test-event"))
}
}

describe("Stream.fromEventListener", () => {
it.effect("emitted count", (ctx) =>
Effect.gen(function*(_) {
const target = new TestTarget()

const count = yield* _(
Stream.fromEventListener(target, "test-event"),
Stream.interruptWhen(Effect.sync(() => target.emit()).pipe(Effect.repeatN(2))),
Stream.runCount
)
ctx.expect(count).toEqual(3)
}))
})
24 changes: 24 additions & 0 deletions packages/platform-browser/src/BrowserStream.ts
@@ -0,0 +1,24 @@
/**
* @since 1.0.0
*/

import type * as Stream from "effect/Stream"
import * as internal from "./internal/stream.js"

/**
* Creates a `Stream` from window.addEventListener.
* @since 1.0.0
*/
export const fromEventListenerWindow: <K extends keyof WindowEventMap>(
type: K,
options?: boolean | Omit<AddEventListenerOptions, "signal">
) => Stream.Stream<WindowEventMap[K]> = internal.fromEventListenerWindow

/**
* Creates a `Stream` from document.addEventListener.
* @since 1.0.0
*/
export const fromEventListenerDocument: <K extends keyof DocumentEventMap>(
type: K,
options?: boolean | Omit<AddEventListenerOptions, "signal">
) => Stream.Stream<DocumentEventMap[K]> = internal.fromEventListenerDocument
5 changes: 5 additions & 0 deletions packages/platform-browser/src/index.ts
Expand Up @@ -13,6 +13,11 @@ export * as BrowserKeyValueStore from "./BrowserKeyValueStore.js"
*/
export * as BrowserRuntime from "./BrowserRuntime.js"

/**
* @since 1.0.0
*/
export * as BrowserStream from "./BrowserStream.js"

/**
* @since 1.0.0
*/
Expand Down
17 changes: 17 additions & 0 deletions packages/platform-browser/src/internal/stream.ts
@@ -0,0 +1,17 @@
/**
* @since 1.0.0
*/

import * as Stream from "effect/Stream"

/** @internal */
export const fromEventListenerWindow = <K extends keyof WindowEventMap>(
type: K,
options?: boolean | Omit<AddEventListenerOptions, "signal">
) => Stream.fromEventListener<WindowEventMap[K]>(window, type, options)

/** @internal */
export const fromEventListenerDocument = <K extends keyof DocumentEventMap>(
type: K,
options?: boolean | Omit<AddEventListenerOptions, "signal">
) => Stream.fromEventListener<DocumentEventMap[K]>(document, type, options)

0 comments on commit a1c7ab8

Please sign in to comment.