Skip to content

Commit

Permalink
Smooth switching from slower to faster peers
Browse files Browse the repository at this point in the history
Implement smooth switching from slower to faster peers by
temporarily ignoring the concurrency limit for faster peers.
This lets faster peers starve out slower peers and makes it possible
to switch peers without first having to wait for all outstanding
requests to the slower peers to finish.
  • Loading branch information
karknu committed Jul 21, 2020
1 parent 777845e commit a097f80
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 22 deletions.
75 changes: 57 additions & 18 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import Ouroboros.Network.BlockFetch.DeltaQ
( PeerGSV(..), SizeInBytes
, PeerFetchInFlightLimits(..)
, calculatePeerFetchInFlightLimits
, comparePeerGSV
, estimateResponseDeadlineProbability
, estimateExpectedResponseDuration )

Expand Down Expand Up @@ -93,10 +94,11 @@ data FetchMode =
deriving (Eq, Show)


type PeerInfo header extra =
type PeerInfo header peer extra =
( PeerFetchStatus header,
PeerFetchInFlight header,
PeerGSV,
peer,
extra
)

Expand Down Expand Up @@ -144,15 +146,16 @@ type CandidateFragments header = (ChainSuffix header, [AnchoredFragment header])


fetchDecisions
:: (HasHeader header,
:: (Eq peer,
HasHeader header,
HeaderHash header ~ HeaderHash block)
=> FetchDecisionPolicy header
-> FetchMode
-> AnchoredFragment header
-> (Point block -> Bool)
-> MaxSlotNo
-> [(AnchoredFragment header, PeerInfo header extra)]
-> [(FetchDecision (FetchRequest header), PeerInfo header extra)]
-> [(AnchoredFragment header, PeerInfo header peer extra)]
-> [(FetchDecision (FetchRequest header), PeerInfo header peer extra)]
fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy {
plausibleCandidateChain,
compareCandidateChains,
Expand Down Expand Up @@ -200,10 +203,10 @@ fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy {
currentChain
where
-- Data swizzling functions to get the right info into each stage.
swizzleI (c, p@(_, inflight,_, _)) = (c, inflight, p)
swizzleIG (c, p@(_, inflight,gsvs,_)) = (c, inflight, gsvs, p)
swizzleSI (c, p@(status,inflight,_, _)) = (c, status, inflight, p)
swizzleSIG (c, p@(status,inflight,gsvs,_)) = (c, status, inflight, gsvs, p)
swizzleI (c, p@(_, inflight,_,_, _)) = (c, inflight, p)
swizzleIG (c, p@(_, inflight,gsvs,_, _)) = (c, inflight, gsvs, p)
swizzleSI (c, p@(status,inflight,_,_, _)) = (c, status, inflight, p)
swizzleSIG (c, p@(status,inflight,gsvs,peer,_)) = (c, status, inflight, gsvs, peer, p)

{-
We have the node's /current/ or /adopted/ chain. This is the node's chain in
Expand Down Expand Up @@ -774,36 +777,42 @@ obviously take that into account when considering later peer chains.


fetchRequestDecisions
:: forall header peer. HasHeader header
:: forall extra header peer.
( Eq peer
, HasHeader header
)
=> FetchDecisionPolicy header
-> FetchMode
-> [( FetchDecision [AnchoredFragment header]
, PeerFetchStatus header
, PeerFetchInFlight header
, PeerGSV
, peer )]
-> [(FetchDecision (FetchRequest header), peer)]
, peer
, extra)]
-> [(FetchDecision (FetchRequest header), extra)]
fetchRequestDecisions fetchDecisionPolicy fetchMode chains =
go nConcurrentFetchPeers0 Set.empty NoMaxSlotNo chains
where
go :: Word
-> Set (Point header)
-> MaxSlotNo
-> [(Either FetchDecline [AnchoredFragment header],
PeerFetchStatus header, PeerFetchInFlight header, PeerGSV, b)]
-> [(FetchDecision (FetchRequest header), b)]
PeerFetchStatus header, PeerFetchInFlight header, PeerGSV, peer, extra)]
-> [(FetchDecision (FetchRequest header), extra)]
go !_ !_ !_ [] = []
go !nConcurrentFetchPeers !blocksFetchedThisRound !maxSlotNoFetchedThisRound
((mchainfragments, status, inflight, gsvs, peer) : cps) =
((mchainfragments, status, inflight, gsvs, peer, extra) : cps) =

