Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Elastic search] Support multi-table source feature #7502

Merged
merged 22 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2d55adc
[improve][elasticsSearch]source support multiSource
FuYouJ Aug 26, 2024
22765c1
Merge branch 'dev' into esMultiSource
FuYouJ Aug 26, 2024
1d6c8d7
[improve][elasticsSearch]source support multiSource,update docs
FuYouJ Aug 26, 2024
26d0473
update paimon.apache.org deadlink
FuYouJ Aug 26, 2024
2fbbfa7
Revert "update paimon.apache.org deadlink"
FuYouJ Aug 26, 2024
eb6a269
Merge branch 'dev' into esMultiSource
FuYouJ Aug 27, 2024
8e45aa6
[improve][elasticsSearch][document] add source chinese document
FuYouJ Aug 27, 2024
849fe75
[improve][elasticsSearch]spark multi-table synchronization test is en…
FuYouJ Aug 27, 2024
c348e7c
[improve][elasticsSearch]docs remove schema demo,e2eTest add null case
FuYouJ Aug 28, 2024
50a5da4
[improve][elasticsSearch]use checkIndexExist method instead of getInd…
FuYouJ Aug 29, 2024
aa900c5
[improve][elasticsSearch]Undo the modifications to the st_index_full_…
FuYouJ Aug 31, 2024
2860a9f
[improve][elasticsSearch] index refresh time as a static variable
FuYouJ Aug 31, 2024
9860d52
[improve][elasticsSearch]remove useless //
FuYouJ Aug 31, 2024
4925fb6
[improve][elasticsSearch]multiple indexes use different test data sets
FuYouJ Aug 31, 2024
95e923b
[improve][elasticsSearch]EsRestClient implements Closeable
FuYouJ Aug 31, 2024
c0e657d
Merge branch 'dev' into esMultiSource
FuYouJ Sep 2, 2024
096fbfd
[improve][elasticsSearch]Read different fields and write indexes to d…
FuYouJ Sep 2, 2024
470fab3
[improve][elasticsSearch]Read different fields and write indexes to d…
FuYouJ Sep 2, 2024
7af4eac
[improve][elasticsSearch]Read different fields and write indexes to d…
FuYouJ Sep 2, 2024
a29b525
[improve][elasticsSearch]add thread sleep code to ensure test stability
FuYouJ Sep 2, 2024
a477e6a
Merge remote-tracking branch 'origin/dev' into esMultiSource
FuYouJ Sep 3, 2024
c9a2cc0
[improve][elasticsSearch]add warning log, adjust the description of t…
FuYouJ Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 108 additions & 20 deletions docs/en/connector-v2/source/Elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,25 @@ support version >= 2.x and <= 8.x.

## Options

| name | type | required | default value |
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
|-------------------------|---------|----------|-------------------|
| hosts | array | yes | - |
| username | string | no | - |
| password | string | no | - |
| index | string | yes | - |
| source | array | no | - |
| query | json | no | {"match_all": {}} |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostnames | boolean | no | true |
| array_column | map | no | |
| tls_keystore_path | string | no | - |
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
| tls_truststore_password | string | no | - |
| common-options | | no | - |
| name | type | required | default value |
| ----------------------- | ------- | -------- | ------------------------------------ |
| hosts | array | yes | - |
| username | string | no | - |
| password | string | no | - |
| index | string | yes | - |
| source | array | no | - |
| query | json | no | {"match_all": {}} |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |
| index_list | array | no | used to define a multiple table task |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostnames | boolean | no | true |
| array_column | map | no | |
| tls_keystore_path | string | no | - |
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
| tls_truststore_password | string | no | - |
| common-options | | no | - |

### hosts [array]

Expand Down Expand Up @@ -78,6 +79,10 @@ Amount of time Elasticsearch will keep the search context alive for scroll reque

Maximum number of hits to be returned with each Elasticsearch scroll request.

### index_list [array]

The `index_list` is used to define multi-index synchronization tasks. It is an array that contains the parameters required for single-table synchronization, such as `query`, `source/schema`, `scroll_size`, and `scroll_time`. It is recommended that `index_list` and `query` should not be configured at the same level simultaneously. Please refer to the upcoming multi-table synchronization example for more details.

### tls_verify_certificate [boolean]

Enable certificates validation for HTTPS endpoints
Expand Down Expand Up @@ -147,6 +152,90 @@ Elasticsearch {
}
```

Multi-table synchronization

> This example demonstrates how to read data from read_index1 and read_index2 and write it into the multi_source_write_test_index index. In read_index1, I used source to specify the fields to be read and to indicate which fields are array fields. In read_index2, I used schema to define the fields to be read (not recommended).

```hocon
source {
Elasticsearch {
hosts = ["https://elasticsearch:9200"]
username = "elastic"
password = "elasticsearch"
tls_verify_certificate = false
tls_verify_hostname = false
index_list = [
{
index = "read_index1"
query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
source = [
c_map,
c_array,
c_string,
c_boolean,
c_tinyint,
c_smallint,
c_bigint,
c_float,
c_double,
c_decimal,
c_bytes,
c_int,
c_date,
c_timestamp]
array_column = {
c_array = "array<tinyint>"
}
}
{
index = "read_index2"
query = {"match_all": {}}
schema = {
fields {
c_map = "map<string, tinyint>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_int = int
c_date = date
c_timestamp = timestamp
}
}
}

]

}
}

transform {
}

sink {
Elasticsearch {
hosts = ["https://elasticsearch:9200"]
username = "elastic"
password = "elasticsearch"
tls_verify_certificate = false
tls_verify_hostname = false

index = "multi_source_write_test_index"
index_type = "st"
"schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
"data_save_mode"="APPEND_DATA"
}
}
```



SSL (Disable certificates validation)

```hocon
Expand Down Expand Up @@ -196,5 +285,4 @@ source {

- Add Elasticsearch Source Connector
- [Feature] Support https protocol & compatible with opensearch ([3997](https://github.com/apache/seatunnel/pull/3997))
- [Feature] Support DSL

- [Feature] Support DSL
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
- [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997)
- [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085)
- [Connector-V2] [Clickhouse] Add a new optional configuration `clickhouse.config` to the source connector of ClickHouse (#7143)
- [Connector-V2] [ElasticsSource] Source support multiSource (#6730)

### Zeta(ST-Engine)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,31 @@

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.table.catalog.CatalogTable;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SourceConfig {
@Getter
@Setter
public class SourceConfig implements Serializable {

public static final Option<List<Map<String, Object>>> INDEX_LIST =
Options.key("index_list")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("index_list for multiTable sync");

public static final Option<String> INDEX =
Options.key("index")
Expand Down Expand Up @@ -61,11 +77,30 @@ public class SourceConfig {
.withDescription(
"Maximum number of hits to be returned with each Elasticsearch scroll request");

public static final Option<Map> QUERY =
public static final Option<Map<String, Object>> QUERY =
Options.key("query")
.objectType(Map.class)
.type(new TypeReference<Map<String, Object>>() {})
.defaultValue(
Collections.singletonMap("match_all", new HashMap<String, String>()))
.withDescription(
"Elasticsearch query language. You can control the range of data read");

private String index;
private List<String> source;
private Map<String, Object> query;
private String scrollTime;
private int scrollSize;

private CatalogTable catalogTable;

public SourceConfig clone() {
SourceConfig sourceConfig = new SourceConfig();
sourceConfig.setIndex(index);
sourceConfig.setSource(new ArrayList<>(source));
sourceConfig.setQuery(new HashMap<>(query));
sourceConfig.setScrollTime(scrollTime);
sourceConfig.setScrollSize(scrollSize);
sourceConfig.setCatalogTable(catalogTable);
return sourceConfig;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ SeaTunnelRow convert(ElasticsearchRecord rowRecord) {
fieldName, value, seaTunnelDataType, JsonUtils.toJsonString(rowRecord)),
ex);
}
return new SeaTunnelRow(seaTunnelFields);
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(seaTunnelFields);
seaTunnelRow.setTableId(rowRecord.getTableId());
return seaTunnelRow;
}

Object convertValue(SeaTunnelDataType<?> fieldType, String fieldValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@
public class ElasticsearchRecord {
private Map<String, Object> doc;
private List<String> source;

private String tableId;
}
Loading
Loading