Skip to content

Commit

Permalink
ChainDB: optimise chain selection for forks
Browse files Browse the repository at this point in the history
Fixes #1223.

Previously, when a block fit onto a fork, we constructed the fragment going
all the way back to the immutable tip, instead of just a `ChainDiff` to the
most recent intersection with the current chain. For example, when switching
to a fork that requires rolling back 2 blocks, we would previously construct
a fragment containing `k` headers. Now, we construct a `ChainDiff` with
rollback = 2 and a fragment of 2 headers.

* `VolDB`: introduce `ReversePath` and `computeReversePath` which lazily
  computes a path through the VolatileDB. By using these paths everywhere, we
  no longer have to think about looking things up in the VolatileDB.

* `VolDB`: let `isReachable` return a `ChainDiff` of `HeaderFields` instead of
  a list of hashes going all the way back to the immutable tip. This
  `ChainDiff` of `HeaderFields` can later be translated to a `ChainDiff` of
  `Header`s.

* `VolDB`: add `extendWithSuccessors` that will extend a `ChainDiff` of
  `HeaderFields` with its successors, using `candidates`.

* Add property tests for `isReachable`, which helped catch a tricky corner
  case involving EBBs.

* `VolDB`: rewrite `computePath` using `computeReversePath`.

* We can now use `RealPoint`s instead of `HeaderHash`s in many functions and
  types: `IteratorBlockGCed`, `ChainDB.Iterator`s, `VolDB.Path`, some trace
  events, etc.

We don't yet take advantage of the `RealPoint` in the implementation of
`ChainDB.Iterator`s.
  • Loading branch information
mrBliss committed Jun 29, 2020
1 parent 14fadf4 commit ef8b55e
Show file tree
Hide file tree
Showing 12 changed files with 885 additions and 283 deletions.
1 change: 1 addition & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ test-suite test-storage
Test.Ouroboros.Storage.ChainDB.Model
Test.Ouroboros.Storage.ChainDB.Model.Test
Test.Ouroboros.Storage.ChainDB.StateMachine
Test.Ouroboros.Storage.ChainDB.VolDB
Test.Ouroboros.Storage.FS
Test.Ouroboros.Storage.FS.StateMachine
Test.Ouroboros.Storage.ImmutableDB
Expand Down
27 changes: 26 additions & 1 deletion ouroboros-consensus/src/Ouroboros/Consensus/Fragment/Diff.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
-- | Intended for qualified import
--
Expand All @@ -17,11 +19,14 @@ module Ouroboros.Consensus.Fragment.Diff
-- * Application
, apply
-- * Manipulation
, append
, truncate
, takeWhileOldest
, mapM
) where

import Prelude hiding (truncate)
import Prelude hiding (mapM, truncate)
import qualified Prelude

import Data.Word (Word64)
import GHC.Stack (HasCallStack)
Expand Down Expand Up @@ -124,6 +129,12 @@ apply curChain (ChainDiff nbRollback suffix) =
Manipulation
-------------------------------------------------------------------------------}

-- | Extend the new suffix of a 'ChainDiff' with one more @b@, leaving the
-- number of blocks to roll back untouched.
--
-- PRECONDITION: the given @b@ must fit onto the end of the suffix.
append :: HasHeader b => ChainDiff b -> b -> ChainDiff b
append diff newest = case diff of
    ChainDiff rollback suffix -> ChainDiff rollback (suffix :> newest)

-- | Truncate the diff by rolling back the new suffix to the given point.
--
-- PRECONDITION: the given point must correspond to one of the new
Expand Down Expand Up @@ -154,3 +165,17 @@ takeWhileOldest
-> ChainDiff b
takeWhileOldest accept (ChainDiff nbRollback suffix) =
ChainDiff nbRollback (AF.takeWhileOldest accept suffix)

-- | Monadic map over the new suffix of a 'ChainDiff'.
--
-- The given action is run on every element of the suffix, from oldest to
-- newest, and the results are assembled into a new suffix that is anchored
-- at the (cast) anchor of the original suffix. The number of blocks to roll
-- back is carried over unchanged.
mapM
  :: forall a b m.
     ( HasHeader b
     , HeaderHash a ~ HeaderHash b
     , Monad m
     )
  => (a -> m b)
  -> ChainDiff a
  -> m (ChainDiff b)
