Skip to content

Commit

Permalink
[improve] improve hazelcast joiner, lite node can't be election as ma…
Browse files Browse the repository at this point in the history
…ster
  • Loading branch information
liunaijie committed Dec 10, 2024
1 parent 2f3c54c commit a24af69
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
package org.apache.seatunnel.engine.server;

import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutDiscoveryJoiner;
import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutMulticastJoiner;
import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutTcpIpJoiner;

import com.hazelcast.config.JoinConfig;
import com.hazelcast.instance.impl.DefaultNodeContext;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.instance.impl.NodeExtension;
import com.hazelcast.internal.cluster.Joiner;
import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED;

@Slf4j
public class SeaTunnelNodeContext extends DefaultNodeContext {
Expand All @@ -45,15 +50,28 @@ public NodeExtension createNodeExtension(@NonNull Node node) {

@Override
public Joiner createJoiner(Node node) {

JoinConfig join =
getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
join.verify();

if (join.getTcpIpConfig().isEnabled()) {
// update for seatunnel, lite member can not become master node
if (join.getMulticastConfig().isEnabled() && node.multicastService != null) {
log.info("Using LiteNodeDropOutMulticast Multicast discovery");
return new LiteNodeDropOutMulticastJoiner(node);
} else if (join.getTcpIpConfig().isEnabled()) {
log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
return new LiteNodeDropOutTcpIpJoiner(node);
} else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
|| isAnyAliasedConfigEnabled(join)
|| join.isAutoDetectionEnabled()) {
log.info("Using LiteNodeDropOutDiscoveryJoiner Discovery SPI");
return new LiteNodeDropOutDiscoveryJoiner(node);
}
return null;
}

return super.createJoiner(node);
private boolean isAnyAliasedConfigEnabled(JoinConfig join) {
return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.joiner;

import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.impl.MemberImpl;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.instance.EndpointQualifier;
import com.hazelcast.instance.ProtocolType;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.IdleStrategy;
import com.hazelcast.spi.discovery.DiscoveryNode;
import com.hazelcast.spi.discovery.integration.DiscoveryService;
import com.hazelcast.spi.properties.ClusterProperty;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress;
import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED;

public class LiteNodeDropOutDiscoveryJoiner extends LiteNodeDropOutTcpIpJoiner {

private final DiscoveryService discoveryService;
private final boolean usePublicAddress;
private final IdleStrategy idleStrategy;
private final int maximumWaitingTimeBeforeJoinSeconds;

public LiteNodeDropOutDiscoveryJoiner(Node node) {
super(node);
this.idleStrategy =
new BackoffIdleStrategy(
0L,
0L,
TimeUnit.MILLISECONDS.toNanos(10L),
TimeUnit.MILLISECONDS.toNanos(500L));
this.maximumWaitingTimeBeforeJoinSeconds =
node.getProperties().getInteger(ClusterProperty.WAIT_SECONDS_BEFORE_JOIN);
this.discoveryService = node.discoveryService;
this.usePublicAddress = usePublicAddress(node.getConfig().getNetworkConfig().getJoin());
}

private boolean usePublicAddress(JoinConfig join) {
return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
|| allUsePublicAddress(
AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
}

protected Collection<Address> getPossibleAddressesForInitialJoin() {
long deadLine =
System.nanoTime()
+ TimeUnit.SECONDS.toNanos((long) this.maximumWaitingTimeBeforeJoinSeconds);

for (int i = 0; System.nanoTime() < deadLine; ++i) {
Collection<Address> possibleAddresses = this.getPossibleAddresses();
if (!possibleAddresses.isEmpty()) {
return possibleAddresses;
}

this.idleStrategy.idle((long) i);
}

return Collections.emptyList();
}

protected Collection<Address> getPossibleAddresses() {
Iterable<DiscoveryNode> discoveredNodes =
(Iterable)
Preconditions.checkNotNull(
this.discoveryService.discoverNodes(),
"Discovered nodes cannot be null!");
MemberImpl localMember = this.node.nodeEngine.getLocalMember();
Set<Address> localAddresses = this.node.getLocalAddressRegistry().getLocalAddresses();
Collection<Address> possibleMembers = new ArrayList();
Iterator var5 = discoveredNodes.iterator();

while (var5.hasNext()) {
DiscoveryNode discoveryNode = (DiscoveryNode) var5.next();
Address discoveredAddress =
this.usePublicAddress
? discoveryNode.getPublicAddress()
: discoveryNode.getPrivateAddress();
if (localAddresses.contains(discoveredAddress)) {
if (!this.usePublicAddress && discoveryNode.getPublicAddress() != null) {
localMember
.getAddressMap()
.put(
EndpointQualifier.resolve(ProtocolType.CLIENT, "public"),
this.publicAddress(localMember, discoveryNode));
}
} else {
possibleMembers.add(discoveredAddress);
}
}

return possibleMembers;
}

private Address publicAddress(MemberImpl localMember, DiscoveryNode discoveryNode) {
if (localMember.getAddressMap().containsKey(EndpointQualifier.CLIENT)) {
try {
String publicHost = discoveryNode.getPublicAddress().getHost();
int clientPort =
((Address) localMember.getAddressMap().get(EndpointQualifier.CLIENT))
.getPort();
return new Address(publicHost, clientPort);
} catch (Exception var5) {
Exception e = var5;
this.logger.fine(e);
}
}

return discoveryNode.getPublicAddress();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.joiner;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.ConfigAccessor;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.cluster.impl.JoinRequest;
import com.hazelcast.internal.cluster.impl.MulticastJoiner;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.internal.util.RandomPicker;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class LiteNodeDropOutMulticastJoiner extends MulticastJoiner {

private static final long JOIN_RETRY_INTERVAL = 1000L;
private final AtomicInteger currentTryCount = new AtomicInteger(0);
private final AtomicInteger maxTryCount = new AtomicInteger(calculateTryCount());

public LiteNodeDropOutMulticastJoiner(Node node) {
super(node);
}

@Override
public void doJoin() {
long joinStartTime = Clock.currentTimeMillis();
long maxJoinMillis = getMaxJoinMillis();
Address thisAddress = node.getThisAddress();

while (shouldRetry() && (Clock.currentTimeMillis() - joinStartTime < maxJoinMillis)) {

// clear master node
clusterService.setMasterAddressToJoin(null);

Address masterAddress = getTargetAddress();
if (masterAddress == null) {
masterAddress = findMasterWithMulticast();
}
clusterService.setMasterAddressToJoin(masterAddress);

if (masterAddress == null || thisAddress.equals(masterAddress)) {
if (node.isLiteMember()) {
log.info("This node is lite member. No need to join to a master node.");
continue;
} else {
clusterJoinManager.setThisMemberAsMaster();
return;
}
}

logger.info("Trying to join to discovered node: " + masterAddress);
joinMaster();
}
}

private void joinMaster() {
long maxMasterJoinTime = getMaxJoinTimeToMasterNode();
long start = Clock.currentTimeMillis();

while (shouldRetry() && Clock.currentTimeMillis() - start < maxMasterJoinTime) {

Address master = clusterService.getMasterAddress();
if (master != null) {
if (logger.isFineEnabled()) {
logger.fine("Joining to master " + master);
}
clusterJoinManager.sendJoinRequest(master);
} else {
break;
}

try {
clusterService.blockOnJoin(JOIN_RETRY_INTERVAL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

if (isBlacklisted(master)) {
clusterService.setMasterAddressToJoin(null);
return;
}
}
}

private Address findMasterWithMulticast() {
try {
if (this.logger.isFineEnabled()) {
this.logger.fine("Searching for master node. Max tries: " + maxTryCount.get());
}

JoinRequest joinRequest = this.node.createJoinRequest((Address) null);

while (this.node.isRunning()
&& currentTryCount.incrementAndGet() <= maxTryCount.get()) {
joinRequest.setTryCount(currentTryCount.get());
this.node.multicastService.send(joinRequest);
Address masterAddress = this.clusterService.getMasterAddress();
if (masterAddress != null) {
Address var3 = masterAddress;
return var3;
}

Thread.sleep((long) this.getPublishInterval());
}

return null;
} catch (Exception var7) {
Exception e = var7;
if (this.logger != null) {
this.logger.warning(e);
}

return null;
} finally {
currentTryCount.set(0);
}
}

private int calculateTryCount() {
NetworkConfig networkConfig = ConfigAccessor.getActiveMemberNetworkConfig(this.config);
long timeoutMillis =
TimeUnit.SECONDS.toMillis(
(long)
networkConfig
.getJoin()
.getMulticastConfig()
.getMulticastTimeoutSeconds());
int avgPublishInterval = 125;
int tryCount = (int) timeoutMillis / avgPublishInterval;
String host = this.node.getThisAddress().getHost();

int lastDigits;
try {
lastDigits = Integer.parseInt(host.substring(host.lastIndexOf(46) + 1));
} catch (NumberFormatException var9) {
lastDigits = RandomPicker.getInt(512);
}

int portDiff = this.node.getThisAddress().getPort() - networkConfig.getPort();
tryCount += (lastDigits + portDiff) % 10;
return tryCount;
}

private int getPublishInterval() {
return RandomPicker.getInt(50, 200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.engine.server;
package org.apache.seatunnel.engine.server.joiner;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.Config;
Expand Down

0 comments on commit a24af69

Please sign in to comment.