Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging middleware #614

Open
PumpkinSeed opened this issue Feb 9, 2021 · 7 comments
Open

Logging middleware #614

PumpkinSeed opened this issue Feb 9, 2021 · 7 comments

Comments

@PumpkinSeed
Copy link

Firstly thanks for the library. :)

I wanted to implement a logging middleware but I had a problem. There is the actual implementation.

struct LoggerMiddleware{}

impl RequestMiddleware for LoggerMiddleware {
    fn on_request(&self, request: Request<Body>) -> RequestMiddlewareAction {
        info!("Incoming {:?}", request);
        RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors: false, request }
    }
}

It logged that line:

[2021-02-09T20:09:26Z INFO  ethock_lib::server] Incoming Request { method: POST, uri: /, version: HTTP/1.1, headers: {"content-type": "application/json", "content-length": "72", "accept": "*/*", "host": "127.0.0.1:8545"}, body: Body(Streaming) }
// important part: 
body: Body(Streaming)

Is there any way that I can make it visible?

@tomusdrw
Copy link
Contributor

tomusdrw commented Feb 9, 2021

I'm afraid it's not really possible. Body is going to be a Stream, so you need to poll next elements in an async context (and the sender controls the rate at which it is being sent) and the middleware has to respond immediately.

You could block (see tokio::spawn_blocking) and read the response, but performance-wise it might not be the best idea.

@PumpkinSeed
Copy link
Author

PumpkinSeed commented Feb 10, 2021

Thanks for the help. The performance not a problem, since the package purpose will be logging layer, so that would be the functionality to log these out.

use jsonrpc_http_server::hyper::{Request, Body};

...

struct LoggerMiddleware{}

impl RequestMiddleware for LoggerMiddleware {
    fn on_request(&self, request: Request<Body>) -> RequestMiddlewareAction {
        let body = request.body();
        let res = futures::executor::block_on(p(body.clone()));
        info!("Incoming {:?}", res);
        RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors: false, request }
    }
}

async fn p(body: &Body) -> String {
    hyper::body::to_bytes(body);
    "done computing".to_string()
}

This is my current code, but it's throw me an error:

error[E0277]: the trait bound `&jsonrpc_http_server::hyper::Body: hyper::body::HttpBody` is not satisfied
   --> src/server.rs:563:27
    |
563 |     hyper::body::to_bytes(body);
    |                           ^^^^ the trait `hyper::body::HttpBody` is not implemented for `&jsonrpc_http_server::hyper::Body`
    | 
   ::: /home/loow/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.4/src/body/to_bytes.rs:18:8
    |
18  |     T: HttpBody,
    |        -------- required by this bound in `hyper::body::to_bytes`

That's weird because the IDE tells me that it's implementing the HttpBody trait and show me the poll_data function as well.

Exact implementation

@tomusdrw
Copy link
Contributor

@PumpkinSeed we use hyper = "0.13" and you seem to be importing hyper = "0.14"

@PumpkinSeed
Copy link
Author

Yes that was the issue. I made some modification, and it's compile now (which should be a good point in Rust), but I got timeout.

struct LoggerMiddleware{}

impl RequestMiddleware for LoggerMiddleware {
    fn on_request(&self, mut request: Request<Body>) -> RequestMiddlewareAction {
        println!("---------- 11");
        let body = request.body_mut();
        println!("---------- 12");
        let res = futures::executor::block_on(p(body));
        println!("---------- 13");
        info!("Incoming {:?}", res);
        println!("---------- 14");
        RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors: false, request }
    }
}

async fn p(body: &mut Body) -> String {
    println!("---------- 21");
    match hyper::body::to_bytes(body).await {
        Ok(data) => {println!("test: {:?}", data)}
        Err(err) =>{println!("error: {:?}", err)}
    };
    println!("---------- 22");
    "done computing".to_string()
}

The output:

