Add some metrics for latancy #11

Open · wants to merge 81 commits into master

Commits (81)
85d1197
add log rate limit for table actor
sdojjy Sep 22, 2022
0eac0e7
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
6701809
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
5670c40
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
97db214
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
d8fff7a
Merge branch 'master' into add-log-rate-limit-for-table-actro
ti-chi-bot Sep 22, 2022
72fa8a6
Merge branch 'master' into add-log-rate-limit-for-table-actro
sdojjy Sep 28, 2022
f9e9107
add rate limiter for resolve lock
sdojjy Sep 28, 2022
19a00da
improve frontier
sdojjy Sep 29, 2022
c33dc39
improve frontier
sdojjy Sep 29, 2022
9ef6307
improve frontier
sdojjy Sep 29, 2022
69a013c
add metrics
sdojjy Sep 29, 2022
d837620
add ut
sdojjy Sep 29, 2022
39e2938
add ut
sdojjy Sep 29, 2022
253c3c0
add ut
sdojjy Sep 29, 2022
89338c2
add ut
sdojjy Sep 29, 2022
0a0e268
add ut
sdojjy Sep 29, 2022
d52391b
Merge remote-tracking branch 'upstream/master' into frointier-improve
sdojjy Sep 30, 2022
02ec537
fix lint
sdojjy Sep 30, 2022
0c3b4b0
fix lint
sdojjy Sep 30, 2022
2fffbb4
Merge branch 'master' into frointier-improve
sdojjy Sep 30, 2022
fba5f39
Merge branch 'master' into frointier-improve
sdojjy Oct 6, 2022
98f0790
fix lint
sdojjy Oct 6, 2022
603fe1c
fix lint
sdojjy Oct 6, 2022
813b72d
fix lint
sdojjy Oct 6, 2022
68bf5da
fix ut
sdojjy Oct 7, 2022
2985826
Merge branch 'master' into frointier-improve
sdojjy Oct 7, 2022
9e365db
fix ut
sdojjy Oct 8, 2022
c9ffa3d
Merge remote-tracking branch 'origin/frointier-improve' into frointie…
sdojjy Oct 8, 2022
7257dcd
add metrics
sdojjy Oct 9, 2022
aac2f4f
add metrics
sdojjy Oct 9, 2022
eefcda9
Merge remote-tracking branch 'upstream/master' into add-some-metrics-…
sdojjy Oct 10, 2022
1d2fd9c
fix ut
sdojjy Oct 10, 2022
63e88cb
fix ut
sdojjy Oct 10, 2022
87c0da2
fix ut
sdojjy Oct 10, 2022
3583192
add metrics
sdojjy Oct 10, 2022
b1106b0
change etcd worker
sdojjy Oct 11, 2022
3e48a52
change etcd worker
sdojjy Oct 12, 2022
a91999e
modify metrics
sdojjy Oct 12, 2022
597ec79
modify metrics
sdojjy Oct 12, 2022
a825eb5
modify metrics
sdojjy Oct 12, 2022
25718f3
Merge branch 'master' into add-some-metrics-for-latancy
sdojjy Oct 13, 2022
cf5a47e
reduce update resovledTs time
sdojjy Oct 13, 2022
013bb2e
add log
sdojjy Oct 14, 2022
8f8b6e3
fix lint
sdojjy Oct 14, 2022
050188b
add log
sdojjy Oct 14, 2022
7acc026
add log
sdojjy Oct 14, 2022
af23370
add log
sdojjy Oct 15, 2022
c07655b
add log to debug etcd worker
sdojjy Oct 15, 2022
4e3a6b4
add log to debug etcd worker
sdojjy Oct 17, 2022
12c2eb2
add log to debug etcd worker
sdojjy Oct 17, 2022
4dadca2
add log to debug etcd worker
sdojjy Oct 17, 2022
b692af6
add log to debug etcd worker
sdojjy Oct 17, 2022
2fc0131
Merge branch 'master' into add-some-metrics-for-latancy
sdojjy Oct 18, 2022
48c882c
Merge branch 'master' into add-some-metrics-for-latancy
sdojjy Oct 18, 2022
855994f
Merge branch 'master' into add-some-metrics-for-latancy
sdojjy Oct 18, 2022
d917bcd
remove log
sdojjy Oct 18, 2022
df6d6ce
remove log
sdojjy Oct 20, 2022
56ae910
Merge branch 'master' into add-some-metrics-for-latancy
sdojjy Oct 20, 2022
dd5679f
not wait the slowest resolved ts to forward the global resovled ts
sdojjy Oct 20, 2022
43e144d
not wait the slowest resolved ts to forward the global resovled ts
sdojjy Oct 20, 2022
9487931
not wait the slowest resolved ts to forward the global resovled ts
sdojjy Oct 20, 2022
fca6ed0
remove log
sdojjy Oct 20, 2022
915f214
not wait the slowest resolved ts to forward the global resovled ts
sdojjy Oct 20, 2022
e107c6e
remove some lock from region_state.go
sdojjy Oct 21, 2022
39b6916
Merge branch 'master' into remove-some-lock-from-regionState
sdojjy Oct 21, 2022
ba43eb0
remove some lock from region_state.go
sdojjy Oct 21, 2022
0c9e3de
remove some lock from region_state.go
sdojjy Oct 21, 2022
88ab877
remove some lock from region_state.go
sdojjy Oct 25, 2022
d408f9c
Merge branch 'master' into remove-some-lock-from-regionState
sdojjy Oct 25, 2022
cac46e0
calculate the resolved ts when forward is called
sdojjy Oct 25, 2022
3071f11
remove some lock from region_state.go
sdojjy Oct 25, 2022
5df13d9
Merge remote-tracking branch 'origin/remove-some-lock-from-regionStat…
sdojjy Oct 25, 2022
79b26dd
refactor regionTsManager
sdojjy Oct 26, 2022
a8e3d35
Merge remote-tracking branch 'upstream/master' into add-some-metrics-…
sdojjy Oct 27, 2022
fff1453
Merge remote-tracking branch 'upstream/master' into add-some-metrics-…
sdojjy Oct 27, 2022
320eed9
Merge remote-tracking branch 'upstream/master' into add-some-metrics-…
sdojjy Oct 27, 2022
0707564
Merge branch 'only-use-ddl-barrier-as-the-barrierts' into add-some-me…
sdojjy Oct 27, 2022
a2f55d4
Merge branch 'remove-some-lock-from-regionState' into add-some-metric…
sdojjy Oct 27, 2022
54f29db
Merge branch 'refactor-frontier' into add-some-metrics-for-latancy
sdojjy Oct 27, 2022
d954aa1
Merge branch 'refactor-regionTsManager' into add-some-metrics-for-lat…
sdojjy Oct 27, 2022
24 changes: 12 additions & 12 deletions cdc/kv/client.go
@@ -40,6 +40,7 @@ import (
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
tidbkv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
@@ -1105,6 +1106,8 @@ func (s *eventFeedSession) receiveFromStream(

metricSendEventBatchResolvedSize := batchResolvedEventSize.
WithLabelValues(s.changefeed.Namespace, s.changefeed.ID)
metricChangefeedResolvedLagGauge := changefeedResolvedTsLagGauge.
WithLabelValues(s.changefeed.Namespace, s.changefeed.ID)

// always create a new region worker, because `receiveFromStream` is ensured
// to call exactly once from outer code logic
@@ -1205,7 +1208,14 @@
}
if cevent.ResolvedTs != nil {
metricSendEventBatchResolvedSize.Observe(float64(len(cevent.ResolvedTs.Regions)))
err = s.sendResolvedTs(ctx, cevent.ResolvedTs, worker, addr)
p, _, err := s.client.pd.GetTS(ctx)
if err == nil {
currentTs := oracle.GetTimeFromTS(oracle.ComposeTS(p, 0))
phyCkpTs := oracle.ExtractPhysical(cevent.ResolvedTs.Ts)
checkpointLag := float64(oracle.GetPhysical(currentTs)-phyCkpTs) / 1e3
metricChangefeedResolvedLagGauge.Observe(checkpointLag)
}
err = s.sendResolvedTs(ctx, cevent.ResolvedTs, worker)
if err != nil {
return err
}
@@ -1314,13 +1324,12 @@ func (s *eventFeedSession) sendResolvedTs(
ctx context.Context,
resolvedTs *cdcpb.ResolvedTs,
worker *regionWorker,
addr string,
) error {
statefulEvents := make([]*regionStatefulEvent, worker.concurrency)
// split resolved ts
for i := 0; i < worker.concurrency; i++ {
// Allocate a buffer with 1.5x length than average to reduce reallocate.
buffLen := len(resolvedTs.Regions) / worker.concurrency * 3 / 2
buffLen := len(resolvedTs.Regions) / worker.concurrency * 2
statefulEvents[i] = &regionStatefulEvent{
resolvedTsEvent: &resolvedTsEvent{
resolvedTs: resolvedTs.Ts,
@@ -1332,15 +1341,6 @@
for _, regionID := range resolvedTs.Regions {
state, ok := worker.getRegionState(regionID)
if ok {
if state.isStopped() {
log.Debug("drop resolved ts due to region feed stopped",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.requestID),
zap.String("addr", addr))
continue
}
slot := worker.inputCalcSlot(regionID)
statefulEvents[slot].resolvedTsEvent.regions = append(
statefulEvents[slot].resolvedTsEvent.regions, state,
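
The new lag metric in `receiveFromStream` reduces both sides to physical time: the "now" side comes from a fresh PD TSO (physical part only, and only when the `GetTS` call succeeds), the other side from the event's resolved ts, and the difference is observed in seconds. Below is a minimal sketch of that conversion using the same client-go `oracle` helpers; the function name and sample values are invented for illustration, the local clock stands in for PD, and none of this is part of the patch.

```go
package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

// resolvedTsLagSeconds mirrors the lag computation in the patch: both the
// PD timestamp (physical part only) and the resolved ts are reduced to
// physical milliseconds, and the difference is returned in seconds.
func resolvedTsLagSeconds(pdPhysical int64, resolvedTs uint64) float64 {
	currentTs := oracle.GetTimeFromTS(oracle.ComposeTS(pdPhysical, 0))
	phyCkpTs := oracle.ExtractPhysical(resolvedTs)
	return float64(oracle.GetPhysical(currentTs)-phyCkpTs) / 1e3
}

func main() {
	// Hypothetical values: the resolved ts lags "now" by 1.5 seconds.
	now := time.Now()
	pdPhysical := oracle.GetPhysical(now)
	resolvedTs := oracle.ComposeTS(oracle.GetPhysical(now.Add(-1500*time.Millisecond)), 0)
	fmt.Printf("lag: %.3fs\n", resolvedTsLagSeconds(pdPhysical, resolvedTs)) // prints roughly 1.500s
}
```

Note that this path adds one `pd.GetTS` call per batched resolved-ts event; when that call fails the event is still forwarded to the region worker, just without a histogram observation.
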
9 changes: 9 additions & 0 deletions cdc/kv/metrics.go
@@ -94,6 +94,14 @@
Help: "The number of region in one batch resolved ts event",
Buckets: prometheus.ExponentialBuckets(2, 2, 16),
}, []string{"namespace", "changefeed"})
changefeedResolvedTsLagGauge = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "kv",
Name: "resolved_ts_lag_histogram",
Help: "resolved ts lag histogram of changefeeds",
Buckets: prometheus.LinearBuckets(0.5, 0.5, 60),
}, []string{"namespace", "changefeed"})
grpcPoolStreamGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
@@ -124,6 +132,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(clientRegionTokenSize)
registry.MustRegister(cachedRegionSize)
registry.MustRegister(batchResolvedEventSize)
registry.MustRegister(changefeedResolvedTsLagGauge)
registry.MustRegister(grpcPoolStreamGauge)
registry.MustRegister(regionEventsBatchSize)

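
Despite the `Gauge` suffix in its variable name, `changefeedResolvedTsLagGauge` is a `HistogramVec`, and `prometheus.LinearBuckets(0.5, 0.5, 60)` yields 60 buckets covering lags from 0.5s up to 30s in 0.5s steps. The following self-contained sketch declares, registers, and observes a histogram of the same shape; the registry, label values, and printed output are hypothetical and only illustrate the client_golang API used above.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// lagHistogram has the same shape as the new metric: a per-changefeed
// histogram whose buckets run from 0.5s to 30s in 0.5s steps.
var lagHistogram = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "ticdc",
		Subsystem: "kv",
		Name:      "resolved_ts_lag_histogram",
		Help:      "resolved ts lag histogram of changefeeds",
		Buckets:   prometheus.LinearBuckets(0.5, 0.5, 60),
	}, []string{"namespace", "changefeed"})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(lagHistogram)

	// Record a 1.5s lag for a hypothetical changefeed.
	lagHistogram.WithLabelValues("default", "test").Observe(1.5)

	families, _ := registry.Gather()
	for _, mf := range families {
		h := mf.GetMetric()[0].GetHistogram()
		fmt.Printf("%s: %d sample(s), sum %.1fs\n", mf.GetName(), h.GetSampleCount(), h.GetSampleSum())
	}
}
```
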
63 changes: 6 additions & 57 deletions cdc/kv/region_state.go
@@ -54,8 +54,7 @@ type regionFeedState struct {
requestID uint64
stopped int32

lock sync.RWMutex
initialized bool
initialized atomic.Bool
matcher *matcher
startFeedTime time.Time
lastResolvedTs uint64
@@ -75,12 +74,6 @@ func (s *regionFeedState) start() {
s.matcher = newMatcher()
}

func (s *regionFeedState) getStartTime() time.Time {
s.lock.RLock()
defer s.lock.RUnlock()
return s.startFeedTime
}

func (s *regionFeedState) markStopped() {
atomic.StoreInt32(&s.stopped, 1)
}
@@ -90,86 +83,42 @@ func (s *regionFeedState) isStopped() bool {
}

func (s *regionFeedState) isInitialized() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.initialized
return s.initialized.Load()
}

func (s *regionFeedState) setInitialized() {
s.lock.Lock()
defer s.lock.Unlock()
s.initialized = true
}

func (s *regionFeedState) getRegionSpan() regionspan.ComparableSpan {
s.lock.RLock()
defer s.lock.RUnlock()
return s.sri.span
s.initialized.Store(true)
}

func (s *regionFeedState) getRegionID() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.sri.verID.GetID()
}

func (s *regionFeedState) getRequestID() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.requestID
}

func (s *regionFeedState) getLastResolvedTs() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.lastResolvedTs
return atomic.LoadUint64(&s.lastResolvedTs)
}

// updateResolvedTs update the resolved ts of the current region feed
func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) {
if resolvedTs > s.getLastResolvedTs() {
s.lock.Lock()
defer s.lock.Unlock()
s.lastResolvedTs = resolvedTs
atomic.StoreUint64(&s.lastResolvedTs, resolvedTs)
}
}

func (s *regionFeedState) getStoreAddr() string {
s.lock.RLock()
defer s.lock.RUnlock()
return s.sri.rpcCtx.Addr
}

// setRegionInfoResolvedTs is only called when the region disconnect,
// to update the `singleRegionInfo` which is reused by reconnect.
func (s *regionFeedState) setRegionInfoResolvedTs() {
s.lock.RLock()
if s.lastResolvedTs <= s.sri.resolvedTs {
s.lock.RUnlock()
if s.getLastResolvedTs() <= s.sri.resolvedTs {
return
}
s.lock.RUnlock()

s.lock.Lock()
defer s.lock.Unlock()
s.sri.resolvedTs = s.lastResolvedTs
}

func (s *regionFeedState) getRegionInfoResolvedTs() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.sri.resolvedTs
}

func (s *regionFeedState) getRegionInfo() singleRegionInfo {
s.lock.RLock()
defer s.lock.RUnlock()
return s.sri
}

func (s *regionFeedState) getRegionMeta() (uint64, regionspan.ComparableSpan, time.Time, string) {
s.lock.RLock()
defer s.lock.RUnlock()
return s.sri.verID.GetID(), s.sri.span, s.startFeedTime, s.sri.rpcCtx.Addr
}

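
The `region_state.go` rewrite drops the per-state `sync.RWMutex`: `initialized` becomes an `atomic.Bool`, and `lastResolvedTs` is read and written through `atomic.LoadUint64`/`StoreUint64`. As written, `updateResolvedTs` is a check-then-store, so if two goroutines ever raced on the same state a smaller timestamp could land after a larger one; a compare-and-swap loop is the usual way to make a forward-only update strict. The sketch below shows that stricter variant on a stripped-down stand-in type; it is illustrative only and not part of the patch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// feedState is a stripped-down stand-in for regionFeedState after the patch:
// no mutex, just atomics. (The patch itself keeps lastResolvedTs as a plain
// uint64 and uses atomic.LoadUint64/StoreUint64 on it.)
type feedState struct {
	initialized    atomic.Bool
	lastResolvedTs atomic.Uint64
}

// advanceResolvedTs moves lastResolvedTs forward monotonically with a CAS
// loop, which stays forward-only even under concurrent callers.
func (s *feedState) advanceResolvedTs(ts uint64) {
	for {
		old := s.lastResolvedTs.Load()
		if ts <= old {
			return
		}
		if s.lastResolvedTs.CompareAndSwap(old, ts) {
			return
		}
	}
}

func main() {
	s := &feedState{}
	s.initialized.Store(true)

	var wg sync.WaitGroup
	for i := 1; i <= 100; i++ {
		wg.Add(1)
		go func(ts uint64) {
			defer wg.Done()
			s.advanceResolvedTs(ts)
		}(uint64(i))
	}
	wg.Wait()
	fmt.Println(s.initialized.Load(), s.lastResolvedTs.Load()) // true 100
}
```

Whether the extra strictness matters depends on whether `updateResolvedTs` can actually race with itself for one region; since resolved-ts events are sharded to worker goroutines by `inputCalcSlot(regionID)`, a given region appears to be updated from a single goroutine, which would make the plain store sufficient.
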
50 changes: 27 additions & 23 deletions cdc/kv/region_worker.go
@@ -18,6 +18,7 @@ import (
"encoding/hex"
"reflect"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
@@ -193,9 +194,9 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState
zap.String("namespace", w.session.client.changefeed.Namespace),
zap.String("changefeed", w.session.client.changefeed.ID),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.getRequestID()),
zap.Stringer("span", state.getRegionSpan()),
zap.Uint64("resolvedTs", state.getRegionInfoResolvedTs()),
zap.Uint64("requestID", state.requestID),
zap.Stringer("span", state.sri.span),
zap.Uint64("resolvedTs", state.sri.resolvedTs),
zap.Error(err))
// if state is already marked stopped, it must have been or would be processed by `onRegionFail`
if state.isStopped() {
@@ -270,25 +271,28 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
zap.String("changefeed", w.session.client.changefeed.ID))
continue
}
expired := make([]*regionTsInfo, 0)
for w.rtsManager.Len() > 0 {
item := w.rtsManager.Pop()
sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(item.ts.resolvedTs))
expired := make([]*regionTsInfo, 0, len(w.rtsManager.m))
for _, item := range w.rtsManager.m {
sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(item.resolvedTs))
// region does not reach resolve lock boundary, put it back
if sinceLastResolvedTs < resolveLockInterval {
w.rtsManager.Insert(item)
break
continue
}
expired = append(expired, item)
if len(expired) >= maxResolvedLockPerLoop {
break
}
}
if len(expired) == 0 {
continue
}
var r regionTsHeap = expired
sort.Sort(r)
maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
processed := 0
for _, rts := range expired {
if processed >= maxResolvedLockPerLoop {
break
}
processed++
w.rtsManager.Remove(rts.regionID)
state, ok := w.getRegionState(rts.regionID)
if !ok || state.isStopped() {
// state is already deleted or stopped, just continue,
@@ -299,7 +303,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
lastResolvedTs := state.getLastResolvedTs()
sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(lastResolvedTs))
if sinceLastResolvedTs >= resolveLockInterval {
sinceLastEvent := time.Since(rts.ts.eventTime)
sinceLastEvent := time.Since(rts.eventTime)
if sinceLastResolvedTs > reconnectInterval && sinceLastEvent > reconnectInterval {
log.Warn("kv client reconnect triggered",
zap.String("namespace", w.session.client.changefeed.Namespace),
@@ -310,11 +314,11 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
}
// Only resolve lock if the resolved-ts keeps unchanged for
// more than resolveLockPenalty times.
if rts.ts.penalty < resolveLockPenalty {
if lastResolvedTs > rts.ts.resolvedTs {
rts.ts.resolvedTs = lastResolvedTs
rts.ts.eventTime = time.Now()
rts.ts.penalty = 0
if rts.penalty < resolveLockPenalty {
if lastResolvedTs > rts.resolvedTs {
rts.resolvedTs = lastResolvedTs
rts.eventTime = time.Now()
rts.penalty = 0
}
w.rtsManager.Insert(rts)
continue
@@ -325,7 +329,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
zap.String("changefeed", w.session.client.changefeed.ID),
zap.String("addr", w.storeAddr),
zap.Uint64("regionID", rts.regionID),
zap.Stringer("span", state.getRegionSpan()),
zap.Stringer("span", state.sri.span),
zap.Duration("duration", sinceLastResolvedTs),
zap.Duration("lastEvent", sinceLastEvent),
zap.Uint64("resolvedTs", lastResolvedTs),
@@ -340,9 +344,9 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
zap.Error(err))
continue
}
rts.ts.penalty = 0
rts.penalty = 0
}
rts.ts.resolvedTs = lastResolvedTs
rts.resolvedTs = lastResolvedTs
w.rtsManager.Insert(rts)
}
}
@@ -727,7 +731,7 @@ func (w *regionWorker) handleResolvedTs(
regions := make([]uint64, 0, len(revents.regions))

for _, state := range revents.regions {
if !state.isInitialized() {
if state.isStopped() || !state.isInitialized() {
continue
}
regionID := state.getRegionID()
@@ -762,7 +766,7 @@ func (w *regionWorker) handleResolvedTs(
default:
}
for _, state := range revents.regions {
if !state.isInitialized() {
if state.isStopped() || !state.isInitialized() {
continue
}
state.updateResolvedTs(resolvedTs)
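
The `resolveLock` loop no longer pops the rts heap one entry at a time; it scans the manager's backing map, collects regions whose resolved ts has not advanced for `resolveLockInterval` (stopping once `maxResolvedLockPerLoop` candidates are gathered), sorts the expired slice, and then removes and handles each one. A simplified, self-contained sketch of that selection step follows; the type, field, and constant names are stand-ins rather than the real ones from `cdc/kv`, and `sort.Slice` is used where the patch sorts a `regionTsHeap`.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// regionTs is a stand-in for regionTsInfo: a region and the physical time
// at which its resolved ts last advanced.
type regionTs struct {
	regionID   uint64
	resolvedAt time.Time
}

// Illustrative values, not the ones defined in cdc/kv.
const (
	resolveLockInterval    = 20 * time.Second
	maxResolvedLockPerLoop = 2
)

// collectExpired mirrors the new selection logic: scan the whole map,
// keep regions stalled longer than resolveLockInterval, stop once the
// per-loop cap is hit, then sort the survivors by how stalled they are.
func collectExpired(now time.Time, regions map[uint64]regionTs) []regionTs {
	expired := make([]regionTs, 0, len(regions))
	for _, r := range regions {
		if now.Sub(r.resolvedAt) < resolveLockInterval {
			continue // still advancing, skip
		}
		expired = append(expired, r)
		if len(expired) >= maxResolvedLockPerLoop {
			break
		}
	}
	sort.Slice(expired, func(i, j int) bool {
		return expired[i].resolvedAt.Before(expired[j].resolvedAt)
	})
	return expired
}

func main() {
	now := time.Now()
	regions := map[uint64]regionTs{
		1: {1, now.Add(-30 * time.Second)},
		2: {2, now.Add(-5 * time.Second)},
		3: {3, now.Add(-60 * time.Second)},
	}
	for _, r := range collectExpired(now, regions) {
		fmt.Printf("region %d stalled for %s\n", r.regionID, now.Sub(r.resolvedAt).Round(time.Second))
	}
}
```

Because Go randomizes map iteration order, the per-loop cap is applied to an effectively arbitrary subset of the stalled regions; the sort only orders the candidates that made the cut, it does not pick the most-stalled regions overall.
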
17 changes: 6 additions & 11 deletions cdc/kv/region_worker_test.go
@@ -62,9 +62,7 @@ func TestRegionStateManagerThreadSafe(t *testing.T) {
regionID := regionIDs[idx]
s, ok := rsm.getState(regionID)
require.True(t, ok)
s.lock.RLock()
require.Equal(t, uint64(idx+1), s.requestID)
s.lock.RUnlock()
}
}()
}
@@ -79,10 +77,10 @@
regionID := regionIDs[rand.Intn(regionCount)]
s, ok := rsm.getState(regionID)
require.True(t, ok)
s.lock.Lock()
s.lastResolvedTs += 10
s.lock.Unlock()
lastResolvedTs := s.getLastResolvedTs()
s.updateResolvedTs(s.getLastResolvedTs() + 10)
rsm.setState(regionID, s)
require.GreaterOrEqual(t, s.getLastResolvedTs(), lastResolvedTs)
}
}()
}
@@ -95,9 +93,6 @@
require.Greater(t, s.lastResolvedTs, uint64(1000))
totalResolvedTs += s.lastResolvedTs
}
// 100 regions, initial resolved ts 1000;
// 2000 * resolved ts forward, increased by 10 each time, routine number is `concurrency`.
require.Equal(t, uint64(100*1000+2000*10*concurrency), totalResolvedTs)
}

func TestRegionStateManagerBucket(t *testing.T) {
@@ -273,19 +268,19 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) {
s1 := newRegionFeedState(singleRegionInfo{
verID: tikv.NewRegionVerID(1, 1, 1),
}, 1)
s1.initialized = true
s1.initialized.Store(true)
s1.lastResolvedTs = 9

s2 := newRegionFeedState(singleRegionInfo{
verID: tikv.NewRegionVerID(2, 2, 2),
}, 2)
s2.initialized = true
s2.initialized.Store(true)
s2.lastResolvedTs = 11

s3 := newRegionFeedState(singleRegionInfo{
verID: tikv.NewRegionVerID(3, 3, 3),
}, 3)
s3.initialized = false
s3.initialized.Store(false)
s3.lastResolvedTs = 8
err := w.handleResolvedTs(ctx, &resolvedTsEvent{
resolvedTs: 10,