Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make handling comm messages optional in a kernel connection. #6929

Merged
merged 5 commits into from Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 52 additions & 8 deletions packages/services/src/kernel/default.ts
Expand Up @@ -5,7 +5,7 @@ import { URLExt } from '@jupyterlab/coreutils';

import { UUID } from '@phosphor/coreutils';

import { ArrayExt, each, find } from '@phosphor/algorithm';
import { ArrayExt, each, find, some } from '@phosphor/algorithm';

import { JSONExt, JSONObject, PromiseDelegate } from '@phosphor/coreutils';

Expand Down Expand Up @@ -61,6 +61,8 @@ export class DefaultKernel implements Kernel.IKernel {
options.serverSettings || ServerConnection.makeSettings();
this._clientId = options.clientId || UUID.uuid4();
this._username = options.username || '';
this.handleComms =
options.handleComms === undefined ? true : options.handleComms;

void this._readyPromise.promise.then(() => {
this._sendPending();
Expand All @@ -82,6 +84,18 @@ export class DefaultKernel implements Kernel.IKernel {
*/
readonly serverSettings: ServerConnection.ISettings;

/**
* Handle comm messages
*
* #### Notes
* The comm message protocol currently has implicit assumptions that only
* one kernel connection is handling comm messages. This option allows a
* kernel connection to opt out of handling comms.
*
* See https://github.com/jupyter/jupyter_client/issues/263
*/
readonly handleComms: boolean;

/**
* A signal emitted when the kernel status changes.
*/
Expand Down Expand Up @@ -215,12 +229,14 @@ export class DefaultKernel implements Kernel.IKernel {
/**
* Clone the current kernel with a new clientId.
*/
clone(): Kernel.IKernel {
clone(options: Kernel.IOptions = {}): Kernel.IKernel {
return new DefaultKernel(
{
name: this._name,
username: this._username,
serverSettings: this.serverSettings
serverSettings: this.serverSettings,
handleComms: this.handleComms,
...options
},
this._id
);
Expand Down Expand Up @@ -712,11 +728,16 @@ export class DefaultKernel implements Kernel.IKernel {
*
* #### Notes
* If a client-side comm already exists with the given commId, it is returned.
* An error is thrown if the kernel does not handle comms.
*/
connectToComm(
targetName: string,
commId: string = UUID.uuid4()
): Kernel.IComm {
if (!this.handleComms) {
throw new Error('Comms are disabled on this kernel connection');
}

if (this._comms.has(commId)) {
return this._comms.get(commId);
}
Expand Down Expand Up @@ -752,6 +773,10 @@ export class DefaultKernel implements Kernel.IKernel {
msg: KernelMessage.ICommOpenMsg
) => void | PromiseLike<void>
): void {
if (!this.handleComms) {
return;
}

this._targetRegistry[targetName] = callback;
}

Expand All @@ -763,7 +788,7 @@ export class DefaultKernel implements Kernel.IKernel {
* @param callback - The callback to remove.
*
* #### Notes
* The comm target is only removed the callback argument matches.
* The comm target is only removed if the callback argument matches.
*/
removeCommTarget(
targetName: string,
Expand All @@ -772,6 +797,10 @@ export class DefaultKernel implements Kernel.IKernel {
msg: KernelMessage.ICommOpenMsg
) => void | PromiseLike<void>
): void {
if (!this.handleComms) {
return;
}

if (!this.isDisposed && this._targetRegistry[targetName] === callback) {
delete this._targetRegistry[targetName];
}
Expand Down Expand Up @@ -1280,13 +1309,19 @@ export class DefaultKernel implements Kernel.IKernel {
}
break;
case 'comm_open':
await this._handleCommOpen(msg as KernelMessage.ICommOpenMsg);
if (this.handleComms) {
await this._handleCommOpen(msg as KernelMessage.ICommOpenMsg);
}
break;
case 'comm_msg':
await this._handleCommMsg(msg as KernelMessage.ICommMsgMsg);
if (this.handleComms) {
await this._handleCommMsg(msg as KernelMessage.ICommMsgMsg);
}
break;
case 'comm_close':
await this._handleCommClose(msg as KernelMessage.ICommCloseMsg);
if (this.handleComms) {
await this._handleCommClose(msg as KernelMessage.ICommCloseMsg);
}
break;
default:
break;
Expand Down Expand Up @@ -1663,7 +1698,16 @@ namespace Private {
return value.id === model.id;
});
if (kernel) {
return kernel.clone();
// If there is already a kernel connection handling comms, don't handle
// them in our clone, since the comm message protocol has implicit
// assumptions that only one connection is handling comm messages.
// See https://github.com/jupyter/jupyter_client/issues/263
const handleComms = !some(
runningKernels,
k => k.id === model.id && k.handleComms
);
const newKernel = kernel.clone({ handleComms });
return newKernel;
}

return new DefaultKernel({ name: model.name, serverSettings }, model.id);
Expand Down
24 changes: 24 additions & 0 deletions packages/services/src/kernel/kernel.ts
Expand Up @@ -90,6 +90,18 @@ export namespace Kernel {
*/
readonly ready: Promise<void>;

/**
* Whether the kernel connection handles comm messages.
*
* #### Notes
* The comm message protocol currently has implicit assumptions that only
* one kernel connection is handling comm messages. This option allows a
* kernel connection to opt out of handling comms.
*
* See https://github.com/jupyter/jupyter_client/issues/263
*/
handleComms: boolean;

/**
* Get the kernel spec.
*
Expand Down Expand Up @@ -637,6 +649,18 @@ export namespace Kernel {
*/
username?: string;

/**
* Whether the kernel connection should handle comm messages
*
* #### Notes
* The comm message protocol currently has implicit assumptions that only
* one kernel connection is handling comm messages. This option allows a
* kernel connection to opt out of handling comms.
*
* See https://github.com/jupyter/jupyter_client/issues/263
*/
handleComms?: boolean;

/**
* The unique identifier for the kernel client.
*/
Expand Down
9 changes: 3 additions & 6 deletions tests/test-services/src/config/config.spec.ts
Expand Up @@ -9,12 +9,9 @@ import { JSONObject } from '@phosphor/coreutils';

import { ConfigSection, ConfigWithDefaults } from '@jupyterlab/services';

import {
expectFailure,
handleRequest,
makeSettings,
getRequestHandler
} from '../utils';
import { expectFailure } from '@jupyterlab/testutils';

import { handleRequest, makeSettings, getRequestHandler } from '../utils';

/**
* Generate a random config section name.
Expand Down
9 changes: 3 additions & 6 deletions tests/test-services/src/contents/index.spec.ts
Expand Up @@ -10,12 +10,9 @@ import {
ServerConnection
} from '@jupyterlab/services';

import {
DEFAULT_FILE,
makeSettings,
expectFailure,
handleRequest
} from '../utils';
import { expectFailure } from '@jupyterlab/testutils';

import { DEFAULT_FILE, makeSettings, handleRequest } from '../utils';

const DEFAULT_DIR: Contents.IModel = {
name: 'bar',
Expand Down
42 changes: 42 additions & 0 deletions tests/test-services/src/kernel/comm.spec.ts
Expand Up @@ -7,6 +7,8 @@ import { PromiseDelegate } from '@phosphor/coreutils';

import { KernelMessage, Kernel } from '@jupyterlab/services';

import { isFulfilled } from '@jupyterlab/testutils';

import { init } from '../utils';

// Initialize fetch override.
Expand Down Expand Up @@ -82,6 +84,15 @@ describe('jupyter.services - Comm', () => {
const comm2 = kernel.connectToComm('test', '1234');
expect(comm).to.equal(comm2);
});

it('should throw an error when the kernel does not handle comms', async () => {
const kernel2 = await Kernel.startNew({
name: 'ipython',
handleComms: false
});
expect(kernel2.handleComms).to.be.false;
expect(() => kernel2.connectToComm('test', '1234')).to.throw();
});
});

describe('#registerCommTarget()', () => {
Expand All @@ -105,6 +116,37 @@ describe('jupyter.services - Comm', () => {
kernel.removeCommTarget('test', hook);
comm.dispose();
});

it('should do nothing if the kernel does not handle comms', async () => {
const promise = new PromiseDelegate<
[Kernel.IComm, KernelMessage.ICommOpenMsg]
>();
const hook = (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => {
promise.resolve([comm, msg]);
};
const kernel2 = await Kernel.startNew({
name: 'ipython',
handleComms: false
});
kernel2.registerCommTarget('test', hook);

// Request the comm creation.
await kernel2.requestExecute({ code: SEND }, true).done;

// The promise above should not be fulfilled, even after a short delay.
expect(await isFulfilled(promise.promise, 300)).to.be.false;

// The kernel comm has not been closed - we just completely ignored it.
let reply = await kernel2.requestExecute(
{ code: `assert comm._closed is False` },
true
).done;
// If the assert was false, we would get an 'error' status
expect(reply.content.status).to.equal('ok');

// Clean up
kernel2.removeCommTarget('test', hook);
});
});

describe('#commInfo()', () => {
Expand Down
9 changes: 3 additions & 6 deletions tests/test-services/src/kernel/ikernel.spec.ts
Expand Up @@ -11,12 +11,9 @@ import { PromiseDelegate } from '@phosphor/coreutils';

import { Kernel, KernelMessage } from '@jupyterlab/services';

import {
expectFailure,
KernelTester,
handleRequest,
testEmission
} from '../utils';
import { expectFailure, testEmission } from '@jupyterlab/testutils';

import { KernelTester, handleRequest } from '../utils';

describe('Kernel.IKernel', () => {
let defaultKernel: Kernel.IKernel;
Expand Down
24 changes: 21 additions & 3 deletions tests/test-services/src/kernel/kernel.spec.ts
Expand Up @@ -9,13 +9,13 @@ import { toArray } from '@phosphor/algorithm';

import { Kernel } from '@jupyterlab/services';

import { expectFailure, testEmission } from '@jupyterlab/testutils';

import {
expectFailure,
KernelTester,
makeSettings,
PYTHON_SPEC,
getRequestHandler,
testEmission
getRequestHandler
} from '../utils';

const PYTHON3_SPEC = JSON.parse(JSON.stringify(PYTHON_SPEC));
Expand Down Expand Up @@ -164,6 +164,24 @@ describe('kernel', () => {
expect(kernel.id).to.equal(id);
kernel.dispose();
});

it('should turn off comm handling in the new connection if it was enabled in first kernel', async () => {
const kernel = await Kernel.startNew();
expect(kernel.handleComms).to.be.true;
const kernel2 = Kernel.connectTo(kernel.model);
expect(kernel2.handleComms).to.be.false;
kernel.dispose();
kernel2.dispose();
});

it('should turn on comm handling in the new connection if it was disabled in all other connections', async () => {
const kernel = await Kernel.startNew({ handleComms: false });
expect(kernel.handleComms).to.be.false;
const kernel2 = Kernel.connectTo(defaultKernel.model);
expect(kernel2.handleComms).to.be.true;
kernel.dispose();
kernel2.dispose();
});
});

describe('Kernel.shutdown()', () => {
Expand Down
5 changes: 3 additions & 2 deletions tests/test-services/src/kernel/manager.spec.ts
Expand Up @@ -9,12 +9,13 @@ import { JSONExt } from '@phosphor/coreutils';

import { KernelManager, Kernel } from '@jupyterlab/services';

import { testEmission } from '@jupyterlab/testutils';

import {
PYTHON_SPEC,
KERNELSPECS,
handleRequest,
makeSettings,
testEmission
makeSettings
} from '../utils';

class TestManager extends KernelManager {
Expand Down
10 changes: 3 additions & 7 deletions tests/test-services/src/session/isession.spec.ts
Expand Up @@ -13,13 +13,9 @@ import { Kernel, KernelMessage } from '@jupyterlab/services';

import { Session } from '@jupyterlab/services';

import {
expectFailure,
handleRequest,
SessionTester,
init,
testEmission
} from '../utils';
import { expectFailure, testEmission } from '@jupyterlab/testutils';

import { handleRequest, SessionTester, init } from '../utils';

init();

Expand Down
4 changes: 3 additions & 1 deletion tests/test-services/src/session/manager.spec.ts
Expand Up @@ -16,7 +16,9 @@ import {
Session
} from '@jupyterlab/services';

import { KERNELSPECS, handleRequest, testEmission } from '../utils';
import { testEmission } from '@jupyterlab/testutils';

import { KERNELSPECS, handleRequest } from '../utils';

class TestManager extends SessionManager {
intercept: Kernel.ISpecModels | null = null;
Expand Down
3 changes: 2 additions & 1 deletion tests/test-services/src/session/session.spec.ts
Expand Up @@ -11,8 +11,9 @@ import { ServerConnection } from '@jupyterlab/services';

import { Session } from '@jupyterlab/services';

import { expectFailure } from '@jupyterlab/testutils';

import {
expectFailure,
makeSettings,
SessionTester,
createSessionModel,
Expand Down
3 changes: 2 additions & 1 deletion tests/test-services/src/terminal/manager.spec.ts
Expand Up @@ -10,7 +10,8 @@ import {
TerminalSession,
TerminalManager
} from '@jupyterlab/services';
import { testEmission } from '../utils';

import { testEmission } from '@jupyterlab/testutils';

describe('terminal', () => {
let manager: TerminalSession.IManager;
Expand Down