Skip to content

Commit

Permalink
refactor: Cleaned up loading code (#3726)
Browse files Browse the repository at this point in the history
This PR cleans up some of the function loading code, mainly replacing some overly complex streams based logic with simpler direct iteration.
  • Loading branch information
purplefox authored and Tim Fox committed Nov 5, 2019
1 parent 0a4558b commit 6ef378f
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import io.confluent.ksql.schema.ksql.SqlTypeParser;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -53,53 +52,49 @@ class UdafLoader {

void loadUdafFromClass(final Class<?> theClass, final String path) {
final UdafDescription udafAnnotation = theClass.getAnnotation(UdafDescription.class);
final List<UdafFactoryInvoker> argCreators
= Arrays.stream(theClass.getMethods())
.filter(method -> method.getAnnotation(UdafFactory.class) != null)
.filter(method -> {
if (!Modifier.isStatic(method.getModifiers())) {
LOGGER.warn(
"Trying to create a UDAF from a non-static factory method. Udaf factory"
+ " methods must be static. class={}, method={}, name={}",
method.getDeclaringClass(),
method.getName(),
udafAnnotation.name()
);
return false;
}
return true;
})
.map(method -> {
final UdafFactory annotation = method.getAnnotation(UdafFactory.class);
try {
LOGGER.info(
"Adding UDAF name={} from path={} class={}",
udafAnnotation.name(),
path,
method.getDeclaringClass()
);
return Optional.of(createUdafFactoryInvoker(
method,
FunctionName.of(udafAnnotation.name()),
annotation.description(),
annotation.paramSchema(),
annotation.aggregateSchema(),
annotation.returnSchema()
));
} catch (final Exception e) {
LOGGER.warn(
"Failed to create UDAF name={}, method={}, class={}, path={}",
udafAnnotation.name(),
method.getName(),
method.getDeclaringClass(),
path,
e
);
}
return Optional.<UdafFactoryInvoker>empty();
}).filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());

final List<UdafFactoryInvoker> invokers = new ArrayList<>();
for (Method method : theClass.getMethods()) {
if (method.getAnnotation(UdafFactory.class) != null) {
if (!Modifier.isStatic(method.getModifiers())) {
LOGGER.warn(
"Trying to create a UDAF from a non-static factory method. Udaf factory"
+ " methods must be static. class={}, method={}, name={}",
method.getDeclaringClass(),
method.getName(),
udafAnnotation.name()
);
continue;
}
final UdafFactory annotation = method.getAnnotation(UdafFactory.class);
try {
LOGGER.info(
"Adding UDAF name={} from path={} class={}",
udafAnnotation.name(),
path,
method.getDeclaringClass()
);
final UdafFactoryInvoker invoker = createUdafFactoryInvoker(
method,
FunctionName.of(udafAnnotation.name()),
annotation.description(),
annotation.paramSchema(),
annotation.aggregateSchema(),
annotation.returnSchema()
);
invokers.add(invoker);
} catch (final Exception e) {
LOGGER.warn(
"Failed to create UDAF name={}, method={}, class={}, path={}",
udafAnnotation.name(),
method.getName(),
method.getDeclaringClass(),
path,
e
);
}
}
}

functionRegistry.addAggregateFunctionFactory(new UdafAggregateFunctionFactory(
new UdfMetadata(
Expand All @@ -110,7 +105,7 @@ void loadUdafFromClass(final Class<?> theClass, final String path) {
path,
false
),
argCreators
invokers
));
}

Expand Down
103 changes: 57 additions & 46 deletions ksql-engine/src/main/java/io/confluent/ksql/function/UdfLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.schema.ksql.SqlTypeParser;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
Expand All @@ -45,24 +46,20 @@ public class UdfLoader {
private final MutableFunctionRegistry functionRegistry;
private final Optional<Metrics> metrics;
private final SqlTypeParser typeParser;
private final ClassLoader parentClassLoader;
private final boolean throwExceptionOnLoadFailure;

UdfLoader(
final MutableFunctionRegistry functionRegistry,
final Optional<Metrics> metrics,
final SqlTypeParser typeParser,
final ClassLoader parentClassLoader,
final boolean throwExceptionOnLoadFailure
) {
this.functionRegistry = functionRegistry;
this.metrics = metrics;
this.typeParser = typeParser;
this.parentClassLoader = parentClassLoader;
this.throwExceptionOnLoadFailure = throwExceptionOnLoadFailure;
}

// Does not handle customer udfs, i.e the loader is the ParentClassLoader and path is internal
// This method is only used from tests
@VisibleForTesting
void loadUdfFromClass(final Class<?>... udfClasses) {
Expand Down Expand Up @@ -102,35 +99,36 @@ void loadUdfFromClass(

functionRegistry.ensureFunctionFactory(factory);

Arrays.stream(theClass.getMethods())
.filter(method -> method.getAnnotation(Udf.class) != null)
.map(method -> {
try {
return Optional.of(createFunction(theClass, udfDescriptionAnnotation, method, path,
sensorName, udfClass
));
} catch (final KsqlException e) {
if (throwExceptionOnLoadFailure) {
throw e;
} else {
LOGGER.warn(
"Failed to add UDF to the MetaStore. name={} method={}",
udfDescriptionAnnotation.name(),
method,
e
);
}
for (Method method : theClass.getMethods()) {
final Udf udfAnnotation = method.getAnnotation(Udf.class);
if (udfAnnotation != null) {
final KsqlFunction function;
try {
function = createFunction(theClass, udfDescriptionAnnotation, udfAnnotation, method, path,
sensorName, udfClass
);
} catch (final KsqlException e) {
if (throwExceptionOnLoadFailure) {
throw e;
} else {
LOGGER.warn(
"Failed to add UDF to the MetaStore. name={} method={}",
udfDescriptionAnnotation.name(),
method,
e
);
continue;
}
return Optional.<KsqlFunction>empty();
})
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(factory::addFunction);
}
factory.addFunction(function);
}
}
}

private KsqlFunction createFunction(
final Class theClass,
final UdfDescription udfDescriptionAnnotation,
final Udf udfAnnotation,
final Method method,
final String path,
final String sensorName,
Expand All @@ -140,7 +138,6 @@ private KsqlFunction createFunction(
FunctionLoaderUtils
.instantiateFunctionInstance(method.getDeclaringClass(), udfDescriptionAnnotation.name());
final FunctionInvoker invoker = FunctionLoaderUtils.createFunctionInvoker(method);
final Udf udfAnnotation = method.getAnnotation(Udf.class);
final String functionName = udfDescriptionAnnotation.name();

LOGGER.info("Adding function " + functionName + " for method " + method);
Expand All @@ -151,33 +148,47 @@ private KsqlFunction createFunction(
final Schema javaReturnSchema = FunctionLoaderUtils
.getReturnType(method, udfAnnotation.schema(), typeParser);

return KsqlFunction.create(
FunctionLoaderUtils.handleUdfReturnSchema(
final Function<List<Schema>, Schema> schemaProviderFunction = FunctionLoaderUtils
.handleUdfReturnSchema(
theClass,
javaReturnSchema,
udfAnnotation.schemaProvider(),
udfDescriptionAnnotation.name()
),
);

return KsqlFunction.create(
schemaProviderFunction,
javaReturnSchema,
parameters,
FunctionName.of(functionName.toUpperCase()),
udfClass,
ksqlConfig -> {
final Object actualUdf = FunctionLoaderUtils.instantiateFunctionInstance(
method.getDeclaringClass(), udfDescriptionAnnotation.name());
if (actualUdf instanceof Configurable) {
((Configurable) actualUdf)
.configure(ksqlConfig.getKsqlFunctionsConfigProps(functionName));
}
final PluggableUdf theUdf = new PluggableUdf(invoker, actualUdf);
return metrics.<Kudf>map(m -> new UdfMetricProducer(
m.getSensor(sensorName),
theUdf,
Time.SYSTEM
)).orElse(theUdf);
}, udfAnnotation.description(),
getUdfFactory(method, udfDescriptionAnnotation, functionName, invoker, sensorName),
udfAnnotation.description(),
path,
method.isVarArgs()
);
}

private Function<KsqlConfig, Kudf> getUdfFactory(
final Method method,
final UdfDescription udfDescriptionAnnotation,
final String functionName,
final FunctionInvoker invoker,
final String sensorName
) {
return ksqlConfig -> {
final Object actualUdf = FunctionLoaderUtils.instantiateFunctionInstance(
method.getDeclaringClass(), udfDescriptionAnnotation.name());
if (actualUdf instanceof Configurable) {
((Configurable) actualUdf)
.configure(ksqlConfig.getKsqlFunctionsConfigProps(functionName));
}
final PluggableUdf theUdf = new PluggableUdf(invoker, actualUdf);
return metrics.<Kudf>map(m -> new UdfMetricProducer(
m.getSensor(sensorName),
theUdf,
Time.SYSTEM
)).orElse(theUdf);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
Expand Down Expand Up @@ -79,57 +78,54 @@ public void loadUdtfFromClass(
false
);

final TableFunctionFactory udtfFactory = new TableFunctionFactory(metadata);
final TableFunctionFactory factory = new TableFunctionFactory(metadata);

Arrays.stream(theClass.getMethods())
.filter(method -> method.getAnnotation(Udtf.class) != null)
.map(method -> {
try {
final Udtf annotation = method.getAnnotation(Udtf.class);
if (method.getReturnType() != List.class) {
throw new KsqlException(String
.format("UDTF functions must return a List. Class %s Method %s",
theClass.getName(), method.getName()
));
}
final Type ret = method.getGenericReturnType();
if (!(ret instanceof ParameterizedType)) {
throw new KsqlException(String
.format(
"UDTF functions must return a parameterized List. Class %s Method %s",
theClass.getName(), method.getName()
));
}
final Type typeArg = ((ParameterizedType) ret).getActualTypeArguments()[0];
final Schema returnType = FunctionLoaderUtils
.getReturnType(method, typeArg, annotation.schema(), typeParser);
final List<Schema> parameters = FunctionLoaderUtils
.createParameters(method, functionName, typeParser);
return Optional
.of(createTableFunction(method, FunctionName.of(functionName), returnType,
parameters,
udtfDescriptionAnnotation.description(),
annotation
for (Method method : theClass.getMethods()) {
if (method.getAnnotation(Udtf.class) != null) {
final Udtf annotation = method.getAnnotation(Udtf.class);
try {
if (method.getReturnType() != List.class) {
throw new KsqlException(String
.format("UDTF functions must return a List. Class %s Method %s",
theClass.getName(), method.getName()
));
} catch (final KsqlException e) {
if (throwExceptionOnLoadFailure) {
throw e;
} else {
LOGGER.warn(
"Failed to add UDTF to the MetaStore. name={} method={}",
udtfDescriptionAnnotation.name(),
method,
e
}
final Type ret = method.getGenericReturnType();
if (!(ret instanceof ParameterizedType)) {
throw new KsqlException(String
.format(
"UDTF functions must return a parameterized List. Class %s Method %s",
theClass.getName(), method.getName()
));
}
final Type typeArg = ((ParameterizedType) ret).getActualTypeArguments()[0];
final Schema returnType = FunctionLoaderUtils
.getReturnType(method, typeArg, annotation.schema(), typeParser);
final List<Schema> parameters = FunctionLoaderUtils
.createParameters(method, functionName, typeParser);
final KsqlTableFunction tableFunction =
createTableFunction(method, FunctionName.of(functionName), returnType,
parameters,
udtfDescriptionAnnotation.description(),
annotation
);
}
factory.addFunction(tableFunction);
} catch (final KsqlException e) {
if (throwExceptionOnLoadFailure) {
throw e;
} else {
LOGGER.warn(
"Failed to add UDTF to the MetaStore. name={} method={}",
udtfDescriptionAnnotation.name(),
method,
e
);
}
return Optional.<KsqlTableFunction>empty();
})
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(udtfFactory::addFunction);
}
}
}

functionRegistry.addTableFunctionFactory(udtfFactory);
functionRegistry.addTableFunctionFactory(factory);
}

private KsqlTableFunction createTableFunction(
Expand Down
Loading

0 comments on commit 6ef378f

Please sign in to comment.