From 05026d81e3e6de25657f46f70c52a5d578a087d3 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Thu, 31 Oct 2024 13:56:39 +0100 Subject: [PATCH] Quartz: introduce Nonconcurrent --- .../quartz/deployment/QuartzProcessor.java | 15 ++- .../test/NonconcurrentJobDefinitionTest.java | 70 ++++++++++++++ .../test/NonconcurrentProgrammaticTest.java | 91 +++++++++++++++++++ .../quartz/test/NonconcurrentTest.java | 57 ++++++++++++ .../programmatic/ProgrammaticJobsTest.java | 6 +- .../java/io/quarkus/quartz/Nonconcurrent.java | 29 ++++++ .../io/quarkus/quartz/QuartzScheduler.java | 14 +++ .../quartz/runtime/QuartzRecorder.java | 5 +- .../quartz/runtime/QuartzSchedulerImpl.java | 82 +++++++++++++---- .../quarkus/quartz/runtime/QuartzSupport.java | 17 +++- .../java/io/quarkus/scheduler/Scheduler.java | 36 ++++---- .../common/runtime/AbstractJobDefinition.java | 57 ++++++------ .../programmatic/ProgrammaticJobsTest.java | 6 +- .../scheduler/runtime/CompositeScheduler.java | 8 +- .../scheduler/runtime/SimpleScheduler.java | 4 +- 15 files changed, 415 insertions(+), 82 deletions(-) create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentJobDefinitionTest.java create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentProgrammaticTest.java create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentTest.java create mode 100644 extensions/quartz/runtime/src/main/java/io/quarkus/quartz/Nonconcurrent.java diff --git a/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java b/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java index 81e27392847049..f22717d541d125 100644 --- a/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java +++ b/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java @@ -10,6 +10,7 @@ import java.util.Optional; import java.util.Set; import java.util.logging.Level; +import java.util.stream.Collectors; import jakarta.inject.Singleton; @@ -55,6 +56,7 @@ import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.logging.LogCleanupFilterBuildItem; +import io.quarkus.quartz.Nonconcurrent; import io.quarkus.quartz.runtime.QuarkusQuartzConnectionPoolProvider; import io.quarkus.quartz.runtime.QuartzBuildTimeConfig; import io.quarkus.quartz.runtime.QuartzExtensionPointConfig; @@ -69,6 +71,7 @@ import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate; import io.quarkus.runtime.configuration.ConfigurationException; import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.deployment.ScheduledBusinessMethodItem; import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem; public class QuartzProcessor { @@ -79,6 +82,7 @@ public class QuartzProcessor { private static final DotName DELEGATE_HSQLDB = DotName.createSimple(QuarkusHSQLDBDelegate.class.getName()); private static final DotName DELEGATE_MSSQL = DotName.createSimple(QuarkusMSSQLDelegate.class.getName()); private static final DotName DELEGATE_STDJDBC = DotName.createSimple(QuarkusStdJDBCDelegate.class.getName()); + private static final DotName NONCONCURRENT = DotName.createSimple(Nonconcurrent.class); @BuildStep FeatureBuildItem feature() { @@ -313,12 +317,17 @@ public void start(BuildProducer serviceStart, @Record(RUNTIME_INIT) public void quartzSupportBean(QuartzRuntimeConfig runtimeConfig, QuartzBuildTimeConfig buildTimeConfig, QuartzRecorder recorder, - BuildProducer syntheticBeanBuildItemBuildProducer, - QuartzJDBCDriverDialectBuildItem driverDialect) { + QuartzJDBCDriverDialectBuildItem driverDialect, + List scheduledMethods, + BuildProducer syntheticBeanBuildItemBuildProducer) { syntheticBeanBuildItemBuildProducer.produce(SyntheticBeanBuildItem.configure(QuartzSupport.class) .scope(Singleton.class) // this should be @ApplicationScoped but it fails for some reason .setRuntimeInit() - .supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver())).done()); + .supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver(), + scheduledMethods.stream().filter(m -> m.getMethod().hasAnnotation(NONCONCURRENT)) + .map(m -> m.getMethod().declaringClass().name().toString() + "_" + m.getMethod().name()) + .collect(Collectors.toSet()))) + .done()); } } diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentJobDefinitionTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentJobDefinitionTest.java new file mode 100644 index 00000000000000..e996f56089ba63 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentJobDefinitionTest.java @@ -0,0 +1,70 @@ +package io.quarkus.quartz.test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.quartz.QuartzScheduler; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; + +public class NonconcurrentJobDefinitionTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(Jobs.class)) + .overrideConfigKey("quarkus.scheduler.start-mode", "forced") + .overrideConfigKey("quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread", + "true"); + + @Inject + QuartzScheduler scheduler; + + @Test + public void testExecution() throws InterruptedException { + scheduler.newJob("foo") + .setTask(se -> { + Jobs.NONCONCURRENT_COUNTER.incrementAndGet(); + try { + if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("nonconcurrent() execution blocked too long..."); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (Jobs.NONCONCURRENT_COUNTER.get() == 1) { + // concurrent() executed >= 5x and nonconcurrent() 1x + Jobs.NONCONCURRENT_LATCH.countDown(); + } + }) + .setInterval("1s") + .setNonconcurrent() + .schedule(); + + assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS), + String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get())); + } + + static class Jobs { + + static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1); + static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5); + + static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0); + + @Scheduled(identity = "bar", every = "1s") + void concurrent() throws InterruptedException { + CONCURRENT_LATCH.countDown(); + } + + } + +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentProgrammaticTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentProgrammaticTest.java new file mode 100644 index 00000000000000..5ebeb934053c7e --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentProgrammaticTest.java @@ -0,0 +1,91 @@ +package io.quarkus.quartz.test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +import io.quarkus.quartz.QuartzScheduler; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduler; +import io.quarkus.test.QuarkusUnitTest; + +public class NonconcurrentProgrammaticTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(Jobs.class)) + .overrideConfigKey("quarkus.scheduler.start-mode", "halted"); + + @Inject + QuartzScheduler scheduler; + + @Test + public void testExecution() throws SchedulerException, InterruptedException { + JobDetail job = JobBuilder.newJob(Jobs.class) + .withIdentity("foo", Scheduler.class.getName()) + .build(); + Trigger trigger = TriggerBuilder.newTrigger() + .withIdentity("foo", Scheduler.class.getName()) + .startNow() + .withSchedule(SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds(1) + .repeatForever()) + .build(); + scheduler.getScheduler().scheduleJob(job, trigger); + + scheduler.resume(); + + assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS), + String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get())); + } + + @DisallowConcurrentExecution + static class Jobs implements Job { + + static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1); + static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5); + + static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0); + + @Scheduled(identity = "bar", every = "1s") + void concurrent() throws InterruptedException { + CONCURRENT_LATCH.countDown(); + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + Jobs.NONCONCURRENT_COUNTER.incrementAndGet(); + try { + if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("nonconcurrent() execution blocked too long..."); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (Jobs.NONCONCURRENT_COUNTER.get() == 1) { + // concurrent() executed >= 5x and nonconcurrent() 1x + Jobs.NONCONCURRENT_LATCH.countDown(); + } + } + + } + +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentTest.java new file mode 100644 index 00000000000000..a414e551b5c1c9 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentTest.java @@ -0,0 +1,57 @@ +package io.quarkus.quartz.test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.quartz.Nonconcurrent; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; + +public class NonconcurrentTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(Jobs.class)) + .overrideConfigKey("quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread", + "true"); + + @Test + public void testExecution() throws InterruptedException { + assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS), + String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get())); + } + + static class Jobs { + + static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1); + static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5); + + static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0); + + @Nonconcurrent + @Scheduled(identity = "foo", every = "1s") + void nonconcurrent() throws InterruptedException { + NONCONCURRENT_COUNTER.incrementAndGet(); + if (!CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("nonconcurrent() execution blocked too long..."); + } + if (NONCONCURRENT_COUNTER.get() == 1) { + // concurrent() executed >= 5x and nonconcurrent() 1x + NONCONCURRENT_LATCH.countDown(); + } + } + + @Scheduled(identity = "bar", every = "1s") + void concurrent() throws InterruptedException { + CONCURRENT_LATCH.countDown(); + } + + } +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java index d2f5e62a5a55e7..aa027694004b93 100644 --- a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java @@ -69,7 +69,7 @@ public void testJobs() throws InterruptedException { .setSkipPredicate(AlwaysSkipPredicate.class) .schedule(); - Scheduler.JobDefinition job1 = scheduler.newJob("foo") + Scheduler.JobDefinition job1 = scheduler.newJob("foo") .setInterval("1s") .setTask(ec -> { assertTrue(Arc.container().requestContext().isActive()); @@ -79,7 +79,7 @@ public void testJobs() throws InterruptedException { assertEquals("Sync task was already set", assertThrows(IllegalStateException.class, () -> job1.setAsyncTask(ec -> null)).getMessage()); - Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?"); + Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?"); assertEquals("Either sync or async task must be set", assertThrows(IllegalStateException.class, () -> job2.schedule()).getMessage()); job2.setTask(ec -> { @@ -117,7 +117,7 @@ public void testJobs() throws InterruptedException { @Test public void testAsyncJob() throws InterruptedException, SchedulerException { String identity = "fooAsync"; - JobDefinition asyncJob = scheduler.newJob(identity) + JobDefinition asyncJob = scheduler.newJob(identity) .setInterval("1s") .setAsyncTask(ec -> { assertTrue(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext()); diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/Nonconcurrent.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/Nonconcurrent.java new file mode 100644 index 00000000000000..9a7ea35ebcd067 --- /dev/null +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/Nonconcurrent.java @@ -0,0 +1,29 @@ +package io.quarkus.quartz; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.SkippedExecution; + +/** + * Annotated scheduled method may not be executed concurrently. The behavior is identical to a {@link Job} class annotated with + * {@link DisallowConcurrentExecution}. Keep in mind that this annotation can be only used if + * {@code quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread} is set to {@code true}. + *

+ * Unlike with {@link Scheduled.ConcurrentExecution#SKIP} the {@link SkippedExecution} event is never fired if a method + * execution is skipped by Quartz. + * + * @see DisallowConcurrentExecution + */ +@Target(METHOD) +@Retention(RUNTIME) +public @interface Nonconcurrent { + +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java index 395a6de8369a4a..60c30ab3d7292e 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java @@ -13,4 +13,18 @@ public interface QuartzScheduler extends Scheduler { */ org.quartz.Scheduler getScheduler(); + @Override + QuartzJobDefinition newJob(String identity); + + interface QuartzJobDefinition extends JobDefinition { + + /** + * + * @return self + * @see Nonconcurrent + */ + QuartzJobDefinition setNonconcurrent(); + + } + } diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzRecorder.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzRecorder.java index 9a1bd26cae449a..7ea820528fcd6b 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzRecorder.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzRecorder.java @@ -1,6 +1,7 @@ package io.quarkus.quartz.runtime; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import io.quarkus.runtime.annotations.Recorder; @@ -9,11 +10,11 @@ public class QuartzRecorder { public Supplier quartzSupportSupplier(QuartzRuntimeConfig runtimeConfig, - QuartzBuildTimeConfig buildTimeConfig, Optional driverDialect) { + QuartzBuildTimeConfig buildTimeConfig, Optional driverDialect, Set nonconcurrentMethods) { return new Supplier() { @Override public QuartzSupport get() { - return new QuartzSupport(runtimeConfig, buildTimeConfig, driverDialect); + return new QuartzSupport(runtimeConfig, buildTimeConfig, driverDialect, nonconcurrentMethods); } }; } diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index 8e6d0c620ace5d..cbf01783884ce5 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -40,6 +40,7 @@ import org.jboss.logging.Logger; import org.quartz.CronScheduleBuilder; +import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDetail; @@ -60,6 +61,7 @@ import org.quartz.spi.TriggerFiredBundle; import io.quarkus.arc.Subclass; +import io.quarkus.quartz.Nonconcurrent; import io.quarkus.quartz.QuartzScheduler; import io.quarkus.runtime.StartupEvent; import io.quarkus.scheduler.DelayedExecution; @@ -248,7 +250,8 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) { invoker.isBlocking() && runtimeConfig.runBlockingScheduledMethodOnQuartzThread, SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor); - JobDetail jobDetail = createJobDetail(identity, method.getInvokerClassName()); + JobDetail jobDetail = createJobBuilder(identity, method.getInvokerClassName(), + quartzSupport.isNonconcurrent(method)).build(); Optional> triggerBuilder = createTrigger(identity, scheduled, runtimeConfig, jobDetail); @@ -478,12 +481,12 @@ public Trigger getScheduledJob(String identity) { } @Override - public JobDefinition newJob(String identity) { + public QuartzJobDefinition newJob(String identity) { Objects.requireNonNull(identity); if (scheduledTasks.containsKey(identity)) { throw new IllegalStateException("A job with this identity is already scheduled: " + identity); } - return new QuartzJobDefinition(identity); + return new QuartzJobDefinitionImpl(identity); } @Override @@ -583,13 +586,15 @@ private Properties getSchedulerConfigurationProperties(QuartzSupport quartzSuppo props.put(StdSchedulerFactory.PROP_SCHED_RMI_PROXY, "false"); props.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, buildTimeConfig.storeType.clazz); + // The org.quartz.jobStore.misfireThreshold can be used for all supported job stores + props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".misfireThreshold", + "" + runtimeConfig.misfireThreshold.toMillis()); + if (buildTimeConfig.storeType.isDbStore()) { String dataSource = buildTimeConfig.dataSourceName.orElse("QUARKUS_QUARTZ_DEFAULT_DATASOURCE"); QuarkusQuartzConnectionPoolProvider.setDataSourceName(dataSource); boolean serializeJobData = buildTimeConfig.serializeJobData.orElse(false); props.put(StdSchedulerFactory.PROP_JOB_STORE_USE_PROP, serializeJobData ? "false" : "true"); - props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".misfireThreshold", - "" + runtimeConfig.misfireThreshold.toMillis()); props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".tablePrefix", buildTimeConfig.tablePrefix); props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".dataSource", dataSource); props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".driverDelegateClass", @@ -688,13 +693,15 @@ StartMode initStartMode(SchedulerRuntimeConfig schedulerRuntimeConfig, QuartzRun } } - private JobDetail createJobDetail(String identity, String invokerClassName) { - return JobBuilder.newJob(InvokerJob.class) + private JobBuilder createJobBuilder(String identity, String invokerClassName, boolean noncurrent) { + Class jobClass = noncurrent ? NonconcurrentInvokerJob.class + : InvokerJob.class; + return JobBuilder.newJob(jobClass) // new JobKey(identity, "io.quarkus.scheduler.Scheduler") .withIdentity(identity, Scheduler.class.getName()) // this info is redundant but keep it for backward compatibility .usingJobData(INVOKER_KEY, invokerClassName) - .requestRecovery().build(); + .requestRecovery(); } /** @@ -816,12 +823,26 @@ private Optional> createTrigger(String identity, Scheduled sch return Optional.of(triggerBuilder); } - class QuartzJobDefinition extends AbstractJobDefinition implements ExecutionMetadata { + class QuartzJobDefinitionImpl extends AbstractJobDefinition + implements ExecutionMetadata, QuartzJobDefinition { + + private boolean nonconcurrent; - QuartzJobDefinition(String id) { + QuartzJobDefinitionImpl(String id) { super(id); } + @Override + public QuartzJobDefinition setNonconcurrent() { + nonconcurrent = true; + return self(); + } + + @Override + public boolean nonconcurrent() { + return nonconcurrent; + } + @Override public boolean isRunOnVirtualThread() { return runOnVirtualThread; @@ -858,7 +879,7 @@ public Class skipPredicateClass() { } @Override - public JobDefinition setSkipPredicate(SkipPredicate skipPredicate) { + public QuartzJobDefinition setSkipPredicate(SkipPredicate skipPredicate) { if (storeType.isDbStore() && skipPredicateClass == null) { throw new IllegalStateException( "A skip predicate instance cannot be scheduled programmatically if DB store type is used; register a skip predicate class instead"); @@ -867,7 +888,7 @@ public JobDefinition setSkipPredicate(SkipPredicate skipPredicate) { } @Override - public JobDefinition setTask(Consumer task, boolean runOnVirtualThread) { + public QuartzJobDefinition setTask(Consumer task, boolean runOnVirtualThread) { if (storeType.isDbStore() && taskClass == null) { throw new IllegalStateException( "A task instance cannot be scheduled programmatically if DB store type is used; register a task class instead"); @@ -876,7 +897,7 @@ public JobDefinition setTask(Consumer task, boolean runOnVir } @Override - public JobDefinition setAsyncTask(Function> asyncTask) { + public QuartzJobDefinition setAsyncTask(Function> asyncTask) { if (storeType.isDbStore() && asyncTaskClass == null) { throw new IllegalStateException( "An async task instance cannot be scheduled programmatically if DB store type is used; register an async task class instead"); @@ -913,12 +934,15 @@ interface ExecutionMetadata { SkipPredicate skipPredicate(); Class skipPredicateClass(); + + boolean nonconcurrent(); } static final String SCHEDULED_METADATA = "scheduled_metadata"; static final String EXECUTION_METADATA_TASK_CLASS = "execution_metadata_task_class"; static final String EXECUTION_METADATA_ASYNC_TASK_CLASS = "execution_metadata_async_task_class"; static final String EXECUTION_METADATA_RUN_ON_VIRTUAL_THREAD = "execution_metadata_run_on_virtual_thread"; + static final String EXECUTION_METADATA_NONCONCURRENT = "execution_metadata_nonconcurrent"; static final String EXECUTION_METADATA_SKIP_PREDICATE_CLASS = "execution_metadata_skip_predicate_class"; QuartzTrigger createJobDefinitionQuartzTrigger(ExecutionMetadata executionMetadata, SyntheticScheduled scheduled, @@ -967,11 +991,8 @@ public boolean isBlocking() { }; } - JobBuilder jobBuilder = JobBuilder.newJob(InvokerJob.class) - // new JobKey(identity, "io.quarkus.scheduler.Scheduler") - .withIdentity(scheduled.identity(), Scheduler.class.getName()) - // this info is redundant but keep it for backward compatibility - .usingJobData(INVOKER_KEY, QuartzSchedulerImpl.class.getName()); + JobBuilder jobBuilder = createJobBuilder(scheduled.identity(), QuartzSchedulerImpl.class.getName(), + executionMetadata.nonconcurrent()); if (storeType.isDbStore()) { jobBuilder.usingJobData(SCHEDULED_METADATA, scheduled.toJson()) .usingJobData(EXECUTION_METADATA_RUN_ON_VIRTUAL_THREAD, Boolean.toString(runOnVirtualThread)); @@ -1012,8 +1033,7 @@ public boolean isBlocking() { } invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, delayedExecutionEvent, scheduled.concurrentExecution(), skipPredicate, instrumenter, - vertx, - task != null && runtimeConfig.runBlockingScheduledMethodOnQuartzThread, + vertx, task != null && runtimeConfig.runBlockingScheduledMethodOnQuartzThread, SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor); QuartzTrigger quartzTrigger = new QuartzTrigger(trigger.getKey(), new Function<>() { @@ -1048,6 +1068,18 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) { return quartzTrigger; } + /** + * @see Nonconcurrent + */ + @DisallowConcurrentExecution + static class NonconcurrentInvokerJob extends InvokerJob { + + NonconcurrentInvokerJob(QuartzTrigger trigger, Vertx vertx) { + super(trigger, vertx); + } + + } + /** * Although this class is not part of the public API it must not be renamed in order to preserve backward compatibility. The * name of this class can be stored in a Quartz table in the database. See https://github.com/quarkusio/quarkus/issues/29177 @@ -1193,6 +1225,9 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr // This is a job backed by a @Scheduled method or a JobDefinition return new InvokerJob(scheduledTasks.get(bundle.getJobDetail().getKey().getName()), vertx); } + if (jobClass.equals(NonconcurrentInvokerJob.class)) { + return new NonconcurrentInvokerJob(scheduledTasks.get(bundle.getJobDetail().getKey().getName()), vertx); + } if (Subclass.class.isAssignableFrom(jobClass)) { // Get the original class from an intercepted bean class jobClass = (Class) jobClass.getSuperclass(); @@ -1221,6 +1256,7 @@ static class SerializedExecutionMetadata implements ExecutionMetadata { private final Class>> asyncTaskClass; private final boolean runOnVirtualThread; private final Class skipPredicateClass; + private final boolean nonconcurrent; @SuppressWarnings("unchecked") public SerializedExecutionMetadata(JobDetail jobDetail) { @@ -1252,6 +1288,7 @@ public SerializedExecutionMetadata(JobDetail jobDetail) { } this.runOnVirtualThread = Boolean .parseBoolean(jobDetail.getJobDataMap().getString(EXECUTION_METADATA_RUN_ON_VIRTUAL_THREAD)); + this.nonconcurrent = Boolean.parseBoolean(jobDetail.getJobDataMap().getString(EXECUTION_METADATA_NONCONCURRENT)); } @Override @@ -1274,6 +1311,11 @@ public Class>> asyncTaskClass() return asyncTaskClass; } + @Override + public boolean nonconcurrent() { + return nonconcurrent; + } + @Override public boolean isRunOnVirtualThread() { return runOnVirtualThread; diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSupport.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSupport.java index b343422373b788..fc7f68af5def1b 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSupport.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSupport.java @@ -1,18 +1,24 @@ package io.quarkus.quartz.runtime; import java.util.Optional; +import java.util.Set; + +import io.quarkus.quartz.Nonconcurrent; +import io.quarkus.scheduler.common.runtime.ScheduledMethod; public class QuartzSupport { private final QuartzRuntimeConfig runtimeConfig; private final QuartzBuildTimeConfig buildTimeConfig; private final Optional driverDialect; + private final Set nonconcurrentMethods; public QuartzSupport(QuartzRuntimeConfig runtimeConfig, QuartzBuildTimeConfig buildTimeConfig, - Optional driverDialect) { + Optional driverDialect, Set nonconcurrentMethods) { this.runtimeConfig = runtimeConfig; this.buildTimeConfig = buildTimeConfig; this.driverDialect = driverDialect; + this.nonconcurrentMethods = nonconcurrentMethods; } public QuartzRuntimeConfig getRuntimeConfig() { @@ -26,4 +32,13 @@ public QuartzBuildTimeConfig getBuildTimeConfig() { public Optional getDriverDialect() { return driverDialect; } + + /** + * + * @param method + * @return {@code true} if the scheduled method is annotated with {@link Nonconcurrent} + */ + public boolean isNonconcurrent(ScheduledMethod method) { + return nonconcurrentMethods.contains(method.getDeclaringClassName() + "_" + method.getMethodName()); + } } diff --git a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java index b66d871fa61b9e..cd93eb8af3d2b3 100644 --- a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java +++ b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java @@ -80,7 +80,7 @@ public interface Scheduler { * @return a new job definition * @see Scheduled#identity() */ - JobDefinition newJob(String identity); + JobDefinition newJob(String identity); /** * Removes the job previously added via {@link #newJob(String)}. @@ -106,7 +106,7 @@ public interface Scheduler { *

* The implementation is not thread-safe and should not be reused. */ - interface JobDefinition { + interface JobDefinition> { /** * The schedule is defined either by {@link #setCron(String)} or by {@link #setInterval(String)}. If both methods are @@ -118,7 +118,7 @@ interface JobDefinition { * @return self * @see Scheduled#cron() */ - JobDefinition setCron(String cron); + THIS setCron(String cron); /** * The schedule is defined either by {@link #setCron(String)} or by {@link #setInterval(String)}. If both methods are @@ -133,7 +133,7 @@ interface JobDefinition { * @return self * @see Scheduled#every() */ - JobDefinition setInterval(String every); + THIS setInterval(String every); /** * {@link Scheduled#delayed()} @@ -142,7 +142,7 @@ interface JobDefinition { * @return self * @see Scheduled#delayed() */ - JobDefinition setDelayed(String period); + THIS setDelayed(String period); /** * {@link Scheduled#concurrentExecution()} @@ -151,7 +151,7 @@ interface JobDefinition { * @return self * @see Scheduled#concurrentExecution() */ - JobDefinition setConcurrentExecution(ConcurrentExecution concurrentExecution); + THIS setConcurrentExecution(ConcurrentExecution concurrentExecution); /** * {@link Scheduled#skipExecutionIf()} @@ -160,7 +160,7 @@ interface JobDefinition { * @return self * @see Scheduled#skipExecutionIf() */ - JobDefinition setSkipPredicate(SkipPredicate skipPredicate); + THIS setSkipPredicate(SkipPredicate skipPredicate); /** * {@link Scheduled#skipExecutionIf()} @@ -169,7 +169,7 @@ interface JobDefinition { * @return self * @see Scheduled#skipExecutionIf() */ - JobDefinition setSkipPredicate(Class skipPredicateClass); + THIS setSkipPredicate(Class skipPredicateClass); /** * {@link Scheduled#overdueGracePeriod()} @@ -178,7 +178,7 @@ interface JobDefinition { * @return self * @see Scheduled#overdueGracePeriod() */ - JobDefinition setOverdueGracePeriod(String period); + THIS setOverdueGracePeriod(String period); /** * {@link Scheduled#timeZone()} @@ -186,7 +186,7 @@ interface JobDefinition { * @return self * @see Scheduled#timeZone() */ - JobDefinition setTimeZone(String timeZone); + THIS setTimeZone(String timeZone); /** * {@link Scheduled#executeWith()} @@ -196,7 +196,7 @@ interface JobDefinition { * @throws IllegalArgumentException If the composite scheduler is used and the selected implementation is not available * @see Scheduled#executeWith() */ - JobDefinition setExecuteWith(String implementation); + THIS setExecuteWith(String implementation); /** * {@link Scheduled#executionMaxDelay()} @@ -205,14 +205,14 @@ interface JobDefinition { * @return self * @see Scheduled#executionMaxDelay() */ - JobDefinition setExecutionMaxDelay(String maxDelay); + THIS setExecutionMaxDelay(String maxDelay); /** * * @param task * @return self */ - default JobDefinition setTask(Consumer task) { + default THIS setTask(Consumer task) { return setTask(task, false); } @@ -232,7 +232,7 @@ default JobDefinition setTask(Consumer task) { * @param taskClass * @return self */ - default JobDefinition setTask(Class> taskClass) { + default THIS setTask(Class> taskClass) { return setTask(taskClass, false); } @@ -243,7 +243,7 @@ default JobDefinition setTask(Class> task * @param runOnVirtualThread whether the task must be run on a virtual thread if the JVM allows it. * @return self */ - JobDefinition setTask(Consumer task, boolean runOnVirtualThread); + THIS setTask(Consumer task, boolean runOnVirtualThread); /** * The class must either represent a CDI bean or declare a public no-args constructor. @@ -262,14 +262,14 @@ default JobDefinition setTask(Class> task * @param runOnVirtualThread * @return self */ - JobDefinition setTask(Class> consumerClass, boolean runOnVirtualThread); + THIS setTask(Class> consumerClass, boolean runOnVirtualThread); /** * * @param asyncTask * @return self */ - JobDefinition setAsyncTask(Function> asyncTask); + THIS setAsyncTask(Function> asyncTask); /** * The class must either represent a CDI bean or declare a public no-args constructor. @@ -287,7 +287,7 @@ default JobDefinition setTask(Class> task * @param asyncTaskClass * @return self */ - JobDefinition setAsyncTask(Class>> asyncTaskClass); + THIS setAsyncTask(Class>> asyncTaskClass); /** * Attempts to schedule the job. diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java index d94f1c612a378d..d7a391628b7dd3 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java @@ -12,7 +12,7 @@ import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; import io.smallrye.mutiny.Uni; -public abstract class AbstractJobDefinition implements JobDefinition { +public abstract class AbstractJobDefinition> implements JobDefinition { protected final String identity; protected String cron = ""; @@ -37,104 +37,104 @@ public AbstractJobDefinition(String identity) { } @Override - public JobDefinition setCron(String cron) { + public THIS setCron(String cron) { checkScheduled(); this.cron = Objects.requireNonNull(cron); - return this; + return self(); } @Override - public JobDefinition setInterval(String every) { + public THIS setInterval(String every) { checkScheduled(); this.every = Objects.requireNonNull(every); - return this; + return self(); } @Override - public JobDefinition setDelayed(String period) { + public THIS setDelayed(String period) { checkScheduled(); this.delayed = Objects.requireNonNull(period); - return this; + return self(); } @Override - public JobDefinition setConcurrentExecution(ConcurrentExecution concurrentExecution) { + public THIS setConcurrentExecution(ConcurrentExecution concurrentExecution) { checkScheduled(); this.concurrentExecution = Objects.requireNonNull(concurrentExecution); - return this; + return self(); } @Override - public JobDefinition setSkipPredicate(SkipPredicate skipPredicate) { + public THIS setSkipPredicate(SkipPredicate skipPredicate) { checkScheduled(); this.skipPredicate = Objects.requireNonNull(skipPredicate); - return this; + return self(); } @Override - public JobDefinition setSkipPredicate(Class skipPredicateClass) { + public THIS setSkipPredicate(Class skipPredicateClass) { checkScheduled(); this.skipPredicateClass = Objects.requireNonNull(skipPredicateClass); return setSkipPredicate(SchedulerUtils.instantiateBeanOrClass(skipPredicateClass)); } @Override - public JobDefinition setOverdueGracePeriod(String period) { + public THIS setOverdueGracePeriod(String period) { checkScheduled(); this.overdueGracePeriod = Objects.requireNonNull(period); - return this; + return self(); } @Override - public JobDefinition setTimeZone(String timeZone) { + public THIS setTimeZone(String timeZone) { checkScheduled(); this.timeZone = Objects.requireNonNull(timeZone); - return this; + return self(); } @Override - public JobDefinition setExecuteWith(String implementation) { + public THIS setExecuteWith(String implementation) { checkScheduled(); this.implementation = Objects.requireNonNull(implementation); - return this; + return self(); } @Override - public JobDefinition setExecutionMaxDelay(String maxDelay) { + public THIS setExecutionMaxDelay(String maxDelay) { checkScheduled(); this.executionMaxDelay = maxDelay; - return this; + return self(); } @Override - public JobDefinition setTask(Consumer task, boolean runOnVirtualThread) { + public THIS setTask(Consumer task, boolean runOnVirtualThread) { checkScheduled(); if (asyncTask != null) { throw new IllegalStateException("Async task was already set"); } this.task = Objects.requireNonNull(task); this.runOnVirtualThread = runOnVirtualThread; - return this; + return self(); } @Override - public JobDefinition setTask(Class> taskClass, boolean runOnVirtualThread) { + public THIS setTask(Class> taskClass, boolean runOnVirtualThread) { this.taskClass = Objects.requireNonNull(taskClass); return setTask(SchedulerUtils.instantiateBeanOrClass(taskClass), runOnVirtualThread); } @Override - public JobDefinition setAsyncTask(Function> asyncTask) { + public THIS setAsyncTask(Function> asyncTask) { checkScheduled(); if (task != null) { throw new IllegalStateException("Sync task was already set"); } this.asyncTask = Objects.requireNonNull(asyncTask); - return this; + return self(); } @Override - public JobDefinition setAsyncTask(Class>> asyncTaskClass) { + public THIS setAsyncTask(Class>> asyncTaskClass) { this.asyncTaskClass = Objects.requireNonNull(asyncTaskClass); return setAsyncTask(SchedulerUtils.instantiateBeanOrClass(asyncTaskClass)); } @@ -145,4 +145,9 @@ protected void checkScheduled() { } } + @SuppressWarnings("unchecked") + protected THIS self() { + return (THIS) this; + } + } diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/programmatic/ProgrammaticJobsTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/programmatic/ProgrammaticJobsTest.java index 3bd4446a7a44f0..cd8910628bd538 100644 --- a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/programmatic/ProgrammaticJobsTest.java +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/programmatic/ProgrammaticJobsTest.java @@ -63,7 +63,7 @@ public void testJobs() throws InterruptedException { .setSkipPredicate(AlwaysSkipPredicate.class) .schedule(); - Scheduler.JobDefinition job1 = scheduler.newJob("foo") + Scheduler.JobDefinition job1 = scheduler.newJob("foo") .setInterval("1s") .setTask(ec -> { assertTrue(Arc.container().requestContext().isActive()); @@ -73,7 +73,7 @@ public void testJobs() throws InterruptedException { assertEquals("Sync task was already set", assertThrows(IllegalStateException.class, () -> job1.setAsyncTask(ec -> null)).getMessage()); - Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?"); + Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?"); assertEquals("Either sync or async task must be set", assertThrows(IllegalStateException.class, () -> job2.schedule()).getMessage()); job2.setTask(ec -> { @@ -110,7 +110,7 @@ public void testJobs() throws InterruptedException { @Test public void testAsyncJob() throws InterruptedException { - JobDefinition asyncJob = scheduler.newJob("fooAsync") + JobDefinition asyncJob = scheduler.newJob("fooAsync") .setInterval("1s") .setAsyncTask(ec -> { assertTrue(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext()); diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java index 55d90854249bde..f8d677355aa106 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java @@ -102,7 +102,7 @@ public Trigger getScheduledJob(String identity) { } @Override - public JobDefinition newJob(String identity) { + public CompositeJobDefinition newJob(String identity) { return new CompositeJobDefinition(identity); } @@ -122,14 +122,14 @@ public String implementation() { return Scheduled.AUTO; } - class CompositeJobDefinition extends AbstractJobDefinition { + public class CompositeJobDefinition extends AbstractJobDefinition { public CompositeJobDefinition(String identity) { super(identity); } @Override - public JobDefinition setExecuteWith(String implementation) { + public CompositeJobDefinition setExecuteWith(String implementation) { Objects.requireNonNull(implementation); if (!Scheduled.AUTO.equals(implementation)) { if (schedulers.stream().map(Scheduler::implementation).noneMatch(implementation::equals)) { @@ -153,7 +153,7 @@ public Trigger schedule() { throw new IllegalStateException("Matching scheduler implementation not found: " + implementation); } - private JobDefinition copy(JobDefinition to) { + private JobDefinition copy(JobDefinition to) { to.setCron(cron); to.setInterval(every); to.setDelayed(delayed); diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index a9a4f9558fa053..d6ec118d0b5ca3 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -202,7 +202,7 @@ public String implementation() { } @Override - public JobDefinition newJob(String identity) { + public SimpleJobDefinition newJob(String identity) { Objects.requireNonNull(identity); if (scheduledTasks.containsKey(identity)) { throw new IllegalStateException("A job with this identity is already scheduled: " + identity); @@ -644,7 +644,7 @@ public Instant getScheduledFireTime() { } - class SimpleJobDefinition extends AbstractJobDefinition { + public class SimpleJobDefinition extends AbstractJobDefinition { private final SchedulerConfig schedulerConfig;