Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prohibit assigning concurrent maps into Map-typed variables and fields and fix a race condition in CoordinatorRuleManager #6898

Merged
merged 10 commits into from
Feb 4, 2019

Conversation

leventov
Copy link
Member

Concurrent maps (i. e. ConcurrentHashMap and ConcurrentSkipListMap) should be assigned into variables of their respective type or ConcurrentMap, but not just Map.

Why this is important could be seen in CoordinatorRuleManager, where it's pretty obvious that code

ConcurrentMap<String, List<Rule>> theRules = rules.get();
if (theRules.get(dataSource) != null) {
  retVal.addAll(theRules.get(dataSource));
}

has a race condition, but previously when the type of the variable was Map it was not obvious.

This race condition in CoordinatorRuleManager is fixed in this PR. Also, improved logic in DirectDruidClient and ResourcePool.

…s; Fix a race condition in CoordinatorRuleManager; improve logic in DirectDruidClient and ResourcePool
@leventov leventov changed the title Prohibit assigning concurrent maps into Map-types variables and fields Prohibit assigning concurrent maps into Map-typed variables and fields Jan 22, 2019
@leventov leventov changed the title Prohibit assigning concurrent maps into Map-typed variables and fields Prohibit assigning concurrent maps into Map-typed variables and fields and fix a race condition in CoordinatorRuleManager Jan 22, 2019
@@ -92,7 +93,8 @@

private static final Logger log = new Logger(DirectDruidClient.class);

private static final Map<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change ConcurrentHashMap to ConcurrentMap.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it's often should be the opposite: ConcurrentHashMap should be deliberately used instead of ConcurrentMap whenever compute(), computeIfAbsent(), etc. called on the map, because ConcurrentHashMap guarantees atomicity and linearizability of such actions, but ConcurrentMap doesn't. E. g. ConcurrentSkipListMap merely guarantees that if two concurrent threads call computeIfAbsent() on the same key at the same time, the program won't crash with IllegalStateException or ConcurrentModificationException, but the lambdas could be computed in parallel and it's unknown which wins.

I will go though this PR and change types.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks for the explanation.

@@ -159,14 +158,15 @@ public int getNumOpenConnections()

Pair<JavaType, JavaType> types = typesMap.get(query.getClass());
if (types == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since using computeIfAbsent, this if (types == null) can be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could improve concurrency, see #4397 (comment). I'll add a comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

Copy link
Member

@QiuMM QiuMM Jan 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, computeIfAbsent() may call get() again(ConcurrentHashMap doesn't, ConcurrentSkipListMap does), so I think we shouldn't use computeIfAbsent(), using put() is ok here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If two queries of the same new type are run in parallel, there could be a race between them. Maybe it could be tolerated here because computation (body of the lambda) could be run in parallel and re-run for the same type with no harm, but computeIfAbsent() is clearer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it's fine.

@@ -67,7 +67,7 @@

private ExecutorService listenerExecutor;

private final Map<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, change ConcurrentHashMap to ConcurrentMap.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -121,7 +121,7 @@
private final IndexIO indexIO;
private final IndexMerger indexMerger;
private final Cache cache;
private final Map<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, change ConcurrentHashMap to ConcurrentMap.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I guess this one looks like it could just be ConcurrentMap

return retVal;
ConcurrentMap<SegmentId, String> segments = currentlyProcessingSegments.get(tier);
List<String> segmentsAndHosts = new ArrayList<>();
segments.forEach((segmentId, serverId) -> segmentsAndHosts.add(segmentId + " ON " + serverId));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why you didn't use StringUtils.format.

Copy link
Member Author

@leventov leventov Jan 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simple string concatenation should be faster because it doesn't involve parsing of the format string.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

…erge() is called on a ConcurrentHashMap, it's stored in a ConcurrentHashMap-typed variable, not ConcurrentMap; add comments explaining get()-before-computeIfAbsent() optimization; refactor Counters; fix a race condition in Intialization.java
@leventov
Copy link
Member Author

@QiuMM in a newer commit I've added comments and enforced that ConcurrentHashMap type is used when needed.

@jihoonson FYI I refactored Counters so that now it just holds static utility methods.

Copy link
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I had a couple of comments and questions but nothing critical.

<constraint name="x" within="" contains="" />
<constraint name="y" nameOfExprType="java\.util\.concurrent\.ConcurrentMap" expressionTypes="java.util.concurrent.ConcurrentMap" exprTypeWithinHierarchy="true" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="A ConcurrentHashMap on which compute() is called should be assinged into variables of ConcurrentHashMap type, not ConcurrentMap" text="$x$.compute($y$, $z$)" recursive="true" caseInsensitive="true" type="JAVA">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider including the rationale in this message: it is not obvious that it's because ConcurrentMap does not guarantee atomicity.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is not assumed to be a message, it's a configuration name, I think I already overuse them. Probably neither desktop IntelliJ nor TeamCity CI are prepared for something multiline in this field.

{
return intCounters.computeIfAbsent(key, k -> new AtomicInteger()).addAndGet(val);
// get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea why ConcurrentHashMap does not already employ an optimization like this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a throughput vs. scalability tradeoff, + lack of information. We are potentially doing two operations instead of one, and avoid locking in some cases instead.

At some sites where computeIfAbsent() is actually expected to find the key absent and compute the value most of the time, the get() guard just makes things worse.

There is also an area where it's hard for me to say what approach is better, is that when the map is big and computeIfAbsent() constitutes significant part of the app's CPU consumption (the bigger the map and the hotter computeIfAbsent() call is, the more likely that it's better to not guard computeIfAbsent() with get()). I think it's never nearly the case on Druid nodes that computeIfAbsent() is hot, but I could be wrong.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the ConcurrentHashMap's part, it would be useful if computeIfAbsentMoreScalableButMaybeDoingExtraWork() existed, where they don't recompute hash bucket twice and just walk the collision chain twice. But it's easy to imagine why such method doesn't exist.

@leventov leventov force-pushed the concurrent-map-type branch from 11057ee to 14307c3 Compare January 29, 2019 04:07
@gianm
Copy link
Contributor

gianm commented Jan 29, 2019

It looks like some unit tests are failing now with similar messages. Maybe a recent change broke something?

@jon-wei jon-wei added this to the 0.14.0 milestone Feb 1, 2019
Copy link
Member

@clintropolis clintropolis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

@@ -67,7 +67,7 @@

private ExecutorService listenerExecutor;

private final Map<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -121,7 +121,7 @@
private final IndexIO indexIO;
private final IndexMerger indexMerger;
private final Cache cache;
private final Map<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I guess this one looks like it could just be ConcurrentMap

@leventov leventov added the WIP label Feb 4, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants