From 2a2eb9868b01a3f50a0dc1d8d175bcac7221441c Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Thu, 18 May 2023 11:02:55 +0800 Subject: [PATCH] timer: Add `TimerStore` add a memory implement --- timer/api/BUILD.bazel | 15 +- timer/api/client.go | 100 ++++++++++- timer/api/client_test.go | 109 ++++++++++++ timer/api/error.go | 29 ++++ timer/api/hook.go | 2 +- timer/api/main_test.go | 27 +++ timer/api/mem_store.go | 224 ++++++++++++++++++++++++ timer/api/store.go | 198 ++++++++++++++++++++- timer/api/store_test.go | 363 +++++++++++++++++++++++++++++++++++++-- timer/api/timer.go | 12 +- timer/api/timer_test.go | 48 ++++++ 11 files changed, 1103 insertions(+), 24 deletions(-) create mode 100644 timer/api/error.go create mode 100644 timer/api/main_test.go create mode 100644 timer/api/mem_store.go create mode 100644 timer/api/timer_test.go diff --git a/timer/api/BUILD.bazel b/timer/api/BUILD.bazel index 5289fee58e34f..29d52d03025f4 100644 --- a/timer/api/BUILD.bazel +++ b/timer/api/BUILD.bazel @@ -4,7 +4,9 @@ go_library( name = "api", srcs = [ "client.go", + "error.go", "hook.go", + "mem_store.go", "store.go", "timer.go", ], @@ -12,7 +14,9 @@ go_library( visibility = ["//visibility:public"], deps = [ "//parser/duration", + "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", + "@org_golang_x_exp//slices", ], ) @@ -21,11 +25,18 @@ go_test( timeout = "short", srcs = [ "client_test.go", + "main_test.go", "schedule_policy_test.go", "store_test.go", + "timer_test.go", ], embed = [":api"], flaky = True, - shard_count = 7, - deps = ["@com_github_stretchr_testify//require"], + shard_count = 12, + deps = [ + "//testkit/testsetup", + "@com_github_pingcap_errors//:errors", + "@com_github_stretchr_testify//require", + "@org_uber_go_goleak//:goleak", + ], ) diff --git a/timer/api/client.go b/timer/api/client.go index 692f6da21fb03..73c9af0b5b2d3 100644 --- a/timer/api/client.go +++ b/timer/api/client.go @@ -16,7 +16,11 @@ package api import ( "context" + "strings" "time" + "unsafe" + + "github.com/pingcap/errors" ) // GetTimerOption is the option to get timers @@ -88,11 +92,103 @@ type TimerClient interface { // GetTimerByKey queries the timer by key GetTimerByKey(ctx context.Context, key string) (*TimerRecord, error) // GetTimers queries timers by options - GetTimers(ctx context.Context, opt ...GetTimerOption) ([]*TimerRecord, error) + GetTimers(ctx context.Context, opts ...GetTimerOption) ([]*TimerRecord, error) // UpdateTimer updates a timer - UpdateTimer(ctx context.Context, timerID string, opt ...UpdateTimerOption) error + UpdateTimer(ctx context.Context, timerID string, opts ...UpdateTimerOption) error // CloseTimerEvent closes the triggering event of a timer CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...UpdateTimerOption) error // DeleteTimer deletes a timer DeleteTimer(ctx context.Context, timerID string) (bool, error) } + +// DefaultNamespace is the default namespace +const DefaultNamespace = "default" + +// defaultTimerClient is the default implement of timer client +type defaultTimerClient struct { + namespace string + store *TimerStore +} + +// NewDefaultTimerClient creates a new defaultTimerClient +func NewDefaultTimerClient(store *TimerStore) TimerClient { + return &defaultTimerClient{ + namespace: DefaultNamespace, + store: store, + } +} + +func (c *defaultTimerClient) GetDefaultNamespace() string { + return c.namespace +} + +func (c *defaultTimerClient) CreateTimer(ctx context.Context, spec TimerSpec) (*TimerRecord, error) { + if spec.Namespace == "" { + spec.Namespace = c.namespace + } + + timerID, err := c.store.Create(ctx, &TimerRecord{ + TimerSpec: spec, + }) + + if err != nil { + return nil, err + } + return c.store.GetByID(ctx, timerID) +} + +func (c *defaultTimerClient) GetTimerByID(ctx context.Context, timerID string) (*TimerRecord, error) { + return c.store.GetByID(ctx, timerID) +} + +func (c *defaultTimerClient) GetTimerByKey(ctx context.Context, key string) (*TimerRecord, error) { + return c.store.GetByKey(ctx, c.namespace, key) +} + +func (c *defaultTimerClient) GetTimers(ctx context.Context, opts ...GetTimerOption) ([]*TimerRecord, error) { + cond := &TimerCond{} + for _, opt := range opts { + opt(cond) + } + return c.store.List(ctx, cond) +} + +func (c *defaultTimerClient) UpdateTimer(ctx context.Context, timerID string, opts ...UpdateTimerOption) error { + update := &TimerUpdate{} + for _, opt := range opts { + opt(update) + } + return c.store.Update(ctx, timerID, update) +} + +func (c *defaultTimerClient) CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...UpdateTimerOption) error { + update := &TimerUpdate{} + for _, opt := range opts { + opt(update) + } + + fields := update.FieldsSet(unsafe.Pointer(&update.Watermark), unsafe.Pointer(&update.SummaryData)) + if len(fields) > 0 { + return errors.Errorf("The field(s) [%s] are not allowed to update when close event", strings.Join(fields, ", ")) + } + + timer, err := c.GetTimerByID(ctx, timerID) + if err != nil { + return err + } + + var zeroTime time.Time + update.CheckEventID.Set(eventID) + update.EventStatus.Set(SchedEventIdle) + update.EventID.Set("") + update.EventData.Set(nil) + update.EventStart.Set(zeroTime) + if !update.Watermark.Present() { + update.Watermark.Set(timer.EventStart) + } + return c.store.Update(ctx, timerID, update) +} + +func (c *defaultTimerClient) DeleteTimer(ctx context.Context, timerID string) (bool, error) { + return c.store.Delete(ctx, timerID) +} diff --git a/timer/api/client_test.go b/timer/api/client_test.go index 958123f3a4ad7..a75505e3ac988 100644 --- a/timer/api/client_test.go +++ b/timer/api/client_test.go @@ -15,9 +15,11 @@ package api import ( + "context" "testing" "time" + "github.com/pingcap/errors" "github.com/stretchr/testify/require" ) @@ -118,3 +120,110 @@ func TestUpdateTimerOption(t *testing.T) { require.Equal(t, []byte("hello"), summary) require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark", "SummaryData"}, update.FieldsSet()) } + +func TestDefaultClient(t *testing.T) { + store := NewMemoryTimerStore() + cli := NewDefaultTimerClient(store) + ctx := context.Background() + spec := TimerSpec{ + Key: "k1", + SchedPolicyType: SchedEventInterval, + SchedPolicyExpr: "1h", + Data: []byte("data1"), + } + + // create + timer, err := cli.CreateTimer(ctx, spec) + require.NoError(t, err) + spec.Namespace = "default" + require.NotEmpty(t, timer.ID) + require.Equal(t, spec, timer.TimerSpec) + require.Equal(t, SchedEventIdle, timer.EventStatus) + require.Equal(t, "", timer.EventID) + require.Empty(t, timer.EventData) + require.Empty(t, timer.SummaryData) + + // get by id + got, err := cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Equal(t, timer, got) + + // get by key + got, err = cli.GetTimerByKey(ctx, timer.Key) + require.NoError(t, err) + require.Equal(t, timer, got) + + // get by key prefix + tms, err := cli.GetTimers(ctx, WithKeyPrefix("k")) + require.NoError(t, err) + require.Equal(t, 1, len(tms)) + require.Equal(t, timer, tms[0]) + + // update + err = cli.UpdateTimer(ctx, timer.ID, WithSetSchedExpr(SchedEventInterval, "3h")) + require.NoError(t, err) + timer.SchedPolicyType = SchedEventInterval + timer.SchedPolicyExpr = "3h" + got, err = cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Greater(t, got.Version, timer.Version) + timer.Version = got.Version + require.Equal(t, timer, got) + + // close event + eventStart := time.Now().Add(-time.Second) + err = store.Update(ctx, timer.ID, &TimerUpdate{ + EventStatus: NewOptionalVal(SchedEventTrigger), + EventID: NewOptionalVal("event1"), + EventData: NewOptionalVal([]byte("d1")), + SummaryData: NewOptionalVal([]byte("s1")), + EventStart: NewOptionalVal(eventStart), + }) + require.NoError(t, err) + err = cli.CloseTimerEvent(ctx, timer.ID, "event2") + require.True(t, errors.ErrorEqual(ErrEventIDNotMatch, err)) + + err = cli.CloseTimerEvent(ctx, timer.ID, "event2", WithSetSchedExpr(SchedEventInterval, "1h")) + require.EqualError(t, err, "The field(s) [SchedPolicyType, SchedPolicyExpr] are not allowed to update when close event") + + err = cli.CloseTimerEvent(ctx, timer.ID, "event1") + require.NoError(t, err) + timer, err = cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Equal(t, SchedEventIdle, timer.EventStatus) + require.Empty(t, timer.EventID) + require.Empty(t, timer.EventData) + require.True(t, timer.EventStart.IsZero()) + require.Equal(t, []byte("s1"), timer.SummaryData) + require.Equal(t, eventStart, timer.Watermark) + + // close event with option + err = store.Update(ctx, timer.ID, &TimerUpdate{ + EventID: NewOptionalVal("event1"), + EventData: NewOptionalVal([]byte("d1")), + SummaryData: NewOptionalVal([]byte("s1")), + }) + require.NoError(t, err) + + watermark := time.Now().Add(time.Hour) + err = cli.CloseTimerEvent(ctx, timer.ID, "event1", WithSetWatermark(watermark), WithSetSummaryData([]byte("s2"))) + require.NoError(t, err) + timer, err = cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Equal(t, SchedEventIdle, timer.EventStatus) + require.Empty(t, timer.EventID) + require.Empty(t, timer.EventData) + require.True(t, timer.EventStart.IsZero()) + require.Equal(t, []byte("s2"), timer.SummaryData) + require.Equal(t, watermark, timer.Watermark) + + // delete + exit, err := cli.DeleteTimer(ctx, timer.ID) + require.NoError(t, err) + require.True(t, exit) + + // delete no exist + exit, err = cli.DeleteTimer(ctx, timer.ID) + require.NoError(t, err) + require.False(t, exit) +} diff --git a/timer/api/error.go b/timer/api/error.go new file mode 100644 index 0000000000000..40e593daa500a --- /dev/null +++ b/timer/api/error.go @@ -0,0 +1,29 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import "errors" + +// ErrTimerNotExist indicates that the specified timer not exist +var ErrTimerNotExist = errors.New("timer not exist") + +// ErrTimerExists indicates that the specified timer already exits +var ErrTimerExists = errors.New("timer already exists") + +// ErrVersionNotMatch indicates that the timer's version not match +var ErrVersionNotMatch = errors.New("timer version not match") + +// ErrEventIDNotMatch indicates that the timer's event id not match +var ErrEventIDNotMatch = errors.New("timer event id not match") diff --git a/timer/api/hook.go b/timer/api/hook.go index afa564df592a1..a0c977927bcee 100644 --- a/timer/api/hook.go +++ b/timer/api/hook.go @@ -50,7 +50,7 @@ type Hook interface { // For example, if `TimerShedEvent.Delay` is a non-zero value, the event triggering will be postponed. // Notice that `event.Timer().EventID` will be empty because the current event is not actually triggered, // use `event.EventID()` to get the event id instead. - OnPreSchedEvent(ctx context.Context, event TimerShedEvent) (*TimerShedEvent, error) + OnPreSchedEvent(ctx context.Context, event TimerShedEvent) (PreSchedEventResult, error) // OnSchedEvent will be called when a new event is triggered. OnSchedEvent(ctx context.Context, event TimerShedEvent) error } diff --git a/timer/api/main_test.go b/timer/api/main_test.go new file mode 100644 index 0000000000000..6d4377c05c27d --- /dev/null +++ b/timer/api/main_test.go @@ -0,0 +1,27 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "testing" + + "github.com/pingcap/tidb/testkit/testsetup" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testsetup.SetupForCommonTest() + goleak.VerifyTestMain(m) +} diff --git a/timer/api/mem_store.go b/timer/api/mem_store.go new file mode 100644 index 0000000000000..2a82667a5164b --- /dev/null +++ b/timer/api/mem_store.go @@ -0,0 +1,224 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "encoding/hex" + "sync" + "time" + + "github.com/google/uuid" + "github.com/pingcap/errors" + "golang.org/x/exp/slices" +) + +type memStoreWatcher struct { + ctx context.Context + ch chan WatchTimerResponse +} + +type memoryStoreCore struct { + mu sync.Mutex + namespaces map[string]map[string]*TimerRecord + id2Timers map[string]*TimerRecord + watchers []*memStoreWatcher +} + +// NewMemoryTimerStore creates a memory store for timers +func NewMemoryTimerStore() *TimerStore { + return &TimerStore{ + TimerStoreCore: &memoryStoreCore{ + namespaces: make(map[string]map[string]*TimerRecord), + id2Timers: make(map[string]*TimerRecord), + }, + } +} + +func (s *memoryStoreCore) Create(_ context.Context, record *TimerRecord) (string, error) { + if record == nil { + return "", errors.New("timer should not be nil") + } + + if err := record.Validate(); err != nil { + return "", err + } + + s.mu.Lock() + defer s.mu.Unlock() + + record = record.Clone() + if record.ID == "" { + uid := uuid.New() + record.ID = hex.EncodeToString(uid[:]) + } + + if record.Version == 0 { + record.Version = 1 + } + + if record.EventStatus == "" { + record.EventStatus = SchedEventIdle + } + + if record.CreateTime.IsZero() { + record.CreateTime = time.Now() + } + + if _, ok := s.id2Timers[record.ID]; ok { + return "", errors.Trace(ErrTimerExists) + } + + ns, ok := s.namespaces[record.Namespace] + if !ok { + ns = make(map[string]*TimerRecord) + s.namespaces[record.Namespace] = ns + } else { + if _, ok = ns[record.Key]; ok { + return "", errors.Trace(ErrTimerExists) + } + } + + s.id2Timers[record.ID] = record + ns[record.Key] = record + s.notify(WatchTimerEventCreate, record.ID) + return record.ID, nil +} + +func (s *memoryStoreCore) List(_ context.Context, cond Cond) ([]*TimerRecord, error) { + result := make([]*TimerRecord, 0, 1) + for _, ns := range s.namespaces { + for _, t := range ns { + if cond == nil || cond.Match(t) { + result = append(result, t.Clone()) + } + } + } + return result, nil +} + +func (s *memoryStoreCore) Update(_ context.Context, timerID string, update *TimerUpdate) error { + if update == nil { + return errors.New("update should not be nil") + } + + s.mu.Lock() + defer s.mu.Unlock() + + record, ok := s.id2Timers[timerID] + if !ok { + return ErrTimerNotExist + } + + newRecord, err := update.Apply(record) + if err != nil { + return err + } + + if err = newRecord.Validate(); err != nil { + return err + } + + newRecord.Version++ + s.id2Timers[timerID] = newRecord + s.namespaces[record.Namespace][record.Key] = newRecord + s.notify(WatchTimerEventUpdate, timerID) + return nil +} + +func (s *memoryStoreCore) Delete(_ context.Context, timerID string) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + record, ok := s.id2Timers[timerID] + if !ok { + return false, nil + } + + delete(s.id2Timers, timerID) + ns := s.namespaces[record.Namespace] + delete(ns, record.Key) + if len(ns) == 0 { + delete(s.namespaces, record.Namespace) + } + s.notify(WatchTimerEventDelete, timerID) + return true, nil +} + +func (*memoryStoreCore) WatchSupported() bool { + return true +} + +func (s *memoryStoreCore) Watch(ctx context.Context) WatchTimerChan { + s.mu.Lock() + defer s.mu.Unlock() + watcher := &memStoreWatcher{ + ctx: ctx, + ch: make(chan WatchTimerResponse, 8), + } + s.watchers = append(s.watchers, watcher) + ch := make(chan WatchTimerResponse) + go func() { + defer func() { + s.mu.Lock() + defer s.mu.Unlock() + close(ch) + if i := slices.Index(s.watchers, watcher); i >= 0 { + s.watchers = slices.Delete(s.watchers, i, i+1) + } + }() + + for { + select { + case <-ctx.Done(): + return + case resp := <-watcher.ch: + select { + case <-ctx.Done(): + return + case ch <- resp: + } + } + } + }() + return ch +} + +func (s *memoryStoreCore) notify(tp WatchTimerEventType, timerID string) { + resp := WatchTimerResponse{ + Events: []*WatchTimerEvent{ + { + Tp: tp, + TimerID: timerID, + }, + }, + } + + for _, w := range s.watchers { + select { + case <-w.ctx.Done(): + return + case w.ch <- resp: + default: + watcher := w + go func() { + select { + case <-watcher.ctx.Done(): + return + case watcher.ch <- resp: + } + }() + } + } +} diff --git a/timer/api/store.go b/timer/api/store.go index dcee5b9863c25..5bbcb0882d446 100644 --- a/timer/api/store.go +++ b/timer/api/store.go @@ -15,10 +15,13 @@ package api import ( + "context" "reflect" "strings" "time" "unsafe" + + "github.com/pingcap/errors" ) type optionalVal interface { @@ -156,14 +159,35 @@ type TimerUpdate struct { SchedPolicyType OptionalVal[SchedPolicyType] // SchedPolicyExpr indicates to set the timer's `SchedPolicyExpr` field SchedPolicyExpr OptionalVal[string] + // EventStatus indicates the event status + EventStatus OptionalVal[SchedEventStatus] + // EventID indicates to set the timer event id + EventID OptionalVal[string] + // EventData indicates to set the timer event data + EventData OptionalVal[[]byte] + // EventStart indicates the start time of event + EventStart OptionalVal[time.Time] // Watermark indicates to set the timer's `Watermark` field Watermark OptionalVal[time.Time] // SummaryData indicates to set the timer's `Summary` field SummaryData OptionalVal[[]byte] + // CheckVersion indicates to check the timer's version when update + CheckVersion OptionalVal[uint64] + // CheckEventID indicates to check the timer's eventID + CheckEventID OptionalVal[string] } // Apply applies the update to a timer -func (u *TimerUpdate) Apply(record *TimerRecord) error { +func (u *TimerUpdate) Apply(record *TimerRecord) (*TimerRecord, error) { + if v, ok := u.CheckVersion.Get(); ok && record.Version != v { + return nil, errors.Trace(ErrVersionNotMatch) + } + + if v, ok := u.CheckEventID.Get(); ok && record.EventID != v { + return nil, errors.Trace(ErrEventIDNotMatch) + } + + record = record.Clone() if v, ok := u.Enable.Get(); ok { record.Enable = v } @@ -176,6 +200,22 @@ func (u *TimerUpdate) Apply(record *TimerRecord) error { record.SchedPolicyExpr = v } + if v, ok := u.EventStatus.Get(); ok { + record.EventStatus = v + } + + if v, ok := u.EventID.Get(); ok { + record.EventID = v + } + + if v, ok := u.EventData.Get(); ok { + record.EventData = v + } + + if v, ok := u.EventStart.Get(); ok { + record.EventStart = v + } + if v, ok := u.Watermark.Get(); ok { record.Watermark = v } @@ -184,7 +224,7 @@ func (u *TimerUpdate) Apply(record *TimerRecord) error { record.SummaryData = v } - return nil + return record, nil } // FieldsSet returns all fields that has been set exclude excludes @@ -205,3 +245,157 @@ func (u *TimerUpdate) Clear() { return true }) } + +// Cond is an interface to match a timer record +type Cond interface { + // Match returns whether a record match the condition + Match(timer *TimerRecord) bool +} + +// OperatorTp is the operator type of the condition +type OperatorTp int8 + +const ( + // OperatorAnd means 'AND' operator + OperatorAnd OperatorTp = iota + // OperatorOr means 'OR' operator + OperatorOr +) + +// Operator implements Cond +type Operator struct { + // Op indicates the operator type + Op OperatorTp + // Not indicates whether to apply 'NOT' to the condition + Not bool + // Children is the children of the operator + Children []Cond +} + +// And returns a condition `AND(child1, child2, ...)` +func And(children ...Cond) *Operator { + return &Operator{ + Op: OperatorAnd, + Children: children, + } +} + +// Or returns a condition `OR(child1, child2, ...)` +func Or(children ...Cond) *Operator { + return &Operator{ + Op: OperatorOr, + Children: children, + } +} + +// Not returns a condition `NOT(cond)` +func Not(cond Cond) *Operator { + if c, ok := cond.(*Operator); ok { + newCriteria := *c + newCriteria.Not = !c.Not + return &newCriteria + } + return &Operator{ + Op: OperatorAnd, + Not: true, + Children: []Cond{cond}, + } +} + +// Match will return whether the condition match the timer record +func (c *Operator) Match(t *TimerRecord) bool { + switch c.Op { + case OperatorAnd: + for _, item := range c.Children { + if !item.Match(t) { + return c.Not + } + } + return !c.Not + case OperatorOr: + for _, item := range c.Children { + if item.Match(t) { + return !c.Not + } + } + return c.Not + } + return false +} + +// WatchTimerEventType is the type of the watch event +type WatchTimerEventType int8 + +const ( + // WatchTimerEventCreate indicates that a new timer is created + WatchTimerEventCreate WatchTimerEventType = 1 << iota + // WatchTimerEventUpdate indicates that a timer is updated + WatchTimerEventUpdate + // WatchTimerEventDelete indicates that a timer is deleted + WatchTimerEventDelete +) + +// WatchTimerEvent is the watch event object +type WatchTimerEvent struct { + // Tp indicates the event type + Tp WatchTimerEventType + // TimerID indicates the timer id for the event + TimerID string +} + +// WatchTimerResponse is the response of watch +type WatchTimerResponse struct { + // Events contains all events in the response + Events []*WatchTimerEvent +} + +// WatchTimerChan is the chan of the watch timer +type WatchTimerChan <-chan WatchTimerResponse + +// TimerStoreCore is an interface, it contains several core methods of store +type TimerStoreCore interface { + // Create creates a new record. If `record.ID` is empty, an id will be assigned automatically. + // The first return value is the final id of the timer. + Create(ctx context.Context, record *TimerRecord) (string, error) + // List lists the timers that match the condition + List(ctx context.Context, cond Cond) ([]*TimerRecord, error) + // Update updates a timer + Update(ctx context.Context, timerID string, update *TimerUpdate) error + // Delete deletes a timer + Delete(ctx context.Context, timerID string) (bool, error) + // WatchSupported indicates whether watch supported for this store + WatchSupported() bool + // Watch watches all changes of the store. A chan will be returned to receive `WatchTimerResponse` + // The returned chan be closed when the context in the argument is done. + Watch(ctx context.Context) WatchTimerChan +} + +// TimerStore extends TimerStoreCore to provide some extra methods for timer operations +type TimerStore struct { + TimerStoreCore +} + +// GetByID gets a timer by ID. If the timer with the specified ID not exists, +// an error `ErrTimerNotExist` will be returned +func (s *TimerStore) GetByID(ctx context.Context, timerID string) (record *TimerRecord, err error) { + return s.getOneRecord(ctx, &TimerCond{ID: NewOptionalVal(timerID)}) +} + +// GetByKey gets a timer by key. If the timer with the specified key not exists in the namespace, +// an error `ErrTimerNotExist` will be returned +func (s *TimerStore) GetByKey(ctx context.Context, namespace, key string) (record *TimerRecord, err error) { + return s.getOneRecord(ctx, &TimerCond{Namespace: NewOptionalVal(namespace), Key: NewOptionalVal(key)}) +} + +func (s *TimerStore) getOneRecord(ctx context.Context, cond Cond) (record *TimerRecord, err error) { + records, err := s.List(ctx, cond) + if err != nil { + return nil, err + } + + if len(records) == 0 { + return nil, errors.Trace(ErrTimerNotExist) + } + + return records[0], nil +} diff --git a/timer/api/store_test.go b/timer/api/store_test.go index 0847eaf6f7d47..eb2281c15172f 100644 --- a/timer/api/store_test.go +++ b/timer/api/store_test.go @@ -15,10 +15,13 @@ package api import ( + "context" + "reflect" "testing" "time" "unsafe" + "github.com/pingcap/errors" "github.com/stretchr/testify/require" ) @@ -156,7 +159,7 @@ func TestTimerRecordCond(t *testing.T) { require.False(t, cond.Match(tm)) } -func TestTimerUpdate(t *testing.T) { +func TestOperatorCond(t *testing.T) { tm := &TimerRecord{ ID: "123", TimerSpec: TimerSpec{ @@ -165,25 +168,357 @@ func TestTimerUpdate(t *testing.T) { }, } - now := time.Now() - data := []byte("aabbcc") + cond1 := &TimerCond{ID: NewOptionalVal("123")} + cond2 := &TimerCond{ID: NewOptionalVal("456")} + cond3 := &TimerCond{Namespace: NewOptionalVal("n1")} + cond4 := &TimerCond{Namespace: NewOptionalVal("n2")} + + require.True(t, And(cond1, cond3).Match(tm)) + require.False(t, And(cond1, cond2, cond3).Match(tm)) + require.False(t, Or(cond2, cond4).Match(tm)) + require.True(t, Or(cond2, cond1, cond4).Match(tm)) + + require.False(t, Not(And(cond1, cond3)).Match(tm)) + require.True(t, Not(And(cond1, cond2, cond3)).Match(tm)) + require.True(t, Not(Or(cond2, cond4)).Match(tm)) + require.False(t, Not(Or(cond2, cond1, cond4)).Match(tm)) + + require.False(t, Not(cond1).Match(tm)) + require.True(t, Not(cond2).Match(tm)) +} + +func TestTimerUpdate(t *testing.T) { + tpl := TimerRecord{ + ID: "123", + TimerSpec: TimerSpec{ + Namespace: "n1", + Key: "/path/to/key", + }, + Version: 567, + } + tm := tpl.Clone() + + // test check version update := &TimerUpdate{ + Enable: NewOptionalVal(true), + CheckVersion: NewOptionalVal(uint64(0)), + } + _, err := update.Apply(tm) + require.Error(t, err) + require.True(t, errors.ErrorEqual(err, ErrVersionNotMatch)) + require.Equal(t, tpl, *tm) + + // test check event id + update = &TimerUpdate{ + Enable: NewOptionalVal(true), + CheckEventID: NewOptionalVal("aa"), + } + _, err = update.Apply(tm) + require.Error(t, err) + require.True(t, errors.ErrorEqual(err, ErrEventIDNotMatch)) + require.Equal(t, tpl, *tm) + + // test apply without check + now := time.Now() + update = &TimerUpdate{ Enable: NewOptionalVal(true), SchedPolicyType: NewOptionalVal(SchedEventInterval), - SchedPolicyExpr: NewOptionalVal("1h"), + SchedPolicyExpr: NewOptionalVal("5h"), Watermark: NewOptionalVal(now), - SummaryData: NewOptionalVal(data), + SummaryData: NewOptionalVal([]byte("summarydata1")), + EventStatus: NewOptionalVal(SchedEventTrigger), + EventID: NewOptionalVal("event1"), + EventData: NewOptionalVal([]byte("eventdata1")), + EventStart: NewOptionalVal(now.Add(time.Second)), } - require.NoError(t, update.Apply(tm)) - require.True(t, tm.Enable) - require.Equal(t, SchedEventInterval, tm.SchedPolicyType) - require.Equal(t, "1h", tm.SchedPolicyExpr) - require.Equal(t, now, tm.Watermark) - require.Equal(t, data, tm.SummaryData) + require.Equal(t, reflect.ValueOf(update).Elem().NumField()-2, len(update.FieldsSet())) + record, err := update.Apply(tm) + require.NoError(t, err) + require.True(t, record.Enable) + require.Equal(t, SchedEventInterval, record.SchedPolicyType) + require.Equal(t, "5h", record.SchedPolicyExpr) + require.Equal(t, now, record.Watermark) + require.Equal(t, []byte("summarydata1"), record.SummaryData) + require.Equal(t, SchedEventTrigger, record.EventStatus) + require.Equal(t, "event1", record.EventID) + require.Equal(t, []byte("eventdata1"), record.EventData) + require.Equal(t, now.Add(time.Second), record.EventStart) + require.Equal(t, tpl, *tm) emptyUpdate := &TimerUpdate{} - tm2 := tm.Clone() - require.NoError(t, emptyUpdate.Apply(tm2)) - require.Equal(t, tm, tm2) + record, err = emptyUpdate.Apply(tm) + require.NoError(t, err) + require.Equal(t, tpl, *record) +} + +func TestMemTimerStore(t *testing.T) { + store := NewMemoryTimerStore() + RunTimerStoreTest(t, store) +} + +func TestMemTimerStoreWatch(t *testing.T) { + store := NewMemoryTimerStore() + RunTimerStoreWatchTest(t, store) +} + +func RunTimerStoreTest(t *testing.T, store *TimerStore) { + ctx := context.Background() + timer := runTimerStoreInsertAndGet(ctx, t, store) + runTimerStoreUpdate(ctx, t, store, timer) + runTimerStoreDelete(ctx, t, store, timer) + runTimerStoreInsertAndList(ctx, t, store) +} + +func runTimerStoreInsertAndGet(ctx context.Context, t *testing.T, store *TimerStore) *TimerRecord { + records, err := store.List(ctx, nil) + require.NoError(t, err) + require.Empty(t, records) + + recordTpl := TimerRecord{ + TimerSpec: TimerSpec{ + Namespace: "n1", + Key: "/path/to/key", + SchedPolicyType: SchedEventInterval, + SchedPolicyExpr: "1h", + Data: []byte("data1"), + }, + } + + // normal insert + record := recordTpl.Clone() + id, err := store.Create(ctx, record) + require.NoError(t, err) + require.Equal(t, recordTpl, *record) + require.NotEmpty(t, id) + recordTpl.ID = id + recordTpl.EventStatus = SchedEventIdle + + // get by id + got, err := store.GetByID(ctx, id) + require.NoError(t, err) + require.NotSame(t, record, got) + record = got + require.Equal(t, recordTpl.ID, record.ID) + require.NotZero(t, record.Version) + recordTpl.Version = record.Version + require.False(t, record.CreateTime.IsZero()) + recordTpl.CreateTime = record.CreateTime + require.Equal(t, recordTpl, *record) + + // id not exist + _, err = store.GetByID(ctx, "noexist") + require.True(t, errors.ErrorEqual(err, ErrTimerNotExist)) + + // get by key + record, err = store.GetByKey(ctx, "n1", "/path/to/key") + require.NoError(t, err) + require.Equal(t, recordTpl, *record) + + // key not exist + _, err = store.GetByKey(ctx, "n1", "noexist") + require.True(t, errors.ErrorEqual(err, ErrTimerNotExist)) + _, err = store.GetByKey(ctx, "n2", "/path/to/ke") + require.True(t, errors.ErrorEqual(err, ErrTimerNotExist)) + + // invalid insert + invalid := &TimerRecord{} + _, err = store.Create(ctx, invalid) + require.EqualError(t, err, "field 'Namespace' should not be empty") + + invalid.Namespace = "n1" + _, err = store.Create(ctx, invalid) + require.EqualError(t, err, "field 'Key' should not be empty") + + invalid.Key = "k1" + _, err = store.Create(ctx, invalid) + require.EqualError(t, err, "field 'SchedPolicyType' should not be empty") + + invalid.SchedPolicyType = SchedEventInterval + invalid.SchedPolicyExpr = "1x" + _, err = store.Create(ctx, invalid) + require.EqualError(t, err, "schedule event configuration is not valid: invalid schedule event expr '1x': unknown unit x") + + return &recordTpl +} + +func runTimerStoreUpdate(ctx context.Context, t *testing.T, store *TimerStore, tpl *TimerRecord) { + // normal update + orgRecord, err := store.GetByID(ctx, tpl.ID) + require.NoError(t, err) + require.Equal(t, "1h", tpl.SchedPolicyExpr) + err = store.Update(ctx, tpl.ID, &TimerUpdate{ + SchedPolicyExpr: NewOptionalVal("2h"), + }) + require.NoError(t, err) + record, err := store.GetByID(ctx, tpl.ID) + require.NoError(t, err) + require.NotSame(t, orgRecord, record) + require.Greater(t, record.Version, tpl.Version) + tpl.Version = record.Version + tpl.SchedPolicyExpr = "2h" + require.Equal(t, *tpl, *record) + + // err update + err = store.Update(ctx, tpl.ID, &TimerUpdate{ + SchedPolicyExpr: NewOptionalVal("2x"), + }) + require.EqualError(t, err, "schedule event configuration is not valid: invalid schedule event expr '2x': unknown unit x") + record, err = store.GetByID(ctx, tpl.ID) + require.NoError(t, err) + require.Equal(t, *tpl, *record) +} + +func runTimerStoreDelete(ctx context.Context, t *testing.T, store *TimerStore, tpl *TimerRecord) { + exist, err := store.Delete(ctx, tpl.ID) + require.NoError(t, err) + require.True(t, exist) + + _, err = store.GetByID(ctx, tpl.ID) + require.True(t, errors.ErrorEqual(err, ErrTimerNotExist)) + + exist, err = store.Delete(ctx, tpl.ID) + require.NoError(t, err) + require.False(t, exist) +} + +func runTimerStoreInsertAndList(ctx context.Context, t *testing.T, store *TimerStore) { + records, err := store.List(ctx, nil) + require.NoError(t, err) + require.Empty(t, records) + + recordTpl1 := TimerRecord{ + ID: "id1", + TimerSpec: TimerSpec{ + Namespace: "n1", + Key: "/path/to/key1", + SchedPolicyType: SchedEventInterval, + SchedPolicyExpr: "1h", + }, + Version: 1, + CreateTime: time.Now(), + EventStatus: SchedEventIdle, + } + + recordTpl2 := TimerRecord{ + ID: "id2", + TimerSpec: TimerSpec{ + Namespace: "n1", + Key: "/path/to/key2", + SchedPolicyType: SchedEventInterval, + SchedPolicyExpr: "2h", + }, + Version: 2, + CreateTime: time.Now(), + EventStatus: SchedEventIdle, + } + + recordTpl3 := TimerRecord{ + ID: "id3", + TimerSpec: TimerSpec{ + Namespace: "n2", + Key: "/path/to/another", + SchedPolicyType: SchedEventInterval, + SchedPolicyExpr: "3h", + }, + Version: 3, + CreateTime: time.Now(), + EventStatus: SchedEventIdle, + } + + id, err := store.Create(ctx, &recordTpl1) + require.NoError(t, err) + require.Equal(t, recordTpl1.ID, id) + + id, err = store.Create(ctx, &recordTpl2) + require.NoError(t, err) + require.Equal(t, recordTpl2.ID, id) + + id, err = store.Create(ctx, &recordTpl3) + require.NoError(t, err) + require.Equal(t, recordTpl3.ID, id) + + checkList := func(expected []*TimerRecord, list []*TimerRecord) { + expectedMap := make(map[string]*TimerRecord, len(expected)) + for _, r := range expected { + expectedMap[r.ID] = r + } + + for _, r := range list { + require.Contains(t, expectedMap, r.ID) + got, ok := expectedMap[r.ID] + require.True(t, ok) + require.Equal(t, *got, *r) + delete(expectedMap, r.ID) + } + + require.Empty(t, expectedMap) + } + + timers, err := store.List(ctx, nil) + require.NoError(t, err) + checkList([]*TimerRecord{&recordTpl1, &recordTpl2, &recordTpl3}, timers) + + timers, err = store.List(ctx, &TimerCond{ + Key: NewOptionalVal("/path/to/k"), + KeyPrefix: true, + }) + require.NoError(t, err) + checkList([]*TimerRecord{&recordTpl1, &recordTpl2}, timers) +} + +func RunTimerStoreWatchTest(t *testing.T, store *TimerStore) { + require.True(t, store.WatchSupported()) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + }() + + timer := TimerRecord{ + TimerSpec: TimerSpec{ + Namespace: "n1", + Key: "/path/to/key", + SchedPolicyType: SchedEventInterval, + SchedPolicyExpr: "1h", + Data: []byte("data1"), + }, + } + + ch := store.Watch(ctx) + assertWatchEvent := func(tp WatchTimerEventType, id string) { + timeout := time.NewTimer(time.Second) + defer timeout.Stop() + select { + case resp, ok := <-ch: + if id == "" { + require.False(t, ok) + return + } + require.True(t, ok) + require.NotNil(t, resp) + require.Equal(t, 1, len(resp.Events)) + require.Equal(t, tp, resp.Events[0].Tp) + require.Equal(t, id, resp.Events[0].TimerID) + case <-timeout.C: + require.FailNow(t, "no response") + } + } + + id, err := store.Create(ctx, &timer) + require.NoError(t, err) + assertWatchEvent(WatchTimerEventCreate, id) + + err = store.Update(ctx, id, &TimerUpdate{ + SchedPolicyExpr: NewOptionalVal("2h"), + }) + require.NoError(t, err) + assertWatchEvent(WatchTimerEventUpdate, id) + + exit, err := store.Delete(ctx, id) + require.NoError(t, err) + require.True(t, exit) + assertWatchEvent(WatchTimerEventDelete, id) + + cancel() + assertWatchEvent(0, "") } diff --git a/timer/api/timer.go b/timer/api/timer.go index 61e657fca5c9a..0915aadf478d6 100644 --- a/timer/api/timer.go +++ b/timer/api/timer.go @@ -92,11 +92,15 @@ func (t *TimerSpec) Clone() *TimerSpec { // Validate validates the TimerSpec func (t *TimerSpec) Validate() error { if t.Namespace == "" { - return errors.New("field 'namespace' should not be empty") + return errors.New("field 'Namespace' should not be empty") } if t.Key == "" { - return errors.New("field 'key' should not be empty") + return errors.New("field 'Key' should not be empty") + } + + if t.SchedPolicyType == "" { + return errors.New("field 'SchedPolicyType' should not be empty") } if _, err := t.CreateSchedEventPolicy(); err != nil { @@ -112,7 +116,7 @@ func (t *TimerSpec) CreateSchedEventPolicy() (SchedEventPolicy, error) { case SchedEventInterval: return NewSchedIntervalPolicy(t.SchedPolicyExpr) default: - return nil, errors.Errorf("invalid schedule event type: '%s'", t.SchedPolicyExpr) + return nil, errors.Errorf("invalid schedule event type: '%s'", t.SchedPolicyType) } } @@ -149,6 +153,8 @@ type TimerRecord struct { SummaryData []byte // CreateTime is the creation time of the timer CreateTime time.Time + // Version is the version of the record, when the record updated, version will be increased. + Version uint64 } // Clone returns a cloned TimerRecord diff --git a/timer/api/timer_test.go b/timer/api/timer_test.go new file mode 100644 index 0000000000000..8e233d9bc1061 --- /dev/null +++ b/timer/api/timer_test.go @@ -0,0 +1,48 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTimerValidate(t *testing.T) { + // invalid insert + record := &TimerRecord{} + err := record.Validate() + require.EqualError(t, err, "field 'Namespace' should not be empty") + + record.Namespace = "n1" + err = record.Validate() + require.EqualError(t, err, "field 'Key' should not be empty") + + record.Key = "k1" + err = record.Validate() + require.EqualError(t, err, "field 'SchedPolicyType' should not be empty") + + record.SchedPolicyType = "aa" + err = record.Validate() + require.EqualError(t, err, "schedule event configuration is not valid: invalid schedule event type: 'aa'") + + record.SchedPolicyType = SchedEventInterval + record.SchedPolicyExpr = "1x" + err = record.Validate() + require.EqualError(t, err, "schedule event configuration is not valid: invalid schedule event expr '1x': unknown unit x") + + record.SchedPolicyExpr = "1h" + require.Nil(t, record.Validate()) +}