Skip to content

Commit

Permalink
ChainDB: optimise chain selection for forks
Browse files Browse the repository at this point in the history
Fixes #1223.

When a block fits onto a fork, we construct the fragment all the way going
back to the immutable tip instead of just a `ChainDiff` to the most recent
intersection with the current chain. For example, when switching to a fork
that requires rolling back 2 blocks, previously, we'd construct a fragment
containing `k` headers. Now, we'll construct a `ChainDiff` with rollback = 2
and a fragment of 2 headers.

* `VolDB`: introduce `ReversePath` and `computeReversePath` which lazily
  computes a reverse path through the VolatileDB. By using these paths
  everywhere, we no longer have to think about looking things up in the
  VolatileDB.

* `VolDB`: let `isReachable` return a `PointChainDiff` instead of a list of
  hashes going all the way back to the immutable tip. This `PointChainDiff`
  can later be translated to a `ChainDiff`.

* Add property tests for `isReachable`, which helped catch a tricky corner
  case involving EBBs.

* `VolDB`: rewrite `computePath` using `computeReversePath`.

* `Fragment.Diff`: promote the internal `mkRollback` to the public `mkDiff`
  smart constructor, now used by chain selection for forks.

* We can now use `RealPoint`s instead of `HeaderHash`s in many functions and
  types: `IteratorBlockGCed`, `ChainDB.Iterator`s, `VolDB.Path`, some trace
  events, etc.

We don't yet take advantage of the `RealPoint` in the implementation of
`ChainDB.Iterator`s.
  • Loading branch information
mrBliss committed Jun 25, 2020
1 parent e83a197 commit d27f426
Show file tree
Hide file tree
Showing 11 changed files with 805 additions and 288 deletions.
1 change: 1 addition & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ test-suite test-storage
Test.Ouroboros.Storage.ChainDB.Model
Test.Ouroboros.Storage.ChainDB.Model.Test
Test.Ouroboros.Storage.ChainDB.StateMachine
Test.Ouroboros.Storage.ChainDB.VolDB
Test.Ouroboros.Storage.FS
Test.Ouroboros.Storage.FS.StateMachine
Test.Ouroboros.Storage.ImmutableDB
Expand Down
31 changes: 16 additions & 15 deletions ouroboros-consensus/src/Ouroboros/Consensus/Fragment/Diff.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Ouroboros.Consensus.Fragment.Diff
, getTip
, getAnchorPoint
-- * Constructors
, mkDiff
, extend
, diff
-- * Application
Expand Down Expand Up @@ -62,18 +63,6 @@ pattern ChainDiff
pattern ChainDiff r s <- UnsafeChainDiff r s
{-# COMPLETE ChainDiff #-}

-- | Internal. Return 'Nothing' if the length of the suffix < the rollback.
mkRollback
:: HasHeader (Header blk)
=> Word64
-> AnchoredFragment (Header blk)
-> Maybe (ChainDiff blk)
mkRollback nbRollback suffix
| fromIntegral (AF.length suffix) >= nbRollback
= Just $ UnsafeChainDiff nbRollback suffix
| otherwise
= Nothing

{-------------------------------------------------------------------------------
Queries
-------------------------------------------------------------------------------}
Expand All @@ -90,6 +79,18 @@ getAnchorPoint = castPoint . AF.anchorPoint . getSuffix
Constructors
-------------------------------------------------------------------------------}

-- | Smart constructor. Return 'Nothing' if the length of the suffix < the rollback.
mkDiff
:: HasHeader (Header blk)
=> Word64
-> AnchoredFragment (Header blk)
-> Maybe (ChainDiff blk)
mkDiff nbRollback suffix
| fromIntegral (AF.length suffix) >= nbRollback
= Just $ UnsafeChainDiff nbRollback suffix
| otherwise
= Nothing

