-
Notifications
You must be signed in to change notification settings - Fork 289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
new_owner: implement gc manager #1819
Conversation
switch cfState.Info.State { | ||
case model.StateNormal, model.StateStopped, model.StateError: | ||
default: | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a debug log here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to log this, i think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because there is no need to keep a log here, for no particular reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thought was that we could use some log to debug the calculation of gcSafePoint in case of unexpected behavior. But on second thought, the log might produce too many lines and it might not be that useful. So NVM.
cdc/owner/gc_manager.go
Outdated
return m.pdTimeCached, nil | ||
} | ||
|
||
func (m *gcManager) CheckTsTooFarBehindToStop(ctx cdcContext.Context, checkpointTs model.Ts) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is not used, will it be used in another PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it used by changefeed
} | ||
minCheckpointTs := uint64(math.MaxUint64) | ||
for _, cfState := range state.Changefeeds { | ||
if cfState.Info == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just curious, if ChangefeedReactorState.ChangeFeedInfo==nil, is it a valid state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, it's not a valid state
switch cfState.Info.State { | ||
case model.StateNormal, model.StateStopped, model.StateError: | ||
default: | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -45,6 +45,7 @@ type FeedState string | |||
// All FeedStates | |||
const ( | |||
StateNormal FeedState = "normal" | |||
StateError FeedState = "error" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By adding a new state, does it cause compatibility issues?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, it will not cause compatibility issues.
failpoint.Inject("InjectGcSafepointUpdateInterval", func(val failpoint.Value) { | ||
gcSafepointUpdateInterval = time.Duration(val.(int) * int(time.Millisecond)) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a failpoint? And why does it change a global variable inside a function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's port from the old owner, and this failpoint is already used by some test cases
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
if checkpointTs < m.lastSafePointTs || pdTime.Sub(oracle.GetTimeFromTS(checkpointTs)) > time.Duration(m.gcTTL)*time.Second { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen when we create a changefeed with a start ts x where gc safepoint is x-48h and gcTTL is 24h?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the owner will stop the changefeed and set the feed state to failed
/lgtm |
/lgtm |
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by writing |
/merge |
This pull request has been accepted and is ready to merge. Commit hash: 0a9f145
|
/run-integration-tests |
/run-kafka-tests |
/run-integration-tests |
/run-kafka-tests |
/run-integration-tests |
1 similar comment
/run-integration-tests |
/run-integration-tests |
Codecov Report
@@ Coverage Diff @@
## master #1819 +/- ##
================================================
+ Coverage 53.4083% 53.6231% +0.2147%
================================================
Files 154 157 +3
Lines 16166 16312 +146
================================================
+ Hits 8634 8747 +113
- Misses 6608 6618 +10
- Partials 924 947 +23 |
In response to a cherrypick label: new pull request created: #1841. |
In response to a cherrypick label: new pull request created: #1842. |
What problem does this PR solve?
GC manager: a module of the new owner, which reports the min checkpoint ts to PD GC service and checks the changefeed which of checkpoint ts is behind too far to stop
Check List
Tests
Release note