Skip to content

Commit

Permalink
Merge pull request #6929 from jasongrout/handleComms
Browse files Browse the repository at this point in the history
Make handling comm messages optional in a kernel connection.
  • Loading branch information
blink1073 committed Aug 2, 2019
2 parents 52a3305 + 976fbb0 commit 4a6dae3
Show file tree
Hide file tree
Showing 16 changed files with 211 additions and 141 deletions.
60 changes: 52 additions & 8 deletions packages/services/src/kernel/default.ts
Expand Up @@ -5,7 +5,7 @@ import { URLExt } from '@jupyterlab/coreutils';

import { UUID } from '@phosphor/coreutils';

import { ArrayExt, each, find } from '@phosphor/algorithm';
import { ArrayExt, each, find, some } from '@phosphor/algorithm';

import { JSONExt, JSONObject, PromiseDelegate } from '@phosphor/coreutils';

Expand Down Expand Up @@ -61,6 +61,8 @@ export class DefaultKernel implements Kernel.IKernel {
options.serverSettings || ServerConnection.makeSettings();
this._clientId = options.clientId || UUID.uuid4();
this._username = options.username || '';
this.handleComms =
options.handleComms === undefined ? true : options.handleComms;

void this._readyPromise.promise.then(() => {
this._sendPending();
Expand All @@ -82,6 +84,18 @@ export class DefaultKernel implements Kernel.IKernel {
*/
readonly serverSettings: ServerConnection.ISettings;

/**
* Handle comm messages
*
* #### Notes
* The comm message protocol currently has implicit assumptions that only
* one kernel connection is handling comm messages. This option allows a
* kernel connection to opt out of handling comms.
*
* See https://github.com/jupyter/jupyter_client/issues/263
*/
readonly handleComms: boolean;

/**
* A signal emitted when the kernel status changes.
*/
Expand Down Expand Up @@ -215,12 +229,14 @@ export class DefaultKernel implements Kernel.IKernel {
/**
* Clone the current kernel with a new clientId.
*/
clone(): Kernel.IKernel {
clone(options: Kernel.IOptions = {}): Kernel.IKernel {
return new DefaultKernel(
{
name: this._name,
username: this._username,
serverSettings: this.serverSettings
serverSettings: this.serverSettings,
handleComms: this.handleComms,
...options
},
this._id
);
Expand Down Expand Up @@ -712,11 +728,16 @@ export class DefaultKernel implements Kernel.IKernel {
*
* #### Notes
* If a client-side comm already exists with the given commId, it is returned.
* An error is thrown if the kernel does not handle comms.
*/
connectToComm(
targetName: string,
commId: string = UUID.uuid4()
): Kernel.IComm {
if (!this.handleComms) {
throw new Error('Comms are disabled on this kernel connection');
}

if (this._comms.has(commId)) {
return this._comms.get(commId);
}
Expand Down Expand Up @@ -752,6 +773,10 @@ export class DefaultKernel implements Kernel.IKernel {
msg: KernelMessage.ICommOpenMsg
) => void | PromiseLike<void>
): void {
if (!this.handleComms) {
return;
}

this._targetRegistry[targetName] = callback;
}

Expand All @@ -763,7 +788,7 @@ export class DefaultKernel implements Kernel.IKernel {
* @param callback - The callback to remove.
*
* #### Notes
* The comm target is only removed the callback argument matches.
* The comm target is only removed if the callback argument matches.
*/
removeCommTarget(
targetName: string,
Expand All @@ -772,6 +797,10 @@ export class DefaultKernel implements Kernel.IKernel {
msg: KernelMessage.ICommOpenMsg
) => void | PromiseLike<void>
): void {
if (!this.handleComms) {
return;
}

if (!this.isDisposed && this._targetRegistry[targetName] === callback) {
delete this._targetRegistry[targetName];
}
Expand Down Expand Up @@ -1280,13 +1309,19 @@ export class DefaultKernel implements Kernel.IKernel {
}
break;
case 'comm_open':
await this._handleCommOpen(msg as KernelMessage.ICommOpenMsg);
if (this.handleComms) {
await this._handleCommOpen(msg as KernelMessage.ICommOpenMsg);
}
break;
case 'comm_msg':
await this._handleCommMsg(msg as KernelMessage.ICommMsgMsg);
if (this.handleComms) {
await this._handleCommMsg(msg as KernelMessage.ICommMsgMsg);
}
break;
case 'comm_close':
await this._handleCommClose(msg as KernelMessage.ICommCloseMsg);
if (this.handleComms) {
await this._handleCommClose(msg as KernelMessage.ICommCloseMsg);
}
break;
default:
break;
Expand Down Expand Up @@ -1663,7 +1698,16 @@ namespace Private {
return value.id === model.id;
});
if (kernel) {
return kernel.clone();
// If there is already a kernel connection handling comms, don't handle
// them in our clone, since the comm message protocol has implicit
// assumptions that only one connection is handling comm messages.
// See https://github.com/jupyter/jupyter_client/issues/263
const handleComms = !some(
runningKernels,
k => k.id === model.id && k.handleComms
);
const newKernel = kernel.clone({ handleComms });
return newKernel;
}

return new DefaultKernel({ name: model.name, serverSettings }, model.id);
Expand Down
24 changes: 24 additions & 0 deletions packages/services/src/kernel/kernel.ts
Expand Up @@ -90,6 +90,18 @@ export namespace Kernel {
*/
readonly ready: Promise<void>;

/**
* Whether the kernel connection handles comm messages.
*
* #### Notes
* The comm message protocol currently has implicit assumptions that only
* one kernel connection is handling comm messages. This option allows a
* kernel connection to opt out of handling comms.
*
* See https://github.com/jupyter/jupyter_client/issues/263
*/
handleComms: boolean;

/**
* Get the kernel spec.
*
Expand Down Expand Up @@ -637,6 +649,18 @@ export namespace Kernel {
*/
username?: string;

/**
* Whether the kernel connection should handle comm messages
*
* #### Notes
* The comm message protocol currently has implicit assumptions that only
* one kernel connection is handling comm messages. This option allows a
* kernel connection to opt out of handling comms.
*
* See https://github.com/jupyter/jupyter_client/issues/263
*/
handleComms?: boolean;

/**
* The unique identifier for the kernel client.
*/
Expand Down
9 changes: 3 additions & 6 deletions tests/test-services/src/config/config.spec.ts
Expand Up @@ -9,12 +9,9 @@ import { JSONObject } from '@phosphor/coreutils';

import { ConfigSection, ConfigWithDefaults } from '@jupyterlab/services';

import {
expectFailure,
handleRequest,
makeSettings,
getRequestHandler
} from '../utils';
import { expectFailure } from '@jupyterlab/testutils';

import { handleRequest, makeSettings, getRequestHandler } from '../utils';

/**
* Generate a random config section name.
Expand Down
9 changes: 3 additions & 6 deletions tests/test-services/src/contents/index.spec.ts
Expand Up @@ -10,12 +10,9 @@ import {
ServerConnection
} from '@jupyterlab/services';

import {
DEFAULT_FILE,
makeSettings,
expectFailure,
handleRequest
} from '../utils';
import { expectFailure } from '@jupyterlab/testutils';

import { DEFAULT_FILE, makeSettings, handleRequest } from '../utils';

const DEFAULT_DIR: Contents.IModel = {
name: 'bar',
Expand Down
42 changes: 42 additions & 0 deletions tests/test-services/src/kernel/comm.spec.ts
Expand Up @@ -7,6 +7,8 @@ import { PromiseDelegate } from '@phosphor/coreutils';

import { KernelMessage, Kernel } from '@jupyterlab/services';

import { isFulfilled } from '@jupyterlab/testutils';

import { init } from '../utils';

// Initialize fetch override.
Expand Down Expand Up @@ -82,6 +84,15 @@ describe('jupyter.services - Comm', () => {
const comm2 = kernel.connectToComm('test', '1234');
expect(comm).to.equal(comm2);
});

it('should throw an error when the kernel does not handle comms', async () => {
const kernel2 = await Kernel.startNew({
name: 'ipython',
handleComms: false
});
expect(kernel2.handleComms).to.be.false;
expect(() => kernel2.connectToComm('test', '1234')).to.throw();
});
});

describe('#registerCommTarget()', () => {
Expand All @@ -105,6 +116,37 @@ describe('jupyter.services - Comm', () => {
kernel.removeCommTarget('test', hook);
comm.dispose();
});

it('should do nothing if the kernel does not handle comms', async () => {
const promise = new PromiseDelegate<
[Kernel.IComm, KernelMessage.ICommOpenMsg]
>();
const hook = (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => {
promise.resolve([comm, msg]);
};
const kernel2 = await Kernel.startNew({
name: 'ipython',
handleComms: false
});
kernel2.registerCommTarget('test', hook);

// Request the comm creation.
await kernel2.requestExecute({ code: SEND }, true).done;

// The promise above should not be fulfilled, even after a short delay.
expect(await isFulfilled(promise.promise, 300)).to.be.false;

// The kernel comm has not been closed - we just completely ignored it.
let reply = await kernel2.requestExecute(
{ code: `assert comm._closed is False` },
true
).done;
// If the assert was false, we would get an 'error' status
expect(reply.content.status).to.equal('ok');

// Clean up
kernel2.removeCommTarget('test', hook);
});
});

describe('#commInfo()', () => {
Expand Down
9 changes: 3 additions & 6 deletions tests/test-services/src/kernel/ikernel.spec.ts
Expand Up @@ -11,12 +11,9 @@ import { PromiseDelegate } from '@phosphor/coreutils';

import { Kernel, KernelMessage } from '@jupyterlab/services';

import {
expectFailure,
KernelTester,
handleRequest,
testEmission
} from '../utils';
import { expectFailure, testEmission } from '@jupyterlab/testutils';

import { KernelTester, handleRequest } from '../utils';

describe('Kernel.IKernel', () => {
let defaultKernel: Kernel.IKernel;
Expand Down
24 changes: 21 additions & 3 deletions tests/test-services/src/kernel/kernel.spec.ts
Expand Up @@ -9,13 +9,13 @@ import { toArray } from '@phosphor/algorithm';

import { Kernel } from '@jupyterlab/services';

import { expectFailure, testEmission } from '@jupyterlab/testutils';

import {
expectFailure,
KernelTester,
makeSettings,
PYTHON_SPEC,
getRequestHandler,
testEmission
getRequestHandler
} from '../utils';

const PYTHON3_SPEC = JSON.parse(JSON.stringify(PYTHON_SPEC));
Expand Down Expand Up @@ -164,6 +164,24 @@ describe('kernel', () => {
expect(kernel.id).to.equal(id);
kernel.dispose();
});

it('should turn off comm handling in the new connection if it was enabled in first kernel', async () => {
const kernel = await Kernel.startNew();
expect(kernel.handleComms).to.be.true;
const kernel2 = Kernel.connectTo(kernel.model);
expect(kernel2.handleComms).to.be.false;
kernel.dispose();
kernel2.dispose();
});

it('should turn on comm handling in the new connection if it was disabled in all other connections', async () => {
const kernel = await Kernel.startNew({ handleComms: false });
expect(kernel.handleComms).to.be.false;
const kernel2 = Kernel.connectTo(defaultKernel.model);
expect(kernel2.handleComms).to.be.true;
kernel.dispose();
kernel2.dispose();
});
});

describe('Kernel.shutdown()', () => {
Expand Down
5 changes: 3 additions & 2 deletions tests/test-services/src/kernel/manager.spec.ts
Expand Up @@ -9,12 +9,13 @@ import { JSONExt } from '@phosphor/coreutils';

import { KernelManager, Kernel } from '@jupyterlab/services';

import { testEmission } from '@jupyterlab/testutils';

import {
PYTHON_SPEC,
KERNELSPECS,
handleRequest,
makeSettings,
testEmission
makeSettings
} from '../utils';

class TestManager extends KernelManager {
Expand Down
10 changes: 3 additions & 7 deletions tests/test-services/src/session/isession.spec.ts
Expand Up @@ -13,13 +13,9 @@ import { Kernel, KernelMessage } from '@jupyterlab/services';

import { Session } from '@jupyterlab/services';

import {
expectFailure,
handleRequest,
SessionTester,
init,
testEmission
} from '../utils';
import { expectFailure, testEmission } from '@jupyterlab/testutils';

import { handleRequest, SessionTester, init } from '../utils';

init();

Expand Down
4 changes: 3 additions & 1 deletion tests/test-services/src/session/manager.spec.ts
Expand Up @@ -16,7 +16,9 @@ import {
Session
} from '@jupyterlab/services';

import { KERNELSPECS, handleRequest, testEmission } from '../utils';
import { testEmission } from '@jupyterlab/testutils';

import { KERNELSPECS, handleRequest } from '../utils';

class TestManager extends SessionManager {
intercept: Kernel.ISpecModels | null = null;
Expand Down
3 changes: 2 additions & 1 deletion tests/test-services/src/session/session.spec.ts
Expand Up @@ -11,8 +11,9 @@ import { ServerConnection } from '@jupyterlab/services';

import { Session } from '@jupyterlab/services';

import { expectFailure } from '@jupyterlab/testutils';

import {
expectFailure,
makeSettings,
SessionTester,
createSessionModel,
Expand Down
3 changes: 2 additions & 1 deletion tests/test-services/src/terminal/manager.spec.ts
Expand Up @@ -10,7 +10,8 @@ import {
TerminalSession,
TerminalManager
} from '@jupyterlab/services';
import { testEmission } from '../utils';

import { testEmission } from '@jupyterlab/testutils';

describe('terminal', () => {
let manager: TerminalSession.IManager;
Expand Down

0 comments on commit 4a6dae3

Please sign in to comment.