Skip to content

Commit

Permalink
Support Seata AT integration under Proxy Native in GraalVM Native Image
Browse files Browse the repository at this point in the history
  • Loading branch information
linghengqian committed Dec 3, 2024
1 parent ace8013 commit 50fe891
Show file tree
Hide file tree
Showing 15 changed files with 664 additions and 271 deletions.
1 change: 1 addition & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
1. Doc: Adds documentation for ClickHouse support - [#33779](https://github.com/apache/shardingsphere/pull/33779)
1. Doc: Removes use of `iceberg.mr.schema.auto.conversion` from documentation due to HIVE-26507 - [#33828](https://github.com/apache/shardingsphere/pull/33828)
1. Kernel: Bump the minimum Seata Client version for Seata AT integration to 2.2.0 - [#33872](https://github.com/apache/shardingsphere/pull/33872)
1. Proxy Native: Support Seata AT integration under Proxy Native in GraalVM Native Image - [#33889](https://github.com/apache/shardingsphere/pull/33889)

### Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,19 @@ config {
}
```

### 在业务项目创建 ShardingSphere 配置文件
### 在业务项目添加 JDBC Driver 和创建 ShardingSphere 配置文件

在业务项目引入前提条件涉及的依赖后,在业务项目的 classpath 上编写 ShardingSphere 数据源的配置文件`demo.yaml`
在业务项目引入前提条件涉及的依赖后,额外添加 MySQL JDBC Driver 的 Maven 依赖,

```xml
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>9.1.0</version>
</dependency>
```

在业务项目的 classpath 上编写 ShardingSphere 数据源的配置文件`demo.yaml`

```yaml
dataSources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,20 @@ config {
}
```

### Create ShardingSphere configuration file in business project
### Add JDBC Driver to the business project and create ShardingSphere configuration file

After the business project introduces the dependencies involved in the prerequisites,
write the ShardingSphere data source configuration file `demo.yaml` on the classpath of the business project,
add the Maven dependency of MySQL JDBC Driver.

```xml
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>9.1.0</version>
</dependency>
```

Write the ShardingSphere data source configuration file `demo.yaml` on the classpath of the business project.

```yaml
dataSources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
"condition":{"typeReachable":"org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource"},
"interfaces":["org.apache.hive.service.rpc.thrift.TCLIService$Iface"]
},
{
"condition":{"typeReachable":"org.apache.shardingsphere.proxy.initializer.BootstrapInitializer"},
"interfaces":["org.apache.seata.config.Configuration"]
},
{
"condition":{"typeReachable":"org.apache.shardingsphere.transaction.base.seata.at.SeataATShardingSphereTransactionManager"},
"interfaces":["org.apache.seata.config.Configuration"]
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.transaction.base.seata.at;

import lombok.SneakyThrows;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.config.FileConfiguration;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.exception.TransactionException;
Expand Down Expand Up @@ -156,6 +157,7 @@ public void close() {
SeataTransactionHolder.clear();
RmNettyRemotingClient.getInstance().destroy();
TmNettyRemotingClient.getInstance().destroy();
ConfigurationFactory.reload();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,9 @@ service {
vgroupMapping.default_tx_group = "default"
default.grouplist = "127.0.0.1:8891"
}
# TODO This is not a reasonable configuration, just affected by https://github.com/apache/incubator-seata/issues/7042 .
client {
rm {
tableMetaCheckEnable = "false"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ static void afterAll() {
System.clearProperty(SYSTEM_PROP_KEY_PREFIX + "ds2.jdbc-url");
}

/**
* TODO The {@code shardingsphere-parser-sql-clickhouse} module needs to be fixed to use SQL like `create table`,
* `truncate table` and `drop table`.
*/
@Test
void assertShardingInLocalTransactions() throws SQLException {
jdbcUrlPrefix = "jdbc:ch://localhost:" + CONTAINER.getMappedPort(8123) + "/";
Expand Down Expand Up @@ -136,6 +132,8 @@ private DataSource createDataSource() throws SQLException {
/**
* ClickHouse does not support `AUTO_INCREMENT`,
* refer to <a href="https://github.com/ClickHouse/ClickHouse/issues/56228">ClickHouse/ClickHouse#56228</a> .
* TODO The {@code shardingsphere-parser-sql-clickhouse} module needs to be fixed to use SQL like `create table`,
* `truncate table` and `drop table`.
*
* @param databaseName database name
* @throws RuntimeException SQL exception
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.test.natived.proxy.transactions.base;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.http.HttpStatus;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
import org.apache.shardingsphere.test.natived.commons.TestShardingService;
import org.apache.shardingsphere.test.natived.commons.proxy.ProxyTestingServer;
import org.apache.shardingsphere.transaction.base.seata.at.SeataTransactionHolder;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.MountableFile;

import javax.sql.DataSource;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Properties;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

@SuppressWarnings({"SqlNoDataSourceInspection", "resource"})
@Testcontainers
public class SeataTest {

@Container
public static final GenericContainer<?> CONTAINER = new GenericContainer<>("apache/seata-server:2.2.0")
.withExposedPorts(7091, 8091)
.waitingFor(Wait.forHttp("/health").forPort(7091).forStatusCode(HttpStatus.SC_OK).forResponsePredicate("ok"::equals));

@Container
public static final PostgreSQLContainer<?> POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:17.2-bookworm")
.withCopyFileToContainer(
MountableFile.forHostPath(Paths.get("src/test/resources/test-native/sh/postgres.sh").toAbsolutePath()),
"/docker-entrypoint-initdb.d/postgres.sh");

private static final String SERVICE_DEFAULT_GROUP_LIST_KEY = "service.default.grouplist";

private static ProxyTestingServer proxyTestingServer;

private TestShardingService testShardingService;

@BeforeAll
static void beforeAll() {
assertThat(System.getProperty(SERVICE_DEFAULT_GROUP_LIST_KEY), is(nullValue()));
System.setProperty(SERVICE_DEFAULT_GROUP_LIST_KEY, "127.0.0.1:" + CONTAINER.getMappedPort(8091));
Awaitility.await().atMost(Duration.ofSeconds(30L)).ignoreExceptions().until(() -> {
openConnection("test", "test", "jdbc:postgresql://127.0.0.1:" + POSTGRES_CONTAINER.getMappedPort(5432) + "/")
.close();
return true;
});
String absolutePath = Paths.get("src/test/resources/test-native/yaml/proxy/transactions/base").toAbsolutePath().toString();
proxyTestingServer = new ProxyTestingServer(absolutePath);
Awaitility.await().atMost(Duration.ofSeconds(30L)).ignoreExceptions().until(() -> {
openConnection("root", "root", "jdbc:postgresql://127.0.0.1:" + proxyTestingServer.getProxyPort() + "/postgres").close();
return true;
});
}

/**
* TODO Facing the same issue with {@code org.apache.shardingsphere.test.natived.jdbc.transactions.base.SeataTest},
* {@link org.apache.shardingsphere.transaction.base.seata.at.SeataATShardingSphereTransactionManager#close()} was never called.
*/
@AfterAll
static void afterAll() {
SeataTransactionHolder.clear();
RmNettyRemotingClient.getInstance().destroy();
TmNettyRemotingClient.getInstance().destroy();
ConfigurationFactory.reload();
proxyTestingServer.close();
System.clearProperty(SERVICE_DEFAULT_GROUP_LIST_KEY);
}

/**
* {@link groovy.lang.Closure} related classes are not available on GraalVM Native Image.
* This CLASS_BASE algorithm class is designed to emulate INLINE's {@code ds_${user_id % 2}}.
* See <a href="https://github.com/oracle/graal/issues/5522">oracle/graal#5522</a> .
*
* @throws SQLException SQL Exception
*/
@Test
void assertShardingInLocalTransactions() throws SQLException {
try (
Connection connection = openConnection("root", "root", "jdbc:postgresql://127.0.0.1:" + proxyTestingServer.getProxyPort() + "/postgres");
Statement statement = connection.createStatement()) {
statement.execute("CREATE DATABASE sharding_db");
}
try (
Connection connection = openConnection("root", "root", "jdbc:postgresql://127.0.0.1:" + proxyTestingServer.getProxyPort() + "/sharding_db");
Statement statement = connection.createStatement()) {
statement.execute("REGISTER STORAGE UNIT ds_0 (\n"
+ " URL=\"jdbc:postgresql://127.0.0.1:" + POSTGRES_CONTAINER.getMappedPort(5432) + "/demo_ds_0\",\n"
+ " USER=\"test\",\n"
+ " PASSWORD=\"test\"\n"
+ "),ds_1 (\n"
+ " URL=\"jdbc:postgresql://127.0.0.1:" + POSTGRES_CONTAINER.getMappedPort(5432) + "/demo_ds_1\",\n"
+ " USER=\"test\",\n"
+ " PASSWORD=\"test\"\n"
+ "),ds_2 (\n"
+ " URL=\"jdbc:postgresql://127.0.0.1:" + POSTGRES_CONTAINER.getMappedPort(5432) + "/demo_ds_2\",\n"
+ " USER=\"test\",\n"
+ " PASSWORD=\"test\"\n"
+ ")");
statement.execute("CREATE DEFAULT SHARDING DATABASE STRATEGY (\n"
+ " TYPE=\"standard\", \n"
+ " SHARDING_COLUMN=user_id, \n"
+ " SHARDING_ALGORITHM(\n"
+ " TYPE(\n"
+ " NAME=CLASS_BASED, \n"
+ " PROPERTIES(\n"
+ " \"strategy\"=\"STANDARD\",\n"
+ " \"algorithmClassName\"=\"org.apache.shardingsphere.test.natived.commons.algorithm.ClassBasedInlineShardingAlgorithmFixture\"\n"
+ " )\n"
+ " )\n"
+ " )\n"
+ ")");
statement.execute("CREATE SHARDING TABLE RULE t_order (\n"
+ " DATANODES(\"<LITERAL>ds_0.t_order, ds_1.t_order, ds_2.t_order\"),\n"
+ " KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME=\"SNOWFLAKE\"))\n"
+ "), t_order_item (\n"
+ " DATANODES(\"<LITERAL>ds_0.t_order_item, ds_1.t_order_item, ds_2.t_order_item\"),\n"
+ " KEY_GENERATE_STRATEGY(COLUMN=order_item_id,TYPE(NAME=\"SNOWFLAKE\"))\n"
+ ")");
statement.execute("CREATE BROADCAST TABLE RULE t_address");
}
HikariConfig config = new HikariConfig();
config.setDriverClassName("org.postgresql.Driver");
config.setJdbcUrl("jdbc:postgresql://127.0.0.1:" + proxyTestingServer.getProxyPort() + "/sharding_db");
config.setUsername("root");
config.setPassword("root");
DataSource dataSource = new HikariDataSource(config);
testShardingService = new TestShardingService(dataSource);
initEnvironment();
testShardingService.processSuccess();
testShardingService.cleanEnvironment();
}

private void initEnvironment() throws SQLException {
testShardingService.getOrderRepository().createTableIfNotExistsInPostgres();
testShardingService.getOrderItemRepository().createTableIfNotExistsInPostgres();
testShardingService.getAddressRepository().createTableIfNotExistsInMySQL();
testShardingService.getOrderRepository().truncateTable();
testShardingService.getOrderItemRepository().truncateTable();
testShardingService.getAddressRepository().truncateTable();
}

private static Connection openConnection(final String username, final String password, final String jdbcUrl) throws SQLException {
Properties props = new Properties();
props.setProperty("user", username);
props.setProperty("password", password);
return DriverManager.getConnection(jdbcUrl, props);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
#

# TODO This file exists to address https://github.com/grpc/grpc-java/issues/10601 .
# TODO This file exists to address https://github.com/oracle/graalvm-reachability-metadata/issues/377 .
Args=--initialize-at-run-time=\
io.grpc.netty.shaded.io.netty.channel.ChannelHandlerMask,\
io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel,\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,13 @@
"allDeclaredConstructors": true,
"allDeclaredMethods": true,
"allPublicMethods": true
},
{
"condition":{"typeReachable":"org.apache.shardingsphere.test.natived.proxy.transactions.base.SeataTest"},
"name":"org.apache.shardingsphere.test.natived.proxy.transactions.base.SeataTest",
"allDeclaredFields": true,
"allDeclaredConstructors": true,
"allDeclaredMethods": true,
"allPublicMethods": true
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
service {
vgroupMapping.default_tx_group = "default"
}
# TODO This is not a reasonable configuration, just affected by https://github.com/apache/incubator-seata/pull/6661. Pending investigation.
# TODO This is not a reasonable configuration, just affected by https://github.com/apache/incubator-seata/issues/7042 .
client {
rm {
tableMetaCheckEnable = "false"
Expand Down
54 changes: 54 additions & 0 deletions test/native/src/test/resources/test-native/sh/postgres.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
CREATE DATABASE demo_ds_0;
CREATE DATABASE demo_ds_1;
CREATE DATABASE demo_ds_2;
EOSQL

for i in "demo_ds_0" "demo_ds_1" "demo_ds_2"
do
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$i" <<-EOSQL
CREATE TABLE IF NOT EXISTS public.undo_log
(
id SERIAL NOT NULL,
branch_id BIGINT NOT NULL,
xid VARCHAR(128) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info BYTEA NOT NULL,
log_status INT NOT NULL,
log_created TIMESTAMP(0) NOT NULL,
log_modified TIMESTAMP(0) NOT NULL,
CONSTRAINT pk_undo_log PRIMARY KEY (id),
CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);
CREATE INDEX ix_log_created ON undo_log(log_created);
COMMENT ON TABLE public.undo_log IS 'AT transaction mode undo table';
COMMENT ON COLUMN public.undo_log.branch_id IS 'branch transaction id';
COMMENT ON COLUMN public.undo_log.xid IS 'global transaction id';
COMMENT ON COLUMN public.undo_log.context IS 'undo_log context,such as serialization';
COMMENT ON COLUMN public.undo_log.rollback_info IS 'rollback info';
COMMENT ON COLUMN public.undo_log.log_status IS '0:normal status,1:defense status';
COMMENT ON COLUMN public.undo_log.log_created IS 'create datetime';
COMMENT ON COLUMN public.undo_log.log_modified IS 'modify datetime';
CREATE SEQUENCE IF NOT EXISTS undo_log_id_seq INCREMENT BY 1 MINVALUE 1 ;
EOSQL
done
Loading

0 comments on commit 50fe891

Please sign in to comment.