mapM f (ChainDiff rollback suffix) = do
    mapped <- Prelude.mapM f (AF.toOldestFirst suffix)
    return $ ChainDiff rollback (AF.fromOldestFirst newAnchor mapped)
  where
    -- The original anchor, cast so it can anchor a fragment of @b@s
    -- (presumably what the @HeaderHash a ~ HeaderHash b@ constraint is for).
    newAnchor = AF.castAnchor (AF.anchor suffix)
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ data BlockFetchServerException =
-- garbage collected. However, the network protocol will have timed out
-- long before this happens.
forall blk. (Typeable blk, StandardHash blk) =>
BlockGCed (HeaderHash blk)
BlockGCed (RealPoint blk)

-- | Thrown when requesting the genesis block from the database
--
Expand Down Expand Up @@ -119,9 +119,9 @@ blockFetchServer _tracer chainDB _version registry = senderSide
IteratorExhausted -> do
ChainDB.iteratorClose it
return $ SendMsgBatchDone $ return senderSide
IteratorBlockGCed hash -> do
IteratorBlockGCed pt -> do
ChainDB.iteratorClose it
throwM $ BlockGCed @blk hash
throwM $ BlockGCed @blk pt


{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,16 +542,16 @@ traverseIterator f it = it {
data IteratorResult blk b =
IteratorExhausted
| IteratorResult b
| IteratorBlockGCed (HeaderHash blk)
| IteratorBlockGCed (RealPoint blk)
-- ^ The block that was supposed to be streamed was garbage-collected from
-- the VolatileDB, but not added to the ImmutableDB.
--
-- This will only happen when streaming very old forks very slowly.
deriving (Functor, Foldable, Traversable)

deriving instance (Eq blk, Eq b, Eq (HeaderHash blk))
deriving instance (Eq blk, Eq b, StandardHash blk)
=> Eq (IteratorResult blk b)
deriving instance (Show blk, Show b, Show (HeaderHash blk))
deriving instance (Show blk, Show b, StandardHash blk)
=> Show (IteratorResult blk b)

data UnknownRange blk =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import Control.Monad (unless)
import Control.Monad.Except
import Control.Monad.Trans.State.Strict
import Control.Tracer (Tracer, contramap, traceWith)
import Data.Foldable (foldl')
import Data.Function (on)
import Data.List (partition, sortBy)
import Data.List.NonEmpty (NonEmpty)
Expand Down Expand Up @@ -415,11 +414,11 @@ chainSelectionForBlock
-> Header blk
-> m (Point blk)
chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
(invalid, succsOf, predecessor, curChain, tipPoint, ledgerDB)
(invalid, succsOf, lookupBlockInfo, curChain, tipPoint, ledgerDB)
<- atomically $ (,,,,,)
<$> (forgetFingerprint <$> readTVar cdbInvalid)
<*> VolDB.filterByPredecessor cdbVolDB
<*> VolDB.getPredecessor cdbVolDB
<*> VolDB.getBlockInfo cdbVolDB
<*> Query.getCurrentChain cdb
<*> Query.getTipPoint cdb
<*> LgrDB.getCurrent cdbLgrDB
Expand All @@ -434,15 +433,12 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
immBlockNo :: WithOrigin BlockNo
immBlockNo = AF.anchorBlockNo curChain

i :: Point blk
i = castPoint $ AF.anchorPoint curChain

-- Let these two functions ignore invalid blocks
predecessor' = ignoreInvalid cdb invalid predecessor
succsOf' = ignoreInvalidSuc cdb invalid succsOf
lookupBlockInfo' = ignoreInvalid cdb invalid lookupBlockInfo
succsOf' = ignoreInvalidSuc cdb invalid succsOf

-- The preconditions
assert (isJust $ predecessor (headerHash hdr)) $ return ()
assert (isJust $ lookupBlockInfo (headerHash hdr)) $ return ()

if
-- The chain might have grown since we added the block such that the
Expand All @@ -462,10 +458,10 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
trace (TryAddToCurrentChain p)
addToCurrentChain succsOf' curChainAndLedger

| Just hashes <- VolDB.isReachable predecessor' i p -> do
| Just diff <- VolDB.isReachable lookupBlockInfo' curChain p -> do
-- ### Switch to a fork
trace (TrySwitchToAFork p hashes)
switchToAFork succsOf' curChainAndLedger hashes
trace (TrySwitchToAFork p diff)
switchToAFork succsOf' lookupBlockInfo' curChainAndLedger diff

| otherwise -> do
-- ### Store but don't change the current chain
Expand Down Expand Up @@ -559,40 +555,47 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
curTip = castPoint $ AF.headPoint curChain
curHead = AF.headAnchor curChain

-- | We have found a path of hashes to the new block through the
-- VolatileDB. We try to extend this path by looking for forks that start
-- with the given block, then we do chain selection and /possibly/ try to
-- switch to a new fork.
-- | We have found a 'ChainDiff' through the VolatileDB connecting the new
-- block to the current chain. We'll call the intersection/anchor @x@.
--
-- We try to extend this path by looking for forks that start with the
-- given block, then we do chain selection and /possibly/ try to switch to
-- a new fork.
switchToAFork :: HasCallStack
=> (WithOrigin (HeaderHash blk) -> Set (HeaderHash blk))
-> VolDB.LookupBlockInfo blk
-> ChainAndLedger blk
-- ^ The current chain (anchored at @i@) and ledger
-> NonEmpty (HeaderHash blk)
-- ^ An uninterrupted path of hashes @(i,b]@.
-> ChainDiff (HeaderFields blk)
-- ^ Header fields for @(x,b]@
-> m (Point blk)
switchToAFork succsOf curChainAndLedger hashes = do
let suffixesAfterB = VolDB.candidates succsOf (realPointToPoint p)
initCache = Map.insert (headerHash hdr) hdr (cacheHeaders curChain)
-- Fragments that are anchored at @i@.
candidates <- flip evalStateT initCache $
case NE.nonEmpty suffixesAfterB of
-- If there are no suffixes after @b@, just use the fragment that
-- ends in @b@ as the sole candidate.
Nothing -> (NE.:| []) <$> constructFork i hashes []
Just suffixesAfterB' -> mapM (constructFork i hashes . NE.toList)
suffixesAfterB'
let chainDiffs =
-- The suffixes all fork off from the current chain within @k@
-- blocks, so it satisfies the precondition of
-- 'preferCandidate'.
--
-- 'ChainDiff's resulting in chains shorter than the current
-- chain will be filtered out by 'preferAnchoredCandidate'.
filter
(preferAnchoredCandidate cdbTopLevelConfig curChain .
Diff.getSuffix)
. map (Diff.diff curChain)
$ NE.toList candidates
switchToAFork succsOf lookupBlockInfo curChainAndLedger diff = do
-- We use a cache to avoid reading the headers from disk multiple
-- times in case they're part of multiple forks that go through @b@.
let initCache = Map.singleton (headerHash hdr) hdr
chainDiffs <-
-- 4. Filter out candidates that are not preferred over the current
-- chain.
--
-- The suffixes all fork off from the current chain within @k@
-- blocks, so it satisfies the precondition of 'preferCandidate'.
fmap
( filter
( preferAnchoredCandidate cdbTopLevelConfig curChain
. Diff.getSuffix
)
)
-- 3. Translate the 'HeaderFields' to 'Header' by reading the
-- headers from disk.
. flip evalStateT initCache
. mapM translateToHeaders
-- 2. Filter out candidates that are shorter than the current
-- chain. We don't want to needlessly read the headers from disk
-- for those candidates.
. NE.filter (not . Diff.rollbackExceedsSuffix)
-- 1. Extend the diff with candidates fitting on @b@
. VolDB.extendWithSuccessors succsOf lookupBlockInfo
$ diff

case NE.nonEmpty chainDiffs of
-- No candidates preferred over the current chain
Expand All @@ -607,7 +610,6 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
chainSelEnv = mkChainSelEnv curChainAndLedger
curChain = VF.validatedFragment curChainAndLedger
curTip = castPoint $ AF.headPoint curChain
i = AF.castAnchor $ anchor curChain

-- | Create a 'NewTipInfo' corresponding to the tip of the given ledger.
mkNewTipInfo :: LgrDB.LedgerDB blk -> NewTipInfo blk
Expand Down Expand Up @@ -688,44 +690,25 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do

return $ castPoint $ AF.headPoint newChain

-- | Build a cache from the headers in the fragment.
cacheHeaders :: AnchoredFragment (Header blk)
-> Map (HeaderHash blk) (Header blk)
cacheHeaders =
foldl' (\m h -> Map.insert (blockHash h) h m) Map.empty .
AF.toNewestFirst

-- | We have a new block @b@ that doesn't fit onto the current chain, but
-- there is an unbroken path from the tip of the ImmutableDB (@i@ = the
-- anchor point of the current chain) to @b@. We also have a suffix @s@ of
-- hashes that starts after @b@.
-- we have found a 'ChainDiff' connecting it to the current chain via
-- intersection point @x@. We may also have extended that 'ChainDiff' with
-- more blocks fitting onto @b@, i.e., a suffix @s@.
--
-- We will try to construct a fragment @f@ for the fork such that:
-- * @f@ is anchored at @i@
-- * @f@ starts with the headers corresponding to the hashes of @(i,b]@
-- * The next header in @f@ is the header for @b@
-- * Finally, @f@ ends with the headers corresponding to the hashes
-- @(b,?]@ of the suffix @s@.
-- * Any headers from the future are dropped.
-- We now translate that 'ChainDiff' from 'HeaderFields' to 'Header's by
-- reading the headers from disk.
--
-- Note that we need to read the headers corresponding to the hashes
-- @(i,b]@ and @(b,?]@ from disk. It is likely that many of these headers
-- are actually on the current chain, so when possible, we reuse these
-- headers instead of reading them from disk.
constructFork
:: Anchor blk -- ^ Tip of ImmutableDB @i@
-> NonEmpty (HeaderHash blk) -- ^ Hashes of @(i,b]@
-> [HeaderHash blk] -- ^ Suffix @s@, hashes of @(b,?]@
-- @(x,b)@ and @(b,?]@ from disk. Not for @b@, as that's in our cache.
translateToHeaders
:: ChainDiff (HeaderFields blk)
-> StateT (Map (HeaderHash blk) (Header blk))
m
(AnchoredFragment (Header blk))
-- ^ Fork, anchored at @i@, contains (the header of) @b@ and ends
(ChainDiff (Header blk))
-- ^ Fork, anchored at @x@, contains (the header of) @b@ and ends
-- with the suffix @s@.
constructFork i hashes suffixHashes
= fmap (AF.fromOldestFirst (AF.castAnchor i))
$ mapM (getKnownHeaderThroughCache cdbVolDB)
$ NE.toList hashes <> suffixHashes

translateToHeaders =
Diff.mapM (getKnownHeaderThroughCache cdbVolDB . headerFieldHash)

-- | Check whether the header for the hash is in the cache, if not, get
-- the corresponding header from the VolatileDB and store it in the cache.
Expand All @@ -737,11 +720,11 @@ getKnownHeaderThroughCache
-> HeaderHash blk
-> StateT (Map (HeaderHash blk) (Header blk)) m (Header blk)
getKnownHeaderThroughCache volDB hash = gets (Map.lookup hash) >>= \case
Just hdr -> return hdr
Nothing -> do
hdr <- lift $ VolDB.getKnownHeader volDB hash
modify (Map.insert hash hdr)
return hdr
Just hdr -> return hdr
Nothing -> do
hdr <- lift $ VolDB.getKnownHeader volDB hash
modify (Map.insert hash hdr)
return hdr

-- | Environment used by 'chainSelection' and related functions.
data ChainSelEnv m blk = ChainSelEnv
Expand Down
Loading

0 comments on commit ef8b55e

Please sign in to comment.