From 908ba987b2764a26d5f66d013808514f576141f8 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Mon, 5 Aug 2024 14:27:37 +0300 Subject: [PATCH] Add backfill stress test support --- .gitignore | 1 + pkg/connector/backfill.go | 67 +++++++++++++++++++ pkg/connector/client.go | 11 +--- pkg/connector/commands.go | 50 +-------------- pkg/connector/connector.go | 103 +++++++++++++++++++++++++++++- pkg/connector/example-config.yaml | 20 ++++++ pkg/connector/generators.go | 75 ++++++++++++++++++++++ 7 files changed, 267 insertions(+), 60 deletions(-) create mode 100644 pkg/connector/backfill.go create mode 100644 pkg/connector/example-config.yaml create mode 100644 pkg/connector/generators.go diff --git a/.gitignore b/.gitignore index 8bd068f..e3d9609 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *.db* *.yaml *.log +!example-config.yaml diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go new file mode 100644 index 0000000..c68594a --- /dev/null +++ b/pkg/connector/backfill.go @@ -0,0 +1,67 @@ +package connector + +import ( + "context" + "fmt" + "math/rand" + "time" + + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/event" +) + +var _ bridgev2.BackfillingNetworkAPI = (*DummyClient)(nil) + +func (dc *DummyClient) FetchMessages(ctx context.Context, fetchParams bridgev2.FetchMessagesParams) (resp *bridgev2.FetchMessagesResponse, err error) { + resp = &bridgev2.FetchMessagesResponse{} + + if !dc.UserLogin.Bridge.Config.Backfill.Enabled { + return + } else if fetchParams.Portal == nil { + return + } else if time.Now().After(dc.Connector.Started.Add(dc.Connector.Config.Automation.Backfill.Timelimit)) { + return + } + + tsMassage := -time.Second + if fetchParams.Forward { + tsMassage *= -1 + } + nextTs := time.Now() + if fetchParams.AnchorMessage != nil { + nextTs = fetchParams.AnchorMessage.Timestamp.Add(tsMassage) + } + + for i := 0; i < fetchParams.Count; i++ { + sender := stablePortalUserIDByIndex(fetchParams.Portal.ID, rand.Intn(dc.Connector.Config.Automation.Portals.Members)) + _, err := dc.UserLogin.Bridge.GetGhostByID(ctx, sender) + if err != nil { + return nil, fmt.Errorf("failed to get ghost by id: %w", err) + } + + msg := bridgev2.BackfillMessage{ + ID: randomMessageID(), + ConvertedMessage: &bridgev2.ConvertedMessage{ + Parts: []*bridgev2.ConvertedMessagePart{ + { + Type: event.EventMessage, + Content: &event.MessageEventContent{ + Body: string(sender), + }, + }, + }, + }, + Timestamp: nextTs, + Sender: bridgev2.EventSender{ + Sender: sender, + }, + } + + resp.Messages = append(resp.Messages, &msg) + nextTs = msg.Timestamp.Add(tsMassage) + } + + // always claim we have more until timelimit is hit + resp.HasMore = true + return +} diff --git a/pkg/connector/client.go b/pkg/connector/client.go index f82279c..8434885 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -3,10 +3,8 @@ package connector import ( "context" "fmt" - "strings" "go.mau.fi/util/ptr" - "go.mau.fi/util/random" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/database" @@ -16,14 +14,7 @@ import ( type DummyClient struct { UserLogin *bridgev2.UserLogin -} - -func randomPortalID() networkid.PortalID { - return networkid.PortalID(strings.ToLower(random.String(32))) -} - -func randomUserID() networkid.UserID { - return networkid.UserID(strings.ToLower(random.String(32))) + Connector *DummyConnector } var _ bridgev2.NetworkAPI = (*DummyClient)(nil) diff --git a/pkg/connector/commands.go b/pkg/connector/commands.go index b873df7..7f0956b 100644 --- a/pkg/connector/commands.go +++ b/pkg/connector/commands.go @@ -3,10 +3,7 @@ package connector import ( "strconv" - "go.mau.fi/util/ptr" - "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/commands" - "maunium.net/go/mautrix/bridgev2/networkid" "maunium.net/go/mautrix/event" ) @@ -17,57 +14,14 @@ var AllCommands = []commands.CommandHandler{ } var DummyHelpsection = commands.HelpSection{ - Name: "Dummy", + Name: "Dummy", Order: 99, } var NewRoomCommand = &commands.FullHandler{ Func: func(e *commands.Event) { login := e.User.GetDefaultLogin() - portalID := randomPortalID() - portalKey := networkid.PortalKey{ - ID: portalID, - Receiver: login.ID, - } - - portal, err := e.Bridge.GetPortalByKey(e.Ctx, portalKey) - if err != nil { - e.Reply(err.Error()) - return - } - - chatInfo := bridgev2.ChatInfo{ - Members: &bridgev2.ChatMemberList{ - Members: []bridgev2.ChatMember{ - { - EventSender: bridgev2.EventSender{ - IsFromMe: true, - Sender: networkid.UserID(login.ID), - }, - Membership: event.MembershipJoin, - PowerLevel: ptr.Ptr(100), - }, - }, - }, - } - - for i := 0; i < 10; i++ { - userID := randomUserID() - _, err := e.Bridge.GetGhostByID(e.Ctx, userID) - if err != nil { - e.Reply(err.Error()) - return - } - - chatInfo.Members.Members = append(chatInfo.Members.Members, bridgev2.ChatMember{ - EventSender: bridgev2.EventSender{ - Sender: userID, - }, - Membership: event.MembershipJoin, - }) - } - - err = portal.CreateMatrixRoom(e.Ctx, login, &chatInfo) + portal, err := generatePortal(e.Ctx, e.Bridge, login, 1) if err != nil { e.Reply(err.Error()) return diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index 7ee40ef..1c06025 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -2,15 +2,37 @@ package connector import ( "context" + _ "embed" + "time" "go.mau.fi/util/configupgrade" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/commands" "maunium.net/go/mautrix/bridgev2/database" + "maunium.net/go/mautrix/bridgev2/networkid" + "maunium.net/go/mautrix/id" ) type DummyConnector struct { - br *bridgev2.Bridge + br *bridgev2.Bridge + Started time.Time + + Config Config +} + +type Config struct { + Automation struct { + OpenManagementRoom bool `yaml:"open_management_room"` + Login bool `yaml:"login"` + Portals struct { + Count int `yaml:"count"` + Members int `yaml:"members"` + Messages int `yaml:"messages"` + } `yaml:"portals"` + Backfill struct { + Timelimit time.Duration `yaml:"timelimit"` + } `yaml:"backfill"` + } `yaml:"automation"` } var _ bridgev2.NetworkConnector = (*DummyConnector)(nil) @@ -20,7 +42,71 @@ func (dc *DummyConnector) Init(bridge *bridgev2.Bridge) { bridge.Commands.(*commands.Processor).AddHandlers(AllCommands...) } +func (dc *DummyConnector) startupAutomation(ctx context.Context) error { + portals, err := dc.br.DB.Portal.GetAll(ctx) + if err != nil { + return err + } + + for userID, perm := range dc.br.Config.Permissions { + // only do things for admins + if !perm.Admin { + continue + } + + log := dc.br.Log.With().Str("phase", "automation").Str("user_id", userID).Logger() + log.Info().Msg("Doing startup automation for user") + + // FIXME: t check if this is a valid mxid and not a pattern + user, err := dc.br.GetUserByMXID(ctx, id.UserID(userID)) + if err != nil { + log.Warn().Err(err).Msg("Couldn't find user by mxid, skipping") + continue + } + + if dc.Config.Automation.OpenManagementRoom { + if roomID, err := user.GetManagementRoom(ctx); err != nil { + log.Warn().Err(err).Msg("Failed to open management room") + } else { + log.Info().Stringer("room_id", roomID).Msg("Opened management room") + } + } + + // if we have a defaut login, do not create a new one + login := user.GetDefaultLogin() + if login == nil { + log.Info().Msg("Logging in to dummy network") + login, err = user.NewLogin(ctx, &database.UserLogin{ + ID: networkid.UserLoginID(userID), + BridgeID: networkid.BridgeID(userID), + RemoteName: userID, + }, nil) + if err != nil { + return err + } + } + + log.Info().Int("portals", dc.Config.Automation.Portals.Count).Msg("Ensuring portals") + for i := len(portals); i < dc.Config.Automation.Portals.Count; i++ { + _, err = generatePortal(ctx, dc.br, login, dc.Config.Automation.Portals.Members) + if err != nil { + return err + } + } + } + + return nil +} + func (dc *DummyConnector) Start(ctx context.Context) error { + dc.Started = time.Now() + + go func() { + err := dc.startupAutomation(context.Background()) + if err != nil { + dc.br.Log.Err(err).Msg("Startup automation failed") + } + }() return nil } @@ -42,13 +128,26 @@ func (dc *DummyConnector) GetDBMetaTypes() database.MetaTypes { return database.MetaTypes{} } +//go:embed example-config.yaml +var ExampleConfig string + +func upgradeConfig(helper configupgrade.Helper) { + helper.Copy(configupgrade.Bool, "automation", "open_management_room") + helper.Copy(configupgrade.Bool, "automation", "login") + helper.Copy(configupgrade.Int, "automation", "portals", "count") + helper.Copy(configupgrade.Int, "automation", "portals", "members") + helper.Copy(configupgrade.Int, "automation", "portals", "messages") + helper.Copy(configupgrade.Str, "automation", "backfill", "timelimit") +} + func (dc *DummyConnector) GetConfig() (example string, data any, upgrader configupgrade.Upgrader) { - return "", nil, configupgrade.NoopUpgrader + return ExampleConfig, &dc.Config, configupgrade.SimpleUpgrader(upgradeConfig) } func (dc *DummyConnector) LoadUserLogin(ctx context.Context, login *bridgev2.UserLogin) error { login.Client = &DummyClient{ UserLogin: login, + Connector: dc, } return nil } diff --git a/pkg/connector/example-config.yaml b/pkg/connector/example-config.yaml new file mode 100644 index 0000000..9acccdd --- /dev/null +++ b/pkg/connector/example-config.yaml @@ -0,0 +1,20 @@ +# Automation options +automation: + # Open management room for admins + open_management_room: false + # Login admins automatically + login: false + + # Portal automation for admins + portals: + # How many portals to create + count: 0 + # How many members are initially in a portal + members: 0 + # How many messages are initially sent to the portal + messages: 0 + + # Backfill automation + backfill: + # How long to do first startup infinite backfill + timelimit: 0s diff --git a/pkg/connector/generators.go b/pkg/connector/generators.go new file mode 100644 index 0000000..a47bbf9 --- /dev/null +++ b/pkg/connector/generators.go @@ -0,0 +1,75 @@ +package connector + +import ( + "context" + "fmt" + "strings" + + "go.mau.fi/util/ptr" + "go.mau.fi/util/random" + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/networkid" + "maunium.net/go/mautrix/event" +) + +func randomPortalID() networkid.PortalID { + return networkid.PortalID(strings.ToLower(random.String(32))) +} + +func randomUserID() networkid.UserID { + return networkid.UserID(strings.ToLower(random.String(32))) +} + +func randomMessageID() networkid.MessageID { + return networkid.MessageID(strings.ToLower(random.String(32))) +} + +func stablePortalUserIDByIndex(portalID networkid.PortalID, idx int) networkid.UserID { + return networkid.UserID(fmt.Sprintf("%s-%d", portalID, idx)) +} + +func generatePortal(ctx context.Context, br *bridgev2.Bridge, login *bridgev2.UserLogin, members int) (*bridgev2.Portal, error) { + portalID := randomPortalID() + portalKey := networkid.PortalKey{ + ID: portalID, + Receiver: login.ID, + } + + portal, err := br.GetPortalByKey(ctx, portalKey) + if err != nil { + return nil, fmt.Errorf("failed to get portal by key: %w", err) + } + + chatInfo := bridgev2.ChatInfo{ + CanBackfill: true, + Members: &bridgev2.ChatMemberList{ + Members: []bridgev2.ChatMember{ + { + EventSender: bridgev2.EventSender{ + IsFromMe: true, + Sender: networkid.UserID(login.ID), + }, + Membership: event.MembershipJoin, + PowerLevel: ptr.Ptr(100), + }, + }, + }, + } + + for i := 0; i < members; i++ { + userID := stablePortalUserIDByIndex(portalID, i) + _, err := br.GetGhostByID(ctx, userID) + if err != nil { + return nil, fmt.Errorf("failed to get ghost by id: %w", err) + } + + chatInfo.Members.Members = append(chatInfo.Members.Members, bridgev2.ChatMember{ + EventSender: bridgev2.EventSender{ + Sender: userID, + }, + Membership: event.MembershipJoin, + }) + } + + return portal, portal.CreateMatrixRoom(ctx, login, &chatInfo) +}