Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Theia 技术揭秘之 JSON-RPC 通信 #85

Open
Pines-Cheng opened this issue Jan 5, 2021 · 4 comments
Open

Theia 技术揭秘之 JSON-RPC 通信 #85

Pines-Cheng opened this issue Jan 5, 2021 · 4 comments
Labels

Comments

@Pines-Cheng
Copy link
Owner

Pines-Cheng commented Jan 5, 2021

image

Theia 框架前端 UI 布局和 Services 一样,具备灵活可拓展的特点。VSCode 是内置了一套基本的组件系统,而 Theia 框架的 UI 布局基于 PhosphorJS 框架。 PhosphorJS 提供了包含 widgets、layouts、事件和数据结构的丰富工具包。这使得开发人员能够构建可扩展的、高性能的、类桌面的 Web 应用程序,比如 JupyterLab。

PhosphorJS 作者退休,项目已归档,该项目现在被 Jupyter 团队重命名为 jupyterlab/lumino 继续维护。见 issue:jupyterlab/frontends-team-compass#28

写在前面

前置条件:

  1. 了解 Theia 的简单原理及前后端模块加载的方式
  2. 了解 InversifyJS 的依赖注入的原理和使用

Theia JSON RPC 实现的缺点:

  1. 概念多,什么 factory,proxy 等,server 和 client 概念有点混淆。
  2. 每次添加接口都需要实现 IServer/IClient/IWatcher,然后按照规范注入,工作量并不少
  3. 和 Inversify 、Theia 源码、后端服务耦合严重,没有独立成包

Theia JSON-RPC 协议示例

image

通信场景

  1. Server 与 Browser

通过 Websocket 信道进行通信。

  1. 插件(Web Worker)
this.postMessage(m);
  1. iframe 与 Browser (Webview)
        postMessage(channel, data) {
            window.parent.postMessage({ target: id, channel, data }, '*');
        }

添加日志调试 JSON RPC 服务

在启动后,Theia 会启动一个 Express 服务。前后端的 JSON-RPC 通信,正是基于 Express 上的 Websocket 连接。

接下来将创建调试日志系统服务,然后通过 JSON RPC 连接到它。

注册服务

因此,你要做的第一件事是暴露服务,以便前端可以连接到它。

你需要创建类似于下面这个(logger-server-module. ts)的后端服务器模块文件:

import { ContainerModule } from 'inversify';
import { ConnectionHandler, JsonRpcConnectionHandler } from "../../messaging/common";
import { ILoggerServer, ILoggerClient } from '../../application/common/logger-protocol';

export const loggerServerModule = new ContainerModule(bind => {
    bind(ConnectionHandler).toDynamicValue(ctx =>
        new JsonRpcConnectionHandler<ILoggerClient>("/services/logger", client => {
            const loggerServer = ctx.container.get<ILoggerServer>(ILoggerServer);
            loggerServer.setClient(client);
            return loggerServer;
        })
    ).inSingletonScope()
});

核心在于 ConnectionHandlerJsonRpcConnectionHandler

  • ConnectionHandler:是一个简单的接口,它指定连接的 path 以及 onConnection 方法。
  • JsonRpcConnectionHandler:这个工厂允许您创建一个连接处理程序,onConnection 创建代理对象到 JSON-RPC 的后端调用的对象,并将本地对象暴露给 JSON-RPC。
  • ILoggerServer:定义通过 JSON-RPC 调用的后端对象。
  • ILoggerClient:是一个 Client 对象,定义来自后端对象的通知的接收。

ConnectionHandler

ConnectionHandler 类型绑定到 messaging-module.ts 中的 ContributionProvider

MessagingContribution 启动(调用 onStart)时,它为所有绑定 ConnectionHandlers 创建一个 Websocket 连接。

即依次在 Server 注册 path,并绑定 onConnection 事件。

// packages/core/src/node/messaging/messaging-contribution.ts
export class MessagingContribution implements BackendApplicationContribution, MessagingService {
  constructor( @inject(ContributionProvider) @named(ConnectionHandler) protected readonly handlers: 
  ContributionProvider<ConnectionHandler>) {
      }

