Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the completionObjects leak problem. #4285

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

horizonzy
Copy link
Member

Fixes #4278, you need to go through #4278 for the context.

When a connection is broken, it will trigger PerChannelBookieClient#channelInactive.

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOG.info("Disconnected from bookie channel {}", ctx.channel());
if (ctx.channel() != null) {
closeChannel(ctx.channel());
if (ctx.channel().pipeline().get(SslHandler.class) != null) {
activeTlsChannelCounter.dec();
} else {
activeNonTlsChannelCounter.dec();
}
}
errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException);
errorOutPendingOps(BKException.Code.BookieHandleNotAvailableException);
synchronized (this) {
if (this.channel == ctx.channel()
&& state != ConnectionState.CLOSED) {
state = ConnectionState.DISCONNECTED;
channel = null;
}
}

Point 1. line_1303, it will drain all completionObjects, and complete with BookieHandleNotAvailableException.

Point 2. line_1310, set PerChannelBookieClient#channel = null.

When PerChannelBookieClient#addEntry.

void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ReferenceCounted toSend, WriteCallback cb,
Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
return;
}
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
if (toSend instanceof ByteBuf) {
request = ((ByteBuf) toSend).retainedDuplicate();
} else {
request = ByteBufList.clone((ByteBufList) toSend);
}
} else {
final long txnId = getTxnId();
completionKey = new TxnCompletionKey(txnId, OperationType.ADD_ENTRY);
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.ADD_ENTRY)
.setTxnId(txnId);
if (((short) options & BookieProtocol.FLAG_HIGH_PRIORITY) == BookieProtocol.FLAG_HIGH_PRIORITY) {
headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
}
ByteString body = null;
ByteBufList bufToSend = (ByteBufList) toSend;
if (bufToSend.hasArray()) {
body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(),
bufToSend.readableBytes());
} else {
for (int i = 0; i < bufToSend.size(); i++) {
ByteString piece = UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer());
// use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs
body = (body == null) ? piece : body.concat(piece);
}
}
AddRequest.Builder addBuilder = AddRequest.newBuilder()
.setLedgerId(ledgerId)
.setEntryId(entryId)
.setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
.setBody(body);
if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
}
if (!writeFlags.isEmpty()) {
// add flags only if needed, in order to be able to talk with old bookies
addBuilder.setWriteFlags(WriteFlag.getWriteFlagsValue(writeFlags));
}
request = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setAddRequest(addBuilder)
.build();
}
putCompletionKeyValue(completionKey,
acquireAddCompletion(completionKey,
cb, ctx, ledgerId, entryId));
final Channel c = channel;
if (c == null) {
// usually checked in writeAndFlush, but we have extra check
// because we need to release toSend.
errorOut(completionKey);
ReferenceCountUtil.release(toSend);
return;
} else {
// addEntry times out on backpressure
writeAndFlush(c, completionKey, request, allowFastFail);
}
}

Point 3. line_840, it will put completionKeyValue to completionObjects.

Point 4. line_852, if the channel is not null, invoke writeAndFlush.

There is a race condition between PerChannelBookieClient#channelInactive and PerChannelBookieClient#addEntry.

There is the timeline.

Point 1 -> Point 3 -> Point 4 -> Point 2.

It will write and flush the AddRequest to the netty channel. In the bookkeeper, there is a weakness in PerChannelBookieClient#writeAndFlush.

private void writeAndFlush(final Channel channel,
final CompletionKey key,
final Object request,
final boolean allowFastFail) {
if (channel == null) {
LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
errorOut(key);
return;
}
final boolean isChannelWritable = channel.isWritable();
if (isWritable != isChannelWritable) {
// isWritable is volatile so simple "isWritable = channel.isWritable()" would be slower
isWritable = isChannelWritable;
}
if (allowFastFail && !isWritable) {
LOG.warn("Operation {} failed: TooManyRequestsException",
StringUtils.requestToString(request));
errorOut(key, BKException.Code.TooManyRequestsException);
return;
}
try {
final long startTime = MathUtils.nowInNano();
ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
CompletionValue completion = completionObjects.get(key);
if (completion != null) {
completion.setOutstanding();
}
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
});
channel.writeAndFlush(request, promise);
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
errorOut(key);
}
}
void errorOut(final CompletionKey key) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
}
CompletionValue completion = completionObjects.remove(key);
if (completion != null) {
completion.errorOut();
} else {
// If there's no completion object here, try in the multimap
completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut());
}
}
void errorOut(final CompletionKey key, final int rc) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
}
CompletionValue completion = completionObjects.remove(key);
if (completion != null) {
completion.errorOut(rc);
} else {
// If there's no completion object here, try in the multimap
completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut(rc));
}
}
/**
* Errors out pending ops from per channel bookie client. As the channel
* is being closed, all the operations waiting on the connection
* will be sent to completion with error.
*/
void errorOutPendingOps(int rc) {
Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
synchronized (this) {
oldPendingOps = pendingOps;
pendingOps = new ArrayDeque<>();
}
for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
pendingOp.operationComplete(rc, PerChannelBookieClient.this);
}
}
/**
* Errors out pending entries. We call this method from one thread to avoid
* concurrent executions to QuorumOpMonitor (implements callbacks). It seems
* simpler to call it from BookieHandle instead of calling directly from
* here.
*/
void errorOutOutstandingEntries(int rc) {
Optional<CompletionKey> multikey = completionObjectsV2Conflicts.getAnyKey();
while (multikey.isPresent()) {
multikey.ifPresent(k -> errorOut(k, rc));
multikey = completionObjectsV2Conflicts.getAnyKey();
}
for (CompletionKey key : completionObjects.keys()) {
errorOut(key, rc);
}

At line_1230, we define a promise for the netty write and flush, if the write and flush failed, we only record the metrics at line_1211, not remove the completionKey from completionObjects. The completionKey will leak in the completionObjects.
If the PerChannelBookieClient#addEntryTimeoutNanos is disabled, the timeoutCheck won't work, so the completionKey exists in the completionObjects forever.

In #4278, the PerChannelBookieClient#addEntryTimeoutNanos is enabled, So you will see that 5 seconds(default timeout) passed after bk1 died, the step 9 trigger PendingAddOp timeout, then cause the issue in #4278.

So, as long as we remove the completionKey from completionObjects when write and flush AddRequest failed, we can solve the problem.

@@ -1209,6 +1209,7 @@ private void writeAndFlush(final Channel channel,
}
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
errorOut(key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #4278, the PerChannelBookieClient#addEntryTimeoutNanos is enabled, So you will see that 5 seconds(default timeout) passed after bk1 died, the step 9 trigger PendingAddOp timeout, then cause the issue in #4278.

I have a question, since we have a default thread to detect timeout requests every 5 seconds and remove the timedout CompletionKey from completionObjects, even if the write fails, will the timeout detection task not remove the key?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the timeout detection will remove the key.

# Conflicts:
#	bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants