Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage Refactor] Refactor ConsumerProgress #6872

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Draft
45 changes: 27 additions & 18 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ import (
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
pstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/grpcutils"
)

Expand Down Expand Up @@ -551,8 +554,8 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var ds datastore.Batching
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
var processedBlockHeight storage.ConsumerProgressInitializer
var processedNotifications storage.ConsumerProgressInitializer
var bsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
Expand Down Expand Up @@ -606,21 +609,26 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
var db storage.DB
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
} else {
processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
}

processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
var db storage.DB
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
} else {
processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
}
processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
return nil
}).
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -847,15 +855,15 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

if builder.executionDataIndexingEnabled {
var indexedBlockHeight storage.ConsumerProgress
var indexedBlockHeight storage.ConsumerProgressInitializer

builder.
AdminCommand("execute-script", func(config *cmd.NodeConfig) commands.AdminCommand {
return stateSyncCommands.NewExecuteScriptCommand(builder.ScriptExecutor)
}).
Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1632,8 +1640,8 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedFinalizedBlockHeight storage.ConsumerProgress
var processedTxErrorMessagesBlockHeight storage.ConsumerProgress
var processedFinalizedBlockHeight storage.ConsumerProgressInitializer
var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
Expand Down Expand Up @@ -1837,17 +1845,18 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error {
processedFinalizedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
processedFinalizedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressIngestionEngineBlockHeight)
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
rootBlockHeight := node.State.Params().FinalizedRoot().Height

var err error
lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressLastFullBlockHeight),
rootBlockHeight,
)
progress, err := store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressLastFullBlockHeight).Initialize(rootBlockHeight)
if err != nil {
return err
}

lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(progress)
if err != nil {
return fmt.Errorf("failed to initialize monotonic consumer progress: %w", err)
}
Expand Down Expand Up @@ -2148,8 +2157,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

if builder.storeTxResultErrorMessages {
builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
processedTxErrorMessagesBlockHeight = bstorage.NewConsumerProgress(
builder.DB,
processedTxErrorMessagesBlockHeight = store.NewConsumerProgress(
badgerimpl.ToDB(builder.DB),
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
)
return nil
Expand Down
24 changes: 16 additions & 8 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ import (
"github.com/onflow/flow-go/state/protocol/events/gadgets"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
pstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/grpcutils"
"github.com/onflow/flow-go/utils/io"
)
Expand Down Expand Up @@ -1056,8 +1059,8 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder {
var ds datastore.Batching
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
var processedBlockHeight storage.ConsumerProgressInitializer
var processedNotifications storage.ConsumerProgressInitializer
var publicBsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
Expand Down Expand Up @@ -1111,21 +1114,26 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
var db storage.DB
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
} else {
processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
}

processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
var db storage.DB
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
} else {
processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
}
processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
return nil
}).
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1310,11 +1318,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return builder.ExecutionDataPruner, nil
})
if builder.executionDataIndexingEnabled {
var indexedBlockHeight storage.ConsumerProgress
var indexedBlockHeight storage.ConsumerProgressInitializer

builder.Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
Expand Down
17 changes: 10 additions & 7 deletions cmd/verification_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ import (
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
)

type VerificationConfig struct {
Expand Down Expand Up @@ -88,11 +91,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
var (
followerState protocol.FollowerState

chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
chunkRequests *stdmap.ChunkRequests // used in requester engine
processedChunkIndex *badger.ConsumerProgress // used in chunk consumer
processedBlockHeight *badger.ConsumerProgress // used in block consumer
chunkQueue *badger.ChunksQueue // used in chunk consumer
chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
chunkRequests *stdmap.ChunkRequests // used in requester engine
processedChunkIndex storage.ConsumerProgressInitializer // used in chunk consumer
processedBlockHeight storage.ConsumerProgressInitializer // used in block consumer
chunkQueue *badger.ChunksQueue // used in chunk consumer

syncCore *chainsync.Core // used in follower engine
assignerEngine *assigner.Engine // the assigner engine
Expand Down Expand Up @@ -155,11 +158,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
return nil
}).
Module("processed chunk index consumer progress", func(node *NodeConfig) error {
processedChunkIndex = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationChunkIndex)
processedChunkIndex = store.NewConsumerProgress(badgerimpl.ToDB(node.DB), module.ConsumeProgressVerificationChunkIndex)
return nil
}).
Module("processed block height consumer progress", func(node *NodeConfig) error {
processedBlockHeight = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationBlockHeight)
processedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(node.DB), module.ConsumeProgressVerificationBlockHeight)
return nil
}).
Module("chunks queue", func(node *NodeConfig) error {
Expand Down
37 changes: 20 additions & 17 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/storage/util"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/mocks"
Expand Down Expand Up @@ -684,14 +686,13 @@ func (suite *Suite) TestGetSealedTransaction() {
)
require.NoError(suite.T(), err)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
progress, err := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight).Initialize(suite.rootBlock.Height)
require.NoError(suite.T(), err)
lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(progress)
require.NoError(suite.T(), err)

// create the ingest engine
processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeight := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight)

