Skip to content

Commit

Permalink
logservice: fix remove dispatcher (#561)
Browse files Browse the repository at this point in the history
* logservice: fix remove dispatcher

* small fix
  • Loading branch information
lidezhu authored Nov 19, 2024
1 parent 4fe5920 commit 1255b89
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
17 changes: 12 additions & 5 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventstore
import (
"context"
"fmt"
"math"
"os"
"sort"
"sync"
Expand Down Expand Up @@ -54,7 +55,7 @@ type EventStore interface {

UpdateDispatcherCheckpointTs(dispatcherID common.DispatcherID, checkpointTs uint64) error

GetDispatcherDMLEventState(dispatcherID common.DispatcherID) DMLEventState
GetDispatcherDMLEventState(dispatcherID common.DispatcherID) (bool, DMLEventState)

// return an iterator which scan the data in ts range (dataRange.StartTs, dataRange.EndTs]
GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (EventIterator, error)
Expand Down Expand Up @@ -583,15 +584,19 @@ func (e *eventStore) UpdateDispatcherCheckpointTs(
return nil
}

func (e *eventStore) GetDispatcherDMLEventState(dispatcherID common.DispatcherID) DMLEventState {
func (e *eventStore) GetDispatcherDMLEventState(dispatcherID common.DispatcherID) (bool, DMLEventState) {
e.dispatcherMeta.RLock()
defer e.dispatcherMeta.RUnlock()
stat, ok := e.dispatcherMeta.dispatcherStats[dispatcherID]
if !ok {
log.Panic("fail to find dispatcher", zap.Any("dispatcherID", dispatcherID))
log.Warn("fail to find dispatcher", zap.Any("dispatcherID", dispatcherID))
return false, DMLEventState{
// ResolvedTs: subscriptionStat.resolvedTs,
MaxEventCommitTs: math.MaxUint64,
}
}
subscriptionStat := e.dispatcherMeta.subscriptionStats[stat.subID]
return DMLEventState{
return true, DMLEventState{
// ResolvedTs: subscriptionStat.resolvedTs,
MaxEventCommitTs: subscriptionStat.maxEventCommitTs,
}
Expand All @@ -601,7 +606,9 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
e.dispatcherMeta.RLock()
stat, ok := e.dispatcherMeta.dispatcherStats[dispatcherID]
if !ok {
log.Panic("fail to find dispatcher", zap.Any("dispatcherID", dispatcherID))
log.Warn("fail to find dispatcher", zap.Any("dispatcherID", dispatcherID))
e.dispatcherMeta.RUnlock()
return nil, nil
}
subscriptionStat := e.dispatcherMeta.subscriptionStats[stat.subID]
if dataRange.StartTs < subscriptionStat.checkpointTs {
Expand Down
20 changes: 14 additions & 6 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ func (c *eventBroker) checkNeedScan(task scanTask) (bool, common.DataRange) {
}

// target ts range: (dataRange.StartTs, dataRange.EndTs]
dmlState := c.eventStore.GetDispatcherDMLEventState(task.dispatcherStat.id)
ok, dmlState := c.eventStore.GetDispatcherDMLEventState(task.dispatcherStat.id)
if !ok {
return false, dataRange
}
if dataRange.StartTs >= dmlState.MaxEventCommitTs && dataRange.StartTs >= ddlState.MaxEventCommitTs {
// The dispatcher has no new events. In such case, we don't need to scan the event store.
// We just send the watermark to the dispatcher.
Expand Down Expand Up @@ -437,6 +440,16 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
log.Panic("get ddl events failed", zap.Error(err))
}

//2. Get event iterator from eventStore.
iter, err := c.eventStore.GetIterator(dispatcherID, dataRange)
if err != nil {
log.Panic("read events failed", zap.Error(err))
}
// TODO: use error to indicate the dispatcher is removed
if iter == nil {
return
}

// After all the events are sent, we need to
// drain the ddlEvents and wake up the dispatcher.
defer func() {
Expand All @@ -451,11 +464,6 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
task.dispatcherStat.metricEventServiceSendResolvedTsCount)
}()

//2. Get event iterator from eventStore.
iter, err := c.eventStore.GetIterator(dispatcherID, dataRange)
if err != nil {
log.Panic("read events failed", zap.Error(err))
}
defer func() {
eventCount, _ := iter.Close()
if eventCount != 0 {
Expand Down

0 comments on commit 1255b89

Please sign in to comment.