Skip to content

Commit

Permalink
Performance: Avoid converting responses to serde_json::Value
Browse files Browse the repository at this point in the history
Previously, all responses were converted to serde_json::Value before
being serialized to string. Since jsonrpc does not inspect the result
object in any way, this step could be skipped.

Now, result objects are serialized to string much earlier and the Value
step is skipped.

This patch has large performance benefits for huge responses: In tests
with 4.5 GB responses, the jsonrpc serialization overhead after
returning from an rpc function was reduced by around 35%. Which means
several seconds of speed-up in response times.

To accomplish this while mostly retaining API compatibility, the traits
RpcMethod, RpcMethodSync, RpcMethodSimple are now generic in their
return type and are wrapped when added to an io handler.

There's now a distinction between the parsed representation of jsonrpc
responses (Output/Response) and result of rpc functions calls
(WrapOutput/WrapResponse) to allow the latter to carry the
pre-serialized strings.

Review notes:
- I'm not happy with the WrapOutput / WrapResponse names, glad for
  suggestions.
- Similarly, rpc_wrap must be renamed and moved.
- The http handler health request is now awkward, and must extract the
  result/error from the already-serialized response. Probably worth it.
- The largest API breakage I could see is in the middleware, where the
  futures now return WrapOutput/WrapResult instead of Output/Result.
- One could make WrapOutput just be String, but having a separate type
  is likely easier to read.

See #212
  • Loading branch information
ckamm committed Sep 27, 2021
1 parent dc6b8ed commit b2e2892
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 154 deletions.
6 changes: 5 additions & 1 deletion core-client/transports/src/transports/http.rs
Expand Up @@ -139,12 +139,16 @@ mod tests {
}

fn io() -> IoHandler {
use jsonrpc_core::Result;

let mut io = IoHandler::default();
io.add_sync_method("hello", |params: Params| match params.parse::<(String,)>() {
Ok((msg,)) => Ok(Value::String(format!("hello {}", msg))),
_ => Ok(Value::String("world".into())),
});
io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34))));
io.add_sync_method("fail", |_: Params| -> Result<i64> {
Err(Error::new(ErrorCode::ServerError(-34)))
});
io.add_notification("notify", |params: Params| {
let (value,) = params.parse::<(u64,)>().expect("expected one u64 as param");
assert_eq!(value, 12);
Expand Down
2 changes: 1 addition & 1 deletion core/examples/middlewares.rs
Expand Up @@ -17,7 +17,7 @@ impl Middleware<Meta> for MyMiddleware {
fn on_request<F, X>(&self, request: Request, meta: Meta, next: F) -> Either<Self::Future, X>
where
F: FnOnce(Request, Meta) -> X + Send,
X: Future<Output = Option<Response>> + Send + 'static,
X: Future<Output = Option<WrapResponse>> + Send + 'static,
{
let start = Instant::now();
let request_number = self.0.fetch_add(1, atomic::Ordering::SeqCst);
Expand Down
56 changes: 40 additions & 16 deletions core/src/calls.rs
@@ -1,5 +1,7 @@
use crate::types::{Error, Params, Value};
use crate::types::{Error, Id, Params, Value, Version, WrapOutput};
use crate::BoxFuture;
use futures_util::{self, future, FutureExt};
use serde::Serialize;
use std::fmt;
use std::future::Future;
use std::sync::Arc;
Expand Down Expand Up @@ -30,23 +32,23 @@ impl<T, E> WrapFuture<T, E> for BoxFuture<Result<T, E>> {
}

/// A synchronous or asynchronous method.
pub trait RpcMethodSync: Send + Sync + 'static {
pub trait RpcMethodSync<R = Value>: Send + Sync + 'static {
/// Call method
fn call(&self, params: Params) -> BoxFuture<crate::Result<Value>>;
fn call(&self, params: Params) -> BoxFuture<crate::Result<R>>;
}

/// Asynchronous Method
pub trait RpcMethodSimple: Send + Sync + 'static {
pub trait RpcMethodSimple<R = Value>: Send + Sync + 'static {
/// Output future
type Out: Future<Output = Result<Value, Error>> + Send;
type Out: Future<Output = Result<R, Error>> + Send;
/// Call method
fn call(&self, params: Params) -> Self::Out;
}

/// Asynchronous Method with Metadata
pub trait RpcMethod<T: Metadata>: Send + Sync + 'static {
pub trait RpcMethod<T: Metadata, R = Value>: Send + Sync + 'static {
/// Call method
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<Value>>;
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<R>>;
}

/// Notification
Expand All @@ -61,11 +63,22 @@ pub trait RpcNotification<T: Metadata>: Send + Sync + 'static {
fn execute(&self, params: Params, meta: T);
}

