Skip to content

Commit

Permalink
fix listener on follower if region changed
Browse files Browse the repository at this point in the history
Signed-off-by: Yu Juncen <[email protected]>
  • Loading branch information
YuJuncen committed Feb 28, 2022
1 parent 43a0d8b commit 6a0f1d9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 15 deletions.
15 changes: 9 additions & 6 deletions components/br-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,16 @@ where
self.resolvers.as_ref().remove(&region.id);
}
ObserveOp::RefreshResolver { region } => {
self.observer.subs.deregister_region(region.id);
let canceled = self.observer.subs.deregister_region(region.id);
self.resolvers.as_ref().remove(&region.id);
if let Err(e) = self.observe_over(&region) {
e.report(format!(
"register region {} to raftstore when refreshing",
region.get_id()
));

if canceled {
if let Err(e) = self.observe_over(&region) {
e.report(format!(
"register region {} to raftstore when refreshing",
region.get_id()
));
}
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions components/br-stream/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,24 @@ impl SubscriptionTracer {
}
}

pub fn deregister_region(&self, region_id: u64) {
/// try to mark a region no longer be tracked by this observer.
/// returns whether success (it failed if the region hasn't been observed when calling this.)
pub fn deregister_region(&self, region_id: u64) -> bool {
match self.0.remove(&region_id) {
Some(o) => {
o.1.stop_observing();
info!("stop listen stream from store"; "observer" => ?o.1, "region_id"=> %region_id);
true
}
None => {
warn!("trying to deregister region not registered"; "region_id" => %region_id);
false
}
}
}

/// check whether the region_id should be observed by this observer.
pub fn should_observe(&self, region_id: u64) -> bool {
pub fn is_observing(&self, region_id: u64) -> bool {
let mut exists = false;

// The region traced, check it whether is still be observing,
Expand Down Expand Up @@ -147,7 +151,7 @@ impl<E: KvEngine> CmdObserver<E> for BackupStreamObserver {
!cb.is_empty()
&& cb.level == ObserveLevel::All
// Once the observe has been canceled by outside things, we should be able to stop.
&& self.subs.should_observe(cb.region_id)
&& self.subs.is_observing(cb.region_id)
})
.cloned()
.collect();
Expand Down Expand Up @@ -185,15 +189,15 @@ impl RegionChangeObserver for BackupStreamObserver {
_role: StateRole,
) {
match event {
RegionChangeEvent::Destroy if self.subs.should_observe(ctx.region().get_id()) => {
RegionChangeEvent::Destroy if self.subs.is_observing(ctx.region().get_id()) => {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::Stop {
region: ctx.region().clone(),
})
);
}
RegionChangeEvent::Update => {
RegionChangeEvent::Update if self.subs.is_observing(ctx.region().get_id()) => {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::RefreshResolver {
Expand Down Expand Up @@ -255,9 +259,9 @@ mod tests {
} else {
panic!("unexpected message received: it is {}", task);
}
assert!(o.subs.should_observe(42));
assert!(o.subs.is_observing(42));
handle.stop_observing();
assert!(!o.subs.should_observe(42));
assert!(!o.subs.is_observing(42));
}

#[test]
Expand Down Expand Up @@ -311,14 +315,14 @@ mod tests {
o.on_role_change(&mut ctx, StateRole::Leader);
let task = rx.recv_timeout(Duration::from_millis(20));
assert!(task.is_err(), "it is {:?}", task);
assert!(!o.subs.should_observe(43));
assert!(!o.subs.is_observing(43));

// Test region out of range won't be added to observe list.
let mut ctx = ObserverContext::new(&r);
o.on_region_changed(&mut ctx, RegionChangeEvent::Create, StateRole::Leader);
let task = rx.recv_timeout(Duration::from_millis(20));
assert!(task.is_err(), "it is {:?}", task);
assert!(!o.subs.should_observe(43));
assert!(!o.subs.is_observing(43));

// Test give up subscripting when become follower.
let r = fake_region(42, b"0008", b"0009");
Expand Down

0 comments on commit 6a0f1d9

Please sign in to comment.