Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support fault tolerance and load balance with vert.x pool of reactive mysql client #31852

Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class LocalhostMySQLPoolCreator implements MySQLPoolCreator {

@Override
public MySQLPool create(Input input) {
return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().setHost("localhost").setPort(3308),
return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().get(0).setHost("localhost").setPort(3308),
input.poolOptions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public static class DefaultMySQLPoolCreator implements MySQLPoolCreator {

@Override
public MySQLPool create(Input input) {
assertEquals(12345, input.mySQLConnectOptions().getPort()); // validate that the bean has been called for the proper datasource
return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().setHost("localhost").setPort(3308),
assertEquals(12345, input.mySQLConnectOptions().get(0).getPort()); // validate that the bean has been called for the proper datasource
return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().get(0).setHost("localhost").setPort(3308),
input.poolOptions());
}
}
Expand All @@ -97,8 +97,8 @@ public static class HibernateMySQLPoolCreator implements MySQLPoolCreator {

@Override
public MySQLPool create(Input input) {
assertEquals(55555, input.mySQLConnectOptions().getPort()); // validate that the bean has been called for the proper datasource
return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().setHost("localhost").setPort(3308),
assertEquals(55555, input.mySQLConnectOptions().get(0).getPort()); // validate that the bean has been called for the proper datasource
return MySQLPool.pool(input.vertx(), input.mySQLConnectOptions().get(0).setHost("localhost").setPort(3308),
input.poolOptions());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.quarkus.reactive.mysql.client;

import org.hamcrest.Matchers;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusDevModeTest;
import io.restassured.RestAssured;

public class ReactiveLoadBalanceTest {

@RegisterExtension
public static final QuarkusDevModeTest test = new QuarkusDevModeTest()
.withApplicationRoot((jar) -> jar
.addClass(DevModeResource.class)
.add(new StringAsset("quarkus.datasource.db-kind=mysql\n" +
"quarkus.datasource.reactive.url=vertx-reactive:mysql:loadbalance://localhost:6033,localhost:6034,localhost:6035/load_balance_test"),
"application.properties"));

@Test
public void testLoadBalance() {
RestAssured
.get("/dev/error")
.then()
.statusCode(200)
.body(Matchers.anyOf(Matchers.endsWith(":6033"), Matchers.endsWith(":6034"), Matchers.endsWith(":6035")));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkus.reactive.mysql.client;

import java.util.List;

import io.quarkus.reactive.datasource.ReactiveDataSource;
import io.vertx.core.Vertx;
import io.vertx.mysqlclient.MySQLConnectOptions;
Expand All @@ -25,6 +27,6 @@ interface Input {

PoolOptions poolOptions();

MySQLConnectOptions mySQLConnectOptions();
List<MySQLConnectOptions> mySQLConnectOptions();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxKeyCertOptions;
import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxTrustOptions;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down Expand Up @@ -72,18 +75,62 @@ private MySQLPool initialize(Vertx vertx,
DataSourceReactiveMySQLConfig dataSourceReactiveMySQLConfig) {
PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveMySQLConfig);
MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig);

List<MySQLConnectOptions> mysqlConnectOptions = new ArrayList<>();
if (dataSourceReactiveRuntimeConfig.url.isPresent()) {
String[] urls = toDataSourceUrls(dataSourceReactiveRuntimeConfig.url.get());
if (urls != null) {
for (String url : urls) {
dataSourceReactiveRuntimeConfig.url = Optional.of(url);
mysqlConnectOptions.add(toMySQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig));
}
}
}
if (mysqlConnectOptions.isEmpty()) {
mysqlConnectOptions.add(toMySQLConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveMySQLConfig));
}

// Use the convention defined by Quarkus Micrometer Vert.x metrics to create metrics prefixed with mysql.
// and the client_name as tag.
// See io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractPrefix and
// io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractClientName
mysqlConnectOptions.setMetricsName("mysql|" + dataSourceName);
mysqlConnectOptions.forEach(option -> option.setMetricsName("mysql|" + dataSourceName));

return createPool(vertx, poolOptions, mysqlConnectOptions, dataSourceName);
}

private String[] toDataSourceUrls(String url) {
if (url.indexOf(',') < 0) {
return null;
}

int hostBegin = url.indexOf("://");
if (hostBegin < 0) {
return null;
}
hostBegin += 3;
String prefix = url.substring(0, hostBegin).replace(":loadbalance:", ":");

int portEnd = url.indexOf('/', hostBegin + 1);
if (portEnd < 0) {
return null;
}
String postfix = url.substring(portEnd);

String[] servers = url.substring(hostBegin, portEnd).split(",");
if (servers.length == 1) {
return null;
}

String[] urls = new String[servers.length];
for (int i = 0; i < servers.length; i++) {
urls[i] = prefix + servers[i] + postfix;
}
return urls;
}

private PoolOptions toPoolOptions(Integer eventLoopCount,
DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
Expand Down Expand Up @@ -215,7 +262,7 @@ private MySQLConnectOptions toMySQLConnectOptions(DataSourceRuntimeConfig dataSo
return mysqlConnectOptions;
}

private MySQLPool createPool(Vertx vertx, PoolOptions poolOptions, MySQLConnectOptions mySQLConnectOptions,
private MySQLPool createPool(Vertx vertx, PoolOptions poolOptions, List<MySQLConnectOptions> mySQLConnectOptions,
String dataSourceName) {
Instance<MySQLPoolCreator> instance;
if (DataSourceUtil.isDefault(dataSourceName)) {
Expand All @@ -234,9 +281,9 @@ private MySQLPool createPool(Vertx vertx, PoolOptions poolOptions, MySQLConnectO
private static class DefaultInput implements MySQLPoolCreator.Input {
private final Vertx vertx;
private final PoolOptions poolOptions;
private final MySQLConnectOptions mySQLConnectOptions;
private final List<MySQLConnectOptions> mySQLConnectOptions;

public DefaultInput(Vertx vertx, PoolOptions poolOptions, MySQLConnectOptions mySQLConnectOptions) {
public DefaultInput(Vertx vertx, PoolOptions poolOptions, List<MySQLConnectOptions> mySQLConnectOptions) {
this.vertx = vertx;
this.poolOptions = poolOptions;
this.mySQLConnectOptions = mySQLConnectOptions;
Expand All @@ -253,7 +300,7 @@ public PoolOptions poolOptions() {
}

@Override
public MySQLConnectOptions mySQLConnectOptions() {
public List<MySQLConnectOptions> mySQLConnectOptions() {
return mySQLConnectOptions;
}
}
Expand Down