-
Notifications
You must be signed in to change notification settings - Fork 276
/
pubsub-macros.rs
111 lines (91 loc) · 2.88 KB
/
pubsub-macros.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use serde_json;
use jsonrpc_core;
use jsonrpc_pubsub;
#[macro_use]
extern crate jsonrpc_derive;
use std::sync::Arc;
use jsonrpc_core::futures::sync::mpsc;
use jsonrpc_pubsub::{PubSubHandler, SubscriptionId, Session, PubSubMetadata};
use jsonrpc_pubsub::typed::Subscriber;
/// Error type used by the RPC methods declared in this file.
///
/// The enum has no variants, so a value of this type can never be
/// constructed; any `Result<T, MyError>` is therefore always `Ok`.
pub enum MyError {}

impl From<MyError> for jsonrpc_core::Error {
    /// Converts a `MyError` into a JSON-RPC error.
    ///
    /// This function can never actually run because `MyError` is uninhabited.
    /// The empty `match` lets the compiler check that fact: if a variant is
    /// ever added to `MyError`, this stops compiling instead of turning into
    /// a runtime panic.
    fn from(e: MyError) -> Self {
        match e {}
    }
}
/// File-local shorthand for the standard `Result` with the error type fixed
/// to `MyError`. Within this file it shadows the prelude's `Result`.
type Result<T> = ::std::result::Result<T, MyError>;
/// RPC interface exercised by the tests in this file.
///
/// It declares one pub/sub pair for the `hello` subscription plus one plain
/// method (`add`). The tests call `to_delegate()` on an implementor; that
/// method is not declared here, so it is presumably generated by the `#[rpc]`
/// attribute (from `jsonrpc_derive`, pulled in with `#[macro_use]`).
#[rpc]
pub trait Rpc {
// Metadata type handed to the subscribe / unsubscribe handlers below.
type Metadata;
/// Hello subscription.
///
/// Registered under the method name `hello_subscribe` with the alias
/// `hello_alias`. After the metadata and the typed subscriber it takes a
/// `u32` followed by an `Option<u64>`. The tests in this file show that an
/// empty `params` array is rejected with "at least 1 argument(s)" and that
/// `[1]` is accepted, i.e. the trailing `Option<u64>` may be left out.
#[pubsub(subscription = "hello", subscribe, name = "hello_subscribe", alias("hello_alias"))]
fn subscribe(&self, _: Self::Metadata, _: Subscriber<String>, _: u32, _: Option<u64>);
/// Unsubscribe from hello subscription.
///
/// Registered under the method name `hello_unsubscribe`; receives optional
/// metadata and the id of the subscription to cancel.
#[pubsub(subscription = "hello", unsubscribe, name = "hello_unsubscribe")]
fn unsubscribe(&self, _: Option<Self::Metadata>, _: SubscriptionId) -> Result<bool>;
/// A regular rpc method alongside pubsub.
///
/// Registered under the method name `add`; takes two `u64` values.
#[rpc(name = "add")]
fn add(&self, _: u64, _: u64) -> Result<u64>;
}
/// Stateless `Rpc` implementation that the tests register on the handler.
#[derive(Default)]
struct RpcImpl;

impl Rpc for RpcImpl {
    type Metadata = Metadata;

    /// Assigns the fixed id `SubscriptionId::Number(5)` to the subscriber.
    /// The metadata and both RPC parameters are ignored.
    fn subscribe(
        &self,
        _metadata: Self::Metadata,
        subscriber: Subscriber<String>,
        _required: u32,
        _optional: Option<u64>,
    ) {
        let fixed_id = SubscriptionId::Number(5);
        // The value returned by `assign_id` is deliberately left unused.
        let _assigned = subscriber.assign_id(fixed_id);
    }

    /// Always reports a successful unsubscribe, whatever id is supplied.
    fn unsubscribe(
        &self,
        _metadata: Option<Self::Metadata>,
        _subscription: SubscriptionId,
    ) -> Result<bool> {
        Ok(true)
    }

    /// Returns the sum of the two operands.
    fn add(&self, a: u64, b: u64) -> Result<u64> {
        let sum = a + b;
        Ok(sum)
    }
}
/// Request metadata that the tests pass to the handler.
#[derive(Clone, Default)]
struct Metadata;

impl jsonrpc_core::Metadata for Metadata {}

impl PubSubMetadata for Metadata {
    /// Creates a fresh session on every call.
    ///
    /// The session is built around the sending half of a new channel with
    /// capacity 1; the receiving half is a local that is dropped when this
    /// function returns.
    fn session(&self) -> Option<Arc<Session>> {
        let (sender, _receiver) = mpsc::channel(1);
        let session = Session::new(sender);
        Some(Arc::new(session))
    }
}
/// Calling `hello_subscribe` with an empty `params` array must produce the
/// invalid-params error (-32602) stating that at least one argument is needed.
#[test]
fn test_invalid_trailing_pubsub_params() {
    // given
    let mut handler = PubSubHandler::default();
    handler.extend_with(RpcImpl::default().to_delegate());

    // when
    let request = r#"{"jsonrpc":"2.0","id":1,"method":"hello_subscribe","params":[]}"#;
    let raw_response = handler.handle_request_sync(request, Metadata);

    // then
    let expected_json = r#"{
"jsonrpc": "2.0",
"error": {
"code": -32602,
"message": "`params` should have at least 1 argument(s)"
},
"id": 1
}"#;
    let expected: jsonrpc_core::Response = serde_json::from_str(expected_json).unwrap();
    let actual: jsonrpc_core::Response = serde_json::from_str(&raw_response.unwrap()).unwrap();
    assert_eq!(expected, actual);
}
/// Subscribing through the `hello_alias` method name with a single argument
/// must succeed and return the subscription id 5.
#[test]
fn test_subscribe_with_alias() {
    // given
    let mut handler = PubSubHandler::default();
    handler.extend_with(RpcImpl::default().to_delegate());

    // when
    let request = r#"{"jsonrpc":"2.0","id":1,"method":"hello_alias","params":[1]}"#;
    let raw_response = handler.handle_request_sync(request, Metadata);

    // then
    let expected_json = r#"{
"jsonrpc": "2.0",
"result": 5,
"id": 1
}"#;
    let expected: jsonrpc_core::Response = serde_json::from_str(expected_json).unwrap();
    let actual: jsonrpc_core::Response = serde_json::from_str(&raw_response.unwrap()).unwrap();
    assert_eq!(expected, actual);
}