diff --git a/server/activate.go b/server/activate.go index 60be81f5b..310a10bd2 100644 --- a/server/activate.go +++ b/server/activate.go @@ -6,7 +6,6 @@ import ( "time" "github.com/mattermost/rtcd/logger" - rtcd "github.com/mattermost/rtcd/service" "github.com/mattermost/rtcd/service/rtc" pluginapi "github.com/mattermost/mattermost-plugin-api" @@ -42,25 +41,15 @@ func (p *Plugin) OnActivate() error { } if cfg.RTCDServiceURL != "" { - clientCfg, err := p.getRTCDClientConfig(cfg.RTCDServiceURL) - if err != nil { - err = fmt.Errorf("failed to get rtcd client config: %w", err) - p.LogError(err.Error()) - return err - } - - client, err := rtcd.NewClient(clientCfg) + client, err := p.newRTCDClient(cfg.RTCDServiceURL) if err != nil { err = fmt.Errorf("failed to create rtcd client: %w", err) p.LogError(err.Error()) return err } - if err := client.Connect(); err != nil { - err = fmt.Errorf("failed to connect rtcd client: %w", err) - p.LogError(err.Error()) - return err - } + p.LogDebug("rtcd client connected successfully") + p.rtcdClient = client go func() { for err := range p.rtcdClient.ErrorCh() { diff --git a/server/rtcd.go b/server/rtcd.go index 614d91c99..8bb949b1b 100644 --- a/server/rtcd.go +++ b/server/rtcd.go @@ -30,6 +30,85 @@ func (p *Plugin) getStoredRTCDConfig() (rtcd.ClientConfig, error) { return cfg, nil } +func (p *Plugin) registerRTCDClient(cfg rtcd.ClientConfig) (rtcd.ClientConfig, error) { + client, err := rtcd.NewClient(cfg) + if err != nil { + return cfg, fmt.Errorf("failed to create rtcd client: %w", err) + } + defer client.Close() + + cfg.AuthKey, err = client.Register(cfg.ClientID) + if err != nil { + return cfg, fmt.Errorf("failed to register rtcd client: %w", err) + } + + cfgData, err := json.Marshal(&cfg) + if err != nil { + return cfg, fmt.Errorf("failed to marshal rtcd client config: %w", err) + } + + if err := p.API.KVSet(rtcdConfigKey, cfgData); err != nil { + return cfg, fmt.Errorf("failed to store rtcd client config: %w", err) + } + + p.LogDebug("rtcd client registered successfully", "clientID", cfg.ClientID) + + return cfg, nil +} + +func (p *Plugin) newRTCDClient(rtcdURL string) (*rtcd.Client, error) { + clientCfg, err := p.getRTCDClientConfig(rtcdURL) + if err != nil { + return nil, fmt.Errorf("failed to get rtcd client config: %w", err) + } + + client, err := rtcd.NewClient(clientCfg) + if err != nil { + return nil, fmt.Errorf("failed to create rtcd client: %w", err) + } + + err = client.Connect() + if err == nil { + return client, nil + } + defer client.Close() + + // If connecting fails we attempt to re-register once as the rtcd instance may + // have restarted, potentially losing stored credentials. + + p.LogError(fmt.Sprintf("failed to connect rtcd client: %s", err.Error())) + p.LogDebug("attempting to re-register the rtcd client") + + mutex, err := cluster.NewMutex(p.API, "rtcd_registration") + if err != nil { + return nil, fmt.Errorf("failed to create cluster mutex: %w", err) + } + + lockCtx, cancelCtx := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelCtx() + if err := mutex.LockWithContext(lockCtx); err != nil { + return nil, fmt.Errorf("failed to acquire cluster lock: %w", err) + } + defer mutex.Unlock() + + newCfg, err := p.registerRTCDClient(clientCfg) + if err != nil { + client.Close() + return nil, fmt.Errorf("failed to register rtcd client: %w", err) + } + + newClient, err := rtcd.NewClient(newCfg) + if err != nil { + return nil, fmt.Errorf("failed to create rtcd client: %w", err) + } + + if err := newClient.Connect(); err != nil { + return nil, fmt.Errorf("failed to connect rtcd client: %w", err) + } + + return newClient, nil +} + func (p *Plugin) getRTCDClientConfig(rtcdURL string) (rtcd.ClientConfig, error) { var cfg rtcd.ClientConfig @@ -69,12 +148,6 @@ func (p *Plugin) getRTCDClientConfig(rtcdURL string) (rtcd.ClientConfig, error) return cfg, nil } - client, err := rtcd.NewClient(cfg) - if err != nil { - return cfg, fmt.Errorf("failed to create rtcd client: %w", err) - } - defer client.Close() - // Here we need some coordination to avoid multiple plugin instances to // register at the same time (at most one would succeed). mutex, err := cluster.NewMutex(p.API, "rtcd_registration") @@ -95,19 +168,9 @@ func (p *Plugin) getRTCDClientConfig(rtcdURL string) (rtcd.ClientConfig, error) return storedCfg, nil } - cfg.AuthKey, err = client.Register(cfg.ClientID) - if err != nil { + if cfg, err := p.registerRTCDClient(cfg); err != nil { return cfg, fmt.Errorf("failed to register rtcd client: %w", err) } - cfgData, err := json.Marshal(&cfg) - if err != nil { - return cfg, fmt.Errorf("failed to marshal rtcd client config: %w", err) - } - - if err := p.API.KVSet(rtcdConfigKey, cfgData); err != nil { - return cfg, fmt.Errorf("failed to store rtcd client config: %w", err) - } - return cfg, nil }