Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support decoding empty DataBuffers for Decoders that support it #29903

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Expand Up @@ -44,6 +44,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return (elementType.resolve() == byte[].class && super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyDataBuffer() {
return true;
}

@Override
public byte[] decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Expand Up @@ -47,6 +47,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyDataBuffer() {
return true;
}

@Override
public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Expand Up @@ -56,6 +56,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyDataBuffer() {
return true;
}

@Override
public Flux<DataBuffer> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Expand Up @@ -53,6 +53,15 @@ public interface Decoder<T> {
*/
boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType);

/**
* Whether the decoder supports decoding messages from empty data buffers.
* @return {@code true} if supported, {@code false} otherwise
* @since 6.0.5
*/
default boolean canDecodeEmptyDataBuffer() {
return false;
}

/**
* Decode a {@link DataBuffer} input stream into a Flux of {@code T}.
* @param inputStream the {@code DataBuffer} input stream to decode
Expand Down
Expand Up @@ -48,6 +48,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyDataBuffer() {
return true;
}

@Override
public Buffer decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Expand Up @@ -48,6 +48,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyDataBuffer() {
return true;
}

@Override
public ByteBuf decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Expand Up @@ -56,6 +56,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyDataBuffer() {
return true;
}

@Override
public Flux<Resource> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Expand Up @@ -105,6 +105,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return (elementType.resolve() == String.class && super.canDecode(elementType, mimeType));
}

@Override
public boolean canDecodeEmptyDataBuffer() {
return true;
}

@Override
public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.springframework.util.MimeType;

import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.springframework.core.io.buffer.DataBufferUtils.release;

/**
* Abstract base class for {@link Decoder} unit tests. Subclasses need to implement
Expand Down Expand Up @@ -131,6 +132,7 @@ protected <T> void testDecodeAll(Publisher<DataBuffer> input, ResolvableType out
testDecodeError(input, outputType, mimeType, hints);
testDecodeCancel(input, outputType, mimeType, hints);
testDecodeEmpty(outputType, mimeType, hints);
testDecodeEmptyBuffer(outputType, mimeType, hints);
}

/**
Expand Down Expand Up @@ -258,6 +260,25 @@ protected void testDecodeEmpty(ResolvableType outputType, @Nullable MimeType mim
StepVerifier.create(result).verifyComplete();
}

/**
* Test a {@link Decoder#decode decode} scenario where the input stream is an empty buffer.
* The output is expected to be filled when the decoder supports it.
*
* @param outputType the desired output type
* @param mimeType the mime type to use for decoding. May be {@code null}.
* @param hints the hints used for decoding. May be {@code null}.
*/
protected void testDecodeEmptyBuffer(ResolvableType outputType, MimeType mimeType, Map<String, Object> hints) {
if (!this.decoder.canDecodeEmptyDataBuffer()) {
return;
}
DataBuffer buffer = this.bufferFactory.allocateBuffer(0);
Object result = this.decoder.decode(buffer, outputType, mimeType, hints);
releaseBufferIfIdentical(buffer, result);
Assert.notNull(result, "result expected to be non null");
Assert.isAssignable(outputType.toClass(), result.getClass(), "result not of specified type");
}

// Mono

/**
Expand Down Expand Up @@ -306,6 +327,7 @@ protected <T> void testDecodeToMonoAll(Publisher<DataBuffer> input, ResolvableTy
testDecodeToMonoError(input, outputType, mimeType, hints);
testDecodeToMonoCancel(input, outputType, mimeType, hints);
testDecodeToMonoEmpty(outputType, mimeType, hints);
testDecodeToMonoEmptyBuffer(outputType, mimeType, hints);
}

/**
Expand Down Expand Up @@ -419,6 +441,29 @@ protected void testDecodeToMonoEmpty(ResolvableType outputType, @Nullable MimeTy
StepVerifier.create(result).verifyComplete();
}

/**
* Test a {@link Decoder#decodeToMono decode} scenario where the input stream is an empty buffer.
* The output is expected to be filled when the decoder supports it.
*
* @param outputType the desired output type
* @param mimeType the mime type to use for decoding. May be {@code null}.
* @param hints the hints used for decoding. May be {@code null}.
*/
protected void testDecodeToMonoEmptyBuffer(ResolvableType outputType, @Nullable MimeType mimeType,
@Nullable Map<String, Object> hints) {

if (!this.decoder.canDecodeEmptyDataBuffer()) {
return;
}

DataBuffer buffer = this.bufferFactory.allocateBuffer(0);
Mono<?> result = this.decoder.decodeToMono(Mono.just(buffer), outputType, mimeType, hints)
.doOnNext(value -> releaseBufferIfIdentical(buffer, value));
StepVerifier.create(result)
.expectNextMatches(next -> outputType.toClass().isInstance(next))
.verifyComplete();
}

/**
* Creates a deferred {@link DataBuffer} containing the given bytes.
* @param bytes the bytes that are to be stored in the buffer
Expand All @@ -432,6 +477,12 @@ protected Mono<DataBuffer> dataBuffer(byte[] bytes) {
});
}

private void releaseBufferIfIdentical(DataBuffer buffer, Object value) {
if (buffer == value) {
release(buffer);
}
}

/**
* Exception used in {@link #testDecodeError} and {@link #testDecodeToMonoError}
*/
Expand Down
Expand Up @@ -231,7 +231,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
if (decoder.canDecode(elementType, mimeType)) {
if (adapter != null && adapter.isMultiValue()) {
Flux<?> flux = content
.filter(this::nonEmptyDataBuffer)
.filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder))
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
Expand All @@ -245,7 +245,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
else {
// Single-value (with or without reactive type wrapper)
Mono<?> mono = content.next()
.filter(this::nonEmptyDataBuffer)
.filter(dataBuffer -> nonEmptyDataBuffer(dataBuffer, decoder))
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
Expand All @@ -263,7 +263,10 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
message, parameter, "Cannot decode to [" + targetType + "]" + message));
}

private boolean nonEmptyDataBuffer(DataBuffer buffer) {
private boolean nonEmptyDataBuffer(DataBuffer buffer, Decoder<?> decoder) {
if (decoder.canDecodeEmptyDataBuffer()) {
return true;
}
if (buffer.readableByteCount() > 0) {
return true;
}
Expand Down
Expand Up @@ -167,7 +167,7 @@ public void echoChannel() {
@Test // gh-26344
public void echoChannelWithEmptyInput() {
Flux<String> result = requester.route("echo-channel-empty").data(Flux.empty()).retrieveFlux(String.class);
StepVerifier.create(result).verifyComplete();
StepVerifier.create(result).expectNext(" echoed").verifyComplete();
}

@Test
Expand Down
Expand Up @@ -125,6 +125,11 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
}

@Override
public boolean canDecodeEmptyDataBuffer() {
return true;
}

@Override
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand Down
Expand Up @@ -221,6 +221,11 @@ public void exceedMaxSize() {
testDecode(input, Msg.class, step -> step.verifyError(DecodingException.class));
}

@Test
public void decodeEmpty() {
testDecodeEmptyBuffer(ResolvableType.forClass(Msg.class), null, null);
}

private Mono<DataBuffer> dataBuffer(Msg msg) {
return Mono.fromCallable(() -> {
byte[] bytes = msg.toByteArray();
Expand Down