Skip to content

Commit

Permalink
feat: introduce transform and reduce invocation function for lambdas
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Feb 23, 2021
1 parent ea30397 commit a9eb41d
Show file tree
Hide file tree
Showing 58 changed files with 5,073 additions and 371 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.lambda;

import io.confluent.ksql.execution.codegen.helpers.TriFunction;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.BiFunction;

/**
* Reduce a collection using an initial state and function
*/
@UdfDescription(
name = "reduce",
category = FunctionCategory.LAMBDA,
description = "Reduce the input collection down to a single value "
+ "using an initial state and a function. "
+ "The initial state (s) is passed into the scope of the function. "
+ "Each invocation returns a new value for s, "
+ "which the next invocation will receive. "
+ "The final value for s is returned.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class Reduce {

@Udf(description = "When reducing an array, "
+ "the reduce function must have two arguments. "
+ "The two arguments for the reduce function are in order: "
+ "the array item and the state. "
+ "The final state is returned."
)
public <T,S> S reduceArray(
@UdfParameter(description = "The initial state.") final S initialState,
@UdfParameter(description = "The array.") final List<T> list,
@UdfParameter(description = "The reduce function.") final BiFunction<S, T, S> biFunction
) {
if (initialState == null) {
return null;
}

if (list == null) {
return initialState;
}

S state = initialState;
for (T listItem: list) {
state = biFunction.apply(state, listItem);
}
return state;
}

@Udf(description = "When reducing a map, "
+ "the reduce function must have three arguments. "
+ "The three arguments for the reduce function are in order: "
+ "the key, the value, and the state. "
+ "The final state is returned."
)
public <K,V,S> S reduceMap(
@UdfParameter(description = "The initial state.") final S initialState,
@UdfParameter(description = "The map.") final Map<K, V> map,
@UdfParameter(description = "The reduce function.") final TriFunction<S, K, V, S> triFunction
) {
if (initialState == null) {
return null;
}

if (map == null) {
return initialState;
}

S state = initialState;
for (Entry<K, V> entry : map.entrySet()) {
state = triFunction.apply(state, entry.getKey(), entry.getValue());
}
return state;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,56 @@
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.map;
package io.confluent.ksql.function.udf.lambda;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Transform a map's key and values using two lambda functions
* Transform a collection with a function
*/
@UdfDescription(
name = "map_transform",
category = FunctionCategory.MAP,
description = "Apply one function to each key and "
+ "one function to each value of a map. "
+ "The two arguments for each function are in order: key, value. "
+ "The first function provided will be applied to each key and the "
+ "second one applied to each value. "
+ "The transformed map is returned.",
name = "transform",
category = FunctionCategory.LAMBDA,
description = "Apply a function to each element in a collection. "
+ "The transformed collection is returned.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class MapTransform {
public class Transform {

@Udf
public <K,V,R,T> Map<R,T> mapTransform(
@Udf(description = "When transforming an array, "
+ "the function provided must have two arguments. "
+ "The two arguments for each function are in order: "
+ "the key and then the value. "
+ "The transformed array is returned."
)
public <T, R> List<R> transformArray(
@UdfParameter(description = "The array") final List<T> array,
@UdfParameter(description = "The lambda function") final Function<T, R> function
) {
if (array == null) {
return null;
}
return array.stream().map(function::apply).collect(Collectors.toList());
}

@Udf(description = "When transforming a map, "
+ "two functions must be provided. "
+ "For each map entry, the first function provided will "
+ "be applied to the key and the second one applied to the value. "
+ "Each function must have two arguments. "
+ "The two arguments for each function are in order: the key and then the value. "
+ "The transformed map is returned."
)
public <K,V,R,T> Map<R,T> transformMap(
@UdfParameter(description = "The map") final Map<K, V> map,
@UdfParameter(description = "The key lambda function") final BiFunction<K, V, R> biFunction1,
@UdfParameter(description = "The value lambda function") final BiFunction<K, V, T> biFunction2
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit a9eb41d

Please sign in to comment.