-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: new UDFs for working with Maps (#5536)
* initial version 3 new map udfs * make a couple of fields final * review feedback * Update docs/developer-guide/ksqldb-reference/scalar-functions.md Jim's doc feedback Co-authored-by: Jim Galasyn <[email protected]> * Update docs/developer-guide/ksqldb-reference/scalar-functions.md Jim's doc feedback Co-authored-by: Jim Galasyn <[email protected]> * Update docs/developer-guide/ksqldb-reference/scalar-functions.md Jim's doc feedback Co-authored-by: Jim Galasyn <[email protected]> Co-authored-by: Jim Galasyn <[email protected]>
- Loading branch information
1 parent
00f5083
commit bc9ad2e
Showing
23 changed files
with
1,867 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
37 changes: 37 additions & 0 deletions
37
ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapKeys.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"; you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and limitations under the | ||
* License. | ||
*/ | ||
|
||
package io.confluent.ksql.function.udf.map; | ||
|
||
import com.google.common.collect.Lists; | ||
import io.confluent.ksql.function.udf.Udf; | ||
import io.confluent.ksql.function.udf.UdfDescription; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
@UdfDescription( | ||
name = "map_keys", | ||
description = "Returns an array of all the keys from the specified map, " | ||
+ "or NULL if the input map is NULL.") | ||
public class MapKeys { | ||
|
||
@Udf | ||
public <T> List<String> mapKeys(final Map<String, T> input) { | ||
if (input == null) { | ||
return null; | ||
} | ||
return Lists.newArrayList(input.keySet()); | ||
} | ||
|
||
} |
52 changes: 52 additions & 0 deletions
52
ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapUnion.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"; you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and limitations under the | ||
* License. | ||
*/ | ||
|
||
package io.confluent.ksql.function.udf.map; | ||
|
||
import io.confluent.ksql.function.udf.Udf; | ||
import io.confluent.ksql.function.udf.UdfDescription; | ||
import io.confluent.ksql.function.udf.UdfParameter; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
@UdfDescription( | ||
name = "map_union", | ||
description = "Returns a new map containing the union of all entries from both input maps. " | ||
+ "If a key is present in both input maps then the value from map2 is the one which " | ||
+ "appears in the result. Returns NULL if all of the input maps are NULL.") | ||
public class MapUnion { | ||
|
||
@Udf | ||
public <T> Map<String, T> union( | ||
@UdfParameter(description = "first map to union") final Map<String, T> map1, | ||
@UdfParameter(description = "second map to union") final Map<String, T> map2) { | ||
|
||
final List<Map<String, T>> nonNullInputs = | ||
Stream.of(map1, map2) | ||
.filter(Objects::nonNull) | ||
.collect(Collectors.toList()); | ||
if (nonNullInputs.size() == 0) { | ||
return null; | ||
} | ||
|
||
final Map<String, T> output = new HashMap<>(); | ||
nonNullInputs.stream() | ||
.forEach(output::putAll); | ||
return output; | ||
} | ||
} |
37 changes: 37 additions & 0 deletions
37
ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/map/MapValues.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"; you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and limitations under the | ||
* License. | ||
*/ | ||
|
||
package io.confluent.ksql.function.udf.map; | ||
|
||
import com.google.common.collect.Lists; | ||
import io.confluent.ksql.function.udf.Udf; | ||
import io.confluent.ksql.function.udf.UdfDescription; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
@UdfDescription( | ||
name = "map_values", | ||
description = "Returns an array of all the values from the specified map, " | ||
+ "or NULL if the input map is NULL.") | ||
public class MapValues { | ||
|
||
@Udf | ||
public <T> List<T> mapValues(final Map<String, T> input) { | ||
if (input == null) { | ||
return null; | ||
} | ||
return Lists.newArrayList(input.values()); | ||
} | ||
|
||
} |
88 changes: 88 additions & 0 deletions
88
ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/map/MapKeysTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"; you may not use this file except | ||
* in compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License | ||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and limitations under the | ||
* License. | ||
*/ | ||
|
||
package io.confluent.ksql.function.udf.map; | ||
|
||
import static org.hamcrest.CoreMatchers.nullValue; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.containsInAnyOrder; | ||
import static org.hamcrest.Matchers.empty; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
import com.google.common.collect.Maps; | ||
import java.math.BigDecimal; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
||
public class MapKeysTest { | ||
|
||
private MapKeys udf; | ||
|
||
@Before | ||
public void setUp() { | ||
udf = new MapKeys(); | ||
} | ||
|
||
@Test | ||
public void shouldGetKeys() { | ||
final Map<String, String> input = new HashMap<>(); | ||
input.put("foo", "spam"); | ||
input.put("bar", "baloney"); | ||
assertThat(udf.mapKeys(input), containsInAnyOrder("foo", "bar")); | ||
} | ||
|
||
@Test | ||
public void shouldHandleComplexValueTypes() { | ||
final Map<String, Map<String, List<Double>>> input = Maps.newHashMap(); | ||
|
||
final Map<String, List<Double>> entry1 = Maps.newHashMap(); | ||
entry1.put("apple", Arrays.asList(Double.valueOf(12.34), Double.valueOf(56.78))); | ||
entry1.put("banana", Arrays.asList(Double.valueOf(43.21), Double.valueOf(87.65))); | ||
input.put("foo", entry1); | ||
|
||
final Map<String, List<Double>> entry2 = Maps.newHashMap(); | ||
entry2.put("cherry", Arrays.asList(Double.valueOf(12.34), Double.valueOf(56.78))); | ||
entry2.put("date", Arrays.asList(Double.valueOf(43.21), Double.valueOf(87.65))); | ||
input.put("bar", entry2); | ||
|
||
assertThat(udf.mapKeys(input), containsInAnyOrder("foo", "bar")); | ||
} | ||
|
||
@Test | ||
public void shouldReturnNullForNullInput() { | ||
List<String> result = udf.mapKeys((Map<String, Long>) null); | ||
assertThat(result, is(nullValue())); | ||
} | ||
|
||
@Test | ||
public void shouldReturnNullsFromMapWithNulls() { | ||
final Map<String, Integer> input = Maps.newHashMap(); | ||
input.put("foo", 1); | ||
input.put(null, null); | ||
input.put("bar", null); | ||
List<String> result = udf.mapKeys(input); | ||
assertThat(result, containsInAnyOrder(null, "foo", "bar")); | ||
} | ||
|
||
@Test | ||
public void shouldReturnEmptyListFromEmptyMap() { | ||
final Map<String, BigDecimal> input = Maps.newHashMap(); | ||
assertThat(udf.mapKeys(input), empty()); | ||
} | ||
|
||
} |
Oops, something went wrong.