From 93b19a06309ffb802f0d1fb5307b7791b57b2443 Mon Sep 17 00:00:00 2001 From: anfeng Date: Wed, 31 Jul 2013 17:47:43 -0700 Subject: [PATCH 1/8] launch worker per environment settings --- .../src/clj/backtype/storm/daemon/supervisor.clj | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 150443165..0aabd554f 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -412,12 +412,17 @@ childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS)) "%ID%" (str port)) - logfilename (str "worker-" port ".log") - command (str "java -server " childopts + java_home (System/getenv "JAVA_HOME") + java_cmd (if java_home (str java_home "/bin/java") "java") + worker_logdir (System/getProperty "worker.logdir") + logfilename (str (if worker_logdir (str worker_logdir "/") "") "worker-" port ".log") + logback_configurationFile_property (System/getProperty "logback.configurationFile") + logback_configurationFile (if logback_configurationFile_property (str "-D" logback_configurationFile_property) "") + command (str java_cmd " -server " childopts " -Djava.library.path=" (conf JAVA-LIBRARY-PATH) " -Dlogfile.name=" logfilename " -Dstorm.home=" storm-home - " -Dlogback.configurationFile=" storm-home "/logback/cluster.xml" + logback_configurationFile " -Dstorm.id=" storm-id " -Dworker.id=" worker-id " -Dworker.port=" port From 3b27367df2d12aa9b2e0e6c2f02460e4b0d3c0e8 Mon Sep 17 00:00:00 2001 From: anfeng Date: Tue, 5 Nov 2013 15:15:43 -0800 Subject: [PATCH 2/8] enable drpc to use thrift and/or http --- conf/defaults.yaml | 1 + .../src/clj/backtype/storm/daemon/drpc.clj | 48 ++++++++++++++++--- storm-core/src/jvm/backtype/storm/Config.java | 6 +++ 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index ede403450..ce066b575 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -42,6 +42,7 @@ logviewer.port: 8000 logviewer.childopts: "-Xmx128m" +drpc.http.port: 3771 drpc.port: 3772 drpc.worker.threads: 64 drpc.queue.size: 128 diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj b/storm-core/src/clj/backtype/storm/daemon/drpc.clj index f6c907385..d5ed2a6c1 100644 --- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj +++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj @@ -10,6 +10,10 @@ (:import [backtype.storm.daemon Shutdownable]) (:import [java.net InetAddress]) (:use [backtype.storm bootstrap config log]) + (:use compojure.core) + (:use ring.middleware.reload) + (:use [ring.adapter.jetty :only [run-jetty]]) + (:require [ring.util.response :as resp]) (:gen-class)) (bootstrap) @@ -97,35 +101,67 @@ (.interrupt clear-thread)) ))) +(defroutes main-routes + (GET "/drpc/:func/:args" [:as {cookies :cookies} func args & m] + (-> (.execute (service-handler) func args) + (resp/status 500) + (resp/content-type "text/text")))) + +(defn catch-errors [handler] + (fn [request] + (try + (handler request) + (catch Exception ex + (-> (str "DRPC Server Error") + (resp/status 500) + (resp/content-type "text/text")) + )))) + +(def webapp + (-> #'main-routes + (wrap-reload '[backtype.storm.daemon.drpc]) + catch-errors)) + (defn launch-server! ([] (let [conf (read-storm-config) - worker-threads (int (conf DRPC-WORKER-THREADS)) + worker-threads (int (conf DRPC-WORKER-THREADS)) queue-size (int (conf DRPC-QUEUE-SIZE)) service-handler (service-handler) ;; requests and returns need to be on separate thread pools, since calls to ;; "execute" don't unblock until other thrift methods are called. So if ;; 64 threads are calling execute, the server won't accept the result ;; invocations that will unblock those threads - handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT))) + drpc-http-port (int (conf DRPC-HTTP-PORT)) + drpc-port (int (conf DRPC-PORT)) + handler-server (if (> drpc-port 0) + (THsHaServer. (-> (TNonblockingServerSocket. drpc-port) (THsHaServer$Args.) (.workerThreads 64) - (.executorService (ThreadPoolExecutor. worker-threads worker-threads + (.executorService (ThreadPoolExecutor. worker-threads worker-threads 60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size))) (.protocolFactory (TBinaryProtocol$Factory.)) (.processor (DistributedRPC$Processor. service-handler)) )) + ) invoke-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-INVOCATIONS-PORT))) (THsHaServer$Args.) (.workerThreads 64) (.protocolFactory (TBinaryProtocol$Factory.)) (.processor (DistributedRPCInvocations$Processor. service-handler)) ))] - - (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server)))) + (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] + (if handler-server (.stop handler-server)) + (.stop invoke-server)))) (log-message "Starting Distributed RPC servers...") (future (.serve invoke-server)) - (.serve handler-server)))) + (if (> drpc-http-port 0) + (run-jetty webapp {:port drpc-http-port :join? false}) + ) + (if handler-server + (.serve handler-server) + ) + ))) (defn -main [] (launch-server!)) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 89e17d27e..f8ba198b2 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -324,6 +324,12 @@ public class Config extends HashMap { public static final String DRPC_SERVERS = "drpc.servers"; public static final Object DRPC_SERVERS_SCHEMA = ConfigValidation.StringsValidator; + /** + * This port is used by Storm DRPC for receiving HTTP DPRC requests from clients. + */ + public static final String DRPC_HTTP_PORT = "drpc.http.port"; + public static final Object DRPC_HTTP_PORT_SCHEMA = Number.class; + /** * This port is used by Storm DRPC for receiving DPRC requests from clients. */ From e65e5ada8946506191c6c8334f9d0028a87dfa47 Mon Sep 17 00:00:00 2001 From: anfeng Date: Tue, 5 Nov 2013 16:20:00 -0800 Subject: [PATCH 3/8] single service handler for thrift and http --- storm-core/src/clj/backtype/storm/daemon/drpc.clj | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj b/storm-core/src/clj/backtype/storm/daemon/drpc.clj index d5ed2a6c1..435d97d0c 100644 --- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj +++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj @@ -30,7 +30,7 @@ (@queues-atom function)) ;; TODO: change this to use TimeCacheMap -(defn service-handler [] +(def service-handler (let [conf (read-storm-config) ctr (atom 0) id->sem (atom {}) @@ -103,9 +103,7 @@ (defroutes main-routes (GET "/drpc/:func/:args" [:as {cookies :cookies} func args & m] - (-> (.execute (service-handler) func args) - (resp/status 500) - (resp/content-type "text/text")))) + (.execute service-handler func args))) (defn catch-errors [handler] (fn [request] @@ -127,7 +125,7 @@ (let [conf (read-storm-config) worker-threads (int (conf DRPC-WORKER-THREADS)) queue-size (int (conf DRPC-QUEUE-SIZE)) - service-handler (service-handler) + ;; service-handler (service-handler) ;; requests and returns need to be on separate thread pools, since calls to ;; "execute" don't unblock until other thrift methods are called. So if ;; 64 threads are calling execute, the server won't accept the result From 78efcc3b7fa3f94556bd216f298431752b1b7ebe Mon Sep 17 00:00:00 2001 From: anfeng Date: Thu, 7 Nov 2013 12:00:14 -0800 Subject: [PATCH 4/8] avoid macros for drpc-http --- .../src/clj/backtype/storm/daemon/drpc.clj | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj b/storm-core/src/clj/backtype/storm/daemon/drpc.clj index 435d97d0c..ece39b310 100644 --- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj +++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj @@ -30,7 +30,7 @@ (@queues-atom function)) ;; TODO: change this to use TimeCacheMap -(def service-handler +(defn service-handler [] (let [conf (read-storm-config) ctr (atom 0) id->sem (atom {}) @@ -101,37 +101,32 @@ (.interrupt clear-thread)) ))) -(defroutes main-routes - (GET "/drpc/:func/:args" [:as {cookies :cookies} func args & m] - (.execute service-handler func args))) - -(defn catch-errors [handler] +(defn handle-request [handler] (fn [request] - (try - (handler request) - (catch Exception ex - (-> (str "DRPC Server Error") - (resp/status 500) - (resp/content-type "text/text")) - )))) + (handler request))) -(def webapp - (-> #'main-routes +(defn webapp [handler] + (->(def http-routes + (routes + (GET "/drpc/:func/:args" [:as {cookies :cookies} func args & m] + (.execute handler func args)) + (GET "/drpc/:func" [:as {cookies :cookies} func & m] + (.execute handler func "")))) (wrap-reload '[backtype.storm.daemon.drpc]) - catch-errors)) + handle-request)) (defn launch-server! ([] (let [conf (read-storm-config) worker-threads (int (conf DRPC-WORKER-THREADS)) queue-size (int (conf DRPC-QUEUE-SIZE)) - ;; service-handler (service-handler) + drpc-http-port (if (conf DRPC-HTTP-PORT) (int (conf DRPC-HTTP-PORT)) 0) + drpc-port (int (conf DRPC-PORT)) + drpc-service-handler (service-handler) ;; requests and returns need to be on separate thread pools, since calls to ;; "execute" don't unblock until other thrift methods are called. So if ;; 64 threads are calling execute, the server won't accept the result ;; invocations that will unblock those threads - drpc-http-port (int (conf DRPC-HTTP-PORT)) - drpc-port (int (conf DRPC-PORT)) handler-server (if (> drpc-port 0) (THsHaServer. (-> (TNonblockingServerSocket. drpc-port) (THsHaServer$Args.) @@ -139,14 +134,14 @@ (.executorService (ThreadPoolExecutor. worker-threads worker-threads 60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size))) (.protocolFactory (TBinaryProtocol$Factory.)) - (.processor (DistributedRPC$Processor. service-handler)) + (.processor (DistributedRPC$Processor. drpc-service-handler)) )) ) invoke-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-INVOCATIONS-PORT))) (THsHaServer$Args.) (.workerThreads 64) (.protocolFactory (TBinaryProtocol$Factory.)) - (.processor (DistributedRPCInvocations$Processor. service-handler)) + (.processor (DistributedRPCInvocations$Processor. drpc-service-handler)) ))] (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (if handler-server (.stop handler-server)) @@ -154,7 +149,8 @@ (log-message "Starting Distributed RPC servers...") (future (.serve invoke-server)) (if (> drpc-http-port 0) - (run-jetty webapp {:port drpc-http-port :join? false}) + (run-jetty (webapp drpc-service-handler) + {:port drpc-http-port :join? false}) ) (if handler-server (.serve handler-server) From a7e5cc142e3cda73646fb1c35b0ea5c5c6a6b8b5 Mon Sep 17 00:00:00 2001 From: anfeng Date: Thu, 7 Nov 2013 20:46:38 -0800 Subject: [PATCH 5/8] remove drpc http from defaults --- conf/defaults.yaml | 2 -- storm-core/src/clj/backtype/storm/daemon/drpc.clj | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index ce066b575..c9f42eeca 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -41,8 +41,6 @@ ui.childopts: "-Xmx768m" logviewer.port: 8000 logviewer.childopts: "-Xmx128m" - -drpc.http.port: 3771 drpc.port: 3772 drpc.worker.threads: 64 drpc.queue.size: 128 diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj b/storm-core/src/clj/backtype/storm/daemon/drpc.clj index ece39b310..bb1765b9d 100644 --- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj +++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj @@ -110,6 +110,8 @@ (routes (GET "/drpc/:func/:args" [:as {cookies :cookies} func args & m] (.execute handler func args)) + (GET "/drpc/:func/" [:as {cookies :cookies} func & m] + (.execute handler func "")) (GET "/drpc/:func" [:as {cookies :cookies} func & m] (.execute handler func "")))) (wrap-reload '[backtype.storm.daemon.drpc]) From 8d0c3d7dd015d1cbd753ba8b2ed7bf062d8377fc Mon Sep 17 00:00:00 2001 From: anfeng Date: Thu, 7 Nov 2013 20:50:15 -0800 Subject: [PATCH 6/8] undo changes in supervisor.clj --- .../src/clj/backtype/storm/daemon/supervisor.clj | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 0aabd554f..150443165 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -412,17 +412,12 @@ childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS)) "%ID%" (str port)) - java_home (System/getenv "JAVA_HOME") - java_cmd (if java_home (str java_home "/bin/java") "java") - worker_logdir (System/getProperty "worker.logdir") - logfilename (str (if worker_logdir (str worker_logdir "/") "") "worker-" port ".log") - logback_configurationFile_property (System/getProperty "logback.configurationFile") - logback_configurationFile (if logback_configurationFile_property (str "-D" logback_configurationFile_property) "") - command (str java_cmd " -server " childopts + logfilename (str "worker-" port ".log") + command (str "java -server " childopts " -Djava.library.path=" (conf JAVA-LIBRARY-PATH) " -Dlogfile.name=" logfilename " -Dstorm.home=" storm-home - logback_configurationFile + " -Dlogback.configurationFile=" storm-home "/logback/cluster.xml" " -Dstorm.id=" storm-id " -Dworker.id=" worker-id " -Dworker.port=" port From 309010487af1af47fada54b1ff758cd7d5fb11e5 Mon Sep 17 00:00:00 2001 From: anfeng Date: Thu, 7 Nov 2013 20:51:49 -0800 Subject: [PATCH 7/8] undo defaults format change --- conf/defaults.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index c9f42eeca..ede403450 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -41,6 +41,7 @@ ui.childopts: "-Xmx768m" logviewer.port: 8000 logviewer.childopts: "-Xmx128m" + drpc.port: 3772 drpc.worker.threads: 64 drpc.queue.size: 128 From 1890304e9206011cbec2f452dd58844f49967b44 Mon Sep 17 00:00:00 2001 From: anfeng Date: Wed, 20 Nov 2013 16:25:19 -0800 Subject: [PATCH 8/8] remove unneeded parameters --- storm-core/src/clj/backtype/storm/daemon/drpc.clj | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj b/storm-core/src/clj/backtype/storm/daemon/drpc.clj index bb1765b9d..9f4c2b4ca 100644 --- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj +++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj @@ -13,7 +13,6 @@ (:use compojure.core) (:use ring.middleware.reload) (:use [ring.adapter.jetty :only [run-jetty]]) - (:require [ring.util.response :as resp]) (:gen-class)) (bootstrap) @@ -108,11 +107,11 @@ (defn webapp [handler] (->(def http-routes (routes - (GET "/drpc/:func/:args" [:as {cookies :cookies} func args & m] + (GET "/drpc/:func/:args" [func args & m] (.execute handler func args)) - (GET "/drpc/:func/" [:as {cookies :cookies} func & m] + (GET "/drpc/:func/" [func & m] (.execute handler func "")) - (GET "/drpc/:func" [:as {cookies :cookies} func & m] + (GET "/drpc/:func" [func & m] (.execute handler func "")))) (wrap-reload '[backtype.storm.daemon.drpc]) handle-request)) @@ -129,7 +128,7 @@ ;; "execute" don't unblock until other thrift methods are called. So if ;; 64 threads are calling execute, the server won't accept the result ;; invocations that will unblock those threads - handler-server (if (> drpc-port 0) + handler-server (when (> drpc-port 0) (THsHaServer. (-> (TNonblockingServerSocket. drpc-port) (THsHaServer$Args.) (.workerThreads 64) @@ -150,11 +149,11 @@ (.stop invoke-server)))) (log-message "Starting Distributed RPC servers...") (future (.serve invoke-server)) - (if (> drpc-http-port 0) + (when (> drpc-http-port 0) (run-jetty (webapp drpc-service-handler) {:port drpc-http-port :join? false}) ) - (if handler-server + (when handler-server (.serve handler-server) ) )))