Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce MergingRunData and MergingTreeData #592

Merged
merged 4 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .hlint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- ignore: {name: "Redundant $!"}
- ignore: {name: "Use shows"}
- ignore: {name: "Use fmap"}
- ignore: {name: "Use <=<"}

# Specify additional command line arguments
#
Expand Down
30 changes: 15 additions & 15 deletions bench/micro/Bench/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
module Bench.Database.LSMTree.Internal.Merge (benchmarks) where

import Control.Monad (zipWithM)
import Control.RefCount
import Criterion.Main (Benchmark, bench, bgroup)
import qualified Criterion.Main as Cr
Expand All @@ -27,6 +26,7 @@ import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise
import Database.LSMTree.Internal.UniqCounter
import Prelude hiding (getContents)
import System.Directory (removeDirectoryRecursive)
import qualified System.FS.API as FS
Expand Down Expand Up @@ -268,11 +268,14 @@ merge fs hbio Config {..} targetPaths runs = do
mergeType f targetPaths runs
Merge.stepsToCompletion m stepSize

fsPath :: FS.FsPath
fsPath = FS.mkFsPath []

outputRunPaths :: Run.RunFsPaths
outputRunPaths = RunFsPaths (FS.mkFsPath []) (RunNumber 0)
outputRunPaths = RunFsPaths fsPath (RunNumber 0)

inputRunPaths :: [Run.RunFsPaths]
inputRunPaths = RunFsPaths (FS.mkFsPath []) . RunNumber <$> [1..]
inputRunPathsCounter :: IO (UniqCounter IO)
inputRunPathsCounter = newUniqCounter 1 -- 0 is for output

type InputRuns = V.Vector (Ref (Run IO FS.HandleIO))

Expand Down Expand Up @@ -384,17 +387,14 @@ randomRuns ::
-> Config
-> StdGen
-> IO InputRuns
randomRuns hasFS hasBlockIO config@Config {..} rng0 =
V.fromList <$>
zipWithM (unsafeFlushAsWriteBuffer hasFS hasBlockIO Index.Compact)
inputRunPaths runsData
where
runsData :: [SerialisedRunData]
runsData =
zipWith
(randomRunData config)
nentries
(List.unfoldr (Just . R.split) rng0)
randomRuns hasFS hasBlockIO config@Config {..} rng0 = do
counter <- inputRunPathsCounter
fmap V.fromList $
mapM (unsafeCreateRun hasFS hasBlockIO Index.Compact fsPath counter) $
zipWith
(randomRunData config)
nentries
(List.unfoldr (Just . R.split) rng0)

-- | Generate keys and entries to insert into the write buffer.
-- They are already serialised to exclude the cost from the benchmark.
Expand Down
3 changes: 3 additions & 0 deletions lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ library extras
Database.LSMTree.Extras
Database.LSMTree.Extras.Generators
Database.LSMTree.Extras.Index
Database.LSMTree.Extras.MergingRunData
Database.LSMTree.Extras.MergingTreeData
Database.LSMTree.Extras.NoThunks
Database.LSMTree.Extras.Orphans
Database.LSMTree.Extras.Random
Expand All @@ -333,6 +335,7 @@ library extras
, lsm-tree:control
, lsm-tree:kmerge
, lsm-tree:prototypes
, nonempty-containers
, nothunks
, primitive
, QuickCheck
Expand Down
222 changes: 222 additions & 0 deletions src-extras/Database/LSMTree/Extras/MergingRunData.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
-- | Utilities for generating 'MergingRun's. Tests and benchmarks should
-- preferably use these utilities instead of (re-)defining their own.
module Database.LSMTree.Extras.MergingRunData (
-- * Create merging runs
withMergingRun
, unsafeCreateMergingRun
-- * MergingRunData
, MergingRunData (..)
, mergingRunDataMergeType
, mergingRunDataInvariant
, mapMergingRunData
, SerialisedMergingRunData
, serialiseMergingRunData
-- * QuickCheck
, labelMergingRunData
, genMergingRunData
, shrinkMergingRunData
) where

