Skip to content

Commit

Permalink
for #153 remove dbcp dependency for rdb event trace
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 8, 2016
1 parent 9820f39 commit 501b3d8
Show file tree
Hide file tree
Showing 17 changed files with 120 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
Expand Down Expand Up @@ -57,7 +58,13 @@ public TaskExecutor() {
@Override
public void registered(final ExecutorDriver executorDriver, final Protos.ExecutorInfo executorInfo, final Protos.FrameworkInfo frameworkInfo, final Protos.SlaveInfo slaveInfo) {
if (!executorInfo.getData().isEmpty()) {
jobEventBus = new JobEventBus(SerializationUtils.<JobEventRdbConfiguration>deserialize(executorInfo.getData().toByteArray()));
Map<String, String> data = SerializationUtils.deserialize(executorInfo.getData().toByteArray());
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName(data.get("event_trace_rdb_driver"));
dataSource.setUrl(data.get("event_trace_rdb_url"));
dataSource.setPassword(data.get("event_trace_rdb_password"));
dataSource.setUsername(data.get("event_trace_rdb_username"));
jobEventBus = new JobEventBus(new JobEventRdbConfiguration(dataSource));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.dangdang.ddframe.job.api.JobType;
import com.dangdang.ddframe.job.cloud.executor.fixture.TestJob;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.SerializationUtils;
Expand Down Expand Up @@ -93,8 +92,15 @@ public void assertLaunchTaskWithTransientTaskAndJavaScriptJob() {

@Test
public void assertRegisteredWithoutData() {
// CHECKSTYLE:OFF
HashMap<String, String> data = new HashMap<>(4, 1);
// CHECKSTYLE:ON
data.put("event_trace_rdb_driver", "org.h2.Driver");
data.put("event_trace_rdb_url", "jdbc:h2:mem:test_executor");
data.put("event_trace_rdb_username", "sa");
data.put("event_trace_rdb_password", "");
ExecutorInfo executorInfo = ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue("test_executor")).setCommand(Protos.CommandInfo.getDefaultInstance())
.setData(ByteString.copyFrom(SerializationUtils.serialize(new JobEventRdbConfiguration("org.h2.Driver", "jdbc:h2:mem:test_executor", "sa", "")))).build();
.setData(ByteString.copyFrom(SerializationUtils.serialize(data))).build();
taskExecutor.registered(executorDriver, executorInfo, frameworkInfo, slaveInfo);
}

Expand Down Expand Up @@ -146,7 +152,9 @@ private TaskInfo.Builder buildTaskInfo(final Map<String, String> jobConfiguratio
}

private byte[] serialize(final Map<String, String> jobConfigurationContext) {
// CHECKSTYLE:OFF
LinkedHashMap<String, Object> result = new LinkedHashMap<>(2, 1);
// CHECKSTYLE:ON
ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.singletonMap(1, "a"));
result.put("shardingContext", shardingContexts);
result.put("jobConfigContext", jobConfigurationContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private void initConfigurationListener() {
}

private JobEventBus getJobEventBus() {
Optional<JobEventRdbConfiguration> rdbConfig = env.getRdbConfiguration();
Optional<JobEventRdbConfiguration> rdbConfig = env.getJobEventRdbConfiguration();
if (rdbConfig.isPresent()) {
return new JobEventBus(rdbConfig.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.dbcp.BasicDataSource;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;

/**
Expand Down Expand Up @@ -102,21 +104,42 @@ public FrameworkConfiguration getFrameworkConfiguration() {
}

/**
* 获取RDB关系型数据库配置对象.
* 获取作业数据库事件配置.
*
* @return RDB关系型数据库配置对象
* @return 作业数据库事件配置
*/
public Optional<JobEventRdbConfiguration> getRdbConfiguration() {
public Optional<JobEventRdbConfiguration> getJobEventRdbConfiguration() {
String driver = getValue(EnvironmentArgument.EVENT_TRACE_RDB_DRIVER);
String url = getValue(EnvironmentArgument.EVENT_TRACE_RDB_URL);
String username = getValue(EnvironmentArgument.EVENT_TRACE_RDB_USERNAME);
String password = getValue(EnvironmentArgument.EVENT_TRACE_RDB_PASSWORD);
if (!Strings.isNullOrEmpty(driver) && !Strings.isNullOrEmpty(url) && !Strings.isNullOrEmpty(username)) {
return Optional.of(new JobEventRdbConfiguration(driver, url, username, password));
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName(driver);
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
return Optional.of(new JobEventRdbConfiguration(dataSource));
}
return Optional.absent();
}

/**
* 获取作业数据库事件配置Map.
*
* @return 作业数据库事件配置Map
*/
// CHECKSTYLE:OFF
public HashMap<String, String> getJobEventRdbConfigurationMap() {
HashMap<String, String> result = new HashMap<>(4, 1);
// CHECKSTYLE:ON
result.put(EnvironmentArgument.EVENT_TRACE_RDB_DRIVER.getKey(), getValue(EnvironmentArgument.EVENT_TRACE_RDB_DRIVER));
result.put(EnvironmentArgument.EVENT_TRACE_RDB_URL.getKey(), getValue(EnvironmentArgument.EVENT_TRACE_RDB_URL));
result.put(EnvironmentArgument.EVENT_TRACE_RDB_USERNAME.getKey(), getValue(EnvironmentArgument.EVENT_TRACE_RDB_USERNAME));
result.put(EnvironmentArgument.EVENT_TRACE_RDB_PASSWORD.getKey(), getValue(EnvironmentArgument.EVENT_TRACE_RDB_PASSWORD));
return result;
}

private String getValue(final EnvironmentArgument environmentArgument) {
String result = properties.getProperty(environmentArgument.getKey(), environmentArgument.getDefaultValue());
if (environmentArgument.isRequired()) {
Expand Down Expand Up @@ -151,16 +174,14 @@ public enum EnvironmentArgument {
APP_CACHE_ENABLE("app_cache_enable", "false", true),

JOB_STATE_QUEUE_SIZE("job_state_queue_size", "10000", true),

EVENT_TRACE_RDB_DRIVER("event_trace_rdb_driver", "", false),

EVENT_TRACE_RDB_URL("event_trace_rdb_url", "", false),

EVENT_TRACE_RDB_DRIVER("event_trace_rdb_driver", "", false),

EVENT_TRACE_RDB_USERNAME("event_trace_rdb_username", "", false),

EVENT_TRACE_RDB_PASSWORD("event_trace_rdb_password", "", false),

EVENT_TRACE_RDB_LOG_LEVEL("event_trace_rdb_log_level", "INFO", true);
EVENT_TRACE_RDB_PASSWORD("event_trace_rdb_password", "", false);

private final String key;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.dangdang.ddframe.job.cloud.scheduler.config.CloudJobConfiguration;
import com.dangdang.ddframe.job.cloud.scheduler.context.TaskContext;
import com.dangdang.ddframe.job.event.JobEventBus;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent.State;
import com.dangdang.ddframe.job.executor.ShardingContexts;
Expand Down Expand Up @@ -69,6 +68,8 @@ public final class TaskLaunchProcessor implements Runnable {

private final JobEventBus jobEventBus;

private final BootstrapEnvironment env = BootstrapEnvironment.getInstance();

/**
* 线程关闭.
*/
Expand Down Expand Up @@ -141,10 +142,9 @@ private Protos.TaskInfo getTaskInfo(final Protos.SlaveID slaveID, final TaskAssi
Protos.Resource.Builder cpus = buildResource("cpus", jobConfig.getCpuCount());
Protos.Resource.Builder mem = buildResource("mem", jobConfig.getMemoryMB());
Protos.ExecutorInfo.Builder executorInfoBuilder = Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue(taskContext.getExecutorId(jobConfig.getAppURL())))
.setCommand(command).setData(ByteString.copyFrom(SerializationUtils.serialize(BootstrapEnvironment.getInstance().getRdbConfiguration()))).addResources(cpus).addResources(mem);
Optional<JobEventRdbConfiguration> rdbConfig = BootstrapEnvironment.getInstance().getRdbConfiguration();
if (rdbConfig.isPresent()) {
executorInfoBuilder.setData(ByteString.copyFrom(SerializationUtils.serialize(rdbConfig.get()))).build();
.setCommand(command).addResources(cpus).addResources(mem);
if (env.getJobEventRdbConfiguration().isPresent()) {
executorInfoBuilder.setData(ByteString.copyFrom(SerializationUtils.serialize(env.getJobEventRdbConfigurationMap()))).build();
}
return Protos.TaskInfo.newBuilder()
.setTaskId(Protos.TaskID.newBuilder().setValue(taskContext.getId()).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ app_cache_enable=false
job_state_queue_size=10000

# job rdb config
# event_trace_rdb_url=jdbc:mysql://localhost:3306/elastic-job-cloud-log

# event_trace_rdb_driver=com.mysql.jdbc.Driver

# event_trace_rdb_url=jdbc:mysql://localhost:3306/elastic-job-cloud-log

# event_trace_rdb_username=root

# event_trace_rdb_password=
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.io.Serializable;
import java.sql.SQLException;

Expand All @@ -38,18 +39,12 @@ public class JobEventRdbConfiguration extends JobEventRdbIdentity implements Job

private static final long serialVersionUID = 3344410699286435226L;

private final String driverClassName;

private final String url;

private final String username;

private final String password;
private final DataSource dataSource;

@Override
public JobEventListener createJobEventListener() {
try {
return new JobEventRdbListener(this);
return new JobEventRdbListener(dataSource);
} catch (final SQLException ex) {
log.error("Elastic job: create JobEventRdbListener failure, error is: ", ex);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dangdang.ddframe.job.event.type.JobExecutionEvent;
import com.dangdang.ddframe.job.event.type.JobStatusTraceEvent;

import javax.sql.DataSource;
import java.sql.SQLException;

/**
Expand All @@ -32,8 +33,8 @@ public final class JobEventRdbListener extends JobEventRdbIdentity implements Jo

private final JobEventRdbStorage repository;

public JobEventRdbListener(final JobEventRdbConfiguration config) throws SQLException {
repository = new JobEventRdbStorage(config.getDriverClassName(), config.getUrl(), config.getUsername(), config.getPassword());
public JobEventRdbListener(final DataSource dataSource) throws SQLException {
repository = new JobEventRdbStorage(dataSource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class JobEventRdbStorage {

private final DataSource dataSource;

JobEventRdbStorage(final String driverClassName, final String url, final String username, final String password) throws SQLException {
dataSource = JobEventRdbDataSourceFactory.getDataSource(driverClassName, url, username, password);
JobEventRdbStorage(final DataSource dataSource) throws SQLException {
this.dataSource = dataSource;
initTables();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.dangdang.ddframe.job.event;

import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfigurationTest;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbDataSourceFactoryTest;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbIdentityTest;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbListenerTest;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbStorageTest;
Expand All @@ -32,8 +31,7 @@
JobEventRdbIdentityTest.class,
JobEventRdbConfigurationTest.class,
JobEventRdbListenerTest.class,
JobEventRdbStorageTest.class,
JobEventRdbDataSourceFactoryTest.class
JobEventRdbStorageTest.class
})
public final class AllEventTests {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.dangdang.ddframe.job.event.rdb;

import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.instanceOf;
Expand All @@ -27,11 +28,16 @@ public final class JobEventRdbConfigurationTest {

@Test
public void assertCreateJobEventListenerSuccess() {
assertThat(new JobEventRdbConfiguration("org.h2.Driver", "jdbc:h2:mem:job_event_storage", "sa", "").createJobEventListener(), instanceOf(JobEventRdbListener.class));
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName(org.h2.Driver.class.getName());
dataSource.setUrl("jdbc:h2:mem:job_event_storage");
dataSource.setUsername("sa");
dataSource.setPassword("");
assertThat(new JobEventRdbConfiguration(dataSource).createJobEventListener(), instanceOf(JobEventRdbListener.class));
}

@Test
public void assertCreateJobEventListenerFailure() {
assertNull(new JobEventRdbConfiguration("", "", "", "").createJobEventListener());
assertNull(new JobEventRdbConfiguration(new BasicDataSource()).createJobEventListener());
}
}

This file was deleted.

Loading

0 comments on commit 501b3d8

Please sign in to comment.