Skip to content

Commit

Permalink
Fix InboundBuffer::pause & write performance
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Apr 11, 2024
1 parent 50e5a06 commit a789e11
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 195 deletions.
62 changes: 32 additions & 30 deletions src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,6 @@ void setRequest(HttpRequest request) {
}
}

private InboundBuffer<Object> pendingQueue() {
if (pending == null) {
pending = new InboundBuffer<>(context, 8);
pending.drainHandler(v -> conn.doResume());
pending.handler(buffer -> {
if (buffer == InboundBuffer.END_SENTINEL) {
onEnd();
} else {
onData((Buffer) buffer);
}
});
}
return pending;
}

void handleContent(Buffer buffer) {
InboundBuffer<Object> queue;
synchronized (conn) {
Expand Down Expand Up @@ -346,15 +331,37 @@ public HttpServerRequest exceptionHandler(Handler<Throwable> handler) {
@Override
public HttpServerRequest pause() {
synchronized (conn) {
pendingQueue().pause();
if (pending != null) {
pending.pause();
} else {
pending = InboundBuffer.createPaused(context, 8, pendingDrainHandler(), pendingHandler());
}
return this;
}
}

private Handler<Object> pendingHandler() {
return buffer -> {
if (buffer == InboundBuffer.END_SENTINEL) {
onEnd();
} else {
onData((Buffer) buffer);
}
};
}

private Handler<Void> pendingDrainHandler() {
return v -> conn.doResume();
}

@Override
public HttpServerRequest fetch(long amount) {
synchronized (conn) {
pendingQueue().fetch(amount);
if (pending != null) {
pending.fetch(amount);
} else {
pending = InboundBuffer.createAndFetch(context, 8, amount, pendingDrainHandler(), pendingHandler());
}
return this;
}
}
Expand Down Expand Up @@ -572,17 +579,17 @@ void handleEnd() {
ended = true;
queue = pending;
if (queue == null) {
handler = onRequestComplete();
handler = endRequest();
}
}
if (queue != null) {
queue.write(InboundBuffer.END_SENTINEL);
} else {
handleEnd(handler);
} else if (handler != null) {
handler.handleEnd();
}
}

private HttpEventHandler onRequestComplete() {
private HttpEventHandler endRequest() {
assert Thread.holdsLock(conn);
if (METRICS_ENABLED) {
reportRequestComplete();
Expand All @@ -593,19 +600,14 @@ private HttpEventHandler onRequestComplete() {
return eventHandler;
}

private void handleEnd(HttpEventHandler handler) {
// If there have been uploads then we let the last one call the end handler once any fileuploads are complete
if (handler != null) {
handler.handleEnd();
}
}

private void onEnd() {
HttpEventHandler handler;
synchronized (conn) {
handler = onRequestComplete();
handler = endRequest();
}
if (handler != null) {
handler.handleEnd();
}
handleEnd(handler);
}

private void reportRequestComplete() {
Expand Down
66 changes: 53 additions & 13 deletions src/main/java/io/vertx/core/streams/impl/InboundBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
*/
package io.vertx.core.streams.impl;

import io.netty.util.concurrent.FastThreadLocalThread;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Objects;

/**
* A buffer that transfers elements to an handler with back-pressure.
Expand Down Expand Up @@ -87,6 +86,28 @@ public InboundBuffer(Context context) {
}

public InboundBuffer(Context context, long highWaterMark) {
this(context, highWaterMark, Long.MAX_VALUE, null, null);
}

public static <E> InboundBuffer<E> createPaused(Context context, long highWaterMark, Handler<Void> drainHandler, Handler<E> handler) {
Objects.requireNonNull(drainHandler);
Objects.requireNonNull(handler);
return new InboundBuffer<>(context, highWaterMark, 0L, drainHandler, handler);
}

public static <E> InboundBuffer<E> createAndFetch(Context context, long highWaterMark, long demand, Handler<Void> drainHandler, Handler<E> handler) {
Objects.requireNonNull(drainHandler);
Objects.requireNonNull(handler);
checkPositiveAmount(demand);
InboundBuffer<E> inboundBuffer = new InboundBuffer<>(context, highWaterMark, Long.MAX_VALUE, drainHandler, handler);
if (inboundBuffer.emit(demand)) {
inboundBuffer.asyncDrain();
inboundBuffer.context.runOnContext(v -> inboundBuffer.drain());
}
return inboundBuffer;
}

private InboundBuffer(Context context, long highWaterMark, long demand, Handler<Void> drainHandler, Handler<E> handler) {
if (context == null) {
throw new NullPointerException("context must not be null");
}
Expand All @@ -95,9 +116,11 @@ public InboundBuffer(Context context, long highWaterMark) {
}
this.context = (ContextInternal) context;
this.highWaterMark = highWaterMark;
this.demand = Long.MAX_VALUE;
this.demand = demand;
// empty ArrayDeque's constructor ArrayDeque allocates 16 elements; let's delay the allocation to be of the proper size
this.pending = null;
this.drainHandler = drainHandler;
this.handler = handler;
}

private void checkThread() {
Expand Down Expand Up @@ -139,7 +162,8 @@ private boolean checkWritable() {
if (demand == Long.MAX_VALUE) {
return true;
} else {
long actual = size() - demand;
int size = pending == null ? 0 : pending.size();
long actual = size - demand;
boolean writable = actual < highWaterMark;
overflow |= !writable;
return writable;
Expand Down Expand Up @@ -279,23 +303,39 @@ private void handleException(Throwable err) {
* @return {@code true} when the buffer will be drained
*/
public boolean fetch(long amount) {
if (amount < 0L) {
throw new IllegalArgumentException();
}
checkPositiveAmount(amount);
synchronized (this) {
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (emitting || (isEmpty() && !overflow)) {
if (!emit(amount)) {
return false;
}
emitting = true;
}
asyncDrain();
return true;
}

private void asyncDrain() {
context.runOnContext(v -> drain());
}

private boolean emit(long amount) {
assert amount >= 0L;
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (emitting || (isEmpty() && !overflow)) {
return false;
}
emitting = true;
return true;
}

private static void checkPositiveAmount(long amount) {
if (amount < 0L) {
throw new IllegalArgumentException();
}
}

/**
* Read the most recent element synchronously.
* <p/>
Expand Down

0 comments on commit a789e11

Please sign in to comment.