Skip to content

Commit

Permalink
executor: implement NULL DEFINED BY for LOAD DATA (#41541)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
lance6716 authored Feb 20, 2023
1 parent f950007 commit 9e120b6
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 195 deletions.
11 changes: 4 additions & 7 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,13 @@ func (parser *CSVParser) unescapeString(input field) (unescaped string, isNull b
if len(parser.escapedBy) > 0 {
unescaped = unescape(unescaped, "", parser.escFlavor, parser.escapedBy[0], parser.unescapeRegexp)
}
if len(parser.quote) == 0 || !parser.quotedNullIsText {
isNull = parser.escFlavor != escapeFlavorMySQLWithNull &&
!parser.cfg.NotNull &&
slices.Contains(parser.cfg.Null, unescaped)
} else if !input.quoted {
// quoted string can never be NULL except for \N, which must be escapeFlavorMySQLWithNull
if !(len(parser.quote) > 0 && parser.quotedNullIsText && input.quoted) {
// this branch represents "quote is not configured" or "quoted null is null" or "this field has no quote"
// we check null for them
isNull = !parser.cfg.NotNull &&
slices.Contains(parser.cfg.Null, unescaped)
// avoid \\N becomes NULL
if parser.escFlavor == escapeFlavorMySQLWithNull && unescaped == parser.escapedBy+`N` {
// avoid \\N
isNull = false
}
}
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
Table: tbl,
FieldsInfo: v.FieldsInfo,
LinesInfo: v.LinesInfo,
NullInfo: v.NullInfo,
IgnoreLines: v.IgnoreLines,
ColumnAssignments: v.ColumnAssignments,
ColumnsAndUserVars: v.ColumnsAndUserVars,
Expand Down
148 changes: 36 additions & 112 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package executor

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -42,7 +41,6 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -76,6 +74,10 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.loadDataInfo.Table.Meta().IsBaseTable() {
return errors.New("can only load data into base tables")
}
if e.loadDataInfo.NullInfo != nil && e.loadDataInfo.NullInfo.OptEnclosed &&
(e.loadDataInfo.FieldsInfo == nil || e.loadDataInfo.FieldsInfo.Enclosed == nil) {
return errors.New("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED")
}

switch e.FileLocRef {
case ast.FileLocServerOrRemote:
Expand Down Expand Up @@ -162,6 +164,7 @@ type LoadDataInfo struct {
Table table.Table
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
NullInfo *ast.NullDefinedBy
IgnoreLines uint64
Ctx sessionctx.Context
rows [][]types.Datum
Expand Down Expand Up @@ -516,101 +519,6 @@ func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
e.curBatchCnt = 0
}

// getValidData returns curData that starts from starting symbol.
// If the data doesn't have starting symbol, return curData[len(curData)-startingLen+1:] and false.
func (e *LoadDataInfo) getValidData(curData []byte) ([]byte, bool) {
	starting := e.LinesInfo.Starting
	pos := strings.Index(string(hack.String(curData)), starting)
	if pos >= 0 {
		// Found the starting symbol: valid data begins there.
		return curData[pos:], true
	}
	// Not found. Keep only the last len(starting)-1 bytes: a prefix of the
	// starting symbol may straddle the chunk boundary and be completed by
	// the next read, so those bytes must be retained.
	return curData[len(curData)-len(starting)+1:], false
}

// indexOfTerminator return index of terminator, if not, return -1.
// normally, the field terminator and line terminator is short, so we just use brute force algorithm.
//
// The scan is quote- and escape-aware: terminators inside an enclosed
// (quoted) field are ignored, and a character following the escape byte is
// skipped. Only a line terminator found outside a quoted region (or
// immediately after a closing quote) is reported.
func (e *LoadDataInfo) indexOfTerminator(bs []byte) int {
	fieldTerm := []byte(e.FieldsInfo.Terminated)
	fieldTermLen := len(fieldTerm)
	lineTerm := []byte(e.LinesInfo.Terminated)
	lineTermLen := len(lineTerm)
	// termType tags which terminator (if any) matches at a given position.
	type termType int
	const (
		notTerm termType = iota
		fieldTermType
		lineTermType
	)
	// likely, fieldTermLen should equal to lineTermLen, compare fieldTerm first can avoid useless lineTerm comparison.
	cmpTerm := func(restLen int, bs []byte) (typ termType) {
		if restLen >= fieldTermLen && bytes.Equal(bs[:fieldTermLen], fieldTerm) {
			typ = fieldTermType
			return
		}
		if restLen >= lineTermLen && bytes.Equal(bs[:lineTermLen], lineTerm) {
			typ = lineTermType
			return
		}
		return
	}
	if lineTermLen > fieldTermLen && bytes.HasPrefix(lineTerm, fieldTerm) {
		// unlikely, fieldTerm is prefix of lineTerm, we should compare lineTerm first.
		cmpTerm = func(restLen int, bs []byte) (typ termType) {
			if restLen >= lineTermLen && bytes.Equal(bs[:lineTermLen], lineTerm) {
				typ = lineTermType
				return
			}
			if restLen >= fieldTermLen && bytes.Equal(bs[:fieldTermLen], fieldTerm) {
				typ = fieldTermType
				return
			}
			return
		}
	}
	// atFieldStart: the next byte begins a new field, so an enclosing quote
	// character there opens a quoted region.
	// inQuoter: currently inside an enclosed (quoted) field.
	atFieldStart := true
	inQuoter := false
loop:
	for i := 0; i < len(bs); i++ {
		// An enclosing quote at the start of a field toggles the quoted state.
		if atFieldStart && e.FieldsInfo.Enclosed != byte(0) && bs[i] == e.FieldsInfo.Enclosed {
			inQuoter = !inQuoter
			atFieldStart = false
			continue
		}
		restLen := len(bs) - i - 1
		if inQuoter && e.FieldsInfo.Enclosed != byte(0) && bs[i] == e.FieldsInfo.Enclosed {
			// look ahead to see if it is end of line or field.
			switch cmpTerm(restLen, bs[i+1:]) {
			case lineTermType:
				// Closing quote followed by the line terminator: the line ends
				// right after this quote character.
				return i + 1
			case fieldTermType:
				// Closing quote followed by a field terminator: skip past the
				// terminator and start scanning the next field.
				i += fieldTermLen
				inQuoter = false
				atFieldStart = true
				continue loop
			default:
			}
		}
		if !inQuoter {
			// look ahead to see if it is end of line or field.
			switch cmpTerm(restLen+1, bs[i:]) {
			case lineTermType:
				return i
			case fieldTermType:
				i += fieldTermLen - 1
				inQuoter = false
				atFieldStart = true
				continue loop
			default:
			}
		}
		// if it is escaped char, skip next char.
		if bs[i] == e.FieldsInfo.Escaped {
			i++
		}
		atFieldStart = false
	}
	// No unquoted line terminator found in bs.
	return -1
}

// ReadRows reads rows from parser. When parser's reader meet EOF, it will return
// nil. For other errors it will return directly. When the rows batch is full it
// will also return nil.
Expand Down Expand Up @@ -763,27 +671,43 @@ func (e *LoadDataInfo) addRecordLD(ctx context.Context, row []types.Datum) error

// GenerateCSVConfig generates a CSV config for parser from LoadDataInfo.
func (e *LoadDataInfo) GenerateCSVConfig() *config.CSVConfig {
var nullDef []string
if e.FieldsInfo.Enclosed != 0 {
var (
nullDef []string
quotedNullIsText = true
)

if e.NullInfo != nil {
nullDef = append(nullDef, e.NullInfo.NullDef)
quotedNullIsText = !e.NullInfo.OptEnclosed
} else if e.FieldsInfo.Enclosed != nil {
nullDef = append(nullDef, "NULL")
}
if e.FieldsInfo.Escaped != 0 {
nullDef = append(nullDef, string([]byte{e.FieldsInfo.Escaped, 'N'}))
if e.FieldsInfo.Escaped != nil {
nullDef = append(nullDef, string([]byte{*e.FieldsInfo.Escaped, 'N'}))
}

enclosed := ""
if e.FieldsInfo.Enclosed != nil {
enclosed = string([]byte{*e.FieldsInfo.Enclosed})
}
escaped := ""
if e.FieldsInfo.Escaped != nil {
escaped = string([]byte{*e.FieldsInfo.Escaped})
}

return &config.CSVConfig{
Separator: e.FieldsInfo.Terminated,
// ignore optionally enclosed
Delimiter: string([]byte{e.FieldsInfo.Enclosed}),
Terminator: e.LinesInfo.Terminated,
NotNull: false,
Null: nullDef,
Header: false,
TrimLastSep: false,
EscapedBy: string([]byte{e.FieldsInfo.Escaped}),
StartingBy: e.LinesInfo.Starting,
AllowEmptyLine: true,
// TODO: set it through NULL DEFINED BY OPTIONALLY ENCLOSED
QuotedNullIsText: true,
Delimiter: enclosed,
Terminator: e.LinesInfo.Terminated,
NotNull: false,
Null: nullDef,
Header: false,
TrimLastSep: false,
EscapedBy: escaped,
StartingBy: e.LinesInfo.Starting,
AllowEmptyLine: true,
QuotedNullIsText: quotedNullIsText,
}
}

Expand Down
Loading

0 comments on commit 9e120b6

Please sign in to comment.