Skip to content

Commit

Permalink
Merge pull request #6544 from JohanMabille/control
Browse files Browse the repository at this point in the history
Control
  • Loading branch information
jasongrout committed Jun 21, 2019
2 parents dcfe0ba + dc0f3c2 commit a8e4af9
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 44 deletions.
14 changes: 7 additions & 7 deletions packages/outputarea/src/widget.ts
Expand Up @@ -144,15 +144,15 @@ export class OutputArea extends Widget {
/**
* The kernel future associated with the output area.
*/
get future(): Kernel.IFuture<
get future(): Kernel.IShellFuture<
KernelMessage.IExecuteRequestMsg,
KernelMessage.IExecuteReplyMsg
> | null {
return this._future;
}

set future(
value: Kernel.IFuture<
value: Kernel.IShellFuture<
KernelMessage.IExecuteRequestMsg,
KernelMessage.IExecuteReplyMsg
> | null
Expand Down Expand Up @@ -284,7 +284,7 @@ export class OutputArea extends Widget {
*/
protected onInputRequest(
msg: KernelMessage.IInputRequestMsg,
future: Kernel.IFuture
future: Kernel.IShellFuture
): void {
// Add an output widget to the end.
let factory = this.contentFactory;
Expand Down Expand Up @@ -493,7 +493,7 @@ export class OutputArea extends Widget {
};

private _minHeightTimeout: number = null;
private _future: Kernel.IFuture<
private _future: Kernel.IShellFuture<
KernelMessage.IExecuteRequestMsg,
KernelMessage.IExecuteReplyMsg
> | null = null;
Expand All @@ -506,7 +506,7 @@ export class SimplifiedOutputArea extends OutputArea {
*/
protected onInputRequest(
msg: KernelMessage.IInputRequestMsg,
future: Kernel.IFuture
future: Kernel.IShellFuture
): void {
return;
}
Expand Down Expand Up @@ -755,7 +755,7 @@ export class Stdin extends Widget implements IStdin {
this._input.removeEventListener('keydown', this);
}

private _future: Kernel.IFuture = null;
private _future: Kernel.IShellFuture = null;
private _input: HTMLInputElement = null;
private _value: string;
private _promise = new PromiseDelegate<void>();
Expand All @@ -779,7 +779,7 @@ export namespace Stdin {
/**
* The kernel future associated with the request.
*/
future: Kernel.IFuture;
future: Kernel.IShellFuture;
}
}

Expand Down
6 changes: 3 additions & 3 deletions packages/services/src/kernel/comm.ts
Expand Up @@ -103,7 +103,7 @@ export class CommHandler extends DisposableDelegate implements Kernel.IComm {
data?: JSONObject,
metadata?: JSONObject,
buffers: (ArrayBuffer | ArrayBufferView)[] = []
): Kernel.IFuture {
): Kernel.IShellFuture {
if (this.isDisposed || this._kernel.isDisposed) {
throw new Error('Cannot open');
}
Expand Down Expand Up @@ -136,7 +136,7 @@ export class CommHandler extends DisposableDelegate implements Kernel.IComm {
metadata?: JSONObject,
buffers: (ArrayBuffer | ArrayBufferView)[] = [],
disposeOnDone: boolean = true
): Kernel.IFuture {
): Kernel.IShellFuture {
if (this.isDisposed || this._kernel.isDisposed) {
throw new Error('Cannot send');
}
Expand Down Expand Up @@ -170,7 +170,7 @@ export class CommHandler extends DisposableDelegate implements Kernel.IComm {
data?: JSONObject,
metadata?: JSONObject,
buffers: (ArrayBuffer | ArrayBufferView)[] = []
): Kernel.IFuture {
): Kernel.IShellFuture {
if (this.isDisposed || this._kernel.isDisposed) {
throw new Error('Cannot close');
}
Expand Down
87 changes: 80 additions & 7 deletions packages/services/src/kernel/default.ts
Expand Up @@ -19,7 +19,11 @@ import { Kernel } from './kernel';

import { KernelMessage } from './messages';

import { KernelFutureHandler } from './future';
import {
KernelFutureHandler,
KernelShellFutureHandler,
KernelControlFutureHandler
} from './future';

import * as serialize from './serialize';

Expand Down Expand Up @@ -263,7 +267,60 @@ export class DefaultKernel implements Kernel.IKernel {
msg: KernelMessage.IShellMessage<T>,
expectReply = false,
disposeOnDone = true
): Kernel.IFuture<KernelMessage.IShellMessage<T>> {
): Kernel.IShellFuture<KernelMessage.IShellMessage<T>> {
return this._sendKernelShellControl(
KernelShellFutureHandler,
msg,
expectReply,
disposeOnDone
) as Kernel.IShellFuture<KernelMessage.IShellMessage<T>>;
}

/**
* Send a control message to the kernel.
*
* #### Notes
* Send a message to the kernel's control channel, yielding a future object
* for accepting replies.
*
* If `expectReply` is given and `true`, the future is disposed when both a
* control reply and an idle status message are received. If `expectReply`
* is not given or is `false`, the future is resolved when an idle status
* message is received.
* If `disposeOnDone` is not given or is `true`, the Future is disposed at this point.
* If `disposeOnDone` is given and `false`, it is up to the caller to dispose of the Future.
*
* All replies are validated as valid kernel messages.
*
* If the kernel status is `dead`, this will throw an error.
*/
sendControlMessage<T extends KernelMessage.ControlMessageType>(
msg: KernelMessage.IControlMessage<T>,
expectReply = false,
disposeOnDone = true
): Kernel.IControlFuture<KernelMessage.IControlMessage<T>> {
return this._sendKernelShellControl(
KernelControlFutureHandler,
msg,
expectReply,
disposeOnDone
) as Kernel.IControlFuture<KernelMessage.IControlMessage<T>>;
}

private _sendKernelShellControl<
REQUEST extends KernelMessage.IShellControlMessage,
REPLY extends KernelMessage.IShellControlMessage,
KFH extends new (...params: any[]) => KernelFutureHandler<REQUEST, REPLY>,
T extends KernelMessage.IMessage
>(
ctor: KFH,
msg: T,
expectReply = false,
disposeOnDone = true
): Kernel.IFuture<
KernelMessage.IShellControlMessage,
KernelMessage.IShellControlMessage
> {
if (this.status === 'dead') {
throw new Error('Kernel is dead');
}
Expand All @@ -273,7 +330,7 @@ export class DefaultKernel implements Kernel.IKernel {
this._ws.send(serialize.serialize(msg));
}
this._anyMessage.emit({ msg, direction: 'send' });
let future = new KernelFutureHandler(
let future = new ctor(
() => {
let msgId = msg.header.msg_id;
this._futures.delete(msgId);
Expand Down Expand Up @@ -513,7 +570,7 @@ export class DefaultKernel implements Kernel.IKernel {
content: KernelMessage.IExecuteRequestMsg['content'],
disposeOnDone: boolean = true,
metadata?: JSONObject
): Kernel.IFuture<
): Kernel.IShellFuture<
KernelMessage.IExecuteRequestMsg,
KernelMessage.IExecuteReplyMsg
> {
Expand All @@ -531,7 +588,11 @@ export class DefaultKernel implements Kernel.IKernel {
session: this._clientId,
content: { ...defaults, ...content }
});
return this.sendShellMessage(msg, true, disposeOnDone) as Kernel.IFuture<
return this.sendShellMessage(
msg,
true,
disposeOnDone
) as Kernel.IShellFuture<
KernelMessage.IExecuteRequestMsg,
KernelMessage.IExecuteReplyMsg
>;
Expand Down Expand Up @@ -899,7 +960,13 @@ export class DefaultKernel implements Kernel.IKernel {
});
this._msgChain = Promise.resolve();
this._kernelSession = '';
this._futures = new Map<string, KernelFutureHandler>();
this._futures = new Map<
string,
KernelFutureHandler<
KernelMessage.IShellControlMessage,
KernelMessage.IShellControlMessage
>
>();
this._comms = new Map<string, Kernel.IComm>();
this._displayIdToParentIds.clear();
this._msgIdToDisplayIds.clear();
Expand Down Expand Up @@ -1232,7 +1299,13 @@ export class DefaultKernel implements Kernel.IKernel {
private _isReady = false;
private _readyPromise = new PromiseDelegate<void>();
private _initialized = false;
private _futures = new Map<string, KernelFutureHandler>();
private _futures = new Map<
string,
KernelFutureHandler<
KernelMessage.IShellControlMessage,
KernelMessage.IShellControlMessage
>
>();
private _comms = new Map<string, Kernel.IComm>();
private _targetRegistry: {
[key: string]: (
Expand Down
28 changes: 24 additions & 4 deletions packages/services/src/kernel/future.ts
Expand Up @@ -19,9 +19,9 @@ declare var setImmediate: any;
* is considered done when the `idle` status is received.
*
*/
export class KernelFutureHandler<
REQUEST extends KernelMessage.IShellMessage = KernelMessage.IShellMessage,
REPLY extends KernelMessage.IShellMessage = KernelMessage.IShellMessage
export abstract class KernelFutureHandler<
REQUEST extends KernelMessage.IShellControlMessage,
REPLY extends KernelMessage.IShellControlMessage
> extends DisposableDelegate implements Kernel.IFuture<REQUEST, REPLY> {
/**
* Construct a new KernelFutureHandler.
Expand Down Expand Up @@ -194,8 +194,16 @@ export class KernelFutureHandler<
*/
async handleMsg(msg: KernelMessage.IMessage): Promise<void> {
switch (msg.channel) {
case 'control':
case 'shell':
await this._handleReply(msg as REPLY);
if (
msg.channel === this.msg.channel &&
(msg.parent_header as KernelMessage.IHeader<
KernelMessage.MessageType
>).msg_id === this.msg.header.msg_id
) {
await this._handleReply(msg as REPLY);
}
break;
case 'stdin':
await this._handleStdin(msg as KernelMessage.IStdinMessage);
Expand Down Expand Up @@ -290,6 +298,18 @@ export class KernelFutureHandler<
private _kernel: Kernel.IKernel;
}

export class KernelControlFutureHandler<
REQUEST extends KernelMessage.IControlMessage = KernelMessage.IControlMessage,
REPLY extends KernelMessage.IControlMessage = KernelMessage.IControlMessage
> extends KernelFutureHandler<REQUEST, REPLY>
implements Kernel.IControlFuture<REQUEST, REPLY> {}

export class KernelShellFutureHandler<
REQUEST extends KernelMessage.IShellMessage = KernelMessage.IShellMessage,
REPLY extends KernelMessage.IShellMessage = KernelMessage.IShellMessage
> extends KernelFutureHandler<REQUEST, REPLY>
implements Kernel.IShellFuture<REQUEST, REPLY> {}

namespace Private {
/**
* A no-op function.
Expand Down
37 changes: 26 additions & 11 deletions packages/services/src/kernel/kernel.ts
Expand Up @@ -130,8 +130,13 @@ export namespace Kernel {
msg: KernelMessage.IShellMessage<T>,
expectReply?: boolean,
disposeOnDone?: boolean
): Kernel.IFuture<KernelMessage.IShellMessage<T>>;
): Kernel.IShellFuture<KernelMessage.IShellMessage<T>>;

sendControlMessage<T extends KernelMessage.ControlMessageType>(
msg: KernelMessage.IControlMessage<T>,
expectReply?: boolean,
disposeOnDone?: boolean
): Kernel.IControlFuture<KernelMessage.IControlMessage<T>>;
/**
* Reconnect to a disconnected kernel.
*
Expand Down Expand Up @@ -268,7 +273,7 @@ export namespace Kernel {
content: KernelMessage.IExecuteRequestMsg['content'],
disposeOnDone?: boolean,
metadata?: JSONObject
): Kernel.IFuture<
): Kernel.IShellFuture<
KernelMessage.IExecuteRequestMsg,
KernelMessage.IExecuteReplyMsg
>;
Expand Down Expand Up @@ -743,8 +748,8 @@ export namespace Kernel {
* responses that may come from the kernel.
*/
export interface IFuture<
REQUEST extends KernelMessage.IShellMessage = KernelMessage.IShellMessage,
REPLY extends KernelMessage.IShellMessage = KernelMessage.IShellMessage
REQUEST extends KernelMessage.IShellControlMessage,
REPLY extends KernelMessage.IShellControlMessage
> extends IDisposable {
/**
* The original outgoing message.
Expand Down Expand Up @@ -775,22 +780,22 @@ export namespace Kernel {
onReply: (msg: REPLY) => void | PromiseLike<void>;

/**
* The stdin handler for the kernel future.
* The iopub handler for the kernel future.
*
* #### Notes
* If the handler returns a promise, all kernel message processing pauses
* until the promise is resolved.
*/
onStdin: (msg: KernelMessage.IStdinMessage) => void | PromiseLike<void>;
onIOPub: (msg: KernelMessage.IIOPubMessage) => void | PromiseLike<void>;

/**
* The iopub handler for the kernel future.
* The stdin handler for the kernel future.
*
* #### Notes
* If the handler returns a promise, all kernel message processing pauses
* until the promise is resolved.
*/
onIOPub: (msg: KernelMessage.IIOPubMessage) => void | PromiseLike<void>;
onStdin: (msg: KernelMessage.IStdinMessage) => void | PromiseLike<void>;

/**
* Register hook for IOPub messages.
Expand Down Expand Up @@ -833,6 +838,16 @@ export namespace Kernel {
sendInputReply(content: KernelMessage.IInputReplyMsg['content']): void;
}

export interface IShellFuture<
REQUEST extends KernelMessage.IShellMessage = KernelMessage.IShellMessage,
REPLY extends KernelMessage.IShellMessage = KernelMessage.IShellMessage
> extends IFuture<REQUEST, REPLY> {}

export interface IControlFuture<
REQUEST extends KernelMessage.IControlMessage = KernelMessage.IControlMessage,
REPLY extends KernelMessage.IControlMessage = KernelMessage.IControlMessage
> extends IFuture<REQUEST, REPLY> {}

/**
* A client side Comm interface.
*/
Expand Down Expand Up @@ -883,7 +898,7 @@ export namespace Kernel {
data?: JSONValue,
metadata?: JSONObject,
buffers?: (ArrayBuffer | ArrayBufferView)[]
): IFuture;
): IShellFuture;

/**
* Send a `comm_msg` message to the kernel.
Expand All @@ -906,7 +921,7 @@ export namespace Kernel {
metadata?: JSONObject,
buffers?: (ArrayBuffer | ArrayBufferView)[],
disposeOnDone?: boolean
): IFuture;
): IShellFuture;

/**
* Close the comm.
Expand All @@ -927,7 +942,7 @@ export namespace Kernel {
data?: JSONValue,
metadata?: JSONObject,
buffers?: (ArrayBuffer | ArrayBufferView)[]
): IFuture;
): IShellFuture;
}

/**
Expand Down

0 comments on commit a8e4af9

Please sign in to comment.