    // 服务启动时调用
    onStart(server: http.Server): void {
        // 遍历
        for (const handler of this.handlers.getContributions()) {
            const path = handler.path;
            try {
                createServerWebSocketConnection({
                    server,
                    path
                }, connection => handler.onConnection(connection));
            } catch (error) {
                console.error(error)
            }
        }
    }
}

JsonRpcConnectionHandler

我们看看一下 JsonRpcConnectionHandler 的实现,就会发现 onConnection 做了三件事:

  1. 基于 JsonRpcProxyFactory 和传入的 path 创建 factory 实例
  2. 通过 createProxy 方法创建代理 proxy
  3. 从 factory 创建一个代理对象:factory.target = this.targetFactory(proxy);
  4. 将 factory 和 connection 连接起来

第三步将调用 new JsonRpcConnectionHandler( ) 传入的函数:

        client => {
            const loggerServer = ctx.container.get<ILoggerServer>(ILoggerServer);
            loggerServer.setClient(client);
            return loggerServer;
        }

这将在 loggerServer 上设置 Client,在这种情况下,用于向前端发送 onLogLevelChanged 通知。

// packages/core/src/common/messaging/proxy-factory.ts
export class JsonRpcConnectionHandler<T extends object> implements ConnectionHandler {
    constructor(
        readonly path: string,
        readonly targetFactory: (proxy: JsonRpcProxy<T>) => any,
        readonly factoryConstructor: new () => JsonRpcProxyFactory<T> = JsonRpcProxyFactory
    ) { }
        onConnection(connection: MessageConnection): void {
	        // 1. 在 path “logger” 上创建了一个 JsonRpcProxy
	        const factory = new JsonRpcProxyFactory(this.path);
	        
	        // 2. 在 factory 类上创建了一个代理对象
	        // 这个对象可以使用 ILoggerClient 定义的接口调用 JSON-RPC 的另一端。
	        const proxy = factory.createProxy();
	        
	        // 3. 这里调用了 new JsonRpcConnectionHandler 传入的函数 client=>{},用于 loggerServer.setClient
	        factory.target = this.targetFactory(proxy);
        
          // 4. 这将 factory 与 connection 连接了起来
	        factory.listen(connection);
        }
    }
}

这样,services/* 的请求由 Webpack dev server 处理,请参阅 webpack.config.js

'/services/*': {
    target: 'ws://localhost:3000',
    ws: true
},

Server 实现

Server 定义通过 JSON-RPC 调用的后端对象,ILoggerServer 接口如下,这里定义了 4 个方法。

// packages/core/src/common/logger-protocol.ts
export interface ILoggerServer extends JsonRpcServer<ILoggerClient> {
    setLogLevel(name: string, logLevel: number): Promise<void>;
    getLogLevel(name: string): Promise<number>;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    log(name: string, logLevel: number, message: any, params: any[]): Promise<void>;
    child(name: string): Promise<void>;
}

继承自 JsonRpcServer

// packages/core/src/common/messaging/proxy-factory.ts
export type JsonRpcServer<Client> = Disposable & {
    /**
     * If this server is a proxy to a remote server then
     * a client is used as a local object
     * to handle JSON-RPC messages from the remote server.
     */
    setClient(client: Client | undefined): void;
    getClient?(): Client | undefined;
};

当前,源码中仅有 ConsoleLoggerServer 的实现: export class ConsoleLoggerServer implements ILoggerServer {}

Client 实现

Client 用于定义接收来自后端对象的通知,DispatchingLoggerClient 实现如下:

// packages/core/src/common/logger-protocol.ts
@injectable()
export class DispatchingLoggerClient implements ILoggerClient {

    readonly clients = new Set<ILoggerClient>();

    onLogLevelChanged(event: ILogLevelChangedEvent): void {
        this.clients.forEach(client => client.onLogLevelChanged(event));
    }

}

前端连接服务

上面我们创建了后端服务,接下来我们需要从前端连接它。

