Skip to content

Commit

Permalink
fix #1324 (WebSocket BatchRequest) supports Batch requests via WebSoc…
Browse files Browse the repository at this point in the history
…kets
  • Loading branch information
tenbits committed Jan 13, 2023
1 parent 9ad5132 commit 6954665
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .changeset/plenty-comics-bow.md
@@ -0,0 +1,5 @@
---
"hardhat": patch
---

Add BatchRequest support for WebSocket server
Expand Up @@ -82,40 +82,23 @@ export class JsonRpcHandler {
this._provider.addListener("notification", listener);

ws.on("message", async (msg) => {
let rpcReq: JsonRpcRequest | undefined;
let rpcResp: JsonRpcResponse | undefined;
let rpcReq: JsonRpcRequest | JsonRpcRequest[];
let rpcResp: JsonRpcResponse | JsonRpcResponse[];

try {
rpcReq = _readWsRequest(msg as string);

if (!isValidJsonRequest(rpcReq)) {
throw new InvalidRequestError("Invalid request");
}

rpcResp = await this._handleRequest(rpcReq);

// If eth_subscribe was successful, keep track of the subscription id,
// so we can cleanup on websocket close.
if (
rpcReq.method === "eth_subscribe" &&
isSuccessfulJsonResponse(rpcResp)
) {
subscriptions.push(rpcResp.result);
}
rpcResp = Array.isArray(rpcReq)
? await Promise.all(
rpcReq.map((req) =>
this._handleSingleWsRequest(req, subscriptions)
)
)
: await this._handleSingleWsRequest(rpcReq, subscriptions);
} catch (error) {
rpcResp = _handleError(error);
}

// Validate the RPC response.
if (!isValidJsonResponse(rpcResp)) {
// Malformed response coming from the provider, report to user as an internal error.
rpcResp = _handleError(new InternalError("Internal error"));
}

if (rpcReq !== undefined) {
rpcResp.id = rpcReq.id;
}

ws.send(JSON.stringify(rpcResp));
});

Expand Down Expand Up @@ -155,7 +138,9 @@ export class JsonRpcHandler {
res.end(JSON.stringify(rpcResp));
}

private async _handleSingleRequest(req: any): Promise<JsonRpcResponse> {
private async _handleSingleRequest(
req: JsonRpcRequest
): Promise<JsonRpcResponse> {
if (!isValidJsonRequest(req)) {
return _handleError(new InvalidRequestError("Invalid request"));
}
Expand All @@ -181,6 +166,21 @@ export class JsonRpcHandler {

return rpcResp;
}
private async _handleSingleWsRequest(
rpcReq: JsonRpcRequest,
subscriptions: string[]
) {
const rpcResp = await this._handleSingleRequest(rpcReq);
// If eth_subscribe was successful, keep track of the subscription id,
// so we can cleanup on websocket close.
if (
rpcReq.method === "eth_subscribe" &&
isSuccessfulJsonResponse(rpcResp)
) {
subscriptions.push(rpcResp.result);
}
return rpcResp;
}

private _handleRequest = async (
req: JsonRpcRequest
Expand Down Expand Up @@ -218,7 +218,7 @@ const _readJsonHttpRequest = async (req: IncomingMessage): Promise<any> => {
return json;
};

const _readWsRequest = (msg: string): JsonRpcRequest => {
const _readWsRequest = (msg: string): JsonRpcRequest | JsonRpcRequest[] => {
let json: any;
try {
json = JSON.parse(msg);
Expand Down
Expand Up @@ -118,6 +118,34 @@ describe("Eth module", function () {
assert.equal(newLogEvent.params.subscription, subscription);
});

it("Supports single and batched requests", async function () {
const { result: accounts } = await sendMethod("eth_accounts");
const [acc1, acc2] = accounts;
const balancesAcc1Resp = await sendJson({
jsonrpc: "2.0",
id: Math.random(),
method: "eth_getBalance",
params: [acc1],
});
const balancesResp = await sendJson([
{
jsonrpc: "2.0",
id: Math.random(),
method: "eth_getBalance",
params: [acc1],
},
{
jsonrpc: "2.0",
id: Math.random(),
method: "eth_getBalance",
params: [acc2],
},
]);

assert.match(balancesAcc1Resp.result, /^0x[\dA-F]+$/i);
assert.equal(balancesAcc1Resp.result, balancesResp[0].result);
});

async function subscribeTo(event: string, ...extraParams: any[]) {
const subscriptionPromise = new Promise<string>((resolve) => {
const listener: any = (message: any) => {
Expand Down Expand Up @@ -177,6 +205,43 @@ describe("Eth module", function () {
return result;
}

async function sendJson<
TBody extends TReq | TReq[],
TReq extends {
jsonrpc: "2.0";
id: number;
method: string;
params: any[];
},
TResp extends {
jsonrpc: "2.0";
id: number;
result: any;
}
>(body: TBody): Promise<TBody extends TReq[] ? TResp[] : TResp> {
const resultPromise = new Promise<any>((resolve) => {
const listener: any = (message: any) => {
const parsedMessage = JSON.parse(message.toString());
const receivedId = Array.isArray(parsedMessage)
? parsedMessage[0]?.id
: parsedMessage.id;
const sentId = Array.isArray(body) ? body[0]?.id : body.id;

if (receivedId === sentId) {
ws.removeListener("message", listener);
resolve(parsedMessage);
}
};

ws.on("message", listener);
});

ws.send(JSON.stringify(body));
const result = await resultPromise;

return result;
}

/**
* Send `method` with `params` and get the first message that corresponds to
* the given subscription.
Expand Down

0 comments on commit 6954665

Please sign in to comment.