Skip to content

Commit

Permalink
Remove edge-triggered support for epoll and just always use level-tri… (
Browse files Browse the repository at this point in the history
#14031)

…ggered

Motivation:

We supported edge-triggered and level-triggered modes for our epoll
transport. This made things more complex while not really providing much
value.

Modifications:

- Remove edge-triggered support and just use level-triggered all the
time

Result:

Less complexity. Fixes #14007
  • Loading branch information
normanmaurer committed May 3, 2024
1 parent d3e830c commit dacd5c0
Show file tree
Hide file tree
Showing 25 changed files with 52 additions and 462 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
boolean epollInReadyRunnablePending;
private EpollIoOps ops;
private EpollIoOps inital;

protected volatile boolean active;

AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
Expand All @@ -103,14 +104,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
this.ops = initialOps;
}

void add(EpollIoOps add) {
ops = ops.with(add);
}

void remove(EpollIoOps remove) {
ops = ops.without(remove);
}

static boolean isSoErrorZero(Socket fd) {
try {
return fd.getSoError() == 0;
Expand Down Expand Up @@ -259,12 +252,6 @@ protected void doBeginRead() throws Exception {
// executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
// never get data after this.
setFlag(Native.EPOLLIN);

// If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
// again if we didn't consume all the data. So we force a read operation here if there maybe more data.
if (unsafe.maybeMoreDataToRead) {
unsafe.executeEpollInReadyRunnable(config());
}
}

final boolean shouldBreakEpollInReady(ChannelConfig config) {
Expand Down Expand Up @@ -451,7 +438,6 @@ final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boo

protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
boolean readPending;
boolean maybeMoreDataToRead;
private EpollRecvByteAllocatorHandle allocHandle;
private final Runnable epollInReadyRunnable = new Runnable() {
@Override
Expand Down Expand Up @@ -521,39 +507,14 @@ public void handle(IoRegistration registration, IoEvent event) {
*/
abstract void epollInReady();

final void epollInBefore() {
maybeMoreDataToRead = false;
}

final void epollInFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();

if (allocHandle.isReceivedRdHup() || (readPending && maybeMoreDataToRead)) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
//
// It is possible the last fireChannelRead call could cause the user to call read() again, or if
// autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
// to false before every read operation to prevent re-entry into epollInReady() we will not read from
// the underlying OS again unless the user happens to call read again.
executeEpollInReadyRunnable(config);
} else if (!readPending && !config.isAutoRead()) {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
clearEpollIn();
}
}

final void executeEpollInReadyRunnable(ChannelConfig config) {
if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
return;
}
epollInReadyRunnablePending = true;
eventLoop().execute(epollInReadyRunnable);
final boolean shouldStopReading(ChannelConfig config) {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
return !readPending && !config.isAutoRead();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
allocHandle.reset(config);
allocHandle.attemptedBytesRead(1);
epollInBefore();

Throwable exception = null;
try {
Expand Down Expand Up @@ -124,7 +121,9 @@ void epollInReady() {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,12 +745,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config);
epollInBefore();

ByteBuf byteBuf = null;
boolean close = false;
Expand Down Expand Up @@ -824,7 +821,9 @@ void epollInReady() {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (sQueue == null) {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
} else {
if (!config.isAutoRead()) {
clearEpollIn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.unix.IntegerUnixChannelOption;
import io.netty.channel.unix.RawUnixChannelOption;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -34,6 +35,9 @@
import static io.netty.channel.unix.Limits.SSIZE_MAX;

public class EpollChannelConfig extends DefaultChannelConfig {

private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(EpollChannelConfig.class);

private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;

protected EpollChannelConfig(Channel channel) {
Expand Down Expand Up @@ -180,10 +184,12 @@ public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator
* {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
* {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
* {@link EpollMode#LEVEL_TRIGGERED}.
*
* @deprecated Netty always uses level-triggered mode and so this method is just a no-op.
*/
@Deprecated
public EpollMode getEpollMode() {
return ((AbstractEpollChannel) channel).isFlagSet(Native.EPOLLET)
? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
return EpollMode.LEVEL_TRIGGERED;
}

/**
Expand All @@ -193,22 +199,12 @@ public EpollMode getEpollMode() {
* {@link EpollMode#LEVEL_TRIGGERED}.
*
* <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong>
*
* @deprecated Netty always uses level-triggered mode and so this method is just a no-op.
*/
@Deprecated
public EpollChannelConfig setEpollMode(EpollMode mode) {
ObjectUtil.checkNotNull(mode, "mode");

AbstractEpollChannel epollChannel = (AbstractEpollChannel) channel;
checkChannelNotRegistered();
switch (mode) {
case EDGE_TRIGGERED:
epollChannel.add(EpollIoOps.EPOLLET);
break;
case LEVEL_TRIGGERED:
epollChannel.remove(EpollIoOps.EPOLLET);
break;
default:
throw new Error();
}
LOGGER.debug("Changing the EpollMode is not supported anymore, this is just a no-op");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public final class EpollChannelOption<T> extends UnixChannelOption<T> {
public static final ChannelOption<Boolean> TCP_QUICKACK = valueOf(EpollChannelOption.class, "TCP_QUICKACK");
public static final ChannelOption<Integer> SO_BUSY_POLL = valueOf(EpollChannelOption.class, "SO_BUSY_POLL");

/**
* @deprecated Netty always uses level-triggered mode and so this method is just a no-op.
*/
@Deprecated
public static final ChannelOption<EpollMode> EPOLL_MODE =
ChannelOption.valueOf(EpollChannelOption.class, "EPOLL_MODE");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,12 +510,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config);
epollInBefore();

Throwable exception = null;
try {
Expand Down Expand Up @@ -568,7 +565,9 @@ void epollInReady() {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config);
epollInBefore();

Throwable exception = null;
try {
Expand Down Expand Up @@ -375,7 +372,9 @@ void epollInReady() {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,9 @@ private void epollInReadFd() {
}
final ChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
allocHandle.reset(config);
epollInBefore();

try {
readLoop: do {
Expand Down Expand Up @@ -187,7 +185,9 @@ private void epollInReadFd() {
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

/**
* The <a href="https://linux.die.net//man/7/epoll">epoll</a> mode to use.
*
* @deprecated Netty always uses level-triggered mode.
*/
@Deprecated
public enum EpollMode {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public boolean get() {
return maybeMoreDataToRead();
}
};
private boolean isEdgeTriggered;
private boolean receivedRdHup;

EpollRecvByteAllocatorHandle(ExtendedHandle handle) {
Expand All @@ -47,25 +46,7 @@ final boolean isReceivedRdHup() {
}

boolean maybeMoreDataToRead() {
/**
* EPOLL ET requires that we read until we get an EAGAIN
* (see Q9 in <a href="https://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>). However in order to
* respect auto read we supporting reading to stop if auto read is off. It is expected that the
* {@link #EpollSocketChannel} implementations will track if we are in edgeTriggered mode and all data was not
* read, and will force a EPOLLIN ready event.
*
* It is assumed RDHUP is handled externally by checking {@link #isReceivedRdHup()}.
*/
return (isEdgeTriggered && lastBytesRead() > 0) ||
(!isEdgeTriggered && lastBytesRead() == attemptedBytesRead());
}

final void edgeTriggered(boolean edgeTriggered) {
isEdgeTriggered = edgeTriggered;
}

final boolean isEdgeTriggered() {
return isEdgeTriggered;
return lastBytesRead() == attemptedBytesRead();
}

@Override
Expand All @@ -77,7 +58,7 @@ public final ByteBuf allocate(ByteBufAllocator alloc) {

@Override
public final boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return ((ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier);
return isReceivedRdHup() || ((ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.netty.channel.epoll;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.ServerSocketChannel;

Expand Down

This file was deleted.

0 comments on commit dacd5c0

Please sign in to comment.