Skip to content

Commit

Permalink
timer: Add TimerStore add a memory implement
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed May 19, 2023
1 parent 2c27300 commit 2a2eb98
Show file tree
Hide file tree
Showing 11 changed files with 1,103 additions and 24 deletions.
15 changes: 13 additions & 2 deletions timer/api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ go_library(
name = "api",
srcs = [
"client.go",
"error.go",
"hook.go",
"mem_store.go",
"store.go",
"timer.go",
],
importpath = "github.com/pingcap/tidb/timer/api",
visibility = ["//visibility:public"],
deps = [
"//parser/duration",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_exp//slices",
],
)

Expand All @@ -21,11 +25,18 @@ go_test(
timeout = "short",
srcs = [
"client_test.go",
"main_test.go",
"schedule_policy_test.go",
"store_test.go",
"timer_test.go",
],
embed = [":api"],
flaky = True,
shard_count = 7,
deps = ["@com_github_stretchr_testify//require"],
shard_count = 12,
deps = [
"//testkit/testsetup",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
)
100 changes: 98 additions & 2 deletions timer/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ package api

import (
"context"
"strings"
"time"
"unsafe"

"github.com/pingcap/errors"
)

// GetTimerOption is the option to get timers
Expand Down Expand Up @@ -88,11 +92,103 @@ type TimerClient interface {
// GetTimerByKey queries the timer by key
GetTimerByKey(ctx context.Context, key string) (*TimerRecord, error)
// GetTimers queries timers by options
GetTimers(ctx context.Context, opt ...GetTimerOption) ([]*TimerRecord, error)
GetTimers(ctx context.Context, opts ...GetTimerOption) ([]*TimerRecord, error)
// UpdateTimer updates a timer
UpdateTimer(ctx context.Context, timerID string, opt ...UpdateTimerOption) error
UpdateTimer(ctx context.Context, timerID string, opts ...UpdateTimerOption) error
// CloseTimerEvent closes the triggering event of a timer
CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...UpdateTimerOption) error
// DeleteTimer deletes a timer
DeleteTimer(ctx context.Context, timerID string) (bool, error)
}

// DefaultNamespace is the default namespace
const DefaultNamespace = "default"

// defaultTimerClient is the default implement of timer client
type defaultTimerClient struct {
namespace string
store *TimerStore
}

// NewDefaultTimerClient creates a new defaultTimerClient
func NewDefaultTimerClient(store *TimerStore) TimerClient {
return &defaultTimerClient{
namespace: DefaultNamespace,
store: store,
}
}

func (c *defaultTimerClient) GetDefaultNamespace() string {
return c.namespace
}

func (c *defaultTimerClient) CreateTimer(ctx context.Context, spec TimerSpec) (*TimerRecord, error) {
if spec.Namespace == "" {
spec.Namespace = c.namespace
}

timerID, err := c.store.Create(ctx, &TimerRecord{
TimerSpec: spec,
})

if err != nil {
return nil, err
}
return c.store.GetByID(ctx, timerID)
}

func (c *defaultTimerClient) GetTimerByID(ctx context.Context, timerID string) (*TimerRecord, error) {
return c.store.GetByID(ctx, timerID)
}

func (c *defaultTimerClient) GetTimerByKey(ctx context.Context, key string) (*TimerRecord, error) {
return c.store.GetByKey(ctx, c.namespace, key)
}

func (c *defaultTimerClient) GetTimers(ctx context.Context, opts ...GetTimerOption) ([]*TimerRecord, error) {
cond := &TimerCond{}
for _, opt := range opts {
opt(cond)
}
return c.store.List(ctx, cond)
}

func (c *defaultTimerClient) UpdateTimer(ctx context.Context, timerID string, opts ...UpdateTimerOption) error {
update := &TimerUpdate{}
for _, opt := range opts {
opt(update)
}
return c.store.Update(ctx, timerID, update)
}

func (c *defaultTimerClient) CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...UpdateTimerOption) error {
update := &TimerUpdate{}
for _, opt := range opts {
opt(update)
}

fields := update.FieldsSet(unsafe.Pointer(&update.Watermark), unsafe.Pointer(&update.SummaryData))
if len(fields) > 0 {
return errors.Errorf("The field(s) [%s] are not allowed to update when close event", strings.Join(fields, ", "))
}

timer, err := c.GetTimerByID(ctx, timerID)
if err != nil {
return err
}

var zeroTime time.Time
update.CheckEventID.Set(eventID)
update.EventStatus.Set(SchedEventIdle)
update.EventID.Set("")
update.EventData.Set(nil)
update.EventStart.Set(zeroTime)
if !update.Watermark.Present() {
update.Watermark.Set(timer.EventStart)
}
return c.store.Update(ctx, timerID, update)
}

func (c *defaultTimerClient) DeleteTimer(ctx context.Context, timerID string) (bool, error) {
return c.store.Delete(ctx, timerID)
}
109 changes: 109 additions & 0 deletions timer/api/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package api

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -118,3 +120,110 @@ func TestUpdateTimerOption(t *testing.T) {
require.Equal(t, []byte("hello"), summary)
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark", "SummaryData"}, update.FieldsSet())
}

func TestDefaultClient(t *testing.T) {
store := NewMemoryTimerStore()
cli := NewDefaultTimerClient(store)
ctx := context.Background()
spec := TimerSpec{
Key: "k1",
SchedPolicyType: SchedEventInterval,
SchedPolicyExpr: "1h",
Data: []byte("data1"),
}

// create
timer, err := cli.CreateTimer(ctx, spec)
require.NoError(t, err)
spec.Namespace = "default"
require.NotEmpty(t, timer.ID)
require.Equal(t, spec, timer.TimerSpec)
require.Equal(t, SchedEventIdle, timer.EventStatus)
require.Equal(t, "", timer.EventID)
require.Empty(t, timer.EventData)
require.Empty(t, timer.SummaryData)

// get by id
got, err := cli.GetTimerByID(ctx, timer.ID)
require.NoError(t, err)
require.Equal(t, timer, got)

// get by key
got, err = cli.GetTimerByKey(ctx, timer.Key)
require.NoError(t, err)
require.Equal(t, timer, got)

// get by key prefix
tms, err := cli.GetTimers(ctx, WithKeyPrefix("k"))
require.NoError(t, err)
require.Equal(t, 1, len(tms))
require.Equal(t, timer, tms[0])

// update
err = cli.UpdateTimer(ctx, timer.ID, WithSetSchedExpr(SchedEventInterval, "3h"))
require.NoError(t, err)
timer.SchedPolicyType = SchedEventInterval
timer.SchedPolicyExpr = "3h"
got, err = cli.GetTimerByID(ctx, timer.ID)
require.NoError(t, err)
require.Greater(t, got.Version, timer.Version)
timer.Version = got.Version
require.Equal(t, timer, got)

// close event
eventStart := time.Now().Add(-time.Second)
err = store.Update(ctx, timer.ID, &TimerUpdate{
EventStatus: NewOptionalVal(SchedEventTrigger),
EventID: NewOptionalVal("event1"),
EventData: NewOptionalVal([]byte("d1")),
SummaryData: NewOptionalVal([]byte("s1")),
EventStart: NewOptionalVal(eventStart),
})
require.NoError(t, err)
err = cli.CloseTimerEvent(ctx, timer.ID, "event2")
require.True(t, errors.ErrorEqual(ErrEventIDNotMatch, err))

err = cli.CloseTimerEvent(ctx, timer.ID, "event2", WithSetSchedExpr(SchedEventInterval, "1h"))
require.EqualError(t, err, "The field(s) [SchedPolicyType, SchedPolicyExpr] are not allowed to update when close event")

err = cli.CloseTimerEvent(ctx, timer.ID, "event1")
require.NoError(t, err)
timer, err = cli.GetTimerByID(ctx, timer.ID)
require.NoError(t, err)
require.Equal(t, SchedEventIdle, timer.EventStatus)
require.Empty(t, timer.EventID)
require.Empty(t, timer.EventData)
require.True(t, timer.EventStart.IsZero())
require.Equal(t, []byte("s1"), timer.SummaryData)
require.Equal(t, eventStart, timer.Watermark)

// close event with option
err = store.Update(ctx, timer.ID, &TimerUpdate{
EventID: NewOptionalVal("event1"),
EventData: NewOptionalVal([]byte("d1")),
SummaryData: NewOptionalVal([]byte("s1")),
})
require.NoError(t, err)

watermark := time.Now().Add(time.Hour)
err = cli.CloseTimerEvent(ctx, timer.ID, "event1", WithSetWatermark(watermark), WithSetSummaryData([]byte("s2")))
require.NoError(t, err)
timer, err = cli.GetTimerByID(ctx, timer.ID)
require.NoError(t, err)
require.Equal(t, SchedEventIdle, timer.EventStatus)
require.Empty(t, timer.EventID)
require.Empty(t, timer.EventData)
require.True(t, timer.EventStart.IsZero())
require.Equal(t, []byte("s2"), timer.SummaryData)
require.Equal(t, watermark, timer.Watermark)

// delete
exit, err := cli.DeleteTimer(ctx, timer.ID)
require.NoError(t, err)
require.True(t, exit)

// delete no exist
exit, err = cli.DeleteTimer(ctx, timer.ID)
require.NoError(t, err)
require.False(t, exit)
}
29 changes: 29 additions & 0 deletions timer/api/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import "errors"

// ErrTimerNotExist indicates that the specified timer not exist
var ErrTimerNotExist = errors.New("timer not exist")

// ErrTimerExists indicates that the specified timer already exits
var ErrTimerExists = errors.New("timer already exists")

// ErrVersionNotMatch indicates that the timer's version not match
var ErrVersionNotMatch = errors.New("timer version not match")

// ErrEventIDNotMatch indicates that the timer's event id not match
var ErrEventIDNotMatch = errors.New("timer event id not match")
2 changes: 1 addition & 1 deletion timer/api/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Hook interface {
// For example, if `TimerShedEvent.Delay` is a non-zero value, the event triggering will be postponed.
// Notice that `event.Timer().EventID` will be empty because the current event is not actually triggered,
// use `event.EventID()` to get the event id instead.
OnPreSchedEvent(ctx context.Context, event TimerShedEvent) (*TimerShedEvent, error)
OnPreSchedEvent(ctx context.Context, event TimerShedEvent) (PreSchedEventResult, error)
// OnSchedEvent will be called when a new event is triggered.
OnSchedEvent(ctx context.Context, event TimerShedEvent) error
}
Expand Down
27 changes: 27 additions & 0 deletions timer/api/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"testing"

"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()
goleak.VerifyTestMain(m)
}
Loading

0 comments on commit 2a2eb98

Please sign in to comment.