diff --git a/core/trino-main/pom.xml b/core/trino-main/pom.xml index afe24b9a727b..f74a3d6924e6 100644 --- a/core/trino-main/pom.xml +++ b/core/trino-main/pom.xml @@ -303,6 +303,12 @@ commons-math3 + + org.apache.commons + commons-pool2 + 2.11.1 + + org.apache.lucene lucene-analyzers-common diff --git a/core/trino-main/src/main/java/io/trino/metadata/ResolvedFunction.java b/core/trino-main/src/main/java/io/trino/metadata/ResolvedFunction.java index 398d1a62edaf..4e99c20bb217 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/ResolvedFunction.java +++ b/core/trino-main/src/main/java/io/trino/metadata/ResolvedFunction.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Splitter; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.compress.zstd.ZstdCompressor; @@ -23,6 +24,7 @@ import io.airlift.json.JsonCodec; import io.airlift.json.JsonCodecFactory; import io.airlift.json.ObjectMapperProvider; +import io.trino.collect.cache.NonEvictableCache; import io.trino.spi.type.Type; import io.trino.spi.type.TypeId; import io.trino.spi.type.TypeSignature; @@ -30,23 +32,29 @@ import io.trino.type.TypeDeserializer; import io.trino.type.TypeSignatureDeserializer; import io.trino.type.TypeSignatureKeyDeserializer; +import io.trino.util.ThreadSafeCompressorDecompressor; +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.io.BaseEncoding.base32Hex; +import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache; +import static io.trino.metadata.ResolvedFunction.ResolvedFunctionDecoder.resolveQualifiedName; import static java.lang.Math.toIntExact; +import static java.nio.ByteBuffer.allocate; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; public class ResolvedFunction { - private static final JsonCodec SERIALIZE_JSON_CODEC = new JsonCodecFactory().jsonCodec(ResolvedFunction.class); private static final String PREFIX = "@"; private final BoundSignature signature; private final FunctionId functionId; @@ -55,6 +63,7 @@ public class ResolvedFunction private final FunctionNullability functionNullability; private final Map typeDependencies; private final Set functionDependencies; + private final QualifiedName qualifiedName; @JsonCreator public ResolvedFunction( @@ -73,6 +82,7 @@ public ResolvedFunction( this.functionNullability = requireNonNull(functionNullability, "nullability is null"); this.typeDependencies = ImmutableMap.copyOf(requireNonNull(typeDependencies, "typeDependencies is null")); this.functionDependencies = ImmutableSet.copyOf(requireNonNull(functionDependencies, "functionDependencies is null")); + this.qualifiedName = resolveQualifiedName(this); checkArgument(functionNullability.getArgumentNullable().size() == signature.getArgumentTypes().size(), "signature and functionNullability must have same argument count"); } @@ -125,17 +135,7 @@ public static boolean isResolved(QualifiedName name) public QualifiedName toQualifiedName() { - byte[] json = SERIALIZE_JSON_CODEC.toJsonBytes(this); - - // json can be large so use zstd to compress - ZstdCompressor compressor = new ZstdCompressor(); - byte[] compressed = new byte[compressor.maxCompressedLength(json.length)]; - int outputSize = compressor.compress(json, 0, json.length, compressed, 0, compressed.length); - - // names are case insensitive, so use base32 instead of base64 - String base32 = base32Hex().encode(compressed, 0, outputSize); - // add name so expressions are still readable - return QualifiedName.of(PREFIX + signature.getName() + PREFIX + base32); + return qualifiedName; } public static String extractFunctionName(QualifiedName qualifiedName) @@ -181,6 +181,11 @@ public String toString() public static class ResolvedFunctionDecoder { + private static final JsonCodec SERIALIZE_JSON_CODEC = new JsonCodecFactory().jsonCodec(ResolvedFunction.class); + private static final ThreadSafeCompressorDecompressor COMPRESSOR_DECOMPRESSOR = new ThreadSafeCompressorDecompressor(ZstdCompressor::new, ZstdDecompressor::new); + private static final NonEvictableCache resolvedFunctionsCache = buildNonEvictableCache(CacheBuilder.newBuilder().maximumSize(1024)); + private static final NonEvictableCache qualifiedFunctionsCache = buildNonEvictableCache(CacheBuilder.newBuilder().maximumSize(1024)); + private final JsonCodec jsonCodec; public ResolvedFunctionDecoder(Function typeLoader) @@ -196,26 +201,49 @@ Type.class, new TypeDeserializer(typeLoader), public Optional fromQualifiedName(QualifiedName qualifiedName) { - String data = qualifiedName.getSuffix(); - if (!data.startsWith(PREFIX)) { + if (!qualifiedName.getSuffix().startsWith(PREFIX)) { return Optional.empty(); } - List parts = Splitter.on(PREFIX).splitToList(data.subSequence(1, data.length())); - checkArgument(parts.size() == 2, "Expected encoded resolved function to contain two parts: %s", qualifiedName); - String name = parts.get(0); + try { + return Optional.of(resolvedFunctionsCache.get(qualifiedName, () -> deserialize(qualifiedName))); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + static QualifiedName resolveQualifiedName(ResolvedFunction function) + { + try { + return qualifiedFunctionsCache.get(function, () -> serialize(function)); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private ResolvedFunction deserialize(QualifiedName qualifiedName) + { + String data = qualifiedName.getSuffix(); + List parts = Splitter.on(PREFIX).splitToList(data.substring(1)); + checkArgument(parts.size() == 2, "Expected encoded resolved function to contain two parts: %s", qualifiedName); String base32 = parts.get(1); // name may have been lower cased, but base32 decoder requires upper case base32 = base32.toUpperCase(ENGLISH); byte[] compressed = base32Hex().decode(base32); - byte[] json = new byte[toIntExact(ZstdDecompressor.getDecompressedSize(compressed, 0, compressed.length))]; - new ZstdDecompressor().decompress(compressed, 0, compressed.length, json, 0, json.length); + ByteBuffer decompressed = allocate(toIntExact(ZstdDecompressor.getDecompressedSize(compressed, 0, compressed.length))); + COMPRESSOR_DECOMPRESSOR.decompress(ByteBuffer.wrap(compressed), decompressed); + return jsonCodec.fromJson(Arrays.copyOf(decompressed.array(), decompressed.position())); + } - ResolvedFunction resolvedFunction = jsonCodec.fromJson(json); - checkArgument(resolvedFunction.getSignature().getName().equalsIgnoreCase(name), - "Expected decoded function to have name %s, but name is %s", resolvedFunction.getSignature().getName(), name); - return Optional.of(resolvedFunction); + static QualifiedName serialize(ResolvedFunction function) + { + byte[] value = SERIALIZE_JSON_CODEC.toJsonBytes(function); + ByteBuffer compressed = allocate(COMPRESSOR_DECOMPRESSOR.maxCompressedLength(value.length)); + COMPRESSOR_DECOMPRESSOR.compress(ByteBuffer.wrap(value), compressed); + return QualifiedName.of(PREFIX + function.signature.getName() + PREFIX + base32Hex().encode(compressed.array(), 0, compressed.position())); } } } diff --git a/core/trino-main/src/main/java/io/trino/util/ThreadSafeCompressorDecompressor.java b/core/trino-main/src/main/java/io/trino/util/ThreadSafeCompressorDecompressor.java new file mode 100644 index 000000000000..c3d9560df6b4 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/util/ThreadSafeCompressorDecompressor.java @@ -0,0 +1,145 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.util; + +import io.airlift.compress.Compressor; +import io.airlift.compress.Decompressor; +import io.airlift.compress.MalformedInputException; +import org.apache.commons.pool2.DestroyMode; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.PooledObjectFactory; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.SoftReferenceObjectPool; + +import java.nio.ByteBuffer; +import java.util.function.Function; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class ThreadSafeCompressorDecompressor + implements Compressor, Decompressor +{ + private final SoftReferenceObjectPool compressors; + private final SoftReferenceObjectPool decompressors; + + public ThreadSafeCompressorDecompressor(Supplier compressorFactory, Supplier decompressorFactory) + { + this.compressors = new SoftReferenceObjectPool<>(new CompressorObjectFactory<>(requireNonNull(compressorFactory, "compressorFactory is null"))); + this.decompressors = new SoftReferenceObjectPool<>(new CompressorObjectFactory<>(requireNonNull(decompressorFactory, "decompressorFactory is null"))); + } + + @Override + public int maxCompressedLength(int uncompressedSize) + { + return with(compressors, compressor -> compressor.maxCompressedLength(uncompressedSize)); + } + + @Override + public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) + { + return with(compressors, compressor -> compressor.compress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength)); + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) + { + with(compressors, compressor -> { + compressor.compress(input, output); + return true; + }); + } + + @Override + public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength) throws MalformedInputException + { + return with(decompressors, decompressor -> decompressor.decompress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength)); + } + + @Override + public void decompress(ByteBuffer input, ByteBuffer output) throws MalformedInputException + { + with(decompressors, decompressor -> + { + decompressor.decompress(input, output); + return true; + }); + } + + private T with(ObjectPool pool, Function call) + { + U delegate = null; + try { + delegate = pool.borrowObject(); + return call.apply(delegate); + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + if (delegate != null) { + try { + pool.returnObject(delegate); + } + catch (Exception ignored) { + } + } + } + } + + private static class CompressorObjectFactory + implements PooledObjectFactory + { + private final Supplier delegate; + + CompressorObjectFactory(Supplier delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void activateObject(PooledObject pooledObject) + { + } + + @Override + public void destroyObject(PooledObject pooledObject) + { + } + + @Override + public void destroyObject(PooledObject pooledObject, DestroyMode destroyMode) + { + destroyObject(pooledObject); + } + + @Override + public PooledObject makeObject() + { + return new DefaultPooledObject<>(delegate.get()); + } + + @Override + public void passivateObject(PooledObject pooledObject) + { + } + + @Override + public boolean validateObject(PooledObject pooledObject) + { + return true; + } + } +} diff --git a/core/trino-parser/src/main/java/io/trino/sql/tree/QualifiedName.java b/core/trino-parser/src/main/java/io/trino/sql/tree/QualifiedName.java index 0939acde8ed7..1bde85186c77 100644 --- a/core/trino-parser/src/main/java/io/trino/sql/tree/QualifiedName.java +++ b/core/trino-parser/src/main/java/io/trino/sql/tree/QualifiedName.java @@ -15,7 +15,6 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.util.List; @@ -30,8 +29,11 @@ public class QualifiedName { - private final List parts; private final List originalParts; + private final List parts; + private final Optional prefix; + private final String suffix; + private final String name; public static QualifiedName of(String first, String... rest) { @@ -56,7 +58,25 @@ public static QualifiedName of(Iterable originalParts) private QualifiedName(List originalParts) { this.originalParts = originalParts; - this.parts = originalParts.stream().map(identifier -> identifier.getValue().toLowerCase(ENGLISH)).collect(toImmutableList()); + this.parts = originalParts.stream() + .map(QualifiedName::mapIdentifier) + .collect(toImmutableList()); + this.name = Joiner.on(".").join(parts); + + if (originalParts.size() == 1) { + this.prefix = Optional.empty(); + this.suffix = mapIdentifier(originalParts.get(0)); + } + else { + List subList = originalParts.subList(0, originalParts.size() - 1); + this.prefix = Optional.of(new QualifiedName(subList)); + this.suffix = mapIdentifier(originalParts.get(originalParts.size() - 1)); + } + } + + private static String mapIdentifier(Identifier identifier) + { + return identifier.getValue().toLowerCase(ENGLISH); } public List getParts() @@ -69,24 +89,13 @@ public List getOriginalParts() return originalParts; } - @Override - public String toString() - { - return Joiner.on('.').join(parts); - } - /** * For an identifier of the form "a.b.c.d", returns "a.b.c" * For an identifier of the form "a", returns absent */ public Optional getPrefix() { - if (parts.size() == 1) { - return Optional.empty(); - } - - List subList = originalParts.subList(0, originalParts.size() - 1); - return Optional.of(new QualifiedName(subList)); + return this.prefix; } public boolean hasSuffix(QualifiedName suffix) @@ -96,13 +105,12 @@ public boolean hasSuffix(QualifiedName suffix) } int start = parts.size() - suffix.getParts().size(); - return parts.subList(start, parts.size()).equals(suffix.getParts()); } public String getSuffix() { - return Iterables.getLast(parts); + return this.suffix; } @Override @@ -122,4 +130,10 @@ public int hashCode() { return parts.hashCode(); } + + @Override + public String toString() + { + return name; + } }