Skip to content

Commit

Permalink
Addressing Julien's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Apr 3, 2024
1 parent 87e355b commit ab6f258
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 181 deletions.
33 changes: 16 additions & 17 deletions src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,6 @@ void setRequest(HttpRequest request) {
}
}

private InboundBuffer createAndSetInboundBuffer() {
assert pending == null;
InboundBuffer<Object> pending = new InboundBuffer<>(context, 8);
this.pending = pending;
pending.drainHandler(v -> conn.doResume());
pending.handler(buffer -> {
if (buffer == InboundBuffer.END_SENTINEL) {
onEnd();
} else {
onData((Buffer) buffer);
}
});
return pending;
}

void handleContent(Buffer buffer) {
InboundBuffer<Object> queue;
synchronized (conn) {
Expand Down Expand Up @@ -349,19 +334,33 @@ public HttpServerRequest pause() {
if (pending != null) {
pending.pause();
} else {
createAndSetInboundBuffer().pause();
pending = InboundBuffer.createPaused(context, 8, pendingDrainHandler(), pendingHandler());
}
return this;
}
}

private Handler<Object> pendingHandler() {
return buffer -> {
if (buffer == InboundBuffer.END_SENTINEL) {
onEnd();
} else {
onData((Buffer) buffer);
}
};
}

private Handler<Void> pendingDrainHandler() {
return v -> conn.doResume();
}

@Override
public HttpServerRequest fetch(long amount) {
synchronized (conn) {
if (pending != null) {
pending.fetch(amount);
} else {
createAndSetInboundBuffer().fetch(amount);
pending = InboundBuffer.createAndFetch(context, 8, amount, pendingDrainHandler(), pendingHandler());
}
return this;
}
Expand Down
62 changes: 50 additions & 12 deletions src/main/java/io/vertx/core/streams/impl/InboundBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
*/
package io.vertx.core.streams.impl;

import io.netty.util.concurrent.FastThreadLocalThread;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Objects;

/**
* A buffer that transfers elements to an handler with back-pressure.
Expand Down Expand Up @@ -87,6 +86,27 @@ public InboundBuffer(Context context) {
}

public InboundBuffer(Context context, long highWaterMark) {
this(context, highWaterMark, Long.MAX_VALUE, null, null);
}

public static <E> InboundBuffer<E> createPaused(Context context, long highWaterMark, Handler<Void> drainHandler, Handler<E> handler) {
Objects.requireNonNull(drainHandler);
Objects.requireNonNull(handler);
return new InboundBuffer<>(context, highWaterMark, 0L, drainHandler, handler);
}

public static <E> InboundBuffer<E> createAndFetch(Context context, long highWaterMark, long demand, Handler<Void> drainHandler, Handler<E> handler) {
Objects.requireNonNull(drainHandler);
Objects.requireNonNull(handler);
checkPositiveAmount(demand);
InboundBuffer<E> inboundBuffer = new InboundBuffer<>(context, highWaterMark, Long.MAX_VALUE, drainHandler, handler);
if (inboundBuffer.emit(demand)) {
inboundBuffer.context.runOnContext(v -> inboundBuffer.drain());
}
return inboundBuffer;
}

private InboundBuffer(Context context, long highWaterMark, long demand, Handler<Void> drainHandler, Handler<E> handler) {
if (context == null) {
throw new NullPointerException("context must not be null");
}
Expand All @@ -95,9 +115,11 @@ public InboundBuffer(Context context, long highWaterMark) {
}
this.context = (ContextInternal) context;
this.highWaterMark = highWaterMark;
this.demand = Long.MAX_VALUE;
this.demand = demand;
// empty ArrayDeque's constructor ArrayDeque allocates 16 elements; let's delay the allocation to be of the proper size
this.pending = null;
this.drainHandler = drainHandler;
this.handler = handler;
}

private void checkThread() {
Expand Down Expand Up @@ -280,23 +302,39 @@ private void handleException(Throwable err) {
* @return {@code true} when the buffer will be drained
*/
public boolean fetch(long amount) {
if (amount < 0L) {
throw new IllegalArgumentException();
}
checkPositiveAmount(amount);
synchronized (this) {
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (emitting || (isEmpty() && !overflow)) {
if (!emit(amount)) {
return false;
}
emitting = true;
}
asyncDrain();
return true;
}

private void asyncDrain() {
context.runOnContext(v -> drain());
}

private boolean emit(long amount) {
assert amount >= 0L;
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (emitting || (isEmpty() && !overflow)) {
return false;
}
emitting = true;
return true;
}

private static void checkPositiveAmount(long amount) {
if (amount < 0L) {
throw new IllegalArgumentException();
}
}

/**
* Read the most recent element synchronously.
* <p/>
Expand Down

0 comments on commit ab6f258

Please sign in to comment.