(decision, peer)
(decision, extra)
: go nConcurrentFetchPeers' blocksFetchedThisRound'
maxSlotNoFetchedThisRound' cps
where
decision = fetchRequestDecision
fetchDecisionPolicy
fetchMode
nConcurrentFetchPeers
-- Permitt the prefered peers to by pass any concurrency limits.
(if elem peer nPreferedPeers then 0
else nConcurrentFetchPeers)
(calculatePeerFetchInFlightLimits gsvs)
inflight
status
Expand Down Expand Up @@ -856,11 +865,32 @@ fetchRequestDecisions fetchDecisionPolicy fetchMode chains =
fromIntegral
. length
. filter (> 0)
. map (\(_, _, PeerFetchInFlight{peerFetchReqsInFlight}, _, _) ->
. map (\(_, _, PeerFetchInFlight{peerFetchReqsInFlight}, _, _, _) ->
peerFetchReqsInFlight)
$ chains


-- Order the peers based on current PeerGSV. The top performing peers will be
-- permitted to go active even if we're above the desired maxConcurrentFetchPeers
-- which will cause us to switch smoothly from a slower to faster peers.
-- When switching from slow to faster peers we will be over the configured limit, but
-- PeerGSV is expected to be updated rather infrequently so the set of prefered peers should
-- be stable during 10s of second.
nPreferedPeers :: [peer]
nPreferedPeers =
map snd
. take (fromIntegral maxConcurrentFetchPeers)
. sortBy (\(a, _) (b, _) -> comparePeerGSV a b)
. map (\(_, _, _, gsv, p, _) -> (gsv, p))
$ chains

maxConcurrentFetchPeers :: Word
maxConcurrentFetchPeers =
case fetchMode of
FetchModeBulkSync -> maxConcurrencyBulkSync fetchDecisionPolicy
FetchModeDeadline -> maxConcurrencyDeadline fetchDecisionPolicy


fetchRequestDecision
:: HasHeader header
=> FetchDecisionPolicy header
Expand Down Expand Up @@ -919,11 +949,20 @@ fetchRequestDecision FetchDecisionPolicy {
inFlightBytesLowWatermark
inFlightBytesHighWatermark

-- Refuse any blockrequest if we're above the concurrency limit.
| let maxConcurrentFetchPeers = case fetchMode of
FetchModeBulkSync -> maxConcurrencyBulkSync
FetchModeDeadline -> maxConcurrencyDeadline
, nConcurrentFetchPeers > maxConcurrentFetchPeers
= Left $ FetchDeclineConcurrencyLimit
fetchMode maxConcurrentFetchPeers

-- If we're at the concurrency limit refuse any additional peers.
| peerFetchReqsInFlight == 0
, let maxConcurrentFetchPeers = case fetchMode of
FetchModeBulkSync -> maxConcurrencyBulkSync
FetchModeDeadline -> maxConcurrencyDeadline
, nConcurrentFetchPeers >= maxConcurrentFetchPeers
, nConcurrentFetchPeers == maxConcurrentFetchPeers
= Left $ FetchDeclineConcurrencyLimit
fetchMode maxConcurrentFetchPeers

Expand Down
11 changes: 11 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch/DeltaQ.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Ouroboros.Network.BlockFetch.DeltaQ (
calculatePeerFetchInFlightLimits,
estimateResponseDeadlineProbability,
estimateExpectedResponseDuration,
comparePeerGSV,
-- estimateBlockFetchResponse,
-- blockArrivalShedule,
) where
Expand All @@ -28,6 +29,16 @@ data PeerFetchInFlightLimits = PeerFetchInFlightLimits {
}
deriving Show

-- Order two PeerGSVs based on `g`.
comparePeerGSV :: PeerGSV -> PeerGSV -> Ordering
comparePeerGSV a b = compare (gs a) (gs b)
where
gs :: PeerGSV -> DiffTime
gs PeerGSV { outboundGSV = GSV g_out _s_out _v_out,
inboundGSV = GSV g_in _s_in _v_in
} = g_out + g_in


calculatePeerFetchInFlightLimits :: PeerGSV -> PeerFetchInFlightLimits
calculatePeerFetchInFlightLimits PeerGSV {
outboundGSV = GSV g_out _s_out _v_out,
Expand Down
8 changes: 4 additions & 4 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ fetchLogicIteration decisionTracer clientStateTracer
-- Trace the batch of fetch decisions
traceWith decisionTracer
[ TraceLabelPeer peer (fmap fetchRequestPoints decision)
| (decision, (_, _, _, (_, peer))) <- decisions ]
| (decision, (_, _, _, peer, _)) <- decisions ]

-- Tell the fetch clients to act on our decisions
statusUpdates <- fetchLogicIterationAct clientStateTracer
Expand All @@ -159,7 +159,7 @@ fetchLogicIteration decisionTracer clientStateTracer

return stateFingerprint''
where
swizzleReqVar (d,(_,_,g,(rq,p))) = (d,g,rq,p)
swizzleReqVar (d,(_,_,g,_,(rq,p))) = (d,g,rq,p)

fetchRequestPoints :: HasHeader hdr => FetchRequest hdr -> [Point hdr]
fetchRequestPoints (FetchRequest headerss) =
Expand All @@ -178,7 +178,7 @@ fetchDecisionsForStateSnapshot
=> FetchDecisionPolicy header
-> FetchStateSnapshot peer header block m
-> [( FetchDecision (FetchRequest header),
PeerInfo header (FetchClientStateVars m header, peer)
PeerInfo header peer (FetchClientStateVars m header, peer)
)]

fetchDecisionsForStateSnapshot
Expand Down Expand Up @@ -213,7 +213,7 @@ fetchDecisionsForStateSnapshot
fetchStatePeerGSVs

swizzle (peer, ((chain, (status, inflight, vars)), gsvs)) =
(chain, (status, inflight, gsvs, (vars, peer)))
(chain, (status, inflight, gsvs, peer, (vars, peer)))


-- | Act on decisions to send new requests. In fact all we do here is update
Expand Down

0 comments on commit a097f80

Please sign in to comment.