From 0d47a5e85cd37ba45901ea8b76b3e8ef8fcecdd1 Mon Sep 17 00:00:00 2001 From: dsdashun Date: Tue, 31 Jan 2023 13:45:55 +0800 Subject: [PATCH] lightning: enable setting conflict max-error (#40874) ref pingcap/tidb#40743 --- br/pkg/lightning/config/config.go | 60 +++++++-- br/pkg/lightning/config/config_test.go | 121 ++++++++++++++++++ br/pkg/lightning/errormanager/errormanager.go | 31 +++-- .../data/mytest.testtbl-schema.sql | 5 + .../data/mytest.testtbl.csv | 16 +++ .../err_config.toml | 8 ++ .../normal_config.toml | 8 ++ .../normal_config_old_style.toml | 8 ++ br/tests/lightning_config_max_error/run.sh | 81 ++++++++++++ 9 files changed, 319 insertions(+), 19 deletions(-) create mode 100644 br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql create mode 100644 br/tests/lightning_config_max_error/data/mytest.testtbl.csv create mode 100644 br/tests/lightning_config_max_error/err_config.toml create mode 100644 br/tests/lightning_config_max_error/normal_config.toml create mode 100644 br/tests/lightning_config_max_error/normal_config_old_style.toml create mode 100755 br/tests/lightning_config_max_error/run.sh diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index b5236127f5ee1..d14f12066c0f4 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -343,32 +343,64 @@ type MaxError struct { // In TiDB backend, this also includes all possible SQL errors raised from INSERT, // such as unique key conflict when `on-duplicate` is set to `error`. // When tolerated, the row causing the error will be skipped, and adds 1 to the counter. + // The default value is zero, which means that such errors are not tolerated. Type atomic.Int64 `toml:"type" json:"type"` // Conflict is the maximum number of unique key conflicts in local backend accepted. // When tolerated, every pair of conflict adds 1 to the counter. // Those pairs will NOT be deleted from the target. Conflict resolution is performed separately. 
- // TODO Currently this is hard-coded to infinity. - Conflict atomic.Int64 `toml:"conflict" json:"-"` + // The default value is max int64, which means conflict errors will be recorded as much as possible. + // Sometimes the actual number of conflict records logged will be greater than the value configured here, + // because conflict error data are recorded batch by batch. + // If the limit is reached in a single batch, the entire batch of records will be persisted before an error is reported. + Conflict atomic.Int64 `toml:"conflict" json:"conflict"` } func (cfg *MaxError) UnmarshalTOML(v interface{}) error { + defaultValMap := map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": math.MaxInt64, + } + // set default value first + cfg.Syntax.Store(defaultValMap["syntax"]) + cfg.Charset.Store(defaultValMap["charset"]) + cfg.Type.Store(defaultValMap["type"]) + cfg.Conflict.Store(defaultValMap["conflict"]) switch val := v.(type) { case int64: // ignore val that is smaller than 0 - if val < 0 { - val = 0 + if val >= 0 { + // only set type error + cfg.Type.Store(val) } - cfg.Syntax.Store(0) - cfg.Charset.Store(math.MaxInt64) - cfg.Type.Store(val) - cfg.Conflict.Store(math.MaxInt64) return nil case map[string]interface{}: - // TODO support stuff like `max-error = { charset = 1000, type = 1000 }` if proved useful. + // support stuff like `max-error = { conflict = 1000, type = 1000 }`; only the `type` and `conflict` keys are honored, other keys keep their defaults. 
+ getVal := func(k string, v interface{}) int64 { + defaultVal, ok := defaultValMap[k] + if !ok { + return 0 + } + iVal, ok := v.(int64) + if !ok || iVal < 0 { + return defaultVal + } + return iVal + } + for k, v := range val { + switch k { + case "type": + cfg.Type.Store(getVal(k, v)) + case "conflict": + cfg.Conflict.Store(getVal(k, v)) + } + } + return nil default: + return errors.Errorf("invalid max-error '%v', should be an integer or a map of string:int64", v) } - return errors.Errorf("invalid max-error '%v', should be an integer", v) } // DuplicateResolutionAlgorithm is the config type of how to resolve duplicates. @@ -805,8 +837,16 @@ func (cfg *Config) LoadFromTOML(data []byte) error { unusedGlobalKeyStrs[key.String()] = struct{}{} } +iterateUnusedKeys: for _, key := range unusedConfigKeys { keyStr := key.String() + switch keyStr { + // these keys are not counted as decoded by toml decoder, but actually they are decoded, + // because the corresponding unmarshal logic handles these key's decoding in a custom way + case "lightning.max-error.type", + "lightning.max-error.conflict": + continue iterateUnusedKeys + } if _, found := unusedGlobalKeyStrs[keyStr]; found { bothUnused = append(bothUnused, keyStr) } else { diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index 16db98845e80c..f590391740ec4 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -19,6 +19,7 @@ import ( "context" "flag" "fmt" + "math" "net" "net/http" "net/http/httptest" @@ -561,6 +562,126 @@ func TestDurationUnmarshal(t *testing.T) { require.Regexp(t, "time: unknown unit .?x.? 
in duration .?13x20s.?", err.Error()) } +func TestMaxErrorUnmarshal(t *testing.T) { + type testCase struct { + TOMLStr string + ExpectedValues map[string]int64 + ExpectErrStr string + CaseName string + } + for _, tc := range []*testCase{ + { + TOMLStr: `max-error = 123`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 123, + "conflict": math.MaxInt64, + }, + CaseName: "Normal_Int", + }, + { + TOMLStr: `max-error = -123`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": math.MaxInt64, + }, + CaseName: "Abnormal_Negative_Int", + }, + { + TOMLStr: `max-error = "abcde"`, + ExpectErrStr: "invalid max-error 'abcde', should be an integer or a map of string:int64", + CaseName: "Abnormal_String", + }, + { + TOMLStr: `[max-error] +syntax = 1 +charset = 2 +type = 3 +conflict = 4 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 3, + "conflict": 4, + }, + CaseName: "Normal_Map_All_Set", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": 1000, + }, + CaseName: "Normal_Map_Partial_Set", + }, + { + TOMLStr: `max-error = { conflict = 1000, type = 123 }`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 123, + "conflict": 1000, + }, + CaseName: "Normal_OneLineMap_Partial_Set", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +not_exist = 123 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": 1000, + }, + CaseName: "Normal_Map_Partial_Set_Invalid_Key", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +type = -123 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": 1000, + }, + CaseName: "Normal_Map_Partial_Set_Invalid_Value", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +type 
= abc +`, + ExpectErrStr: `toml: line 3 (last key "max-error.type"): expected value but found "abc" instead`, + CaseName: "Normal_Map_Partial_Set_Invalid_ValueType", + }, + } { + targetLightningCfg := new(config.Lightning) + err := toml.Unmarshal([]byte(tc.TOMLStr), targetLightningCfg) + if len(tc.ExpectErrStr) > 0 { + require.Errorf(t, err, "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectErrStr, err.Error(), "test case: %s", tc.CaseName) + } else { + require.NoErrorf(t, err, "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectedValues["syntax"], targetLightningCfg.MaxError.Syntax.Load(), "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectedValues["charset"], targetLightningCfg.MaxError.Charset.Load(), "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectedValues["type"], targetLightningCfg.MaxError.Type.Load(), "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectedValues["conflict"], targetLightningCfg.MaxError.Conflict.Load(), "test case: %s", tc.CaseName) + } + } +} + func TestDurationMarshalJSON(t *testing.T) { duration := config.Duration{} err := duration.UnmarshalText([]byte("13m20s")) diff --git a/br/pkg/lightning/errormanager/errormanager.go b/br/pkg/lightning/errormanager/errormanager.go index 373ba572779d4..4085226063d38 100644 --- a/br/pkg/lightning/errormanager/errormanager.go +++ b/br/pkg/lightning/errormanager/errormanager.go @@ -194,7 +194,8 @@ func (em *ErrorManager) RecordTypeError( if em.remainingError.Type.Dec() < 0 { threshold := em.configError.Type.Load() if threshold > 0 { - encodeErr = errors.Annotatef(encodeErr, "meet errors exceed the max-error.type threshold '%d'", + encodeErr = errors.Annotatef(encodeErr, + "The number of type errors exceeds the threshold configured by `max-error.type`: '%d'", em.configError.Type.Load()) } return encodeErr @@ -241,17 +242,20 @@ func (em *ErrorManager) RecordDataConflictError( tableName string, conflictInfos []DataConflictInfo, ) error { + var gerr error if 
len(conflictInfos) == 0 { return nil } if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 { threshold := em.configError.Conflict.Load() - return errors.Errorf(" meet errors exceed the max-error.conflict threshold '%d'", threshold) + // Still need to record this batch of conflict records, and then return this error at last. + // Otherwise, if the max-error.conflict is set to a very small value, none of the conflict errors will be recorded + gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold) } if em.db == nil { - return nil + return gerr } exec := common.SQLWithRetry{ @@ -259,7 +263,7 @@ func (em *ErrorManager) RecordDataConflictError( Logger: logger, HideQueryLog: redact.NeedRedact(), } - return exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error { + if err := exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error { sb := &strings.Builder{} fmt.Fprintf(sb, insertIntoConflictErrorData, em.schemaEscaped) var sqlArgs []interface{} @@ -279,7 +283,10 @@ func (em *ErrorManager) RecordDataConflictError( } _, err := txn.ExecContext(c, sb.String(), sqlArgs...) return err - }) + }); err != nil { + gerr = err + } + return gerr } func (em *ErrorManager) RecordIndexConflictError( @@ -290,17 +297,20 @@ func (em *ErrorManager) RecordIndexConflictError( conflictInfos []DataConflictInfo, rawHandles, rawRows [][]byte, ) error { + var gerr error if len(conflictInfos) == 0 { return nil } if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 { threshold := em.configError.Conflict.Load() - return errors.Errorf(" meet errors exceed the max-error.conflict threshold %d", threshold) + // Still need to record this batch of conflict records, and then return this error at last. 
+ // Otherwise, if the max-error.conflict is set to a very small value, none of the conflict errors will be recorded + gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold) } if em.db == nil { - return nil + return gerr } exec := common.SQLWithRetry{ @@ -308,7 +318,7 @@ func (em *ErrorManager) RecordIndexConflictError( Logger: logger, HideQueryLog: redact.NeedRedact(), } - return exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error { + if err := exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error { sb := &strings.Builder{} fmt.Fprintf(sb, insertIntoConflictErrorIndex, em.schemaEscaped) var sqlArgs []interface{} @@ -331,7 +341,10 @@ func (em *ErrorManager) RecordIndexConflictError( } _, err := txn.ExecContext(c, sb.String(), sqlArgs...) return err - }) + }); err != nil { + gerr = err + } + return gerr } // ResolveAllConflictKeys query all conflicting rows (handle and their diff --git a/br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql b/br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql new file mode 100644 index 0000000000000..93582d5178139 --- /dev/null +++ b/br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql @@ -0,0 +1,5 @@ +CREATE TABLE testtbl ( + id INTEGER PRIMARY KEY, + val1 VARCHAR(40) NOT NULL, + INDEX `idx_val1` (`val1`) +); diff --git a/br/tests/lightning_config_max_error/data/mytest.testtbl.csv b/br/tests/lightning_config_max_error/data/mytest.testtbl.csv new file mode 100644 index 0000000000000..021f6bbf7be1c --- /dev/null +++ b/br/tests/lightning_config_max_error/data/mytest.testtbl.csv @@ -0,0 +1,16 @@ +id,val1 +1,"aaa01" +2,"aaa01" +3,"aaa02" +4,"aaa02" +5,"aaa05" +6,"aaa06" +7,"aaa07" +8,"aaa08" +9,"aaa09" +10,"aaa10" +1,"bbb01" +2,"bbb02" +3,"bbb03" +4,"bbb04" +5,"bbb05" diff --git a/br/tests/lightning_config_max_error/err_config.toml 
b/br/tests/lightning_config_max_error/err_config.toml new file mode 100644 index 0000000000000..79447e685a8f5 --- /dev/null +++ b/br/tests/lightning_config_max_error/err_config.toml @@ -0,0 +1,8 @@ +[lightning.max-error] +conflict = 4 + +[mydumper.csv] +header = true + +[tikv-importer] +duplicate-resolution = 'remove' diff --git a/br/tests/lightning_config_max_error/normal_config.toml b/br/tests/lightning_config_max_error/normal_config.toml new file mode 100644 index 0000000000000..92e08739fe04a --- /dev/null +++ b/br/tests/lightning_config_max_error/normal_config.toml @@ -0,0 +1,8 @@ +[lightning.max-error] +conflict = 20 + +[mydumper.csv] +header = true + +[tikv-importer] +duplicate-resolution = 'remove' diff --git a/br/tests/lightning_config_max_error/normal_config_old_style.toml b/br/tests/lightning_config_max_error/normal_config_old_style.toml new file mode 100644 index 0000000000000..fe402d071f5e0 --- /dev/null +++ b/br/tests/lightning_config_max_error/normal_config_old_style.toml @@ -0,0 +1,8 @@ +[lightning] +max-error = 0 # this actually sets the type error + +[mydumper.csv] +header = true + +[tikv-importer] +duplicate-resolution = 'remove' diff --git a/br/tests/lightning_config_max_error/run.sh b/br/tests/lightning_config_max_error/run.sh new file mode 100755 index 0000000000000..1d850ae55f0d8 --- /dev/null +++ b/br/tests/lightning_config_max_error/run.sh @@ -0,0 +1,81 @@ +#!/bin/sh +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +check_cluster_version 4 0 0 'local backend' || exit 0 + +mydir=$(dirname "${BASH_SOURCE[0]}") + +data_file="${mydir}/data/mytest.testtbl.csv" + +total_row_count=$( sed '1d' "${data_file}" | wc -l | xargs echo ) +uniq_row_count=$( sed '1d' "${data_file}" | awk -F, '{print $1}' | sort | uniq -c | awk '{print $1}' | grep -c '1' | xargs echo ) +duplicated_row_count=$(( ${total_row_count} - ${uniq_row_count} )) + +run_sql 'DROP TABLE IF EXISTS mytest.testtbl' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v1' + +stderr_file="/tmp/${TEST_NAME}.stderr" + +set +e +if run_lightning --backend local --config "${mydir}/err_config.toml" 2> "${stderr_file}"; then + echo "The lightning import doesn't fail as expected" >&2 + exit 1 +fi +set -e + +err_msg=$( cat << EOF +tidb lightning encountered error: collect local duplicate rows failed: The number of conflict errors exceeds the threshold configured by \`max-error.conflict\`: '4' +EOF +) +cat "${stderr_file}" +grep -q "${err_msg}" "${stderr_file}" + +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v1' +# Although conflict error number exceeds the max-error limit, +# all the conflict errors are recorded, +# because recording of conflict errors are executed batch by batch (batch size 1024), +# this batch of conflict errors are all recorded +check_contains "COUNT(*): ${duplicated_row_count}" + +# import a second time + +run_sql 'DROP TABLE IF EXISTS mytest.testtbl' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v1' + +run_lightning --backend local --config "${mydir}/normal_config.toml" + +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v1' +check_contains "COUNT(*): ${duplicated_row_count}" + +# Check remaining records in the target table +run_sql 'SELECT COUNT(*) FROM mytest.testtbl' +check_contains "COUNT(*): ${uniq_row_count}" + +# import a 
third time + +run_sql 'DROP TABLE IF EXISTS mytest.testtbl' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v1' + +run_lightning --backend local --config "${mydir}/normal_config_old_style.toml" + +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v1' +check_contains "COUNT(*): ${duplicated_row_count}" + +# Check remaining records in the target table +run_sql 'SELECT COUNT(*) FROM mytest.testtbl' +check_contains "COUNT(*): ${uniq_row_count}"