分为以下三步:

  1. 创建了一个 watcher,使用 loggerWatcher Client 从后端获取事件通知
  2. 获得了 Websocket 连接
  3. 通过loggerWatcher.getLoggerClient()获得本地对象,用来来处理来自远程对象的 JSON-RPC 消息,通过传入 createProxy 创建一个代理
// logger-frontend-module. ts
import { ContainerModule, Container } from 'inversify';
import { WebSocketConnectionProvider } from '../../messaging/browser/connection';
import { ILogger, LoggerFactory, LoggerOptions, Logger } from '../common/logger';
import { ILoggerServer } from '../common/logger-protocol';
import { LoggerWatcher } from '../common/logger-watcher';

export const loggerFrontendModule = new ContainerModule(bind => {
    bind(ILogger).to(Logger).inSingletonScope();
    	  // 1. 这里创建了一个 watcher,使用 loggerWatcher Client从后端获取事件通知
    bind(LoggerWatcher).toSelf().inSingletonScope();
    bind(ILoggerServer).toDynamicValue(ctx => {
        const loggerWatcher = ctx.container.get(LoggerWatcher);
	// 2. 这里获得了一个 Websocket 连接
        const connection = ctx.container.get(WebSocketConnectionProvider);
		// 3. 这里,我们传入了一个用于处理 JSON-RPC 的对象。
        return connection.createProxy<ILoggerServer>("/services/logger", loggerWatcher.getLoggerClient());
    }).inSingletonScope();
});

WebSocketConnectionProvider 的 connection.createProxy 实际执行以下代码:

// packages/core/src/common/messaging/abstract-connection-provider.ts
export abstract class AbstractConnectionProvider<AbstractOptions extends object> {
    /**
     * Create a proxy object to remote interface of T type
     * over a web socket connection for the given path.
     */
    createProxy<T extends object>(path: string, arg?: object): JsonRpcProxy<T> {
        const factory = arg instanceof JsonRpcProxyFactory ? arg : new JsonRpcProxyFactory<T>(arg);
        this.listen({
            path,
            onConnection: c => factory.listen(c)
        });
        return factory.createProxy();
    }
    /**
     * Install a connection handler for the given path.
     */
    listen(handler: ConnectionHandler, options?: AbstractOptions): void {
        this.openChannel(handler.path, channel => {
            const connection = createWebSocketConnection(channel, this.createLogger());
            connection.onDispose(() => channel.close());
            handler.onConnection(connection);
        }, options);
    }
}

接下来,即可使用 ILoggerService 获取对象进行 RPC 调用。

LoggerWatcher

LoggerWatcher 定义了 onLogLevelChanged 的消息响应。

@injectable()
export class LoggerWatcher {

    getLoggerClient(): ILoggerClient {
        const emitter = this.onLogLevelChangedEmitter;
        return {
            onLogLevelChanged(event: ILogLevelChangedEvent): void {
                emitter.fire(event);
            }
        };
    }

    private onLogLevelChangedEmitter = new Emitter<ILogLevelChangedEvent>();

    get onLogLevelChanged(): Event<ILogLevelChangedEvent> {
        return this.onLogLevelChangedEmitter.event;
    }

    // FIXME: get rid of it, backend services should as well set a client on the server
    fireLogLevelChanged(event: ILogLevelChangedEvent): void {
        this.onLogLevelChangedEmitter.fire(event);
    }
}

加载模块

需要导入模块和加载进主容器两步。

// 导入模块
import { loggerServerModule } from 'theia-core/lib/application/node/logger-server-module';

// 加载进容器
container.load(loggerServerModule);

完整的通信例子可以看:

Add debug logging support · eclipse-theia/theia@99d191f

源码

核心的接口和类有:ConnectionHandler,JsonRpcConnectionHandler 以及 JsonRpcProxyFactory,搞清楚他们的作用。

ConnectionHandler

ConnectionHandler 是一个简单的接口,它指定连接的 path 以及 onConnection 方法。

export interface ConnectionHandler {
    readonly path: string;
    onConnection(connection: MessageConnection): void;
}

JsonRpcConnectionHandler

JsonRpcProxyFactoryJsonRpcConnectionHandler 中被使用。