import Control.Exception (bracket)
import Control.RefCount
import qualified Data.Vector as V
import Database.LSMTree.Extras (showPowersOf)
import Database.LSMTree.Extras.Generators ()
import Database.LSMTree.Extras.RunData
import Database.LSMTree.Internal.Index (IndexType)
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
import Database.LSMTree.Internal.MergingRun (MergingRun)
import qualified Database.LSMTree.Internal.MergingRun as MR
import Database.LSMTree.Internal.Paths
import Database.LSMTree.Internal.Run (RunDataCaching (..))
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise
import Database.LSMTree.Internal.UniqCounter
import qualified System.FS.API as FS
import System.FS.API (HasFS)
import System.FS.BlockIO.API (HasBlockIO)
import Test.QuickCheck as QC

{-------------------------------------------------------------------------------
Create merging runs
-------------------------------------------------------------------------------}

-- | Create a temporary 'MergingRun' using 'unsafeCreateMergingRun'.
withMergingRun ::
MR.IsMergeType t
=> HasFS IO h
-> HasBlockIO IO h
-> ResolveSerialisedValue
-> IndexType
-> FS.FsPath
-> UniqCounter IO
-> SerialisedMergingRunData t
-> (Ref (MergingRun t IO h) -> IO a)
-> IO a
withMergingRun hfs hbio resolve indexType path counter mrd = do
bracket
(unsafeCreateMergingRun hfs hbio resolve indexType path counter mrd)
releaseRef

-- | Flush serialised merging run data to disk.
--
-- This might leak resources if not run with asynchronous exceptions masked.
-- Consider using 'withMergingRun' instead.
--
-- Use of this function should be paired with a 'releaseRef'.
unsafeCreateMergingRun ::
MR.IsMergeType t
=> HasFS IO h
-> HasBlockIO IO h
-> ResolveSerialisedValue
-> IndexType
-> FS.FsPath
-> UniqCounter IO
-> SerialisedMergingRunData t
-> IO (Ref (MergingRun t IO h))
unsafeCreateMergingRun hfs hbio resolve indexType path counter = \case
CompletedMergeData _ numRuns rd -> do
withRun hfs hbio indexType path counter rd $ \run -> do
-- slightly hacky, generally it's larger
let totalDebt = MR.numEntriesToMergeDebt (Run.size run)
MR.newCompleted numRuns totalDebt run

OngoingMergeData mergeType rds -> do
withRuns hfs hbio indexType path counter (toRunData <$> rds)
$ \runs -> do
n <- incrUniqCounter counter
let fsPaths = RunFsPaths path (RunNumber (uniqueToInt n))
MR.new hfs hbio resolve CacheRunData (RunAllocFixed 10) indexType
mergeType fsPaths (V.fromList runs)

{-------------------------------------------------------------------------------
MergingRunData
-------------------------------------------------------------------------------}

-- | A data structure suitable for creating arbitrary 'MergingRun's.
--
-- Note: 'b ~ Void' should rule out blobs.
--
-- Currently, ongoing merges are always \"fresh\", i.e. there is no merge work
-- already performed.
--
-- TODO: Generate merge credits and supply them in 'unsafeCreateMergingRun',
-- similarly to how @ScheduledMergesTest@ does it.
data MergingRunData t k v b =
CompletedMergeData t MR.NumRuns (RunData k v b)
| OngoingMergeData t [NonEmptyRunData k v b] -- ^ at least 2 inputs
deriving stock (Show, Eq)

mergingRunDataMergeType :: MergingRunData t k v b -> t
mergingRunDataMergeType = \case
CompletedMergeData mt _ _ -> mt
OngoingMergeData mt _ -> mt

