Skip to content

Commit

Permalink
optimize memory allocate (#555)
Browse files Browse the repository at this point in the history
* update

* revert
  • Loading branch information
wk989898 authored Nov 19, 2024
1 parent 055eeec commit 4fe5920
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
2 changes: 1 addition & 1 deletion logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (e *eventStore) uploadStatePeriodically(ctx context.Context) error {
Subscriptions: make(map[int64]*logservicepb.SubscriptionStates),
}
for tableID, dispatcherIDs := range e.dispatcherMeta.tableToDispatchers {
subStates := make([]*logservicepb.SubscriptionState, 0)
subStates := make([]*logservicepb.SubscriptionState, 0, len(dispatcherIDs))
subIDs := make(map[logpuller.SubscriptionID]bool)
for dispatcherID := range dispatcherIDs {
dispatcherStat := e.dispatcherMeta.dispatcherStats[dispatcherID]
Expand Down
14 changes: 11 additions & 3 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type regionRequestWorker struct {

// used to indicate whether there are new pending ts events for every processores in dispatchResolvedTs
boolCache []bool
// used to avoid repeated hash calculation in dispatchResolvedTs
slotCache map[uint64]int
}

func newRegionRequestWorker(
Expand All @@ -83,6 +85,7 @@ func newRegionRequestWorker(
requestsCh: make(chan regionInfo, 256), // 256 is an arbitrary number.

boolCache: make([]bool, len(client.changeEventProcessors)),
slotCache: make(map[uint64]int),
}
worker.requestedRegions.subscriptions = make(map[SubscriptionID]regionFeedStates)
worker.tsBatches.events = make([]statefulEvent, len(client.changeEventProcessors))
Expand Down Expand Up @@ -432,13 +435,18 @@ func (s *regionRequestWorker) dispatchResolvedTs(resolvedTs *cdcpb.ResolvedTs) e
for i := range s.boolCache {
s.boolCache[i] = false
}
regionsLen := len(resolvedTs.Regions)/len(s.client.changeEventProcessors) + 1 // an average number
s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTs.Regions)))
for _, regionID := range resolvedTs.Regions {
slot := hashRegionID(regionID, len(s.client.changeEventProcessors))
for i, regionID := range resolvedTs.Regions {
// We suppose that the length of changeEventProcessors is constant, so we can cache the hashes of the region
if _, exist := s.slotCache[regionID]; !exist {
s.slotCache[regionID] = hashRegionID(regionID, len(s.client.changeEventProcessors))
}
slot := s.slotCache[regionID]
if !s.boolCache[slot] {
s.tsBatches.events[slot].resolvedTsBatches = append(s.tsBatches.events[slot].resolvedTsBatches, resolvedTsBatch{
ts: resolvedTs.Ts,
regions: make([]*regionFeedState, 0, len(resolvedTs.Regions)),
regions: make([]*regionFeedState, 0, min(len(resolvedTs.Regions)-i, regionsLen)),
})
s.boolCache[slot] = true
}
Expand Down

0 comments on commit 4fe5920

Please sign in to comment.