-- | Make an extension-only (no rollback) 'ChainDiff'.
extend :: AnchoredFragment (Header blk) -> ChainDiff blk
extend = UnsafeChainDiff 0
Expand All @@ -109,7 +110,7 @@ diff
diff curChain candChain =
case AF.intersect curChain candChain of
Just (_curChainPrefix, _candPrefix, curChainSuffix, candSuffix)
-> mkRollback
-> mkDiff
(fromIntegral (AF.length curChainSuffix))
candSuffix
-- Precondition violated.
Expand Down Expand Up @@ -156,7 +157,7 @@ truncate
-> Maybe (ChainDiff blk)
truncate pt (ChainDiff nbRollback suffix)
| Just suffix' <- AF.rollback (castPoint pt) suffix
= mkRollback nbRollback suffix'
= mkDiff nbRollback suffix'
| otherwise
= error $ "rollback point not on the candidate suffix: " <> show pt

Expand All @@ -170,4 +171,4 @@ takeWhileOldest
-> ChainDiff blk
-> Maybe (ChainDiff blk)
takeWhileOldest accept (ChainDiff nbRollback suffix) =
mkRollback nbRollback (AF.takeWhileOldest accept suffix)
mkDiff nbRollback (AF.takeWhileOldest accept suffix)
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ module Ouroboros.Consensus.MiniProtocol.BlockFetch.Server
import Control.Tracer (Tracer)
import Data.Typeable (Typeable)

import Ouroboros.Network.Block (pattern BlockPoint, HeaderHash,
Serialised (..), StandardHash)
import Ouroboros.Network.Block (pattern BlockPoint, Serialised (..),
StandardHash)
import Ouroboros.Network.Protocol.BlockFetch.Server
(BlockFetchBlockSender (..), BlockFetchSendBlocks (..),
BlockFetchServer (..))
Expand Down Expand Up @@ -50,7 +50,7 @@ data BlockFetchServerException =
-- garbage collected. However, the network protocol will have timed out
-- long before this happens.
forall blk. (Typeable blk, StandardHash blk) =>
BlockGCed (HeaderHash blk)
BlockGCed (RealPoint blk)

-- | Thrown when requesting the genesis block from the database
--
Expand Down Expand Up @@ -120,9 +120,9 @@ blockFetchServer _tracer chainDB _version registry = senderSide
IteratorExhausted -> do
ChainDB.iteratorClose it
return $ SendMsgBatchDone $ return senderSide
IteratorBlockGCed hash -> do
IteratorBlockGCed pt -> do
ChainDB.iteratorClose it
throwM $ BlockGCed @blk hash
throwM $ BlockGCed @blk pt


{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,16 +545,16 @@ traverseIterator f it = it {
data IteratorResult blk b =
IteratorExhausted
| IteratorResult b
| IteratorBlockGCed (HeaderHash blk)
| IteratorBlockGCed (RealPoint blk)
-- ^ The block that was supposed to be streamed was garbage-collected from
-- the VolatileDB, but not added to the ImmutableDB.
--
-- This will only happen when streaming very old forks very slowly.
deriving (Functor, Foldable, Traversable)

deriving instance (Eq blk, Eq b, Eq (HeaderHash blk))
deriving instance (Eq blk, Eq b, StandardHash blk)
=> Eq (IteratorResult blk b)
deriving instance (Show blk, Show b, Show (HeaderHash blk))
deriving instance (Show blk, Show b, StandardHash blk)
=> Show (IteratorResult blk b)

data UnknownRange blk =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ import Control.Monad (unless)
import Control.Monad.Except
import Control.Monad.Trans.State.Strict
import Control.Tracer (Tracer, contramap, traceWith)
import Data.Foldable (foldl')
import Data.Function (on)
import Data.List (partition, sortBy)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NE
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (isJust, mapMaybe)
import Data.Maybe (catMaybes, isJust, mapMaybe)
import Data.Proxy (Proxy (..))
import Data.Set (Set)
import qualified Data.Set as Set
Expand Down Expand Up @@ -77,8 +76,8 @@ import Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB (LgrDB)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB as LgrDB
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.Query as Query
import Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import Ouroboros.Consensus.Storage.ChainDB.Impl.VolDB (VolDB,
VolDbSerialiseConstraints)
import Ouroboros.Consensus.Storage.ChainDB.Impl.VolDB
(PointChainDiff (..), VolDB, VolDbSerialiseConstraints)
import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.VolDB as VolDB

-- | Perform the initial chain selection based on the tip of the ImmutableDB
Expand Down Expand Up @@ -416,11 +415,11 @@ chainSelectionForBlock
-> Header blk
-> m (Point blk)
chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
(invalid, succsOf, predecessor, curChain, tipPoint, ledgerDB)
(invalid, succsOf, lookupBlockInfo, curChain, tipPoint, ledgerDB)
<- atomically $ (,,,,,)
<$> (forgetFingerprint <$> readTVar cdbInvalid)
<*> VolDB.filterByPredecessor cdbVolDB
<*> VolDB.getPredecessor cdbVolDB
<*> VolDB.getBlockInfo cdbVolDB
<*> Query.getCurrentChain cdb
<*> Query.getTipPoint cdb
<*> LgrDB.getCurrent cdbLgrDB
Expand All @@ -435,15 +434,12 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
immBlockNo :: WithOrigin BlockNo
immBlockNo = AF.anchorBlockNo curChain

i :: Point blk
i = castPoint $ AF.anchorPoint curChain

-- Let these two functions ignore invalid blocks
predecessor' = ignoreInvalid cdb invalid predecessor
succsOf' = ignoreInvalidSuc cdb invalid succsOf
lookupBlockInfo' = ignoreInvalid cdb invalid lookupBlockInfo
succsOf' = ignoreInvalidSuc cdb invalid succsOf

-- The preconditions
assert (isJust $ predecessor (headerHash hdr)) $ return ()
assert (isJust $ lookupBlockInfo (headerHash hdr)) $ return ()

if
-- The chain might have grown since we added the block such that the
Expand All @@ -463,10 +459,10 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
trace (TryAddToCurrentChain p)
addToCurrentChain succsOf' curChainAndLedger

| Just hashes <- VolDB.isReachable predecessor' i p -> do
| Just pdiff <- VolDB.isReachable lookupBlockInfo' curChain p -> do
-- ### Switch to a fork
trace (TrySwitchToAFork p hashes)
switchToAFork succsOf' curChainAndLedger hashes
trace (TrySwitchToAFork p pdiff)
switchToAFork succsOf' curChainAndLedger pdiff

| otherwise -> do
-- ### Store but don't change the current chain
Expand Down Expand Up @@ -561,37 +557,40 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
curTip = castPoint $ AF.headPoint curChain
curHead = AF.headAnchor curChain

-- | We have found a path of hashes to the new block through the
-- VolatileDB. We try to extend this path by looking for forks that start
-- with the given block, then we do chain selection and /possibly/ try to
-- switch to a new fork.
-- | We have found a 'PointChainDiff' through the VolatileDB connecting
-- the new block to the current chain. We'll call the intersection/anchor
-- @x@.
--
-- We try to extend this path by looking for forks that start with the
-- given block, then we do chain selection and /possibly/ try to switch to
-- a new fork.
switchToAFork :: HasCallStack
=> (WithOrigin (HeaderHash blk) -> Set (HeaderHash blk))
-> ChainAndLedger blk
-- ^ The current chain (anchored at @i@) and ledger
-> NonEmpty (HeaderHash blk)
-- ^ An uninterrupted path of hashes @(i,b]@.
-> PointChainDiff blk
-- ^ Path of points @(x,b]@
-> m (Point blk)
switchToAFork succsOf curChainAndLedger hashes = do
switchToAFork succsOf curChainAndLedger pdiff = do
let suffixesAfterB = VolDB.candidates succsOf (realPointToPoint p)
initCache = Map.insert (headerHash hdr) hdr (cacheHeaders curChain)
-- Fragments that are anchored at @i@.
candidates <- flip evalStateT initCache $
case NE.nonEmpty suffixesAfterB of
-- If there are no suffixes after @b@, just use the fragment that
-- ends in @b@ as the sole candidate.
Nothing -> (NE.:| []) <$> constructFork i hashes []
Just suffixesAfterB' -> mapM (constructFork i hashes . NE.toList)
suffixesAfterB'
let chainDiffs =
-- The suffixes all fork off from the current chain within @k@
-- blocks, so it satisfies the precondition of
-- 'preferCandidate'.
filter
(preferAnchoredCandidate cdbTopLevelConfig curChain .
Diff.getSuffix)
. mapMaybe (Diff.diff curChain)
$ NE.toList candidates
initCache = Map.singleton (headerHash hdr) hdr
chainDiffs <-
-- The suffixes all fork off from the current chain within @k@
-- blocks, so it satisfies the precondition of 'preferCandidate'.
fmap
( filter
( preferAnchoredCandidate cdbTopLevelConfig curChain
. Diff.getSuffix
)
. catMaybes
)
$ flip evalStateT initCache
$ mapM (constructChainDiff pdiff)
$ if null suffixesAfterB
-- If there are no suffixes after @b@, just use the fragment that
-- ends in @b@ as the sole candidate.
then [[]]
else map NE.toList suffixesAfterB

case NE.nonEmpty chainDiffs of
-- No candidates preferred over the current chain
Expand All @@ -606,7 +605,6 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do
chainSelEnv = mkChainSelEnv curChainAndLedger
curChain = VF.validatedFragment curChainAndLedger
curTip = castPoint $ AF.headPoint curChain
i = AF.castAnchor $ anchor curChain

-- | Create a 'NewTipInfo' corresponding to the tip of the given ledger.
mkNewTipInfo :: LgrDB.LedgerDB blk -> NewTipInfo blk
Expand Down Expand Up @@ -687,43 +685,32 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do

return $ castPoint $ AF.headPoint newChain

-- | Build a cache from the headers in the fragment.
cacheHeaders :: AnchoredFragment (Header blk)
-> Map (HeaderHash blk) (Header blk)
cacheHeaders =
foldl' (\m h -> Map.insert (blockHash h) h m) Map.empty .
AF.toNewestFirst

-- | We have a new block @b@ that doesn't fit onto the current chain, but
-- there is an unbroken path from the tip of the ImmutableDB (@i@ = the
-- anchor point of the current chain) to @b@. We also have a suffix @s@ of
-- hashes that starts after @b@.
-- we have found a 'PointChainDiff' connecting it to the current chain via
-- intersection point @x@. We also have a suffix @s@ of hashes that starts
-- after @b@.
--
-- We will try to construct a fragment @f@ for the fork such that:
-- * @f@ is anchored at @i@
-- * @f@ starts with the headers corresponding to the hashes of @(i,b]@
-- * @f@ is anchored at @x@
-- * @f@ starts with the headers corresponding to the points of @(x,b]@
-- * The next header in @f@ is the header for @b@
-- * Finally, @f@ ends with the headers corresponding to the hashes
-- @(b,?]@ of the suffix @s@.
-- * Any headers from the future are dropped.
--
-- Note that we need to read the headers corresponding to the hashes
-- @(i,b]@ and @(b,?]@ from disk. It is likely that many of these headers
-- are actually on the current chain, so when possible, we reuse these
-- headers instead of reading them from disk.
constructFork
:: Anchor blk -- ^ Tip of ImmutableDB @i@
-> NonEmpty (HeaderHash blk) -- ^ Hashes of @(i,b]@
-> [HeaderHash blk] -- ^ Suffix @s@, hashes of @(b,?]@
-- @(x,b]@ and @(b,?]@ from disk.
constructChainDiff
:: PointChainDiff blk
-> [HeaderHash blk] -- ^ Suffix @s@, hashes of @(b,?]@
-> StateT (Map (HeaderHash blk) (Header blk))
m
(AnchoredFragment (Header blk))
-- ^ Fork, anchored at @i@, contains (the header of) @b@ and ends
(Maybe (ChainDiff blk))
-- ^ Fork, anchored at @x@, contains (the header of) @b@ and ends
-- with the suffix @s@.
constructFork i hashes suffixHashes
= fmap (AF.fromOldestFirst (AF.castAnchor i))
constructChainDiff (PointChainDiff rollback anchor pts) suffixHashes
= fmap (Diff.mkDiff rollback . AF.fromOldestFirst (AF.castAnchor anchor))
$ mapM (getKnownHeaderThroughCache cdbVolDB)
$ NE.toList hashes <> suffixHashes
$ map realPointHash pts <> suffixHashes


-- | Check whether the header for the hash is in the cache, if not, get
Expand Down
Loading

0 comments on commit d27f426

Please sign in to comment.