Skip to content

Commit

Permalink
implement #361 quartz scheduler removal (#552)
Browse files Browse the repository at this point in the history
* #361 remove pgsql quartz vacuum jobs

* #361 move cleanup for demo mode and cleanup unreferenced blob to spring scheduled jobs

* #361 move special price token generation job in spring managed scheduler, add skip locked in for update clause

* #361 move send offline payment reminder to event organizer to spring scheduled jobs

* #361 move send email job to spring scheduled jobs

* #361 javadoc

* #361 move process pending reservations job in spring managed scheduler

* #361 remove quartz cron trigger builder helper

* #361 move send offline payment reminder job in spring managed scheduler

* #361 move send ticket assignment reminder job in spring managed scheduler

* #361 move cleanup expired pending reservation job in spring managed scheduler

* #361 remove unused query

* #361 remove quartz, switch last job

* #361 rename job component, email load ids waiting for processing: add a limit of 100

* #361 make some methods publics and transactional, as discovered in the multi threaded test, the transactional annotation are not applied, and thus may cause issues. As an additional measure, the job class has been moved in a separate package

* #361 misc cleanup/transactional:readonly=true

* #361 add test for cleanupUnreferencedBlobFiles
  • Loading branch information
syjer authored and cbellone committed Nov 23, 2018
1 parent a16be29 commit b43f88f
Show file tree
Hide file tree
Showing 33 changed files with 439 additions and 596 deletions.
4 changes: 0 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ dependencies {
compile "com.samskivert:jmustache:1.14"
compile "com.github.sps.mustache:mustache-spring-view:1.4"
compile "javax.mail:mail:1.5.0-b01"
compile "org.quartz-scheduler:quartz:2.3.0", {
exclude group: "com.zaxxer", module: "HikariCP-java6"
exclude group: "com.mchange", module: "c3p0"
}
compile 'com.moodysalem:LatLongToTimezoneMaven:1.2'
/**/
compile "com.openhtmltopdf:openhtmltopdf-core:0.0.1-RC17"
Expand Down
122 changes: 1 addition & 121 deletions src/main/java/alfio/config/DataSourceConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package alfio.config;

import alfio.config.support.PlatformProvider;
import alfio.manager.Jobs.*;
import alfio.manager.UploadedResourceManager;
import alfio.manager.system.ConfigurationManager;
import alfio.util.TemplateManager;
Expand All @@ -28,14 +27,7 @@
import lombok.extern.log4j.Log4j2;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.MigrationVersion;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.Trigger;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.MessageSource;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.*;
Expand All @@ -47,7 +39,6 @@
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.quartz.*;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.transaction.PlatformTransactionManager;
Expand All @@ -57,16 +48,14 @@
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.EnumSet;
import java.util.Properties;
import java.util.Set;

@Configuration
@EnableTransactionManagement
@EnableScheduling
@EnableAsync
@ComponentScan(basePackages = {"alfio.manager", "alfio.extension"})
@ComponentScan(basePackages = {"alfio.manager", "alfio.job", "alfio.extension"})
@Log4j2
public class DataSourceConfiguration implements ResourceLoaderAware {

Expand Down Expand Up @@ -174,115 +163,6 @@ public JMustacheTemplateLoader getTemplateLoader() {
return loader;
}

// ----- scheduler conf ------
// partially based on
// http://sloanseaman.com/wordpress/2011/06/06/spring-and-quartz-and-persistence/
// https://objectpartners.com/2013/07/09/configuring-quartz-2-with-spring-in-clustered-mode/
// https://gist.github.com/jelies/5085593

public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

private transient AutowireCapableBeanFactory beanFactory;

@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}

@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
return job;
}
}

private static JobDetailFactoryBean jobDetailFactory(Class<? extends Job> jobClass, String name) {
JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
jobDetailFactory.setJobClass(jobClass);
jobDetailFactory.setName(name);
jobDetailFactory.setDurability(true);

jobDetailFactory.afterPropertiesSet();
return jobDetailFactory;
}


/**
* @param jobClass
* @param name
* @param repeatInterval in milliseconds
* @return
* @throws ParseException
*/
private static Trigger buildTrigger(Class<? extends Job> jobClass, String name, long repeatInterval) throws ParseException {
JobDetailFactoryBean jobDetailFactory = jobDetailFactory(jobClass, name);

SimpleTriggerFactoryBean triggerFactoryBean = new SimpleTriggerFactoryBean();
triggerFactoryBean.setJobDetail(jobDetailFactory.getObject());
triggerFactoryBean.setRepeatInterval(repeatInterval);
triggerFactoryBean.setName(name);
triggerFactoryBean.afterPropertiesSet();

return triggerFactoryBean.getObject();
}

