Skip to content

Commit

Permalink
catchpointdump: supports tar.gz files (#4743)
Browse files Browse the repository at this point in the history
  • Loading branch information
almog-t authored Nov 3, 2022
1 parent 3701d29 commit 4ac719a
Showing 1 changed file with 51 additions and 15 deletions.
66 changes: 51 additions & 15 deletions cmd/catchpointdump/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"archive/tar"
"bufio"
"compress/gzip"
"context"
"database/sql"
"encoding/base64"
Expand All @@ -43,12 +44,12 @@ import (
"github.com/algorand/go-algorand/util/db"
)

var tarFile string
var catchpointFile string
var outFileName string
var excludedFields *cmdutil.CobraStringSliceValue = cmdutil.MakeCobraStringSliceValue(nil, []string{"version", "catchpoint"})

func init() {
fileCmd.Flags().StringVarP(&tarFile, "tar", "t", "", "Specify the tar file to process")
fileCmd.Flags().StringVarP(&catchpointFile, "tar", "t", "", "Specify the catchpoint file (either .tar or .tar.gz) to process")
fileCmd.Flags().StringVarP(&outFileName, "output", "o", "", "Specify an outfile for the dump ( i.e. tracker.dump.txt )")
fileCmd.Flags().BoolVarP(&loadOnly, "load", "l", false, "Load only, do not dump")
fileCmd.Flags().VarP(excludedFields, "exclude-fields", "e", "List of fields to exclude from the dump: ["+excludedFields.AllowedString()+"]")
Expand All @@ -60,18 +61,18 @@ var fileCmd = &cobra.Command{
Long: "Specify a file to dump",
Args: validateNoPosArgsFn,
Run: func(cmd *cobra.Command, args []string) {
if tarFile == "" {
if catchpointFile == "" {
cmd.HelpFunc()(cmd, args)
return
}
stats, err := os.Stat(tarFile)
stats, err := os.Stat(catchpointFile)
if err != nil {
reportErrorf("Unable to stat '%s' : %v", tarFile, err)
reportErrorf("Unable to stat '%s' : %v", catchpointFile, err)
}

tarSize := stats.Size()
if tarSize == 0 {
reportErrorf("Empty file '%s' : %v", tarFile, err)
catchpointSize := stats.Size()
if catchpointSize == 0 {
reportErrorf("Empty file '%s' : %v", catchpointFile, err)
}
// TODO: store CurrentProtocol in catchpoint file header.
// As a temporary workaround use a current protocol version.
Expand Down Expand Up @@ -105,13 +106,13 @@ var fileCmd = &cobra.Command{
}
var fileHeader ledger.CatchpointFileHeader

reader, err := os.Open(tarFile)
reader, err := os.Open(catchpointFile)
if err != nil {
reportErrorf("Unable to read '%s' : %v", tarFile, err)
reportErrorf("Unable to read '%s' : %v", catchpointFile, err)
}
defer reader.Close()

fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, tarSize)
fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, catchpointSize)
if err != nil {
reportErrorf("Unable to load catchpoint file into in-memory database : %v", err)
}
Expand Down Expand Up @@ -148,14 +149,49 @@ func printLoadCatchpointProgressLine(progress int, barLength int, dld int64) {
fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld))
}

func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, tarFile io.Reader, tarSize int64) (fileHeader ledger.CatchpointFileHeader, err error) {
func isGzipCompressed(catchpointReader *bufio.Reader, catchpointFileSize int64) bool {
const gzipPrefixSize = 2
const gzipPrefix = "\x1F\x8B"

if catchpointFileSize < gzipPrefixSize {
return false
}

prefixBytes, err := catchpointReader.Peek(gzipPrefixSize)

if err != nil {
return false
}

return prefixBytes[0] == gzipPrefix[0] && prefixBytes[1] == gzipPrefix[1]
}

func getCatchpointTarReader(catchpointReader *bufio.Reader, catchpointFileSize int64) (*tar.Reader, error) {
if isGzipCompressed(catchpointReader, catchpointFileSize) {
gzipReader, err := gzip.NewReader(catchpointReader)
if err != nil {
return nil, err
}

return tar.NewReader(gzipReader), nil
}

return tar.NewReader(catchpointReader), nil
}

func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, catchpointFile io.Reader, catchpointFileSize int64) (fileHeader ledger.CatchpointFileHeader, err error) {
fmt.Printf("\n")
printLoadCatchpointProgressLine(0, 50, 0)
lastProgressUpdate := time.Now()
progress := uint64(0)
defer printLoadCatchpointProgressLine(0, 0, 0)

tarReader := tar.NewReader(tarFile)
catchpointReader := bufio.NewReader(catchpointFile)
tarReader, err := getCatchpointTarReader(catchpointReader, catchpointFileSize)
if err != nil {
return fileHeader, err
}

var downloadProgress ledger.CatchpointCatchupAccessorProgress
for {
header, err := tarReader.Next()
Expand Down Expand Up @@ -190,9 +226,9 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc
// we already know it's valid, since we validated that above.
protocol.Decode(balancesBlockBytes, &fileHeader)
}
if time.Since(lastProgressUpdate) > 50*time.Millisecond && tarSize > 0 {
if time.Since(lastProgressUpdate) > 50*time.Millisecond && catchpointFileSize > 0 {
lastProgressUpdate = time.Now()
printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(tarSize)), 50, int64(progress))
printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(catchpointFileSize)), 50, int64(progress))
}
}
}
Expand Down

0 comments on commit 4ac719a

Please sign in to comment.