From 6ef378f960dd28e53cb2dd8f0a438b0b0e0e35e7 Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Tue, 5 Nov 2019 13:38:18 +0000 Subject: [PATCH] refactor: Cleaned up loading code (#3726) This PR cleans up some of the function loading code, mainly replacing some overly complex streams based logic with simpler direct iteration. --- .../confluent/ksql/function/UdafLoader.java | 95 ++++++++-------- .../io/confluent/ksql/function/UdfLoader.java | 103 ++++++++++-------- .../confluent/ksql/function/UdtfLoader.java | 90 ++++++++------- .../ksql/function/UserFunctionLoader.java | 2 +- .../ksql/function/UdfLoaderTest.java | 5 - 5 files changed, 146 insertions(+), 149 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java index 5a9dbe7a85e8..8402be1d4a97 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdafLoader.java @@ -22,10 +22,9 @@ import io.confluent.ksql.schema.ksql.SqlTypeParser; import java.lang.reflect.Method; import java.lang.reflect.Modifier; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.kafka.common.metrics.Metrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,53 +52,49 @@ class UdafLoader { void loadUdafFromClass(final Class theClass, final String path) { final UdafDescription udafAnnotation = theClass.getAnnotation(UdafDescription.class); - final List argCreators - = Arrays.stream(theClass.getMethods()) - .filter(method -> method.getAnnotation(UdafFactory.class) != null) - .filter(method -> { - if (!Modifier.isStatic(method.getModifiers())) { - LOGGER.warn( - "Trying to create a UDAF from a non-static factory method. Udaf factory" - + " methods must be static. class={}, method={}, name={}", - method.getDeclaringClass(), - method.getName(), - udafAnnotation.name() - ); - return false; - } - return true; - }) - .map(method -> { - final UdafFactory annotation = method.getAnnotation(UdafFactory.class); - try { - LOGGER.info( - "Adding UDAF name={} from path={} class={}", - udafAnnotation.name(), - path, - method.getDeclaringClass() - ); - return Optional.of(createUdafFactoryInvoker( - method, - FunctionName.of(udafAnnotation.name()), - annotation.description(), - annotation.paramSchema(), - annotation.aggregateSchema(), - annotation.returnSchema() - )); - } catch (final Exception e) { - LOGGER.warn( - "Failed to create UDAF name={}, method={}, class={}, path={}", - udafAnnotation.name(), - method.getName(), - method.getDeclaringClass(), - path, - e - ); - } - return Optional.empty(); - }).filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList()); + + final List invokers = new ArrayList<>(); + for (Method method : theClass.getMethods()) { + if (method.getAnnotation(UdafFactory.class) != null) { + if (!Modifier.isStatic(method.getModifiers())) { + LOGGER.warn( + "Trying to create a UDAF from a non-static factory method. Udaf factory" + + " methods must be static. class={}, method={}, name={}", + method.getDeclaringClass(), + method.getName(), + udafAnnotation.name() + ); + continue; + } + final UdafFactory annotation = method.getAnnotation(UdafFactory.class); + try { + LOGGER.info( + "Adding UDAF name={} from path={} class={}", + udafAnnotation.name(), + path, + method.getDeclaringClass() + ); + final UdafFactoryInvoker invoker = createUdafFactoryInvoker( + method, + FunctionName.of(udafAnnotation.name()), + annotation.description(), + annotation.paramSchema(), + annotation.aggregateSchema(), + annotation.returnSchema() + ); + invokers.add(invoker); + } catch (final Exception e) { + LOGGER.warn( + "Failed to create UDAF name={}, method={}, class={}, path={}", + udafAnnotation.name(), + method.getName(), + method.getDeclaringClass(), + path, + e + ); + } + } + } functionRegistry.addAggregateFunctionFactory(new UdafAggregateFunctionFactory( new UdfMetadata( @@ -110,7 +105,7 @@ void loadUdafFromClass(final Class theClass, final String path) { path, false ), - argCreators + invokers )); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java index f1c52136a0fe..362ff9d4b4c7 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java @@ -23,11 +23,12 @@ import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.name.FunctionName; import io.confluent.ksql.schema.ksql.SqlTypeParser; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.lang.reflect.Method; -import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.function.Function; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; @@ -45,24 +46,20 @@ public class UdfLoader { private final MutableFunctionRegistry functionRegistry; private final Optional metrics; private final SqlTypeParser typeParser; - private final ClassLoader parentClassLoader; private final boolean throwExceptionOnLoadFailure; UdfLoader( final MutableFunctionRegistry functionRegistry, final Optional metrics, final SqlTypeParser typeParser, - final ClassLoader parentClassLoader, final boolean throwExceptionOnLoadFailure ) { this.functionRegistry = functionRegistry; this.metrics = metrics; this.typeParser = typeParser; - this.parentClassLoader = parentClassLoader; this.throwExceptionOnLoadFailure = throwExceptionOnLoadFailure; } - // Does not handle customer udfs, i.e the loader is the ParentClassLoader and path is internal // This method is only used from tests @VisibleForTesting void loadUdfFromClass(final Class... udfClasses) { @@ -102,35 +99,36 @@ void loadUdfFromClass( functionRegistry.ensureFunctionFactory(factory); - Arrays.stream(theClass.getMethods()) - .filter(method -> method.getAnnotation(Udf.class) != null) - .map(method -> { - try { - return Optional.of(createFunction(theClass, udfDescriptionAnnotation, method, path, - sensorName, udfClass - )); - } catch (final KsqlException e) { - if (throwExceptionOnLoadFailure) { - throw e; - } else { - LOGGER.warn( - "Failed to add UDF to the MetaStore. name={} method={}", - udfDescriptionAnnotation.name(), - method, - e - ); - } + for (Method method : theClass.getMethods()) { + final Udf udfAnnotation = method.getAnnotation(Udf.class); + if (udfAnnotation != null) { + final KsqlFunction function; + try { + function = createFunction(theClass, udfDescriptionAnnotation, udfAnnotation, method, path, + sensorName, udfClass + ); + } catch (final KsqlException e) { + if (throwExceptionOnLoadFailure) { + throw e; + } else { + LOGGER.warn( + "Failed to add UDF to the MetaStore. name={} method={}", + udfDescriptionAnnotation.name(), + method, + e + ); + continue; } - return Optional.empty(); - }) - .filter(Optional::isPresent) - .map(Optional::get) - .forEach(factory::addFunction); + } + factory.addFunction(function); + } + } } private KsqlFunction createFunction( final Class theClass, final UdfDescription udfDescriptionAnnotation, + final Udf udfAnnotation, final Method method, final String path, final String sensorName, @@ -140,7 +138,6 @@ private KsqlFunction createFunction( FunctionLoaderUtils .instantiateFunctionInstance(method.getDeclaringClass(), udfDescriptionAnnotation.name()); final FunctionInvoker invoker = FunctionLoaderUtils.createFunctionInvoker(method); - final Udf udfAnnotation = method.getAnnotation(Udf.class); final String functionName = udfDescriptionAnnotation.name(); LOGGER.info("Adding function " + functionName + " for method " + method); @@ -151,33 +148,47 @@ private KsqlFunction createFunction( final Schema javaReturnSchema = FunctionLoaderUtils .getReturnType(method, udfAnnotation.schema(), typeParser); - return KsqlFunction.create( - FunctionLoaderUtils.handleUdfReturnSchema( + final Function, Schema> schemaProviderFunction = FunctionLoaderUtils + .handleUdfReturnSchema( theClass, javaReturnSchema, udfAnnotation.schemaProvider(), udfDescriptionAnnotation.name() - ), + ); + + return KsqlFunction.create( + schemaProviderFunction, javaReturnSchema, parameters, FunctionName.of(functionName.toUpperCase()), udfClass, - ksqlConfig -> { - final Object actualUdf = FunctionLoaderUtils.instantiateFunctionInstance( - method.getDeclaringClass(), udfDescriptionAnnotation.name()); - if (actualUdf instanceof Configurable) { - ((Configurable) actualUdf) - .configure(ksqlConfig.getKsqlFunctionsConfigProps(functionName)); - } - final PluggableUdf theUdf = new PluggableUdf(invoker, actualUdf); - return metrics.map(m -> new UdfMetricProducer( - m.getSensor(sensorName), - theUdf, - Time.SYSTEM - )).orElse(theUdf); - }, udfAnnotation.description(), + getUdfFactory(method, udfDescriptionAnnotation, functionName, invoker, sensorName), + udfAnnotation.description(), path, method.isVarArgs() ); } + + private Function getUdfFactory( + final Method method, + final UdfDescription udfDescriptionAnnotation, + final String functionName, + final FunctionInvoker invoker, + final String sensorName + ) { + return ksqlConfig -> { + final Object actualUdf = FunctionLoaderUtils.instantiateFunctionInstance( + method.getDeclaringClass(), udfDescriptionAnnotation.name()); + if (actualUdf instanceof Configurable) { + ((Configurable) actualUdf) + .configure(ksqlConfig.getKsqlFunctionsConfigProps(functionName)); + } + final PluggableUdf theUdf = new PluggableUdf(invoker, actualUdf); + return metrics.map(m -> new UdfMetricProducer( + m.getSensor(sensorName), + theUdf, + Time.SYSTEM + )).orElse(theUdf); + }; + } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java index 3b7e4dfc66ef..fe3f399fd566 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UdtfLoader.java @@ -24,7 +24,6 @@ import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; -import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -79,57 +78,54 @@ public void loadUdtfFromClass( false ); - final TableFunctionFactory udtfFactory = new TableFunctionFactory(metadata); + final TableFunctionFactory factory = new TableFunctionFactory(metadata); - Arrays.stream(theClass.getMethods()) - .filter(method -> method.getAnnotation(Udtf.class) != null) - .map(method -> { - try { - final Udtf annotation = method.getAnnotation(Udtf.class); - if (method.getReturnType() != List.class) { - throw new KsqlException(String - .format("UDTF functions must return a List. Class %s Method %s", - theClass.getName(), method.getName() - )); - } - final Type ret = method.getGenericReturnType(); - if (!(ret instanceof ParameterizedType)) { - throw new KsqlException(String - .format( - "UDTF functions must return a parameterized List. Class %s Method %s", - theClass.getName(), method.getName() - )); - } - final Type typeArg = ((ParameterizedType) ret).getActualTypeArguments()[0]; - final Schema returnType = FunctionLoaderUtils - .getReturnType(method, typeArg, annotation.schema(), typeParser); - final List parameters = FunctionLoaderUtils - .createParameters(method, functionName, typeParser); - return Optional - .of(createTableFunction(method, FunctionName.of(functionName), returnType, - parameters, - udtfDescriptionAnnotation.description(), - annotation + for (Method method : theClass.getMethods()) { + if (method.getAnnotation(Udtf.class) != null) { + final Udtf annotation = method.getAnnotation(Udtf.class); + try { + if (method.getReturnType() != List.class) { + throw new KsqlException(String + .format("UDTF functions must return a List. Class %s Method %s", + theClass.getName(), method.getName() )); - } catch (final KsqlException e) { - if (throwExceptionOnLoadFailure) { - throw e; - } else { - LOGGER.warn( - "Failed to add UDTF to the MetaStore. name={} method={}", - udtfDescriptionAnnotation.name(), - method, - e + } + final Type ret = method.getGenericReturnType(); + if (!(ret instanceof ParameterizedType)) { + throw new KsqlException(String + .format( + "UDTF functions must return a parameterized List. Class %s Method %s", + theClass.getName(), method.getName() + )); + } + final Type typeArg = ((ParameterizedType) ret).getActualTypeArguments()[0]; + final Schema returnType = FunctionLoaderUtils + .getReturnType(method, typeArg, annotation.schema(), typeParser); + final List parameters = FunctionLoaderUtils + .createParameters(method, functionName, typeParser); + final KsqlTableFunction tableFunction = + createTableFunction(method, FunctionName.of(functionName), returnType, + parameters, + udtfDescriptionAnnotation.description(), + annotation ); - } + factory.addFunction(tableFunction); + } catch (final KsqlException e) { + if (throwExceptionOnLoadFailure) { + throw e; + } else { + LOGGER.warn( + "Failed to add UDTF to the MetaStore. name={} method={}", + udtfDescriptionAnnotation.name(), + method, + e + ); } - return Optional.empty(); - }) - .filter(Optional::isPresent) - .map(Optional::get) - .forEach(udtfFactory::addFunction); + } + } + } - functionRegistry.addTableFunctionFactory(udtfFactory); + functionRegistry.addTableFunctionFactory(factory); } private KsqlTableFunction createTableFunction( diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java b/ksql-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java index acf82b0382b7..735b0b447fc2 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/UserFunctionLoader.java @@ -72,7 +72,7 @@ public UserFunctionLoader( Objects.requireNonNull(metrics, "metrics can't be null"); this.loadCustomerUdfs = loadCustomerUdfs; final SqlTypeParser typeParser = SqlTypeParser.create(TypeRegistry.EMPTY); - this.udfLoader = new UdfLoader(functionRegistry, metrics, typeParser, parentClassLoader, false); + this.udfLoader = new UdfLoader(functionRegistry, metrics, typeParser, false); this.udafLoader = new UdafLoader(functionRegistry, metrics, typeParser); this.udtfLoader = new UdtfLoader(functionRegistry, metrics, typeParser, false); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java index 4683b6edc5af..4c1890a38b26 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java @@ -280,7 +280,6 @@ public void shouldThrowOnMissingAnnotation() throws ClassNotFoundException { functionRegistry, Optional.empty(), SqlTypeParser.create(TypeRegistry.EMPTY), - udfClassLoader, true ); @@ -307,7 +306,6 @@ public void shouldThrowOnMissingSchemaProvider() throws ClassNotFoundException { functionRegistry, Optional.empty(), SqlTypeParser.create(TypeRegistry.EMPTY), - udfClassLoader, true ); @@ -335,7 +333,6 @@ public void shouldThrowOnReturnDecimalWithoutSchemaProvider() throws ClassNotFou functionRegistry, Optional.empty(), SqlTypeParser.create(TypeRegistry.EMPTY), - udfClassLoader, true ); @@ -467,7 +464,6 @@ public void shouldNotLoadInternalUdfs() { functionRegistry, Optional.empty(), SqlTypeParser.create(TypeRegistry.EMPTY), - PARENT_CLASS_LOADER, true ); udfLoader.loadUdfFromClass(UdfLoaderTest.SomeFunctionUdf.class); @@ -488,7 +484,6 @@ public void shouldLoadSomeFunction() { functionRegistry, Optional.empty(), SqlTypeParser.create(TypeRegistry.EMPTY), - PARENT_CLASS_LOADER, true ); final List args = ImmutableList.of(