From bfc880bd8a7f7550c79a7b49be50b0096d1b1523 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 24 Nov 2022 18:04:19 +0800 Subject: [PATCH] Memdb tracker (#3) * add a memdb memory tracker Signed-off-by: ekexium * update deps * fix test Signed-off-by: ekexium * remove the fake tracking of memdb in insert Signed-off-by: ekexium * simplify the signature Signed-off-by: ekexium * fix conflict Signed-off-by: ekexium * fix conflict Signed-off-by: ekexium * fix: move the tracker out of TxnInfo Signed-off-by: ekexium * refactor: use switch-default Signed-off-by: ekexium * fix: only reset session tracker when detach SQLText tracker Signed-off-by: ekexium * test: fix TestTiDBTrx Signed-off-by: ekexium * update client-go Signed-off-by: ekexium Signed-off-by: ekexium Signed-off-by: ekexium Co-authored-by: ekexium Co-authored-by: Weizhen Wang Co-authored-by: Ti Chi Robot --- DEPS.bzl | 8 +++---- br/pkg/streamhelper/basic_lib_for_test.go | 4 ++++ executor/delete.go | 8 +------ executor/executor_test.go | 3 +-- executor/infoschema_reader.go | 12 +++++++++- executor/insert.go | 2 -- executor/join_test.go | 6 ++--- executor/write.go | 8 +------ go.mod | 4 ++-- go.sum | 8 +++---- infoschema/tables_test.go | 11 +++++---- kv/interface_mock_test.go | 8 +++++++ kv/kv.go | 4 ++++ session/session.go | 17 ++++++++++++++ session/txn.go | 24 +++++++++++++++----- session/txninfo/txn_info.go | 5 ---- sessionctx/variable/session.go | 6 +++-- sessiontxn/isolation/base.go | 4 ++++ tests/realtikvtest/txntest/txn_state_test.go | 20 ++++++++++++++-- util/memory/tracker.go | 7 ++++-- 20 files changed, 115 insertions(+), 54 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index c9d495cb6e17e..0408dac032a43 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2915,8 +2915,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=", - version = "v0.0.0-20221114102356-3debb6820e46", + sum = "h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=", + version = "v0.0.0-20221117075110-51120697d051", ) go_repository( name = "com_github_pingcap_log", @@ -3519,8 +3519,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=", - version = "v2.0.3-0.20221108030801-9c0835c80eba", + sum = "h1:G44ccTqXvE3uZgA+8Y71RQmw/1gsst+wXtn2+qw5ykI=", + version = "v2.0.3-0.20221124031013-92f0a82e1a9f", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 9e438c32f0f1f..7f9aa0daa9f64 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -163,6 +163,10 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge return resp, nil } +func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) { + panic("unimplemented") +} + // RegionScan gets a list of regions, starts from the region that contains key. // Limit limits the maximum number of regions returned. func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) { diff --git a/executor/delete.go b/executor/delete.go index 979db825dfb66..9e64a47669f9f 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -234,12 +234,7 @@ func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error { } func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handle, data []types.Datum) error { - txnState, err := e.ctx.Txn(false) - if err != nil { - return err - } - memUsageOfTxnState := txnState.Size() - err = t.RemoveRecord(ctx, h, data) + err := t.RemoveRecord(ctx, h, data) if err != nil { return err } @@ -247,7 +242,6 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl if err != nil { return err } - e.memTracker.Consume(int64(txnState.Size() - memUsageOfTxnState)) ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) return nil } diff --git a/executor/executor_test.go b/executor/executor_test.go index 2dedfafb79170..5e8bb71daca6a 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6237,8 +6237,7 @@ func TestSessionRootTrackerDetach(t *testing.T) { tk.MustExec("create table t(a int, b int, index idx(a))") tk.MustExec("create table t1(a int, c int, index idx(a))") tk.MustExec("set tidb_mem_quota_query=10") - err := tk.ExecToErr("select /*+hash_join(t1)*/ t.a, t1.a from t use index(idx), t1 use index(idx) where t.a = t1.a") - require.Contains(t, err.Error(), "Out Of Memory Quota!") + tk.MustContainErrMsg("select /*+hash_join(t1)*/ t.a, t1.a from t use index(idx), t1 use index(idx) where t.a = t1.a", "Out Of Memory Quota!") tk.MustExec("set tidb_mem_quota_query=1000") rs, err := tk.Exec("select /*+hash_join(t1)*/ t.a, t1.a from t use index(idx), t1 use index(idx) where t.a = t1.a") require.NoError(t, err) diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index b3b881c1f65fa..1faf10d490856 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -2607,7 +2607,17 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co row = append(row, types.NewDatum(nil)) } } else { - row = append(row, e.txnInfo[i].ToDatum(c.Name.O)) + switch c.Name.O { + case txninfo.MemBufferBytesStr: + memDBFootprint := sctx.GetSessionVars().MemDBFootprint + var bytesConsumed int64 + if memDBFootprint != nil { + bytesConsumed = memDBFootprint.BytesConsumed() + } + row = append(row, types.NewDatum(bytesConsumed)) + default: + row = append(row, e.txnInfo[i].ToDatum(c.Name.O)) + } } } res = append(res, row) diff --git a/executor/insert.go b/executor/insert.go index 9b286297351b9..2450f0a117f70 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -70,7 +70,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { return err } setOptionForTopSQL(sessVars.StmtCtx, txn) - txnSize := txn.Size() sessVars.StmtCtx.AddRecordRows(uint64(len(rows))) // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. // For example, without IGNORE, a row that duplicates an existing UNIQUE index or PRIMARY KEY value in @@ -113,7 +112,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { e.stats.CheckInsertTime += time.Since(start) } } - e.memTracker.Consume(int64(txn.Size() - txnSize)) return nil } diff --git a/executor/join_test.go b/executor/join_test.go index be006d7dc0063..5daa60d9a595f 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2307,16 +2307,14 @@ func TestIssue18070(t *testing.T) { tk.MustExec("insert into t1 values(1),(2)") tk.MustExec("insert into t2 values(1),(1),(2),(2)") tk.MustExec("set @@tidb_mem_quota_query=1000") - err := tk.QueryToErr("select /*+ inl_hash_join(t1)*/ * from t1 join t2 on t1.a = t2.a;") - require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) + tk.MustContainErrMsg("select /*+ inl_hash_join(t1)*/ * from t1 join t2 on t1.a = t2.a;", "Out Of Memory Quota!") fpName := "github.com/pingcap/tidb/executor/mockIndexMergeJoinOOMPanic" require.NoError(t, failpoint.Enable(fpName, `panic("ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")`)) defer func() { require.NoError(t, failpoint.Disable(fpName)) }() - err = tk.QueryToErr("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t1.a = t2.a;") - require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) + tk.MustContainErrMsg("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t1.a = t2.a;", "Out Of Memory Quota!") } func TestIssue18564(t *testing.T) { diff --git a/executor/write.go b/executor/write.go index 01359b56a2571..4c36884cc570d 100644 --- a/executor/write.go +++ b/executor/write.go @@ -57,12 +57,6 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } - txn, err := sctx.Txn(false) - if err != nil { - return false, err - } - memUsageOfTxnState := txn.Size() - defer memTracker.Consume(int64(txn.Size() - memUsageOfTxnState)) sc := sctx.GetSessionVars().StmtCtx changed, handleChanged := false, false // onUpdateSpecified is for "UPDATE SET ts_field = old_value", the @@ -207,7 +201,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old } } else { // Update record to new value and update index. - if err = t.UpdateRecord(ctx, sctx, h, oldData, newData, modified); err != nil { + if err := t.UpdateRecord(ctx, sctx, h, oldData, newData, modified); err != nil { if terr, ok := errors.Cause(err).(*terror.Error); sctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue { return false, nil } diff --git a/go.mod b/go.mod index 2cd460e8279c3..cc644fcd4c6d9 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46 + github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -86,7 +86,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba + github.com/tikv/client-go/v2 v2.0.3-0.20221124031013-92f0a82e1a9f github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index bdc73a0d74ddc..6fa6dc83cbf7a 100644 --- a/go.sum +++ b/go.sum @@ -778,8 +778,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46 h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4= -github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM= +github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -928,8 +928,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4= -github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba/go.mod h1:X9s4ct/MLk1sFqe5mU79KClKegLFDTa/FCx3hzexGtk= +github.com/tikv/client-go/v2 v2.0.3-0.20221124031013-92f0a82e1a9f h1:G44ccTqXvE3uZgA+8Y71RQmw/1gsst+wXtn2+qw5ykI= +github.com/tikv/client-go/v2 v2.0.3-0.20221124031013-92f0a82e1a9f/go.mod h1:mQQhAIZ2uJwWXOG2UEz9s9oLGRcNKGGGtDOk4b13Bos= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index fbca6862493e9..dbcb0f8da62b7 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -588,12 +588,12 @@ INSERT INTO ...; defer func() { require.NoError(t, os.Remove(slowLogFileName)) }() tk := testkit.NewTestKit(t, store) - //check schema + // check schema tk.MustQuery(`select COUNT(*) from information_schema.columns WHERE table_name = 'slow_query' and column_name = '` + columnName + `'`). Check(testkit.Rows("1")) - //check select + // check select tk.MustQuery(`select ` + columnName + ` from information_schema.slow_query`).Check(testkit.Rows("1")) } @@ -1393,16 +1393,19 @@ func TestTiDBTrx(t *testing.T) { tk.MustExec("update test_tidb_trx set i = i + 1") _, digest := parser.NormalizeDigest("update test_tidb_trx set i = i + 1") sm := &testkit.MockSessionManager{TxnInfo: make([]*txninfo.TxnInfo, 2)} + memDBTracker := memory.NewTracker(memory.LabelForMemDB, -1) + memDBTracker.Consume(19) + tk.Session().GetSessionVars().MemDBFootprint = memDBTracker sm.TxnInfo[0] = &txninfo.TxnInfo{ StartTS: 424768545227014155, CurrentSQLDigest: digest.String(), State: txninfo.TxnIdle, EntriesCount: 1, - EntriesSize: 19, ConnectionID: 2, Username: "root", CurrentDB: "test", } + blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.Local) sm.TxnInfo[1] = &txninfo.TxnInfo{ StartTS: 425070846483628033, @@ -1419,7 +1422,7 @@ func TestTiDBTrx(t *testing.T) { tk.MustQuery("select * from information_schema.TIDB_TRX;").Check(testkit.Rows( "424768545227014155 2021-05-07 12:56:48.001000 "+digest.String()+" update `test_tidb_trx` set `i` = `i` + ? Idle 1 19 2 root test [] ", - "425070846483628033 2021-05-20 21:16:35.778000 LockWaiting 2021-05-20 13:18:30.123456 0 0 10 user1 db1 [\"sql1\",\"sql2\",\""+digest.String()+"\"] ")) + "425070846483628033 2021-05-20 21:16:35.778000 LockWaiting 2021-05-20 13:18:30.123456 0 19 10 user1 db1 [\"sql1\",\"sql2\",\""+digest.String()+"\"] ")) // Test the all_sql_digests column can be directly passed to the tidb_decode_sql_digests function. require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/sqlDigestRetrieverSkipRetrieveGlobal", "return")) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 8090463c84223..164e777c6ef4a 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -161,6 +161,14 @@ func (t *mockTxn) UpdateMemBufferFlags(_ []byte, _ ...FlagsOp) { } +func (t *mockTxn) SetMemoryFootprintChangeHook(func(uint64)) { + +} + +func (t *mockTxn) Mem() uint64 { + return 0 +} + // newMockTxn new a mockTxn. func newMockTxn() Transaction { return &mockTxn{ diff --git a/kv/kv.go b/kv/kv.go index 06e86f41659cb..72e8111f0343d 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -203,6 +203,10 @@ type Transaction interface { AssertionProto // Size returns sum of keys and values length. Size() int + // Mem returns the memory consumption of the transaction. + Mem() uint64 + // SetMemoryFootprintChangeHook sets the hook that will be called when the memory footprint changes. + SetMemoryFootprintChangeHook(func(uint64)) // Len returns the number of entries in the DB. Len() int // Reset reset the Transaction to initial states. diff --git a/session/session.go b/session/session.go index 51a3e22d39aac..f0a428fca5d04 100644 --- a/session/session.go +++ b/session/session.go @@ -93,6 +93,7 @@ import ( "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/logutil/consistency" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" @@ -569,6 +570,7 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) { return fields, nil } +// TxnInfo returns a pointer to a *copy* of the internal TxnInfo, thus is *read only* func (s *session) TxnInfo() *txninfo.TxnInfo { s.txn.mu.RLock() // Copy on read to get a snapshot, this API shouldn't be frequently called. @@ -2506,6 +2508,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { return &s.txn, nil } _, err := sessiontxn.GetTxnManager(s).ActivateTxn() + s.SetMemoryFootprintChangeHook() return &s.txn, err } @@ -3659,6 +3662,20 @@ func (s *session) GetStmtStats() *stmtstats.StatementStats { return s.stmtStats } +// SetMemoryFootprintChangeHook sets the hook that is called when the memdb changes its size. +// Call this after s.txn becomes valid, since TxnInfo is initialized when the txn becomes valid. +func (s *session) SetMemoryFootprintChangeHook() { + hook := func(mem uint64) { + if s.sessionVars.MemDBFootprint == nil { + tracker := memory.NewTracker(memory.LabelForMemDB, -1) + tracker.AttachTo(s.sessionVars.MemTracker) + s.sessionVars.MemDBFootprint = tracker + } + s.sessionVars.MemDBFootprint.ReplaceBytesUsed(int64(mem)) + } + s.txn.SetMemoryFootprintChangeHook(hook) +} + // EncodeSessionStates implements SessionStatesHandler.EncodeSessionStates interface. func (s *session) EncodeSessionStates(ctx context.Context, sctx sessionctx.Context, sessionStates *sessionstates.SessionStates) error { // Transaction status is hard to encode, so we do not support it. diff --git a/session/txn.go b/session/txn.go index 18464073da2d1..85f77f8078679 100644 --- a/session/txn.go +++ b/session/txn.go @@ -151,7 +151,6 @@ func (txn *LazyTxn) cleanupStmtBuf() { txn.mu.Lock() defer txn.mu.Unlock() txn.mu.TxnInfo.EntriesCount = uint64(txn.Transaction.Len()) - txn.mu.TxnInfo.EntriesSize = uint64(txn.Transaction.Size()) } // resetTxnInfo resets the transaction info. @@ -159,8 +158,7 @@ func (txn *LazyTxn) cleanupStmtBuf() { func (txn *LazyTxn) resetTxnInfo( startTS uint64, state txninfo.TxnRunningState, - entriesCount, - entriesSize uint64, + entriesCount uint64, currentSQLDigest string, allSQLDigests []string, ) { @@ -178,7 +176,7 @@ func (txn *LazyTxn) resetTxnInfo( txninfo.TxnStatusEnteringCounter(state).Inc() txn.mu.TxnInfo.LastStateChangeTime = time.Now() txn.mu.TxnInfo.EntriesCount = entriesCount - txn.mu.TxnInfo.EntriesSize = entriesSize + txn.mu.TxnInfo.CurrentSQLDigest = currentSQLDigest txn.mu.TxnInfo.AllSQLDigests = allSQLDigests } @@ -191,6 +189,22 @@ func (txn *LazyTxn) Size() int { return txn.Transaction.Size() } +// Mem implements the MemBuffer interface. +func (txn *LazyTxn) Mem() uint64 { + if txn.Transaction == nil { + return 0 + } + return txn.Transaction.Mem() +} + +// SetMemoryFootprintChangeHook sets the hook to be called when the memory footprint of this transaction changes. +func (txn *LazyTxn) SetMemoryFootprintChangeHook(hook func(uint64)) { + if txn.Transaction == nil { + return + } + txn.Transaction.SetMemoryFootprintChangeHook(hook) +} + // Valid implements the kv.Transaction interface. func (txn *LazyTxn) Valid() bool { return txn.Transaction != nil && txn.Transaction.Valid() @@ -275,7 +289,6 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { t.StartTS(), txninfo.TxnIdle, uint64(txn.Transaction.Len()), - uint64(txn.Transaction.Size()), txn.mu.TxnInfo.CurrentSQLDigest, txn.mu.TxnInfo.AllSQLDigests) @@ -433,7 +446,6 @@ func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...k txn.updateState(originState) txn.mu.TxnInfo.BlockStartTime.Valid = false txn.mu.TxnInfo.EntriesCount = uint64(txn.Transaction.Len()) - txn.mu.TxnInfo.EntriesSize = uint64(txn.Transaction.Size()) return err } diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index 640e16474b4bd..31e4d338eb623 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -161,8 +161,6 @@ type TxnInfo struct { } // How many entries are in MemDB EntriesCount uint64 - // MemDB used memory - EntriesSize uint64 // The following fields will be filled in `session` instead of `LazyTxn` @@ -208,9 +206,6 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{ MemBufferKeysStr: func(info *TxnInfo) types.Datum { return types.NewDatum(info.EntriesCount) }, - MemBufferBytesStr: func(info *TxnInfo) types.Datum { - return types.NewDatum(info.EntriesSize) - }, SessionIDStr: func(info *TxnInfo) types.Datum { return types.NewDatum(info.ConnectionID) }, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index b8fbcf54848e1..405503a1cf5cb 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1290,8 +1290,10 @@ type SessionVars struct { HookContext // MemTracker indicates the memory tracker of current session. - MemTracker *memory.Tracker - DiskTracker *memory.Tracker + MemTracker *memory.Tracker + // MemDBDBFootprint tracks the memory footprint of memdb, and is attached to `MemTracker` + MemDBFootprint *memory.Tracker + DiskTracker *memory.Tracker // OptPrefixIndexSingleScan indicates whether to do some optimizations to avoid double scan for prefix index. // When set to true, `col is (not) null`(`col` is index prefix column) is regarded as index filter rather than table filter. diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index eeac646675bdf..97c6abfc35081 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -267,6 +267,10 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { sessVars := p.sctx.GetSessionVars() sessVars.TxnCtx.StartTS = txn.StartTS() + if sessVars.MemDBFootprint != nil { + sessVars.MemDBFootprint.Detach() + } + sessVars.MemDBFootprint = nil if p.enterNewTxnType == sessiontxn.EnterNewTxnBeforeStmt && !sessVars.IsAutocommit() && sessVars.SnapshotTS == 0 { sessVars.SetInTxn(true) diff --git a/tests/realtikvtest/txntest/txn_state_test.go b/tests/realtikvtest/txntest/txn_state_test.go index 59049dd129151..b8a687a074964 100644 --- a/tests/realtikvtest/txntest/txn_state_test.go +++ b/tests/realtikvtest/txntest/txn_state_test.go @@ -128,14 +128,30 @@ func TestEntriesCountAndSize(t *testing.T) { tk.MustExec("insert into t(a) values (1);") info := tk.Session().TxnInfo() require.Equal(t, uint64(1), info.EntriesCount) - require.Equal(t, uint64(29), info.EntriesSize) tk.MustExec("insert into t(a) values (2);") info = tk.Session().TxnInfo() require.Equal(t, uint64(2), info.EntriesCount) - require.Equal(t, uint64(58), info.EntriesSize) tk.MustExec("commit;") } +func TestMemDBTracker(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + session := tk.Session() + tk.MustExec("use test") + tk.MustExec("create table t (id int)") + tk.MustExec("begin") + for i := 0; i < (1 << 10); i++ { + tk.MustExec("insert t (id) values (1)") + } + require.Less(t, int64(1<<(10+4)), session.GetSessionVars().MemDBFootprint.BytesConsumed()) + require.Greater(t, int64(1<<(14+4)), session.GetSessionVars().MemDBFootprint.BytesConsumed()) + for i := 0; i < (1 << 14); i++ { + tk.MustExec("insert t (id) values (1)") + } + require.Less(t, int64(1<<(14+4)), session.GetSessionVars().MemDBFootprint.BytesConsumed()) +} + func TestRunning(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 179aa66087497..b9553aa26a505 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -84,7 +84,8 @@ type Tracker struct { parent *Tracker // The parent memory tracker. sync.Mutex } - label int // Label of this "Tracker". + label int // Label of this "Tracker". + // following fields are used with atomic operations, so make them 64-byte aligned. bytesConsumed int64 // Consumed bytes. bytesReleased int64 // Released bytes. maxConsumed atomicutil.Int64 // max number of bytes consumed during execution. @@ -306,7 +307,7 @@ func (t *Tracker) Detach() { t.DetachFromGlobalTracker() return } - if parent.IsRootTrackerOfSess { + if parent.IsRootTrackerOfSess && t.label == LabelForSQLText { parent.actionMuForHardLimit.Lock() parent.actionMuForHardLimit.actionOnExceed = nil parent.actionMuForHardLimit.Unlock() @@ -822,6 +823,8 @@ const ( LabelForPreparedPlanCache int = -26 // LabelForSession represents the label of a session. LabelForSession int = -27 + // LabelForMemDB represents the label of the MemDB + LabelForMemDB int = -28 ) // MetricsTypes is used to get label for metrics