Skip to content

Commit

Permalink
Merge branch '6.1.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
bclozel committed May 13, 2024
2 parents 3547491 + c6b6ccd commit cd56b47
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 39 deletions.
Expand Up @@ -337,6 +337,7 @@ public void run() {
logger.trace("Send for " + this.emitter + " failed: " + ex);
}
terminate();
this.emitter.completeWithError(ex);
return;
}
}
Expand Down
Expand Up @@ -79,16 +79,6 @@ public class ResponseBodyEmitter {
@Nullable
private Throwable failure;

/**
* After an I/O error, we don't call {@link #completeWithError} directly but
* wait for the Servlet container to call us via {@code AsyncListener#onError}
* on a container thread at which point we call completeWithError.
* This flag is used to ignore further calls to complete or completeWithError
* that may come for example from an application try-catch block on the
* thread of the I/O error.
*/
private boolean ioErrorOnSend;

private final DefaultCallback timeoutCallback = new DefaultCallback();

private final ErrorCallback errorCallback = new ErrorCallback();
Expand Down Expand Up @@ -198,7 +188,6 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
this.handler.send(object, mediaType);
}
catch (IOException ex) {
this.ioErrorOnSend = true;
throw ex;
}
catch (Throwable ex) {
Expand Down Expand Up @@ -234,7 +223,6 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
this.handler.send(items);
}
catch (IOException ex) {
this.ioErrorOnSend = true;
throw ex;
}
catch (Throwable ex) {
Expand All @@ -255,10 +243,6 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
* related events such as an error while {@link #send(Object) sending}.
*/
public synchronized void complete() {
// Ignore complete after IO failure on send
if (this.ioErrorOnSend) {
return;
}
this.complete = true;
if (this.handler != null) {
this.handler.complete();
Expand All @@ -277,10 +261,6 @@ public synchronized void complete() {
* {@link #send(Object) sending}.
*/
public synchronized void completeWithError(Throwable ex) {
// Ignore complete after IO failure on send
if (this.ioErrorOnSend) {
return;
}
this.complete = true;
this.failure = ex;
if (this.handler != null) {
Expand Down
Expand Up @@ -16,6 +16,7 @@

package org.springframework.web.servlet.mvc.method.annotation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -372,6 +373,24 @@ void writeText() throws Exception {
assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog");
}

@Test
void failOnWriteShouldCompleteEmitter() throws Exception {

Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(String.class));

ErroringEmitterHandler emitterHandler = new ErroringEmitterHandler();
emitter.initialize(emitterHandler);

sink.tryEmitNext("The quick");
sink.tryEmitNext(" brown fox jumps over ");
sink.tryEmitNext("the lazy dog");
sink.tryEmitComplete();

assertThat(emitterHandler.getHandlingStatus()).isEqualTo(HandlingStatus.ERROR);
assertThat(emitterHandler.getFailure()).isInstanceOf(IOException.class);
}

@Test
void writeFluxOfString() throws Exception {

Expand Down Expand Up @@ -451,6 +470,10 @@ private static class EmitterHandler implements ResponseBodyEmitter.Handler {

private final List<Object> values = new ArrayList<>();

private HandlingStatus handlingStatus;

private Throwable failure;


public List<?> getValues() {
return this.values;
Expand All @@ -460,22 +483,33 @@ public String getValuesAsText() {
return this.values.stream().map(Object::toString).collect(Collectors.joining());
}

public HandlingStatus getHandlingStatus() {
return this.handlingStatus;
}

public Throwable getFailure() {
return this.failure;
}

@Override
public void send(Object data, MediaType mediaType) {
public void send(Object data, MediaType mediaType) throws IOException {
this.values.add(data);
}

@Override
public void send(Set<ResponseBodyEmitter.DataWithMediaType> items) {
public void send(Set<ResponseBodyEmitter.DataWithMediaType> items) throws IOException {
items.forEach(item -> this.values.add(item.getData()));
}

@Override
public void complete() {
this.handlingStatus = HandlingStatus.SUCCESS;
}

@Override
public void completeWithError(Throwable failure) {
this.handlingStatus = HandlingStatus.ERROR;
this.failure = failure;
}

@Override
Expand All @@ -491,6 +525,22 @@ public void onCompletion(Runnable callback) {
}
}

private enum HandlingStatus {
SUCCESS,ERROR
}

private static class ErroringEmitterHandler extends EmitterHandler {
@Override
public void send(Object data, MediaType mediaType) throws IOException {
throw new IOException();
}

@Override
public void send(Set<ResponseBodyEmitter.DataWithMediaType> items) throws IOException {
throw new IOException();
}
}

private static class Bar {

private final String value;
Expand Down
Expand Up @@ -148,23 +148,6 @@ void sendWithError() throws Exception {
verifyNoMoreInteractions(this.handler);
}

@Test // gh-30687
void completeIgnoredAfterIOException() throws Exception {
this.emitter.initialize(this.handler);
verify(this.handler).onTimeout(any());
verify(this.handler).onError(any());
verify(this.handler).onCompletion(any());
verifyNoMoreInteractions(this.handler);

willThrow(new IOException()).given(this.handler).send("foo", MediaType.TEXT_PLAIN);
assertThatIOException().isThrownBy(() -> this.emitter.send("foo", MediaType.TEXT_PLAIN));
verify(this.handler).send("foo", MediaType.TEXT_PLAIN);
verifyNoMoreInteractions(this.handler);

this.emitter.complete();
verifyNoMoreInteractions(this.handler);
}

@Test // gh-30687
void completeAfterNonIOException() throws Exception {
this.emitter.initialize(this.handler);
Expand Down

0 comments on commit cd56b47

Please sign in to comment.