From e016371ddaaf39bce209203e1389f0f58bf0fda7 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 11 Nov 2021 17:17:35 +0800 Subject: [PATCH 1/4] sorter: make unified sorter cgroup aware It fixes an OOM issue that occurs when the memory limit set by cgroup is smaller than physical memory. Note that this patch only recognizes cgroups set by containers; it may still OOM when the cgroup is set by systemd or manually. Signed-off-by: Neil Shen --- cdc/sorter/unified/backend_pool.go | 15 +++++++++------ go.mod | 1 - go.sum | 5 +---- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/cdc/sorter/unified/backend_pool.go b/cdc/sorter/unified/backend_pool.go index f2c85520d4f..14ba79a1427 100644 --- a/cdc/sorter/unified/backend_pool.go +++ b/cdc/sorter/unified/backend_pool.go @@ -24,7 +24,6 @@ import ( "time" "unsafe" - "github.com/mackerelio/go-osstat/memory" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -34,6 +33,7 @@ import ( cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/filelock" "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" ) @@ -99,6 +99,12 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { metricSorterOnDiskDataSizeGauge := sorter.OnDiskDataSizeGauge.WithLabelValues(captureAddr, id) metricSorterOpenFileCountGauge := sorter.OpenFileCountGauge.WithLabelValues(captureAddr, id) + // TODO: The underlaying implementation only recognizes cgroups set by + // containers, we need to support cgroups set by systemd or manually. 
+ totalMemory, err := memory.MemTotal() + if err != nil { + log.Panic("read memory stat failed", zap.Error(err)) + } for { select { case <-ret.cancelCh: @@ -112,13 +118,10 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { metricSorterOpenFileCountGauge.Set(float64(atomic.LoadInt64(&openFDCount))) // update memPressure - m, err := memory.Get() - + usedMemory, err := memory.MemUsed() failpoint.Inject("getMemoryPressureFails", func() { - m = nil err = errors.New("injected get memory pressure failure") }) - if err != nil { failpoint.Inject("sorterDebug", func() { log.Panic("unified sorter: getting system memory usage failed", zap.Error(err)) @@ -130,7 +133,7 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { // encountered, we can fail gracefully. atomic.StoreInt32(&ret.memPressure, 100) } else { - memPressure := m.Used * 100 / m.Total + memPressure := usedMemory * 100 / totalMemory atomic.StoreInt32(&ret.memPressure, int32(memPressure)) } diff --git a/go.mod b/go.mod index ea7c09075c5..04c509adb5b 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,6 @@ require ( github.com/labstack/echo/v4 v4.6.1 github.com/lib/pq v1.3.0 // indirect github.com/linkedin/goavro/v2 v2.9.8 - github.com/mackerelio/go-osstat v0.1.0 github.com/mattn/go-colorable v0.1.11 // indirect github.com/mattn/go-shellwords v1.0.3 github.com/mattn/go-sqlite3 v2.0.2+incompatible // indirect diff --git a/go.sum b/go.sum index b9ea6b4f78b..c6b8f0686c7 100644 --- a/go.sum +++ b/go.sum @@ -619,8 +619,6 @@ github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU= github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg= github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= -github.com/mackerelio/go-osstat v0.1.0 h1:e57QHeHob8kKJ5FhcXGdzx5O6Ktuc5RHMDIkeqhgkFA= -github.com/mackerelio/go-osstat 
v0.1.0/go.mod h1:1K3NeYLhMHPvzUu+ePYXtoB58wkaRpxZsGClZBJyIFw= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -1227,7 +1225,6 @@ golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190410235845-0ad05ae3009d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1267,8 +1264,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= From 5545df0a8e5dc4743160708bb850cda1991404c8 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 12 Nov 2021 18:25:45 +0800 Subject: [PATCH 2/4] Apply suggestions from code review --- cdc/sorter/unified/backend_pool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/sorter/unified/backend_pool.go b/cdc/sorter/unified/backend_pool.go index 14ba79a1427..b392f800a5a 100644 --- a/cdc/sorter/unified/backend_pool.go +++ b/cdc/sorter/unified/backend_pool.go @@ -101,6 +101,7 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { // TODO: The underlaying implementation only recognizes cgroups set by // containers, we need to support cgroups set by systemd or manually. 
+ // See https://github.com/pingcap/tidb/issues/22132 totalMemory, err := memory.MemTotal() if err != nil { log.Panic("read memory stat failed", zap.Error(err)) From 1a1fbce7d4ffc313c65d4d6f900feec61fd8699d Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Sat, 13 Nov 2021 00:11:02 +0800 Subject: [PATCH 3/4] address comments Signed-off-by: Neil Shen --- cdc/sorter/unified/backend_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sorter/unified/backend_pool.go b/cdc/sorter/unified/backend_pool.go index 14ba79a1427..82a6f519e20 100644 --- a/cdc/sorter/unified/backend_pool.go +++ b/cdc/sorter/unified/backend_pool.go @@ -122,7 +122,7 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { failpoint.Inject("getMemoryPressureFails", func() { err = errors.New("injected get memory pressure failure") }) - if err != nil { + if err != nil || totalMemory == 0 { failpoint.Inject("sorterDebug", func() { log.Panic("unified sorter: getting system memory usage failed", zap.Error(err)) }) From 6402a6a9f6508178132c910a6de7cfc870c1817e Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Sat, 13 Nov 2021 00:20:29 +0800 Subject: [PATCH 4/4] remove failpoint Signed-off-by: Neil Shen --- cdc/sorter/unified/backend_pool.go | 3 --- cdc/sorter/unified/backend_pool_test.go | 11 +++++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cdc/sorter/unified/backend_pool.go b/cdc/sorter/unified/backend_pool.go index 82a6f519e20..1429b733bca 100644 --- a/cdc/sorter/unified/backend_pool.go +++ b/cdc/sorter/unified/backend_pool.go @@ -119,9 +119,6 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { // update memPressure usedMemory, err := memory.MemUsed() - failpoint.Inject("getMemoryPressureFails", func() { - err = errors.New("injected get memory pressure failure") - }) if err != nil || totalMemory == 0 { failpoint.Inject("sorterDebug", func() { log.Panic("unified sorter: getting system memory usage failed", 
zap.Error(err)) diff --git a/cdc/sorter/unified/backend_pool_test.go b/cdc/sorter/unified/backend_pool_test.go index 3998eb7c456..e616a23f4a2 100644 --- a/cdc/sorter/unified/backend_pool_test.go +++ b/cdc/sorter/unified/backend_pool_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/filelock" "github.com/pingcap/ticdc/pkg/util/testleak" + "github.com/pingcap/tidb/util/memory" ) type backendPoolSuite struct{} @@ -325,9 +326,11 @@ func (s *backendPoolSuite) TestCleanUpStaleLockNoPermission(c *check.C) { func (s *backendPoolSuite) TestGetMemoryPressureFailure(c *check.C) { defer testleak.AfterTest(c)() - err := failpoint.Enable("github.com/pingcap/ticdc/cdc/sorter/unified/getMemoryPressureFails", "return(true)") - c.Assert(err, check.IsNil) - defer failpoint.Disable("github.com/pingcap/ticdc/cdc/sorter/unified/getMemoryPressureFails") //nolint:errcheck + origin := memory.MemTotal + defer func() { + memory.MemTotal = origin + }() + memory.MemTotal = func() (uint64, error) { return 0, nil } dir := c.MkDir() backEndPool, err := newBackEndPool(dir, "") @@ -336,7 +339,7 @@ func (s *backendPoolSuite) TestGetMemoryPressureFailure(c *check.C) { defer backEndPool.terminate() after := time.After(time.Second * 20) - tick := time.Tick(time.Second * 1) + tick := time.Tick(time.Millisecond * 100) for { select { case <-after: