Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage kernel message queueing better to prevent out-of-order execution #9571

Merged
merged 5 commits into from Jan 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/csvviewer/src/model.ts
Expand Up @@ -441,7 +441,7 @@ export class DSVModel extends DataModel implements IDisposable {
// If we have already set up our initial bookkeeping, return early if we
// did not get any new rows beyond the last row that we've parsed, i.e.,
// nrows===1.
if (this._startedParsing && (nrows <= reparse)) {
if (this._startedParsing && nrows <= reparse) {
this._doneParsing = true;
this._ready.resolve(undefined);
return;
Expand Down
73 changes: 44 additions & 29 deletions packages/services/src/kernel/default.ts
Expand Up @@ -376,13 +376,42 @@ export class KernelConnection implements Kernel.IKernelConnection {
*
* If queue is true, queue the message for later sending if we cannot send
* now. Otherwise throw an error.
*
* #### Notes
* As an exception to the queueing, if we are sending a kernel_info_request
* message while we think the kernel is restarting, we send the message
* immediately without queueing. This is so that we can trigger a message
* back, which will then clear the kernel restarting state.
*/
private _sendMessage(msg: KernelMessage.IMessage, queue = true) {
if (this.status === 'dead') {
throw new Error('Kernel is dead');
}

// Send if the ws allows it, otherwise buffer the message.
// If we have a kernel_info_request and we are restarting, send the
// kernel_info_request immediately if we can, and if not throw an error so
// we can retry later. We do this because we must get at least one message
// from the kernel to reset the kernel session (thus clearing the restart
// status sentinel).
if (
this._kernelSession === RESTARTING_KERNEL_SESSION &&
KernelMessage.isInfoRequestMsg(msg)
) {
if (this.connectionStatus === 'connected') {
this._ws!.send(serialize.serialize(msg));
return;
} else {
throw new Error('Could not send message: status is not connected');
}
}

// If there are pending messages, add to the queue so we keep messages in order
if (queue && this._pendingMessages.length > 0) {
this._pendingMessages.push(msg);
return;
}

// Send if the ws allows it, otherwise queue the message.
if (
this.connectionStatus === 'connected' &&
this._kernelSession !== RESTARTING_KERNEL_SESSION
Expand Down Expand Up @@ -1258,29 +1287,17 @@ export class KernelConnection implements Kernel.IKernelConnection {

if (this.status !== 'dead') {
if (connectionStatus === 'connected') {
let restarting = false;
if (this._kernelSession === RESTARTING_KERNEL_SESSION) {
// Reset kernelSession on new connections,
// so we can start sending messages.
// Otherwise, requests are queued forever until we get a ws message
// that we did not ask for.
this._kernelSession = '';
restarting = true;
}
let restarting = this._kernelSession === RESTARTING_KERNEL_SESSION;

// Send pending messages, and make sure we send at least one message
// to get kernel status back.
// always request kernel info first, to get kernel status back
// and ensure iopub is fully established
// Send a kernel info request to make sure we send at least one
// message to get kernel status back. Always request kernel info
// first, to get kernel status back and ensure iopub is fully
// established. If we are restarting, this message will skip the queue
// and be sent immediately.
let p = this.requestKernelInfo();
if (restarting) {
// preserve restarting state until a reply is received
// so messages don't hop ahead of the pending queue
// while we are waiting for the kernelInfoReply
this._kernelSession = RESTARTING_KERNEL_SESSION;
}
// only after kernelInfo resolves (or after a timeout)
// start sending our pending messages, if any

// Send any pending messages after the kernelInfo resolves, or after a
// timeout as a failsafe.

let sendPendingCalled = false;
let sendPendingOnce = () => {
Expand All @@ -1289,8 +1306,11 @@ export class KernelConnection implements Kernel.IKernelConnection {
}
sendPendingCalled = true;
if (restarting && this._kernelSession === RESTARTING_KERNEL_SESSION) {
// we were restarting and a message didn't arrive to set the session
// it would be better to retry here
// We were restarting and a message didn't arrive to set the
// session, but we just assume the restart succeeded and send any
// pending messages.

// FIXME: it would be better to retry the kernel_info_request here
this._kernelSession = '';
}
clearTimeout(timeoutHandle);
Expand All @@ -1317,11 +1337,6 @@ export class KernelConnection implements Kernel.IKernelConnection {
private async _handleMessage(msg: KernelMessage.IMessage): Promise<void> {
let handled = false;

if (msg.header.msg_type === 'shutdown_reply') {
this._kernelSession = msg.header.session;
this._sendPending();
}

// Check to see if we have a display_id we need to reroute.
if (
msg.parent_header &&
Expand Down