Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove edge-triggered support for epoll and just always use level-tri… #14031

Merged
merged 3 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
boolean epollInReadyRunnablePending;
private EpollIoOps ops;
private EpollIoOps inital;

protected volatile boolean active;

AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
Expand All @@ -103,14 +104,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
this.ops = initialOps;
}

void add(EpollIoOps add) {
ops = ops.with(add);
}

void remove(EpollIoOps remove) {
ops = ops.without(remove);
}

static boolean isSoErrorZero(Socket fd) {
try {
return fd.getSoError() == 0;
Expand Down Expand Up @@ -259,12 +252,6 @@ protected void doBeginRead() throws Exception {
// executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
// never get data after this.
setFlag(Native.EPOLLIN);

// If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
// again if we didn't consume all the data. So we force a read operation here if there maybe more data.
if (unsafe.maybeMoreDataToRead) {
unsafe.executeEpollInReadyRunnable(config());
}
}

final boolean shouldBreakEpollInReady(ChannelConfig config) {
Expand Down Expand Up @@ -451,7 +438,6 @@ final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boo

protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
boolean readPending;
boolean maybeMoreDataToRead;
private EpollRecvByteAllocatorHandle allocHandle;
private final Runnable epollInReadyRunnable = new Runnable() {
@Override
Expand Down Expand Up @@ -521,39 +507,14 @@ public void handle(IoRegistration registration, IoEvent event) {
*/
abstract void epollInReady();

final void epollInBefore() {
maybeMoreDataToRead = false;
}

final void epollInFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();

if (allocHandle.isReceivedRdHup() || (readPending && maybeMoreDataToRead)) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
//
// It is possible the last fireChannelRead call could cause the user to call read() again, or if
// autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
// to false before every read operation to prevent re-entry into epollInReady() we will not read from
// the underlying OS again unless the user happens to call read again.
executeEpollInReadyRunnable(config);
} else if (!readPending && !config.isAutoRead()) {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
clearEpollIn();
}
}

final void executeEpollInReadyRunnable(ChannelConfig config) {
if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
return;
}
epollInReadyRunnablePending = true;
eventLoop().execute(epollInReadyRunnable);
final boolean shouldStopReading(ChannelConfig config) {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
return !readPending && !config.isAutoRead();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
allocHandle.reset(config);
allocHandle.attemptedBytesRead(1);
epollInBefore();

Throwable exception = null;
try {
Expand Down Expand Up @@ -124,7 +121,9 @@ void epollInReady() {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,12 +745,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config);
epollInBefore();

ByteBuf byteBuf = null;
boolean close = false;
Expand Down Expand Up @@ -824,7 +821,9 @@ void epollInReady() {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (sQueue == null) {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
} else {
if (!config.isAutoRead()) {
clearEpollIn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.unix.IntegerUnixChannelOption;
import io.netty.channel.unix.RawUnixChannelOption;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -34,6 +35,9 @@
import static io.netty.channel.unix.Limits.SSIZE_MAX;

public class EpollChannelConfig extends DefaultChannelConfig {

private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(EpollChannelConfig.class);

private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;

protected EpollChannelConfig(Channel channel) {
Expand Down Expand Up @@ -180,10 +184,12 @@ public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator
* {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
* {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
* {@link EpollMode#LEVEL_TRIGGERED}.
*
* @deprecated Netty always uses level-triggered mode and so this method is just a no-op.
*/
@Deprecated
public EpollMode getEpollMode() {
return ((AbstractEpollChannel) channel).isFlagSet(Native.EPOLLET)
? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
return EpollMode.LEVEL_TRIGGERED;
}

/**
Expand All @@ -193,22 +199,12 @@ public EpollMode getEpollMode() {
* {@link EpollMode#LEVEL_TRIGGERED}.
*
* <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong>
*
* @deprecated Netty always uses level-triggered mode and so this method is just a no-op.
*/
@Deprecated
public EpollChannelConfig setEpollMode(EpollMode mode) {
ObjectUtil.checkNotNull(mode, "mode");

AbstractEpollChannel epollChannel = (AbstractEpollChannel) channel;
checkChannelNotRegistered();
switch (mode) {
case EDGE_TRIGGERED:
epollChannel.add(EpollIoOps.EPOLLET);
break;
case LEVEL_TRIGGERED:
epollChannel.remove(EpollIoOps.EPOLLET);
break;
default:
throw new Error();
}
LOGGER.debug("Changing the EpollMode is not supported anymore, this is just a no-op");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public final class EpollChannelOption<T> extends UnixChannelOption<T> {
public static final ChannelOption<Boolean> TCP_QUICKACK = valueOf(EpollChannelOption.class, "TCP_QUICKACK");
public static final ChannelOption<Integer> SO_BUSY_POLL = valueOf(EpollChannelOption.class, "SO_BUSY_POLL");

/**
* @deprecated Netty always uses level-triggered mode and so this method is just a no-op.
*/
@Deprecated
public static final ChannelOption<EpollMode> EPOLL_MODE =
ChannelOption.valueOf(EpollChannelOption.class, "EPOLL_MODE");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,12 +510,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config);
epollInBefore();

Throwable exception = null;
try {
Expand Down Expand Up @@ -568,7 +565,9 @@ void epollInReady() {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
Comment on lines +568 to +570
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, this whole if block is quite repetitive so maybe it would be a good candidate for a new definition of epollInFinally(Config).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about it but I found it more readable this way :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough reason. It's not a lot of code in any case.

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config);
epollInBefore();

Throwable exception = null;
try {
Expand Down Expand Up @@ -375,7 +372,9 @@ void epollInReady() {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,9 @@ private void epollInReadFd() {
}
final ChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
allocHandle.reset(config);
epollInBefore();

try {
readLoop: do {
Expand Down Expand Up @@ -187,7 +185,9 @@ private void epollInReadFd() {
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

/**
* The <a href="https://linux.die.net//man/7/epoll">epoll</a> mode to use.
*
* @deprecated Netty always uses level-triggered mode.
*/
@Deprecated
public enum EpollMode {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public boolean get() {
return maybeMoreDataToRead();
}
};
private boolean isEdgeTriggered;
private boolean receivedRdHup;

EpollRecvByteAllocatorHandle(ExtendedHandle handle) {
Expand All @@ -47,25 +46,7 @@ final boolean isReceivedRdHup() {
}

boolean maybeMoreDataToRead() {
/**
* EPOLL ET requires that we read until we get an EAGAIN
* (see Q9 in <a href="https://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>). However in order to
* respect auto read we supporting reading to stop if auto read is off. It is expected that the
* {@link #EpollSocketChannel} implementations will track if we are in edgeTriggered mode and all data was not
* read, and will force a EPOLLIN ready event.
*
* It is assumed RDHUP is handled externally by checking {@link #isReceivedRdHup()}.
*/
return (isEdgeTriggered && lastBytesRead() > 0) ||
(!isEdgeTriggered && lastBytesRead() == attemptedBytesRead());
}

final void edgeTriggered(boolean edgeTriggered) {
isEdgeTriggered = edgeTriggered;
}

final boolean isEdgeTriggered() {
return isEdgeTriggered;
return lastBytesRead() == attemptedBytesRead();
}

@Override
Expand All @@ -77,7 +58,7 @@ public final ByteBuf allocate(ByteBufAllocator alloc) {

@Override
public final boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return ((ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier);
return isReceivedRdHup() || ((ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.netty.channel.epoll;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.ServerSocketChannel;

Expand Down

This file was deleted.