From db011386e432f2aa39ee5ae4bfbb2e771aa120d7 Mon Sep 17 00:00:00 2001 From: wangchao Date: Fri, 26 Jul 2024 18:39:41 +0800 Subject: [PATCH 1/2] [bug][plugin-discovery] fix multi plugin discovery --- .../discovery/AbstractPluginDiscovery.java | 117 ++++++++++++++++-- .../SeaTunnelSourcePluginDiscoveryTest.java | 29 ++++- .../connectors/plugin-mapping.properties | 8 +- 3 files changed, 137 insertions(+), 17 deletions(-) diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java index 175ba435ed6..0bd9146affe 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java @@ -54,11 +54,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.ServiceLoader; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -427,20 +429,12 @@ public boolean accept(File pathname) { pathname.getName(), pluginJarPrefix); } }); - if (ArrayUtils.isEmpty(targetPluginFiles)) { + if (ArrayUtils.isEmpty(targetPluginFiles) || targetPluginFiles == null) { return Optional.empty(); } - if (targetPluginFiles.length > 1) { - throw new IllegalArgumentException( - "Found multiple plugin jar: " - + Arrays.stream(targetPluginFiles) - .map(File::getPath) - .collect(Collectors.joining(",")) - + " for pluginIdentifier: " - + pluginIdentifier); - } try { - URL pluginJarPath = targetPluginFiles[0].toURI().toURL(); + URL pluginJarPath = + findMostSimlarPluginJarFile(targetPluginFiles, pluginJarPrefix).toURI().toURL(); log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier, pluginJarPath); return Optional.of(pluginJarPath); } catch (MalformedURLException e) { @@ -451,4 +445,105 @@ public boolean accept(File pathname) { return Optional.empty(); } } + + private static File findMostSimlarPluginJarFile( + File[] targetPluginFiles, String pluginJarPrefix) { + String splitRegex = "\\-|\\_|\\."; + double maxSimlarity = -Integer.MAX_VALUE; + int mostSimlarPluginJarFileIndex = -1; + for (int i = 0; i < targetPluginFiles.length; i++) { + File file = targetPluginFiles[i]; + String fileName = file.getName(); + double similarity = + CosineSimilarityUtil.cosineSimilarity(pluginJarPrefix, fileName, splitRegex); + if (similarity > maxSimlarity) { + maxSimlarity = similarity; + mostSimlarPluginJarFileIndex = i; + } + } + return targetPluginFiles[mostSimlarPluginJarFileIndex]; + } + + static class CosineSimilarityUtil { + public static double cosineSimilarity(String textA, String textB, String splitRegrex) { + Set words1 = + new HashSet<>(Arrays.asList(textA.toLowerCase().split(splitRegrex))); + Set words2 = + new HashSet<>(Arrays.asList(textB.toLowerCase().split(splitRegrex))); + int[] termFrequency1 = calculateTermFrequencyVector(textA, words1, splitRegrex); + int[] termFrequency2 = calculateTermFrequencyVector(textB, words2, splitRegrex); + return calculateCosineSimilarity(termFrequency1, termFrequency2); + } + + public static int[] calculateTermFrequencyVector( + String text, Set words, String splitRegrex) { + int[] termFrequencyVector = new int[words.size()]; + String[] textArray = text.toLowerCase().split(splitRegrex); + List orderedWords = new ArrayList(); + words.clear(); + for (String word : textArray) { + if (!words.contains(word)) { + orderedWords.add(word); + words.add(word); + } + } + for (String word : textArray) { + if (words.contains(word)) { + int index = 0; + for (String w : orderedWords) { + if (w.equals(word)) { + termFrequencyVector[index]++; + break; + } + index++; + } + } + } + return termFrequencyVector; + } + + private static double calculateCosineSimilarity(int[] vectorA, int[] vectorB) { + double dotProduct = 0.0; + double magnitudeA = 0.0; + double magnitudeB = 0.0; + int vectorALength = vectorA.length; + int vectorBLength = vectorB.length; + if (vectorALength < vectorBLength) { + int[] vectorTemp = new int[vectorBLength]; + for (int i = 0; i < vectorB.length; i++) { + if (i <= vectorALength - 1) { + vectorTemp[i] = vectorA[i]; + } else { + vectorTemp[i] = 0; + } + } + vectorA = vectorTemp; + } + if (vectorALength > vectorBLength) { + int[] vectorTemp = new int[vectorALength]; + for (int i = 0; i < vectorA.length; i++) { + if (i <= vectorBLength - 1) { + vectorTemp[i] = vectorB[i]; + } else { + vectorTemp[i] = 0; + } + } + vectorB = vectorTemp; + } + for (int i = 0; i < vectorA.length; i++) { + dotProduct += vectorA[i] * vectorB[i]; + magnitudeA += Math.pow(vectorA[i], 2); + magnitudeB += Math.pow(vectorB[i], 2); + } + + magnitudeA = Math.sqrt(magnitudeA); + magnitudeB = Math.sqrt(magnitudeB); + + if (magnitudeA == 0 || magnitudeB == 0) { + return 0.0; // Avoid dividing by 0 + } else { + return dotProduct / (magnitudeA * magnitudeB); + } + } + } } diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java index 81333d4b4df..88fd76d73be 100644 --- a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java +++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java @@ -32,10 +32,13 @@ import com.google.common.collect.Lists; import java.io.IOException; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; @DisabledOnOs(OS.WINDOWS) class SeaTunnelSourcePluginDiscoveryTest { @@ -47,7 +50,10 @@ class SeaTunnelSourcePluginDiscoveryTest { private static final List pluginJars = Lists.newArrayList( Paths.get(seatunnelHome, "connectors", "connector-http-jira.jar"), - Paths.get(seatunnelHome, "connectors", "connector-http.jar")); + Paths.get(seatunnelHome, "connectors", "connector-http.jar"), + Paths.get(seatunnelHome, "connectors", "connector-kafka.jar"), + Paths.get(seatunnelHome, "connectors", "connector-kafka-alcs.jar"), + Paths.get(seatunnelHome, "connectors", "connector-kafka-blcs.jar")); @BeforeEach public void before() throws IOException { @@ -67,12 +73,25 @@ void getPluginBaseClass() { List pluginIdentifiers = Lists.newArrayList( PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "HttpJira"), - PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "HttpBase")); + PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "HttpBase"), + PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), "Kafka"), + PluginIdentifier.of("seatunnel", PluginType.SINK.getType(), "Kafka-Blcs")); SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery(); - Assertions.assertThrows( - IllegalArgumentException.class, - () -> seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers)); + Assertions.assertIterableEquals( + Stream.of( + Paths.get(seatunnelHome, "connectors", "connector-http-jira.jar") + .toString(), + Paths.get(seatunnelHome, "connectors", "connector-http.jar") + .toString(), + Paths.get(seatunnelHome, "connectors", "connector-kafka.jar") + .toString(), + Paths.get(seatunnelHome, "connectors", "connector-kafka-blcs.jar") + .toString()) + .collect(Collectors.toList()), + seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream() + .map(URL::getPath) + .collect(Collectors.toList())); } @AfterEach diff --git a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties index be38939a7f0..ea20ad05b0f 100644 --- a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties +++ b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties @@ -18,4 +18,10 @@ seatunnel.source.HttpBase = connector-http seatunnel.sink.HttpBase = connector-http seatunnel.source.HttpJira = connector-http-jira -seatunnel.sink.HttpJira = connector-http-jira \ No newline at end of file +seatunnel.sink.HttpJira = connector-http-jira +seatunnel.source.Kafka = connector-kafka +seatunnel.sink.Kafka = connector-kafka +seatunnel.source.Kafka-Alcs = connector-kafka-alcs +seatunnel.sink.Kafka-Alcs = connector-kafka-alcs +seatunnel.source.Kafka-Blcs = connector-kafka-blcs +seatunnel.sink.Kafka-Blcs = connector-kafka-blcs \ No newline at end of file From f1fc895a001f9ac905abb16230ddbe9fff7253a6 Mon Sep 17 00:00:00 2001 From: wangchao Date: Mon, 29 Jul 2024 14:26:02 +0800 Subject: [PATCH 2/2] [bug][plugin-discovery] optimize code --- .../plugin/discovery/AbstractPluginDiscovery.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java index 0bd9146affe..d4bd43c3d1c 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java @@ -429,12 +429,19 @@ public boolean accept(File pathname) { pathname.getName(), pluginJarPrefix); } }); - if (ArrayUtils.isEmpty(targetPluginFiles) || targetPluginFiles == null) { + if (ArrayUtils.isEmpty(targetPluginFiles)) { return Optional.empty(); } try { - URL pluginJarPath = - findMostSimlarPluginJarFile(targetPluginFiles, pluginJarPrefix).toURI().toURL(); + URL pluginJarPath; + if (targetPluginFiles.length == 1) { + pluginJarPath = targetPluginFiles[0].toURI().toURL(); + } else { + pluginJarPath = + findMostSimlarPluginJarFile(targetPluginFiles, pluginJarPrefix) + .toURI() + .toURL(); + } log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier, pluginJarPath); return Optional.of(pluginJarPath); } catch (MalformedURLException e) { @@ -475,7 +482,7 @@ public static double cosineSimilarity(String textA, String textB, String splitRe return calculateCosineSimilarity(termFrequency1, termFrequency2); } - public static int[] calculateTermFrequencyVector( + private static int[] calculateTermFrequencyVector( String text, Set words, String splitRegrex) { int[] termFrequencyVector = new int[words.size()]; String[] textArray = text.toLowerCase().split(splitRegrex);