Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support decoding empty DataBuffers for Decoders that support it #29903

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return (elementType.resolve() == byte[].class && super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public byte[] decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Flux<DataBuffer> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ public interface Decoder<T> {
*/
boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType);

/**
* Whether the decoder supports decoding an Object of its target type from an
* empty message. When it is true, the decoder will always return a non-null
* value from its {@code decode} method when an empty message is decoded.
* @return {@code true} if supported, {@code false} otherwise
* @since 6.0.5
*/
default boolean canDecodeEmptyMessage() {
return false;
}

/**
* Decode a {@link DataBuffer} input stream into a Flux of {@code T}.
* @param inputStream the {@code DataBuffer} input stream to decode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Buffer decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public ByteBuf decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Flux<Resource> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return (elementType.resolve() == String.class && super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.util.MimeType;

import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.springframework.core.io.buffer.DataBufferUtils.release;

/**
* Abstract base class for {@link Decoder} unit tests. Subclasses need to implement
Expand Down Expand Up @@ -114,6 +115,7 @@ protected <T> void testDecodeAll(Publisher<DataBuffer> input, Class<? extends T>
* <li>{@link #testDecodeError(Publisher, ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeCancel(Publisher, ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeEmpty(ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeEmptyMessage(ResolvableType, MimeType, Map)}</li>
* </ul>
*
* @param input the input to be provided to the decoder
Expand All @@ -131,6 +133,7 @@ protected <T> void testDecodeAll(Publisher<DataBuffer> input, ResolvableType out
testDecodeError(input, outputType, mimeType, hints);
testDecodeCancel(input, outputType, mimeType, hints);
testDecodeEmpty(outputType, mimeType, hints);
testDecodeEmptyMessage(outputType, mimeType, hints);
}

/**
Expand Down Expand Up @@ -258,6 +261,25 @@ protected void testDecodeEmpty(ResolvableType outputType, @Nullable MimeType mim
StepVerifier.create(result).verifyComplete();
}

/**
* Test a {@link Decoder#decode decode} scenario where the input stream is an empty buffer.
* The output is expected to be filled when the decoder supports it.
*
* @param outputType the desired output type
* @param mimeType the mime type to use for decoding. May be {@code null}.
* @param hints the hints used for decoding. May be {@code null}.
*/
protected void testDecodeEmptyMessage(ResolvableType outputType, MimeType mimeType, Map<String, Object> hints) {
if (!this.decoder.canDecodeEmptyMessage()) {
return;
}
DataBuffer buffer = this.bufferFactory.allocateBuffer(0);
Object result = this.decoder.decode(buffer, outputType, mimeType, hints);
releaseDataBufferIfIdentical(buffer, result);
Assert.notNull(result, "result expected to be non null");
Assert.isAssignable(outputType.toClass(), result.getClass(), "result not of specified type");
}

// Mono

/**
Expand Down Expand Up @@ -289,6 +311,7 @@ protected <T> void testDecodeToMonoAll(Publisher<DataBuffer> input,
* <li>{@link #testDecodeToMonoError(Publisher, ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeToMonoCancel(Publisher, ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeToMonoEmpty(ResolvableType, MimeType, Map)}</li>
* <li>{@link #testDecodeToMonoEmptyMessage(ResolvableType, MimeType, Map)}</li>
* </ul>
*
* @param input the input to be provided to the decoder
Expand All @@ -306,6 +329,7 @@ protected <T> void testDecodeToMonoAll(Publisher<DataBuffer> input, ResolvableTy
testDecodeToMonoError(input, outputType, mimeType, hints);
testDecodeToMonoCancel(input, outputType, mimeType, hints);
testDecodeToMonoEmpty(outputType, mimeType, hints);
testDecodeToMonoEmptyMessage(outputType, mimeType, hints);
}

/**
Expand Down Expand Up @@ -419,6 +443,32 @@ protected void testDecodeToMonoEmpty(ResolvableType outputType, @Nullable MimeTy
StepVerifier.create(result).verifyComplete();
}

/**
* Test a {@link Decoder#decodeToMono decode} scenario where the input stream is an empty buffer.
* The output is expected to be filled when the decoder supports it.
*
* @param outputType the desired output type
* @param mimeType the mime type to use for decoding. May be {@code null}.
* @param hints the hints used for decoding. May be {@code null}.
*/
protected void testDecodeToMonoEmptyMessage(ResolvableType outputType, @Nullable MimeType mimeType,
@Nullable Map<String, Object> hints) {

if (!this.decoder.canDecodeEmptyMessage()) {
return;
}

Flux<DataBuffer> source = Flux.range(0, 2)
.map(i -> this.bufferFactory.allocateBuffer(0));

Mono<?> result = this.decoder.decodeToMono(source, outputType, mimeType, hints)
.doOnNext(this::releaseIfDataBuffer);

StepVerifier.create(result)
.expectNextMatches(next -> outputType.toClass().isInstance(next))
.verifyComplete();
}

/**
* Creates a deferred {@link DataBuffer} containing the given bytes.
* @param bytes the bytes that are to be stored in the buffer
Expand All @@ -432,6 +482,27 @@ protected Mono<DataBuffer> dataBuffer(byte[] bytes) {
});
}

/**
* If {@code value} is referentially identical to {@code buffer}, release it.
* @param buffer the {@link DataBuffer} that is compared
* @param value the {@link Object} that is compared
*/
private void releaseDataBufferIfIdentical(DataBuffer buffer, Object value) {
if (buffer == value) {
release(buffer);
}
}

/**
* If {@code value} is a {@link DataBuffer}, release it.
* @param value the {@link Object} that is checked
*/
private void releaseIfDataBuffer(Object value) {
if (value instanceof DataBuffer) {
release((DataBuffer) value);
}
}

/**
* Exception used in {@link #testDecodeError} and {@link #testDecodeToMonoError}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
if (decoder.canDecode(elementType, mimeType)) {
if (adapter != null && adapter.isMultiValue()) {
Flux<?> flux = content
.filter(this::nonEmptyDataBuffer)
.filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder))
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
Expand All @@ -245,7 +245,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
else {
// Single-value (with or without reactive type wrapper)
Mono<?> mono = content.next()
.filter(this::nonEmptyDataBuffer)
.filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder))
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
Expand All @@ -263,7 +263,10 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
message, parameter, "Cannot decode to [" + targetType + "]" + message));
}

private boolean nonEmptyDataBuffer(DataBuffer buffer) {
private boolean nonEmptyDataBuffer(DataBuffer buffer, Decoder<?> decoder) {
if (decoder.canDecodeEmptyMessage()) {
return true;
}
if (buffer.readableByteCount() > 0) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void echoChannel() {
@Test // gh-26344
public void echoChannelWithEmptyInput() {
Flux<String> result = requester.route("echo-channel-empty").data(Flux.empty()).retrieveFlux(String.class);
StepVerifier.create(result).verifyComplete();
StepVerifier.create(result).expectNext(" echoed").verifyComplete();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
}

@Override
public boolean canDecodeEmptyMessage() {
return true;
}

@Override
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ public void exceedMaxSize() {
testDecode(input, Msg.class, step -> step.verifyError(DecodingException.class));
}

@Test
public void decodeEmpty() {
testDecodeEmptyMessage(ResolvableType.forClass(Msg.class), null, null);
}

private Mono<DataBuffer> dataBuffer(Msg msg) {
return Mono.fromCallable(() -> {
byte[] bytes = msg.toByteArray();
Expand Down