Skip to content

Commit

Permalink
Partial revert for k8s deployment purpose "Use programArgsList when d…
Browse files Browse the repository at this point in the history
…eploying to Flink (#7539)" (#7603)

This reverts commit 361a105
  • Loading branch information
arkadius authored Feb 28, 2025
1 parent ab62a15 commit c598588
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pl.touk.nussknacker.engine.process.runner

// K8s operator parses args as command line arguments - see KubernetesApplicationClusterEntrypoint,
// ClusterEntrypointUtils.parseParametersOrExit and similar issue: https://issues.apache.org/jira/browse/FLINK-32126
// so we need to escape some characters
// This class is used by external project only
object FlinkK8sArgsDecodeHack {

// FRH - flink replacement hack....
def prepareProgramArgs(list: Array[String]): Array[String] = list.map(
_.replace("__FRH_", "\"")
.replace("__FRH2_", "\n")
.replace("__FRH3_", "'")
.replace("__FRH4_", "#")
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,32 @@ import java.nio.charset.StandardCharsets
import scala.util.Using
import scala.util.control.NonFatal

object FlinkStreamingProcessMain extends LazyLogging {
object FlinkStandaloneScenarioMain extends FlinkScenarioMain(identity)

// This class is used by external project only
object FlinkK8sScenarioMain extends FlinkScenarioMain(FlinkK8sArgsDecodeHack.prepareProgramArgs)

class FlinkScenarioMain(preprocessArgs: Array[String] => Array[String]) extends LazyLogging {

def main(args: Array[String]): Unit = {
try {
require(args.nonEmpty, "Scenario json should be passed as a first argument")
val process = readScenarioFromArg(args(0))
val processVersion = parseProcessVersion(args(1))
val deploymentData = parseDeploymentData(args(2))
val preprocessedArgs = preprocessArgs(args)

require(
preprocessedArgs.length >= 3,
"Missing arguments. Usage: CanonicalProcess, ProcessVersion, DeploymentData"
)
val scenario = readScenarioFromArg(preprocessedArgs(0))
val processVersion = parseProcessVersion(preprocessedArgs(1))
val deploymentData = parseDeploymentData(preprocessedArgs(2))
logger.info(
s"Running deployment ${deploymentData.deploymentId} of scenario ${processVersion.processName} in version ${processVersion.versionId}. " +
s"Model version ${processVersion.modelVersion}. Deploying user [id=${deploymentData.user.id}, name=${deploymentData.user.name}]"
)
val modelConfig = readModelConfigFromArgs(args)
val modelConfig = readModelConfigFromArgs(preprocessedArgs)
val modelData = ModelData.duringFlinkExecution(ModelConfigs(modelConfig, deploymentData.additionalModelConfigs))
new FlinkScenarioJob(modelData).run(
process,
scenario,
processVersion,
deploymentData,
StreamExecutionEnvironment.getExecutionEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ object RemoteFlinkScenarioJobRunner {
serializedConfig
)

private[management] val MainClassName = "pl.touk.nussknacker.engine.process.runner.FlinkStreamingProcessMain"
private[management] val MainClassName = "pl.touk.nussknacker.engine.process.runner.FlinkStandaloneScenarioMain"

}

0 comments on commit c598588

Please sign in to comment.