Skip to content

Commit

Permalink
optimize: support instance registration to the registry center (#7089)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Jan 3, 2025
1 parent 61f420b commit 5b31862
Show file tree
Hide file tree
Showing 16 changed files with 114 additions and 92 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6828](https://github.com/apache/incubator-seata/pull/6828)] spring boot compatible with file.conf and registry.conf
- [[#7012](https://github.com/apache/incubator-seata/pull/7012)] When the number of primary keys exceeds 1000, use union to concatenate the SQL
- [[#7075](https://github.com/apache/incubator-seata/pull/7075)] fast fail when channel is null
- [[#7089](https://github.com/apache/incubator-seata/pull/7089)] support instance registration to the registry center
- [[#7093](https://github.com/apache/incubator-seata/pull/7093)] add a test workflow for JDK 21

### security:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [[#6828](https://github.com/apache/incubator-seata/pull/6828)] seata-spring-boot-starter兼容file.conf和registry.conf
- [[#7012](https://github.com/apache/incubator-seata/pull/7012)] 当主键超过1000个时,使用union拼接sql,可以使用索引
- [[#7075](https://github.com/apache/incubator-seata/pull/7075)] 当channel为空时,快速失败,以便于减少不必要的等待
- [[#7089](https://github.com/apache/incubator-seata/pull/7089)] 新增instance注册到注册中心的接口
- [[#7093](https://github.com/apache/incubator-seata/pull/7093)] 增加jdk21的工作流测试

### security:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.metadata.namingserver;
package org.apache.seata.common.metadata;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Node;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.apache.seata.common.util.CollectionUtils.mapToJsonString;


public class Instance {
private String namespace;
private String clusterName;
private String unit;
private Node.Endpoint control = new Node.Endpoint();
private Node.Endpoint transaction = new Node.Endpoint();
private Node.Endpoint control;
private Node.Endpoint transaction;
private double weight = 1.0;
private boolean healthy = true;
private long term;
Expand Down Expand Up @@ -169,25 +164,6 @@ public String toJsonString(ObjectMapper objectMapper) {
}
}


public Map<String, String> toMap() {
Map<String, String> resultMap = new HashMap<>();


resultMap.put("namespace", namespace);
resultMap.put("clusterName", clusterName);
resultMap.put("unit", unit);
resultMap.put("control", control.toString());
resultMap.put("transaction", transaction.toString());
resultMap.put("weight", String.valueOf(weight));
resultMap.put("healthy", String.valueOf(healthy));
resultMap.put("term", String.valueOf(term));
resultMap.put("timestamp", String.valueOf(timestamp));
resultMap.put("metadata", mapToJsonString(metadata));

return resultMap;
}

private static class SingletonHolder {
private static final Instance SERVER_INSTANCE = new Instance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.XID;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.rpc.RemotingBootstrap;
Expand Down Expand Up @@ -170,9 +172,13 @@ public void initChannel(SocketChannel ch) {
try {
this.serverBootstrap.bind(port).sync();
LOGGER.info("Server started, service listen port: {}", getListenPort());
InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort());
Instance instance = Instance.getInstance();
// Lines 177-180 are just for compatibility with test cases
if (instance.getTransaction() == null) {
Instance.getInstance().setTransaction(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "netty"));
}
for (RegistryService<?> registryService : MultiRegistryFactory.getInstances()) {
registryService.register(address);
registryService.register(Instance.getInstance());
}
initialized.set(true);
} catch (SocketException se) {
Expand All @@ -189,9 +195,8 @@ public void shutdown() {
LOGGER.info("Shutting server down, the listen port: {}", XID.getPort());
}
if (initialized.get()) {
InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort());
for (RegistryService registryService : MultiRegistryFactory.getInstances()) {
registryService.unregister(address);
registryService.unregister(Instance.getInstance());
registryService.close();
}
//wait a few seconds for server transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.seata.discovery.registry;

import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.config.ConfigurationFactory;

Expand Down Expand Up @@ -61,16 +62,42 @@ public interface RegistryService<T> {
* @param address the address
* @throws Exception the exception
*/
@Deprecated
void register(InetSocketAddress address) throws Exception;

/**
* Register.
*
* @param instance the address
* @throws Exception the exception
*/
default void register(Instance instance) throws Exception {
InetSocketAddress inetSocketAddress =
new InetSocketAddress(instance.getTransaction().getHost(), instance.getTransaction().getPort());
register(inetSocketAddress);
}

/**
* Unregister.
*
* @param address the address
* @throws Exception the exception
*/
@Deprecated
void unregister(InetSocketAddress address) throws Exception;

/**
* Unregister.
*
* @param instance the instance
* @throws Exception the exception
*/
default void unregister(Instance instance) throws Exception {
InetSocketAddress inetSocketAddress =
new InetSocketAddress(instance.getTransaction().getHost(), instance.getTransaction().getPort());
unregister(inetSocketAddress);
}

/**
* Subscribe.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@
import org.apache.http.entity.ContentType;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.namingserver.MetaResponse;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
Expand Down Expand Up @@ -148,9 +147,11 @@ static NamingserverRegistryServiceImpl getInstance() {

@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
Instance instance = Instance.getInstance();
instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty"));
register(Instance.getInstance());
}

@Override
public void register(Instance instance) throws Exception {
instance.setTimestamp(System.currentTimeMillis());
doRegister(instance, getNamingAddrs());
}
Expand Down Expand Up @@ -198,11 +199,15 @@ public boolean doHealthCheck(String url) {
}
}



@Override
public void unregister(InetSocketAddress inetSocketAddress) {
unregister(Instance.getInstance());
}

@Override
public void unregister(InetSocketAddress address) {
NetUtil.validAddress(address);
Instance instance = Instance.getInstance();
instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty"));
public void unregister(Instance instance) {
for (String urlSuffix : getNamingAddrs()) {
String url = HTTP_PREFIX + urlSuffix + "/naming/v1/unregister?";
String unit = instance.getUnit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.TimeUnit;

import org.apache.seata.common.XID;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.NumberUtils;
Expand Down Expand Up @@ -78,6 +80,7 @@ public static void start(int port) {
XID.setIpAddress(NetUtil.getLocalIp());
XID.setPort(port);
// init snowflake for transactionId, branchId
Instance.getInstance().setTransaction(new Node.Endpoint(XID.getIpAddress(),XID.getPort(),"netty"));
UUIDGenerator.init(1L);

MockCoordinator coordinator = MockCoordinator.getInstance();
Expand Down
10 changes: 4 additions & 6 deletions server/src/main/java/org/apache/seata/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@
import org.apache.seata.core.rpc.netty.NettyRemotingServer;
import org.apache.seata.core.rpc.netty.NettyServerConfig;
import org.apache.seata.server.coordinator.DefaultCoordinator;
import org.apache.seata.server.instance.ServerInstance;
import org.apache.seata.server.instance.ServerInstanceFactory;
import org.apache.seata.server.lock.LockerManagerFactory;
import org.apache.seata.server.metrics.MetricsManager;
import org.apache.seata.server.session.SessionHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
Expand All @@ -52,10 +51,9 @@
*/
@Component("seataServer")
public class Server {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);

@Resource
ServerInstance serverInstance;
ServerInstanceFactory serverInstanceFactory;

/**
* The entry point of application.
Expand Down Expand Up @@ -106,7 +104,7 @@ public void start(String[] args) {
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

serverInstance.serverInstanceInit();
serverInstanceFactory.serverInstanceInit();
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
nettyRemotingServer.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.seata.server.controller;

import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.result.Result;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package org.apache.seata.server.instance;

import org.apache.seata.common.XID;
import org.apache.seata.common.holder.ObjectHolder;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.server.Server;
import org.apache.seata.server.ServerRunner;
Expand Down Expand Up @@ -48,8 +48,8 @@
import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;


@Component("serverInstance")
public class ServerInstance {
@Component("serverInstanceFactory")
public class ServerInstanceFactory {
@Resource
private RegistryProperties registryProperties;

Expand All @@ -61,53 +61,58 @@ public class ServerInstance {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);

public void serverInstanceInit() {
if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) {
VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();
EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true));
ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);

// load node properties
Instance instance = Instance.getInstance();
// load namespace
String namespace = registryNamingServerProperties.getNamespace();
instance.setNamespace(namespace);
// load cluster name
String clusterName = registryNamingServerProperties.getCluster();
instance.setClusterName(clusterName);

// load cluster type
String clusterType = String.valueOf(StoreConfig.getSessionMode());
instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default");

// load unit name
instance.setUnit(String.valueOf(UUID.randomUUID()));

instance.setTerm(System.currentTimeMillis());

// load node Endpoint
instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http"));

// load metadata
for (PropertySource<?> propertySource : environment.getPropertySources()) {
if (propertySource instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>) propertySource;
for (String propertyName : enumerablePropertySource.getPropertyNames()) {
if (propertyName.startsWith(META_PREFIX)) {
instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName));
}
VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();
ConfigurableEnvironment environment =
(ConfigurableEnvironment)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);

// load node properties
Instance instance = Instance.getInstance();
// load namespace
String namespace = registryNamingServerProperties.getNamespace();
instance.setNamespace(namespace);
// load cluster name
String clusterName = registryNamingServerProperties.getCluster();
instance.setClusterName(clusterName);

// load cluster type
String clusterType = String.valueOf(StoreConfig.getSessionMode());
instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default");

// load unit name
instance.setUnit(String.valueOf(UUID.randomUUID()));

instance.setTerm(System.currentTimeMillis());

// load node Endpoint
instance.setControl(new Node.Endpoint(XID.getIpAddress(),
Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http"));

// load metadata
for (PropertySource<?> propertySource : environment.getPropertySources()) {
if (propertySource instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>)propertySource;
for (String propertyName : enumerablePropertySource.getPropertyNames()) {
if (propertyName.startsWith(META_PREFIX)) {
instance.addMetadata(propertyName.substring(META_PREFIX.length()),
enumerablePropertySource.getProperty(propertyName));
}
}
}
}
instance.setTransaction(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "netty"));
if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) {
// load vgroup mapping relationship
instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups());

EXECUTOR_SERVICE =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("heartbeat-namingserver", 1, true));
EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
try {
vGroupMappingStoreManager.notifyMapping();
} catch (Exception e) {
LOGGER.error("Naming server register Exception", e);
}
}, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), TimeUnit.MILLISECONDS);
}, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(),
TimeUnit.MILLISECONDS);
ServerRunner.addDisposable(EXECUTOR_SERVICE::shutdown);
}
}
Expand Down
Loading

0 comments on commit 5b31862

Please sign in to comment.