Skip to content

Commit

Permalink
first
Browse files Browse the repository at this point in the history
  • Loading branch information
eueuy committed Apr 18, 2020
0 parents commit a2f4994
Show file tree
Hide file tree
Showing 51 changed files with 3,487 additions and 0 deletions.
26 changes: 26 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
target/
!.mvn/wrapper/maven-wrapper.jar

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr

### NetBeans ###
nbproject/private/
build/
nbbuild/
dist/
nbdist/
.nb-gradle/

logs
Binary file added .mvn/wrapper/maven-wrapper.jar
Binary file not shown.
1 change: 1 addition & 0 deletions .mvn/wrapper/maven-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
## Easy Job
Easy-Job 是一个小巧但完善的分布式任务调度执行框架。它的设计目标是**易于使用****易于理解****足够简单**

它在功能上进行了仔细取舍,只实现最基本的功能。包括分布式任务调度、弹性扩容、分片、自动故障转移。同时也保留了良好的抽象和扩展接口。

由于其易于理解的特性,在Easy-Job的基础上进行定制化开发非常容易,非常适合作为调度类任务开发的起点。

## Todo List

* 任务状态回写repository
* 处理puller等daemon thread堵塞的问题
* allocation table 的事务操作
* 多个node操作repository的并发安全问题
* 确保repository的实现是并发安全的
* rebalance 时要阻止对job registry 的其它修改请求。
* metrics组件
9 changes: 9 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## Todo List

* 任务状态回写repository
* 处理puller等daemon thread堵塞的问题
* allocation table 的事务操作
* 多个node操作repository的并发安全问题
* 确保repository的实现是并发安全的
* rebalance 时要阻止对job registry 的其它修改请求。
* metrics组件
116 changes: 116 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.easyjob</groupId>
<artifactId>easy-job</artifactId>
<version>0.2.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>easy-job</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<version.log4j2>2.8.1</version.log4j2>
<version.okhttp>3.2.0</version.okhttp>
<creep.version>0.3.2</creep.version>
</properties>

<dependencies>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.8.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.16</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>17.0.0</version>
</dependency>

<dependency>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
<version>1.8.0-alpha2</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.10.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<finalName>easy-job</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.projectlombok</groupId>
<artifactId>lombok-maven-plugin</artifactId>
<version>1.16.8.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>delombok</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>


</project>
125 changes: 125 additions & 0 deletions src/main/java/org/easyjob/Job.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package org.easyjob;

import lombok.extern.slf4j.Slf4j;
import org.easyjob.allocation.table.JobShardDefinition;
import org.easyjob.execution.ExecutionMachine;

import java.util.Map;

@Slf4j
public abstract class Job {

private ExecutionMachine executionMachine;
private JobDefinition definition;
private JobStatus status;
private int hasRetriedTime = 0;
private JobShardDefinition shard;

public void init(JobDefinition definition, JobShardDefinition shard, ExecutionMachine executionMachine) {
this.definition = definition;
this.shard = shard;
this.executionMachine = executionMachine;
this.toStart();
this.toFinished();
}

public abstract void onStart(Map<String, Object> parameters, int shardNum);

public abstract void onStop();

public abstract void onFinished();

public void onError(Throwable throwable, boolean doFailover) {
log.error("job <" + definition.getId() + "> error occurred, cause:" + throwable.getMessage(), throwable);
if (doFailover) {
failover();
}
}

private synchronized void failover() {
if (!(this.hasRetriedTime > definition.getRetryTime())) {
log.warn("try restart job <{}> ({} time)", definition.getId(), this.hasRetriedTime);
hasRetriedTime++;
this.restart();
} else {
log.error("job <{}> retry time exhausted", definition.getId());
}
}

public synchronized void restart() {
log.info("try restart job <{}>", definition.getId());
//todo 执行机这边应该有已经同步接收成功的概念,只要接收成功,至于任务是否执行失败,任务是否重启,都属于状态,在执行机内部处理。但无论如何不应该再次重复接收已经接收过的任务。
//todo 因此这行代码是错误的。
this.executionMachine.restart(this.shard, this.definition);
}

public void toFinished() {
try {
onFinished();
} catch (Exception e) {
toError(e);
}
this.status = JobStatus.Finished;
log.info("job <{}> status to finished", definition.getId());
}

public void toStart() {
log.info("job <{}> status to running", definition.getId());
this.status = JobStatus.Running;
try {
onStart(this.getDefinition().getParameters(), shard.getShardNum());
} catch (Exception e) {
toError(e);
}
}

public void toStop() {
try {
onStop();
} catch (Exception e) {
log.error("job <" + definition.getId() + "> stop failure, cause:" + e.getMessage(), e);
}
this.status = JobStatus.Stopped;
log.info("job <{}> status to stop", definition.getId());
}

public void toError(Throwable throwable, boolean doFailover) {
log.info("job <{}> status to error", definition.getId());
this.status = JobStatus.Error;
onError(throwable, doFailover);
}

public void toError(Throwable throwable) {
this.toError(throwable, true);
}

public JobStatus getStatus() {
return this.status;
}


public JobDefinition getDefinition() {
return this.definition;
}

public enum JobStatus {
Running, Error, Stopped, Finished
}
}
79 changes: 79 additions & 0 deletions src/main/java/org/easyjob/JobDefinition.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.easyjob;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.easyjob.util.UUIDUtil;

import java.util.LinkedHashMap;
import java.util.Map;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class JobDefinition {
private String id = UUIDUtil.generate8Code();
private Map<String, Object> parameters;
private boolean enable = true;
private int retryTime = 3;
private int totalShard = 1;
private String jobClassName;
private long lastUpdateTime = System.currentTimeMillis();

public void refreshLastUpdateTime() {
this.lastUpdateTime = System.currentTimeMillis();
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private String id = UUIDUtil.generate8Code();
private Map<String, Object> parameters = new LinkedHashMap<>();
private String jobClassName;
private boolean enable = true;
private int retryTime = 3;
private int totalShard = 1;

public Builder id(String id) {
this.id = id;
return this;
}

public Builder enable(boolean enable) {
this.enable = enable;
return this;
}

public Builder sharding(int totalShard) {
this.totalShard = totalShard;
return this;
}

public Builder parameters(Map<String, Object> parameters) {
this.parameters = parameters;
return this;
}

public Builder addParameter(String key, Object value) {
this.parameters.put(key, value);
return this;
}

public Builder jobClassName(String className) {
this.jobClassName = className;
return this;
}

public Builder retryTime(int retryTime) {
this.retryTime = retryTime;
return this;
}

public JobDefinition build() {
return new JobDefinition(this.id, this.parameters, this.enable, this.retryTime, this.totalShard,
this.jobClassName, 0);
}
}
}
Loading

0 comments on commit a2f4994

Please sign in to comment.