pub trait RpcMethodWrapped<T: Metadata>: Send + Sync + 'static {
fn call(&self, params: Params, meta: T, jsonrpc: Option<Version>, id: Id) -> BoxFuture<Option<WrapOutput>>;
}

pub fn rpc_wrap<T: Metadata, R: Serialize + Send + 'static, F: RpcMethod<T, R>>(f: F) -> Arc<dyn RpcMethodWrapped<T>> {
Arc::new(move |params: Params, meta: T, jsonrpc: Option<Version>, id: Id| {
let result = f.call(params, meta);
result.then(move |r| future::ready(Some(WrapOutput::from(r, id, jsonrpc))))
})
}

/// Possible Remote Procedures with Metadata
#[derive(Clone)]
pub enum RemoteProcedure<T: Metadata> {
/// A method call
Method(Arc<dyn RpcMethod<T>>),
Method(Arc<dyn RpcMethodWrapped<T>>),
/// A notification
Notification(Arc<dyn RpcNotification<T>>),
/// An alias to other method,
Expand All @@ -83,23 +96,23 @@ impl<T: Metadata> fmt::Debug for RemoteProcedure<T> {
}
}

impl<F: Send + Sync + 'static, X: Send + 'static> RpcMethodSimple for F
impl<F: Send + Sync + 'static, X: Send + 'static, R> RpcMethodSimple<R> for F
where
F: Fn(Params) -> X,
X: Future<Output = Result<Value, Error>>,
X: Future<Output = Result<R, Error>>,
{
type Out = X;
fn call(&self, params: Params) -> Self::Out {
self(params)
}
}

impl<F: Send + Sync + 'static, X: Send + 'static> RpcMethodSync for F
impl<F: Send + Sync + 'static, X: Send + 'static, R> RpcMethodSync<R> for F
where
F: Fn(Params) -> X,
X: WrapFuture<Value, Error>,
X: WrapFuture<R, Error>,
{
fn call(&self, params: Params) -> BoxFuture<crate::Result<Value>> {
fn call(&self, params: Params) -> BoxFuture<crate::Result<R>> {
self(params).into_future()
}
}
Expand All @@ -113,13 +126,13 @@ where
}
}

impl<F: Send + Sync + 'static, X: Send + 'static, T> RpcMethod<T> for F
impl<F: Send + Sync + 'static, X: Send + 'static, T, R> RpcMethod<T, R> for F
where
T: Metadata,
F: Fn(Params, T) -> X,
X: Future<Output = Result<Value, Error>>,
X: Future<Output = Result<R, Error>>,
{
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<Value>> {
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<R>> {
Box::pin(self(params, meta))
}
}
Expand All @@ -133,3 +146,14 @@ where
self(params, meta)
}
}

impl<F: Send + Sync + 'static, X: Send + 'static, T> RpcMethodWrapped<T> for F
where
T: Metadata,
F: Fn(Params, T, Option<Version>, Id) -> X,
X: Future<Output = Option<WrapOutput>>,
{
fn call(&self, params: Params, meta: T, jsonrpc: Option<Version>, id: Id) -> BoxFuture<Option<WrapOutput>> {
Box::pin(self(params, meta, jsonrpc, id))
}
}
31 changes: 17 additions & 14 deletions core/src/delegates.rs
@@ -1,27 +1,28 @@
//! Delegate rpc calls

