Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ImmutableLookupMap for static lookups. #15675

Merged
merged 11 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.lookup.ImmutableLookupMap;
import org.apache.druid.query.lookup.LookupExtractor;

import java.util.HashMap;
Expand All @@ -35,6 +37,11 @@
*/
public class LookupBenchmarkUtil
{
/**
* Length of keys. They are zero-padded to this size.
*/
private static final int KEY_LENGTH = 20;

public enum LookupType
{
HASHMAP {
Expand Down Expand Up @@ -69,6 +76,17 @@ public LookupExtractor build(Iterable<Pair<String, String>> keyValuePairs)
}
return new MapLookupExtractor(map, false);
}
},
REVERSIBLE {
@Override
public LookupExtractor build(Iterable<Pair<String, String>> keyValuePairs)
{
final Map<String, String> map = new HashMap<>();
for (final Pair<String, String> keyValuePair : keyValuePairs) {
map.put(keyValuePair.lhs, keyValuePair.rhs);
}
return ImmutableLookupMap.fromMap(map).asLookupExtractor(false, () -> new byte[0]);
}
};

public abstract LookupExtractor build(Iterable<Pair<String, String>> keyValuePairs);
Expand All @@ -91,9 +109,14 @@ public static LookupExtractor makeLookupExtractor(final LookupType lookupType, f

final Iterable<Pair<String, String>> keys =
() -> IntStream.range(0, numKeys)
.mapToObj(i -> Pair.of(String.valueOf(i), String.valueOf(i % numValues)))
.mapToObj(i -> Pair.of(makeKeyOrValue(i), makeKeyOrValue(i % numValues)))
.iterator();

return lookupType.build(keys);
}

