
When using stream output, chunks can be emitted out of order #697

Closed
sennoy11012 opened this issue May 9, 2024 · 0 comments

Comments


Bug Description

When we use streaming output, code fragments in the response can arrive out of order.

Reason: chatCompletionStream in class OpenAiApi:
public Flux<ChatCompletionChunk> chatCompletionStream(ChatCompletionRequest chatRequest) {
    Assert.notNull(chatRequest, "The request body can not be null.");
    Assert.isTrue(chatRequest.stream(), "Request must set the stream property to true.");
    AtomicBoolean isInsideTool = new AtomicBoolean(false);
    log.debug("openai.chat-request:{}", JsonUtils.toJson(chatRequest));
    return this.webClient.post().uri("/v1/chat/completions")
            .body(Mono.just(chatRequest), ChatCompletionRequest.class)
            .retrieve()
            .bodyToFlux(String.class)
            .takeUntil(SSE_DONE_PREDICATE)
            .filter(SSE_DONE_PREDICATE.negate())
            .map(content -> ModelOptionsUtils.jsonToObject(content, ChatCompletionChunk.class))
            .map(chunk -> {
                if (this.chunkMerger.isStreamingToolFunctionCall(chunk)) {
                    isInsideTool.set(true);
                }
                return chunk;
            })
            .windowUntil(chunk -> {
                if (isInsideTool.get() && this.chunkMerger.isStreamingToolFunctionCallFinish(chunk)) {
                    isInsideTool.set(false);
                    return true;
                }
                return !isInsideTool.get();
            })
            .concatMapIterable(window -> {
                Mono<ChatCompletionChunk> monoChunk = window.reduce(
                        new ChatCompletionChunk((String) null, (List) null, (Long) null,
                                (String) null, (String) null, (String) null),
                        (previous, current) -> this.chunkMerger.merge(previous, current));
                return List.of(monoChunk);
            })
            // flatMap subscribes to the inner Monos eagerly and emits results
            // in completion order, so chunk ordering is not guaranteed here.
            .flatMap(mono -> mono);
}

In the final flatMap, the inner Monos are subscribed to eagerly and their results are emitted in completion order, not subscription order. When chunks arrive quickly, the merged windows can therefore be emitted out of order; an order-preserving operator such as concatMap (or flatMapSequential) would avoid this.
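The ordering hazard can be sketched in plain Java without Reactor. Here CompletableFuture stands in for the per-window Monos (this is an illustrative analogy, not the issue's actual code): an eager, completion-ordered merge behaves like flatMap, while awaiting each result in subscription order behaves like concatMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ChunkOrderDemo {

    // Hypothetical stand-in for the per-window reduce(): an async "merge"
    // whose completion time varies per chunk.
    static CompletableFuture<String> mergeAsync(
            String chunk, long delayMs, ScheduledExecutorService scheduler) {
        CompletableFuture<String> f = new CompletableFuture<>();
        scheduler.schedule(() -> f.complete(chunk), delayMs, TimeUnit.MILLISECONDS);
        return f;
    }

    // flatMap-like: results are collected in COMPLETION order, so a slow
    // earlier chunk is overtaken by a fast later one.
    static List<String> eagerMerge(List<CompletableFuture<String>> futures) throws Exception {
        List<String> out = new CopyOnWriteArrayList<>();
        CompletableFuture.allOf(
                futures.stream()
                        .map(f -> f.thenAccept(out::add))
                        .toArray(CompletableFuture[]::new))
                .get();
        return new ArrayList<>(out);
    }

    // concatMap-like: await each result in SUBSCRIPTION order, so the
    // original chunk order is preserved even if later chunks finish first.
    static List<String> sequentialMerge(List<CompletableFuture<String>> futures) throws Exception {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            out.add(f.get());
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            // chunk1 takes longer to merge than chunk2.
            List<CompletableFuture<String>> a = List.of(
                    mergeAsync("chunk1", 200, scheduler),
                    mergeAsync("chunk2", 50, scheduler));
            System.out.println("flatMap-like (eager):     " + eagerMerge(a));

            List<CompletableFuture<String>> b = List.of(
                    mergeAsync("chunk1", 200, scheduler),
                    mergeAsync("chunk2", 50, scheduler));
            System.out.println("concatMap-like (ordered): " + sequentialMerge(b));
        } finally {
            scheduler.shutdown();
        }
    }
}
```

With these delays the eager merge typically emits [chunk2, chunk1] while the sequential merge emits [chunk1, chunk2], which is the same reordering the issue describes under fast streaming.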

Environment
JDK17

Spring 3.2.0

Spring ai 1.0.0-SNAPSHOT

Service deployment in Singapore

Steps to reproduce

Call chatCompletionStream in the OpenAiApi class to stream response data.

Expected behavior

Chunks are emitted in the order they were received, with no reordering.
