Skip to content

Commit

Permalink
store/copr: optimize copIterator by avoid start new goroutine (#57522)
Browse files Browse the repository at this point in the history
ref #56649
  • Loading branch information
crazycs520 authored Dec 17, 2024
1 parent aec0fc5 commit 0ccee0e
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 145 deletions.
3 changes: 3 additions & 0 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type DistSQLContext struct {
SessionAlias string

ExecDetails *execdetails.SyncExecDetails

// Only one cop-reader can use lite worker. Using lite-worker in multiple readers will affect the concurrent execution of readers.
TryCopLiteWorker uint32
}

// AppendWarning appends the warning to the warning handler.
Expand Down
1 change: 1 addition & 0 deletions pkg/distsql/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestContextDetach(t *testing.T) {
ReplicaClosestReadThreshold: 1,
ConnectionID: 1,
SessionAlias: "c",
TryCopLiteWorker: 1,
}

obj.AppendWarning(errors.New("test warning"))
Expand Down
1 change: 1 addition & 0 deletions pkg/distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func Select(ctx context.Context, dctx *distsqlctx.DistSQLContext, kvReq *kv.Requ
EnabledRateLimitAction: enabledRateLimitAction,
EventCb: eventCb,
EnableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load(),
TryCopLiteWorker: &dctx.TryCopLiteWorker,
}

if kvReq.StoreType == kv.TiFlash {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ type ClientSendOption struct {
EnableCollectExecutionInfo bool
TiFlashReplicaRead tiflash.ReplicaRead
AppendWarning func(warn error)
TryCopLiteWorker *uint32
}

// ReqTypes.
Expand Down
4 changes: 3 additions & 1 deletion pkg/store/copr/copr_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 3,
shard_count = 4,
deps = [
"//pkg/config",
"//pkg/kv",
"//pkg/resourcegroup/runaway",
"//pkg/store/copr",
"//pkg/store/mockstore",
"//pkg/testkit",
"//pkg/testkit/testmain",
"//pkg/testkit/testsetup",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/meta_storagepb",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_stretchr_testify//require",
Expand Down
35 changes: 35 additions & 0 deletions pkg/store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/resourcegroup/runaway"
"github.com/pingcap/tidb/pkg/store/copr"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -287,3 +290,35 @@ func TestBuildCopIteratorWithRunawayChecker(t *testing.T) {
require.Equal(t, concurrency, 1)
require.Equal(t, smallTaskConcurrency, 0)
}

func TestQueryWithConcurrentSmallCop(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int key, b int, c int, index idx_b(b)) partition by hash(id) partitions 10;")
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t1 values (%v, %v, %v)", i, i, i))
}
tk.MustExec("create table t2 (id bigint unsigned key, b int, index idx_b (b));")
tk.MustExec("insert into t2 values (1,1), (18446744073709551615,2)")
tk.MustExec("set @@tidb_distsql_scan_concurrency=15")
tk.MustExec("set @@tidb_executor_concurrency=15")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowCop", `return(100)`))
// Test for https://github.com/pingcap/tidb/pull/57522#discussion_r1875515863
start := time.Now()
tk.MustQuery("select sum(c) from t1 use index (idx_b) where b < 10;")
require.Less(t, time.Since(start), time.Millisecond*250)
// Test for index reader with partition table
start = time.Now()
tk.MustQuery("select id, b from t1 use index (idx_b) where b < 10;")
require.Less(t, time.Since(start), time.Millisecond*150)
// Test for table reader with partition table.
start = time.Now()
tk.MustQuery("select * from t1 where c < 10;")
require.Less(t, time.Since(start), time.Millisecond*150)
// // Test for table reader with 2 parts ranges.
start = time.Now()
tk.MustQuery("select * from t2 where id >= 1 and id <= 18446744073709551615 order by id;")
require.Less(t, time.Since(start), time.Millisecond*150)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowCop"))
}
Loading

0 comments on commit 0ccee0e

Please sign in to comment.