Skip to content

Commit

Permalink
_
Browse files Browse the repository at this point in the history
  • Loading branch information
rasom committed Jan 31, 2022
1 parent 0de0fd5 commit 445656e
Showing 1 changed file with 38 additions and 29 deletions.
67 changes: 38 additions & 29 deletions protocol/messenger_mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,22 @@ func (m *Messenger) capToDefaultSyncPeriod(period uint32) (uint32, error) {
return period - tolerance, nil
}

func (m *Messenger) updateFiltersPriority(filters []*transport.Filter) {
for _, filter := range filters {
chatID := filter.ChatID
chat := m.Chat(chatID)
if chat != nil {
filter.Priority = chat.ReadMessagesAtClockValue
}
}
}

func (m *Messenger) resetFiltersPriority(filters []*transport.Filter) {
for _, filter := range filters {
filter.Priority = 0
}
}

// RequestAllHistoricMessages requests all the historic messages for any topic
func (m *Messenger) RequestAllHistoricMessages() (*MessengerResponse, error) {
shouldSync, err := m.shouldSync()
Expand All @@ -237,7 +253,14 @@ func (m *Messenger) RequestAllHistoricMessages() (*MessengerResponse, error) {
m.logger.Info("backup fetched")
}

return m.syncFilters(m.transport.Filters())
filters := m.transport.Filters()
m.updateFiltersPriority(filters)
defer m.resetFiltersPriority(filters)
return m.syncFilters(filters)
}

func getPrioritizedBatches() []int {
return []int{1, 5, 10}
}

func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse, error) {
Expand All @@ -262,14 +285,11 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
p2 := filters[j].Priority
return p1 > p2
})
prioritizedBatches := getPrioritizedBatches()
currentBatch := 0

firstBatchCap := 1
secondBatchCap := 5
thirdBatchCap := 10
if filters[0].Priority == 0 {
firstBatchCap = 0
secondBatchCap = 0
thirdBatchCap = 0
currentBatch = len(prioritizedBatches)
}

defaultPeriodFromNow, err := m.defaultSyncPeriodFromNow()
Expand Down Expand Up @@ -299,35 +319,25 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
}

batchID := topicData.LastRequest
if firstBatchCap > 0 {
batchID = 0
firstBatchCap--
} else if secondBatchCap > 0 {
batch, ok := batches[1]

if currentBatch < len(prioritizedBatches) {
batch, ok := batches[currentBatch]
if ok {
prevTopicData, ok := topicsData[batch.Topics[0].String()]
if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
(ok && prevTopicData.LastRequest != topicData.LastRequest) {
secondBatchCap = 0
currentBatch++
}
}
if secondBatchCap > 0 {
batchID = 1
secondBatchCap--
}
} else if thirdBatchCap > 0 {
batch, ok := batches[2]
if ok {
prevTopicData, ok := topicsData[batch.Topics[0].String()]
if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
prevTopicData.LastRequest != topicData.LastRequest {
thirdBatchCap = 0
if currentBatch < len(prioritizedBatches) {
batchID = currentBatch
currentBatchCap := prioritizedBatches[currentBatch] - 1
if currentBatchCap == 0 {
currentBatch++
} else {
prioritizedBatches[currentBatch] = currentBatchCap
}
}
if thirdBatchCap > 0 {
batchID = 2
thirdBatchCap--
}
}

batch, ok := batches[batchID]
Expand Down Expand Up @@ -365,7 +375,6 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
i := 0
for _, k := range batchKeys {
batch := batches[k]
m.logger.Info("BAAAATCH", zap.Any("BATCHID", k))
i++
err := m.processMailserverBatch(batch)
if err != nil {
Expand Down

0 comments on commit 445656e

Please sign in to comment.