From 3b374994cbf1900412b68748fc2eb37a90df77d3 Mon Sep 17 00:00:00 2001 From: "haiming@lccomputing.com" Date: Mon, 27 Jun 2022 13:54:52 +0800 Subject: [PATCH 1/2] fix worker pushserverport replicateserverport conflict --- common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala | 4 ++++ .../com/aliyun/emr/rss/service/deploy/worker/Worker.scala | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala index 663b96dcf06..d6afecd815d 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala @@ -574,6 +574,10 @@ object RssConf extends Logging { conf.getInt("rss.fetchserver.port", 0) } + def replicateServerPort(conf: RssConf): Int = { + conf.getInt("rss.replicateserver.port", 0) + } + def registerWorkerTimeoutMs(conf: RssConf): Long = { conf.getTimeAsMs("rss.register.worker.timeout", "180s") } diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala index c0a58eafb9f..b74a05fe0c1 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala @@ -112,7 +112,7 @@ private[deploy] class Worker( new TransportContext(transportConf, rpcHandler, closeIdleConnections, workerSource, replicateLimiter) val serverBootstraps = new jArrayList[TransportServerBootstrap]() - transportContext.createServer(RssConf.pushServerPort(conf), serverBootstraps) + transportContext.createServer(RssConf.replicateServerPort(conf), serverBootstraps) } private val fetchServer = { From d268a8577d1092f021148fbffcd985cb423d3555 Mon Sep 17 00:00:00 2001 From: "haiming@lccomputing.com" Date: Thu, 30 Jun 2022 18:03:22 +0800 Subject: [PATCH 2/2] fix worker rss.master.address configration item overwrited by worker default master address issue --- .../com/aliyun/emr/rss/service/deploy/worker/Worker.scala | 4 +++- .../emr/rss/service/deploy/worker/WorkerArguments.scala | 5 +---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala index 88c8a61a6a1..c86296729d8 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala @@ -979,7 +979,9 @@ private[deploy] object Worker extends Logging { // much as possible. Therefore, if the user manually specifies the address of the Master when // starting the Worker, we should set it in the parameters and automatically calculate what the // address of the Master should be used in the end. - conf.set("rss.master.address", RpcAddress.fromRssURL(workerArgs.master).toString) + if (workerArgs.master != null) { + conf.set("rss.master.address", RpcAddress.fromRssURL(workerArgs.master).toString) + } val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, WorkerSource.ServletPath) diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/WorkerArguments.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/WorkerArguments.scala index 188d1301d09..5a1a71f739a 100644 --- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/WorkerArguments.scala +++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/WorkerArguments.scala @@ -28,7 +28,7 @@ class WorkerArguments(args: Array[String], conf: RssConf) { var port = 0 // var master: String = null // for local testing. - var master: String = s"rss://$host:9097" + var master: String = null var propertiesFile: String = null parse(args.toList) @@ -58,9 +58,6 @@ class WorkerArguments(args: Array[String], conf: RssConf) { parse(tail) case Nil => - if (master == null) { // No positional argument was given - printUsageAndExit(1) - } case _ => printUsageAndExit(1)