Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: support instance registration to the registry center #7089

Merged
merged 19 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6828](https://github.com/apache/incubator-seata/pull/6828)] spring boot compatible with file.conf and registry.conf
- [[#7012](https://github.com/apache/incubator-seata/pull/7012)] When the number of primary keys exceeds 1000, use union to concatenate the SQL
- [[#7075](https://github.com/apache/incubator-seata/pull/7075)] fast fail when channel is null
- [[#7089](https://github.com/apache/incubator-seata/pull/7089)] support instance registration to the registry center
- [[#7093](https://github.com/apache/incubator-seata/pull/7093)] add a test workflow for JDK 21

### security:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [[#6828](https://github.com/apache/incubator-seata/pull/6828)] seata-spring-boot-starter兼容file.conf和registry.conf
- [[#7012](https://github.com/apache/incubator-seata/pull/7012)] 当主键超过1000个时,使用union拼接sql,可以使用索引
- [[#7075](https://github.com/apache/incubator-seata/pull/7075)] 当channel为空时,快速失败,以便于减少不必要的等待
- [[#7089](https://github.com/apache/incubator-seata/pull/7089)] 新增instance注册到注册中心的接口
- [[#7093](https://github.com/apache/incubator-seata/pull/7093)] 增加jdk21的工作流测试

### security:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.metadata.namingserver;
package org.apache.seata.common.metadata;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Node;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.apache.seata.common.util.CollectionUtils.mapToJsonString;


public class Instance {
private String namespace;
private String clusterName;
private String unit;
private Node.Endpoint control = new Node.Endpoint();
private Node.Endpoint transaction = new Node.Endpoint();
private Node.Endpoint control;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the specific meaning of Node.Endpoint control?

private Node.Endpoint transaction;
private double weight = 1.0;
private boolean healthy = true;
private long term;
Expand Down Expand Up @@ -169,25 +164,6 @@ public String toJsonString(ObjectMapper objectMapper) {
}
}


public Map<String, String> toMap() {
Map<String, String> resultMap = new HashMap<>();


resultMap.put("namespace", namespace);
resultMap.put("clusterName", clusterName);
resultMap.put("unit", unit);
resultMap.put("control", control.toString());
resultMap.put("transaction", transaction.toString());
resultMap.put("weight", String.valueOf(weight));
resultMap.put("healthy", String.valueOf(healthy));
resultMap.put("term", String.valueOf(term));
resultMap.put("timestamp", String.valueOf(timestamp));
resultMap.put("metadata", mapToJsonString(metadata));

return resultMap;
}

private static class SingletonHolder {
private static final Instance SERVER_INSTANCE = new Instance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.XID;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.rpc.RemotingBootstrap;
Expand Down Expand Up @@ -170,9 +172,13 @@ public void initChannel(SocketChannel ch) {
try {
this.serverBootstrap.bind(port).sync();
LOGGER.info("Server started, service listen port: {}", getListenPort());
InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort());
Instance instance = Instance.getInstance();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes sense for Instance to be designed as a singleton.

// Lines 177-180 are just for compatibility with test cases
if (instance.getTransaction() == null) {
Instance.getInstance().setTransaction(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "netty"));
}
for (RegistryService<?> registryService : MultiRegistryFactory.getInstances()) {
registryService.register(address);
registryService.register(Instance.getInstance());
}
initialized.set(true);
} catch (SocketException se) {
Expand All @@ -189,9 +195,8 @@ public void shutdown() {
LOGGER.info("Shutting server down, the listen port: {}", XID.getPort());
}
if (initialized.get()) {
InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort());
for (RegistryService registryService : MultiRegistryFactory.getInstances()) {
registryService.unregister(address);
registryService.unregister(Instance.getInstance());
registryService.close();
}
//wait a few seconds for server transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.seata.discovery.registry;

import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.config.ConfigurationFactory;

Expand Down Expand Up @@ -61,16 +62,42 @@ public interface RegistryService<T> {
* @param address the address
* @throws Exception the exception
*/
@Deprecated
void register(InetSocketAddress address) throws Exception;

/**
* Register.
*
* @param instance the address
* @throws Exception the exception
*/
default void register(Instance instance) throws Exception {
InetSocketAddress inetSocketAddress =
new InetSocketAddress(instance.getTransaction().getHost(), instance.getTransaction().getPort());
register(inetSocketAddress);
}

/**
* Unregister.
*
* @param address the address
* @throws Exception the exception
*/
@Deprecated
void unregister(InetSocketAddress address) throws Exception;

/**
* Unregister.
*
* @param instance the instance
* @throws Exception the exception
*/
default void unregister(Instance instance) throws Exception {
InetSocketAddress inetSocketAddress =
new InetSocketAddress(instance.getTransaction().getHost(), instance.getTransaction().getPort());
unregister(inetSocketAddress);
}

/**
* Subscribe.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@
import org.apache.http.entity.ContentType;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.namingserver.MetaResponse;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
Expand Down Expand Up @@ -148,9 +147,11 @@ static NamingserverRegistryServiceImpl getInstance() {

@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
Instance instance = Instance.getInstance();
instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty"));
register(Instance.getInstance());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IP address should be checked to ensure that it is not a loopback address.

}

@Override
public void register(Instance instance) throws Exception {
instance.setTimestamp(System.currentTimeMillis());
doRegister(instance, getNamingAddrs());
}
Expand Down Expand Up @@ -198,11 +199,15 @@ public boolean doHealthCheck(String url) {
}
}



@Override
public void unregister(InetSocketAddress inetSocketAddress) {
unregister(Instance.getInstance());
}

@Override
public void unregister(InetSocketAddress address) {
NetUtil.validAddress(address);
Instance instance = Instance.getInstance();
instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty"));
public void unregister(Instance instance) {
for (String urlSuffix : getNamingAddrs()) {
String url = HTTP_PREFIX + urlSuffix + "/naming/v1/unregister?";
String unit = instance.getUnit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.TimeUnit;

import org.apache.seata.common.XID;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.NumberUtils;
Expand Down Expand Up @@ -78,6 +80,7 @@ public static void start(int port) {
XID.setIpAddress(NetUtil.getLocalIp());
XID.setPort(port);
// init snowflake for transactionId, branchId
Instance.getInstance().setTransaction(new Node.Endpoint(XID.getIpAddress(),XID.getPort(),"netty"));
UUIDGenerator.init(1L);

MockCoordinator coordinator = MockCoordinator.getInstance();
Expand Down
10 changes: 4 additions & 6 deletions server/src/main/java/org/apache/seata/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@
import org.apache.seata.core.rpc.netty.NettyRemotingServer;
import org.apache.seata.core.rpc.netty.NettyServerConfig;
import org.apache.seata.server.coordinator.DefaultCoordinator;
import org.apache.seata.server.instance.ServerInstance;
import org.apache.seata.server.instance.ServerInstanceFactory;
import org.apache.seata.server.lock.LockerManagerFactory;
import org.apache.seata.server.metrics.MetricsManager;
import org.apache.seata.server.session.SessionHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
Expand All @@ -52,10 +51,9 @@
*/
@Component("seataServer")
public class Server {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);

@Resource
ServerInstance serverInstance;
ServerInstanceFactory serverInstanceFactory;

/**
* The entry point of application.
Expand Down Expand Up @@ -106,7 +104,7 @@ public void start(String[] args) {
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

serverInstance.serverInstanceInit();
serverInstanceFactory.serverInstanceInit();
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
nettyRemotingServer.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.seata.server.controller;

import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.result.Result;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package org.apache.seata.server.instance;

import org.apache.seata.common.XID;
import org.apache.seata.common.holder.ObjectHolder;
import org.apache.seata.common.metadata.Instance;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.server.Server;
import org.apache.seata.server.ServerRunner;
Expand Down Expand Up @@ -48,8 +48,8 @@
import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;


@Component("serverInstance")
public class ServerInstance {
@Component("serverInstanceFactory")
public class ServerInstanceFactory {
@Resource
private RegistryProperties registryProperties;

Expand All @@ -61,53 +61,58 @@ public class ServerInstance {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);

public void serverInstanceInit() {
if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) {
VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();
EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true));
ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);

// load node properties
Instance instance = Instance.getInstance();
// load namespace
String namespace = registryNamingServerProperties.getNamespace();
instance.setNamespace(namespace);
// load cluster name
String clusterName = registryNamingServerProperties.getCluster();
instance.setClusterName(clusterName);

// load cluster type
String clusterType = String.valueOf(StoreConfig.getSessionMode());
instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default");

// load unit name
instance.setUnit(String.valueOf(UUID.randomUUID()));

instance.setTerm(System.currentTimeMillis());

// load node Endpoint
instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http"));

// load metadata
for (PropertySource<?> propertySource : environment.getPropertySources()) {
if (propertySource instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>) propertySource;
for (String propertyName : enumerablePropertySource.getPropertyNames()) {
if (propertyName.startsWith(META_PREFIX)) {
instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName));
}
VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();
ConfigurableEnvironment environment =
(ConfigurableEnvironment)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);

// load node properties
Instance instance = Instance.getInstance();
// load namespace
String namespace = registryNamingServerProperties.getNamespace();
instance.setNamespace(namespace);
// load cluster name
String clusterName = registryNamingServerProperties.getCluster();
instance.setClusterName(clusterName);

// load cluster type
String clusterType = String.valueOf(StoreConfig.getSessionMode());
instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default");

// load unit name
instance.setUnit(String.valueOf(UUID.randomUUID()));

instance.setTerm(System.currentTimeMillis());

// load node Endpoint
instance.setControl(new Node.Endpoint(XID.getIpAddress(),
Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http"));

// load metadata
for (PropertySource<?> propertySource : environment.getPropertySources()) {
if (propertySource instanceof EnumerablePropertySource) {
EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>)propertySource;
for (String propertyName : enumerablePropertySource.getPropertyNames()) {
if (propertyName.startsWith(META_PREFIX)) {
instance.addMetadata(propertyName.substring(META_PREFIX.length()),
enumerablePropertySource.getProperty(propertyName));
}
}
}
}
instance.setTransaction(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "netty"));
if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) {
// load vgroup mapping relationship
instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups());

EXECUTOR_SERVICE =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("heartbeat-namingserver", 1, true));
EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
try {
vGroupMappingStoreManager.notifyMapping();
} catch (Exception e) {
LOGGER.error("Naming server register Exception", e);
}
}, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), TimeUnit.MILLISECONDS);
}, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(),
TimeUnit.MILLISECONDS);
ServerRunner.addDisposable(EXECUTOR_SERVICE::shutdown);
}
}
Expand Down
Loading
Loading