ingestEng, err := ingestion.New(
suite.log,
Expand Down Expand Up @@ -874,12 +875,13 @@ func (suite *Suite) TestGetTransactionResult() {
)
require.NoError(suite.T(), err)

processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
lastFullBlockHeightProgress, err := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight).
Initialize(suite.rootBlock.Height)
require.NoError(suite.T(), err)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress)
require.NoError(suite.T(), err)

// create the ingest engine
Expand All @@ -896,7 +898,7 @@ func (suite *Suite) TestGetTransactionResult() {
results,
receipts,
collectionExecutedMetric,
processedHeight,
processedHeightInitializer,
lastFullBlockHeight,
nil,
)
Expand Down Expand Up @@ -1130,12 +1132,13 @@ func (suite *Suite) TestExecuteScript() {
suite.net.On("Register", channels.ReceiveReceipts, mock.Anything).Return(conduit, nil).
Once()

processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressIngestionEngineBlockHeight)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
lastFullBlockHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(db), module.ConsumeProgressLastFullBlockHeight)
lastFullBlockHeightProgress, err := lastFullBlockHeightInitializer.Initialize(suite.rootBlock.Height)
require.NoError(suite.T(), err)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeightProgress)
require.NoError(suite.T(), err)

// create the ingest engine
Expand All @@ -1152,7 +1155,7 @@ func (suite *Suite) TestExecuteScript() {
results,
receipts,
collectionExecutedMetric,
processedHeight,
processedHeightInitializer,
lastFullBlockHeight,
nil,
)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func New(
executionResults storage.ExecutionResults,
executionReceipts storage.ExecutionReceipts,
collectionExecutedMetric module.CollectionExecutedMetric,
finalizedProcessedHeight storage.ConsumerProgress,
finalizedProcessedHeight storage.ConsumerProgressInitializer,
lastFullBlockHeight *counters.PersistentStrictMonotonicCounter,
txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore,
) (*Engine, error) {
Expand Down
16 changes: 8 additions & 8 deletions engine/access/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
"github.com/onflow/flow-go/network/mocknetwork"
protocol "github.com/onflow/flow-go/state/protocol/mock"
storerr "github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
storage "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/mocks"
)
Expand Down Expand Up @@ -186,13 +187,12 @@ func (s *Suite) SetupTest() {

// initIngestionEngine create new instance of ingestion engine and waits when it starts
func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine {
processedHeight := bstorage.NewConsumerProgress(s.db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeightInitializer := store.NewConsumerProgress(badgerimpl.ToDB(s.db), module.ConsumeProgressIngestionEngineBlockHeight)

var err error
s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight),
s.finalizedBlock.Height,
)
lastFullBlockHeight, err := store.NewConsumerProgress(badgerimpl.ToDB(s.db), module.ConsumeProgressLastFullBlockHeight).Initialize(s.finalizedBlock.Height)
require.NoError(s.T(), err)

s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(lastFullBlockHeight)
require.NoError(s.T(), err)

eng, err := New(
Expand All @@ -208,7 +208,7 @@ func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine {
s.results,
s.receipts,
s.collectionExecutedMetric,
processedHeight,
processedHeightInitializer,
s.lastFullBlockHeight,
nil,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func New(
log zerolog.Logger,
state protocol.State,
headers storage.Headers,
txErrorMessagesProcessedHeight storage.ConsumerProgress,
txErrorMessagesProcessedHeight storage.ConsumerProgressInitializer,
txErrorMessagesCore *TxErrorMessagesCore,
) (*Engine, error) {
e := &Engine{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
protocol "github.com/onflow/flow-go/state/protocol/mock"
bstorage "github.com/onflow/flow-go/storage/badger"
storage "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/mocks"
)
Expand Down Expand Up @@ -131,8 +132,8 @@ func (s *TxErrorMessagesEngineSuite) SetupTest() {
// initEngine creates a new instance of the transaction error messages engine
// and waits for it to start. It initializes the engine with mocked components and state.
func (s *TxErrorMessagesEngineSuite) initEngine(ctx irrecoverable.SignalerContext) *Engine {
processedTxErrorMessagesBlockHeight := bstorage.NewConsumerProgress(
s.db,
processedTxErrorMessagesBlockHeight := store.NewConsumerProgress(
badgerimpl.ToDB(s.db),
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
)

Expand Down
Loading