Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Don't send notifications to unsubscribed clients of PubSub #5960

Merged
merged 3 commits into from
Jul 10, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions rpc/src/v1/helpers/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Generic poll manager for Pub-Sub.

use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};
use util::Mutex;

use jsonrpc_core::futures::future::{self, Either};
Expand All @@ -34,7 +35,8 @@ struct Subscription {
method: String,
params: core::Params,
sink: mpsc::Sender<Result<core::Value, core::Error>>,
last_result: Arc<Mutex<Option<core::Output>>>,
/// a flag if subscription is still active and last returned value
last_result: Arc<(AtomicBool, Mutex<Option<core::Output>>)>,
}

/// A struct managing all subscriptions.
Expand Down Expand Up @@ -68,10 +70,10 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
{
let (sink, stream) = mpsc::channel(1);
let subscription = Subscription {
metadata: metadata,
method: method,
params: params,
sink: sink,
metadata,
method,
params,
sink,
last_result: Default::default(),
};
let id = self.subscribers.insert(subscription);
Expand All @@ -80,7 +82,9 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {

pub fn unsubscribe(&mut self, id: &SubscriptionId) -> bool {
debug!(target: "pubsub", "Removing subscription: {:?}", id);
self.subscribers.remove(id).is_some()
self.subscribers.remove(id).map(|subscription| {
subscription.last_result.0.store(true, atomic::Ordering::SeqCst);
}).is_some()
}

pub fn tick(&self) -> BoxFuture<(), ()> {
Expand All @@ -100,7 +104,12 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
let sender = subscription.sink.clone();

let result = result.and_then(move |response| {
let mut last_result = last_result.lock();
// quick check if the subscription is still valid
if last_result.0.load(atomic::Ordering::SeqCst) {
return Either::B(future::ok(()))
}

let mut last_result = last_result.1.lock();
if *last_result != response && response.is_some() {
let output = response.expect("Existence proved by the condition.");
debug!(target: "pubsub", "Got new response, sending: {:?}", output);
Expand Down