diff --git a/docs/en/connector-v2/source/Elasticsearch.md b/docs/en/connector-v2/source/Elasticsearch.md
index 9fa1faa5d04..932f2c5a823 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -19,24 +19,27 @@ support version >= 2.x and <= 8.x.

 ## Options

-| name                    | type    | required | default value     |
-|-------------------------|---------|----------|-------------------|
-| hosts                   | array   | yes      | -                 |
-| username                | string  | no       | -                 |
-| password                | string  | no       | -                 |
-| index                   | string  | yes      | -                 |
-| source                  | array   | no       | -                 |
-| query                   | json    | no       | {"match_all": {}} |
-| scroll_time             | string  | no       | 1m                |
-| scroll_size             | int     | no       | 100               |
-| tls_verify_certificate  | boolean | no       | true              |
-| tls_verify_hostnames    | boolean | no       | true              |
-| array_column            | map     | no       |                   |
-| tls_keystore_path       | string  | no       | -                 |
-| tls_keystore_password   | string  | no       | -                 |
-| tls_truststore_path     | string  | no       | -                 |
-| tls_truststore_password | string  | no       | -                 |
-| common-options          |         | no       | -                 |
+| name                    | type    | required | default value                                         |
+| ----------------------- | ------- | -------- | ----------------------------------------------------- |
+| hosts                   | array   | yes      | -                                                     |
+| username                | string  | no       | -                                                     |
+| password                | string  | no       | -                                                     |
+| index                   | string  | no       | Must be configured when `index_list` is not set       |
+| index_list              | array   | no       | Used to define a multi-index (multi-table) sync task  |
+| source                  | array   | no       | -                                                     |
+| query                   | json    | no       | {"match_all": {}}                                     |
+| scroll_time             | string  | no       | 1m                                                    |
+| scroll_size             | int     | no       | 100                                                   |
+| tls_verify_certificate  | boolean | no       | true                                                  |
+| tls_verify_hostnames    | boolean | no       | true                                                  |
+| array_column            | map     | no       |                                                       |
+| tls_keystore_path       | string  | no       | -                                                     |
+| tls_keystore_password   | string  | no       | -                                                     |
+| tls_truststore_path     | string  | no       | -                                                     |
+| tls_truststore_password | string  | no       | -                                                     |
+| common-options          |         | no       | -                                                     |
+
+

 ### hosts [array]

@@ -78,6 +81,10 @@ Amount of time Elasticsearch will keep the search context alive for scroll reque

 Maximum number of hits to be returned with each Elasticsearch scroll request.

+### index_list [array]
+
+The `index_list` option defines a multi-index synchronization task. It is an array whose entries each carry the options needed to read a single index, such as `query`, `source`/`schema`, `scroll_size`, and `scroll_time`. It is recommended not to configure `index_list` together with a top-level `query`; see the multi-table synchronization example below for details.
+
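+A minimal sketch of the shape of `index_list` (the index names and fields below are illustrative only):
+
+```hocon
+index_list = [
+    {
+        index = "index_a"
+        query = {"match_all": {}}
+        source = ["_id", "field_a"]
+    }
+    {
+        index = "index_b"
+        query = {"match_all": {}}
+        source = ["_id", "field_b"]
+    }
+]
+```
+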
 ### tls_verify_certificate [boolean]

 Enable certificates validation for HTTPS endpoints

@@ -108,46 +115,94 @@ Source plugin common parameters, please refer to [Source Common Options](../sour

 ## Examples

-simple
+Demo 1
+
+> This case reads data from indices matching the `seatunnel-*` pattern, filtered by the given query. Only the `_id`, `name`, `age`, `tags`, and `phones` fields are read: the `source` option specifies which fields to read, and `array_column` marks `tags` and `phones` as arrays.

 ```hocon
 Elasticsearch {
     hosts = ["localhost:9200"]
     index = "seatunnel-*"
-    source = ["_id","name","age"]
+    array_column = {tags = "array",phones = "array"}
+    source = ["_id","name","age","tags","phones"]
     query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
 }
 ```

-complex
+Demo 2: Multi-table synchronization
+
+> This example demonstrates how to read different data from `read_index1` and `read_index2` and write it separately to `read_index1_copy` and `read_index2_copy`.
+> In `read_index1`, `source` specifies the fields to be read, and `array_column` marks which of those fields are arrays.

 ```hocon
-Elasticsearch {
-    hosts = ["elasticsearch:9200"]
-    index = "st_index"
-    schema = {
-        fields {
-            c_map = "map<string, tinyint>"
-            c_array = "array<tinyint>"
-            c_string = string
-            c_boolean = boolean
-            c_tinyint = tinyint
-            c_smallint = smallint
-            c_int = int
-            c_bigint = bigint
-            c_float = float
-            c_double = double
-            c_decimal = "decimal(2, 1)"
-            c_bytes = bytes
-            c_date = date
-            c_timestamp = timestamp
-        }
-    }
-    query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
+source {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+    index_list = [
+       {
+           index = "read_index1"
+           query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+           source = [
+           c_map,
+           c_array,
+           c_string,
+           c_boolean,
+           c_tinyint,
+           c_smallint,
+           c_bigint,
+           c_float,
+           c_double,
+           c_decimal,
+           c_bytes,
+           c_int,
+           c_date,
+           c_timestamp]
+           array_column = {
+              c_array = "array"
+           }
+       }
+       {
+           index = "read_index2"
+           query = {"match_all": {}}
+           source = [
+           c_int2,
+           c_date2,
+           c_null
+           ]
+
+       }
+
+    ]
+
+  }
+}
+
+transform {
+}
+
+sink {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "${table_name}_copy"
+    index_type = "st"
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
+  }
 }
 ```

-SSL (Disable certificates validation)
+
+
+Demo 3: SSL (Disable certificates validation)

 ```hocon
 source {
@@ -161,7 +216,7 @@ source {
 }
 ```

-SSL (Disable hostname validation)
+Demo 4: SSL (Disable hostname validation)

 ```hocon
 source {
@@ -175,7 +230,7 @@ source {
 }
 ```

-SSL (Enable certificates validation)
+Demo 5: SSL (Enable certificates validation)

 ```hocon
 source {
@@ -196,5 +251,4 @@ source {

 - Add Elasticsearch Source Connector
 - [Feature] Support https protocol & compatible with opensearch ([3997](https://github.com/apache/seatunnel/pull/3997))
-- [Feature] Support DSL
-
+- [Feature] Support DSL
\ No newline at end of file
diff --git a/docs/zh/connector-v2/source/Elasticsearch.md b/docs/zh/connector-v2/source/Elasticsearch.md
new file mode 100644
index 00000000000..7a27f2b9371
--- /dev/null
+++ b/docs/zh/connector-v2/source/Elasticsearch.md
@@ -0,0 +1,247 @@
+# Elasticsearch
+
+> Elasticsearch source 连接器
+
+## 简介
+
+支持读取 Elasticsearch 2.x 至 8.x 版本的数据
+
+## Key features
+
+- [x] [批处理](../../concept/connector-v2-features.md)
+- [ ] [流处理](../../concept/connector-v2-features.md)
+- [ ] [精准一次](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [ ] [并行度](../../concept/connector-v2-features.md)
+- [ ] [支持用户自定义的分片](../../concept/connector-v2-features.md)
+
+## 配置参数选项
+
+| 参数名称                | 类型    | 是否必须 | 默认值或者描述                                           |
+| ----------------------- | ------- | -------- | -------------------------------------------------------- |
+| hosts                   | 数组    | yes      | -                                                         |
+| username                | string  | no       | -                                                         |
+| password                | string  | no       | -                                                         |
+| index                   | string  | no       | 单索引同步配置,如果没有配置 index_list,则必须配置 index |
+| index_list              | array   | no       | 用来定义多索引同步任务                                    |
+| source                  | array   | no       | -                                                         |
+| query                   | json    | no       | {"match_all": {}}                                         |
+| scroll_time             | string  | no       | 1m                                                        |
+| scroll_size             | int     | no       | 100                                                       |
+| tls_verify_certificate  | boolean | no       | true                                                      |
+| tls_verify_hostnames    | boolean | no       | true                                                      |
+| array_column            | map     | no       |                                                           |
+| tls_keystore_path       | string  | no       | -                                                         |
+| tls_keystore_password   | string  | no       | -                                                         |
+| tls_truststore_path     | string  | no       | -                                                         |
+| tls_truststore_password | string  | no       | -                                                         |
+| common-options          |         | no       | -                                                         |
+
+### hosts [array]
+
+Elasticsearch 集群的 HTTP 地址,格式为 `host:port`,允许指定多个主机。例如:`["host1:9200", "host2:9200"]`。
+
+### username [string]
+
+用户名
+
+### password [string]
+
+密码
+
+### index [string]
+
+Elasticsearch 索引名称,支持 * 模糊匹配。比如存在索引 index1、index2,可以指定 index* 同时读取两个索引的数据。
+
+### source [array]
+
+要读取的索引字段。
+
+你可以通过指定字段 `_id` 来获取文档 ID。如果将 `_id` 写入到其他索引,由于 Elasticsearch 的限制,你需要为 `_id` 指定一个别名。
+
+如果你没有配置 `source`,它将自动从索引的映射中获取。
+
+### array_column [array]
+
+由于 Elasticsearch 中没有显式的数组类型,因此需要通过该参数手动指定哪些字段是数组类型。
+
+假设 tags 和 phones 是数组类型:
+
+```hocon
+array_column = {tags = "array",phones = "array"}
+```
+
+### query [json]
+
+Elasticsearch 的原生查询语句,用于控制读取哪些数据写入到其他数据源。
+
+### scroll_time [String]
+
+SeaTunnel 底层会使用滚动查询(scroll)来读取数据,该参数用于控制搜索上下文的保留时长。
+
+### scroll_size [int]
+
+每次滚动请求返回的最大文档数量。
+
+### index_list [array]
+
+`index_list` 用于定义多索引同步任务。它是一个数组,包含单表同步所需的参数,如 `query`、`source/schema`、`scroll_size` 和 `scroll_time`。建议不要将 `index_list` 和 `query` 配置在同一层级。有关更多详细信息,请参考后面的多表同步示例。
+
+### tls_verify_certificate [boolean]
+
+启用 HTTPS 端点的证书验证
+
+### tls_verify_hostname [boolean]
+
+启用 HTTPS 端点的主机名验证
+
+### tls_keystore_path [string]
+
+PEM 或 JKS 密钥库的路径。该文件必须对运行 SeaTunnel 的操作系统用户可读。
+
+### tls_keystore_password [string]
+
+指定密钥库的密钥密码。
+
+### tls_truststore_path [string]
+
+PEM 或 JKS 信任库的路径。该文件必须对运行 SeaTunnel 的操作系统用户可读。
+
+### tls_truststore_password [string]
+
+指定信任库的密钥密码。
+
+### common options
+
+Source 插件常用参数,具体请参考 [Source 常用选项](../source-common-options.md)
+
+## 使用案例
+
+案例一
+
+> 案例一会从匹配 seatunnel-* 的索引中按照 query 读取数据,只会读取文档的 `_id`、`name`、`age`、`tags`、`phones` 五个字段。在这个例子中,使用 source 配置应该读取哪些字段,并使用 `array_column` 指定 `tags`、`phones` 应该被当做数组处理。
+
+```hocon
+Elasticsearch {
+    hosts = ["localhost:9200"]
+    index = "seatunnel-*"
+    array_column = {tags = "array",phones = "array"}
+    source = ["_id","name","age","tags","phones"]
+    query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
+}
+```
+
+案例二:多索引同步
+
+> 此示例演示了如何从 `read_index1` 和 `read_index2` 中读取不同的数据,并将其分别写入 `read_index1_copy`、`read_index2_copy` 索引。
+> 在 `read_index1` 中,使用 `source` 指定要读取的字段,并使用 `array_column` 指明哪些字段是数组字段。
+
+```hocon
+source {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+    index_list = [
+       {
+           index = "read_index1"
+           query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+           source = [
+           c_map,
+           c_array,
+           c_string,
+           c_boolean,
+           c_tinyint,
+           c_smallint,
+           c_bigint,
+           c_float,
+           c_double,
+           c_decimal,
+           c_bytes,
+           c_int,
+           c_date,
+           c_timestamp
+           ]
+           array_column = {
+              c_array = "array"
+           }
+       }
+       {
+           index = "read_index2"
+           query = {"match_all": {}}
+           source = [
+           c_int2,
+           c_date2,
+           c_null
+           ]
+
+       }
+
+    ]
+
+  }
+}
+
+transform {
+}
+
+sink {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "${table_name}_copy"
+    index_type = "st"
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
+  }
+}
+```
+
+案例三:SSL(禁用证书验证)
+
+```hocon
+source {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        username = "elastic"
+        password = "elasticsearch"
+
+        tls_verify_certificate = false
+    }
+}
+```
+
+案例四:SSL(禁用主机名验证)
+
+```hocon
+source {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        username = "elastic"
+        password = "elasticsearch"
+
+        tls_verify_hostname = false
+    }
+}
+```
+
+案例五:SSL(启用证书验证)
+
+```hocon
+source {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        username = "elastic"
+        password = "elasticsearch"
+
+        tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
+        tls_keystore_password = "${your password}"
+    }
+}
+```
\ No newline at end of file
diff --git a/release-note.md b/release-note.md
index 53f5e10cc65..4ed0d51fed9 100644
--- a/release-note.md
+++ b/release-note.md
@@ -58,6 +58,7 @@
 - [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997)
 - [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085)
 - [Connector-V2] [Clickhouse] Add a new optional configuration `clickhouse.config` to the source connector of ClickHouse (#7143)
+- [Connector-V2] [Elasticsearch] Source support multi-index read (#6730)

 ### Zeta(ST-Engine)

diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index bbf594eb10b..32a86dd75d6 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -110,8 +110,7 @@ public String getDefaultDatabase() throws CatalogException {
     public boolean databaseExists(String databaseName) throws CatalogException {
         // check if the index exist
         try {
-            List<IndexDocsCount> indexDocsCount = esRestClient.getIndexDocsCount(databaseName);
-            return true;
+            return esRestClient.checkIndexExist(databaseName);
         } catch (Exception e) {
             log.error(
                     String.format(
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index f80f20f6736..b54541bf93b 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -58,6 +58,7 @@

 import javax.net.ssl.SSLContext;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -76,7 +77,7 @@
 import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType.OBJECT;

 @Slf4j
-public class EsRestClient {
+public class EsRestClient implements Closeable {
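+    // Implementing Closeable lets callers manage the client with try-with-resources
+    // (see ElasticsearchSource#getFieldTypeMapping) instead of relying on manual close() calls.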
 
     private static final int CONNECTION_REQUEST_TIMEOUT = 10 * 1000;
 
@@ -258,6 +259,7 @@ public ElasticsearchClusterInfo getClusterInfo() {
         }
     }
 
+    @Override
     public void close() {
         try {
             restClient.close();
@@ -370,6 +372,30 @@ private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
         return scrollResult;
     }
 
+    /**
+     * Instead of the getIndexDocsCount method to determine if the index exists,
+     *
+     * <p>getIndexDocsCount throws an exception if the index does not exist
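+     * <p>Implementation note: the check below issues a lightweight {@code HEAD /<index>} request
+     * and treats an HTTP 200 response as "the index exists".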
+ * + * @param index index + * @return true or false + */ + public boolean checkIndexExist(String index) { + Request request = new Request("HEAD", "/" + index); + try { + Response response = restClient.performRequest(request); + int statusCode = response.getStatusLine().getStatusCode(); + return statusCode == 200; + } catch (Exception ex) { + throw new ElasticsearchConnectorException( + ElasticsearchConnectorErrorCode.CHECK_INDEX_FAILED, ex); + } + } + public List getIndexDocsCount(String index) { String endpoint = String.format("/_cat/indices/%s?h=index,docsCount&format=json", index); Request request = new Request("GET", endpoint); diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java index c63cd375952..ffeb69d67f2 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java @@ -17,15 +17,31 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.table.catalog.CatalogTable; + +import lombok.Getter; +import lombok.Setter; +import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -public class SourceConfig { +@Getter +@Setter +public class SourceConfig implements Serializable { + + public static final Option>> INDEX_LIST = + Options.key("index_list") + .type(new TypeReference>>() {}) + .noDefaultValue() + .withDescription("index_list for multiTable sync"); public static final Option INDEX = Options.key("index") @@ -61,11 +77,30 @@ public class SourceConfig { .withDescription( "Maximum number of hits to be returned with each Elasticsearch scroll request"); - public static final Option QUERY = + public static final Option> QUERY = Options.key("query") - .objectType(Map.class) + .type(new TypeReference>() {}) .defaultValue( Collections.singletonMap("match_all", new HashMap())) .withDescription( "Elasticsearch query language. 
You can control the range of data read"); + + private String index; + private List source; + private Map query; + private String scrollTime; + private int scrollSize; + + private CatalogTable catalogTable; + + public SourceConfig clone() { + SourceConfig sourceConfig = new SourceConfig(); + sourceConfig.setIndex(index); + sourceConfig.setSource(new ArrayList<>(source)); + sourceConfig.setQuery(new HashMap<>(query)); + sourceConfig.setScrollTime(scrollTime); + sourceConfig.setScrollSize(scrollSize); + sourceConfig.setCatalogTable(catalogTable); + return sourceConfig; + } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java deleted file mode 100644 index 6c0a5667da7..00000000000 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/SourceIndexInfo.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source; - -import lombok.AllArgsConstructor; -import lombok.Data; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -@Data -@AllArgsConstructor -public class SourceIndexInfo implements Serializable { - private String index; - private List source; - private Map query; - private String scrollTime; - private int scrollSize; -} diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java index fe182868d4d..8ffbb7f4b9d 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java @@ -29,7 +29,11 @@ public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode { DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"), CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index failed"), ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type"), - CLEAR_INDEX_DATA_FAILED("ELASTICSEARCH-09", "Clear elasticsearch index data failed"); + CLEAR_INDEX_DATA_FAILED("ELASTICSEARCH-09", "Clear elasticsearch index data failed"), + CHECK_INDEX_FAILED("ELASTICSEARCH-10", "Failed to check whether the index exists"), + SOURCE_CONFIG_ERROR_01( + "ELASTICSEARCH-11", + "'index' or 'index_list' must be configured, with at least one being required."); ; private final String code; diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java index a409a4ae886..fd176f2f034 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java @@ -137,7 +137,9 @@ SeaTunnelRow convert(ElasticsearchRecord rowRecord) { fieldName, value, seaTunnelDataType, JsonUtils.toJsonString(rowRecord)), ex); } - return new SeaTunnelRow(seaTunnelFields); + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(seaTunnelFields); + seaTunnelRow.setTableId(rowRecord.getTableId()); + return seaTunnelRow; } Object convertValue(SeaTunnelDataType fieldType, String fieldValue) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/ElasticsearchRecord.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/ElasticsearchRecord.java index 3e5eb10b582..57c9dcb084f 100644 --- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/ElasticsearchRecord.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/ElasticsearchRecord.java @@ -30,4 +30,6 @@ public class ElasticsearchRecord { private Map doc; private List source; + + private String tableId; } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java index 7b153f0be3a..a22ca179569 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java @@ -40,6 +40,8 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException; import org.apache.commons.collections4.CollectionUtils; @@ -50,6 +52,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; @Slf4j public class ElasticsearchSource @@ -58,30 +61,60 @@ public class ElasticsearchSource SupportParallelism, SupportColumnProjection { - private final ReadonlyConfig config; + private final List sourceConfigList; + private final ReadonlyConfig connectionConfig; - private CatalogTable catalogTable; + public ElasticsearchSource(ReadonlyConfig config) { + this.connectionConfig = config; + boolean multiSource = config.getOptional(SourceConfig.INDEX_LIST).isPresent(); + boolean singleSource = config.getOptional(SourceConfig.INDEX).isPresent(); + if (multiSource && singleSource) { + log.warn( + "Elasticsearch Source config warn: when both 'index' and 'index_list' are present in the configuration, only the 'index_list' configuration will take effect"); + } + if (!multiSource && !singleSource) { + throw new ElasticsearchConnectorException( + ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01, + ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01.getDescription()); + } + if (multiSource) { + this.sourceConfigList = createMultiSource(config); + } else { + this.sourceConfigList = Collections.singletonList(parseOneIndexQueryConfig(config)); + } + } + + private List createMultiSource(ReadonlyConfig config) { + List> configMaps = config.get(SourceConfig.INDEX_LIST); + List configList = + configMaps.stream().map(ReadonlyConfig::fromMap).collect(Collectors.toList()); + List sourceConfigList = new ArrayList<>(configList.size()); + for (ReadonlyConfig readonlyConfig : configList) { + SourceConfig sourceConfig = parseOneIndexQueryConfig(readonlyConfig); + sourceConfigList.add(sourceConfig); + } + return sourceConfigList; + } - private List source; + private SourceConfig parseOneIndexQueryConfig(ReadonlyConfig readonlyConfig) { - private Map arrayColumn; 
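+        // Each index_list entry is parsed independently: query, source/schema, array mapping
+        // and scroll options are resolved per index and packed into one SourceConfig.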
+        Map<String, Object> query = readonlyConfig.get(SourceConfig.QUERY);
+        String index = readonlyConfig.get(SourceConfig.INDEX);
 
-    public ElasticsearchSource(ReadonlyConfig config) {
-        this.config = config;
-        if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+        CatalogTable catalogTable;
+        List<String> source;
+        Map<String, String> arrayColumn;
+
+        if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
             // todo: We need to remove the schema in ES.
             log.warn(
-                    "The schema config in ElasticSearch sink is deprecated, please use source config instead!");
-            catalogTable = CatalogTableUtil.buildWithConfig(config);
+                    "The schema config in ElasticSearch source/sink is deprecated, please use source config instead!");
+            catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
             source = Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
         } else {
-            source = config.get(SourceConfig.SOURCE);
-            arrayColumn = config.get(SourceConfig.ARRAY_COLUMN);
-            EsRestClient esRestClient = EsRestClient.createInstance(config);
-            Map<String, BasicTypeDefine<EsType>> esFieldType =
-                    esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
-            esRestClient.close();
-
+            source = readonlyConfig.get(SourceConfig.SOURCE);
+            arrayColumn = readonlyConfig.get(SourceConfig.ARRAY_COLUMN);
+            Map<String, BasicTypeDefine<EsType>> esFieldType = getFieldTypeMapping(index, source);
             if (CollectionUtils.isEmpty(source)) {
                 source = new ArrayList<>(esFieldType.keySet());
             }
@@ -90,26 +123,48 @@ public ElasticsearchSource(ReadonlyConfig config) {
 
             for (int i = 0; i < source.size(); i++) {
                 String key = source.get(i);
+                String sourceType = esFieldType.get(key).getDataType();
                 if (arrayColumn.containsKey(key)) {
                     String value = arrayColumn.get(key);
                     SeaTunnelDataType<?> dataType =
                             SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(key, value);
-                    builder.column(PhysicalColumn.of(key, dataType, 0, true, null, null));
+                    builder.column(
+                            PhysicalColumn.of(
+                                    key, dataType, 0L, true, null, null, sourceType, null));
                     continue;
                 }
                 builder.column(
-                        PhysicalColumn.of(source.get(i), fieldTypes[i], 0, true, null, null));
+                        PhysicalColumn.of(
+                                source.get(i),
+                                fieldTypes[i],
+                                0L,
+                                true,
+                                null,
+                                null,
+                                sourceType,
+                                null));
             }
             catalogTable =
                     CatalogTable.of(
-                            TableIdentifier.of(
-                                    "elasticsearch", null, config.get(SourceConfig.INDEX)),
+                            TableIdentifier.of("elasticsearch", null, index),
                             builder.build(),
                             Collections.emptyMap(),
                             Collections.emptyList(),
                             "");
         }
+
+        String scrollTime = readonlyConfig.get(SourceConfig.SCROLL_TIME);
+        int scrollSize = readonlyConfig.get(SourceConfig.SCROLL_SIZE);
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setSource(source);
+        sourceConfig.setCatalogTable(catalogTable);
+        sourceConfig.setQuery(query);
+        sourceConfig.setScrollTime(scrollTime);
+        sourceConfig.setScrollSize(scrollSize);
+        sourceConfig.setIndex(index);
+        return sourceConfig;
     }
 
     @Override
@@ -124,21 +179,23 @@ public Boundedness getBoundedness() {
 
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
-        return Collections.singletonList(catalogTable);
+        return sourceConfigList.stream()
+                .map(SourceConfig::getCatalogTable)
+                .collect(Collectors.toList());
     }
 
     @Override
     public SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> createReader(
             SourceReader.Context readerContext) {
-        return new ElasticsearchSourceReader(
-                readerContext, config, catalogTable.getSeaTunnelRowType());
+        return new ElasticsearchSourceReader(readerContext, connectionConfig);
     }
 
     @Override
     public SourceSplitEnumerator<ElasticsearchSourceSplit, ElasticsearchSourceState> createEnumerator(
             SourceSplitEnumerator.Context<ElasticsearchSourceSplit> enumeratorContext) {
-        return new
ElasticsearchSourceSplitEnumerator(enumeratorContext, config, source); + return new ElasticsearchSourceSplitEnumerator( + enumeratorContext, connectionConfig, sourceConfigList); } @Override @@ -147,7 +204,7 @@ public SourceReader createReader( SourceSplitEnumerator.Context enumeratorContext, ElasticsearchSourceState sourceState) { return new ElasticsearchSourceSplitEnumerator( - enumeratorContext, sourceState, config, source); + enumeratorContext, sourceState, connectionConfig, sourceConfigList); } @VisibleForTesting @@ -162,4 +219,13 @@ public static SeaTunnelDataType[] getSeaTunnelDataType( } return fieldTypes; } + + private Map> getFieldTypeMapping( + String index, List source) { + // EsRestClient#getFieldTypeMapping may throw runtime exception + // so here we use try-resources-finally to close the resource + try (EsRestClient esRestClient = EsRestClient.createInstance(connectionConfig)) { + return esRestClient.getFieldTypeMapping(index, source); + } + } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java index 6ff08b7d069..8f41256e37c 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; -import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; @@ -40,10 +39,10 @@ import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX; +import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX_LIST; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.QUERY; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_SIZE; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_TIME; -import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SOURCE; @AutoService(Factory.class) public class ElasticsearchSourceFactory implements TableSourceFactory { @@ -55,8 +54,10 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(HOSTS, INDEX) + .required(HOSTS) .optional( + INDEX, + INDEX_LIST, USERNAME, PASSWORD, SCROLL_TIME, @@ -68,7 +69,6 @@ public OptionRule optionRule() { TLS_KEY_STORE_PASSWORD, TLS_TRUST_STORE_PATH, TLS_TRUST_STORE_PASSWORD) - .exclusive(SOURCE, TableSchemaOptions.SCHEMA) .build(); } diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java index 7d2398816a1..a58c2c622d8 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java @@ -23,8 +23,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult; -import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.ElasticsearchRecord; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.SeaTunnelRowDeserializer; @@ -44,27 +44,23 @@ public class ElasticsearchSourceReader SourceReader.Context context; - private final ReadonlyConfig config; + private final ReadonlyConfig connConfig; private EsRestClient esRestClient; - private final SeaTunnelRowDeserializer deserializer; - Deque splits = new LinkedList<>(); boolean noMoreSplit; private final long pollNextWaitTime = 1000L; - public ElasticsearchSourceReader( - SourceReader.Context context, ReadonlyConfig config, SeaTunnelRowType rowTypeInfo) { + public ElasticsearchSourceReader(SourceReader.Context context, ReadonlyConfig connConfig) { this.context = context; - this.config = config; - this.deserializer = new DefaultSeaTunnelRowDeserializer(rowTypeInfo); + this.connConfig = connConfig; } @Override public void open() { - esRestClient = EsRestClient.createInstance(this.config); + esRestClient = EsRestClient.createInstance(this.connConfig); } @Override @@ -77,7 +73,10 @@ public void pollNext(Collector output) throws Exception { synchronized (output.getCheckpointLock()) { ElasticsearchSourceSplit split = splits.poll(); if (split != null) { - SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo(); + SeaTunnelRowType seaTunnelRowType = split.getSeaTunnelRowType(); + SeaTunnelRowDeserializer deserializer = + new DefaultSeaTunnelRowDeserializer(seaTunnelRowType); + SourceConfig sourceIndexInfo = split.getSourceConfig(); ScrollResult scrollResult = esRestClient.searchByScroll( sourceIndexInfo.getIndex(), @@ -85,12 +84,12 @@ public void pollNext(Collector output) throws Exception { sourceIndexInfo.getQuery(), sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize()); - outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output); + outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) { scrollResult = esRestClient.searchWithScrollId( scrollResult.getScrollId(), sourceIndexInfo.getScrollTime()); - outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output); + 
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer); } } else if (noMoreSplit) { // signal to the source that we have reached the end of the data. @@ -103,10 +102,15 @@ public void pollNext(Collector output) throws Exception { } private void outputFromScrollResult( - ScrollResult scrollResult, List source, Collector output) { + ScrollResult scrollResult, + SourceConfig sourceConfig, + Collector output, + SeaTunnelRowDeserializer deserializer) { + List source = sourceConfig.getSource(); + String tableId = sourceConfig.getCatalogTable().getTablePath().toString(); for (Map doc : scrollResult.getDocs()) { SeaTunnelRow seaTunnelRow = - deserializer.deserialize(new ElasticsearchRecord(doc, source)); + deserializer.deserialize(new ElasticsearchRecord(doc, source, tableId)); output.collect(seaTunnelRow); } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java index f2ad78fa5c4..3c7d25b5b49 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java @@ -18,7 +18,8 @@ package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source; import org.apache.seatunnel.api.source.SourceSplit; -import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig; import lombok.AllArgsConstructor; import lombok.Getter; @@ -32,7 +33,11 @@ public class ElasticsearchSourceSplit implements SourceSplit { private String splitId; - @Getter private SourceIndexInfo sourceIndexInfo; + @Getter private SourceConfig sourceConfig; + + public SeaTunnelRowType getSeaTunnelRowType() { + return sourceConfig.getCatalogTable().getSeaTunnelRowType(); + } @Override public String splitId() { diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java index 107aaac322a..5e3356ebd65 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -19,11 +19,10 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig; import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount; -import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException; import lombok.extern.slf4j.Slf4j; @@ -45,7 +44,7 @@ public class ElasticsearchSourceSplitEnumerator private final SourceSplitEnumerator.Context context; - private final ReadonlyConfig config; + private final ReadonlyConfig connConfig; private EsRestClient esRestClient; @@ -53,36 +52,36 @@ public class ElasticsearchSourceSplitEnumerator private Map> pendingSplit; - private final List source; + private final List sourceConfigs; private volatile boolean shouldEnumerate; public ElasticsearchSourceSplitEnumerator( SourceSplitEnumerator.Context context, - ReadonlyConfig config, - List source) { - this(context, null, config, source); + ReadonlyConfig connConfig, + List sourceConfigs) { + this(context, null, connConfig, sourceConfigs); } public ElasticsearchSourceSplitEnumerator( SourceSplitEnumerator.Context context, ElasticsearchSourceState sourceState, - ReadonlyConfig config, - List source) { + ReadonlyConfig connConfig, + List sourceConfigs) { this.context = context; - this.config = config; + this.connConfig = connConfig; this.pendingSplit = new HashMap<>(); this.shouldEnumerate = sourceState == null; if (sourceState != null) { this.shouldEnumerate = sourceState.isShouldEnumerate(); this.pendingSplit.putAll(sourceState.getPendingSplit()); } - this.source = source; + this.sourceConfigs = sourceConfigs; } @Override public void open() { - esRestClient = EsRestClient.createInstance(config); + esRestClient = EsRestClient.createInstance(connConfig); } @Override @@ -140,26 +139,22 @@ private void assignSplit(Collection readers) { private List getElasticsearchSplit() { List splits = new ArrayList<>(); - String scrollTime = config.get(SourceConfig.SCROLL_TIME); - int scrollSize = config.get(SourceConfig.SCROLL_SIZE); - Map query = config.get(SourceConfig.QUERY); - List indexDocsCounts = - esRestClient.getIndexDocsCount(config.get(SourceConfig.INDEX)); - indexDocsCounts = - indexDocsCounts.stream() - .filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0) - .sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)) - .collect(Collectors.toList()); - for (IndexDocsCount indexDocsCount : indexDocsCounts) { - splits.add( - new ElasticsearchSourceSplit( - String.valueOf(indexDocsCount.getIndex().hashCode()), - new SourceIndexInfo( - indexDocsCount.getIndex(), - source, - query, - scrollTime, - scrollSize))); + for (SourceConfig sourceConfig : sourceConfigs) { + + String index = sourceConfig.getIndex(); + List indexDocsCounts = esRestClient.getIndexDocsCount(index); + indexDocsCounts = + indexDocsCounts.stream() + .filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0) + .sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)) + .collect(Collectors.toList()); + for (IndexDocsCount indexDocsCount : indexDocsCounts) { + SourceConfig cloneCfg = sourceConfig.clone(); + cloneCfg.setIndex(indexDocsCount.getIndex()); + splits.add( + new ElasticsearchSourceSplit( + String.valueOf(indexDocsCount.getIndex().hashCode()), cloneCfg)); + } } return splits; } @@ -185,7 +180,7 @@ public int currentUnassignedSplitSize() { @Override public void handleSplitRequest(int subtaskId) { throw new ElasticsearchConnectorException( - CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + CommonErrorCode.OPERATION_NOT_SUPPORTED, 
"Unsupported handleSplitRequest: " + subtaskId); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java index e92c3993780..650918e3fc6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -68,17 +69,27 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j public class ElasticsearchIT extends TestSuiteBase implements TestResource { - private List testDataset; + private static final long INDEX_REFRESH_MILL_DELAY = 5000L; + + private List testDataset1; + + private List testDataset2; private ElasticsearchContainer container; @@ -114,7 +125,8 @@ public void startUp() throws Exception { Optional.empty(), Optional.empty(), Optional.empty()); - testDataset = generateTestDataSet(); + testDataset1 = generateTestDataSet1(); + testDataset2 = generateTestDataSet2(); createIndexForResourceNull("st_index"); createIndexDocs(); createIndexWithFullType(); @@ -123,15 +135,18 @@ public void startUp() throws Exception { /** create a index,and bulk some documents */ private void createIndexDocs() { + createIndexDocsByName("st_index"); + } + + private void createIndexDocsByName(String indexName) { + createIndexDocsByName(indexName, testDataset1); + } + + private void createIndexDocsByName(String indexName, List testDataSet) { StringBuilder requestBody = new StringBuilder(); - Map indexInner = new HashMap<>(); - indexInner.put("_index", "st"); - - Map> indexParam = new HashMap<>(); - indexParam.put("index", indexInner); - String indexHeader = "{\"index\":{\"_index\":\"st_index\"}\n"; - for (int i = 0; i < testDataset.size(); i++) { - String row = testDataset.get(i); + String indexHeader = String.format("{\"index\":{\"_index\":\"%s\"}\n", indexName); + for (int i = 0; i < testDataSet.size(); i++) { + String row = testDataSet.get(i); requestBody.append(indexHeader); requestBody.append(row); requestBody.append("\n"); @@ -159,7 +174,7 @@ private void createIndexWithFullType() throws IOException, InterruptedException + "\n"); Assertions.assertFalse(response.isErrors(), response.getResponse()); // waiting index refresh - Thread.sleep(2000L); + Thread.sleep(INDEX_REFRESH_MILL_DELAY); Assertions.assertEquals( 2, 
esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount()); } @@ -175,16 +190,121 @@ private void createIndexForResourceNull(String indexName) throws IOException { } @TestTemplate - public void testElasticsearch(TestContainer container) + public void testElasticsearchWithSchema(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/elasticsearch/elasticsearch_source_and_sink.conf"); Assertions.assertEquals(0, execResult.getExitCode()); - List sinkData = readSinkData("st_index2"); + List sinkData = readSinkDataWithSchema("st_index2"); // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}} Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK}, + disabledReason = "Currently FLINK do not support multiple table read") + public void testElasticsSearchWithMultiSourceByFilter(TestContainer container) + throws InterruptedException, IOException { + // read read_filter_index1,read_filter_index2 + // write into read_filter_index1_copy,read_filter_index2_copy + createIndexDocsByName("read_filter_index1", testDataset1); + createIndexDocsByName("read_filter_index2", testDataset2); + + Container.ExecResult execResult = + container.executeJob( + "/elasticsearch/elasticsearch_multi_source_and_sink_by_filter.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + HashMap rangeParam = new HashMap<>(); + rangeParam.put("gte", 10); + rangeParam.put("lte", 20); + HashMap range1 = new HashMap<>(); + range1.put("c_int", rangeParam); + Map query1 = new HashMap<>(); + query1.put("range", range1); + + Map query2 = new HashMap<>(); + HashMap range2 = new HashMap<>(); + range2.put("c_int2", rangeParam); + query2.put("range", range2); + + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(INDEX_REFRESH_MILL_DELAY)); + Set sinkData1 = + new HashSet<>( + getDocsWithTransformDate( + // read all field + Collections.emptyList(), + // read indexName + "read_filter_index1_copy", + // allowed c_null serialized if null + Lists.newArrayList("c_null"), + // query condition + query1, + // transformDate field:c_date + Lists.newArrayList("c_date"), + // order field + "c_int")); + + List index1Data = + mapTestDatasetForDSL( + // use testDataset1 + testDataset1, + // filter testDataset1 match sinkData1 + doc -> { + if (doc.has("c_int")) { + int cInt = doc.get("c_int").asInt(); + return cInt >= 10 && cInt <= 20; + } + return false; + }, + // mapping document all field to string + JsonNode::toString); + Assertions.assertEquals(sinkData1.size(), index1Data.size()); + index1Data.forEach(sinkData1::remove); + // data is completely consistent, and the size is zero after deletion + Assertions.assertEquals(0, sinkData1.size()); + + List index2Data = + mapTestDatasetForDSL( + testDataset2, + // use customer predicate filter data to match sinkData2 + doc -> { + if (doc.has("c_int2")) { + int cInt = doc.get("c_int2").asInt(); + return cInt >= 10 && cInt <= 20; + } + return false; + }, + // mapping doc to string,keep only three fields + doc -> { + Map map = new HashMap<>(); + map.put("c_int2", doc.get("c_int2")); + map.put("c_null2", doc.get("c_null2")); + map.put("c_date2", doc.get("c_date2")); + return JsonUtils.toJsonString(map); + }); + + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(INDEX_REFRESH_MILL_DELAY)); + Set sinkData2 = + new HashSet<>( + getDocsWithTransformDate( + // read three fields from index + Lists.newArrayList("c_int2", 
"c_null2", "c_date2"), + "read_filter_index2_copy", + //// allowed c_null serialized if null + Lists.newArrayList("c_null2"), + query2, + // // transformDate field:c_date2 + Lists.newArrayList("c_date2"), + // order by c_int2 + "c_int2")); + Assertions.assertEquals(sinkData2.size(), index2Data.size()); + index2Data.forEach(sinkData2::remove); + Assertions.assertEquals(0, sinkData2.size()); + } + @DisabledOnContainer( value = {}, type = {EngineType.FLINK}, @@ -234,7 +354,7 @@ public void testElasticsearchWithFullType(TestContainer container) Container.ExecResult execResult = container.executeJob("/elasticsearch/elasticsearch_source_and_sink_full_type.conf"); Assertions.assertEquals(0, execResult.getExitCode()); - Thread.sleep(2000L); + Thread.sleep(INDEX_REFRESH_MILL_DELAY); Assertions.assertEquals( 1, esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount()); @@ -248,12 +368,12 @@ public void testElasticsearchWithoutSchema(TestContainer container) container.executeJob( "/elasticsearch/elasticsearch_source_without_schema_and_sink.conf"); Assertions.assertEquals(0, execResult.getExitCode()); - List sinkData = readSinkDataWithOutSchema(); + List sinkData = readSinkDataWithOutSchema("st_index4"); // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}} Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData); } - private List generateTestDataSet() throws JsonProcessingException { + private List generateTestDataSet1() throws JsonProcessingException { String[] fields = new String[] { "c_map", @@ -304,17 +424,83 @@ private List generateTestDataSet() throws JsonProcessingException { return documents; } - private List readSinkDataWithOutSchema() throws InterruptedException { + private List generateTestDataSet2() throws JsonProcessingException { + String[] fields = + new String[] { + "c_map2", + "c_array2", + "c_string2", + "c_boolean2", + "c_tinyint2", + "c_smallint2", + "c_bigint2", + "c_float2", + "c_double2", + "c_decimal2", + "c_bytes2", + "c_int2", + "c_date2", + "c_timestamp2", + "c_null2" + }; + + List documents = new ArrayList<>(); + ObjectMapper objectMapper = new ObjectMapper(); + for (int i = 0; i < 100; i++) { + Map doc = new HashMap<>(); + Object[] values = + new Object[] { + Collections.singletonMap("key2", Short.parseShort(String.valueOf(i))), + new Byte[] { + Byte.parseByte("11"), Byte.parseByte("22"), Byte.parseByte("33") + }, + "string2", + Boolean.FALSE, + Byte.parseByte("2"), + Short.parseShort("2"), + Long.parseLong("2"), + Float.parseFloat("2.2"), + Double.parseDouble("2.2"), + BigDecimal.valueOf(22, 1), + "test2".getBytes(), + i, + LocalDate.now().toString(), + System.currentTimeMillis(), + // Null values are also a basic use case for testing + null + }; + for (int j = 0; j < fields.length; j++) { + doc.put(fields[j], values[j]); + } + documents.add(objectMapper.writeValueAsString(doc)); + } + return documents; + } + + private List readSinkDataWithOutSchema(String indexName) throws InterruptedException { Map> esFieldType = - esRestClient.getFieldTypeMapping("st_index4", Lists.newArrayList()); - Thread.sleep(2000); + esRestClient.getFieldTypeMapping(indexName, Lists.newArrayList()); + Thread.sleep(INDEX_REFRESH_MILL_DELAY); List source = new ArrayList<>(esFieldType.keySet()); - return getDocsWithTransformDate(source, "st_index4"); + return getDocsWithTransformDate(source, indexName); } - private List readSinkData(String index) throws InterruptedException { + // Null values are also a basic use case for testing + // To ensure consistency in 
comparisons, we need to explicitly serialize null values. + private List readSinkDataWithOutSchema(String indexName, List nullAllowedFields) + throws InterruptedException { + Map> esFieldType = + esRestClient.getFieldTypeMapping(indexName, Lists.newArrayList()); + Thread.sleep(INDEX_REFRESH_MILL_DELAY); + List source = new ArrayList<>(esFieldType.keySet()); + return getDocsWithTransformDate(source, indexName, nullAllowedFields); + } + + // The timestamp type in Elasticsearch is incompatible with that in Seatunnel, + // and we need to handle the conversion here. + private List readSinkDataWithSchema(String index) throws InterruptedException { // wait for index refresh - Thread.sleep(2000); + Thread.sleep(INDEX_REFRESH_MILL_DELAY); List source = Lists.newArrayList( "c_map", @@ -338,7 +524,7 @@ private List readSinkData(String index) throws InterruptedException { private List readMultiSinkData(String index, List source) throws InterruptedException { // wait for index refresh - Thread.sleep(2000); + Thread.sleep(INDEX_REFRESH_MILL_DELAY); Map query = new HashMap<>(); query.put("match_all", Maps.newHashMap()); @@ -394,6 +580,19 @@ private List getDocsWithTransformTimestamp(List source, String i } private List getDocsWithTransformDate(List source, String index) { + return getDocsWithTransformDate(source, index, Collections.emptyList()); + } + + /** + * use default query: c_int >= 10 and c_int <=20 + * + * @param source The field to be read + * @param index indexName + * @param nullAllowedFields If the value of the field is null, it will be serialized to 'null' + * @return serialized data as jsonString + */ + private List getDocsWithTransformDate( + List source, String index, List nullAllowedFields) { HashMap rangeParam = new HashMap<>(); rangeParam.put("gte", 10); rangeParam.put("lte", 20); @@ -409,6 +608,11 @@ private List getDocsWithTransformDate(List source, String index) x.remove("_index"); x.remove("_type"); x.remove("_id"); + for (String field : nullAllowedFields) { + if (!x.containsKey(field)) { + x.put(field, null); + } + } x.replace( "c_date", LocalDate.parse( @@ -427,7 +631,75 @@ private List getDocsWithTransformDate(List source, String index) return docs; } + /** + * use customer query read data + * + * @param source The field to be read + * @param index read index + * @param nullAllowedFields If the value of the field is null, it will be serialized to 'null' + * @param query dls query + * @param dateFields dateField will format with yyyy-MM-dd'T'HH:mm + * @param orderField how to oder data + * @return serialized data as jsonString + */ + private List getDocsWithTransformDate( + List source, + String index, + List nullAllowedFields, + Map query, + List dateFields, + String orderField) { + ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000); + scrollResult + .getDocs() + .forEach( + x -> { + x.remove("_index"); + x.remove("_type"); + x.remove("_id"); + for (String field : nullAllowedFields) { + if (!x.containsKey(field)) { + x.put(field, null); + } + } + for (String dateField : dateFields) { + if (x.containsKey(dateField)) { + x.replace( + dateField, + LocalDate.parse( + x.get(dateField).toString(), + DateTimeFormatter.ofPattern( + "yyyy-MM-dd'T'HH:mm")) + .toString()); + } + } + }); + List docs = + scrollResult.getDocs().stream() + .sorted( + Comparator.comparingInt( + o -> Integer.parseInt(o.get(orderField).toString()))) + .map(JsonUtils::toJsonString) + .collect(Collectors.toList()); + return docs; + } + + /** + * default testDataset1 + * + * 
@return testDataset1 as jsonString array + */ private List mapTestDatasetForDSL() { + return mapTestDatasetForDSL(testDataset1); + } + + /** + * default query filter,c_int >=10 and c_int <= 20 + * + * @param testDataset testDataset + * @return c_int >=10 and c_int <= 20 filtered data + */ + private List mapTestDatasetForDSL(List testDataset) { return testDataset.stream() .map(JsonUtils::parseObject) .filter( @@ -442,6 +714,25 @@ private List mapTestDatasetForDSL() { .collect(Collectors.toList()); } + /** + * Use custom filtering criteria to query data + * + * @param testDataset testDataset + * @param predicate customer query filter + * @param mapStrFunc mapping doc to string + * @return filtered data + */ + private List mapTestDatasetForDSL( + List testDataset, + Predicate predicate, + Function mapStrFunc) { + return testDataset.stream() + .map(JsonUtils::parseObject) + .filter(predicate) + .map(mapStrFunc) + .collect(Collectors.toList()); + } + @AfterEach @Override public void tearDown() { @@ -488,7 +779,7 @@ public void testCatalog() throws InterruptedException, JsonProcessingException { requestBody.append("\n"); } esRestClient.bulk(requestBody.toString()); - Thread.sleep(2000); // Wait for data to be indexed + Thread.sleep(INDEX_REFRESH_MILL_DELAY); // Wait for data to be indexed // Verify data exists List sourceFields = Arrays.asList("field1", "field2"); @@ -500,7 +791,7 @@ public void testCatalog() throws InterruptedException, JsonProcessingException { // Truncate the table elasticSearchCatalog.truncateTable(tablePath, false); - Thread.sleep(2000); // Wait for data to be indexed + Thread.sleep(INDEX_REFRESH_MILL_DELAY); // Wait for data to be indexed // Verify data is deleted scrollResult = esRestClient.searchByScroll("st_index3", sourceFields, query, "1m", 100); @@ -526,4 +817,18 @@ private List generateTestData() throws JsonProcessingException { } return data; } + + /** + * elastic query all dsl + * + * @return elastic query all dsl + */ + private Map queryAll() { + // "query": { + // "match_all": {} + // } + Map matchAll = new HashMap<>(); + matchAll.put("match_all", new HashMap<>()); + return matchAll; + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_multi_source_and_sink_by_filter.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_multi_source_and_sink_by_filter.conf new file mode 100644 index 00000000000..9ac7d3743aa --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_multi_source_and_sink_by_filter.conf @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" + #checkpoint.interval = 10000 +} + +source { + Elasticsearch { + hosts = ["https://elasticsearch:9200"] + username = "elastic" + password = "elasticsearch" + tls_verify_certificate = false + tls_verify_hostname = false + index_list = [ + { + index = "read_filter_index1" + query = {"range": {"c_int": {"gte": 10, "lte": 20}}} + source = [ + c_map, + c_array, + c_string, + c_boolean, + c_tinyint, + c_smallint, + c_bigint, + c_float, + c_double, + c_decimal, + c_bytes, + c_int, + c_date, + c_timestamp, + c_null + ] + array_column = { + c_array = "array" + } + } + { + index = "read_filter_index2" + query = {"range": {"c_int2": {"gte": 10, "lte": 20}}} + source = [ + c_int2, + c_null2, + c_date2 + ] + + } + + ] + + } +} + +transform { +} + +sink { + Elasticsearch { + hosts = ["https://elasticsearch:9200"] + username = "elastic" + password = "elasticsearch" + tls_verify_certificate = false + tls_verify_hostname = false + + index = "${table_name}_copy" + index_type = "st" + "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST" + "data_save_mode"="APPEND_DATA" + } +}