[Fix][connector-elasticsearch] support elasticsearch nest type && spark with Array<map> #8492

Open · wants to merge 11 commits into base: dev
@@ -56,7 +56,7 @@ public class ArrayType<T, E> implements SeaTunnelDataType<T> {
private final Class<T> arrayClass;
private final SeaTunnelDataType<E> elementType;

protected ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
public ArrayType(Class<T> arrayClass, SeaTunnelDataType<E> elementType) {
this.arrayClass = arrayClass;
this.elementType = elementType;
}
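Widening the constructor from protected to public lets callers build array types whose element type has no predefined constant, which is what the nested-type support further down relies on. A minimal sketch of constructing an array-of-map type this way (illustrative only, not part of the diff):

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;

import java.util.Map;

public class ArrayOfMapTypeSketch {
    public static void main(String[] args) {
        // Previously only subclasses or the predefined *_ARRAY_TYPE constants could do this.
        ArrayType<Map[], Map<String, String>> arrayOfMaps =
                new ArrayType<>(
                        Map[].class,
                        new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE));
        System.out.println(arrayOfMaps.getElementType().getSqlType()); // MAP
    }
}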
@@ -169,7 +169,6 @@ private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
if (elementType instanceof DecimalType) {
return ((Object[]) v).length * 36;
}

if (elementType instanceof LocalTimeType) {
SqlType eleSqlType = elementType.getSqlType();
switch (eleSqlType) {
@@ -232,6 +231,8 @@ private int getBytesForArray(Object v, SeaTunnelDataType<?> dataType) {
return getArrayNotNullSize((Long[]) v) * 8;
case DOUBLE:
return getArrayNotNullSize((Double[]) v) * 8;
case MAP:
return getArrayMapNotNullSize(v);
case NULL:
default:
return 0;
@@ -248,6 +249,19 @@ private int getArrayNotNullSize(Object[] values) {
return c;
}

private int getArrayMapNotNullSize(Object v) {
int size = 0;
if (Objects.nonNull(v)) {
for (Map o : (Map[]) v) {
for (Map.Entry<?, ?> entry : ((Map<?, ?>) o).entrySet()) {
size += getBytesForValue(entry.getKey()) + getBytesForValue(entry.getValue());
}
}
}

return size;
}

public int getBytesSize() {
if (size == 0) {
int s = 0;
@@ -305,6 +319,9 @@ private int getBytesForValue(Object v) {
return getBytesForArray(v, BasicType.FLOAT_TYPE);
case "Double[]":
return getBytesForArray(v, BasicType.DOUBLE_TYPE);
case "Map[]":
return getBytesForArray(
v, new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE));
case "HashMap":
case "LinkedHashMap":
int size = 0;
@@ -17,6 +17,8 @@

package org.apache.seatunnel.api.table.type;

import org.apache.seatunnel.shade.com.google.common.collect.Maps;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@@ -43,6 +45,9 @@ void testForRowSize() {
new Object[] {
1, "test", 1L, new BigDecimal("3333.333"),
}));

Map<String, Object> objectMap = Maps.newHashMap();
objectMap.put("name", "cosmos");
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {
@@ -58,7 +63,8 @@ void testForRowSize() {
new Float[] {1F, 2F},
new Boolean[] {Boolean.TRUE, Boolean.FALSE},
new Byte[] {1, 2, 3, 4},
new Short[] {Short.parseShort("1")}
new Short[] {Short.parseShort("1")},
new Map[] {objectMap}
});

SeaTunnelRow row2 =
@@ -76,14 +82,15 @@ void testForRowSize() {
new Float[] {1F, 2F, null},
new Boolean[] {Boolean.TRUE, Boolean.FALSE, null},
new Byte[] {1, 2, 3, 4, null},
new Short[] {Short.parseShort("1"), null}
new Short[] {Short.parseShort("1"), null},
new Map[] {objectMap}
});

SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {
"f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10",
"f11", "f12"
"f11", "f12", "f13"
},
new SeaTunnelDataType<?>[] {
BasicType.INT_TYPE,
@@ -107,14 +114,17 @@ void testForRowSize() {
ArrayType.FLOAT_ARRAY_TYPE,
ArrayType.BOOLEAN_ARRAY_TYPE,
ArrayType.BYTE_ARRAY_TYPE,
ArrayType.SHORT_ARRAY_TYPE
ArrayType.SHORT_ARRAY_TYPE,
new ArrayType<>(
Map[].class,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE))
});

Assertions.assertEquals(249, row.getBytesSize(rowType));
Assertions.assertEquals(249, row.getBytesSize());
Assertions.assertEquals(259, row.getBytesSize(rowType));
Assertions.assertEquals(259, row.getBytesSize());

Assertions.assertEquals(249, row2.getBytesSize(rowType));
Assertions.assertEquals(249, row2.getBytesSize());
Assertions.assertEquals(259, row2.getBytesSize(rowType));
Assertions.assertEquals(259, row2.getBytesSize());
}

@Test
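The expected size in both assertions rises from 249 to 259 bytes: the added Map[] field holds the single entry "name" -> "cosmos", and under the estimator's string sizing its key and value contribute 4 + 6 = 10 bytes through the new MAP branch of getBytesForArray; nothing else in the rows changed.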
@@ -303,4 +303,13 @@ public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException
}
}
}

public static boolean isJsonArray(String jsonString) {
try {
JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonString);
return jsonNode.isArray();
} catch (Exception e) {
return false;
}
}
}
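The new helper lets callers check whether a field's JSON payload is an array before deciding how to split it, mapping any parse failure to false. Expected behaviour, shown with hypothetical inputs (the import of JsonUtils is omitted here):

JsonUtils.isJsonArray("{\"city\":\"New York\"}");                      // false: root is an object
JsonUtils.isJsonArray("[{\"city\":\"New York\"},{\"city\":\"LA\"}]");  // true: root is an array
JsonUtils.isJsonArray("not json at all");                              // false: readTree throws, caught and mapped to false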
@@ -64,7 +64,6 @@
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.LONG_RANGE;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.MATCH_ONLY_TEXT;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.NESTED;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.PERCOLATOR;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.POINT;
@@ -150,6 +149,12 @@ public Column convert(BasicTypeDefine<EsType> typeDefine) {
});
builder.dataType(rowType);
break;
case EsType.NESTED:
builder.dataType(
new ArrayType<>(
Map[].class,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)));
break;
case INTEGER:
case TOKEN_COUNT:
builder.dataType(BasicType.INT_TYPE);
@@ -207,7 +212,6 @@ public Column convert(BasicTypeDefine<EsType> typeDefine) {
case COMPLETION:
case STRING:
case GEO_SHAPE:
case NESTED:
case PERCOLATOR:
case POINT:
case RANK_FEATURES:
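With this change an Elasticsearch nested field no longer falls through to the generic case list at the end of the switch; it is mapped to an array of string-to-string maps (an ArrayType over MapType<STRING, STRING>). Unlike the row type built just above, the nested mapping's inner properties are not expanded into typed columns: each nested element travels as a generic map, which is what lets arbitrary nested documents pass through the connector, as the e2e test below exercises.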
@@ -42,6 +42,7 @@
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
@@ -177,7 +178,12 @@ Object convertValue(SeaTunnelDataType<?> fieldType, String fieldValue)
} else if (fieldType instanceof ArrayType) {
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
SeaTunnelDataType<?> elementType = arrayType.getElementType();
List<String> stringList = JsonUtils.toList(fieldValue, String.class);
List<String> stringList = new ArrayList<>();
if (elementType instanceof MapType && !JsonUtils.isJsonArray(fieldValue)) {
stringList.add(fieldValue);
} else {
stringList = JsonUtils.toList(fieldValue, String.class);
}
Object arr = Array.newInstance(elementType.getTypeClass(), stringList.size());
for (int i = 0; i < stringList.size(); i++) {
Object convertValue = convertValue(elementType, stringList.get(i));
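A self-contained sketch of the normalization the deserializer now performs for map-element arrays: a bare JSON object is wrapped into a one-element list, a JSON array is split into one string per element, and each string is then converted with the element type. The class and names below are illustrative, using plain Jackson rather than the connector's shaded utilities:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

public class NestedFieldSplitSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // A bare JSON object becomes a one-element list; a JSON array yields one string per element.
    static List<String> toElementJson(String fieldValue) throws Exception {
        List<String> elements = new ArrayList<>();
        JsonNode root = MAPPER.readTree(fieldValue);
        if (root.isArray()) {
            for (JsonNode node : root) {
                elements.add(node.toString());
            }
        } else {
            elements.add(fieldValue);
        }
        return elements;
    }

    public static void main(String[] args) throws Exception {
        String singleAddress = "{\"street\":\"123 Main St\",\"city\":\"New York\",\"zipcode\":\"10001\"}";
        String addressArray = "[{\"city\":\"New York\"},{\"city\":\"Los Angeles\"}]";
        System.out.println(toElementJson(singleAddress)); // one element, the original object
        System.out.println(toElementJson(addressArray));  // two elements, one per address
    }
}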
@@ -133,6 +133,7 @@ public void startUp() throws Exception {
createIndexDocs();
createIndexWithFullType();
createIndexForResourceNull("st_index4");
createIndexWithNestType();
}

/** create an index, and bulk some documents */
@@ -156,6 +157,31 @@ private void createIndexDocsByName(String indexName, List<String> testDataSet) {
esRestClient.bulk(requestBody.toString());
}

private void createIndexWithNestType() throws IOException, InterruptedException {
String mapping =
IOUtils.toString(
ContainerUtil.getResourcesFile("/elasticsearch/st_index_nest_mapping.json")
.toURI(),
StandardCharsets.UTF_8);
esRestClient.createIndex("st_index_nest", mapping);
esRestClient.createIndex("st_index_nest_copy", mapping);
BulkResponse response =
esRestClient.bulk(
"{ \"index\" : { \"_index\" : \"st_index_nest\", \"_id\" : \"1\" } }\n"
+ IOUtils.toString(
ContainerUtil.getResourcesFile(
"/elasticsearch/st_index_nest_data.json")
.toURI(),
StandardCharsets.UTF_8)
.replace("\n", "")
+ "\n");
Assertions.assertFalse(response.isErrors(), response.getResponse());
// wait for index refresh
Thread.sleep(INDEX_REFRESH_MILL_DELAY);
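// only one document is indexed, but each of its two nested address objects is
// stored as a separate Lucene doc, so the reported docs count is 3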
Assertions.assertEquals(
3, esRestClient.getIndexDocsCount("st_index_nest").get(0).getDocsCount());
}

private void createIndexWithFullType() throws IOException, InterruptedException {
String mapping =
IOUtils.toString(
@@ -202,6 +228,21 @@ public void testElasticsearchWithSchema(TestContainer container)
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}

@TestTemplate
public void testElasticsearchWithNestSchema(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/elasticsearch/elasticsearch_source_and_sink_with_nest.conf");
Assertions.assertEquals(0, execResult.getExitCode());

List<String> sinkData = readSinkDataWithNestSchema("st_index_nest_copy");
String data =
"{\"address\":[{\"zipcode\":\"10001\",\"city\":\"New York\",\"street\":\"123 Main St\"},"
+ "{\"zipcode\":\"90001\",\"city\":\"Los Angeles\",\"street\":\"456 Elm St\"}],\"name\":\"John Doe\"}";

Assertions.assertIterableEquals(Lists.newArrayList(data), sinkData);
}

@TestTemplate
public void testElasticsSearchWithMultiSourceByFilter(TestContainer container)
throws InterruptedException, IOException {
@@ -546,6 +587,13 @@ private List<String> readSinkDataWithSchema(String index) throws InterruptedException {
return getDocsWithTransformTimestamp(source, index);
}

private List<String> readSinkDataWithNestSchema(String index) throws InterruptedException {
// wait for index refresh
Thread.sleep(INDEX_REFRESH_MILL_DELAY);
List<String> source = Lists.newArrayList("name", "address");
return getDocsWithNestType(source, index);
}

private List<String> readMultiSinkData(String index, List<String> source)
throws InterruptedException {
// wait for index refresh
@@ -604,6 +652,25 @@ private List<String> getDocsWithTransformTimestamp(List<String> source, String index) {
return docs;
}

private List<String> getDocsWithNestType(List<String> source, String index) {
Map<String, Object> query = new HashMap<>();
query.put("match_all", new HashMap<>());
ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
scrollResult
.getDocs()
.forEach(
x -> {
x.remove("_index");
x.remove("_type");
x.remove("_id");
});
List<String> docs =
scrollResult.getDocs().stream()
.map(JsonUtils::toJsonString)
.collect(Collectors.toList());
return docs;
}

private List<String> getDocsWithTransformDate(List<String> source, String index) {
return getDocsWithTransformDate(source, index, Collections.emptyList());
}
@@ -739,6 +806,13 @@ private List<String> mapTestDatasetForDSL(List<String> testDataset) {
.collect(Collectors.toList());
}

private List<String> mapTestDatasetForNest(List<String> testDataset) {
return testDataset.stream()
.map(JsonUtils::parseObject)
.map(JsonNode::toString)
.collect(Collectors.toList());
}

/**
* Use custom filtering criteria to query data
*
@@ -0,0 +1,53 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

######
###### This config file is a demonstration of batch processing in SeaTunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
#checkpoint.interval = 10000
}

source {
Elasticsearch {
hosts = ["https://elasticsearch:9200"]
username = "elastic"
password = "elasticsearch"
index = "st_index_nest"
source = ["address","name"]
query = {"match_all": {}}
tls_verify_certificate = false
tls_verify_hostname = false
}
}

transform {
}

sink {
Elasticsearch {
hosts = ["https://elasticsearch:9200"]
username = "elastic"
password = "elasticsearch"
index = "st_index_nest_copy"
tls_verify_certificate = false
tls_verify_hostname = false
}
}
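This batch job copies every document from st_index_nest into st_index_nest_copy unchanged; testElasticsearchWithNestSchema then reads the copy back and checks that the nested address array survives the round trip.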
@@ -0,0 +1,15 @@
{
"name": "John Doe",
"address": [
{
"street": "123 Main St",
"city": "New York",
"zipcode": "10001"
},
{
"street": "456 Elm St",
"city": "Los Angeles",
"zipcode": "90001"
}
]
}