Skip to content

Commit

Permalink
Scheduler - add Scheduled#concurrentExecution() strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
mkouba committed Jun 29, 2020
1 parent 7c01526 commit 100c687
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.quartz.test;

import static io.quarkus.scheduler.Scheduled.ConcurrentExection.SKIP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.test.QuarkusUnitTest;

public class ConcurrentExecutionTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Jobs.class));

@Test
public void testNonconcurrentExecution() throws InterruptedException {
if (Jobs.LATCH.await(5, TimeUnit.SECONDS)) {
assertEquals(1, Jobs.COUNTER.get());
} else {
fail("Scheduled methods not executed");
}
}

static class Jobs {

static final CountDownLatch LATCH = new CountDownLatch(3);
static final AtomicInteger COUNTER = new AtomicInteger();

@Scheduled(every = "1s")
void concurrent() {
LATCH.countDown();
}

@Scheduled(every = "1s", concurrentExecution = SKIP)
void nonconcurrent() throws InterruptedException {
COUNTER.incrementAndGet();
if (!LATCH.await(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@
import io.quarkus.arc.InstanceHandle;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.ConcurrentExection;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.Scheduler;
import io.quarkus.scheduler.Trigger;
import io.quarkus.scheduler.runtime.SkipConcurrentExecutionInvoker;
import io.quarkus.scheduler.runtime.ScheduledInvoker;
import io.quarkus.scheduler.runtime.ScheduledMethodMetadata;
import io.quarkus.scheduler.runtime.SchedulerContext;
Expand Down Expand Up @@ -82,6 +84,7 @@ public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Co
LOGGER.info("No scheduled business methods found - Quartz scheduler will not be started");
this.scheduler = null;
} else {
// identity -> scheduled invoker instance
Map<String, ScheduledInvoker> invokers = new HashMap<>();
UserTransaction transaction = null;

Expand All @@ -105,17 +108,23 @@ public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Co
transaction.begin();
}
for (ScheduledMethodMetadata method : context.getScheduledMethods()) {

invokers.put(method.getInvokerClassName(), context.createInvoker(method.getInvokerClassName()));
int nameSequence = 0;

for (Scheduled scheduled : method.getSchedules()) {
String identity = scheduled.identity().trim();
if (identity.isEmpty()) {
identity = ++nameSequence + "_" + method.getInvokerClassName();
}
ScheduledInvoker invoker = context.createInvoker(method.getInvokerClassName());
if (scheduled.concurrentExecution() == ConcurrentExection.SKIP) {
invoker = new SkipConcurrentExecutionInvoker(invoker);
}
invokers.put(identity, invoker);

JobBuilder jobBuilder = JobBuilder.newJob(InvokerJob.class)
// new JobKey(identity, "io.quarkus.scheduler.Scheduler")
.withIdentity(identity, Scheduler.class.getName())
// this info is redundant but keep it for backward compatibility
.usingJobData(INVOKER_KEY, method.getInvokerClassName())
.requestRecovery();
ScheduleBuilder<?> scheduleBuilder;
Expand Down Expand Up @@ -325,46 +334,64 @@ static class InvokerJob implements Job {

@Override
public void execute(JobExecutionContext context) {
Trigger trigger = new Trigger() {
QuartzTrigger trigger = new QuartzTrigger(context);
ScheduledInvoker scheduledInvoker = invokers.get(context.getJobDetail().getKey().getName());
if (scheduledInvoker != null) { // could be null from previous runs
scheduledInvoker.invoke(new QuartzScheduledExecution(trigger));
}
}
}

@Override
public Instant getNextFireTime() {
Date nextFireTime = context.getTrigger().getNextFireTime();
return nextFireTime != null ? nextFireTime.toInstant() : null;
}
static class QuartzTrigger implements Trigger {

@Override
public Instant getPreviousFireTime() {
Date previousFireTime = context.getTrigger().getPreviousFireTime();
return previousFireTime != null ? previousFireTime.toInstant() : null;
}
final JobExecutionContext context;

@Override
public String getId() {
return context.getTrigger().getKey().toString();
}
};
String invokerClass = context.getJobDetail().getJobDataMap().getString(INVOKER_KEY);
ScheduledInvoker scheduledInvoker = invokers.get(invokerClass);
if (scheduledInvoker != null) { // could be null from previous runs
scheduledInvoker.invoke(new ScheduledExecution() {
@Override
public Trigger getTrigger() {
return trigger;
}
public QuartzTrigger(JobExecutionContext context) {
this.context = context;
}

@Override
public Instant getScheduledFireTime() {
return context.getScheduledFireTime().toInstant();
}
@Override
public Instant getNextFireTime() {
Date nextFireTime = context.getTrigger().getNextFireTime();
return nextFireTime != null ? nextFireTime.toInstant() : null;
}

@Override
public Instant getFireTime() {
return context.getFireTime().toInstant();
}
});
}
@Override
public Instant getPreviousFireTime() {
Date previousFireTime = context.getTrigger().getPreviousFireTime();
return previousFireTime != null ? previousFireTime.toInstant() : null;
}

@Override
public String getId() {
return context.getTrigger().getKey().toString();
}

}

static class QuartzScheduledExecution implements ScheduledExecution {

final QuartzTrigger trigger;

public QuartzScheduledExecution(QuartzTrigger trigger) {
this.trigger = trigger;
}

@Override
public Trigger getTrigger() {
return trigger;
}

@Override
public Instant getFireTime() {
return trigger.context.getScheduledFireTime().toInstant();
}

@Override
public Instant getScheduledFireTime() {
return trigger.context.getFireTime().toInstant();
}

}

static class InvokerJobFactory extends SimpleJobFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,19 @@ public FeatureBuildItem build(SchedulerConfig config, BuildProducer<SyntheticBea
List<ScheduledMethodMetadata> scheduledMetadata = new ArrayList<>();
ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);

for (ScheduledBusinessMethodItem businessMethod : scheduledMethods) {
ScheduledMethodMetadata scheduledMethod = new ScheduledMethodMetadata();
String invokerClass = generateInvoker(businessMethod.getBean(), businessMethod.getMethod(), classOutput);
for (ScheduledBusinessMethodItem scheduledMethod : scheduledMethods) {
ScheduledMethodMetadata metadata = new ScheduledMethodMetadata();
String invokerClass = generateInvoker(scheduledMethod, classOutput);
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, invokerClass));
scheduledMethod.setInvokerClassName(invokerClass);
metadata.setInvokerClassName(invokerClass);
List<Scheduled> schedules = new ArrayList<>();
for (AnnotationInstance scheduled : businessMethod.getSchedules()) {
for (AnnotationInstance scheduled : scheduledMethod.getSchedules()) {
schedules.add(annotationProxy.builder(scheduled, Scheduled.class).build(classOutput));
}
scheduledMethod.setSchedules(schedules);
scheduledMethod.setMethodDescription(
businessMethod.getMethod().declaringClass() + "#" + businessMethod.getMethod().name());
scheduledMetadata.add(scheduledMethod);
metadata.setSchedules(schedules);
metadata.setMethodDescription(
scheduledMethod.getMethod().declaringClass() + "#" + scheduledMethod.getMethod().name());
scheduledMetadata.add(metadata);
}

syntheticBeans.produce(SyntheticBeanBuildItem.configure(SchedulerContext.class).setRuntimeInit()
Expand All @@ -242,7 +242,10 @@ public FeatureBuildItem build(SchedulerConfig config, BuildProducer<SyntheticBea
return new FeatureBuildItem(Feature.SCHEDULER);
}

private String generateInvoker(BeanInfo bean, MethodInfo method, ClassOutput classOutput) {
private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, ClassOutput classOutput) {

BeanInfo bean = scheduledMethod.getBean();
MethodInfo method = scheduledMethod.getMethod();

String baseName;
if (bean.getImplClazz().enclosingClass() != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.scheduler.test;

import static io.quarkus.scheduler.Scheduled.ConcurrentExection.SKIP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.test.QuarkusUnitTest;

public class ConcurrentExecutionTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(Jobs.class));

@Test
public void testNonconcurrentExecution() throws InterruptedException {
if (Jobs.LATCH.await(5, TimeUnit.SECONDS)) {
assertEquals(1, Jobs.COUNTER.get());
} else {
fail("Scheduled methods not executed");
}
}

static class Jobs {

static final CountDownLatch LATCH = new CountDownLatch(3);
static final AtomicInteger COUNTER = new AtomicInteger();

@Scheduled(every = "1s")
void concurrent() {
LATCH.countDown();
}

@Scheduled(every = "1s", concurrentExecution = SKIP)
void nonconcurrent() throws InterruptedException {
COUNTER.incrementAndGet();
if (!LATCH.await(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.scheduler;

import static io.quarkus.scheduler.Scheduled.ConcurrentExection.PROCEED;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

Expand Down Expand Up @@ -104,6 +105,14 @@
*/
String delayed() default "";

/**
* Specify the strategy to handle concurrent execution of a scheduled method. By default, a scheduled method can be executed
* concurrently.
*
* @return the concurrent execution strategy
*/
ConcurrentExection concurrentExecution() default PROCEED;

@Retention(RUNTIME)
@Target(METHOD)
@interface Schedules {
Expand All @@ -112,4 +121,22 @@

}

/**
* Represents a strategy to handle concurrent execution of a scheduled method
*/
enum ConcurrentExection {

/**
* The scheduled method can be executed concurrently, i.e. it is executed every time the trigger is fired.
*/
PROCEED,

/**
* The scheduled method is never executed concurrently, i.e. a method execution is skipped until the previous
* invocation completes.
*/
SKIP,

}

}
Loading

0 comments on commit 100c687

Please sign in to comment.