Skip to content

Commit e710b38

Browse files
authoredJul 18, 2024··
fix (ai/core): race condition in mergeStreams (#2325)
1 parent f1d746f commit e710b38

File tree

3 files changed

+49
-4
lines changed

3 files changed

+49
-4
lines changed
 

‎.changeset/mean-radios-flash.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'ai': patch
3+
---
4+
5+
fix (ai/core): race condition in mergeStreams

‎packages/core/core/util/merge-streams.test.ts

+40
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,43 @@ it('should return values from the 2nd stream until the 1st stream has values', a
8484
'2f',
8585
]);
8686
});
87+
88+
it('should not duplicate last value when parallel calls happen', async () => {
89+
let stream1Controller: ReadableStreamDefaultController<string> | undefined;
90+
const stream1 = new ReadableStream({
91+
start(controller) {
92+
stream1Controller = controller;
93+
},
94+
});
95+
96+
stream1Controller!.enqueue('1a');
97+
stream1Controller!.close();
98+
99+
let stream2Controller: ReadableStreamDefaultController<string> | undefined;
100+
const stream2 = new ReadableStream({
101+
start(controller) {
102+
stream2Controller = controller;
103+
},
104+
});
105+
106+
const mergedStream = mergeStreams(stream1, stream2);
107+
108+
const reader = mergedStream.getReader();
109+
110+
const resultsPromise = Promise.all([
111+
reader.read(),
112+
reader.read(),
113+
reader.read(),
114+
reader.read(),
115+
reader.read(),
116+
]);
117+
118+
stream2Controller!.enqueue('2a');
119+
stream2Controller!.enqueue('2b');
120+
stream2Controller!.enqueue('2c');
121+
stream2Controller!.close();
122+
123+
const values = (await resultsPromise).map(result => result.value);
124+
125+
expect(values).toEqual(['1a', '2a', '2b', '2c', undefined]);
126+
});

‎packages/core/core/util/merge-streams.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ export function mergeStreams<VALUE1, VALUE2>(
7575
try {
7676
// stream 1 is done, we can only read from stream 2:
7777
if (stream1Done) {
78-
readStream2(controller);
78+
await readStream2(controller);
7979
return;
8080
}
8181

8282
// stream 2 is done, we can only read from stream 1:
8383
if (stream2Done) {
84-
readStream1(controller);
84+
await readStream1(controller);
8585
return;
8686
}
8787

@@ -109,15 +109,15 @@ export function mergeStreams<VALUE1, VALUE2>(
109109
lastRead1 = undefined;
110110
if (result.done) {
111111
// stream 1 is done, we can only read from stream 2:
112-
readStream2(controller);
112+
await readStream2(controller);
113113
stream1Done = true;
114114
}
115115
} else {
116116
lastRead2 = undefined;
117117
// stream 2 is done, we can only read from stream 1:
118118
if (result.done) {
119119
stream2Done = true;
120-
readStream1(controller);
120+
await readStream1(controller);
121121
}
122122
}
123123
} catch (error) {

0 commit comments

Comments
 (0)
Please sign in to comment.