From faddbb1e5ea92aeb7af32d6e5ad1f4aa7c692a95 Mon Sep 17 00:00:00 2001
From: andystenhe
Date: Thu, 6 Mar 2025 19:11:59 +0800
Subject: [PATCH] bug fix:Error while trying to split key and value in
 configuration #4199

---
 .../common/util/PropertiesUtils.scala         | 48 +++++++------------
 1 file changed, 17 insertions(+), 31 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f7f68ae458..c9e3d3338d 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -27,7 +27,6 @@ import javax.annotation.Nonnull
 
 import java.io._
 import java.util.{Properties, Scanner}
-import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
 import scala.collection.mutable
@@ -247,37 +246,24 @@ object PropertiesUtils extends Logger {
     loadFlinkConfYaml(org.apache.commons.io.FileUtils.readFileToString(file))
   }
 
-  def loadFlinkConfYaml(yaml: String): JavaMap[String, String] = {
-    require(yaml != null && yaml.nonEmpty, "[StreamPark] loadFlinkConfYaml: yaml must not be null")
-    val flinkConf = new JavaHashMap[String, String]()
-    val scanner: Scanner = new Scanner(yaml)
-    val lineNo: AtomicInteger = new AtomicInteger(0)
-    while (scanner.hasNextLine) {
-      val line = scanner.nextLine()
-      lineNo.incrementAndGet()
-      // 1. check for comments
-      // [FLINK-27299] flink parsing parameter bug fixed.
-      val comments = line.split("^#|\\s+#", 2)
-      val conf = comments(0).trim
-      // 2. get key and value
-      if (conf.nonEmpty) {
-        val kv = conf.split(": ", 2)
-        // skip line with no valid key-value pair
-        if (kv.length == 2) {
-          val key = kv(0).trim
-          val value = kv(1).trim
-          // sanity check
-          if (key.nonEmpty && value.nonEmpty) {
-            flinkConf += key -> value
-          } else {
-            logWarn(s"Error after splitting key and value in configuration ${lineNo.get()}: $line")
-          }
-        } else {
-          logWarn(s"Error while trying to split key and value in configuration. $lineNo : $line")
-        }
-      }
+  def loadFlinkConfYaml(yamlContent: String): JavaMap[String, String] = {
+    require(
+      yamlContent != null && yamlContent.nonEmpty,
+      "[StreamPark] loadFlinkConfYaml: yaml must not be null")
+
+    val yaml = new Yaml()
+    val parsedMap = yaml.load[java.util.LinkedHashMap[String, Object]](yamlContent)
+
+    def flatten(map: java.util.Map[String, Object], prefix: String = ""): Map[String, String] = {
+      map.asScala.flatMap {
+        case (k, v: java.util.Map[String, Object] @unchecked) =>
+          flatten(v, s"$prefix$k.")
+        case (k, v) =>
+          Map(s"$prefix$k" -> v.toString)
+      }.toMap
     }
-    flinkConf
+
+    new java.util.HashMap[String, String](flatten(parsedMap).asJava)
   }
 
   /** extract flink configuration from application.properties */