Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: add send-kv-size to avoid oom when each kv is large on default config #43870

Merged
merged 4 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,10 @@ type BackendConfig struct {
// compress type when write or ingest into tikv
ConnCompressType config.CompressionType
// concurrency of generateJobForRange and import(write & ingest) workers
WorkerConcurrency int
KVWriteBatchSize int
WorkerConcurrency int
// batch kv count and size when writing to TiKV
KVWriteBatchCount int
KVWriteBatchSize int64
RegionSplitBatchSize int
RegionSplitConcurrency int
CheckpointEnabled bool
Expand Down Expand Up @@ -430,7 +432,8 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName string)
MaxConnPerStore: cfg.TikvImporter.RangeConcurrency,
ConnCompressType: cfg.TikvImporter.CompressKVPairs,
WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
KVWriteBatchSize: cfg.TikvImporter.SendKVPairs,
KVWriteBatchCount: cfg.TikvImporter.SendKVPairs,
KVWriteBatchSize: int64(cfg.TikvImporter.SendKVSize),
RegionSplitBatchSize: cfg.TikvImporter.RegionSplitBatchSize,
RegionSplitConcurrency: cfg.TikvImporter.RegionSplitConcurrency,
CheckpointEnabled: cfg.Checkpoint.Enable,
Expand Down
11 changes: 7 additions & 4 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package local
import (
"container/heap"
"context"
"fmt"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -177,6 +178,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {

apiVersion := local.tikvCodec.GetAPIVersion()
clientFactory := local.importClientFactory
kvBatchCount := local.KVWriteBatchCount
kvBatchSize := local.KVWriteBatchSize
bufferPool := local.bufferPool
writeLimiter := local.writeLimiter
Expand Down Expand Up @@ -254,7 +256,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {

bytesBuf := bufferPool.NewBuffer()
defer bytesBuf.Destroy()
pairs := make([]*sst.Pair, 0, kvBatchSize)
pairs := make([]*sst.Pair, 0, kvBatchCount)
count := 0
size := int64(0)
totalSize := int64(0)
Expand All @@ -265,8 +267,6 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
if j.regionSplitSize <= int64(config.SplitRegionSize) {
regionMaxSize = j.regionSplitSize * 4 / 3
}
// Set a lower flush limit to make the speed of write more smooth.
flushLimit := int64(writeLimiter.Limit() / 10)
sleepymole marked this conversation as resolved.
Show resolved Hide resolved

flushKVs := func() error {
for i := range clients {
Expand All @@ -278,6 +278,9 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
return annotateErr(err, allPeers[i])
}
}
failpoint.Inject("afterFlushKVs", func() {
log.FromContext(ctx).Info(fmt.Sprintf("afterFlushKVs count=%d,size=%d", count, size))
})
return nil
}

Expand Down Expand Up @@ -305,7 +308,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
size += kvSize
totalSize += kvSize

if count >= kvBatchSize || size >= flushLimit {
if count >= kvBatchCount || size >= kvBatchSize {
if err := flushKVs(); err != nil {
return errors.Trace(err)
}
Expand Down
9 changes: 7 additions & 2 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ const (
// ErrorOnDup indicates using INSERT INTO to insert data, which would violate PK or UNIQUE constraint
ErrorOnDup = "error"

KVWriteBatchSize = 32768
KVWriteBatchCount = 32768
// KVWriteBatchSize batch size when write to TiKV.
// this is the default value of linux send buffer size(net.ipv4.tcp_wmem) too.
KVWriteBatchSize = 16 * units.KiB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's too small considering a large row KV. Maybe we should use 4M, which is its maximum value?

Copy link
Contributor Author

@D3Hunter D3Hunter May 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on how often customers tune network parameters. If the user does set it to 4M:
(256 × (accumulate 16k + serialize + append to the Linux send buffer)) + send to network, compared with (accumulate 4M + serialize + append to the Linux send buffer) + send to network — there seems to be little difference. We will test the default tcp_wmem with the 16k/4m batch sizes using the QA environment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I misunderstood — it works like this:

When a new TCP connection is established, a Send Buffer will be created using the default value (16KB); the buffer size will then be automatically adjusted within the maximum and minimum boundaries as needed and based on usage.

DefaultRangeConcurrency = 16

defaultDistSQLScanConcurrency = 15
Expand Down Expand Up @@ -745,6 +748,7 @@ type TikvImporter struct {
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
SendKVSize ByteSize `toml:"send-kv-size" json:"send-kv-size"`
CompressKVPairs CompressionType `toml:"compress-kv-pairs" json:"compress-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"`
Expand Down Expand Up @@ -952,7 +956,8 @@ func NewConfig() *Config {
Backend: "",
OnDuplicate: ReplaceOnDup,
MaxKVPairs: 4096,
SendKVPairs: KVWriteBatchSize,
SendKVPairs: KVWriteBatchCount,
SendKVSize: KVWriteBatchSize,
RegionSplitSize: 0,
RegionSplitBatchSize: DefaultRegionSplitBatchSize,
RegionSplitConcurrency: runtime.GOMAXPROCS(0),
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,9 @@ func TestDefaultCouldBeOverwritten(t *testing.T) {
require.Equal(t, 20, cfg.App.IndexConcurrency)
require.Equal(t, 60, cfg.App.TableConcurrency)

require.Equal(t, config.KVWriteBatchCount, cfg.TikvImporter.SendKVPairs)
require.Equal(t, config.ByteSize(config.KVWriteBatchSize), cfg.TikvImporter.SendKVSize)

cfg.TikvImporter.RegionSplitConcurrency = 1
// backoff can be 0
cfg.TikvImporter.RegionCheckBackoffLimit = 0
Expand Down
6 changes: 6 additions & 0 deletions br/tests/lightning_write_batch/kv-count.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Lightning test config: exercise the pair-count flush limit.
# send-kv-pairs is set very low so flushes are triggered by KV count,
# while send-kv-size keeps its default (16 KiB) and never fires first.
[tikv-importer]
# single range worker, so flush batches appear in a deterministic order
range-concurrency = 1
send-kv-pairs = 20

[mydumper.csv]
# the test's CSV fixture has no header row
header = false
6 changes: 6 additions & 0 deletions br/tests/lightning_write_batch/kv-size.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Lightning test config: exercise the byte-size flush limit (send-kv-size).
# A 10-byte limit is smaller than any single row's KV, so every pair is
# flushed as its own batch; send-kv-pairs keeps its default (32768).
[tikv-importer]
# single range worker, so flush batches appear in a deterministic order
range-concurrency = 1
send-kv-size = "10b"

[mydumper.csv]
# the test's CSV fixture has no header row
header = false
66 changes: 66 additions & 0 deletions br/tests/lightning_write_batch/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/bin/bash
#
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Integration test for lightning's write batching to TiKV: verifies that a
# flush is triggered both by pair count (send-kv-pairs) and by accumulated
# byte size (send-kv-size), observing each flush via the afterFlushKVs
# failpoint log line.

set -eux

mkdir -p "$TEST_DIR/data"

run_sql "DROP DATABASE IF EXISTS test;"
run_sql "DROP TABLE IF EXISTS test.t;"

cat <<EOF >"$TEST_DIR/data/test-schema-create.sql"
CREATE DATABASE test;
EOF
cat <<EOF >"$TEST_DIR/data/test.t-schema.sql"
CREATE TABLE test.t (a varchar(1024));
EOF

#
# test send-kv-pairs: 100 tiny rows with a batch count of 20 must flush in
# groups of exactly 20 pairs (the size limit stays at its 16 KiB default).
#
set +x # suppress 100 lines of xtrace noise while generating the fixture
for i in {1..100}; do
    echo "$i" >>"$TEST_DIR/data/test.t.0.csv"
done
set -x

# -f (not -rf): the target is a plain file; quote paths so a TEST_DIR
# containing spaces or glob characters cannot break the script
rm -f "$TEST_DIR/lightning.log"
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/afterFlushKVs=return(true)"
run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/kv-count.toml"
check_contains 'afterFlushKVs count=20,' "$TEST_DIR/lightning.log"
check_not_contains 'afterFlushKVs count=1,' "$TEST_DIR/lightning.log"
check_contains 'send-kv-pairs\":20,' "$TEST_DIR/lightning.log"
check_contains 'send-kv-size\":16384,' "$TEST_DIR/lightning.log"

#
# test send-kv-size: with a 10-byte limit every single row's KV already
# exceeds the limit, so each pair must be flushed as its own batch.
#
rm -f "$TEST_DIR/data/test.t.0.csv"
run_sql "truncate table test.t;"
set +x
for i in {1..5}; do
    echo "abcdefghijklmnopqrstuvwxyz0123456789" >>"$TEST_DIR/data/test.t.0.csv"
done
set -x

rm -f "$TEST_DIR/lightning.log"
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/backend/local/afterFlushKVs=return(true)"
run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/kv-size.toml"
# each kv (~64b here, row value plus key — TODO confirm exact size) exceeds
# the 10-byte limit, so each kv is a batch
check_contains 'afterFlushKVs count=1,' "$TEST_DIR/lightning.log"
check_not_contains 'afterFlushKVs count=20,' "$TEST_DIR/lightning.log"
check_contains 'send-kv-pairs\":32768,' "$TEST_DIR/lightning.log"
check_contains 'send-kv-size\":10,' "$TEST_DIR/lightning.log"
1 change: 1 addition & 0 deletions executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableIm
MaxConnPerStore: config.DefaultRangeConcurrency,
ConnCompressType: config.CompressionNone,
WorkerConcurrency: config.DefaultRangeConcurrency * 2,
KVWriteBatchCount: config.KVWriteBatchCount,
KVWriteBatchSize: config.KVWriteBatchSize,
RegionSplitBatchSize: config.DefaultRegionSplitBatchSize,
RegionSplitConcurrency: runtime.GOMAXPROCS(0),
Expand Down