Skip to content

Commit

Permalink
Merge branch '2.1.3-SNAPSHOT' into cloud-executor/336
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored May 26, 2017
2 parents 7aa456e + 0437046 commit f3d9fa2
Show file tree
Hide file tree
Showing 57 changed files with 1,034 additions and 79 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ target/
*.war
*.zip
*.tar
*.tar.gz

# eclipse ignore
.settings/
Expand Down
13 changes: 13 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
## 2.1.3

### 功能提升

1. [ISSUE #327](https://github.com/dangdangdotcom/elastic-job/issues/327) spring命名空间支持使用xml方式配置bean
1. [ISSUE #332](https://github.com/dangdangdotcom/elastic-job/issues/332) elastic-job-cloud增加Docker镜像dangdangdotcom/elastic-job-cloud

### 缺陷修正

1. [ISSUE #321](https://github.com/dangdangdotcom/elastic-job/issues/321) elastic-job-lite界面在添加注册中心时命名空间不支持/
1. [ISSUE #333](https://github.com/dangdangdotcom/elastic-job/issues/333) elastic-job-lite界面中注册中心配置中登录凭证隐式显示
1. [ISSUE #334](https://github.com/dangdangdotcom/elastic-job/issues/334) elastic-job-lite界面在windows平台上找不到conf\auth.properties文件

## 2.1.2

### 新功能
Expand Down
2 changes: 1 addition & 1 deletion elastic-job-cloud/elastic-job-cloud-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>elastic-job-cloud</artifactId>
<groupId>com.dangdang</groupId>
<version>2.1.2</version>
<version>2.1.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>elastic-job-cloud-executor</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor.local;

import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* 本地云作业配置.
*
* @author gaohongtao
*/
@RequiredArgsConstructor
@AllArgsConstructor
@Getter
public class LocalCloudJobConfiguration implements JobRootConfiguration {

private final JobTypeConfiguration typeConfig;

private final LocalCloudJobExecutionType executionType;

private String beanName;

private String applicationContext;

/**
* 获取作业名称.
*
* @return 作业名称
*/
public String getJobName() {
return typeConfig.getCoreConfig().getJobName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor.local;

/**
* 本地作业执行类型.
*
* @author gaohongtao
*/
public enum LocalCloudJobExecutionType {

DAEMON, TRANSIENT
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor.local;

import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos;

import java.util.Set;
import java.util.concurrent.CountDownLatch;

/**
* 本地模式运行时ExecutorDriver的仿造对象.
*
* @author gaohongtao
*/
@Slf4j
@RequiredArgsConstructor
public final class LocalExecutorDriver implements ExecutorDriver {

private static final Set<Protos.TaskState> TERMINAL_STATE = Sets.newHashSet(Protos.TaskState.TASK_ERROR, Protos.TaskState.TASK_FINISHED, Protos.TaskState.TASK_KILLED);

private volatile Protos.Status driverStatus = Protos.Status.DRIVER_NOT_STARTED;

private final CountDownLatch latch;

@Override
public Protos.Status start() {
log.info("Driver is starting");
return driverStatus = Protos.Status.DRIVER_RUNNING;
}

@Override
public Protos.Status stop() {
log.info("Driver is stopped");
return driverStatus = Protos.Status.DRIVER_STOPPED;
}

@Override
public Protos.Status abort() {
log.info("Driver is aborted");
return driverStatus = Protos.Status.DRIVER_ABORTED;
}

@Override
public Protos.Status join() {
log.info("Waiting for driver to be aborted");
return driverStatus;
}

@Override
public Protos.Status run() {
log.info("Driver is running");
start();
return join();
}

@Override
public Protos.Status sendStatusUpdate(final Protos.TaskStatus status) {
log.info("Task driverStatus is {}", status);
if (TERMINAL_STATE.contains(status.getState())) {
latch.countDown();
}
return driverStatus;
}

@Override
public Protos.Status sendFrameworkMessage(final byte[] data) {
log.info("The message of send to framework is {}", data);
return driverStatus;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.cloud.executor.local;

import com.dangdang.ddframe.job.cloud.executor.TaskExecutor;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.context.ExecutionType;
import com.dangdang.ddframe.job.context.TaskContext;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.util.config.ShardingItemParameters;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.mesos.Protos;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.dangdang.ddframe.job.api.JobType.DATAFLOW;
import static com.dangdang.ddframe.job.cloud.executor.local.LocalCloudJobExecutionType.DAEMON;

/**
* 本地作业执行器.
*
* @author gaohongtao
*/
public final class LocalTaskExecutor {

private final TaskExecutor taskExecutor = new TaskExecutor();

private final LocalExecutorDriver localExecutorDriver;

private final LocalCloudJobConfiguration localCloudJobConfiguration;

private final CountDownLatch latch;

private final int shardingTotalCount;

private final List<Protos.TaskID> runningTasks;

public LocalTaskExecutor(final LocalCloudJobConfiguration localCloudJobConfiguration) {
this.localCloudJobConfiguration = localCloudJobConfiguration;
shardingTotalCount = localCloudJobConfiguration.getTypeConfig().getCoreConfig().getShardingTotalCount();
latch = new CountDownLatch(shardingTotalCount);
localExecutorDriver = new LocalExecutorDriver(latch);
runningTasks = new ArrayList<>(shardingTotalCount);
}

/**
* 运行作业.
*/
public Future<Integer> run() {
Map<Integer, String> shardingItemParameters = new ShardingItemParameters(localCloudJobConfiguration.getTypeConfig().getCoreConfig().getShardingItemParameters()).getMap();
for (int i = 0; i < shardingTotalCount; i++) {
TaskContext taskContext = new TaskContext(localCloudJobConfiguration.getJobName(), Collections.singletonList(i), ExecutionType.READY);
Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(taskContext.getId()).build();
runningTasks.add(taskID);
taskExecutor.launchTask(localExecutorDriver, Protos.TaskInfo.newBuilder().setName(localCloudJobConfiguration.getJobName())
.setTaskId(taskID).setSlaveId(Protos.SlaveID.newBuilder().setValue(taskContext.getSlaveId()))
.setData(ByteString.copyFrom(serialize(taskContext, i, shardingItemParameters.get(i)))).build());
}
return new ExecutorFuture();
}

private byte[] serialize(final TaskContext taskContext, final Integer shardingItem, final String shardingParameter) {
LinkedHashMap<String, Object> result = new LinkedHashMap<>(2, 1);
result.put("shardingContext", buildShardingContexts(taskContext, shardingItem, shardingParameter));
result.put("jobConfigContext", buildJobConfigurationContext());
return SerializationUtils.serialize(result);
}

private ShardingContexts buildShardingContexts(final TaskContext taskContext, final Integer shardingItem, final String shardingParameter) {
Map<Integer, String> shardingItemParameters = new HashMap<>(1);
shardingItemParameters.put(shardingItem, shardingParameter);
return new ShardingContexts(taskContext.getId(), taskContext.getMetaInfo().getJobName(), shardingTotalCount, localCloudJobConfiguration
.getTypeConfig().getCoreConfig().getJobParameter(), shardingItemParameters, -1);
}

private Map<String, String> buildJobConfigurationContext() {
Map<String, String> result = new LinkedHashMap<>(7);
if (localCloudJobConfiguration.getTypeConfig().getJobType().equals(DATAFLOW)) {
result.put("streamingProcess", Boolean.toString(((DataflowJobConfiguration) localCloudJobConfiguration.getTypeConfig()).isStreamingProcess()));
}
result.put("jobType", localCloudJobConfiguration.getTypeConfig().getJobType().name());
result.put("jobName", localCloudJobConfiguration.getJobName());
result.put("jobClass", localCloudJobConfiguration.getTypeConfig().getJobClass());
if (DAEMON.equals(localCloudJobConfiguration.getExecutionType())) {
result.put("cron", localCloudJobConfiguration.getTypeConfig().getCoreConfig().getCron());
}
result.put("applicationContext", localCloudJobConfiguration.getApplicationContext());
result.put("beanName", localCloudJobConfiguration.getBeanName());
return result;
}

private class ExecutorFuture implements Future<Integer> {

@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
for (Protos.TaskID each : runningTasks) {
taskExecutor.killTask(localExecutorDriver, each);
}
return true;
}

@Override
public boolean isCancelled() {
return DAEMON.equals(localCloudJobConfiguration.getExecutionType());
}

@Override
public boolean isDone() {
return latch.getCount() < 1;
}

@Override
public Integer get() throws InterruptedException, ExecutionException {
latch.await();
return Long.valueOf(shardingTotalCount - latch.getCount()).intValue();
}

@Override
public Integer get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
latch.await(timeout, unit);
return Long.valueOf(shardingTotalCount - latch.getCount()).intValue();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.dangdang.ddframe.job.cloud.executor;

import com.dangdang.ddframe.job.cloud.executor.local.AllLocalExecutorTests;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.junit.runner.RunWith;
Expand All @@ -29,7 +30,8 @@
DaemonTaskSchedulerTest.class,
JobConfigurationContextTest.class,
TaskExecutorTest.class,
TaskExecutorThreadTest.class
TaskExecutorThreadTest.class,
AllLocalExecutorTests.class
})
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class AllCloudExecutorTests {
Expand Down
Loading

0 comments on commit f3d9fa2

Please sign in to comment.