---------- 11
---------- 12
---------- 21
error sending request for url (http://127.0.0.1:8545/): operation timed out

I do really appreciate the help. If I completely figure out I will contribute back with an example details the usage. Maybe it will be a big help in the future.

@tomusdrw
Copy link
Contributor

So hyper is running on top of tokio=0.2 thread pool, futures::executor::block_on is spawning another runtime and blocking the tokio thread (without really notifying tokio about that), which may cause deadlocks like that, especially given that we are polling hyper component (body), which most likely MUST be running on tokio and not on futures::executor::LocalExecutor.

Could you please instead try using tokio::task::block_in_place, which uses tokio's threadpool and marks the thread as doing a blocking operation.

Also please double note that this is really not a recommended way of using middlewares, and it will most likely back fire again in the (hopefuly not) near future.

@zzhengzhuo
Copy link

zzhengzhuo commented Oct 31, 2021

Will it be better to use tracing like this?

#[derive(Debug, Default)]
struct LoggingMiddleware{
}
#[derive(Clone, Debug, Default)]
struct Meta(());
impl Metadata for Meta {}

impl Middleware<Meta> for LoggingMiddleware {
    type Future = FutureResponse;
    type CallFuture = middleware::NoopCallFuture;

    fn on_request<F, X>(&self, request: Request, meta: Meta, next: F) -> Either<Self::Future, X>
    where
        F: FnOnce(Request, Meta) -> X + Send,
        X: Future<Output = Option<Response>> + Send + 'static,
    {
        let start = Instant::now();
        let req_id = Uuid::new_v4().to_string();
        let span = info_span!("jsonrpc",request_id = %req_id);
        let _entered = span.entered();
        let span1= info_span!("json calling");
        log::info!("request: {}",serde_json::to_string_pretty(&request).unwrap_or_default());

        Either::Left(Box::pin(next(request, meta).map(move |res| {
            if let Some(v) = &res {
                log::info!("response:{}", serde_json::to_string_pretty(&v).unwrap_or_default());
            }
            log::info!("Processing took: {:.6}", start.elapsed().as_micros() as f64/1000_000.0);
            res
        }).instrument(span1)))
    }
  
}

@peng-huang-ch
Copy link

@tomusdrw It's my output. hope can help u

Processing request Single(MethodCall(MethodCall { jsonrpc: Some(V2), method: "say_hello", params: Array([]), id: Num(123) }))
Processing took: 121.083µs
use jsonrpc_http_server::jsonrpc_core::futures_util::{future::Either, FutureExt};
use jsonrpc_http_server::jsonrpc_core::*;
use jsonrpc_http_server::{cors::AccessControlAllowHeaders, hyper, RestApi, ServerBuilder};
use std::future::Future;
use std::sync::atomic::AtomicUsize;
use std::time::Instant;

#[derive(Default, Clone)]
struct Meta {
    auth: Option<String>,
}

impl Metadata for Meta {}
#[derive(Default)]
struct LoggerMiddleware(AtomicUsize);

impl Middleware<Meta> for LoggerMiddleware {
    type Future = FutureResponse;
    type CallFuture = middleware::NoopCallFuture;

    fn on_request<F, X>(&self, request: Request, meta: Meta, next: F) -> Either<Self::Future, X>
    where
        F: FnOnce(Request, Meta) -> X + Send,
        X: Future<Output = Option<Response>> + Send + 'static,
    {
        let start = Instant::now();
        println!("Processing request {:?}", request);

        Either::Left(Box::pin(next(request, meta).map(move |res| {
            println!("Processing took: {:?}", start.elapsed());
            res
        })))
    }
}
fn main() {
    env_logger::init();
    let mut io = MetaIoHandler::new(Compatibility::Both, LoggerMiddleware::default());

    io.add_method_with_meta("say_hello", |_params: Params, meta: Meta| {
        let auth = meta.auth.unwrap_or_else(String::new);
        futures::future::ready(if auth.as_str() == "let-me-in" {
            Ok(Value::String("Hello World!".to_owned()))
        } else {
            Ok(Value::String("Access Denied!".to_owned()))
        })
    });

    let server = ServerBuilder::new(io)
        .cors_allow_headers(AccessControlAllowHeaders::Only(vec![
            "Authorization".to_owned()
        ]))
        .rest_api(RestApi::Unsecure)
        // You can also implement `MetaExtractor` trait and pass a struct here.
        .meta_extractor(|req: &hyper::Request<hyper::Body>| {
            let auth = req
                .headers()
                .get(hyper::header::AUTHORIZATION)
                .map(|h| h.to_str().unwrap_or("").to_owned());

            Meta { auth }
        })
        .start_http(&"127.0.0.1:3030".parse().unwrap())
        .expect("Unable to start RPC server");

    server.wait();
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants