diff --git a/cdc/puller/sorter/backend_pool.go b/cdc/puller/sorter/backend_pool.go index a6aeed51173..dcdfd522d93 100644 --- a/cdc/puller/sorter/backend_pool.go +++ b/cdc/puller/sorter/backend_pool.go @@ -24,7 +24,6 @@ import ( "time" "unsafe" - "github.com/mackerelio/go-osstat/memory" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -32,6 +31,7 @@ import ( cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/filelock" "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" ) @@ -96,6 +96,13 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { metricSorterOnDiskDataSizeGauge := sorterOnDiskDataSizeGauge.WithLabelValues(captureAddr) metricSorterOpenFileCountGauge := sorterOpenFileCountGauge.WithLabelValues(captureAddr) + // TODO: The underlaying implementation only recognizes cgroups set by + // containers, we need to support cgroups set by systemd or manually. + // See https://github.com/pingcap/tidb/issues/22132 + totalMemory, err := memory.MemTotal() + if err != nil { + log.Panic("read memory stat failed", zap.Error(err)) + } for { select { case <-ret.cancelCh: @@ -109,14 +116,8 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { metricSorterOpenFileCountGauge.Set(float64(atomic.LoadInt64(&openFDCount))) // update memPressure - m, err := memory.Get() - - failpoint.Inject("getMemoryPressureFails", func() { - m = nil - err = errors.New("injected get memory pressure failure") - }) - - if err != nil { + usedMemory, err := memory.MemUsed() + if err != nil || totalMemory == 0 { failpoint.Inject("sorterDebug", func() { log.Panic("unified sorter: getting system memory usage failed", zap.Error(err)) }) @@ -127,7 +128,7 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { // encountered, we can fail gracefully. atomic.StoreInt32(&ret.memPressure, 100) } else { - memPressure := m.Used * 100 / m.Total + memPressure := usedMemory * 100 / totalMemory atomic.StoreInt32(&ret.memPressure, int32(memPressure)) } diff --git a/cdc/puller/sorter/backend_pool_test.go b/cdc/puller/sorter/backend_pool_test.go index b7c94b4378d..81bef2a4c90 100644 --- a/cdc/puller/sorter/backend_pool_test.go +++ b/cdc/puller/sorter/backend_pool_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/filelock" "github.com/pingcap/ticdc/pkg/util/testleak" + "github.com/pingcap/tidb/util/memory" ) type backendPoolSuite struct{} @@ -325,9 +326,11 @@ func (s *backendPoolSuite) TestCleanUpStaleLockNoPermission(c *check.C) { func (s *backendPoolSuite) TestGetMemoryPressureFailure(c *check.C) { defer testleak.AfterTest(c)() - err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/getMemoryPressureFails", "return(true)") - c.Assert(err, check.IsNil) - defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/getMemoryPressureFails") //nolint:errcheck + origin := memory.MemTotal + defer func() { + memory.MemTotal = origin + }() + memory.MemTotal = func() (uint64, error) { return 0, nil } dir := c.MkDir() backEndPool, err := newBackEndPool(dir, "") @@ -336,7 +339,7 @@ func (s *backendPoolSuite) TestGetMemoryPressureFailure(c *check.C) { defer backEndPool.terminate() after := time.After(time.Second * 20) - tick := time.Tick(time.Second * 1) + tick := time.Tick(time.Millisecond * 100) for { select { case <-after: diff --git a/go.mod b/go.mod index e992571edc5..74cfec52527 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,6 @@ require ( github.com/klauspost/compress v1.11.1 // indirect github.com/lib/pq v1.3.0 // indirect github.com/linkedin/goavro/v2 v2.9.7 - github.com/mackerelio/go-osstat v0.1.0 github.com/mattn/go-shellwords v1.0.3 github.com/mattn/go-sqlite3 v2.0.2+incompatible // indirect github.com/onsi/ginkgo v1.9.0 // indirect diff --git a/go.sum b/go.sum index 8eda388b104..da8d8219b21 100644 --- a/go.sum +++ b/go.sum @@ -423,8 +423,6 @@ github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU= github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehOGrM= github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= -github.com/mackerelio/go-osstat v0.1.0 h1:e57QHeHob8kKJ5FhcXGdzx5O6Ktuc5RHMDIkeqhgkFA= -github.com/mackerelio/go-osstat v0.1.0/go.mod h1:1K3NeYLhMHPvzUu+ePYXtoB58wkaRpxZsGClZBJyIFw= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -876,7 +874,6 @@ golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190410235845-0ad05ae3009d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=