diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index e6246245e..ce5bf7ade 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -154,8 +154,7 @@ type pipelineImpl struct { exporter *exporters.Exporter completeCallback []conduit.OnCompleteFunc - pipelineMetadata state - pipelineMetadataFilePath string + pipelineMetadata state metricsCallback []conduit.ProvideMetricsFunc } @@ -460,8 +459,13 @@ func (p *pipelineImpl) Wait() { p.wg.Wait() } +func metadataPath(dataDir string) string { + return path.Join(dataDir, "metadata.json") +} + func (p *pipelineImpl) encodeMetadataToFile() error { - tempFilename := fmt.Sprintf("%s.temp", p.pipelineMetadataFilePath) + pipelineMetadataFilePath := metadataPath(p.cfg.ConduitConfig.ConduitDataDir) + tempFilename := fmt.Sprintf("%s.temp", pipelineMetadataFilePath) file, err := os.Create(tempFilename) if err != nil { return fmt.Errorf("encodeMetadataToFile(): failed to create temp metadata file: %w", err) @@ -472,7 +476,7 @@ func (p *pipelineImpl) encodeMetadataToFile() error { return fmt.Errorf("encodeMetadataToFile(): failed to write temp metadata: %w", err) } - err = os.Rename(tempFilename, p.pipelineMetadataFilePath) + err = os.Rename(tempFilename, pipelineMetadataFilePath) if err != nil { return fmt.Errorf("encodeMetadataToFile(): failed to replace metadata file: %w", err) } @@ -480,10 +484,12 @@ func (p *pipelineImpl) encodeMetadataToFile() error { } func (p *pipelineImpl) initializeOrLoadBlockMetadata() (state, error) { - p.pipelineMetadataFilePath = path.Join(p.cfg.ConduitConfig.ConduitDataDir, "metadata.json") - if stat, err := os.Stat(p.pipelineMetadataFilePath); errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) { + pipelineMetadataFilePath := metadataPath(p.cfg.ConduitConfig.ConduitDataDir) + if stat, err := os.Stat(pipelineMetadataFilePath); errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) { + fmt.Println(err) + fmt.Println(stat) if stat != nil && stat.Size() == 0 { - err = os.Remove(p.pipelineMetadataFilePath) + err = os.Remove(pipelineMetadataFilePath) if err != nil { return p.pipelineMetadata, fmt.Errorf("Init(): error creating file: %w", err) } @@ -497,7 +503,7 @@ func (p *pipelineImpl) initializeOrLoadBlockMetadata() (state, error) { return p.pipelineMetadata, fmt.Errorf("error opening file: %w", err) } var data []byte - data, err = os.ReadFile(p.pipelineMetadataFilePath) + data, err = os.ReadFile(pipelineMetadataFilePath) if err != nil { return p.pipelineMetadata, fmt.Errorf("error reading metadata: %w", err) } diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go index f30a38509..a48102a32 100644 --- a/conduit/pipeline/pipeline_test.go +++ b/conduit/pipeline/pipeline_test.go @@ -3,7 +3,6 @@ package pipeline import ( "context" "fmt" - "io/ioutil" "net/http" "os" "path" @@ -90,10 +89,7 @@ func TestMakePipelineConfig(t *testing.T) { _, err := MakePipelineConfig(l, nil) assert.Equal(t, fmt.Errorf("MakePipelineConfig(): empty conduit config"), err) - // "" for dir will use os.TempDir() - dataDir, err := ioutil.TempDir("", "conduit_data_dir") - assert.Nil(t, err) - defer os.RemoveAll(dataDir) + dataDir := t.TempDir() validConfigFile := `--- log-level: info @@ -127,11 +123,9 @@ exporter: assert.Equal(t, pCfg.Exporter.Name, "noop") assert.Equal(t, pCfg.Exporter.Config["connectionstring"], "") - // "" for dir will use os.TempDir() // invalidDataDir has no auto load file - invalidDataDir, err := ioutil.TempDir("", "conduit_data_dir") + invalidDataDir := t.TempDir() assert.Nil(t, err) - defer os.RemoveAll(invalidDataDir) cfgBad := &conduit.Config{ConduitDataDir: invalidDataDir} _, err = MakePipelineConfig(l, cfgBad) @@ -312,7 +306,11 @@ func TestPipelineRun(t *testing.T) { NextRound: 0, GenesisHash: "", }, - pipelineMetadataFilePath: filepath.Join(t.TempDir(), "metadata.json"), + cfg: &Config{ + ConduitConfig: &conduit.Config{ + ConduitDataDir: t.TempDir(), + }, + }, } go func() { @@ -337,14 +335,15 @@ func TestPipelineCpuPidFiles(t *testing.T) { var pProcessor processors.Processor = &mockProcessor{} var pExporter exporters.Exporter = &mockExporter{} - pidFilePath := filepath.Join(t.TempDir(), "pidfile") - cpuFilepath := filepath.Join(t.TempDir(), "cpufile") + tempDir := t.TempDir() + pidFilePath := filepath.Join(tempDir, "pidfile") + cpuFilepath := filepath.Join(tempDir, "cpufile") pImpl := pipelineImpl{ cfg: &Config{ ConduitConfig: &conduit.Config{ Flags: nil, - ConduitDataDir: t.TempDir(), + ConduitDataDir: tempDir, }, Importer: NameConfigPair{ Name: "", @@ -401,6 +400,7 @@ func TestPipelineCpuPidFiles(t *testing.T) { // TestPipelineErrors tests the pipeline erroring out at different stages func TestPipelineErrors(t *testing.T) { + tempDir := t.TempDir() mImporter := mockImporter{} mImporter.On("GetBlock", mock.Anything).Return(uniqueBlockData, nil) @@ -419,9 +419,13 @@ func TestPipelineErrors(t *testing.T) { ctx, cf := context.WithCancel(context.Background()) pImpl := pipelineImpl{ - ctx: ctx, - cf: cf, - cfg: &Config{}, + ctx: ctx, + cf: cf, + cfg: &Config{ + ConduitConfig: &conduit.Config{ + ConduitDataDir: tempDir, + }, + }, logger: log.New(), initProvider: nil, importer: &pImporter, @@ -757,11 +761,12 @@ func TestPipelineLogFile(t *testing.T) { importers.Register("mockedImporter", mockedImpNew) exporters.Register("mockedExporter", mockedExpNew) - logfilePath := path.Join(t.TempDir(), "conduit.log") + tempDir := t.TempDir() + logfilePath := path.Join(tempDir, "conduit.log") configs := &Config{ ConduitConfig: &conduit.Config{ Flags: nil, - ConduitDataDir: t.TempDir(), + ConduitDataDir: tempDir, }, Importer: NameConfigPair{ Name: "mockedImporter",