Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate registry from previous incorrect path #10486

Merged
merged 8 commits into from
Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Journalbeat*

- Migrate registry from previously incorrect path. {pull}10486[10486]

*Metricbeat*

- Add `key` metricset to the Redis module. {issue}9582[9582] {pull}9657[9657] {pull}9746[9746]
Expand Down
53 changes: 52 additions & 1 deletion journalbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package checkpoint

import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -88,7 +89,10 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
save: make(chan JournalState, 1),
}

c.file = paths.Resolve(paths.Data, c.file)
err := c.findRegistryFile()
if err != nil {
return nil, fmt.Errorf("error locating the proper registry file: %+v", err)
}

// Minimum batch size.
if c.maxUpdates < 1 {
Expand Down Expand Up @@ -123,6 +127,53 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
return c, nil
}

// Previously the registry file was written to the root folder. It was fixed on
// 7.x but not on 6.x. Thus, migration is needed, so users avoid losing state info.
func (c *Checkpoint) findRegistryFile() error {
migratedPath := paths.Resolve(paths.Data, c.file)

fs, err := os.Stat(c.file)
if os.IsNotExist(err) {
c.file = migratedPath
return nil
} else if err != nil {
return fmt.Errorf("error accessing previous registry file: %+v", err)
}

f, err := os.Open(c.file)
if err != nil {
return err
}
defer f.Close()

target, err := os.OpenFile(migratedPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fs.Mode())
if err != nil {
return err
}
defer target.Close()

if _, err := io.Copy(target, f); err != nil {
return err
}

err = target.Sync()
if err != nil {
return fmt.Errorf("error while syncing new registry file to disk: %+v", err)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the copy is not 'safe' without extra call to target.Sync(). Some filesystems (without journaling) also require a sync operation on the parent directory in order to sync file + directory meta data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Donz.


c.file = migratedPath

p := filepath.Dir(migratedPath)
pf, err := os.Open(p)
if err != nil {
return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; pf, err := os.Open(p).

defer pf.Close()
pf.Sync()

return nil
}

// run is worker loop that reads incoming state information from the save
// channel and persists it when the number of changes reaches maxEvents or
// the amount of time since the last disk write reaches flushInterval.
Expand Down