Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: add ARRAY_LENGTH UDF
Browse files Browse the repository at this point in the history
fixes: confluentinc#4724

`ARRAY_LENGTH` returns the length of any array passed to it, or `0` if null is passed.
big-andy-coates committed Mar 6, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 2d43e8c commit 426f34d
Showing 3 changed files with 113 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.util.List;

/**
 * KSQL UDF that returns the number of elements in an array.
 *
 * <p>A {@code null} array is treated as empty and yields {@code 0}.
 */
@SuppressWarnings("MethodMayBeStatic") // UDF methods can not be static.
@UdfDescription(name = "ARRAY_LENGTH", description = "Returns the length of an array")
public class ArrayLength {

  /**
   * Calculates the length of the supplied array.
   *
   * @param array the array to measure; may be {@code null}
   * @param <T> the element type of the array
   * @return the number of elements in {@code array}, or {@code 0} if
   *     {@code array} is {@code null}
   */
  @Udf
  public <T> int calcArrayLength(
      @UdfParameter(description = "The array") final List<T> array
  ) {
    // A null array is reported as length 0 rather than propagating null.
    return array == null
        ? 0
        : array.size();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.google.common.collect.ImmutableList;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link ArrayLength}.
 */
public class ArrayLengthTest {

  private ArrayLength udf;

  /** Creates a fresh UDF instance before each test. */
  @Before
  public void setUp() {
    udf = new ArrayLength();
  }

  /** A null array is reported as having zero elements. */
  @Test
  public void shouldReturnZeroForNullArray() {
    // When:
    final int length = udf.calcArrayLength(null);

    // Then:
    assertThat(length, is(0));
  }

  /** Empty, single-element and multi-element arrays report their sizes. */
  @Test
  public void shouldReturnArraySize() {
    // When:
    final int emptyLength = udf.calcArrayLength(ImmutableList.of());
    final int singleLength = udf.calcArrayLength(ImmutableList.of(1));
    final int pairLength = udf.calcArrayLength(ImmutableList.of("one", "two"));

    // Then:
    assertThat(emptyLength, is(0));
    assertThat(singleLength, is(1));
    assertThat(pairLength, is(2));
  }
}
Original file line number Diff line number Diff line change
@@ -76,6 +76,36 @@
{"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": [1, -1]}},
{"topic": "OUTPUT", "key": "1", "value": {"KSQL_COL_0": [9, 6, 3, 0]}}
]
},
{
"name": "array_length - primitives",
"statements": [
"CREATE STREAM INPUT (boolean_array ARRAY<BOOLEAN>, int_array ARRAY<INT>, bigint_array ARRAY<BIGINT>, double_array ARRAY<DOUBLE>, string_array ARRAY<STRING>, decimal_array ARRAY<DECIMAL(2,1)>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT ARRAY_LENGTH(boolean_array) AS boolean_len, ARRAY_LENGTH(int_array) AS int_len, ARRAY_LENGTH(bigint_array) AS bigint_len, ARRAY_LENGTH(double_array) AS double_len, ARRAY_LENGTH(string_array) AS string_len , ARRAY_LENGTH(decimal_array) AS decimal_len FROM INPUT;"
],
"inputs": [
{"topic": "test_topic", "value": {"boolean_array": [true], "int_array": [-1, 0], "bigint_array": [-1, 0, 1], "double_array": [0.0, 0.1, 0.2, 0.3], "string_array": ["a", "b", "c", "d", "e"], "decimal_array": [1.0, 1.1, 1.2, 1.3, 1.4, 1.5]}},
{"topic": "test_topic", "value": {}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"BOOLEAN_LEN": 1, "INT_LEN": 2, "BIGINT_LEN": 3, "DOUBLE_LEN": 4, "STRING_LEN": 5, "DECIMAL_LEN": 6}},
{"topic": "OUTPUT", "value": {"BOOLEAN_LEN": 0, "INT_LEN": 0, "BIGINT_LEN": 0, "DOUBLE_LEN": 0, "STRING_LEN": 0, "DECIMAL_LEN": 0}}
]
},
{
"name": "array_length - structured",
"statements": [
"CREATE STREAM INPUT (array_array ARRAY<ARRAY<BOOLEAN>>, map_array ARRAY<MAP<STRING,INT>>, struct_array ARRAY<STRUCT<V BIGINT>>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT ARRAY_LENGTH(array_array) AS array_len, ARRAY_LENGTH(map_array) AS map_len, ARRAY_LENGTH(struct_array) AS struct_len FROM INPUT;"
],
"inputs": [
{"topic": "test_topic", "value": {"array_array": [[]], "map_array": [{}, {}], "struct_array": [{},{},{}]}},
{"topic": "test_topic", "value": {}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"ARRAY_LEN": 1, "MAP_LEN": 2, "STRUCT_LEN": 3}},
{"topic": "OUTPUT", "value": {"ARRAY_LEN": 0, "MAP_LEN": 0, "STRUCT_LEN": 0}}
]
}
]
}

0 comments on commit 426f34d

Please sign in to comment.