Websocket 连接正是在 JsonRpcConnectionHandler 类上建立的。建立连接的逻辑在 JsonRpcConnectionHandler 类的 onConnection 函数上,过程如下:

// packages/core/src/common/messaging/proxy-factory.ts
export class JsonRpcConnectionHandler<T extends object> implements ConnectionHandler {
    constructor(
        readonly path: string,
        readonly targetFactory: (proxy: JsonRpcProxy<T>) => any,
        readonly factoryConstructor: new () => JsonRpcProxyFactory<T> = JsonRpcProxyFactory
    ) { }
        onConnection(connection: MessageConnection): void {
	        // 在 path “logger” 上创建了一个 JsonRpcProxy
	        const factory = new JsonRpcProxyFactory(this.path);
	        
	        // 在 factory 类上创建了一个代理对象
	        // 这个对象可以使用 ILoggerClient 定义的接口调用 JSON-RPC 的另一端。
	        const proxy = factory.createProxy();
	        
	        // 这里调用了我们在参数中传入的函数
	        factory.target = this.targetFactory(proxy);
        
          // 这将 factory 与 connection 连接了起来
	        factory.listen(connection);
        }
    }
}

JsonRpcProxyFactory

JSON RPC 的核心在于:JsonRpcProxyFactory,源码里注释很详细,还有使用 Demo,值得好好学习一下。

// packages/core/src/common/messaging/proxy-factory.ts
/**
 * Factory for JSON-RPC proxy objects.
 *
 * A JSON-RPC proxy exposes the programmatic interface of an object through
 * JSON-RPC.  This allows remote programs to call methods of this objects by
 * sending JSON-RPC requests.  This takes place over a bi-directional stream,
 * where both ends can expose an object and both can call methods each other's
 * exposed object.
 *
 * For example, assuming we have an object of the following type on one end:
 *
 *     class Foo {
 *         bar(baz: number): number { return baz + 1 }
 *     }
 *
 * which we want to expose through a JSON-RPC interface.  We would do:
 *
 *     let target = new Foo()
 *     let factory = new JsonRpcProxyFactory<Foo>('/foo', target)
 *     factory.onConnection(connection)
 *
 * The party at the other end of the `connection`, in order to remotely call
 * methods on this object would do:
 *
 *     let factory = new JsonRpcProxyFactory<Foo>('/foo')
 *     factory.onConnection(connection)
 *     let proxy = factory.createProxy();
 *     let result = proxy.bar(42)
 *     // result is equal to 43
 *
 * One the wire, it would look like this:
 *
 *     --> {"jsonrpc": "2.0", "id": 0, "method": "bar", "params": {"baz": 42}}
 *     <-- {"jsonrpc": "2.0", "id": 0, "result": 43}
 *
 * Note that in the code of the caller, we didn't pass a target object to
 * JsonRpcProxyFactory, because we don't want/need to expose an object.
 * If we had passed a target object, the other side could've called methods on
 * it.
 *
 * @param <T> - The type of the object to expose to JSON-RPC.
 */
