diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj b/storm-core/src/clj/backtype/storm/daemon/drpc.clj index f6c907385..9f4c2b4ca 100644 --- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj +++ b/storm-core/src/clj/backtype/storm/daemon/drpc.clj @@ -10,6 +10,9 @@ (:import [backtype.storm.daemon Shutdownable]) (:import [java.net InetAddress]) (:use [backtype.storm bootstrap config log]) + (:use compojure.core) + (:use ring.middleware.reload) + (:use [ring.adapter.jetty :only [run-jetty]]) (:gen-class)) (bootstrap) @@ -97,35 +100,63 @@ (.interrupt clear-thread)) ))) +(defn handle-request [handler] + (fn [request] + (handler request))) + +(defn webapp [handler] + (->(def http-routes + (routes + (GET "/drpc/:func/:args" [func args & m] + (.execute handler func args)) + (GET "/drpc/:func/" [func & m] + (.execute handler func "")) + (GET "/drpc/:func" [func & m] + (.execute handler func "")))) + (wrap-reload '[backtype.storm.daemon.drpc]) + handle-request)) + (defn launch-server! ([] (let [conf (read-storm-config) - worker-threads (int (conf DRPC-WORKER-THREADS)) + worker-threads (int (conf DRPC-WORKER-THREADS)) queue-size (int (conf DRPC-QUEUE-SIZE)) - service-handler (service-handler) + drpc-http-port (if (conf DRPC-HTTP-PORT) (int (conf DRPC-HTTP-PORT)) 0) + drpc-port (int (conf DRPC-PORT)) + drpc-service-handler (service-handler) ;; requests and returns need to be on separate thread pools, since calls to ;; "execute" don't unblock until other thrift methods are called. So if ;; 64 threads are calling execute, the server won't accept the result ;; invocations that will unblock those threads - handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT))) + handler-server (when (> drpc-port 0) + (THsHaServer. (-> (TNonblockingServerSocket. drpc-port) (THsHaServer$Args.) (.workerThreads 64) - (.executorService (ThreadPoolExecutor. worker-threads worker-threads + (.executorService (ThreadPoolExecutor. worker-threads worker-threads 60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size))) (.protocolFactory (TBinaryProtocol$Factory.)) - (.processor (DistributedRPC$Processor. service-handler)) + (.processor (DistributedRPC$Processor. drpc-service-handler)) )) + ) invoke-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-INVOCATIONS-PORT))) (THsHaServer$Args.) (.workerThreads 64) (.protocolFactory (TBinaryProtocol$Factory.)) - (.processor (DistributedRPCInvocations$Processor. service-handler)) + (.processor (DistributedRPCInvocations$Processor. drpc-service-handler)) ))] - - (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server)))) + (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] + (if handler-server (.stop handler-server)) + (.stop invoke-server)))) (log-message "Starting Distributed RPC servers...") (future (.serve invoke-server)) - (.serve handler-server)))) + (when (> drpc-http-port 0) + (run-jetty (webapp drpc-service-handler) + {:port drpc-http-port :join? false}) + ) + (when handler-server + (.serve handler-server) + ) + ))) (defn -main [] (launch-server!)) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 89e17d27e..f8ba198b2 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -324,6 +324,12 @@ public class Config extends HashMap { public static final String DRPC_SERVERS = "drpc.servers"; public static final Object DRPC_SERVERS_SCHEMA = ConfigValidation.StringsValidator; + /** + * This port is used by Storm DRPC for receiving HTTP DPRC requests from clients. + */ + public static final String DRPC_HTTP_PORT = "drpc.http.port"; + public static final Object DRPC_HTTP_PORT_SCHEMA = Number.class; + /** * This port is used by Storm DRPC for receiving DPRC requests from clients. */