Skip to content

Commit

Permalink
Merge branch 'master' into outer2inner
Browse files Browse the repository at this point in the history
  • Loading branch information
ghazalfamilyusa committed May 12, 2024
2 parents fca7854 + c73d6c5 commit e912826
Show file tree
Hide file tree
Showing 221 changed files with 3,230 additions and 2,110 deletions.
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,14 @@ bazel_pipelineddmltest: failpoint-enable bazel_ci_simple_prepare
-- //tests/realtikvtest/pipelineddmltest/...
./build/jenkins_collect_coverage.sh

# on timeout, bazel won't print log sometimes, so we use --test_output=all to print log always
.PHONY: bazel_flashbacktest
bazel_flashbacktest: failpoint-enable bazel_ci_simple_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) $(BAZEL_INSTRUMENTATION_FILTER) --test_output=all --test_arg=-with-real-tikv --define gotags=deadlock,intest \
--@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //tests/realtikvtest/flashbacktest/...
./build/jenkins_collect_coverage.sh

.PHONY: bazel_lint
bazel_lint: bazel_prepare
bazel build //... --//build:with_nogo_flag=true
Expand Down
10 changes: 5 additions & 5 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ versions.check(minimum_bazel_version = "6.0.0")

http_archive(
name = "io_bazel_rules_go",
sha256 = "80a98277ad1311dacd837f9b16db62887702e9f1d1c4c9f796d0121a46c8e184",
sha256 = "af47f30e9cbd70ae34e49866e201b3f77069abb111183f2c0297e7e74ba6bbc0",
urls = [
"http://bazel-cache.pingcap.net:8080/bazelbuild/rules_go/releases/download/v0.46.0/rules_go-v0.46.0.zip",
"http://ats.apps.svc/bazelbuild/rules_go/releases/download/v0.46.0/rules_go-v0.46.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.46.0/rules_go-v0.46.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.46.0/rules_go-v0.46.0.zip",
"http://bazel-cache.pingcap.net:8080/bazelbuild/rules_go/releases/download/v0.47.0/rules_go-v0.47.0.zip",
"http://ats.apps.svc/bazelbuild/rules_go/releases/download/v0.47.0/rules_go-v0.47.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.47.0/rules_go-v0.47.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.47.0/rules_go-v0.47.0.zip",
],
)

Expand Down
16 changes: 16 additions & 0 deletions br/pkg/config/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,19 @@ func ParseMergeRegionSizeFromConfig(resp []byte) (uint64, uint64, error) {
urs := uint64(rs)
return urs, c.Cop.RegionSplitKeys, nil
}

func ParseLogBackupEnableFromConfig(resp []byte) (bool, error) {
type logbackup struct {
Enable bool `json:"enable"`
}

type config struct {
LogBackup logbackup `json:"log-backup"`
}
var c config
e := json.Unmarshal(resp, &c)
if e != nil {
return false, e
}
return c.LogBackup.Enable, nil
}
2 changes: 1 addition & 1 deletion br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ go_test(
],
embed = [":conn"],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//br/pkg/config",
"//br/pkg/conn/util",
Expand Down
19 changes: 19 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,25 @@ func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig,
}
}

// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup.
func (mgr *Mgr) IsLogBackupEnabled(ctx context.Context, client *http.Client) (bool, error) {
logbackupEnable := true
err := mgr.GetConfigFromTiKV(ctx, client, func(resp *http.Response) error {
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
enable, err := kvconfig.ParseLogBackupEnableFromConfig(respBytes)
if err != nil {
log.Warn("Failed to parse log-backup enable from config", logutil.ShortError(err))
return err
}
logbackupEnable = logbackupEnable && enable
return nil
})
return logbackupEnable, errors.Trace(err)
}

// GetConfigFromTiKV get configs from all alive tikv stores.
func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func(*http.Response) error) error {
allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), util.SkipTiFlash)
Expand Down
163 changes: 163 additions & 0 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,169 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
}
}

func TestIsLogBackupEnabled(t *testing.T) {
cases := []struct {
stores []*metapb.Store
content []string
enable bool
err bool
}{
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tiflash",
},
},
},
},
content: []string{""},
enable: true,
err: false,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tiflash",
},
},
},
{
Id: 2,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"",
// Assuming the TiKV has failed due to some reason.
"",
},
enable: false,
err: true,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}",
},
enable: true,
err: false,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}",
},
enable: false,
err: false,
},
{
stores: []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
{
Id: 2,
State: metapb.StoreState_Up,
Labels: []*metapb.StoreLabel{
{
Key: "engine",
Value: "tikv",
},
},
},
},
content: []string{
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": true}}",
"{\"log-level\": \"debug\", \"log-backup\": {\"enable\": false}}",
},
enable: false,
err: false,
},
}

pctx := context.Background()
for _, ca := range cases {
ctx, cancel := context.WithCancel(pctx)
pdCli := utils.FakePDClient{Stores: ca.stores}
require.Equal(t, len(ca.content), len(ca.stores))
count := 0
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.TrimSpace(r.URL.Path) {
case "/config":
if len(ca.content[count]) == 0 {
cancel()
}
_, _ = fmt.Fprint(w, ca.content[count])
default:
http.NotFoundHandler().ServeHTTP(w, r)
}
count++
}))

for _, s := range ca.stores {
s.Address = mockServer.URL
s.StatusAddress = mockServer.URL
}

httpCli := mockServer.Client()
mgr := &Mgr{PdController: &pdutil.PdController{}}
mgr.PdController.SetPDClient(pdCli)
enable, err := mgr.IsLogBackupEnabled(ctx, httpCli)
if ca.err {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, ca.enable, enable)
}
mockServer.Close()
}
}

func TestHandleTiKVAddress(t *testing.T) {
cases := []struct {
store *metapb.Store
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,5 +425,5 @@ func TestGetExistedUserDBs(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(195), session.CurrentBootstrapVersion)
require.Equal(t, int64(196), session.CurrentBootstrapVersion)
}
9 changes: 9 additions & 0 deletions br/pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,15 @@ func NewStatusController(meta *MetaDataClient, mgr PDInfoProvider, view TaskPrin
}
}

func (ctl *StatusController) Close() error {
if ctl.meta != nil {
if err := ctl.meta.Close(); err != nil {
return errors.Trace(err)
}
}
return nil
}

// fillTask queries and fills the extra information for a raw task.
func (ctl *StatusController) fillTask(ctx context.Context, task Task) (TaskStatus, error) {
var err error
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
if _, err := c.env.BlockGCUntil(ctx, 0); err != nil {
if err := c.env.UnblockGC(ctx); err != nil {
log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
}
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
Expand Down
18 changes: 17 additions & 1 deletion br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package streamhelper

import (
"context"
"math"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/config"
Expand Down Expand Up @@ -46,7 +48,21 @@ type PDRegionScanner struct {
// Returns the minimal service GC safe point across all services.
// If the arguments is `0`, this would remove the service safe point.
func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
minimalSafePoint, err := c.UpdateServiceGCSafePoint(
ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
if err != nil {
return 0, errors.Annotate(err, "failed to block gc until")
}
if minimalSafePoint > at {
return 0, errors.Errorf("minimal safe point %d is greater than the target %d", minimalSafePoint, at)
}
return at, nil
}

func (c PDRegionScanner) UnblockGC(ctx context.Context) error {
// set ttl to 0, means remove the safe point.
_, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, math.MaxUint64)
return err
}

// TODO: It should be able to synchoronize the current TS with the PD.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestGCServiceSafePoint(t *testing.T) {
req.Eventually(func() bool {
env.fakeCluster.mu.Lock()
defer env.fakeCluster.mu.Unlock()
return env.serviceGCSafePoint == 0
return env.serviceGCSafePoint != 0 && env.serviceGCSafePointDeleted
}, 3*time.Second, 100*time.Millisecond)
}

Expand Down
20 changes: 12 additions & 8 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ type fakeCluster struct {
regions []*region
testCtx *testing.T

onGetClient func(uint64) error
onClearCache func(uint64) error
serviceGCSafePoint uint64
currentTS uint64
onGetClient func(uint64) error
onClearCache func(uint64) error
serviceGCSafePoint uint64
serviceGCSafePointDeleted bool
currentTS uint64
}

func (r *region) splitAt(newID uint64, k string) *region {
Expand Down Expand Up @@ -264,17 +265,20 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
f.mu.Lock()
defer f.mu.Unlock()
if at == 0 {
f.serviceGCSafePoint = at
return at, nil
}
if f.serviceGCSafePoint > at {
return f.serviceGCSafePoint, nil
}
f.serviceGCSafePoint = at
return at, nil
}

func (f *fakeCluster) UnblockGC(ctx context.Context) error {
f.mu.Lock()
defer f.mu.Unlock()
f.serviceGCSafePointDeleted = true
return nil
}

func (f *fakeCluster) FetchCurrentTS(ctx context.Context) (uint64, error) {
return f.currentTS, nil
}
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/streamhelper/regioniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type TiKVClusterMeta interface {
// For now, all tasks (exactly one task in fact) use the same checkpoint.
BlockGCUntil(ctx context.Context, at uint64) (uint64, error)

// UnblockGC used to remove the service GC safe point in PD.
UnblockGC(ctx context.Context) error

FetchCurrentTS(ctx context.Context) (uint64, error)
}

Expand Down
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/regioniter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
return 0, status.Error(codes.Unimplemented, "Unsupported operation")
}

func (c constantRegions) UnblockGC(ctx context.Context) error {
return status.Error(codes.Unimplemented, "Unsupported operation")
}

// TODO: It should be able to synchoronize the current TS with the PD.
func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
Expand Down
Loading

0 comments on commit e912826

Please sign in to comment.