Skip to content

Commit

Permalink
BufferFlux and BufferMono do not depend on netty-codec-http anymore (#…
Browse files Browse the repository at this point in the history
…2517)

Add API for providing a function for extracting a Buffer from an Object

Related to #1953
  • Loading branch information
violetagg committed Sep 29, 2022
1 parent 5631c23 commit 7f6a52e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
22 changes: 13 additions & 9 deletions reactor-netty5-core/src/main/java/reactor/netty5/BufferFlux.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import io.netty5.buffer.CompositeBuffer;
import io.netty5.util.Send;
import io.netty5.channel.socket.DatagramPacket;
import io.netty5.handler.codec.http.HttpContent;
import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
Expand Down Expand Up @@ -68,6 +66,18 @@ public static BufferFlux fromInbound(Publisher<?> source) {
* @return a {@link BufferFlux}
*/
public static BufferFlux fromInbound(Publisher<?> source, BufferAllocator allocator) {
return fromInbound(source, allocator, bufferExtractorFunction);
}

/**
* Decorate as {@link BufferFlux}
*
* @param source publisher to decorate
* @param allocator the channel {@link BufferAllocator}
* @param bufferExtractor a function that extracts a {@link Buffer} from the given {@link Object}
* @return a {@link BufferFlux}
*/
public static BufferFlux fromInbound(Publisher<?> source, BufferAllocator allocator, Function<Object, Buffer> bufferExtractor) {
Objects.requireNonNull(allocator, "allocator");
return maybeFuse(Flux.from(ReactorNetty.publisherOrScalarMap(source, bufferExtractor)), allocator);
}
Expand Down Expand Up @@ -307,19 +317,13 @@ static BufferFlux maybeFuse(Flux<Buffer> source, BufferAllocator allocator) {
/**
* A channel object to {@link Buffer} transformer
*/
static final Function<Object, Buffer> bufferExtractor = o -> {
static final Function<Object, Buffer> bufferExtractorFunction = o -> {
if (o instanceof Buffer buffer) {
return buffer;
}
if (o instanceof DatagramPacket envelope) {
return envelope.content();
}
if (o instanceof HttpContent<?> httpContent) {
return httpContent.payload();
}
if (o instanceof WebSocketFrame frame) {
return frame.binaryData();
}
if (o instanceof byte[] bytes) {
return preferredAllocator().copyOf(bytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void subscribe(CoreSubscriber<? super Buffer> actual) {
}

BufferMono(Mono<?> source) {
super(source.map(BufferFlux.bufferExtractor));
super(source.map(BufferFlux.bufferExtractorFunction));
}

static BufferMono maybeFuse(Mono<?> source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import io.netty5.buffer.Buffer;
import io.netty5.buffer.BufferAllocator;
import io.netty5.handler.codec.http.headers.HttpHeaders;
import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
import io.netty5.util.Resource;
import io.netty5.channel.ChannelHandler;
import io.netty5.channel.CombinedChannelDuplexHandler;
Expand All @@ -43,6 +45,7 @@
import io.netty5.handler.codec.http.LastHttpContent;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty5.BufferFlux;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.NettyInbound;
Expand All @@ -55,6 +58,7 @@
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import static io.netty5.buffer.DefaultBufferAllocators.preferredAllocator;
import static reactor.netty5.ReactorNetty.format;
import static reactor.netty5.ReactorNetty.toPrettyHexDump;

Expand Down Expand Up @@ -101,6 +105,11 @@ public String requestId() {
return asShortText();
}

@Override
public BufferFlux receive() {
return BufferFlux.fromInbound(receiveObject(), alloc(), bufferExtractorFunction);
}

@Override
@SuppressWarnings("unchecked")
public NettyOutbound send(Publisher<? extends Buffer> source) {
Expand Down Expand Up @@ -434,6 +443,25 @@ else if (!SCHEME_PATTERN.matcher(tempUri).matches()) {
}
);

/**
* A channel object to {@link Buffer} transformer
*/
static final Function<Object, Buffer> bufferExtractorFunction = o -> {
if (o instanceof Buffer buffer) {
return buffer;
}
if (o instanceof HttpContent<?> httpContent) {
return httpContent.payload();
}
if (o instanceof WebSocketFrame frame) {
return frame.binaryData();
}
if (o instanceof byte[] bytes) {
return preferredAllocator().copyOf(bytes);
}
throw new IllegalArgumentException("Object " + o + " of type " + o.getClass() + " " + "cannot be converted to Buffer");
};

static final Logger log = Loggers.getLogger(HttpOperations.class);

static final Pattern SCHEME_PATTERN = Pattern.compile("^(https?|wss?)://.*$");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ void httpPipelining() throws Exception {
x.addHandlerFirst(new HttpClientCodec()))
.receiveObject()
.ofType(HttpContent.class)
.as(BufferFlux::fromInbound)
.as(httpContentFlux ->
BufferFlux.fromInbound(httpContentFlux, preferredAllocator(), o -> ((HttpContent<?>) o).payload()))
.asString()
.log()
.map(Integer::parseInt)
Expand Down

0 comments on commit 7f6a52e

Please sign in to comment.