diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
index 60174b886453..3bfd8a41f7b6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
@@ -18,16 +18,21 @@
package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutDiscoveryJoiner;
+import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutMulticastJoiner;
+import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutTcpIpJoiner;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.instance.impl.DefaultNodeContext;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.instance.impl.NodeExtension;
import com.hazelcast.internal.cluster.Joiner;
+import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
+import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED;
@Slf4j
public class SeaTunnelNodeContext extends DefaultNodeContext {
@@ -45,15 +50,28 @@ public NodeExtension createNodeExtension(@NonNull Node node) {
@Override
public Joiner createJoiner(Node node) {
+
JoinConfig join =
getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
join.verify();
- if (join.getTcpIpConfig().isEnabled()) {
+ // update for seatunnel, lite member can not become master node
+ if (join.getMulticastConfig().isEnabled() && node.multicastService != null) {
+ log.info("Using LiteNodeDropOutMulticast Multicast discovery");
+ return new LiteNodeDropOutMulticastJoiner(node);
+ } else if (join.getTcpIpConfig().isEnabled()) {
log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
return new LiteNodeDropOutTcpIpJoiner(node);
+ } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
+ || isAnyAliasedConfigEnabled(join)
+ || join.isAutoDetectionEnabled()) {
+ log.info("Using LiteNodeDropOutDiscoveryJoiner Discovery SPI");
+ return new LiteNodeDropOutDiscoveryJoiner(node);
}
+ return null;
+ }
- return super.createJoiner(node);
+ private boolean isAnyAliasedConfigEnabled(JoinConfig join) {
+ return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutDiscoveryJoiner.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutDiscoveryJoiner.java
new file mode 100644
index 000000000000..a3ae66f372e0
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutDiscoveryJoiner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.joiner;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.impl.MemberImpl;
+import com.hazelcast.config.JoinConfig;
+import com.hazelcast.instance.EndpointQualifier;
+import com.hazelcast.instance.ProtocolType;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
+import com.hazelcast.internal.util.Preconditions;
+import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
+import com.hazelcast.internal.util.concurrent.IdleStrategy;
+import com.hazelcast.spi.discovery.DiscoveryNode;
+import com.hazelcast.spi.discovery.integration.DiscoveryService;
+import com.hazelcast.spi.properties.ClusterProperty;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress;
+import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED;
+
+public class LiteNodeDropOutDiscoveryJoiner extends LiteNodeDropOutTcpIpJoiner {
+
+ private final DiscoveryService discoveryService;
+ private final boolean usePublicAddress;
+ private final IdleStrategy idleStrategy;
+ private final int maximumWaitingTimeBeforeJoinSeconds;
+
+ public LiteNodeDropOutDiscoveryJoiner(Node node) {
+ super(node);
+ this.idleStrategy =
+ new BackoffIdleStrategy(
+ 0L,
+ 0L,
+ TimeUnit.MILLISECONDS.toNanos(10L),
+ TimeUnit.MILLISECONDS.toNanos(500L));
+ this.maximumWaitingTimeBeforeJoinSeconds =
+ node.getProperties().getInteger(ClusterProperty.WAIT_SECONDS_BEFORE_JOIN);
+ this.discoveryService = node.discoveryService;
+ this.usePublicAddress = usePublicAddress(node.getConfig().getNetworkConfig().getJoin());
+ }
+
+ private boolean usePublicAddress(JoinConfig join) {
+ return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
+ || allUsePublicAddress(
+ AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
+ }
+
+ protected Collection
getPossibleAddressesForInitialJoin() {
+ long deadLine =
+ System.nanoTime()
+ + TimeUnit.SECONDS.toNanos((long) this.maximumWaitingTimeBeforeJoinSeconds);
+
+ for (int i = 0; System.nanoTime() < deadLine; ++i) {
+ Collection possibleAddresses = this.getPossibleAddresses();
+ if (!possibleAddresses.isEmpty()) {
+ return possibleAddresses;
+ }
+
+ this.idleStrategy.idle((long) i);
+ }
+
+ return Collections.emptyList();
+ }
+
+ protected Collection getPossibleAddresses() {
+ Iterable discoveredNodes =
+ (Iterable)
+ Preconditions.checkNotNull(
+ this.discoveryService.discoverNodes(),
+ "Discovered nodes cannot be null!");
+ MemberImpl localMember = this.node.nodeEngine.getLocalMember();
+ Set localAddresses = this.node.getLocalAddressRegistry().getLocalAddresses();
+ Collection possibleMembers = new ArrayList();
+ Iterator var5 = discoveredNodes.iterator();
+
+ while (var5.hasNext()) {
+ DiscoveryNode discoveryNode = (DiscoveryNode) var5.next();
+ Address discoveredAddress =
+ this.usePublicAddress
+ ? discoveryNode.getPublicAddress()
+ : discoveryNode.getPrivateAddress();
+ if (localAddresses.contains(discoveredAddress)) {
+ if (!this.usePublicAddress && discoveryNode.getPublicAddress() != null) {
+ localMember
+ .getAddressMap()
+ .put(
+ EndpointQualifier.resolve(ProtocolType.CLIENT, "public"),
+ this.publicAddress(localMember, discoveryNode));
+ }
+ } else {
+ possibleMembers.add(discoveredAddress);
+ }
+ }
+
+ return possibleMembers;
+ }
+
+ private Address publicAddress(MemberImpl localMember, DiscoveryNode discoveryNode) {
+ if (localMember.getAddressMap().containsKey(EndpointQualifier.CLIENT)) {
+ try {
+ String publicHost = discoveryNode.getPublicAddress().getHost();
+ int clientPort =
+ ((Address) localMember.getAddressMap().get(EndpointQualifier.CLIENT))
+ .getPort();
+ return new Address(publicHost, clientPort);
+ } catch (Exception var5) {
+ Exception e = var5;
+ this.logger.fine(e);
+ }
+ }
+
+ return discoveryNode.getPublicAddress();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutMulticastJoiner.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutMulticastJoiner.java
new file mode 100644
index 000000000000..0cbe406d7570
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutMulticastJoiner.java
@@ -0,0 +1,150 @@
+package org.apache.seatunnel.engine.server.joiner;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.config.ConfigAccessor;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.cluster.impl.JoinRequest;
+import com.hazelcast.internal.cluster.impl.MulticastJoiner;
+import com.hazelcast.internal.util.Clock;
+import com.hazelcast.internal.util.RandomPicker;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class LiteNodeDropOutMulticastJoiner extends MulticastJoiner {
+
+ private static final long JOIN_RETRY_INTERVAL = 1000L;
+ private final AtomicInteger currentTryCount = new AtomicInteger(0);
+ private final AtomicInteger maxTryCount = new AtomicInteger(calculateTryCount());
+
+ public LiteNodeDropOutMulticastJoiner(Node node) {
+ super(node);
+ }
+
+ @Override
+ public void doJoin() {
+ long joinStartTime = Clock.currentTimeMillis();
+ long maxJoinMillis = getMaxJoinMillis();
+ Address thisAddress = node.getThisAddress();
+
+ while (shouldRetry() && (Clock.currentTimeMillis() - joinStartTime < maxJoinMillis)) {
+
+ // clear master node
+ clusterService.setMasterAddressToJoin(null);
+
+ Address masterAddress = getTargetAddress();
+ if (masterAddress == null) {
+ masterAddress = findMasterWithMulticast();
+ }
+ clusterService.setMasterAddressToJoin(masterAddress);
+
+ if (masterAddress == null || thisAddress.equals(masterAddress)) {
+ if (node.isLiteMember()) {
+ log.info("This node is lite member. No need to join to a master node.");
+ continue;
+ } else {
+ clusterJoinManager.setThisMemberAsMaster();
+ return;
+ }
+ }
+
+ logger.info("Trying to join to discovered node: " + masterAddress);
+ joinMaster();
+ }
+ }
+
+ private void joinMaster() {
+ long maxMasterJoinTime = getMaxJoinTimeToMasterNode();
+ long start = Clock.currentTimeMillis();
+
+ while (shouldRetry() && Clock.currentTimeMillis() - start < maxMasterJoinTime) {
+
+ Address master = clusterService.getMasterAddress();
+ if (master != null) {
+ if (logger.isFineEnabled()) {
+ logger.fine("Joining to master " + master);
+ }
+ clusterJoinManager.sendJoinRequest(master);
+ } else {
+ break;
+ }
+
+ try {
+ clusterService.blockOnJoin(JOIN_RETRY_INTERVAL);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (isBlacklisted(master)) {
+ clusterService.setMasterAddressToJoin(null);
+ return;
+ }
+ }
+ }
+
+ private Address findMasterWithMulticast() {
+ try {
+ if (this.logger.isFineEnabled()) {
+ this.logger.fine("Searching for master node. Max tries: " + maxTryCount.get());
+ }
+
+ JoinRequest joinRequest = this.node.createJoinRequest((Address) null);
+
+ while (this.node.isRunning()
+ && currentTryCount.incrementAndGet() <= maxTryCount.get()) {
+ joinRequest.setTryCount(currentTryCount.get());
+ this.node.multicastService.send(joinRequest);
+ Address masterAddress = this.clusterService.getMasterAddress();
+ if (masterAddress != null) {
+ Address var3 = masterAddress;
+ return var3;
+ }
+
+ Thread.sleep((long) this.getPublishInterval());
+ }
+
+ return null;
+ } catch (Exception var7) {
+ Exception e = var7;
+ if (this.logger != null) {
+ this.logger.warning(e);
+ }
+
+ return null;
+ } finally {
+ currentTryCount.set(0);
+ }
+ }
+
+ private int calculateTryCount() {
+ NetworkConfig networkConfig = ConfigAccessor.getActiveMemberNetworkConfig(this.config);
+ long timeoutMillis =
+ TimeUnit.SECONDS.toMillis(
+ (long)
+ networkConfig
+ .getJoin()
+ .getMulticastConfig()
+ .getMulticastTimeoutSeconds());
+ int avgPublishInterval = 125;
+ int tryCount = (int) timeoutMillis / avgPublishInterval;
+ String host = this.node.getThisAddress().getHost();
+
+ int lastDigits;
+ try {
+ lastDigits = Integer.parseInt(host.substring(host.lastIndexOf(46) + 1));
+ } catch (NumberFormatException var9) {
+ lastDigits = RandomPicker.getInt(512);
+ }
+
+ int portDiff = this.node.getThisAddress().getPort() - networkConfig.getPort();
+ tryCount += (lastDigits + portDiff) % 10;
+ return tryCount;
+ }
+
+ private int getPublishInterval() {
+ return RandomPicker.getInt(50, 200);
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/LiteNodeDropOutTcpIpJoiner.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutTcpIpJoiner.java
similarity index 99%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/LiteNodeDropOutTcpIpJoiner.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutTcpIpJoiner.java
index 67aac64aca27..afb3eab795e2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/LiteNodeDropOutTcpIpJoiner.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/joiner/LiteNodeDropOutTcpIpJoiner.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server;
+package org.apache.seatunnel.engine.server.joiner;
import com.hazelcast.cluster.Address;
import com.hazelcast.config.Config;