diff --git a/src/puppetlabs/puppetdb/cli/benchmark.clj b/src/puppetlabs/puppetdb/cli/benchmark.clj index a4ced3d37e..194f42532b 100644 --- a/src/puppetlabs/puppetdb/cli/benchmark.clj +++ b/src/puppetlabs/puppetdb/cli/benchmark.clj @@ -38,7 +38,7 @@ Missing hosts (if --numhosts exceeds preserved, for example) will be initialized randomly as by default." (:require - [clojure.core.async :refer [go-loop ! >!! chan] :as async] + [clojure.core.async :refer [go go-loop ! >!! chan] :as async] [clojure.java.io :as io] [clojure.pprint :refer [pprint]] [clojure.walk :as walk] @@ -56,7 +56,8 @@ [puppetlabs.puppetdb.nio :refer [get-path]] [puppetlabs.puppetdb.random :refer [safe-sample-normal random-string random-bool random-sha1]] - [puppetlabs.puppetdb.utils :as utils :refer [println-err schedule]] + [puppetlabs.puppetdb.utils :as utils + :refer [noisy-future println-err schedule]] [puppetlabs.puppetdb.time :as time :refer [now]] [puppetlabs.trapperkeeper.config :as config] [puppetlabs.trapperkeeper.logging :as logutils] @@ -80,7 +81,10 @@ Path StandardCopyOption) (java.nio.file.attribute FileAttribute) - (java.util.concurrent RejectedExecutionException ScheduledThreadPoolExecutor) + (java.util.concurrent RejectedExecutionException + ScheduledThreadPoolExecutor + Semaphore + TimeUnit) (org.apache.commons.compress.archivers.tar TarArchiveEntry))) @@ -365,6 +369,13 @@ [nil "--simulators N" "Command simulators (default: host threads / 2, min 2)" :default (max 2 (long (/ threads 2))) :parse-fn #(Integer/parseInt %)] + [nil "--concurrent-hosts N" + "Hosts simultaneously submitting commands (default: heap MB / 3)" + ;; Slightly conservative default selected by + ;; observation (i.e. affect on heap in visualvm using + ;; current sample-data). + :default (long (/ (.maxMemory (Runtime/getRuntime)) 1024 1024 3)) + :parse-fn #(Integer/parseInt %)] [nil "--simulation-dir DIR" "Persistent host state directory (allows resume)"] ["-o" "--offset N" "Host cert number offset (start with host-N)" :default 0 @@ -580,6 +591,8 @@ (println "Send failed:" (str ex)) (println ex))) +;; REVIEW: guard seq-end calls via finally? + (defn- handle-send-facts [{:keys [host-info what] :as event} base-url ssl-opts scheduler event-delay event-ch seq-end {:keys [catalog-query-prob catalog-query-n] :as _cmd-opts}] @@ -723,71 +736,130 @@ (handle-send-report event base-url ssl-opts seq-end) (throw (ex-info "unexpected event" event))))))) +(defn- semaphore-permit-dispenser [sem quit?] + "Returns a channel that returns a permit (true) whenever one is + available from the given semaphore. The quit? atom should be set to + true when the dispenser is no longer needed, to reclaim the thread, + etc., and cause the channel to close." + (let [now-serving (chan)] + (noisy-future + (try! + (loop [] + (let [permit (.tryAcquire sem 300 TimeUnit/MILLISECONDS)] + (if @quit? + (when permit (.release sem)) + (if-not permit + (recur) + (if (>!! now-serving true) + (recur) + (.release sem)))))) + (finally + (async/close! now-serving)))) + now-serving)) + (defn- start-command-sender "Start a command sending process in the background. Reads host-state maps from host-info-ch and sends commands to the puppetdb at base-url. Writes ::submitted to rate-monitor-ch for every command sent, or ::error if there was a problem. Close host-info-ch to stop the background process." - [base-url host-info-ch sim-ch rate-monitor-ch senders ssl-opts scheduler cmd-opts] + [base-url host-info-ch sim-ch rate-monitor-ch ssl-opts scheduler cmd-opts + {:keys [concurrent-hosts senders]}] (let [stop-ch (chan) + host-throttle-ch (chan) event-ch (chan) sender-ch (chan) - state (atom {:more-hosts? true :pending-sequences 0}) + state (atom {:more-hosts? true :pending-sequences 0 + :validator #(-> % :pending-sequences (>= 0))}) + + quit? (atom false) + host-throttle (Semaphore. concurrent-hosts) + seq-end (fn seq-ended [host-info] (let [state (swap! state update :pending-sequences dec)] (if (and (zero? (:pending-sequences state)) (not (:more-hosts? state))) (async/close! event-ch) - (>!! sim-ch (:host-path host-info))))) + (do + (>!! sim-ch (:host-path host-info)) + (.release host-throttle))))) stage-event (director base-url ssl-opts scheduler cmd-opts event-ch seq-end)] ;; Send host-info and events to the senders, with events having - ;; priority. Critical that this be serialized wrt more-hosts? vs - ;; pending-sequences state updates. Currently, that's arranged by - ;; having this loop be the one that checks the pending count, - ;; since it's also the only thing that can generate new pending - ;; work (incrementing the count), via the director. + ;; priority. Divert the host-info through the host-throttle-ch to + ;; enforce the --concurrent-hosts limit (i.e. maxmumum number of + ;; hosts that can have sequences "in flight"). ;; - ;; Giving the event channel priority is also critical since that - ;; maintains (indirect) backpressure, i.e. we never generate new - ;; delayed work (events) from a new host-info until we've dealt - ;; with any previously generated work that's ready. - (go-loop [srcs [stop-ch event-ch host-info-ch]] - (let [[event-or-info c] (async/alts! srcs :priority true)] - (if-not (nil? event-or-info) - (do - (assert (not (= c stop-ch))) ;; only allow close - (when (= c host-info-ch) - (swap! state update :pending-sequences inc)) - (>! sender-ch event-or-info) - (recur srcs)) - ;; Something closed - (if (= c stop-ch) - (do - (async/close! event-ch) ;; Any remaining events won't block - (async/close! sender-ch)) - ;; Keep going unless we're down to just the close channel, - ;; or the incoming host channel has closed and there's - ;; nothing in-flight (i.e. delayed). - (let [{:keys [more-hosts? pending-sequences]} - (if (= c host-info-ch) - (swap! state assoc :more-hosts? false) - @state) - srcs (remove #(= % c) srcs)] - (if (or (= srcs [stop-ch]) - (and (not more-hosts?) (zero? pending-sequences))) - (async/close! sender-ch) - (recur srcs))))))) + ;; Giving the event channel priority ensuares we never generate + ;; new delayed work (events) from a new host-info until we've + ;; dealt with any previously generated work that's ready. + + ;; Host throttle: shovel host-infos from the simulators + ;; host-info-ch to the host-throttle-ch (which feeds in to the + ;; main "prioritizing" shovel below), Assumes the other shovel + ;; will set quit? and close the host-throttle-ch when it's time to + ;; stop. The permit dispenser (via semaphore) limits the number of + ;; concurrently active hosts since we acquire a permit here for + ;; each host-info we pass along, and the permits are only released + ;; at the end of the host sequence in seq-end. + (go + (try! + (loop [info (! host-throttle-ch info)) + (recur (! sender-ch event-or-info) + (recur srcs)) + ;; Something closed + (when-not (= c stop-ch) + ;; Keep going unless we're down to just the close channel, + ;; or the incoming host channel has closed and there's + ;; nothing in-flight (i.e. delayed). + (let [{:keys [more-hosts? pending-sequences]} + (if (= c host-throttle-ch) + (swap! state assoc :more-hosts? false) + @state) + srcs (filterv #(not= % c) srcs)] + (when (and (not= srcs [stop-ch]) + (or more-hosts? (pos? pending-sequences))) + (recur srcs))))))) + (finally + (reset! quit? true) + (async/close! host-throttle-ch) + (async/close! event-ch) ;; remaining events won't block + (async/close! sender-ch)))) ;; Start the senders - [stop-ch + [state + stop-ch (async/pipeline-blocking senders rate-monitor-ch (map stage-event) sender-ch)])) (defn start-rate-monitor "Start a task which monitors the rate of messages on rate-monitor-ch and prints it to the console every 5 seconds. Uses run-interval to compute the number of nodes that would produce that load." - [rate-monitor-ch run-interval commands-per-puppet-run] + [rate-monitor-ch run-interval commands-per-puppet-run state] (let [run-interval-seconds (time/in-seconds run-interval) expected-node-message-rate (/ commands-per-puppet-run run-interval-seconds)] (println-err @@ -1013,6 +1085,9 @@ ;; [host-info incl host-path] | ;; | | ;; v | +;; (host-throttle-ch) | +;; | | +;; v | ;; event-prioritizer <-- [event (SenderEvent)]----\ | ;; | | | ;; v | | @@ -1039,6 +1114,9 @@ ;; ;; The host-path sim-next-ch loop ensures that we can't issue more ;; than one command sequence for a given host at a time. +;; +;; The host-throttle-ch restricts the number of hosts that have active +;; command/query sequences to the --concurrent-hosts limit. (defn benchmark "Feeds commands to PDB as requested by args. Returns a map of :join, a @@ -1102,11 +1180,16 @@ ;; before we tear down the output channel _ (.setExecuteExistingDelayedTasksAfterShutdownPolicy event-scheduler true) - rate-wait (start-rate-monitor rate-monitor-ch (-> 30 time/minutes) commands-per-puppet-run) - - [send-stop _send-wait] + [state send-stop _send-wait] (start-command-sender base-url host-info-ch sim-next-ch rate-monitor-ch - senders ssl-opts event-scheduler cmd-opts) + ssl-opts event-scheduler cmd-opts + {:senders senders + :concurrent-hosts (:concurrent-hosts options)}) + + rate-wait (start-rate-monitor rate-monitor-ch + (-> 30 time/minutes) + commands-per-puppet-run + state) [sim-stop _sim-wait] (start-simulation-loop numhosts run-interval-minutes nummsgs