-- | See @mergeInvariant@ in the prototype.
mergingRunDataInvariant :: MergingRunData t k v b -> Either String ()
mergingRunDataInvariant = \case
CompletedMergeData _ (MR.NumRuns n) _ ->
assertI "completed merges are non-trivial (at least two inputs)" $
n >= 2
OngoingMergeData _ rds -> do
assertI "ongoing merges are non-trivial (at least two inputs)" $
length rds >= 2
where
assertI msg False = Left msg
assertI _ True = Right ()

mapMergingRunData ::
Ord k'
=> (k -> k') -> (v -> v') -> (b -> b')
-> MergingRunData t k v b -> MergingRunData t k' v' b'
mapMergingRunData f g h = \case
CompletedMergeData t n r ->
CompletedMergeData t n $ mapRunData f g h r
OngoingMergeData t rs ->
OngoingMergeData t $ map (mapNonEmptyRunData f g h) rs

type SerialisedMergingRunData t =
MergingRunData t SerialisedKey SerialisedValue SerialisedBlob

serialiseMergingRunData ::
(SerialiseKey k, SerialiseValue v, SerialiseValue b)
=> MergingRunData t k v b -> SerialisedMergingRunData t
serialiseMergingRunData =
mapMergingRunData serialiseKey serialiseValue serialiseBlob

{-------------------------------------------------------------------------------
QuickCheck
-------------------------------------------------------------------------------}

labelMergingRunData ::
Show t => SerialisedMergingRunData t -> Property -> Property
labelMergingRunData (CompletedMergeData mt _ rd) =
tabulate "merging run state" ["CompletedMerge"]
. tabulate "merge type" [show mt]
. labelRunData rd
labelMergingRunData (OngoingMergeData mt rds) =
tabulate "merging run state" ["OngoingMerge"]
. tabulate "merge type" [show mt]
. tabulate "merging run inputs" [showPowersOf 2 (length rds)]
. foldr ((.) . labelNonEmptyRunData) id rds

instance ( Arbitrary t, Ord k, Arbitrary k, Arbitrary v, Arbitrary b
) => Arbitrary (MergingRunData t k v b) where
arbitrary = genMergingRunData arbitrary arbitrary arbitrary arbitrary
shrink = shrinkMergingRunData shrink shrink shrink

genMergingRunData ::
Ord k
=> Gen t
-> Gen k
-> Gen v
-> Gen b
-> Gen (MergingRunData t k v b)
genMergingRunData genMergeType genKey genVal genBlob =
QC.oneof
[ do
mt <- genMergeType
numRuns <- MR.NumRuns <$> QC.chooseInt (2, 8)
rd <- genRunData genKey genVal genBlob
pure (CompletedMergeData mt numRuns rd)
, do
s <- QC.getSize
mt <- genMergeType
n <- QC.chooseInt (2, max 2 (s * 8 `div` 100)) -- 2 to 8
rs <- QC.vectorOf n $
-- Scaled, so overall number of entries is similar to a completed
-- merge. However, the entries themselves should not be smaller.
QC.scale (`div` n) $
genNonEmptyRunData
(resize s genKey)
(resize s genVal)
(resize s genBlob)
pure (OngoingMergeData mt rs)
]

shrinkMergingRunData ::
Ord k
=> (k -> [k])
-> (v -> [v])
-> (b -> [b])
-> MergingRunData t k v b
-> [MergingRunData t k v b]
shrinkMergingRunData shrinkKey shrinkVal shrinkBlob = \case
CompletedMergeData mt numRuns rd ->
[ CompletedMergeData mt numRuns' rd'
| (numRuns', rd') <-
liftShrink2
(fmap MR.NumRuns . filter (>= 2) . shrink . MR.unNumRuns)
(shrinkRunData shrinkKey shrinkVal shrinkBlob)
(numRuns, rd)
]
OngoingMergeData mt rds ->
[ OngoingMergeData mt rds'
| rds' <-
liftShrink
(shrinkNonEmptyRunData shrinkKey shrinkVal shrinkBlob)
rds
, length rds' >= 2
]
Loading