private static CronTrigger buildCron(Class<? extends Job> jobClass, String name, String cronExpression) throws ParseException {
JobDetailFactoryBean jobDetailFactory = jobDetailFactory(jobClass, name);

CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setJobDetail(jobDetailFactory.getObject());
cronTriggerFactoryBean.setCronExpression(cronExpression);
cronTriggerFactoryBean.setName(name);
cronTriggerFactoryBean.afterPropertiesSet();

return cronTriggerFactoryBean.getObject();
}

public Trigger[] getTriggers() throws ParseException {
return new Trigger[]{
buildTrigger(CleanupExpiredPendingReservation.class, "CleanupExpiredPendingReservation", CleanupExpiredPendingReservation.INTERVAL),
buildTrigger(SendOfflinePaymentReminder.class, "SendOfflinePaymentReminder", SendOfflinePaymentReminder.INTERVAL),
buildTrigger(SendTicketAssignmentReminder.class, "SendTicketAssignmentReminder", SendTicketAssignmentReminder.INTERVAL),
buildTrigger(GenerateSpecialPriceCodes.class, "GenerateSpecialPriceCodes", GenerateSpecialPriceCodes.INTERVAL),
buildTrigger(ProcessReservationRequests.class, "ProcessReservationRequests", ProcessReservationRequests.INTERVAL),
buildTrigger(SendEmails.class, "SendEmails", SendEmails.INTERVAL),
buildTrigger(ProcessReleasedTickets.class, "ProcessReleasedTickets", ProcessReleasedTickets.INTERVAL),
buildTrigger(CleanupUnreferencedBlobFiles.class, "CleanupUnreferencedBlobFiles", CleanupUnreferencedBlobFiles.INTERVAL),
buildCron(SendOfflinePaymentReminderToEventOrganizers.class, "SendOfflinePaymentReminderToEventOrganizers", SendOfflinePaymentReminderToEventOrganizers.CRON_EXPRESSION),
buildCron(CleanupForDemoMode.class, "CleanupForDemoMode", CleanupForDemoMode.CRON_EXPRESSION)
};
}

@Bean
@DependsOn("migrator")
@Profile("!"+ Initializer.PROFILE_DISABLE_JOBS)
public SchedulerFactoryBean schedulerFactory(DataSource dataSource, PlatformTransactionManager platformTransactionManager, ApplicationContext applicationContext) throws ParseException {
String quartzDriverDelegateClass = "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate";
Properties properties = new Properties();
properties.setProperty("org.quartz.jobStore.isClustered", "true");
properties.setProperty("org.quartz.scheduler.instanceId", "AUTO");
properties.setProperty("org.quartz.jobStore.driverDelegateClass", quartzDriverDelegateClass);

SchedulerFactoryBean sfb = new SchedulerFactoryBean();
sfb.setAutoStartup(true);
sfb.setWaitForJobsToCompleteOnShutdown(true);
sfb.setOverwriteExistingJobs(true);
sfb.setDataSource(dataSource);
sfb.setTransactionManager(platformTransactionManager);
sfb.setBeanName("QuartzScheduler");
sfb.setQuartzProperties(properties);
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
sfb.setJobFactory(jobFactory);
sfb.setTriggers(getTriggers());

log.info("Quartz scheduler configured to run!");
return sfb;
}

// ----- end scheduler conf ------

@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
Expand Down
155 changes: 155 additions & 0 deletions src/main/java/alfio/job/Jobs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/**
* This file is part of alf.io.
*
* alf.io is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* alf.io is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with alf.io. If not, see <http://www.gnu.org/licenses/>.
*/
package alfio.job;

import alfio.config.Initializer;
import alfio.manager.*;
import alfio.manager.system.ConfigurationManager;
import alfio.manager.user.UserManager;
import alfio.model.system.Configuration;
import alfio.model.system.ConfigurationKeys;
import alfio.model.user.User;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.core.env.Profiles;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.List;

