Skip to content

Commit

Permalink
Merge pull request #9571 from jasongrout/queuemsg
Browse files Browse the repository at this point in the history
Manage kernel message queueing better to prevent out-of-order execution
  • Loading branch information
blink1073 committed Jan 8, 2021
2 parents d8d6f91 + b5ae089 commit b62371e
Showing 1 changed file with 44 additions and 29 deletions.
73 changes: 44 additions & 29 deletions packages/services/src/kernel/default.ts
Expand Up @@ -376,13 +376,42 @@ export class KernelConnection implements Kernel.IKernelConnection {
*
* If queue is true, queue the message for later sending if we cannot send
* now. Otherwise throw an error.
*
* #### Notes
* As an exception to the queueing, if we are sending a kernel_info_request
* message while we think the kernel is restarting, we send the message
* immediately without queueing. This is so that we can trigger a message
* back, which will then clear the kernel restarting state.
*/
private _sendMessage(msg: KernelMessage.IMessage, queue = true) {
if (this.status === 'dead') {
throw new Error('Kernel is dead');
}

// Send if the ws allows it, otherwise buffer the message.
// If we have a kernel_info_request and we are restarting, send the
// kernel_info_request immediately if we can, and if not throw an error so
// we can retry later. We do this because we must get at least one message
// from the kernel to reset the kernel session (thus clearing the restart
// status sentinel).
if (
this._kernelSession === RESTARTING_KERNEL_SESSION &&
KernelMessage.isInfoRequestMsg(msg)
) {
if (this.connectionStatus === 'connected') {
this._ws!.send(serialize.serialize(msg));
return;
} else {
throw new Error('Could not send message: status is not connected');
}
}

// If there are pending messages, add to the queue so we keep messages in order
if (queue && this._pendingMessages.length > 0) {
this._pendingMessages.push(msg);
return;
}

// Send if the ws allows it, otherwise queue the message.
if (
this.connectionStatus === 'connected' &&
this._kernelSession !== RESTARTING_KERNEL_SESSION
Expand Down Expand Up @@ -1258,29 +1287,17 @@ export class KernelConnection implements Kernel.IKernelConnection {

if (this.status !== 'dead') {
if (connectionStatus === 'connected') {
let restarting = false;
if (this._kernelSession === RESTARTING_KERNEL_SESSION) {
// Reset kernelSession on new connections,
// so we can start sending messages.
// Otherwise, requests are queued forever until we get a ws message
// that we did not ask for.
this._kernelSession = '';
restarting = true;
}
let restarting = this._kernelSession === RESTARTING_KERNEL_SESSION;

// Send pending messages, and make sure we send at least one message
// to get kernel status back.
// always request kernel info first, to get kernel status back
// and ensure iopub is fully established
// Send a kernel info request to make sure we send at least one
// message to get kernel status back. Always request kernel info
// first, to get kernel status back and ensure iopub is fully
// established. If we are restarting, this message will skip the queue
// and be sent immediately.
let p = this.requestKernelInfo();
if (restarting) {
// preserve restarting state until a reply is received
// so messages don't hop ahead of the pending queue
// while we are waiting for the kernelInfoReply
this._kernelSession = RESTARTING_KERNEL_SESSION;
}
// only after kernelInfo resolves (or after a timeout)
// start sending our pending messages, if any

// Send any pending messages after the kernelInfo resolves, or after a
// timeout as a failsafe.

let sendPendingCalled = false;
let sendPendingOnce = () => {
Expand All @@ -1289,8 +1306,11 @@ export class KernelConnection implements Kernel.IKernelConnection {
}
sendPendingCalled = true;
if (restarting && this._kernelSession === RESTARTING_KERNEL_SESSION) {
// we were restarting and a message didn't arrive to set the session
// it would be better to retry here
// We were restarting and a message didn't arrive to set the
// session, but we just assume the restart succeeded and send any
// pending messages.

// FIXME: it would be better to retry the kernel_info_request here
this._kernelSession = '';
}
clearTimeout(timeoutHandle);
Expand All @@ -1317,11 +1337,6 @@ export class KernelConnection implements Kernel.IKernelConnection {
private async _handleMessage(msg: KernelMessage.IMessage): Promise<void> {
let handled = false;

if (msg.header.msg_type === 'shutdown_reply') {
this._kernelSession = msg.header.session;
this._sendPending();
}

// Check to see if we have a display_id we need to reroute.
if (
msg.parent_header &&
Expand Down

0 comments on commit b62371e

Please sign in to comment.