Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prohibit assigning concurrent maps into Map-typed variables and fields and fix a race condition in CoordinatorRuleManager #6898

Merged
merged 10 commits into from
Feb 4, 2019
34 changes: 33 additions & 1 deletion .idea/inspectionProfiles/Druid.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void setup() throws IOException

factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public String getFormatString()
strategySelector,
new GroupByQueryQueryToolChest(
strategySelector,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,10 @@ public void setup() throws IOException
0,
Integer.MAX_VALUE
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public String getFormatString()
strategySelector,
new GroupByQueryQueryToolChest(
strategySelector,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
);
}

public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public void setup() throws IOException
new SearchStrategySelector(Suppliers.ofInstance(config)),
new SearchQueryQueryToolChest(
config,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void setup() throws IOException
factory = new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(
JSON_MAPPER,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator(),
selectConfigSupplier
),
new SelectQueryEngine(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void setup() throws IOException

factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ public void setup() throws IOException
0,
Integer.MAX_VALUE
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ private void setupQueries()
0,
Integer.MAX_VALUE
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
Expand Down Expand Up @@ -270,7 +273,7 @@ private void setupQueries()

timeseriesQuery = timeseriesQueryBuilder.build();
timeseriesFactory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryQueryToolChest(QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,19 @@ public void emit(Event event)
{
try {
URI uri = uriExtractor.apply(event);
// get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed.
// See https://github.com/apache/incubator-druid/pull/6898#discussion_r251384586.
HttpPostEmitter emitter = emitters.get(uri);
if (emitter == null) {
try {
emitter = emitters.computeIfAbsent(uri, u -> {
try {
return innerLifecycle.addMaybeStartManagedInstance(
new HttpPostEmitter(
config.buildHttpEmitterConfig(u.toString()),
client,
jsonMapper
)
new HttpPostEmitter(config.buildHttpEmitterConfig(u.toString()), client, jsonMapper)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -123,13 +126,23 @@ protected void finalize() throws Throwable
public void close()
{
closed.set(true);
final Map<K, ImmediateCreationResourceHolder<K, V>> mapView = pool.asMap();
for (K k : ImmutableSet.copyOf(mapView.keySet())) {
mapView.remove(k).close();
final ConcurrentMap<K, ImmediateCreationResourceHolder<K, V>> mapView = pool.asMap();
Closer closer = Closer.create();
for (Iterator<Map.Entry<K, ImmediateCreationResourceHolder<K, V>>> iterator =
mapView.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<K, ImmediateCreationResourceHolder<K, V>> e = iterator.next();
iterator.remove();
closer.register(e.getValue());
}
try {
closer.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

private static class ImmediateCreationResourceHolder<K, V>
private static class ImmediateCreationResourceHolder<K, V> implements Closeable
{
private final int maxSize;
private final K key;
Expand Down Expand Up @@ -265,7 +278,8 @@ private boolean holderListContains(V object)
return resourceHolderList.stream().anyMatch(a -> a.getResource().equals(object));
}

void close()
@Override
public void close()
{
synchronized (this) {
closed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testMakePostComputeManipulatorFn()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public int getNumThreads()
strategySelector,
new GroupByQueryQueryToolChest(
strategySelector,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static Iterable<Object[]> constructorFeeder() throws IOException
SelectQueryRunnerFactory factory = new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(
new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator(),
selectConfigSupplier
),
new SelectQueryEngine(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setup() throws IOException
new StupidPool<>("map-virtual-column-test", () -> ByteBuffer.allocate(1024)),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static Iterable<Object[]> constructorFeeder()
defaultPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
Expand All @@ -91,7 +91,7 @@ public static Iterable<Object[]> constructorFeeder()
customPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static Iterable<Object[]> constructorFeeder()
defaultPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
Expand All @@ -91,7 +91,7 @@ public static Iterable<Object[]> constructorFeeder()
customPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -184,7 +185,7 @@ public boolean start()
final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
cacheHandler = cacheManager.createCache();
final Map<String, String> map = cacheHandler.getCache();
final ConcurrentMap<String, String> map = cacheHandler.getCache();
mapRef.set(map);
// Enable publish-subscribe
kafkaProperties.setProperty("auto.offset.reset", "smallest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -123,8 +124,8 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
private static final EmittingLogger log = new EmittingLogger(LegacyKafkaIndexTaskRunner.class);
private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";

private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Long> endOffsets = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Long> nextOffsets = new ConcurrentHashMap<>();

// The pause lock and associated conditions are to support coordination between the Jetty threads and the main
// ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskLock;
Expand Down Expand Up @@ -2393,8 +2392,7 @@ public boolean checkPointDataSourceMetadata(
);
return true;
}
},
new Counters()
}
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -341,7 +342,8 @@ private void rescheduleRunnable(long delayMillis)

private ScheduledExecutorService scheduledExec;

private final Map<StreamPartition<String>, PartitionResource> partitionResources = new ConcurrentHashMap<>();
private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources =
new ConcurrentHashMap<>();
private BlockingQueue<OrderedPartitionableRecord<String, String>> records;

private volatile boolean checkPartitionsStarted = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskLock;
Expand Down Expand Up @@ -2705,8 +2704,7 @@ public boolean checkPointDataSourceMetadata(
);
return true;
}
},
new Counters()
}
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
Expand Down
Loading