Skip to content

Commit

Permalink
Revert "Revert "Reactive sql client pool in thread local""
Browse files Browse the repository at this point in the history
This reverts commit 04663a7.
  • Loading branch information
Sanne committed Jun 24, 2020
1 parent aa9ad53 commit c880ceb
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,10 @@ public class DataSourceReactiveRuntimeConfig {
*/
@ConfigItem
public PfxConfiguration keyCertificatePfx;

/**
* Experimental: use one connection pool per thread.
*/
@ConfigItem
public Optional<Boolean> threadLocal;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntim
dataSourceReactiveDB2Config);
DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalDB2Pool(vertx, connectOptions, poolOptions);
}
return DB2Pool.pool(vertx, connectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.db2.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.DB2Pool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalDB2Pool extends ThreadLocalPool<DB2Pool> implements DB2Pool {

private final DB2ConnectOptions db2ConnectOptions;

public ThreadLocalDB2Pool(Vertx vertx, DB2ConnectOptions db2ConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.db2ConnectOptions = db2ConnectOptions;
}

@Override
protected DB2Pool createThreadLocalPool() {
return DB2Pool.pool(vertx, db2ConnectOptions, poolOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt
dataSourceReactiveMySQLConfig);
MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalMySQLPool(vertx, mysqlConnectOptions, poolOptions);
}
return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.mysql.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalMySQLPool extends ThreadLocalPool<MySQLPool> implements MySQLPool {

private final MySQLConnectOptions mySQLConnectOptions;

public ThreadLocalMySQLPool(Vertx vertx, MySQLConnectOptions mySQLConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.mySQLConnectOptions = mySQLConnectOptions;
}

@Override
protected MySQLPool createThreadLocalPool() {
return MySQLPool.pool(vertx, mySQLConnectOptions, poolOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ private PgPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntime
dataSourceReactivePostgreSQLConfig);
PgConnectOptions pgConnectOptions = toPgConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactivePostgreSQLConfig);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalPgPool(vertx, pgConnectOptions, poolOptions);
}
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.pg.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalPgPool extends ThreadLocalPool<PgPool> implements PgPool {

private final PgConnectOptions pgConnectOptions;

public ThreadLocalPgPool(Vertx vertx, PgConnectOptions pgConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.pgConnectOptions = pgConnectOptions;
}

@Override
protected PgPool createThreadLocalPool() {
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
}
}
10 changes: 10 additions & 0 deletions integration-tests/reactive-pg-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
Expand All @@ -51,6 +56,11 @@
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
<targetPath>../test-classes</targetPath>
</resource>
</resources>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.it.reactive.pg.client;

import java.util.concurrent.CompletionStage;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;

@Path("/hot-fruits")
public class HotReloadFruitResource {

@Inject
PgPool client;

@PostConstruct
void setupDb() {
client.query("DROP TABLE IF EXISTS fruits").execute()
.flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Orange')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pear')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Apple')").execute())
.await().indefinitely();
}

@GET
@Produces(MediaType.APPLICATION_JSON)
public CompletionStage<JsonArray> listFruits() {
return client.query("SELECT * FROM fruits").execute()
.map(pgRowSet -> {
JsonArray jsonArray = new JsonArray();
for (Row row : pgRowSet) {
jsonArray.add(toJson(row));
}
return jsonArray;
})
.subscribeAsCompletionStage();
}

private JsonObject toJson(Row row) {
return new JsonObject()
.put("id", row.getLong("id"))
.put("name", row.getString("name"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.quarkus.it.reactive.pg.client;

import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.containsString;

import java.util.List;
import java.util.function.Function;
import java.util.logging.LogRecord;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusDevModeTest;

public class HotReloadTestCase {
@RegisterExtension
final static QuarkusDevModeTest TEST = new QuarkusDevModeTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HotReloadFruitResource.class)
.addAsResource("application-tl.properties", "application.properties"))
.setLogRecordPredicate(record -> {
return record.getLoggerName().startsWith("io.quarkus.reactive.datasource");
});

@AfterAll
public static void afterAll() {
List<LogRecord> records = TEST.getLogRecords();
Assertions.assertEquals(8, records.size());
// make sure that we closed all thread-local pools on reload and close
Assertions.assertEquals("Making pool for thread: %s", records.get(0).getMessage());
Assertions.assertEquals("Making pool for thread: %s", records.get(1).getMessage());
Assertions.assertEquals("Closing pool: %s", records.get(2).getMessage());
Assertions.assertEquals("Closing pool: %s", records.get(3).getMessage());
Assertions.assertEquals("Making pool for thread: %s", records.get(4).getMessage());
Assertions.assertEquals("Making pool for thread: %s", records.get(5).getMessage());
Assertions.assertEquals("Closing pool: %s", records.get(6).getMessage());
Assertions.assertEquals("Closing pool: %s", records.get(7).getMessage());
}

@Test
public void testAddNewFieldToEntity() {
checkRequest("Orange");
TEST.modifySourceFile(HotReloadFruitResource.class, new Function<String, String>() {
@Override
public String apply(String s) {
return s.replace("'Orange'", "'Strawberry'");
}
});
// trigger a pool hot reload by changing the config
TEST.modifyResourceFile("application.properties", new Function<String, String>() {
@Override
public String apply(String s) {
return s.replace("quarkus.datasource.reactive.thread-local=true",
"quarkus.datasource.reactive.thread-local = true");
}
});

checkRequest("Strawberry");
}

private void checkRequest(String fruit) {
given()
.when().get("/hot-fruits")
.then()
.statusCode(200)
.body(
containsString(fruit),
containsString("Pear"),
containsString("Apple"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=hibernate_orm_test
quarkus.datasource.password=hibernate_orm_test
quarkus.datasource.reactive.url=${reactive-postgres.url}
quarkus.datasource.reactive.thread-local=true
quarkus.log.category."io.quarkus.reactive.datasource".level=DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
import java.util.List;
import java.util.ServiceLoader;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.stream.Stream;

import org.jboss.shrinkwrap.api.exporter.ExplodedExporter;
Expand Down Expand Up @@ -58,14 +62,18 @@
public class QuarkusDevModeTest
implements BeforeEachCallback, AfterEachCallback, TestInstanceFactory {

private static final Logger rootLogger;

static {
System.setProperty("java.util.logging.manager", "org.jboss.logmanager.LogManager");
rootLogger = LogManager.getLogManager().getLogger("");
}

private DevModeMain devModeMain;
private Path deploymentDir;
private Supplier<JavaArchive> archiveProducer;
private String logFileName;
private InMemoryLogHandler inMemoryLogHandler = new InMemoryLogHandler((r) -> false);

private Path deploymentSourcePath;
private Path deploymentResourcePath;
Expand Down Expand Up @@ -97,6 +105,15 @@ public QuarkusDevModeTest setLogFileName(String logFileName) {
return this;
}

public QuarkusDevModeTest setLogRecordPredicate(Predicate<LogRecord> predicate) {
this.inMemoryLogHandler = new InMemoryLogHandler(predicate);
return this;
}

public List<LogRecord> getLogRecords() {
return inMemoryLogHandler.records;
}

public Object createTestInstance(TestInstanceFactoryContext factoryContext, ExtensionContext extensionContext)
throws TestInstantiationException {
try {
Expand All @@ -110,6 +127,7 @@ public Object createTestInstance(TestInstanceFactoryContext factoryContext, Exte

@Override
public void beforeEach(ExtensionContext extensionContext) throws Exception {
rootLogger.addHandler(inMemoryLogHandler);
if (archiveProducer == null) {
throw new RuntimeException("QuarkusDevModeTest does not have archive producer set");
}
Expand Down Expand Up @@ -174,6 +192,7 @@ public void afterEach(ExtensionContext extensionContext) throws Exception {
FileUtil.deleteDirectory(deploymentDir);
}
}
rootLogger.removeHandler(inMemoryLogHandler);
}

private DevModeContext exportArchive(Path deploymentDir, Path testSourceDir) {
Expand Down

0 comments on commit c880ceb

Please sign in to comment.