Skip to content

Commit

Permalink
A Vertx uses cached reflection to build the virtual thread factory in…
Browse files Browse the repository at this point in the history
… order to remain compatible with Java 8. When running in a native image Vert.x will assume that the runtime might not have virtual thread incorrectly.

Defer the virtual thread factory availability when a Vertx instance is created, in addition declares that virtual thread factory API should not be cached so the Vertx instance is able to build the virtual thread factory correctly.
  • Loading branch information
vietj committed Mar 27, 2024
1 parent 8db4ac8 commit eddce4e
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/impl/DeploymentManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private Future<Deployment> doDeploy(String identifier,
workerPool = vertx.createSharedWorkerPool(options.getWorkerPoolName(), options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit());
}
} else {
if (!VertxInternal.isVirtualThreadAvailable()) {
if (!vertx.isVirtualThreadAvailable()) {
return callingContext.failedFuture("This Java runtime does not support virtual threads");
}
}
Expand Down
43 changes: 23 additions & 20 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,8 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio";
private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50);

public static final ThreadFactory VIRTUAL_THREAD_FACTORY;
private static final Throwable VIRTUAL_THREAD_FACTORY_UNAVAILABILITY_CAUSE;

static {
// Disable Netty's resource leak detection to reduce the performance overhead if not set by user
// Supports both the default netty leak detection system property and the deprecated one
if (System.getProperty("io.netty.leakDetection.level") == null &&
System.getProperty("io.netty.leakDetectionLevel") == null) {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
}

ThreadFactory factory = null;
Throwable unavailabilityCause = null;
// Not cached for graalvm
private static ThreadFactory virtualThreadFactory() {
try {
Class<?> builderClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder");
Class<?> ofVirtualClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder$OfVirtual");
Expand All @@ -118,12 +107,19 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
Method nameMethod = ofVirtualClass.getDeclaredMethod("name", String.class, long.class);
Method factoryMethod = builderClass.getDeclaredMethod("factory");
builder = nameMethod.invoke(builder, "vert.x-virtual-thread-", 0L);
factory = (ThreadFactory) factoryMethod.invoke(builder);
return (ThreadFactory) factoryMethod.invoke(builder);
} catch (Exception e) {
unavailabilityCause = e;
return null;
}
}

static {
// Disable Netty's resource leak detection to reduce the performance overhead if not set by user
// Supports both the default netty leak detection system property and the deprecated one
if (System.getProperty("io.netty.leakDetection.level") == null &&
System.getProperty("io.netty.leakDetectionLevel") == null) {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
}
VIRTUAL_THREAD_FACTORY = factory;
VIRTUAL_THREAD_FACTORY_UNAVAILABILITY_CAUSE = unavailabilityCause;
}

private final FileSystem fileSystem = getFileSystem();
Expand Down Expand Up @@ -193,6 +189,8 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
ExecutorService internalWorkerExec = executorServiceFactory.createExecutor(internalWorkerThreadFactory, internalBlockingPoolSize, internalBlockingPoolSize);
PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", internalBlockingPoolSize) : null;

ThreadFactory virtualThreadFactory = virtualThreadFactory();

contextLocalsLength = LocalSeq.get();
closeFuture = new CloseFuture(log);
maxEventLoopExecTime = maxEventLoopExecuteTime;
Expand All @@ -202,8 +200,8 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
// The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections
// under a lot of load
acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100);
virtualThreadExecutor = VIRTUAL_THREAD_FACTORY != null ? new ThreadPerTaskExecutorService(VIRTUAL_THREAD_FACTORY) : null;
virtualThreaWorkerPool = new WorkerPool(virtualThreadExecutor, null);
virtualThreadExecutor = virtualThreadFactory != null ? new ThreadPerTaskExecutorService(virtualThreadFactory) : null;
virtualThreaWorkerPool = virtualThreadFactory != null ? new WorkerPool(virtualThreadExecutor, null) : null;
internalWorkerPool = new WorkerPool(internalWorkerExec, internalBlockingPoolMetrics);
namedWorkerPools = new HashMap<>();
workerPool = new WorkerPool(workerExec, workerPoolMetrics);
Expand Down Expand Up @@ -594,7 +592,7 @@ public ContextImpl createWorkerContext() {
}