export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {

    protected readonly onDidOpenConnectionEmitter = new Emitter<void>();
    protected readonly onDidCloseConnectionEmitter = new Emitter<void>();

    protected connectionPromiseResolve: (connection: MessageConnection) => void;
    protected connectionPromise: Promise<MessageConnection>;

    /**
     * Build a new JsonRpcProxyFactory.
     *
     * @param target - The object to expose to JSON-RPC methods calls.  If this
     *   is omitted, the proxy won't be able to handle requests, only send them.
     */
    constructor(public target?: any) {
        this.waitForConnection();
    }

    protected waitForConnection(): void {
        this.connectionPromise = new Promise(resolve =>
            this.connectionPromiseResolve = resolve
        );
        this.connectionPromise.then(connection => {
            connection.onClose(() =>
                this.onDidCloseConnectionEmitter.fire(undefined)
            );
            this.onDidOpenConnectionEmitter.fire(undefined);
        });
    }

    /**
     * Connect a MessageConnection to the factory.
     *
     * This connection will be used to send/receive JSON-RPC requests and
     * response.
     */
    listen(connection: MessageConnection): void {
        if (this.target) {
            for (const prop in this.target) {
                if (typeof this.target[prop] === 'function') {
                    connection.onRequest(prop, (...args) => this.onRequest(prop, ...args));
                    connection.onNotification(prop, (...args) => this.onNotification(prop, ...args));
                }
            }
        }
        connection.onDispose(() => this.waitForConnection());
        connection.listen();
        this.connectionPromiseResolve(connection);
    }

    /**
     * Process an incoming JSON-RPC method call.
     *
     * onRequest is called when the JSON-RPC connection received a method call
     * request.  It calls the corresponding method on [[target]].
     *
     * The return value is a Promise object that is resolved with the return
     * value of the method call, if it is successful.  The promise is rejected
     * if the called method does not exist or if it throws.
     *
     * @returns A promise of the method call completion.
     */
    protected async onRequest(method: string, ...args: any[]): Promise<any> {
        try {
            return await this.target[method](...args);
        } catch (error) {
            const e = this.serializeError(error);
            if (e instanceof ResponseError) {
                throw e;
            }
            const reason = e.message || '';
            const stack = e.stack || '';
            console.error(`Request ${method} failed with error: ${reason}`, stack);
            throw e;
        }
    }

    /**
     * Process an incoming JSON-RPC notification.
     *
     * Same as [[onRequest]], but called on incoming notifications rather than
     * methods calls.
     */
    protected onNotification(method: string, ...args: any[]): void {
        this.target[method](...args);
    }

    /**
     * Create a Proxy exposing the interface of an object of type T.  This Proxy
     * can be used to do JSON-RPC method calls on the remote target object as
     * if it was local.
     *
     * If `T` implements `JsonRpcServer` then a client is used as a target object for a remote target object.
     */
    createProxy(): JsonRpcProxy<T> {
        const result = new Proxy<T>(this as any, this);
        return result as any;
    }

    /**
     * Get a callable object that executes a JSON-RPC method call.
     *
     * Getting a property on the Proxy object returns a callable that, when
     * called, executes a JSON-RPC call.  The name of the property defines the
     * method to be called.  The callable takes a variable number of arguments,
     * which are passed in the JSON-RPC method call.
     *
     * For example, if you have a Proxy object:
     *
     *     let fooProxyFactory = JsonRpcProxyFactory<Foo>('/foo')
     *     let fooProxy = fooProxyFactory.createProxy()
     *
     * accessing `fooProxy.bar` will return a callable that, when called,
     * executes a JSON-RPC method call to method `bar`.  Therefore, doing
     * `fooProxy.bar()` will call the `bar` method on the remote Foo object.
     *
     * @param target - unused.
     * @param p - The property accessed on the Proxy object.
     * @param receiver - unused.
     * @returns A callable that executes the JSON-RPC call.
     */
    get(target: T, p: PropertyKey, receiver: any): any {
        if (p === 'setClient') {
            return (client: any) => {
                this.target = client;
            };
        }
        if (p === 'getClient') {
            return () => this.target;
        }
        if (p === 'onDidOpenConnection') {
            return this.onDidOpenConnectionEmitter.event;
        }
        if (p === 'onDidCloseConnection') {
            return this.onDidCloseConnectionEmitter.event;
        }
        const isNotify = this.isNotification(p);
        return (...args: any[]) => {
            const method = p.toString();
            const capturedError = new Error(`Request '${method}' failed`);
            return this.connectionPromise.then(connection =>
                new Promise((resolve, reject) => {
                    try {
                        if (isNotify) {  
                            // sendNotification
                            connection.sendNotification(method, ...args);
                            resolve();
                        } else {
                            // sendRequest
                            const resultPromise = connection.sendRequest(method, ...args) as Promise<any>;
                            resultPromise
                                .catch((err: any) => reject(this.deserializeError(capturedError, err)))
                                .then((result: any) => resolve(result));
                        }
                    } catch (err) {
                        reject(err);
                    }
                })
            );
        };
    }

    /**
     * Return whether the given property represents a notification.
     *
     * A property leads to a notification rather than a method call if its name
     * begins with `notify` or `on`.
     *
     * @param p - The property being called on the proxy.
     * @return Whether `p` represents a notification.
     */
    protected isNotification(p: PropertyKey): boolean {
        return p.toString().startsWith('notify') || p.toString().startsWith('on');
    }

    protected serializeError(e: any): any {
        if (ApplicationError.is(e)) {
            return new ResponseError(e.code, '',
                Object.assign({ kind: 'application' }, e.toJson())
            );
        }
        return e;
    }
    protected deserializeError(capturedError: Error, e: any): any {
        if (e instanceof ResponseError) {
            const capturedStack = capturedError.stack || '';
            if (e.data && e.data.kind === 'application') {
                const { stack, data, message } = e.data;
                return ApplicationError.fromJson(e.code, {
                    message: message || capturedError.message,
                    data,
                    stack: `${capturedStack}\nCaused by: ${stack}`
                });
            }
            e.stack = capturedStack;
        }
        return e;
    }

}

