Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: support checkpoint read for compress files #38946

Merged
merged 9 commits into from
Nov 11, 2022
5 changes: 5 additions & 0 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
@@ -573,3 +573,8 @@ func (parser *CSVParser) ReadUntilTerminator() (int64, error) {
}
}
}

// SetRowID overwrites the row ID stored on the parser's last row with rowID.
// It is intended to be called when reading of a compressed CSV file starts.
func (parser *CSVParser) SetRowID(rowID int64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not needed because blockParser has implement SetRowID

parser.lastRow.RowID = rowID
}
6 changes: 6 additions & 0 deletions br/pkg/lightning/mydump/parquet_parser.go
Original file line number Diff line number Diff line change
@@ -579,6 +579,12 @@ func (pp *ParquetParser) SetLogger(l log.Logger) {
pp.logger = l
}

// SetRowID implements the Parser interface. It overwrites the row ID stored
// on the parser's last row with rowID; it is intended to be called when
// reading of a compressed file starts.
func (pp *ParquetParser) SetRowID(rowID int64) {
	pp.lastRow.RowID = rowID
}

func jdToTime(jd int32, nsec int64) time.Time {
sec := int64(jd-jan011970) * secPerDay
// it's fine not to check the value of nsec
26 changes: 26 additions & 0 deletions br/pkg/lightning/mydump/parser.go
Original file line number Diff line number Diff line change
@@ -138,6 +138,8 @@ type Parser interface {
SetColumns([]string)

SetLogger(log.Logger)

SetRowID(rowID int64)
}

// NewChunkParser creates a new parser which can read chunks out of a file.
@@ -205,6 +207,11 @@ func (parser *blockParser) SetLogger(logger log.Logger) {
parser.Logger = logger
}

// SetRowID overwrites the row ID stored on the parser's last row with rowID.
// It is intended to be called when a compressed file is read for the first
// time, so that subsequently reported row IDs continue from rowID.
func (parser *blockParser) SetRowID(rowID int64) {
	parser.lastRow.RowID = rowID
}

type token byte

const (
@@ -592,3 +599,22 @@ func ReadChunks(parser Parser, minSize int64) ([]Chunk, error) {
}
}
}

// ReadUntil advances parser row by row until the offset reported by
// parser.Pos is at least pos, or until the input is exhausted.
//
// Reaching io.EOF before pos is not treated as an error: nil is returned.
// Any other error from ReadRow is returned wrapped with errors.Trace.
// If pos is not positive, no row is read.
func ReadUntil(parser Parser, pos int64) error {
	var offset int64
	for offset < pos {
		err := parser.ReadRow()
		cause := errors.Cause(err)
		if cause == io.EOF {
			// Ran out of data before reaching pos; stop quietly.
			return nil
		}
		if cause != nil {
			return errors.Trace(err)
		}
		// Only the offset matters here; the second value from Pos is ignored.
		offset, _ = parser.Pos()
	}
	return nil
}
17 changes: 17 additions & 0 deletions br/pkg/lightning/mydump/router.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/filter"
"github.com/pingcap/tidb/util/slice"
"go.uber.org/zap"
@@ -71,6 +72,22 @@ const (
CompressionSnappy
)

// ToStorageCompressType maps a Compression value onto the matching
// storage.CompressType. For a Compression value without a counterpart it
// returns storage.NoCompression together with a non-nil error.
func ToStorageCompressType(compression Compression) (storage.CompressType, error) {
	var compressType storage.CompressType
	switch compression {
	case CompressionNone:
		compressType = storage.NoCompression
	case CompressionGZ:
		compressType = storage.Gzip
	case CompressionZStd:
		compressType = storage.Zstd
	case CompressionSnappy:
		compressType = storage.Snappy
	default:
		return storage.NoCompression, errors.Errorf("compression %d doesn't have related storage compressType", compression)
	}
	return compressType, nil
}

