Skip to content

Commit

Permalink
cdc/sorter: make unified sorter cgroup aware (#3436) (#3437)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 16, 2021
1 parent 0d3be18 commit e8fb135
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 18 deletions.
21 changes: 11 additions & 10 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"time"
"unsafe"

"github.com/mackerelio/go-osstat/memory"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/config"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filelock"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -96,6 +96,13 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) {
metricSorterOnDiskDataSizeGauge := sorterOnDiskDataSizeGauge.WithLabelValues(captureAddr)
metricSorterOpenFileCountGauge := sorterOpenFileCountGauge.WithLabelValues(captureAddr)

// TODO: The underlaying implementation only recognizes cgroups set by
// containers, we need to support cgroups set by systemd or manually.
// See https://github.com/pingcap/tidb/issues/22132
totalMemory, err := memory.MemTotal()
if err != nil {
log.Panic("read memory stat failed", zap.Error(err))
}
for {
select {
case <-ret.cancelCh:
Expand All @@ -109,14 +116,8 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) {
metricSorterOpenFileCountGauge.Set(float64(atomic.LoadInt64(&openFDCount)))

// update memPressure
m, err := memory.Get()

failpoint.Inject("getMemoryPressureFails", func() {
m = nil
err = errors.New("injected get memory pressure failure")
})

if err != nil {
usedMemory, err := memory.MemUsed()
if err != nil || totalMemory == 0 {
failpoint.Inject("sorterDebug", func() {
log.Panic("unified sorter: getting system memory usage failed", zap.Error(err))
})
Expand All @@ -127,7 +128,7 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) {
// encountered, we can fail gracefully.
atomic.StoreInt32(&ret.memPressure, 100)
} else {
memPressure := m.Used * 100 / m.Total
memPressure := usedMemory * 100 / totalMemory
atomic.StoreInt32(&ret.memPressure, int32(memPressure))
}

Expand Down
11 changes: 7 additions & 4 deletions cdc/puller/sorter/backend_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filelock"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/tidb/util/memory"
)

type backendPoolSuite struct{}
Expand Down Expand Up @@ -325,9 +326,11 @@ func (s *backendPoolSuite) TestCleanUpStaleLockNoPermission(c *check.C) {
func (s *backendPoolSuite) TestGetMemoryPressureFailure(c *check.C) {
defer testleak.AfterTest(c)()

err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/getMemoryPressureFails", "return(true)")
c.Assert(err, check.IsNil)
defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/getMemoryPressureFails") //nolint:errcheck
origin := memory.MemTotal
defer func() {
memory.MemTotal = origin
}()
memory.MemTotal = func() (uint64, error) { return 0, nil }

dir := c.MkDir()
backEndPool, err := newBackEndPool(dir, "")
Expand All @@ -336,7 +339,7 @@ func (s *backendPoolSuite) TestGetMemoryPressureFailure(c *check.C) {
defer backEndPool.terminate()

after := time.After(time.Second * 20)
tick := time.Tick(time.Second * 1)
tick := time.Tick(time.Millisecond * 100)
for {
select {
case <-after:
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ require (
github.com/klauspost/compress v1.11.1 // indirect
github.com/lib/pq v1.3.0 // indirect
github.com/linkedin/goavro/v2 v2.9.7
github.com/mackerelio/go-osstat v0.1.0
github.com/mattn/go-shellwords v1.0.3
github.com/mattn/go-sqlite3 v2.0.2+incompatible // indirect
github.com/onsi/ginkgo v1.9.0 // indirect
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,6 @@ github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehOGrM=
github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/mackerelio/go-osstat v0.1.0 h1:e57QHeHob8kKJ5FhcXGdzx5O6Ktuc5RHMDIkeqhgkFA=
github.com/mackerelio/go-osstat v0.1.0/go.mod h1:1K3NeYLhMHPvzUu+ePYXtoB58wkaRpxZsGClZBJyIFw=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down Expand Up @@ -851,7 +849,6 @@ golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190410235845-0ad05ae3009d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down

0 comments on commit e8fb135

Please sign in to comment.