Skip to content

Commit

Permalink
fix #127
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed Sep 2, 2016
1 parent bee985e commit 1762271
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 42 deletions.
1 change: 1 addition & 0 deletions elastic-job-doc/content/post/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ weight=1

### 缺陷修正
1. [ISSUE #123](https://github.com/dangdangdotcom/elastic-job/issues/123) 单机跑定时任务,zk断开后重连,没有触发leader选举
1. [ISSUE #127](https://github.com/dangdangdotcom/elastic-job/issues/127) Spring方式配置作业id无法使用占位符

## 1.1.1

Expand Down
16 changes: 12 additions & 4 deletions elastic-job-doc/content/post/user_guide/lite/dev_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ echo sharding execution context is $*
<!-- 配置带监听的简单作业-->
<job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
<job:listener class="xx.MySimpleJobListener"/>
<job:listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
<job:distributed-listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
</job:simple>

<!-- 配置带数据库和日志作业事件监听的简单作业-->
Expand Down Expand Up @@ -215,13 +215,21 @@ job:script命名空间拥有job:simple命名空间的全部属性,以下仅列

#### job:listener命名空间属性详细说明

`job:listener`必须配置为`job:bean`的子元素
`job:listener`必须配置为`job:bean`的子元素,并且在子元素中只允许出现一次

| 属性名 | 类型 |是否必填|缺省值 | 描述 |
| ------------------------------ |:------|:------|:-------------|:------------------------------------------------------------------------------------------------|
|class |String |`` | | 前置后置任务监听实现类,需实现`ElasticJobListener`接口 |
|started-timeout-milliseconds |long |`` |Long.MAX_VALUE| AbstractDistributeOnceElasticJobListener型监听器,最后一个作业执行前的执行方法的超时时间<br />单位:毫秒|
|completed-timeout-milliseconds |long |`` |Long.MAX_VALUE| AbstractDistributeOnceElasticJobListener型监听器,最后一个作业执行后的执行方法的超时时间<br />单位:毫秒|

#### job:distributed-listener命名空间属性详细说明

`job:distributed-listener`必须配置为`job:bean`的子元素,并且在子元素中只允许出现一次

| 属性名 | 类型 |是否必填|缺省值 | 描述 |
| ------------------------------ |:------|:------|:-------------|:------------------------------------------------------------------------------------------------|
|class |String |`` | | 前置后置任务分布式监听实现类,需继承`AbstractDistributeOnceElasticJobListener`|
|started-timeout-milliseconds |long |`` |Long.MAX_VALUE| 最后一个作业执行前的执行方法的超时时间<br />单位:毫秒|
|completed-timeout-milliseconds |long |`` |Long.MAX_VALUE| 最后一个作业执行后的执行方法的超时时间<br />单位:毫秒|

#### job:event-log命名空间详细说明

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

<reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />

<job:simple id="springSimpleJob" class="com.dangdang.ddframe.job.example.job.simple.SpringSimpleJob" registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}" sharding-item-parameters="${simple.shardingItemParameters}" monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}" failover="${simple.failover}" description="${simple.description}" disabled="${simple.disabled}" overwrite="${simple.overwrite}">
<job:listener class="com.dangdang.ddframe.job.example.job.listener.SimpleListener" />
<job:listener class="com.dangdang.ddframe.job.example.job.listener.SimpleDistributeListener" started-timeout-milliseconds="${listener.startedTimeoutMilliseconds}" completed-timeout-milliseconds="${listener.completedTimeoutMilliseconds}" />
<job:simple id="${simple.id}" class="${simple.class}" registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}" sharding-item-parameters="${simple.shardingItemParameters}" monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}" failover="${simple.failover}" description="${simple.description}" disabled="${simple.disabled}" overwrite="${simple.overwrite}">
<job:listener class="${listener.simple}" />
<job:distributed-listener class="${listener.distributed}" started-timeout-milliseconds="${listener.distributed.startedTimeoutMilliseconds}" completed-timeout-milliseconds="${listener.distributed.completedTimeoutMilliseconds}" />
</job:simple>
<job:dataflow id="springDataflowJob" class="com.dangdang.ddframe.job.example.job.dataflow.SpringDataflowJob" registry-center-ref="regCenter" sharding-total-count="${dataflow.shardingTotalCount}" cron="${dataflow.cron}" sharding-item-parameters="${dataflow.shardingItemParameters}" monitor-execution="${dataflow.monitorExecution}" failover="${dataflow.failover}" max-time-diff-seconds="${dataflow.maxTimeDiffSeconds}" description="${dataflow.description}" disabled="${dataflow.disabled}" overwrite="${dataflow.overwrite}" />
<job:dataflow id="${dataflow.id}" class="${dataflow.class}" registry-center-ref="regCenter" sharding-total-count="${dataflow.shardingTotalCount}" cron="${dataflow.cron}" sharding-item-parameters="${dataflow.shardingItemParameters}" monitor-execution="${dataflow.monitorExecution}" failover="${dataflow.failover}" max-time-diff-seconds="${dataflow.maxTimeDiffSeconds}" description="${dataflow.description}" disabled="${dataflow.disabled}" overwrite="${dataflow.overwrite}" />
</beans>
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
simple.id=springSimpleJob
simple.class=com.dangdang.ddframe.job.example.job.simple.SpringSimpleJob
simple.cron=0/5 * * * * ?
simple.shardingTotalCount=10
simple.shardingItemParameters=0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J
Expand All @@ -8,6 +10,8 @@ simple.disabled=false
simple.overwrite=true
simple.monitorPort=9888

