From 99ecd3e82e078b83639589c7d2cf0f5a4318065b Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 7 Apr 2023 11:04:06 +0800 Subject: [PATCH] [Hotfix][Zeta] Fix Connector load logic from zeta (#4510) --- .../engine/core/parse/ConfigParserUtil.java | 4 ++ .../parse/MultipleTableJobConfigParser.java | 55 +++++++++++++------ 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java index 3ffdc9586de..a1b8ec87e7e 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConfigParserUtil.java @@ -283,6 +283,10 @@ public static String getFactoryId(ReadonlyConfig readonlyConfig) { return readonlyConfig.getOptional(FACTORY_ID).orElse(readonlyConfig.get(PLUGIN_NAME)); } + public static String getFactoryId(Config config) { + return getFactoryId(ReadonlyConfig.fromConfig(config)); + } + private enum VertexStatus { CREATED, LINKED diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index b4e1b424d83..b4b87cb8ed6 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -42,10 +42,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.common.Constants; -import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.common.constants.CollectionConstants; import org.apache.seatunnel.common.constants.JobMode; -import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.core.starter.utils.ConfigBuilder; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.exception.JobDefineCheckException; @@ -56,6 +55,8 @@ import org.apache.seatunnel.engine.core.dag.actions.SinkConfig; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; import org.apache.seatunnel.engine.core.dag.actions.TransformAction; +import org.apache.seatunnel.plugin.discovery.PluginIdentifier; +import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -66,7 +67,6 @@ import lombok.extern.slf4j.Slf4j; import scala.Tuple2; -import java.io.IOException; import java.io.Serializable; import java.net.URL; import java.nio.file.Paths; @@ -85,6 +85,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId; import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryUrls; @@ -126,19 +127,6 @@ public MultipleTableJobConfigParser( } public ImmutablePair, Set> parse() { - List connectorJars = new ArrayList<>(); - try { - connectorJars = FileUtils.searchJarFiles(Common.connectorJarDir("seatunnel")); - } catch (IOException e) { - LOGGER.info(e); - } - if (!commonPluginJars.isEmpty()) { - connectorJars.addAll(commonPluginJars); - } - ClassLoader classLoader = - new SeaTunnelChildFirstClassLoader( - connectorJars, Thread.currentThread().getContextClassLoader()); - Thread.currentThread().setContextClassLoader(classLoader); List sourceConfigs = TypesafeConfigUtils.getConfigList( seaTunnelJobConfig, "source", Collections.emptyList()); @@ -149,6 +137,15 @@ public ImmutablePair, Set> parse() { TypesafeConfigUtils.getConfigList( seaTunnelJobConfig, "sink", Collections.emptyList()); + List connectorJars = getConnectorJarList(sourceConfigs, sinkConfigs); + if (!commonPluginJars.isEmpty()) { + connectorJars.addAll(commonPluginJars); + } + ClassLoader classLoader = + new SeaTunnelChildFirstClassLoader( + connectorJars, Thread.currentThread().getContextClassLoader()); + Thread.currentThread().setContextClassLoader(classLoader); + ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs); this.fillJobConfig(); @@ -184,6 +181,32 @@ public Set getUsedFactoryUrls(List sinkActions) { return urls; } + private List getConnectorJarList( + List sourceConfigs, List sinkConfigs) { + List factoryIds = + Stream.concat( + sourceConfigs.stream() + .map(ConfigParserUtil::getFactoryId) + .map( + factory -> + PluginIdentifier.of( + CollectionConstants + .SEATUNNEL_PLUGIN, + CollectionConstants.SOURCE_PLUGIN, + factory)), + sinkConfigs.stream() + .map(ConfigParserUtil::getFactoryId) + .map( + factory -> + PluginIdentifier.of( + CollectionConstants + .SEATUNNEL_PLUGIN, + CollectionConstants.SINK_PLUGIN, + factory))) + .collect(Collectors.toList()); + return new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds); + } + private void fillUsedFactoryUrls(List actions, Set result) { actions.forEach( action -> {