diff --git a/br/pkg/lightning/mydump/parquet_parser.go b/br/pkg/lightning/mydump/parquet_parser.go index 37f193666492f..e7ac2baa6d80f 100644 --- a/br/pkg/lightning/mydump/parquet_parser.go +++ b/br/pkg/lightning/mydump/parquet_parser.go @@ -579,6 +579,12 @@ func (pp *ParquetParser) SetLogger(l log.Logger) { pp.logger = l } +// SetRowID sets the rowID in a parquet file when we start a compressed file. +// It implements the Parser interface. +func (pp *ParquetParser) SetRowID(rowID int64) { + pp.lastRow.RowID = rowID +} + func jdToTime(jd int32, nsec int64) time.Time { sec := int64(jd-jan011970) * secPerDay // it's fine not to check the value of nsec diff --git a/br/pkg/lightning/mydump/parser.go b/br/pkg/lightning/mydump/parser.go index 1560dd4c14a44..73f84424bf5e3 100644 --- a/br/pkg/lightning/mydump/parser.go +++ b/br/pkg/lightning/mydump/parser.go @@ -138,6 +138,8 @@ type Parser interface { SetColumns([]string) SetLogger(log.Logger) + + SetRowID(rowID int64) } // NewChunkParser creates a new parser which can read chunks out of a file. @@ -174,6 +176,7 @@ func (parser *blockParser) SetPos(pos int64, rowID int64) error { } // Pos returns the current file offset. +// Attention: for compressed sql/csv files, pos is the position in uncompressed files func (parser *blockParser) Pos() (pos int64, lastRowID int64) { return parser.pos, parser.lastRow.RowID } @@ -205,6 +208,11 @@ func (parser *blockParser) SetLogger(logger log.Logger) { parser.Logger = logger } +// SetRowID changes the reported row ID when we firstly read compressed files. +func (parser *blockParser) SetRowID(rowID int64) { + parser.lastRow.RowID = rowID +} + type token byte const ( @@ -592,3 +600,22 @@ func ReadChunks(parser Parser, minSize int64) ([]Chunk, error) { } } } + +// ReadUntil parses the entire file and splits it into continuous chunks of +// size >= minSize. +func ReadUntil(parser Parser, pos int64) error { + var curOffset int64 + for curOffset < pos { + switch err := parser.ReadRow(); errors.Cause(err) { + case nil: + curOffset, _ = parser.Pos() + + case io.EOF: + return nil + + default: + return errors.Trace(err) + } + } + return nil +} diff --git a/br/pkg/lightning/mydump/router.go b/br/pkg/lightning/mydump/router.go index 2ed3512edc96e..bdc2a922f12f7 100644 --- a/br/pkg/lightning/mydump/router.go +++ b/br/pkg/lightning/mydump/router.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/util/filter" "github.com/pingcap/tidb/util/slice" "go.uber.org/zap" @@ -71,6 +72,22 @@ const ( CompressionSnappy ) +// ToStorageCompressType converts Compression to storage.CompressType. +func ToStorageCompressType(compression Compression) (storage.CompressType, error) { + switch compression { + case CompressionGZ: + return storage.Gzip, nil + case CompressionSnappy: + return storage.Snappy, nil + case CompressionZStd: + return storage.Zstd, nil + case CompressionNone: + return storage.NoCompression, nil + default: + return storage.NoCompression, errors.Errorf("compression %d doesn't have related storage compressType", compression) + } +} + func parseSourceType(t string) (SourceType, error) { switch strings.ToLower(strings.TrimSpace(t)) { case SchemaSchema: diff --git a/br/pkg/lightning/restore/chunk_restore_test.go b/br/pkg/lightning/restore/chunk_restore_test.go index 185cc5b4219ca..452e82821c9fa 100644 --- a/br/pkg/lightning/restore/chunk_restore_test.go +++ b/br/pkg/lightning/restore/chunk_restore_test.go @@ -15,9 +15,13 @@ package restore import ( + "compress/gzip" "context" + "fmt" + "io" "os" "path/filepath" + "strconv" "sync" "testing" @@ -40,8 +44,10 @@ import ( "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" tmock "github.com/pingcap/tidb/util/mock" + filter "github.com/pingcap/tidb/util/table-filter" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) @@ -654,3 +660,123 @@ func (s *chunkRestoreSuite) TestRestore() { require.NoError(s.T(), err) require.Len(s.T(), saveCpCh, 2) } + +func TestCompressChunkRestore(t *testing.T) { + // Produce a mock table info + p := parser.New() + p.SetSQLMode(mysql.ModeANSIQuotes) + node, err := p.ParseOneStmt(` + CREATE TABLE "table" ( + a INT, + b INT, + c INT, + KEY (b) + ) +`, "", "") + require.NoError(t, err) + core, err := ddl.BuildTableInfoFromAST(node.(*ast.CreateTableStmt)) + require.NoError(t, err) + core.State = model.StatePublic + + // Write some sample CSV dump + fakeDataDir := t.TempDir() + store, err := storage.NewLocalStorage(fakeDataDir) + require.NoError(t, err) + + fakeDataFiles := make([]mydump.FileInfo, 0) + + csvName := "db.table.1.csv.gz" + file, err := os.Create(filepath.Join(fakeDataDir, csvName)) + require.NoError(t, err) + gzWriter := gzip.NewWriter(file) + + var totalBytes int64 + for i := 0; i < 300; i += 3 { + n, err := gzWriter.Write([]byte(fmt.Sprintf("%d,%d,%d\r\n", i, i+1, i+2))) + require.NoError(t, err) + totalBytes += int64(n) + } + + err = gzWriter.Close() + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + + fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{ + TableName: filter.Table{Schema: "db", Name: "table"}, + FileMeta: mydump.SourceFileMeta{ + Path: csvName, + Type: mydump.SourceTypeCSV, + Compression: mydump.CompressionGZ, + SortKey: "99", + FileSize: totalBytes, + }, + }) + + chunk := checkpoints.ChunkCheckpoint{ + Key: checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: 0}, + FileMeta: fakeDataFiles[0].FileMeta, + Chunk: mydump.Chunk{ + Offset: 0, + EndOffset: totalBytes, + PrevRowIDMax: 0, + RowIDMax: 100, + }, + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + w := worker.NewPool(ctx, 5, "io") + cfg := config.NewConfig() + cfg.Mydumper.BatchSize = 111 + cfg.App.TableConcurrency = 2 + cfg.Mydumper.CSV.Header = false + + cr, err := newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil) + require.NoError(t, err) + var ( + id, lastID int + offset int64 = 0 + rowID int64 = 0 + ) + for id < 100 { + offset, rowID = cr.parser.Pos() + err = cr.parser.ReadRow() + require.NoError(t, err) + rowData := cr.parser.LastRow().Row + require.Len(t, rowData, 3) + lastID = id + for i := 0; id < 100 && i < 3; i++ { + require.Equal(t, strconv.Itoa(id), rowData[i].GetString()) + id++ + } + } + require.Equal(t, int64(33), rowID) + + // test read starting from compress files' middle + chunk = checkpoints.ChunkCheckpoint{ + Key: checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: offset}, + FileMeta: fakeDataFiles[0].FileMeta, + Chunk: mydump.Chunk{ + Offset: offset, + EndOffset: totalBytes, + PrevRowIDMax: rowID, + RowIDMax: 100, + }, + } + cr, err = newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil) + require.NoError(t, err) + for id = lastID; id < 300; { + err = cr.parser.ReadRow() + require.NoError(t, err) + rowData := cr.parser.LastRow().Row + require.Len(t, rowData, 3) + for i := 0; id < 300 && i < 3; i++ { + require.Equal(t, strconv.Itoa(id), rowData[i].GetString()) + id++ + } + } + _, rowID = cr.parser.Pos() + require.Equal(t, int64(100), rowID) + err = cr.parser.ReadRow() + require.Equal(t, io.EOF, errors.Cause(err)) +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 81894611c410a..0a0e05b45ac5d 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2190,11 +2190,21 @@ func newChunkRestore( ) (*chunkRestore, error) { blockBufSize := int64(cfg.Mydumper.ReadBlockSize) - var reader storage.ReadSeekCloser - var err error - if chunk.FileMeta.Type == mydump.SourceTypeParquet { + var ( + reader storage.ReadSeekCloser + compressType storage.CompressType + err error + ) + switch { + case chunk.FileMeta.Type == mydump.SourceTypeParquet: reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize) - } else { + case chunk.FileMeta.Compression != mydump.CompressionNone: + compressType, err = mydump.ToStorageCompressType(chunk.FileMeta.Compression) + if err != nil { + break + } + reader, err = storage.WithCompression(store, compressType).Open(ctx, chunk.FileMeta.Path) + default: reader, err = store.Open(ctx, chunk.FileMeta.Path) } if err != nil { @@ -2225,8 +2235,15 @@ func newChunkRestore( panic(fmt.Sprintf("file '%s' with unknown source type '%s'", chunk.Key.Path, chunk.FileMeta.Type.String())) } - if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil { - return nil, errors.Trace(err) + if chunk.FileMeta.Compression == mydump.CompressionNone { + if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil { + return nil, errors.Trace(err) + } + } else { + if err = mydump.ReadUntil(parser, chunk.Chunk.Offset); err != nil { + return nil, errors.Trace(err) + } + parser.SetRowID(chunk.Chunk.PrevRowIDMax) } if len(chunk.ColumnPermutation) > 0 { parser.SetColumns(getColumnNames(tableInfo.Core, chunk.ColumnPermutation)) diff --git a/br/pkg/storage/writer.go b/br/pkg/storage/writer.go index 72d0e6dc61f4f..f61d30fa530d9 100644 --- a/br/pkg/storage/writer.go +++ b/br/pkg/storage/writer.go @@ -48,16 +48,18 @@ type interceptBuffer interface { } func createSuffixString(compressType CompressType) string { - if compressType == Gzip { - return ".txt.gz" - } - if compressType == Snappy { - return ".txt.snappy" - } - if compressType == Zstd { - return ".txt.zst" + txtSuffix := ".txt" + switch compressType { + case Gzip: + txtSuffix += ".gz" + case Snappy: + txtSuffix += ".snappy" + case Zstd: + txtSuffix += ".zst" + default: + return "" } - return "" + return txtSuffix } func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer {