Skip to content

Commit

Permalink
Minor pipeline test cleanup. (#1316)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Nov 6, 2022
1 parent 74a9954 commit 9214b07
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 25 deletions.
22 changes: 14 additions & 8 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ type pipelineImpl struct {
exporter *exporters.Exporter
completeCallback []conduit.OnCompleteFunc

pipelineMetadata state
pipelineMetadataFilePath string
pipelineMetadata state

metricsCallback []conduit.ProvideMetricsFunc
}
Expand Down Expand Up @@ -460,8 +459,13 @@ func (p *pipelineImpl) Wait() {
p.wg.Wait()
}

func metadataPath(dataDir string) string {
return path.Join(dataDir, "metadata.json")
}

func (p *pipelineImpl) encodeMetadataToFile() error {
tempFilename := fmt.Sprintf("%s.temp", p.pipelineMetadataFilePath)
pipelineMetadataFilePath := metadataPath(p.cfg.ConduitConfig.ConduitDataDir)
tempFilename := fmt.Sprintf("%s.temp", pipelineMetadataFilePath)
file, err := os.Create(tempFilename)
if err != nil {
return fmt.Errorf("encodeMetadataToFile(): failed to create temp metadata file: %w", err)
Expand All @@ -472,18 +476,20 @@ func (p *pipelineImpl) encodeMetadataToFile() error {
return fmt.Errorf("encodeMetadataToFile(): failed to write temp metadata: %w", err)
}

err = os.Rename(tempFilename, p.pipelineMetadataFilePath)
err = os.Rename(tempFilename, pipelineMetadataFilePath)
if err != nil {
return fmt.Errorf("encodeMetadataToFile(): failed to replace metadata file: %w", err)
}
return nil
}

func (p *pipelineImpl) initializeOrLoadBlockMetadata() (state, error) {
p.pipelineMetadataFilePath = path.Join(p.cfg.ConduitConfig.ConduitDataDir, "metadata.json")
if stat, err := os.Stat(p.pipelineMetadataFilePath); errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) {
pipelineMetadataFilePath := metadataPath(p.cfg.ConduitConfig.ConduitDataDir)
if stat, err := os.Stat(pipelineMetadataFilePath); errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) {
fmt.Println(err)
fmt.Println(stat)
if stat != nil && stat.Size() == 0 {
err = os.Remove(p.pipelineMetadataFilePath)
err = os.Remove(pipelineMetadataFilePath)
if err != nil {
return p.pipelineMetadata, fmt.Errorf("Init(): error creating file: %w", err)
}
Expand All @@ -497,7 +503,7 @@ func (p *pipelineImpl) initializeOrLoadBlockMetadata() (state, error) {
return p.pipelineMetadata, fmt.Errorf("error opening file: %w", err)
}
var data []byte
data, err = os.ReadFile(p.pipelineMetadataFilePath)
data, err = os.ReadFile(pipelineMetadataFilePath)
if err != nil {
return p.pipelineMetadata, fmt.Errorf("error reading metadata: %w", err)
}
Expand Down
39 changes: 22 additions & 17 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pipeline
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -90,10 +89,7 @@ func TestMakePipelineConfig(t *testing.T) {
_, err := MakePipelineConfig(l, nil)
assert.Equal(t, fmt.Errorf("MakePipelineConfig(): empty conduit config"), err)

// "" for dir will use os.TempDir()
dataDir, err := ioutil.TempDir("", "conduit_data_dir")
assert.Nil(t, err)
defer os.RemoveAll(dataDir)
dataDir := t.TempDir()

validConfigFile := `---
log-level: info
Expand Down Expand Up @@ -127,11 +123,9 @@ exporter:
assert.Equal(t, pCfg.Exporter.Name, "noop")
assert.Equal(t, pCfg.Exporter.Config["connectionstring"], "")

// "" for dir will use os.TempDir()
// invalidDataDir has no auto load file
invalidDataDir, err := ioutil.TempDir("", "conduit_data_dir")
invalidDataDir := t.TempDir()
assert.Nil(t, err)
defer os.RemoveAll(invalidDataDir)

cfgBad := &conduit.Config{ConduitDataDir: invalidDataDir}
_, err = MakePipelineConfig(l, cfgBad)
Expand Down Expand Up @@ -312,7 +306,11 @@ func TestPipelineRun(t *testing.T) {
NextRound: 0,
GenesisHash: "",
},
pipelineMetadataFilePath: filepath.Join(t.TempDir(), "metadata.json"),
cfg: &Config{
ConduitConfig: &conduit.Config{
ConduitDataDir: t.TempDir(),
},
},
}

go func() {
Expand All @@ -337,14 +335,15 @@ func TestPipelineCpuPidFiles(t *testing.T) {
var pProcessor processors.Processor = &mockProcessor{}
var pExporter exporters.Exporter = &mockExporter{}

pidFilePath := filepath.Join(t.TempDir(), "pidfile")
cpuFilepath := filepath.Join(t.TempDir(), "cpufile")
tempDir := t.TempDir()
pidFilePath := filepath.Join(tempDir, "pidfile")
cpuFilepath := filepath.Join(tempDir, "cpufile")

pImpl := pipelineImpl{
cfg: &Config{
ConduitConfig: &conduit.Config{
Flags: nil,
ConduitDataDir: t.TempDir(),
ConduitDataDir: tempDir,
},
Importer: NameConfigPair{
Name: "",
Expand Down Expand Up @@ -401,6 +400,7 @@ func TestPipelineCpuPidFiles(t *testing.T) {

// TestPipelineErrors tests the pipeline erroring out at different stages
func TestPipelineErrors(t *testing.T) {
tempDir := t.TempDir()

mImporter := mockImporter{}
mImporter.On("GetBlock", mock.Anything).Return(uniqueBlockData, nil)
Expand All @@ -419,9 +419,13 @@ func TestPipelineErrors(t *testing.T) {

ctx, cf := context.WithCancel(context.Background())
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
cfg: &Config{},
ctx: ctx,
cf: cf,
cfg: &Config{
ConduitConfig: &conduit.Config{
ConduitDataDir: tempDir,
},
},
logger: log.New(),
initProvider: nil,
importer: &pImporter,
Expand Down Expand Up @@ -757,11 +761,12 @@ func TestPipelineLogFile(t *testing.T) {
importers.Register("mockedImporter", mockedImpNew)
exporters.Register("mockedExporter", mockedExpNew)

logfilePath := path.Join(t.TempDir(), "conduit.log")
tempDir := t.TempDir()
logfilePath := path.Join(tempDir, "conduit.log")
configs := &Config{
ConduitConfig: &conduit.Config{
Flags: nil,
ConduitDataDir: t.TempDir(),
ConduitDataDir: tempDir,
},
Importer: NameConfigPair{
Name: "mockedImporter",
Expand Down

0 comments on commit 9214b07

Please sign in to comment.