Skip to content

Commit

Permalink
Resolve concurrency with watcher trigger service (#39164)
Browse files Browse the repository at this point in the history
The watcher trigger service could attempt to modify the perWatchStats
map simultaneously from multiple threads. This would cause the
internal state to become inconsistent, in particular the count()
method may return an incorrect value for the number of watches.

This changes replaces the implementation of the map with a
ConcurrentHashMap so that its internal state remains consistent even
when accessed from mutiple threads.

Backport of: #39092
  • Loading branch information
tvernum authored Feb 20, 2019
1 parent ec2b64a commit 4aa50ed
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

Expand All @@ -29,7 +30,7 @@ public class TriggerService {

private final GroupedConsumer consumer = new GroupedConsumer();
private final Map<String, TriggerEngine> engines;
private final Map<String, TriggerWatchStats> perWatchStats = new HashMap<>();
private final Map<String, TriggerWatchStats> perWatchStats = new ConcurrentHashMap<>();

public TriggerService(Set<TriggerEngine> engines) {
Map<String, TriggerEngine> builder = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ public void testPausingWatcherServiceAlsoPausesTriggerService() {
Trigger trigger = mock(Trigger.class);
when(trigger.type()).thenReturn(engineType);

final String id = randomAlphaOfLengthBetween(3, 12);
Watch watch = mock(Watch.class);
when(watch.trigger()).thenReturn(trigger);
when(watch.id()).thenReturn(id);
when(watch.condition()).thenReturn(InternalAlwaysCondition.INSTANCE);
ExecutableNoneInput noneInput = new ExecutableNoneInput();
when(watch.input()).thenReturn(noneInput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public void testLoadMalformedWatchRecord() throws Exception {
});
}

@AwaitsFix(bugUrl = "Supposedly fixed; https://github.com/elastic/x-pack-elasticsearch/issues/1915")
public void testLoadExistingWatchesUponStartup() throws Exception {
stopWatcher();

Expand Down

0 comments on commit 4aa50ed

Please sign in to comment.