Skip to content

Commit

Permalink
[#12783] Improve order of mailserver requests
Browse files Browse the repository at this point in the history
  • Loading branch information
rasom committed Feb 1, 2022
1 parent ac31ccc commit 5693bc6
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 11 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.93.2
0.93.3
2 changes: 1 addition & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3736,7 +3736,7 @@ func (m *Messenger) markAllRead(chatID string, clock uint64, shouldBeSynced bool

// TODO(samyoul) remove storing of an updated reference pointer?
m.allChats.Store(chat.ID, chat)
return nil
return m.persistence.SaveChats([]*Chat{chat})
}

func (m *Messenger) MarkAllRead(chatID string) error {
Expand Down
85 changes: 76 additions & 9 deletions protocol/messenger_mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package protocol
import (
"context"
"fmt"
"sort"
"time"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -211,6 +212,22 @@ func (m *Messenger) capToDefaultSyncPeriod(period uint32) (uint32, error) {
return period - tolerance, nil
}

func (m *Messenger) updateFiltersPriority(filters []*transport.Filter) {
for _, filter := range filters {
chatID := filter.ChatID
chat := m.Chat(chatID)
if chat != nil {
filter.Priority = chat.ReadMessagesAtClockValue
}
}
}

func (m *Messenger) resetFiltersPriority(filters []*transport.Filter) {
for _, filter := range filters {
filter.Priority = 0
}
}

// RequestAllHistoricMessages requests all the historic messages for any topic
func (m *Messenger) RequestAllHistoricMessages() (*MessengerResponse, error) {
shouldSync, err := m.shouldSync()
Expand All @@ -236,7 +253,14 @@ func (m *Messenger) RequestAllHistoricMessages() (*MessengerResponse, error) {
m.logger.Info("backup fetched")
}

return m.syncFilters(m.transport.Filters())
filters := m.transport.Filters()
m.updateFiltersPriority(filters)
defer m.resetFiltersPriority(filters)
return m.syncFilters(filters)
}

func getPrioritizedBatches() []int {
return []int{1, 5, 10}
}

func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse, error) {
Expand All @@ -255,6 +279,24 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse

to := m.calculateMailserverTo()
var syncedTopics []mailservers.MailserverTopic

sort.Slice(filters[:], func(i, j int) bool {
p1 := filters[i].Priority
p2 := filters[j].Priority
return p1 > p2
})
prioritizedBatches := getPrioritizedBatches()
currentBatch := 0

if filters[0].Priority == 0 {
currentBatch = len(prioritizedBatches)
}

defaultPeriodFromNow, err := m.defaultSyncPeriodFromNow()
if err != nil {
return nil, err
}

for _, filter := range filters {
if !filter.Listen || filter.Ephemeral {
continue
Expand All @@ -270,17 +312,35 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse

topicData, ok := topicsData[filter.Topic.String()]
if !ok {
lastRequest, err := m.defaultSyncPeriodFromNow()
if err != nil {
return nil, err
}
topicData = mailservers.MailserverTopic{
Topic: filter.Topic.String(),
LastRequest: int(lastRequest),
LastRequest: int(defaultPeriodFromNow),
}
}

batch, ok := batches[topicData.LastRequest]
batchID := topicData.LastRequest

if currentBatch < len(prioritizedBatches) {
batch, ok := batches[currentBatch]
if ok {
prevTopicData, ok := topicsData[batch.Topics[0].String()]
if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
(ok && prevTopicData.LastRequest != topicData.LastRequest) {
currentBatch++
}
}
if currentBatch < len(prioritizedBatches) {
batchID = currentBatch
currentBatchCap := prioritizedBatches[currentBatch] - 1
if currentBatchCap == 0 {
currentBatch++
} else {
prioritizedBatches[currentBatch] = currentBatchCap
}
}
}

batch, ok := batches[batchID]
if !ok {
from, err := m.capToDefaultSyncPeriod(uint32(topicData.LastRequest))
if err != nil {
Expand All @@ -292,7 +352,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse

batch.ChatIDs = append(batch.ChatIDs, chatID)
batch.Topics = append(batch.Topics, filter.Topic)
batches[topicData.LastRequest] = batch
batches[batchID] = batch
// Set last request to the new `to`
topicData.LastRequest = int(to)
syncedTopics = append(syncedTopics, topicData)
Expand All @@ -306,8 +366,15 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, len(batches))
}

batchKeys := make([]int, 0, len(batches))
for k := range batches {
batchKeys = append(batchKeys, k)
}
sort.Ints(batchKeys)

i := 0
for _, batch := range batches {
for _, k := range batchKeys {
batch := batches[k]
i++
err := m.processMailserverBatch(batch)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions protocol/transport/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Filter struct {
Listen bool `json:"listen"`
// Ephemeral indicates that this is an ephemeral filter
Ephemeral bool `json:"ephemeral"`
// Priority
Priority uint64
}

func (c *Filter) IsPublic() bool {
Expand Down

0 comments on commit 5693bc6

Please sign in to comment.