Skip to content

Commit

Permalink
[#12783] Improve order of mailserver requests
Browse files Browse the repository at this point in the history
  • Loading branch information
rasom committed Jan 28, 2022
1 parent ac31ccc commit 0de0fd5
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 33 deletions.
17 changes: 9 additions & 8 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,8 +1173,8 @@ func (m *Messenger) Init() error {
logger := m.logger.With(zap.String("site", "Init"))

var (
publicChatIDs []string
publicKeys []*ecdsa.PublicKey
publicChatIDs []*transport.ChatIDWithPriority
publicKeys []*transport.PublicKeyWithPriority
)

joinedCommunities, err := m.communitiesManager.Joined()
Expand All @@ -1183,7 +1183,7 @@ func (m *Messenger) Init() error {
}
for _, org := range joinedCommunities {
// the org advertise on the public topic derived by the pk
publicChatIDs = append(publicChatIDs, org.IDString())
publicChatIDs = append(publicChatIDs, &transport.ChatIDWithPriority{ChatID: org.IDString()})
}

// Init filters for the communities we are an admin of
Expand Down Expand Up @@ -1222,22 +1222,22 @@ func (m *Messenger) Init() error {

switch chat.ChatType {
case ChatTypePublic, ChatTypeProfile:
publicChatIDs = append(publicChatIDs, chat.ID)
publicChatIDs = append(publicChatIDs, &transport.ChatIDWithPriority{ChatID: chat.ID, Priority: chat.ReadMessagesAtClockValue})
case ChatTypeCommunityChat:
publicChatIDs = append(publicChatIDs, chat.ID)
publicChatIDs = append(publicChatIDs, &transport.ChatIDWithPriority{ChatID: chat.ID, Priority: chat.ReadMessagesAtClockValue})
case ChatTypeOneToOne:
pk, err := chat.PublicKey()
if err != nil {
return err
}
publicKeys = append(publicKeys, pk)
publicKeys = append(publicKeys, &transport.PublicKeyWithPriority{PublicKey: pk, Priority: chat.ReadMessagesAtClockValue})
case ChatTypePrivateGroupChat:
for _, member := range chat.Members {
publicKey, err := member.PublicKey()
if err != nil {
return errors.Wrapf(err, "invalid public key for member %s in chat %s", member.ID, chat.Name)
}
publicKeys = append(publicKeys, publicKey)
publicKeys = append(publicKeys, &transport.PublicKeyWithPriority{PublicKey: publicKey, Priority: chat.ReadMessagesAtClockValue})
}
default:
return errors.New("invalid chat type")
Expand Down Expand Up @@ -1270,7 +1270,7 @@ func (m *Messenger) Init() error {
logger.Error("failed to get contact's public key", zap.Error(err))
continue
}
publicKeys = append(publicKeys, publicKey)
publicKeys = append(publicKeys, &transport.PublicKeyWithPriority{PublicKey: publicKey})
}

installations, err := m.encryptor.GetOurInstallations(&m.identity.PublicKey)
Expand Down Expand Up @@ -3736,6 +3736,7 @@ func (m *Messenger) markAllRead(chatID string, clock uint64, shouldBeSynced bool

// TODO(samyoul) remove storing of an updated reference pointer?
m.allChats.Store(chat.ID, chat)
m.persistence.SaveChats([]*Chat{chat})
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions protocol/messenger_contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package protocol

import (
"context"
"crypto/ecdsa"

"github.com/golang/protobuf/proto"

Expand Down Expand Up @@ -160,7 +159,10 @@ func (m *Messenger) AddContact(ctx context.Context, request *requests.AddContact

response.AddChat(profileChat)

_, err = m.transport.InitFilters([]string{profileChat.ID}, []*ecdsa.PublicKey{publicKey})
_, err = m.transport.InitFilters(
[]*transport.ChatIDWithPriority{{ChatID: profileChat.ID}},
[]*transport.PublicKeyWithPriority{{PublicKey: publicKey}},
)
if err != nil {
return nil, err
}
Expand Down
74 changes: 66 additions & 8 deletions protocol/messenger_mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package protocol
import (
"context"
"fmt"
"sort"
"time"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -255,6 +256,27 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse

to := m.calculateMailserverTo()
var syncedTopics []mailservers.MailserverTopic

sort.Slice(filters[:], func(i, j int) bool {
p1 := filters[i].Priority
p2 := filters[j].Priority
return p1 > p2
})

firstBatchCap := 1
secondBatchCap := 5
thirdBatchCap := 10
if filters[0].Priority == 0 {
firstBatchCap = 0
secondBatchCap = 0
thirdBatchCap = 0
}

defaultPeriodFromNow, err := m.defaultSyncPeriodFromNow()
if err != nil {
return nil, err
}

for _, filter := range filters {
if !filter.Listen || filter.Ephemeral {
continue
Expand All @@ -270,17 +292,45 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse

topicData, ok := topicsData[filter.Topic.String()]
if !ok {
lastRequest, err := m.defaultSyncPeriodFromNow()
if err != nil {
return nil, err
}
topicData = mailservers.MailserverTopic{
Topic: filter.Topic.String(),
LastRequest: int(lastRequest),
LastRequest: int(defaultPeriodFromNow),
}
}

batchID := topicData.LastRequest
if firstBatchCap > 0 {
batchID = 0
firstBatchCap--
} else if secondBatchCap > 0 {
batch, ok := batches[1]
if ok {
prevTopicData, ok := topicsData[batch.Topics[0].String()]
if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
(ok && prevTopicData.LastRequest != topicData.LastRequest) {
secondBatchCap = 0
}
}
if secondBatchCap > 0 {
batchID = 1
secondBatchCap--
}
} else if thirdBatchCap > 0 {
batch, ok := batches[2]
if ok {
prevTopicData, ok := topicsData[batch.Topics[0].String()]
if (!ok && topicData.LastRequest != int(defaultPeriodFromNow)) ||
prevTopicData.LastRequest != topicData.LastRequest {
thirdBatchCap = 0
}
}
if thirdBatchCap > 0 {
batchID = 2
thirdBatchCap--
}
}

batch, ok := batches[topicData.LastRequest]
batch, ok := batches[batchID]
if !ok {
from, err := m.capToDefaultSyncPeriod(uint32(topicData.LastRequest))
if err != nil {
Expand All @@ -292,7 +342,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse

batch.ChatIDs = append(batch.ChatIDs, chatID)
batch.Topics = append(batch.Topics, filter.Topic)
batches[topicData.LastRequest] = batch
batches[batchID] = batch
// Set last request to the new `to`
topicData.LastRequest = int(to)
syncedTopics = append(syncedTopics, topicData)
Expand All @@ -306,8 +356,16 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, len(batches))
}

batchKeys := make([]int, 0, len(batches))
for k := range batches {
batchKeys = append(batchKeys, k)
}
sort.Ints(batchKeys)

i := 0
for _, batch := range batches {
for _, k := range batchKeys {
batch := batches[k]
m.logger.Info("BAAAATCH", zap.Any("BATCHID", k))
i++
err := m.processMailserverBatch(batch)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions protocol/transport/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Filter struct {
Listen bool `json:"listen"`
// Ephemeral indicates that this is an ephemeral filter
Ephemeral bool `json:"ephemeral"`
// Priority
Priority uint64
}

func (c *Filter) IsPublic() bool {
Expand Down
52 changes: 38 additions & 14 deletions protocol/transport/filters_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,23 @@ func NewFiltersManager(persistence KeysPersistence, service FiltersService, priv
}, nil
}

type PublicKeyWithPriority struct {
PublicKey *ecdsa.PublicKey
Priority uint64
}

type ChatIDWithPriority struct {
ChatID string
Priority uint64
}

func (f *FiltersManager) Init(
chatIDs []string,
publicKeys []*ecdsa.PublicKey,
chatIDs []*ChatIDWithPriority,
publicKeys []*PublicKeyWithPriority,
) ([]*Filter, error) {

// foobar

// Load our contact code.
_, err := f.LoadContactCode(&f.privateKey.PublicKey)
if err != nil {
Expand All @@ -96,14 +108,14 @@ func (f *FiltersManager) Init(

// Add public, one-to-one and negotiated filters.
for _, chatID := range chatIDs {
_, err := f.LoadPublic(chatID)
_, err := f.LoadPublicWithPriority(chatID)
if err != nil {
return nil, err
}
}

for _, publicKey := range publicKeys {
_, err := f.LoadContactCode(publicKey)
_, err := f.LoadContactCodeWithPriority(publicKey)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -165,8 +177,8 @@ func (f *FiltersManager) InitCommunityFilters(pks []*ecdsa.PrivateKey) ([]*Filte
// DEPRECATED
func (f *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) {
var (
chatIDs []string
publicKeys []*ecdsa.PublicKey
chatIDs []*ChatIDWithPriority
publicKeys []*PublicKeyWithPriority
)

for _, filter := range filters {
Expand All @@ -175,9 +187,9 @@ func (f *FiltersManager) InitWithFilters(filters []*Filter) ([]*Filter, error) {
if err != nil {
return nil, err
}
publicKeys = append(publicKeys, publicKey)
publicKeys = append(publicKeys, &PublicKeyWithPriority{PublicKey: publicKey})
} else if filter.ChatID != "" {
chatIDs = append(chatIDs, filter.ChatID)
chatIDs = append(chatIDs, &ChatIDWithPriority{ChatID: filter.ChatID})
}
}

Expand Down Expand Up @@ -481,38 +493,49 @@ func (f *FiltersManager) PersonalTopicFilter() *Filter {

// LoadPublic adds a filter for a public chat.
func (f *FiltersManager) LoadPublic(chatID string) (*Filter, error) {
return f.LoadPublicWithPriority(&ChatIDWithPriority{ChatID: chatID})
}

// LoadPublicWithPriority adds a filter for a public chat.
func (f *FiltersManager) LoadPublicWithPriority(chatID *ChatIDWithPriority) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

if chat, ok := f.filters[chatID]; ok {
if chat, ok := f.filters[chatID.ChatID]; ok {
return chat, nil
}

filterAndTopic, err := f.addSymmetric(chatID)
filterAndTopic, err := f.addSymmetric(chatID.ChatID)
if err != nil {
return nil, err
}

chat := &Filter{
ChatID: chatID,
ChatID: chatID.ChatID,
FilterID: filterAndTopic.FilterID,
SymKeyID: filterAndTopic.SymKeyID,
Topic: filterAndTopic.Topic,
Listen: true,
OneToOne: false,
Priority: chatID.Priority,
}

f.filters[chatID] = chat
f.filters[chatID.ChatID] = chat

return chat, nil
}

// LoadContactCode creates a filter for the advertise topic for a given public key.
func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, error) {
return f.LoadContactCodeWithPriority(&PublicKeyWithPriority{PublicKey: pubKey})
}

// LoadContactCodeWithPriority creates a filter for the advertise topic for a given public key.
func (f *FiltersManager) LoadContactCodeWithPriority(pubKey *PublicKeyWithPriority) (*Filter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()

chatID := ContactCodeTopic(pubKey)
chatID := ContactCodeTopic(pubKey.PublicKey)

if _, ok := f.filters[chatID]; ok {
return f.filters[chatID], nil
Expand All @@ -528,8 +551,9 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
FilterID: contactCodeFilter.FilterID,
Topic: contactCodeFilter.Topic,
SymKeyID: contactCodeFilter.SymKeyID,
Identity: PublicKeyToStr(pubKey),
Identity: PublicKeyToStr(pubKey.PublicKey),
Listen: true,
Priority: pubKey.Priority,
}

f.filters[chatID] = chat
Expand Down
2 changes: 1 addition & 1 deletion protocol/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func NewTransport(
return t, nil
}

func (t *Transport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*Filter, error) {
func (t *Transport) InitFilters(chatIDs []*ChatIDWithPriority, publicKeys []*PublicKeyWithPriority) ([]*Filter, error) {
return t.filters.Init(chatIDs, publicKeys)
}

Expand Down

0 comments on commit 0de0fd5

Please sign in to comment.