dataflow.id=springDataflowJob
dataflow.class=com.dangdang.ddframe.job.example.job.dataflow.SpringDataflowJob
dataflow.cron=0/5 * * * * ?
dataflow.shardingTotalCount=10
dataflow.shardingItemParameters=0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J
Expand All @@ -19,5 +23,7 @@ dataflow.description=\u6309\u987A\u5E8F\u4E0D\u505C\u6B62\u8FD0\u884C\u7684\u4F5
dataflow.disabled=false
dataflow.overwrite=true

listener.startedTimeoutMilliseconds=1000
listener.completedTimeoutMilliseconds=2000
listener.simple=com.dangdang.ddframe.job.example.job.listener.SimpleListener
listener.distributed=com.dangdang.ddframe.job.example.job.listener.SimpleDistributeListener
listener.distributed.startedTimeoutMilliseconds=1000
listener.distributed.completedTimeoutMilliseconds=3000
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ public final class BaseJobBeanDefinitionParserTag {

public static final String LISTENER_TAG = "listener";

public static final String LISTENER_STARTED_TIMEOUT_MILLISECONDS_ATTRIBUTE = "started-timeout-milliseconds";
public static final String DISTRIBUTED_LISTENER_TAG = "distributed-listener";

public static final String LISTENER_COMPLETED_TIMEOUT_MILLISECONDS_ATTRIBUTE = "completed-timeout-milliseconds";
public static final String DISTRIBUTED_LISTENER_STARTED_TIMEOUT_MILLISECONDS_ATTRIBUTE = "started-timeout-milliseconds";

public static final String DISTRIBUTED_LISTENER_COMPLETED_TIMEOUT_MILLISECONDS_ATTRIBUTE = "completed-timeout-milliseconds";

public static final String EXECUTOR_SERVICE_HANDLER = "executor-service-handler";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.dangdang.ddframe.job.lite.spring.namespace.parser.common;

import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import com.dangdang.ddframe.job.lite.spring.schedule.SpringJobScheduler;
import com.google.common.base.Strings;
import org.springframework.beans.factory.config.BeanDefinition;
Expand All @@ -35,6 +34,9 @@
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.CRON_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DESCRIPTION_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DISABLED_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DISTRIBUTED_LISTENER_COMPLETED_TIMEOUT_MILLISECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DISTRIBUTED_LISTENER_STARTED_TIMEOUT_MILLISECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.DISTRIBUTED_LISTENER_TAG;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.EVENT_LOG_TAG;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.EVENT_RDB_DRIVER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.EVENT_RDB_LOG_LEVEL_ATTRIBUTE;
Expand All @@ -47,8 +49,6 @@
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.JOB_EXCEPTION_HANDLER;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.JOB_PARAMETER_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.JOB_SHARDING_STRATEGY_CLASS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.LISTENER_COMPLETED_TIMEOUT_MILLISECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.LISTENER_STARTED_TIMEOUT_MILLISECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.LISTENER_TAG;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.MAX_TIME_DIFF_SECONDS_ATTRIBUTE;
import static com.dangdang.ddframe.job.lite.spring.namespace.constants.BaseJobBeanDefinitionParserTag.MISFIRE_ATTRIBUTE;
Expand All @@ -72,14 +72,15 @@ protected AbstractBeanDefinition parseInternal(final Element element, final Pars
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setInitMethodName("init");
factory.addConstructorArgReference(element.getAttribute(REGISTRY_CENTER_REF_ATTRIBUTE));
factory.addConstructorArgReference(createJobConfiguration(element, parserContext));
factory.addConstructorArgValue(createJobConfiguration(element));
factory.addConstructorArgValue(createJobListeners(element));
return factory.getBeanDefinition();
}

private String createJobConfiguration(final Element element, final ParserContext parserContext) {
private BeanDefinition createJobConfiguration(final Element element) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(getJobConfigurationDTO());
factory.addConstructorArgValue(element.getAttribute(ID_ATTRIBUTE));
String jobName = element.getAttribute(ID_ATTRIBUTE);
factory.addConstructorArgValue(jobName);
factory.addConstructorArgValue(element.getAttribute(CRON_ATTRIBUTE));
factory.addConstructorArgValue(element.getAttribute(SHARDING_TOTAL_COUNT_ATTRIBUTE));
addPropertyValueIfNotEmpty(SHARDING_ITEM_PARAMETERS_ATTRIBUTE, "shardingItemParameters", element, factory);
Expand All @@ -97,31 +98,27 @@ private String createJobConfiguration(final Element element, final ParserContext
addPropertyValueIfNotEmpty(JOB_EXCEPTION_HANDLER, "jobExceptionHandler", element, factory);
setEventConfigs(element, factory);
setPropertiesValue(element, factory);
String result = element.getAttribute(ID_ATTRIBUTE) + "Conf";
parserContext.getRegistry().registerBeanDefinition(result, factory.getBeanDefinition());
return result;
return factory.getBeanDefinition();
}

protected abstract Class<? extends AbstractJobConfigurationDto> getJobConfigurationDTO();

protected abstract void setPropertiesValue(final Element element, final BeanDefinitionBuilder factory);

private List<BeanDefinition> createJobListeners(final Element element) {
List<Element> listenerElements = DomUtils.getChildElementsByTagName(element, LISTENER_TAG);
List<BeanDefinition> result = new ManagedList<>(listenerElements.size());
for (Element each : listenerElements) {
String className = each.getAttribute(CLASS_ATTRIBUTE);
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(className);
Element listenerElement = DomUtils.getChildElementByTagName(element, LISTENER_TAG);
Element distributedListenerElement = DomUtils.getChildElementByTagName(element, DISTRIBUTED_LISTENER_TAG);
List<BeanDefinition> result = new ManagedList<>(2);
if (null != listenerElement) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listenerElement.getAttribute(CLASS_ATTRIBUTE));
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
result.add(factory.getBeanDefinition());
}
if (null != distributedListenerElement) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListenerElement.getAttribute(CLASS_ATTRIBUTE));
factory.setScope(BeanDefinition.SCOPE_PROTOTYPE);
try {
Class listenerClass = Class.forName(className);
if (AbstractDistributeOnceElasticJobListener.class.isAssignableFrom(listenerClass)) {
factory.addConstructorArgValue(each.getAttribute(LISTENER_STARTED_TIMEOUT_MILLISECONDS_ATTRIBUTE));
factory.addConstructorArgValue(each.getAttribute(LISTENER_COMPLETED_TIMEOUT_MILLISECONDS_ATTRIBUTE));
}
} catch (final ClassNotFoundException ex) {
throw new RuntimeException(ex);
}
factory.addConstructorArgValue(distributedListenerElement.getAttribute(DISTRIBUTED_LISTENER_STARTED_TIMEOUT_MILLISECONDS_ATTRIBUTE));
factory.addConstructorArgValue(distributedListenerElement.getAttribute(DISTRIBUTED_LISTENER_COMPLETED_TIMEOUT_MILLISECONDS_ATTRIBUTE));
result.add(factory.getBeanDefinition());
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@
<xsd:element name="listener">
<xsd:complexType>
<xsd:attribute name="class" type="xsd:string" use="required" />
<xsd:attribute name="started-timeout-milliseconds" type="xsd:string" />
<xsd:attribute name="completed-timeout-milliseconds" type="xsd:string" />
</xsd:complexType>
</xsd:element>
<xsd:element name="distributed-listener">
<xsd:complexType>
<xsd:attribute name="class" type="xsd:string" use="required" />
<xsd:attribute name="started-timeout-milliseconds" type="xsd:string" use="required" />
<xsd:attribute name="completed-timeout-milliseconds" type="xsd:string" use="required" />
</xsd:complexType>
</xsd:element>

Expand Down Expand Up @@ -41,7 +46,8 @@
<xsd:complexContent>
<xsd:extension base="beans:identifiedType">
<xsd:sequence>
<xsd:element ref="listener" minOccurs="0" maxOccurs="unbounded" />
<xsd:element ref="listener" minOccurs="0" maxOccurs="1" />
<xsd:element ref="distributed-listener" minOccurs="0" maxOccurs="1" />
<xsd:element ref="event-log" minOccurs="0" maxOccurs="1" />
<xsd:element ref="event-rdb" minOccurs="0" maxOccurs="1" />
</xsd:sequence>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
">
<context:property-placeholder location="classpath:conf/job/conf.properties" />
<reg:zookeeper id="regCenter" server-lists="localhost:3181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<reg:zookeeper id="regCenter" server-lists="${regCenter.serverLists}" namespace="${regCenter.namespace}" base-sleep-time-milliseconds="${regCenter.baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${regCenter.maxSleepTimeMilliseconds}" max-retries="${regCenter.maxRetries}" />
<bean id="simpleElasticJobBean" class="com.dangdang.ddframe.job.lite.fixture.FooSimpleElasticJob">
<property name="springValue" value="simple" />
</bean>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<import resource="base.xml"/>
<job:simple id="simpleElasticJob_namespace_listener" class="com.dangdang.ddframe.job.lite.fixture.FooSimpleElasticJob" registry-center-ref="regCenter" cron="${simpleJob.cron}" sharding-total-count="${simpleJob.shardingTotalCount}" sharding-item-parameters="${simpleJob.shardingItemParameters}" disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}">
<job:listener class="com.dangdang.ddframe.job.lite.fixture.listener.SimpleOnceListener" started-timeout-milliseconds="10000" completed-timeout-milliseconds="20000" />
<job:simple id="${simpleJob.id}" class="${simpleJob.class}" registry-center-ref="regCenter" cron="${simpleJob.cron}" sharding-total-count="${simpleJob.shardingTotalCount}" sharding-item-parameters="${simpleJob.shardingItemParameters}" disabled="${simpleJob.disabled}" overwrite="${simpleJob.overwrite}">
<job:listener class="com.dangdang.ddframe.job.lite.fixture.listener.SimpleListener" />
<job:distributed-listener class="com.dangdang.ddframe.job.lite.fixture.listener.SimpleOnceListener" started-timeout-milliseconds="10000" completed-timeout-milliseconds="20000" />
</job:simple>
<job:dataflow id="dataflowElasticJob_namespace_listener" class="com.dangdang.ddframe.job.lite.fixture.DataflowElasticJob" registry-center-ref="regCenter" cron="0/1 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" description="中文描述" overwrite="true" />
</beans>
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
regCenter.serverLists=localhost:3181
regCenter.namespace=dd-job
regCenter.namespace=elastic-job-lite-spring-test
regCenter.baseSleepTimeMilliseconds=1000
regCenter.maxSleepTimeMilliseconds=3000
regCenter.maxRetries=3

simpleJob.id=simpleElasticJob_namespace_listener
simpleJob.class=com.dangdang.ddframe.job.lite.fixture.FooSimpleElasticJob
simpleJob.cron=0/1 * * * * ?
simpleJob.shardingTotalCount=3
simpleJob.shardingItemParameters=0=A,1=B,2=C
Expand Down

0 comments on commit 1762271

Please sign in to comment.