Skip to content

Commit

Permalink
fix #64 & fix #164
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 10, 2016
1 parent 0fc6654 commit 6d05603
Show file tree
Hide file tree
Showing 19 changed files with 82 additions and 161 deletions.
2 changes: 2 additions & 0 deletions elastic-job-doc/content/post/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ weight=1

### 缺陷修正

1. [ISSUE #64](https://github.com/dangdangdotcom/elastic-job/issues/64) Spring命名空间,若注册多个同Class的作业Bean,会导致作业Bean查找不准确
1. [ISSUE #151](https://github.com/dangdangdotcom/elastic-job/issues/151) 基于关系型数据库的事件追踪缺乏对MySQL之外数据库的支持
1. [ISSUE #152](https://github.com/dangdangdotcom/elastic-job/issues/152) job自定义异常处理器无效,总是被DefaultJobExceptionHandler处理
1. [ISSUE #161](https://github.com/dangdangdotcom/elastic-job/issues/161) Lite版本部署至某些版本的Tomcat无法启动
Expand All @@ -19,6 +20,7 @@ weight=1
### 功能提升

1. [ISSUE #159](https://github.com/dangdangdotcom/elastic-job/issues/159) 提供从Spring 3.1.0.REELASE至Spring 4任何版本的支持
1. [ISSUE #164](https://github.com/dangdangdotcom/elastic-job/issues/164) 作业Spring命名空间中已声明的JobBean不需要再声明@Component或在Spring xml中定义

### 结构调整

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.example.fixture.entity.Foo;
import com.dangdang.ddframe.job.example.fixture.repository.FooRepository;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;

@Component
public class SpringDataflowJob implements DataflowJob<Foo> {

@Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.example.fixture.entity.Foo;
import com.dangdang.ddframe.job.example.fixture.repository.FooRepository;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;

@Component
public class SpringSimpleJob implements SimpleJob {

@Resource
Expand All @@ -36,6 +34,7 @@ public class SpringSimpleJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s", Thread.currentThread().getId(), new Date(), shardingContext, "simple job"));
System.out.println(shardingContext.getShardingParameter());
List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
<context:property-placeholder location="classpath:conf/*.properties" />

<bean id="elasticJobLog" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="org.h2.Driver"/>
<property name="url" value="jdbc:h2:mem:job_event_storage"/>
<property name="username" value="sa"/>
<property name="password" value=""/>
<property name="driverClassName" value="${event.rdb.driver}"/>
<property name="url" value="${event.rdb.url}"/>
<property name="username" value="${event.rdb.username}"/>
<property name="password" value="${event.rdb.password}"/>
</bean>

<reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.script.ScriptJob;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.event.JobEventBus;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.exception.JobConfigurationException;
Expand All @@ -32,6 +33,7 @@
import com.dangdang.ddframe.job.lite.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.lite.internal.schedule.LiteJobFacade;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Optional;
import lombok.Setter;
import org.quartz.Job;
import org.quartz.JobBuilder;
Expand All @@ -58,78 +60,93 @@ public class JobScheduler {

private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";

private final String jobName;

private final JobExecutor jobExecutor;

private final JobFacade jobFacade;

private final JobRegistry jobRegistry;

public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) {
JobEventBus jobEventBus = new JobEventBus();
jobExecutor = new JobExecutor(regCenter, liteJobConfig, elasticJobListeners);
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
this(regCenter, liteJobConfig, new JobEventBus(), elasticJobListeners);
}

public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig,
final ElasticJobListener... elasticJobListeners) {
JobEventBus jobEventBus = new JobEventBus(jobEventConfig);
this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
}

private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
jobName = liteJobConfig.getJobName();
jobExecutor = new JobExecutor(regCenter, liteJobConfig, elasticJobListeners);
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
jobFacade = new LiteJobFacade(regCenter, jobName, Arrays.asList(elasticJobListeners), jobEventBus);
jobRegistry = JobRegistry.getInstance();
}

/**
* 初始化作业.
*/
public void init() {
jobExecutor.init();
JobDetail jobDetail = JobBuilder.newJob(LiteJob.class).withIdentity(jobExecutor.getLiteJobConfig().getJobName()).build();
try {
if (!jobExecutor.getLiteJobConfig().getTypeConfig().getJobClass().equals(ScriptJob.class.getCanonicalName())) {
jobDetail.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobExecutor.getLiteJobConfig().getTypeConfig().getJobClass()).newInstance());
JobTypeConfiguration jobTypeConfig = jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig();
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName);
jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron());
jobRegistry.addJobScheduleController(jobName, jobScheduleController);
}

private JobDetail createJobDetail(final String jobClass) {
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build();
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
if (elasticJobInstance.isPresent()) {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
try {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
}
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobExecutor.getLiteJobConfig().getTypeConfig().getJobClass());
}
jobDetail.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
JobScheduleController jobScheduleController;
return result;
}

protected Optional<ElasticJob> createElasticJobInstance() {
return Optional.absent();
}

private Scheduler createScheduler(final boolean isMisfire) {
Scheduler result;
try {
jobScheduleController = new JobScheduleController(initializeScheduler(jobDetail.getKey().toString()), jobDetail,
jobExecutor.getSchedulerFacade(), jobExecutor.getLiteJobConfig().getJobName());
jobScheduleController.scheduleJob(jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig().getCoreConfig().getCron());
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties(isMisfire));
result = factory.getScheduler();
result.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
JobRegistry.getInstance().addJobScheduleController(jobExecutor.getLiteJobConfig().getJobName(), jobScheduleController);
}

private Scheduler initializeScheduler(final String jobName) throws SchedulerException {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties(jobName));
Scheduler result = factory.getScheduler();
result.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
return result;
}

private Properties getBaseQuartzProperties(final String jobName) {
private Properties getBaseQuartzProperties(final boolean isMisfire) {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
result.put("org.quartz.threadPool.threadCount", "1");
result.put("org.quartz.scheduler.instanceName", jobName);
if (!jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig().getCoreConfig().isMisfire()) {
if (!isMisfire) {
result.put("org.quartz.jobStore.misfireThreshold", "1");
}
result.put("org.quartz.plugin.shutdownhook.class", ShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
prepareEnvironments(result);
return result;
}

protected void prepareEnvironments(final Properties props) {
}

/**
* 停止作业调度.
*/
public void shutdown() {
JobRegistry.getInstance().getJobScheduleController(jobExecutor.getLiteJobConfig().getJobName()).shutdown();
jobRegistry.getJobScheduleController(jobName).shutdown();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected AbstractBeanDefinition parseInternal(final Element element, final Pars
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setInitMethodName("init");
factory.setDestroyMethodName("shutdown");
factory.addConstructorArgValue(BeanDefinitionBuilder.rootBeanDefinition(element.getAttribute(CLASS_ATTRIBUTE)).getBeanDefinition());
factory.addConstructorArgReference(element.getAttribute(REGISTRY_CENTER_REF_ATTRIBUTE));
factory.addConstructorArgValue(createJobConfiguration(element));
BeanDefinition jobEventConfig = createJobEventConfig(element);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,34 @@

package com.dangdang.ddframe.job.lite.spring.schedule;

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.spring.namespace.parser.common.AbstractJobConfigurationDto;
import com.dangdang.ddframe.job.lite.spring.util.AopTargetUtils;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.util.Properties;
import com.google.common.base.Optional;

/**
* 基于Spring的作业启动器.
*
* @author caohao
* @author zhangliang
*/
public class SpringJobScheduler extends JobScheduler implements ApplicationContextAware {
public class SpringJobScheduler extends JobScheduler {

private ApplicationContext applicationContext;
private final ElasticJob elasticJob;

public SpringJobScheduler(final CoordinatorRegistryCenter regCenter, final AbstractJobConfigurationDto jobConfigDto, final ElasticJobListener[]elasticJobListeners) {
public SpringJobScheduler(final ElasticJob elasticJob, final CoordinatorRegistryCenter regCenter, final AbstractJobConfigurationDto jobConfigDto, final ElasticJobListener[]elasticJobListeners) {
super(regCenter, jobConfigDto.toLiteJobConfiguration(), getTargetElasticJobListeners(elasticJobListeners));
this.elasticJob = elasticJob;
}

public SpringJobScheduler(final CoordinatorRegistryCenter regCenter, final AbstractJobConfigurationDto jobConfigDto,
public SpringJobScheduler(final ElasticJob elasticJob, final CoordinatorRegistryCenter regCenter, final AbstractJobConfigurationDto jobConfigDto,
final JobEventConfiguration jobEventConfig, final ElasticJobListener[]elasticJobListeners) {
super(regCenter, jobConfigDto.toLiteJobConfiguration(), jobEventConfig, getTargetElasticJobListeners(elasticJobListeners));
this.elasticJob = elasticJob;
}

private static ElasticJobListener[] getTargetElasticJobListeners(final ElasticJobListener[] elasticJobListeners) {
Expand All @@ -55,13 +56,7 @@ private static ElasticJobListener[] getTargetElasticJobListeners(final ElasticJo
}

@Override
public void setApplicationContext(final ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

@Override
protected void prepareEnvironments(final Properties props) {
SpringJobFactory.setApplicationContext(applicationContext);
props.put("org.quartz.scheduler.jobFactory.class", SpringJobFactory.class.getName());
protected Optional<ElasticJob> createElasticJobInstance() {
return Optional.of(elasticJob);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@

@RunWith(Suite.class)
@SuiteClasses(AllSpringJobTests.class)
public class AllLiteSpringTests {
public final class AllLiteSpringTests {
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@
AllSpringIntegrateTests.class,
AllSpringNamespaceTests.class
})
public class AllSpringJobTests {
public final class AllSpringJobTests {
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,14 @@
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.Getter;
import lombok.Setter;

public class FooSimpleElasticJob implements SimpleJob {

@Getter
private static volatile boolean completed;

@Getter
private static String jobValue;

@Setter
private String springValue;

@Override
public void execute(final ShardingContext shardingContext) {
jobValue = springValue;
completed = true;
}

Expand Down
Loading

0 comments on commit 6d05603

Please sign in to comment.