From 288c585910d29fecaa654a05db26d13060323d89 Mon Sep 17 00:00:00 2001 From: benstone Date: Wed, 15 Mar 2023 00:32:00 +0800 Subject: [PATCH 1/2] support fatal tolerance and load balance with vert.x pool of mysql client --- .../mysql/client/ReactiveLoadBalanceTest.java | 29 ++++++++++ .../client/runtime/MySQLPoolRecorder.java | 54 +++++++++++++++++-- 2 files changed, 80 insertions(+), 3 deletions(-) create mode 100644 extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ReactiveLoadBalanceTest.java diff --git a/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ReactiveLoadBalanceTest.java b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ReactiveLoadBalanceTest.java new file mode 100644 index 0000000000000..17a017c0b7066 --- /dev/null +++ b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ReactiveLoadBalanceTest.java @@ -0,0 +1,29 @@ +package io.quarkus.reactive.mysql.client; + +import org.hamcrest.Matchers; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusDevModeTest; +import io.restassured.RestAssured; + +public class ReactiveLoadBalanceTest { + + @RegisterExtension + public static final QuarkusDevModeTest test = new QuarkusDevModeTest() + .withApplicationRoot((jar) -> jar + .addClass(DevModeResource.class) + .add(new StringAsset("quarkus.datasource.db-kind=mysql\n" + + "quarkus.datasource.reactive.url=vertx-reactive:mysql:loadbalance://localhost:6033,localhost:6034,localhost:6035/load_balance_test"), + "application.properties")); + + @Test + public void testLoadBalance() { + RestAssured + .get("/dev/error") + .then() + .statusCode(200) + .body(Matchers.anyOf(Matchers.endsWith(":6033"), Matchers.endsWith(":6034"), Matchers.endsWith(":6035"))); + } +} diff --git a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java index 11921d864bfc4..0ff7ce1d43a02 100644 --- a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java +++ b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java @@ -9,7 +9,10 @@ import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxKeyCertOptions; import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxTrustOptions; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -66,22 +69,67 @@ private MySQLPool initialize(Vertx vertx, DataSourceReactiveMySQLConfig dataSourceReactiveMySQLConfig) { PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig); - MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig, - dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig); + + List mysqlConnectOptions = new ArrayList<>(); + if (dataSourceReactiveRuntimeConfig.url.isPresent()) { + String[] urls = toDataSourceUrls(dataSourceReactiveRuntimeConfig.url.get()); + if (urls != null) { + for (String url : urls) { + dataSourceReactiveRuntimeConfig.url = Optional.of(url); + mysqlConnectOptions.add(toMySQLConnectOptions(dataSourceRuntimeConfig, + dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig)); + } + } + } + if (mysqlConnectOptions.isEmpty()) { + mysqlConnectOptions.add(toMySQLConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, + dataSourceReactiveMySQLConfig)); + } // Use the convention defined by Quarkus Micrometer Vert.x metrics to create metrics prefixed with mysql. // and the client_name as tag. // See io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractPrefix and // io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractClientName - mysqlConnectOptions.setMetricsName("mysql|" + dataSourceName); + mysqlConnectOptions.forEach(option -> option.setMetricsName("mysql|" + dataSourceName)); if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent()) { log.warn( "Configuration element 'thread-local' on Reactive datasource connections is deprecated and will be ignored. The started pool will always be based on a per-thread separate pool now."); } + return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions); } + private String[] toDataSourceUrls(String url) { + if (url.indexOf(',') < 0) { + return null; + } + + int hostBegin = url.indexOf("://"); + if (hostBegin < 0) { + return null; + } + hostBegin += 3; + String prefix = url.substring(0, hostBegin).replace(":loadbalance:", ":"); + + int portEnd = url.indexOf('/', hostBegin + 1); + if (portEnd < 0) { + return null; + } + String postfix = url.substring(portEnd); + + String[] servers = url.substring(hostBegin, portEnd).split(","); + if (servers.length == 1) { + return null; + } + + String[] urls = new String[servers.length]; + for (int i = 0; i < servers.length; i++) { + urls[i] = prefix + servers[i] + postfix; + } + return urls; + } + private PoolOptions toPoolOptions(Integer eventLoopCount, DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, From 189bc6dc9d837fbf662bf3aaf9e595d2e1f461a5 Mon Sep 17 00:00:00 2001 From: benstone Date: Wed, 15 Mar 2023 13:12:35 +0800 Subject: [PATCH 2/2] porting to quarkus 3 --- .../reactive/mysql/client/LocalhostMySQLPoolCreator.java | 2 +- .../MultipleDataSourcesAndMySQLPoolCreatorsTest.java | 8 ++++---- .../quarkus/reactive/mysql/client/MySQLPoolCreator.java | 4 +++- .../reactive/mysql/client/runtime/MySQLPoolRecorder.java | 8 ++++---- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/LocalhostMySQLPoolCreator.java b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/LocalhostMySQLPoolCreator.java index 71324a7b6d3b1..04ded5fa368f4 100644 --- a/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/LocalhostMySQLPoolCreator.java +++ b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/LocalhostMySQLPoolCreator.java @@ -9,7 +9,7 @@ public class LocalhostMySQLPoolCreator implements MySQLPoolCreator { @Override public MySQLPool create(Input input) { - return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().setHost("localhost").setPort(3308), + return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().get(0).setHost("localhost").setPort(3308), input.poolOptions()); } } diff --git a/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/MultipleDataSourcesAndMySQLPoolCreatorsTest.java b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/MultipleDataSourcesAndMySQLPoolCreatorsTest.java index 1ce79acd7e128..ee64bbce5acb3 100644 --- a/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/MultipleDataSourcesAndMySQLPoolCreatorsTest.java +++ b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/MultipleDataSourcesAndMySQLPoolCreatorsTest.java @@ -85,8 +85,8 @@ public static class DefaultMySQLPoolCreator implements MySQLPoolCreator { @Override public MySQLPool create(Input input) { - assertEquals(12345, input.mySQLConnectOptions().getPort()); // validate that the bean has been called for the proper datasource - return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().setHost("localhost").setPort(3308), + assertEquals(12345, input.mySQLConnectOptions().get(0).getPort()); // validate that the bean has been called for the proper datasource + return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().get(0).setHost("localhost").setPort(3308), input.poolOptions()); } } @@ -97,8 +97,8 @@ public static class HibernateMySQLPoolCreator implements MySQLPoolCreator { @Override public MySQLPool create(Input input) { - assertEquals(55555, input.mySQLConnectOptions().getPort()); // validate that the bean has been called for the proper datasource - return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().setHost("localhost").setPort(3308), + assertEquals(55555, input.mySQLConnectOptions().get(0).getPort()); // validate that the bean has been called for the proper datasource + return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().get(0).setHost("localhost").setPort(3308), input.poolOptions()); } } diff --git a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/MySQLPoolCreator.java b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/MySQLPoolCreator.java index 43bfc3db99755..9bdeb3aca996d 100644 --- a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/MySQLPoolCreator.java +++ b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/MySQLPoolCreator.java @@ -1,5 +1,7 @@ package io.quarkus.reactive.mysql.client; +import java.util.List; + import io.quarkus.reactive.datasource.ReactiveDataSource; import io.vertx.core.Vertx; import io.vertx.mysqlclient.MySQLConnectOptions; @@ -25,6 +27,6 @@ interface Input { PoolOptions poolOptions(); - MySQLConnectOptions mySQLConnectOptions(); + List mySQLConnectOptions(); } } diff --git a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java index a9dbab10cb656..f287a08a37a40 100644 --- a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java +++ b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java @@ -262,7 +262,7 @@ private MySQLConnectOptions toMySQLConnectOptions(DataSourceRuntimeConfig dataSo return mysqlConnectOptions; } - private MySQLPool createPool(Vertx vertx, PoolOptions poolOptions, MySQLConnectOptions mySQLConnectOptions, + private MySQLPool createPool(Vertx vertx, PoolOptions poolOptions, List mySQLConnectOptions, String dataSourceName) { Instance instance; if (DataSourceUtil.isDefault(dataSourceName)) { @@ -281,9 +281,9 @@ private MySQLPool createPool(Vertx vertx, PoolOptions poolOptions, MySQLConnectO private static class DefaultInput implements MySQLPoolCreator.Input { private final Vertx vertx; private final PoolOptions poolOptions; - private final MySQLConnectOptions mySQLConnectOptions; + private final List mySQLConnectOptions; - public DefaultInput(Vertx vertx, PoolOptions poolOptions, MySQLConnectOptions mySQLConnectOptions) { + public DefaultInput(Vertx vertx, PoolOptions poolOptions, List mySQLConnectOptions) { this.vertx = vertx; this.poolOptions = poolOptions; this.mySQLConnectOptions = mySQLConnectOptions; @@ -300,7 +300,7 @@ public PoolOptions poolOptions() { } @Override - public MySQLConnectOptions mySQLConnectOptions() { + public List mySQLConnectOptions() { return mySQLConnectOptions; } }