Skip to content

Commit

Permalink
sorter: fix unified sorter magic numbers & reduce memory consumption …
Browse files Browse the repository at this point in the history
…by channels (#1915)
  • Loading branch information
liuzix authored Jun 6, 2021
1 parent 90116b2 commit fcd5552
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
2 changes: 1 addition & 1 deletion cdc/puller/sorter/file_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

const (
fileBufferSize = 1 * 1024 * 1024 // 1MB
fileBufferSize = 32 * 1024 // 32KB
fileMagic = 0x12345678
numFileEntriesOffset = 4
blockMagic = 0xbeefbeef
Expand Down
3 changes: 2 additions & 1 deletion cdc/puller/sorter/heap_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
const (
flushRateLimitPerSecond = 10
sortHeapCapacity = 32
sortHeapInputChSize = 1024
)

type flushTask struct {
Expand Down Expand Up @@ -85,7 +86,7 @@ type heapSorter struct {
func newHeapSorter(id int, out chan *flushTask) *heapSorter {
return &heapSorter{
id: id,
inputCh: make(chan *model.PolymorphicEvent, 1024*1024),
inputCh: make(chan *model.PolymorphicEvent, sortHeapInputChSize),
outputCh: out,
heap: make(sortHeap, 0, sortHeapCapacity),
canceller: new(asyncCanceller),
Expand Down
20 changes: 15 additions & 5 deletions cdc/puller/sorter/unified_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ import (
"golang.org/x/sync/errgroup"
)

const (
inputChSize = 128
outputChSize = 128
heapCollectChSize = 128 // this should be not be too small, to guarantee IO concurrency
// maxOpenHeapNum is the maximum number of allowed pending chunks in memory OR on-disk.
// This constant is a worst case upper limit, and setting a large number DOES NOT imply actually
// allocating these resources. This constant is PER TABLE.
// TODO refactor this out
maxOpenHeapNum = 1280000
)

// UnifiedSorter provides both sorting in memory and in file. Memory pressure is used to determine which one to use.
type UnifiedSorter struct {
inputCh chan *model.PolymorphicEvent
Expand Down Expand Up @@ -102,8 +113,8 @@ func NewUnifiedSorter(

lazyInitWorkerPool()
return &UnifiedSorter{
inputCh: make(chan *model.PolymorphicEvent, 128),
outputCh: make(chan *model.PolymorphicEvent, 128),
inputCh: make(chan *model.PolymorphicEvent, inputChSize),
outputCh: make(chan *model.PolymorphicEvent, outputChSize),
dir: dir,
pool: pool,
metricsInfo: &metricsInfo{
Expand Down Expand Up @@ -145,7 +156,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
numConcurrentHeaps := sorterConfig.NumConcurrentWorker

errg, subctx := errgroup.WithContext(ctx)
heapSorterCollectCh := make(chan *flushTask, 4096)
heapSorterCollectCh := make(chan *flushTask, heapCollectChSize)
// mergerCleanUp will consumer the remaining elements in heapSorterCollectCh to prevent any FD leak.
defer mergerCleanUp(heapSorterCollectCh)

Expand Down Expand Up @@ -206,8 +217,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {

nextSorterID := 0
for {
// tentative value 1280000
for atomic.LoadInt64(&mergerBufLen) > 1280000 {
for atomic.LoadInt64(&mergerBufLen) > maxOpenHeapNum {
after := time.After(1 * time.Second)
select {
case <-subctx.Done():
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ var defaultServerConfig = &ServerConfig{
ProcessorFlushInterval: TomlDuration(100 * time.Millisecond),
Sorter: &SorterConfig{
NumConcurrentWorker: 4,
ChunkSizeLimit: 1024 * 1024 * 1024, // 1GB
MaxMemoryPressure: 80,
MaxMemoryConsumption: 8 * 1024 * 1024 * 1024, // 8GB
ChunkSizeLimit: 128 * 1024 * 1024, // 128MB
MaxMemoryPressure: 30, // 30% is safe on machines with memory capacity <= 16GB
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16GB
NumWorkerPoolGoroutine: 16,
SortDir: "/tmp/cdc_sort",
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) {
b, err := conf.Marshal()
c.Assert(err, check.IsNil)

c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}`)
c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}`)
conf2 := new(ServerConfig)
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}`))
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}`))
c.Assert(err, check.IsNil)
c.Assert(conf2, check.DeepEquals, conf)
}
Expand Down

0 comments on commit fcd5552

Please sign in to comment.