-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathsession.go
281 lines (236 loc) · 10.1 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package session
import (
"context"
"sync"
"cosmossdk.io/depinject"
"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/observable/logging"
"github.com/pokt-network/poktroll/pkg/polylog"
"github.com/pokt-network/poktroll/pkg/relayer"
sessionkeeper "github.com/pokt-network/poktroll/x/session/keeper"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
)
// Compile-time assertion that *relayerSessionsManager implements the
// relayer.RelayerSessionsManager interface.
var _ relayer.RelayerSessionsManager = (*relayerSessionsManager)(nil)

// sessionsTreesMap maps a session end block height to the session trees of the
// sessions ending at that height, which are themselves keyed by session ID.
type sessionsTreesMap = map[int64]map[string]relayer.SessionTree
// relayerSessionsManager is an implementation of the
// relayer.RelayerSessionsManager interface.
// TODO_TEST: Add tests to the relayerSessionsManager.
type relayerSessionsManager struct {
	// logger is the logger retrieved from the context given to the constructor.
	logger polylog.Logger

	// relayObs is the observable of mined relays. It is set by InsertRelays,
	// consumed by Start and unsubscribed from by Stop.
	relayObs relayer.MinedRelaysObservable

	// sessionsToClaimObs notifies about sessions that are ready to be claimed.
	sessionsToClaimObs observable.Observable[relayer.SessionTree]

	// sessionsTrees is a map of session end block heights pointing to a map of
	// SessionTrees indexed by their sessionId.
	// The block height index is used to know when the sessions contained in the
	// entry should be closed; this avoids inspecting every individual session
	// tree to check whether it is ready to be closed.
	sessionsTrees sessionsTreesMap
	// sessionsTreesMu guards access to sessionsTrees.
	sessionsTreesMu *sync.Mutex

	// blockClient is used to get the notifications of committed blocks.
	blockClient client.BlockClient

	// supplierClient is used to create claims and submit proofs for sessions.
	supplierClient client.SupplierClient

	// storesDirectory points to a path on disk where KVStore data files are created.
	// It must be non-empty for the configuration to be considered valid.
	storesDirectory string
}
// NewRelayerSessions constructs a relayerSessionsManager, injects its
// dependencies, applies the given options and validates the resulting
// configuration. On success, the manager's sessions-to-claim observable is
// derived from the block client's committed blocks sequence.
//
// An error is returned if dependency injection fails or if the configuration
// is invalid (e.g. no stores directory was configured).
//
// Required dependencies:
//   - client.BlockClient
//   - client.SupplierClient
//
// Available options:
//   - WithStoresDirectory
func NewRelayerSessions(
	ctx context.Context,
	deps depinject.Config,
	opts ...relayer.RelayerSessionsManagerOption,
) (relayer.RelayerSessionsManager, error) {
	manager := new(relayerSessionsManager)
	manager.logger = polylog.Ctx(ctx)
	manager.sessionsTrees = make(sessionsTreesMap)
	manager.sessionsTreesMu = new(sync.Mutex)

	// Resolve the required clients from the dependency injection config.
	injectErr := depinject.Inject(deps, &manager.blockClient, &manager.supplierClient)
	if injectErr != nil {
		return nil, injectErr
	}

	// Options are applied in the order in which they were provided.
	for i := range opts {
		opts[i](manager)
	}

	configErr := manager.validateConfig()
	if configErr != nil {
		return nil, configErr
	}

	// Each committed block is expanded into the session trees which become
	// claimable as of that block.
	manager.sessionsToClaimObs = channel.MapExpand[client.Block, relayer.SessionTree](
		ctx,
		manager.blockClient.CommittedBlocksSequence(ctx),
		manager.mapBlockToSessionsToClaim,
	)

	return manager, nil
}
// Start wires up the relayer sessions pipeline:
//  1. every mined relay from the observable given to InsertRelays is added to
//     its session tree, and any resulting errors are logged;
//  2. the claim/proof lifecycle is started by creating claims and handing the
//     claimed sessions over for proof submission.
//
// It IS NOT BLOCKING as map operations run in their own goroutines.
func (rs *relayerSessionsManager) Start(ctx context.Context) {
	// NB: relayer.MinedRelaysObservable must be converted back to the generic
	// observable type before it can be used with Map; it cannot be an alias
	// because gomock lacks support for generic types.
	minedRelaysObs := observable.Observable[*relayer.MinedRelay](rs.relayObs)

	// The mapped observable only emits when adding a relay to its session tree
	// failed; those errors are simply logged.
	logging.LogErrors(
		ctx,
		channel.Map(ctx, minedRelaysObs, rs.mapAddMinedRelayToSessionTree),
	)

	// Claim/proof pipeline: sessions are claimed first, then proven.
	rs.submitProofs(ctx, rs.createClaims(ctx))
}
// Stop unsubscribes all observers from the mined relays observable that was
// provided via InsertRelays, which will close downstream observables as they
// drain. It is a no-op if no relays observable has been inserted yet.
//
// TODO_TECHDEBT: Either add a mechanism to wait for draining to complete
// and/or ensure that the state at each pipeline stage is persisted to disk
// and exit as early as possible.
func (rs *relayerSessionsManager) Stop() {
	// relayObs is only assigned by InsertRelays; calling a method on the nil
	// interface value would panic, so there is nothing to unsubscribe from.
	if rs.relayObs == nil {
		return
	}

	rs.relayObs.UnsubscribeAll()
}
// InsertRelays sets the observable of mined relays which Start consumes in
// order to add the relays to their session trees, and from which Stop
// unsubscribes.
func (rs *relayerSessionsManager) InsertRelays(relays relayer.MinedRelaysObservable) {
	rs.relayObs = relays
}
// ensureSessionTree looks up the SessionTree matching the given session header,
// lazily creating (and registering) it when it does not exist yet.
// An error is returned only if a new SessionTree has to be created and its
// creation fails.
//
// It reads and writes rs.sessionsTrees without taking rs.sessionsTreesMu itself.
func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes.SessionHeader) (relayer.SessionTree, error) {
	endHeight := sessionHeader.SessionEndBlockHeight

	// Make sure a bucket exists for the sessions ending at this height.
	treesAtHeight, hasBucket := rs.sessionsTrees[endHeight]
	if !hasBucket {
		treesAtHeight = make(map[string]relayer.SessionTree)
		rs.sessionsTrees[endHeight] = treesAtHeight
	}

	// Reuse the tree if this session is already being tracked.
	if existingTree, tracked := treesAtHeight[sessionHeader.SessionId]; tracked {
		return existingTree, nil
	}

	// Otherwise create it; the tree is handed removeFromRelayerSessions as a callback.
	newTree, err := NewSessionTree(sessionHeader, rs.storesDirectory, rs.removeFromRelayerSessions)
	if err != nil {
		return nil, err
	}
	treesAtHeight[sessionHeader.SessionId] = newTree

	return newTree, nil
}
// mapBlockToSessionsToClaim maps a committed block to the list of session trees
// which can be claimed as of that block, i.e. the ones whose grace period is
// over at the block's height. It is used as the mapping function of the
// sessions-to-claim observable.
//
// Every returned session tree has been successfully marked as claiming via
// StartClaiming; trees for which StartClaiming returns an error are left out.
// The skip return value is always false.
func (rs *relayerSessionsManager) mapBlockToSessionsToClaim(
	_ context.Context,
	block client.Block,
) (sessionTrees []relayer.SessionTree, skip bool) {
	rs.sessionsTreesMu.Lock()
	defer rs.sessionsTreesMu.Unlock()

	// Check if there are sessions that need to enter the claim/proof phase
	// because their grace period ended before the last committed block.
	// Session trees are bucketed by session end block height, so only the
	// buckets need to be inspected rather than every individual session tree.
	for endBlockHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees {
		// Sessions which are still within their grace period are not ready to
		// be claimed yet; only the ones whose grace period has ended are.
		if IsWithinGracePeriod(endBlockHeight, block.Height()) {
			continue
		}

		// Iterate over the session trees whose grace period has ended and add
		// them to the list of sessionTrees to be published.
		for _, sessionTree := range sessionsTreesEndingAtBlockHeight {
			// Mark the session as claimed and add it to the list of sessionTrees
			// to be published. If StartClaiming returns an error (e.g. the session
			// has already been claimed), the session tree is deliberately skipped.
			// Holding sessionsTreesMu guarantees that only the first call which
			// marks the session as claimed adds the sessionTree to the list.
			if err := sessionTree.StartClaiming(); err == nil {
				sessionTrees = append(sessionTrees, sessionTree)
			}
		}
	}

	return sessionTrees, false
}
// removeFromRelayerSessions stops tracking the SessionTree identified by the
// given session header. When it was the last tree ending at its session end
// block height, the now empty per-height entry is dropped as well so that the
// sessionsTrees map does not grow endlessly.
//
// It acquires rs.sessionsTreesMu for the duration of the call.
func (rs *relayerSessionsManager) removeFromRelayerSessions(sessionHeader *sessiontypes.SessionHeader) {
	rs.sessionsTreesMu.Lock()
	defer rs.sessionsTreesMu.Unlock()

	endHeight := sessionHeader.SessionEndBlockHeight

	treesAtHeight, tracked := rs.sessionsTrees[endHeight]
	if !tracked {
		// Nothing is tracked at this height; there is nothing to remove.
		rs.logger.Debug().
			Int64("session_end_block_height", endHeight).
			Msg("no session tree found for ending sessions")
		return
	}

	delete(treesAtHeight, sessionHeader.SessionId)

	// Keep the per-height entry as long as other sessions still end at this height.
	if len(treesAtHeight) != 0 {
		return
	}
	delete(rs.sessionsTrees, endHeight)
}
// validateConfig validates the relayerSessionsManager's configuration.
// It returns ErrSessionTreeUndefinedStoresDirectory if no stores directory has
// been configured, and nil otherwise.
// TODO_TEST: Add unit tests to validate these configurations.
func (rs *relayerSessionsManager) validateConfig() error {
	if rs.storesDirectory == "" {
		return ErrSessionTreeUndefinedStoresDirectory
	}

	return nil
}
// waitForBlock blocks until a block at the given height (or greater) is
// observed as having been committed and returns that block.
// It returns nil if the committed blocks subscription channel is closed before
// such a block is observed.
func (rs *relayerSessionsManager) waitForBlock(ctx context.Context, height int64) client.Block {
	// The subscription is only needed until the awaited block is reached.
	// Deriving a cancellable child context and cancelling it when the function
	// returns stops the subscription and closes the publish channel associated
	// with the CommittedBlocksSequence observable, which is not otherwise
	// accessible from here.
	subscriptionCtx, cancelSubscription := context.WithCancel(ctx)
	defer cancelSubscription()

	committedBlocksCh := rs.blockClient.
		CommittedBlocksSequence(subscriptionCtx).
		Subscribe(subscriptionCtx).
		Ch()

	for {
		committedBlock, isOpen := <-committedBlocksCh
		if !isOpen {
			// The channel was closed before the target height was reached.
			return nil
		}
		if committedBlock.Height() < height {
			continue
		}
		return committedBlock
	}
}
// mapAddMinedRelayToSessionTree is intended to be used as a MapFn. It makes
// sure a session tree exists for the relay's session and updates that tree with
// the relay. Only failures are emitted: on error, the error is logged and
// returned with skip=false; on success, nil is returned with skip=true.
//
// rs.sessionsTreesMu is held for the whole duration of the call.
func (rs *relayerSessionsManager) mapAddMinedRelayToSessionTree(
	_ context.Context,
	relay *relayer.MinedRelay,
) (_ error, skip bool) {
	rs.sessionsTreesMu.Lock()
	defer rs.sessionsTreesMu.Unlock()

	// The session header is read from the relay request's metadata.
	// TODO_CONSIDERATION: if we get the session header from the response, there
	// is no possibility that we forgot to hydrate it (i.e. blindly trust the client).
	header := relay.GetReq().GetMeta().SessionHeader

	// Get (or lazily create) the session tree for this relay's session.
	tree, err := rs.ensureSessionTree(header)
	if err != nil {
		// TODO_IMPROVE: log additional info?
		rs.logger.Error().Err(err).Msg("failed to ensure session tree")
		return err, false
	}

	// Insert the relay into the tree, keyed by its hash, with a weight of 1.
	err = tree.Update(relay.Hash, relay.Bytes, 1)
	if err != nil {
		// TODO_IMPROVE: log additional info?
		rs.logger.Error().Err(err).Msg("failed to update smt")
		return err, false
	}

	// Nothing to emit on success: this map function only outputs errors.
	return nil, true
}
// IsWithinGracePeriod reports whether currentBlockHeight is still within the
// grace period of a session ending at sessionEndBlockHeight, i.e. whether it is
// at or below the session end block height plus the grace period block count
// returned by sessionkeeper.GetSessionGracePeriodBlockCount.
// It returns false once the grace period has elapsed.
func IsWithinGracePeriod(sessionEndBlockHeight, currentBlockHeight int64) bool {
	gracePeriodEndHeight := sessionEndBlockHeight + sessionkeeper.GetSessionGracePeriodBlockCount()

	return currentBlockHeight <= gracePeriodEndHeight
}