-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prohibit assigning concurrent maps into Map-typed variables and fields and fix a race condition in CoordinatorRuleManager #6898
Conversation
…s; Fix a race condition in CoordinatorRuleManager; improve logic in DirectDruidClient and ResourcePool
@@ -92,7 +93,8 @@ | |||
|
|||
private static final Logger log = new Logger(DirectDruidClient.class); | |||
|
|||
private static final Map<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = new ConcurrentHashMap<>(); | |||
private static final ConcurrentHashMap<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change ConcurrentHashMap
to ConcurrentMap
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it's often should be the opposite: ConcurrentHashMap
should be deliberately used instead of ConcurrentMap
whenever compute()
, computeIfAbsent()
, etc. called on the map, because ConcurrentHashMap
guarantees atomicity and linearizability of such actions, but ConcurrentMap
doesn't. E. g. ConcurrentSkipListMap
merely guarantees that if two concurrent threads call computeIfAbsent()
on the same key at the same time, the program won't crash with IllegalStateException or ConcurrentModificationException, but the lambdas could be computed in parallel and it's unknown which wins.
I will go though this PR and change types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, thanks for the explanation.
@@ -159,14 +158,15 @@ public int getNumOpenConnections() | |||
|
|||
Pair<JavaType, JavaType> types = typesMap.get(query.getClass()); | |||
if (types == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since using computeIfAbsent
, this if (types == null)
can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could improve concurrency, see #4397 (comment). I'll add a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, computeIfAbsent()
may call get()
again(ConcurrentHashMap doesn't, ConcurrentSkipListMap does), so I think we shouldn't use computeIfAbsent()
, using put()
is ok here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If two queries of the same new type are run in parallel, there could be a race between them. Maybe it could be tolerated here because computation (body of the lambda) could be run in parallel and re-run for the same type with no harm, but computeIfAbsent()
is clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, it's fine.
@@ -67,7 +67,7 @@ | |||
|
|||
private ExecutorService listenerExecutor; | |||
|
|||
private final Map<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>(); | |||
private final ConcurrentHashMap<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, change ConcurrentHashMap
to ConcurrentMap
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -121,7 +121,7 @@ | |||
private final IndexIO indexIO; | |||
private final IndexMerger indexMerger; | |||
private final Cache cache; | |||
private final Map<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>(); | |||
private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same, change ConcurrentHashMap
to ConcurrentMap
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I guess this one looks like it could just be ConcurrentMap
return retVal; | ||
ConcurrentMap<SegmentId, String> segments = currentlyProcessingSegments.get(tier); | ||
List<String> segmentsAndHosts = new ArrayList<>(); | ||
segments.forEach((segmentId, serverId) -> segmentsAndHosts.add(segmentId + " ON " + serverId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why you didn't use StringUtils.format
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simple string concatenation should be faster because it doesn't involve parsing of the format string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
…erge() is called on a ConcurrentHashMap, it's stored in a ConcurrentHashMap-typed variable, not ConcurrentMap; add comments explaining get()-before-computeIfAbsent() optimization; refactor Counters; fix a race condition in Intialization.java
@QiuMM in a newer commit I've added comments and enforced that @jihoonson FYI I refactored |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I had a couple of comments and questions but nothing critical.
<constraint name="x" within="" contains="" /> | ||
<constraint name="y" nameOfExprType="java\.util\.concurrent\.ConcurrentMap" expressionTypes="java.util.concurrent.ConcurrentMap" exprTypeWithinHierarchy="true" within="" contains="" /> | ||
</searchConfiguration> | ||
<searchConfiguration name="A ConcurrentHashMap on which compute() is called should be assinged into variables of ConcurrentHashMap type, not ConcurrentMap" text="$x$.compute($y$, $z$)" recursive="true" caseInsensitive="true" type="JAVA"> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider including the rationale in this message: it is not obvious that it's because ConcurrentMap does not guarantee atomicity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This field is not assumed to be a message, it's a configuration name, I think I already overuse them. Probably neither desktop IntelliJ nor TeamCity CI are prepared for something multiline in this field.
{ | ||
return intCounters.computeIfAbsent(key, k -> new AtomicInteger()).addAndGet(val); | ||
// get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any idea why ConcurrentHashMap does not already employ an optimization like this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a throughput vs. scalability tradeoff, + lack of information. We are potentially doing two operations instead of one, and avoid locking in some cases instead.
At some sites where computeIfAbsent()
is actually expected to find the key absent and compute the value most of the time, the get()
guard just makes things worse.
There is also an area where it's hard for me to say what approach is better, is that when the map is big and computeIfAbsent()
constitutes significant part of the app's CPU consumption (the bigger the map and the hotter computeIfAbsent()
call is, the more likely that it's better to not guard computeIfAbsent()
with get()
). I think it's never nearly the case on Druid nodes that computeIfAbsent()
is hot, but I could be wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the ConcurrentHashMap's part, it would be useful if computeIfAbsentMoreScalableButMaybeDoingExtraWork()
existed, where they don't recompute hash bucket twice and just walk the collision chain twice. But it's easy to imagine why such method doesn't exist.
…zation; IdentityHashMap optimization
11057ee
to
14307c3
Compare
It looks like some unit tests are failing now with similar messages. Maybe a recent change broke something? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
@@ -67,7 +67,7 @@ | |||
|
|||
private ExecutorService listenerExecutor; | |||
|
|||
private final Map<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>(); | |||
private final ConcurrentHashMap<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -121,7 +121,7 @@ | |||
private final IndexIO indexIO; | |||
private final IndexMerger indexMerger; | |||
private final Cache cache; | |||
private final Map<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>(); | |||
private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I guess this one looks like it could just be ConcurrentMap
Concurrent maps (i. e.
ConcurrentHashMap
andConcurrentSkipListMap
) should be assigned into variables of their respective type orConcurrentMap
, but not justMap
.Why this is important could be seen in
CoordinatorRuleManager
, where it's pretty obvious that codehas a race condition, but previously when the type of the variable was
Map
it was not obvious.This race condition in
CoordinatorRuleManager
is fixed in this PR. Also, improved logic inDirectDruidClient
andResourcePool
.