conduit: Default to plugin data directory when possible. #1366

Merged 3 commits on Dec 8, 2022
4 changes: 4 additions & 0 deletions conduit/plugins/exporters/filewriter/file_exporter.go
@@ -53,6 +53,10 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
 	if exp.cfg.FilenamePattern == "" {
 		exp.cfg.FilenamePattern = FilePattern
 	}
+	// default to the data directory if no override provided.
+	if exp.cfg.BlocksDir == "" {
+		exp.cfg.BlocksDir = cfg.DataDir
+	}
 	// create block directory
 	err = os.Mkdir(exp.cfg.BlocksDir, 0755)
 	if err != nil && errors.Is(err, os.ErrExist) {
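
Note on the hunk above: the exporter now falls back to `cfg.DataDir` when `block-dir` is empty, and only then creates the directory. A minimal standalone sketch of that pattern follows, for trying it outside the plugin. `resolveBlocksDir` and `ensureDir` are hypothetical helper names that do not appear in this PR; only the empty-string check and the `os.Mkdir` / `os.ErrExist` handling are taken from the diff.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// resolveBlocksDir is a hypothetical helper (not part of the PR). It returns
// the configured override, or the plugin data directory when no override is
// set, which is the fallback the hunk adds to Init.
func resolveBlocksDir(override, dataDir string) string {
	if override == "" {
		return dataDir
	}
	return override
}

// ensureDir is also hypothetical. It creates dir and treats an
// "already exists" error as success; any other error is returned.
func ensureDir(dir string) error {
	err := os.Mkdir(dir, 0755)
	if err != nil && !errors.Is(err, os.ErrExist) {
		return fmt.Errorf("create %s: %w", dir, err)
	}
	return nil
}

func main() {
	// Stand-in for the plugin data directory that conduit would provide.
	dataDir, err := os.MkdirTemp("", "plugin-data")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dataDir)

	// First case: no override, so the data directory is used (and already
	// exists). Second case: an explicit override that must be created.
	for _, override := range []string{"", filepath.Join(dataDir, "override")} {
		dir := resolveBlocksDir(override, dataDir)
		if err := ensureDir(dir); err != nil {
			panic(err)
		}
		fmt.Printf("override=%q -> %s\n", override, dir)
	}
}
```

Because the fallback directory normally exists already, tolerating `os.ErrExist` is what keeps the default case from failing at the `Mkdir` step.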
5 changes: 3 additions & 2 deletions conduit/plugins/exporters/filewriter/file_exporter_config.go
@@ -2,8 +2,9 @@ package filewriter

 // Config specific to the file exporter
 type Config struct {
-	// BlocksDir is the path to a directory where block data should be stored.
-	// The directory is created if it doesn't exist.
+	// BlocksDir is an optional path to a directory where block data should be
+	// stored. The directory is created if it doesn't exist. If no directory is
+	// provided the default plugin data directory is used.
 	BlocksDir string `yaml:"block-dir"`
 	// FilenamePattern is the format used to write block files. It uses go
 	// string formatting and should accept one number for the round.
43 changes: 43 additions & 0 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
@@ -3,6 +3,7 @@ package filewriter
 import (
 	"context"
 	"fmt"
+	"path"
 	"testing"

 	"github.com/sirupsen/logrus"
@@ -48,6 +49,48 @@ func TestExporterMetadata(t *testing.T) {
 	assert.Equal(t, metadata.Deprecated, meta.Deprecated)
 }

+func TestExporterInitDefaults(t *testing.T) {
+	tempdir := t.TempDir()
+	override := path.Join(tempdir, "override")
+
+	testcases := []struct {
+		blockdir string
+		expected string
+	}{
+		{
+			blockdir: "",
+			expected: tempdir,
+		},
+		{
+			blockdir: "''",
+			expected: tempdir,
+		},
+		{
+			blockdir: override,
+			expected: override,
+		},
+		{
+			blockdir: fmt.Sprintf("'%s'", override),
+			expected: override,
+		},
+	}
+
+	for _, tc := range testcases {
+		tc := tc
+		t.Run(fmt.Sprintf("blockdir=%s", tc.blockdir), func(t *testing.T) {
+			t.Parallel()
+			fileExp := fileCons.New()
+			defer fileExp.Close()
+			pcfg := plugins.MakePluginConfig(fmt.Sprintf("block-dir: %s", tc.blockdir))
+			pcfg.DataDir = tempdir
+			err := fileExp.Init(context.Background(), testutil.MockedInitProvider(&round), pcfg, logger)
+			require.NoError(t, err)
+			pluginConfig := fileExp.Config()
+			assert.Contains(t, pluginConfig, fmt.Sprintf("block-dir: %s", tc.expected))
+		})
+	}
+}
+
 func TestExporterInit(t *testing.T) {
 	config, _ := getConfig(t)
 	fileExp := fileCons.New()
5 changes: 3 additions & 2 deletions conduit/plugins/exporters/filewriter/sample.yaml
@@ -1,7 +1,8 @@
 name: "file_writer"
 config:
-  # BlocksDir is the path to a directory where block data should be stored.
-  # The directory is created if it doesn't exist.
+  # BlocksDir is an optional path to a directory where block data will be
+  # stored. The directory is created if it doesn't exist. If not present the
+  # plugin data directory is used.
   block-dir: "/path/to/block/files"
   # FilenamePattern is the format used to write block files. It uses go
   # string formatting and should accept one number for the round.
8 changes: 6 additions & 2 deletions conduit/plugins/processors/blockprocessor/block_processor.go
@@ -87,6 +87,10 @@ func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProv
 		return fmt.Errorf("blockprocessor init error: %w", err)
 	}

+	if proc.cfg.LedgerDir == "" {
+		proc.cfg.LedgerDir = cfg.DataDir
+	}
+
 	genesis := initProvider.GetGenesis()
 	round := uint64(initProvider.NextDBRound())

@@ -95,7 +99,7 @@ func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProv
 		return fmt.Errorf("could not initialize ledger: %w", err)
 	}

-	l, err := util.MakeLedger(proc.logger, false, genesis, proc.cfg.IndexerDatadir)
+	l, err := util.MakeLedger(proc.logger, false, genesis, proc.cfg.LedgerDir)
 	if err != nil {
 		return fmt.Errorf("could not make ledger: %w", err)
 	}
@@ -197,7 +201,7 @@ func MakeBlockProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, n
 	if err != nil {
 		return nil, fmt.Errorf("MakeBlockProcessorWithLedgerInit() err: %w", err)
 	}
-	return MakeBlockProcessor(logger, genesis, nextDbRound, config.IndexerDatadir, handler)
+	return MakeBlockProcessor(logger, genesis, nextDbRound, config.LedgerDir, handler)
 }

 // MakeBlockProcessor creates a block processor
8 changes: 4 additions & 4 deletions conduit/plugins/processors/blockprocessor/config.go
@@ -5,8 +5,8 @@ type Config struct {
 	// Catchpoint to initialize the local ledger to
 	Catchpoint string `yaml:"catchpoint"`

-	IndexerDatadir string `yaml:"data-dir"`
-	AlgodDataDir   string `yaml:"algod-data-dir"`
-	AlgodToken     string `yaml:"algod-token"`
-	AlgodAddr      string `yaml:"algod-addr"`
+	LedgerDir    string `yaml:"ledger-dir"`
+	AlgodDataDir string `yaml:"algod-data-dir"`
+	AlgodToken   string `yaml:"algod-token"`
+	AlgodAddr    string `yaml:"algod-addr"`
 }
77 changes: 75 additions & 2 deletions conduit/plugins/processors/blockprocessor/config_test.go
@@ -1,17 +1,29 @@
 package blockprocessor

 import (
+	"context"
+	"fmt"
+	"path"
 	"testing"

+	"github.com/sirupsen/logrus/hooks/test"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"gopkg.in/yaml.v3"
+
+	"github.com/algorand/go-algorand/data/basics"
+	"github.com/algorand/go-algorand/data/bookkeeping"
+
+	"github.com/algorand/indexer/conduit/plugins"
+	"github.com/algorand/indexer/conduit/plugins/processors"
+	testutil "github.com/algorand/indexer/util/test"
 )

 func TestConfigDeserialize(t *testing.T) {

 	configStr := `---
 catchpoint: "acatch"
-data-dir: "idx_data_dir"
+ledger-dir: "idx_data_dir"
 algod-data-dir: "algod_data_dir"
 algod-token: "algod_token"
 algod-addr: "algod_addr"
@@ -21,9 +33,70 @@ func TestConfigDeserialize(t *testing.T) {
 	err := yaml.Unmarshal([]byte(configStr), &processorConfig)
 	require.Nil(t, err)
 	require.Equal(t, processorConfig.Catchpoint, "acatch")
-	require.Equal(t, processorConfig.IndexerDatadir, "idx_data_dir")
+	require.Equal(t, processorConfig.LedgerDir, "idx_data_dir")
 	require.Equal(t, processorConfig.AlgodDataDir, "algod_data_dir")
 	require.Equal(t, processorConfig.AlgodToken, "algod_token")
 	require.Equal(t, processorConfig.AlgodAddr, "algod_addr")

 }
+
+var cons = processors.ProcessorConstructorFunc(func() processors.Processor {
+	return &blockProcessor{}
+})
+
+func TestInitDefaults(t *testing.T) {
+	tempdir := t.TempDir()
+	override := path.Join(tempdir, "override")
+	var round = basics.Round(0)
+	ip := testutil.MockedInitProvider(&round)
+	var addr basics.Address
+	ip.Genesis = &bookkeeping.Genesis{
+		SchemaID:    "test",
+		Network:     "test",
+		Proto:       "future",
+		Allocation:  nil,
+		RewardsPool: addr.String(),
+		FeeSink:     addr.String(),
+		Timestamp:   1234,
+		Comment:     "",
+		DevMode:     true,
+	}
+	logger, _ := test.NewNullLogger()
+
+	testcases := []struct {
+		ledgerdir string
+		expected  string
+	}{
+		{
+			ledgerdir: "",
+			expected:  tempdir,
+		},
+		{
+			ledgerdir: "''",
+			expected:  tempdir,
+		},
+		{
+			ledgerdir: override,
+			expected:  override,
+		},
+		{
+			ledgerdir: fmt.Sprintf("'%s'", override),
+			expected:  override,
+		},
+	}
+
+	for _, tc := range testcases {
+		tc := tc
+		t.Run(fmt.Sprintf("ledgerdir: %s", tc.ledgerdir), func(t *testing.T) {
+			t.Parallel()
+			proc := cons.New()
+			defer proc.Close()
+			pcfg := plugins.MakePluginConfig(fmt.Sprintf("ledger-dir: %s", tc.ledgerdir))
+			pcfg.DataDir = tempdir
+			err := proc.Init(context.Background(), ip, pcfg, logger)
+			require.NoError(t, err)
+			pluginConfig := proc.Config()
+			assert.Contains(t, pluginConfig, fmt.Sprintf("ledger-dir: %s", tc.expected))
+		})
+	}
+}
6 changes: 3 additions & 3 deletions conduit/plugins/processors/blockprocessor/initialize.go
@@ -30,7 +30,7 @@ func InitializeLedger(ctx context.Context, logger *log.Logger, nextDbRound uint6
 	if uint64(round) >= nextDbRound {
 		return fmt.Errorf("invalid catchpoint: catchpoint round %d should not be ahead of target round %d", uint64(round), nextDbRound-1)
 	}
-	err = InitializeLedgerFastCatchup(ctx, logger, config.Catchpoint, config.IndexerDatadir, genesis)
+	err = InitializeLedgerFastCatchup(ctx, logger, config.Catchpoint, config.LedgerDir, genesis)
 	if err != nil {
 		return fmt.Errorf("InitializeLedger() fast catchup err: %w", err)
 	}
@@ -63,7 +63,7 @@ func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint6
 	defer cf()
 	var bot fetcher.Fetcher
 	var err error
-	if config.IndexerDatadir == "" {
+	if config.LedgerDir == "" {
 		return fmt.Errorf("InitializeLedgerSimple() err: indexer data directory missing")
 	}
 	// create algod client
@@ -73,7 +73,7 @@ func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint6
 	}
 	logger.Info("initializing ledger")

-	proc, err := MakeBlockProcessor(logger, genesis, round, config.IndexerDatadir, nil)
+	proc, err := MakeBlockProcessor(logger, genesis, round, config.LedgerDir, nil)
 	if err != nil {
 		return fmt.Errorf("RunMigration() err: %w", err)
 	}
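
Note on the hunks above: the default is applied in the plugin's `Init`, while `InitializeLedgerSimple` still rejects an empty `LedgerDir`. Code that calls the initializer directly, as `TestRunMigration` does, therefore has to set the directory itself. Below is a minimal sketch of that split. All names in it (`Config` as a one-field stand-in, `applyDefaults`, `initializeLedger`, `errMissingLedgerDir`, and the `/var/lib/conduit/...` path) are hypothetical and exist only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a trimmed, hypothetical stand-in for the block processor config.
type Config struct {
	LedgerDir string
}

// errMissingLedgerDir is a hypothetical sentinel error for this sketch.
var errMissingLedgerDir = errors.New("ledger directory missing")

// applyDefaults mirrors the step added to the plugin Init: an empty
// LedgerDir falls back to the plugin data directory.
func applyDefaults(cfg *Config, dataDir string) {
	if cfg.LedgerDir == "" {
		cfg.LedgerDir = dataDir
	}
}

// initializeLedger stands in for a library-style entry point that, like
// InitializeLedgerSimple, refuses to run without a ledger directory.
func initializeLedger(cfg *Config) error {
	if cfg.LedgerDir == "" {
		return errMissingLedgerDir
	}
	return nil
}

func main() {
	// Calling the initializer directly with an empty config is rejected.
	direct := &Config{}
	fmt.Println("direct call:", initializeLedger(direct))

	// Going through the defaulting step first fills in the directory.
	// The path is made up for this example.
	viaPlugin := &Config{}
	applyDefaults(viaPlugin, "/var/lib/conduit/processor_block_evaluator")
	fmt.Println("after defaults:", initializeLedger(viaPlugin), viaPlugin.LedgerDir)
}
```

Keeping the empty-string check in the initializer means a caller that bypasses `Init` gets an error rather than a ledger written to an unintended location.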
15 changes: 6 additions & 9 deletions conduit/plugins/processors/blockprocessor/initialize_test.go
@@ -3,7 +3,6 @@ package blockprocessor
 import (
 	"context"
 	"fmt"
-	"os"
 	"testing"
 	"time"

@@ -58,19 +57,17 @@ func TestRunMigration(t *testing.T) {
 	httpmock.RegisterResponder("GET", `=~^http://localhost/v2/status/wait-for-block-after/\d+\z`,
 		httpmock.NewStringResponder(200, string(json.Encode(algod.Status{}))))

-	dname, err := os.MkdirTemp("", "indexer")
-	defer os.RemoveAll(dname)
 	config := Config{
-		IndexerDatadir: dname,
-		AlgodAddr:      "localhost",
-		AlgodToken:     "AAAAA",
+		LedgerDir:  t.TempDir(),
+		AlgodAddr:  "localhost",
+		AlgodToken: "AAAAA",
 	}

 	// migrate 3 rounds
 	log, _ := test2.NewNullLogger()
-	err = InitializeLedgerSimple(context.Background(), log, 3, &genesis, &config)
+	err := InitializeLedgerSimple(context.Background(), log, 3, &genesis, &config)
 	assert.NoError(t, err)
-	l, err := util.MakeLedger(log, false, &genesis, config.IndexerDatadir)
+	l, err := util.MakeLedger(log, false, &genesis, config.LedgerDir)
 	assert.NoError(t, err)
 	// check 3 rounds written to ledger
 	assert.Equal(t, uint64(3), uint64(l.Latest()))
@@ -80,7 +77,7 @@ func TestRunMigration(t *testing.T) {
 	err = InitializeLedgerSimple(context.Background(), log, 6, &genesis, &config)
 	assert.NoError(t, err)

-	l, err = util.MakeLedger(log, false, &genesis, config.IndexerDatadir)
+	l, err = util.MakeLedger(log, false, &genesis, config.LedgerDir)
 	assert.NoError(t, err)
 	assert.Equal(t, uint64(6), uint64(l.Latest()))
 	l.Close()
11 changes: 5 additions & 6 deletions conduit/plugins/processors/blockprocessor/sample.yaml
@@ -1,13 +1,12 @@
 name: block_evaluator
 config:
   # Catchpoint is the catchpoint you wish the block evaluator to use
-  catchpoint: "acatch"
-  # DataDir is the data directory for the Indexer process. It should be a full directory
-  # path and is used for reading in config and writing some output. See
-  # Algorand docs for further documentation
-  data-dir: "/tmp"
+  catchpoint: "acatchpoint"
+  # LedgerDir is an optional path to a directory where the ledger will be
+  # written. It should be a full directory path. If not present the plugin
+  # data directory is used.
+  ledger-dir: ""
   # AlgodDataDir is the algod data directory. It should be a full directory path.
-  # See Algorand docs for further documentation
   algod-data-dir: "algod_data_dir"
   # AlgodToken is the token to connect to algod.
   algod-token: "algod_token"
5 changes: 3 additions & 2 deletions docs/conduit/plugins/block_evaluator.md
@@ -2,10 +2,11 @@

 Enrich raw block data with a "State Delta" object which contains new account values. For example: new account balances, new application states, which assets have been created or deleted, etc.

-The block evaluator computes the delta by maintaining a local ledger and evaluating each block locally.
+The block evaluator computes the delta by maintaining a local ledger and evaluating each block locally. By default this data is written to the block evaluator plugin directory inside the indexer data directory.

 State Delta's are required by some exporters.

+
 ## algod requirement

 It is sometimes useful, or necessary to re-initialize the local ledger. To expedite that process a direct connection to algod is used.
@@ -29,7 +30,7 @@ For example, if you want to get **Mainnet** round `22212765`, you would refer to
 processors:
   - name: block_evaluator
     config:
-      - data-dir: "location where the local ledger will be stored."
+      - ledger-dir: "override default local ledger location."
         algod-data-dir: "local algod data directory"
         algod-addr: "algod URL"
         algod-token: "algod token"
4 changes: 3 additions & 1 deletion docs/conduit/plugins/file_writer.md
@@ -4,12 +4,14 @@ Write the block data to a file.

 Data is written to one file per block in JSON format.

+By default data is written to the filewriter plugin directory inside the indexer data directory.
+
 # Config
 ```yaml
 exporter:
   - name: file_writer
     config:
-      - block-dir: "path to write block data"
+      - block-dir: "override default block data location."
         # override the filename pattern.
         filename-pattern: "%[1]d_block.json"
         # exclude the vote certificate from the file.