private ContextImpl createVirtualThreadContext(EventLoop eventLoop, CloseFuture closeFuture, Deployment deployment, ClassLoader tccl) {
if (!VertxInternal.isVirtualThreadAvailable()) {
if (!isVirtualThreadAvailable()) {
throw new IllegalStateException("This Java runtime does not support virtual threads");
}
TaskQueue orderedTasks = new TaskQueue();
Expand Down Expand Up @@ -1306,6 +1304,11 @@ public void removeCloseHook(Closeable hook) {
closeFuture.remove(hook);
}

@Override
public boolean isVirtualThreadAvailable() {
return virtualThreadExecutor != null;
}

private CloseFuture resolveCloseFuture() {
ContextInternal context = getContext();
return context != null ? context.closeFuture() : closeFuture;
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/vertx/core/impl/VertxInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,5 @@ default <T> Future<T> executeBlockingInternal(Callable<T> blockingCodeHandler, b
/**
* @return whether virtual threads are available
*/
static boolean isVirtualThreadAvailable() {
return VertxImpl.VIRTUAL_THREAD_FACTORY != null;
}
boolean isVirtualThreadAvailable();
}
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/impl/VertxWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,11 @@ public void removeCloseHook(Closeable hook) {
delegate.removeCloseHook(hook);
}

@Override
public boolean isVirtualThreadAvailable() {
return delegate.isVirtualThreadAvailable();
}

@Override
public boolean isMetricsEnabled() {
return delegate.isMetricsEnabled();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[
{
"name": "java.lang.Thread$Builder",
"condition": {
"typeReachable": "io.vertx.core.impl.VertxImpl"
},
"methods": [
{
"name": "factory",
"parameterTypes": []
}
]
},
{
"name": "java.lang.Thread$Builder$OfVirtual",
"condition": {
"typeReachable": "io.vertx.core.impl.VertxImpl"
},
"methods": [
{
"name": "name",
"parameterTypes": ["java.lang.String", "long"]
}
]
},
{
"name": "java.lang.Thread",
"condition": {
"typeReachable": "io.vertx.core.impl.VertxImpl"
},
"methods": [
{
"name": "ofVirtual",
"parameterTypes": []
}
]
}
]
4 changes: 2 additions & 2 deletions src/test/java/io/vertx/core/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ public void testAwaitFromWorkerThread() {

@Test
public void testAwaitFromVirtualThreadThread() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
testAwaitFromContextThread(ThreadingModel.VIRTUAL_THREAD, false);
}

Expand All @@ -1055,7 +1055,7 @@ public void start() {

@Test
public void testInterruptThreadOnAwait() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
vertx.deployVerticle(() -> new AbstractVerticle() {
@Override
public void start() {
Expand Down
20 changes: 10 additions & 10 deletions src/test/java/io/vertx/core/VirtualThreadContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void setUp() throws Exception {

@Test
public void testContext() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
vertx.createVirtualThreadContext().runOnContext(v -> {
Thread thread = Thread.currentThread();
assertTrue(VirtualThreadDeploymentTest.isVirtual(thread));
Expand All @@ -54,7 +54,7 @@ public void testContext() {

@Test
public void testAwaitFutureSuccess() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
Object result = new Object();
vertx.createVirtualThreadContext().runOnContext(v -> {
ContextInternal context = vertx.getOrCreateContext();
Expand All @@ -74,7 +74,7 @@ public void testAwaitFutureSuccess() {

@Test
public void testAwaitFutureFailure() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
Exception failure = new Exception();
vertx.createVirtualThreadContext().runOnContext(v -> {
ContextInternal context = vertx.getOrCreateContext();
Expand All @@ -100,7 +100,7 @@ public void testAwaitFutureFailure() {

@Test
public void testAwaitCompoundFuture() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
Object result = new Object();
vertx.createVirtualThreadContext().runOnContext(v -> {
ContextInternal context = vertx.getOrCreateContext();
Expand All @@ -120,7 +120,7 @@ public void testAwaitCompoundFuture() {

@Test
public void testDuplicateUseSameThread() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
int num = 1000;
waitFor(num);
vertx.createVirtualThreadContext().runOnContext(v -> {
Expand All @@ -139,7 +139,7 @@ public void testDuplicateUseSameThread() {

@Test
public void testDuplicateConcurrentAwait() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
int num = 1000;
waitFor(num);
vertx.createVirtualThreadContext().runOnContext(v -> {
Expand Down Expand Up @@ -173,7 +173,7 @@ public void testDuplicateConcurrentAwait() {

@Test
public void testTimer() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
vertx.createVirtualThreadContext().runOnContext(v -> {
ContextInternal context = vertx.getOrCreateContext();
PromiseInternal<String> promise = context.promise();
Expand All @@ -189,7 +189,7 @@ public void testTimer() {

@Test
public void testInThread() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
vertx.createVirtualThreadContext().runOnContext(v1 -> {
ContextInternal context = vertx.getOrCreateContext();
assertTrue(context.inThread());
Expand Down Expand Up @@ -218,7 +218,7 @@ private void sleep(AtomicInteger inflight) {

@Test
public void testSerializeBlocking() throws Exception {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
AtomicInteger inflight = new AtomicInteger();
vertx.createVirtualThreadContext().runOnContext(v1 -> {
Context ctx = vertx.getOrCreateContext();
Expand All @@ -232,7 +232,7 @@ public void testSerializeBlocking() throws Exception {

@Test
public void testVirtualThreadsNotAvailable() {
Assume.assumeFalse(VertxInternal.isVirtualThreadAvailable());
Assume.assumeFalse(isVirtualThreadAvailable());
try {
vertx.createVirtualThreadContext();
fail();
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/vertx/core/VirtualThreadDeploymentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static boolean isVirtual(Thread th) {

@Test
public void testDeploy() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() {
Expand All @@ -67,7 +67,7 @@ public void start() {

@Test
public void testExecuteBlocking() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() {
Expand All @@ -85,7 +85,7 @@ public void start() {

@Test
public void testDeployHTTPServer() throws Exception {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
AtomicInteger inflight = new AtomicInteger();
AtomicBoolean processing = new AtomicBoolean();
AtomicInteger max = new AtomicInteger();
Expand Down Expand Up @@ -126,7 +126,7 @@ public void start() {

@Test
public void testVirtualThreadsNotAvailable() {
Assume.assumeFalse(VertxInternal.isVirtualThreadAvailable());
Assume.assumeFalse(isVirtualThreadAvailable());
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void setUp() throws Exception {

@Test
public void testEventBus() {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
EventBus eb = vertx.eventBus();
eb.consumer("test-addr", msg -> {
msg.reply(msg.body());
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/vertx/core/http/VirtualThreadHttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void setUp() throws Exception {

@Test
public void testHttpClient1() throws Exception {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
HttpServer server = vertx.createHttpServer();
server.requestHandler(req -> {
req.response().end("Hello World");
Expand All @@ -57,7 +57,7 @@ public void testHttpClient1() throws Exception {

@Test
public void testHttpClient2() throws Exception {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
waitFor(100);
HttpServer server = vertx.createHttpServer();
server.requestHandler(req -> {
Expand Down Expand Up @@ -90,7 +90,7 @@ public void testHttpClient2() throws Exception {

@Test
public void testHttpClientTimeout() throws Exception {
Assume.assumeTrue(VertxInternal.isVirtualThreadAvailable());
Assume.assumeTrue(isVirtualThreadAvailable());
HttpServer server = vertx.createHttpServer();
server.requestHandler(req -> {
});
Expand Down
5 changes: 5 additions & 0 deletions src/test/java/io/vertx/test/core/VertxTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package io.vertx.test.core;

import io.vertx.core.*;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.*;
Expand Down Expand Up @@ -113,6 +114,10 @@ protected void close(List<Vertx> instances) throws Exception {
Assert.assertTrue(latch.await(180, TimeUnit.SECONDS));
}

protected final boolean isVirtualThreadAvailable() {
return ((VertxInternal)vertx).isVirtualThreadAvailable();
}

/**
* @return create a blank new Vert.x instance with no options closed when tear down executes.
*/
Expand Down

0 comments on commit eddce4e

Please sign in to comment.