Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: clean up loading of UDFs/UDTFs/UDAFs #3726

Merged
merged 1 commit into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import io.confluent.ksql.schema.ksql.SqlTypeParser;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -53,53 +52,49 @@ class UdafLoader {

void loadUdafFromClass(final Class<?> theClass, final String path) {
final UdafDescription udafAnnotation = theClass.getAnnotation(UdafDescription.class);
final List<UdafFactoryInvoker> argCreators
= Arrays.stream(theClass.getMethods())
.filter(method -> method.getAnnotation(UdafFactory.class) != null)
.filter(method -> {
if (!Modifier.isStatic(method.getModifiers())) {
LOGGER.warn(
"Trying to create a UDAF from a non-static factory method. Udaf factory"
+ " methods must be static. class={}, method={}, name={}",
method.getDeclaringClass(),
method.getName(),
udafAnnotation.name()
);
return false;
}
return true;
})
.map(method -> {
final UdafFactory annotation = method.getAnnotation(UdafFactory.class);
try {
LOGGER.info(
"Adding UDAF name={} from path={} class={}",
udafAnnotation.name(),
path,
method.getDeclaringClass()
);
return Optional.of(createUdafFactoryInvoker(
method,
FunctionName.of(udafAnnotation.name()),
annotation.description(),
annotation.paramSchema(),
annotation.aggregateSchema(),
annotation.returnSchema()
));
} catch (final Exception e) {
LOGGER.warn(
"Failed to create UDAF name={}, method={}, class={}, path={}",
udafAnnotation.name(),
method.getName(),
method.getDeclaringClass(),
path,
e
);
}
return Optional.<UdafFactoryInvoker>empty();
}).filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

final List<UdafFactoryInvoker> invokers = new ArrayList<>();
for (Method method : theClass.getMethods()) {
if (method.getAnnotation(UdafFactory.class) != null) {
if (!Modifier.isStatic(method.getModifiers())) {
LOGGER.warn(
"Trying to create a UDAF from a non-static factory method. Udaf factory"
+ " methods must be static. class={}, method={}, name={}",
method.getDeclaringClass(),
method.getName(),
udafAnnotation.name()
);
continue;
}
final UdafFactory annotation = method.getAnnotation(UdafFactory.class);
try {
LOGGER.info(
"Adding UDAF name={} from path={} class={}",
udafAnnotation.name(),
path,
method.getDeclaringClass()
);
final UdafFactoryInvoker invoker = createUdafFactoryInvoker(
method,
FunctionName.of(udafAnnotation.name()),
annotation.description(),
annotation.paramSchema(),
annotation.aggregateSchema(),
annotation.returnSchema()
);
invokers.add(invoker);
} catch (final Exception e) {
LOGGER.warn(
"Failed to create UDAF name={}, method={}, class={}, path={}",
udafAnnotation.name(),
method.getName(),
method.getDeclaringClass(),
path,
e
);
}
}
}

functionRegistry.addAggregateFunctionFactory(new UdafAggregateFunctionFactory(
new UdfMetadata(
Expand All @@ -110,7 +105,7 @@ void loadUdafFromClass(final Class<?> theClass, final String path) {
path,
false
),
argCreators
invokers
));
}

Expand Down
103 changes: 57 additions & 46 deletions ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.schema.ksql.SqlTypeParser;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
Expand All @@ -45,24 +46,20 @@ public class UdfLoader {
private final MutableFunctionRegistry functionRegistry;
private final Optional<Metrics> metrics;
private final SqlTypeParser typeParser;
private final ClassLoader parentClassLoader;
private final boolean throwExceptionOnLoadFailure;

UdfLoader(
final MutableFunctionRegistry functionRegistry,
final Optional<Metrics> metrics,
final SqlTypeParser typeParser,
final ClassLoader parentClassLoader,
final boolean throwExceptionOnLoadFailure
) {
this.functionRegistry = functionRegistry;
this.metrics = metrics;
this.typeParser = typeParser;
this.parentClassLoader = parentClassLoader;
this.throwExceptionOnLoadFailure = throwExceptionOnLoadFailure;
}

// Does not handle customer udfs, i.e the loader is the ParentClassLoader and path is internal
// This method is only used from tests
@VisibleForTesting
void loadUdfFromClass(final Class<?>... udfClasses) {
Expand Down Expand Up @@ -102,35 +99,36 @@ void loadUdfFromClass(

functionRegistry.ensureFunctionFactory(factory);

Arrays.stream(theClass.getMethods())
.filter(method -> method.getAnnotation(Udf.class) != null)
.map(method -> {
try {
return Optional.of(createFunction(theClass, udfDescriptionAnnotation, method, path,
sensorName, udfClass
));
} catch (final KsqlException e) {
if (throwExceptionOnLoadFailure) {
throw e;
} else {
LOGGER.warn(
"Failed to add UDF to the MetaStore. name={} method={}",
udfDescriptionAnnotation.name(),
method,
e
);
}
for (Method method : theClass.getMethods()) {
final Udf udfAnnotation = method.getAnnotation(Udf.class);
if (udfAnnotation != null) {
final KsqlFunction function;
try {
function = createFunction(theClass, udfDescriptionAnnotation, udfAnnotation, method, path,
sensorName, udfClass
);
} catch (final KsqlException e) {
if (throwExceptionOnLoadFailure) {
throw e;
} else {
LOGGER.warn(
"Failed to add UDF to the MetaStore. name={} method={}",
udfDescriptionAnnotation.name(),
method,
e
);
continue;
}
return Optional.<KsqlFunction>empty();
})
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(factory::addFunction);
}
factory.addFunction(function);
}
}
}

private KsqlFunction createFunction(
final Class theClass,
final UdfDescription udfDescriptionAnnotation,
final Udf udfAnnotation,
final Method method,
final String path,
final String sensorName,
Expand All @@ -140,7 +138,6 @@ private KsqlFunction createFunction(
FunctionLoaderUtils
.instantiateFunctionInstance(method.getDeclaringClass(), udfDescriptionAnnotation.name());
final FunctionInvoker invoker = FunctionLoaderUtils.createFunctionInvoker(method);
final Udf udfAnnotation = method.getAnnotation(Udf.class);
final String functionName = udfDescriptionAnnotation.name();

LOGGER.info("Adding function " + functionName + " for method " + method);
Expand All @@ -151,33 +148,47 @@ private KsqlFunction createFunction(
final Schema javaReturnSchema = FunctionLoaderUtils
.getReturnType(method, udfAnnotation.schema(), typeParser);

return KsqlFunction.create(
FunctionLoaderUtils.handleUdfReturnSchema(
final Function<List<Schema>, Schema> schemaProviderFunction = FunctionLoaderUtils
.handleUdfReturnSchema(
theClass,
javaReturnSchema,
udfAnnotation.schemaProvider(),
udfDescriptionAnnotation.name()
),
);

return KsqlFunction.create(
schemaProviderFunction,
javaReturnSchema,
parameters,
FunctionName.of(functionName.toUpperCase()),
udfClass,
ksqlConfig -> {
final Object actualUdf = FunctionLoaderUtils.instantiateFunctionInstance(
method.getDeclaringClass(), udfDescriptionAnnotation.name());
if (actualUdf instanceof Configurable) {
((Configurable) actualUdf)
.configure(ksqlConfig.getKsqlFunctionsConfigProps(functionName));
}
final PluggableUdf theUdf = new PluggableUdf(invoker, actualUdf);
return metrics.<Kudf>map(m -> new UdfMetricProducer(
m.getSensor(sensorName),
theUdf,
Time.SYSTEM
)).orElse(theUdf);
}, udfAnnotation.description(),
getUdfFactory(method, udfDescriptionAnnotation, functionName, invoker, sensorName),
udfAnnotation.description(),
path,
method.isVarArgs()
);
}

private Function<KsqlConfig, Kudf> getUdfFactory(
final Method method,
final UdfDescription udfDescriptionAnnotation,
final String functionName,
final FunctionInvoker invoker,
final String sensorName
) {
return ksqlConfig -> {
final Object actualUdf = FunctionLoaderUtils.instantiateFunctionInstance(
method.getDeclaringClass(), udfDescriptionAnnotation.name());
if (actualUdf instanceof Configurable) {
((Configurable) actualUdf)
.configure(ksqlConfig.getKsqlFunctionsConfigProps(functionName));
}
final PluggableUdf theUdf = new PluggableUdf(invoker, actualUdf);
return metrics.<Kudf>map(m -> new UdfMetricProducer(
m.getSensor(sensorName),
theUdf,
Time.SYSTEM
)).orElse(theUdf);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
Expand Down Expand Up @@ -79,57 +78,54 @@ public void loadUdtfFromClass(
false
);

final TableFunctionFactory udtfFactory = new TableFunctionFactory(metadata);
final TableFunctionFactory factory = new TableFunctionFactory(metadata);

Arrays.stream(theClass.getMethods())
.filter(method -> method.getAnnotation(Udtf.class) != null)
.map(method -> {
try {
final Udtf annotation = method.getAnnotation(Udtf.class);
if (method.getReturnType() != List.class) {
throw new KsqlException(String
.format("UDTF functions must return a List. Class %s Method %s",
theClass.getName(), method.getName()
));
}
final Type ret = method.getGenericReturnType();
if (!(ret instanceof ParameterizedType)) {
throw new KsqlException(String
.format(
"UDTF functions must return a parameterized List. Class %s Method %s",
theClass.getName(), method.getName()
));
}
final Type typeArg = ((ParameterizedType) ret).getActualTypeArguments()[0];
final Schema returnType = FunctionLoaderUtils
.getReturnType(method, typeArg, annotation.schema(), typeParser);
final List<Schema> parameters = FunctionLoaderUtils
.createParameters(method, functionName, typeParser);
return Optional
.of(createTableFunction(method, FunctionName.of(functionName), returnType,
parameters,
udtfDescriptionAnnotation.description(),
annotation
for (Method method : theClass.getMethods()) {
if (method.getAnnotation(Udtf.class) != null) {
final Udtf annotation = method.getAnnotation(Udtf.class);
try {
if (method.getReturnType() != List.class) {
throw new KsqlException(String
.format("UDTF functions must return a List. Class %s Method %s",
theClass.getName(), method.getName()
));
} catch (final KsqlException e) {
if (throwExceptionOnLoadFailure) {
throw e;
} else {
LOGGER.warn(
"Failed to add UDTF to the MetaStore. name={} method={}",
udtfDescriptionAnnotation.name(),
method,
e
}
final Type ret = method.getGenericReturnType();
if (!(ret instanceof ParameterizedType)) {
throw new KsqlException(String
.format(
"UDTF functions must return a parameterized List. Class %s Method %s",
theClass.getName(), method.getName()
));
}
final Type typeArg = ((ParameterizedType) ret).getActualTypeArguments()[0];
final Schema returnType = FunctionLoaderUtils
.getReturnType(method, typeArg, annotation.schema(), typeParser);
final List<Schema> parameters = FunctionLoaderUtils
.createParameters(method, functionName, typeParser);
final KsqlTableFunction tableFunction =
createTableFunction(method, FunctionName.of(functionName), returnType,
parameters,
udtfDescriptionAnnotation.description(),
annotation
);
}
factory.addFunction(tableFunction);
} catch (final KsqlException e) {
if (throwExceptionOnLoadFailure) {
throw e;
} else {
LOGGER.warn(
"Failed to add UDTF to the MetaStore. name={} method={}",
udtfDescriptionAnnotation.name(),
method,
e
);
}
return Optional.<KsqlTableFunction>empty();
})
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(udtfFactory::addFunction);
}
}
}

functionRegistry.addTableFunctionFactory(udtfFactory);
functionRegistry.addTableFunctionFactory(factory);
}

private KsqlTableFunction createTableFunction(
Expand Down
Loading