Skip to content

Commit

Permalink
(PE-37113) benchmark: restrict number of concurrently active hosts
Browse files Browse the repository at this point in the history
Add a --concurrent-hosts option that restricts the number of hosts
that have active command sequences.  Without this, in --nummsgs mode,
there's no bound on the heap size that might be required (as more and
more delayed sequences pile up in the scheduler), and while we might
change this later, right now each active sequence will have the entire
host-info (e.g. possibly factset, catalog, report) in RAM, so there's
a notable limit (perhaps on the order of hundreds of concurrent
sequences per GB).

The maximum number of hosts that could be in flight in --runinterval
mode depends on the interval and host count.  (If --concurrent-hosts
is set too low, then benchmark won't be able to hit the requested
rate.)
  • Loading branch information
rbrw committed Jan 5, 2024
1 parent cadae82 commit 56434cd
Showing 1 changed file with 130 additions and 47 deletions.
177 changes: 130 additions & 47 deletions src/puppetlabs/puppetdb/cli/benchmark.clj
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
Missing hosts (if --numhosts exceeds preserved, for example) will be
initialized randomly as by default."
(:require
[clojure.core.async :refer [go-loop <! >! >!! chan] :as async]
[clojure.core.async :refer [go go-loop <! >! >!! chan] :as async]
[clojure.java.io :as io]
[clojure.pprint :refer [pprint]]
[clojure.walk :as walk]
Expand All @@ -56,7 +56,8 @@
[puppetlabs.puppetdb.nio :refer [get-path]]
[puppetlabs.puppetdb.random
:refer [safe-sample-normal random-string random-bool random-sha1]]
[puppetlabs.puppetdb.utils :as utils :refer [println-err schedule]]
[puppetlabs.puppetdb.utils :as utils
:refer [noisy-future println-err schedule]]
[puppetlabs.puppetdb.time :as time :refer [now]]
[puppetlabs.trapperkeeper.config :as config]
[puppetlabs.trapperkeeper.logging :as logutils]
Expand All @@ -80,7 +81,10 @@
Path
StandardCopyOption)
(java.nio.file.attribute FileAttribute)
(java.util.concurrent RejectedExecutionException ScheduledThreadPoolExecutor)
(java.util.concurrent RejectedExecutionException
ScheduledThreadPoolExecutor
Semaphore
TimeUnit)
(org.apache.commons.compress.archivers.tar TarArchiveEntry)))


Expand Down Expand Up @@ -365,6 +369,13 @@
[nil "--simulators N" "Command simulators (default: host threads / 2, min 2)"
:default (max 2 (long (/ threads 2)))
:parse-fn #(Integer/parseInt %)]
[nil "--concurrent-hosts N"
"Hosts simultaneously submitting commands (default: heap MB / 3)"
;; Slightly conservative default selected by
;; observation (i.e. affect on heap in visualvm using
;; current sample-data).
:default (long (/ (.maxMemory (Runtime/getRuntime)) 1024 1024 3))
:parse-fn #(Integer/parseInt %)]
[nil "--simulation-dir DIR" "Persistent host state directory (allows resume)"]
["-o" "--offset N" "Host cert number offset (start with host-N)"
:default 0
Expand Down Expand Up @@ -580,6 +591,8 @@
(println "Send failed:" (str ex))
(println ex)))

;; REVIEW: guard seq-end calls via finally?

(defn- handle-send-facts
[{:keys [host-info what] :as event} base-url ssl-opts scheduler event-delay event-ch seq-end
{:keys [catalog-query-prob catalog-query-n] :as _cmd-opts}]
Expand Down Expand Up @@ -723,71 +736,130 @@
(handle-send-report event base-url ssl-opts seq-end)
(throw (ex-info "unexpected event" event)))))))

