diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index d4a6a036929a..831fc9aeca80 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -463,7 +463,7 @@ the entire substring is returned by default. For example, `REGEXP_EXTRACT("(.*) (.*)", 'hello there', 2)` returns "there". -### REGEXP_EXTRACT_ALL +### `REGEXP_EXTRACT_ALL` ```sql REGEXP_EXTRACT_ALL('.*', col1) @@ -481,6 +481,24 @@ the entire substring is returned by default. For example, `REGEXP_EXTRACT("(\\w+) (\\w+)", 'hello there nice day', 2)` returns `['there', 'day']`. +### `REGEXP_SPLIT_TO_ARRAY` + +```sql +REGEXP_SPLIT_TO_ARRAY(col1, 'a.b+') +``` + +Splits a string into an array of substrings based +on a regular expression. If there is no match, +the original string is returned as the only +element in the array. If the regular expression is empty, +then all characters in the string are split. +If either the string or the regular expression is `NULL`, a +NULL value is returned. + +If the regular expression is found at the beginning or end +of the string, or there are contiguous matches, +then an empty element is added to the array. + ### `SPLIT` ```sql diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/RegexpSplitToArray.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/RegexpSplitToArray.java new file mode 100644 index 000000000000..f09cce61381c --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/RegexpSplitToArray.java @@ -0,0 +1,68 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.string; + +import com.google.common.base.Splitter; +import io.confluent.ksql.function.KsqlFunctionException; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.KsqlConstants; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +@UdfDescription(name = "regexp_split_to_array", + author = KsqlConstants.CONFLUENT_AUTHOR, + description = "Splits a string into an array of substrings based on a regexp. " + + "If the regexp is found at the beginning of the string, end of the string, or there " + + "are contiguous matches in the string, then empty strings are added to the array. " + + "If the regexp is not found, then the original string is returned as the only " + + "element in the array. If the regexp is empty, then all characters in the string are " + + "split.") +public class RegexpSplitToArray { + + @Udf(description = "Splits a string into an array of substrings based on a regexp.") + public List regexpSplit( + @UdfParameter( + description = "The string to be split. 
If NULL, then function returns NULL.") + final String string, + @UdfParameter( + description = "The regular expression to split the string by. " + + "If NULL, then function returns NULL.") + final String regexp) { + if (string == null || regexp == null) { + return null; + } + + // Use Guava version to be compatible with other splitting functions. + final Pattern p = getPattern(regexp); + if (regexp.isEmpty() || p.matcher("").matches()) { + return Arrays.asList(p.split(string)); + } else { + return Splitter.on(p).splitToList(string); + } + } + + private Pattern getPattern(final String regexp) { + try { + return Pattern.compile(regexp); + } catch (PatternSyntaxException e) { + throw new KsqlFunctionException("Invalid regular expression pattern: " + regexp, e); + } + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/RegexpSplitToArrayTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/RegexpSplitToArrayTest.java new file mode 100644 index 000000000000..9148a5ca04cc --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/RegexpSplitToArrayTest.java @@ -0,0 +1,79 @@ +package io.confluent.ksql.function.udf.string; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +import io.confluent.ksql.function.KsqlFunctionException; +import org.junit.Test; + +public class RegexpSplitToArrayTest { + + private final RegexpSplitToArray udf = new RegexpSplitToArray(); + + @Test + public void shouldReturnNull() { + assertThat(udf.regexpSplit(null, "a"), is(nullValue())); + assertThat(udf.regexpSplit("string", null), is(nullValue())); + assertThat(udf.regexpSplit(null, null), is(nullValue())); + } + + @Test + public void shouldReturnOriginalStringOnNotFoundRegexp() { + assertThat(udf.regexpSplit("", "z"), contains("")); + assertThat(udf.regexpSplit("x-y", "z"), contains("x-y")); + } + + @Test + public void shouldSplitAllCharactersByGivenAnEmptyRegexp() { + assertThat(udf.regexpSplit("", ""), contains("")); + assertThat(udf.regexpSplit("x-y", ""), contains("x", "-", "y")); + assertThat(udf.regexpSplit("x", ""), contains("x")); + } + + @Test + public void shouldSplitStringByGivenRegexp() { + assertThat(udf.regexpSplit("x-y", "-"), contains("x", "y")); + assertThat(udf.regexpSplit("x-y", "x"), contains("", "-y")); + assertThat(udf.regexpSplit("x-y", "y"), contains("x-", "")); + assertThat(udf.regexpSplit("a-b-c-d", "-"), contains("a", "b", "c", "d")); + + assertThat(udf.regexpSplit("x-y", "."), contains("", "", "", "")); + assertThat(udf.regexpSplit("a-b-c-b-d", ".b."), contains("a", "c", "d")); + assertThat(udf.regexpSplit("a-b-c", "^.."), contains("", "b-c")); + assertThat(udf.regexpSplit("a-b-ccatd-ecatf", "(-|cat)"), + contains("a", "b", "c", "d", "e", "f")); + } + + @Test + public void shouldSplitAndAddEmptySpacesIfRegexpIsFoundAtTheBeginningOrEnd() { + assertThat(udf.regexpSplit("-A", "-"), contains("", "A")); + assertThat(udf.regexpSplit("-A-B", "-"), contains("", "A", "B")); + assertThat(udf.regexpSplit("A-", "-"), contains("A", "")); + assertThat(udf.regexpSplit("A-B-", "-"), contains("A", "B", "")); + assertThat(udf.regexpSplit("-A-B-", "-"), contains("", "A", "B", "")); + + assertThat(udf.regexpSplit("A", "^"), contains("A")); + assertThat(udf.regexpSplit("A", "$"), contains("A")); + assertThat(udf.regexpSplit("AB", "^"), contains("AB")); + assertThat(udf.regexpSplit("AB", 
"$"), contains("AB")); + } + + @Test + public void shouldSplitAndAddEmptySpacesIfRegexIsFoundInContiguousPositions() { + assertThat(udf.regexpSplit("A--A", "-"), contains("A", "", "A")); + assertThat(udf.regexpSplit("z--A--z", "-"), contains("z", "", "A", "", "z")); + assertThat(udf.regexpSplit("--A--A", "-"), contains("", "", "A", "", "A")); + assertThat(udf.regexpSplit("A--A--", "-"), contains("A", "", "A", "", "")); + + assertThat(udf.regexpSplit("aababa", "ab"), contains("a", "", "a")); + assertThat(udf.regexpSplit("aababa", "(ab)+"), contains("a", "a")); + assertThat(udf.regexpSplit("aabcda", "(ab|cd)"), contains("a", "", "a")); + } + + @Test(expected = KsqlFunctionException.class) + public void shouldThrowOnInvalidPattern() { + udf.regexpSplit("abcd", "(()"); + } +} diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/split_-_regexp_split_to_array/6.0.0_1591047957798/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/split_-_regexp_split_to_array/6.0.0_1591047957798/plan.json new file mode 100644 index 000000000000..543307e23a26 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/split_-_regexp_split_to_array/6.0.0_1591047957798/plan.json @@ -0,0 +1,126 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, INPUT_STRING STRING, PATTERN STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `INPUT_STRING` STRING, `PATTERN` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n REGEXP_SPLIT_TO_ARRAY(TEST.INPUT_STRING, TEST.PATTERN) EXTRACTED\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `EXTRACTED` ARRAY", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `INPUT_STRING` STRING, `PATTERN` STRING" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "REGEXP_SPLIT_TO_ARRAY(INPUT_STRING, PATTERN) AS EXTRACTED" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + 
"ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.error.classifier.regex" : "" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/split_-_regexp_split_to_array/6.0.0_1591047957798/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/split_-_regexp_split_to_array/6.0.0_1591047957798/spec.json new file mode 100644 index 000000000000..c63b2f47909c --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/split_-_regexp_split_to_array/6.0.0_1591047957798/spec.json @@ -0,0 +1,116 @@ +{ + "version" : "6.0.0", + "timestamp" : 1591047957798, + "path" : "query-validation-tests/split.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT> NOT NULL" + }, + "testCase" : { + "name" : "regexp_split_to_array", + "inputs" : [ { + "topic" : "test_topic", + "key" : "", + "value" : { + "input_string" : "aabcda", + "pattern" : "(ab|cd)" + } + }, { + "topic" : "test_topic", + "key" : "", + "value" : { + "input_string" : "aabdcda", + "pattern" : "(ab|cd)" + } + }, { + "topic" : "test_topic", + "key" : "", + "value" : { + "input_string" : "zxy", + "pattern" : "(ab|cd)" + } + }, { + "topic" : "test_topic", + "key" : "", + "value" : { + "input_string" : null, + "pattern" : "(ab|cd)" + } + }, { + "topic" : "test_topic", + "key" : "", + "value" : { + "input_string" : "zxy", + "pattern" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "EXTRACTED" : [ "a", "", "a" ] + } + }, { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "EXTRACTED" : [ "a", "d", "a" ] + } + }, { + "topic" : "OUTPUT", + "key" : 
"", + "value" : { + "EXTRACTED" : [ "zxy" ] + } + }, { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "EXTRACTED" : null + } + }, { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "EXTRACTED" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (K STRING KEY, input_string VARCHAR, pattern VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, REGEXP_SPLIT_TO_ARRAY(input_string, pattern) AS EXTRACTED FROM TEST;" ], + "post" : { + "topics" : { + "topics" : [ { + "name" : "OUTPUT", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "test_topic", + "keyFormat" : { + "formatInfo" : { + "format" : "KAFKA" + } + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/split_-_regexp_split_to_array/6.0.0_1591047957798/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/split_-_regexp_split_to_array/6.0.0_1591047957798/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/split_-_regexp_split_to_array/6.0.0_1591047957798/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/split.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/split.json index 4891ed7607ed..9376a3d1f9df 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/split.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/split.json @@ -84,6 +84,27 @@ {"topic": "OUTPUT", "key": "3", "value": {"S1": "A", "S2": "A"}, "timestamp": 0}, {"topic": "OUTPUT", "key": "4", "value": {"S1": "1", "S2": "3"}, "timestamp": 0} ] + }, + { + "name": "regexp_split_to_array", + "statements": [ + "CREATE STREAM TEST (K STRING KEY, input_string VARCHAR, pattern VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT K, REGEXP_SPLIT_TO_ARRAY(input_string, pattern) AS EXTRACTED FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"input_string": "aabcda", "pattern": "(ab|cd)"}}, + {"topic": "test_topic", "value": {"input_string": "aabdcda", "pattern": "(ab|cd)"}}, + {"topic": "test_topic", "value": {"input_string": "zxy", "pattern": "(ab|cd)"}}, + {"topic": "test_topic", "value": {"input_string": null, "pattern": "(ab|cd)"}}, + {"topic": "test_topic", "value": {"input_string": "zxy", "pattern": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "value": {"EXTRACTED": ["a", "", "a"]}}, + {"topic": "OUTPUT", "value": {"EXTRACTED": ["a", "d", "a"]}}, + {"topic": "OUTPUT", "value": {"EXTRACTED": ["zxy"]}}, + {"topic": "OUTPUT", "value": {"EXTRACTED": null}}, + {"topic": "OUTPUT", "value": {"EXTRACTED": null}} + ] } ] } \ No newline at end of file