Skip to content

Commit

Permalink
Bug fix: "Error while trying to split key and value in configuration" (#4199)
Browse files Browse the repository at this point in the history
  • Loading branch information
andystenhe committed Mar 6, 2025
1 parent 1ded050 commit faddbb1
Showing 1 changed file with 17 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import javax.annotation.Nonnull

import java.io._
import java.util.{Properties, Scanner}
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern

import scala.collection.mutable
Expand Down Expand Up @@ -247,37 +246,24 @@ object PropertiesUtils extends Logger {
loadFlinkConfYaml(org.apache.commons.io.FileUtils.readFileToString(file))
}

/**
 * Parses a Flink configuration YAML document into a flat Java map.
 *
 * Nested YAML sections are flattened into dotted keys, e.g.
 * {{{ taskmanager: { memory: { size: 1g } } }}} becomes
 * `taskmanager.memory.size -> "1g"`.
 *
 * Keys whose value is absent (`key:` with nothing after the colon) are
 * skipped, and a document containing only comments/blank lines yields an
 * empty map — matching the behavior of the previous line-based parser.
 *
 * @param yamlContent raw YAML text; must be non-null and non-empty
 * @return a mutable java.util.HashMap of flattened key/value pairs
 * @throws IllegalArgumentException if `yamlContent` is null or empty
 */
def loadFlinkConfYaml(yamlContent: String): JavaMap[String, String] = {
  require(
    yamlContent != null && yamlContent.nonEmpty,
    "[StreamPark] loadFlinkConfYaml: yaml must not be null")

  val yaml = new Yaml()
  // SnakeYAML returns null (not an empty map) when the document contains
  // only comments or blank lines — guard before flattening to avoid an NPE.
  val parsedMap = yaml.load[java.util.LinkedHashMap[String, Object]](yamlContent)

  if (parsedMap == null) {
    new java.util.HashMap[String, String]()
  } else {
    // Recursively flatten nested maps, joining key segments with '.'.
    def flatten(map: java.util.Map[String, Object], prefix: String = ""): Map[String, String] = {
      map.asScala.flatMap {
        // `key:` with no value parses to null; the old parser skipped such
        // entries, so drop them here rather than NPE on v.toString.
        case (_, null) => Map.empty[String, String]
        case (k, v: java.util.Map[String, Object] @unchecked) =>
          flatten(v, s"$prefix$k.")
        case (k, v) =>
          Map(s"$prefix$k" -> v.toString)
      }.toMap
    }

    new java.util.HashMap[String, String](flatten(parsedMap).asJava)
  }
}

/** extract flink configuration from application.properties */
Expand Down

0 comments on commit faddbb1

Please sign in to comment.