Skip to content

Commit

Permalink
feat: BatchedRequestResolver works with NonEmptyArray (#2609)
Browse files Browse the repository at this point in the history
  • Loading branch information
patroza committed Apr 25, 2024
1 parent 7d9950e commit ac1898e
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 18 deletions.
8 changes: 8 additions & 0 deletions .changeset/gentle-dingos-fail.md
@@ -0,0 +1,8 @@
---
"effect": patch
"@effect/experimental": patch
"@effect/rpc": patch
"@effect/sql": patch
---

change: BatchedRequestResolver works with NonEmptyArray
5 changes: 3 additions & 2 deletions packages/effect/src/RequestResolver.ts
Expand Up @@ -2,6 +2,7 @@
* @since 2.0.0
*/

import type { NonEmptyArray } from "./Array.js"
import * as Context from "./Context.js"
import * as Effect from "./Effect.js"
import type * as Either from "./Either.js"
Expand Down Expand Up @@ -138,7 +139,7 @@ export const makeWithEntry: <A, R>(
* @category constructors
*/
export const makeBatched: <A extends Request.Request<any, any>, R>(
run: (requests: Array<A>) => Effect.Effect<void, never, R>
run: (requests: NonEmptyArray<A>) => Effect.Effect<void, never, R>
) => RequestResolver<A, R> = internal.makeBatched

/**
Expand Down Expand Up @@ -269,7 +270,7 @@ export const fromFunction: <A extends Request.Request<any>>(
* @category constructors
*/
export const fromFunctionBatched: <A extends Request.Request<any>>(
f: (chunk: Array<A>) => Iterable<Request.Request.Success<A>>
f: (chunk: NonEmptyArray<A>) => Iterable<Request.Request.Success<A>>
) => RequestResolver<A> = internal.fromFunctionBatched

/**
Expand Down
16 changes: 8 additions & 8 deletions packages/effect/src/internal/dataSource.ts
Expand Up @@ -24,21 +24,21 @@ export const makeWithEntry = <A, R>(

/** @internal */
export const makeBatched = <A extends Request.Request<any, any>, R>(
run: (requests: Array<A>) => Effect.Effect<void, never, R>
run: (requests: RA.NonEmptyArray<A>) => Effect.Effect<void, never, R>
): RequestResolver.RequestResolver<A, R> =>
new core.RequestResolverImpl<A, R>(
(requests) => {
if (requests.length > 1) {
return core.forEachSequentialDiscard(requests, (block) => {
const filtered = block.filter((_) => !_.state.completed).map((_) => _.request)
if (filtered.length === 0) {
if (!RA.isNonEmptyArray(filtered)) {
return core.void
}
return invokeWithInterrupt(run(filtered), block)
})
} else if (requests.length === 1) {
const filtered = requests[0].filter((_) => !_.state.completed).map((_) => _.request)
if (filtered.length === 0) {
if (!RA.isNonEmptyArray(filtered)) {
return core.void
}
return run(filtered)
Expand Down Expand Up @@ -210,7 +210,7 @@ export const eitherWith = dual<
export const fromFunction = <A extends Request.Request<any>>(
f: (request: A) => Request.Request.Success<A>
): RequestResolver.RequestResolver<A> =>
makeBatched((requests: Array<A>) =>
makeBatched((requests: RA.NonEmptyArray<A>) =>
core.forEachSequentialDiscard(
requests,
(request) => complete(request, core.exitSucceed(f(request)) as any)
Expand All @@ -219,9 +219,9 @@ export const fromFunction = <A extends Request.Request<any>>(

/** @internal */
export const fromFunctionBatched = <A extends Request.Request<any>>(
f: (chunk: Array<A>) => Iterable<Request.Request.Success<A>>
f: (chunk: RA.NonEmptyArray<A>) => Iterable<Request.Request.Success<A>>
): RequestResolver.RequestResolver<A> =>
makeBatched((as: Array<A>) =>
makeBatched((as: RA.NonEmptyArray<A>) =>
Effect.forEach(
f(as),
(res, i) => complete(as[i], core.exitSucceed(res) as any),
Expand All @@ -233,7 +233,7 @@ export const fromFunctionBatched = <A extends Request.Request<any>>(
export const fromEffect = <R, A extends Request.Request<any, any>>(
f: (a: A) => Effect.Effect<Request.Request.Success<A>, Request.Request.Error<A>, R>
): RequestResolver.RequestResolver<A, R> =>
makeBatched((requests: Array<A>) =>
makeBatched((requests: RA.NonEmptyArray<A>) =>
Effect.forEach(
requests,
(a) => Effect.flatMap(Effect.exit(f(a)), (e) => complete(a, e as any)),
Expand Down Expand Up @@ -261,7 +261,7 @@ export const fromEffectTagged = <
A,
ReturnType<Fns[keyof Fns]> extends Effect.Effect<infer _A, infer _E, infer R> ? R : never
> =>
makeBatched<A, any>((requests: Array<A>) => {
makeBatched<A, any>((requests: RA.NonEmptyArray<A>) => {
const grouped: Record<string, Array<A>> = {}
const tags: Array<A["_tag"]> = []
for (let i = 0, len = requests.length; i < len; i++) {
Expand Down
2 changes: 1 addition & 1 deletion packages/experimental/src/RequestResolver.ts
Expand Up @@ -178,7 +178,7 @@ export const persisted: {
result: Request.Request.Result<Req>
): Effect.Effect<void, never, any> => Effect.ignoreLogged(storage.set(request as any, result))

return RequestResolver.makeBatched((requests: Array<Req>) =>
return RequestResolver.makeBatched((requests: Arr.NonEmptyArray<Req>) =>
Effect.flatMap(partition(requests), ([remaining, results]) => {
const completeCached = Effect.forEach(
results,
Expand Down
3 changes: 2 additions & 1 deletion packages/experimental/test/RequestResolver.test.ts
Expand Up @@ -7,6 +7,7 @@ import { NodeContext } from "@effect/platform-node"
import { Schema } from "@effect/schema"
import * as it from "@effect/vitest"
import { Array, Effect, Exit, Layer, PrimaryKey, Request, RequestResolver, TestClock } from "effect"
import type { NonEmptyArray } from "effect/Array"
import { assert, describe } from "vitest"

class User extends Schema.Class<User>("User")({
Expand Down Expand Up @@ -42,7 +43,7 @@ describe("RequestResolver", () => {
it.effect(storeId, () =>
Effect.gen(function*(_) {
let count = 0
const baseResolver = RequestResolver.makeBatched((reqs: Array<MyRequest | TTLRequest>) => {
const baseResolver = RequestResolver.makeBatched((reqs: NonEmptyArray<MyRequest | TTLRequest>) => {
count += reqs.length
return Effect.forEach(reqs, (req) => {
if (req.id === -1) return Request.fail(req, "not found")
Expand Down
2 changes: 1 addition & 1 deletion packages/rpc/src/Resolver.ts
Expand Up @@ -31,7 +31,7 @@ export const make = <HR, E>(
const getDecode = withRequestTag((req) => Schema.decodeUnknown(Serializable.exitSchema(req)))
const getDecodeChunk = withRequestTag((req) => Schema.decodeUnknown(Schema.Chunk(Serializable.exitSchema(req))))

return RequestResolver.makeBatched((requests: Array<Rpc.Request<Schema.TaggedRequest.Any>>) => {
return RequestResolver.makeBatched((requests: Arr.NonEmptyArray<Rpc.Request<Schema.TaggedRequest.Any>>) => {
const [effectRequests, streamRequests] = Arr.partition(
requests,
(_): _ is Rpc.Request<Rpc.StreamRequest.Any> => StreamRequestTypeId in _.request
Expand Down
3 changes: 2 additions & 1 deletion packages/rpc/src/ResolverNoStream.ts
Expand Up @@ -3,6 +3,7 @@
*/
import * as Schema from "@effect/schema/Schema"
import * as Serializable from "@effect/schema/Serializable"
import type { NonEmptyArray } from "effect/Array"
import * as Channel from "effect/Channel"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
Expand All @@ -29,7 +30,7 @@ export const make = <HR, E>(
const getDecode = withRequestTag((req) => Schema.decodeUnknown(Serializable.exitSchema(req)))
const getDecodeChunk = withRequestTag((req) => Schema.decodeUnknown(Schema.Chunk(Serializable.exitSchema(req))))

return RequestResolver.makeBatched((requests: Array<Rpc.Request<Schema.TaggedRequest.Any>>) =>
return RequestResolver.makeBatched((requests: NonEmptyArray<Rpc.Request<Schema.TaggedRequest.Any>>) =>
pipe(
Effect.forEach(requests, (_) =>
Effect.map(
Expand Down
9 changes: 5 additions & 4 deletions packages/sql/src/Resolver.ts
Expand Up @@ -3,6 +3,7 @@
*/
import type { ParseError } from "@effect/schema/ParseResult"
import * as Schema from "@effect/schema/Schema"
import type { NonEmptyArray } from "effect/Array"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Equal from "effect/Equal"
Expand Down Expand Up @@ -218,7 +219,7 @@ export const ordered = <T extends string, I, II, RI, A, IA, _, E, RA = never, R
> => {
const decodeResults = Schema.decodeUnknown(Schema.Array(options.Result))
const resolver = RequestResolver.makeBatched(
(requests: Array<SqlRequest<T, A, E | ResultLengthMismatch>>) => {
(requests: NonEmptyArray<SqlRequest<T, A, E | ResultLengthMismatch>>) => {
const [inputs, spanLinks] = partitionRequests(requests)
return options.execute(inputs as any).pipe(
Effect.filterOrFail(
Expand Down Expand Up @@ -286,7 +287,7 @@ export const grouped = <T extends string, I, II, K, RI, A, IA, Row, E, RA = neve
): Effect.Effect<SqlResolver<T, I, Array<A>, E, RI>, never, RA | R> => {
const decodeResults = Schema.decodeUnknown(Schema.Array(options.Result))
const resolver = RequestResolver.makeBatched(
(requests: Array<SqlRequest<T, Array<A>, E>>) => {
(requests: NonEmptyArray<SqlRequest<T, Array<A>, E>>) => {
const [inputs, spanLinks] = partitionRequests(requests)
const resultMap = new Map<K, Array<A>>()
return options.execute(inputs as any).pipe(
Expand Down Expand Up @@ -360,7 +361,7 @@ export const findById = <T extends string, I, II, RI, A, IA, Row, E, RA = never,
): Effect.Effect<SqlResolver<T, I, Option.Option<A>, E, RI>, never, RA | R> => {
const decodeResults = Schema.decodeUnknown(Schema.Array(options.Result))
const resolver = RequestResolver.makeBatched(
(requests: Array<SqlRequest<T, Option.Option<A>, E>>) => {
(requests: NonEmptyArray<SqlRequest<T, Option.Option<A>, E>>) => {
const [inputs, spanLinks, idMap] = partitionRequestsById<II>()(requests)
return options.execute(inputs as any).pipe(
Effect.bindTo("rawResults"),
Expand Down Expand Up @@ -425,7 +426,7 @@ const void_ = <T extends string, I, II, RI, E, R = never>(
}
): Effect.Effect<SqlResolver<T, I, void, E, RI>, never, R> => {
const resolver = RequestResolver.makeBatched(
(requests: Array<SqlRequest<T, void, E>>) => {
(requests: NonEmptyArray<SqlRequest<T, void, E>>) => {
const [inputs, spanLinks] = partitionRequests(requests)
return options.execute(inputs as any).pipe(
Effect.andThen(
Expand Down

0 comments on commit ac1898e

Please sign in to comment.