Skip to content

Commit

Permalink
Return error when unsubscribing from invalid subscription id (#619)
Browse files Browse the repository at this point in the history
  • Loading branch information
jstarry committed Mar 31, 2021
1 parent 135d929 commit 014b1ec
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 16 deletions.
1 change: 1 addition & 0 deletions pubsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde = "1.0"

[dev-dependencies]
jsonrpc-tcp-server = { version = "17.0", path = "../tcp" }
serde_json = "1.0"

[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
107 changes: 91 additions & 16 deletions pubsub/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ impl Session {
}

/// Removes existing subscription.
fn remove_subscription(&self, name: &str, id: &SubscriptionId) {
self.active_subscriptions.lock().remove(&(id.clone(), name.into()));
fn remove_subscription(&self, name: &str, id: &SubscriptionId) -> bool {
self.active_subscriptions
.lock()
.remove(&(id.clone(), name.into()))
.is_some()
}
}

Expand Down Expand Up @@ -335,8 +338,11 @@ where
};
match (meta.session(), id) {
(Some(session), Some(id)) => {
session.remove_subscription(&self.notification, &id);
Box::pin(self.unsubscribe.call(id, Some(meta)))
if session.remove_subscription(&self.notification, &id) {
Box::pin(self.unsubscribe.call(id, Some(meta)))
} else {
Box::pin(future::err(core::Error::invalid_params("Invalid subscription id.")))
}
}
(Some(_), None) => Box::pin(future::err(core::Error::invalid_params("Expected subscription id."))),
_ => Box::pin(future::err(subscriptions_unavailable())),
Expand Down Expand Up @@ -392,13 +398,36 @@ mod tests {
});

// when
session.remove_subscription("test", &id);
let removed = session.remove_subscription("test", &id);
drop(session);

// then
assert_eq!(removed, true);
assert_eq!(called.load(Ordering::SeqCst), false);
}

#[test]
fn should_not_remove_subscription_if_invalid() {
// given
let id = SubscriptionId::Number(1);
let called = Arc::new(AtomicBool::new(false));
let called2 = called.clone();
let other_session = session().0;
let session = session().0;
session.add_subscription("test", &id, move |id| {
assert_eq!(id, SubscriptionId::Number(1));
called2.store(true, Ordering::SeqCst);
});

// when
let removed = other_session.remove_subscription("test", &id);
drop(session);

// then
assert_eq!(removed, false);
assert_eq!(called.load(Ordering::SeqCst), true);
}

#[test]
fn should_unregister_in_case_of_collision() {
// given
Expand Down Expand Up @@ -485,40 +514,86 @@ mod tests {
});
}

#[derive(Clone, Default)]
struct Metadata;
#[derive(Clone)]
struct Metadata(Arc<Session>);
impl core::Metadata for Metadata {}
impl PubSubMetadata for Metadata {
fn session(&self) -> Option<Arc<Session>> {
Some(Arc::new(session().0))
Some(self.0.clone())
}
}
impl Default for Metadata {
fn default() -> Self {
Self(Arc::new(session().0))
}
}

#[test]
fn should_subscribe() {
// given
let called = Arc::new(AtomicBool::new(false));
let called2 = called.clone();
let (subscribe, _) = new_subscription(
"test".into(),
move |params, _meta, _subscriber| {
move |params, _meta, subscriber: Subscriber| {
assert_eq!(params, core::Params::None);
called2.store(true, Ordering::SeqCst);
let _sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap();
},
|_id, _meta| async { Ok(core::Value::Bool(true)) },
);
let meta = Metadata;

// when
let meta = Metadata::default();
let result = subscribe.call(core::Params::None, meta);

// then
assert_eq!(called.load(Ordering::SeqCst), true);
assert_eq!(futures::executor::block_on(result), Ok(serde_json::json!(5)));
}

#[test]
fn should_unsubscribe() {
// given
const SUB_ID: u64 = 5;
let (subscribe, unsubscribe) = new_subscription(
"test".into(),
move |params, _meta, subscriber: Subscriber| {
assert_eq!(params, core::Params::None);
let _sink = subscriber.assign_id(SubscriptionId::Number(SUB_ID)).unwrap();
},
|_id, _meta| async { Ok(core::Value::Bool(true)) },
);

// when
let meta = Metadata::default();
futures::executor::block_on(subscribe.call(core::Params::None, meta.clone())).unwrap();
let result = unsubscribe.call(core::Params::Array(vec![serde_json::json!(SUB_ID)]), meta);

// then
assert_eq!(futures::executor::block_on(result), Ok(serde_json::json!(true)));
}

#[test]
fn should_not_unsubscribe_if_invalid() {
// given
const SUB_ID: u64 = 5;
let (subscribe, unsubscribe) = new_subscription(
"test".into(),
move |params, _meta, subscriber: Subscriber| {
assert_eq!(params, core::Params::None);
let _sink = subscriber.assign_id(SubscriptionId::Number(SUB_ID)).unwrap();
},
|_id, _meta| async { Ok(core::Value::Bool(true)) },
);

// when
let meta = Metadata::default();
futures::executor::block_on(subscribe.call(core::Params::None, meta.clone())).unwrap();
let result = unsubscribe.call(core::Params::Array(vec![serde_json::json!(SUB_ID + 1)]), meta);

// then
assert_eq!(
futures::executor::block_on(result),
Err(core::Error {
code: core::ErrorCode::ServerError(-32091),
message: "Subscription rejected".into(),
code: core::ErrorCode::InvalidParams,
message: "Invalid subscription id.".into(),
data: None,
})
);
Expand Down

0 comments on commit 014b1ec

Please sign in to comment.