/**
* <p>Scheduled jobs. Important: all the jobs must be able to run on multiple instance at the same time.</p>
* <p>Take great care in placing a select id ... for update skip locked to avoid multiple job execution for the same object</p>
* <p>Note: it's a separate package, as we need to ensure that the called method are public (and possibly @Transactional!)</p>
*
*/
@Component
@DependsOn("migrator")
@Profile("!" + Initializer.PROFILE_DISABLE_JOBS)
@AllArgsConstructor
@Log4j2
public class Jobs {


private static final int ONE_MINUTE = 1000 * 60;
private static final int THIRTY_SECONDS = 1000 * 30;
private static final int FIVE_SECONDS = 1000 * 5;
private static final int THIRTY_MINUTES = 30 * ONE_MINUTE;

private final AdminReservationRequestManager adminReservationRequestManager;
private final ConfigurationManager configurationManager;
private final Environment environment;
private final EventManager eventManager;
private final FileUploadManager fileUploadManager;
private final NotificationManager notificationManager;
private final SpecialPriceTokenGenerator specialPriceTokenGenerator;
private final TicketReservationManager ticketReservationManager;
private final UserManager userManager;
private final WaitingQueueSubscriptionProcessor waitingQueueSubscriptionProcessor;


//cron each minute: "0 0/1 * * * ?"

@Scheduled(fixedRate = ONE_MINUTE * 60)
public void cleanupUnreferencedBlobFiles() {
log.trace("running job cleanupUnreferencedBlobFiles");
fileUploadManager.cleanupUnreferencedBlobFiles(DateUtils.addDays(new Date(), -1));
}


//run each hour
@Scheduled(cron = "0 0 0/1 * * ?")
public void cleanupForDemoMode() {
if (environment.acceptsProfiles(Profiles.of(Initializer.PROFILE_DEMO))) {
log.trace("running job cleanupForDemoMode");
int expirationDate = configurationManager.getIntConfigValue(Configuration.getSystemConfiguration(ConfigurationKeys.DEMO_MODE_ACCOUNT_EXPIRATION_DAYS), 20);
List<Integer> userIds = userManager.disableAccountsOlderThan(DateUtils.addDays(new Date(), -expirationDate), User.Type.DEMO);
if (!userIds.isEmpty()) {
eventManager.disableEventsFromUsers(userIds);
}
}
}

@Scheduled(fixedRate = THIRTY_SECONDS)
public void generateSpecialPriceCodes() {
log.trace("running job generateSpecialPriceCodes");
specialPriceTokenGenerator.generatePendingCodes();
}


//run each hour
@Scheduled(cron = "0 0 0/1 * * ?")
public void sendOfflinePaymentReminderToEventOrganizers() {
log.trace("running job sendOfflinePaymentReminderToEventOrganizers");
ticketReservationManager.sendReminderForOfflinePaymentsToEventManagers();
}


@Scheduled(fixedRate = FIVE_SECONDS)
public void sendEmails() {
log.trace("running job sendEmails");
notificationManager.sendWaitingMessages();
}

@Scheduled(fixedRate = FIVE_SECONDS)
public void processReservationRequests() {
log.trace("running job processReservationRequests");
long start = System.currentTimeMillis();
Pair<Integer, Integer> result = adminReservationRequestManager.processPendingReservations();
if (result.getLeft() > 0 || result.getRight() > 0) {
log.info("ProcessReservationRequests: got {} success and {} failures. Elapsed {} ms", result.getLeft(), result.getRight(), System.currentTimeMillis() - start);
}
}


@Scheduled(fixedRate = THIRTY_MINUTES)
public void sendOfflinePaymentReminder() {
log.trace("running job sendOfflinePaymentReminder");
ticketReservationManager.sendReminderForOfflinePayments();
}

@Scheduled(fixedRate = THIRTY_SECONDS)
public void sendTicketAssignmentReminder() {
log.trace("running job sendTicketAssignmentReminder");
ticketReservationManager.sendReminderForTicketAssignment();
ticketReservationManager.sendReminderForOptionalData();
}


@Scheduled(fixedRate = THIRTY_SECONDS)
public void cleanupExpiredPendingReservation() {
log.trace("running job cleanupExpiredPendingReservation");
//cleanup reservation that have a expiration older than "now minus 10 minutes": this give some additional slack.
final Date expirationDate = DateUtils.addMinutes(new Date(), -10);
ticketReservationManager.cleanupExpiredReservations(expirationDate);
ticketReservationManager.cleanupExpiredOfflineReservations(expirationDate);
ticketReservationManager.markExpiredInPaymentReservationAsStuck(expirationDate);
}


@Scheduled(fixedRate = THIRTY_SECONDS)
public void processReleasedTickets() {
log.trace("running job processReleasedTickets");
waitingQueueSubscriptionProcessor.handleWaitingTickets();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public Result<String> scheduleReservations(String eventName,
.orElseGet(() -> Result.error(ErrorCode.ReservationError.UPDATE_FAILED));
}

Pair<Integer, Integer> processPendingReservations() {
public Pair<Integer, Integer> processPendingReservations() {
Map<Boolean, List<MapSqlParameterSource>> result = adminReservationRequestRepository.findPendingForUpdate(1000)
.stream()
.map(id -> {
Expand Down
54 changes: 0 additions & 54 deletions src/main/java/alfio/manager/CleanupJobs.java

This file was deleted.

Loading

0 comments on commit b43f88f

Please sign in to comment.