Skip to content

Commit

Permalink
(PE-37113) benchmark: prevent simultaneous cmds for same host
Browse files Browse the repository at this point in the history
Previously, when in --nummsgs mode (no rate limit), there was nothing
to prevent a host from "wrapping around" and being processed again,
while its previous command sequence or even *sequences* were still
pending.

Fix that by pulling the simulators out of their own little world and
into a "global" loop with the rest of the processing, i.e. instead of
having the simulators process the host-infos in an isolated circle as
fast as the senders ask for them, close the loop so that the senders
won't see a given host again until the current command sequence has
finished.

After this change a host's information travels from the simulator, to
the sender, and then, only after the final event for that host has
executed, back to the simulators for the next round.
  • Loading branch information
rbrw committed Jan 5, 2024
1 parent 7a4bfdf commit cadae82
Showing 1 changed file with 40 additions and 31 deletions.
71 changes: 40 additions & 31 deletions src/puppetlabs/puppetdb/cli/benchmark.clj
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@
(assoc event :what :send-report)
(event-delay :send-facts :send-report)
event-ch))
(seq-end))
(seq-end host-info))
result))

(defn- handle-catalog-queries
Expand All @@ -626,13 +626,14 @@
(-> event (dissoc :queries) (assoc :what :send-catalog))
(event-delay :catalog-queries :send-catalog)
event-ch))
(seq-end))
(seq-end (:host-info event)))
result))

(defn- handle-send-catalog
[event base-url ssl-opts scheduler event-delay event-ch seq-end]
(assert (= :send-catalog (:what event)))
(let [{:keys [host catalog report]} (:host-info event)
(let [host-info (:host-info event)
{:keys [host catalog report]} host-info
result (try!
(send-catalog (cmd-url base-url) host 9 catalog ssl-opts)
(assoc event :result true)
Expand All @@ -643,7 +644,7 @@
report (schedule-event scheduler (assoc event :what :send-report)
(event-delay :send-catalog :send-report)
event-ch))
(seq-end))
(seq-end host-info))
result))

(defn- handle-send-report
Expand All @@ -656,7 +657,7 @@
(catch IOException ex
(report-sender-ex ex)
(assoc event :result ex)))]
(seq-end)
(seq-end host-info)
result))

;;; The start-with-* functions initiate a sequence of delayed events for
Expand Down Expand Up @@ -727,17 +728,19 @@
host-info-ch and sends commands to the puppetdb at base-url. Writes
::submitted to rate-monitor-ch for every command sent, or ::error if there was
a problem. Close host-info-ch to stop the background process."
[base-url host-info-ch rate-monitor-ch senders ssl-opts scheduler cmd-opts]
[base-url host-info-ch sim-ch rate-monitor-ch senders ssl-opts scheduler cmd-opts]
(let [stop-ch (chan)
event-ch (chan)
sender-ch (chan)
state (atom {:more-hosts? true :pending-sequences 0})
seq-end (fn seq-ended []
seq-end (fn seq-ended [host-info]
(let [state (swap! state update :pending-sequences dec)]
(when (and (zero? (:pending-sequences state))
(not (:more-hosts? state)))
(async/close! event-ch))))
(if (and (zero? (:pending-sequences state))
(not (:more-hosts? state)))
(async/close! event-ch)
(>!! sim-ch (:host-path host-info)))))
stage-event (director base-url ssl-opts scheduler cmd-opts event-ch seq-end)]

;; Send host-info and events to the senders, with events having
;; priority. Critical that this be serialized wrt more-hosts? vs
;; pending-sequences state updates. Currently, that's arranged by
Expand Down Expand Up @@ -951,8 +954,8 @@
(async/alt!!
stop-ch (doseq [c [read-ch sim-ch host-info-ch]] (async/close! c))
(async/timeout (int (/ (- deadline (time/ephemeral-now-ns)) 1000000))) nil))
(>!! sim-ch host-path)
(prune-host-info new-state facts catalogs reports))))
(assoc (prune-host-info new-state facts catalogs reports)
:host-path host-path))))
read-ch)]))

(defn warn-missing-data [catalogs reports facts]
Expand Down Expand Up @@ -1000,22 +1003,25 @@
;; [host-map]
;; |
;; v
;; (sim-input-ch: mix) <--- [host-path] ----\
;; | |
;; v |
;; simulation-loop (sim-next-ch) -----------/
;; (host-info-ch)
;; |
;; [host-info]
;; |
;; v
;; event-prioritizer <-- [event (SenderEvent)]----\
;; | |
;; v |
;; [event | host-info] scheduler
;; | |
;; v |
;; sender ---------------[event (SenderEvent)] ---/
;; (mix: sim-input-ch sim-next-ch) < --- [host-path] ------\
;; | |
;; v |
;; simulators |
;; | |
;; (host-info-ch) |
;; | |
;; [host-info incl host-path] |
;; | |
;; v |
;; event-prioritizer <-- [event (SenderEvent)]----\ |
;; | | |
;; v | |
;; [event | host-info] scheduler |
;; | | |
;; v | |
;; sender ---------------[event (SenderEvent)] ---/ |
;; | |
;; |------[host-path] ----------------------------------/
;; |
;; v
;; [event]
Expand All @@ -1029,7 +1035,10 @@
;; the scheduler. To shut down properly, just close the "stop"
;; channels provided by the simulator and the sender. When the
;; channel returned by the rate monitor closes, everything should be
;; finiwhed.
;; finished.
;;
;; The host-path sim-next-ch loop ensures that we can't issue more
;; than one command sequence for a given host at a time.

(defn benchmark
"Feeds commands to PDB as requested by args. Returns a map of :join, a
Expand Down Expand Up @@ -1096,8 +1105,8 @@
rate-wait (start-rate-monitor rate-monitor-ch (-> 30 time/minutes) commands-per-puppet-run)

[send-stop _send-wait]
(start-command-sender base-url host-info-ch rate-monitor-ch senders
ssl-opts event-scheduler cmd-opts)
(start-command-sender base-url host-info-ch sim-next-ch rate-monitor-ch
senders ssl-opts event-scheduler cmd-opts)

[sim-stop _sim-wait]
(start-simulation-loop numhosts run-interval-minutes nummsgs
Expand Down

0 comments on commit cadae82

Please sign in to comment.