-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
Copy pathstore.go
1230 lines (1045 loc) · 37.4 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package rootmulti
import (
"errors"
"fmt"
"io"
"math"
"sort"
"strings"
"sync"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
dbm "github.com/cosmos/cosmos-db"
protoio "github.com/cosmos/gogoproto/io"
gogotypes "github.com/cosmos/gogoproto/types"
iavltree "github.com/cosmos/iavl"
errorsmod "cosmossdk.io/errors"
"cosmossdk.io/log"
"cosmossdk.io/store/cachemulti"
"cosmossdk.io/store/dbadapter"
"cosmossdk.io/store/iavl"
"cosmossdk.io/store/listenkv"
"cosmossdk.io/store/mem"
"cosmossdk.io/store/metrics"
"cosmossdk.io/store/pruning"
pruningtypes "cosmossdk.io/store/pruning/types"
snapshottypes "cosmossdk.io/store/snapshots/types"
"cosmossdk.io/store/tracekv"
"cosmossdk.io/store/transient"
"cosmossdk.io/store/types"
)
// Database keys used for the root multistore's own metadata.
const (
// latestVersionKey is the key of the latest-version record (its reader and
// writer live outside this view — confirm usage there).
latestVersionKey = "s/latest"
// commitInfoKeyFmt is the fmt pattern for per-version keys; %d is the version.
commitInfoKeyFmt = "s/%d" // s/<version>
)
// iavlDisablefastNodeDefault is the value NewStore assigns to
// Store.iavlDisableFastNode.
const iavlDisablefastNodeDefault = false
// keysFromStoreKeyMap collects the keys of m into a new slice ordered
// lexically by StoreKey.Name(). The map's values are ignored.
func keysFromStoreKeyMap[V any](m map[types.StoreKey]V) []types.StoreKey {
	sorted := make([]types.StoreKey, 0, len(m))
	for storeKey := range m {
		sorted = append(sorted, storeKey)
	}

	byName := func(a, b int) bool {
		return sorted[a].Name() < sorted[b].Name()
	}
	sort.Slice(sorted, byName)

	return sorted
}
// Store is composed of many CommitStores. Name contrasts with
// cacheMultiStore which is used for branching other MultiStores. It implements
// the CommitMultiStore interface.
type Store struct {
// db is the root database passed to NewStore.
db dbm.DB
// logger receives the debug/error output of load, commit, prune and snapshot.
logger log.Logger
// lastCommitInfo is replaced by loadVersion and Commit; LastCommitID treats
// nil as "nothing loaded yet".
lastCommitInfo *types.CommitInfo
// pruningManager holds the pruning options and snapshot bookkeeping.
pruningManager *pruning.Manager
// iavlCacheSize and iavlDisableFastNode are forwarded when IAVL stores are loaded.
iavlCacheSize int
iavlDisableFastNode bool
// storesParams holds the parameters of every mounted store, keyed by StoreKey.
storesParams map[types.StoreKey]storeParams
// stores holds the loaded stores; it is rebuilt by loadVersion.
stores map[types.StoreKey]types.CommitKVStore
// keysByName maps a store name to its StoreKey.
keysByName map[string]types.StoreKey
// initialVersion is the version used for the first commit when it is > 1
// and no version has been committed yet (see Commit).
initialVersion int64
// removalMap flags stores that Commit must drop; Commit resets it afterwards.
removalMap map[types.StoreKey]bool
// traceWriter is the tracing sink; nil means tracing is disabled.
traceWriter io.Writer
// traceContext is guarded by traceContextMutex.
traceContext types.TraceContext
traceContextMutex sync.Mutex
// interBlockCache, when set, is consulted to unwrap cached stores.
interBlockCache types.MultiStorePersistentCache
// listeners holds one memory listener per listened store key.
listeners map[types.StoreKey]*types.MemoryListener
// metrics is forwarded to IAVL stores when they are loaded.
metrics metrics.StoreMetrics
// commitHeader supplies the height (sanity log) and timestamp used by Commit.
commitHeader cmtproto.Header
}
// Compile-time assertions that *Store satisfies the interfaces it is meant to
// implement.
var (
_ types.CommitMultiStore = (*Store)(nil)
_ types.Queryable = (*Store)(nil)
)
// NewStore builds an empty root multistore on top of db.
//
// All internal maps are allocated, the IAVL settings take their package
// defaults, and a fresh pruning manager is created from db and logger (the
// original documentation states this yields a PruneNothing strategy — that
// default lives in the pruning package, confirm there). Stores must still be
// mounted and one of the Load* methods called before the store is usable.
func NewStore(db dbm.DB, logger log.Logger, metricGatherer metrics.StoreMetrics) *Store {
	rs := &Store{
		db:                  db,
		logger:              logger,
		metrics:             metricGatherer,
		iavlCacheSize:       iavl.DefaultIAVLCacheSize,
		iavlDisableFastNode: iavlDisablefastNodeDefault,
	}

	// Bookkeeping maps start empty but non-nil so later writes never hit a nil map.
	rs.storesParams = make(map[types.StoreKey]storeParams)
	rs.stores = make(map[types.StoreKey]types.CommitKVStore)
	rs.keysByName = make(map[string]types.StoreKey)
	rs.listeners = make(map[types.StoreKey]*types.MemoryListener)
	rs.removalMap = make(map[types.StoreKey]bool)

	rs.pruningManager = pruning.NewManager(db, logger)

	return rs
}
// GetPruning fetches the pruning strategy from the root store. The value is
// whatever the store's pruning manager currently reports.
func (rs *Store) GetPruning() pruningtypes.PruningOptions {
return rs.pruningManager.GetOptions()
}
// SetPruning sets the pruning strategy by handing pruningOpts to the root
// store's pruning manager.
//
// NOTE(review): an earlier version of this comment said that calling
// SetPruning before LoadVersion/LoadLatestVersion is a no-op and that the
// options are pushed to sub-stores. The code visible here only forwards the
// options to the pruning manager, unconditionally — confirm which is intended.
func (rs *Store) SetPruning(pruningOpts pruningtypes.PruningOptions) {
rs.pruningManager.SetOptions(pruningOpts)
}
// SetMetrics sets the metrics gatherer for the store package, replacing the
// one supplied to NewStore.
func (rs *Store) SetMetrics(metrics metrics.StoreMetrics) {
rs.metrics = metrics
}
// SetSnapshotInterval sets the interval at which the snapshots are taken.
// It is used by the store to determine which heights to retain until after the snapshot is complete.
// The value is forwarded to the pruning manager.
func (rs *Store) SetSnapshotInterval(snapshotInterval uint64) {
rs.pruningManager.SetSnapshotInterval(snapshotInterval)
}
// SetIAVLCacheSize overrides the cache size that is passed along when IAVL
// stores are loaded. NewStore initialises it to iavl.DefaultIAVLCacheSize.
func (rs *Store) SetIAVLCacheSize(cacheSize int) {
rs.iavlCacheSize = cacheSize
}
// SetIAVLDisableFastNode overrides the disable-fast-node flag that is passed
// along when IAVL stores are loaded. NewStore initialises it to
// iavlDisablefastNodeDefault.
func (rs *Store) SetIAVLDisableFastNode(disableFastNode bool) {
rs.iavlDisableFastNode = disableFastNode
}
// GetStoreType implements Store. The root multistore always reports
// types.StoreTypeMulti.
func (rs *Store) GetStoreType() types.StoreType {
return types.StoreTypeMulti
}
// MountStoreWithDB implements CommitMultiStore. It records the parameters of a
// store of type typ under key so that a later Load* call can construct it; no
// store is opened here.
//
// db is stored in the store's parameters as given (when it is nil, loading
// appears to fall back to the root database under a per-store prefix — see
// loadCommitStoreFromParams).
//
// It panics if key is nil, if key is already mounted, or if another mounted
// key already uses key.Name().
func (rs *Store) MountStoreWithDB(key types.StoreKey, typ types.StoreType, db dbm.DB) {
	if key == nil {
		// The message names this method; it previously referred to a
		// MountIAVLStore() function that does not exist on this type.
		panic("MountStoreWithDB() key cannot be nil")
	}
	if _, ok := rs.storesParams[key]; ok {
		panic(fmt.Sprintf("store duplicate store key %v", key))
	}
	if _, ok := rs.keysByName[key.Name()]; ok {
		panic(fmt.Sprintf("store duplicate store key name %v", key))
	}

	rs.storesParams[key] = newStoreParams(key, db, typ, 0)
	rs.keysByName[key.Name()] = key
}
// GetCommitStore returns a mounted CommitStore for a given StoreKey. If the
// store is wrapped in an inter-block cache, it will be unwrapped before returning.
// It simply delegates to GetCommitKVStore.
func (rs *Store) GetCommitStore(key types.StoreKey) types.CommitStore {
return rs.GetCommitKVStore(key)
}
// GetCommitKVStore looks up the CommitKVStore mounted under key.
//
// When an inter-block cache is configured and it can unwrap a store for key,
// that underlying store is returned. In every other case the entry from the
// main store map is returned, which is nil when key has no loaded store.
func (rs *Store) GetCommitKVStore(key types.StoreKey) types.CommitKVStore {
	if rs.interBlockCache == nil {
		return rs.stores[key]
	}

	if unwrapped := rs.interBlockCache.Unwrap(key); unwrapped != nil {
		return unwrapped
	}

	// The cache knows nothing about this key: use the main mapping.
	return rs.stores[key]
}
// StoreKeysByName returns mapping storeNames -> StoreKeys.
// The store's own map is returned, not a copy, so changes made by the caller
// are visible to the store.
func (rs *Store) StoreKeysByName() map[string]types.StoreKey {
return rs.keysByName
}
// LoadLatestVersionAndUpgrade implements CommitMultiStore. It loads the
// version reported by GetLatestVersion for the root database and applies
// upgrades while doing so.
func (rs *Store) LoadLatestVersionAndUpgrade(upgrades *types.StoreUpgrades) error {
	return rs.loadVersion(GetLatestVersion(rs.db), upgrades)
}
// LoadVersionAndUpgrade allows us to rename substores while loading an older version.
// It loads version ver and applies upgrades via loadVersion.
func (rs *Store) LoadVersionAndUpgrade(ver int64, upgrades *types.StoreUpgrades) error {
return rs.loadVersion(ver, upgrades)
}
// LoadLatestVersion implements CommitMultiStore. It loads the version reported
// by GetLatestVersion for the root database, without any store upgrades.
func (rs *Store) LoadLatestVersion() error {
	return rs.loadVersion(GetLatestVersion(rs.db), nil)
}
// LoadVersion implements CommitMultiStore. It loads version ver without any
// store upgrades.
func (rs *Store) LoadVersion(ver int64) error {
return rs.loadVersion(ver, nil)
}
// loadVersion rebuilds rs.stores for version ver, applying the optional store
// upgrades (added, renamed and deleted stores) on the way.
//
// For ver != 0 the commit info of that version is fetched with GetCommitInfo
// and used to find each mounted store's CommitID; for ver == 0 every store is
// loaded with a zero CommitID. On success rs.lastCommitInfo and rs.stores are
// replaced and the pruning manager loads its snapshot heights from rs.db.
//
// An error is returned when the commit info cannot be read, when an IAVL
// store's recorded version differs from ver (and the store is neither added
// nor renamed by upgrades), or when loading, deleting or moving a store fails.
// rs.removalMap is updated inside the loop, so an error part-way through can
// leave entries in it even though rs.stores is left unchanged.
//
// upgrades may be nil: its methods are then called on a nil pointer
// (presumably nil-safe — confirm in types.StoreUpgrades).
func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {
infos := make(map[string]types.StoreInfo)
rs.logger.Debug("loadVersion", "ver", ver)
cInfo := &types.CommitInfo{}
// load old data if we are not version 0
if ver != 0 {
var err error
cInfo, err = rs.GetCommitInfo(ver)
if err != nil {
return err
}
// convert StoreInfos slice to map
for _, storeInfo := range cInfo.StoreInfos {
infos[storeInfo.Name] = storeInfo
}
}
// load each Store (note this doesn't panic on unmounted keys now)
newStores := make(map[types.StoreKey]types.CommitKVStore)
storesKeys := make([]types.StoreKey, 0, len(rs.storesParams))
for key := range rs.storesParams {
storesKeys = append(storesKeys, key)
}
if upgrades != nil {
// deterministic iteration order for upgrades
// (as the underlying store may change and
// upgrades make store changes where the execution order may matter)
sort.Slice(storesKeys, func(i, j int) bool {
return storesKeys[i].Name() < storesKeys[j].Name()
})
}
for _, key := range storesKeys {
// storeParams is read from the map by value, so the initialVersion
// assignment below affects only this iteration's copy.
storeParams := rs.storesParams[key]
commitID := rs.getCommitID(infos, key.Name())
rs.logger.Debug("loadVersion commitID", "key", key, "ver", ver, "hash", fmt.Sprintf("%x", commitID.Hash))
// If it has been added, set the initial version
if upgrades.IsAdded(key.Name()) || upgrades.RenamedFrom(key.Name()) != "" {
storeParams.initialVersion = uint64(ver) + 1
} else if commitID.Version != ver && storeParams.typ == types.StoreTypeIAVL {
return fmt.Errorf("version of store %s mismatch root store's version; expected %d got %d; new stores should be added using StoreUpgrades", key.Name(), ver, commitID.Version)
}
store, err := rs.loadCommitStoreFromParams(key, commitID, storeParams)
if err != nil {
return errorsmod.Wrap(err, "failed to load store")
}
newStores[key] = store
// If it was deleted, remove all data
if upgrades.IsDeleted(key.Name()) {
if err := deleteKVStore(store.(types.KVStore)); err != nil {
return errorsmod.Wrapf(err, "failed to delete store %s", key.Name())
}
// flagged so that Commit drops the store after committing the deletion
rs.removalMap[key] = true
} else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" {
// handle renames specially
// make an unregistered key to satisfy loadCommitStore params
oldKey := types.NewKVStoreKey(oldName)
oldParams := newStoreParams(oldKey, storeParams.db, storeParams.typ, 0)
// load from the old name
oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams)
if err != nil {
return errorsmod.Wrapf(err, "failed to load old store %s", oldName)
}
// move all data
if err := moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore)); err != nil {
return errorsmod.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name())
}
// add the old key so its deletion is committed
newStores[oldKey] = oldStore
// this will ensure it's not perpetually stored in commitInfo
rs.removalMap[oldKey] = true
}
}
// Only now, with every store loaded, is the visible state swapped.
rs.lastCommitInfo = cInfo
rs.stores = newStores
// load any snapshot heights we missed from disk to be pruned on the next run
if err := rs.pruningManager.LoadSnapshotHeights(rs.db); err != nil {
return err
}
return nil
}
// getCommitID returns the CommitId recorded in infos for the store called
// name, or the zero CommitID when infos has no entry for it.
func (rs *Store) getCommitID(infos map[string]types.StoreInfo, name string) types.CommitID {
	if storeInfo, found := infos[name]; found {
		return storeInfo.CommitId
	}
	return types.CommitID{}
}
// deleteKVStore removes every key currently present in kv.
//
// The keys are gathered in a first pass and deleted in a second one, because
// the store cannot be written to while it is being iterated. If closing the
// iterator fails, that error is returned and nothing is deleted.
func deleteKVStore(kv types.KVStore) error {
	var pending [][]byte

	it := kv.Iterator(nil, nil)
	for ; it.Valid(); it.Next() {
		pending = append(pending, it.Key())
	}
	if closeErr := it.Close(); closeErr != nil {
		return closeErr
	}

	for i := range pending {
		kv.Delete(pending[i])
	}
	return nil
}
// moveKVStoreData moves all entries of oldDB into newDB. There is no native
// move, so every pair is first copied into newDB and oldDB is then emptied
// with deleteKVStore. An error closing the source iterator aborts the move
// before anything is deleted from oldDB.
func moveKVStoreData(oldDB, newDB types.KVStore) error {
	// Copy phase: read from the old store, write into the new one.
	src := oldDB.Iterator(nil, nil)
	for ; src.Valid(); src.Next() {
		newDB.Set(src.Key(), src.Value())
	}

	closeErr := src.Close()
	if closeErr != nil {
		return closeErr
	}

	// Delete phase: drop everything from the old store.
	return deleteKVStore(oldDB)
}
// PruneSnapshotHeight prunes the given height according to the prune strategy.
// If the strategy is PruneNothing, this is a no-op.
// For other strategies, this height is persisted until the snapshot is operated.
//
// The method only forwards height to the pruning manager's
// HandleSnapshotHeight; the strategy-dependent behaviour described above is
// implemented there and is not visible here.
func (rs *Store) PruneSnapshotHeight(height int64) {
rs.pruningManager.HandleSnapshotHeight(height)
}
// SetInterBlockCache sets the Store's internal inter-block (persistent) cache.
// When this is defined, all CommitKVStores will be wrapped with their respective
// inter-block cache.
// Within this file the cache is consulted by GetCommitKVStore to unwrap stores.
func (rs *Store) SetInterBlockCache(c types.MultiStorePersistentCache) {
rs.interBlockCache = c
}
// SetTracer sets the tracer for the MultiStore that the underlying
// stores will utilize to trace operations. A MultiStore is returned.
// The returned MultiStore is the receiver itself; passing a nil writer
// disables tracing (see TracingEnabled).
func (rs *Store) SetTracer(w io.Writer) types.MultiStore {
rs.traceWriter = w
return rs
}
// SetTracingContext updates the tracing context for the MultiStore by merging
// the given context with the existing context by key. Any existing keys will
// be overwritten. It is implied that the caller should update the context when
// necessary between tracing operations. It returns a modified MultiStore.
// The merge runs while traceContextMutex is held, and the returned MultiStore
// is the receiver itself.
func (rs *Store) SetTracingContext(tc types.TraceContext) types.MultiStore {
rs.traceContextMutex.Lock()
defer rs.traceContextMutex.Unlock()
rs.traceContext = rs.traceContext.Merge(tc)
return rs
}
// getTracingContext returns a shallow copy of the current tracing context,
// taken while holding traceContextMutex, so callers can use it without racing
// against SetTracingContext. It returns nil when no context has been set.
func (rs *Store) getTracingContext() types.TraceContext {
	rs.traceContextMutex.Lock()
	defer rs.traceContextMutex.Unlock()

	src := rs.traceContext
	if src == nil {
		return nil
	}

	snapshot := make(types.TraceContext, len(src))
	for name, value := range src {
		snapshot[name] = value
	}
	return snapshot
}
// TracingEnabled returns if tracing is enabled for the MultiStore, i.e.
// whether a non-nil trace writer has been set.
func (rs *Store) TracingEnabled() bool {
return rs.traceWriter != nil
}
// AddListeners makes sure every StoreKey in keys has a memory listener.
// Keys that already have a non-nil listener keep it; the others get a new
// one from types.NewMemoryListener.
func (rs *Store) AddListeners(keys []types.StoreKey) {
	for _, storeKey := range keys {
		if rs.listeners[storeKey] != nil {
			continue
		}
		rs.listeners[storeKey] = types.NewMemoryListener()
	}
}
// ListeningEnabled reports whether the KVStore identified by key has a
// non-nil listener registered.
func (rs *Store) ListeningEnabled(key types.StoreKey) bool {
	listener, registered := rs.listeners[key]
	return registered && listener != nil
}
// PopStateCache drains the state-change messages accumulated by every
// registered listener and returns them stably sorted by StoreKey.
//
// Only the listeners' buffered changes are consumed; the stores themselves
// are not touched. The operation is destructive: a second call returns only
// what has accumulated since the first.
//
// NOTE(review): the previous comment said this method "has been
// synchronized"; no locking is visible in this method — confirm whether the
// synchronization lives in the listener.
func (rs *Store) PopStateCache() []*types.StoreKVPair {
	var pairs []*types.StoreKVPair
	for _, listener := range rs.listeners {
		if listener == nil {
			continue
		}
		pairs = append(pairs, listener.PopStateCache()...)
	}

	byStoreKey := func(a, b int) bool {
		return pairs[a].StoreKey < pairs[b].StoreKey
	}
	sort.SliceStable(pairs, byStoreKey)

	return pairs
}
// LatestVersion returns the latest version in the store, taken from the
// Version field of LastCommitID.
func (rs *Store) LatestVersion() int64 {
return rs.LastCommitID().Version
}
// LastCommitID implements Committer/CommitStore.
//
// Once commit info is available (after a load or a commit) its CommitID is
// returned. Before that, only the version is known: it is read with
// GetLatestVersion from the root database and the hash is left empty.
func (rs *Store) LastCommitID() types.CommitID {
	if info := rs.lastCommitInfo; info != nil {
		return info.CommitID()
	}

	return types.CommitID{Version: GetLatestVersion(rs.db)}
}
// Commit implements Committer/CommitStore.
//
// It picks the next version, commits all stores through commitStores, stamps
// the resulting commit info with the commit header's time, drops the stores
// flagged in removalMap, runs pruning for the new version and returns the new
// CommitID. The commit metadata is flushed by a deferred call, i.e. when
// Commit returns. It panics if pruning reports an error.
func (rs *Store) Commit() types.CommitID {
var previousHeight, version int64
// NOTE(review): GetVersion is called even when lastCommitInfo may be nil
// (no load yet) — presumably a nil-safe generated getter; confirm.
if rs.lastCommitInfo.GetVersion() == 0 && rs.initialVersion > 1 {
// This case means that no commit has been made in the store, we
// start from initialVersion.
version = rs.initialVersion
} else {
// This case can means two things:
// - either there was already a previous commit in the store, in which
// case we increment the version from there,
// - or there was no previous commit, and initial version was not set,
// in which case we start at version 1.
previousHeight = rs.lastCommitInfo.GetVersion()
version = previousHeight + 1
}
// A mismatch is only logged; the computed version is used regardless.
if rs.commitHeader.Height != version {
rs.logger.Debug("commit header and version mismatch", "header_height", rs.commitHeader.Height, "version", version)
}
rs.lastCommitInfo = commitStores(version, rs.stores, rs.removalMap)
rs.lastCommitInfo.Timestamp = rs.commitHeader.Time
// The arguments are evaluated here, but the flush itself runs when Commit
// returns (including while unwinding from the panic below).
defer rs.flushMetadata(rs.db, version, rs.lastCommitInfo)
// remove remnants of removed stores
for sk := range rs.removalMap {
if _, ok := rs.stores[sk]; ok {
delete(rs.stores, sk)
delete(rs.storesParams, sk)
delete(rs.keysByName, sk.Name())
}
}
// reset the removalMap
rs.removalMap = make(map[types.StoreKey]bool)
if err := rs.handlePruning(version); err != nil {
panic(err)
}
return types.CommitID{
Version: version,
Hash: rs.lastCommitInfo.Hash(),
}
}
// WorkingHash computes the hash the multistore would have if it were
// committed now, so the app hash can be obtained before Commit.
//
// Only IAVL stores that are not flagged for removal contribute: each one
// supplies its own working hash under its store name, the entries are ordered
// by name, and the hash of the resulting CommitInfo is returned.
func (rs *Store) WorkingHash() []byte {
	infos := make([]types.StoreInfo, 0, len(rs.stores))

	for _, key := range keysFromStoreKeyMap(rs.stores) {
		store := rs.stores[key]
		if store.GetStoreType() != types.StoreTypeIAVL || rs.removalMap[key] {
			continue
		}

		infos = append(infos, types.StoreInfo{
			Name:     key.Name(),
			CommitId: types.CommitID{Hash: store.WorkingHash()},
		})
	}

	byName := func(a, b int) bool {
		return infos[a].Name < infos[b].Name
	}
	sort.SliceStable(infos, byName)

	return types.CommitInfo{StoreInfos: infos}.Hash()
}
// CacheWrap implements CacheWrapper/Store/CommitStore.
// It returns the result of CacheMultiStore asserted to types.CacheWrap; the
// assertion is unchecked, so it panics if that value does not implement
// types.CacheWrap.
func (rs *Store) CacheWrap() types.CacheWrap {
return rs.CacheMultiStore().(types.CacheWrap)
}
// CacheWrapWithTrace implements the CacheWrapper interface.
// Both arguments are ignored; the result is the same as CacheWrap.
func (rs *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap {
return rs.CacheWrap()
}
// CacheMultiStore implements MultiStore. It creates an ephemeral branch of
// the multistore over all currently loaded stores and returns it as a
// CacheMultiStore, carrying over the trace writer and a copy of the tracing
// context.
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
	branched := make(map[types.StoreKey]types.CacheWrapper)

	for key, commitStore := range rs.stores {
		var kv types.KVStore = commitStore

		// Listening is wired in underneath the cache so that listeners see
		// writes as they come out of the cache store; attaching the same
		// listeners to the cache store as well would report each write twice.
		if rs.ListeningEnabled(key) {
			kv = listenkv.NewStore(kv, key, rs.listeners[key])
		}

		branched[key] = kv
	}

	return cachemulti.NewStore(rs.db, branched, rs.keysByName, rs.traceWriter, rs.getTracingContext())
}
// CacheMultiStoreWithVersion is analogous to CacheMultiStore except that it
// attempts to load stores at a given version (height). An error is returned if
// any store cannot be loaded. This should only be used for querying and
// iterating at past heights.
//
// IAVL stores are replaced by their immutable view at version; all other
// store types are used as they are. When an IAVL store cannot provide the
// version, the commit info of that version is fetched (once) to decide
// whether the store existed then: if it did, the load error is returned; if
// it did not, the error is dropped.
func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStore, error) {
cachedStores := make(map[types.StoreKey]types.CacheWrapper)
// commitInfo and storeInfos are filled lazily, on the first load error.
var commitInfo *types.CommitInfo
storeInfos := map[string]bool{}
for key, store := range rs.stores {
var cacheStore types.KVStore
switch store.GetStoreType() {
case types.StoreTypeIAVL:
// If the store is wrapped with an inter-block cache, we must first unwrap
// it to get the underlying IAVL store.
store = rs.GetCommitKVStore(key)
// Attempt to lazy-load an already saved IAVL store version. If the
// version does not exist or is pruned, an error should be returned.
var err error
cacheStore, err = store.(*iavl.Store).GetImmutable(version)
// if we got error from loading a module store
// we fetch commit info of this version
// we use commit info to check if the store existed at this version or not
if err != nil {
if commitInfo == nil {
var errCommitInfo error
commitInfo, errCommitInfo = rs.GetCommitInfo(version)
if errCommitInfo != nil {
return nil, errCommitInfo
}
for _, storeInfo := range commitInfo.StoreInfos {
storeInfos[storeInfo.Name] = true
}
}
// If the store existed at this version, it means there's actually an error
// getting the root store at this version.
if storeInfos[key.Name()] {
return nil, err
}
// NOTE(review): on this path cacheStore keeps whatever GetImmutable
// returned together with the error and is still registered below —
// confirm that value is usable by the cache multistore.
}
default:
cacheStore = store
}
// Wire the listenkv.Store to allow listeners to observe the writes from the cache store,
// set same listeners on cache store will observe duplicated writes.
if rs.ListeningEnabled(key) {
cacheStore = listenkv.NewStore(cacheStore, key, rs.listeners[key])
}
cachedStores[key] = cacheStore
}
return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.getTracingContext()), nil
}
// GetStore returns the Store mounted under key, unwrapped from the
// inter-block cache when one is in use. It panics if no store exists for key.
//
// TODO: This isn't used directly upstream. Consider returning the Store as-is
// instead of unwrapping.
func (rs *Store) GetStore(key types.StoreKey) types.Store {
	if found := rs.GetCommitKVStore(key); found != nil {
		return found
	}

	panic(fmt.Sprintf("store does not exist for key: %s", key.Name()))
}
// GetKVStore returns the KVStore mounted under key, decorated according to
// the root store's configuration: with a trace store when a trace writer is
// set, and, on top of that, with a listen store when key has a listener.
// It panics if no store exists for key.
//
// NOTE: The store is taken from the main store map without unwrapping, so it
// may be wrapped in an inter-block cache if one is set on the root store.
func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
	commitStore := rs.stores[key]
	if commitStore == nil {
		panic(fmt.Sprintf("store does not exist for key: %s", key.Name()))
	}

	var kv types.KVStore = commitStore

	if tracing := rs.traceWriter != nil; tracing {
		kv = tracekv.NewStore(kv, rs.traceWriter, rs.getTracingContext())
	}
	if rs.ListeningEnabled(key) {
		kv = listenkv.NewStore(kv, key, rs.listeners[key])
	}

	return kv
}
// handlePruning asks the pruning manager which height should be pruned now
// that version has been committed and prunes the stores up to it via
// PruneStores, logging the start and end of the operation. The error from
// PruneStores is returned unchanged.
func (rs *Store) handlePruning(version int64) error {
pruneHeight := rs.pruningManager.GetPruningHeight(version)
rs.logger.Debug("prune start", "height", version)
defer rs.logger.Debug("prune end", "height", version)
return rs.PruneStores(pruneHeight)
}
// PruneStores prunes the history of every IAVL store of the multistore up to
// pruningHeight. Non-IAVL stores are skipped, and a pruningHeight of zero or
// less is a logged no-op.
//
// A store failing with iavltree.ErrVersionDoesNotExist stops the pruning and
// that error is returned; any other failure is logged and the remaining
// stores are still processed.
func (rs *Store) PruneStores(pruningHeight int64) (err error) {
	if pruningHeight <= 0 {
		rs.logger.Debug("pruning skipped, height is less than or equal to 0")
		return nil
	}

	rs.logger.Debug("pruning store", "heights", pruningHeight)

	for key, store := range rs.stores {
		rs.logger.Debug("pruning store", "key", key) // Also log store.name (a private variable)?

		if store.GetStoreType() != types.StoreTypeIAVL {
			continue
		}

		// The store may be wrapped in an inter-block cache; fetch the
		// underlying IAVL store before pruning it.
		iavlStore := rs.GetCommitKVStore(key).(*iavl.Store)
		pruneErr := iavlStore.DeleteVersionsTo(pruningHeight)

		switch {
		case pruneErr == nil:
			// pruned successfully, move on to the next store
		case errors.Is(pruneErr, iavltree.ErrVersionDoesNotExist):
			return pruneErr
		default:
			rs.logger.Error("failed to prune store", "key", key, "err", pruneErr)
		}
	}

	return nil
}
// GetStoreByName resolves a store name (typically taken from a query path) to
// its StoreKey and returns the Store mounted under that key, unwrapped from
// the inter-block cache when one is in use. It returns nil when the name is
// not known.
func (rs *Store) GetStoreByName(name string) types.Store {
	if key := rs.keysByName[name]; key != nil {
		return rs.GetCommitKVStore(key)
	}
	return nil
}
// Query calls substore.Query with the same `req` where `req.Path` is
// modified to remove the substore prefix.
// Ie. `req.Path` here is `/<substore>/<path>`, and trimmed to `/<path>` for the substore.
//
// Note that req is modified in place: on return req.Path holds the trimmed
// sub-path, it is not restored.
//
// An error is returned when the path is malformed, the store is unknown or
// not queryable, the sub-store query itself fails, a requested proof comes
// back empty, or the commit info for the response height cannot be read.
// When a proof is requested and required for the sub-path, the multistore's
// proof op for the store is appended to the sub-store's proof.
//
// TODO: add proof for `multistore -> substore`.
func (rs *Store) Query(req *types.RequestQuery) (*types.ResponseQuery, error) {
	storeName, subpath, err := parsePath(req.Path)
	if err != nil {
		return &types.ResponseQuery{}, err
	}

	store := rs.GetStoreByName(storeName)
	if store == nil {
		return &types.ResponseQuery{}, errorsmod.Wrapf(types.ErrUnknownRequest, "no such store: %s", storeName)
	}

	queryable, ok := store.(types.Queryable)
	if !ok {
		return &types.ResponseQuery{}, errorsmod.Wrapf(types.ErrUnknownRequest, "store %s (type %T) doesn't support queries", storeName, store)
	}

	// trim the path and make the query
	req.Path = subpath
	res, err := queryable.Query(req)
	if err != nil {
		// Surface the sub-store's error as-is. Without this check a failed
		// query with req.Prove set would fall through to the proof handling
		// below, where the real error is replaced by the "proof is
		// unexpectedly empty" one, or a nil response is dereferenced.
		return res, err
	}

	if !req.Prove || !RequireProof(subpath) {
		return res, nil
	}

	if res.ProofOps == nil || len(res.ProofOps.Ops) == 0 {
		return &types.ResponseQuery{}, errorsmod.Wrap(types.ErrInvalidRequest, "proof is unexpectedly empty; ensure height has not been pruned")
	}

	// If the request's height is the latest height we've committed, then utilize
	// the store's lastCommitInfo as this commit info may not be flushed to disk.
	// Otherwise, we query for the commit info from disk.
	var commitInfo *types.CommitInfo
	if res.Height == rs.lastCommitInfo.Version {
		commitInfo = rs.lastCommitInfo
	} else {
		commitInfo, err = rs.GetCommitInfo(res.Height)
		if err != nil {
			return &types.ResponseQuery{}, err
		}
	}

	// Append the multistore-level proof op for this store.
	res.ProofOps.Ops = append(res.ProofOps.Ops, commitInfo.ProofOp(storeName))

	return res, nil
}
// SetInitialVersion records version as the multistore's initial version and
// sets it on every loaded IAVL store. It is used when a new chain starts at
// an arbitrary height. The returned error is always nil.
func (rs *Store) SetInitialVersion(version int64) error {
	rs.initialVersion = version

	for key, store := range rs.stores {
		if store.GetStoreType() != types.StoreTypeIAVL {
			continue
		}

		// The store may be wrapped in an inter-block cache; the initial
		// version has to be set on the underlying IAVL store.
		underlying := rs.GetCommitKVStore(key)
		underlying.(types.StoreWithInitialVersion).SetInitialVersion(version)
	}

	return nil
}
// parsePath splits a query path of the form /<storeName>[/<subpath>].
//
// The path must begin with "/"; otherwise an ErrUnknownRequest-wrapped error
// is returned together with empty strings. storeName is the text between the
// leading slash and the next slash (or the end of the path). subpath is the
// remainder including its leading slash, or empty when there is no second
// slash.
func parsePath(path string) (storeName, subpath string, err error) {
	if !strings.HasPrefix(path, "/") {
		return "", "", errorsmod.Wrapf(types.ErrUnknownRequest, "invalid path: %s", path)
	}

	name, rest, hasSubpath := strings.Cut(path[1:], "/")
	if hasSubpath {
		return name, "/" + rest, nil
	}
	return name, "", nil
}
//---------------------- Snapshotting ------------------
// Snapshot implements snapshottypes.Snapshotter. The snapshot output for a given format must be
// identical across nodes such that chunks from different sources fit together. If the output for a
// given format changes (at the byte level), the snapshot format must be bumped - see
// TestMultistoreSnapshot_Checksum test.
//
// height is the version to export; it must be non-zero and not greater than
// the latest version recorded in the root database. Every IAVL store is
// exported, in store-name order, to protoWriter; transient and memory stores
// are skipped and any other store type is an error. The first export or write
// error aborts the snapshot and is returned.
func (rs *Store) Snapshot(height uint64, protoWriter protoio.Writer) error {
	if height == 0 {
		return errorsmod.Wrap(types.ErrLogic, "cannot snapshot height 0")
	}
	if height > uint64(GetLatestVersion(rs.db)) {
		return errorsmod.Wrapf(types.ErrLogic, "cannot snapshot future height %v", height)
	}

	// Collect stores to snapshot (only IAVL stores are supported)
	type namedStore struct {
		*iavl.Store
		name string
	}
	stores := []namedStore{}
	keys := keysFromStoreKeyMap(rs.stores)
	for _, key := range keys {
		switch store := rs.GetCommitKVStore(key).(type) {
		case *iavl.Store:
			stores = append(stores, namedStore{name: key.Name(), Store: store})
		case *transient.Store, *mem.Store:
			// Non-persisted stores shouldn't be snapshotted
			continue
		default:
			return errorsmod.Wrapf(types.ErrLogic,
				"don't know how to snapshot store %q of type %T", key.Name(), store)
		}
	}
	// The export order is part of the snapshot format, so it is pinned
	// explicitly to the store name.
	sort.Slice(stores, func(i, j int) bool {
		return strings.Compare(stores[i].name, stores[j].name) == -1
	})

	// Export each IAVL store. Stores are serialized as a stream of SnapshotItem Protobuf
	// messages. The first item contains a SnapshotStore with store metadata (i.e. name),
	// and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes
	// are demarcated by new SnapshotStore items.
	for _, store := range stores {
		rs.logger.Debug("starting snapshot", "store", store.name, "height", height)
		exporter, err := store.Export(int64(height))
		if err != nil {
			rs.logger.Error("snapshot failed; exporter error", "store", store.name, "err", err)
			return err
		}

		// The per-store work runs in a closure so that the exporter is closed
		// as soon as this store is done rather than when Snapshot returns.
		err = func() error {
			defer exporter.Close()

			err := protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
				Item: &snapshottypes.SnapshotItem_Store{
					Store: &snapshottypes.SnapshotStoreItem{
						Name: store.name,
					},
				},
			})
			if err != nil {
				rs.logger.Error("snapshot failed; item store write failed", "store", store.name, "err", err)
				return err
			}

			nodeCount := 0
			for {
				node, err := exporter.Next()
				// errors.Is keeps the end-of-export sentinel recognisable
				// even if it is ever returned wrapped.
				if errors.Is(err, iavltree.ErrorExportDone) {
					rs.logger.Debug("snapshot Done", "store", store.name, "nodeCount", nodeCount)
					break
				} else if err != nil {
					return err
				}

				err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
					Item: &snapshottypes.SnapshotItem_IAVL{
						IAVL: &snapshottypes.SnapshotIAVLItem{
							Key:     node.Key,
							Value:   node.Value,
							Height:  int32(node.Height),
							Version: node.Version,
						},
					},
				})
				if err != nil {
					return err
				}
				nodeCount++
			}

			return nil
		}()
		if err != nil {
			return err
		}
	}

	return nil
}
// Restore implements snapshottypes.Snapshotter.
// returns next snapshot item and error.
//
// Items are read from protoReader until EOF or until an item that is neither
// a store item nor an IAVL node item is read. Each store item commits the
// import of the previous store (if any) and opens an importer at height for
// the named IAVL store; each IAVL node item is added to the current importer.
// After the loop the last importer is committed, commit metadata for height is
// flushed and the latest version is loaded. The returned item is the one that
// ended the loop; the returned error is the result of LoadLatestVersion, or
// the first read/import/commit error. The format parameter is not used here.
func (rs *Store) Restore(
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
// Import nodes into stores. The first item is expected to be a SnapshotItem containing
// a SnapshotStoreItem, telling us which store to import into. The following items will contain
// SnapshotNodeItem (i.e. ExportNode) until we reach the next SnapshotStoreItem or EOF.
var importer *iavltree.Importer
var snapshotItem snapshottypes.SnapshotItem
loop:
for {
// Start from a blank item on every iteration, before decoding into it.
snapshotItem = snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(&snapshotItem)
if err == io.EOF {
break
} else if err != nil {
return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "invalid protobuf message")
}
switch item := snapshotItem.Item.(type) {
case *snapshottypes.SnapshotItem_Store:
// A new store begins: finish the import of the previous one first.
if importer != nil {
err = importer.Commit()
if err != nil {
return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "IAVL commit failed")
}
importer.Close()
}
store, ok := rs.GetStoreByName(item.Store.Name).(*iavl.Store)
if !ok || store == nil {
return snapshottypes.SnapshotItem{}, errorsmod.Wrapf(types.ErrLogic, "cannot import into non-IAVL store %q", item.Store.Name)
}
importer, err = store.Import(int64(height))
if err != nil {
return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "import failed")
}
// Deferred inside the loop: these Close calls all run when Restore
// returns, which covers the early error returns. Importers are also
// closed explicitly above/below, so Close can be called twice on one
// importer (presumably safe to repeat — confirm in iavl).
defer importer.Close()
// Importer height must reflect the node height (which usually matches the block height, but not always)
rs.logger.Debug("restoring snapshot", "store", item.Store.Name)
case *snapshottypes.SnapshotItem_IAVL:
if importer == nil {
rs.logger.Error("failed to restore; received IAVL node item before store item")
return snapshottypes.SnapshotItem{}, errorsmod.Wrap(types.ErrLogic, "received IAVL node item before store item")
}
// The node height is narrowed to int8 below, so larger values are rejected.
if item.IAVL.Height > math.MaxInt8 {
return snapshottypes.SnapshotItem{}, errorsmod.Wrapf(types.ErrLogic, "node height %v cannot exceed %v",
item.IAVL.Height, math.MaxInt8)
}
node := &iavltree.ExportNode{
Key: item.IAVL.Key,
Value: item.IAVL.Value,
Height: int8(item.IAVL.Height),
Version: item.IAVL.Version,
}
// Protobuf does not differentiate between []byte{} as nil, but fortunately IAVL does
// not allow nil keys nor nil values for leaf nodes, so we can always set them to empty.
if node.Key == nil {
node.Key = []byte{}
}
if node.Height == 0 && node.Value == nil {
node.Value = []byte{}
}
err := importer.Add(node)
if err != nil {
return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "IAVL node import failed")
}
default:
// Any other item type ends the loop; that item is what gets returned.
break loop
}
}
// Finish the import of the last store.
if importer != nil {
err := importer.Commit()
if err != nil {
return snapshottypes.SnapshotItem{}, errorsmod.Wrap(err, "IAVL commit failed")
}
importer.Close()
}
rs.flushMetadata(rs.db, int64(height), rs.buildCommitInfo(int64(height)))
return snapshotItem, rs.LoadLatestVersion()
}
func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID, params storeParams) (types.CommitKVStore, error) {
var db dbm.DB
if params.db != nil {
db = dbm.NewPrefixDB(params.db, []byte("s/_/"))
} else {
prefix := "s/k:" + params.key.Name() + "/"
db = dbm.NewPrefixDB(rs.db, []byte(prefix))
}
switch params.typ {
case types.StoreTypeMulti:
panic("recursive MultiStores not yet supported")
case types.StoreTypeIAVL:
var store types.CommitKVStore
var err error
if params.initialVersion == 0 {
store, err = iavl.LoadStore(db, rs.logger, key, id, rs.iavlCacheSize, rs.iavlDisableFastNode, rs.metrics)