diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/array/ArrayLength.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/array/ArrayLength.java new file mode 100644 index 000000000000..22dba010dbf5 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/array/ArrayLength.java @@ -0,0 +1,38 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.array; + +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import java.util.List; + +/** + * Returns the length of an array + */ +@SuppressWarnings("MethodMayBeStatic") // UDF methods can not be static. +@UdfDescription(name = "ARRAY_LENGTH", description = "Returns the length on an array") +public class ArrayLength { + + @Udf + public int calcArrayLength( + @UdfParameter(description = "The array") final List array + ) { + return array == null + ? 0 + : array.size(); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/array/ArrayLengthTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/array/ArrayLengthTest.java new file mode 100644 index 000000000000..22ae58bd6dc8 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/array/ArrayLengthTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.array; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Test; + +public class ArrayLengthTest { + + private ArrayLength udf; + + @Before + public void setUp() { + udf = new ArrayLength(); + } + + @Test + public void shouldReturnZeroForNullArray() { + assertThat(udf.calcArrayLength(null), is(0)); + } + + @Test + public void shouldReturnArraySize() { + assertThat(udf.calcArrayLength(ImmutableList.of()), is(0)); + assertThat(udf.calcArrayLength(ImmutableList.of(1)), is(1)); + assertThat(udf.calcArrayLength(ImmutableList.of("one", "two")), is(2)); + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/array.json b/ksql-functional-tests/src/test/resources/query-validation-tests/array.json index 351c8f372f9c..8131916ecf3f 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/array.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/array.json @@ -76,6 +76,36 @@ {"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": [1, -1]}}, {"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": [9, 6, 3, 0]}} ] + }, + { + "name": "array_length - primitives", + "statements": [ + "CREATE STREAM INPUT (boolean_array ARRAY, int_array ARRAY, bigint_array ARRAY, double_array ARRAY, string_array ARRAY, decimal_array ARRAY) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ARRAY_LENGTH(boolean_array) AS boolean_len, ARRAY_LENGTH(int_array) AS int_len, ARRAY_LENGTH(bigint_array) AS bigint_len, ARRAY_LENGTH(double_array) AS double_len, ARRAY_LENGTH(string_array) AS string_len , ARRAY_LENGTH(decimal_array) AS decimal_len FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"boolean_array": [true], "int_array": [-1, 0], "bigint_array": [-1, 0, 1], "double_array": [0.0, 0.1, 0.2, 0.3], "string_array": ["a", "b", "c", "d", "e"], "decimal_array": [1.0, 1.1, 1.2, 1.3, 1.4, 1.5]}}, + {"topic": "test_topic", "value": {}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"BOOLEAN_LEN": 1, "INT_LEN": 2, "BIGINT_LEN": 3, "DOUBLE_LEN": 4, "STRING_LEN": 5, "DECIMAL_LEN": 6}}, + {"topic": "OUTPUT", "value": {"BOOLEAN_LEN": 0, "INT_LEN": 0, "BIGINT_LEN": 0, "DOUBLE_LEN": 0, "STRING_LEN": 0, "DECIMAL_LEN": 0}} + ] + }, + { + "name": "array_length - structured", + "statements": [ + "CREATE STREAM INPUT (array_array ARRAY>, map_array ARRAY>, struct_array ARRAY>) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT ARRAY_LENGTH(array_array) AS array_len, ARRAY_LENGTH(map_array) AS map_len, ARRAY_LENGTH(struct_array) AS struct_len FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"array_array": [[]], "map_array": [{}, {}], "struct_array": [{},{},{}]}}, + {"topic": "test_topic", "value": {}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"ARRAY_LEN": 1, "MAP_LEN": 2, "STRUCT_LEN": 3}}, + {"topic": "OUTPUT", "value": {"ARRAY_LEN": 0, "MAP_LEN": 0, "STRUCT_LEN": 0}} + ] } ] } \ No newline at end of file