Prohibit assigning concurrent maps into Map-typed variables and fields and fix a race condition in CoordinatorRuleManager #6898

Merged (10 commits) on Feb 4, 2019
.idea/inspectionProfiles/Druid.xml: 34 changes (33 additions, 1 deletion)

The customized inspection profile is not rendered in the diff view; this is the file that adds the inspection named in the PR title.
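To illustrate what the inspection is after, here is a minimal sketch (the class and field names are illustrative, not from the patch): declaring a field as plain Map hides the fact that the underlying map is concurrent, while declaring it as ConcurrentMap keeps the thread-safety contract visible at every use site.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class TaskCounts
{
  // Flagged by the inspection: the concurrent nature of the map is hidden
  // behind the plain Map type, so readers cannot tell the map is safe for
  // concurrent use, and a non-concurrent map could later be assigned here
  // without any compile error.
  private final Map<String, Integer> hidden = new ConcurrentHashMap<>();

  // Allowed: the ConcurrentMap type documents the thread-safety guarantee
  // and keeps atomic methods like putIfAbsent()/replace() visible.
  private final ConcurrentMap<String, Integer> visible = new ConcurrentHashMap<>();
}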
@@ -24,13 +24,16 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -123,13 +126,23 @@ protected void finalize() throws Throwable
public void close()
{
closed.set(true);
final Map<K, ImmediateCreationResourceHolder<K, V>> mapView = pool.asMap();
for (K k : ImmutableSet.copyOf(mapView.keySet())) {
mapView.remove(k).close();
final ConcurrentMap<K, ImmediateCreationResourceHolder<K, V>> mapView = pool.asMap();
Closer closer = Closer.create();
for (Iterator<Map.Entry<K, ImmediateCreationResourceHolder<K, V>>> iterator =
mapView.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<K, ImmediateCreationResourceHolder<K, V>> e = iterator.next();
iterator.remove();
closer.register(e.getValue());
}
try {
closer.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

private static class ImmediateCreationResourceHolder<K, V>
private static class ImmediateCreationResourceHolder<K, V> implements Closeable
{
private final int maxSize;
private final K key;
@@ -265,7 +278,8 @@ private boolean holderListContains(V object)
return resourceHolderList.stream().anyMatch(a -> a.getResource().equals(object));
}

void close()
@Override
public void close()
{
synchronized (this) {
closed = true;
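The rewritten close() fixes two weaknesses of the old loop: mapView.remove(k) could return null (and throw an NPE) if another thread removed the key between the keySet() copy and the remove() call, and an exception from one holder's close() abandoned the remaining holders. A condensed, self-contained sketch of the resulting drain-and-close idiom, using a hypothetical resource map (Closer and its register()/close() methods are the ones used in the patch):

import org.apache.druid.java.util.common.io.Closer;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ResourceRegistry
{
  private final ConcurrentMap<String, Closeable> resources = new ConcurrentHashMap<>();

  void closeAll()
  {
    Closer closer = Closer.create();
    // Draining through the entry-set iterator removes and registers each
    // holder exactly once, even under concurrent mutation of the map.
    for (Iterator<Map.Entry<String, Closeable>> it = resources.entrySet().iterator(); it.hasNext(); ) {
      Map.Entry<String, Closeable> entry = it.next();
      it.remove();
      closer.register(entry.getValue());
    }
    try {
      // Closer.close() calls close() on every registered resource; if some
      // throw, the first exception is rethrown and the rest are suppressed,
      // so no resource is skipped.
      closer.close();
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}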
[next file]
@@ -54,6 +54,7 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -184,7 +185,7 @@ public boolean start()
final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
cacheHandler = cacheManager.createCache();
final Map<String, String> map = cacheHandler.getCache();
final ConcurrentMap<String, String> map = cacheHandler.getCache();
mapRef.set(map);
// Enable publish-subscribe
kafkaProperties.setProperty("auto.offset.reset", "smallest");
[next file]
@@ -104,6 +104,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -123,8 +124,8 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
private static final EmittingLogger log = new EmittingLogger(LegacyKafkaIndexTaskRunner.class);
private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";

private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Long> endOffsets = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Long> nextOffsets = new ConcurrentHashMap<>();

// The pause lock and associated conditions are to support coordination between the Jetty threads and the main
// ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully
[next file]
@@ -50,7 +50,6 @@
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskLock;
@@ -2393,8 +2392,7 @@ public boolean checkPointDataSourceMetadata(
);
return true;
}
},
new Counters()
}
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
[next file]
@@ -68,6 +68,7 @@
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@@ -341,7 +342,8 @@ private void rescheduleRunnable(long delayMillis)

private ScheduledExecutorService scheduledExec;

private final Map<StreamPartition<String>, PartitionResource> partitionResources = new ConcurrentHashMap<>();
private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources =
new ConcurrentHashMap<>();
private BlockingQueue<OrderedPartitionableRecord<String, String>> records;

private volatile boolean checkPartitionsStarted = false;
[next file]
@@ -55,7 +55,6 @@
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskLock;
@@ -2705,8 +2704,7 @@ public boolean checkPointDataSourceMetadata(
);
return true;
}
},
new Counters()
}
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
[next file]
@@ -114,7 +114,7 @@ void monitor(ServiceEmitter serviceEmitter)
long size = 0;
expungeCollectedCaches();
for (WeakReference<ConcurrentMap<String, String>> cacheRef : caches) {
final Map<String, String> cache = cacheRef.get();
final ConcurrentMap<String, String> cache = cacheRef.get();
if (cache == null) {
continue;
}
[next file]
@@ -19,54 +19,31 @@

package org.apache.druid.indexing.common;

import com.google.common.util.concurrent.AtomicDouble;

import javax.annotation.Nullable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.concurrent.atomic.AtomicLong;

public class Counters
public final class Counters
{
private final ConcurrentMap<String, AtomicInteger> intCounters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AtomicDouble> doubleCounters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AtomicReference> objectCounters = new ConcurrentHashMap<>();

public int increment(String key, int val)
public static <K> int incrementAndGetInt(ConcurrentHashMap<K, AtomicInteger> counters, K key)
{
return intCounters.computeIfAbsent(key, k -> new AtomicInteger()).addAndGet(val);
// get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed.
Contributor: Any idea why ConcurrentHashMap does not already employ an optimization like this?

Member Author: That's a throughput vs. scalability tradeoff, plus a lack of information. We potentially do two operations instead of one, but avoid locking in some cases.

At call sites where computeIfAbsent() is actually expected to find the key absent and compute the value most of the time, the get() guard just makes things worse.

There is also a regime where it's hard to say which approach is better: when the map is big and computeIfAbsent() constitutes a significant part of the app's CPU consumption. The bigger the map and the hotter the computeIfAbsent() call, the more likely it is better not to guard computeIfAbsent() with get(). I think computeIfAbsent() is never nearly that hot on Druid nodes, but I could be wrong.

Member Author: On ConcurrentHashMap's side, it would be useful if a computeIfAbsentMoreScalableButMaybeDoingExtraWork() existed that computes the hash bucket once and just walks the collision chain twice. But it's easy to imagine why such a method doesn't exist.

AtomicInteger counter = counters.get(key);
if (counter == null) {
counter = counters.computeIfAbsent(key, k -> new AtomicInteger());
}
return counter.incrementAndGet();
}

public double increment(String key, double val)
public static <K> long incrementAndGetLong(ConcurrentHashMap<K, AtomicLong> counters, K key)
{
return doubleCounters.computeIfAbsent(key, k -> new AtomicDouble()).addAndGet(val);
// get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed.
AtomicLong counter = counters.get(key);
if (counter == null) {
counter = counters.computeIfAbsent(key, k -> new AtomicLong());
}
return counter.incrementAndGet();
}

public Object increment(String key, Object obj, BinaryOperator mergeFunction)
{
return objectCounters.computeIfAbsent(key, k -> new AtomicReference()).accumulateAndGet(obj, mergeFunction);
}

@Nullable
public Integer getIntCounter(String key)
{
final AtomicInteger atomicInteger = intCounters.get(key);
return atomicInteger == null ? null : atomicInteger.get();
}

@Nullable
public Double getDoubleCounter(String key)
{
final AtomicDouble atomicDouble = doubleCounters.get(key);
return atomicDouble == null ? null : atomicDouble.get();
}

@Nullable
public Object getObjectCounter(String key)
{
final AtomicReference atomicReference = objectCounters.get(key);
return atomicReference == null ? null : atomicReference.get();
}
private Counters() {}
}
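With the stateful Counters object gone, each caller now owns a plain ConcurrentHashMap of atomics and goes through the static helper. A hedged sketch of the migration, modeled on the ParallelIndexSupervisorTask change further below (the wrapper class is illustrative; it assumes org.apache.druid.indexing.common.Counters is on the classpath):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class PartitionNumAllocator
{
  // The caller owns the counter map instead of sharing an injected Counters instance.
  private final ConcurrentHashMap<String, AtomicInteger> countersPerInterval =
      new ConcurrentHashMap<>();

  int allocatePartitionNum(String interval)
  {
    // Before this patch: counters.increment(interval, 1) on a Counters object.
    // After: a static helper over a map the caller controls.
    return Counters.incrementAndGetInt(countersPerInterval, interval);
  }
}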
[next file]
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.actions;

import com.google.inject.Inject;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
@@ -34,24 +33,21 @@ public class TaskActionToolbox
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private final ServiceEmitter emitter;
private final SupervisorManager supervisorManager;
private final Counters counters;

@Inject
public TaskActionToolbox(
TaskLockbox taskLockbox,
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter,
SupervisorManager supervisorManager,
Counters counters
SupervisorManager supervisorManager
)
{
this.taskLockbox = taskLockbox;
this.taskStorage = taskStorage;
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.emitter = emitter;
this.supervisorManager = supervisorManager;
this.counters = counters;
}

public TaskLockbox getTaskLockbox()
@@ -78,9 +74,4 @@ public SupervisorManager getSupervisorManager()
{
return supervisorManager;
}

public Counters getCounters()
{
return counters;
}
}
[next file]
@@ -76,6 +76,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
@@ -98,7 +100,7 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
private final AuthorizerMapper authorizerMapper;
private final RowIngestionMetersFactory rowIngestionMetersFactory;

private final Counters counters = new Counters();
private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap<>();

private volatile ParallelIndexTaskRunner runner;

@@ -377,7 +379,7 @@ SegmentIdWithShardSpec allocateNewSegment(DateTime timestamp) throws IOException
throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
}

final int partitionNum = counters.increment(interval.toString(), 1);
final int partitionNum = Counters.incrementAndGetInt(partitionNumCountersPerInterval, interval);
return new SegmentIdWithShardSpec(
dataSource,
interval,
[next file]
@@ -53,7 +53,6 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -82,8 +81,8 @@ public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunn
private final BlockingQueue<SubTaskCompleteEvent<ParallelIndexSubTask>> taskCompleteEvents =
new LinkedBlockingDeque<>();

// subTaskId -> report
private final ConcurrentMap<String, PushedSegmentsReport> segmentsMap = new ConcurrentHashMap<>();
/** subTaskId -> report */
private final ConcurrentHashMap<String, PushedSegmentsReport> segmentsMap = new ConcurrentHashMap<>();

private volatile boolean stopped;
private volatile TaskMonitor<ParallelIndexSubTask> taskMonitor;
[next file]
@@ -85,6 +85,7 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -107,8 +108,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
private final PortFinder portFinder;
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();

// Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting.
private final Map<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
/** Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. */
private final ConcurrentMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();

private volatile boolean stopping = false;