写在最后

个人还是觉得 cyrus-and/chrome-remote-interface 使用协议定义文件自动生成方式更优雅,代码更简洁。且独立成包,每次只需要添加 protocol 类型文件内容即可自动生成接口。

不过 chrome-remote-interface 只是一个客户端接口,并没有服务端。个人参考着设计了基于 Websocket 的 JSON RPC 协议规范和及 API。:cloudbase-interface,具有以下优点:

  1. 包含服务端和客户端
  2. 不管后端使用什么 websocket 框架,只需要提供:serverAdaptor 接口的实现即可。
  3. 使用中间件的思想扩充 API

参考

@Pines-Cheng Pines-Cheng added the IDE label Jan 5, 2021
@zzkkui
Copy link

zzkkui commented Jul 25, 2021

您好,请问你们有需要做ide非活动后一段时间提示或者断开链接吗?跟gitpod类似,非活动一段时间会自动断开。目前想的是通过追踪ws,根据里面的message来判断。不过目前追踪message,会修改到源码。不知道您这边有研究吗?

@Pines-Cheng
Copy link
Owner Author

Pines-Cheng commented Jul 27, 2021

您好,请问你们有需要做ide非活动后一段时间提示或者断开链接吗?跟gitpod类似,非活动一段时间会自动断开。目前想的是通过追踪ws,根据里面的message来判断。不过目前追踪message,会修改到源码。不知道您这边有研究吗?

@zzkkui 方法很多,最简单的方式就是通过注入 JS 脚本轮询实现。

@zzkkui
Copy link

zzkkui commented Jul 27, 2021

您好,请问你们有需要做ide非活动后一段时间提示或者断开链接吗?跟gitpod类似,非活动一段时间会自动断开。目前想的是通过追踪ws,根据里面的message来判断。不过目前追踪message,会修改到源码。不知道您这边有研究吗?

@zzkkui 方法很多,最简单的方式就是通过注入 JS 脚本轮询实现。

对,肯定会采用轮询,目前的话我觉得有两种方法可行,一种就是通过mouseover 事件,另外一种就是追踪 ws 的message,判断是否是像下面这种状态
image
image
但是如何在不修改theia源码的情况下,我还没找方法通过扩展的方式来拦截
https://github.com/eclipse-theia/theia/blob/65509341ebf2e6c2aedd98c066c9d405a111200f/packages/core/src/common/messaging/web-socket-channel.ts#L118-L122
或者
https://github.com/eclipse-theia/theia/blob/65509341ebf2e6c2aedd98c066c9d405a111200f/packages/core/src/common/messaging/abstract-connection-provider.ts#L107-L116

不知道您这边有做过相关研究没。谢谢

@Pines-Cheng
Copy link
Owner Author

Pines-Cheng commented Jul 30, 2021

@zzkkui 分为两种情况吧:

  1. 页面关闭。
  2. 页面超过时间无活动,我觉得直接监听 mouseover 等事件,加个 debounce 触发弹窗或关闭就行了,没必要做到 ws 那么细的粒度,除非有额外的需求。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants