From e30abb435d53bb9de062a96f8c32d1e6f156e3f9 Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Thu, 1 Aug 2024 10:07:22 +0800 Subject: [PATCH 1/2] [INLONG-10721][Sort] Add elasticsearch7 connector on flink 1.18. --- .../sort-connectors/elasticsearch7/pom.xml | 137 +++++++ .../Elasticsearch7ApiCallBridge.java | 152 ++++++++ .../Elasticsearch7BulkProcessorIndexer.java | 84 +++++ .../elasticsearch7/ElasticsearchSink.java | 268 ++++++++++++++ .../table/Elasticsearch7Configuration.java | 71 ++++ .../table/Elasticsearch7DynamicSink.java | 340 ++++++++++++++++++ .../Elasticsearch7DynamicSinkFactory.java | 170 +++++++++ .../org.apache.flink.table.factories.Factory | 15 + .../sort-flink-v1.18/sort-connectors/pom.xml | 1 + 9 files changed, 1238 insertions(+) create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/pom.xml create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/pom.xml new file mode 100644 index 0000000000..6da05dc994 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/pom.xml @@ -0,0 +1,137 @@ + + + + 4.0.0 + + org.apache.inlong + sort-connectors-v1.18 + 1.14.0-SNAPSHOT + + + sort-connector-elasticsearch7-v1.18 + jar + Apache InLong - Sort-connector-elasticsearch7 + + + ${project.parent.parent.parent.parent.parent.basedir} + 7.10.2 + 3.0.1-1.17 + + + + + org.apache.inlong + sort-flink-dependencies-v1.18 + ${project.version} + provided + + + org.apache.flink + flink-connector-elasticsearch7 + ${elasticsearch.connector.version} + + + org.apache.flink + flink-json + ${flink.version} + provided + + + org.apache.inlong + sort-connector-elasticsearch-base-v1.18 + ${project.version} + + + org.elasticsearch.client + elasticsearch-rest-client + ${elasticsearch.version} + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + ${elasticsearch.version} + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + + + + org.ow2.asm + * + + + + + org.apache.logging.log4j + log4j-api + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + + shade + + package + + true + + + org.apache.inlong:sort-connector-* + + org/apache/inlong/** + META-INF/services/org.apache.flink.table.factories.Factory + + + + *:* + + log4j.properties + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + + + + diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java new file mode 100644 index 0000000000..967c064522 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch7; + +import org.apache.inlong.sort.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase; +import org.apache.inlong.sort.elasticsearch.RequestIndexer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; +import org.apache.flink.util.Preconditions; +import org.apache.http.HttpHost; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 7 and later versions. */ +@Internal +public class Elasticsearch7ApiCallBridge + implements + ElasticsearchApiCallBridge { + + private static final long serialVersionUID = -5222683870097809633L; + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7ApiCallBridge.class); + + /** User-provided HTTP Host. */ + private final List httpHosts; + + /** The factory to configure the rest client. */ + private final RestClientFactory restClientFactory; + + Elasticsearch7ApiCallBridge(List httpHosts, RestClientFactory restClientFactory) { + Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty()); + this.httpHosts = httpHosts; + this.restClientFactory = Preconditions.checkNotNull(restClientFactory); + } + + @Override + public RestHighLevelClient createClient() { + RestClientBuilder builder = + RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()])); + restClientFactory.configureRestClientBuilder(builder); + + RestHighLevelClient rhlClient = new RestHighLevelClient(builder); + + return rhlClient; + } + + @Override + public BulkProcessor.Builder createBulkProcessorBuilder( + RestHighLevelClient client, BulkProcessor.Listener listener) { + return BulkProcessor.builder( + (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), + listener); + } + + @Override + public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) { + if (!bulkItemResponse.isFailed()) { + return null; + } else { + return bulkItemResponse.getFailure().getCause(); + } + } + + @Override + public void configureBulkProcessorFlushInterval( + BulkProcessor.Builder builder, long flushIntervalMillis) { + builder.setFlushInterval(TimeValue.timeValueMillis(flushIntervalMillis)); + } + + @Override + public void configureBulkProcessorBackoff( + BulkProcessor.Builder builder, + @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) { + + BackoffPolicy backoffPolicy; + if (flushBackoffPolicy != null) { + switch (flushBackoffPolicy.getBackoffType()) { + case CONSTANT: + backoffPolicy = + BackoffPolicy.constantBackoff( + new TimeValue(flushBackoffPolicy.getDelayMillis()), + flushBackoffPolicy.getMaxRetryCount()); + break; + case EXPONENTIAL: + default: + backoffPolicy = + BackoffPolicy.exponentialBackoff( + new TimeValue(flushBackoffPolicy.getDelayMillis()), + flushBackoffPolicy.getMaxRetryCount()); + } + } else { + backoffPolicy = BackoffPolicy.noBackoff(); + } + + builder.setBackoffPolicy(backoffPolicy); + } + + @Override + public RequestIndexer createBulkProcessorIndexer( + BulkProcessor bulkProcessor, + boolean flushOnCheckpoint, + AtomicLong numPendingRequestsRef) { + return new Elasticsearch7BulkProcessorIndexer( + bulkProcessor, flushOnCheckpoint, numPendingRequestsRef); + } + + @Override + public void verifyClientConnection(RestHighLevelClient client) throws IOException { + if (LOG.isInfoEnabled()) { + LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts); + } + + if (!client.ping(RequestOptions.DEFAULT)) { + throw new RuntimeException("There are no reachable Elasticsearch nodes!"); + } + + if (LOG.isInfoEnabled()) { + LOG.info("Elasticsearch RestHighLevelClient is connected to {}", httpHosts.toString()); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java new file mode 100644 index 0000000000..b4ec3c511a --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch7; + +import org.apache.inlong.sort.elasticsearch.RequestIndexer; + +import org.apache.flink.annotation.Internal; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; + +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. {@link ActionRequest + * ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster. + * + *

Note: This class is binary compatible to Elasticsearch 7. + */ +@Internal +class Elasticsearch7BulkProcessorIndexer implements RequestIndexer { + + private final BulkProcessor bulkProcessor; + private final boolean flushOnCheckpoint; + private final AtomicLong numPendingRequestsRef; + + Elasticsearch7BulkProcessorIndexer( + BulkProcessor bulkProcessor, + boolean flushOnCheckpoint, + AtomicLong numPendingRequestsRef) { + this.bulkProcessor = checkNotNull(bulkProcessor); + this.flushOnCheckpoint = flushOnCheckpoint; + this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef); + } + + @Override + public void add(DeleteRequest... deleteRequests) { + for (DeleteRequest deleteRequest : deleteRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } + this.bulkProcessor.add(deleteRequest); + } + } + + @Override + public void add(IndexRequest... indexRequests) { + for (IndexRequest indexRequest : indexRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } + this.bulkProcessor.add(indexRequest); + } + } + + @Override + public void add(UpdateRequest... updateRequests) { + for (UpdateRequest updateRequest : updateRequests) { + if (flushOnCheckpoint) { + numPendingRequestsRef.getAndIncrement(); + } + this.bulkProcessor.add(updateRequest); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java new file mode 100644 index 0000000000..f9ddd7e4f3 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch7; + +import org.apache.inlong.sort.elasticsearch.ActionRequestFailureHandler; +import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase; +import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction; +import org.apache.inlong.sort.elasticsearch.util.NoOpFailureHandler; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; +import org.apache.flink.util.Preconditions; +import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.RestHighLevelClient; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Elasticsearch 7.x sink that requests multiple {@link ActionRequest ActionRequests} against a + * cluster for each incoming element. + * + *

The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch + * cluster. The sink will fail if no cluster can be connected to using the provided transport + * addresses passed to the constructor. + * + *

Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest + * ActionRequests}. This will buffer elements before sending a request to the cluster. The behaviour + * of the {@code BulkProcessor} can be configured using these config keys: + * + *

    + *
  • {@code bulk.flush.max.actions}: Maximum amount of elements to buffer + *
  • {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer + *
  • {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two + * settings in milliseconds + *
+ * + *

You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple + * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation + * of {@link ElasticsearchSinkFunction} for an example. + * + * @param Type of the elements handled by this sink + */ +@PublicEvolving +public class ElasticsearchSink extends ElasticsearchSinkBase { + + private static final long serialVersionUID = 1L; + + private ElasticsearchSink( + Map bulkRequestsConfig, + List httpHosts, + ElasticsearchSinkFunction elasticsearchSinkFunction, + ActionRequestFailureHandler failureHandler, + RestClientFactory restClientFactory) { + + super( + new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory), + bulkRequestsConfig, + elasticsearchSinkFunction, + failureHandler); + } + + /** + * A builder for creating an {@link ElasticsearchSink}. + * + * @param Type of the elements handled by the sink this builder creates. + * @deprecated This has been deprecated, please use {@link + * org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder}. + */ + @Deprecated + @PublicEvolving + public static class Builder { + + private final List httpHosts; + private final ElasticsearchSinkFunction elasticsearchSinkFunction; + + private Map bulkRequestsConfig = new HashMap<>(); + private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler(); + private RestClientFactory restClientFactory = restClientBuilder -> { + }; + + /** + * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link + * RestHighLevelClient}. + * + * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient} + * connects to. + * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} + * from the incoming element. + */ + public Builder( + List httpHosts, ElasticsearchSinkFunction elasticsearchSinkFunction) { + this.httpHosts = Preconditions.checkNotNull(httpHosts); + this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction); + } + + /** + * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to + * disable it. + * + * @param numMaxActions the maximum number of actions to buffer per bulk request. + */ + public void setBulkFlushMaxActions(int numMaxActions) { + Preconditions.checkArgument( + numMaxActions == -1 || numMaxActions > 0, + "Max number of buffered actions must be larger than 0."); + + this.bulkRequestsConfig.put( + CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions)); + } + + /** + * Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to + * disable it. + * + * @param maxSizeMb the maximum size of buffered actions, in mb. + */ + public void setBulkFlushMaxSizeMb(int maxSizeMb) { + Preconditions.checkArgument( + maxSizeMb == -1 || maxSizeMb > 0, + "Max size of buffered actions must be larger than 0."); + + this.bulkRequestsConfig.put( + CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb)); + } + + /** + * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it. + * + * @param intervalMillis the bulk flush interval, in milliseconds. + */ + public void setBulkFlushInterval(long intervalMillis) { + Preconditions.checkArgument( + intervalMillis == -1 || intervalMillis >= 0, + "Interval (in milliseconds) between each flush must be larger than or equal to 0."); + + this.bulkRequestsConfig.put( + CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis)); + } + + /** + * Sets whether or not to enable bulk flush backoff behaviour. + * + * @param enabled whether or not to enable backoffs. + */ + public void setBulkFlushBackoff(boolean enabled) { + this.bulkRequestsConfig.put( + CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled)); + } + + /** + * Sets the type of back of to use when flushing bulk requests. + * + * @param flushBackoffType the backoff type to use. + */ + public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) { + this.bulkRequestsConfig.put( + CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, + Preconditions.checkNotNull(flushBackoffType).toString()); + } + + /** + * Sets the maximum number of retries for a backoff attempt when flushing bulk requests. + * + * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk + * requests + */ + public void setBulkFlushBackoffRetries(int maxRetries) { + Preconditions.checkArgument( + maxRetries > 0, "Max number of backoff attempts must be larger than 0."); + + this.bulkRequestsConfig.put( + CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries)); + } + + /** + * Sets the amount of delay between each backoff attempt when flushing bulk requests, in + * milliseconds. + * + * @param delayMillis the amount of delay between each backoff attempt when flushing bulk + * requests, in milliseconds. + */ + public void setBulkFlushBackoffDelay(long delayMillis) { + Preconditions.checkArgument( + delayMillis >= 0, + "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0."); + this.bulkRequestsConfig.put( + CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis)); + } + + /** + * Sets a failure handler for action requests. + * + * @param failureHandler This is used to handle failed {@link ActionRequest}. + */ + public void setFailureHandler(ActionRequestFailureHandler failureHandler) { + this.failureHandler = Preconditions.checkNotNull(failureHandler); + } + + /** + * Sets a REST client factory for custom client configuration. + * + * @param restClientFactory the factory that configures the rest client. + */ + public void setRestClientFactory(RestClientFactory restClientFactory) { + this.restClientFactory = Preconditions.checkNotNull(restClientFactory); + } + + /** + * Creates the Elasticsearch sink. + * + * @return the created Elasticsearch sink. + */ + public ElasticsearchSink build() { + return new ElasticsearchSink<>( + bulkRequestsConfig, + httpHosts, + elasticsearchSinkFunction, + failureHandler, + restClientFactory); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Builder builder = (Builder) o; + return Objects.equals(httpHosts, builder.httpHosts) + && Objects.equals(elasticsearchSinkFunction, builder.elasticsearchSinkFunction) + && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig) + && Objects.equals(failureHandler, builder.failureHandler) + && Objects.equals(restClientFactory, builder.restClientFactory); + } + + @Override + public int hashCode() { + return Objects.hash( + httpHosts, + elasticsearchSinkFunction, + bulkRequestsConfig, + failureHandler, + restClientFactory); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java new file mode 100644 index 0000000000..e64006a882 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch7.table; + +import org.apache.inlong.sort.elasticsearch.table.ElasticsearchConfiguration; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.http.HttpHost; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION; + +/** Elasticsearch 7 specific configuration. */ +@Internal +final class Elasticsearch7Configuration extends ElasticsearchConfiguration { + + Elasticsearch7Configuration(ReadableConfig config, ClassLoader classLoader) { + super(config, classLoader); + } + + public List getHosts() { + return config.get(HOSTS_OPTION).stream() + .map(Elasticsearch7Configuration::validateAndParseHostsString) + .collect(Collectors.toList()); + } + + private static HttpHost validateAndParseHostsString(String host) { + try { + HttpHost httpHost = HttpHost.create(host); + if (httpHost.getPort() < 0) { + throw new ValidationException( + String.format( + "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.", + host, HOSTS_OPTION.key())); + } + + if (httpHost.getSchemeName() == null) { + throw new ValidationException( + String.format( + "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.", + host, HOSTS_OPTION.key())); + } + return httpHost; + } catch (Exception e) { + throw new ValidationException( + String.format( + "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.", + host, HOSTS_OPTION.key()), + e); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java new file mode 100644 index 0000000000..bed560d157 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch7.table; + +import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory; +import org.apache.inlong.sort.elasticsearch.table.KeyExtractor; +import org.apache.inlong.sort.elasticsearch.table.RequestFactory; +import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction; +import org.apache.inlong.sort.elasticsearch7.ElasticsearchSink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.common.xcontent.XContentType; + +import javax.annotation.Nullable; + +import java.time.ZoneId; +import java.util.List; +import java.util.Objects; + +/** + * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a + * logical description. + */ +@Internal +final class Elasticsearch7DynamicSink implements DynamicTableSink { + + @VisibleForTesting + static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7RequestFactory(); + + private final EncodingFormat> format; + private final TableSchema schema; + private final Elasticsearch7Configuration config; + private final ZoneId localTimeZoneId; + private final boolean isDynamicIndexWithSystemTime; + + public Elasticsearch7DynamicSink( + EncodingFormat> format, + Elasticsearch7Configuration config, + TableSchema schema, + ZoneId localTimeZoneId) { + this(format, config, schema, localTimeZoneId, (ElasticsearchSink.Builder::new)); + } + + // -------------------------------------------------------------- + // Hack to make configuration testing possible. + // + // The code in this block should never be used outside of tests. + // Having a way to inject a builder we can assert the builder in + // the test. We can not assert everything though, e.g. it is not + // possible to assert flushing on checkpoint, as it is configured + // on the sink itself. + // -------------------------------------------------------------- + + private final ElasticSearchBuilderProvider builderProvider; + + @FunctionalInterface + interface ElasticSearchBuilderProvider { + + ElasticsearchSink.Builder createBuilder( + List httpHosts, RowElasticsearchSinkFunction upsertSinkFunction); + } + + Elasticsearch7DynamicSink( + EncodingFormat> format, + Elasticsearch7Configuration config, + TableSchema schema, + ZoneId localTimeZoneId, + ElasticSearchBuilderProvider builderProvider) { + this.format = format; + this.schema = schema; + this.config = config; + this.localTimeZoneId = localTimeZoneId; + this.isDynamicIndexWithSystemTime = isDynamicIndexWithSystemTime(); + this.builderProvider = builderProvider; + } + + // -------------------------------------------------------------- + // End of hack to make configuration testing possible + // -------------------------------------------------------------- + + public boolean isDynamicIndexWithSystemTime() { + IndexGeneratorFactory.IndexHelper indexHelper = new IndexGeneratorFactory.IndexHelper(); + return indexHelper.checkIsDynamicIndexWithSystemTimeFormat(config.getIndex()); + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + ChangelogMode.Builder builder = ChangelogMode.newBuilder(); + for (RowKind kind : requestedMode.getContainedKinds()) { + if (kind != RowKind.UPDATE_BEFORE) { + builder.addContainedKind(kind); + } + } + if (isDynamicIndexWithSystemTime && !requestedMode.containsOnly(RowKind.INSERT)) { + throw new ValidationException( + "Dynamic indexing based on system time only works on append only stream."); + } + return builder.build(); + } + + @Override + public SinkFunctionProvider getSinkRuntimeProvider(Context context) { + return () -> { + SerializationSchema format = + this.format.createRuntimeEncoder(context, schema.toRowDataType()); + + final RowElasticsearchSinkFunction upsertFunction = + new RowElasticsearchSinkFunction( + IndexGeneratorFactory.createIndexGenerator( + config.getIndex(), schema, localTimeZoneId), + null, // this is deprecated in es 7+ + format, + XContentType.JSON, + REQUEST_FACTORY, + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())); + + final ElasticsearchSink.Builder builder = + builderProvider.createBuilder(config.getHosts(), upsertFunction); + + builder.setFailureHandler(config.getFailureHandler()); + builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions()); + builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20)); + builder.setBulkFlushInterval(config.getBulkFlushInterval()); + builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled()); + config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType); + config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries); + config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay); + + // we must overwrite the default factory which is defined with a lambda because of a bug + // in shading lambda serialization shading see FLINK-18006 + if (config.getUsername().isPresent() + && config.getPassword().isPresent() + && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get()) + && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) { + builder.setRestClientFactory( + new AuthRestClientFactory( + config.getPathPrefix().orElse(null), + config.getUsername().get(), + config.getPassword().get())); + } else { + builder.setRestClientFactory( + new DefaultRestClientFactory(config.getPathPrefix().orElse(null))); + } + + final ElasticsearchSink sink = builder.build(); + + if (config.isDisableFlushOnCheckpoint()) { + sink.disableFlushOnCheckpoint(); + } + + return sink; + }; + } + + @Override + public DynamicTableSink copy() { + return this; + } + + @Override + public String asSummaryString() { + return "Elasticsearch7"; + } + + /** Serializable {@link RestClientFactory} used by the sink. */ + @VisibleForTesting + static class DefaultRestClientFactory implements RestClientFactory { + + private final String pathPrefix; + + public DefaultRestClientFactory(@Nullable String pathPrefix) { + this.pathPrefix = pathPrefix; + } + + @Override + public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { + if (pathPrefix != null) { + restClientBuilder.setPathPrefix(pathPrefix); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultRestClientFactory that = (DefaultRestClientFactory) o; + return Objects.equals(pathPrefix, that.pathPrefix); + } + + @Override + public int hashCode() { + return Objects.hash(pathPrefix); + } + } + + /** Serializable {@link RestClientFactory} used by the sink which enable authentication. */ + @VisibleForTesting + static class AuthRestClientFactory implements RestClientFactory { + + private final String pathPrefix; + private final String username; + private final String password; + private transient CredentialsProvider credentialsProvider; + + public AuthRestClientFactory( + @Nullable String pathPrefix, String username, String password) { + this.pathPrefix = pathPrefix; + this.password = password; + this.username = username; + } + + @Override + public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { + if (pathPrefix != null) { + restClientBuilder.setPathPrefix(pathPrefix); + } + if (credentialsProvider == null) { + credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + } + restClientBuilder.setHttpClientConfigCallback( + httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider( + credentialsProvider)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AuthRestClientFactory that = (AuthRestClientFactory) o; + return Objects.equals(pathPrefix, that.pathPrefix) + && Objects.equals(username, that.username) + && Objects.equals(password, that.password); + } + + @Override + public int hashCode() { + return Objects.hash(pathPrefix, password, username); + } + } + + /** + * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the + * sink. + */ + private static class Elasticsearch7RequestFactory implements RequestFactory { + + @Override + public UpdateRequest createUpdateRequest( + String index, + String docType, + String key, + XContentType contentType, + byte[] document) { + return new UpdateRequest(index, key) + .doc(document, contentType) + .upsert(document, contentType); + } + + @Override + public IndexRequest createIndexRequest( + String index, + String docType, + String key, + XContentType contentType, + byte[] document) { + return new IndexRequest(index).id(key).source(document, contentType); + } + + @Override + public DeleteRequest createDeleteRequest(String index, String docType, String key) { + return new DeleteRequest(index, key); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o; + return Objects.equals(format, that.format) + && Objects.equals(schema, that.schema) + && Objects.equals(config, that.config) + && Objects.equals(builderProvider, that.builderProvider); + } + + @Override + public int hashCode() { + return Objects.hash(format, schema, config, builderProvider); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java new file mode 100644 index 0000000000..df7bc111d0 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.elasticsearch7.table; + +import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.util.StringUtils; + +import java.time.ZoneId; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.*; + +/** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}. */ +@Internal +public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory { + + private static final String IDENTIFIER = "elasticsearch7-inlong"; + + private static final Set> requiredOptions = + Stream.of(HOSTS_OPTION, INDEX_OPTION).collect(Collectors.toSet()); + private static final Set> optionalOptions = + Stream.of( + KEY_DELIMITER_OPTION, + FAILURE_HANDLER_OPTION, + FLUSH_ON_CHECKPOINT_OPTION, + BULK_FLASH_MAX_SIZE_OPTION, + BULK_FLUSH_MAX_ACTIONS_OPTION, + BULK_FLUSH_INTERVAL_OPTION, + BULK_FLUSH_BACKOFF_TYPE_OPTION, + BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION, + BULK_FLUSH_BACKOFF_DELAY_OPTION, + CONNECTION_PATH_PREFIX, + FORMAT_OPTION, + PASSWORD_OPTION, + USERNAME_OPTION) + .collect(Collectors.toSet()); + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + TableSchema tableSchema = context.getCatalogTable().getSchema(); + ElasticsearchValidationUtils.validatePrimaryKey(tableSchema); + + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + + final EncodingFormat> format = + helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION); + + helper.validate(); + Configuration configuration = new Configuration(); + context.getCatalogTable().getOptions().forEach(configuration::setString); + Elasticsearch7Configuration config = + new Elasticsearch7Configuration(configuration, context.getClassLoader()); + + validate(config, configuration); + + return new Elasticsearch7DynamicSink( + format, + config, + TableSchemaUtils.getPhysicalSchema(tableSchema), + getLocalTimeZoneId(context.getConfiguration())); + } + + ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) { + final String zone = readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE); + final ZoneId zoneId = + TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) + ? ZoneId.systemDefault() + : ZoneId.of(zone); + + return zoneId; + } + + private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) { + config.getFailureHandler(); // checks if we can instantiate the custom failure handler + config.getHosts(); // validate hosts + validate( + config.getIndex().length() >= 1, + () -> String.format("'%s' must not be empty", INDEX_OPTION.key())); + int maxActions = config.getBulkFlushMaxActions(); + validate( + maxActions == -1 || maxActions >= 1, + () -> String.format( + "'%s' must be at least 1. Got: %s", + BULK_FLUSH_MAX_ACTIONS_OPTION.key(), maxActions)); + long maxSize = config.getBulkFlushMaxByteSize(); + long mb1 = 1024 * 1024; + validate( + maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0), + () -> String.format( + "'%s' must be in MB granularity. Got: %s", + BULK_FLASH_MAX_SIZE_OPTION.key(), + originalConfiguration + .get(BULK_FLASH_MAX_SIZE_OPTION) + .toHumanReadableString())); + validate( + config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true), + () -> String.format( + "'%s' must be at least 1. Got: %s", + BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), + config.getBulkFlushBackoffRetries().get())); + if (config.getUsername().isPresent() + && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) { + validate( + config.getPassword().isPresent() + && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()), + () -> String.format( + "'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'", + USERNAME_OPTION.key(), + PASSWORD_OPTION.key(), + config.getUsername().get(), + config.getPassword().orElse(""))); + } + } + + private static void validate(boolean condition, Supplier message) { + if (!condition) { + throw new ValidationException(message.get()); + } + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return requiredOptions; + } + + @Override + public Set> optionalOptions() { + return optionalOptions; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..eff9a60c12 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.inlong.sort.elasticsearch7.table.Elasticsearch7DynamicSinkFactory diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml index e4cb21591e..d1c5fd7954 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml @@ -35,6 +35,7 @@ jdbc elasticsearch-base elasticsearch6 + elasticsearch7 From cd8ad5b19860cdceec5789758f6eb813cfbef08c Mon Sep 17 00:00:00 2001 From: XiaoYou201 Date: Thu, 1 Aug 2024 16:24:08 +0800 Subject: [PATCH 2/2] [INLONG-10721][Sort] Update LICENSE --- licenses/inlong-sort-connectors/LICENSE | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 419af16967..8c7c90658b 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -960,14 +960,23 @@ Source : org.apache.flink:flink-connector-elasticsearch-base-3.0.1-1.17.jar (Pl License : https://github.com/apache/flink/blob/master/LICENSE inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java - inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java - inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java Source : org.apache.flink:flink-connector-elasticsearch6-3.0.1-1.17.jar (Please note that the software have been modified.) License : https://github.com/apache/flink/blob/master/LICENSE + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java + inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/elasticsearch7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java +Source : org.apache.flink:flink-connector-elasticsearch6-3.0.1-1.17.jar (Please note that the software have been modified.) +License : https://github.com/apache/flink/blob/master/LICENSE + ======================================================================= Apache InLong Subcomponents: