From b1409f1267c59cf4149ffcde8be2d3d7a6f46c50 Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 7 Jan 2021 14:24:29 -0800 Subject: [PATCH 1/5] Add isExecuteRequestMsg typecheck function. --- packages/services/src/kernel/messages.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/services/src/kernel/messages.ts b/packages/services/src/kernel/messages.ts index c66dc5f8ff4f..3c53ad45b2fa 100644 --- a/packages/services/src/kernel/messages.ts +++ b/packages/services/src/kernel/messages.ts @@ -1030,6 +1030,13 @@ export interface IExecuteRequestMsg extends IShellMessage<'execute_request'> { }; } +/** + * Test whether a kernel message is an `'execute_request'` message. + */ +export function isExecuteRequestMsg(msg: IMessage): msg is IExecuteRequestMsg { + return msg.header.msg_type === 'execute_request'; +} + /** * The content of an `execute-reply` message. * From cc746096826a0a7df93953b9be29bf2afe13f2fe Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 7 Jan 2021 14:32:54 -0800 Subject: [PATCH 2/5] Manage kernel message queueing better to prevent out-of-order execution. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #9566 Followup on #8562 Changes solution in #9484 If we restarted a kernel, then quickly evaluated a lot of cells, we were often seeing the cells evaluated out of order. This came because the initial evaluations would be queued (because we had the kernel restarting sentinel in place), but later evaluations would happen synchronously, even if there were still messages queued. The logic is now changed to (almost) always queue a message if there are already queued messages waiting to be sent to preserve the message order. One exception to this is the kernel info request when we are restarting. We redo the logic in #9484 to encode the exception in the _sendMessage function (rather than hacking around the conditions for queueing a message). This brings the exception closer to the logic it is working around, so it is a bit cleaner. Also, we realize that the sendMessage `queue` parameter is really signifying when we are sending pending messages. As such, we always try to send those messages if we can. Finally, we saw that there was a race condition between sending messages after a restart and when the websocket was reconnected, leading to some stalled initial message replies. We delete the logic that sends pending messages on shutdown_reply, since those pending messages will be more correctly sent when the websocket reconnects anyway. We also don’t worry about setting the kernel session there since the calling function does that logic. --- packages/services/src/kernel/default.ts | 95 +++++++++++++++++-------- 1 file changed, 65 insertions(+), 30 deletions(-) diff --git a/packages/services/src/kernel/default.ts b/packages/services/src/kernel/default.ts index 6359c99404c2..98087f9664cf 100644 --- a/packages/services/src/kernel/default.ts +++ b/packages/services/src/kernel/default.ts @@ -376,22 +376,70 @@ export class KernelConnection implements Kernel.IKernelConnection { * * If queue is true, queue the message for later sending if we cannot send * now. Otherwise throw an error. + * + * #### Notes + * As an exception to the queueing, if we are sending a kernel_info_request + * message while we think the kernel is restarting, we send the message + * immediately without queueing. This is so that we can trigger a message + * back, which will then clear the kernel restarting state. */ private _sendMessage(msg: KernelMessage.IMessage, queue = true) { if (this.status === 'dead') { throw new Error('Kernel is dead'); } - // Send if the ws allows it, otherwise buffer the message. + function formatMessage(msg: KernelMessage.IMessage) { + var output = msg.header.msg_type; + if (KernelMessage.isExecuteRequestMsg(msg)) { + output += `: ${msg.content.code.substring(0, 10)}` + } + return output + } + + // If we have a kernel_info_request and we are restarting, send the + // kernel_info_request immediately if we can, and if not throw an error so + // we can retry later. We do this because we must get at least one message + // from the kernel to reset the kernel session (thus clearing the restart + // status sentinel). + if ( + this._kernelSession === RESTARTING_KERNEL_SESSION && + KernelMessage.isInfoRequestMsg(msg) + ) { + if (this.connectionStatus === 'connected') { + console.log('Sending kernel info request immediately'); + this._ws!.send(serialize.serialize(msg)); + return; + } else { + throw new Error('Could not send message: status is not connected'); + } + } + + // If there are pending messages, add to the queue so we keep messages in order + if (queue && this._pendingMessages.length > 0) { + console.log( + `Queueing message because others are on the queue (${formatMessage(msg)})`, + msg, + this._pendingMessages + ); + this._pendingMessages.push(msg); + return; + } + + // Send if the ws allows it, otherwise queue the message. if ( this.connectionStatus === 'connected' && this._kernelSession !== RESTARTING_KERNEL_SESSION ) { + console.log(`Sending message immediately ${formatMessage(msg)}`, msg); this._ws!.send(serialize.serialize(msg)); } else if (queue) { + console.log( + `Queueing message (connectionStatus: ${this.connectionStatus}, kernel session ${this._kernelSession}, ${formatMessage(msg)})`, + msg + ); this._pendingMessages.push(msg); } else { - throw new Error('Could not send message'); + throw new Error(`Could not send message: ${formatMessage(msg)}`); } } @@ -1060,6 +1108,7 @@ export class KernelConnection implements Kernel.IKernelConnection { this._kernelSession !== RESTARTING_KERNEL_SESSION && this._pendingMessages.length > 0 ) { + console.error('Sending pending message', this._pendingMessages[0]); this._sendMessage(this._pendingMessages[0], false); // We shift the message off the queue after the message is sent so that @@ -1258,29 +1307,17 @@ export class KernelConnection implements Kernel.IKernelConnection { if (this.status !== 'dead') { if (connectionStatus === 'connected') { - let restarting = false; - if (this._kernelSession === RESTARTING_KERNEL_SESSION) { - // Reset kernelSession on new connections, - // so we can start sending messages. - // Otherwise, requests are queued forever until we get a ws message - // that we did not ask for. - this._kernelSession = ''; - restarting = true; - } + let restarting = this._kernelSession === RESTARTING_KERNEL_SESSION; - // Send pending messages, and make sure we send at least one message - // to get kernel status back. - // always request kernel info first, to get kernel status back - // and ensure iopub is fully established + // Send a kernel info request to make sure we send at least one + // message to get kernel status back. Always request kernel info + // first, to get kernel status back and ensure iopub is fully + // established. If we are restarting, this message will skip the queue + // and be sent immediately. let p = this.requestKernelInfo(); - if (restarting) { - // preserve restarting state until a reply is received - // so messages don't hop ahead of the pending queue - // while we are waiting for the kernelInfoReply - this._kernelSession = RESTARTING_KERNEL_SESSION; - } - // only after kernelInfo resolves (or after a timeout) - // start sending our pending messages, if any + + // Send any pending messages after the kernelInfo resolves, or after a + // timeout as a failsafe. let sendPendingCalled = false; let sendPendingOnce = () => { @@ -1289,8 +1326,11 @@ export class KernelConnection implements Kernel.IKernelConnection { } sendPendingCalled = true; if (restarting && this._kernelSession === RESTARTING_KERNEL_SESSION) { - // we were restarting and a message didn't arrive to set the session - // it would be better to retry here + // We were restarting and a message didn't arrive to set the + // session, but we just assume the restart succeeded and send any + // pending messages. + + // FIXME: it would be better to retry the kernel_info_request here this._kernelSession = ''; } clearTimeout(timeoutHandle); @@ -1317,11 +1357,6 @@ export class KernelConnection implements Kernel.IKernelConnection { private async _handleMessage(msg: KernelMessage.IMessage): Promise { let handled = false; - if (msg.header.msg_type === 'shutdown_reply') { - this._kernelSession = msg.header.session; - this._sendPending(); - } - // Check to see if we have a display_id we need to reroute. if ( msg.parent_header && From 56faad16a333c1f11603281c499efd55d45f5d0c Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 7 Jan 2021 14:34:22 -0800 Subject: [PATCH 3/5] Remove debugging messages. --- packages/services/src/kernel/default.ts | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/packages/services/src/kernel/default.ts b/packages/services/src/kernel/default.ts index 98087f9664cf..a09eb89a06b5 100644 --- a/packages/services/src/kernel/default.ts +++ b/packages/services/src/kernel/default.ts @@ -388,14 +388,6 @@ export class KernelConnection implements Kernel.IKernelConnection { throw new Error('Kernel is dead'); } - function formatMessage(msg: KernelMessage.IMessage) { - var output = msg.header.msg_type; - if (KernelMessage.isExecuteRequestMsg(msg)) { - output += `: ${msg.content.code.substring(0, 10)}` - } - return output - } - // If we have a kernel_info_request and we are restarting, send the // kernel_info_request immediately if we can, and if not throw an error so // we can retry later. We do this because we must get at least one message @@ -406,7 +398,6 @@ export class KernelConnection implements Kernel.IKernelConnection { KernelMessage.isInfoRequestMsg(msg) ) { if (this.connectionStatus === 'connected') { - console.log('Sending kernel info request immediately'); this._ws!.send(serialize.serialize(msg)); return; } else { @@ -416,11 +407,6 @@ export class KernelConnection implements Kernel.IKernelConnection { // If there are pending messages, add to the queue so we keep messages in order if (queue && this._pendingMessages.length > 0) { - console.log( - `Queueing message because others are on the queue (${formatMessage(msg)})`, - msg, - this._pendingMessages - ); this._pendingMessages.push(msg); return; } @@ -430,16 +416,11 @@ export class KernelConnection implements Kernel.IKernelConnection { this.connectionStatus === 'connected' && this._kernelSession !== RESTARTING_KERNEL_SESSION ) { - console.log(`Sending message immediately ${formatMessage(msg)}`, msg); this._ws!.send(serialize.serialize(msg)); } else if (queue) { - console.log( - `Queueing message (connectionStatus: ${this.connectionStatus}, kernel session ${this._kernelSession}, ${formatMessage(msg)})`, - msg - ); this._pendingMessages.push(msg); } else { - throw new Error(`Could not send message: ${formatMessage(msg)}`); + throw new Error('Could not send message'); } } @@ -1108,7 +1089,6 @@ export class KernelConnection implements Kernel.IKernelConnection { this._kernelSession !== RESTARTING_KERNEL_SESSION && this._pendingMessages.length > 0 ) { - console.error('Sending pending message', this._pendingMessages[0]); this._sendMessage(this._pendingMessages[0], false); // We shift the message off the queue after the message is sent so that From d63d9f9fddf6ae13ddda0ba058ab56ac219111a6 Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 7 Jan 2021 16:16:04 -0800 Subject: [PATCH 4/5] lint --- packages/csvviewer/src/model.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/csvviewer/src/model.ts b/packages/csvviewer/src/model.ts index 36b5b1b35620..77cb84ea3047 100644 --- a/packages/csvviewer/src/model.ts +++ b/packages/csvviewer/src/model.ts @@ -441,7 +441,7 @@ export class DSVModel extends DataModel implements IDisposable { // If we have already set up our initial bookkeeping, return early if we // did not get any new rows beyond the last row that we've parsed, i.e., // nrows===1. - if (this._startedParsing && (nrows <= reparse)) { + if (this._startedParsing && nrows <= reparse) { this._doneParsing = true; this._ready.resolve(undefined); return; From b5ae0892c9a38627387b21e641d768acb01206cb Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Fri, 8 Jan 2021 01:39:36 -0800 Subject: [PATCH 5/5] Remove unused type guard function. --- packages/services/src/kernel/messages.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/packages/services/src/kernel/messages.ts b/packages/services/src/kernel/messages.ts index 3c53ad45b2fa..c66dc5f8ff4f 100644 --- a/packages/services/src/kernel/messages.ts +++ b/packages/services/src/kernel/messages.ts @@ -1030,13 +1030,6 @@ export interface IExecuteRequestMsg extends IShellMessage<'execute_request'> { }; } -/** - * Test whether a kernel message is an `'execute_request'` message. - */ -export function isExecuteRequestMsg(msg: IMessage): msg is IExecuteRequestMsg { - return msg.header.msg_type === 'execute_request'; -} - /** * The content of an `execute-reply` message. *