use serde::Serialize;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use crate::calls::{Metadata, RemoteProcedure, RpcMethod, RpcNotification};
use crate::types::{Error, Params, Value};
use crate::calls::{rpc_wrap, Metadata, RemoteProcedure, RpcMethod, RpcNotification};
use crate::types::{Error, Params};
use crate::BoxFuture;

struct DelegateAsyncMethod<T, F> {
delegate: Arc<T>,
closure: F,
}

impl<T, M, F, I> RpcMethod<M> for DelegateAsyncMethod<T, F>
impl<T, M, F, I, R> RpcMethod<M, R> for DelegateAsyncMethod<T, F>
where
M: Metadata,
F: Fn(&T, Params) -> I,
I: Future<Output = Result<Value, Error>> + Send + 'static,
I: Future<Output = Result<R, Error>> + Send + 'static,
T: Send + Sync + 'static,
F: Send + Sync + 'static,
{
fn call(&self, params: Params, _meta: M) -> BoxFuture<crate::Result<Value>> {
fn call(&self, params: Params, _meta: M) -> BoxFuture<crate::Result<R>> {
let closure = &self.closure;
Box::pin(closure(&self.delegate, params))
}
Expand All @@ -32,15 +33,15 @@ struct DelegateMethodWithMeta<T, F> {
closure: F,
}

impl<T, M, F, I> RpcMethod<M> for DelegateMethodWithMeta<T, F>
impl<T, M, F, I, R> RpcMethod<M, R> for DelegateMethodWithMeta<T, F>
where
M: Metadata,
F: Fn(&T, Params, M) -> I,
I: Future<Output = Result<Value, Error>> + Send + 'static,
I: Future<Output = Result<R, Error>> + Send + 'static,
T: Send + Sync + 'static,
F: Send + Sync + 'static,
{
fn call(&self, params: Params, meta: M) -> BoxFuture<crate::Result<Value>> {
fn call(&self, params: Params, meta: M) -> BoxFuture<crate::Result<R>> {
let closure = &self.closure;
Box::pin(closure(&self.delegate, params, meta))
}
Expand Down Expand Up @@ -112,31 +113,33 @@ where
}

/// Adds async method to the delegate.
pub fn add_method<F, I>(&mut self, name: &str, method: F)
pub fn add_method<F, I, R>(&mut self, name: &str, method: F)
where
F: Fn(&T, Params) -> I,
I: Future<Output = Result<Value, Error>> + Send + 'static,
I: Future<Output = Result<R, Error>> + Send + 'static,
F: Send + Sync + 'static,
R: Serialize + Send + 'static,
{
self.methods.insert(
name.into(),
RemoteProcedure::Method(Arc::new(DelegateAsyncMethod {
RemoteProcedure::Method(rpc_wrap(DelegateAsyncMethod {
delegate: self.delegate.clone(),
closure: method,
})),
);
}

/// Adds async method with metadata to the delegate.
pub fn add_method_with_meta<F, I>(&mut self, name: &str, method: F)
pub fn add_method_with_meta<F, I, R>(&mut self, name: &str, method: F)
where
F: Fn(&T, Params, M) -> I,
I: Future<Output = Result<Value, Error>> + Send + 'static,
I: Future<Output = Result<R, Error>> + Send + 'static,
F: Send + Sync + 'static,
R: Serialize + Send + 'static,
{
self.methods.insert(
name.into(),
RemoteProcedure::Method(Arc::new(DelegateMethodWithMeta {
RemoteProcedure::Method(rpc_wrap(DelegateMethodWithMeta {
delegate: self.delegate.clone(),
closure: method,
})),
Expand Down

0 comments on commit b2e2892

Please sign in to comment.