From 61ea7c65a64bb977c2e54ed1833d8d2190bb4007 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 9 Dec 2020 12:05:06 +0100 Subject: [PATCH] [forwardport] Update `local` client to support middleware (Kudos Seun) (#589) (#600) * Update `local` client to support middleware (Kudos Seun) (#589) * v15.0.1 * v15.0.1 => v15.1.0 * v15.0.1 => v15.1.0 * cargo fmt * fix tests * adds *_with_middleware methods * remove Unpin bound, add documentation, cargo fmt * 15.0.1 * bump to 15.1.0 Co-authored-by: Seun Lanlege Co-authored-by: Niklas * cargo fmt --all Co-authored-by: Seun Lanlege Co-authored-by: Niklas --- .../transports/src/transports/local.rs | 77 +++++++++++++++---- 1 file changed, 60 insertions(+), 17 deletions(-) diff --git a/core-client/transports/src/transports/local.rs b/core-client/transports/src/transports/local.rs index e4d91eaef..da91a554b 100644 --- a/core-client/transports/src/transports/local.rs +++ b/core-client/transports/src/transports/local.rs @@ -6,7 +6,7 @@ use futures::{ task::{Context, Poll}, Future, Sink, SinkExt, Stream, StreamExt, }; -use jsonrpc_core::{BoxFuture, MetaIoHandler, Metadata}; +use jsonrpc_core::{BoxFuture, MetaIoHandler, Metadata, Middleware}; use jsonrpc_pubsub::Session; use std::ops::Deref; use std::pin::Pin; @@ -26,10 +26,11 @@ enum Buffered { None, } -impl LocalRpc +impl LocalRpc where TMetadata: Metadata, - THandler: Deref>, + TMiddleware: Middleware, + THandler: Deref>, { /// Creates a new `LocalRpc` with default metadata. pub fn new(handler: THandler) -> Self @@ -50,10 +51,11 @@ where } } -impl Stream for LocalRpc +impl Stream for LocalRpc where TMetadata: Metadata + Unpin, - THandler: Deref> + Unpin, + TMiddleware: Middleware + Unpin, + THandler: Deref> + Unpin, { type Item = String; @@ -62,10 +64,11 @@ where } } -impl LocalRpc +impl LocalRpc where TMetadata: Metadata + Unpin, - THandler: Deref> + Unpin, + TMiddleware: Middleware + Unpin, + THandler: Deref> + Unpin, { fn poll_buffered(&mut self, cx: &mut Context) -> Poll> { let response = match self.buffered { @@ -87,10 +90,11 @@ where } } -impl Sink for LocalRpc +impl Sink for LocalRpc where TMetadata: Metadata + Unpin, - THandler: Deref> + Unpin, + TMiddleware: Middleware + Unpin, + THandler: Deref> + Unpin, { type Error = RpcError; @@ -121,14 +125,15 @@ where } } -/// Connects to a `Deref`. -pub fn connect_with_metadata( +/// Connects to a `Deref` specifying a custom middleware implementation. +pub fn connect_with_metadata_and_middleware( handler: THandler, meta: TMetadata, ) -> (TClient, impl Future>) where TClient: From, - THandler: Deref> + Unpin, + TMiddleware: Middleware + Unpin, + THandler: Deref> + Unpin, TMetadata: Metadata + Unpin, { let (sink, stream) = LocalRpc::with_metadata(handler, meta).split(); @@ -137,24 +142,53 @@ where (client, rpc_client) } +/// Connects to a `Deref`. +pub fn connect_with_metadata( + handler: THandler, + meta: TMetadata, +) -> (TClient, impl Future>) +where + TClient: From, + TMetadata: Metadata + Unpin, + THandler: Deref> + Unpin, +{ + connect_with_metadata_and_middleware(handler, meta) +} + +/// Connects to a `Deref` specifying a custom middleware implementation. +pub fn connect_with_middleware( + handler: THandler, +) -> (TClient, impl Future>) +where + TClient: From, + TMetadata: Metadata + Default + Unpin, + TMiddleware: Middleware + Unpin, + THandler: Deref> + Unpin, +{ + connect_with_metadata_and_middleware(handler, Default::default()) +} + /// Connects to a `Deref`. pub fn connect(handler: THandler) -> (TClient, impl Future>) where TClient: From, - THandler: Deref> + Unpin, TMetadata: Metadata + Default + Unpin, + THandler: Deref> + Unpin, { - connect_with_metadata(handler, Default::default()) + connect_with_middleware(handler) } /// Metadata for LocalRpc. pub type LocalMeta = Arc; -/// Connects with pubsub. -pub fn connect_with_pubsub(handler: THandler) -> (TClient, impl Future>) +/// Connects with pubsub specifying a custom middleware implementation. +pub fn connect_with_pubsub_and_middleware( + handler: THandler, +) -> (TClient, impl Future>) where TClient: From, - THandler: Deref> + Unpin, + TMiddleware: Middleware + Unpin, + THandler: Deref> + Unpin, { let (tx, rx) = mpsc::unbounded(); let meta = Arc::new(Session::new(tx)); @@ -164,3 +198,12 @@ where let client = TClient::from(sender); (client, rpc_client) } + +/// Connects with pubsub. +pub fn connect_with_pubsub(handler: THandler) -> (TClient, impl Future>) +where + TClient: From, + THandler: Deref> + Unpin, +{ + connect_with_pubsub_and_middleware(handler) +}