func parseSourceType(t string) (SourceType, error) {
switch strings.ToLower(strings.TrimSpace(t)) {
case SchemaSchema:
127 changes: 127 additions & 0 deletions br/pkg/lightning/restore/chunk_restore_test.go
Original file line number Diff line number Diff line change
@@ -15,9 +15,13 @@
package restore

import (
"compress/gzip"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"sync"
"testing"

@@ -40,8 +44,10 @@ import (
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
tmock "github.com/pingcap/tidb/util/mock"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@@ -654,3 +660,124 @@ func (s *chunkRestoreSuite) TestRestore() {
require.NoError(s.T(), err)
require.Len(s.T(), saveCpCh, 2)
}

// TestCompressChunkRestore checks that newChunkRestore can read a
// gzip-compressed CSV chunk from the beginning, and can then resume reading
// the same file from an offset and row ID captured part-way through.
func TestCompressChunkRestore(t *testing.T) {
// Produce a mock table info.
// NOTE(review): ANSI_QUOTES is presumably set so that the double-quoted
// "table" below parses as an identifier — confirm.
p := parser.New()
p.SetSQLMode(mysql.ModeANSIQuotes)
se := tmock.NewContext()
node, err := p.ParseOneStmt(`
CREATE TABLE "table" (
a INT,
b INT,
c INT,
KEY (b)
)
`, "", "")
require.NoError(t, err)
core, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 0xabcdef)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have BuildTableInfoFromAST now

require.NoError(t, err)
core.State = model.StatePublic

// Write a sample gzip-compressed CSV dump into a temporary directory.
fakeDataDir := t.TempDir()
store, err := storage.NewLocalStorage(fakeDataDir)
require.NoError(t, err)

fakeDataFiles := make([]mydump.FileInfo, 0)

csvName := "db.table.1.csv.gz"
file, err := os.Create(filepath.Join(fakeDataDir, csvName))
require.NoError(t, err)
gzWriter := gzip.NewWriter(file)

// Emit 100 rows of the form "i,i+1,i+2" covering the values 0..299.
// totalBytes accumulates the byte counts returned by gzWriter.Write.
var totalBytes int64
for i := 0; i < 300; i += 3 {
n, err := gzWriter.Write([]byte(fmt.Sprintf("%d,%d,%d\r\n", i, i+1, i+2)))
require.NoError(t, err)
totalBytes += int64(n)
}

// Close the gzip writer before the file so the compressed stream is complete.
err = gzWriter.Close()
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)

fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{
TableName: filter.Table{Schema: "db", Name: "table"},
FileMeta: mydump.SourceFileMeta{
Path: csvName,
Type: mydump.SourceTypeCSV,
Compression: mydump.CompressionGZ,
SortKey: "99",
FileSize: totalBytes,
},
})

// First chunk: covers the file from offset 0 with no previously assigned row IDs.
chunk := checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: 0},
FileMeta: fakeDataFiles[0].FileMeta,
Chunk: mydump.Chunk{
Offset: 0,
EndOffset: totalBytes,
PrevRowIDMax: 0,
RowIDMax: 100,
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w := worker.NewPool(ctx, 5, "io")
cfg := config.NewConfig()
cfg.Mydumper.BatchSize = 111
cfg.App.TableConcurrency = 2
cfg.Mydumper.CSV.Header = false

cr, err := newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil)
require.NoError(t, err)
var (
id, lastID int
offset int64 = 0
rowID int64 = 0
)
// Read rows until the value 99 has been verified. offset and rowID hold the
// parser position sampled just before the most recent ReadRow, and lastID
// holds the first value expected in that row.
for id < 100 {
offset, rowID = cr.parser.Pos()
err = cr.parser.ReadRow()
require.NoError(t, err)
rowData := cr.parser.LastRow().Row
require.Len(t, rowData, 3)
lastID = id
for i := 0; id < 100 && i < 3; i++ {
require.Equal(t, strconv.Itoa(id), rowData[i].GetString())
id++
}
}
// rowID was sampled before the last ReadRow, i.e. after 33 complete rows.
require.Equal(t, int64(33), rowID)

// Test resuming a read from the middle of the compressed file, using the
// offset and row ID captured above.
chunk = checkpoints.ChunkCheckpoint{
Key: checkpoints.ChunkCheckpointKey{Path: fakeDataFiles[0].FileMeta.Path, Offset: offset},
FileMeta: fakeDataFiles[0].FileMeta,
Chunk: mydump.Chunk{
Offset: offset,
EndOffset: totalBytes,
PrevRowIDMax: rowID,
RowIDMax: 100,
},
}
cr, err = newChunkRestore(ctx, 1, cfg, &chunk, w, store, nil)
require.NoError(t, err)
// Starting again at the row that begins with lastID, every remaining value
// up to 299 must be read back in order.
for id = lastID; id < 300; {
err = cr.parser.ReadRow()
require.NoError(t, err)
rowData := cr.parser.LastRow().Row
require.Len(t, rowData, 3)
for i := 0; id < 300 && i < 3; i++ {
require.Equal(t, strconv.Itoa(id), rowData[i].GetString())
id++
}
}
// After the final row the parser must report row ID 100.
_, rowID = cr.parser.Pos()
require.Equal(t, int64(100), rowID)
// No rows are left, so the next read must report io.EOF.
err = cr.parser.ReadRow()
require.Equal(t, io.EOF, errors.Cause(err))
}
29 changes: 23 additions & 6 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
@@ -2190,11 +2190,21 @@ func newChunkRestore(
) (*chunkRestore, error) {
blockBufSize := int64(cfg.Mydumper.ReadBlockSize)

var reader storage.ReadSeekCloser
var err error
if chunk.FileMeta.Type == mydump.SourceTypeParquet {
var (
reader storage.ReadSeekCloser
compressType storage.CompressType
err error
)
switch {
case chunk.FileMeta.Type == mydump.SourceTypeParquet:
reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize)
} else {
case chunk.FileMeta.Compression != mydump.CompressionNone:
compressType, err = mydump.ToStorageCompressType(chunk.FileMeta.Compression)
if err != nil {
break
}
reader, err = storage.WithCompression(store, compressType).Open(ctx, chunk.FileMeta.Path)
default:
reader, err = store.Open(ctx, chunk.FileMeta.Path)
}
if err != nil {
@@ -2225,8 +2235,15 @@ func newChunkRestore(
panic(fmt.Sprintf("file '%s' with unknown source type '%s'", chunk.Key.Path, chunk.FileMeta.Type.String()))
}

if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
return nil, errors.Trace(err)
if chunk.FileMeta.Compression == mydump.CompressionNone {
if err = parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax); err != nil {
return nil, errors.Trace(err)
}
} else {
if err = mydump.ReadUntil(parser, chunk.Chunk.Offset); err != nil {
return nil, errors.Trace(err)
}
parser.SetRowID(chunk.Chunk.PrevRowIDMax)
}
if len(chunk.ColumnPermutation) > 0 {
parser.SetColumns(getColumnNames(tableInfo.Core, chunk.ColumnPermutation))
20 changes: 11 additions & 9 deletions br/pkg/storage/writer.go
Original file line number Diff line number Diff line change
@@ -48,16 +48,18 @@ type interceptBuffer interface {
}

func createSuffixString(compressType CompressType) string {
if compressType == Gzip {
return ".txt.gz"
}
if compressType == Snappy {
return ".txt.snappy"
}
if compressType == Zstd {
return ".txt.zst"
txtSuffix := ".txt"
switch compressType {
case Gzip:
txtSuffix += ".gz"
case Snappy:
txtSuffix += ".snappy"
case Zstd:
txtSuffix += ".zst"
default:
return ""
}
return ""
return txtSuffix
}

func newInterceptBuffer(chunkSize int, compressType CompressType) interceptBuffer {