Skip to content

Commit

Permalink
Close ResponseBodyEmitter in case of write errors
Browse files Browse the repository at this point in the history
Prior to this commit, the `ReactiveTypeHandler` would handle `Flux`-like
return types from controller methods and adapt them to SSE streams using
the `SseEmitter`/`ResponseBodyEmitter` APIs. In case an `IOException` is
thrown while writing to the HTTP response stream, the
`ReactiveTypeHandler` would rely on the Servlet container to call
`AsyncListener#onError` - this would be the signal for Spring MVC to
complete the async exchange. To prevent racing issues between this
signal and the actual handling of the exception, changes like  gh-20173
were applied. Since then, robust checks were added with gh-32340 in
`StandardServletAsyncWebRequest.LifecycleHttpServletResponse`.

With Jetty 12, `AsyncListener#onError` would not be called as the error
would happen while writing in blocking mode to the response (so, not
using the Servlet WriteListener contract). But still, such `IOException`
would still result in the closing of the HTTP connection. As of Jetty
12.0.4, this is no longer the case and the party managing the async
lifecycle is in charge of completing the exchange, as it should. This
means that the current behavior leaks HTTP connections for these cases
and causes memory issues.

This commit ensures that such exceptions happening during response
writes are caught and result in the completion of the `SSEEmitter` and
the closing of the exchange. Even if other Servlet containers still
propagate the error `AsyncListener#onError`, competing signals are still
managed with gh-32340.

Closes gh-32629
  • Loading branch information
bclozel committed May 13, 2024
1 parent 09b8fea commit c6b6ccd
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 39 deletions.
Expand Up @@ -337,6 +337,7 @@ public void run() {
logger.trace("Send for " + this.emitter + " failed: " + ex);
}
terminate();
this.emitter.completeWithError(ex);
return;
}
}
Expand Down
Expand Up @@ -79,16 +79,6 @@ public class ResponseBodyEmitter {
@Nullable
private Throwable failure;

/**
* After an I/O error, we don't call {@link #completeWithError} directly but
* wait for the Servlet container to call us via {@code AsyncListener#onError}
* on a container thread at which point we call completeWithError.
* This flag is used to ignore further calls to complete or completeWithError
* that may come for example from an application try-catch block on the
* thread of the I/O error.
*/
private boolean ioErrorOnSend;

private final DefaultCallback timeoutCallback = new DefaultCallback();

private final ErrorCallback errorCallback = new ErrorCallback();
Expand Down Expand Up @@ -198,7 +188,6 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
this.handler.send(object, mediaType);
}
catch (IOException ex) {
this.ioErrorOnSend = true;
throw ex;
}
catch (Throwable ex) {
Expand Down Expand Up @@ -234,7 +223,6 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
this.handler.send(items);
}
catch (IOException ex) {
this.ioErrorOnSend = true;
throw ex;
}
catch (Throwable ex) {
Expand All @@ -255,10 +243,6 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
* related events such as an error while {@link #send(Object) sending}.
*/
public synchronized void complete() {
// Ignore complete after IO failure on send
if (this.ioErrorOnSend) {
return;
}
this.complete = true;
if (this.handler != null) {
this.handler.complete();
Expand All @@ -277,10 +261,6 @@ public synchronized void complete() {
* {@link #send(Object) sending}.
*/
public synchronized void completeWithError(Throwable ex) {
// Ignore complete after IO failure on send
if (this.ioErrorOnSend) {
return;
}
this.complete = true;
this.failure = ex;
if (this.handler != null) {
Expand Down
Expand Up @@ -16,6 +16,7 @@

package org.springframework.web.servlet.mvc.method.annotation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -372,6 +373,24 @@ void writeText() throws Exception {
assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog");
}

@Test
void failOnWriteShouldCompleteEmitter() throws Exception {

Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(String.class));

ErroringEmitterHandler emitterHandler = new ErroringEmitterHandler();
emitter.initialize(emitterHandler);

sink.tryEmitNext("The quick");
sink.tryEmitNext(" brown fox jumps over ");
sink.tryEmitNext("the lazy dog");
sink.tryEmitComplete();

assertThat(emitterHandler.getHandlingStatus()).isEqualTo(HandlingStatus.ERROR);
assertThat(emitterHandler.getFailure()).isInstanceOf(IOException.class);
}

@Test
void writeFluxOfString() throws Exception {

Expand Down Expand Up @@ -451,6 +470,10 @@ private static class EmitterHandler implements ResponseBodyEmitter.Handler {

private final List<Object> values = new ArrayList<>();

private HandlingStatus handlingStatus;

private Throwable failure;


public List<?> getValues() {
return this.values;
Expand All @@ -460,22 +483,33 @@ public String getValuesAsText() {
return this.values.stream().map(Object::toString).collect(Collectors.joining());
}

public HandlingStatus getHandlingStatus() {
return this.handlingStatus;
}

public Throwable getFailure() {
return this.failure;
}

@Override
public void send(Object data, MediaType mediaType) {
public void send(Object data, MediaType mediaType) throws IOException {
this.values.add(data);
}

@Override
public void send(Set<ResponseBodyEmitter.DataWithMediaType> items) {
public void send(Set<ResponseBodyEmitter.DataWithMediaType> items) throws IOException {
items.forEach(item -> this.values.add(item.getData()));
}

@Override
public void complete() {
this.handlingStatus = HandlingStatus.SUCCESS;
}

@Override
public void completeWithError(Throwable failure) {
this.handlingStatus = HandlingStatus.ERROR;
this.failure = failure;
}

@Override
Expand All @@ -491,6 +525,22 @@ public void onCompletion(Runnable callback) {
}
}

private enum HandlingStatus {
SUCCESS,ERROR
}

private static class ErroringEmitterHandler extends EmitterHandler {
@Override
public void send(Object data, MediaType mediaType) throws IOException {
throw new IOException();
}

@Override
public void send(Set<ResponseBodyEmitter.DataWithMediaType> items) throws IOException {
throw new IOException();
}
}

private static class Bar {

private final String value;
Expand Down
Expand Up @@ -148,23 +148,6 @@ void sendWithError() throws Exception {
verifyNoMoreInteractions(this.handler);
}

@Test // gh-30687
void completeIgnoredAfterIOException() throws Exception {
this.emitter.initialize(this.handler);
verify(this.handler).onTimeout(any());
verify(this.handler).onError(any());
verify(this.handler).onCompletion(any());
verifyNoMoreInteractions(this.handler);

willThrow(new IOException()).given(this.handler).send("foo", MediaType.TEXT_PLAIN);
assertThatIOException().isThrownBy(() -> this.emitter.send("foo", MediaType.TEXT_PLAIN));
verify(this.handler).send("foo", MediaType.TEXT_PLAIN);
verifyNoMoreInteractions(this.handler);

this.emitter.complete();
verifyNoMoreInteractions(this.handler);
}

@Test // gh-30687
void completeAfterNonIOException() throws Exception {
this.emitter.initialize(this.handler);
Expand Down

0 comments on commit c6b6ccd

Please sign in to comment.