(defn- semaphore-permit-dispenser [sem quit?]
"Returns a channel that returns a permit (true) whenever one is
available from the given semaphore. The quit? atom should be set to
true when the dispenser is no longer needed, to reclaim the thread,
etc., and cause the channel to close."
(let [now-serving (chan)]
(noisy-future
(try!
(loop []
(let [permit (.tryAcquire sem 300 TimeUnit/MILLISECONDS)]
(if @quit?
(when permit (.release sem))
(if-not permit
(recur)
(if (>!! now-serving true)
(recur)
(.release sem))))))
(finally
(async/close! now-serving))))
now-serving))

(defn- start-command-sender
"Start a command sending process in the background. Reads host-state maps from
host-info-ch and sends commands to the puppetdb at base-url. Writes
::submitted to rate-monitor-ch for every command sent, or ::error if there was
a problem. Close host-info-ch to stop the background process."
[base-url host-info-ch sim-ch rate-monitor-ch senders ssl-opts scheduler cmd-opts]
[base-url host-info-ch sim-ch rate-monitor-ch ssl-opts scheduler cmd-opts
{:keys [concurrent-hosts senders]}]
(let [stop-ch (chan)
host-throttle-ch (chan)
event-ch (chan)
sender-ch (chan)
state (atom {:more-hosts? true :pending-sequences 0})
state (atom {:more-hosts? true :pending-sequences 0
:validator #(-> % :pending-sequences (>= 0))})

quit? (atom false)
host-throttle (Semaphore. concurrent-hosts)

seq-end (fn seq-ended [host-info]
(let [state (swap! state update :pending-sequences dec)]
(if (and (zero? (:pending-sequences state))
(not (:more-hosts? state)))
(async/close! event-ch)
(>!! sim-ch (:host-path host-info)))))
(do
(>!! sim-ch (:host-path host-info))
(.release host-throttle)))))
stage-event (director base-url ssl-opts scheduler cmd-opts event-ch seq-end)]

;; Send host-info and events to the senders, with events having
;; priority. Critical that this be serialized wrt more-hosts? vs
;; pending-sequences state updates. Currently, that's arranged by
;; having this loop be the one that checks the pending count,
;; since it's also the only thing that can generate new pending
;; work (incrementing the count), via the director.
;; priority. Divert the host-info through the host-throttle-ch to
;; enforce the --concurrent-hosts limit (i.e. maxmumum number of
;; hosts that can have sequences "in flight").
;;
;; Giving the event channel priority is also critical since that
;; maintains (indirect) backpressure, i.e. we never generate new
;; delayed work (events) from a new host-info until we've dealt
;; with any previously generated work that's ready.
(go-loop [srcs [stop-ch event-ch host-info-ch]]
(let [[event-or-info c] (async/alts! srcs :priority true)]
(if-not (nil? event-or-info)
(do
(assert (not (= c stop-ch))) ;; only allow close
(when (= c host-info-ch)
(swap! state update :pending-sequences inc))
(>! sender-ch event-or-info)
(recur srcs))
;; Something closed
(if (= c stop-ch)
(do
(async/close! event-ch) ;; Any remaining events won't block
(async/close! sender-ch))
;; Keep going unless we're down to just the close channel,
;; or the incoming host channel has closed and there's
;; nothing in-flight (i.e. delayed).
(let [{:keys [more-hosts? pending-sequences]}
(if (= c host-info-ch)
(swap! state assoc :more-hosts? false)
@state)
srcs (remove #(= % c) srcs)]
(if (or (= srcs [stop-ch])
(and (not more-hosts?) (zero? pending-sequences)))
(async/close! sender-ch)
(recur srcs)))))))
;; Giving the event channel priority ensuares we never generate
;; new delayed work (events) from a new host-info until we've
;; dealt with any previously generated work that's ready.

;; Host throttle: shovel host-infos from the simulators
;; host-info-ch to the host-throttle-ch (which feeds in to the
;; main "prioritizing" shovel below), Assumes the other shovel
;; will set quit? and close the host-throttle-ch when it's time to
;; stop. The permit dispenser (via semaphore) limits the number of
;; concurrently active hosts since we acquire a permit here for
;; each host-info we pass along, and the permits are only released
;; at the end of the host sequence in seq-end.
(go
(try!
(loop [info (<! host-info-ch)
now-serving (semaphore-permit-dispenser host-throttle quit?)]
(when (and info (<! now-serving) (>! host-throttle-ch info))
(recur (<! host-info-ch) now-serving)))
(finally
(reset! quit? true)
(async/close! host-throttle-ch))))

;; Main "prioritizing" host/event shovel: pull host-infos and
;; events from the host throttle and the scheduler event channel
;; respectively (prioritizing events), and pass them to the
;; senders. Critical that this be serialized wrt more-hosts? vs
;; pending-sequences state updates. Currently, that's arranged by
;; having this single loop be the one that checks the pending
;; count, since it's also the only thing that can generate new
;; pending work (incrementing the count), via the director.
(go
(try!
(loop [srcs [stop-ch event-ch host-throttle-ch]]
(let [[event-or-info c] (async/alts! srcs :priority true)]
(if-not (nil? event-or-info)
;; Forward event
(do
(assert (not (= c stop-ch))) ;; only allow close
(when (= c host-throttle-ch)
(swap! state update :pending-sequences inc))
(>! sender-ch event-or-info)
(recur srcs))
;; Something closed
(when-not (= c stop-ch)
;; Keep going unless we're down to just the close channel,
;; or the incoming host channel has closed and there's
;; nothing in-flight (i.e. delayed).
(let [{:keys [more-hosts? pending-sequences]}
(if (= c host-throttle-ch)
(swap! state assoc :more-hosts? false)
@state)
srcs (filterv #(not= % c) srcs)]
(when (and (not= srcs [stop-ch])
(or more-hosts? (pos? pending-sequences)))
(recur srcs)))))))
(finally
(reset! quit? true)
(async/close! host-throttle-ch)
(async/close! event-ch) ;; remaining events won't block
(async/close! sender-ch))))

;; Start the senders
[stop-ch
[state
stop-ch
(async/pipeline-blocking senders rate-monitor-ch (map stage-event) sender-ch)]))

(defn start-rate-monitor
"Start a task which monitors the rate of messages on rate-monitor-ch and
prints it to the console every 5 seconds. Uses run-interval to compute the
number of nodes that would produce that load."
[rate-monitor-ch run-interval commands-per-puppet-run]
[rate-monitor-ch run-interval commands-per-puppet-run state]
(let [run-interval-seconds (time/in-seconds run-interval)
expected-node-message-rate (/ commands-per-puppet-run run-interval-seconds)]
(println-err
Expand Down Expand Up @@ -1013,6 +1085,9 @@
;; [host-info incl host-path] |
;; | |
;; v |
;; (host-throttle-ch) |
;; | |
;; v |
;; event-prioritizer <-- [event (SenderEvent)]----\ |
;; | | |
;; v | |
Expand All @@ -1039,6 +1114,9 @@
;;
;; The host-path sim-next-ch loop ensures that we can't issue more
;; than one command sequence for a given host at a time.
;;
;; The host-throttle-ch restricts the number of hosts that have active
;; command/query sequences to the --concurrent-hosts limit.

(defn benchmark
"Feeds commands to PDB as requested by args. Returns a map of :join, a
Expand Down Expand Up @@ -1102,11 +1180,16 @@
;; before we tear down the output channel
_ (.setExecuteExistingDelayedTasksAfterShutdownPolicy event-scheduler true)

rate-wait (start-rate-monitor rate-monitor-ch (-> 30 time/minutes) commands-per-puppet-run)

[send-stop _send-wait]
[state send-stop _send-wait]
(start-command-sender base-url host-info-ch sim-next-ch rate-monitor-ch
senders ssl-opts event-scheduler cmd-opts)
ssl-opts event-scheduler cmd-opts
{:senders senders
:concurrent-hosts (:concurrent-hosts options)})

rate-wait (start-rate-monitor rate-monitor-ch
(-> 30 time/minutes)
commands-per-puppet-run
state)

[sim-stop _sim-wait]
(start-simulation-loop numhosts run-interval-minutes nummsgs
Expand Down

0 comments on commit 56434cd

Please sign in to comment.