public static String makeKeyOrValue(final int i)
{
return StringUtils.format("%0" + KEY_LENGTH + "d", i);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.druid.benchmark.lookup;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.lookup.LookupExtractor;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -58,7 +60,7 @@ public class LookupExtractorBenchmark
/**
* Type of lookup to benchmark. All are members of enum {@link LookupBenchmarkUtil.LookupType}.
*/
@Param({"hashmap", "guava", "fastutil"})
@Param({"reversible"})
private String lookupType;

/**
Expand All @@ -74,6 +76,7 @@ public class LookupExtractorBenchmark
private int keysPerValue;

private LookupExtractor lookup;
private String oneValue;
private Set<String> oneThousandValues;

@Setup(Level.Trial)
Expand All @@ -86,12 +89,15 @@ public void setup()
numValues
);

Preconditions.checkArgument(lookup.keySet().size() == numKeys);
Preconditions.checkArgument(lookup.asMap().size() == numKeys);

// Values to unapply for the benchmark lookupUnapplyOne.
oneValue = LookupBenchmarkUtil.makeKeyOrValue(0);

// Set of values to unapply for the benchmark lookupUnapplyOneThousand.
oneThousandValues = new HashSet<>();
for (int i = 0; i < 1000; i++) {
oneThousandValues.add(String.valueOf(i));
oneThousandValues.add(LookupBenchmarkUtil.makeKeyOrValue(i));
}
}

Expand All @@ -105,17 +111,23 @@ public void lookupApply(Blackhole blackhole)

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void lookupUnapplyOne(Blackhole blackhole)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void lookupUnapplyOne()
{
blackhole.consume(lookup.unapplyAll(Collections.singleton("0")));
final int numKeys = Iterators.size(lookup.unapplyAll(Collections.singleton(oneValue)));
if (numKeys != keysPerValue) {
throw new ISE("Expected [%s] keys, got[%s]", keysPerValue, numKeys);
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void lookupUnapplyOneThousand(Blackhole blackhole)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void lookupUnapplyOneThousand()
{
blackhole.consume(lookup.unapplyAll(oneThousandValues));
final int numKeys = Iterators.size(lookup.unapplyAll(oneThousandValues));
if (numKeys != keysPerValue * 1000) {
throw new ISE("Expected [%s] keys, got[%s]", keysPerValue * 1000, numKeys);
}
}
}
5 changes: 5 additions & 0 deletions extensions-core/lookups-cached-global/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,17 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.lookup.namespace.ExtractionNamespace;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

@JsonTypeName("cachedNamespace")
public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
Expand Down Expand Up @@ -225,23 +224,18 @@ public LookupExtractor get()
throw new ISE("%s: %s, extractorID = %s", entry, noCacheReason, extractorID);
}
CacheScheduler.VersionedCache versionedCache = (CacheScheduler.VersionedCache) cacheState;
Map<String, String> map = versionedCache.getCache();
final byte[] v = StringUtils.toUtf8(versionedCache.getVersion());
final byte[] id = StringUtils.toUtf8(extractorID);
return new MapLookupExtractor(map, isInjective())
{
@Override
public byte[] getCacheKey()
{
return ByteBuffer
final byte injectiveByte = isInjective() ? (byte) 1 : (byte) 0;
final Supplier<byte[]> cacheKey = () ->
ByteBuffer
.allocate(CLASS_CACHE_KEY.length + id.length + 1 + v.length + 1 + 1)
.put(CLASS_CACHE_KEY)
.put(id).put((byte) 0xFF)
.put(v).put((byte) 0xFF)
.put(isOneToOne() ? (byte) 1 : (byte) 0)
.put(injectiveByte)
.array();
}
};
return versionedCache.asLookupExtractor(isInjective(), cacheKey);
}
finally {
readLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.utils.ServletResourceUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;

import javax.ws.rs.GET;
Expand Down Expand Up @@ -95,6 +94,6 @@ public Response getMap()

private Map<String, String> getLatest()
{
return ((MapLookupExtractor) factory.get()).getMap();
return factory.get().asMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.apache.druid.server.lookup.namespace.cache;

import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.LookupExtractor;

import java.io.Closeable;
import java.util.Map;
import java.util.function.Supplier;

public final class CacheHandler implements AutoCloseable, Closeable
{
Expand All @@ -45,6 +47,14 @@ public Map<String, String> getCache()
return cache;
}

/**
* Returns a {@link LookupExtractor} view of the cached data.
*/
public LookupExtractor asLookupExtractor(final boolean isOneToOne, final Supplier<byte[]> cacheKeySupplier)
{
return cacheManager.asLookupExtractor(this, isOneToOne, cacheKeySupplier);
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.ExtractionNamespace;

Expand All @@ -46,6 +47,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Usage:
Expand Down Expand Up @@ -409,6 +411,14 @@ public Map<String, String> getCache()
return cacheHandler.getCache();
}

/**
* Returns a {@link LookupExtractor} view of the cached data.
*/
public LookupExtractor asLookupExtractor(final boolean isOneToOne, final Supplier<byte[]> cacheKeySupplier)
{
return cacheHandler.asLookupExtractor(isOneToOne, cacheKeySupplier);
}

public String getVersion()
{
return version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* Usage:
Expand Down Expand Up @@ -112,6 +114,16 @@ boolean waitForServiceToEnd(long time, TimeUnit unit) throws InterruptedExceptio
*/
public abstract CacheHandler attachCache(CacheHandler cache);

/**
* Given a cache from {@link #createCache()} or {@link #allocateCache()}, return a {@link LookupExtractor}
* view of it.
*/
public abstract LookupExtractor asLookupExtractor(
CacheHandler cacheHandler,
boolean isOneToOne,
Supplier<byte[]> cacheKeySupplier
);

abstract void disposeCache(CacheHandler cacheHandler);

abstract int cacheCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
import org.mapdb.DB;
import org.mapdb.DBMaker;
Expand All @@ -38,6 +40,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
*
Expand Down Expand Up @@ -82,7 +85,8 @@ public void run()
try {
doDispose();
// Log statement goes after doDispose(), because logging may fail (e. g. if we are in shutdownHooks).
log.error("OffHeapNamespaceExtractionCacheManager.disposeCache() was not called, disposed resources by the JVM");
log.error(
"OffHeapNamespaceExtractionCacheManager.disposeCache() was not called, disposed resources by the JVM");
}
catch (Throwable t) {
try {
Expand Down Expand Up @@ -251,6 +255,23 @@ public CacheHandler attachCache(CacheHandler cache)
return cache;
}

@Override
public LookupExtractor asLookupExtractor(
final CacheHandler cacheHandler,
final boolean isOneToOne,
final Supplier<byte[]> cacheKeySupplier
)
{
return new MapLookupExtractor(cacheHandler.getCache(), isOneToOne)
{
@Override
public byte[] getCacheKey()
{
return cacheKeySupplier.get();
}
};
}

@Override
void disposeCache(CacheHandler cacheHandler)
{
Expand Down
Loading
Loading