Skip to content

Commit

Permalink
[Hotfix][Zeta] Fix Connector load logic from zeta (apache#4510)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored and EricJoy2048 committed Apr 9, 2023
1 parent 5e6d8e3 commit 99ecd3e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ public static String getFactoryId(ReadonlyConfig readonlyConfig) {
return readonlyConfig.getOptional(FACTORY_ID).orElse(readonlyConfig.get(PLUGIN_NAME));
}

public static String getFactoryId(Config config) {
return getFactoryId(ReadonlyConfig.fromConfig(config));
}

private enum VertexStatus {
CREATED,
LINKED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
Expand All @@ -56,6 +55,8 @@
import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -66,7 +67,6 @@
import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.nio.file.Paths;
Expand All @@ -85,6 +85,7 @@
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId;
import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryUrls;
Expand Down Expand Up @@ -126,19 +127,6 @@ public MultipleTableJobConfigParser(
}

public ImmutablePair<List<Action>, Set<URL>> parse() {
List<URL> connectorJars = new ArrayList<>();
try {
connectorJars = FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel"));
} catch (IOException e) {
LOGGER.info(e);
}
if (!commonPluginJars.isEmpty()) {
connectorJars.addAll(commonPluginJars);
}
ClassLoader classLoader =
new SeaTunnelChildFirstClassLoader(
connectorJars, Thread.currentThread().getContextClassLoader());
Thread.currentThread().setContextClassLoader(classLoader);
List<? extends Config> sourceConfigs =
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "source", Collections.emptyList());
Expand All @@ -149,6 +137,15 @@ public ImmutablePair<List<Action>, Set<URL>> parse() {
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "sink", Collections.emptyList());

List<URL> connectorJars = getConnectorJarList(sourceConfigs, sinkConfigs);
if (!commonPluginJars.isEmpty()) {
connectorJars.addAll(commonPluginJars);
}
ClassLoader classLoader =
new SeaTunnelChildFirstClassLoader(
connectorJars, Thread.currentThread().getContextClassLoader());
Thread.currentThread().setContextClassLoader(classLoader);

ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs);

this.fillJobConfig();
Expand Down Expand Up @@ -184,6 +181,32 @@ public Set<URL> getUsedFactoryUrls(List<Action> sinkActions) {
return urls;
}

private List<URL> getConnectorJarList(
List<? extends Config> sourceConfigs, List<? extends Config> sinkConfigs) {
List<PluginIdentifier> factoryIds =
Stream.concat(
sourceConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
factory ->
PluginIdentifier.of(
CollectionConstants
.SEATUNNEL_PLUGIN,
CollectionConstants.SOURCE_PLUGIN,
factory)),
sinkConfigs.stream()
.map(ConfigParserUtil::getFactoryId)
.map(
factory ->
PluginIdentifier.of(
CollectionConstants
.SEATUNNEL_PLUGIN,
CollectionConstants.SINK_PLUGIN,
factory)))
.collect(Collectors.toList());
return new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds);
}

private void fillUsedFactoryUrls(List<Action> actions, Set<URL> result) {
actions.forEach(
action -> {
Expand Down

0 comments on commit 99ecd3e

Please sign in to comment.