Skip to content

Commit

Permalink
Better encapsulate MergingRun internals
Browse files Browse the repository at this point in the history
Provide a couple accessor functions and hide the rest (except the
constructor needed for deriving Generic in the tests).
  • Loading branch information
dcoutts committed Feb 3, 2025
1 parent cac10ef commit a57172e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 21 deletions.
4 changes: 2 additions & 2 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ scaleCreditsForMerge LevelTiering _ (Credits c) =
-- runs come in).
MR.Credits (c * (1 + 4))

scaleCreditsForMerge LevelLevelling (DeRef mr) (Credits c) =
scaleCreditsForMerge LevelLevelling mr (Credits c) =
-- A levelling merge has 1 input run and one resident run, which is (up
-- to) 4x bigger than the others. It needs to be completed before
-- another run comes in.
Expand All @@ -828,7 +828,7 @@ scaleCreditsForMerge LevelLevelling (DeRef mr) (Credits c) =
-- worst-case upper bound by looking at the sizes of the input runs.
-- As as result, merge work would/could be more evenly distributed over
-- time when the resident run is smaller than the worst case.
let NumRuns n = MR.mergeNumRuns mr
let NumRuns n = MR.numRuns mr
-- same as division rounding up: ceiling (c * n / 4)
in MR.Credits ((c * n + 3) `div` 4)

Expand Down
37 changes: 22 additions & 15 deletions src/Database/LSMTree/Internal/MergingRun.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@
-- | An incremental merge of multiple runs.
module Database.LSMTree.Internal.MergingRun (
-- * Merging run
MergingRun (..)
MergingRun
, NumRuns (..)
, new
, newCompleted
, duplicateRuns
, supplyCredits
, expectCompleted
, snapshot
, numRuns

-- * Credit tracking
-- $credittracking
, Credits (..)
, CreditThreshold (..)
, SuppliedCredits (..)
, atomicReadSuppliedCredits

-- * Concurrency
-- $concurrency

-- * Internal state
, pattern MergingRun
, MergingRunState (..)
, MergeKnownCompleted (..)
, CreditsVar (..)
Expand Down Expand Up @@ -220,6 +222,24 @@ duplicateRuns (DeRef mr) =
OngoingMerge rs _ -> withActionRegistry $ \reg ->
V.mapM (\r -> withRollback reg (dupRef r) releaseRef) rs

-- | Take a snapshot of the state of a merging run.
snapshot ::
(PrimMonad m, MonadMVar m)
=> Ref (MergingRun m h)
-> m (MergingRunState m h,
SuppliedCredits,
NumRuns,
NumEntries)
snapshot (DeRef MergingRun {..}) = do
state <- readMVar mergeState
(SpentCredits spent,
UnspentCredits unspent) <- atomicReadCredits mergeCreditsVar
let supplied = SuppliedCredits (spent + unspent)
return (state, supplied, mergeNumRuns, mergeNumEntries)

numRuns :: Ref (MergingRun m h) -> NumRuns
numRuns (DeRef MergingRun {mergeNumRuns}) = mergeNumRuns

{-------------------------------------------------------------------------------
Credits
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -396,19 +416,6 @@ atomicReadCredits ::
atomicReadCredits (CreditsVar v) =
unpackCreditsPair <$> atomicReadInt v

{-# INLINE atomicReadSuppliedCredits #-}
atomicReadSuppliedCredits ::
PrimMonad m
=> CreditsVar (PrimState m)
-> m SuppliedCredits
atomicReadSuppliedCredits (CreditsVar v) = do
cp <- atomicReadInt v
let !supplied =
case cp of
CreditsPair (SpentCredits spent)
(UnspentCredits unspent) -> spent + unspent
return (SuppliedCredits supplied)

{-# INLINE atomicModifyInt #-}
-- | Atomically modify a single mutable integer variable, using a CAS loop.
atomicModifyInt ::
Expand Down
10 changes: 6 additions & 4 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,16 @@ toSnapIncomingRun ::
=> IncomingRun m h
-> m (SnapIncomingRun (Ref (Run m h)))
toSnapIncomingRun (Single r) = pure (SnapSingleRun r)
toSnapIncomingRun (Merging mergePolicy (DeRef MR.MergingRun {..})) = do
toSnapIncomingRun (Merging mergePolicy mergingRun) = do
-- We need to know how many credits were spend and yet unspent so we can
-- restore merge work on snapshot load. No need to snapshot the contents
-- of totalStepsVar here, since we still start counting from 0 again when
-- loading the snapshot.
MR.SuppliedCredits (MR.Credits suppliedCredits)
<- MR.atomicReadSuppliedCredits mergeCreditsVar
smrs <- toSnapMergingRunState <$> readMVar mergeState
(mergingRunState,
MR.SuppliedCredits (MR.Credits suppliedCredits),
mergeNumRuns,
mergeNumEntries) <- MR.snapshot mergingRun
let smrs = toSnapMergingRunState mergingRunState
pure $
SnapMergingRun
mergePolicy
Expand Down

0 comments on commit a57172e

Please sign in to comment.