Skip to content

Commit

Permalink
feat: modifications for ARRAY_JOIN as discussed in #5474
Browse files Browse the repository at this point in the history
as requested by reviewers:

- removed BigInteger from primitive types
- added Long and BigDecimal to supported primitive types
- dropped support for complex nested types
- switched to hamcrest matchers in unit tests
- added QueryTranslationTest cases, run via PlannedTestGeneratorTest
- adapted docs accordingly
  • Loading branch information
hpgrahsl committed Jun 12, 2020
1 parent 86542b0 commit 2bb9136
Show file tree
Hide file tree
Showing 13 changed files with 815 additions and 225 deletions.
9 changes: 4 additions & 5 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,10 @@ include both endpoints.
ARRAY_JOIN(col1, delimiter)
```

Creates a flat String representation of all the elements contained in the given array.
The elements in the resulting String are separated by the chosen `delimiter`,
which is an optional parameter that falls back to a comma `,`. The array may contain
all valid ksql types and supports nested and complex data structures as well,
for instance, lists of lists or structs containing maps.
Creates a flat string representation of all the elements contained in the given array.
The elements in the resulting string are separated by the chosen `delimiter`, which is
optional and defaults to a comma `,`. The current implementation only supports array
elements of primitive ksqlDB types.

## Strings

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@

package io.confluent.ksql.function.udf.array;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.HashSet;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import org.apache.kafka.connect.data.Struct;

@SuppressWarnings("MethodMayBeStatic") // UDF methods can not be static.
@UdfDescription(
Expand All @@ -38,8 +35,8 @@
public class ArrayJoin {

private static final String DEFAULT_DELIMITER = ",";
private static final Set<Class> KSQL_PRIMITIVES = new HashSet<>(
Arrays.asList(Boolean.class,Integer.class,BigInteger.class,Double.class,String.class)
private static final Set<Class> KSQL_PRIMITIVES = ImmutableSet.of(
Boolean.class,Integer.class,Long.class,Double.class,BigDecimal.class,String.class
);

@Udf
Expand All @@ -62,7 +59,7 @@ public <T> String join(
return null;
}

final StringJoiner sj = new StringJoiner(delimiter);
final StringJoiner sj = new StringJoiner(delimiter == null ? "" : delimiter);
array.forEach(e -> processElement(e, sj));
return sj.toString();

Expand All @@ -73,12 +70,6 @@ private static <T> void processElement(final T element, final StringJoiner joine

if (element == null || KSQL_PRIMITIVES.contains(element.getClass())) {
handlePrimitiveType(element, joiner);
} else if (element instanceof List) {
handleListType((List)element,joiner);
} else if (element instanceof Map) {
handleMapType((Map) element, joiner);
} else if (element instanceof Struct) {
handleStructType((Struct)element, joiner);
} else {
throw new KsqlFunctionException("error: hit element of type "
+ element.getClass().getTypeName() + " which is currently not supported");
Expand All @@ -90,22 +81,4 @@ private static void handlePrimitiveType(final Object element, final StringJoiner
joiner.add(element != null ? element.toString() : null);
}

private static void handleListType(final List<?> element, final StringJoiner joiner) {
element.forEach(e -> processElement(e, joiner));
}

private static void handleMapType(final Map<String,?> element, final StringJoiner joiner) {
element.entrySet().forEach(e -> {
joiner.add(((Map.Entry) e).getKey().toString());
processElement(((Map.Entry) e).getValue(), joiner);
});
}

private static void handleStructType(final Struct element, final StringJoiner joiner) {
element.schema().fields().forEach(f -> {
joiner.add(f.name());
processElement(element.get(f), joiner);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,227 +15,100 @@

package io.confluent.ksql.function.udf.array;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import io.confluent.ksql.function.KsqlFunctionException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.BeforeClass;
import org.junit.Test;

public class ArrayJoinTest {

private static final String CUSTOM_DELIMITER = "|";
private static Struct STRUCT_DATA;

private final ArrayJoin arrayJoinUDF = new ArrayJoin();

@BeforeClass
public static void initializeComplexStructTypeSampleData() {

Schema structSchema = SchemaBuilder.struct()
.field("f1", Schema.STRING_SCHEMA)
.field("f2", Schema.INT32_SCHEMA)
.field("f3", Schema.BOOLEAN_SCHEMA)
.field("f4", SchemaBuilder.struct()
.field("f4-1", Schema.STRING_SCHEMA)
.build()
)
.field("f5", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("f6", SchemaBuilder.array(SchemaBuilder.struct()
.field("k", Schema.STRING_SCHEMA)
.field("v", Schema.INT32_SCHEMA)
.build())
)
.field("f7",
SchemaBuilder.array(SchemaBuilder.array(SchemaBuilder.array(Schema.INT32_SCHEMA))))
.field("f8", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build())
.field("f9", SchemaBuilder
.map(Schema.STRING_SCHEMA, SchemaBuilder.array(Schema.STRING_SCHEMA).build()).build())
.field("f10", SchemaBuilder
.map(Schema.STRING_SCHEMA, SchemaBuilder.array(Schema.INT32_SCHEMA).build()).build())
.build();

STRUCT_DATA = new Struct(structSchema)
.put("f1", "ksqldb UDF sample data")
.put("f2", 42)
.put("f3", true)
.put("f4", new Struct(structSchema.field("f4").schema())
.put("f4-1", "hello ksqldb")
)
.put("f5", Arrays.asList("str_1", "str_2", "...", "str_N"))
.put("f6", Arrays.asList(
new Struct(structSchema.field("f6").schema().valueSchema())
.put("k", "a").put("v", 1),
new Struct(structSchema.field("f6").schema().valueSchema())
.put("k", "b").put("v", 2),
new Struct(structSchema.field("f6").schema().valueSchema())
.put("k", "c").put("v", 3)
)
)
.put("f7", Arrays.asList(
Arrays.asList(Arrays.asList(0,1), Arrays.asList(2,3,4), Arrays.asList(5, 6),
Arrays.asList(7,8,9)),
Arrays.asList(Arrays.asList(9,8,7),Arrays.asList(6,5), Arrays.asList(4,3,2),
Arrays.asList(1,0)
)
))
.put("f8", new LinkedHashMap<String, Integer>() {{
put("k1", 6);
put("k2", 5);
put("k3", 4);
}})
.put("f9", new LinkedHashMap<String, List<String>>() {{
put("k1", Arrays.asList("v1-a", "v1-b"));
put("k2", Arrays.asList("v2-a","v2-b", "v2-c", "v2-d"));
put("k3", Arrays.asList("v3-a","v3-b","v3-c"));
}})
.put("f10", new LinkedHashMap<String, List<Integer>>() {{
put("k1", Arrays.asList(12, 21));
put("k2", Arrays.asList(23, 32));
put("k3", Arrays.asList(24, 42));
}});
}

@Test
public void shouldReturnNullForNullInput() {
assertNull(arrayJoinUDF.join(null));
assertNull(arrayJoinUDF.join(null,CUSTOM_DELIMITER));
assertThat(arrayJoinUDF.join(null), nullValue());
assertThat(arrayJoinUDF.join(null,CUSTOM_DELIMITER), nullValue());
}

@Test
public void shouldReturnEmptyStringForEmptyArrays() {
assertTrue(arrayJoinUDF.join(Collections.emptyList()).isEmpty());
assertTrue(arrayJoinUDF.join(Collections.emptyList(),CUSTOM_DELIMITER).isEmpty());
assertThat(arrayJoinUDF.join(Collections.emptyList()).isEmpty(),is(true));
assertThat(arrayJoinUDF.join(Collections.emptyList(),CUSTOM_DELIMITER).isEmpty(),is(true));
}

@Test
public void shouldReturnCorrectStringForFlatArraysWithPrimitiveTypes() {

assertEquals("true,null,false",
arrayJoinUDF.join(Arrays.asList(true, null, false)));
assertEquals("true"+CUSTOM_DELIMITER+"null"+CUSTOM_DELIMITER+"false",
arrayJoinUDF.join(Arrays.asList(true,null,false),CUSTOM_DELIMITER));

assertEquals("1,23,-42,0",
arrayJoinUDF.join(Arrays.asList(1,23,-42,0)));
assertEquals("1"+CUSTOM_DELIMITER+"23"+CUSTOM_DELIMITER+"-42"+CUSTOM_DELIMITER+"0",
arrayJoinUDF.join(Arrays.asList(1,23,-42,0),CUSTOM_DELIMITER));

assertEquals("-4294967297,8589934592",
arrayJoinUDF.join(Arrays.asList(new BigInteger("-4294967297"),
new BigInteger("8589934592")))
assertThat(arrayJoinUDF.join(Arrays.asList(true, null, false),""),
is("truenullfalse")
);
assertEquals("-4294967297"+CUSTOM_DELIMITER+"8589934592",
arrayJoinUDF.join(Arrays.asList(
new BigInteger("-4294967297"), new BigInteger("8589934592")
), CUSTOM_DELIMITER)
assertThat(arrayJoinUDF.join(Arrays.asList(true, null, false)),
is("true,null,false")
);
assertThat(arrayJoinUDF.join(Arrays.asList(true,null,false),CUSTOM_DELIMITER),
is("true"+CUSTOM_DELIMITER+"null"+CUSTOM_DELIMITER+"false")
);

assertEquals("1.23,-23.42,0.0",
arrayJoinUDF.join(Arrays.asList(1.23,-23.42,0.0)));
assertEquals("1.23"+CUSTOM_DELIMITER+"-23.42"+CUSTOM_DELIMITER+"0.0",
arrayJoinUDF.join(Arrays.asList(1.23,-23.42,0.0),CUSTOM_DELIMITER));

assertEquals("hello,from,,ksqldb,udf,null",
arrayJoinUDF.join(Arrays.asList("hello","from","","ksqldb","udf",null)));
assertEquals("hello"+CUSTOM_DELIMITER+"from"+CUSTOM_DELIMITER+CUSTOM_DELIMITER
+"ksqldb"+CUSTOM_DELIMITER+"udf"+CUSTOM_DELIMITER+"null",
arrayJoinUDF.join(Arrays.asList("hello","from","","ksqldb","udf",null),CUSTOM_DELIMITER));

}

@Test
public void shouldReturnCorrectStringForNestedArraysWithPrimitiveTypes() {

assertEquals("true,false,null,null,true",
arrayJoinUDF.join(Arrays.asList(Arrays.asList(true,false),null,Arrays.asList(null,true))));
assertEquals("true"+CUSTOM_DELIMITER+"false"+CUSTOM_DELIMITER+"null"
+CUSTOM_DELIMITER+"null"+CUSTOM_DELIMITER+"true",
arrayJoinUDF.join(Arrays.asList(Arrays.asList(true,false), null, Arrays.asList(null,true)),
CUSTOM_DELIMITER));

assertEquals("0,0,7,null,100,-10",
arrayJoinUDF.join(Arrays.asList(Arrays.asList(0,0,7),null,Arrays.asList(100,-10))));
assertEquals("0"+CUSTOM_DELIMITER+"0"+CUSTOM_DELIMITER+"7"
+CUSTOM_DELIMITER+"null"+CUSTOM_DELIMITER+"100"+CUSTOM_DELIMITER+"-10",
arrayJoinUDF.join(Arrays.asList(Arrays.asList(0,0,7),null,Arrays.asList(100,-10)),
CUSTOM_DELIMITER));

assertEquals("-4294967297,0,8589934592,null,1",
arrayJoinUDF.join(Arrays.asList(Arrays.asList(new BigInteger("-4294967297"),
BigInteger.ZERO,new BigInteger("8589934592")),Arrays.asList(null,BigInteger.ONE))));
assertEquals("-4294967297"+CUSTOM_DELIMITER+"0"+CUSTOM_DELIMITER+"8589934592"
+CUSTOM_DELIMITER+"null"+CUSTOM_DELIMITER+"1",
arrayJoinUDF.join(Arrays.asList(Arrays.asList(new BigInteger("-4294967297"),
BigInteger.ZERO,new BigInteger("8589934592")),Arrays.asList(null,BigInteger.ONE)),
CUSTOM_DELIMITER));

assertEquals("1.23,-23.42,0.0,1.0",
arrayJoinUDF.join(Arrays.asList(Arrays.asList(1.23,-23.42),Arrays.asList(0.0,1.0))));
assertEquals("1.23"+CUSTOM_DELIMITER+"-23.42"+CUSTOM_DELIMITER+"0.0"+CUSTOM_DELIMITER+"1.0",
arrayJoinUDF.join(Arrays.asList(Arrays.asList(1.23,-23.42),Arrays.asList(0.0,1.0)),
CUSTOM_DELIMITER));

assertEquals("hello,from,,ksqldb,udf,null",
arrayJoinUDF.join(Arrays.asList(Arrays.asList("hello","from"),
Arrays.asList("","ksqldb", "udf",null))));
assertEquals("hello"+CUSTOM_DELIMITER+"from"+CUSTOM_DELIMITER+CUSTOM_DELIMITER
+"ksqldb"+CUSTOM_DELIMITER+"udf"+CUSTOM_DELIMITER+"null",
arrayJoinUDF.join(Arrays.asList(Arrays.asList("hello","from"),
Arrays.asList("","ksqldb", "udf",null)),CUSTOM_DELIMITER));

}

@Test
public void shouldReturnCorrectStringForNestedComplexTypes() {

Map<String,Integer> mapData1 = new LinkedHashMap<>();
mapData1.put("k1",12);
mapData1.put("k2",34);
mapData1.put("k3",0);

Map<String,Integer> mapData2 = new LinkedHashMap<>();
mapData2.put("k4",null);
mapData2.put("k5",-100);

assertEquals("k1,12,k2,34,k3,0,k4,null,k5,-100",
arrayJoinUDF.join(Arrays.asList(mapData1,mapData2)));

Map<String,List<String>> mapData3 = new LinkedHashMap<>();
mapData3.put("k1",Arrays.asList("hello","from"));
mapData3.put("k2",Arrays.asList("ksqldb",""));

Map<String,List<String>> mapData4 = new LinkedHashMap<>();
mapData4.put("k3",Arrays.asList(null,"",""));
assertThat(arrayJoinUDF.join(Arrays.asList(1,23,-42,0),null), is("123-420"));
assertThat(arrayJoinUDF.join(Arrays.asList(1,23,-42,0)), is("1,23,-42,0"));
assertThat(arrayJoinUDF.join(Arrays.asList(1,23,-42,0),CUSTOM_DELIMITER),
is("1"+CUSTOM_DELIMITER+"23"+CUSTOM_DELIMITER+"-42"+CUSTOM_DELIMITER+"0")
);

assertEquals("k1,hello,from,k2,ksqldb,,k3,null,,",
arrayJoinUDF.join(Arrays.asList(mapData3,mapData4)));
assertThat(arrayJoinUDF.join(Arrays.asList(-4294967297L, 8589934592L),""),
is("-42949672978589934592")
);
assertThat(arrayJoinUDF.join(Arrays.asList(-4294967297L, 8589934592L)),
is("-4294967297,8589934592")
);
assertThat(arrayJoinUDF.join(Arrays.asList(-4294967297L, 8589934592L), CUSTOM_DELIMITER),
is("-4294967297"+CUSTOM_DELIMITER+"8589934592")
);

Map<String,Map<String,Integer>> mapData5 = new LinkedHashMap<>();
mapData5.put("k1",mapData1);
mapData5.put("k2",mapData2);
assertThat(arrayJoinUDF.join(Arrays.asList(1.23,-23.42,0.0),null),
is("1.23-23.420.0")
);
assertThat(arrayJoinUDF.join(Arrays.asList(1.23,-23.42,0.0)),
is("1.23,-23.42,0.0")
);
assertThat(arrayJoinUDF.join(Arrays.asList(1.23,-23.42,0.0),CUSTOM_DELIMITER),
is("1.23"+CUSTOM_DELIMITER+"-23.42"+CUSTOM_DELIMITER+"0.0")
);

assertEquals("k1,k1,12,k2,34,k3,0,k2,k4,null,k5,-100",
arrayJoinUDF.join(Collections.singletonList(mapData5)));
assertThat(arrayJoinUDF.join(
Arrays.asList(new BigDecimal("123.45"), new BigDecimal("987.65")),null
),
is("123.45987.65")
);
assertThat(arrayJoinUDF.join(Arrays.asList(new BigDecimal("123.45"), new BigDecimal("987.65"))),
is("123.45,987.65")
);
assertThat(arrayJoinUDF.join(
Arrays.asList(new BigDecimal("123.45"), new BigDecimal("987.65")),CUSTOM_DELIMITER),
is("123.45"+CUSTOM_DELIMITER+"987.65")
);

assertEquals("f1,ksqldb UDF sample data,f2,42,f3,true,f4,f4-1,hello ksqldb," +
"f5,str_1,str_2,...,str_N,f6,k,a,v,1,k,b,v,2,k,c,v,3,f7,0,1,2,3,4,5,6,7,8,9," +
"9,8,7,6,5,4,3,2,1,0,f8,k1,6,k2,5,k3,4,f9,k1,v1-a,v1-b,k2,v2-a,v2-b,v2-c,v2-d," +
"k3,v3-a,v3-b,v3-c,f10,k1,12,21,k2,23,32,k3,24,42",
arrayJoinUDF.join(Collections.singletonList(STRUCT_DATA)));
assertThat(arrayJoinUDF.join(Arrays.asList("Hello","From","","Ksqldb","Udf"),""),
is("HelloFromKsqldbUdf")
);
assertThat(arrayJoinUDF.join(Arrays.asList("Hello","From","","Ksqldb","Udf")),
is("Hello,From,,Ksqldb,Udf")
);
assertThat(
arrayJoinUDF.join(Arrays.asList("hello","from","","ksqldb","udf",null),CUSTOM_DELIMITER),
is("hello"+CUSTOM_DELIMITER+"from"+CUSTOM_DELIMITER+CUSTOM_DELIMITER
+"ksqldb"+CUSTOM_DELIMITER+"udf"+CUSTOM_DELIMITER+"null")
);

}

Expand All @@ -244,7 +117,7 @@ public void shouldThrowExceptionForExamplesOfUnsupportedElementTypes() {
assertThrows(KsqlFunctionException.class,
() -> arrayJoinUDF.join(Arrays.asList('a','b')));
assertThrows(KsqlFunctionException.class,
() -> arrayJoinUDF.join(Arrays.asList(BigDecimal.ZERO,BigDecimal.ONE)));
() -> arrayJoinUDF.join(Arrays.asList(BigInteger.ONE,BigInteger.ZERO)));
assertThrows(KsqlFunctionException.class,
() -> arrayJoinUDF.join(Arrays.asList(-23.0f,42.42f,0.0f)));
assertThrows(KsqlFunctionException.class,
Expand Down
Loading

0 comments on commit 2bb9136

Please sign in to comment.